Junhc

岂止于博客

死磕Java并发之阻塞队列

什么是阻塞队列

阻塞队列(BlockingQueue)是一个支持两个附加操作的队列。
1) 支持阻塞的插入方法:意思是当队列满时,队列会阻塞插入元素的线程,直到队列不满。
2) 支持阻塞的移除方法:意思是当队列为空时,获取元素的线程会等待队列变为非空。

ArrayBlockingQueue

ArrayBlockingQueue是一个用数组实现的有界阻塞队列。此队列按照先进先出(FIFO)的原则对元素进行排序。

/** Main lock guarding all access */
final ReentrantLock lock;
/** Condition for waiting takes */
private final Condition notEmpty;
/** Condition for waiting puts */
private final Condition notFull;

...

public ArrayBlockingQueue(int capacity, boolean fair) {
    if (capacity <= 0)
        throw new IllegalArgumentException();
    this.items = new Object[capacity];
    lock = new ReentrantLock(fair);
    notEmpty = lock.newCondition();
    notFull =  lock.newCondition();
}

...

public void put(E e) throws InterruptedException {
    checkNotNull(e);
    final ReentrantLock lock = this.lock;
    lock.lockInterruptibly();
    try {
        while (count == items.length)
            notFull.await(); // 如果队列已满,则等待
        insert(e);
    } finally {
        lock.unlock();
    }
}

private void insert(E x) {
    items[putIndex] = x;
    putIndex = inc(putIndex);
    ++count;
    notEmpty.signal(); // 有新的元素被插入,通知等待中的取走元素线程
}

public E take() throws InterruptedException {
    final ReentrantLock lock = this.lock;
    lock.lockInterruptibly();
    try {
        while (count == 0)
            notEmpty.await(); // 如果队列为空,则等待
        return extract();
    } finally {
        lock.unlock();
    }
}

private E extract() {
    final Object[] items = this.items;
    E x = this.<E>cast(items[takeIndex]);
    items[takeIndex] = null;
    takeIndex = inc(takeIndex);
    --count;
    notFull.signal(); // 有新的元素被取走,通知等待中的插入元素线程
    return x;
}

...
LinkedBlockingQueue

LinkedBlockingQueue是一个用链表实现的有界阻塞队列。此队列的默认和最大长度为Integer.MAX_VALUE
此队列按照先进先出(FIFO)的原则对元素进行排序。

PriorityBlockingQueue

PriorityBlockingQueue是一个支持优先级的无界阻塞队列。默认情况下元素采取自然顺序升序排列。
也可以自定义类实现compareTo()方法来指定元素排序规则,或者初始化PriorityBlockingQueue时,
指定构造参数Comparator来对元素进行排序。需要注意的是不能保证同优先级元素的排序。

DelayQueue

DelayQueue是一个支持延时获取元素的无界阻塞队列。队列使用PriorityQueue来实现。
队列中的元素必须实现Delayed接口,在创建元素时可以指定多久才能从队列中获取当前元素。
只用延迟期满时才能从队列中提取元素。
DelayQueue非常有用,可以将DelayQueue运用以下应用场景。
1) 缓存系统的设计:可以用DelayQueue保存缓存元素的有效期,使用一个线程循环查询DelayQueue
一旦能从DelayQueue中获取元素时,表示缓存有效期到了。
2) 定时任务调度:使用DelayQueue保存当天将会执行的任务和执行时间,一旦从DelayQueue中获取到任务就开始执行,
比如TimerQueue就是使用DelayQueue实现的。

SynchronousQueue

SynchronousQueue是一个不存储元素的阻塞队列。每一个put操作必须等待一个take操作,否则不能继续添加元素。
它支持公平访问队列。默认情况下线程采用非公平性策略访问队列。

LinkedTransferQueue

LinkedTransferQueue是一个由链表结构组成的无界阻塞TransferQueue队列。

LinkedBlockingDeque

LinkedBlockingDeque是一个由链表结构组成的双向阻塞队列。