SynchronousQueue

2021/01/12 posted in  队列

SynchronousQueue是比较独特的队列,其本身没有容量大小,插入操作返回必须等另一个线程取操作执行成功后才能返回,反之亦然;队列由两种数据结构组成,后入先出的栈和先入先出的队列,堆栈是非公平的,队列是公平的了;

整体结构

SynchronousQueue 整体类图和 LinkedBlockingQueue 相似,都是实现了 BlockingQueue 接口,但因为其不储存数据结构,有一些方法是没有实现的,比如说 isEmpty、size、contains、remove 和迭代等方法,这些方法都是默认实现

    // 堆栈和队列共同的接口
    // 负责执行 put or take
    abstract static class Transferer<E> {
        // e 为空的,会直接返回特殊值,不为空会传递给消费者
        // timed 为 true,说明会有超时时间
        abstract E transfer(E e, boolean timed, long nanos);
    }

    // 堆栈 后入先出 非公平
    // Scherer-Scott 算法
    static final class TransferStack<E> extends Transferer<E> {
    }

    // 队列 先入先出 公平
    static final class TransferQueue<E> extends Transferer<E> {
    }

    private transient volatile Transferer<E> transferer;

    // 无参构造器默认为非公平的
    public SynchronousQueue(boolean fair) {
        transferer = fair ? new TransferQueue<E>() : new TransferStack<E>();
    }

非公平的堆栈

堆栈结构

static final class SNode {
    // 栈的下一个元素,被当前栈压在下面的栈元素
    volatile SNode next;
    // 节点匹配,用来判断阻塞元素能被唤醒的时机    
    // eg: 如果先执行take操作,,此时对列中没有数据,take被阻塞,栈顶元素为SNode1,当有put操作时,会把当前put的栈元素赋值给SNode1的match属性,并唤醒take操作,当take被唤醒时,返现SNode1的match属性有值,就能拿到put进来的属性,从而返回
    volatile SNode match;
    // 栈元素的阻塞是通过线程阻塞来实现的,waiter 为阻塞的线程
    volatile Thread waiter;     
    // 未投递的消息,或者未消费的消息
    Object item;               
    // 
    int mode;
}

入栈和出站

  1. 判断是 put 或者是 take 操作 --- ①
  2. 判断栈头是否为空,如果为空或头节点类型和本次操作类型一致 走 3 ---- ②
  3. 判断是否设置超时,若设置且已超时 返回 null 否则走 4 --- ③
  4. 如果栈头为空,把当前操作节点设置为头,或者不为空但是栈头操作和本次操作一致也把当前操作设置为栈头 --- ④, 并查看其他线程能否满足自己,不能满足则阻塞 --- ⑤
  5. 如果栈头已经阻塞,需要别人唤醒,看当前操作能否唤醒,可以走 6 ,否则走4 --- ⑥
  6. 把自己当作一个节点,赋值到栈头的 match 属性上,并唤醒栈头节点
  7. 栈头被唤醒后,拿到 match 属性,就是把自己唤醒的节点的信息,返回
