Poison

ReadWriteLock

首先看一段存在 bug 的代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
@GuardedBy("isolatedEndpointsSetLock")
private final Set<Endpoints> isolatedEndpointsSet;
private final ReadWriteLock isolatedEndpointsSetLock;

public ProducerImpl(String group) throws ClientException {
super(group);
this.defaultSendCallbackExecutor = new ThreadPoolExecutor(
Runtime.getRuntime().availableProcessors(),
Runtime.getRuntime().availableProcessors(),
60,
TimeUnit.SECONDS,
new LinkedBlockingQueue<Runnable>(),
new ThreadFactoryImpl("SendCallbackWorker"));

this.sendingRouteDataCache = new ConcurrentHashMap<String, SendingTopicRouteData>();

this.isolatedEndpointsSet = new HashSet<Endpoints>();
this.isolatedEndpointsSetLock = new ReentrantReadWriteLock();
}

/**
* Check the status of isolated {@link Endpoints}, rejoin it if it is healthy.
*/
@Override
public void doHealthCheck() {
final Set<Endpoints> routeEndpointsSet = getRouteEndpointsSet();
final Set<Endpoints> expired = new HashSet<Endpoints>(Sets.difference(routeEndpointsSet, isolatedEndpointsSet));
// remove all isolated endpoints which is expired.
isolatedEndpointsSetLock.writeLock().lock();
try {
isolatedEndpointsSet.removeAll(expired);
} finally {
isolatedEndpointsSetLock.writeLock().unlock();
}

HealthCheckRequest request = HealthCheckRequest.newBuilder().build();
isolatedEndpointsSetLock.readLock().lock();
try {
for (final Endpoints endpoints : isolatedEndpointsSet) {
Metadata metadata;
try {
metadata = sign();
} catch (Throwable t) {
continue;
}
final ListenableFuture<HealthCheckResponse> future =
clientManager.healthCheck(endpoints, metadata, request, ioTimeoutMillis, TimeUnit.MILLISECONDS);
Futures.addCallback(future, new FutureCallback<HealthCheckResponse>() {
@Override
public void onSuccess(HealthCheckResponse response) {
final Status status = response.getCommon().getStatus();
final Code code = Code.forNumber(status.getCode());
// target endpoints is healthy, rejoin it.
if (Code.OK.equals(code)) {
isolatedEndpointsSetLock.writeLock().lock();
try {
isolatedEndpointsSet.remove(endpoints);
} finally {
isolatedEndpointsSetLock.writeLock().unlock();
}
log.info("Rejoin endpoints which is isolated before, clientId={}, endpoints={}", id,
endpoints);
return;
}
log.warn("Failed to rejoin the endpoints which is isolated before, clientId={}, code={}, "
+ "status message=[{}], endpoints={}", id, code, status.getMessage(), endpoints);
}

@Override
public void onFailure(Throwable t) {
log.error("Failed to do health check, clientId={}, endpoints={}", id, endpoints, t);
}
});
}
} finally {
isolatedEndpointsSetLock.readLock().unlock();
}
}

不知道大家能不能看出以上代码存在的问题,我第一眼看到的是第 27 行对 isolatedEndpointsSet 的读取操作没有加上读锁。其实还有另一个问题,那就是第 55 行会触发死锁,为什么呢?不是由另一个线程执行回调方法申请写锁吗?以上代码使用的 Guava 版本为 20.0,我们看下 Futures.addCallback 的源码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
/**
* Registers separate success and failure callbacks to be run when the {@code Future}'s
* computation is {@linkplain java.util.concurrent.Future#isDone() complete} or, if the
* computation is already complete, immediately.
*
* <p>There is no guaranteed ordering of execution of callbacks, but any callback added through
* this method is guaranteed to be called once the computation is complete.
*
* Example: <pre> {@code
* ListenableFuture<QueryResult> future = ...;
* addCallback(future,
* new FutureCallback<QueryResult>() {
* public void onSuccess(QueryResult result) {
* storeInCache(result);
* }
* public void onFailure(Throwable t) {
* reportError(t);
* }
* });}</pre>
*
* <p>This overload, which does not accept an executor, uses {@code directExecutor}, a dangerous
* choice in some cases. See the discussion in the {@link ListenableFuture#addListener
* ListenableFuture.addListener} documentation.
*
* <p>For a more general interface to attach a completion listener to a {@code Future}, see {@link
* ListenableFuture#addListener addListener}.
*
* @param future The future attach the callback to.
* @param callback The callback to invoke when {@code future} is completed.
* @since 10.0
*/
public static <V> void addCallback(
ListenableFuture<V> future, FutureCallback<? super V> callback) {
addCallback(future, callback, directExecutor());
}

