在路上

 找回密码
 立即注册
在路上 站点首页 学习 查看内容

ConcurrentHashmap 解析

2016-12-13 12:56| 发布者: zhangjf| 查看: 577| 评论: 0

摘要: ConcurrentHashmap(JDK1.7) 总体描述: concurrentHashmap是为了高并发而实现,内部采用分离锁的设计,有效地避开了热点访问。而对于每个分段,ConcurrentHashmap采用final和内存可见修饰符volatile关键字(内 ...
ConcurrentHashmap(JDK1.7) 总体描述:

concurrentHashmap是为了高并发而实现,内部采用分离锁的设计,有效地避开了热点访问。而对于每个分段,ConcurrentHashmap采用final和内存可见修饰符volatile关键字(内存立即可见:Java 的内存模型可以保证:某个写线程对 value 域的写入马上可以被后续的某个读线程“看”到。注:并不能保证对volatile变量状态有依赖的其他操作的原子性)

借用某博客对concurrentHashmap对结构图:

ConcurrentHashmap 解析

ConcurrentHashmap 解析

不难看出,concurrenthashmap采用了二次hash的方式,第一次hash将key映射到对应的segment,而第二次hash则是映射到segment的不同桶中。

为什么要用二次hash,主要原因是为了构造分离锁,使得对于map的修改不会锁住整个容器,提高并发能力。当然,没有一种东西是绝对完美的,二次hash带来的问题是整个hash的过程比hashmap单次hash要长,所以,如果不是并发情形,不要使用concurrentHashmap。

代码实现:

该数据结构中,最核心的部分是两个内部类,HashEntry和Segment

concurrentHashmap维护一个segment数组,将元素分成若干段(第一次hash)

  1. /**
  2. * The segments, each of which is a specialized hash table.
  3. */
  4. final Segment<K,V>[] segments;
复制代码

segments的每一个segment维护一个链表数组

代码:

再来看看构造方法

  1. public ConcurrentHashMap(int initialCapacity,
  2. float loadFactor, int concurrencyLevel) {
  3. if (!(loadFactor > 0) || initialCapacity < 0 || concurrencyLevel <= 0)
  4. throw new IllegalArgumentException();
  5. if (concurrencyLevel > MAX_SEGMENTS)
  6. concurrencyLevel = MAX_SEGMENTS;
  7. // Find power-of-two sizes best matching arguments
  8. int sshift = 0;
  9. int ssize = 1;
  10. while (ssize < concurrencyLevel) {
  11. ++sshift;
  12. ssize <<= 1;
  13. }
  14. this.segmentShift = 32 - sshift;
  15. this.segmentMask = ssize - 1;
  16. if (initialCapacity > MAXIMUM_CAPACITY)
  17. initialCapacity = MAXIMUM_CAPACITY;
  18. int c = initialCapacity / ssize;
  19. if (c * ssize < initialCapacity)
  20. ++c;
  21. int cap = MIN_SEGMENT_TABLE_CAPACITY;
  22. while (cap < c)
  23. cap <<= 1;
  24. // create segments and segments[0]
  25. Segment<K,V> s0 =
  26. new Segment<K,V>(loadFactor, (int)(cap * loadFactor),
  27. (HashEntry<K,V>[])new HashEntry[cap]);
  28. Segment<K,V>[] ss = (Segment<K,V>[])new Segment[ssize];
  29. UNSAFE.putOrderedObject(ss, SBASE, s0); // ordered write of segments[0]
  30. this.segments = ss;
  31. }
复制代码

代码28行,一旦指定了concurrencyLevel(segments数组大小)便不能改变,这样,一旦threshold超标,rehash真不会影响segments数组,这样,在大并发的情况下,只会影响某一个segment的rehash而其他segment不会受到影响

(put方法都要上锁)

HashEntry

与hashmap类似,concurrentHashmap也采用了链表作为每个hash桶中的元素,不过concurrentHashmap又有些不同

  1. static final class HashEntry<K,V> {
  2. final int hash;
  3. final K key;
  4. volatile V value;
  5. volatile HashEntry<K,V> next;
  6. HashEntry(int hash, K key, V value, HashEntry<K,V> next) {
  7. this.hash = hash;
  8. this.key = key;
  9. this.value = value;
  10. this.next = next;
  11. }
  12. /**
  13. * Sets next field with volatile write semantics. (See above
  14. * about use of putOrderedObject.)
  15. */
  16. final void setNext(HashEntry<K,V> n) {
  17. UNSAFE.putOrderedObject(this, nextOffset, n);
  18. }
  19. // Unsafe mechanics
  20. static final sun.misc.Unsafe UNSAFE;
  21. static final long nextOffset;
  22. static {
  23. try {
  24. UNSAFE = sun.misc.Unsafe.getUnsafe();
  25. Class k = HashEntry.class;
  26. nextOffset = UNSAFE.objectFieldOffset
  27. (k.getDeclaredField("next"));
  28. } catch (Exception e) {
  29. throw new Error(e);
  30. }
  31. }
  32. }
复制代码

HashEntry的key,hash采用final,可以避免并发修改问题,HashEntry链的尾部是不能修改的,而next和value采用volatile,可以避免使用同步造成的并发性能灾难,新版(jdk1.7)的concurrentHashmap大量使用java Unsafe类提供的原子操作,直接调用底层操作系统,提高性能(这块我也不是特别清楚)

get方法(1.6 vs 1.7)

1.6

  1. V get(Object key, int hash) {
  2. if (count != 0) { // read-volatile
  3. HashEntry<K,V> e = getFirst(hash);
  4. while (e != null) {
  5. if (e.hash == hash && key.equals(e.key)) {
  6. V v = e.value;
  7. if (v != null)
  8. return v;
  9. return readValueUnderLock(e); // recheck
  10. }
  11. e = e.next;
  12. }
  13. }
  14. return null;
  15. }
复制代码

1.6的jdk采用了乐观锁的方式处理了get方法,在get的时候put方法正在new对象,而此时value并未赋值,这时判断为空则加锁访问

1.7

  1. public V get(Object key) {
  2. Segment<K,V> s; // manually integrate access methods to reduce overhead
  3. HashEntry<K,V>[] tab;
  4. int h = hash(key);
  5. long u = (((h >>> segmentShift) & segmentMask) << SSHIFT) + SBASE;
  6. if ((s = (Segment<K,V>)UNSAFE.getObjectVolatile(segments, u)) != null &&
  7. (tab = s.table) != null) {
  8. for (HashEntry<K,V> e = (HashEntry<K,V>) UNSAFE.getObjectVolatile
  9. (tab, ((long)(((tab.length - 1) & h)) << TSHIFT) + TBASE);
  10. e != null; e = e.next) {
  11. K k;
  12. if ((k = e.key) == key || (e.hash == h && key.equals(k)))
  13. return e.value;
  14. }
  15. }
  16. return null;
  17. }
复制代码

1.7并没有判断value=null的情况,不知为何

跟同事沟通过,无论是1.6还是1.7的实现,实际上都是一种乐观的方式,而乐观的方式带来的是性能上的提升,但同时也带来数据的弱一致性,如果你的业务是强一致性的业务,可能就要考虑另外的解决办法(用Collections包装或者像jdk6中一样二次加锁获取)

http://ifeve.com/concurrenthashmap-weakly-consistent/

这篇文章可以很好地解释弱一致性问题

put方法
  1. public V put(K key, V value) {
  2. Segment<K,V> s;
  3. if (value == null)
  4. throw new NullPointerException();
  5. int hash = hash(key);
  6. int j = (hash >>> segmentShift) & segmentMask;
  7. if ((s = (Segment<K,V>)UNSAFE.getObject // nonvolatile; recheck
  8. (segments, (j << SSHIFT) + SBASE)) == null) // in ensureSegment
  9. s = ensureSegment(j);
  10. return s.put(key, hash, value, false);
  11. }
复制代码

对于put,concurrentHashmap采用自旋锁的方式,不同于1.6的直接获取锁

注:个人理解,这里采用自旋锁可能作者是觉得在分段锁的状态下,并发的可能本来就比较小,并且锁占用时间又并不是特别长,因此自旋锁可以减小线程唤醒和切换的开销

关于hash
  1. private int hash(Object k) {
  2. int h = hashSeed;
  3. if ((0 != h) && (k instanceof String)) {
  4. return sun.misc.Hashing.stringHash32((String) k);
  5. }
  6. h ^= k.hashCode();
  7. // Spread bits to regularize both segment and index locations,
  8. // using variant of single-word Wang/Jenkins hash.
  9. h += (h << 15) ^ 0xffffcd7d;
  10. h ^= (h >>> 10);
  11. h += (h << 3);
  12. h ^= (h >>> 6);
  13. h += (h << 2) + (h << 14);
  14. return h ^ (h >>> 16);
  15. }
复制代码

concurrentHashMap采用本身hashcode的同时,采用Wang/Jenkins算法对每位都做了处理,使得发生hash冲突的可能性大大减小(否则效率会很差)


而对于concurrentHashMap,segments的大小在初始时确定,此后不变,而元素所在segments桶序列由hash的高位决定

  1. public V put(K key, V value) {
  2. Segment<K,V> s;
  3. if (value == null)
  4. throw new NullPointerException();
  5. int hash = hash(key);
  6. int j = (hash >>> segmentShift) & segmentMask;
  7. if ((s = (Segment<K,V>)UNSAFE.getObject // nonvolatile; recheck
  8. (segments, (j << SSHIFT) + SBASE)) == null) // in ensureSegment
  9. s = ensureSegment(j);
  10. return s.put(key, hash, value, false);
  11. }
复制代码

segmentShift为(32-segments大小的二进制长度)


总结

concurrentHashmap主要是为并发设计,与Collections的包装不同,他不是采用全同步的方式,而是采用非锁get方式,通过数据的弱一致性带来性能上的大幅提升,同时采用分段锁的策略,提高并发能力

参考:

http://www.jb51.net/article/49699.htm

http://my.oschina.net/chihz/blog/58035

http://www.ibm.com/developerworks/cn/java/java-lo-concurrenthashmap/

http://www.ibm.com/developerworks/cn/java/j-jtp06197.html

来自:http://my.oschina.net/zhenglingfei/blog/400515

最新评论

小黑屋|在路上 ( 蜀ICP备15035742号-1 

;

GMT+8, 2025-7-9 20:24

Copyright 2015-2025 djqfx

返回顶部