HashMap源码解析-2之ConcurrentHashMap

发布 : 2019-04-22 分类 : java 浏览 :

分析一个类的时候,需要分析核心属性和核心方法。我们就从这两方面入手。首先分析属性,然后分析核心方法,当然配合好官网的英文注释说明。

在此之前,先来说明下Unsafe这个类。所有的CAS都是基于这个类来处理的。

Unsafe

use of this class is limited because only trusted code can obtain instances of it。Unsafe提供了内存和系统级别的操作指令,能直接访问内存。
jre中的rt.jar包中有这个类。java11中是在jdk.internal.misc这个包中。一个final类,无法通过new来实例化。因为:

1
private Unsafe() {}

不过在这个类里看到了一个属性:

1
private static final Unsafe theUnsafe = new Unsafe();

我们可以通过这个属性来获取Unsafe对象:

1
2
3
4
5
6
7
8
9
10
11
12
sun.misc.Unsafe U = null;

Field theUnsafeInstance = null;
try {
theUnsafeInstance = Unsafe.class.getDeclaredField("theUnsafe");
theUnsafeInstance.setAccessible(true);
U = (Unsafe) theUnsafeInstance.get(Unsafe.class);
} catch (NoSuchFieldException e) {
e.printStackTrace();
} catch (IllegalAccessException e) {
e.printStackTrace();
}

我们看到这个类里都是native方法。

CAS

Compare and swap。concurrent包中基于CAS实现了很多乐观锁。

CAS一般有三个参数,内存值,预期值,要改变的值。比较期望值和内存值是否相等,相等就替换成需要改变的值,返回true。否则返回false。

复习下
一般java中的concurrent包的类都是借助于CAS来完成读写安全的。CAS通过java调用native方法来实现,native方法是借助于C来调用底层cpu指令完成资源的独占。CAS指令发生的时候,理论上会对cmpxchg指令添加lock前缀,这个lock前缀,在intel手册上介绍的结果是:(在volatile那片文章中有所说明)如果多核,会锁住缓存行,保证单核对共享内存中变量的读写。volatile中能读取到最新的值,是因为cpu的缓存同步机制,检测到缓存和内存的内容不一致,废弃掉缓存的内容,直接读取内存。保证最新。

在concurrent包中很多对象都是通过CAS来实现的。例如AtomicInteger或者AtomicXXX的一些对象,基本都是有个volatile的属性,然后通过CAS来操作该属性,保证了原子性,从而保证线程安全。

ConcurrentHashMap

回到正题,此处看看ConcurrentHashMap的原理。

属性

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
//最大的容量
private static final int MAXIMUM_CAPACITY = 1 << 30;

//初始的容量
private static final int DEFAULT_CAPACITY = 16;

//最大的数组大小,需要减去8,这里留个悬念,暂时不知道为什么
static final int MAX_ARRAY_SIZE = Integer.MAX_VALUE - 8;

//这个变量是没有用到的
private static final int DEFAULT_CONCURRENCY_LEVEL = 16;

//负载因子
private static final float LOAD_FACTOR = 0.75f;

//这两个和HashMap很像,就是转换红黑树相关的阈值
static final int TREEIFY_THRESHOLD = 8;
static final int UNTREEIFY_THRESHOLD = 6;


/*
* Encodings for Node hash fields. See above for explanation.
*/
//其实我暂时也不清楚这些字段的含义,往下看才知道
static final int MOVED = -1; // hash for forwarding nodes
static final int TREEBIN = -2; // hash for roots of trees
static final int RESERVED = -3; // hash for transient reservations
static final int HASH_BITS = 0x7fffffff; // usable bits of normal node hash

//这个比较明显。傻子都知道
/** Number of CPUS, to place bounds on some sizings */
static final int NCPU = Runtime.getRuntime().availableProcessors();

这里依旧有个Node,看下Node的代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
static class Node<K,V> implements Map.Entry<K,V> {
final int hash;
final K key;
volatile V val;
volatile Node<K,V> next;

Node(int hash, K key, V val) {
this.hash = hash;
this.key = key;
this.val = val;
}

Node(int hash, K key, V val, Node<K,V> next) {
this(hash, key, val);
this.next = next;
}
///

}

截取了一小部分,可以看到,这个Node就是ConcurrentHashMap的存储。

方法

构造函数

依旧先从构造函数看起:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
public ConcurrentHashMap(int initialCapacity) {
this(initialCapacity, LOAD_FACTOR, 1);
}

public ConcurrentHashMap(int initialCapacity,
float loadFactor, int concurrencyLevel) {
if (!(loadFactor > 0.0f) || initialCapacity < 0 || concurrencyLevel <= 0)
throw new IllegalArgumentException();
if (initialCapacity < concurrencyLevel) // Use at least as many bins
initialCapacity = concurrencyLevel; // as estimated threads
//这里计算出size大小,这里为什么加1?
long size = (long)(1.0 + (long)initialCapacity / loadFactor);
//【1】tableSizeFor。。。?
int cap = (size >= (long)MAXIMUM_CAPACITY) ?
MAXIMUM_CAPACITY : tableSizeFor((int)size);
//赋值
this.sizeCtl = cap;
}

有一些默认的值,如LOAD_FACTOR和concurrencyLevel。一圈跑下来,感觉初始化concurrenthashmap没有做什么内存的分配动作,只是做了一些size和cap的计算。
【1】tableSizeFor函数://todo 待补充

put

核心方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
public V put(K key, V value) {
return putVal(key, value, false);
}

