Decorator

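While writing scheduled-task logic on top of ScheduledExecutorService (Java Platform SE 8), I found that once a task throws an exception, it is never scheduled again. The documentation says so explicitly: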

If any execution of the task encounters an exception, subsequent executions are suppressed.

Take the following code:

package me.tianshuang;

import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class ScheduledExecutorServiceTest {

    public static void main(String[] args) {
        Executors.newSingleThreadScheduledExecutor()
                .scheduleAtFixedRate(() -> System.out.println("I'm running..."), 1, 1, TimeUnit.SECONDS);
    }

}

The console keeps printing I'm running... once per second. Part of the output:

I'm running...
I'm running...
I'm running...
I'm running...
I'm running...
I'm running...
I'm running...
I'm running...

Now let's make the task throw an exception:

package me.tianshuang;

import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class ScheduledExecutorServiceTest {

    public static void main(String[] args) {
        Executors.newSingleThreadScheduledExecutor()
                .scheduleAtFixedRate(() -> {
                    System.out.println("I'm running...");
                    throw new RuntimeException();
                }, 0, 1, TimeUnit.SECONDS);
    }

}

The console prints I'm running... once and then nothing more:

I'm running...

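If you take a thread dump, you will see that the thread is still alive, only in the WAITING state. The stack of the scheduler thread looks like this: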

"pool-1-thread-1@729" prio=5 tid=0xd nid=NA waiting
  java.lang.Thread.State: WAITING
    at sun.misc.Unsafe.park(Unsafe.java:-1)
    at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
    at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2039)
    at java.util.concurrent.ScheduledThreadPoolExecutor$DelayedWorkQueue.take(ScheduledThreadPoolExecutor.java:1081)
    at java.util.concurrent.ScheduledThreadPoolExecutor$DelayedWorkQueue.take(ScheduledThreadPoolExecutor.java:809)
    at java.util.concurrent.ThreadPoolExecutor.getTask(ThreadPoolExecutor.java:1074)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1134)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
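The idle thread is only half the picture, because the exception itself is not thrown away. It is recorded in the ScheduledFuture returned by scheduleAtFixedRate. Calling get() on that future rethrows it wrapped in an ExecutionException. Below is a minimal JDK-only sketch to observe this. The class name FailedPeriodicTaskDemo, the RUNS counter, the 50 ms period and the "boom" message are illustrative choices and do not come from the original code.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Illustrative demo class, not part of the original post.
public class FailedPeriodicTaskDemo {

    // Counts how many times the periodic task body actually ran.
    static final AtomicInteger RUNS = new AtomicInteger();

    // Schedules a task that always throws and returns the cause stored in its future.
    static Throwable scheduleFailingTask() {
        RUNS.set(0);
        ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
        try {
            ScheduledFuture<?> future = executor.scheduleAtFixedRate(() -> {
                RUNS.incrementAndGet();
                throw new RuntimeException("boom");
            }, 0, 50, TimeUnit.MILLISECONDS);
            try {
                future.get(); // blocks until the first (and only) run fails
                return null;  // not reached: a periodic future never completes normally
            } catch (ExecutionException e) {
                // Wait several periods: no further runs happen because the task was not re-queued.
                TimeUnit.MILLISECONDS.sleep(300);
                return e.getCause();
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting", e);
        } finally {
            executor.shutdownNow();
        }
    }

    public static void main(String[] args) {
        Throwable cause = scheduleFailingTask();
        System.out.println("cause: " + cause + ", runs: " + RUNS.get());
    }
}
```

Running main should report the RuntimeException as the cause and a run count of 1. If the exception matters to you, one option is to keep the future and inspect it, rather than relying on the silent WAITING thread.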

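So why is the task never scheduled again after an exception? Tracing the source shows that the task is actually invoked from the run() method of ScheduledFutureTask (source: ScheduledThreadPoolExecutor.java at jdk8-b120):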

/**
 * Overrides FutureTask version so as to reset/requeue if periodic.
 */
public void run() {
    boolean periodic = isPeriodic();
    if (!canRunInCurrentRunState(periodic))
        cancel(false);
    else if (!periodic)
        ScheduledFutureTask.super.run();
    else if (ScheduledFutureTask.super.runAndReset()) {
        setNextRunTime();
        reExecutePeriodic(outerTask);
    }
}

The runAndReset() method it calls is implemented in FutureTask.java at jdk8-b120:

/**
 * Executes the computation without setting its result, and then
 * resets this future to initial state, failing to do so if the
 * computation encounters an exception or is cancelled. This is
 * designed for use with tasks that intrinsically execute more
 * than once.
 *
 * @return {@code true} if successfully run and reset
 */
protected boolean runAndReset() {
    if (state != NEW ||
        !UNSAFE.compareAndSwapObject(this, runnerOffset,
                                     null, Thread.currentThread()))
        return false;
    boolean ran = false;
    int s = state;
    try {
        Callable<V> c = callable;
        if (c != null && s == NEW) {
            try {
                c.call(); // don't set result
                ran = true;
            } catch (Throwable ex) {
                setException(ex);
            }
        }
    } finally {
        // runner must be non-null until state is settled to
        // prevent concurrent calls to run()
        runner = null;
        // state must be re-read after nulling runner to prevent
        // leaked interrupts
        s = state;
        if (s >= INTERRUPTING)
            handlePossibleCancellationInterrupt(s);
    }
    return ran && s == NEW;
}

/**
 * Causes this future to report an {@link ExecutionException}
 * with the given throwable as its cause, unless this future has
 * already been set or has been cancelled.
 *
 * <p>This method is invoked internally by the {@link #run} method
 * upon failure of the computation.
 *
 * @param t the cause of failure
 */
protected void setException(Throwable t) {
    if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
        outcome = t;
        UNSAFE.putOrderedInt(this, stateOffset, EXCEPTIONAL); // final state
        finishCompletion();
    }
}

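As you can see, c.call() is wrapped in a catch of Throwable, and that catch calls setException. setException moves the future from NEW to its final EXCEPTIONAL state, so as far as FutureTask is concerned the task has completed. Because c.call() threw, ran = true; is never reached. And because the state is no longer NEW, s == NEW is false as well, so runAndReset() returns false. As a result, the last else if branch in run() is skipped. setNextRunTime() and reExecutePeriodic(outerTask) are never called, and the periodic task is never put back on the queue. That is exactly the problem described at the beginning of this post.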
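The return-value logic of runAndReset() can be checked in isolation. runAndReset() is protected, so this sketch uses a small subclass of FutureTask only to reach it. RunAndResetDemo, ResettableTask, runOnceAndReset and runTwice are hypothetical names introduced here. The sketch does not use ScheduledFutureTask itself; it shows the FutureTask behavior that ScheduledFutureTask.run() relies on.

```java
import java.util.Arrays;
import java.util.concurrent.FutureTask;

// Hypothetical demo class; the names are illustrative, not JDK API.
public class RunAndResetDemo {

    // Subclass only to reach the protected FutureTask#runAndReset().
    static final class ResettableTask extends FutureTask<Void> {
        ResettableTask(Runnable body) {
            super(body, null);
        }

        boolean runOnceAndReset() {
            return runAndReset();
        }
    }

    // Calls runAndReset() twice on the same task; returns {first result, second result, isDone()}.
    static boolean[] runTwice(Runnable body) {
        ResettableTask task = new ResettableTask(body);
        boolean first = task.runOnceAndReset();
        boolean second = task.runOnceAndReset();
        return new boolean[] {first, second, task.isDone()};
    }

    public static void main(String[] args) {
        // Normal body: both calls return true and the task stays NEW (not done),
        // which is what lets ScheduledFutureTask re-queue it.
        System.out.println("ok:      " + Arrays.toString(runTwice(() -> { })));
        // Throwing body: setException moves the state to EXCEPTIONAL, so the first call
        // returns false, the second returns false immediately, and the task is done.
        System.out.println("failing: " + Arrays.toString(runTwice(() -> {
            throw new RuntimeException("boom");
        })));
    }
}
```

A body that completes normally yields [true, true, false]. A throwing body yields [false, false, true]. Once a single run fails, the task can never be reset and run again.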

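This reminded me of scheduled tasks in Spring. There, a task that throws is still scheduled for subsequent runs. How does Spring achieve that? It turns out to be simple. Let's start with Spring's org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler#scheduleAtFixedRate(java.lang.Runnable, long):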

@Override
public ScheduledFuture<?> scheduleAtFixedRate(Runnable task, long period) {
    ScheduledExecutorService executor = getScheduledExecutor();
    try {
        return executor.scheduleAtFixedRate(errorHandlingTask(task, true), 0, period, TimeUnit.MILLISECONDS);
    }
    catch (RejectedExecutionException ex) {
        throw new TaskRejectedException("Executor [" + executor + "] did not accept task: " + task, ex);
    }
}

Before the task is handed to the underlying ScheduledExecutorService, it passes through errorHandlingTask(task, true). Here is that method:

private Runnable errorHandlingTask(Runnable task, boolean isRepeatingTask) {
    return TaskUtils.decorateTaskWithErrorHandler(task, this.errorHandler, isRepeatingTask);
}

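In other words, TaskUtils.decorateTaskWithErrorHandler decorates the submitted task with an ErrorHandler, which takes care of any error the task throws. The ErrorHandler interface is defined as follows: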

/**
 * A strategy for handling errors. This is especially useful for handling
 * errors that occur during asynchronous execution of tasks that have been
 * submitted to a TaskScheduler. In such cases, it may not be possible to
 * throw the error to the original caller.
 *
 * @author Mark Fisher
 * @since 3.0
 */
@FunctionalInterface
public interface ErrorHandler {

    /**
     * Handle the given error, possibly rethrowing it as a fatal exception.
     */
    void handleError(Throwable t);

}

Its only job is to handle errors. Next, here is the implementation of the TaskUtils.decorateTaskWithErrorHandler method mentioned above:

/**
 * Decorate the task for error handling. If the provided {@link ErrorHandler}
 * is not {@code null}, it will be used. Otherwise, repeating tasks will have
 * errors suppressed by default whereas one-shot tasks will have errors
 * propagated by default since those errors may be expected through the
 * returned {@link Future}. In both cases, the errors will be logged.
 */
public static DelegatingErrorHandlingRunnable decorateTaskWithErrorHandler(
        Runnable task, @Nullable ErrorHandler errorHandler, boolean isRepeatingTask) {

    if (task instanceof DelegatingErrorHandlingRunnable) {
        return (DelegatingErrorHandlingRunnable) task;
    }
    ErrorHandler eh = (errorHandler != null ? errorHandler : getDefaultErrorHandler(isRepeatingTask));
    return new DelegatingErrorHandlingRunnable(task, eh);
}

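The Javadoc and the code show that this method decorates the task so that errors thrown while it runs are handled. If no ErrorHandler is passed in, getDefaultErrorHandler(isRepeatingTask) picks one of the two most commonly used implementations: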

/**
 * An ErrorHandler strategy that will log the Exception but perform
 * no further handling. This will suppress the error so that
 * subsequent executions of the task will not be prevented.
 */
public static final ErrorHandler LOG_AND_SUPPRESS_ERROR_HANDLER = new LoggingErrorHandler();

/**
 * An ErrorHandler strategy that will log at error level and then
 * re-throw the Exception. Note: this will typically prevent subsequent
 * execution of a scheduled task.
 */
public static final ErrorHandler LOG_AND_PROPAGATE_ERROR_HANDLER = new PropagatingErrorHandler();

/**
 * Return the default {@link ErrorHandler} implementation based on the boolean
 * value indicating whether the task will be repeating or not. For repeating tasks
 * it will suppress errors, but for one-time tasks it will propagate. In both
 * cases, the error will be logged.
 */
public static ErrorHandler getDefaultErrorHandler(boolean isRepeatingTask) {
    return (isRepeatingTask ? LOG_AND_SUPPRESS_ERROR_HANDLER : LOG_AND_PROPAGATE_ERROR_HANDLER);
}


/**
 * An {@link ErrorHandler} implementation that logs the Throwable at error
 * level. It does not perform any additional error handling. This can be
 * useful when suppression of errors is the intended behavior.
 */
private static class LoggingErrorHandler implements ErrorHandler {

    private final Log logger = LogFactory.getLog(LoggingErrorHandler.class);

    @Override
    public void handleError(Throwable t) {
        logger.error("Unexpected error occurred in scheduled task", t);
    }
}


/**
 * An {@link ErrorHandler} implementation that logs the Throwable at error
 * level and then propagates it.
 */
private static class PropagatingErrorHandler extends LoggingErrorHandler {

    @Override
    public void handleError(Throwable t) {
        super.handleError(t);
        ReflectionUtils.rethrowRuntimeException(t);
    }
}

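The first only logs the error and does not rethrow it to the caller. The second logs the error and then rethrows it. Once the ErrorHandler has been chosen (eh in decorateTaskWithErrorHandler), it is used to build DelegatingErrorHandlingRunnable, the Runnable wrapper that performs the actual decoration: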

/**
 * Runnable wrapper that catches any exception or error thrown from its
 * delegate Runnable and allows an {@link ErrorHandler} to handle it.
 *
 * @author Juergen Hoeller
 * @author Mark Fisher
 * @since 3.0
 */
public class DelegatingErrorHandlingRunnable implements Runnable {

    private final Runnable delegate;

    private final ErrorHandler errorHandler;


    /**
     * Create a new DelegatingErrorHandlingRunnable.
     * @param delegate the Runnable implementation to delegate to
     * @param errorHandler the ErrorHandler for handling any exceptions
     */
    public DelegatingErrorHandlingRunnable(Runnable delegate, ErrorHandler errorHandler) {
        Assert.notNull(delegate, "Delegate must not be null");
        Assert.notNull(errorHandler, "ErrorHandler must not be null");
        this.delegate = delegate;
        this.errorHandler = errorHandler;
    }

    @Override
    public void run() {
        try {
            this.delegate.run();
        }
        catch (UndeclaredThrowableException ex) {
            this.errorHandler.handleError(ex.getUndeclaredThrowable());
        }
        catch (Throwable ex) {
            this.errorHandler.handleError(ex);
        }
    }

    @Override
    public String toString() {
        return "DelegatingErrorHandlingRunnable for " + this.delegate;
    }

}

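For repeating tasks with no ErrorHandler configured, the default is org.springframework.scheduling.support.TaskUtils.LoggingErrorHandler (exposed as LOG_AND_SUPPRESS_ERROR_HANDLER). It only logs, so DelegatingErrorHandlingRunnable.run() returns normally, runAndReset() returns true, and the JDK re-queues the task. That is why a Spring scheduled task keeps being triggered after it throws.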
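The same decorator idea can be applied to a plain ScheduledExecutorService when Spring is not on the classpath. Below is a minimal sketch, assuming that printing to System.err is an acceptable stand-in for real logging. It uses a hypothetical ScheduledThreadPoolExecutor subclass that wraps every fixed-rate and fixed-delay task in a try/catch before delegating to the JDK, much as ThreadPoolTaskScheduler wraps tasks through errorHandlingTask. ResilientScheduledExecutor, withErrorHandling, countRuns, LOG_AND_SUPPRESS and LOG_AND_PROPAGATE are names invented for this sketch and only modeled on Spring's TaskUtils. They are not Spring or JDK API.

```java
import java.util.Objects;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;

// Hypothetical class, not part of the JDK or Spring.
public class ResilientScheduledExecutor extends ScheduledThreadPoolExecutor {

    // Analogous to LOG_AND_SUPPRESS_ERROR_HANDLER: log and swallow.
    static final Consumer<Throwable> LOG_AND_SUPPRESS =
            t -> System.err.println("Unexpected error occurred in scheduled task: " + t);

    // Analogous to LOG_AND_PROPAGATE_ERROR_HANDLER: log, then rethrow.
    static final Consumer<Throwable> LOG_AND_PROPAGATE = t -> {
        LOG_AND_SUPPRESS.accept(t);
        if (t instanceof RuntimeException) throw (RuntimeException) t;
        if (t instanceof Error) throw (Error) t;
        throw new IllegalStateException(t);
    };

    private final Consumer<Throwable> errorHandler;

    public ResilientScheduledExecutor(int corePoolSize, Consumer<Throwable> errorHandler) {
        super(corePoolSize);
        this.errorHandler = Objects.requireNonNull(errorHandler, "errorHandler");
    }

    // Decorator in the spirit of DelegatingErrorHandlingRunnable.
    private Runnable withErrorHandling(Runnable task) {
        Objects.requireNonNull(task, "task");
        return () -> {
            try {
                task.run();
            } catch (Throwable t) {
                errorHandler.accept(t);
            }
        };
    }

    @Override
    public ScheduledFuture<?> scheduleAtFixedRate(Runnable command, long initialDelay,
                                                  long period, TimeUnit unit) {
        return super.scheduleAtFixedRate(withErrorHandling(command), initialDelay, period, unit);
    }

    @Override
    public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, long initialDelay,
                                                     long delay, TimeUnit unit) {
        return super.scheduleWithFixedDelay(withErrorHandling(command), initialDelay, delay, unit);
    }

    // Runs an always-failing task every 20 ms for about 300 ms and returns how often it ran.
    static int countRuns(Consumer<Throwable> errorHandler) {
        AtomicInteger runs = new AtomicInteger();
        ResilientScheduledExecutor executor = new ResilientScheduledExecutor(1, errorHandler);
        try {
            executor.scheduleAtFixedRate(() -> {
                runs.incrementAndGet();
                throw new RuntimeException("boom");
            }, 0, 20, TimeUnit.MILLISECONDS);
            TimeUnit.MILLISECONDS.sleep(300);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            executor.shutdownNow();
        }
        return runs.get();
    }

    public static void main(String[] args) {
        System.out.println("suppress:  " + countRuns(LOG_AND_SUPPRESS) + " runs");
        System.out.println("propagate: " + countRuns(LOG_AND_PROPAGATE) + " runs");
    }
}
```

With the suppressing handler the failing task keeps running (roughly every 20 ms). With the propagating handler the exception reaches runAndReset() again and the task stops after its first run. Only the two periodic methods are overridden, so one-shot tasks still report their exceptions through the returned Future, which mirrors Spring's choice of default per task type.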

Reference

ScheduledExecutorService Exception handling - Stack Overflow