Java并发编程—J.U.C下各种Atomic类使用及CAS相关源码分析

2020/2/20 17:01:30

本文主要是介绍Java并发编程—J.U.C下各种Atomic类使用及CAS相关源码分析,对大家解决编程问题具有一定的参考价值,需要的程序猿们随着小编来一起学习吧!

Java并发编程

代码GitHub地址 github.com/imyiren/con…

  1. 刨根问底搞懂创建线程到底有几种方法?
  2. 如何正确得启动和停止一个线程 最佳实践与源码分析
  3. 多案例理解Object的wait,notify,notifyAll与Thread的sleep,yield,join等方法
  4. 了解线程属性,如何处理子线程异常
  5. 多线程安全和性能问题
  6. JMM(Java内存模型)在并发中的原理与应用
  7. 深入理解死锁问题及其解决方案
  8. 剖析线程池的使用与组成
  9. 带你一文搞懂ThreadLocal的用法以及内部原理
  10. J.U.C下Lock的分类及特点详解(结合案例和源码)
  11. J.U.C下各种Atomic类使用及CAS相关源码分析

0. 主要内容

  1. JUC下原子类分类介绍以及使用案例
  2. AtomicIneteger为例分析其源码,解释CAS的原理(重点)

1. J.U.C下的原子类

1.1 原子类介绍

  • 原子类,对象的数据操作不可分割的。具有原子性
  • 作用:原子类的作用和锁类似,是为了保证并发情况下线程的安全问题,不过原子类相比于锁,有一定优势:
    1. 粒度更细:原子类把竞争范围缩小到了变量级别
    2. 效率较高:通常情况下更高,但是高度竞争的情况下效率更低
  • J.U.C下的原子类,大都由CAS实现(最后会有源码分析)

1.2 各种原子类的分类

分类 类型
Atomic*基本类型 如:AtomicIntegerAtomicBoolean
Atomic*Array数组类型 如:AtomicLongrArrayAtomicReferenceArray
Atomic*FieldUpdate升级类型 如:AtomicIntegerFieldUpdate
Adder累加器 如:LongAdderDoubleAdder
Accumulator累加器 如:LongAccumulatorDoubleAccumulator