/** Implementation for put and putIfAbsent */
final V putVal(K key, V value, boolean onlyIfAbsent) {
//非空校验
if (key == null || value == null) throw new NullPointerException();
//【2】计算hash值
int hash = spread(key.hashCode());
int binCount = 0;
//【3】开始循环table了,这个table的解释见下面,默认是null的。
for (Node<K,V>[] tab = table;;) {
Node<K,V> f; int n, i, fh; K fk; V fv;
//第一次循环会进入,但是第二次就不进入了。
if (tab == null || (n = tab.length) == 0)
//【4】初始化,在put的时候才开始初始化的。这个tab是volatile的。volatile之前解释过
tab = initTable();
//这里比较巧妙,用到了HashMap中的那个与运算了,这个就是随机取桶的位置,tabAt是通过Unsafe来取的,所以是线程安全的,如果这个桶位置没有东西,那么走到if,这里就是hash运算。
else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
//这里cas操作,把新节点放到这个位置就行。然后直接退出。
if (casTabAt(tab, i, null, new Node<K,V>(hash, key, value)))
break; // no lock when adding to empty bin
}
//【5】如果正在扩容
else if ((fh = f.hash) == MOVED)
//帮助扩容。
tab = helpTransfer(tab, f);
//这个判断比较简单,就是onlyIfAbsent,不做什么,直接返回。
else if (onlyIfAbsent // check first node without acquiring lock
&& fh == hash
&& ((fk = f.key) == key || (fk != null && key.equals(fk)))
&& (fv = f.val) != null)
return fv;
//如果hash冲突了,这个时候进入到下面的分支流程
else {
V oldVal = null;
//加锁
synchronized (f) {
//
if (tabAt(tab, i) == f) {
if (fh >= 0) {
binCount = 1;
//这一段基本就是在遍历链表
for (Node<K,V> e = f;; ++binCount) {
K ek;
if (e.hash == hash &&
((ek = e.key) == key ||
(ek != null && key.equals(ek)))) {
oldVal = e.val;
if (!onlyIfAbsent)
e.val = value;
break;
}
Node<K,V> pred = e;
//把节点放到链表的结尾
if ((e = e.next) == null) {
pred.next = new Node<K,V>(hash, key, value);
break;
}
}
}
//如果是树,那么是采用红黑树的方式。
else if (f instanceof TreeBin) {
Node<K,V> p;
binCount = 2;
if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,
value)) != null) {
oldVal = p.val;
if (!onlyIfAbsent)
p.val = value;
}
}
else if (f instanceof ReservationNode)
throw new IllegalStateException("Recursive update");
}
}
if (binCount != 0) {
if (binCount >= TREEIFY_THRESHOLD)
treeifyBin(tab, i);
if (oldVal != null)
return oldVal;
break;
}
}
}
addCount(1L, binCount);
return null;
}

【2】spread方法:引用网上人家的解释,也是翻译过来的:
Java 8的ConcurrentHashMap同样是通过Key的哈希值与数组长度取模确定该Key在数组中的索引。同样为了避免不太好的Key的hashCode设计,它通过如下方法计算得到Key的最终哈希值。不同的是,Java 8的ConcurrentHashMap作者认为引入红黑树后,即使哈希冲突比较严重,寻址效率也足够高,所以作者并未在哈希值的计算上做过多设计,只是将Key的hashCode值与其高16位作异或并保证最高位为0(从而保证最终结果为正整数)。

【3】 table是lazy init。

1
2
3
4
5
/**
* The array of bins. Lazily initialized upon first insertion.
* Size is always a power of two. Accessed directly by iterators.
*/
transient volatile Node<K,V>[] table;

【4】initTable

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
/**
* Initializes table, using the size recorded in sizeCtl.
*/
private final Node<K,V>[] initTable() {
Node<K,V>[] tab; int sc;
//这里面用到了spin。
while ((tab = table) == null || tab.length == 0) {
if ((sc = sizeCtl) < 0)
Thread.yield(); // lost initialization race; just spin
//这个里面需要看下SIZECTL就是取到sizeCtl内存偏移量,通过比较sc和sizeCtl的值,来进行原子操作,这个时候该地址的值已经变成了-1
else if (U.compareAndSetInt(this, SIZECTL, sc, -1)) {
try {
if ((tab = table) == null || tab.length == 0) {
int n = (sc > 0) ? sc : DEFAULT_CAPACITY;
@SuppressWarnings("unchecked")
Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n];
//初始化table
table = tab = nt;
//无符号右移两位16/2/2
sc = n - (n >>> 2);//16-4=12
}
} finally {
sizeCtl = sc;
}
break;
}
}
//返回初始化的
return tab;
}

【5】扩容:参见下并发编程——ConcurrentHashMap#transfer() 扩容逐行分析,这个讲的更详细。

通过以上代码分析,发现其实在存储结构的操作上concurrentHashMap和HashMap,都是数组和链表的结构,唯一的区别是有了synchronized和CAS,保证了线程安全。

本文作者 : braveheart
原文链接 : https://zhangjun075.github.io/passages/HashMap源码解析-2之ConcurrentHashMap/
版权声明 : 本博客所有文章除特别声明外,均采用 CC BY-NC-SA 4.0 许可协议。转载请注明出处!

知识 & 情怀 | 二者兼得

微信扫一扫, 向我打赏

微信扫一扫, 向我打赏

支付宝扫一扫, 向我打赏

支付宝扫一扫, 向我打赏

留下足迹