ConcurrentHashMap的源码分析

2021/4/12 12:25:12

本文主要是介绍ConcurrentHashMap的源码分析,对大家解决编程问题具有一定的参考价值,需要的程序猿们随着小编来一起学习吧!

ConcurrentHashMap是 HashMap 的并发版本,它是线程安全的,并且在高并发的情境下,性能优于 HashMap 很多。

jdk 1.7 采用分段锁技术,整个 Hash 表被分成多个段,每个段中会对应一个 Segment 段锁,段与段之间可以并发访问,但是多线程想要操作同一个段是需要获取锁的。所有的 put,get,remove 等方法都是根据键的 hash 值对应到相应的段中,然后尝试获取锁进行访问。

jdk 1.8 取消了基于 Segment 的分段锁思想,改用 CAS + synchronized 控制并发操作,在某些方面提升了性能。并且追随 1.8 版本的 HashMap 底层实现,使用数组+链表+红黑树进行数据存储。本篇主要介绍 1.8 版本的 ConcurrentHashMap 的具体实现,有关其之前版本的实现情况,这里推荐几篇文章:

 

CAS和Unsafe类

 

 

ConcurrentHashMap的成员变量介绍(省略HashMap相同的参数)

     private static final int MIN_TRANSFER_STRIDE = 16;

    /**
     * The number of bits used for generation stamp in sizeCtl.
     * Must be at least 6 for 32bit arrays.
     */
    private static int RESIZE_STAMP_BITS = 16; 

     /**
     * 扩容的最大的线程数,65535
     */
    private static final int MAX_RESIZERS = (1 << (32 - RESIZE_STAMP_BITS)) - 1;

    
     /**
     * 扩容的最大的线程数,65535
     */
    private static final int RESIZE_STAMP_SHIFT = 32 - RESIZE_STAMP_BITS;

    /**
     * The bit shift for recording size stamp in sizeCtl.
     */
    private static final int RESIZE_STAMP_SHIFT = 32 - RESIZE_STAMP_BITS;

    /*
     * 节点的hash值类型
     */
    
    // forwarding nodes节点的hash值,只有table发生扩容的时候,ForwardingNode才会发挥作用,作为一个占位符放在table中表示当前节点为null或则已经被移动。
    static final int MOVED     = -1; 

    //树根节点的hash值
    static final int TREEBIN   = -2; // hash for roots of trees
    static final int RESERVED  = -3; // hash for transient reservations
    static final int HASH_BITS = 0x7fffffff; // usable bits of normal node hash

    
     /*
     * Volatile类型的Node类型的table,默认为null,初始化发生在第一次插入操作,默认大小为16的数组,用来存储Node节点数据,扩容时大小总是2的幂次方。
     */
   transient volatile Node<K,V>[] table;

    /**
     * 默认为null,扩容时新生成的数组,其大小为原数组的两倍。
     */
    private transient volatile Node<K,V>[] nextTable;
 
    /**
     * counterCells 数组未初始化,在没有线程争用时,将 size 的变化写入此字段
     * 初始化 counterCells 数组时,没有获取到 cellsBusy 锁,会再次尝试将 size 的变化写入此字段
     */
    private transient volatile long baseCount;

    /**
     *当前未初始化:
     * 	= 0  未指定初始容量
     * 	> 0  由指定的初始容量计算而来,再找最近的2的幂次方。
     * 		比如传入6,计算公式为6+6/2+1=10,最近的2的幂次方为16,所以sizeCtl就为16。
     * 初始化中:
     * 	= -1 //table正在初始化
     * 	= -N //N是int类型,分为两部分,高15位是指定容量标识,低16位表示并行扩容线程数+1,-(1+n)表示此时有n个线程正在共同完成数组的扩容操作。
     * 初始化完成:
     * 	=table.length * 0.75  扩容阈值调为table容量大小的0.75倍
     *
     */
    private transient volatile int sizeCtl;

    /**
     * The next table index (plus one) to split while resizing.
     */
    private transient volatile int transferIndex;

    /**
     * 用于同步 counterCells 数组结构修改的乐观锁资源
     */
    private transient volatile int cellsBusy;
 
    /**
     * counterCells 数组一旦初始化,size 的变化将不再尝试写入 baseCount
     * 可以将 size 的变化写入数组中的任意元素
     * 可扩容,长度保持为 2 的幂
     */
    private transient volatile CounterCell[] counterCells;

    /**
     * CounterCell 是 ConcurrentHashMap 的一个静态内部类。
     * baseCount和counterCells一起保存着整个哈希表中存储的所有的节点的个数总和。
     */    
    @sun.misc.Contended static final class CounterCell {
        volatile long value;
        CounterCell(long x) { value = x; }
    }

 

节点类型:

 

 

 

这是一个重要的属性,无论是初始化哈希表,还是扩容 rehash 的过程,都是需要依赖这个关键属性的。该属性有以下几种取值:

  • 0:默认值

  • -1:代表哈希表正在进行初始化

  • 大于0:相当于 HashMap 中的 threshold,表示阈值

  • 小于-1:代表有多个线程正在进行扩容

 

 

