这里是文章模块栏目内容页
按在地上C(被按在地上摩擦的AQS-加锁过程)

引言

谈到并发编程,就不得不谈ReentrantLock,谈到ReentrantLock就会问实现原理,谈到原理就引出AQS(AbstractQueuedSynchronized),然后就被按在地上无情的摩擦。这篇文章主要讲解加锁过程,下一篇写释放锁过程。

ReentrantLock使用

Lock lock = new ReentrantLock();

Condition condition = lock.newCondition();

lock.lock();

try {

while(条件判断表达式) {

condition.wait();

}

// 处理逻辑

} finally {

lock.unlock();

}

复制代码

lock.lock()显示的获取锁,并在finally块中显示的释放锁,目的是保证在获取到锁之后,最终能够被释放。

lock()方法调用过程(默认非公平锁)

非公平锁调用lock方法的源码如下:

static final class NonfairSync extends Sync {

private static final long serialVersionUID = 7316153563782823691L;

final void lock() {

// 表示当前没有占有锁的线程,将锁的拥有者设置为当前线程

if (compareAndSetState(0, 1))

setExclusiveOwnerThread(Thread.currentThread());

// 获取锁

else

acquire(1);

}

protected final boolean tryAcquire(int acquires) {

return nonfairTryAcquire(acquires);

}

}

复制代码

下图是ReentrantLock.lock的调用过程图。

AQS加锁实现

AQS(AbstractQueuedSynchronizer):是JDK提供的同步框架,内部维护了一个FIFO双向队列(即CLH同步队列)。通过此队列实现同步状态的管理(volatile修饰的state状态,用于标志是否持有锁)。

Node

先了解AQS维护的队列节点结构,下面是队列节点Node的源码:

static final class Node {

/** 共享节点 */

static final Node SHARED = new Node();

/** 独占节点 */

static final Node EXCLUSIVE = null;

/** 因为超时或者中断,节点会被设置成取消状态,被取消的节点不会参与到竞争中,

会一直是取消状态不会改变 */

static final int CANCELLED = 1;

/** 后继节点处于等待状态,如果当前节点释放了同步状态或者被取消,

会通知后继节点,使其得以运行 */

static final int SIGNAL = -1;

/** 节点在等待条件队列中,节点线程等待在condition上,当其他线程对condition

调用了signal后,该节点将会从等待队列中进入同步队列中,获取同步状态 */

static final int CONDITION = -2;

/**

* 下一次共享式同步状态获取会无条件的传播下去

*/

static final int PROPAGATE = -3;

/** 等待状态 */

volatile int waitStatus;

/** 前驱节点 */

volatile Node prev;

/** 后继节点 */

volatile Node next;

/** 获取同步状态的线程 */

volatile Thread thread;

/**

* 下一个条件队列等待节点

*/

Node nextWaiter;

final boolean isShared() {

return nextWaiter == SHARED;

}

final Node predecessor() throws NullPointerException {

Node p = prev;

if (p == null)

throw new NullPointerException();

else

return p;

}

Node() { // Used to establish initial head or SHARED marker

}

Node(Thread thread, Node mode) { // Used by addWaiter

this.nextWaiter = mode;

this.thread = thread;

}

Node(Thread thread, int waitStatus) { // Used by Condition

this.waitStatus = waitStatus;

this.thread = thread;

}

}

复制代码

FIFO队列(CLH队列)

队列用head,tail和state三个变量来维护,源码如下:

/** 头节点 */

private transient volatile Node head;

/** 尾节点 */

private transient volatile Node tail;

/** 同步状态 */

private volatile int state;

复制代码

结构示意图如下:

compareAndSetState调用

首先尝试获取锁,调用compareAndSetState方法,期待值为0,新值为1。使用unsafe的compareAndSwapInt方法,通过一次CAS操作来修改state属性。

CAS操作即内存拿到volatile修饰的state属性值,与期望值0对比,如果取到的值为0,则执行+1操作,将state修改为1。其中还涉及知识点volatile修饰变量保证线程间可见,以及CAS操作的经典ABA问题。

源码如下:

/**

* 如果当前状态值等于预期值,则自动将同步状态设置为给定的更新值。

* 这个操作具有volatile读和写的内存语义。

* @param expect 期望值

* @param update 新值

* @return false返回表示实际值不等于预期值,true表示成功

*/

protected final boolean compareAndSetState(int expect, int update) {

// See below for intrinsics setup to support this

return unsafe.compareAndSwapInt(this, stateOffset, expect, update);

}

复制代码

如果此方法执行成功,则调用setExclusiveOwnerThread方法将让线程占有锁,此时state已经置为1。

acquire调用

进入此方法说明,当前已经有其他线程占有锁了。由于此种加锁方式是非公平锁,进入方法后,首先尝试获取锁,如果获取不到锁,那么再将当前线程置于队列中,让当前线程中断执行。

非公平锁在此方法中首先展示不公平,这种不公平是对在队列中的线程来说的。就像我们去银行办业务,如果我是VIP用户,我可以越过等待的用户先办理,这对于其他等待用户不公平。

public final void acquire(int arg) {

if (!tryAcquire(arg) &&

acquireQueued(addWaiter(Node.EXCLUSIVE), arg))

selfInterrupt();

}

复制代码

tryAcquire