可以看出,注释进行了如下的说明:

This overload, which does not accept an executor, uses directExecutor, a dangerous choice in some cases. See the discussion in the ListenableFuture#addListener documentation.

即该方法并没有接受一个 executor,而是使用的 directExecutor(),在某些情况下是一个危险的选择,可以查看 ListenableFuture#addListener 文档中的讨论。我们先看看 directExecutor() 的实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
/**
* Returns an {@link Executor} that runs each task in the thread that invokes
* {@link Executor#execute execute}, as in {@link CallerRunsPolicy}.
*
* <p>This instance is equivalent to: <pre> {@code
* final class DirectExecutor implements Executor {
* public void execute(Runnable r) {
* r.run();
* }
* }}</pre>
*
* <p>This should be preferred to {@link #newDirectExecutorService()} because implementing the
* {@link ExecutorService} subinterface necessitates significant performance overhead.
*
* @since 18.0
*/
public static Executor directExecutor() {
return DirectExecutor.INSTANCE;
}

/** See {@link #directExecutor} for behavioral notes. */
private enum DirectExecutor implements Executor {
INSTANCE;

@Override
public void execute(Runnable command) {
command.run();
}

@Override
public String toString() {
return "MoreExecutors.directExecutor()";
}
}

可以看出 DirectExecutorexecute 方法实现为 command.run(),即直接用当前线程执行 run() 方法,与 CallerRunsPolicy 拒绝策略实现一样。我们再看看 ListenableFuture#addListener 的文档:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
public interface ListenableFuture<V> extends Future<V> {
/**
* Registers a listener to be {@linkplain Executor#execute(Runnable) run} on the given executor.
* The listener will run when the {@code Future}'s computation is {@linkplain Future#isDone()
* complete} or, if the computation is already complete, immediately.
*
* <p>There is no guaranteed ordering of execution of listeners, but any listener added through
* this method is guaranteed to be called once the computation is complete.
*
* <p>Exceptions thrown by a listener will be propagated up to the executor. Any exception thrown
* during {@code Executor.execute} (e.g., a {@code RejectedExecutionException} or an exception
* thrown by {@linkplain MoreExecutors#directExecutor direct execution}) will be caught and
* logged.
*
* <p>Note: For fast, lightweight listeners that would be safe to execute in any thread, consider
* {@link MoreExecutors#directExecutor}. Otherwise, avoid it. Heavyweight {@code directExecutor}
* listeners can cause problems, and these problems can be difficult to reproduce because they
* depend on timing. For example:
*
* <ul>
* <li>The listener may be executed by the caller of {@code addListener}. That caller may be a UI
* thread or other latency-sensitive thread. This can harm UI responsiveness.
* <li>The listener may be executed by the thread that completes this {@code Future}. That thread
* may be an internal system thread such as an RPC network thread. Blocking that thread may stall
* progress of the whole system. It may even cause a deadlock.
* <li>The listener may delay other listeners, even listeners that are not themselves {@code
* directExecutor} listeners.
* </ul>
*
* <p>This is the most general listener interface. For common operations performed using
* listeners, see {@link Futures}. For a simplified but general listener interface, see {@link
* Futures#addCallback addCallback()}.
*
* <p>Memory consistency effects: Actions in a thread prior to adding a listener <a
* href="https://docs.oracle.com/javase/specs/jls/se7/html/jls-17.html#jls-17.4.5">
* <i>happen-before</i></a> its execution begins, perhaps in another thread.
*
* @param listener the listener to run when the computation is complete
* @param executor the executor to run the listener in
* @throws RejectedExecutionException if we tried to execute the listener immediately but the
* executor rejected it.
*/
void addListener(Runnable listener, Executor executor);
}

其中特意提到,当使用 MoreExecutors#directExecutor 时:

The listener may be executed by the thread that completes this Future. That thread may be an internal system thread such as an RPC network thread. Blocking that thread may stall progress of the whole system. It may even cause a deadlock.