ConcurrentHashMap的构造方法

    /**
     * 无参构造,默认构建的table数组长16
     */
    public ConcurrentHashMap() {
    }

    /**
     * 指定容量,为2的n次方,此时设置sizeCtl=容量
     */
    public ConcurrentHashMap(int initialCapacity) {
        if (initialCapacity < 0)
            throw new IllegalArgumentException();
        int cap = ((initialCapacity >= (MAXIMUM_CAPACITY >>> 1)) ?
                   MAXIMUM_CAPACITY :
                   tableSizeFor(initialCapacity + (initialCapacity >>> 1) + 1));
        this.sizeCtl = cap;
    }

    /**
     * 指定元素,sizeCtl=16
     */
    public ConcurrentHashMap(Map<? extends K, ? extends V> m) {
        this.sizeCtl = DEFAULT_CAPACITY;
        putAll(m);
    }

ConcurrentHashMap的节点类型,JDK1.8 中的 ConcurrentHashMap 对节点Node类中的共享变量使用Volatile关键字,保证多线程操作时,变量的可见行!

static class Node<K,V> implements Map.Entry<K,V> {
        final int hash;
        final K key;
        volatile V val; //Volatile类型
        volatile Node<K,V> next; //Volatile类型
    }

Node类型的节点有4个子类,具体如下:

 

 

 

ConcurrentHashMap的put方法

     public V put(K key, V value) {
        return putVal(key, value, false);
    }

    /** Implementation for put and putIfAbsent */
    final V putVal(K key, V value, boolean onlyIfAbsent) {
        //1. 判断key和value都不能为空,否则抛出异常
        if (key == null || value == null) throw new NullPointerException();

        //2. 计算key的hash值
        int hash = spread(key.hashCode());
    
       
        int binCount = 0;

        //3.遍历table数组
        for (Node<K,V>[] tab = table;;) {
            Node<K,V> f; int n, i, fh;

            //4. 如果数组为空,则执行初始化操作(第一次循环)
            if (tab == null || (n = tab.length) == 0)
                tab = initTable();

            //5. 数组不为空(第n次循环,n>1),Unsafe获取数组该索引位置的节点f,且f为null
            else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {

                //5.1 节点为null,直接用CAS在该索引新增一个节点,结束put
                if (casTabAt(tab, i, null,
                             new Node<K,V>(hash, key, value, null)))
                    break;                   // no lock when adding to empty bin
            }
            
            //6. f不为null,判断该节点的类型是ForwardingNode 类型,如果是那就是说明有线程正在进行扩容操作
            else if ((fh = f.hash) == MOVED)
                //6.1 那么当前线程就进入协助扩容
                tab = helpTransfer(tab, f);

            //7. f不为null ,且该节点为普通节点,上锁(首节点对象)
            else {
                V oldVal = null;
    
                //8 上synchronized锁锁住头节点,拿到锁的线程执行下一步
                synchronized (f) {

                    //9 再次确定这个节点的确是数组中的这个头结点
                    if (tabAt(tab, i) == f) {

                        //10  头节点的hash>=0,该节点为链表节点
                        if (fh >= 0) {
                            binCount = 1;//设置binCount=1

                            //10.2 遍历节点,binCount++,binCount记录找到目标节点的链表索引
                            for (Node<K,V> e = f;; ++binCount) {
                                K ek;

                                //10.3 如果链表上的节点的key和hash值和插入的相等,替换value,结束
                                if (e.hash == hash &&
                                    ((ek = e.key) == key ||
                                     (ek != null && key.equals(ek)))) {
                                    oldVal = e.val;
                                    if (!onlyIfAbsent)
                                        e.val = value;
                                    break;
                                }

                                //10.4 否则,将e的下个节点赋值给e,不为null继续循环
                                Node<K,V> pred = e;
                                if ((e = e.next) == null) {

                                    //10.5 下个节点为null,直接添加新节点到尾部,结束
                                    pred.next = new Node<K,V>(hash, key,
                                                              value, null);
                                    break;
                                }
                            }
                        }

                        //11. 节点类型为红黑树节点
                        else if (f instanceof TreeBin) {
                            Node<K,V> p;
                            binCount = 2;//设置binCount=2
                            
                            //调用putTreeVal找到目标红黑树节点p
                            if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,
                                                           value)) != null) {

                                //如果目标节点p不为null,替换value结束
                                oldVal = p.val;
                                if (!onlyIfAbsent)
                                    p.val = value;
                            }
                        }
                    }
                }
                //binCount != 0 说明向链表或者红黑树中添加或修改一个节点成功,
                //红黑树节点时候binCount =2
                //binCount  == 0 说明put操作将一个新节点添加成为某个桶的首节点    
                if (binCount != 0) {
                    //11 如果binCount且>8,则将i位置链表转成红黑树
                    if (binCount >= TREEIFY_THRESHOLD)
                        treeifyBin(tab, i);
                    if (oldVal != null)
                        //返回老节点
                        return oldVal;
                    break;
                }
            }
        }
        //12 元素个数(类似HashMap的size)增加1, binCount表示当前插入数据所在桶元素序号(1,2...), 根据条件判断是否扩容
        addCount(1L, binCount);
        return null;
    }

