面试常见基础题 - CAS | Eddie'Blog
面试常见基础题 - CAS

面试常见基础题 - CAS

eddie 454 2021-08-04

[toc]

目录

CAS理论 (百度理论)

CAS的意思是比较与交换(Compare And Swap),它是乐观锁的一种实现机制。

什么是乐观锁?通俗的来说就是它比较乐观,每次在修改变量的值之前不认为别的线程会修改变量,每次都会尝试去获得锁,如果获取失败了,它也会一直等待,直到获取锁为止。说白了,它就是打不死的小强。

而悲观锁呢,顾名思义,就比较悲观了,每次在修改变量前都会认为别人会动这个变量,所以它会把变量锁起来,独占,直到自己修改完毕才会释放锁。说白了,就是比较自私,把好东西藏起来自己偷偷享用,完事了再拿出来给别人。悲观锁会因线程一直阻塞导致系统上下文切换,系统的性能开销大。像之前的synchronized关键字就是悲观锁的一种实现。

CAS是一种无锁原子算法,它的操作包括三个操作数:需要读写的内存位置(V)、预期原值(A)、新值(B)。仅当 V值等于A值时,才会将V的值设为B,如果V值和A值不同,则说明已经有其他线程做了更新,则当前线程则什么都不做。最后,CAS 返回当前V的真实值。CAS 操作时抱着乐观的态度进行的,它总是认为自己可以成功完成操作。

当多个线程同时使用CAS 操作一个变量时,只有一个会胜出,并成功更新,其余均会失败。失败的线程不会挂起,仅是被告知失败,并且允许再次尝试,当然也允许实现的线程放弃操作。基于这样的原理,CAS 操作即使没有锁,也可以发现其他线程对当前线程的干扰。

与锁相比,使用CAS会使程序看起来更加复杂一些,但由于其非阻塞的,它对死锁问题天生免疫,并且,线程间的相互影响也非常小。更为重要的是,使用无锁的方式完全没有锁竞争带来的系统开销,也没有线程间频繁调度带来的开销,因此,他要比基于锁的方式拥有更优越的性能。




查看通俗易懂例子

private static int num_original = 0;

private void original() throws InterruptedException {
    long start = System.currentTimeMillis();
    CountDownLatch countDownLatch = new CountDownLatch(1000000);
    ThreadPoolExecutor executor = getThreadPoolExecutor();
    for (int i = 0; i < 100; i++) {
        executor.execute(new Runnable() {
            @Override
            public void run() {
                for (int j = 0; j < 10000; j++) {
                    num_original++;
                    countDownLatch.countDown();
                }
            }
        });
    }
    countDownLatch.await();
    System.err.println("Original: " + num_original);
    executor.shutdown();
    long end = System.currentTimeMillis();
    System.err.println("Original Time:" + (end - start) + "毫秒\r\n");
}

按道理创建100个线程,每个线程共享 num_original 放入线程池进行累加10000次,如果按理想应该最终是100*10000=1000000,But 在多线程中程序是并发执行的。比如在多线程并发下,A和B都同时拿到 num_original 值,那么都分别累加 1,在没有做任何同步操作情况下,A和B它们都把自己累加后的 num_original 值,写入主内存。所以达不到 1000000 的结果输出:

Original: 967763
Original Time:64毫秒

在这里,说明一下 volatile 是没法解决线程之间的原子性,它只是提供可见性
参考文章:volatile不具有原子性的理解之解读i++疑惑




线程安全的方案(从繁到简)


### Synchronized 关键字保证线程安全
private static int num_sync = 0;

