Skip to content

Java 并发 — 知识详解

更新: 5/15/2026 字数: 0 字 时长: 0 分钟

1. Java 内存模型(JMM)

1.1 主内存与工作内存

线程A 工作内存          主内存          线程B 工作内存
┌──────────┐     ┌──────────┐     ┌──────────┐
│ 变量副本  │ ←→  │ 共享变量  │ ←→  │ 变量副本  │
└──────────┘     └──────────┘     └──────────┘
  • 所有共享变量存储在主内存
  • 每个线程有自己的工作内存,保存变量副本
  • 线程对变量的操作必须在工作内存中进行,不能直接操作主内存
  • 线程间通信必须通过主内存

1.2 happens-before 规则

JMM 定义了 8 条 happens-before 规则,保证可见性和有序性:

  1. 程序顺序规则:同一线程中,前面的操作 happens-before 后面的操作
  2. Monitor 锁规则:unlock happens-before 后续对同一锁的 lock
  3. volatile 规则:volatile 写 happens-before 后续对同一变量的读
  4. 线程启动规则Thread.start() happens-before 该线程的任何操作
  5. 线程终止规则:线程的所有操作 happens-before Thread.join() 返回
  6. 中断规则interrupt() happens-before 被中断线程检测到中断
  7. 终结器规则:构造函数 happens-before finalize()
  8. 传递性:A happens-before B,B happens-before C → A happens-before C

2. volatile

2.1 可见性

volatile 变量的写会立即刷新到主内存,读会从主内存重新加载:

java
volatile boolean running = true;

// 线程A
public void stop() {
    running = false; // 写入主内存
}

// 线程B
public void run() {
    while (running) { // 每次从主内存读取
        // do work
    }
}

没有 volatile,线程B 可能永远看不到 running = false(工作内存缓存了旧值)。

2.2 有序性(内存屏障)

volatile 通过插入内存屏障禁止指令重排:

屏障类型插入位置作用
StoreStorevolatile 写之前禁止上面的普通写与 volatile 写重排
StoreLoadvolatile 写之后禁止 volatile 写与下面的读重排
LoadLoadvolatile 读之后禁止 volatile 读与下面的普通读重排
LoadStorevolatile 读之后禁止 volatile 读与下面的普通写重排

2.3 不保证原子性

java
volatile int count = 0;

// ❌ 不安全!i++ 是三步操作:读-改-写
count++; // 1.读count 2.count+1 3.写回count
// 两个线程可能同时读到相同值,各自+1后写回,丢失一次自增

// ✅ 用 AtomicInteger
AtomicInteger count = new AtomicInteger(0);
count.incrementAndGet(); // CAS 保证原子性

2.4 DCL 单例中的应用

java
public class Singleton {
    // 必须加 volatile!
    private static volatile Singleton instance;

    public static Singleton getInstance() {
        if (instance == null) {                // 第一次检查
            synchronized (Singleton.class) {
                if (instance == null) {        // 第二次检查
                    instance = new Singleton(); // 非原子操作!
                }
            }
        }
        return instance;
    }
}

为什么需要 volatile? new Singleton() 分三步:

  1. 分配内存
  2. 调用构造方法初始化
  3. 将引用赋值给 instance

没有 volatile,步骤 2 和 3 可能重排序。线程A 执行了 1→3(还没执行 2),线程B 看到 instance != null 就直接返回了一个未初始化的对象。


3. synchronized

3.1 锁升级过程

无锁 → 偏向锁 → 轻量级锁 → 重量级锁(单向升级,不可降级)

偏向锁(Biased Locking):

  • 场景:只有一个线程访问同步块
  • 实现:在 Mark Word 中记录线程 ID
  • 后续该线程进入同步块时,只需比较线程 ID,无需 CAS
  • 当第二个线程尝试获取锁时,偏向锁撤销,升级为轻量级锁