在put方法中,可以看出ConcurrentHashMap通过CAS和synchronized来保证线程安全的,当找到哈希桶上的节点为null,通过CAS增加一个节点,当找到哈希桶上的节点不为null,判断节点类型,如果节点类型是ForwardingNode 类型,说明此时正在扩容,当前线程协助扩容,否则通过synchronized锁住头节点,遍历节点,找到目标节点替换value或者在尾部新增一个节点。在put操作中,如果表没有初始化,则会有一个线程执行初始化操作,最后在添加完节点后,调用addCount方法增加元素个数(size),并且判断是否需要扩容。

初始化数组

sizeCtl 是一个ConcurrentHashMap的属性,使用了 volatile 关键字修饰保证并发的可见性,默认为0,用来控制table初始化和扩容操作,当使用非空参数构造完ConcurrentHashMap的时候会将sizeCtl设置成容量,初始化数组的时候,线程会通过Unsafe.compareAndSwapInt()方法(CAS)操作将sizeCtl设置成-1,其他线程判断sizeCtl的值如果是-1则让出CPU即可,初始化table完了,将sizeCtl设置成阈值。

 

 /**
     *当前未初始化:
     *     = 0  未指定初始容量
     *     > 0  由指定的初始容量计算而来,再找最近的2的幂次方。
     *         比如传入6,计算公式为6+6/2+1=10,最近的2的幂次方为16,所以sizeCtl就为16。
     * 初始化中:
     *     = -1 //table正在初始化
     *     = -N //N是int类型,分为两部分,高15位是指定容量标识,低16位表示并行扩容线程数+1,-(1+n)表示此时有n个线程正在共同完成数组的扩容操作。
     * 初始化完成:
     *     =table.length * 0.75  扩容阈值调为table容量大小的0.75倍
     *
     */

private final Node<K,V>[] initTable() {
        Node<K,V>[] tab; int sc;
        //1. 在table为空的情况下执行初始化操作
        while ((tab = table) == null || tab.length == 0) {

            //2. 如果一个线程发现sizeCtl<0,意味着有另外的线程执行CAS操作成功了,当前线程只需要让出CPU即可
            if ((sc = sizeCtl) < 0)
                Thread.yield(); // lost initialization race; just spin

            //3. 需要初始化,CAS将sizectl修改成-1,有且只有要给线程成功
            else if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) {//SIZECTL是sizeCtl的位置
                try {
                    //4. 再次确认数组为空
                    if ((tab = table) == null || tab.length == 0) {

                        //5. sc 大于零说明容量初始化map时已经指定容量,否则(使用无参构造的)使用默认容量16构造table
                        int n = (sc > 0) ? sc : DEFAULT_CAPACITY;
                        @SuppressWarnings("unchecked")

                        //6. 初始化数组Node
                        Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n];
                        table = tab = nt;
                        //7. 计算阈值,相当于n*0.75
                        sc = n - (n >>> 2);
                    }
                } finally {
                    //8. 设置阈值
                    sizeCtl = sc;
                }
                break;
            }
        }
        return tab;
    }

addCount 

addCount方法主要有2个功能,其一就是将ConcurrentHashMap的元素+1,其二就是根据baseCount的值判断是否进行扩容,ConcurrentHashMap的元素个数的方法如下:

    
     /**
     * 计算ConcurrentHashMap的size 
     */
    public int size() {
        long n = sumCount();
        return ((n < 0L) ? 0 :
                (n > (long)Integer.MAX_VALUE) ? Integer.MAX_VALUE :
                (int)n);
    }


  final long sumCount() {
        
        CounterCell[] as = counterCells; CounterCell a;
        //1. baseCount赋值给sum
        long sum = baseCount;

        //2. 如果CounterCell数组不为空,将CounterCell元素的value值求和后累加sum
        if (as != null) {
            for (int i = 0; i < as.length; ++i) {
                if ((a = as[i]) != null)
                    sum += a.value;
            }
        }

        //返回sum
        return sum;
    }

    /**
     * CounterCell 对象,有个volatile的value属性 
     */
     @sun.misc.Contended static final class CounterCell {
        volatile long value;
        CounterCell(long x) { value = x; }
    }

其一:如果线程安全的操作ConcurrenHashMap的size

可以看出ConcurrentHashMap的元素个数由baseCount和CounterCell数组(CounterCell[] as)的value的和累计而得,CounterCell含有volatile修饰的value属性,当多线程增加ConcurrentHashMap的元素个数时,首先会通过CAS操作修改baseCount+1,如果修改失败,每个线程会根据一个线程随机数&CounterCell数组长度-1(ThreadLocalRandom.getProbe() & as.length - 1)生成的索引值,通过CAS去操作CounterCell数组该索引值对应的CounterCell对象的value+1