1.3 `Atomic*基本类型原子类

  • 以AtomicInteger为例
  1. 常用方法
    • int get() 获取当前值
    • int getAndSet(int) 获取当前值并设置新的值
    • int getAndIncrement() 获取当前值并自增
    • int incrementAndGet() 获取自增后的值 (同一个方法相比,get在前就获取自增前的值,get在后就获取自增后的值)
    • int getAndDecrement() 获取当前值并自减
    • int getAndAdd(int) 获取当前值并加上一个值
    • boolean compareAndSet(int expect, int update) 判断当前值是否符合预期值expect,如果符合就设置更新值update
  2. 使用示例
/**
 * 使用AtomicInteger 对比 非原子类,演示线程安全问题
 *
 * @author yiren
 */
public class AtomicIntegerExample01 {
    private static AtomicInteger atomicInteger = new AtomicInteger();
    private static volatile Integer count = 0;

    public static void main(String[] args) throws InterruptedException {
        Runnable runnable = () -> {
            for (int i = 0; i < 10000; i++) {
                atomicInteger.getAndIncrement();
                count++;
            }
        };
        Thread thread1 = new Thread(runnable);
        Thread thread2 = new Thread(runnable);
        thread1.start();
        thread2.start();
        thread1.join();
        thread2.join();
        System.out.println("atomicInteger=" + atomicInteger);
        System.out.println("count=" + count);
    }
}
复制代码
atomicInteger=20000
count=13409

Process finished with exit code 0
复制代码
  • 我们可以看到此时AtomicInteger结果是正确的

  • Integer的结果则不正确,如果要保证线程安全,则需要加锁

  • 只要在线程中不多次调用,都可以保证线程安全,单个方法原子类都是保证线程安全的

  • 注意:原子操作+原子操作!= 原子操作

1.4 Atomic*Array数组类型分析

  • AtomicIntegerArray为例

  • 数组类型的时候,它会保证每个元素的的操作都是线程安全的

  • AtomicIntegerArray的方法和AtomicInteger的方法都类似,不过AtomicIntegerArray的方法需要指定数组的index

  • 代码演示:

/**
 * @author yiren
 */
public class AtomicIntegerArrayExample {
    private static AtomicIntegerArray atomicIntegerArray = new AtomicIntegerArray(10);

    public static void main(String[] args) throws InterruptedException {
        Runnable incrRunnable = () -> {
            for (int i = 0; i < atomicIntegerArray.length(); i++) {
                atomicIntegerArray.incrementAndGet(i);
            }
        };
        Runnable decrRunnable = () -> {
            for (int i = 0; i < atomicIntegerArray.length(); i++) {
                atomicIntegerArray.decrementAndGet(i);
            }
        };

        ExecutorService executorService = Executors.newFixedThreadPool(100);
        for (int i = 0; i < 1000; i++) {
            executorService.execute(incrRunnable);
        }
        for (int i = 0; i < 1000; i++) {
            executorService.execute(decrRunnable);
        }
        TimeUnit.SECONDS.sleep(5);
        for (int i = 0; i < atomicIntegerArray.length(); i++) {
            System.out.print(atomicIntegerArray.get(i) + " ");
        }
    }
}
复制代码
0 0 0 0 0 0 0 0 0 0
复制代码

1.5 AtomicReference应用类型分析

  • AtomicReference类的作用,和AtomicInteger并没有太多区别,只是作用对象变了,AtomicInteger是保证一个整数的原子性,而AtomicReference是让一个对象保证原子性
  • AtomicReference会比AtomicInteger更强大,因为对象中会包含很多属性,用法是类似的
  • 类对象的方法

  • 案例
/**
 * @author yiren
 */
public class SpinLock {
    private static AtomicReference<Thread> sign = new AtomicReference<>();

    private static void lock() {
        Thread current = Thread.currentThread();
        while (!sign.compareAndSet(null, current)) {
            System.out.println("fail to set!");
        }
    }

    private static void unlock() {
        Thread thread = Thread.currentThread();
        sign.compareAndSet(thread, null);
    }

    public static void main(String[] args) {
        Runnable runnable = () -> {
            System.out.println("start to get lock");
            SpinLock.lock();
            System.out.println("got lock successfully!");
            try {
                TimeUnit.SECONDS.sleep(1);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }finally {
                SpinLock.unlock();
            }
        };

        Thread thread = new Thread(runnable);
        Thread thread1 = new Thread(runnable);

        thread.start();
        thread1.start();
    }
}
复制代码
  • 我们利用AtomicReference来实现一个自旋锁,通过compareAndSet方法去先比较然后赋值来避免使用锁

1.6 封装普通类型成原子类

  • AtomicIntegerFieldUpdater为例

  • AtomicIntegerFieldUpdater.newUpdater(Counter.class, "count");我们创建对象时候,需要指定目标类,以及属性。

  • 并且在操作的时候,需要传入操作的对象

  • 为什么会需要用这样的方式呢?而不直接在原有对象上修改?

    • 如果我们在编码中,仅有极少时候才会用到原子性操作,如果在原有对象直接使用原子类就十分浪费性能了
    • 此外,我们在使用别人定义的类的时候,有这样的需求,但是别人没有这样的需求,我们也不能破坏人家的定义,这个时候AtomicIntegerFieldUpdater的使用,就不会对原有类进行侵入式破坏了。
  • 注意:这个类不支持static修饰的变量

  • 案例如下:

/**
 * @author yiren
 */
public class AtomicFieldUpdaterExample {
    private static Counter one = new Counter();
    private static Counter two = new Counter();
    private static AtomicIntegerFieldUpdater<Counter> updater = AtomicIntegerFieldUpdater.newUpdater(Counter.class, "count");


    public static void main(String[] args) throws InterruptedException {
        Runnable runnable = () -> {
            for (int i = 0; i < 10000; i++) {
                one.count++;
                updater.getAndIncrement(two);
            }
        };
        Thread thread1 = new Thread(runnable);
        Thread thread2 = new Thread(runnable);
        thread1.start();
        thread2.start();
        thread1.join();
        thread2.join();
        System.out.println("one.count = " + one.count);
        System.out.println("two.count = " + two.count);
    }

    private static class Counter {
        volatile int count;
    }
}
复制代码
one.count = 18417
two.count = 20000

Process finished with exit code 0
复制代码
  • 可以看出,升级过后,原有的数据操作线程安全了

1.7 Adder累加器

  • 以LongAdder为例

  • LongAdder是Java8引入的新类,高并发下LongAdderAtomicLong的效率高,其本事是利用了空间换时间

  • 它其实是利用了分段锁技术,LongAdder把不同线程对应到不同的Cell上进行修改,降低了冲突的概率,提高了并发性能。

  1. 代码演示:对比AtomicLongLongAdder
/**
 * @author yiren
 */
public class AtomicLongExample {
    public static void main(String[] args) {
        AtomicLong counter = new AtomicLong();
        ExecutorService executorService = Executors.newFixedThreadPool(16);
        Runnable task = () -> {
            for (int i = 0; i < 10000; i++) {
                counter.incrementAndGet();
            }
        };
        long start = System.currentTimeMillis();
        for (int i = 0; i < 10000; i++) {
            executorService.execute(task);
        }
        executorService.shutdown();
        while (!executorService.isTerminated()) {
        }
        long end = System.currentTimeMillis();
        System.out.println("end-start=" + (end - start)+ "ms");

    }
}
复制代码
end-start=2140ms

Process finished with exit code 0
复制代码
/**
 * @author yiren
 */
public class LongAdderExample {
    public static void main(String[] args) {
        LongAdder counter = new LongAdder();
        ExecutorService executorService = Executors.newFixedThreadPool(16);
        Runnable task = () -> {
            for (int i = 0; i < 10000; i++) {
                counter.increment();
            }
        };
        long start = System.currentTimeMillis();
        for (int i = 0; i < 10000; i++) {
            executorService.execute(task);
        }
        executorService.shutdown();
        while (!executorService.isTerminated()) {
        }
        long end = System.currentTimeMillis();
        System.out.println("end-start=" + (end - start)+ "ms");

    }
}
复制代码
end-start=157ms

Process finished with exit code 0
复制代码
  • 我们从上面可以看出,我本地机器是i7处理器,两个程序唯一差别就是使用的原子类相差了10多倍。LongAdder明显比AtomicLong更快
  1. 为什么差距这么大?
  • AtomicLong的操作,首先每个线程操作完后,会需要把线程本地内存的数据刷到主内存,然后另外一个线程还得从主内存中刷新新的数据。
  • LongAdder不需要这样做,LongAdder在每个线程都会有自己的一个计数器,仅仅用来在自己的线程内计数,这样一来就不会和其他线程的计数器干扰。
  • LongAdder引入的是分段累加的概念,内部有一个base变量和一个Cell[] cells数组共同参与计算
    • base:竞争不激烈就直接累加到该变量
    • cells: 竞争激烈的时候,各个线程就分散累加到自己的cells[i]
  1. 适用场景
  • AtomicLong:在竞争低的情况加和LongAdder相似,但是它具有CAS方法,可以提供更多的功能
  • LongAdder:在并发高的情况下有明显优势,但是只适用于统计求和计数的场景,有一定的局限性

1.8 Accumulator累加器

  • LongAccumulator为例

  • 基本用法

/**
 * @author yiren
 */
public class AccumulatorExample {
    public static void main(String[] args) {
        // 累加 :此处的(left, right) -> left + right 可以替换成 Long::sum
        // left=3
        LongAccumulator longAccumulator = new LongAccumulator((left, right) -> left + right, 3);
        // left=3+right=3+2=5
        longAccumulator.accumulate(2);
        // left=5+right=5+3=8
        longAccumulator.accumulate(3);
        System.out.println(longAccumulator.getThenReset());
        // left=3
        LongAccumulator longAccumulator1 = new LongAccumulator((left, right) -> left - right, 3);
        // left=3-right=3-2=1
        longAccumulator1.accumulate(2);
        // left=1-right=1-3=-2
        longAccumulator1.accumulate(3);
        System.out.println(longAccumulator1.getThenReset());
        // 求最大值
        LongAccumulator longAccumulator2 = new LongAccumulator(Math::max, -1);
        longAccumulator2.accumulate(14);
        longAccumulator2.accumulate(3);
        System.out.println(longAccumulator2.getThenReset());

    }
}
复制代码
8
-2
14

Process finished with exit code 0
复制代码
  • 详细过程可以看注释
  • 有人或许会觉得这个麻烦。这个只是单线程,原子类是保证多线程操作的。也就是说我们可以在不同的线程直接调用
/**
 * @author yiren
 */
public class AccumulatorExample01 {
    public static void main(String[] args) {
        LongAccumulator accumulator = new LongAccumulator((left, right) -> {
            long y = left;
            long x = right;
            return x + y;
        }, 0);

        ExecutorService executorService = Executors.newFixedThreadPool(100);
        IntStream.range(1, 100).forEach(item -> executorService.execute(() -> accumulator.accumulate(item)));
        executorService.shutdown();
        while (!executorService.isTerminated()) {

        }
        System.out.println(accumulator.get());
    }
}
复制代码
4950

Process finished with exit code 0
复制代码
  • 使用场景:
    • 需要并行计算,数据量大的
    • 没有顺序要求的

2. CAS原理

2.1 CAS是什么?

  • 用在并发场景的一种算法,全称是compare and swap

  • 在操作数据的时候,它会认为数据的值为A,如果是它就会把他改成B,如果不是A,就说明被别人改了,它就不改了。以避免多线程修改数据导致出错。

  • CAS有三个值。内存值V、预期值A、要修改的值B,当且仅当A==V时,才能将内存值改为B,否则什么都不做。最后返回当前的V

  • CAS在现代处理器中,是有特殊指令可以实现的,而JVM在实现的也会利用汇编指令: cmpxchg

2.2 案例演示

  • CAS的等价代码:
/**
 * @author yiren
 */
public class CasExample {
    private static volatile int value;

    public static synchronized int compareAndSwap(int expect, int target) {
        int oldValue = value;
        if (expect == oldValue) {
            value = target;
        }
        return value;
    }

    public static void main(String[] args) throws InterruptedException {
        value = 0;
        Runnable runnable = () -> {
            compareAndSwap(0, 1);
        };
        Thread thread1 = new Thread(runnable);
        Thread thread2 = new Thread(runnable);

        thread1.start();
        thread2.start();

        thread1.join();
        thread2.join();

        System.out.println(value);
    }
}
复制代码

2.3 应用场景

  • 在J.U.C下,大量使用CAS来实现,比如上面所说的原子类
  • 乐观锁实现也是使用CAS原理
  • 并发容器(JDK8 ConcurrentHashMap

2.4 源码分析CAS

  • 以原子类AtomicInteger的源码为案例
    • 在AtomicInteger中使用Unsafe工具来直接操作内存数据
    • 用Unsafe来实现底层操作
    • 用volatile来修饰value字段,保证可见性
  • Unsafe类:它是CAS实现的核心类,Java无法直接访问底层操作系统,而是通过本地native方法来访问,不过JVM还是提供了一个途径,JDK中的Unsafe类,它就提供了硬件级别的原子操作.
  • 以下是AtomicInteger中的属性定义:
    // setup to use Unsafe.compareAndSwapInt for updates
    private static final Unsafe unsafe = Unsafe.getUnsafe();
    private static final long valueOffset;

    static {
        try {
            valueOffset = unsafe.objectFieldOffset
                (AtomicInteger.class.getDeclaredField("value"));
        } catch (Exception ex) { throw new Error(ex); }
    }
	
    private volatile int value;
复制代码
  • 其中value是我们主要保存数据的属性;而valueOffset则表示变量value值在内存地址中当前对象的偏移地址,因为Unsafe就是根据内存偏移地址获取数据的原值的,这样我们就能通过unsafe来实现CAS了

  • 我们看一下AtomicInteger的具体的操作方法

   public final boolean compareAndSet(int expect, int update) {
        return unsafe.compareAndSwapInt(this, valueOffset, expect, update);
    }
    public final int getAndDecrement() {
        return unsafe.getAndAddInt(this, valueOffset, -1);
    }
    public final int getAndAdd(int delta) {
        return unsafe.getAndAddInt(this, valueOffset, delta);
    }
复制代码
  • 可以看出里面主要相关的方法就是unsafecompareAndSwapIntgetAndAddInt且每个方法调用都传入了当前对象、value的偏移地址、和操作数

  • 我们看下int getAndAddInt(Object var1, long var2, int var4)方法

    public final int getAndAddInt(Object var1, long var2, int var4) {
        int var5;
        do {
            var5 = this.getIntVolatile(var1, var2);
        } while(!this.compareAndSwapInt(var1, var2, var5, var5 + var4));

        return var5;
    }
复制代码
  • var1是当前AtomicInteger的对象,var2是value的偏移地址,通过偏移地址用UnsafegetIntVolatile获取到当前AtomicInteger对象的value值,然后调用UnsafecompareAndSwapInt方法来做CAS。
  • 看下compareAndSwapInt的定义
public final native boolean compareAndSwapInt(Object var1, long var2, int var4, int var5);
复制代码
  • 它是一个native方法,实际调用的JVM的C++实现的方法,而在C++代码中又调用的是Atomic::cmpxchg,而在现代处理器中,实际是可以对应汇编指令集中的 比较并交换指令CMPXCHG

2.5 CAS的缺点

  1. ABA问题

比如

  • 初始值为0,线程1将它改成1,然后又将它改回0
  • 线程2在线程1修改成1之前拿到了0,然后又在线程1改回0后去比较。
  • 此时线程2就会修改成功,但是线程2并不知道线程1对里面的数进行过修改
  • 对于此,可以使用版本号来解决 比如1A->2B->3A-4B这样,每个操作都有一个版本号作为记录。在比较的时候就是用版本号去比较
  1. 自旋时间过长
  • 并发高的情况下,就容易增加自旋时间。

  • 觉得可以就点个赞吧?? Thanks!

关于我

  • 坐标杭州,普通本科高校计算机科学与技术专业。
  • 20年毕业,主做Java技术栈后端开发。
  • GitHub: github.com/imyiren
  • Blog : imyi.ren


这篇关于Java并发编程—J.U.C下各种Atomic类使用及CAS相关源码分析的文章就介绍到这儿,希望我们推荐的文章对大家有所帮助,也希望大家多多支持为之网!


扫一扫关注最新编程教程