Poison

Executor of Tomcat

首先我们看看 JDK 中 java.util.concurrent.ThreadPoolExecutor 提交任务的实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
/**
* Executes the given task sometime in the future. The task
* may execute in a new thread or in an existing pooled thread.
*
* If the task cannot be submitted for execution, either because this
* executor has been shutdown or because its capacity has been reached,
* the task is handled by the current {@code RejectedExecutionHandler}.
*
* @param command the task to execute
* @throws RejectedExecutionException at discretion of
* {@code RejectedExecutionHandler}, if the task
* cannot be accepted for execution
* @throws NullPointerException if {@code command} is null
*/
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
/*
* Proceed in 3 steps:
*
* 1. If fewer than corePoolSize threads are running, try to
* start a new thread with the given command as its first
* task. The call to addWorker atomically checks runState and
* workerCount, and so prevents false alarms that would add
* threads when it shouldn't, by returning false.
*
* 2. If a task can be successfully queued, then we still need
* to double-check whether we should have added a thread
* (because existing ones died since last checking) or that
* the pool shut down since entry into this method. So we
* recheck state and if necessary roll back the enqueuing if
* stopped, or start a new thread if there are none.
*
* 3. If we cannot queue task, then we try to add a new
* thread. If it fails, we know we are shut down or saturated
* and so reject the task.
*/
int c = ctl.get();
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
return;
c = ctl.get();
}
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
if (! isRunning(recheck) && remove(command))
reject(command);
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
else if (!addWorker(command, false))
reject(command);
}

根据以上代码及注释,我们知道,在 JDK 的线程池实现中,当线程池中的 worker 数量小于 corePoolSize 时,会尝试创建 worker 并执行任务,而如果线程池中的 worker 数量大于等于 corePoolSize 时,会尝试将任务放入队列,仅当放入队列失败时才会尝试创建 worker 并执行任务。那么可以理解为 JDK 中的实现偏向于尽量少创建线程,优先放入队列,更加适合于 CPU 密集型的任务。

我们再看看 Tomcat 中线程池的实现,代码位于 ThreadPoolExecutor.java,核心部分如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
/**
* The number of tasks submitted but not yet finished. This includes tasks
* in the queue and tasks that have been handed to a worker thread but the
* latter did not start executing the task yet.
* This number is always greater or equal to {@link #getActiveCount()}.
*/
private final AtomicInteger submittedCount = new AtomicInteger(0);

@Override
protected void afterExecute(Runnable r, Throwable t) {
// Throwing StopPooledThreadException is likely to cause this method to
// be called more than once for a given task based on the typical
// implementations of the parent class. This test ensures that
// decrementAndGet() is only called once after each task execution.
if (!(t instanceof StopPooledThreadException)) {
submittedCount.decrementAndGet();
}

if (t == null) {
stopCurrentThreadIfNeeded();
}
}

/**
* {@inheritDoc}
*/
@Override
public void execute(Runnable command) {
execute(command,0,TimeUnit.MILLISECONDS);
}

/**
* Executes the given command at some time in the future. The command
* may execute in a new thread, in a pooled thread, or in the calling
* thread, at the discretion of the <code>Executor</code> implementation.
* If no threads are available, it will be added to the work queue.
* If the work queue is full, the system will wait for the specified
* time and it throw a RejectedExecutionException if the queue is still
* full after that.
*
* @param command the runnable task
* @param timeout A timeout for the completion of the task
* @param unit The timeout time unit
* @throws RejectedExecutionException if this task cannot be
* accepted for execution - the queue is full
* @throws NullPointerException if command or unit is null
*/
public void execute(Runnable command, long timeout, TimeUnit unit) {
submittedCount.incrementAndGet();
try {
super.execute(command);
} catch (RejectedExecutionException rx) {
if (super.getQueue() instanceof TaskQueue) {
final TaskQueue queue = (TaskQueue)super.getQueue();
try {
if (!queue.force(command, timeout, unit)) {
submittedCount.decrementAndGet();
throw new RejectedExecutionException(sm.getString("threadPoolExecutor.queueFull"));
}
} catch (InterruptedException x) {
submittedCount.decrementAndGet();
throw new RejectedExecutionException(x);
}
} else {
submittedCount.decrementAndGet();
throw rx;
}

}
}

关键之处在于维护了一个 submittedCount 变量来记录当前时刻已经提交的任务数,而这个变量哪里有用到呢,此处就要引入 org.apache.tomcat.util.threads.TaskQueue 类,该类继承了 java.util.concurrent.LinkedBlockingQueue 重写了 offer 方法,其中代码位于 TaskQueue.java,相关源码为:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
private transient volatile ThreadPoolExecutor parent = null;

public void setParent(ThreadPoolExecutor tp) {
parent = tp;
}

