Skip to content

JUC 并发框架全景

java.util.concurrent(JUC)是 Java 并发编程的核心工具包,由 Doug Lea 设计。掌握 JUC,是写出高性能 Java 程序的基础。

JUC 全景图

java.util.concurrent
├── 原子类(atomic)
│   ├── AtomicInteger / AtomicLong / AtomicBoolean
│   ├── AtomicReference / AtomicStampedReference
│   └── LongAdder / LongAccumulator(高并发计数)

├── 锁(locks)
│   ├── ReentrantLock(可重入锁)
│   ├── ReentrantReadWriteLock(读写锁)
│   ├── StampedLock(乐观读锁,Java 8)
│   └── LockSupport(线程阻塞工具)

├── 同步工具
│   ├── CountDownLatch(倒计时门闩)
│   ├── CyclicBarrier(循环屏障)
│   ├── Semaphore(信号量)
│   ├── Exchanger(线程间数据交换)
│   └── Phaser(灵活的屏障)

├── 并发集合
│   ├── ConcurrentHashMap
│   ├── CopyOnWriteArrayList
│   ├── ConcurrentLinkedQueue
│   ├── LinkedBlockingQueue / ArrayBlockingQueue
│   └── PriorityBlockingQueue / DelayQueue

└── 线程池
    ├── ThreadPoolExecutor
    ├── ScheduledThreadPoolExecutor
    ├── ForkJoinPool
    └── Executors(工厂类)

原子类

java
// AtomicInteger — 原子整数
AtomicInteger counter = new AtomicInteger(0);
counter.incrementAndGet();  // ++i,返回新值
counter.getAndIncrement();  // i++,返回旧值
counter.addAndGet(5);       // 加 5,返回新值
counter.compareAndSet(5, 10);  // CAS:期望值=5时设为10

// LongAdder — 高并发计数(比 AtomicLong 更快)
// 内部分段,减少 CAS 竞争
LongAdder adder = new LongAdder();
adder.increment();
adder.add(5);
long total = adder.sum();  // 汇总所有段

// AtomicReference — 原子引用
AtomicReference<User> userRef = new AtomicReference<>(new User("Alice"));
User oldUser = userRef.get();
User newUser = new User("Bob");
boolean success = userRef.compareAndSet(oldUser, newUser);

// AtomicStampedReference — 解决 ABA 问题
AtomicStampedReference<Integer> stampedRef =
    new AtomicStampedReference<>(100, 0);
int[] stampHolder = new int[1];
int value = stampedRef.get(stampHolder);  // 获取值和版本号
stampedRef.compareAndSet(value, 200, stampHolder[0], stampHolder[0] + 1);

ReentrantLock

java
// ReentrantLock vs synchronized
// - 可中断等待
// - 可超时等待
// - 可公平锁
// - 可绑定多个 Condition

ReentrantLock lock = new ReentrantLock(true);  // true = 公平锁

// 基本用法
lock.lock();
try {
    // 临界区
} finally {
    lock.unlock();  // 必须在 finally 中释放!
}

// 可中断等待
try {
    lock.lockInterruptibly();
    try {
        // 临界区
    } finally {
        lock.unlock();
    }
} catch (InterruptedException e) {
    Thread.currentThread().interrupt();
}

// 超时等待
if (lock.tryLock(3, TimeUnit.SECONDS)) {
    try {
        // 临界区
    } finally {
        lock.unlock();
    }
} else {
    System.out.println("获取锁超时");
}

// Condition — 精确唤醒
ReentrantLock lock2 = new ReentrantLock();
Condition notFull = lock2.newCondition();
Condition notEmpty = lock2.newCondition();

// 生产者
lock2.lock();
try {
    while (queue.isFull()) notFull.await();
    queue.add(item);
    notEmpty.signal();  // 只唤醒等待 notEmpty 的线程
} finally {
    lock2.unlock();
}

ReadWriteLock

java
// 读多写少场景:读锁共享,写锁独占
ReadWriteLock rwLock = new ReentrantReadWriteLock();
Lock readLock = rwLock.readLock();
Lock writeLock = rwLock.writeLock();

// 缓存示例
public class Cache<K, V> {
    private final Map<K, V> map = new HashMap<>();
    private final ReadWriteLock lock = new ReentrantReadWriteLock();

