Poison

ScheduledAnnotationBeanPostProcessor

The core logic that sets up the scheduler lives in ScheduledAnnotationBeanPostProcessor.java at v4.3.16.RELEASE:

@Override
public void onApplicationEvent(ContextRefreshedEvent event) {
    if (event.getApplicationContext() == this.applicationContext) {
        // Running in an ApplicationContext -> register tasks this late...
        // giving other ContextRefreshedEvent listeners a chance to perform
        // their work at the same time (e.g. Spring Batch's job registration).
        finishRegistration();
    }
}

private void finishRegistration() {
    if (this.scheduler != null) {
        this.registrar.setScheduler(this.scheduler);
    }

    if (this.beanFactory instanceof ListableBeanFactory) {
        Map<String, SchedulingConfigurer> beans =
                ((ListableBeanFactory) this.beanFactory).getBeansOfType(SchedulingConfigurer.class);
        List<SchedulingConfigurer> configurers = new ArrayList<SchedulingConfigurer>(beans.values());
        AnnotationAwareOrderComparator.sort(configurers);
        for (SchedulingConfigurer configurer : configurers) {
            configurer.configureTasks(this.registrar);
        }
    }

    if (this.registrar.hasTasks() && this.registrar.getScheduler() == null) {
        Assert.state(this.beanFactory != null, "BeanFactory must be set to find scheduler by type");
        try {
            // Search for TaskScheduler bean...
            this.registrar.setTaskScheduler(resolveSchedulerBean(TaskScheduler.class, false));
        }
        catch (NoUniqueBeanDefinitionException ex) {
            logger.debug("Could not find unique TaskScheduler bean", ex);
            try {
                this.registrar.setTaskScheduler(resolveSchedulerBean(TaskScheduler.class, true));
            }
            catch (NoSuchBeanDefinitionException ex2) {
                if (logger.isInfoEnabled()) {
                    logger.info("More than one TaskScheduler bean exists within the context, and " +
                            "none is named 'taskScheduler'. Mark one of them as primary or name it 'taskScheduler' " +
                            "(possibly as an alias); or implement the SchedulingConfigurer interface and call " +
                            "ScheduledTaskRegistrar#setScheduler explicitly within the configureTasks() callback: " +
                            ex.getBeanNamesFound());
                }
            }
        }
        catch (NoSuchBeanDefinitionException ex) {
            logger.debug("Could not find default TaskScheduler bean", ex);
            // Search for ScheduledExecutorService bean next...
            try {
                this.registrar.setScheduler(resolveSchedulerBean(ScheduledExecutorService.class, false));
            }
            catch (NoUniqueBeanDefinitionException ex2) {
                logger.debug("Could not find unique ScheduledExecutorService bean", ex2);
                try {
                    this.registrar.setScheduler(resolveSchedulerBean(ScheduledExecutorService.class, true));
                }
                catch (NoSuchBeanDefinitionException ex3) {
                    if (logger.isInfoEnabled()) {
                        logger.info("More than one ScheduledExecutorService bean exists within the context, and " +
                                "none is named 'taskScheduler'. Mark one of them as primary or name it 'taskScheduler' " +
                                "(possibly as an alias); or implement the SchedulingConfigurer interface and call " +
                                "ScheduledTaskRegistrar#setScheduler explicitly within the configureTasks() callback: " +
                                ex2.getBeanNamesFound());
                    }
                }
            }
            catch (NoSuchBeanDefinitionException ex2) {
                logger.debug("Could not find default ScheduledExecutorService bean", ex2);
                // Giving up -> falling back to default scheduler within the registrar...
                logger.info("No TaskScheduler/ScheduledExecutorService bean found for scheduled processing");
            }
        }
    }

    this.registrar.afterPropertiesSet();
}

The code above implements the lookup described in EnableScheduling (Spring Framework 5.3.14 API):

By default, Spring will search for an associated scheduler definition: either a unique TaskScheduler bean in the context, or a TaskScheduler bean named “taskScheduler” otherwise; the same lookup will also be performed for a ScheduledExecutorService bean. If neither of the two is resolvable, a local single-threaded default scheduler will be created and used within the registrar.
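The order in which finishRegistration decides on a scheduler can be condensed into a small JDK-only model. This is a simplified sketch, not Spring code: SchedulerPrecedenceModel, its Registrar class and the string placeholders are hypothetical names introduced for illustration, the hasTasks() check is left out, and the type and name based bean lookup is reduced to a single Supplier.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Supplier;

class SchedulerPrecedenceModel {

    static final String LOCAL_DEFAULT = "local single-threaded default";

    /** Hypothetical stand-in for ScheduledTaskRegistrar: it only remembers a scheduler. */
    static final class Registrar {
        private Object scheduler;

        void setScheduler(Object scheduler) {
            this.scheduler = scheduler;
        }

        Object getScheduler() {
            return scheduler;
        }
    }

    /**
     * Mirrors the order of the steps in finishRegistration():
     * configurers first, context lookup second, local default last.
     */
    static Object finishRegistration(List<Consumer<Registrar>> configurers,
                                     Supplier<Object> contextLookup) {
        Registrar registrar = new Registrar();

        // Step 1: every SchedulingConfigurer gets the registrar before any lookup happens.
        for (Consumer<Registrar> configurer : configurers) {
            configurer.accept(registrar);
        }

        // Step 2: the context is searched only if no scheduler has been set so far.
        if (registrar.getScheduler() == null) {
            registrar.setScheduler(contextLookup.get());
        }

        // Step 3: if the lookup found nothing, fall back to the local default.
        if (registrar.getScheduler() == null) {
            registrar.setScheduler(LOCAL_DEFAULT);
        }
        return registrar.getScheduler();
    }

    public static void main(String[] args) {
        Supplier<Object> contextBean = () -> "scheduler bean from context";

        List<Consumer<Registrar>> noConfigurers = new ArrayList<>();
        List<Consumer<Registrar>> oneConfigurer = new ArrayList<>();
        oneConfigurer.add(registrar -> registrar.setScheduler("scheduler set in configureTasks"));

        System.out.println(finishRegistration(noConfigurers, contextBean));
        System.out.println(finishRegistration(oneConfigurer, contextBean));
        System.out.println(finishRegistration(noConfigurers, () -> null));
    }
}
```

The point of the model is that a scheduler set inside configureTasks always wins: the context lookup in step 2 is skipped entirely, whatever beans the context happens to contain.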

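If neither a TaskScheduler nor a ScheduledExecutorService has been configured, a single-threaded scheduler is created. The potential problem with a single-threaded scheduler is that if one task blocks while it is running, none of the other scheduled tasks get to run. We have hit this in production before, so we recommend always configuring a multi-threaded pool for scheduled tasks. The EnableScheduling (Spring Framework 5.3.14 API) documentation also says: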
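The starvation effect can be reproduced with the JDK alone. The sketch below is not taken from our application: SingleThreadStarvationDemo and its method name are hypothetical, and the 500 ms wait is an arbitrary value chosen for the demo. It schedules one task that blocks and one that is due immediately, then reports whether the second one ran while the first was still blocked.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

class SingleThreadStarvationDemo {

    /**
     * Returns true if a task that is due immediately gets to run
     * while another task on the same scheduler is still blocked.
     */
    static boolean quickTaskRunsWhileOtherBlocks(ScheduledExecutorService scheduler)
            throws InterruptedException {
        CountDownLatch blockerStarted = new CountDownLatch(1);
        CountDownLatch release = new CountDownLatch(1);
        CountDownLatch quickTaskRan = new CountDownLatch(1);

        // Task A: simulates a scheduled job stuck on something slow.
        Runnable blocker = () -> {
            blockerStarted.countDown();
            try {
                release.await();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        };
        // Task B: a job that would finish instantly if it got a thread.
        Runnable quickTask = quickTaskRan::countDown;

        try {
            scheduler.schedule(blocker, 0, TimeUnit.MILLISECONDS);
            blockerStarted.await(); // make sure task A occupies a thread first
            scheduler.schedule(quickTask, 0, TimeUnit.MILLISECONDS);
            // Give task B half a second to run while task A is still blocked.
            return quickTaskRan.await(500, TimeUnit.MILLISECONDS);
        } finally {
            release.countDown();
            scheduler.shutdownNow();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("single thread: "
                + quickTaskRunsWhileOtherBlocks(Executors.newSingleThreadScheduledExecutor()));
        System.out.println("pool of 2:     "
                + quickTaskRunsWhileOtherBlocks(Executors.newScheduledThreadPool(2)));
    }
}
```

With a single thread the quick task stays queued behind the blocked one. With a pool of two it runs on the second thread.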

When more control is desired, a @Configuration class may implement SchedulingConfigurer. This allows access to the underlying ScheduledTaskRegistrar instance. For example, the following example demonstrates how to customize the Executor used to execute scheduled tasks:

@Configuration
@EnableScheduling
public class AppConfig implements SchedulingConfigurer {

    @Override
    public void configureTasks(ScheduledTaskRegistrar taskRegistrar) {
        taskRegistrar.setScheduler(taskExecutor());
    }

    @Bean(destroyMethod="shutdown")
    public Executor taskExecutor() {
        return Executors.newScheduledThreadPool(100);
    }
}

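So this is how we usually configured scheduling in our applications. That held until we added ShedLock to the project to stop scheduled tasks from running more than once in a distributed environment, and found that this setup does not work with ShedLock's TaskScheduler-based proxying (the TaskScheduler proxy mode). In that mode, if the application contains a bean that implements the TaskScheduler interface, ShedLock applies AOP advice to that bean. If it contains none, ShedLock registers a bean definition of its own, creating a ConcurrentTaskScheduler to do the scheduling. The source is in RegisterDefaultTaskSchedulerPostProcessor.java at shedlock-parent-4.24.0: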

@Override
public void postProcessBeanDefinitionRegistry(@NonNull BeanDefinitionRegistry registry) throws BeansException {
    ListableBeanFactory listableBeanFactory = (ListableBeanFactory) this.beanFactory;
    if (BeanFactoryUtils.beanNamesForTypeIncludingAncestors(listableBeanFactory, TaskScheduler.class).length == 0) {
        String[] scheduledExecutorsBeanNames = BeanFactoryUtils.beanNamesForTypeIncludingAncestors(listableBeanFactory, ScheduledExecutorService.class);
        if (scheduledExecutorsBeanNames.length != 1) {
            logger.debug("Registering default TaskScheduler");
            registry.registerBeanDefinition(DEFAULT_TASK_SCHEDULER_BEAN_NAME, rootBeanDefinition(ConcurrentTaskScheduler.class).getBeanDefinition());
            if (scheduledExecutorsBeanNames.length != 0) {
                logger.warn("Multiple ScheduledExecutorService found, do not know which one to use.");
            }
        } else {
            logger.debug("Registering default TaskScheduler with existing ScheduledExecutorService {}", scheduledExecutorsBeanNames[0]);
            registry.registerBeanDefinition(DEFAULT_TASK_SCHEDULER_BEAN_NAME,
                rootBeanDefinition(ConcurrentTaskScheduler.class)
                    .addPropertyReference("scheduledExecutor", scheduledExecutorsBeanNames[0])
                    .getBeanDefinition()
            );
        }
    }
}

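Up to this point everything looks fine. Once the application is running, though, a Lock assert quickly shows that the lock is not taking effect. The cause is the fine-grained control described in the EnableScheduling (Spring Framework 5.3.14 API) documentation, that is, a configuration class that implements the SchedulingConfigurer interface and is set up as follows: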

@Configuration
@EnableScheduling
public class AppConfig implements SchedulingConfigurer {

    @Override
    public void configureTasks(ScheduledTaskRegistrar taskRegistrar) {
        taskRegistrar.setScheduler(taskExecutor());
    }

    @Bean(destroyMethod="shutdown")
    public Executor taskExecutor() {
        return Executors.newScheduledThreadPool(100);
    }
}

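In the listing at the top of this post, line 22 (the configurer.configureTasks(this.registrar) call) runs our configureTasks method, which calls taskRegistrar.setScheduler and sets the scheduler. As a result, the if block at line 26 of that listing, which is guarded by this.registrar.getScheduler() == null, is never entered. The scheduler that ShedLock has wrapped with AOP advice is therefore never set on taskRegistrar, and the distributed control over scheduled tasks does not take effect. Having tracked down the cause, we removed the SchedulingConfigurer implementation and instead declared a bean of type ScheduledExecutorService, which also avoids the task blocking that a single-threaded scheduler can cause.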
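A minimal sketch of what the configuration class might look like after that change follows. It assumes the Spring dependencies are on the classpath and that ShedLock itself (its lock provider and TaskScheduler proxy mode) is configured elsewhere. The bean method name scheduledExecutorService is a hypothetical choice, and the pool size of 100 is simply carried over from the earlier example.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.annotation.EnableScheduling;

@Configuration
@EnableScheduling
public class AppConfig {

    // No SchedulingConfigurer here: nothing calls ScheduledTaskRegistrar#setScheduler
    // before Spring's own lookup, so the lookup in finishRegistration() still runs
    // and can find the TaskScheduler bean that ShedLock registers and proxies.

    @Bean(destroyMethod = "shutdown")
    public ScheduledExecutorService scheduledExecutorService() {
        // Multi-threaded pool, so one blocked task cannot starve the others.
        // The size is illustrative; tune it to the workload.
        return Executors.newScheduledThreadPool(100);
    }
}
```

According to the RegisterDefaultTaskSchedulerPostProcessor listing above, when there is no TaskScheduler bean and exactly one ScheduledExecutorService bean, ShedLock registers a ConcurrentTaskScheduler backed by that executor. Declaring a single bean of this type should therefore be enough to get both the proxy and the larger pool.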