@Override
public boolean offer(Runnable o) {
//we can't do any checks
if (parent==null) {
return super.offer(o);
}
//we are maxed out on threads, simply queue the object
if (parent.getPoolSize() == parent.getMaximumPoolSize()) {
return super.offer(o);
}
//we have idle threads, just add it to the queue
if (parent.getSubmittedCount()<=(parent.getPoolSize())) {
return super.offer(o);
}
//if we have less threads than maximum force creation of a new thread
if (parent.getPoolSize()<parent.getMaximumPoolSize()) {
return false;
}
//if we reached here, we need to add it to the queue
return super.offer(o);
}

其中 parent 即指向 Tomcat 中自行实现的线程池 org.apache.tomcat.util.threads.ThreadPoolExecutor 类的实例,主要的不同在于当前提交的任务数大于当前时刻线程池中的线程数且小于最大线程数时,直接返回了 false 不允许任务进入队列,以触发对 addWorker 的调用,即对 JDK 默认的实现语义进行了调整,Tomcat 中的实现当前已提交的且未执行完成的任务数大于当前时刻线程池中的线程数时,会优先创建线程,而不是放入队列,更加适合于 Tomcat 这种 IO 密集型的应用。

在 Tomcat 中,Executor 的创建逻辑位于 org.apache.tomcat.util.net.AbstractEndpoint#createExecutor:

1
2
3
4
5
6
7
public void createExecutor() {
internalExecutor = true;
TaskQueue taskqueue = new TaskQueue();
TaskThreadFactory tf = new TaskThreadFactory(getName() + "-exec-", daemon, getThreadPriority());
executor = new ThreadPoolExecutor(getMinSpareThreads(), getMaxThreads(), 60, TimeUnit.SECONDS,taskqueue, tf);
taskqueue.setParent( (ThreadPoolExecutor) executor);
}

可以看出,创建了一个 TaskQueue 的实例,然后再创建了一个线程池,使用 TaskQueue 作为队列的实现,同时将 org.apache.tomcat.util.threads.ThreadPoolExecutor 的实例设置至 TaskQueue 实例的 parent 属性,以使 TaskQueue 可以使用 org.apache.tomcat.util.threads.ThreadPoolExecutor 实例中的 submittedCount 变量。

关于 org.apache.tomcat.util.threads.TaskQueue 类,源码中的注释如下:

1
2
3
4
5
6
7
/**
* As task queue specifically designed to run with a thread pool executor. The
* task queue is optimised to properly utilize threads within a thread pool
* executor. If you use a normal queue, the executor will spawn threads when
* there are idle threads and you wont be able to force items onto the queue
* itself.
*/

即设计用于更好地利用线程池中的线程。随后,注释又提到,如果你使用一个普通的队列,那么 executor 在具有空闲线程时将创建线程而你不能强制任务进入队列。刚看到后面这段注释时我是不能理解的,因为在 JDK 的实现中,当线程池中的 worker 数量大于等于 corePoolSize 时,会尝试将任务放入队列,仅当放入队列失败时才会尝试创建 worker 并执行任务,与此注释并不相符。我为此专门询问了作者注释中的 normal queue 是指什么队列,讨论可以参考:Re: About the comment of org.apache.tomcat.util.threads.TaskQueue。作者表示写这段注释的背景是,当 corePoolSizemaxThreads 设置为相同的值时,即我们通常使用 java.util.concurrent.Executors#newFixedThreadPool(int) 创建的线程池时,在线程池中的线程数达到 corePoolSizemaxThreads 之前,都会调用 addWorker 进行线程的创建,即使当前线程池中的线程存在空闲的线程也会创建,这就是后面这段注释的背景。

在 2021 年 7 月,有用户报告了该实现存在的一个竞态条件,Bug 可参考:65454 – Race condition with idle thread removal results in request being improperly enqueued by default ThreadPoolExecutor,该竞态条件的发生条件如下:

线程池一共有 11 个线程,其中 10 个线程运行,1 个线程空闲。当空闲的线程等待任务的时间达到超时时间(默认 60 秒)时,会调用 java.util.concurrent.ThreadPoolExecutor#processWorkerExit 进行 worker 的退出,该方法源码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
/**
* Performs cleanup and bookkeeping for a dying worker. Called
* only from worker threads. Unless completedAbruptly is set,
* assumes that workerCount has already been adjusted to account
* for exit. This method removes thread from worker set, and
* possibly terminates the pool or replaces the worker if either
* it exited due to user task exception or if fewer than
* corePoolSize workers are running or queue is non-empty but
* there are no workers.
*
* @param w the worker
* @param completedAbruptly if the worker died due to user exception
*/
private void processWorkerExit(Worker w, boolean completedAbruptly) {
if (completedAbruptly) // If abrupt, then workerCount wasn't adjusted
decrementWorkerCount();

final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
completedTaskCount += w.completedTasks;
workers.remove(w);
} finally {
mainLock.unlock();
}

tryTerminate();

int c = ctl.get();
if (runStateLessThan(c, STOP)) {
if (!completedAbruptly) {
int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
if (min == 0 && ! workQueue.isEmpty())
min = 1;
if (workerCountOf(c) >= min)
return; // replacement not needed
}
addWorker(null, false);
}
}