private final void addCount(long x, int check) {
        CounterCell[] as; long b, s;

        //1.1.判断如果计数盒子counterCells数组不为空,直接执行第一步,增加map元素个数
        //1.2.判断如果技术盒子counterCells为空,则CAS执行baseCount+1(元素+1),CAS成功,执行第二步(增加map元素个数已成功),CAS失败,则执行第一步去增加map元素个数
       
         
         if ((as = counterCells) != null ||
            !U.compareAndSwapLong(this, BASECOUNT, b = baseCount, s = b + x)) {


            //第一步:增加ConcurrentHashMap的元素个数
            //2.1 进入第一步该方法说明:counterCells数组不为空或者counterCells数组为空但是CAS baseCount+1失败
            CounterCell a; long v; int m;
            boolean uncontended = true;// CAS 数组元素时,有没有发生线程争用的标志


            //2.3 如果当前线程的数组元素非空,则尝试将 x 累加到对应数组元素
            if (as == null || (m = as.length - 1) < 0 ||
                (a = as[ThreadLocalRandom.getProbe() & m]) == null ||
                !(uncontended =
                  U.compareAndSwapLong(a, CELLVALUE, v = a.value, v + x))) {

                //2.4 如果counterCells数组为空,执行fullAddCount方法,否则执行2.5
                //2.5 判断counterCells数组的元素为空,执行fullAddCount方法,否则执行2.6
                //2.6 当前线程随机数与counterCells数组-1的值对应的CounterCell为空,执行fullAddCount方法,否则执行2.7
                //2.7 CAS拿锁(cellValue 0,1)失败,则执行fullAddCount方法

                //2.8. 执行fullAddCount方法
                fullAddCount(x, uncontended);

                //2.9 如果调用过 fullAddCount,则当前线程一定不会协助扩容
                return;
            }

            //3. 如果check<1则啥都不做; binCount=1 说明是链表且替换了head节点的val值; 或者是数组单元是空的, 添加新的head; 就不做数组扩容的操作了;
            if (check <= 1)
                return;
            
            //4 用计数盒子保存所有总节点数量; 并返回总节点数量;
            s = sumCount();
        }




        //第二步  检查是否要扩容,如需则扩容
        if (check >= 0) {
            Node<K,V>[] tab, nt; int n, sc;

            //3.1 此时sc=sizeCtl=阈值,s=baseCount + x,x=1,baseCount=元素数,s表示加入新元素后容量大小,表示此时的map的元素超过阈值且table不为空,length没有超过最大值
            while (s >= (long)(sc = sizeCtl) && (tab = table) != null &&
                   (n = tab.length) < MAXIMUM_CAPACITY) {

                //3.2 
                int rs = resizeStamp(n);

                //3.3 sc小于0 表示已经有线程处理扩容
                if (sc < 0) {

                    //3.3.1 如果sizeCtl无符号右移16不等于rs,则标识符变化了
                    //3.3.2 如果sizeCtl = rs+1,表示扩容结束了,不再有线程进行扩容
                    //3.3.3 如果sizeCtl = rs + 65535,表示已经达到最大帮助线程数量,即65535
                    //3.3.4 如果转移下标transferIndex <=0,表示扩容结束
                    if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||
                        sc == rs + MAX_RESIZERS || (nt = nextTable) == null ||
                        transferIndex <= 0)
                        break;//直接结束

                    //3.4 有新线程参与扩容则sizeCtl加1
                    if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1))
                        transfer(tab, nt);
                }

                //3.5 CAS将sizeCtl从阈值设置成负数,成功后扩容操作
                else if (U.compareAndSwapInt(this, SIZECTL, sc,
                                             (rs << RESIZE_STAMP_SHIFT) + 2))
                    transfer(tab, null);

                //3.6 s=元素个数
                s = sumCount();
            }
        }
    }

fullAddCount方法

从上2.5、2.6、2.7的步骤中可以看出在如果以下3种情况下执行fullAddCount方法

  1. CounterCell数组为空
  2. 或者CounterCell数组不为空但是当前线程随机数与CounterCell数组-1的值对应的CounterCell对象为空
  3. 或者CounterCell数组不为空,且当前线程对应的CounterCell数组的索引上CounterCell对象也不为空,但是CAS拿锁(cellValue 0,1)失败

fullAddCount方法主要做了以下几件事

  1. 初始化CounterCell数组,初始容量为2
  2. 创建CounterCell对象,value为x将其放到CounterCell数组线程随机数对应的索引上
  3. 给CounterCell数组扩容或者再次尝试CAS将baseCount+1
