signed

QiShunwang

“诚信为本、客户至上”

ReentrantLock重入锁解析

2021/4/26 15:05:53   来源:

一、ReentrantLock

      ReentrantLock是JUC工具包中的一个核心工具类,相对于Synchronized来说提供了更加细致的锁实现,在ReentrantLock中对锁进行大量的优化过程,使用了大量的CAS操作和自旋操作,尽量避免线程进入阻塞状态避免性能的损耗。这篇文章主要根据jdk源码对ReentrantLock进行一个全面的解析。

二、源码解析

 重入锁的入口是以下方法,当有线程进来时,进入lock方法,lock调用Sync的lock方法,Sync又是继承自AbstractQueuedSynchronizer抽象类

public void lock() {
    sync.lock();
}

//调用到了 AbstractQueuedSynchronizer 的lock方法
abstract static class Sync extends AbstractQueuedSynchronizer {
    private static final long serialVersionUID = -5179523762034025860L;

    /**
         * Performs {@link Lock#lock}. The main reason for subclassing
         * is to allow fast path for nonfair version.
         */
    abstract void lock();
}

基于Syc抽象内部类,java又实现了公平锁(FairSync)和非公平锁(NonfairSync)两种锁,这里主要去研究非公平锁这一种

final void lock() {
    //通过CAS操作,尝试将当前运行线程替换为当前线程,相当于插队,体现出非公平锁的特性
    if (compareAndSetState(0, 1))
        setExclusiveOwnerThread(Thread.currentThread());
    else
        //没有成功抢占就去排队,进入AQS队列,排队去获取
        acquire(1);
}
进入acquire,能够看见以下代码,核心方法是tryAcquire()addWaiter()、acquireQueued()、selfInterrupt()这三个方法,现在来进行分开讨论 
public final void acquire(int arg) {
    if (!tryAcquire(arg) &&
        acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
        selfInterrupt();
}

Node.EXCLUSIVE 这个是AQS队列中标识线程状态的字段,再Node源码中有以下几种状态

static final Node EXCLUSIVE = null;

/** waitStatus value to indicate thread has cancelled */
//waitStatus值为1时表示该线程节点已释放(超时、中断),已取消的节点不会再阻塞。
static final int CANCELLED =  1;

/** waitStatus value to indicate successor's thread needs unparking */
//waitStatus为-1时表示该线程的后续线程需要阻塞,即只要前置节点释放锁,就会通知标识为 SIGNAL 状态的后续节点的线程 
static final int SIGNAL    = -1;

/** waitStatus value to indicate thread is waiting on condition */
//waitStatus为-2时,表示该线程在condition队列中阻塞(Condition有使用)
static final int CONDITION = -2;
/**
 * waitStatus value to indicate the next acquireShared should unconditionally propagate
*/
//waitStatus为-3时,表示该线程以及后续线程进行无条件传播(CountDownLatch中有使用)共享模式下, PROPAGATE 状态的线程处于可运行状态
static final int PROPAGATE = -3;

tryAcquire方法,这里查看非公平锁实现实现代码的具体实现

//入参是1
protected final boolean tryAcquire(int acquires) {
    return nonfairTryAcquire(acquires);
}

nonfairTryAcquire方法

/**
* Performs non-fair tryLock.  tryAcquire is implemented in
* subclasses, but both need nonfair try for trylock method.
*/
final boolean nonfairTryAcquire(int acquires) {
    //拿到当前线程
    final Thread current = Thread.currentThread();
    //获取到当前锁的状态,0标识没有线程占有锁,大于0标识当前有线程占有锁
    int c = getState();
    //没有线程占有锁
    if (c == 0) {
        //尝试通过CAS去占有锁,如果成功了,表示当前线程抢到锁了,后续就不用再走了,这里还是一个非公平的体现,还是通过插队来抢占锁
        if (compareAndSetState(0, acquires)) {
            setExclusiveOwnerThread(current);
            return true;
        }
    }
    //如果当前线程已经抢占到锁了,因为这是重入锁,所以重入次数+1,这里就体现重入锁的一个核心代码
    else if (current == getExclusiveOwnerThread()) {
        int nextc = c + acquires;
        if (nextc < 0) // overflow
            throw new Error("Maximum lock count exceeded");
        setState(nextc);
        return true;
    }
    return false;
}

addWaiter(Node.EXCLUSIVE)研究,Node.EXCLUSIVE是一个空节点,addWaiter最终目的是将线程插入AQS队列中去

private Node addWaiter(Node mode) {
    //构建一个新的Node节点,节点存放的是当前线程和一个空节点
    Node node = new Node(Thread.currentThread(), mode);
    // Try the fast path of enq; backup to full enq on failure
    //得到尾结点
    Node pred = tail;
    //判断尾结点是否存在
    if (pred != null) {
        //存在,将新建节点的上一个节点指向尾结点
        node.prev = pred;
        //由于存在多个线程竞争的情况,使用CAS尝试将AQS中的尾结点替换为当前节点
        if (compareAndSetTail(pred, node)) {
            //成功,将上一个最后节点的下一个节点指向新建节点
            pred.next = node;
            return node;
        }
    }
    //没有CAS成功的线程,进入该方法
    enq(node);
    return node;
}

enq方法解析,实质就是通过自旋+CAS将第一次为CAS成功的节点全部放进双向链表中

    private Node enq(final Node node) {
        //自旋
        for (;;) {
            //拿到尾结点
            Node t = tail;
            //没有尾结点,表示是当前线程节点是第一个节点
            if (t == null) { // Must initialize
                //通过CAS将头结点设置为一个空节点对象,尾结点也是空对象,双向链表初始化过程
                if (compareAndSetHead(new Node()))
                    tail = head;
            } else {
                //如果不是,把当前节点的前置节点设为尾节点
                node.prev = t;
                //通过CAS将新节点替换为尾结点
                if (compareAndSetTail(t, node)) {
                    //尾结点的下一个节点为新节点
                    t.next = node;
                    return t;
                }
            }
        }
    }

acquireQueued方法解析

//入参node是新加入的Node节点,arg=1
final boolean acquireQueued(final Node node, int arg) {
    boolean failed = true;
    try {
        boolean interrupted = false;
        for (;;) {
            //这一步是拿到头结点
            final Node p = node.predecessor();
            //尝试再一次去抢占锁,抢占成功,将头结点表示为当前执行的线程,
            if (p == head && tryAcquire(arg)) {
                setHead(node);
                p.next = null; // help GC
                failed = false;
                return interrupted;
            }
            //
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                interrupted = true;
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}

shouldParkAfterFailedAcquire

//入参是上一个结点,和当前节点
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
    //获取到上一个结点的状态
    int ws = pred.waitStatus;
    //表示后续节点需要阻塞
    if (ws == Node.SIGNAL)
        /*
             * This node has already set status asking a release
             * to signal it, so it can safely park.
             */
        return true;
    //标识后续的节点可以进行抢占锁资源,轮到当前节点了
    if (ws > 0) {
        /*
             * Predecessor was cancelled. Skip over predecessors and
             * indicate retry.
             */
        do {
            //把当前节点一直往前移,移到阻塞位置
            node.prev = pred = pred.prev;
        } while (pred.waitStatus > 0);
        //把头结点的下一个节点指向Node节点
        pred.next = node;
    } else {
        /*
             * waitStatus must be 0 or PROPAGATE.  Indicate that we
             * need a signal, but don't park yet.  Caller will need to
             * retry to make sure it cannot acquire before parking.
             */
        //其他情况 将上一个节点变为阻塞节点
        compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
    }
    return false;
}

parkAndCheckInterrupt()方法是阻塞当前线程作用

private final boolean parkAndCheckInterrupt() {
    //阻塞当前线程
    LockSupport.park(this);
    //重置当前线程的中断状态	
    return Thread.interrupted();
}

cancelAcquire()这个方法是从阻塞状态唤醒时,会从这段执行这段代码

private void cancelAcquire(Node node) {
    // Ignore if node doesn't exist
    if (node == null)
        return;
	
    node.thread = null;
	
    // Skip cancelled predecessors
    Node pred = node.prev;
    while (pred.waitStatus > 0)
        //把当前节点一直往前移,移到阻塞位置,把当前节点移动到头节点的后面
        node.prev = pred = pred.prev;

    // predNext is the apparent node to unsplice. CASes below will
    // fail if not, in which case, we lost race vs another cancel
    // or signal, so no further action is necessary.
   //获取到头结点的下一个节点
    Node predNext = pred.next;

    // Can use unconditional write instead of CAS here.
    // After this atomic step, other Nodes can skip past us.
    // Before, we are free of interference from other threads.
    node.waitStatus = Node.CANCELLED;

    // If we are the tail, remove ourselves.
    //如果当前节点是最后一个结点,就把当前链表置空
    if (node == tail && compareAndSetTail(node, pred)) {
        compareAndSetNext(pred, predNext, null);
    } else {
        // If successor needs signal, try to set pred's next-link
        // so it will get one. Otherwise wake it up to propagate.
        int ws;
        if (pred != head &&
            ((ws = pred.waitStatus) == Node.SIGNAL ||
             (ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) &&
            pred.thread != null) {
            //如果Node的上一个节点不是头节点,获取Node的下一个节点
            Node next = node.next;
            if (next != null && next.waitStatus <= 0)
                //把头节点放置为当前节点的下一个节点
                compareAndSetNext(pred, predNext, next);
        } else {
            //去唤醒下一个节点线程
            unparkSuccessor(node);
        }
		//把node置空,方便CG回收	
        node.next = node; // help GC
    }
}

unparkSuccessor 方法

 private void unparkSuccessor(Node node) {
        /*
         * If status is negative (i.e., possibly needing signal) try
         * to clear in anticipation of signalling.  It is OK if this
         * fails or if status is changed by waiting thread.
         */
     
        int ws = node.waitStatus;
        if (ws < 0)
            compareAndSetWaitStatus(node, ws, 0);

        /*
         * Thread to unpark is held in successor, which is normally
         * just the next node.  But if cancelled or apparently null,
         * traverse backwards from tail to find the actual
         * non-cancelled successor.
         */
        Node s = node.next;
        if (s == null || s.waitStatus > 0) {
            s = null;
            //把后面的数据往前挪
            for (Node t = tail; t != null && t != node; t = t.prev)
                if (t.waitStatus <= 0)
                    s = t;
        }
        if (s != null)
            //释放线程
            LockSupport.unpark(s.thread);
    }

selfInterrupt()方法是响应中断

 static void selfInterrupt() {
       //这里相当于中断传递,当前运行线程要响应中断
        Thread.currentThread().interrupt();
    }