ConcurrentHashMap并发原理
约 1784 字大约 6 分钟
javaconcurrent
2025-03-03
概述
ConcurrentHashMap 是 java.util.concurrent 包中线程安全的哈希表实现,它在保证线程安全的同时提供了远超 Hashtable 和 Collections.synchronizedMap() 的并发性能。Java 7 和 Java 8 采用了完全不同的实现策略。
Java 7:Segment分段锁
Java 7 中,ConcurrentHashMap 将数据分成若干段(Segment),每个 Segment 本质上是一个小型 HashMap,拥有独立的 ReentrantLock。
// Java 7 核心结构(简化)
public class ConcurrentHashMap<K,V> {
final Segment<K,V>[] segments; // 默认16个Segment
static final class Segment<K,V> extends ReentrantLock {
transient volatile HashEntry<K,V>[] table;
transient int count;
transient int threshold;
final float loadFactor;
}
static final class HashEntry<K,V> {
final int hash;
final K key;
volatile V value;
volatile HashEntry<K,V> next;
}
}并发度:默认16个Segment,意味着最多支持16个线程同时写入(不同Segment之间互不阻塞)。
定位过程:先用 hash 的高位确定 Segment,再用低位确定桶位。
Java 8:CAS + synchronized
Java 8 完全重写了 ConcurrentHashMap,抛弃 Segment 分段锁,改为对每个桶头节点加 synchronized 锁,粒度更细。
核心节点类型
// 普通链表节点
static class Node<K,V> implements Map.Entry<K,V> {
final int hash;
final K key;
volatile V val;
volatile Node<K,V> next;
}
// 红黑树根节点包装器
static final class TreeBin<K,V> extends Node<K,V> {
TreeNode<K,V> root;
volatile TreeNode<K,V> first;
volatile Thread waiter;
volatile int lockState; // 读写锁状态
}
// 红黑树节点
static final class TreeNode<K,V> extends Node<K,V> {
TreeNode<K,V> parent;
TreeNode<K,V> left;
TreeNode<K,V> right;
TreeNode<K,V> prev;
boolean red;
}
// 扩容转发节点
static final class ForwardingNode<K,V> extends Node<K,V> {
final Node<K,V>[] nextTable;
// hash = MOVED = -1,标识此桶已迁移
}put操作流程
final V putVal(K key, V value, boolean onlyIfAbsent) {
if (key == null || value == null) throw new NullPointerException();
int hash = spread(key.hashCode());
int binCount = 0;
for (Node<K,V>[] tab = table;;) {
Node<K,V> f; int n, i, fh;
if (tab == null || (n = tab.length) == 0)
tab = initTable(); // CAS初始化
else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
if (casTabAt(tab, i, null, new Node<>(hash, key, value, null)))
break; // CAS空桶写入
}
else if ((fh = f.hash) == MOVED)
tab = helpTransfer(tab, f); // 帮助扩容
else {
V oldVal = null;
synchronized (f) { // 锁住桶头节点
if (tabAt(tab, i) == f) { // double-check
if (fh >= 0) {
// 链表操作...
} else if (f instanceof TreeBin) {
// 红黑树操作...
}
}
}
if (binCount >= TREEIFY_THRESHOLD)
treeifyBin(tab, i);
if (oldVal != null) return oldVal;
break;
}
}
addCount(1L, binCount);
return null;
}初始化 — CAS 保证单线程执行
private final Node<K,V>[] initTable() {
Node<K,V>[] tab; int sc;
while ((tab = table) == null || tab.length == 0) {
if ((sc = sizeCtl) < 0)
Thread.yield(); // 其他线程正在初始化,让出CPU
else if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) {
// CAS成功,获得初始化权
try {
if ((tab = table) == null || tab.length == 0) {
int n = (sc > 0) ? sc : DEFAULT_CAPACITY;
table = tab = new Node[n];
sc = n - (n >>> 2); // 0.75n
}
} finally {
sizeCtl = sc;
}
break;
}
}
return tab;
}size计数 — CounterCell
ConcurrentHashMap 的 size() 不能简单用一个 volatile int 实现(CAS竞争太激烈),而是借鉴 LongAdder 的思路,使用 CounterCell 数组分散计数。
// addCount简化逻辑
private final void addCount(long x, int check) {
CounterCell[] as; long b, s;
// 先尝试CAS更新baseCount
if ((as = counterCells) != null ||
!U.compareAndSwapLong(this, BASECOUNT, b = baseCount, s = b + x)) {
// CAS失败则分散到CounterCell
CounterCell a; long v; int m;
if (as == null || (a = as[ThreadLocalRandom.getProbe() & (as.length - 1)]) == null
|| !(U.compareAndSwapLong(a, CELLVALUE, v = a.value, v + x))) {
fullAddCount(x, uncontended); // 进一步处理
}
}
}
public int size() {
long n = sumCount();
return ((n < 0L) ? 0 : (n > Integer.MAX_VALUE) ? Integer.MAX_VALUE : (int)n);
}
final long sumCount() {
CounterCell[] as = counterCells;
long sum = baseCount;
if (as != null) {
for (CounterCell a : as)
if (a != null) sum += a.value;
}
return sum;
}扩容 — 多线程协助迁移(transfer)
每个线程领取 stride(最少16)个桶进行迁移,从数组尾部向头部推进。已迁移的桶放置 ForwardingNode(hash=-1),后续 get/put 操作遇到它时自动转向新数组。
get操作 — 全程无锁
public V get(Object key) {
Node<K,V>[] tab; Node<K,V> e, p; int n, eh; K ek;
int h = spread(key.hashCode());
if ((tab = table) != null && (n = tab.length) > 0 &&
(e = tabAt(tab, (n - 1) & h)) != null) {
if ((eh = e.hash) == h) {
if ((ek = e.key) == key || (ek != null && key.equals(ek)))
return e.val;
}
else if (eh < 0) // ForwardingNode 或 TreeBin
return (p = e.find(h, key)) != null ? p.val : null;
while ((e = e.next) != null) {
if (e.hash == h &&
((ek = e.key) == key || (ek != null && key.equals(ek))))
return e.val;
}
}
return null;
}get 不加锁,利用 volatile 读保证可见性。Node.val 和 Node.next 都是 volatile 的。
与Hashtable/synchronizedMap对比
| 特性 | Hashtable | synchronizedMap | ConcurrentHashMap |
|---|---|---|---|
| 锁粒度 | 整个表一把锁 | 整个表一把锁 | 桶级别锁 |
| null key/value | 不允许 | 允许 | 不允许 |
| 迭代器 | fail-fast | fail-fast | 弱一致性 |
| 并发读 | 阻塞 | 阻塞 | 无锁 |
| 并发写 | 串行 | 串行 | 桶级别并行 |
| 扩容 | 单线程 | 单线程 | 多线程协助 |
| 性能 | 差 | 差 | 优秀 |
常见陷阱
1. 复合操作的原子性
ConcurrentHashMap<String, Integer> map = new ConcurrentHashMap<>();
// 错误:check-then-act 非原子
if (!map.containsKey("key")) {
map.put("key", 1); // 两个线程可能同时通过检查
}
// 正确:使用原子复合方法
map.putIfAbsent("key", 1);
map.computeIfAbsent("key", k -> expensiveCompute(k));
map.merge("key", 1, Integer::sum); // 原子累加2. size() 的弱一致性
size() 返回的是一个近似值,在并发修改期间可能不精确。如果需要精确值,需要在不存在并发修改的时间窗口调用。
3. 不允许null
ConcurrentHashMap<String, String> map = new ConcurrentHashMap<>();
// map.put("key", null); // NullPointerException
// map.put(null, "value"); // NullPointerException禁止null是设计决策:在并发环境中,get 返回 null 无法区分"key不存在"和"value为null"。
总结
ConcurrentHashMap 的演进体现了并发编程的核心思想 — 尽可能减小锁粒度。从 Java 7 的 Segment 分段锁到 Java 8 的桶级别锁 + CAS,每一步优化都在降低锁竞争、提升吞吐量。理解其实现细节,有助于在实际开发中正确使用并发集合,避免在高并发场景下踩坑。
贡献者
更新日志
9f6c2-feat: organize wiki content and refresh site setup于