JUC 并发框架全景
java.util.concurrent(JUC)是 Java 并发编程的核心工具包,由 Doug Lea 设计。掌握 JUC,是写出高性能 Java 程序的基础。
JUC 全景图
java.util.concurrent
├── 原子类(atomic)
│ ├── AtomicInteger / AtomicLong / AtomicBoolean
│ ├── AtomicReference / AtomicStampedReference
│ └── LongAdder / LongAccumulator(高并发计数)
│
├── 锁(locks)
│ ├── ReentrantLock(可重入锁)
│ ├── ReentrantReadWriteLock(读写锁)
│ ├── StampedLock(乐观读锁,Java 8)
│ └── LockSupport(线程阻塞工具)
│
├── 同步工具
│ ├── CountDownLatch(倒计时门闩)
│ ├── CyclicBarrier(循环屏障)
│ ├── Semaphore(信号量)
│ ├── Exchanger(线程间数据交换)
│ └── Phaser(灵活的屏障)
│
├── 并发集合
│ ├── ConcurrentHashMap
│ ├── CopyOnWriteArrayList
│ ├── ConcurrentLinkedQueue
│ ├── LinkedBlockingQueue / ArrayBlockingQueue
│ └── PriorityBlockingQueue / DelayQueue
│
└── 线程池
├── ThreadPoolExecutor
├── ScheduledThreadPoolExecutor
├── ForkJoinPool
└── Executors(工厂类)原子类
java
// AtomicInteger — 原子整数
AtomicInteger counter = new AtomicInteger(0);
counter.incrementAndGet(); // ++i,返回新值
counter.getAndIncrement(); // i++,返回旧值
counter.addAndGet(5); // 加 5,返回新值
counter.compareAndSet(5, 10); // CAS:期望值=5时设为10
// LongAdder — 高并发计数(比 AtomicLong 更快)
// 内部分段,减少 CAS 竞争
LongAdder adder = new LongAdder();
adder.increment();
adder.add(5);
long total = adder.sum(); // 汇总所有段
// AtomicReference — 原子引用
AtomicReference<User> userRef = new AtomicReference<>(new User("Alice"));
User oldUser = userRef.get();
User newUser = new User("Bob");
boolean success = userRef.compareAndSet(oldUser, newUser);
// AtomicStampedReference — 解决 ABA 问题
AtomicStampedReference<Integer> stampedRef =
new AtomicStampedReference<>(100, 0);
int[] stampHolder = new int[1];
int value = stampedRef.get(stampHolder); // 获取值和版本号
stampedRef.compareAndSet(value, 200, stampHolder[0], stampHolder[0] + 1);ReentrantLock
java
// ReentrantLock vs synchronized
// - 可中断等待
// - 可超时等待
// - 可公平锁
// - 可绑定多个 Condition
ReentrantLock lock = new ReentrantLock(true); // true = 公平锁
// 基本用法
lock.lock();
try {
// 临界区
} finally {
lock.unlock(); // 必须在 finally 中释放!
}
// 可中断等待
try {
lock.lockInterruptibly();
try {
// 临界区
} finally {
lock.unlock();
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
// 超时等待
if (lock.tryLock(3, TimeUnit.SECONDS)) {
try {
// 临界区
} finally {
lock.unlock();
}
} else {
System.out.println("获取锁超时");
}
// Condition — 精确唤醒
ReentrantLock lock2 = new ReentrantLock();
Condition notFull = lock2.newCondition();
Condition notEmpty = lock2.newCondition();
// 生产者
lock2.lock();
try {
while (queue.isFull()) notFull.await();
queue.add(item);
notEmpty.signal(); // 只唤醒等待 notEmpty 的线程
} finally {
lock2.unlock();
}ReadWriteLock
java
// 读多写少场景:读锁共享,写锁独占
ReadWriteLock rwLock = new ReentrantReadWriteLock();
Lock readLock = rwLock.readLock();
Lock writeLock = rwLock.writeLock();
// 缓存示例
public class Cache<K, V> {
private final Map<K, V> map = new HashMap<>();
private final ReadWriteLock lock = new ReentrantReadWriteLock();
public V get(K key) {
lock.readLock().lock(); // 多个线程可以同时读
try {
return map.get(key);
} finally {
lock.readLock().unlock();
}
}
public void put(K key, V value) {
lock.writeLock().lock(); // 写时独占
try {
map.put(key, value);
} finally {
lock.writeLock().unlock();
}
}
}
// StampedLock — 乐观读(Java 8,性能更高)
StampedLock stampedLock = new StampedLock();
// 乐观读(不加锁,读完验证)
long stamp = stampedLock.tryOptimisticRead();
double x = this.x;
double y = this.y;
if (!stampedLock.validate(stamp)) {
// 有写操作,升级为悲观读锁
stamp = stampedLock.readLock();
try {
x = this.x;
y = this.y;
} finally {
stampedLock.unlockRead(stamp);
}
}同步工具
java
// CountDownLatch — 等待多个任务完成
CountDownLatch latch = new CountDownLatch(3);
for (int i = 0; i < 3; i++) {
final int taskId = i;
executor.submit(() -> {
try {
doTask(taskId);
} finally {
latch.countDown(); // 完成一个任务
}
});
}
latch.await(); // 等待所有任务完成
// 或带超时:latch.await(10, TimeUnit.SECONDS)
// CyclicBarrier — 多个线程互相等待,到达屏障后一起继续
CyclicBarrier barrier = new CyclicBarrier(3, () -> {
System.out.println("所有线程到达屏障,继续执行");
});
for (int i = 0; i < 3; i++) {
executor.submit(() -> {
doPhase1();
barrier.await(); // 等待其他线程
doPhase2();
barrier.await(); // 可以重复使用(Cyclic)
});
}
// Semaphore — 限制并发数
Semaphore semaphore = new Semaphore(10); // 最多 10 个并发
executor.submit(() -> {
semaphore.acquire(); // 获取许可
try {
accessResource();
} finally {
semaphore.release(); // 释放许可
}
});并发集合
java
// ConcurrentHashMap — 线程安全的 HashMap
ConcurrentHashMap<String, Integer> map = new ConcurrentHashMap<>();
map.put("key", 1);
map.putIfAbsent("key", 2); // 原子操作
map.computeIfAbsent("key", k -> expensiveCompute(k)); // 原子计算
map.merge("key", 1, Integer::sum); // 原子合并
// 高并发计数
map.merge("counter", 1, Integer::sum);
// CopyOnWriteArrayList — 读多写少的 List
// 写时复制整个数组,读不加锁
CopyOnWriteArrayList<String> cowList = new CopyOnWriteArrayList<>();
cowList.add("item"); // 写时复制
// BlockingQueue — 生产者消费者
BlockingQueue<Task> queue = new LinkedBlockingQueue<>(100);
// 生产者
queue.put(task); // 队列满时阻塞
queue.offer(task, 1, TimeUnit.SECONDS); // 超时返回 false
// 消费者
Task task = queue.take(); // 队列空时阻塞
Task task2 = queue.poll(1, TimeUnit.SECONDS); // 超时返回 null
// DelayQueue — 延迟队列
class DelayedTask implements Delayed {
private final long executeTime;
@Override
public long getDelay(TimeUnit unit) {
return unit.convert(executeTime - System.currentTimeMillis(),
TimeUnit.MILLISECONDS);
}
@Override
public int compareTo(Delayed other) {
return Long.compare(this.executeTime,
((DelayedTask) other).executeTime);
}
}
DelayQueue<DelayedTask> delayQueue = new DelayQueue<>();
delayQueue.put(new DelayedTask(System.currentTimeMillis() + 5000));
DelayedTask task3 = delayQueue.take(); // 5 秒后才能取出JUC 选择指南
| 场景 | 推荐工具 |
|---|---|
| 简单计数 | AtomicInteger / LongAdder |
| 高并发计数 | LongAdder(比 AtomicLong 快) |
| 读多写少 | ReentrantReadWriteLock / StampedLock |
| 等待多任务 | CountDownLatch |
| 多线程分阶段 | CyclicBarrier |
| 限流 | Semaphore |
| 线程安全 Map | ConcurrentHashMap |
| 生产者消费者 | BlockingQueue |