并发工具类
java.util.concurrent 包提供了丰富的并发工具类,用于协调多线程之间的执行。
1. CountDownLatch
倒计时器:等待多个线程完成后再继续执行。
1.1 基本用法
java
import java.util.concurrent.*;
public class CountDownLatchExample {
public static void main(String[] args) throws InterruptedException {
int threadCount = 5;
CountDownLatch latch = new CountDownLatch(threadCount);
for (int i = 0; i < threadCount; i++) {
new Thread(() -> {
try {
// 模拟任务执行
Thread.sleep(1000);
System.out.println(Thread.currentThread().getName() + " 完成");
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
latch.countDown(); // 计数减一
}
}).start();
}
latch.await(); // 等待所有线程完成
System.out.println("所有任务完成");
}
}1.2 核心方法
| 方法 | 说明 |
|---|---|
countDown() | 计数器减 1 |
await() | 阻塞等待计数器归零 |
await(time, unit) | 带超时的等待 |
getCount() | 获取当前计数 |
1.3 应用场景
- 主线程等待多个子线程初始化完成
- 并行计算,汇总所有结果
- 模拟并发测试
java
// 并发测试:同时启动 100 个请求
CountDownLatch startSignal = new CountDownLatch(1);
CountDownLatch doneSignal = new CountDownLatch(100);
for (int i = 0; i < 100; i++) {
new Thread(() -> {
try {
startSignal.await(); // 等待开始信号
// 执行请求
} finally {
doneSignal.countDown();
}
}).start();
}
startSignal.countDown(); // 发出开始信号
doneSignal.await(); // 等待所有请求完成2. CyclicBarrier
循环栅栏:让一组线程互相等待,全部到达后一起继续执行。
2.1 基本用法
java
import java.util.concurrent.*;
public class CyclicBarrierExample {
public static void main(String[] args) {
int parties = 3;
CyclicBarrier barrier = new CyclicBarrier(parties, () -> {
System.out.println("所有线程已到达屏障");
});
for (int i = 0; i < parties; i++) {
new Thread(() -> {
try {
System.out.println(Thread.currentThread().getName() + " 到达屏障");
barrier.await(); // 等待其他线程
System.out.println(Thread.currentThread().getName() + " 继续执行");
} catch (Exception e) {
e.printStackTrace();
}
}).start();
}
}
}2.2 与 CountDownLatch 对比
| 特性 | CountDownLatch | CyclicBarrier |
|---|---|---|
| 计数器 | 只能用一次 | 可重复使用 |
| 等待方 | 一个线程等待其他线程 | 多个线程互相等待 |
| 触发条件 | countDown() 减到 0 | await() 达到指定数量 |
| 回调 | 无 | 支持 barrierAction |
2.3 可重用性示例
java
CyclicBarrier barrier = new CyclicBarrier(3);
// 第一轮
for (int i = 0; i < 3; i++) {
new Thread(() -> {
barrier.await();
}).start();
}
// 第二轮(自动重置,可继续使用)
for (int i = 0; i < 3; i++) {
new Thread(() -> {
barrier.await();
}).start();
}3. Semaphore
信号量:控制同时访问特定资源的线程数量。
3.1 基本用法
java
import java.util.concurrent.*;
public class SemaphoreExample {
public static void main(String[] args) {
// 允许3个线程同时访问
Semaphore semaphore = new Semaphore(3);
for (int i = 0; i < 10; i++) {
new Thread(() -> {
try {
semaphore.acquire(); // 获取许可
System.out.println(Thread.currentThread().getName() + " 获取许可");
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
semaphore.release(); // 释放许可
System.out.println(Thread.currentThread().getName() + " 释放许可");
}
}).start();
}
}
}3.2 核心方法
| 方法 | 说明 |
|---|---|
acquire() | 获取一个许可(阻塞) |
acquire(n) | 获取 n 个许可 |
tryAcquire() | 尝试获取(非阻塞) |
release() | 释放一个许可 |
availablePermits() | 可用许可数量 |
3.3 应用场景
- 限流:限制接口并发访问数
- 资源池:数据库连接池、对象池
- 限制并发数:控制同时执行的任务数
java
// 限制数据库连接数
Semaphore dbPool = new Semaphore(10);
public Connection getConnection() throws InterruptedException {
dbPool.acquire();
return createConnection();
}
public void releaseConnection(Connection conn) {
closeConnection(conn);
dbPool.release();
}4. Exchanger
交换器:用于两个线程之间交换数据。
4.1 基本用法
java
import java.util.concurrent.*;
public class ExchangerExample {
public static void main(String[] args) {
Exchanger<String> exchanger = new Exchanger<>();
new Thread(() -> {
try {
String data = "来自线程A的数据";
String received = exchanger.exchange(data);
System.out.println("线程A收到: " + received);
} catch (InterruptedException e) {
e.printStackTrace();
}
}).start();
new Thread(() -> {
try {
String data = "来自线程B的数据";
String received = exchanger.exchange(data);
System.out.println("线程B收到: " + received);
} catch (InterruptedException e) {
e.printStackTrace();
}
}).start();
}
}4.2 应用场景
- 遗传算法中交换染色体数据
- 管道设计模式中传递数据
- 双缓冲技术
5. Phaser
阶段同步器:更灵活的同步工具,支持动态注册参与者。
java
import java.util.concurrent.*;
public class PhaserExample {
public static void main(String[] args) {
Phaser phaser = new Phaser(3); // 3个参与者
for (int i = 0; i < 3; i++) {
new Thread(() -> {
// 阶段 1
System.out.println("阶段1: " + Thread.currentThread().getName());
phaser.arriveAndAwaitAdvance();
// 阶段 2
System.out.println("阶段2: " + Thread.currentThread().getName());
phaser.arriveAndAwaitAdvance();
// 注销
phaser.arriveAndDeregister();
}).start();
}
}
}6. 工具类对比
| 工具类 | 核心功能 | 可重用 | 适用场景 |
|---|---|---|---|
CountDownLatch | 等待多个线程完成 | ❌ | 初始化完成后通知 |
CyclicBarrier | 多线程互相等待 | ✅ | 分阶段并行任务 |
Semaphore | 控制并发数量 | ✅ | 限流、资源池 |
Exchanger | 两线程交换数据 | ✅ | 数据交换 |
Phaser | 动态多阶段同步 | ✅ | 复杂多阶段任务 |