在 line 22 执行之前,workers 中是含有 11 个线程的,而因为 submittedCount 此时为 10,那么根据 org.apache.tomcat.util.threads.ThreadPoolExecutor#getPoolSizeorg.apache.tomcat.util.threads.TaskQueue#offer 的源码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
/**
* Returns the current number of threads in the pool.
*
* @return the number of threads
*/
public int getPoolSize() {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// Remove rare and surprising possibility of
// isTerminated() && getPoolSize() > 0
return runStateAtLeast(ctl.get(), TIDYING) ? 0
: workers.size();
} finally {
mainLock.unlock();
}
}
1
2
3
4
//we have idle threads, just add it to the queue
if (parent.getSubmittedCount()<=(parent.getPoolSize())) {
return super.offer(o);
}

此时调用 workQueue.offer(command) 会使任务进入队列,随后空闲的线程会退出,那么此时线程池中有 10 个忙碌的线程,队列中含有 1 个任务,如果这 10 个线程处理的任务一直没有执行完成且没有新请求进入触发线程创建的话,那么该队列中的这个任务将得不到执行。

为了修复这个问题,Mask 修改了 java.util.concurrent.ThreadPoolExecutor#processWorkerExit 中的代码,而该方法被 private 方法修饰,不能被覆写,最后 Mark 采用复制 java.util.concurrent.ThreadPoolExecutor 代码至 org.apache.tomcat.util.threads.ThreadPoolExecutor 的方式来对这个 Bug 进行了修复,修复的提交位于:Fix BZ 65454. Correct a timing issue that could delay a request · apache/tomcat@0f2e084 · GitHub

修复前的 java.util.concurrent.ThreadPoolExecutor#processWorkerExit 实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
/**
* Performs cleanup and bookkeeping for a dying worker. Called
* only from worker threads. Unless completedAbruptly is set,
* assumes that workerCount has already been adjusted to account
* for exit. This method removes thread from worker set, and
* possibly terminates the pool or replaces the worker if either
* it exited due to user task exception or if fewer than
* corePoolSize workers are running or queue is non-empty but
* there are no workers.
*
* @param w the worker
* @param completedAbruptly if the worker died due to user exception
*/
private void processWorkerExit(Worker w, boolean completedAbruptly) {
if (completedAbruptly) // If abrupt, then workerCount wasn't adjusted
decrementWorkerCount();

final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
completedTaskCount += w.completedTasks;
workers.remove(w);
} finally {
mainLock.unlock();
}

tryTerminate();

int c = ctl.get();
if (runStateLessThan(c, STOP)) {
if (!completedAbruptly) {
int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
if (min == 0 && ! workQueue.isEmpty())
min = 1;
if (workerCountOf(c) >= min)
return; // replacement not needed
}
addWorker(null, false);
}
}

修复后的 org.apache.tomcat.util.threads.ThreadPoolExecutor#processWorkerExit 实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
/**
* Performs cleanup and bookkeeping for a dying worker. Called
* only from worker threads. Unless completedAbruptly is set,
* assumes that workerCount has already been adjusted to account
* for exit. This method removes thread from worker set, and
* possibly terminates the pool or replaces the worker if either
* it exited due to user task exception or if fewer than
* corePoolSize workers are running or queue is non-empty but
* there are no workers.
*
* @param w the worker
* @param completedAbruptly if the worker died due to user exception
*/
private void processWorkerExit(Worker w, boolean completedAbruptly) {
if (completedAbruptly) {
decrementWorkerCount();
}

final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
completedTaskCount += w.completedTasks;
workers.remove(w);
} finally {
mainLock.unlock();
}

tryTerminate();

int c = ctl.get();
if (runStateLessThan(c, STOP)) {
if (!completedAbruptly) {
int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
if (min == 0 && ! workQueue.isEmpty()) {
min = 1;
}
// https://bz.apache.org/bugzilla/show_bug.cgi?id=65454
// If the work queue is not empty, it is likely that a task was
// added to the work queue between this thread timing out and
// the worker count being decremented a few lines above this
// comment. In this case, create a replacement worker so that
// the task isn't held in the queue waiting for one of the other
// workers to finish.
if (workerCountOf(c) >= min && workQueue.isEmpty()) {
return; // replacement not needed
}
}
addWorker(null, false);
}
}

即在退出前,检查 workQueue 是否为空,如果不为空,则创建一个线程规避上面的竞态条件导致的问题。

类似的实现在 Dubbo 中也可以看到,根据以上的分析,可以认为 Dubbo 中也存在相同的 Bug,相关源码可以参考下方链接。

Reference

Apache Tomcat 8 Configuration Reference (8.5.72) - The Executor (thread pool)
dubbo/TaskQueue.java at dubbo-2.7.5 · apache/dubbo · GitHub
dubbo/EagerThreadPool.java at dubbo-2.7.5 · apache/dubbo · GitHub
Extension: Eager Thread Pool (#1568) · apache/dubbo@38f45ee · GitHub