    public V get(K key) {
        lock.readLock().lock();  // 多个线程可以同时读
        try {
            return map.get(key);
        } finally {
            lock.readLock().unlock();
        }
    }

    public void put(K key, V value) {
        lock.writeLock().lock();  // 写时独占
        try {
            map.put(key, value);
        } finally {
            lock.writeLock().unlock();
        }
    }
}

// StampedLock — 乐观读(Java 8,性能更高)
StampedLock stampedLock = new StampedLock();

// 乐观读(不加锁,读完验证)
long stamp = stampedLock.tryOptimisticRead();
double x = this.x;
double y = this.y;
if (!stampedLock.validate(stamp)) {
    // 有写操作,升级为悲观读锁
    stamp = stampedLock.readLock();
    try {
        x = this.x;
        y = this.y;
    } finally {
        stampedLock.unlockRead(stamp);
    }
}

同步工具

java
// CountDownLatch — 等待多个任务完成
CountDownLatch latch = new CountDownLatch(3);

for (int i = 0; i < 3; i++) {
    final int taskId = i;
    executor.submit(() -> {
        try {
            doTask(taskId);
        } finally {
            latch.countDown();  // 完成一个任务
        }
    });
}

latch.await();  // 等待所有任务完成
// 或带超时:latch.await(10, TimeUnit.SECONDS)

// CyclicBarrier — 多个线程互相等待,到达屏障后一起继续
CyclicBarrier barrier = new CyclicBarrier(3, () -> {
    System.out.println("所有线程到达屏障,继续执行");
});

for (int i = 0; i < 3; i++) {
    executor.submit(() -> {
        doPhase1();
        barrier.await();  // 等待其他线程
        doPhase2();
        barrier.await();  // 可以重复使用(Cyclic)
    });
}

// Semaphore — 限制并发数
Semaphore semaphore = new Semaphore(10);  // 最多 10 个并发

executor.submit(() -> {
    semaphore.acquire();  // 获取许可
    try {
        accessResource();
    } finally {
        semaphore.release();  // 释放许可
    }
});

并发集合

java
// ConcurrentHashMap — 线程安全的 HashMap
ConcurrentHashMap<String, Integer> map = new ConcurrentHashMap<>();
map.put("key", 1);
map.putIfAbsent("key", 2);  // 原子操作
map.computeIfAbsent("key", k -> expensiveCompute(k));  // 原子计算
map.merge("key", 1, Integer::sum);  // 原子合并

// 高并发计数
map.merge("counter", 1, Integer::sum);

// CopyOnWriteArrayList — 读多写少的 List
// 写时复制整个数组,读不加锁
CopyOnWriteArrayList<String> cowList = new CopyOnWriteArrayList<>();
cowList.add("item");  // 写时复制

// BlockingQueue — 生产者消费者
BlockingQueue<Task> queue = new LinkedBlockingQueue<>(100);

// 生产者
queue.put(task);  // 队列满时阻塞
queue.offer(task, 1, TimeUnit.SECONDS);  // 超时返回 false

// 消费者
Task task = queue.take();  // 队列空时阻塞
Task task2 = queue.poll(1, TimeUnit.SECONDS);  // 超时返回 null

// DelayQueue — 延迟队列
class DelayedTask implements Delayed {
    private final long executeTime;

    @Override
    public long getDelay(TimeUnit unit) {
        return unit.convert(executeTime - System.currentTimeMillis(),
                           TimeUnit.MILLISECONDS);
    }

    @Override
    public int compareTo(Delayed other) {
        return Long.compare(this.executeTime,
                           ((DelayedTask) other).executeTime);
    }
}

DelayQueue<DelayedTask> delayQueue = new DelayQueue<>();
delayQueue.put(new DelayedTask(System.currentTimeMillis() + 5000));
DelayedTask task3 = delayQueue.take();  // 5 秒后才能取出

JUC 选择指南

场景推荐工具
简单计数AtomicInteger / LongAdder
高并发计数LongAdder(比 AtomicLong 快)
读多写少ReentrantReadWriteLock / StampedLock
等待多任务CountDownLatch
多线程分阶段CyclicBarrier
限流Semaphore
线程安全 MapConcurrentHashMap
生产者消费者BlockingQueue

系统学习 Java 生态,深入底层架构