Poison

Decorator

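While writing scheduled-task logic on top of ScheduledExecutorService (Java Platform SE 8), I found that once a task throws an exception it is never scheduled again. The documentation says as much: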

If any execution of the task encounters an exception, subsequent executions are suppressed.

Take the following code as an example:

package me.tianshuang;

import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class ScheduledExecutorServiceTest {

    public static void main(String[] args) {
        Executors.newSingleThreadScheduledExecutor()
                .scheduleAtFixedRate(() -> System.out.println("I'm running..."), 1, 1, TimeUnit.SECONDS);
    }

}

The console keeps printing I'm running... at one-second intervals. Part of the output:

I'm running...
I'm running...
I'm running...
I'm running...
I'm running...
I'm running...
I'm running...
I'm running...

Now let's make the task throw an exception:

package me.tianshuang;

import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class ScheduledExecutorServiceTest {

    public static void main(String[] args) {
        Executors.newSingleThreadScheduledExecutor()
                .scheduleAtFixedRate(() -> {
                    System.out.println("I'm running...");
                    throw new RuntimeException();
                }, 0, 1, TimeUnit.SECONDS);
    }

}

The console prints I'm running... once and then nothing more. The output:

I'm running...
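Note that no stack trace appears either: the exception is not lost, it is stored in the ScheduledFuture returned by scheduleAtFixedRate. The following is a minimal sketch of how to surface it by calling get() on that future. The class name SuppressedFailureDemo, the helper captureFailure, and the "boom" message are made up for illustration.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

class SuppressedFailureDemo {

    // Schedules a task that always throws and returns the exception captured by its future.
    // (Hypothetical helper, not part of any library.)
    static Throwable captureFailure() {
        ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
        try {
            ScheduledFuture<?> future = executor.scheduleAtFixedRate(() -> {
                System.out.println("I'm running...");
                throw new IllegalStateException("boom");
            }, 0, 1, TimeUnit.SECONDS);
            try {
                // A periodic task never completes normally, so get() only returns by throwing.
                future.get(5, TimeUnit.SECONDS);
                return null;
            } catch (ExecutionException e) {
                // The task's own exception is wrapped as the cause.
                return e.getCause();
            } catch (Exception e) {
                // InterruptedException or TimeoutException: hand it back for inspection.
                return e;
            }
        } finally {
            executor.shutdownNow();
        }
    }

    public static void main(String[] args) {
        System.out.println("Captured: " + captureFailure());
    }
}
```

Keeping a reference to the returned future is therefore one way to notice that a periodic task has died, although it does nothing to keep the task running.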

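This reminded me of scheduled tasks in Spring, where a task keeps being scheduled even after it throws. How is that implemented? It turns out to be quite simple. Let's start with Spring's org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler#scheduleAtFixedRate(java.lang.Runnable, long) method: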

@Override
public ScheduledFuture<?> scheduleAtFixedRate(Runnable task, long period) {
    ScheduledExecutorService executor = getScheduledExecutor();
    try {
        return executor.scheduleAtFixedRate(errorHandlingTask(task, true), 0, period, TimeUnit.MILLISECONDS);
    }
    catch (RejectedExecutionException ex) {
        throw new TaskRejectedException("Executor [" + executor + "] did not accept task: " + task, ex);
    }
}

As the code above shows, before the task is submitted to the underlying ScheduledExecutorService it is passed through errorHandlingTask(task, true). Here is that method's implementation:

private Runnable errorHandlingTask(Runnable task, boolean isRepeatingTask) {
    return TaskUtils.decorateTaskWithErrorHandler(task, this.errorHandler, isRepeatingTask);
}

In other words, it uses TaskUtils.decorateTaskWithErrorHandler to decorate the task being submitted with an ErrorHandler that takes care of error handling. The source of that interface is:

/**
 * A strategy for handling errors. This is especially useful for handling
 * errors that occur during asynchronous execution of tasks that have been
 * submitted to a TaskScheduler. In such cases, it may not be possible to
 * throw the error to the original caller.
 *
 * @author Mark Fisher
 * @since 3.0
 */
@FunctionalInterface
public interface ErrorHandler {

    /**
     * Handle the given error, possibly rethrowing it as a fatal exception.
     */
    void handleError(Throwable t);

}

So it exists to handle exceptions. Now let's look at the implementation of the TaskUtils.decorateTaskWithErrorHandler method mentioned above:

/**
 * Decorate the task for error handling. If the provided {@link ErrorHandler}
 * is not {@code null}, it will be used. Otherwise, repeating tasks will have
 * errors suppressed by default whereas one-shot tasks will have errors
 * propagated by default since those errors may be expected through the
 * returned {@link Future}. In both cases, the errors will be logged.
 */
public static DelegatingErrorHandlingRunnable decorateTaskWithErrorHandler(
        Runnable task, @Nullable ErrorHandler errorHandler, boolean isRepeatingTask) {

    if (task instanceof DelegatingErrorHandlingRunnable) {
        return (DelegatingErrorHandlingRunnable) task;
    }
    ErrorHandler eh = (errorHandler != null ? errorHandler : getDefaultErrorHandler(isRepeatingTask));
    return new DelegatingErrorHandlingRunnable(task, eh);
}

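From the comments and the source we can see that this method decorates the task so that errors raised while it runs get handled. The two ErrorHandler implementations it falls back to by default are: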

/**
 * An ErrorHandler strategy that will log the Exception but perform
 * no further handling. This will suppress the error so that
 * subsequent executions of the task will not be prevented.
 */
public static final ErrorHandler LOG_AND_SUPPRESS_ERROR_HANDLER = new LoggingErrorHandler();

/**
 * An ErrorHandler strategy that will log at error level and then
 * re-throw the Exception. Note: this will typically prevent subsequent
 * execution of a scheduled task.
 */
public static final ErrorHandler LOG_AND_PROPAGATE_ERROR_HANDLER = new PropagatingErrorHandler();

/**
 * Return the default {@link ErrorHandler} implementation based on the boolean
 * value indicating whether the task will be repeating or not. For repeating tasks
 * it will suppress errors, but for one-time tasks it will propagate. In both
 * cases, the error will be logged.
 */
public static ErrorHandler getDefaultErrorHandler(boolean isRepeatingTask) {
    return (isRepeatingTask ? LOG_AND_SUPPRESS_ERROR_HANDLER : LOG_AND_PROPAGATE_ERROR_HANDLER);
}


/**
 * An {@link ErrorHandler} implementation that logs the Throwable at error
 * level. It does not perform any additional error handling. This can be
 * useful when suppression of errors is the intended behavior.
 */
private static class LoggingErrorHandler implements ErrorHandler {

    private final Log logger = LogFactory.getLog(LoggingErrorHandler.class);

    @Override
    public void handleError(Throwable t) {
        logger.error("Unexpected error occurred in scheduled task", t);
    }
}


/**
 * An {@link ErrorHandler} implementation that logs the Throwable at error
 * level and then propagates it.
 */
private static class PropagatingErrorHandler extends LoggingErrorHandler {

    @Override
    public void handleError(Throwable t) {
        super.handleError(t);
        ReflectionUtils.rethrowRuntimeException(t);
    }
}
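To make the difference between the two strategies concrete, here is a rough JDK-only sketch that needs no Spring on the classpath. It is not Spring's code: HandlerStrategiesDemo, LOG_AND_SUPPRESS, LOG_AND_PROPAGATE, defaultHandler and failureEscapes are hypothetical stand-ins, a plain Consumer<Throwable> plays the role of ErrorHandler, and System.err stands in for the logger.

```java
import java.lang.reflect.UndeclaredThrowableException;
import java.util.function.Consumer;

class HandlerStrategiesDemo {

    // Stand-in for LOG_AND_SUPPRESS_ERROR_HANDLER: report the error and swallow it.
    static final Consumer<Throwable> LOG_AND_SUPPRESS =
            t -> System.err.println("Unexpected error occurred in scheduled task: " + t);

    // Stand-in for LOG_AND_PROPAGATE_ERROR_HANDLER: report the error, then rethrow it.
    static final Consumer<Throwable> LOG_AND_PROPAGATE = t -> {
        LOG_AND_SUPPRESS.accept(t);
        if (t instanceof RuntimeException) {
            throw (RuntimeException) t;
        }
        if (t instanceof Error) {
            throw (Error) t;
        }
        // Checked exceptions cannot be rethrown as-is from here, so wrap them.
        throw new UndeclaredThrowableException(t);
    };

    // Same selection rule as getDefaultErrorHandler: suppress for repeating tasks.
    static Consumer<Throwable> defaultHandler(boolean isRepeatingTask) {
        return isRepeatingTask ? LOG_AND_SUPPRESS : LOG_AND_PROPAGATE;
    }

    // Runs the task once through the handler and reports whether the failure reached the caller.
    static boolean failureEscapes(Runnable task, Consumer<Throwable> handler) {
        try {
            try {
                task.run();
            } catch (Throwable t) {
                handler.accept(t);
            }
            return false;
        } catch (RuntimeException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        Runnable failing = () -> { throw new IllegalStateException("boom"); };
        System.out.println("repeating: failure escapes = " + failureEscapes(failing, defaultHandler(true)));
        System.out.println("one-shot:  failure escapes = " + failureEscapes(failing, defaultHandler(false)));
    }
}
```

With the suppressing handler the failure never reaches the caller, which is exactly what a repeating task needs in order to stay scheduled. With the propagating handler the failure does reach the caller, which suits one-shot tasks whose failure is expected to show up through the returned Future.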

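That is, one only logs and does not rethrow the exception to the caller, while the other logs and then rethrows it. Once the ErrorHandler has been chosen (the eh variable in decorateTaskWithErrorHandler), it is used to create DelegatingErrorHandlingRunnable, a wrapper around the Runnable that performs the decoration. Its code is: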

/**
 * Runnable wrapper that catches any exception or error thrown from its
 * delegate Runnable and allows an {@link ErrorHandler} to handle it.
 *
 * @author Juergen Hoeller
 * @author Mark Fisher
 * @since 3.0
 */
public class DelegatingErrorHandlingRunnable implements Runnable {

    private final Runnable delegate;

    private final ErrorHandler errorHandler;


    /**
     * Create a new DelegatingErrorHandlingRunnable.
     * @param delegate the Runnable implementation to delegate to
     * @param errorHandler the ErrorHandler for handling any exceptions
     */
    public DelegatingErrorHandlingRunnable(Runnable delegate, ErrorHandler errorHandler) {
        Assert.notNull(delegate, "Delegate must not be null");
        Assert.notNull(errorHandler, "ErrorHandler must not be null");
        this.delegate = delegate;
        this.errorHandler = errorHandler;
    }

    @Override
    public void run() {
        try {
            this.delegate.run();
        }
        catch (UndeclaredThrowableException ex) {
            this.errorHandler.handleError(ex.getUndeclaredThrowable());
        }
        catch (Throwable ex) {
            this.errorHandler.handleError(ex);
        }
    }

    @Override
    public String toString() {
        return "DelegatingErrorHandlingRunnable for " + this.delegate;
    }

}
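The same decorator idea can be applied to a plain ScheduledExecutorService without Spring. The sketch below is only an illustration under that assumption: ErrorHandlingDecoratorDemo, decorate and keepsRunning are invented names, a Consumer<Throwable> stands in for ErrorHandler, and the 100 ms period and 5 second timeout are arbitrary.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

class ErrorHandlingDecoratorDemo {

    // Hypothetical JDK-only analogue of DelegatingErrorHandlingRunnable.
    static Runnable decorate(Runnable delegate, Consumer<Throwable> errorHandler) {
        return () -> {
            try {
                delegate.run();
            } catch (Throwable t) {
                // Nothing reaches the executor unless the handler itself rethrows.
                errorHandler.accept(t);
            }
        };
    }

    // Schedules an always-failing task and reports whether it still ran expectedRuns times.
    static boolean keepsRunning(int expectedRuns) {
        ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
        CountDownLatch latch = new CountDownLatch(expectedRuns);
        Runnable failing = () -> {
            latch.countDown();
            throw new IllegalStateException("boom");
        };
        try {
            executor.scheduleAtFixedRate(
                    decorate(failing, t -> System.err.println("suppressed: " + t)),
                    0, 100, TimeUnit.MILLISECONDS);
            // True only if the task was invoked expectedRuns times before the timeout.
            return latch.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        } finally {
            executor.shutdownNow();
        }
    }

    public static void main(String[] args) {
        System.out.println("ran 3 times despite failures: " + keepsRunning(3));
    }
}
```

Because the wrapper catches Throwable before it reaches the executor, the executor never sees a failed execution and keeps rescheduling the task.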

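When no ErrorHandler is configured, repeating tasks use org.springframework.scheduling.support.TaskUtils.LoggingErrorHandler by default, which is why a scheduled task in Spring keeps being triggered after it throws an exception.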
