Skip to content

并发工具类

java.util.concurrent 包提供了丰富的并发工具类,用于协调多线程之间的执行。

1. CountDownLatch

倒计时器:等待多个线程完成后再继续执行。

1.1 基本用法

java
import java.util.concurrent.*;

public class CountDownLatchExample {
    public static void main(String[] args) throws InterruptedException {
        int threadCount = 5;
        CountDownLatch latch = new CountDownLatch(threadCount);
        
        for (int i = 0; i < threadCount; i++) {
            new Thread(() -> {
                try {
                    // 模拟任务执行
                    Thread.sleep(1000);
                    System.out.println(Thread.currentThread().getName() + " 完成");
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } finally {
                    latch.countDown(); // 计数减一
                }
            }).start();
        }
        
        latch.await(); // 等待所有线程完成
        System.out.println("所有任务完成");
    }
}

1.2 核心方法

方法说明
countDown()计数器减 1
await()阻塞等待计数器归零
await(time, unit)带超时的等待
getCount()获取当前计数

1.3 应用场景

  • 主线程等待多个子线程初始化完成
  • 并行计算,汇总所有结果
  • 模拟并发测试
java
// 并发测试:同时启动 100 个请求
CountDownLatch startSignal = new CountDownLatch(1);
CountDownLatch doneSignal = new CountDownLatch(100);

for (int i = 0; i < 100; i++) {
    new Thread(() -> {
        try {
            startSignal.await();  // 等待开始信号
            // 执行请求
        } finally {
            doneSignal.countDown();
        }
    }).start();
}

startSignal.countDown();  // 发出开始信号
doneSignal.await();       // 等待所有请求完成

2. CyclicBarrier

循环栅栏:让一组线程互相等待,全部到达后一起继续执行。

2.1 基本用法

java
import java.util.concurrent.*;

public class CyclicBarrierExample {
    public static void main(String[] args) {
        int parties = 3;
        CyclicBarrier barrier = new CyclicBarrier(parties, () -> {
            System.out.println("所有线程已到达屏障");
        });
        
        for (int i = 0; i < parties; i++) {
            new Thread(() -> {
                try {
                    System.out.println(Thread.currentThread().getName() + " 到达屏障");
                    barrier.await(); // 等待其他线程
                    System.out.println(Thread.currentThread().getName() + " 继续执行");
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }).start();
        }
    }
}

2.2 与 CountDownLatch 对比

特性CountDownLatchCyclicBarrier
计数器只能用一次可重复使用
等待方一个线程等待其他线程多个线程互相等待
触发条件countDown() 减到 0await() 达到指定数量
回调支持 barrierAction

2.3 可重用性示例

java
CyclicBarrier barrier = new CyclicBarrier(3);

// 第一轮
for (int i = 0; i < 3; i++) {
    new Thread(() -> {
        barrier.await();
    }).start();
}

// 第二轮(自动重置,可继续使用)
for (int i = 0; i < 3; i++) {
    new Thread(() -> {
        barrier.await();
    }).start();
}

3. Semaphore

信号量:控制同时访问特定资源的线程数量。

3.1 基本用法

java
import java.util.concurrent.*;

public class SemaphoreExample {
    public static void main(String[] args) {
        // 允许3个线程同时访问
        Semaphore semaphore = new Semaphore(3);
        
        for (int i = 0; i < 10; i++) {
            new Thread(() -> {
                try {
                    semaphore.acquire(); // 获取许可
                    System.out.println(Thread.currentThread().getName() + " 获取许可");
                    Thread.sleep(2000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } finally {
                    semaphore.release(); // 释放许可
                    System.out.println(Thread.currentThread().getName() + " 释放许可");
                }
            }).start();
        }
    }
}

3.2 核心方法

方法说明
acquire()获取一个许可(阻塞)
acquire(n)获取 n 个许可
tryAcquire()尝试获取(非阻塞)
release()释放一个许可
availablePermits()可用许可数量

3.3 应用场景

  • 限流:限制接口并发访问数
  • 资源池:数据库连接池、对象池
  • 限制并发数:控制同时执行的任务数
java
// 限制数据库连接数
Semaphore dbPool = new Semaphore(10);

public Connection getConnection() throws InterruptedException {
    dbPool.acquire();
    return createConnection();
}

public void releaseConnection(Connection conn) {
    closeConnection(conn);
    dbPool.release();
}

4. Exchanger

交换器:用于两个线程之间交换数据。

4.1 基本用法

java
import java.util.concurrent.*;

public class ExchangerExample {
    public static void main(String[] args) {
        Exchanger<String> exchanger = new Exchanger<>();
        
        new Thread(() -> {
            try {
                String data = "来自线程A的数据";
                String received = exchanger.exchange(data);
                System.out.println("线程A收到: " + received);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }).start();
        
        new Thread(() -> {
            try {
                String data = "来自线程B的数据";
                String received = exchanger.exchange(data);
                System.out.println("线程B收到: " + received);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }).start();
    }
}

4.2 应用场景

  • 遗传算法中交换染色体数据
  • 管道设计模式中传递数据
  • 双缓冲技术

5. Phaser

阶段同步器:更灵活的同步工具,支持动态注册参与者。

java
import java.util.concurrent.*;

public class PhaserExample {
    public static void main(String[] args) {
        Phaser phaser = new Phaser(3); // 3个参与者
        
        for (int i = 0; i < 3; i++) {
            new Thread(() -> {
                // 阶段 1
                System.out.println("阶段1: " + Thread.currentThread().getName());
                phaser.arriveAndAwaitAdvance();
                
                // 阶段 2
                System.out.println("阶段2: " + Thread.currentThread().getName());
                phaser.arriveAndAwaitAdvance();
                
                // 注销
                phaser.arriveAndDeregister();
            }).start();
        }
    }
}

6. 工具类对比

工具类核心功能可重用适用场景
CountDownLatch等待多个线程完成初始化完成后通知
CyclicBarrier多线程互相等待分阶段并行任务
Semaphore控制并发数量限流、资源池
Exchanger两线程交换数据数据交换
Phaser动态多阶段同步复杂多阶段任务