轻量级锁(Lightweight Lock):

  • 场景:多个线程交替执行同步块(无竞争)
  • 实现:在栈帧中创建 Lock Record,CAS 将 Mark Word 复制到 Lock Record
  • 退出时 CAS 恢复 Mark Word
  • CAS 失败(有竞争)→ 升级为重量级锁

重量级锁(Heavyweight Lock):

  • 场景:多个线程同时竞争
  • 实现:依赖操作系统 Mutex Lock
  • 未获取锁的线程阻塞(用户态→内核态切换,开销大)
  • 底层是 ObjectMonitor:
    • _EntryList:等待获取锁的线程队列
    • _WaitSet:调用 wait() 后等待的线程
    • _owner:当前持有锁的线程

3.2 锁优化

锁消除:JIT 编译器检测到同步块中的对象不会逃逸到其他线程,自动去除锁。

java
// StringBuffer.append 内部有 synchronized,但 sb 是局部变量不会逃逸
public String concat(String s1, String s2) {
    StringBuffer sb = new StringBuffer();
    sb.append(s1);
    sb.append(s2);
    return sb.toString();
    // JIT 会消除 append 中的 synchronized
}

锁粗化:连续对同一对象加锁解锁,合并为一次更大范围的锁。

3.3 synchronized vs ReentrantLock

特性synchronizedReentrantLock
实现JVM 内置,monitorenter/monitorexitJava API,基于 AQS
释放自动释放(退出同步块)手动 unlock()(必须在 finally 中)
可中断不可中断lockInterruptibly() 可中断
公平性非公平可选公平/非公平
条件变量只有一个 wait/notify多个 Condition
性能JDK 6 后优化,差距不大差距不大

4. AQS(AbstractQueuedSynchronizer)

4.1 核心结构

java
// 同步状态
private volatile int state;

// CLH 双向队列节点
static final class Node {
    volatile int waitStatus; // CANCELLED(1), SIGNAL(-1), CONDITION(-2), PROPAGATE(-3)
    volatile Node prev;
    volatile Node next;
    volatile Thread thread;
}

AQS 是 ReentrantLockSemaphoreCountDownLatchReentrantReadWriteLock 的基础框架。

4.2 acquire 流程(以 ReentrantLock 为例)

java
public final void acquire(int arg) {
    if (!tryAcquire(arg) &&                    // 1. 尝试获取锁
        acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) // 2. 失败则入队等待
        selfInterrupt();
}

详细流程:

  1. tryAcquire(1):尝试 CAS 将 state 从 0 改为 1
    • 成功 → 设置 exclusiveOwnerThread 为当前线程,获取锁成功
    • 失败 → 检查是否是重入(当前线程 == ownerThread),是则 state++
  2. addWaiter(Node.EXCLUSIVE):创建节点加入 CLH 队列尾部(CAS + 自旋)
  3. acquireQueued:在队列中自旋等待
    • 如果前驱是 head,再次 tryAcquire
    • 否则 shouldParkAfterFailedAcquire:将前驱的 waitStatus 设为 SIGNAL
    • parkAndCheckInterruptLockSupport.park() 阻塞当前线程

4.3 release 流程

java
public final boolean release(int arg) {
    if (tryRelease(arg)) {          // state-- ,state==0 时释放成功
        Node h = head;
        if (h != null && h.waitStatus != 0)
            unparkSuccessor(h);     // LockSupport.unpark() 唤醒后继节点
        return true;
    }
    return false;
}

4.4 公平锁 vs 非公平锁

java
// 非公平锁的 tryAcquire(默认)
final boolean nonfairTryAcquire(int acquires) {
    // 直接 CAS 抢锁,不管队列中有没有等待的线程
    if (compareAndSetState(0, acquires)) {
        setExclusiveOwnerThread(current);
        return true;
    }
    // ...
}

// 公平锁的 tryAcquire
protected final boolean tryAcquire(int acquires) {
    // 多了一个检查:队列中是否有等待更久的线程
    if (!hasQueuedPredecessors() && compareAndSetState(0, acquires)) {
        setExclusiveOwnerThread(current);
        return true;
    }
    // ...
}

