SynchronousQueue是比较独特的队列,其本身没有容量大小,插入操作返回必须等另一个线程取操作执行成功后才能返回,反之亦然;队列由两种数据结构组成,后入先出的栈和先入先出的队列,堆栈是非公平的,队列是公平的了;
整体结构
SynchronousQueue 整体类图和 LinkedBlockingQueue 相似,都是实现了 BlockingQueue 接口,但因为其不储存数据结构,有一些方法是没有实现的,比如说 isEmpty、size、contains、remove 和迭代等方法,这些方法都是默认实现
// 堆栈和队列共同的接口
// 负责执行 put or take
abstract static class Transferer<E> {
// e 为空的,会直接返回特殊值,不为空会传递给消费者
// timed 为 true,说明会有超时时间
abstract E transfer(E e, boolean timed, long nanos);
}
// 堆栈 后入先出 非公平
// Scherer-Scott 算法
static final class TransferStack<E> extends Transferer<E> {
}
// 队列 先入先出 公平
static final class TransferQueue<E> extends Transferer<E> {
}
private transient volatile Transferer<E> transferer;
// 无参构造器默认为非公平的
public SynchronousQueue(boolean fair) {
transferer = fair ? new TransferQueue<E>() : new TransferStack<E>();
}
非公平的堆栈
堆栈结构
static final class SNode {
// 栈的下一个元素,被当前栈压在下面的栈元素
volatile SNode next;
// 节点匹配,用来判断阻塞元素能被唤醒的时机
// eg: 如果先执行take操作,,此时对列中没有数据,take被阻塞,栈顶元素为SNode1,当有put操作时,会把当前put的栈元素赋值给SNode1的match属性,并唤醒take操作,当take被唤醒时,返现SNode1的match属性有值,就能拿到put进来的属性,从而返回
volatile SNode match;
// 栈元素的阻塞是通过线程阻塞来实现的,waiter 为阻塞的线程
volatile Thread waiter;
// 未投递的消息,或者未消费的消息
Object item;
//
int mode;
}
入栈和出站
- 判断是 put 或者是 take 操作 --- ①
- 判断栈头是否为空,如果为空或头节点类型和本次操作类型一致 走 3 ---- ②
- 判断是否设置超时,若设置且已超时 返回 null 否则走 4 --- ③
- 如果栈头为空,把当前操作节点设置为头,或者不为空但是栈头操作和本次操作一致也把当前操作设置为栈头 --- ④, 并查看其他线程能否满足自己,不能满足则阻塞 --- ⑤
- 如果栈头已经阻塞,需要别人唤醒,看当前操作能否唤醒,可以走 6 ,否则走4 --- ⑥
- 把自己当作一个节点,赋值到栈头的 match 属性上,并唤醒栈头节点
- 栈头被唤醒后,拿到 match 属性,就是把自己唤醒的节点的信息,返回
// put 和 take 两个方法都揉在一起
E transfer(E e, boolean timed, long nanos) {
SNode s = null; // constructed/reused as needed
// e 为空则为 take 操作,不为空则为 put 操作
int mode = (e == null) ? REQUEST : DATA; // ①
// 自旋
for (;;) {
// 拿出头节点,有几种情况
// 1:头节点为空,说明队列中还没有数据
// 2:头节点不为空,并且是 take 类型的,说明头节点线程正等着拿数据。
// 3:头节点不为空,并且是 put 类型的,说明头节点线程正等着放数据。
SNode h = head;
// 当前头节点为空,而且头节点类型和本次操作类型一致
if (h == null || h.mode == mode) { // ②
// 如果超时
if (timed && nanos <= 0) { // ③
// 栈头操作被取消
if (h != null && h.isCancelled())
// 丢弃栈头,把栈头后一个设置为栈头
casHead(h, h.next);
else // 否则返回空
return null;
// 如果没超时,直接把本次操作的节点 e 作为新栈头
} else if (casHead(h, s = snode(s, e, h, mode))) { // ④
// e 等待出栈
SNode m = awaitFulfill(s, timed, nanos);
if (m == s) {
clean(s);
return null;
}
// 本来 s 是栈头,但是 s 后又来一个节点,该节点为新栈头
if ((h = head) != null && h.next == s)
casHead(h, s.next);
return (E) ((mode == REQUEST) ? m.item : s.item); // ⑤
}
// 栈头在等其他的线程的put or take 操作
} else if (!isFulfilling(h.mode)) { // ⑥
if (h.isCancelled())
casHead(h, h.next);
//snode 方法第三个参数 h 代表栈头,赋值给 s 的 next 属性
else if (casHead(h, s=snode(s, e, h, FULFILLING|mode))) {
// 自旋
for (;;) {
// m 是栈头
SNode m = s.next;
if (m == null) {
casHead(s, null);
s = null;
break;
}
// tryMatch 非常重要的方法,两个作用:
// 1 唤醒被阻塞的栈头 m,2 把当前节点 s 赋值给 m 的 match 属性
// 这样栈头 m 被唤醒时,就能从 m.match 中得到本次操作 s
// 其中 s.item 记录着本次的操作节点,也就是记录本次操作的数据
SNode mn = m.next;
if (m.tryMatch(s)) { // ⑥
casHead(s, mn);
return (E) ((mode == REQUEST) ? m.item : s.item);
} else
s.casNext(m, mn);
}
}
} else {
SNode m = h.next;
if (m == null)
casHead(h, null);
else {
SNode mn = m.next;
if (m.tryMatch(h))
casHead(h, mn);
else
h.casNext(m, mn);
}
}
}
}
阻塞方法
SNode awaitFulfill(SNode s, boolean timed, long nanos) {
// deadline - 如果设置的超时则为 当前时间 + 超时时间,否则问题0
final long deadline = timed ? System.nanoTime() + nanos : 0L;
Thread w = Thread.currentThread();
// 如果设置了超时时间 自旋为 32次 否则为 512 次
int spins = (shouldSpin(s) ? (timed ? maxTimedSpins : maxUntimedSpins) : 0);
for (;;) {
// 如果设置了超时时间 会被打断
if (w.isInterrupted())
s.tryCancel();
SNode m = s.match;
if (m != null)
return m;
if (timed) {
nanos = deadline - System.nanoTime();
// 如果超时取消当前线程的等待操作
if (nanos <= 0L) {
s.tryCancel();
continue;
}
}
// 自旋次数减少 1
if (spins > 0)
spins = shouldSpin(s) ? (spins-1) : 0;
// 把当前线程设置为waiter,通过线程完成阻塞和唤醒
else if (s.waiter == null)
s.waiter = w;
// 通过park 阻塞
else if (!timed)
LockSupport.park(this);
else if (nanos > spinForTimeoutThreshold)
LockSupport.parkNanos(this, nanos);
}
}
公平队列
/** 队列头 */
transient volatile QNode head;
/** 队列尾 */
transient volatile QNode tail;
// 队列的元素
static final class QNode {
// 当前元素的下一个元素
volatile QNode next;
// 当前元素的值,如果当前元素被阻塞住了,等其他线程来唤醒自己时,其他线程
// 会把自己 set 到 item 里面
volatile Object item; // CAS'ed to or from null
// 可以阻塞住的当前线程
volatile Thread waiter; // to control park/unpark
// true 是 put,false 是 take
final boolean isData;
}
TransferQueue 内部类的 transfer 方法
- 线程 1 从队列中拿数据,发现队列中没有数据,于是被阻塞
- 线程 2 往队尾put数据,会从往前找第一个被阻塞的节点,假设此时能找到的就是节点 A,然后线程 B 把将 put 的数据放到节点 A 的 item 属性里面,并唤醒线程 1
- 线程 1 被唤醒后,就能从A.item里面拿到线程 2 put的数据,线程 1 返回
E transfer(E e, boolean timed, long nanos) {
QNode s = null; // constructed/reused as needed
// true 是 put,false 是 get
boolean isData = (e != null);
for (;;) {
// 队列头和尾的临时变量,队列是空的时候,t=h
QNode t = tail;
QNode h = head;
// tail 和 head 没有初始化时,无限循环
// 虽然这种 continue 非常耗cpu,但感觉不会碰到这种情况
// 因为 tail 和 head 在 TransferQueue 初始化的时候,就已经被赋值空节点了
if (t == null || h == null)
continue;
// 首尾节点相同,说明是空队列
// 或者尾节点的操作和当前节点操作一致
if (h == t || t.isData == isData) {
QNode tn = t.next;
// 当 t 不是 tail 时,说明 tail 已经被修改过了
// 因为 tail 没有被修改的情况下,t 和 tail 必然相等
// 因为前面刚刚执行赋值操作: t = tail
if (t != tail)
continue;
// 队尾后面的值还不为空,t 还不是队尾,直接把 tn 赋值给 t,这是一步加强校验。
if (tn != null) {
advanceTail(t, tn);
continue;
}
//超时直接返回 null
if (timed && nanos <= 0) // can't wait
return null;
//构造node节点
if (s == null)
s = new QNode(e, isData);
//如果把 e 放到队尾失败,继续递归放进去
if (!t.casNext(null, s)) // failed to link in
continue;
advanceTail(t, s); // swing tail and wait
// 阻塞住自己
Object x = awaitFulfill(s, e, timed, nanos);
if (x == s) { // wait was cancelled
clean(t, s);
return null;
}
if (!s.isOffList()) { // not already unlinked
advanceHead(t, s); // unlink if head
if (x != null) // and forget fields
s.item = s;
s.waiter = null;
}
return (x != null) ? (E)x : e;
// 队列不为空,并且当前操作和队尾不一致
// 也就是说当前操作是队尾是对应的操作
// 比如说队尾是因为 take 被阻塞的,那么当前操作必然是 put
} else { // complementary-mode
// 如果是第一次执行,此处的 m 代表就是 tail
// 也就是这行代码体现出队列的公平,每次操作时,从头开始按照顺序进行操作
QNode m = h.next; // node to fulfill
if (t != tail || m == null || h != head)
continue; // inconsistent read
Object x = m.item;
if (isData == (x != null) || // m already fulfilled
x == m || // m cancelled
// m 代表栈头
// 这里把当前的操作值赋值给阻塞住的 m 的 item 属性
// 这样 m 被释放时,就可得到此次操作的值
!m.casItem(x, e)) { // lost CAS
advanceHead(h, m); // dequeue and retry
continue;
}
// 当前操作放到队头
advanceHead(h, m); // successfully fulfilled
// 释放队头阻塞节点
LockSupport.unpark(m.waiter);
return (x != null) ? (E)x : e;
}
}
}