即阻塞该线程可能阻塞整个系统的进程,甚至可能造成死锁。正如文档所警告的,文首的代码在生产环境中触发了死锁。为什么会触发死锁呢?很简单,因为读锁与写锁互斥,在当前线程已经申请到读锁的情况下,再去申请写锁,将会触发死锁。这个问题在 ReentrantReadWriteLock (Java Platform SE 8 ) 中示例代码中进行了简短的描述:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
class CachedData {
Object data;
volatile boolean cacheValid;
final ReentrantReadWriteLock rwl = new ReentrantReadWriteLock();

void processCachedData() {
rwl.readLock().lock();
if (!cacheValid) {
// Must release read lock before acquiring write lock
rwl.readLock().unlock();
rwl.writeLock().lock();
try {
// Recheck state because another thread might have
// acquired write lock and changed state before we did.
if (!cacheValid) {
data = ...
cacheValid = true;
}
// Downgrade by acquiring read lock before releasing write lock
rwl.readLock().lock();
} finally {
rwl.writeLock().unlock(); // Unlock write, still hold read
}
}

try {
use(data);
} finally {
rwl.readLock().unlock();
}
}
}

其中第 9 行提到:

Must release read lock before acquiring write lock

如果我们未按示例代码的说明进行操作,即在已申请到读锁的情况下去申请写锁,将会导致当前线程死锁。执行文首代码的线程是一个 Endpoint 健康检查的线程,这个线程因死锁导致永久阻塞后影响还不算大,那么现在我们讨论另一个问题,该线程进入死锁状态后,其他线程申请读锁还能申请到吗?起初我认为是能申请的,因为即使当前线程死锁,也是因为写锁申请阻塞,且因为写锁并未申请成功,不应该阻塞其他线程申请读锁才对。但是事实告诉我以上的理解并不完全正确。线上实例出问题时我正在外面吃饭,当时通过手机看到 Agent 自动抓取到的栈帧显示大量的线程阻塞在读锁申请上,我当时猜测有线程申请到了写锁且未及时释放导致,但是后面我用电脑查看堆转储时告诉我并没有线程申请到了写锁。我们使用 MAT 在当时事故现场抓取的堆转储中执行以下 OQL: SELECT * FROM com.aliyun.openservices.ons.shaded.org.apache.rocketmq.client.impl.producer.ProducerImpl:

RocketMQ-deadlock-state

可以看出 isolatedEndpointsSetLock 这把锁的 state 值为 65536,对应的二进制表示为:00000000_00000001_00000000_00000000,我们知道高 16 位表示读锁的状态,低 16 位表示写锁的状态,即此时 isolatedEndpointsSetLock 仅有读锁被申请到了,且 firstReaderRocketmqClientScheduler-1,即我们文首分析的进入死锁状态的线程,我们也可以用线程转储来验证这一点:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
"RocketmqClientScheduler-1" #71 prio=5 os_prio=0 tid=0x00007f106ca8f800 nid=0x58 waiting on condition [0x00007f104b041000]
java.lang.Thread.State: WAITING (parking)
at sun.misc.Unsafe.park(Native Method)
- parking to wait for <0x00000006cda10750> (a java.util.concurrent.locks.ReentrantReadWriteLock$NonfairSync)
at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
at java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:836)
at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireQueued(AbstractQueuedSynchronizer.java:870)
at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquire(AbstractQueuedSynchronizer.java:1199)
at java.util.concurrent.locks.ReentrantReadWriteLock$WriteLock.lock(ReentrantReadWriteLock.java:943)
at com.aliyun.openservices.ons.shaded.org.apache.rocketmq.client.impl.producer.ProducerImpl$1.onSuccess(ProducerImpl.java:252)
at com.aliyun.openservices.ons.shaded.org.apache.rocketmq.client.impl.producer.ProducerImpl$1.onSuccess(ProducerImpl.java:245)
at com.aliyun.openservices.ons.shaded.com.google.common.util.concurrent.Futures$4.run(Futures.java:1132)
at com.aliyun.openservices.ons.shaded.com.google.common.util.concurrent.MoreExecutors$DirectExecutor.execute(MoreExecutors.java:435)
at com.aliyun.openservices.ons.shaded.com.google.common.util.concurrent.AbstractFuture.executeListener(AbstractFuture.java:900)
at com.aliyun.openservices.ons.shaded.com.google.common.util.concurrent.AbstractFuture.addListener(AbstractFuture.java:634)
at com.aliyun.openservices.ons.shaded.com.google.common.util.concurrent.Futures.addCallback(Futures.java:1135)
at com.aliyun.openservices.ons.shaded.com.google.common.util.concurrent.Futures.addCallback(Futures.java:1073)
at com.aliyun.openservices.ons.shaded.org.apache.rocketmq.client.impl.producer.ProducerImpl.doHealthCheck(ProducerImpl.java:245)
at com.aliyun.openservices.ons.shaded.org.apache.rocketmq.client.impl.ClientManagerImpl.doHealthCheck(ClientManagerImpl.java:262)
at com.aliyun.openservices.ons.shaded.org.apache.rocketmq.client.impl.ClientManagerImpl.access$000(ClientManagerImpl.java:84)
at com.aliyun.openservices.ons.shaded.org.apache.rocketmq.client.impl.ClientManagerImpl$1.run(ClientManagerImpl.java:168)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)

