经典的ABA问题与解决方法
针对ABA问题做了一下总结和尝试解决
背景
在学习乐观锁、悲观锁时,了解到CAS是一种乐观锁的实现,但是从博客中看到说CAS会存在ABA问题,于是就搜索了一番。
什么是ABA问题
来自blog 考虑如下操作: 并发1(上):获取出数据的初始值是A,后续计划实施CAS乐观锁,期望数据仍是A的时候,修改才能成功 并发2:将数据修改成B 并发3:将数据修改回A 并发1(下):CAS乐观锁,检测发现初始值还是A,进行数据修改 上述并发环境下,并发1在修改数据时,虽然还是A,但已经不是初始条件的A了,中间发生了A变B,B又变A的变化,此A已经非彼A,数据却成功修改,可能导致错误,这就是CAS引发的所谓的ABA问题。
这个应该就是产生ABA问题的真正原因。
ABA问题的举例
看了几篇讲述ABA问题的博客,如下描述。
- 某人取款,由于机器不太好使,多点了几次取款操作。后台threadA和threadB工作,此时threadA操作成功(100->50),threadB阻塞。正好某人朋友打款50元给小牛(50->100),threadC执行成功,之后threadB运行了,又改为(100->50)。 lz钱哪去了???来自blog
个人觉得这里threadB后来继续运行后,总共是会取出两次钱的。
- 假设有个线程A去判断账户里的钱此时是15,满足条件,直接+20,这时候卡里余额是35。但是此时不巧,正好在连锁店里,这个客人正在消费,又消费了20,此时卡里余额又为15,线程B去执行扫描账户的时候,发现它又小于20,又用过cas给它加了20,这样的话就相当于加了两次,这样循环往复肯定把老板的钱就坑没了!来自blog
个人觉得评论中这个例子更合适,但不知道该如何在代码中复现,所以还是按照上述的逻辑进行了复现。 A线程首先获取余额是15,然后准备加上20,在A线程还没提交的时候,此时B线程进入,将余额加上20并且成功提交,此时余额为35。但是紧接着用户又消费了20,所以余额还是15,终于A线程获取到了时间片,它比对之后发现余额还是15,所以A线程就执行了。
ABA问题的本质
ABA问题的根本在于CAS在修改变量的时候,无法记录变量的状态,比如修改的次数,是否修改过这个变量。这样就很容易在一个线程将A修改成B时,另一个线程又会把B修改成A,造成CAS多次执行的问题。
示例
来自blog 一家火锅店为了生意推出了一个特别活动,凡是在五一期间的老用户凡是卡里余额小于20的,赠送20元,但是这种活动每人只可享受一次。然后火锅店的后台程序员小王开始工作了,很简单就用CAS技术,先取用户卡里的余额,然后包装成AtomicInteger,写一个判断,开启10个线程,然后判断小于20的,一律加20,然后就很开心的交差了。
多线程增加20余额时,同时有消费线程做扣减,就有可能(不是每次都会出现)出现下图所示。 add线程0通过CAS增加余额,consume线程0通过CAS减少余额后,add线程2又通过CAS增加余额。
代码如下
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicStampedReference;
public class AtomicStampReferenceDemo {
static AtomicStampedReference<Integer> money = new AtomicStampedReference<Integer>(19, 0);
static AtomicInteger amount = new AtomicInteger(19);
public static void main(String[] args) {
/*new Thread(new Runnable() {
@Override
public void run() {
provideByStamped();
}
}).start();
new Thread(new Runnable() {
@Override
public void run() {
consumeByStamped();
}
}).start();*/
new Thread(new Runnable() {
@Override
public void run() {
provide();
}
}).start();
new Thread(new Runnable() {
@Override
public void run() {
consume();
}
}).start();
}
public static void provideByStamped() {
for (int i = 0; i < 10; i++) {
Thread t = new Thread(new ProviderStampedThread());
try {
//睡眠1s
//System.out.println(Thread.currentThread().getName());
int time = 0;
time += (i * 0);
Thread.sleep(time);
} catch (InterruptedException e) {
e.printStackTrace();
}
t.setName("Thread-add" + i);
t.start();
}
}
public static void consumeByStamped() {
for (int j = 0; j < 1; j++) {
Thread t = new Thread(new ConsumerStampedThread());
t.setName("Thread-consume" + j);
t.start();
}
}
public static void provide() {
for (int i = 0; i < 3; i++) {
Thread t = new Thread(new ProviderThread());
t.setName("Thread-add" + i);
t.start();
}
}
public static void consume() {
for (int j = 0; j < 1; j++) {
Thread t = new Thread(new ConsumerThread());
t.setName("Thread-consume" + j);
t.start();
}
}
//充值线程类
static class ProviderStampedThread implements Runnable {
@Override
public void run() {
/*try {
//睡眠1s
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}*/
int stamp = money.getStamp();
while (true) {
Integer account = money.getReference();
System.out.println(Thread.currentThread().getName() + "时间戳" + money.getStamp() + "充值前余额" + money.getReference());
if (account < 20) {
if (money.compareAndSet(account, account + 20, 0, 1)) {
System.out.println(Thread.currentThread().getName() + "余额小于20元,充值成功,目前余额:" + money.getReference() + "时间戳变为" + money.getStamp());
break;
} else {
System.out.println(Thread.currentThread().getName() + "已进行充值,无需充值" + "当前时间戳" + money.getStamp() + "当前余额" + money.getReference());
break;
}
} else {
System.out.println(Thread.currentThread().getName() + "余额大于20元,无需充值" + "当前时间戳" + money.getStamp() + "当前余额" + money.getReference());
break;
}
}
}
}
//消费线程类
static class ConsumerStampedThread implements Runnable {
@Override
public void run() {
while (true) {
/*try {
//睡眠1s
System.out.println(Thread.currentThread().getName());
Thread.sleep(500);
} catch (InterruptedException e) {
e.printStackTrace();
}*/
int timeStamp = money.getStamp();//1
int currentMoney = money.getReference();//39
System.out.println(Thread.currentThread().getName() + "消费前时间戳" + money.getStamp() + "消费前余额" + money.getReference());
if (currentMoney > 20) {
System.out.println(Thread.currentThread().getName() + "进入消费时间戳" + money.getStamp() + "进入消费余额" + money.getReference());
if (money.compareAndSet(currentMoney, currentMoney - 20, timeStamp, timeStamp + 1)) {
System.out.println(Thread.currentThread().getName() + "消费者成功消费20元,余额" + money.getReference() + "消费后时间戳" + money.getStamp());
break;
}
} else {
System.out.println(Thread.currentThread().getName() + "没有足够的金额");
break;
}
}
}
}
//充值线程类
static class ProviderThread implements Runnable {
public ProviderThread() {
}
@Override
public void run() {
while (true) {
int value = amount.get();
System.out.println(Thread.currentThread().getName() + "充值前余额" + amount.get());
if (value < 20) {
if (amount.compareAndSet(value, value + 20)) {
System.out.println(Thread.currentThread().getName() + "余额小于20元,充值成功,目前余额:" + amount.get() + "元");
break;
}
} else {
System.out.println(Thread.currentThread().getName() + "余额大于20元,无需充值");
break;
}
}
}
}
//消费线程类
static class ConsumerThread implements Runnable {
public ConsumerThread() {
}
@Override
public void run() {
while (true) {
int currentValue = amount.get();
System.out.println(Thread.currentThread().getName() + "消费前余额" + amount.get());
if (currentValue > 20) {
System.out.println(Thread.currentThread().getName() + "进入消费余额" + amount.get());
if (amount.compareAndSet(currentValue, currentValue - 20)) {
{
System.out.println(Thread.currentThread().getName() + "消费者成功消费20元,余额" + amount.get());
break;
}
} else {
System.out.println(Thread.currentThread().getName() + "没有足够的金额");
break;
}
} else {
System.out.println(Thread.currentThread().getName() + "余额不足");
break;
}
}
}
}
}
使用AtomicStampedReference(版本号)解决ABA问题后如下图
ProviderStampedThread充值线程类在操作数据时,必须保证其时间戳Stamp为0,才可以修改Reference值和Stamp时间戳
多线程增加20余额时,同时有消费线程做扣减。 add线程0通过CAS增加余额,consume线程0通过CAS减少余额后,add线程1由于此时Stamp不为0,所以无法通过CAS增加余额。
消费线程修改值失败,其他增加值的线程同样也是失败。
代码如下
取消注释,main函数如下
public static void main(String[] args) {
new Thread(new Runnable() {
@Override
public void run() {
provideByStamped();
}
}).start();
new Thread(new Runnable() {
@Override
public void run() {
consumeByStamped();
}
}).start();
/*new Thread(new Runnable() {
@Override
public void run() {
provide();
}
}).start();
new Thread(new Runnable() {
@Override
public void run() {
consume();
}
}).start();*/
}
写到这里,收获非常多,也熟悉了CAS的相关操作和相关类。但依然还有许多不清晰的地方或者理解不正确的地方,还请大佬们多讨论指点,后续了解之后再修改补充!