Skip to content
清晨的一缕阳光
返回

Queue 并发队列详解

Queue 并发队列详解

Queue 是 Java 集合框架的重要组成部分,在并发编程中应用广泛。

一、Queue 核心接口

1.1 Queue 层次结构

Collection
    └── Queue(接口)
        ├── Deque(双端队列)
        │   ├── ArrayDeque
        │   └── LinkedList
        └── BlockingQueue(阻塞队列)
            ├── ArrayBlockingQueue
            ├── LinkedBlockingQueue
            ├── PriorityBlockingQueue
            └── DelayQueue

1.2 核心方法

// 添加元素
boolean add(E e);     // 失败抛异常
boolean offer(E e);   // 失败返回 false

// 获取并移除
E remove();           // 空时抛异常
E poll();             // 空时返回 null
E take();             // 阻塞等待(BlockingQueue)

// 获取不移除
E element();          // 空时抛异常
E peek();             // 空时返回 null

二、PriorityQueue(优先队列)

2.1 核心特性

PriorityQueue = 堆(二叉堆)

特点:
- 基于优先级堆
- 自然顺序或自定义比较器
- 不允许 null
- 非线程安全
- 出队顺序按优先级

2.2 基本使用

// 自然排序(小顶堆)
Queue<Integer> pq = new PriorityQueue<>();
pq.offer(3);
pq.offer(1);
pq.offer(2);

// 出队顺序:1 → 2 → 3
while (!pq.isEmpty()) {
    System.out.println(pq.poll());
}

// 大顶堆
Queue<Integer> maxPq = new PriorityQueue<>((a, b) -> b - a);
maxPq.offer(3);
maxPq.offer(1);
maxPq.offer(2);

// 出队顺序:3 → 2 → 1

2.3 自定义优先级

// 任务优先级
class Task implements Comparable<Task> {
    String name;
    int priority;
    
    @Override
    public int compareTo(Task other) {
        return Integer.compare(other.priority, this.priority); // 优先级高的先出
    }
}

PriorityQueue<Task> pq = new PriorityQueue<>();
pq.offer(new Task("Low", 1));
pq.offer(new Task("High", 10));
pq.offer(new Task("Medium", 5));

// 出队顺序:High → Medium → Low

2.4 应用场景

// TopK 问题
public List<Integer> topK(int[] nums, int k) {
    PriorityQueue<Integer> pq = new PriorityQueue<>(k);
    
    for (int num : nums) {
        if (pq.size() < k) {
            pq.offer(num);
        } else if (num > pq.peek()) {
            pq.poll();
            pq.offer(num);
        }
    }
    
    return new ArrayList<>(pq);
}

// 合并 K 个有序链表
public ListNode mergeKLists(ListNode[] lists) {
    PriorityQueue<ListNode> pq = new PriorityQueue<>(
        (a, b) -> Integer.compare(a.val, b.val)
    );
    
    for (ListNode node : lists) {
        if (node != null) pq.offer(node);
    }
    
    ListNode dummy = new ListNode(0);
    ListNode current = dummy;
    
    while (!pq.isEmpty()) {
        current.next = pq.poll();
        current = current.next;
        if (current.next != null) {
            pq.offer(current.next);
        }
    }
    
    return dummy.next;
}

三、BlockingQueue(阻塞队列)

3.1 核心特性

BlockingQueue = 队列 + 阻塞操作

特点:
- 线程安全
- 支持阻塞操作
- 生产者 - 消费者模式
- 四种处理方法

3.2 四种处理方式

// 添加元素
boolean add(E e);        // 满时抛异常
boolean offer(E e);      // 满时返回 false
void put(E e);           // 满时阻塞
boolean offer(E e, long timeout, TimeUnit unit); // 超时

// 获取元素
E remove();              // 空时抛异常
E poll();                // 空时返回 null
E take();                // 空时阻塞
E poll(long timeout, TimeUnit unit); // 超时

3.3 ArrayBlockingQueue

// 基于数组的有界阻塞队列
BlockingQueue<Integer> queue = new ArrayBlockingQueue<>(10);

