AQS源码解析与面试题汇总

AQS源码解析与面试题汇总

引言

AQS是一种提供了原子式管理同步状态、阻塞和唤醒线程功能以及队列模型的简单框架。Java中的大部分同步类(LockSemaphoreReentrantLock等)都是基于AbstractQueuedSynchronizer(简称为AQS)实现的。实际上AQS就是负责多线程同步的一个抽象类。

介绍

什么是AQS

AbstractQueuedSynchronizer简称为AQS,直译为:抽象队列同步类,是Java的抽象类。主要思想就是:为共享资源设置状态,来表示资源是否被锁定。当多线程并发访问时,将暂时无法获取资源(锁)的线程放进队列(FIFO、双向链表)中,后续通过阻塞和唤醒机制,保证锁的分配。

AQS的加锁过程

调用acquire方法尝试获取锁,如果获取成功,就修改state状态,实现独占。
如果获取失败就将当前线程加入到队列尾部。(加入尾部过程中可能出现尾分叉,会一直重试直到成功加入队尾。)
加入到队列后的线程会尝试自旋获取锁,如果尝试获取不到,就调用park实现阻塞,等待后续唤醒。

源码分析

由于AQS是抽象类,我们从ReentrantLock开始查看完整的加锁过程,会更清晰一些,当调用lock方法加锁时,源码部分如下:

public class ReentrantLock implements Lock, java.io.Serializable {
    private final Sync sync;