private final void fullAddCount(long x, boolean wasUncontended) {
        int h;

        //1. 将当前线程随机数赋值给h,并且判断是否为0
        if ((h = ThreadLocalRandom.getProbe()) == 0) {

            //1.1 如果当前线程随机数为0,则初始化当前线程随机数,并将其赋值给h
            ThreadLocalRandom.localInit();      // force initialization
            h = ThreadLocalRandom.getProbe();

            //1.2 将wasUncontended 设置成true(wasUncontended 在addCount默认为true,若执行CAS拿锁(cellValue 0,1)且失败时wasUncontended 为false
            wasUncontended = true;
        }

        //2. 自旋操作
        boolean collide = false;                // 用来判断CounterCell数组是否扩容
        for (;;) {
            CounterCell[] as; CounterCell a; int n; long v;

            //2.1 counterCells数组不为空(addCount中2.6或2.7进入,当前线程随机数与counterCells数组-1的值对应的CounterCell为空或CAS拿锁(cellValue 0,1)失败)
            if ((as = counterCells) != null && (n = as.length) > 0) {
                    
                //2.2 如果当前线程随机数与counterCells数组-1的值对应的CounterCell为空判断是2.7的情况
                if ((a = as[(n - 1) & h]) == null) {

                    //2.3判断锁cellBusy是否是0
                    if (cellsBusy == 0) {            // Try to attach new Cell

                        //2.4 创建CounterCell,value=x
                        CounterCell r = new CounterCell(x); // Optimistic create

                        //2.5如果上锁成功
                        if (cellsBusy == 0 &&
                            U.compareAndSwapInt(this, CELLSBUSY, 0, 1)) {

                            //2.5.1 上锁成功,
                            boolean created = false;
                            try {               // Recheck under lock
                                CounterCell[] rs; int m, j;

                                //2.5.2 当前数组counterCell不为空且该索引位置为空,则将创建的CounterCell对象放到该索引位置,然后设置created为true,后面退出循环
                                if ((rs = counterCells) != null &&
                                    (m = rs.length) > 0 &&
                                    rs[j = (m - 1) & h] == null) {
                                    rs[j] = r;
                                    created = true;
                                }
                            } finally {
                                //2.5.3 释放锁
                                cellsBusy = 0;
                            }
                            if (created)
                                //2.5.3 成功了就退出循环,否则继续
                                break;
                            continue;           // Slot is now non-empty
                        }
                    }

                    //2.6 如果上锁失败将collide设置成false
                    collide = false;
                }

                //2.7counterCells数组的索引位置对象不为空,执行下面
                //2.7.1 wasUncontended为false时候进入,且执行完后直接执行3
wasUncontended为false表示CounterCell数组不为空,且通过CAS修改CounterCell对象值时失败且当前线程随机数不为空,这时候会执行执行3,重新生成一个hash值生成一个索引然后再次循环

                else if (!wasUncontended)       // CAS already known to fail
                    //2.7.2 设置成true
                    wasUncontended = true;      // Continue after rehash

                //2.7.3 再次通过CAS修改CounterCell对象的value,成功跳出,否则2.7.4
                else if (U.compareAndSwapLong(a, CELLVALUE, v = a.value, v + x))
                    break;

                //2.7.4 如果当前线程发现counterCells 数组发生变化,或者数组的容量超过CPU核心数,设置collide = false,下一次循环则执行2.7.5
                else if (counterCells != as || n >= NCPU)
                    collide = false;            // At max size or stale
                
                //2.7.5 如果此时collide = false,则设置collide = true,下一次循环时候,collide = true时执行2.7.6 扩容CounterCell数组,执行2.7.6
                else if (!collide)
                    collide = true;
                //2.7.6 CAS拿锁,成功后给CounterCell数组双倍扩容且转移元素
                else if (cellsBusy == 0 &&
                         U.compareAndSwapInt(this, CELLSBUSY, 0, 1)) {
                    try {
                        if (counterCells == as) {// Expand table unless stale
                            CounterCell[] rs = new CounterCell[n << 1];
                            for (int i = 0; i < n; ++i)
                                rs[i] = as[i];
                            counterCells = rs;
                        }
                    } finally {
                        cellsBusy = 0;
                    }
                    collide = false;
                    continue;                   // Retry with expanded table
                }
                //3. 重新设置当前线程随机数
                h = ThreadLocalRandom.advanceProbe(h);
            }

            //4.如果CounterCell数组为空,则拿锁然后初始化一个容量为2的CounterCell的数组
            else if (cellsBusy == 0 && counterCells == as &&
                     U.compareAndSwapInt(this, CELLSBUSY, 0, 1)) {
                boolean init = false;
                try {                           // Initialize table
                    if (counterCells == as) {

                        //4.1 数组没有发生变化的情况下,初始化CounterCell数组
                        CounterCell[] rs = new CounterCell[2];
                        rs[h & 1] = new CounterCell(x);
                        counterCells = rs;
                        init = true;
                    }
                } finally {
                    //4.2释放锁
                    cellsBusy = 0;
                }
                if (init)
                    break;
            }
            //5. 如果上面2个操作都失败了,再次尝试CAS 将baseCount+1,成功break,否则自旋
            else if (U.compareAndSwapLong(this, BASECOUNT, v = baseCount, v + x))
                break;                          // Fall back on using base
        }
    }

注意

其二:检查是否需要扩容

addCount的第二部分就是判断是否需要扩容,代码如下:

