Queue 并发队列详解
Queue 是 Java 集合框架的重要组成部分,在并发编程中应用广泛。
一、Queue 核心接口
1.1 Queue 层次结构
Collection
└── Queue(接口)
├── Deque(双端队列)
│ ├── ArrayDeque
│ └── LinkedList
└── BlockingQueue(阻塞队列)
├── ArrayBlockingQueue
├── LinkedBlockingQueue
├── PriorityBlockingQueue
└── DelayQueue
1.2 核心方法
// 添加元素
boolean add(E e); // 失败抛异常
boolean offer(E e); // 失败返回 false
// 获取并移除
E remove(); // 空时抛异常
E poll(); // 空时返回 null
E take(); // 阻塞等待(BlockingQueue)
// 获取不移除
E element(); // 空时抛异常
E peek(); // 空时返回 null
二、PriorityQueue(优先队列)
2.1 核心特性
PriorityQueue = 堆(二叉堆)
特点:
- 基于优先级堆
- 自然顺序或自定义比较器
- 不允许 null
- 非线程安全
- 出队顺序按优先级
2.2 基本使用
// 自然排序(小顶堆)
Queue<Integer> pq = new PriorityQueue<>();
pq.offer(3);
pq.offer(1);
pq.offer(2);
// 出队顺序:1 → 2 → 3
while (!pq.isEmpty()) {
System.out.println(pq.poll());
}
// 大顶堆
Queue<Integer> maxPq = new PriorityQueue<>((a, b) -> b - a);
maxPq.offer(3);
maxPq.offer(1);
maxPq.offer(2);
// 出队顺序:3 → 2 → 1
2.3 自定义优先级
// 任务优先级
class Task implements Comparable<Task> {
String name;
int priority;
@Override
public int compareTo(Task other) {
return Integer.compare(other.priority, this.priority); // 优先级高的先出
}
}
PriorityQueue<Task> pq = new PriorityQueue<>();
pq.offer(new Task("Low", 1));
pq.offer(new Task("High", 10));
pq.offer(new Task("Medium", 5));
// 出队顺序:High → Medium → Low
2.4 应用场景
// TopK 问题
public List<Integer> topK(int[] nums, int k) {
PriorityQueue<Integer> pq = new PriorityQueue<>(k);
for (int num : nums) {
if (pq.size() < k) {
pq.offer(num);
} else if (num > pq.peek()) {
pq.poll();
pq.offer(num);
}
}
return new ArrayList<>(pq);
}
// 合并 K 个有序链表
public ListNode mergeKLists(ListNode[] lists) {
PriorityQueue<ListNode> pq = new PriorityQueue<>(
(a, b) -> Integer.compare(a.val, b.val)
);
for (ListNode node : lists) {
if (node != null) pq.offer(node);
}
ListNode dummy = new ListNode(0);
ListNode current = dummy;
while (!pq.isEmpty()) {
current.next = pq.poll();
current = current.next;
if (current.next != null) {
pq.offer(current.next);
}
}
return dummy.next;
}
三、BlockingQueue(阻塞队列)
3.1 核心特性
BlockingQueue = 队列 + 阻塞操作
特点:
- 线程安全
- 支持阻塞操作
- 生产者 - 消费者模式
- 四种处理方法
3.2 四种处理方式
// 添加元素
boolean add(E e); // 满时抛异常
boolean offer(E e); // 满时返回 false
void put(E e); // 满时阻塞
boolean offer(E e, long timeout, TimeUnit unit); // 超时
// 获取元素
E remove(); // 空时抛异常
E poll(); // 空时返回 null
E take(); // 空时阻塞
E poll(long timeout, TimeUnit unit); // 超时
3.3 ArrayBlockingQueue
// 基于数组的有界阻塞队列
BlockingQueue<Integer> queue = new ArrayBlockingQueue<>(10);
// 生产者
new Thread(() -> {
for (int i = 0; i < 100; i++) {
try {
queue.put(i); // 满时阻塞
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
}).start();
// 消费者
new Thread(() -> {
while (true) {
try {
Integer item = queue.take(); // 空时阻塞
System.out.println("Consumed: " + item);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
}).start();
3.4 LinkedBlockingQueue
// 基于链表的阻塞队列(可选有界)
BlockingQueue<Integer> queue = new LinkedBlockingQueue<>(100); // 有界
BlockingQueue<Integer> unbounded = new LinkedBlockingQueue<>(); // 无界
// 特点:
// - 吞吐量通常高于 ArrayBlockingQueue
// - 无界队列可能导致 OOM
3.5 生产者 - 消费者模式
public class ProducerConsumer {
private final BlockingQueue<Integer> queue = new ArrayBlockingQueue<>(10);
class Producer implements Runnable {
@Override
public void run() {
try {
for (int i = 0; i < 100; i++) {
queue.put(i);
System.out.println("Produced: " + i);
Thread.sleep(100);
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
}
class Consumer implements Runnable {
@Override
public void run() {
try {
while (true) {
Integer item = queue.take();
System.out.println("Consumed: " + item);
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
}
public static void main(String[] args) {
ProducerConsumer pc = new ProducerConsumer();
new Thread(pc.new Producer()).start();
new Thread(pc.new Consumer()).start();
}
}
四、DelayQueue(延迟队列)
4.1 核心特性
DelayQueue = 优先级队列 + 延迟
特点:
- 基于 PriorityQueue
- 元素必须实现 Delayed 接口
- 只有到期才能取出
- 无界队列
4.2 Delayed 接口
interface Delayed extends Comparable<Delayed> {
long getDelay(TimeUnit unit);
int compareTo(Delayed other);
}
4.3 使用示例
// 延迟任务
class DelayedTask implements Delayed {
private final String name;
private final long executeTime;
public DelayedTask(String name, long delayMs) {
this.name = name;
this.executeTime = System.currentTimeMillis() + delayMs;
}
@Override
public long getDelay(TimeUnit unit) {
long diff = executeTime - System.currentTimeMillis();
return unit.convert(diff, TimeUnit.MILLISECONDS);
}
@Override
public int compareTo(Delayed other) {
return Long.compare(this.executeTime, ((DelayedTask) other).executeTime);
}
@Override
public String toString() {
return name;
}
}
// 使用
DelayQueue<DelayedTask> queue = new DelayQueue<>();
queue.offer(new DelayedTask("Task 1", 1000)); // 1 秒后执行
queue.offer(new DelayedTask("Task 2", 2000)); // 2 秒后执行
queue.offer(new DelayedTask("Task 3", 3000)); // 3 秒后执行
// 取出
while (!queue.isEmpty()) {
DelayedTask task = queue.take(); // 阻塞直到到期
System.out.println("Executing: " + task);
}
4.4 应用场景
// 订单超时取消
class OrderTimeoutTask implements Delayed {
private final String orderId;
private final long expireTime;
@Override
public long getDelay(TimeUnit unit) {
long diff = expireTime - System.currentTimeMillis();
return unit.convert(diff, TimeUnit.MILLISECONDS);
}
@Override
public int compareTo(Delayed other) {
return Long.compare(this.expireTime, ((OrderTimeoutTask) other).expireTime);
}
}
// 定时任务调度
// 缓存过期清理
五、其他队列
5.1 SynchronousQueue
// 不存储元素的阻塞队列
BlockingQueue<Integer> queue = new SynchronousQueue<>();
// 生产者必须等待消费者
new Thread(() -> {
try {
queue.put(1); // 阻塞,等待消费者
System.out.println("Produced 1");
queue.put(2); // 阻塞,等待消费者
System.out.println("Produced 2");
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}).start();
// 消费者
Thread.sleep(1000);
Integer item1 = queue.take(); // 接收 1
Thread.sleep(1000);
Integer item2 = queue.take(); // 接收 2
5.2 PriorityBlockingQueue
// 线程安全的 PriorityQueue
BlockingQueue<Integer> pq = new PriorityBlockingQueue<>();
// 多线程安全
new Thread(() -> {
for (int i = 0; i < 100; i++) {
pq.offer(i);
}
}).start();
new Thread(() -> {
while (!pq.isEmpty()) {
System.out.println(pq.take());
}
}).start();
5.3 ArrayDeque(双端队列)
// 双端队列,可用作栈或队列
Deque<String> deque = new ArrayDeque<>();
// 栈操作
deque.push("A");
deque.push("B");
String top = deque.pop(); // "B"
// 队列操作
deque.offer("C");
String head = deque.poll(); // "A"
// 双端操作
deque.offerFirst("X");
deque.offerLast("Y");
六、选择建议
| 队列类型 | 特点 | 适用场景 |
|---|---|---|
| PriorityQueue | 优先级排序 | TopK、任务调度 |
| ArrayBlockingQueue | 有界、基于数组 | 生产者 - 消费者 |
| LinkedBlockingQueue | 可选有界、基于链表 | 高并发场景 |
| DelayQueue | 延迟执行 | 定时任务、超时处理 |
| SynchronousQueue | 不存储、直接传递 | 线程间直接传递 |
| PriorityBlockingQueue | 线程安全 + 优先级 | 并发优先级任务 |
七、总结
Queue 核心要点:
| 队列 | 底层结构 | 线程安全 | 阻塞 | 适用场景 |
|---|---|---|---|---|
| PriorityQueue | 堆 | ❌ | ❌ | 优先级排序 |
| ArrayBlockingQueue | 数组 | ✅ | ✅ | 有界缓冲 |
| LinkedBlockingQueue | 链表 | ✅ | ✅ | 高并发 |
| DelayQueue | 优先级堆 | ✅ | ✅ | 延迟任务 |
根据场景选择合适的队列,生产者 - 消费者用 BlockingQueue,优先级用 PriorityQueue。