ConcurrentHashmap(JDK1.7) 总体描述: concurrentHashmap是为了高并发而实现,内部采用分离锁的设计,有效地避开了热点访问。而对于每个分段,ConcurrentHashmap采用final和内存可见修饰符volatile关键字(内存立即可见:Java 的内存模型可以保证:某个写线程对 value 域的写入马上可以被后续的某个读线程“看”到。注:并不能保证对volatile变量状态有依赖的其他操作的原子性) 借用某博客对concurrentHashmap对结构图:  不难看出,concurrenthashmap采用了二次hash的方式,第一次hash将key映射到对应的segment,而第二次hash则是映射到segment的不同桶中。 为什么要用二次hash,主要原因是为了构造分离锁,使得对于map的修改不会锁住整个容器,提高并发能力。当然,没有一种东西是绝对完美的,二次hash带来的问题是整个hash的过程比hashmap单次hash要长,所以,如果不是并发情形,不要使用concurrentHashmap。 代码实现: 该数据结构中,最核心的部分是两个内部类,HashEntry和Segment concurrentHashmap维护一个segment数组,将元素分成若干段(第一次hash) - /**
- * The segments, each of which is a specialized hash table.
- */
- final Segment<K,V>[] segments;
复制代码segments的每一个segment维护一个链表数组 代码: 再来看看构造方法 - public ConcurrentHashMap(int initialCapacity,
- float loadFactor, int concurrencyLevel) {
- if (!(loadFactor > 0) || initialCapacity < 0 || concurrencyLevel <= 0)
- throw new IllegalArgumentException();
- if (concurrencyLevel > MAX_SEGMENTS)
- concurrencyLevel = MAX_SEGMENTS;
- // Find power-of-two sizes best matching arguments
- int sshift = 0;
- int ssize = 1;
- while (ssize < concurrencyLevel) {
- ++sshift;
- ssize <<= 1;
- }
- this.segmentShift = 32 - sshift;
- this.segmentMask = ssize - 1;
- if (initialCapacity > MAXIMUM_CAPACITY)
- initialCapacity = MAXIMUM_CAPACITY;
- int c = initialCapacity / ssize;
- if (c * ssize < initialCapacity)
- ++c;
- int cap = MIN_SEGMENT_TABLE_CAPACITY;
- while (cap < c)
- cap <<= 1;
- // create segments and segments[0]
- Segment<K,V> s0 =
- new Segment<K,V>(loadFactor, (int)(cap * loadFactor),
- (HashEntry<K,V>[])new HashEntry[cap]);
- Segment<K,V>[] ss = (Segment<K,V>[])new Segment[ssize];
- UNSAFE.putOrderedObject(ss, SBASE, s0); // ordered write of segments[0]
- this.segments = ss;
- }
复制代码代码28行,一旦指定了concurrencyLevel(segments数组大小)便不能改变,这样,一旦threshold超标,rehash真不会影响segments数组,这样,在大并发的情况下,只会影响某一个segment的rehash而其他segment不会受到影响 (put方法都要上锁) HashEntry 与hashmap类似,concurrentHashmap也采用了链表作为每个hash桶中的元素,不过concurrentHashmap又有些不同 - static final class HashEntry<K,V> {
- final int hash;
- final K key;
- volatile V value;
- volatile HashEntry<K,V> next;
-
- HashEntry(int hash, K key, V value, HashEntry<K,V> next) {
- this.hash = hash;
- this.key = key;
- this.value = value;
- this.next = next;
- }
-
- /**
- * Sets next field with volatile write semantics. (See above
- * about use of putOrderedObject.)
- */
- final void setNext(HashEntry<K,V> n) {
- UNSAFE.putOrderedObject(this, nextOffset, n);
- }
-
- // Unsafe mechanics
- static final sun.misc.Unsafe UNSAFE;
- static final long nextOffset;
- static {
- try {
- UNSAFE = sun.misc.Unsafe.getUnsafe();
- Class k = HashEntry.class;
- nextOffset = UNSAFE.objectFieldOffset
- (k.getDeclaredField("next"));
- } catch (Exception e) {
- throw new Error(e);
- }
- }
- }
复制代码HashEntry的key,hash采用final,可以避免并发修改问题,HashEntry链的尾部是不能修改的,而next和value采用volatile,可以避免使用同步造成的并发性能灾难,新版(jdk1.7)的concurrentHashmap大量使用java Unsafe类提供的原子操作,直接调用底层操作系统,提高性能(这块我也不是特别清楚) get方法(1.6 vs 1.7) 1.6 - V get(Object key, int hash) {
- if (count != 0) { // read-volatile
- HashEntry<K,V> e = getFirst(hash);
- while (e != null) {
- if (e.hash == hash && key.equals(e.key)) {
- V v = e.value;
- if (v != null)
- return v;
- return readValueUnderLock(e); // recheck
- }
- e = e.next;
- }
- }
- return null;
- }
复制代码1.6的jdk采用了乐观锁的方式处理了get方法,在get的时候put方法正在new对象,而此时value并未赋值,这时判断为空则加锁访问 1.7 - public V get(Object key) {
- Segment<K,V> s; // manually integrate access methods to reduce overhead
- HashEntry<K,V>[] tab;
- int h = hash(key);
- long u = (((h >>> segmentShift) & segmentMask) << SSHIFT) + SBASE;
- if ((s = (Segment<K,V>)UNSAFE.getObjectVolatile(segments, u)) != null &&
- (tab = s.table) != null) {
- for (HashEntry<K,V> e = (HashEntry<K,V>) UNSAFE.getObjectVolatile
- (tab, ((long)(((tab.length - 1) & h)) << TSHIFT) + TBASE);
- e != null; e = e.next) {
- K k;
- if ((k = e.key) == key || (e.hash == h && key.equals(k)))
- return e.value;
- }
- }
- return null;
- }
复制代码1.7并没有判断value=null的情况,不知为何 跟同事沟通过,无论是1.6还是1.7的实现,实际上都是一种乐观的方式,而乐观的方式带来的是性能上的提升,但同时也带来数据的弱一致性,如果你的业务是强一致性的业务,可能就要考虑另外的解决办法(用Collections包装或者像jdk6中一样二次加锁获取) http://ifeve.com/concurrenthashmap-weakly-consistent/ 这篇文章可以很好地解释弱一致性问题 put方法 - public V put(K key, V value) {
- Segment<K,V> s;
- if (value == null)
- throw new NullPointerException();
- int hash = hash(key);
- int j = (hash >>> segmentShift) & segmentMask;
- if ((s = (Segment<K,V>)UNSAFE.getObject // nonvolatile; recheck
- (segments, (j << SSHIFT) + SBASE)) == null) // in ensureSegment
- s = ensureSegment(j);
- return s.put(key, hash, value, false);
- }
复制代码对于put,concurrentHashmap采用自旋锁的方式,不同于1.6的直接获取锁 注:个人理解,这里采用自旋锁可能作者是觉得在分段锁的状态下,并发的可能本来就比较小,并且锁占用时间又并不是特别长,因此自旋锁可以减小线程唤醒和切换的开销 关于hash - private int hash(Object k) {
- int h = hashSeed;
- if ((0 != h) && (k instanceof String)) {
- return sun.misc.Hashing.stringHash32((String) k);
- }
- h ^= k.hashCode();
- // Spread bits to regularize both segment and index locations,
- // using variant of single-word Wang/Jenkins hash.
- h += (h << 15) ^ 0xffffcd7d;
- h ^= (h >>> 10);
- h += (h << 3);
- h ^= (h >>> 6);
- h += (h << 2) + (h << 14);
- return h ^ (h >>> 16);
- }
复制代码concurrentHashMap采用本身hashcode的同时,采用Wang/Jenkins算法对每位都做了处理,使得发生hash冲突的可能性大大减小(否则效率会很差) 而对于concurrentHashMap,segments的大小在初始时确定,此后不变,而元素所在segments桶序列由hash的高位决定 - public V put(K key, V value) {
- Segment<K,V> s;
- if (value == null)
- throw new NullPointerException();
- int hash = hash(key);
- int j = (hash >>> segmentShift) & segmentMask;
- if ((s = (Segment<K,V>)UNSAFE.getObject // nonvolatile; recheck
- (segments, (j << SSHIFT) + SBASE)) == null) // in ensureSegment
- s = ensureSegment(j);
- return s.put(key, hash, value, false);
- }
复制代码 segmentShift为(32-segments大小的二进制长度) 总结 concurrentHashmap主要是为并发设计,与Collections的包装不同,他不是采用全同步的方式,而是采用非锁get方式,通过数据的弱一致性带来性能上的大幅提升,同时采用分段锁的策略,提高并发能力 参考: http://www.jb51.net/article/49699.htm http://my.oschina.net/chihz/blog/58035 http://www.ibm.com/developerworks/cn/java/java-lo-concurrenthashmap/ http://www.ibm.com/developerworks/cn/java/j-jtp06197.html 来自:http://my.oschina.net/zhenglingfei/blog/400515 |