非公平锁性能更好(减少线程切换),但可能导致饥饿。


5. 线程池 ThreadPoolExecutor

5.1 七大参数

java
public ThreadPoolExecutor(
    int corePoolSize,      // 核心线程数(即使空闲也不回收,除非设置 allowCoreThreadTimeOut)
    int maximumPoolSize,   // 最大线程数
    long keepAliveTime,    // 非核心线程空闲存活时间
    TimeUnit unit,         // 时间单位
    BlockingQueue<Runnable> workQueue,  // 任务队列
    ThreadFactory threadFactory,        // 线程工厂
    RejectedExecutionHandler handler    // 拒绝策略
)

5.2 execute 流程

提交任务


workerCount < corePoolSize ?
  ├─ 是 → 创建核心线程执行任务

  ▼ 否
workQueue.offer(task) 成功?
  ├─ 是 → 任务入队等待

  ▼ 否(队列满)
workerCount < maximumPoolSize ?
  ├─ 是 → 创建非核心线程执行任务

  ▼ 否
执行拒绝策略

源码:

java
public void execute(Runnable command) {
    int c = ctl.get();
    // 1. 工作线程数 < 核心线程数
    if (workerCountOf(c) < corePoolSize) {
        if (addWorker(command, true)) return;
        c = ctl.get();
    }
    // 2. 尝试入队
    if (isRunning(c) && workQueue.offer(command)) {
        int recheck = ctl.get();
        if (!isRunning(recheck) && remove(command))
            reject(command);
        else if (workerCountOf(recheck) == 0)
            addWorker(null, false);
    }
    // 3. 队列满,尝试创建非核心线程
    else if (!addWorker(command, false))
        reject(command); // 4. 拒绝
}

5.3 四种拒绝策略

策略行为
AbortPolicy(默认)抛出 RejectedExecutionException
CallerRunsPolicy由提交任务的线程直接执行
DiscardPolicy静默丢弃任务
DiscardOldestPolicy丢弃队列中最老的任务,重新提交当前任务

5.4 常见线程池配置

java
// ❌ 不推荐使用 Executors 创建(阿里规约)
Executors.newFixedThreadPool(n);     // LinkedBlockingQueue 无界队列,可能 OOM
Executors.newCachedThreadPool();     // maximumPoolSize = Integer.MAX_VALUE,可能创建大量线程
Executors.newSingleThreadExecutor(); // 同 Fixed,无界队列

// ✅ 推荐手动创建
new ThreadPoolExecutor(
    Runtime.getRuntime().availableProcessors(), // CPU 密集型
    Runtime.getRuntime().availableProcessors() * 2, // IO 密集型可以更大
    60L, TimeUnit.SECONDS,
    new LinkedBlockingQueue<>(1000), // 有界队列
    new ThreadFactory() { ... },
    new ThreadPoolExecutor.CallerRunsPolicy()
);

5.5 Worker 工作原理

Worker 继承 AQS(实现不可重入锁)并实现 Runnable:

java
private final class Worker extends AbstractQueuedSynchronizer implements Runnable {
    final Thread thread;
    Runnable firstTask;

    public void run() {
        runWorker(this);
    }
}

final void runWorker(Worker w) {
    Runnable task = w.firstTask;
    w.firstTask = null;
    while (task != null || (task = getTask()) != null) {
        w.lock(); // Worker 自身加锁,用于 shutdown 时判断是否空闲
        try {
            beforeExecute(w.thread, task);
            task.run();
            afterExecute(task, null);
        } finally {
            task = null;
            w.unlock();
        }
    }
    // getTask() 返回 null → 线程退出
}

getTask() 从队列取任务,非核心线程用 poll(keepAliveTime) 超时获取,核心线程用 take() 阻塞获取。


6. CAS 与 ABA 问题