private final void addCount(long x, int check) {
        //省略第一步:给集合元素+1


        //1. 根据check大小判断是需要扩容,这里的check就是put方法统计的baseCount
        if (check >= 0) {
            Node<K,V>[] tab, nt; int n, sc;

            //2. 如果集合容量>sizeCtl(此时为阈值),且数组不为空,集合容量小于最大值,则扩容
            while (s >= (long)(sc = sizeCtl) && (tab = table) != null &&
                   (n = tab.length) < MAXIMUM_CAPACITY) {

                //3.根据数组长度得到一个标志符号
                int rs = resizeStamp(n);

                //4. 如果sc=sizeCtl<0,表示正在扩容中,尝试帮助扩容
                if (sc < 0) {

                    //5.协助扩容判断
                    //5.1 如果sizeCtl无符号右移16位不等于rs,则标志符号变化了,不扩容,break
                    //5.2 如果sizeCtl==rs+1,扩容已经结束,不扩容,break
                    //5.3 如果sizeCtl==rs+65535,扩容线程已达最大数,不扩容,break
                    //5.4 如果nextTable为空,扩容已经结束,不扩容,break
                    //5.5 如果转移下标transferIndex<= 0,扩容已经结束,不扩容,break

                    if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||
                        sc == rs + MAX_RESIZERS || (nt = nextTable) == null ||
                        transferIndex <= 0)
                        break;

                    //6.满足扩容判断,执行CAS,将sizeCtl+1
                    if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1))
                        //6.1 扩容操作
                        transfer(tab, nt);
                }
                //7. 如果没在扩容,则将sizeCtl更新成 标志符左移16位+2,也就是一个负数
                else if (U.compareAndSwapInt(this, SIZECTL, sc,
                                             (rs << RESIZE_STAMP_SHIFT) + 2))
                    //7.1 扩容操作
                    transfer(tab, null);
                s = sumCount();
            }
        }
    }

协助扩容方法

在put的时候,如果目标key所在的链表节点不为空,且节点类型是ForwardingNode节点(f.hash == -1)意味有其它线程正在扩容,则一起进行扩容操作,源码如下:

tab = helpTransfer(tab, f);

其中ForwardingNode 节点类型的源码如下: 继承自 Node 结点,并且它唯一的构造函数将构建一个键,值,next 都为 null 的结点,反正它就是个标识,无需那些属性。但是 hash 值却为 MOVED。

static final class ForwardingNode<K,V> extends Node<K,V> {
        final Node<K,V>[] nextTable;
        ForwardingNode(Node<K,V>[] tab) {
            super(MOVED, null, null, null);
            this.nextTable = tab;
        }

这个节点内部保存了 nextTable 引用,它指向一张 hash 表。在扩容操作中,我们需要对每个桶中的结点进行转移,如果某个桶结点中所有节点都已经转移完成了(已经被转移到新表 nextTable 中了),那么会在原 table 表的该位置挂上一个 ForwardingNode 结点,说明此桶已经完成迁移。所以,我们在 putVal 方法中遍历整个 hash 表的桶结点,如果遇到 hash 值等于 MOVED,说明已经有线程正在扩容 rehash 操作,整体上还未完成,不过我们要插入的桶的位置已经完成了所有节点的迁移。由于检测到当前哈希表正在扩容,于是让当前线程去协助扩容。方法如下:

    final Node<K,V>[] helpTransfer(Node<K,V>[] tab, Node<K,V> f) {
        Node<K,V>[] nextTab; int sc;

        //1. 如果table不为空且节点f的类型是转移类型,同时f的nextTable(新table)不为空
        if (tab != null && (f instanceof ForwardingNode) &&
            (nextTab = ((ForwardingNode<K,V>)f).nextTable) != null) {

            //2. 根据数组长度得到一个标志符号
            int rs = resizeStamp(tab.length);

            //3. 如果nextTab没有并发修改,且tab也没有被并发修改,同时sizeCtl<0,说明还在扩容
            while (nextTab == nextTable && table == tab &&
                   (sc = sizeCtl) < 0) {

                //4. 对sizeCtl的值进行分析
                //4.1 如果sizeCtl无符号右移16不等于rs,则标识符变化了
                //4.2 如果sizeCtl = rs+1,表示扩容结束了,不再有线程进行扩容
                //4.3 如果sizeCtl = rs + 65535,表示已经达到最大帮助线程数量,即65535
                //4.4 如果转移下标transferIndex <=0,表示扩容结束
                if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||
                    sc == rs + MAX_RESIZERS || transferIndex <= 0)
                    //5. 满足以上任意一个条件,即停止扩容
                    break;

                //6. 如果都不是,则将sizeCtl +1,表示增加一个帮助线程帮助扩容
                if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1)) {

                    //7. 对数组进行转移,执行完结束循环
                    transfer(tab, nextTab);
                    break;
                }
            }
            return nextTab;
        }
        return table;
    }

什么时候会扩容或着协助扩容,除了上面介绍的2个情况外,如下:

  • put()方法时会判断节点类型如果是ForwardingNode 节点类型则会协助扩容

  • 除了put()添加元素时会调用addCount(),内部检查sizeCtl看是否需要扩容。

在tryPresize()被调用也会有扩容操作,此方法被调用有两个调用点:

  1. put()方法链表转红黑树时如果table容量小于64(MIN_TREEIFY_CAPACITY),则会触发扩容。

  2. 调用putAll()之类一次性加入大量元素,会触发扩容。