// 生产者
new Thread(() -> {
    for (int i = 0; i < 100; i++) {
        try {
            queue.put(i); // 满时阻塞
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}).start();

// 消费者
new Thread(() -> {
    while (true) {
        try {
            Integer item = queue.take(); // 空时阻塞
            System.out.println("Consumed: " + item);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}).start();

3.4 LinkedBlockingQueue

// 基于链表的阻塞队列(可选有界)
BlockingQueue<Integer> queue = new LinkedBlockingQueue<>(100); // 有界
BlockingQueue<Integer> unbounded = new LinkedBlockingQueue<>(); // 无界

// 特点:
// - 吞吐量通常高于 ArrayBlockingQueue
// - 无界队列可能导致 OOM

3.5 生产者 - 消费者模式

public class ProducerConsumer {
    private final BlockingQueue<Integer> queue = new ArrayBlockingQueue<>(10);
    
    class Producer implements Runnable {
        @Override
        public void run() {
            try {
                for (int i = 0; i < 100; i++) {
                    queue.put(i);
                    System.out.println("Produced: " + i);
                    Thread.sleep(100);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
    }
    
    class Consumer implements Runnable {
        @Override
        public void run() {
            try {
                while (true) {
                    Integer item = queue.take();
                    System.out.println("Consumed: " + item);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
    }
    
    public static void main(String[] args) {
        ProducerConsumer pc = new ProducerConsumer();
        new Thread(pc.new Producer()).start();
        new Thread(pc.new Consumer()).start();
    }
}

四、DelayQueue(延迟队列)

4.1 核心特性

DelayQueue = 优先级队列 + 延迟

特点:
- 基于 PriorityQueue
- 元素必须实现 Delayed 接口
- 只有到期才能取出
- 无界队列

4.2 Delayed 接口

interface Delayed extends Comparable<Delayed> {
    long getDelay(TimeUnit unit);
    int compareTo(Delayed other);
}

4.3 使用示例

// 延迟任务
class DelayedTask implements Delayed {
    private final String name;
    private final long executeTime;
    
    public DelayedTask(String name, long delayMs) {
        this.name = name;
        this.executeTime = System.currentTimeMillis() + delayMs;
    }
    
    @Override
    public long getDelay(TimeUnit unit) {
        long diff = executeTime - System.currentTimeMillis();
        return unit.convert(diff, TimeUnit.MILLISECONDS);
    }
    
    @Override
    public int compareTo(Delayed other) {
        return Long.compare(this.executeTime, ((DelayedTask) other).executeTime);
    }
    
    @Override
    public String toString() {
        return name;
    }
}

// 使用
DelayQueue<DelayedTask> queue = new DelayQueue<>();
queue.offer(new DelayedTask("Task 1", 1000)); // 1 秒后执行
queue.offer(new DelayedTask("Task 2", 2000)); // 2 秒后执行
queue.offer(new DelayedTask("Task 3", 3000)); // 3 秒后执行

// 取出
while (!queue.isEmpty()) {
    DelayedTask task = queue.take(); // 阻塞直到到期
    System.out.println("Executing: " + task);
}

4.4 应用场景

// 订单超时取消
class OrderTimeoutTask implements Delayed {
    private final String orderId;
    private final long expireTime;
    
    @Override
    public long getDelay(TimeUnit unit) {
        long diff = expireTime - System.currentTimeMillis();
        return unit.convert(diff, TimeUnit.MILLISECONDS);
    }
    
    @Override
    public int compareTo(Delayed other) {
        return Long.compare(this.expireTime, ((OrderTimeoutTask) other).expireTime);
    }
}

// 定时任务调度
// 缓存过期清理

五、其他队列

5.1 SynchronousQueue

// 不存储元素的阻塞队列
BlockingQueue<Integer> queue = new SynchronousQueue<>();

// 生产者必须等待消费者
new Thread(() -> {
    try {
        queue.put(1); // 阻塞,等待消费者
        System.out.println("Produced 1");
        
        queue.put(2); // 阻塞,等待消费者
        System.out.println("Produced 2");
    } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
    }
}).start();

// 消费者
Thread.sleep(1000);
Integer item1 = queue.take(); // 接收 1
Thread.sleep(1000);
Integer item2 = queue.take(); // 接收 2

5.2 PriorityBlockingQueue

// 线程安全的 PriorityQueue
BlockingQueue<Integer> pq = new PriorityBlockingQueue<>();

// 多线程安全
new Thread(() -> {
    for (int i = 0; i < 100; i++) {
        pq.offer(i);
    }
}).start();

new Thread(() -> {
    while (!pq.isEmpty()) {
        System.out.println(pq.take());
    }
}).start();

5.3 ArrayDeque(双端队列)

// 双端队列,可用作栈或队列
Deque<String> deque = new ArrayDeque<>();

// 栈操作
deque.push("A");
deque.push("B");
String top = deque.pop(); // "B"

// 队列操作
deque.offer("C");
String head = deque.poll(); // "A"

// 双端操作
deque.offerFirst("X");
deque.offerLast("Y");

六、选择建议

队列类型特点适用场景
PriorityQueue优先级排序TopK、任务调度
ArrayBlockingQueue有界、基于数组生产者 - 消费者
LinkedBlockingQueue可选有界、基于链表高并发场景
DelayQueue延迟执行定时任务、超时处理
SynchronousQueue不存储、直接传递线程间直接传递
PriorityBlockingQueue线程安全 + 优先级并发优先级任务

七、总结

Queue 核心要点:

队列底层结构线程安全阻塞适用场景
PriorityQueue优先级排序
ArrayBlockingQueue数组有界缓冲
LinkedBlockingQueue链表高并发
DelayQueue优先级堆延迟任务

根据场景选择合适的队列,生产者 - 消费者用 BlockingQueue,优先级用 PriorityQueue。


分享这篇文章到:

上一篇文章
MySQL 锁优化与并发控制
下一篇文章
Go 并发安全与陷阱