6.1 CAS 原理

Compare And Swap:比较内存值与预期值,相等则更新为新值,整个操作是原子的。

java
// Unsafe 类提供的 CAS 操作(CPU 指令级别保证原子性)
public final native boolean compareAndSwapInt(
    Object obj, long offset, int expected, int update);

// AtomicInteger 的 incrementAndGet
public final int incrementAndGet() {
    return unsafe.getAndAddInt(this, valueOffset, 1) + 1;
}

// getAndAddInt 内部自旋 CAS
public final int getAndAddInt(Object obj, long offset, int delta) {
    int v;
    do {
        v = getIntVolatile(obj, offset); // 读取当前值
    } while (!compareAndSwapInt(obj, offset, v, v + delta)); // CAS 直到成功
    return v;
}

6.2 ABA 问题

线程1 读到值 A,线程2 将 A→B→A,线程1 CAS 发现仍是 A 就成功了,但值已经被改过。

解决方案:加版本号

java
AtomicStampedReference<Integer> ref = new AtomicStampedReference<>(1, 0);

int[] stampHolder = new int[1];
int value = ref.get(stampHolder); // value=1, stamp=0

// CAS 时同时比较值和版本号
ref.compareAndSet(1, 2, 0, 1); // 期望值1版本0 → 新值2版本1

7. ThreadLocal

7.1 原理

每个 Thread 持有一个 ThreadLocalMap,key 是 ThreadLocal 对象(弱引用),value 是存储的值:

java
public class Thread {
    ThreadLocal.ThreadLocalMap threadLocals; // 每个线程独有
}

// ThreadLocal.set()
public void set(T value) {
    Thread t = Thread.currentThread();
    ThreadLocalMap map = t.threadLocals;
    if (map != null)
        map.set(this, value); // this 就是 ThreadLocal 对象作为 key
    else
        createMap(t, value);
}

// ThreadLocal.get()
public T get() {
    Thread t = Thread.currentThread();
    ThreadLocalMap map = t.threadLocals;
    if (map != null) {
        Entry e = map.getEntry(this);
        if (e != null) return (T) e.value;
    }
    return setInitialValue();
}

7.2 内存泄漏

java
static class Entry extends WeakReference<ThreadLocal<?>> {
    Object value; // value 是强引用!
    Entry(ThreadLocal<?> k, Object v) {
        super(k); // key 是弱引用
        value = v;
    }
}

泄漏场景

  1. ThreadLocal 对象被回收(弱引用 key 变为 null)
  2. 但 Entry 的 value 仍然被 Entry 强引用
  3. Entry 被 ThreadLocalMap 强引用
  4. ThreadLocalMap 被 Thread 强引用
  5. 如果线程是线程池中的长期存活线程 → value 永远不会被回收

解决方案:用完后手动调用 threadLocal.remove()

java
try {
    threadLocal.set(value);
    // 使用 value
} finally {
    threadLocal.remove(); // 必须清理!
}

7.3 Android 中的应用

  • Looper.myLooper():每个线程的 Looper 存储在 ThreadLocal 中
  • Choreographer:每个线程的编舞者实例
java
// Looper 源码
static final ThreadLocal<Looper> sThreadLocal = new ThreadLocal<>();

public static void prepare() {
    if (sThreadLocal.get() != null)
        throw new RuntimeException("Only one Looper may be created per thread");
    sThreadLocal.set(new Looper());
}

public static @Nullable Looper myLooper() {
    return sThreadLocal.get();
}

常见坑总结

说明
volatile 不保证原子性count++ 仍需 AtomicInteger 或 synchronized
DCL 单例必须加 volatile防止指令重排导致获取未初始化对象
ThreadLocal 用完必须 remove线程池场景下会内存泄漏
Executors 创建线程池有风险无界队列或无限线程数可能 OOM
synchronized 锁的是对象不是代码锁 this、锁 Class、锁任意 Object 要区分
wait/notify 必须在 synchronized 块中否则抛 IllegalMonitorStateException
CAS 自旋在高竞争下性能差大量线程竞争时不如 synchronized

