免费h5模板网站,重庆市网站建设,番禺网站建设哪家强,拉新推广平台有哪些转载自 ConcurrentHashMap总结并发编程实践中#xff0c;ConcurrentHashMap是一个经常被使用的数据结构#xff0c;相比于Hashtable以及Collections.synchronizedMap()#xff0c;ConcurrentHashMap在线程安全的基础上提供了更好的写并发能力#xff0c;但同时降低了对读一…转载自 ConcurrentHashMap总结并发编程实践中ConcurrentHashMap是一个经常被使用的数据结构相比于Hashtable以及Collections.synchronizedMap()ConcurrentHashMap在线程安全的基础上提供了更好的写并发能力但同时降低了对读一致性的要求这点好像CAP理论啊 O(∩_∩)O。ConcurrentHashMap的设计与实现非常精巧大量的利用了volatilefinalCAS等lock-free技术来减少锁竞争对于性能的影响无论对于Java并发编程的学习还是Java内存模型的理解ConcurrentHashMap的设计以及源码都值得非常仔细的阅读与揣摩。这篇日志记录了自己对ConcurrentHashMap的一些总结由于JDK678中实现都不同需要分开阐述在不同版本中的ConcurrentHashMap。之前已经在ConcurrentHashMap原理分析中解释了ConcurrentHashMap的原理主要是从代码的角度来阐述是源码是如何写的本文仍然从源码出发挑选个人觉得重要的点会用红色标注再次进行回顾以及阐述ConcurrentHashMap的一些注意点。1. JDK6与JDK7中的实现1.1 设计思路ConcurrentHashMap采用了分段锁的设计只有在同一个分段内才存在竞态关系不同的分段锁之间没有锁竞争。相比于对整个Map加锁的设计分段锁大大的提高了高并发环境下的处理能力。但同时由于不是对整个Map加锁导致一些需要扫描整个Map的方法如size(), containsValue()需要使用特殊的实现另外一些方法如clear()甚至放弃了对一致性的要求ConcurrentHashMap是弱一致性的具体请查看ConcurrentHashMap能完全替代HashTable吗。ConcurrentHashMap中的分段锁称为Segment它即类似于HashMapJDK7与JDK8中HashMap的实现的结构即内部拥有一个Entry数组数组中的每个元素又是一个链表同时又是一个ReentrantLockSegment继承了ReentrantLock。ConcurrentHashMap中的HashEntry相对于HashMap中的Entry有一定的差异性HashEntry中的value以及next都被volatile修饰这样在多线程读写过程中能够保持它们的可见性代码如下static final class HashEntryK,V {final int hash;final K key;volatile V value;volatile HashEntryK,V next;1.2 并发度Concurrency Level并发度可以理解为程序运行时能够同时更新ConccurentHashMap且不产生锁竞争的最大线程数实际上就是ConcurrentHashMap中的分段锁个数即Segment[]的数组长度。ConcurrentHashMap默认的并发度为16但用户也可以在构造函数中设置并发度。当用户设置并发度时ConcurrentHashMap会使用大于等于该值的最小2幂指数作为实际并发度假如用户设置并发度为17实际并发度则为32。运行时通过将key的高n位n 32 – segmentShift和并发度减1segmentMask做位与运算定位到所在的Segment。segmentShift与segmentMask都是在构造过程中根据concurrency level被相应的计算出来。如果并发度设置的过小会带来严重的锁竞争问题如果并发度设置的过大原本位于同一个Segment内的访问会扩散到不同的Segment中CPU cache命中率会下降从而引起程序性能下降。文档的说法是根据你并发的线程数量决定太多会导性能降低1.3 创建分段锁和JDK6不同JDK7中除了第一个Segment之外剩余的Segments采用的是延迟初始化的机制每次put之前都需要检查key对应的Segment是否为null如果是则调用ensureSegment()以确保对应的Segment被创建。ensureSegment可能在并发环境下被调用但与想象中不同ensureSegment并未使用锁来控制竞争而是使用了Unsafe对象的getObjectVolatile()提供的原子读语义结合CAS来确保Segment创建的原子性。代码段如下if ((seg (SegmentK,V)UNSAFE.getObjectVolatile(ss, u)) null) { // recheckSegmentK,V s new SegmentK,V(lf, threshold, tab);while ((seg (SegmentK,V)UNSAFE.getObjectVolatile(ss, u)) null) {if (UNSAFE.compareAndSwapObject(ss, u, null, seg s))break;}
}1.4 put/putIfAbsent/putAll和JDK6一样ConcurrentHashMap的put方法被代理到了对应的Segment定位Segment的原理之前已经描述过中。与JDK6不同的是JDK7版本的ConcurrentHashMap在获得Segment锁的过程中做了一定的优化 - 在真正申请锁之前put方法会通过tryLock()方法尝试获得锁在尝试获得锁的过程中会对对应hashcode的链表进行遍历如果遍历完毕仍然找不到与key相同的HashEntry节点则为后续的put操作提前创建一个HashEntry。当tryLock一定次数后仍无法获得锁则通过lock申请锁。需要注意的是由于在并发环境下其他线程的putrehash或者remove操作可能会导致链表头结点的变化因此在过程中需要进行检查如果头结点发生变化则重新对表进行遍历。而如果其他线程引起了链表中的某个节点被删除即使该变化因为是非原子写操作删除节点后链接后续节点调用的是Unsafe.putOrderedObject()该方法不提供原子写语义可能导致当前线程无法观察到但因为不影响遍历的正确性所以忽略不计。之所以在获取锁的过程中对整个链表进行遍历主要目的是希望遍历的链表被CPU cache所缓存为后续实际put过程中的链表遍历操作提升性能。在获得锁之后Segment对链表进行遍历如果某个HashEntry节点具有相同的key则更新该HashEntry的value值否则新建一个HashEntry节点将它设置为链表的新head节点并将原头节点设为新head的下一个节点。新建过程中如果节点总数含新建的HashEntry超过threshold则调用rehash()方法对Segment进行扩容最后将新建HashEntry写入到数组中。put方法中链接新节点的下一个节点HashEntry.setNext()以及将链表写入到数组中setEntryAt()都是通过Unsafe的putOrderedObject()方法来实现这里并未使用具有原子写语义的putObjectVolatile()的原因是JMM会保证获得锁到释放锁之间所有对象的状态更新都会在锁被释放之后更新到主存从而保证这些变更对其他线程是可见的。1.5 rehash相对于HashMap的resizeConcurrentHashMap的rehash原理类似但是Doug Lea为rehash做了一定的优化避免让所有的节点都进行复制操作由于扩容是基于2的幂指来操作假设扩容前某HashEntry对应到Segment中数组的index为i数组的容量为capacity那么扩容后该HashEntry对应到新数组中的index只可能为i或者icapacity因此大多数HashEntry节点在扩容前后index可以保持不变。基于此rehash方法中会定位第一个后续所有节点在扩容后index都保持不变的节点然后将这个节点之前的所有节点重排即可。这部分代码如下 private void rehash(HashEntryK,V node) {HashEntryK,V[] oldTable table;int oldCapacity oldTable.length;int newCapacity oldCapacity 1;threshold (int)(newCapacity * loadFactor);HashEntryK,V[] newTable (HashEntryK,V[]) new HashEntry[newCapacity];int sizeMask newCapacity - 1;for (int i 0; i oldCapacity ; i) {HashEntryK,V e oldTable[i];if (e ! null) {HashEntryK,V next e.next;int idx e.hash sizeMask;if (next null) // Single node on listnewTable[idx] e;else { // Reuse consecutive sequence at same slotHashEntryK,V lastRun e;int lastIdx idx;for (HashEntryK,V last next;last ! null;last last.next) {int k last.hash sizeMask;if (k ! lastIdx) {lastIdx k;lastRun last;}}newTable[lastIdx] lastRun;// Clone remaining nodesfor (HashEntryK,V p e; p ! lastRun; p p.next) {V v p.value;int h p.hash;int k h sizeMask;HashEntryK,V n newTable[k];newTable[k] new HashEntryK,V(h, p.key, v, n);}}}}int nodeIndex node.hash sizeMask; // add the new nodenode.setNext(newTable[nodeIndex]);newTable[nodeIndex] node;table newTable;}1.6 remove和put类似remove在真正获得锁之前也会对链表进行遍历以提高缓存命中率。1.7 get与containsKeyget与containsKey两个方法几乎完全一致他们都没有使用锁而是通过Unsafe对象的getObjectVolatile()方法提供的原子读语义来获得Segment以及对应的链表然后对链表遍历判断是否存在key相同的节点以及获得该节点的value。但由于遍历过程中其他线程可能对链表结构做了调整因此get和containsKey返回的可能是过时的数据这一点是ConcurrentHashMap在弱一致性上的体现。如果要求强一致性那么必须使用Collections.synchronizedMap()方法。1.8 size、containsValue这些方法都是基于整个ConcurrentHashMap来进行操作的他们的原理也基本类似首先不加锁循环执行以下操作循环所有的Segment通过Unsafe的getObjectVolatile()以保证原子读语义获得对应的值以及所有Segment的modcount之和。如果连续两次所有Segment的modcount和相等则过程中没有发生其他线程修改ConcurrentHashMap的情况返回获得的值。当循环次数超过预定义的值时这时需要对所有的Segment依次进行加锁获取返回值后再依次解锁。值得注意的是加锁过程中要强制创建所有的Segment否则容易出现其他线程创建Segment并进行putremove等操作。代码如下for(int j 0; j segments.length; j)ensureSegment(j).lock();// force creation一般来说应该避免在多线程环境下使用size和containsValue方法。注1modcount在put, replace, remove以及clear等方法中都会被修改。注2对于containsValue方法来说如果在循环过程中发现匹配value的HashEntry则直接返回true。最后与HashMap不同的是ConcurrentHashMap并不允许key或者value为null按照Doug Lea的说法这么设计的原因是在ConcurrentHashMap中一旦value出现null则代表HashEntry的key/value没有映射完成就被其他线程所见需要特殊处理。在JDK6中get方法的实现中就有一段对HashEntry.value null的防御性判断。但Doug Lea也承认实际运行过程中这种情况似乎不可能发生参考http://cs.oswego.edu/pipermail/concurrency-interest/2011-March/007799.html。2. JDK8中的实现ConcurrentHashMap在JDK8中进行了巨大改动很需要通过源码来再次学习下Doug Lea的实现方法。它摒弃了Segment锁段的概念而是启用了一种全新的方式实现,利用CAS算法。它沿用了与它同时期的HashMap版本的思想底层依然由“数组”链表红黑树的方式思想(JDK7与JDK8中HashMap的实现)但是为了做到并发又增加了很多辅助的类例如TreeBinTraverser等对象内部类。2.1 重要的属性首先来看几个重要的属性与HashMap相同的就不再介绍了这里重点解释一下sizeCtl这个属性。可以说它是ConcurrentHashMap中出镜率很高的一个属性因为它是一个控制标识符在不同的地方有不同用途而且它的取值不同也代表不同的含义。负数代表正在进行初始化或扩容操作-1代表正在初始化-N 表示有N-1个线程正在进行扩容操作正数或0代表hash表还没有被初始化这个数值表示初始化或下一次进行扩容的大小这一点类似于扩容阈值的概念。还后面可以看到它的值始终是当前ConcurrentHashMap容量的0.75倍这与loadfactor是对应的。/*** 盛装Node元素的数组 它的大小是2的整数次幂* Size is always a power of two. Accessed directly by iterators.*/transient volatile NodeK,V[] table;/*** Table initialization and resizing control. When negative, the* table is being initialized or resized: -1 for initialization,* else -(1 the number of active resizing threads). Otherwise,* when table is null, holds the initial table size to use upon* creation, or 0 for default. After initialization, holds the* next element count value upon which to resize the table.hash表初始化或扩容时的一个控制位标识量。负数代表正在进行初始化或扩容操作-1代表正在初始化-N 表示有N-1个线程正在进行扩容操作正数或0代表hash表还没有被初始化这个数值表示初始化或下一次进行扩容的大小*/private transient volatile int sizeCtl; // 以下两个是用来控制扩容的时候 单线程进入的变量/*** The number of bits used for generation stamp in sizeCtl.* Must be at least 6 for 32bit arrays.*/private static int RESIZE_STAMP_BITS 16;/*** The bit shift for recording size stamp in sizeCtl.*/private static final int RESIZE_STAMP_SHIFT 32 - RESIZE_STAMP_BITS;/** Encodings for Node hash fields. See above for explanation.*/static final int MOVED -1; // hash值是-1表示这是一个forwardNode节点static final int TREEBIN -2; // hash值是-2 表示这时一个TreeBin节点2.2 重要的类2.2.1 NodeNode是最核心的内部类它包装了key-value键值对所有插入ConcurrentHashMap的数据都包装在这里面。它与HashMap中的定义很相似但是但是有一些差别它对value和next属性设置了volatile同步锁(与JDK7的Segment相同)它不允许调用setValue方法直接改变Node的value域它增加了find方法辅助map.get()方法。2.2.2 TreeNode树节点类另外一个核心的数据结构。当链表长度过长的时候会转换为TreeNode。但是与HashMap不相同的是它并不是直接转换为红黑树而是把这些结点包装成TreeNode放在TreeBin对象中由TreeBin完成对红黑树的包装。而且TreeNode在ConcurrentHashMap集成自Node类而并非HashMap中的集成自LinkedHashMap.EntryK,V类也就是说TreeNode带有next指针这样做的目的是方便基于TreeBin的访问。2.2.3 TreeBin这个类并不负责包装用户的key、value信息而是包装的很多TreeNode节点。它代替了TreeNode的根节点也就是说在实际的ConcurrentHashMap“数组”中存放的是TreeBin对象而不是TreeNode对象这是与HashMap的区别。另外这个类还带有了读写锁。这里仅贴出它的构造方法。可以看到在构造TreeBin节点时仅仅指定了它的hash值为TREEBIN常量这也就是个标识为。同时也看到我们熟悉的红黑树构造方法2.2.4 ForwardingNode一个用于连接两个table的节点类。它包含一个nextTable指针用于指向下一张表。而且这个节点的key value next指针全部为null它的hash值为-1. 这里面定义的find的方法是从nextTable里进行查询节点而不是以自身为头节点进行查找。/*** A node inserted at head of bins during transfer operations.*/static final class ForwardingNodeK,V extends NodeK,V {final NodeK,V[] nextTable;ForwardingNode(NodeK,V[] tab) {super(MOVED, null, null, null);this.nextTable tab;}NodeK,V find(int h, Object k) {// loop to avoid arbitrarily deep recursion on forwarding nodesouter: for (NodeK,V[] tab nextTable;;) {NodeK,V e; int n;if (k null || tab null || (n tab.length) 0 ||(e tabAt(tab, (n - 1) h)) null)return null;for (;;) {int eh; K ek;if ((eh e.hash) h ((ek e.key) k || (ek ! null k.equals(ek))))return e;if (eh 0) {if (e instanceof ForwardingNode) {tab ((ForwardingNodeK,V)e).nextTable;continue outer;}elsereturn e.find(h, k);}if ((e e.next) null)return null;}}}}2.3 Unsafe与CAS在ConcurrentHashMap中随处可以看到U, 大量使用了U.compareAndSwapXXX的方法这个方法是利用一个CAS算法实现无锁化的修改值的操作他可以大大降低锁代理的性能消耗。这个算法的基本思想就是不断地去比较当前内存中的变量值与你指定的一个变量值是否相等如果相等则接受你指定的修改的值否则拒绝你的操作。因为当前线程中的值已经不是最新的值你的修改很可能会覆盖掉其他线程修改的结果。这一点与乐观锁SVN的思想是比较类似的。2.3.1 unsafe静态块unsafe代码块控制了一些属性的修改工作比如最常用的SIZECTL 。在这一版本的concurrentHashMap中大量应用来的CAS方法进行变量、属性的修改工作。利用CAS进行无锁操作可以大大提高性能。 private static final sun.misc.Unsafe U;private static final long SIZECTL;private static final long TRANSFERINDEX;private static final long BASECOUNT;private static final long CELLSBUSY;private static final long CELLVALUE;private static final long ABASE;private static final int ASHIFT;static {try {U sun.misc.Unsafe.getUnsafe();Class? k ConcurrentHashMap.class;SIZECTL U.objectFieldOffset(k.getDeclaredField(sizeCtl));TRANSFERINDEX U.objectFieldOffset(k.getDeclaredField(transferIndex));BASECOUNT U.objectFieldOffset(k.getDeclaredField(baseCount));CELLSBUSY U.objectFieldOffset(k.getDeclaredField(cellsBusy));Class? ck CounterCell.class;CELLVALUE U.objectFieldOffset(ck.getDeclaredField(value));Class? ak Node[].class;ABASE U.arrayBaseOffset(ak);int scale U.arrayIndexScale(ak);if ((scale (scale - 1)) ! 0)throw new Error(data type scale not a power of two);ASHIFT 31 - Integer.numberOfLeadingZeros(scale);} catch (Exception e) {throw new Error(e);}}2.3.2 三个核心方法ConcurrentHashMap定义了三个原子操作用于对指定位置的节点进行操作。正是这些原子操作保证了ConcurrentHashMap的线程安全。//获得在i位置上的Node节点static final K,V NodeK,V tabAt(NodeK,V[] tab, int i) {return (NodeK,V)U.getObjectVolatile(tab, ((long)i ASHIFT) ABASE);}//利用CAS算法设置i位置上的Node节点。之所以能实现并发是因为他指定了原来这个节点的值是多少//在CAS算法中会比较内存中的值与你指定的这个值是否相等如果相等才接受你的修改否则拒绝你的修改//因此当前线程中的值并不是最新的值这种修改可能会覆盖掉其他线程的修改结果 有点类似于SVNstatic final K,V boolean casTabAt(NodeK,V[] tab, int i,NodeK,V c, NodeK,V v) {return U.compareAndSwapObject(tab, ((long)i ASHIFT) ABASE, c, v);}//利用volatile方法设置节点位置的值static final K,V void setTabAt(NodeK,V[] tab, int i, NodeK,V v) {U.putObjectVolatile(tab, ((long)i ASHIFT) ABASE, v);}2.4 初始化方法initTable对于ConcurrentHashMap来说调用它的构造方法仅仅是设置了一些参数而已。而整个table的初始化是在向ConcurrentHashMap中插入元素的时候发生的。如调用put、computeIfAbsent、compute、merge等方法的时候调用时机是检查tablenull。初始化方法主要应用了关键属性sizeCtl 如果这个值〈0表示其他线程正在进行初始化就放弃这个操作。在这也可以看出ConcurrentHashMap的初始化只能由一个线程完成。如果获得了初始化权限就用CAS方法将sizeCtl置为-1防止其他线程进入。初始化数组后将sizeCtl的值改为0.75*n。/*** Initializes table, using the size recorded in sizeCtl.*/private final NodeK,V[] initTable() {NodeK,V[] tab; int sc;while ((tab table) null || tab.length 0) {//sizeCtl表示有其他线程正在进行初始化操作把线程挂起。对于table的初始化工作只能有一个线程在进行。if ((sc sizeCtl) 0)Thread.yield(); // lost initialization race; just spinelse if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) {//利用CAS方法把sizectl的值置为-1 表示本线程正在进行初始化try {if ((tab table) null || tab.length 0) {int n (sc 0) ? sc : DEFAULT_CAPACITY;SuppressWarnings(unchecked)NodeK,V[] nt (NodeK,V[])new Node?,?[n];table tab nt;sc n - (n 2);//相当于0.75*n 设置一个扩容的阈值}} finally {sizeCtl sc;}break;}}return tab;}2.5 扩容方法 transfer当ConcurrentHashMap容量不足的时候需要对table进行扩容。这个方法的基本思想跟HashMap是很像的但是由于它是支持并发扩容的所以要复杂的多。原因是它支持多线程进行扩容操作而并没有加锁。我想这样做的目的不仅仅是为了满足concurrent的要求而是希望利用并发处理去减少扩容带来的时间影响。因为在扩容的时候总是会涉及到从一个“数组”到另一个“数组”拷贝的操作如果这个操作能够并发进行那真真是极好的了。整个扩容操作分为两个部分 第一部分是构建一个nextTable,它的容量是原来的两倍这个操作是单线程完成的。这个单线程的保证是通过RESIZE_STAMP_SHIFT这个常量经过一次运算来保证的这个地方在后面会有提到第二个部分就是将原来table中的元素复制到nextTable中这里允许多线程进行操作。先来看一下单线程是如何完成的它的大体思想就是遍历、复制的过程。首先根据运算得到需要遍历的次数i然后利用tabAt方法获得i位置的元素如果这个位置为空就在原table中的i位置放入forwardNode节点这个也是触发并发扩容的关键点如果这个位置是Node节点fh0如果它是一个链表的头节点就构造一个反序链表把他们分别放在nextTable的i和in的位置上如果这个位置是TreeBin节点fh0也做一个反序处理并且判断是否需要untreefi把处理的结果分别放在nextTable的i和in的位置上遍历过所有的节点以后就完成了复制工作这时让nextTable作为新的table并且更新sizeCtl为新容量的0.75倍 完成扩容。再看一下多线程是如何完成的在代码的69行有一个判断如果遍历到的节点是forward节点就向后继续遍历再加上给节点上锁的机制就完成了多线程的控制。多线程遍历节点处理了一个节点就把对应点的值set为forward另一个线程看到forward就向后遍历。这样交叉就完成了复制工作。而且还很好的解决了线程安全的问题。 这个方法的设计实在是让我膜拜。 /*** 一个过渡的table表 只有在扩容的时候才会使用*/private transient volatile NodeK,V[] nextTable;/*** Moves and/or copies the nodes in each bin to new table. See* above for explanation.*/private final void transfer(NodeK,V[] tab, NodeK,V[] nextTab) {int n tab.length, stride;if ((stride (NCPU 1) ? (n 3) / NCPU : n) MIN_TRANSFER_STRIDE)stride MIN_TRANSFER_STRIDE; // subdivide rangeif (nextTab null) { // initiatingtry {SuppressWarnings(unchecked)NodeK,V[] nt (NodeK,V[])new Node?,?[n 1];//构造一个nextTable对象 它的容量是原来的两倍nextTab nt;} catch (Throwable ex) { // try to cope with OOMEsizeCtl Integer.MAX_VALUE;return;}nextTable nextTab;transferIndex n;}int nextn nextTab.length;ForwardingNodeK,V fwd new ForwardingNodeK,V(nextTab);//构造一个连节点指针 用于标志位boolean advance true;//并发扩容的关键属性 如果等于true 说明这个节点已经处理过boolean finishing false; // to ensure sweep before committing nextTabfor (int i 0, bound 0;;) {NodeK,V f; int fh;//这个while循环体的作用就是在控制i-- 通过i--可以依次遍历原hash表中的节点while (advance) {int nextIndex, nextBound;if (--i bound || finishing)advance false;else if ((nextIndex transferIndex) 0) {i -1;advance false;}else if (U.compareAndSwapInt(this, TRANSFERINDEX, nextIndex,nextBound (nextIndex stride ?nextIndex - stride : 0))) {bound nextBound;i nextIndex - 1;advance false;}}if (i 0 || i n || i n nextn) {int sc;if (finishing) {//如果所有的节点都已经完成复制工作 就把nextTable赋值给table 清空临时对象nextTablenextTable null;table nextTab;sizeCtl (n 1) - (n 1);//扩容阈值设置为原来容量的1.5倍 依然相当于现在容量的0.75倍return;}//利用CAS方法更新这个扩容阈值在这里面sizectl值减一说明新加入一个线程参与到扩容操作if (U.compareAndSwapInt(this, SIZECTL, sc sizeCtl, sc - 1)) {if ((sc - 2) ! resizeStamp(n) RESIZE_STAMP_SHIFT)return;finishing advance true;i n; // recheck before commit}}//如果遍历到的节点为空 则放入ForwardingNode指针else if ((f tabAt(tab, i)) null)advance casTabAt(tab, i, null, fwd);//如果遍历到ForwardingNode节点 说明这个点已经被处理过了 直接跳过 这里是控制并发扩容的核心else if ((fh f.hash) MOVED)advance true; // already processedelse {//节点上锁synchronized (f) {if (tabAt(tab, i) f) {NodeK,V ln, hn;//如果fh0 证明这是一个Node节点if (fh 0) {int runBit fh n;//以下的部分在完成的工作是构造两个链表 一个是原链表 另一个是原链表的反序排列NodeK,V lastRun f;for (NodeK,V p f.next; p ! null; p p.next) {int b p.hash n;if (b ! runBit) {runBit b;lastRun p;}}if (runBit 0) {ln lastRun;hn null;}else {hn lastRun;ln null;}for (NodeK,V p f; p ! lastRun; p p.next) {int ph p.hash; K pk p.key; V pv p.val;if ((ph n) 0)ln new NodeK,V(ph, pk, pv, ln);elsehn new NodeK,V(ph, pk, pv, hn);}//在nextTable的i位置上插入一个链表setTabAt(nextTab, i, ln);//在nextTable的in的位置上插入另一个链表setTabAt(nextTab, i n, hn);//在table的i位置上插入forwardNode节点 表示已经处理过该节点setTabAt(tab, i, fwd);//设置advance为true 返回到上面的while循环中 就可以执行i--操作advance true;}//对TreeBin对象进行处理 与上面的过程类似else if (f instanceof TreeBin) {TreeBinK,V t (TreeBinK,V)f;TreeNodeK,V lo null, loTail null;TreeNodeK,V hi null, hiTail null;int lc 0, hc 0;//构造正序和反序两个链表for (NodeK,V e t.first; e ! null; e e.next) {int h e.hash;TreeNodeK,V p new TreeNodeK,V(h, e.key, e.val, null, null);if ((h n) 0) {if ((p.prev loTail) null)lo p;elseloTail.next p;loTail p;lc;}else {if ((p.prev hiTail) null)hi p;elsehiTail.next p;hiTail p;hc;}}//如果扩容后已经不再需要tree的结构 反向转换为链表结构ln (lc UNTREEIFY_THRESHOLD) ? untreeify(lo) :(hc ! 0) ? new TreeBinK,V(lo) : t;hn (hc UNTREEIFY_THRESHOLD) ? untreeify(hi) :(lc ! 0) ? new TreeBinK,V(hi) : t;//在nextTable的i位置上插入一个链表 setTabAt(nextTab, i, ln);//在nextTable的in的位置上插入另一个链表setTabAt(nextTab, i n, hn);//在table的i位置上插入forwardNode节点 表示已经处理过该节点setTabAt(tab, i, fwd);//设置advance为true 返回到上面的while循环中 就可以执行i--操作advance true;}}}}}}2.6 Put方法前面的所有的介绍其实都为这个方法做铺垫。ConcurrentHashMap最常用的就是put和get两个方法。现在来介绍put方法这个put方法依然沿用HashMap的put方法的思想根据hash值计算这个新插入的点在table中的位置i如果i位置是空的直接放进去否则进行判断如果i位置是树节点按照树的方式插入新的节点否则把i插入到链表的末尾。ConcurrentHashMap中依然沿用这个思想有一个最重要的不同点就是ConcurrentHashMap不允许key或value为null值。另外由于涉及到多线程put方法就要复杂一点。在多线程中可能有以下两个情况如果一个或多个线程正在对ConcurrentHashMap进行扩容操作当前线程也要进入扩容的操作中。这个扩容的操作之所以能被检测到是因为transfer方法中在空结点上插入forward节点如果检测到需要插入的位置被forward节点占有就帮助进行扩容如果检测到要插入的节点是非空且不是forward节点就对这个节点加锁这样就保证了线程安全。尽管这个有一些影响效率但是还是会比hashTable的synchronized要好得多。整体流程就是首先定义不允许key或value为null的情况放入 对于每一个放入的值首先利用spread方法对key的hashcode进行一次hash计算由此来确定这个值在table中的位置。如果这个位置是空的那么直接放入而且不需要加锁操作。 如果这个位置存在结点说明发生了hash碰撞首先判断这个节点的类型。如果是链表节点fh0,则得到的结点就是hash值相同的节点组成的链表的头节点。需要依次向后遍历确定这个新加入的值所在位置。如果遇到hash值与key值都与新加入节点是一致的情况则只需要更新value值即可。否则依次向后遍历直到链表尾插入这个结点。如果加入这个节点以后链表长度大于8就把这个链表转换成红黑树。如果这个节点的类型已经是树节点的话直接调用树节点的插入方法进行插入新的值。public V put(K key, V value) {return putVal(key, value, false);}/** Implementation for put and putIfAbsent */final V putVal(K key, V value, boolean onlyIfAbsent) {//不允许 key或value为nullif (key null || value null) throw new NullPointerException();//计算hash值int hash spread(key.hashCode());int binCount 0;//死循环 何时插入成功 何时跳出for (NodeK,V[] tab table;;) {NodeK,V f; int n, i, fh;//如果table为空的话初始化tableif (tab null || (n tab.length) 0)tab initTable();//根据hash值计算出在table里面的位置 else if ((f tabAt(tab, i (n - 1) hash)) null) {//如果这个位置没有值 直接放进去不需要加锁if (casTabAt(tab, i, null,new NodeK,V(hash, key, value, null)))break; // no lock when adding to empty bin}//当遇到表连接点时需要进行整合表的操作else if ((fh f.hash) MOVED)tab helpTransfer(tab, f);else {V oldVal null;//结点上锁 这里的结点可以理解为hash值相同组成的链表的头结点synchronized (f) {if (tabAt(tab, i) f) {//fh〉0 说明这个节点是一个链表的节点 不是树的节点if (fh 0) {binCount 1;//在这里遍历链表所有的结点for (NodeK,V e f;; binCount) {K ek;//如果hash值和key值相同 则修改对应结点的value值if (e.hash hash ((ek e.key) key ||(ek ! null key.equals(ek)))) {oldVal e.val;if (!onlyIfAbsent)e.val value;break;}NodeK,V pred e;//如果遍历到了最后一个结点那么就证明新的节点需要插入 就把它插入在链表尾部if ((e e.next) null) {pred.next new NodeK,V(hash, key,value, null);break;}}}//如果这个节点是树节点就按照树的方式插入值else if (f instanceof TreeBin) {NodeK,V p;binCount 2;if ((p ((TreeBinK,V)f).putTreeVal(hash, key,value)) ! null) {oldVal p.val;if (!onlyIfAbsent)p.val value;}}}}if (binCount ! 0) {//如果链表长度已经达到临界值8 就需要把链表转换为树结构if (binCount TREEIFY_THRESHOLD)treeifyBin(tab, i);if (oldVal ! null)return oldVal;break;}}}//将当前ConcurrentHashMap的元素数量1addCount(1L, binCount);return null;}我们可以发现JDK8中的实现也是锁分离的思想只是锁住的是一个Node而不是JDK7中的Segment而锁住Node之前的操作是无锁的并且也是线程安全的建立在之前提到的3个原子操作上。2.6.1 helpTransfer方法这是一个协助扩容的方法。这个方法被调用的时候当前ConcurrentHashMap一定已经有了nextTable对象首先拿到这个nextTable对象调用transfer方法。回看上面的transfer方法可以看到当本线程进入扩容方法的时候会直接进入复制阶段。2.6.2 treeifyBin方法这个方法用于将过长的链表转换为TreeBin对象。但是他并不是直接转换而是进行一次容量判断如果容量没有达到转换的要求直接进行扩容操作并返回如果满足条件才链表的结构抓换为TreeBin 这与HashMap不同的是它并没有把TreeNode直接放入红黑树而是利用了TreeBin这个小容器来封装所有的TreeNode.2.7 get方法get方法比较简单给定一个key来确定value的时候必须满足两个条件 key相同 hash值相同对于节点可能在链表或树上的情况需要分别去查找。public V get(Object key) {NodeK,V[] tab; NodeK,V e, p; int n, eh; K ek;//计算hash值int h spread(key.hashCode());//根据hash值确定节点位置if ((tab table) ! null (n tab.length) 0 (e tabAt(tab, (n - 1) h)) ! null) {//如果搜索到的节点key与传入的key相同且不为null,直接返回这个节点 if ((eh e.hash) h) {if ((ek e.key) key || (ek ! null key.equals(ek)))return e.val;}//如果eh0 说明这个节点在树上 直接寻找else if (eh 0)return (p e.find(h, key)) ! null ? p.val : null;//否则遍历链表 找到对应的值并返回while ((e e.next) ! null) {if (e.hash h ((ek e.key) key || (ek ! null key.equals(ek))))return e.val;}}return null;}2.8 Size相关的方法对于ConcurrentHashMap来说这个table里到底装了多少东西其实是个不确定的数量因为不可能在调用size()方法的时候像GC的“stop the world”一样让其他线程都停下来让你去统计因此只能说这个数量是个估计值。对于这个估计值ConcurrentHashMap也是大费周章才计算出来的。2.8.1 辅助定义为了统计元素个数ConcurrentHashMap定义了一些变量和一个内部类/*** A padded cell for distributing counts. Adapted from LongAdder* and Striped64. See their internal docs for explanation.*/sun.misc.Contended static final class CounterCell {volatile long value;CounterCell(long x) { value x; }}/******************************************/ /*** 实际上保存的是hashmap中的元素个数 利用CAS锁进行更新但它并不用返回当前hashmap的元素个数 */private transient volatile long baseCount;/*** Spinlock (locked via CAS) used when resizing and/or creating CounterCells.*/private transient volatile int cellsBusy;/*** Table of counter cells. When non-null, size is a power of 2.*/private transient volatile CounterCell[] counterCells;2.8.2 mappingCount与Size方法mappingCount与size方法的类似 从Java工程师给出的注释来看应该使用mappingCount代替size方法 两个方法都没有直接返回basecount 而是统计一次这个值而这个值其实也是一个大概的数值因此可能在统计的时候有其他线程正在执行插入或删除操作。public int size() {long n sumCount();return ((n 0L) ? 0 :(n (long)Integer.MAX_VALUE) ? Integer.MAX_VALUE :(int)n);}/*** Returns the number of mappings. This method should be used* instead of {link #size} because a ConcurrentHashMap may* contain more mappings than can be represented as an int. The* value returned is an estimate; the actual count may differ if* there are concurrent insertions or removals.** return the number of mappings* since 1.8*/public long mappingCount() {long n sumCount();return (n 0L) ? 0L : n; // ignore transient negative values}final long sumCount() {CounterCell[] as counterCells; CounterCell a;long sum baseCount;if (as ! null) {for (int i 0; i as.length; i) {if ((a as[i]) ! null)sum a.value;//所有counter的值求和}}return sum;}2.8.3 addCount方法在put方法结尾处调用了addCount方法把当前ConcurrentHashMap的元素个数1这个方法一共做了两件事,更新baseCount的值检测是否进行扩容。private final void addCount(long x, int check) {CounterCell[] as; long b, s;//利用CAS方法更新baseCount的值 if ((as counterCells) ! null ||!U.compareAndSwapLong(this, BASECOUNT, b baseCount, s b x)) {CounterCell a; long v; int m;boolean uncontended true;if (as null || (m as.length - 1) 0 ||(a as[ThreadLocalRandom.getProbe() m]) null ||!(uncontended U.compareAndSwapLong(a, CELLVALUE, v a.value, v x))) {fullAddCount(x, uncontended);return;}if (check 1)return;s sumCount();}//如果check值大于等于0 则需要检验是否需要进行扩容操作if (check 0) {NodeK,V[] tab, nt; int n, sc;while (s (long)(sc sizeCtl) (tab table) ! null (n tab.length) MAXIMUM_CAPACITY) {int rs resizeStamp(n);//if (sc 0) {if ((sc RESIZE_STAMP_SHIFT) ! rs || sc rs 1 ||sc rs MAX_RESIZERS || (nt nextTable) null ||transferIndex 0)break;//如果已经有其他线程在执行扩容操作if (U.compareAndSwapInt(this, SIZECTL, sc, sc 1))transfer(tab, nt);}//当前线程是唯一的或是第一个发起扩容的线程 此时nextTablenullelse if (U.compareAndSwapInt(this, SIZECTL, sc,(rs RESIZE_STAMP_SHIFT) 2))transfer(tab, null);s sumCount();}}}总结JDK6,7中的ConcurrentHashmap主要使用Segment来实现减小锁粒度把HashMap分割成若干个Segment在put的时候需要锁住Segmentget时候不加锁使用volatile来保证可见性当要统计全局时比如size首先会尝试多次计算modcount来确定这几次尝试中是否有其他线程进行了修改操作如果没有则直接返回size。如果有则需要依次锁住所有的Segment来计算。jdk7中ConcurrentHashmap中当长度过长碰撞会很频繁链表的增改删查操作都会消耗很长的时间影响性能,所以jdk8 中完全重写了concurrentHashmap,代码量从原来的1000多行变成了 6000多 行实现上也和原来的分段式存储有很大的区别。主要设计上的变化有以下几点: 不采用segment而采用node锁住node来实现减小锁粒度。设计了MOVED状态 当resize的中过程中 线程2还在put数据线程2会帮助resize。使用3个CAS操作来确保node的一些操作的原子性这种方式代替了锁。sizeCtl的不同值来代表不同含义起到了控制的作用。至于为什么JDK8中使用synchronized而不是ReentrantLock我猜是因为JDK8中对synchronized有了足够的优化吧。Reference1. http://www.jianshu.com/p/4806633fcc552. https://www.zhihu.com/question/224385893. http://blog.csdn.net/u010723709/article/details/48007881