RocketmqClientScheduler-1 线程申请到了读锁,在申请写锁的过程中被挂起,触发了死锁。而此时写锁并未申请成功,为何会阻塞其他读锁申请的线程呢?不是说读锁与读锁不互斥吗?我们再看看这把锁的等待队列:

RocketMQ-deadlock-waitqueue

其中绿色的即为等待队列中各个节点对应的线程。即如下图所示:

1
2
3
+----------------+         +-------------------------------+         +--------------------------+         +--------------------------+
| dummy head |--------→| RocketmqClientScheduler-1 |--------→| Dubbo InternalThread |--------→| Dubbo InternalThread |--------→***
+----------------+ +-------------------------------+ +--------------------------+ +--------------------------+

即线程 RocketmqClientScheduler-1 处于队列中的首个节点(不含 dummy head),在等待自己释放读锁,而读锁需要申请到写锁后才能释放导致了死锁,导致线程 RocketmqClientScheduler-1 一直处于队列首个节点(不含 dummy head),而其他申请读锁的线程为何全部被阻塞挂起了呢?注意此时并没有任何线程持有写锁,按照读锁与写锁互斥的约定理论上读锁是应该能申请到的,但是实际上读锁并不能申请到,而是申请读锁的线程都被阻塞挂起了。我们再次回到文首的代码,可知创建读写锁是使用的这一行代码:this.isolatedEndpointsSetLock = new ReentrantReadWriteLock();,对应的源码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
/**
* Creates a new {@code ReentrantReadWriteLock} with
* default (nonfair) ordering properties.
*/
public ReentrantReadWriteLock() {
this(false);
}

/**
* Creates a new {@code ReentrantReadWriteLock} with
* the given fairness policy.
*
* @param fair {@code true} if this lock should use a fair ordering policy
*/
public ReentrantReadWriteLock(boolean fair) {
sync = fair ? new FairSync() : new NonfairSync();
readerLock = new ReadLock(this);
writerLock = new WriteLock(this);
}

即创建的非公平读写锁,那么非公平读写锁在处理线程是否阻塞挂起的判断是怎样的呢,我们再看看 NonfairSync 的源码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
/**
* Nonfair version of Sync
*/
static final class NonfairSync extends Sync {
private static final long serialVersionUID = -8159625535654395037L;
final boolean writerShouldBlock() {
return false; // writers can always barge
}
final boolean readerShouldBlock() {
/* As a heuristic to avoid indefinite writer starvation,
* block if the thread that momentarily appears to be head
* of queue, if one exists, is a waiting writer. This is
* only a probabilistic effect since a new reader will not
* block if there is a waiting writer behind other enabled
* readers that have not yet drained from the queue.
*/
return apparentlyFirstQueuedIsExclusive();
}
}

readerShouldBlock() 方法的注释中已经解释得非常清楚了,这是一个启发式的方法,为了避免写锁饥饿,如果队首节点(不含 dummy head)中的线程是申请写锁的线程,则读锁申请应该阻塞以便让写锁优先申请。我们看看 apparentlyFirstQueuedIsExclusive() 方法的源码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
/**
* Returns {@code true} if the apparent first queued thread, if one
* exists, is waiting in exclusive mode. If this method returns
* {@code true}, and the current thread is attempting to acquire in
* shared mode (that is, this method is invoked from {@link
* #tryAcquireShared}) then it is guaranteed that the current thread
* is not the first queued thread. Used only as a heuristic in
* ReentrantReadWriteLock.
*/
final boolean apparentlyFirstQueuedIsExclusive() {
Node h, s;
return (h = head) != null &&
(s = h.next) != null &&
!s.isShared() &&
s.thread != null;
}