// put 和 take 两个方法都揉在一起
E transfer(E e, boolean timed, long nanos) {
            SNode s = null; // constructed/reused as needed
            // e 为空则为 take 操作,不为空则为 put 操作
            int mode = (e == null) ? REQUEST : DATA; // ①
            // 自旋
            for (;;) {
                // 拿出头节点,有几种情况
                // 1:头节点为空,说明队列中还没有数据
                // 2:头节点不为空,并且是 take 类型的,说明头节点线程正等着拿数据。
                // 3:头节点不为空,并且是 put 类型的,说明头节点线程正等着放数据。
                SNode h = head;
                // 当前头节点为空,而且头节点类型和本次操作类型一致
                if (h == null || h.mode == mode) {  // ②
                    // 如果超时
                    if (timed && nanos <= 0) {      // ③
                        // 栈头操作被取消
                        if (h != null && h.isCancelled())
                            // 丢弃栈头,把栈头后一个设置为栈头
                            casHead(h, h.next);
                        else // 否则返回空
                            return null;
                     // 如果没超时,直接把本次操作的节点 e 作为新栈头       
                    } else if (casHead(h, s = snode(s, e, h, mode))) {  // ④
                        // e 等待出栈
                        SNode m = awaitFulfill(s, timed, nanos);
                        if (m == s) { 
                            clean(s);
                            return null;
                        }
                        // 本来 s 是栈头,但是 s 后又来一个节点,该节点为新栈头
                        if ((h = head) != null && h.next == s)
                            casHead(h, s.next);    
                        return (E) ((mode == REQUEST) ? m.item : s.item); // ⑤
                    }
                // 栈头在等其他的线程的put or  take 操作
                } else if (!isFulfilling(h.mode)) { // ⑥
                    if (h.isCancelled())            
                        casHead(h, h.next); 
                    //snode 方法第三个参数 h 代表栈头,赋值给 s 的 next 属性           
                    else if (casHead(h, s=snode(s, e, h, FULFILLING|mode))) {
                        // 自旋
                        for (;;) { 
                            //  m 是栈头
                            SNode m = s.next;       
                            if (m == null) {        
                                casHead(s, null);   
                                s = null;           
                                break;              
                            }
                            // tryMatch 非常重要的方法,两个作用:
                            // 1 唤醒被阻塞的栈头 m,2 把当前节点 s 赋值给 m 的 match 属性
                            // 这样栈头 m 被唤醒时,就能从 m.match 中得到本次操作 s
                            // 其中 s.item 记录着本次的操作节点,也就是记录本次操作的数据
                            SNode mn = m.next;
                            if (m.tryMatch(s)) { // ⑥
                                casHead(s, mn);     
                                return (E) ((mode == REQUEST) ? m.item : s.item);
                            } else                  
                                s.casNext(m, mn);   
                        }
                    }
                } else {                            
                    SNode m = h.next;               
                    if (m == null)                  
                        casHead(h, null);            
                    else {
                        SNode mn = m.next;
                        if (m.tryMatch(h))          
                            casHead(h, mn);         
                        else                        
                            h.casNext(m, mn);       
                    }
                }
            }
        }

阻塞方法

        SNode awaitFulfill(SNode s, boolean timed, long nanos) {
        // deadline  - 如果设置的超时则为 当前时间 + 超时时间,否则问题0
        final long deadline = timed ? System.nanoTime() + nanos : 0L;
        Thread w = Thread.currentThread();
        // 如果设置了超时时间 自旋为 32次 否则为 512 次
        int spins = (shouldSpin(s) ? (timed ? maxTimedSpins : maxUntimedSpins) : 0);
        for (;;) {
            // 如果设置了超时时间 会被打断    
            if (w.isInterrupted())
                s.tryCancel();
            SNode m = s.match;
            if (m != null)
                return m;
            if (timed) {
                nanos = deadline - System.nanoTime();
                // 如果超时取消当前线程的等待操作
                if (nanos <= 0L) {
                    s.tryCancel();
                    continue;
                }
            }
            // 自旋次数减少 1 
            if (spins > 0)
               spins = shouldSpin(s) ? (spins-1) : 0;
            // 把当前线程设置为waiter,通过线程完成阻塞和唤醒
            else if (s.waiter == null)
                s.waiter = w;
            // 通过park 阻塞 
            else if (!timed)
                LockSupport.park(this);
            else if (nanos > spinForTimeoutThreshold)
                LockSupport.parkNanos(this, nanos);
            }
        }

公平队列

/** 队列头 */
transient volatile QNode head;
/** 队列尾 */
transient volatile QNode tail;