private void syncTest() throws InterruptedException {
    long start = System.currentTimeMillis();
    // countDownLatch这个类使一个线程等待其他线程各自执行完毕后再执行
    // 设置栅栏
    CountDownLatch countDownLatch = new CountDownLatch(1000000);
    ThreadPoolExecutor executor = getThreadPoolExecutor();
    for (int i = 0; i < 100; i++) {
        executor.execute(new Runnable() {
            @Override
            public void run() {
                for (int j = 0; j < 10000; j++) {
                    // 当前类的 class 对象锁
                    synchronized (CasTest.class) {
                        num_sync++;
//                        System.out.println(num++);
                    }
                    countDownLatch.countDown();
                }
            }
        });
    }
    countDownLatch.await();
    System.err.println("SyncTest:" + num_sync);
    executor.shutdown();
    long end = System.currentTimeMillis();
    System.err.println("SyncTest Time:" + (end - start) + "毫秒\r\n");
}

在这段简单同步代码中,给当前类的 class 对象锁,保证了线程安全同步执行,在JDK1.6之后进行了优化,拥有了无锁->偏向锁->轻量级锁->重量级锁的升级过程,所以,在并发量庞大情况下,效率极低。synchronized无法知道当前线程锁状态,出现异常情况就会自动释放锁,但如果并发量庞大造成阻塞,它没办法释放锁:

从运行结果,很明显是线程安全的

SyncTest:1000000

SyncTest Time:70毫秒




ReentrantLock 实现线程安全 (独占锁)

  • 它是 java.util.concurrent.locks 下的类:
    • ReentrantLock 表现为 API 层面的互斥锁(lock() 和 unlock() 方法配合 try/finally 语句块来完成)
private static int num_lock = 0;

private void lockTest() throws InterruptedException {
    long start = System.currentTimeMillis();
    Lock lock = new ReentrantLock();
    CountDownLatch countDownLatch = new CountDownLatch(1000000);
    ThreadPoolExecutor executor = getThreadPoolExecutor();
    for (int i = 0; i < 100; i++) {
        executor.execute(new Runnable() {
            @Override
            public void run() {
                for (int j = 0; j < 10000; j++) {
                    try {
                        lock.lock();
                        num_lock++;
                    } finally {
                        lock.unlock();
                    }
                    countDownLatch.countDown();
                }
            }
        });
    }
    countDownLatch.await();
    System.err.println("LockTest:" + num_lock);
    executor.shutdown();
    long end = System.currentTimeMillis();
    System.err.println("SyncTest Time:" + (end - start) + "毫秒\r\n");
}

LockTest:1000000

SyncTest Time:38毫秒

ReentrantLock 源码

在调用 Lock lock = new ReentrantLock() 的时候,它默认创建一个非公平锁

/**
 * Creates an instance of {@code ReentrantLock}.
 */
public ReentrantLock() {
    sync = new NonfairSync();
}

/**
  * Sync object for non-fair locks
  * 非公平锁的同步实现
  */
 static final class NonfairSync extends Sync {
     private static final long serialVersionUID = 7316153563782823691L;

     /**
      * Performs lock.  Try immediate barge, backing up to normal
      * 执行锁定。尝试立即停止,恢复正常
      * acquire on failure.
      * 失败时获得
      */
     final void lock() {
     	 // java.util.concurrent.locks.AbstractQueuedSynchronizer#compareAndSetState
     	 // CAS: 获取同步状态
         if (compareAndSetState(0, 1))
         	 // 获取 true 状态,将独占锁线程设置为当前线程
             setExclusiveOwnerThread(Thread.currentThread());
         else
         	 // 获取 false 状态,将再次请求同步状态
             acquire(1);
     }
	// 省略...
 }

进入lock()方法,首先执行CAS操作(compareAndSetState(0, 1)),尝试把状态(state)从0变成1,如果返回true就代表获取同步状态成功,当前线程获取锁。如果返回状态(state)是false,表示已有线程持有该同步状态(数值=1)获取锁失败。