调用此方法来尝试获取锁。源码如下:

final boolean nonfairTryAcquire(int acquires) {

final Thread current = Thread.currentThread();

int c = getState();

/** 首先获取state状态,此时第一个判断c==0后的操作是因为,

有可能在执行过程中,其他线程释放了锁,那么state为0,则直接让当前线程持有锁

*/

if (c == 0) {

if (compareAndSetState(0, acquires)) {

setExclusiveOwnerThread(current);

return true;

}

}

/** 如果当前线程就是持有锁的线程,那么state+1,

此处提现了可重入锁的概念,每次线程重入该锁就重复此操作*/

else if (current == getExclusiveOwnerThread()) {

int nextc = c + acquires;

if (nextc < 0) // overflow

throw new Error("Maximum lock count exceeded");

setState(nextc);

return true;

}

return false;

}

复制代码

此处setState(nextc),只是单纯让state+1,而没有用CAS操作。

addWaiter

负责把当前无法获得锁的线程包装为一个Node添加到队尾。

private Node addWaiter(Node mode) {

Node node = new Node(Thread.currentThread(), mode);

// Try the fast path of enq; backup to full enq on failure

Node pred = tail;

if (pred != null) {

node.prev = pred;

if (compareAndSetTail(pred, node)) {

pred.next = node;

return node;

}

}

enq(node);

return node;

}

复制代码

其中参数mode是独占锁还是共享锁,默认为null,独占锁。追加到队尾的动作分两步:
1.如果当前队尾已经存在(tail!=null),则使用CAS把当前线程追加到队尾。
2.如果当前Tail为null或则线程调用CAS设置队尾失败,则通过enq方法继续追加。

enq

通过for循环和CAS操作自旋过程,将当前线程加入队列中,源码如下:

private Node enq(final Node node) {

for (;;) {

Node t = tail;

// 如果队尾是null,则说明队列空了,将当前线程设置为头尾节点

if (t == null) {

if (compareAndSetHead(new Node()))

tail = head;

} else {

// 队尾非空,通过CAS将当前线程加入队尾。

node.prev = t;

if (compareAndSetTail(t, node)) {

t.next = node;

return t;

}

}

}

}

复制代码

acquireQueued

此方法是对addWaiter的补充,将加入队列的线程中断执行,源码如下:

final boolean acquireQueued(final Node node, int arg) {

// 操作失败标志

boolean failed = true;

try {

// 线程中断标志

boolean interrupted = false;

for (;;) {

// 当前节点的prev节点

final Node p = node.predecessor();

// 如果前一节点是头结点,并且尝试获取同步状态成功

if (p == head && tryAcquire(arg)) {

// 将当前当前线程设置成头结点

setHead(node);

// 将prev移除队列

p.next = null; // help GC

failed = false;

return interrupted;

}

// 判断当前线程是否需要阻塞 && 阻塞当前线程并且检验线程中断状态

if (shouldParkAfterFailedAcquire(p, node) &&

parkAndCheckInterrupt())

interrupted = true;

}

} finally {

// 取消获取同步状态

if (failed)

cancelAcquire(node);

}

}

复制代码

p == head && tryAcquire(arg),这里的判断也显示了非公平的意义。队里中有等待线程还要尝试获取锁。

shouldParkAfterFailedAcquire

此方法是阻塞线程前最后的检查操作,通过prev节点的等待状态判断当前线程是否应该被阻塞,

private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {

// 拿到prev节点的等待状态

int ws = pred.waitStatus;

if (ws == Node.SIGNAL)

/*

* 如果prev的status是signal,表示prev处于等待状态,可以阻塞当前线程,

* 当prev释放了同步状态或者取消了,会通知当前节点。

*/

return true;

if (ws > 0) {

/*

* status > 0,表示为取消状态,需要将取消状态的节点从队列中移除

* 直到找到一个状态不是取消的节点为止

*/

do {

node.prev = pred = pred.prev;

} while (pred.waitStatus > 0);

pred.next = node;

} else {

/*

* 除了以上情况,通过CAS将prev的status设置成signal

*/

compareAndSetWaitStatus(pred, ws, Node.SIGNAL);

}

return false;

}

复制代码

parkAndCheckInterrupt

如果程序走到这个位置,那么就说明已经将当前线程加入队列中,可以让线程中断了。线程阻塞通过调用LockSupport.park()完成,而LockSupport.park()则调用sun.misc.Unsafe.park()本地方法,再进一步,HotSpot在Linux中中通过调用pthread_mutex_lock函数把线程交给系统内核进行阻塞。

private final boolean parkAndCheckInterrupt() {

LockSupport.park(this);

return Thread.interrupted();

}

复制代码

至此加锁过程完成。

总结

ReentrantLock加锁是通过AQS实现,AQS中维护了一个FIFO的队列,当存在锁竞争时构建队列,构建过程中使用CAS和自旋,保证线程能够进入队列。已经进入队列的线程需要阻塞,使用LockSupport.park()方法完成,阻塞线程能够让CPU更专注于执行持有锁的线程,而不是将资源浪费在尝试获取锁的自旋过程中。
以上是对ReentrantLock加锁的过程分析,希望大佬多提意见。

链接:https://juejin.cn/post/7028106775555473445

收藏
0
有帮助
0
没帮助
0
相关内容