LinkedBlockingQueue

2021/01/11 posted in  并发容器

基于链表的阻塞队列,其底层数据结构是链表;列表维护先入先出; 链表的大小在初始化时可以设置,默认是Integer的最大值; 可以使用Collection 和 Iterator两个接口所有的操作


内部结构

存储列表 + 锁 + 迭代器;列表保存节点

    static class Node<E> {
        E item;
        // 当前节点的下一个,如果为空表示为最后一个节点
        Node<E> next;

        Node(E x) { item = x; }
    }

    /** 链表的容量, or Integer.MAX_VALUE if none */
    private final int capacity;

    /** 链表已有元素的大小 */
    private final AtomicInteger count = new AtomicInteger();

    /**
     * 链表头
     * Invariant: head.item == null
     */
    transient Node<E> head;

    /**
     * 链表尾
     * Invariant: last.next == null
     */
    private transient Node<E> last;

    /** 锁 take, poll, etc */
    private final ReentrantLock takeLock = new ReentrantLock();

    /** take 时的条件队列 */
    private final Condition notEmpty = takeLock.newCondition();

    /** 锁by put, offer, etc */
    private final ReentrantLock putLock = new ReentrantLock();

    /** put 时的条件队列 */
    private final Condition notFull = putLock.newCondition();

阻塞增加

// 如果有新增空间直接新增,否则抛出异常
public void put(E e) throws InterruptedException {
        // 为空,抛出异常
        if (e == null) throw new NullPointerException();
        // 预先设置 c 为 -1 ,约定负数为新增失败
        int c = -1;
        Node<E> node = new Node<E>(e);
        final ReentrantLock putLock = this.putLock;
        final AtomicInteger count = this.count;
        // 设置中断锁
        putLock.lockInterruptibly();
        try {
            // 队列满了
            // 当前线程阻塞,等待其他线程唤醒
            while (count.get() == capacity) {
                // 无限等待
                notFull.await();
            }
            // 队列没满,直接添加到尾部
            enqueue(node);
            // c 为返回的旧的值
            c = count.getAndIncrement();
            // 可以唤醒一个 put 操作
            if (c + 1 < capacity)
                notFull.signal();
        } finally {
            putLock.unlock();
        }
        // 尝试唤醒一个take 操作
        if (c == 0)
            signalNotEmpty();
    }

阻塞删除

public E take() throws InterruptedException {
        E x;
        // 默认为负数
        int c = -1;
        final AtomicInteger count = this.count;
        final ReentrantLock takeLock = this.takeLock;
        takeLock.lockInterruptibly();
        try {
            // 队列为空阻塞
            while (count.get() == 0) {
                notEmpty.await();
            }
            // 非空队列,从头部拿一个出来
            x = dequeue();
            // count  减一计算
            c = count.getAndDecrement();
            // 如果队列有值,唤醒一个线程的put
            if (c > 1)
                notEmpty.signal();
        } finally {
            takeLock.unlock();
        }
        if (c == capacity)
            signalNotFull();
        return x;
    }