队列同步器

2020/12/31 posted in  并发工具

队列同步器AbstractQueuedSynchronizer,用来构建锁或其他同步组件的基础框架,使用int变量表示同步状态,通过内置的FIFO队列来完成资源获取线程的排队工作

实现

同步队列

同步器依赖内部的同步队列完成同步状态的管理,当前线程获取锁失败后,同步器将当前线程和等待状态构建为一个节点并加入到同步队列中,同时阻塞当前线程,当同步状态释放后,将首节点中的线程唤醒并重新获取同步资源;

  1. 同步添加尾结点:只有一个线程获取到同步在资源,多个线程需要添加到尾部位置,需要保证线程安全,同步队列提供一个CAS方法设置tail节点
  2. 设置首节点:获取同步状态成共的的线程完成,由于只有一个线程获取到同步状态,不需要CAS方法



独占同步状态获取与释放

独占同步获取状态流程图


    public final void acquire(int arg) {
        //1. 通过调用tryAcquire完成线程安全的获取同步状态
        // 2. 如果获取失败则通过addWaiter方法将构造的节点放入同步对列尾部
        // 3. 然后调用acquireQueued方法是使该节点以死循环的方式获取同步状态
        if (!tryAcquire(arg) &&
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            // 通过中断
            selfInterrupt();
    }
    

同步器的addWaiter()和enq()

addWaiter()

private Node addWaiter(Node mode) {
        Node node = new Node(Thread.currentThread(), mode);
        // Try the fast path of enq; backup to full enq on failure
        Node pred = tail;
        if (pred != null) {
            node.prev = pred;
            // 确保节点被线程安全添加
            if (compareAndSetTail(pred, node)) {
                pred.next = node;
                return node;
            }
        }
        enq(node);
        return node;
    }

enq()

// 通过CAS将并发添加节点编程串行化
private Node enq(final Node node) {
        for (;;) {
            Node t = tail;
            if (t == null) { // Must initialize
                if (compareAndSetHead(new Node()))
                    tail = head;
            } else {//只有通过CAS将节点设置成尾结点 才能退出
                node.prev = t;
                if (compareAndSetTail(t, node)) {
                    t.next = node;
                    return t;
                }
            }
        }
    }
    

acquireQueued方法

// 1. 头节点是获取同步状态的节点,释放同步状态后会唤醒其后继节点,而被唤醒的后继节点需要判断他的前驱节点是否是头节点
// 2. 维护同步队列的FIFO原则
final boolean acquireQueued(final Node node, int arg) {
        boolean failed = true;
        try {
            boolean interrupted = false;
            for (;;) {
                final Node p = node.predecessor();
                // 只有前驱节点是头节点的才能尝试获取同步状态
                if (p == head && tryAcquire(arg)) {
                    setHead(node);
                    p.next = null; // help GC
                    failed = false;
                    return interrupted;
                }
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    interrupted = true;
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }