基于链表的阻塞队列,其底层数据结构是链表;列表维护先入先出; 链表的大小在初始化时可以设置,默认是Integer的最大值; 可以使用Collection 和 Iterator两个接口所有的操作
内部结构
存储列表 + 锁 + 迭代器;列表保存节点
static class Node<E> {
E item;
// 当前节点的下一个,如果为空表示为最后一个节点
Node<E> next;
Node(E x) { item = x; }
}
/** 链表的容量, or Integer.MAX_VALUE if none */
private final int capacity;
/** 链表已有元素的大小 */
private final AtomicInteger count = new AtomicInteger();
/**
* 链表头
* Invariant: head.item == null
*/
transient Node<E> head;
/**
* 链表尾
* Invariant: last.next == null
*/
private transient Node<E> last;
/** 锁 take, poll, etc */
private final ReentrantLock takeLock = new ReentrantLock();
/** take 时的条件队列 */
private final Condition notEmpty = takeLock.newCondition();
/** 锁by put, offer, etc */
private final ReentrantLock putLock = new ReentrantLock();
/** put 时的条件队列 */
private final Condition notFull = putLock.newCondition();
阻塞增加
// 如果有新增空间直接新增,否则抛出异常
public void put(E e) throws InterruptedException {
// 为空,抛出异常
if (e == null) throw new NullPointerException();
// 预先设置 c 为 -1 ,约定负数为新增失败
int c = -1;
Node<E> node = new Node<E>(e);
final ReentrantLock putLock = this.putLock;
final AtomicInteger count = this.count;
// 设置中断锁
putLock.lockInterruptibly();
try {
// 队列满了
// 当前线程阻塞,等待其他线程唤醒
while (count.get() == capacity) {
// 无限等待
notFull.await();
}
// 队列没满,直接添加到尾部
enqueue(node);
// c 为返回的旧的值
c = count.getAndIncrement();
// 可以唤醒一个 put 操作
if (c + 1 < capacity)
notFull.signal();
} finally {
putLock.unlock();
}
// 尝试唤醒一个take 操作
if (c == 0)
signalNotEmpty();
}
阻塞删除
public E take() throws InterruptedException {
E x;
// 默认为负数
int c = -1;
final AtomicInteger count = this.count;
final ReentrantLock takeLock = this.takeLock;
takeLock.lockInterruptibly();
try {
// 队列为空阻塞
while (count.get() == 0) {
notEmpty.await();
}
// 非空队列,从头部拿一个出来
x = dequeue();
// count 减一计算
c = count.getAndDecrement();
// 如果队列有值,唤醒一个线程的put
if (c > 1)
notEmpty.signal();
} finally {
takeLock.unlock();
}
if (c == capacity)
signalNotFull();
return x;
}