    public void lock() {
        sync.lock();
    }
    // AQS的实现类
    abstract static class Sync extends AbstractQueuedSynchronizer {
        abstract void lock();
        ...
    }
    // Sync 的非公平锁实现
    static final class NonfairSync extends Sync {
        ...
        final void lock() {
            // 使用CAS尝试修改AQS的state状态,修改成功则独占资源
            if (compareAndSetState(0, 1))
                setExclusiveOwnerThread(Thread.currentThread());
            else
                acquire(1);// 调用AQS的acquire方法
        }
    }
   ...

加锁过程主要是通过acquire方法实现的,来看acquire方法相关源代码:

public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer implements Serializable {
    ...
    public final void acquire(int arg) {
        // 如果尝试获取锁失败且尝试加入队列失败, 则中断当前线程
        if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            selfInterrupt();
    }
    // 子类需要重写的方法
    protected boolean tryAcquire(int arg) {
        throw new UnsupportedOperationException();
    }
    // 将当前线程包装成一个Node节点,并尝试加入到队列的尾部
    private Node addWaiter(Node mode) {
        Node node = new Node(Thread.currentThread(), mode);
        // Try the fast path of enq; backup to full enq on failure
        Node pred = tail;
        if (pred != null) {
            node.prev = pred;
            // cas 方式加入队列的尾部,成功就将pred.next指向当前节点(表示已入队)
            if (compareAndSetTail(pred, node)) {
                pred.next = node;
                return node;
            }
        }
        // 如果pre为空,或者没有成功加入队列尾部,无限重试将node加入队尾
        enq(node);
        return node;
    }
    // 将当前节点放入到队列的尾部
    private Node enq(final Node node) {
        for (;;) {
            Node t = tail;
            if (t == null) { // Must initialize 懒加载头结点
                if (compareAndSetHead(new Node()))
                    tail = head;
            } else {
                // 无限重试将node加入到队列的尾部并返回,高并发可能出现尾分叉
                node.prev = t;
                if (compareAndSetTail(t, node)) {
                    t.next = node;   // 只有当尾节点的next指向该节点时,才算完全入队,否则只是分叉
                    return t;
                }
            }
        }
    }
    // 将节点放入队列中
    final boolean acquireQueued(final Node node, int arg) {
        boolean failed = true;
        try {
            boolean interrupted = false;
            for (;;) {
                final Node p = node.predecessor();
                // 如果前驱节点是头结点,就尝试获取资源,否则判断是否自旋或者阻塞
                if (p == head && tryAcquire(arg)) {
                    // 获取成功就将当前线程节点设置为头结点,(相当于出队)
                    setHead(node);
                    p.next = null; // help GC
                    failed = false;
                    return interrupted;
                }
                // 自旋获取锁失败,判断是否需要阻塞, 不需要就继续自旋重试尝试获取锁
                if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt())
                    interrupted = true;
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }
    // 释放锁
    public final boolean release(int arg) {
        if (tryRelease(arg)) {
            Node h = head;
            // 锁被释放成功后,唤醒h的后继节点中的线程
            if (h != null && h.waitStatus != 0)
                unparkSuccessor(h);
            return true;
        }
        return false;
    }
    // 唤醒节点的后继节点保存的线程
    private void unparkSuccessor(Node node) {
        int ws = node.waitStatus;
        if (ws < 0)
            compareAndSetWaitStatus(node, ws, 0);
        Node s = node.next;
        // s为空两种情况:没有后继节点了、后继节点尚未完全入队
        if (s == null || s.waitStatus > 0) {
            s = null;
            for (Node t = tail; t != null && t != node; t = t.prev)
                if (t.waitStatus <= 0)
                    s = t;
        }
        if (s != null)
            LockSupport.unpark(s.thread);
    }
    ...
}

整体的流程为:

  1. 尝试获取锁:当线程需要获取锁时,会首先调用tryAcquire()方法尝试获取锁。这是一个抽象方法,需要在具体的同步器中进行实现。如果tryAcquire()方法返回true,表示成功获取锁,可以继续执行后续的操作。如果返回false,表示获取锁失败,需要进行后续的处理。

  2. 获取锁失败:当tryAcquire()方法返回false时,表示获取锁失败。此时,线程需要进入等待状态,等待其他线程释放锁并唤醒自己。

  3. 加入等待队列:获取锁失败的线程会被封装成一个节点(Node),然后加入等待队列(等待队列是一个双向链表)。节点包含了线程的状态和其他信息。加入等待队列的操作一般是通过addWaiter()方法实现。

  4. 自旋等待:在加入等待队列后,获取锁失败的线程会进入自旋等待的状态。自旋等待是指线程会反复尝试获取锁,而不是进入真正的阻塞状态。这样可以减少线程切换的开销,提高效率。

  5. 阻塞等待:如果自旋等待一段时间后仍然无法获取到锁,线程会进入阻塞等待状态,释放CPU资源。阻塞等待的操作一般是通过park()方法实现。在阻塞等待状态下,线程会暂时释放CPU,并被挂起,直到被其他线程唤醒。

  6. 锁的释放和唤醒:当持有锁的线程完成工作后,会调用release()方法释放锁。释放锁的操作一般会将等待队列中的节点唤醒,使得等待的线程有机会再次尝试获取锁。唤醒操作一般是通过unpark()方法实现。

面试题总结

  1. 什么是AQS?
    是Java并发包下的抽象数据结构,负责线程同步,是并发包中用于实现锁、同步器以及并发容器的基础组件。Java中的很多同步器都是根据AQS实现的

  2. AQS的工作原理是什么?
    主要是通过维护一个state来控制资源的数量和状态,改变state来实现资源的锁定和释放。同时,维护FIFO的线程队列,实现对未成功获取锁的线程的管理。

  3. AQS的同步队列什么作用?
    用于存放获取锁失败的线程,实现有序等待,子类可以判断队列元素,实现公平锁或非公平锁。
    在资源被释放时,可以唤醒位于队列中的线程。

  4. AQS是如何实现加锁的?(加锁的过程)
    通过acquire方法尝试通过CAS修改state的值,并设置独占(或共享)的方式实现加锁,如果失败就被封装成一个Node放入队列的尾部,自旋重试或阻塞等待唤醒。

  5. 共享锁和排它锁有什么区别?
    共享锁允许多个线程同时获取锁,多个线程可以同时读取共享资源;而排它锁只允许一个线程获取锁,其他线程必须等待锁的释放才能获取。在实现上,共享锁使用AQS的共享模式,而排它锁使用独占模式。

  6. AQS是如何实现可重入锁的?
    重入锁中的持有锁的线程可以重复获取同样的锁,AQS通过维护一个线程与锁的映射来实现。线程在获取锁时,如果检测到是同一个线程重复获取锁,则把锁的重入次数加1;在释放锁时,如果锁的重入次数为0,则将锁释放。

参考

美团技术团队:从ReentrantLock的实现看AQS的原理及应用

发表回复

您的电子邮箱地址不会被公开。 必填项已用*标注