bobby.peng

zhendecai

  • 主页
  • 技术
  • 杂谈
  • Tips
所有文章 友链 关于我

bobby.peng

zhendecai

  • 主页
  • 技术
  • 杂谈
  • Tips

AQS(简要)

2018-04-11

独占锁

acquire

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
/**
* Acquires in exclusive mode, ignoring interrupts. Implemented
* by invoking at least once {@link #tryAcquire},
* returning on success. Otherwise the thread is queued, possibly
* repeatedly blocking and unblocking, invoking {@link
* #tryAcquire} until success. This method can be used
* to implement method {@link Lock#lock}.
*
* @param arg the acquire argument. This value is conveyed to
* {@link #tryAcquire} but is otherwise uninterpreted and
* can represent anything you like.
*/
public final void acquire(int arg) {
// tryAcquire用于确认是否能够获取到锁(根据status的状态来判定),默认需要继承类自己来实现
// Node.EXCLUSIVE 独占锁标志
// addWaiter是将该节点加入到后续节点之中
// acquireQueued是将当前线程阻塞(其实LockSupport.park()是线程waiting,而不是blocking)
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}

是否可重入取决于tryAquire的实现,包括公正锁非公正锁实现等等

addWaiter

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
/**
* Creates and enqueues node for current thread and given mode.
*
* @param mode Node.EXCLUSIVE for exclusive, Node.SHARED for shared
* @return the new node
*/
// 当前方法只是塞节点(并不涉及任何阻塞操作)
private Node addWaiter(Node mode) {
Node node = new Node(Thread.currentThread(), mode);
// Try the fast path of enq; backup to full enq on failure
// 如果已经被初始化过head & tail,可以先cas一下
// 大多数情况下没有那么强的竞争,一次就可以解决所有问题
// enq有部分初始化的功能,加上死循环的cas
Node pred = tail;
if (pred != null) {
node.prev = pred;
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
enq(node);
return node;
}
/**
* Inserts node into queue, initializing if necessary. See picture above.
* @param node the node to insert
* @return node's predecessor
*/
private Node enq(final Node node) {
for (;;) {
Node t = tail;
if (t == null) { // Must initialize
// 初始化流程,包含cas判断,防止创建多个head
// 空节点时,head == tail 等于new node()
if (compareAndSetHead(new Node()))
tail = head;
} else {
// 曾经一度认为链表的两段式操作一定要加锁
// 其实只要cas node tail成功就可以
// 就算不成功,当前node.prev重新设置就可以了,不影响原链表
// 只需要操作tail节点,因为head节点是当前操作的线程
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}
/**
* Acquires in exclusive uninterruptible mode for thread already in
* queue. Used by condition wait methods as well as acquire.
*
* @param node the node
* @param arg the acquire argument
* @return {@code true} if interrupted while waiting
*/
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
// 在park之前先判断一下是否能够被唤醒
// unpark之后判断是否是执行线程
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
// parkAndCheckInterrupt 即是调用 LockSupport方法
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
//当时不明白为什么要这个finally操作,当时可以理解的部分是可能出现了runtimeException
//翻看了1.6的代码,确实是catch(RuntimeException e) {cancel}
if (failed)
cancelAcquire(node);
}
}
/**
* Checks and updates status for a node that failed to acquire.
* Returns true if thread should block. This is the main signal
* control in all acquire loops. Requires that pred == node.prev.
*
* @param pred node's predecessor holding status
* @param node the node
* @return {@code true} if thread should block
*/
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
int ws = pred.waitStatus;
if (ws == Node.SIGNAL)
/*
* This node has already set status asking a release
* to signal it, so it can safely park.
*/
return true;
if (ws > 0) {
/*
* Predecessor was cancelled. Skip over predecessors and
* indicate retry.
*/
// cancelled 所以需要直接跳过
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
/*
* waitStatus must be 0 or PROPAGATE. Indicate that we
* need a signal, but don't park yet. Caller will need to
* retry to make sure it cannot acquire before parking.
*/
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}

ReentrantLock中tryAcquire的实现(公正锁)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
/**
* Fair version of tryAcquire. Don't grant access unless
* recursive call or no waiters or is first.
*/
protected final boolean tryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
//公正锁非公正锁区别取决于hasQueuedPredecessors这个方法
//hasQueuedPredecessors用于判断当前队列是否有后续的排队节点
//state == 0 表示当前没有其他的竞争线程
//1.头结点线程已经执行完了,后续的节点还没有被唤醒
//2.在q中没有任何节点,也就是锁没有线程在竞争的状态
//公正锁就是保证了在情况1下面,后续节点在与外部节点竞争的情况下能够公正的排到q的tail上去,而不是在state==0的时候就直接执行
//每一步操作都不是原子操作,所以需要cas(status)来做最后的保证
if (!hasQueuedPredecessors() &&
compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
// 可重入锁的操作
int nextc = c + acquires;
if (nextc < 0)
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
// 进入该方法的时候可能已经有其他线程唤醒了头部,或者有新的头结点介入了
public final boolean hasQueuedPredecessors() {
// The correctness of this depends on head being initialized
// before tail and on head.next being accurate if the current
// thread is first in queue.
// volatile 用内存屏障保证了多个volatile变量写不会重排序
// todo (Read fields in reverse initialization order) 最后一行应该不论正反读都是能保证可见性的
Node t = tail; // Read fields in reverse initialization order
Node h = head;
Node s;
// 在初始化的时候 head先初始化,tail = head
// 可能存在的情况是head初始化完毕且tail没有初始化完成,所以需要判断h.next == null来判断操作是否在进行中
// 如果有并发操作在进行节点操作说明可能有多余的节点生成
return h != t && ((s = h.next) == null || s.thread != Thread.currentThread());
}

解锁

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
/**
* Releases in exclusive mode. Implemented by unblocking one or
* more threads if {@link #tryRelease} returns true.
* This method can be used to implement method {@link Lock#unlock}.
*
* @param arg the release argument. This value is conveyed to
* {@link #tryRelease} but is otherwise uninterpreted and
* can represent anything you like.
* @return the value returned from {@link #tryRelease}
*/
public final boolean release(int arg) {
if (tryRelease(arg)) {
Node h = head;
// h == null 则不需要unpark
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}

unparkSuccessor

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
/**
* Wakes up node's successor, if one exists.
*
* @param node the node
*/
private void unparkSuccessor(Node node) {
/*
* If status is negative (i.e., possibly needing signal) try
* to clear in anticipation of signalling. It is OK if this
* fails or if status is changed by waiting thread.
*/
int ws = node.waitStatus;
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0);
/*
* Thread to unpark is held in successor, which is normally
* just the next node. But if cancelled or apparently null,
* traverse backwards from tail to find the actual
* non-cancelled successor.
*/
Node s = node.next;
if (s == null || s.waitStatus > 0) {
s = null;
// 从尾部而不是从头开始是因为prev是可靠地
// 因为只有在prev之后才会进行cas来塞tail,如果tail塞成功证明prev一定是有效的
// 然而因为next是在cas之后,所以是不可靠的。
// 而效率问题
// 因为仅仅在s==null的情况下,或者s.waitStatus > 0 的情况下,大概率来说,在上述两行之间不会存在插入过多节点的效率问题
// 可能的情况是,下一个节点被cancel,而后面堆积了很多节点,此时效率可能会很低?(待验证)
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
if (s != null)
LockSupport.unpark(s.thread);
}

ReentrantLock tryRelease 实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
protected final boolean tryRelease(int releases) {
// 独占锁的解锁相对简单,单线程不需要cas
int c = getState() - releases;
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
boolean free = false;
// 判断可重入锁的情况
if (c == 0) {
free = true;
setExclusiveOwnerThread(null);
}
setState(c);
return free;
}

共享锁

acquireShared

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
/**
* Acquires in shared mode, ignoring interrupts. Implemented by
* first invoking at least once {@link #tryAcquireShared},
* returning on success. Otherwise the thread is queued, possibly
* repeatedly blocking and unblocking, invoking {@link
* #tryAcquireShared} until success.
*
* @param arg the acquire argument. This value is conveyed to
* {@link #tryAcquireShared} but is otherwise uninterpreted
* and can represent anything you like.
*/
public final void acquireShared(int arg) {
// tryAcquireShared > 0 表示后续节点可以获取锁
// = 0 表示后续节点不能获取到锁
if (tryAcquireShared(arg) < 0)
doAcquireShared(arg);
}

doAcquireSharedInterruptibly

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
/**
* Acquires in shared interruptible mode.
* @param arg the acquire argument
*/
private void doAcquireSharedInterruptibly(int arg)
throws InterruptedException {
final Node node = addWaiter(Node.SHARED);
boolean failed = true;
try {
for (;;) {
final Node p = node.predecessor();
// 确保了当前节点node p的前驱节点
if (p == head) {
// 和独占锁的主要差别在这个方法
// 因为status保证了可见性,所以此处一定能保证执行setHeadAndProagate的头结点一定需要被唤醒
int r = tryAcquireShared(arg);
if (r >= 0) {
setHeadAndPropagate(node, r);
p.next = null; // help GC
failed = false;
return;
}
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
throw new InterruptedException();
}
} finally {
if (failed)
cancelAcquire(node);
}
}
/**
* Sets head of queue, and checks if successor may be waiting
* in shared mode, if so propagating if either propagate > 0 or
* PROPAGATE status was set.
*
* @param node the node
* @param propagate the return value from a tryAcquireShared
*/
private void setHeadAndPropagate(Node node, int propagate) {
Node h = head; // Record old head for check below
setHead(node);
// 这之后可能引起外层head的失控
// 两个线程同时执行unpark方法(共享锁)
// 线程1在塞完head之后被停止,给到线程2,线程2透过外层的p == head的校验
// 可以进入到setHead方法,此时head不再是线程1设置的head,而是thread2设置的head
// 此时在执行到h == head 的时候,线程1和线程2的h均为线程2的node
// 然后当前线程就接着执行下去
// 所以此时是不强保证head的原子操作的
// todo 再想想
/*
* Try to signal next queued node if:
* Propagation was indicated by caller,
* or was recorded (as h.waitStatus either before
* or after setHead) by a previous operation
* (note: this uses sign-check of waitStatus because
* PROPAGATE status may transition to SIGNAL.)
* and
* The next node is waiting in shared mode,
* or we don't know, because it appears null
*
* The conservatism in both of these checks may cause
* unnecessary wake-ups, but only when there are multiple
* racing acquires/releases, so most need signals now or soon
* anyway.
*/
// 当propagate == 0 时,后续节点将不再能够获得锁。
// 如果propagate > 0, 则说明后续节点能够再次获得锁
// h == null || h.waitStatus < 0 均表示没有需要执行的节点
// h = head 保证了当前节点
if (propagate > 0 || h == null || h.waitStatus < 0 ||
(h = head) == null || h.waitStatus < 0) {
Node s = node.next;
if (s == null || s.isShared())
// 共享锁会在唤醒当前节点的时候同时唤醒后续节点
doReleaseShared();
}
}
/**
* Release action for shared mode -- signals successor and ensures
* propagation. (Note: For exclusive mode, release just amounts
* to calling unparkSuccessor of head if it needs signal.)
*/
private void doReleaseShared() {
/*
* Ensure that a release propagates, even if there are other
* in-progress acquires/releases. This proceeds in the usual
* way of trying to unparkSuccessor of head if it needs
* signal. But if it does not, status is set to PROPAGATE to
* ensure that upon release, propagation continues.
* Additionally, we must loop in case a new node is added
* while we are doing this. Also, unlike other uses of
* unparkSuccessor, we need to know if CAS to reset status
* fails, if so rechecking.
*/
for (;;) {
Node h = head;
if (h != null && h != tail) {
int ws = h.waitStatus;
// 在park那个方法中会把后续节点设置为Signal
if (ws == Node.SIGNAL) {
if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
continue; // loop to recheck cases
unparkSuccessor(h);
}
// todo 不懂什么时候会执行
else if (ws == 0 &&
!compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
continue; // loop on failed CAS
}
if (h == head) // loop if head changed
break;
}
}

Semaphore中tryAcquire(fair)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
/**
* Attempts to acquire in shared mode. This method should query if
* the state of the object permits it to be acquired in the shared
* mode, and if so to acquire it.
*
* <p>This method is always invoked by the thread performing
* acquire. If this method reports failure, the acquire method
* may queue the thread, if it is not already queued, until it is
* signalled by a release from some other thread.
*
* <p>The default implementation throws {@link
* UnsupportedOperationException}.
*
* @param arg the acquire argument. This value is always the one
* passed to an acquire method, or is the value saved on entry
* to a condition wait. The value is otherwise uninterpreted
* and can represent anything you like.
* @return a negative value on failure; zero if acquisition in shared
* mode succeeded but no subsequent shared-mode acquire can
* succeed; and a positive value if acquisition in shared
* mode succeeded and subsequent shared-mode acquires might
* also succeed, in which case a subsequent waiting thread
* must check availability. (Support for three different
* return values enables this method to be used in contexts
* where acquires only sometimes act exclusively.) Upon
* success, this object has been acquired.
* @throws IllegalMonitorStateException if acquiring would place this
* synchronizer in an illegal state. This exception must be
* thrown in a consistent fashion for synchronization to work
* correctly.
* @throws UnsupportedOperationException if shared mode is not supported
*/
//上述是tryAcquireShared的说明
protected int tryAcquireShared(int acquires) {
for (;;) {
//与独占锁一样,用于判断是否有后续节点
if (hasQueuedPredecessors())
return -1;
//Semaphore中 status初始化=permits
//通过status的值来判断通量有多少
int available = getState();
int remaining = available - acquires;
if (remaining < 0 ||
compareAndSetState(available, remaining))
return remaining;
}
}

解锁

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
/**
* Releases in shared mode. Implemented by unblocking one or more
* threads if {@link #tryReleaseShared} returns true.
*
* @param arg the release argument. This value is conveyed to
* {@link #tryReleaseShared} but is otherwise uninterpreted
* and can represent anything you like.
* @return the value returned from {@link #tryReleaseShared}
*/
public final boolean releaseShared(int arg) {
if (tryReleaseShared(arg)) {
doReleaseShared();
return true;
}
return false;
}

Semaphore tryReleaseShared

1
2
3
4
5
6
7
8
9
10
protected final boolean tryReleaseShared(int releases) {
for (;;) {
int current = getState();
int next = current + releases;
if (next < current) // overflow
throw new Error("Maximum permit count exceeded");
if (compareAndSetState(current, next))
return true;
}
}
  • java
  • concurrent
  • aqs
  • tech

扫一扫,分享到微信

微信分享二维码
读写锁
redis复制
Like Issue Page
Loading comments...
Login with GitHub
Styling with Markdown is supported
Powered by Gitment
© 2018 bobby.peng
Hexo Theme Yilia by Litten
  • 所有文章
  • 友链
  • 关于我

tag:

  • tree
  • cocurrent
  • java
  • 复习
  • mysql
  • data-structure
  • ximalaya
  • redis
  • wow
  • 转载
  • jvm
  • concurrent
  • aqs
  • Spring
  • io

    缺失模块。
    1、请确保node版本大于6.2
    2、在博客根目录(注意不是yilia根目录)执行以下命令:
    npm i hexo-generator-json-content --save

    3、在根目录_config.yml里添加配置:

      jsonContent:
        meta: false
        pages: false
        posts:
          title: true
          date: true
          path: true
          text: false
          raw: false
          content: false
          slug: false
          updated: false
          comments: false
          link: false
          permalink: false
          excerpt: false
          categories: false
          tags: true
    

  • 麻子
  • 王鸿缘
  • 美团技术博客
  • 何登成的技术博客
  • 并发编程网
Blizzard fans
wow & overwatch
Arena master & Gladiator