// 队列的元素
static final class QNode {
    // 当前元素的下一个元素
    volatile QNode next;         
    // 当前元素的值,如果当前元素被阻塞住了,等其他线程来唤醒自己时,其他线程
    // 会把自己 set 到 item 里面
    volatile Object item;         // CAS'ed to or from null
    // 可以阻塞住的当前线程
    volatile Thread waiter;       // to control park/unpark
    // true 是 put,false 是 take
    final boolean isData;
} 

TransferQueue 内部类的 transfer 方法

  1. 线程 1 从队列中拿数据,发现队列中没有数据,于是被阻塞
  2. 线程 2 往队尾put数据,会从往前找第一个被阻塞的节点,假设此时能找到的就是节点 A,然后线程 B 把将 put 的数据放到节点 A 的 item 属性里面,并唤醒线程 1
  3. 线程 1 被唤醒后,就能从A.item里面拿到线程 2 put的数据,线程 1 返回
E transfer(E e, boolean timed, long nanos) {

    QNode s = null; // constructed/reused as needed
    // true 是 put,false 是 get
    boolean isData = (e != null);

    for (;;) {
        // 队列头和尾的临时变量,队列是空的时候,t=h
        QNode t = tail;
        QNode h = head;
        // tail 和 head 没有初始化时,无限循环
        // 虽然这种 continue 非常耗cpu,但感觉不会碰到这种情况
        // 因为 tail 和 head 在 TransferQueue 初始化的时候,就已经被赋值空节点了
        if (t == null || h == null)
            continue;
        // 首尾节点相同,说明是空队列
        // 或者尾节点的操作和当前节点操作一致
        if (h == t || t.isData == isData) {
            QNode tn = t.next;
            // 当 t 不是 tail 时,说明 tail 已经被修改过了
            // 因为 tail 没有被修改的情况下,t 和 tail 必然相等
            // 因为前面刚刚执行赋值操作: t = tail
            if (t != tail)
                continue;
            // 队尾后面的值还不为空,t 还不是队尾,直接把 tn 赋值给 t,这是一步加强校验。
            if (tn != null) {
                advanceTail(t, tn);
                continue;
            }
            //超时直接返回 null
            if (timed && nanos <= 0)        // can't wait
                return null;
            //构造node节点
            if (s == null)
                s = new QNode(e, isData);
            //如果把 e 放到队尾失败,继续递归放进去
            if (!t.casNext(null, s))        // failed to link in
                continue;

            advanceTail(t, s);              // swing tail and wait
            // 阻塞住自己
            Object x = awaitFulfill(s, e, timed, nanos);
            if (x == s) {                   // wait was cancelled
                clean(t, s);
                return null;
            }

            if (!s.isOffList()) {           // not already unlinked
                advanceHead(t, s);          // unlink if head
                if (x != null)              // and forget fields
                    s.item = s;
                s.waiter = null;
            }
            return (x != null) ? (E)x : e;
        // 队列不为空,并且当前操作和队尾不一致
        // 也就是说当前操作是队尾是对应的操作
        // 比如说队尾是因为 take 被阻塞的,那么当前操作必然是 put
        } else {                            // complementary-mode
            // 如果是第一次执行,此处的 m 代表就是 tail
            // 也就是这行代码体现出队列的公平,每次操作时,从头开始按照顺序进行操作
            QNode m = h.next;               // node to fulfill
            if (t != tail || m == null || h != head)
                continue;                   // inconsistent read

            Object x = m.item;
            if (isData == (x != null) ||    // m already fulfilled
                x == m ||                   // m cancelled
                // m 代表栈头
                // 这里把当前的操作值赋值给阻塞住的 m 的 item 属性
                // 这样 m 被释放时,就可得到此次操作的值
                !m.casItem(x, e)) {         // lost CAS
                advanceHead(h, m);          // dequeue and retry
                continue;
            }
            // 当前操作放到队头
            advanceHead(h, m);              // successfully fulfilled
            // 释放队头阻塞节点
            LockSupport.unpark(m.waiter);
            return (x != null) ? (E)x : e;
        }
    }
}