具体代码如下:

     /**
     * 链表转红黑树
     */

    private final void treeifyBin(Node<K,V>[] tab, int index) {
        Node<K,V> b; int n, sc;
        if (tab != null) {
            //1. 如果数组长度小于64,调用tryPresize双倍扩容
            if ((n = tab.length) < MIN_TREEIFY_CAPACITY)
                tryPresize(n << 1);
            //链表元素转红黑树,锁住头节点
            else if ((b = tabAt(tab, index)) != null && b.hash >= 0) {
                synchronized (b) {
                    if (tabAt(tab, index) == b) {
                        TreeNode<K,V> hd = null, tl = null;
                        for (Node<K,V> e = b; e != null; e = e.next) {
                            TreeNode<K,V> p =
                                new TreeNode<K,V>(e.hash, e.key, e.val,
                                                  null, null);
                            if ((p.prev = tl) == null)
                                hd = p;
                            else
                                tl.next = p;
                            tl = p;
                        }
                        setTabAt(tab, index, new TreeBin<K,V>(hd));
                    }
                }
            }
        }
    }

     /**
     * 插入大量元素
     */
    public void putAll(Map<? extends K, ? extends V> m) {
        //1.调用tryPresize扩容
        tryPresize(m.size());
        for (Map.Entry<? extends K, ? extends V> e : m.entrySet())
            putVal(e.getKey(), e.getValue(), false);
    }

tryPresize方法

private final void tryPresize(int size) {
        //1. 计算容量
        int c = (size >= (MAXIMUM_CAPACITY >>> 1)) ? MAXIMUM_CAPACITY :
            tableSizeFor(size + (size >>> 1) + 1);
        int sc;
        //2. sizeCtl >=0 时候循环
        while ((sc = sizeCtl) >= 0) {
            Node<K,V>[] tab = table; int n;
            //3. 判断表为空的情况下,进行初始化操作
            if (tab == null || (n = tab.length) == 0) {

                //3.1 数组长度为sc和c的最大值
                n = (sc > c) ? sc : c;

                //3.2 cas将sizeCtl设置成-1表示初始化操作,有且只有一个线程初始化
                if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) {
                    try {
                        if (table == tab) {
                            @SuppressWarnings("unchecked")
                            //3.3 初始化table
                            Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n];
                            table = nt;
                            //3.4求出阈值
                            sc = n - (n >>> 2);
                        }
                    } finally {
                        //3.5阈值赋值给sizeCtl
                        sizeCtl = sc;
                    }
                }
            }

            //4。数组不为空,判断数组长度如果小于阈值,或者超过最大容量,则break 
            else if (c <= sc || n >= MAXIMUM_CAPACITY)
                break;

            //5. 在数组没有发生变化的情况下执行扩容判断
            else if (tab == table) {
                //5.1 根据容量求出标志符号
                int rs = resizeStamp(n);

                //5.2 判断是否有其他线程在扩容,5个判断如上
                if (sc < 0) {
                    Node<K,V>[] nt;
                    if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||
                        sc == rs + MAX_RESIZERS || (nt = nextTable) == null ||
                        transferIndex <= 0)
                        //5.3 不满足扩容条件,直接break
                        break;

                    //5.4 尝试将sizeCtl+1成功后扩容操作,扩容线程+1
                    if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1))
                        transfer(tab, nt);
                }

                //5.5 判断没有有其他线程在扩容,该线程扩容,将sizeCtl变成负数
                else if (U.compareAndSwapInt(this, SIZECTL, sc,
                                             (rs << RESIZE_STAMP_SHIFT) + 2))
                    transfer(tab, null);
            }
        }
    }

