Java 并发 — 知识详解
更新: 5/15/2026 字数: 0 字 时长: 0 分钟
1. Java 内存模型(JMM)
1.1 主内存与工作内存
线程A 工作内存 主内存 线程B 工作内存
┌──────────┐ ┌──────────┐ ┌──────────┐
│ 变量副本 │ ←→ │ 共享变量 │ ←→ │ 变量副本 │
└──────────┘ └──────────┘ └──────────┘- 所有共享变量存储在主内存
- 每个线程有自己的工作内存,保存变量副本
- 线程对变量的操作必须在工作内存中进行,不能直接操作主内存
- 线程间通信必须通过主内存
1.2 happens-before 规则
JMM 定义了 8 条 happens-before 规则,保证可见性和有序性:
- 程序顺序规则:同一线程中,前面的操作 happens-before 后面的操作
- Monitor 锁规则:unlock happens-before 后续对同一锁的 lock
- volatile 规则:volatile 写 happens-before 后续对同一变量的读
- 线程启动规则:
Thread.start()happens-before 该线程的任何操作 - 线程终止规则:线程的所有操作 happens-before
Thread.join()返回 - 中断规则:
interrupt()happens-before 被中断线程检测到中断 - 终结器规则:构造函数 happens-before
finalize() - 传递性:A happens-before B,B happens-before C → A happens-before C
2. volatile
2.1 可见性
volatile 变量的写会立即刷新到主内存,读会从主内存重新加载:
volatile boolean running = true;
// 线程A
public void stop() {
running = false; // 写入主内存
}
// 线程B
public void run() {
while (running) { // 每次从主内存读取
// do work
}
}没有 volatile,线程B 可能永远看不到 running = false(工作内存缓存了旧值)。
2.2 有序性(内存屏障)
volatile 通过插入内存屏障禁止指令重排:
| 屏障类型 | 插入位置 | 作用 |
|---|---|---|
| StoreStore | volatile 写之前 | 禁止上面的普通写与 volatile 写重排 |
| StoreLoad | volatile 写之后 | 禁止 volatile 写与下面的读重排 |
| LoadLoad | volatile 读之后 | 禁止 volatile 读与下面的普通读重排 |
| LoadStore | volatile 读之后 | 禁止 volatile 读与下面的普通写重排 |
2.3 不保证原子性
volatile int count = 0;
// ❌ 不安全!i++ 是三步操作:读-改-写
count++; // 1.读count 2.count+1 3.写回count
// 两个线程可能同时读到相同值,各自+1后写回,丢失一次自增
// ✅ 用 AtomicInteger
AtomicInteger count = new AtomicInteger(0);
count.incrementAndGet(); // CAS 保证原子性2.4 DCL 单例中的应用
public class Singleton {
// 必须加 volatile!
private static volatile Singleton instance;
public static Singleton getInstance() {
if (instance == null) { // 第一次检查
synchronized (Singleton.class) {
if (instance == null) { // 第二次检查
instance = new Singleton(); // 非原子操作!
}
}
}
return instance;
}
}为什么需要 volatile? new Singleton() 分三步:
- 分配内存
- 调用构造方法初始化
- 将引用赋值给 instance
没有 volatile,步骤 2 和 3 可能重排序。线程A 执行了 1→3(还没执行 2),线程B 看到 instance != null 就直接返回了一个未初始化的对象。
3. synchronized
3.1 锁升级过程
无锁 → 偏向锁 → 轻量级锁 → 重量级锁(单向升级,不可降级)偏向锁(Biased Locking):
- 场景:只有一个线程访问同步块
- 实现:在 Mark Word 中记录线程 ID
- 后续该线程进入同步块时,只需比较线程 ID,无需 CAS
- 当第二个线程尝试获取锁时,偏向锁撤销,升级为轻量级锁
轻量级锁(Lightweight Lock):
- 场景:多个线程交替执行同步块(无竞争)
- 实现:在栈帧中创建 Lock Record,CAS 将 Mark Word 复制到 Lock Record
- 退出时 CAS 恢复 Mark Word
- CAS 失败(有竞争)→ 升级为重量级锁
重量级锁(Heavyweight Lock):
- 场景:多个线程同时竞争
- 实现:依赖操作系统 Mutex Lock
- 未获取锁的线程阻塞(用户态→内核态切换,开销大)
- 底层是 ObjectMonitor:
_EntryList:等待获取锁的线程队列_WaitSet:调用wait()后等待的线程_owner:当前持有锁的线程
3.2 锁优化
锁消除:JIT 编译器检测到同步块中的对象不会逃逸到其他线程,自动去除锁。
// StringBuffer.append 内部有 synchronized,但 sb 是局部变量不会逃逸
public String concat(String s1, String s2) {
StringBuffer sb = new StringBuffer();
sb.append(s1);
sb.append(s2);
return sb.toString();
// JIT 会消除 append 中的 synchronized
}锁粗化:连续对同一对象加锁解锁,合并为一次更大范围的锁。
3.3 synchronized vs ReentrantLock
| 特性 | synchronized | ReentrantLock |
|---|---|---|
| 实现 | JVM 内置,monitorenter/monitorexit | Java API,基于 AQS |
| 释放 | 自动释放(退出同步块) | 手动 unlock()(必须在 finally 中) |
| 可中断 | 不可中断 | lockInterruptibly() 可中断 |
| 公平性 | 非公平 | 可选公平/非公平 |
| 条件变量 | 只有一个 wait/notify | 多个 Condition |
| 性能 | JDK 6 后优化,差距不大 | 差距不大 |
4. AQS(AbstractQueuedSynchronizer)
4.1 核心结构
// 同步状态
private volatile int state;
// CLH 双向队列节点
static final class Node {
volatile int waitStatus; // CANCELLED(1), SIGNAL(-1), CONDITION(-2), PROPAGATE(-3)
volatile Node prev;
volatile Node next;
volatile Thread thread;
}AQS 是 ReentrantLock、Semaphore、CountDownLatch、ReentrantReadWriteLock 的基础框架。
4.2 acquire 流程(以 ReentrantLock 为例)
public final void acquire(int arg) {
if (!tryAcquire(arg) && // 1. 尝试获取锁
acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) // 2. 失败则入队等待
selfInterrupt();
}详细流程:
tryAcquire(1):尝试 CAS 将 state 从 0 改为 1- 成功 → 设置 exclusiveOwnerThread 为当前线程,获取锁成功
- 失败 → 检查是否是重入(当前线程 == ownerThread),是则 state++
addWaiter(Node.EXCLUSIVE):创建节点加入 CLH 队列尾部(CAS + 自旋)acquireQueued:在队列中自旋等待- 如果前驱是 head,再次 tryAcquire
- 否则
shouldParkAfterFailedAcquire:将前驱的 waitStatus 设为 SIGNAL parkAndCheckInterrupt:LockSupport.park()阻塞当前线程
4.3 release 流程
public final boolean release(int arg) {
if (tryRelease(arg)) { // state-- ,state==0 时释放成功
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h); // LockSupport.unpark() 唤醒后继节点
return true;
}
return false;
}4.4 公平锁 vs 非公平锁
// 非公平锁的 tryAcquire(默认)
final boolean nonfairTryAcquire(int acquires) {
// 直接 CAS 抢锁,不管队列中有没有等待的线程
if (compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
// ...
}
// 公平锁的 tryAcquire
protected final boolean tryAcquire(int acquires) {
// 多了一个检查:队列中是否有等待更久的线程
if (!hasQueuedPredecessors() && compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
// ...
}非公平锁性能更好(减少线程切换),但可能导致饥饿。
5. 线程池 ThreadPoolExecutor
5.1 七大参数
public ThreadPoolExecutor(
int corePoolSize, // 核心线程数(即使空闲也不回收,除非设置 allowCoreThreadTimeOut)
int maximumPoolSize, // 最大线程数
long keepAliveTime, // 非核心线程空闲存活时间
TimeUnit unit, // 时间单位
BlockingQueue<Runnable> workQueue, // 任务队列
ThreadFactory threadFactory, // 线程工厂
RejectedExecutionHandler handler // 拒绝策略
)5.2 execute 流程
提交任务
│
▼
workerCount < corePoolSize ?
├─ 是 → 创建核心线程执行任务
│
▼ 否
workQueue.offer(task) 成功?
├─ 是 → 任务入队等待
│
▼ 否(队列满)
workerCount < maximumPoolSize ?
├─ 是 → 创建非核心线程执行任务
│
▼ 否
执行拒绝策略源码:
public void execute(Runnable command) {
int c = ctl.get();
// 1. 工作线程数 < 核心线程数
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true)) return;
c = ctl.get();
}
// 2. 尝试入队
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
if (!isRunning(recheck) && remove(command))
reject(command);
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
// 3. 队列满,尝试创建非核心线程
else if (!addWorker(command, false))
reject(command); // 4. 拒绝
}5.3 四种拒绝策略
| 策略 | 行为 |
|---|---|
| AbortPolicy(默认) | 抛出 RejectedExecutionException |
| CallerRunsPolicy | 由提交任务的线程直接执行 |
| DiscardPolicy | 静默丢弃任务 |
| DiscardOldestPolicy | 丢弃队列中最老的任务,重新提交当前任务 |
5.4 常见线程池配置
// ❌ 不推荐使用 Executors 创建(阿里规约)
Executors.newFixedThreadPool(n); // LinkedBlockingQueue 无界队列,可能 OOM
Executors.newCachedThreadPool(); // maximumPoolSize = Integer.MAX_VALUE,可能创建大量线程
Executors.newSingleThreadExecutor(); // 同 Fixed,无界队列
// ✅ 推荐手动创建
new ThreadPoolExecutor(
Runtime.getRuntime().availableProcessors(), // CPU 密集型
Runtime.getRuntime().availableProcessors() * 2, // IO 密集型可以更大
60L, TimeUnit.SECONDS,
new LinkedBlockingQueue<>(1000), // 有界队列
new ThreadFactory() { ... },
new ThreadPoolExecutor.CallerRunsPolicy()
);5.5 Worker 工作原理
Worker 继承 AQS(实现不可重入锁)并实现 Runnable:
private final class Worker extends AbstractQueuedSynchronizer implements Runnable {
final Thread thread;
Runnable firstTask;
public void run() {
runWorker(this);
}
}
final void runWorker(Worker w) {
Runnable task = w.firstTask;
w.firstTask = null;
while (task != null || (task = getTask()) != null) {
w.lock(); // Worker 自身加锁,用于 shutdown 时判断是否空闲
try {
beforeExecute(w.thread, task);
task.run();
afterExecute(task, null);
} finally {
task = null;
w.unlock();
}
}
// getTask() 返回 null → 线程退出
}getTask() 从队列取任务,非核心线程用 poll(keepAliveTime) 超时获取,核心线程用 take() 阻塞获取。
6. CAS 与 ABA 问题
6.1 CAS 原理
Compare And Swap:比较内存值与预期值,相等则更新为新值,整个操作是原子的。
// Unsafe 类提供的 CAS 操作(CPU 指令级别保证原子性)
public final native boolean compareAndSwapInt(
Object obj, long offset, int expected, int update);
// AtomicInteger 的 incrementAndGet
public final int incrementAndGet() {
return unsafe.getAndAddInt(this, valueOffset, 1) + 1;
}
// getAndAddInt 内部自旋 CAS
public final int getAndAddInt(Object obj, long offset, int delta) {
int v;
do {
v = getIntVolatile(obj, offset); // 读取当前值
} while (!compareAndSwapInt(obj, offset, v, v + delta)); // CAS 直到成功
return v;
}6.2 ABA 问题
线程1 读到值 A,线程2 将 A→B→A,线程1 CAS 发现仍是 A 就成功了,但值已经被改过。
解决方案:加版本号
AtomicStampedReference<Integer> ref = new AtomicStampedReference<>(1, 0);
int[] stampHolder = new int[1];
int value = ref.get(stampHolder); // value=1, stamp=0
// CAS 时同时比较值和版本号
ref.compareAndSet(1, 2, 0, 1); // 期望值1版本0 → 新值2版本17. ThreadLocal
7.1 原理
每个 Thread 持有一个 ThreadLocalMap,key 是 ThreadLocal 对象(弱引用),value 是存储的值:
public class Thread {
ThreadLocal.ThreadLocalMap threadLocals; // 每个线程独有
}
// ThreadLocal.set()
public void set(T value) {
Thread t = Thread.currentThread();
ThreadLocalMap map = t.threadLocals;
if (map != null)
map.set(this, value); // this 就是 ThreadLocal 对象作为 key
else
createMap(t, value);
}
// ThreadLocal.get()
public T get() {
Thread t = Thread.currentThread();
ThreadLocalMap map = t.threadLocals;
if (map != null) {
Entry e = map.getEntry(this);
if (e != null) return (T) e.value;
}
return setInitialValue();
}7.2 内存泄漏
static class Entry extends WeakReference<ThreadLocal<?>> {
Object value; // value 是强引用!
Entry(ThreadLocal<?> k, Object v) {
super(k); // key 是弱引用
value = v;
}
}泄漏场景:
- ThreadLocal 对象被回收(弱引用 key 变为 null)
- 但 Entry 的 value 仍然被 Entry 强引用
- Entry 被 ThreadLocalMap 强引用
- ThreadLocalMap 被 Thread 强引用
- 如果线程是线程池中的长期存活线程 → value 永远不会被回收
解决方案:用完后手动调用 threadLocal.remove()
try {
threadLocal.set(value);
// 使用 value
} finally {
threadLocal.remove(); // 必须清理!
}7.3 Android 中的应用
Looper.myLooper():每个线程的 Looper 存储在 ThreadLocal 中Choreographer:每个线程的编舞者实例
// Looper 源码
static final ThreadLocal<Looper> sThreadLocal = new ThreadLocal<>();
public static void prepare() {
if (sThreadLocal.get() != null)
throw new RuntimeException("Only one Looper may be created per thread");
sThreadLocal.set(new Looper());
}
public static @Nullable Looper myLooper() {
return sThreadLocal.get();
}常见坑总结
| 坑 | 说明 |
|---|---|
| volatile 不保证原子性 | count++ 仍需 AtomicInteger 或 synchronized |
| DCL 单例必须加 volatile | 防止指令重排导致获取未初始化对象 |
| ThreadLocal 用完必须 remove | 线程池场景下会内存泄漏 |
| Executors 创建线程池有风险 | 无界队列或无限线程数可能 OOM |
| synchronized 锁的是对象不是代码 | 锁 this、锁 Class、锁任意 Object 要区分 |
| wait/notify 必须在 synchronized 块中 | 否则抛 IllegalMonitorStateException |
| CAS 自旋在高竞争下性能差 | 大量线程竞争时不如 synchronized |
8. 并发工具类
8.1 CountDownLatch
一次性计数器,等待 N 个任务完成:
CountDownLatch latch = new CountDownLatch(3);
// 三个任务并行执行
executor.execute(() -> { doTask1(); latch.countDown(); });
executor.execute(() -> { doTask2(); latch.countDown(); });
executor.execute(() -> { doTask3(); latch.countDown(); });
latch.await(); // 阻塞直到计数为 0
// 三个任务都完成了原理:基于 AQS,state 初始为 N,countDown 时 CAS 减 1,await 时检查 state 是否为 0。
不可重用:计数到 0 后无法重置。
8.2 CyclicBarrier
可重用的屏障,N 个线程互相等待到达屏障点:
CyclicBarrier barrier = new CyclicBarrier(3, () -> {
System.out.println("所有线程到达屏障,继续执行");
});
// 每个线程
barrier.await(); // 阻塞直到 3 个线程都调用了 await与 CountDownLatch 的区别:
- CountDownLatch:一个线程等待其他 N 个线程完成,一次性
- CyclicBarrier:N 个线程互相等待,可重用(reset)
8.3 Semaphore
信号量,控制并发访问数量:
Semaphore semaphore = new Semaphore(3); // 最多 3 个线程同时访问
semaphore.acquire(); // 获取许可(-1),无许可则阻塞
try {
// 访问共享资源
} finally {
semaphore.release(); // 释放许可(+1)
}应用场景:数据库连接池、限流。
8.4 CompletableFuture
Java 8 引入的异步编程工具:
CompletableFuture.supplyAsync(() -> fetchUser(id)) // 异步执行
.thenApply(user -> user.getName()) // 转换结果
.thenCombine(
CompletableFuture.supplyAsync(() -> fetchOrder(id)), // 并行另一个任务
(name, order) -> name + ": " + order // 合并结果
)
.thenAccept(result -> System.out.println(result)) // 消费结果
.exceptionally(e -> { log(e); return null; }); // 异常处理常用方法:
supplyAsync/runAsync:异步执行thenApply/thenAccept/thenRun:链式处理thenCombine/thenCompose:组合多个 FutureallOf/anyOf:等待全部/任一完成exceptionally/handle:异常处理
8.5 ForkJoinPool
分治框架,将大任务拆分为小任务并行执行:
class SumTask extends RecursiveTask<Long> {
@Override
protected Long compute() {
if (end - start <= THRESHOLD) {
return directSum(); // 小任务直接计算
}
int mid = (start + end) / 2;
SumTask left = new SumTask(start, mid);
SumTask right = new SumTask(mid, end);
left.fork(); // 异步执行左半部分
Long rightResult = right.compute(); // 当前线程执行右半部分
Long leftResult = left.join(); // 等待左半部分结果
return leftResult + rightResult;
}
}工作窃取(Work Stealing):每个线程有自己的双端队列,空闲线程从其他线程队列的尾部窃取任务,提高 CPU 利用率。
Java 8 的并行流(parallelStream)底层就是 ForkJoinPool。
8.6 ReadWriteLock
读写锁,读读不互斥,读写/写写互斥:
ReentrantReadWriteLock rwLock = new ReentrantReadWriteLock();
// 读操作(多个线程可以同时读)
rwLock.readLock().lock();
try { return data; }
finally { rwLock.readLock().unlock(); }
// 写操作(独占)
rwLock.writeLock().lock();
try { data = newValue; }
finally { rwLock.writeLock().unlock(); }原理:AQS 的 state 高 16 位存读锁计数,低 16 位存写锁计数。
适用场景:读多写少的缓存。