8. 并发工具类

8.1 CountDownLatch

一次性计数器,等待 N 个任务完成:

java
CountDownLatch latch = new CountDownLatch(3);

// 三个任务并行执行
executor.execute(() -> { doTask1(); latch.countDown(); });
executor.execute(() -> { doTask2(); latch.countDown(); });
executor.execute(() -> { doTask3(); latch.countDown(); });

latch.await(); // 阻塞直到计数为 0
// 三个任务都完成了

原理:基于 AQS,state 初始为 N,countDown 时 CAS 减 1,await 时检查 state 是否为 0。

不可重用:计数到 0 后无法重置。

8.2 CyclicBarrier

可重用的屏障,N 个线程互相等待到达屏障点:

java
CyclicBarrier barrier = new CyclicBarrier(3, () -> {
    System.out.println("所有线程到达屏障,继续执行");
});

// 每个线程
barrier.await(); // 阻塞直到 3 个线程都调用了 await

与 CountDownLatch 的区别:

  • CountDownLatch:一个线程等待其他 N 个线程完成,一次性
  • CyclicBarrier:N 个线程互相等待,可重用(reset)

8.3 Semaphore

信号量,控制并发访问数量:

java
Semaphore semaphore = new Semaphore(3); // 最多 3 个线程同时访问

semaphore.acquire(); // 获取许可(-1),无许可则阻塞
try {
    // 访问共享资源
} finally {
    semaphore.release(); // 释放许可(+1)
}

应用场景:数据库连接池、限流。

8.4 CompletableFuture

Java 8 引入的异步编程工具:

java
CompletableFuture.supplyAsync(() -> fetchUser(id))      // 异步执行
    .thenApply(user -> user.getName())                    // 转换结果
    .thenCombine(
        CompletableFuture.supplyAsync(() -> fetchOrder(id)), // 并行另一个任务
        (name, order) -> name + ": " + order              // 合并结果
    )
    .thenAccept(result -> System.out.println(result))     // 消费结果
    .exceptionally(e -> { log(e); return null; });        // 异常处理

常用方法:

  • supplyAsync/runAsync:异步执行
  • thenApply/thenAccept/thenRun:链式处理
  • thenCombine/thenCompose:组合多个 Future
  • allOf/anyOf:等待全部/任一完成
  • exceptionally/handle:异常处理

8.5 ForkJoinPool

分治框架,将大任务拆分为小任务并行执行:

java
class SumTask extends RecursiveTask<Long> {
    @Override
    protected Long compute() {
        if (end - start <= THRESHOLD) {
            return directSum(); // 小任务直接计算
        }
        int mid = (start + end) / 2;
        SumTask left = new SumTask(start, mid);
        SumTask right = new SumTask(mid, end);
        left.fork();  // 异步执行左半部分
        Long rightResult = right.compute(); // 当前线程执行右半部分
        Long leftResult = left.join(); // 等待左半部分结果
        return leftResult + rightResult;
    }
}

工作窃取(Work Stealing):每个线程有自己的双端队列,空闲线程从其他线程队列的尾部窃取任务,提高 CPU 利用率。

Java 8 的并行流(parallelStream)底层就是 ForkJoinPool。

8.6 ReadWriteLock

读写锁,读读不互斥,读写/写写互斥:

java
ReentrantReadWriteLock rwLock = new ReentrantReadWriteLock();

// 读操作(多个线程可以同时读)
rwLock.readLock().lock();
try { return data; }
finally { rwLock.readLock().unlock(); }

// 写操作(独占)
rwLock.writeLock().lock();
try { data = newValue; }
finally { rwLock.writeLock().unlock(); }

原理:AQS 的 state 高 16 位存读锁计数,低 16 位存写锁计数。

适用场景:读多写少的缓存。