扩容转移

 private final void transfer(Node<K,V>[] tab, Node<K,V>[] nextTab) {
        int n = tab.length, stride;
        if ((stride = (NCPU > 1) ? (n >>> 3) / NCPU : n) < MIN_TRANSFER_STRIDE)
            stride = MIN_TRANSFER_STRIDE; // subdivide range
        if (nextTab == null) {            // initiating
            try {
                @SuppressWarnings("unchecked")
                Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n << 1];
                nextTab = nt;
            } catch (Throwable ex) {      // try to cope with OOME
                sizeCtl = Integer.MAX_VALUE;
                return;
            }
            nextTable = nextTab;
            transferIndex = n;
        }
        int nextn = nextTab.length;
        ForwardingNode<K,V> fwd = new ForwardingNode<K,V>(nextTab);
        boolean advance = true;
        boolean finishing = false; // to ensure sweep before committing nextTab
        for (int i = 0, bound = 0;;) {
            Node<K,V> f; int fh;
            while (advance) {
                int nextIndex, nextBound;
                if (--i >= bound || finishing)
                    advance = false;
                else if ((nextIndex = transferIndex) <= 0) {
                    i = -1;
                    advance = false;
                }
                else if (U.compareAndSwapInt
                         (this, TRANSFERINDEX, nextIndex,
                          nextBound = (nextIndex > stride ?
                                       nextIndex - stride : 0))) {
                    bound = nextBound;
                    i = nextIndex - 1;
                    advance = false;
                }
            }
            if (i < 0 || i >= n || i + n >= nextn) {
                int sc;
                if (finishing) {
                    nextTable = null;
                    table = nextTab;
                    sizeCtl = (n << 1) - (n >>> 1);
                    return;
                }
                if (U.compareAndSwapInt(this, SIZECTL, sc = sizeCtl, sc - 1)) {
                    if ((sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT)
                        return;
                    finishing = advance = true;
                    i = n; // recheck before commit
                }
            }
            else if ((f = tabAt(tab, i)) == null)
                advance = casTabAt(tab, i, null, fwd);
            else if ((fh = f.hash) == MOVED)
                advance = true; // already processed
            else {
                synchronized (f) {
                    if (tabAt(tab, i) == f) {
                        Node<K,V> ln, hn;
                        if (fh >= 0) {
                            int runBit = fh & n;
                            Node<K,V> lastRun = f;
                            for (Node<K,V> p = f.next; p != null; p = p.next) {
                                int b = p.hash & n;
                                if (b != runBit) {
                                    runBit = b;
                                    lastRun = p;
                                }
                            }
                            if (runBit == 0) {
                                ln = lastRun;
                                hn = null;
                            }
                            else {
                                hn = lastRun;
                                ln = null;
                            }
                            for (Node<K,V> p = f; p != lastRun; p = p.next) {
                                int ph = p.hash; K pk = p.key; V pv = p.val;
                                if ((ph & n) == 0)
                                    ln = new Node<K,V>(ph, pk, pv, ln);
                                else
                                    hn = new Node<K,V>(ph, pk, pv, hn);
                            }
                            setTabAt(nextTab, i, ln);
                            setTabAt(nextTab, i + n, hn);
                            setTabAt(tab, i, fwd);
                            advance = true;
                        }
                        else if (f instanceof TreeBin) {
                            TreeBin<K,V> t = (TreeBin<K,V>)f;
                            TreeNode<K,V> lo = null, loTail = null;
                            TreeNode<K,V> hi = null, hiTail = null;
                            int lc = 0, hc = 0;
                            for (Node<K,V> e = t.first; e != null; e = e.next) {
                                int h = e.hash;
                                TreeNode<K,V> p = new TreeNode<K,V>
                                    (h, e.key, e.val, null, null);
                                if ((h & n) == 0) {
                                    if ((p.prev = loTail) == null)
                                        lo = p;
                                    else
                                        loTail.next = p;
                                    loTail = p;
                                    ++lc;
                                }
                                else {
                                    if ((p.prev = hiTail) == null)
                                        hi = p;
                                    else
                                        hiTail.next = p;
                                    hiTail = p;
                                    ++hc;
                                }
                            }
                            ln = (lc <= UNTREEIFY_THRESHOLD) ? untreeify(lo) :
                                (hc != 0) ? new TreeBin<K,V>(lo) : t;
                            hn = (hc <= UNTREEIFY_THRESHOLD) ? untreeify(hi) :
                                (lc != 0) ? new TreeBin<K,V>(hi) : t;
                            setTabAt(nextTab, i, ln);
                            setTabAt(nextTab, i + n, hn);
                            setTabAt(tab, i, fwd);
                            advance = true;
                        }
                    }
                }
            }
        }
    }

 

Get方法

get(key)方法比较简单,不涉及并发,直接取值即可

public V get(Object key) {
        Node<K,V>[] tab; Node<K,V> e, p; int n, eh; K ek;
        //1. 根据传入的key求出hash值
        int h = spread(key.hashCode());
        //2. 如果table不为空,且该hash值对应的节点(首节点)不为空
        if ((tab = table) != null && (n = tab.length) > 0 &&
            (e = tabAt(tab, (n - 1) & h)) != null) {

            //3.首节点的hash值和key相等,返回首节点value
            if ((eh = e.hash) == h) {
                if ((ek = e.key) == key || (ek != null && key.equals(ek)))
                    return e.val;
            }
            //4.首节点的hash值小于0(红黑树结构),直接遍历,找到目标节点返回value,否则返回null
            else if (eh < 0)
                return (p = e.find(h, key)) != null ? p.val : null;

            //5.否则首节点是链表节点,遍历链表,找到目标节点返回value,否则返回null
            while ((e = e.next) != null) {
                if (e.hash == h &&
                    ((ek = e.key) == key || (ek != null && key.equals(ek))))
                    return e.val;
            }
        }
        return null;
    }

总结

 

在 JDK1.7 中,ConcurrentHashMap 采用了分段锁策略,将一个 HashMap 切割成 Segment 数组,其中 Segment 可以看成一个 HashMap, 不同点是 Segment 继承自 ReentrantLock,在操作的时候给 Segment 赋予了一个对象锁,从而保证多线程环境下并发操作安全。

但是 JDK1.7 中,HashMap 容易因为冲突链表过长,造成查询效率低,所以在 JDK1.8 中,HashMap 引入了红黑树特性,当冲突链表长度大于 8 时,会将链表转化成红黑二叉树结构。

在 JDK1.8 中,与此对应的 ConcurrentHashMap 也是采用了与 HashMap 类似的存储结构,但是 JDK1.8 中 ConcurrentHashMap 并没有采用分段锁的策略,而是在元素的节点上采用CAS + synchronized操作来保证并发的安全性

 

 

 

 

 

 

 



这篇关于ConcurrentHashMap的源码分析的文章就介绍到这儿,希望我们推荐的文章对大家有所帮助,也希望大家多多支持为之网!


扫一扫关注最新编程教程