在高并发的场景下,可能同时存在多个线程设置状态(state),因此CAS操作保证了状态(state)的原子性,返回false后,就执行acquire(1)方法,该方法是 AbstractQueuedSynchronizer (简称AQS)中的方法

 /**
  * Acquires in exclusive mode, ignoring interrupts.  Implemented
  * by invoking at least once {@link #tryAcquire},
  * returning on success.  Otherwise the thread is queued, possibly
  * repeatedly blocking and unblocking, invoking {@link
  * #tryAcquire} until success.  This method can be used
  * to implement method {@link Lock#lock}.
  *
  * @param arg the acquire argument.  This value is conveyed to
  *        {@link #tryAcquire} but is otherwise uninterpreted and
  *        can represent anything you like.
  */
 public final void acquire(int arg) {
     if (!tryAcquire(arg) &&
         acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
         selfInterrupt();
 }
 /**
  * Attempts to acquire in exclusive mode. This method should query
  * if the state of the object permits it to be acquired in the
  * exclusive mode, and if so to acquire it.
  *
  * <p>This method is always invoked by the thread performing
  * acquire.  If this method reports failure, the acquire method
  * may queue the thread, if it is not already queued, until it is
  * signalled by a release from some other thread. This can be used
  * to implement method {@link Lock#tryLock()}.
  *
  * <p>The default
  * implementation throws {@link UnsupportedOperationException}.
  */
 protected boolean tryAcquire(int arg) {
     throw new UnsupportedOperationException();
 }

以独占模式获取,忽略中断。 通过至少调用一次 {@link #tryAcquire} 实现,成功返回。否则线程将排队,可能重复阻塞和解除阻塞。从字面意思就是传入的 arg 标识获取同步状态的(state)后的值,因为要获取锁,而state=0时是释放锁,state=1则是获取锁,所以这里传入 acquire(1)

详细底层说明

总之ReentrantLock的底层是通过CAS+ AQS来实现的,在高并发场景下,ReentrantLock 与 synchronized 的取舍,在并发的数量多时候使用 ReentrantLock ,在并发数量小的时候使用 synchronized

lock() 和 unlock() 方法配合 try/finally 语句块来完成




CAS的实现

  • java.util.concurrent.atomic
    • AtomicBoolean
    • AtomicInteger
    • AtomicIntegerArray
    • AtomicIntegerFieldUpdater
    • AtomicLong
    • AtomicLongArray
    • AtomicLongFieldUpdater
    • AtomicMarkableReference
    • AtomicReference
    • AtomicReferenceArray
    • AtomicReferenceFieldUpdater
    • AtomicStampedReference
    • DoubleAccumulator
    • DoubleAdder
    • LongAccumulator
    • LongAdder
    • Striped64

使用 LongAdder 类

private void longAdderTest() throws InterruptedException {
    long start = System.currentTimeMillis();
    LongAdder la = new LongAdder();
    CountDownLatch countDownLatch = new CountDownLatch(1000000);
    ThreadPoolExecutor executor = getThreadPoolExecutor();
    for (int i = 0; i < 100; i++) {
        executor.execute(new Runnable() {
            @Override
            public void run() {
                for (int j = 0; j < 10000; j++) {
                    la.increment();
                    countDownLatch.countDown();
                }
            }
        });
    }
    countDownLatch.await();
    System.err.println("LongAdderTest:" + la.intValue());
    executor.shutdown();
    long end = System.currentTimeMillis();
    System.err.println("LongAdderTest Time:" + (end - start) + "毫秒\r\n");
}

JDK8后追加的类,底层也是CAS机制实现,适合鱼高并发场景下, 特别是写多读少的场景,相对于Atomic开头的类性能更好, 代价就是CPU消耗更多,用性能换取时间。

LongAdderTest:1000000

LongAdderTest Time:72毫秒



使用 AtomicInteger 类

private void atomicTest() throws InterruptedException {
    long start = System.currentTimeMillis();
    // java.util.concurrent.atomic 还有其他的CAS实现类
    AtomicInteger ai = new AtomicInteger();
    // countDownLatch这个类使一个线程等待其他线程各自执行完毕后再执行
    // 设置栅栏
    CountDownLatch countDownLatch = new CountDownLatch(1000000);
    ThreadPoolExecutor executor = getThreadPoolExecutor();
    for (int i = 0; i < 100; i++) {
        executor.execute(new Runnable() {
            @Override
            public void run() {
                for (int j = 0; j < 10000; j++) {
                    // 以原子方式将给定值添加到当前值
                    ai.incrementAndGet();
                    countDownLatch.countDown();
                }
            }
        });
    }
    countDownLatch.await();
    System.err.println("AtomicTest:" + ai.get());
    executor.shutdown();
    long end = System.currentTimeMillis();
    System.err.println("AtomicTest Time:" + (end - start) + "毫秒\r\n");
}

java.util.concurrent.atomic 包下的类底层是基于CAS的乐观锁实现,CAS是一种无锁技术,相对于 synchronized 效率更高:

AtomicTest:1000000

AtomicTest Time:62毫秒

AtomicInteger 源码

位于 java.util.concurrent.atomic 包下的 AtomicInteger 类:

/**
* AtomicInteger类内部通过, 设置使用Unsafe.compareAndSwapInt进行更新
*/
private static final Unsafe unsafe = Unsafe.getUnsafe();

AtomicInteger 类全局就一个 value 变量,使用get()对外提供,这个变量使用了 volatile 修饰符,保证了线程之间的可见性。

private volatile int value;

/**
* Creates a new AtomicInteger with the given initial value.
*
* @param initialValue the initial value
*/
public AtomicInteger(int initialValue) {
   value = initialValue;
}

/**
* Creates a new AtomicInteger with initial value {@code 0}.
*/
public AtomicInteger() {
}

/**
* Gets the current value.
*
* @return the current value
*/
public final int get() {
   return value;
}

在上面的例子中,调用了 ai.getAndAdd(1) 或 ai.incrementAndGet() 的方法Debug下去看到 compareAndSet 方法

/**
 * java.util.concurrent.atomic.AtomicInteger#incrementAndGet
 * 
 * Atomically increments by one the current value. (原子地将当前值加1)
 */
public final int incrementAndGet() {
    return unsafe.getAndAddInt(this, valueOffset, 1) + 1;
}

/**
 * sun.misc.Unsafe#getAndAddInt
 */
public final int getAndAddInt(Object var1, long var2, int var4) {
    int var5;
    do {
        var5 = this.getIntVolatile(var1, var2);
    } while(!this.compareAndSwapInt(var1, var2, var5, var5 + var4));

    return var5;
}

/**
 * sun.misc.Unsafe#compareAndSwapInt
 */
public final native boolean compareAndSwapInt(Object var1, long var2, int var4, int var5);

Debug之后会发现, incrementAndGet() 其实最终通过 compareAndSwapInt 调用底层的机器指令不断比较内存旧值和期望的值,如果比较返回 false 就继续循环比较,如果返回 true 就当前的新值赋予给内存里面的值。

CAS的缺点

  • ABA问题
  • 性能问题

什么是ABA

在多线程场景下CAS会出现ABA问题,关于ABA问题这里简单科普下,例如有2个线程同时对同一个值(初始值为A)进行CAS操作,这三个线程如下:

线程1,期望值为A,欲更新的值为B
线程2,期望值为A,欲更新的值为B
线程3,期望值为B,欲更新的值为A

线程1抢先获得CPU时间片,而线程2因为其他原因阻塞了,线程1取值与期望的A值比较,发现相等然后将值更新为B,然后这个时候出现了线程3,期望值为B,欲更新的值为A,线程3取值与期望的值B比较,发现相等则将值更新为A,此时线程2从阻塞中恢复,并且获得了CPU时间片,这时候线程2取值与期望的值A比较,发现相等则将值更新为B,虽然线程2也完成了操作,但是线程2并不知道值已经经过了A->B->A的变化过程。

解决ABA办法

  • 要解决ABA问题,可以增加一个版本号,当内存位置V的值每次被修改后,版本号都加1。
使用 AtomicStampedReference 解决
  • AtomicStampedReference内部维护了对象值和版本号,在创建AtomicStampedReference对象时,需要传入初始值和初始版本号;
  • 当AtomicStampedReference设置对象值时,对象值以及状态戳都必须满足期望值,写入才会成功。
 /**
  * @param expectedReference 期望值
  * @param newReference 新参考
  * @param expectedStamp 预期值
  * @param newStamp 新值
  * @return {@code true} if successful
  */
 private void atomicStampedReferenceTest() {
     long start = System.currentTimeMillis();
     AtomicStampedReference<Integer> asr = new AtomicStampedReference<Integer>(100, 1);
     ThreadPoolExecutor executor = getThreadPoolExecutor();
     executor.execute(() -> {
         System.err.println("线程1-初始版本号:" + asr.getStamp());
         try {
             TimeUnit.SECONDS.sleep(1);
         } catch (InterruptedException e) {
             e.printStackTrace();
             Thread.currentThread().interrupt();
         }
         asr.compareAndSet(100, 101, asr.getStamp(), asr.getStamp() + 1);
         asr.compareAndSet(101, 100, asr.getStamp(), asr.getStamp() + 1);
     });

     executor.execute(() -> {
         int stamp = asr.getStamp();
         System.err.println("线程2-初始版本号:" + stamp);
         try {
             TimeUnit.SECONDS.sleep(3);
         } catch (InterruptedException e) {
             e.printStackTrace();
             Thread.currentThread().interrupt();
         }
         System.err.println("最新版本号: " + asr.getStamp());
         System.err.println(asr.compareAndSet(100, 2021, stamp, asr.getStamp() + 1) + "\t当前值: " + asr.getReference());
     });

     executor.shutdown();
     long end = System.currentTimeMillis();
     System.err.println("atomicStampedReferenceTest Time:" + (end - start) + "毫秒\r\n");
 }

流程:

  1. 初始值100,初始版本号1 线程1和线程2同时拿到一样的初始版本号
  2. 线程1完成ABA操作,版本号递增到3
  3. 线程2完成CAS操作,最新版本号已经变成3,跟线程2之前拿到的版本号1不相等,操作失败

线程1-初始版本号:1
atomicStampedReferenceTest Time:58毫秒
线程2-初始版本号:1
最新版本号: 3
false 当前值: 100

使用 AtomicMarkableReference 解决
private void atomicMarkableReferenceTest() {
    long start = System.currentTimeMillis();
    AtomicMarkableReference<Integer> amr = new AtomicMarkableReference<Integer>(100, false);
    ThreadPoolExecutor executor = getThreadPoolExecutor();
    executor.execute(() -> {
        System.err.println("线程1-是否被动过:" + amr.isMarked());
        try {
            TimeUnit.SECONDS.sleep(1);
        } catch (InterruptedException e) {
            e.printStackTrace();
            Thread.currentThread().interrupt();
        }
        amr.compareAndSet(100, 101, amr.isMarked(), true);
        amr.compareAndSet(101, 100, amr.isMarked(), true);
    });

    executor.execute(() -> {
        boolean isMarked = amr.isMarked();
        System.err.println("线程2-是否被动过:" + isMarked);
        try {
            TimeUnit.SECONDS.sleep(3);
        } catch (InterruptedException e) {
            e.printStackTrace();
            Thread.currentThread().interrupt();
        }
        System.err.println("是否更改过: " + amr.isMarked());
        System.err.println(amr.compareAndSet(100, 2021, isMarked, true) + "\t当前值: " + amr.getReference());
    });
    executor.shutdown();
    long end = System.currentTimeMillis();
    System.err.println("atomicMarkableReferenceTest Time:" + (end - start) + "毫秒\r\n");
}

使用boolean变量——表示引用变量是否被更改过

atomicMarkableReferenceTest Time:73毫秒

线程1-是否被动过:false

线程2-是否被动过:false

是否更改过: true

false 当前值: 100