该方法实现正如方法名所描述的,这也解释了在生产环境中遇到的问题,即不仅仅线程 RocketmqClientScheduler-1 被挂起了,其他申请读锁的线程全部被挂起了,导致相关线程池活跃线程数满,影响线上服务。在 ons-client 2.0.0.Finalons-client 2.0.1.Final 中都存在该问题,这个问题在 ons-client 2.0.2.Final 中被修复,修复的方案简单粗暴,直接移除了读写锁实现,改为了线程安全的集合实现:

1
this.isolatedEndpointsSet = Collections.newSetFromMap(new ConcurrentHashMap<Endpoints, Boolean>());

我们再看看读写锁为公平锁实现时判断锁申请是否应该阻塞的实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
/**
* Fair version of Sync
*/
static final class FairSync extends Sync {
private static final long serialVersionUID = -2274990926593161451L;
final boolean writerShouldBlock() {
return hasQueuedPredecessors();
}
final boolean readerShouldBlock() {
return hasQueuedPredecessors();
}
}

/**
* Queries whether any threads have been waiting to acquire longer
* than the current thread.
*
* <p>An invocation of this method is equivalent to (but may be
* more efficient than):
* <pre> {@code
* getFirstQueuedThread() != Thread.currentThread() &&
* hasQueuedThreads()}</pre>
*
* <p>Note that because cancellations due to interrupts and
* timeouts may occur at any time, a {@code true} return does not
* guarantee that some other thread will acquire before the current
* thread. Likewise, it is possible for another thread to win a
* race to enqueue after this method has returned {@code false},
* due to the queue being empty.
*
* <p>This method is designed to be used by a fair synchronizer to
* avoid <a href="AbstractQueuedSynchronizer#barging">barging</a>.
* Such a synchronizer's {@link #tryAcquire} method should return
* {@code false}, and its {@link #tryAcquireShared} method should
* return a negative value, if this method returns {@code true}
* (unless this is a reentrant acquire). For example, the {@code
* tryAcquire} method for a fair, reentrant, exclusive mode
* synchronizer might look like this:
*
* <pre> {@code
* protected boolean tryAcquire(int arg) {
* if (isHeldExclusively()) {
* // A reentrant acquire; increment hold count
* return true;
* } else if (hasQueuedPredecessors()) {
* return false;
* } else {
* // try to acquire normally
* }
* }}</pre>
*
* @return {@code true} if there is a queued thread preceding the
* current thread, and {@code false} if the current thread
* is at the head of the queue or the queue is empty
* @since 1.7
*/
public final boolean hasQueuedPredecessors() {
// The correctness of this depends on head being initialized
// before tail and on head.next being accurate if the current
// thread is first in queue.
Node t = tail; // Read fields in reverse initialization order
Node h = head;
Node s;
return h != t &&
((s = h.next) == null || s.thread != Thread.currentThread());
}

公平锁的判断则相对来说较为简单,即判断等待队列中是否存在线程指向不为当前线程的节点,正如方法名所描述的,是否存在排队的前驱。

最后总结下文首代码导致线上服务不可用的几个关键因素:

  • 使用了 Guava 的 Futures.addCallback 方法,该重载版本使用的 directExecutor 导致写锁申请在持有读锁的线程中执行,触发了死锁
  • 实例化 ReentrantReadWriteLock 时采用的默认的构造函数,即非公平锁实现,该实现中的 readerShouldBlock() 方法为了避免写锁申请饥饿进行了启发式处理
  • 由于某种执行次序导致 RocketmqClientScheduler-1 写锁申请对应的节点处于等待队列的队首(不含 dummy head),导致其他进行读锁申请的线程全部被阻塞挂起

正是以上几个因素的共同作用导致了线上事故,我们查看 Guava 关于 Futures 的 changelog 可以发现使用 directExecutorFutures.addCallback 方法在 2018-05-24 被标记为了 @DonotCall,于 2018-07-31 被移除,说明官方也认为该方法具有误导性,直接从 Guava 中移除了。相关 commit 可以参见:History for guava/src/com/google/common/util/concurrent/Futures.java - google/guava · GitHub

Reference

Heap Dump - Concepts - Memory Analyzer
版本说明 - 消息队列RocketMQ版 - 阿里云
ConcurrentHashSet