Poison

分布式环境下防止定时任务重复运行的解决方案

在 Java 应用中,经常会使用 Spring 的 @Scheduled 注解用于处理定时任务,但是在集群环境中,这会导致被 @Scheduled 注解标记的方法在每个应用节点都被定时执行,在早期的工程代码中,开发使用一个定时任务 IP 去指定执行任务的节点 IP,若当前节点 IP 与指定的定时任务 IP 一致才执行定时任务,该方案需要在每个 @Scheduled 方法中硬编码 IP 判断逻辑,且在部分应用迁移至 k8s 后因节点 IP 不确定导致该方案不再可用,故查询了下相关的解决方案。

我们在部分工程引入了 ShedLock 用于处理分布式环境下的定时任务重复执行的问题,下面简单记录一下 ShedLock 的实现机制。

ShedLock 确保在分布式环境下你的定时任务在同一时刻仅执行一次。如果一个任务正在被一个节点执行,那么该节点会获取一把锁用于阻止相同的任务被其他节点执行,其他的节点会尝试获取锁,但是获取失败时不会等待,而是采用跳过的方式,以确保一个任务仅被一个节点执行。

ShedLock 使用外部存储用于任务协调,如:Mongo、JDBC database、Redis、Hazelcast、ZooKeeper 等。

需要注意的是,ShedLock 并不是一个分布式的任务调度器,并且 ShedLock 用于处理定时任务未准备用于并行执行但是可以安全的重复执行的情况,此外,锁是基于时间的,ShedLock 假定节点上的时钟是同步的。

对于 @SchedulerLock 注解,源码位于 SchedulerLock.java,其中有三个属性用于配置,name 属性用于指定锁的名字,lockAtMostFor 用于指定锁最长持有时间,主要是为了防止持有锁的节点宕机而锁未释放从而导致其他节点无法获取该锁的问题,这仅仅是一个保护性措施,大多数时候锁会在任务完成的时候释放,lockAtLeastFor 用于指定锁最少持有的时间,该属性主要为了处理任务执行时间小于节点间时钟偏移时长的情况,假设我们的方法执行耗时仅需要 0.1s,而我们 A 节点与节点 B 的时钟偏移为 1s, 那么配置为 @Scheduled(cron = "0 0 0 * * *") 的定时任务在 A 节点启动并执行后锁被释放,此后 B 节点于 A 节点执行完任务 0.9s 后获取锁将会获取成功,导致该定时任务会在 A 节点与 B 节点都被执行一次,如果我们将 lockAtLeastFor 设置为大于 1s 即可避免该问题。

在与 Spring 的集成中采用 AOP 拦截的方式实现,提供了两种代理模式,其中一种围绕被 @SchedulerLock 注解的方法实现,另一种通过代理 TaskScheduler 实现。关于该两种代理方式的配置源码位于 EnableSchedulerLock.java,注意源码中的默认拦截方式与注释中的不一致,注意辨别。

基于代理被 @SchedulerLock 注解的方法的实现

该实现方式的优点主要为它可以很好地与想要以某种方式改变默认 Spring 调度机制的其他框架一起使用。而因为该模式拦截的为被 @SchedulerLock 注解的方法,这会导致非定时任务触发的方法调用也会被拦截,从而使并发调用时仅有一个线程能够获得结果,而其他线程发起的调用不能获得结果,因为方法并未被执行,后续可以通过源码理解原理。

其中关键源码位于:MethodProxyScheduledLockAdvisor.java,源码解释如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
class MethodProxyScheduledLockAdvisor extends AbstractPointcutAdvisor {
// 此处创建切入点为被 @SchedulerLock 注解标记的方法(被废弃的 net.javacrumbs.shedlock.core.SchedulerLock 注解也算)
private final Pointcut pointcut = new ComposablePointcut(methodPointcutFor(net.javacrumbs.shedlock.core.SchedulerLock.class))
.union(methodPointcutFor(SchedulerLock.class));

private final Advice advice;

MethodProxyScheduledLockAdvisor(ExtendedLockConfigurationExtractor lockConfigurationExtractor, LockingTaskExecutor lockingTaskExecutor) {
this.advice = new LockingInterceptor(lockConfigurationExtractor, lockingTaskExecutor);
}

@NonNull
private static AnnotationMatchingPointcut methodPointcutFor(Class<? extends Annotation> methodAnnotationType) {
return new AnnotationMatchingPointcut(
null,
methodAnnotationType,
true
);
}

/**
* Get the Pointcut that drives this advisor.
*/
@NonNull
@Override
public Pointcut getPointcut() {
return pointcut;
}

@NonNull
@Override
public Advice getAdvice() {
return advice;
}

private static class LockingInterceptor implements MethodInterceptor {
private final ExtendedLockConfigurationExtractor lockConfigurationExtractor;
private final LockingTaskExecutor lockingTaskExecutor;

LockingInterceptor(ExtendedLockConfigurationExtractor lockConfigurationExtractor, LockingTaskExecutor lockingTaskExecutor) {
this.lockConfigurationExtractor = lockConfigurationExtractor;
this.lockingTaskExecutor = lockingTaskExecutor;
}

@Override
public Object invoke(MethodInvocation invocation) throws Throwable {
Class<?> returnType = invocation.getMethod().getReturnType();
// 此处对返回类型进行判断,因为原语类型不能为 NULL, 无法处理方法执行被跳过的情况,所以直接抛出异常
if (returnType.isPrimitive() && !void.class.equals(returnType)) {
throw new LockingNotSupportedException("Can not lock method returning primitive value");
}

LockConfiguration lockConfiguration = lockConfigurationExtractor.getLockConfiguration(invocation.getThis(), invocation.getMethod()).get();
TaskResult<Object> result = lockingTaskExecutor.executeWithLock(invocation::proceed, lockConfiguration);

if (Optional.class.equals(returnType)) {
return toOptional(result);
} else {
return result.getResult();
}
}

private static Object toOptional(TaskResult<Object> result) {
if (result.wasExecuted()) {
return result.getResult();
} else {
return Optional.empty();
}
}
}
}
基于代理 TaskScheduler 的实现

该实现方式基于代理 Spring TaskScheduler 的部分方法实现,仅拦截 TaskScheduler 类部分方法的调用,其中关键源码位于:SchedulerProxyScheduledLockAdvisor.java,源码解释如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
class SchedulerProxyScheduledLockAdvisor extends AbstractPointcutAdvisor {
private final Pointcut pointcut = new TaskSchedulerPointcut();
private final Advice advice;
private static final Logger logger = LoggerFactory.getLogger(SchedulerProxyScheduledLockAdvisor.class);

SchedulerProxyScheduledLockAdvisor(LockManager lockManager) {
this.advice = new LockingInterceptor(lockManager);
}

/**
* Get the Pointcut that drives this advisor.
*/
@Override
public Pointcut getPointcut() {
return pointcut;
}

@Override
public Advice getAdvice() {
return advice;
}

private static class LockingInterceptor implements MethodInterceptor {
private final LockManager lockManager;

private LockingInterceptor(LockManager lockManager) {
this.lockManager = lockManager;
}


@Override
public Object invoke(MethodInvocation invocation) throws Throwable {
Object[] arguments = invocation.getArguments();
if (arguments.length >= 1 && arguments[0] instanceof Runnable) {
arguments[0] = new LockableRunnable((Runnable) arguments[0], lockManager);
} else {
logger.warn("Task scheduler first argument should be Runnable");
}
return invocation.proceed();
}
}

private static class TaskSchedulerPointcut implements Pointcut {
@Override
public ClassFilter getClassFilter() {
// 仅需要 isAssignableFrom TaskScheduler.class 的类
return new RootClassFilter(TaskScheduler.class);
}

@Override
public MethodMatcher getMethodMatcher() {
NameMatchMethodPointcut nameMatchMethodPointcut = new NameMatchMethodPointcut();
// 仅匹配以下方法名的方法
nameMatchMethodPointcut.setMappedNames("schedule", "scheduleAtFixedRate", "scheduleWithFixedDelay");
return nameMatchMethodPointcut;
}
}
}

以上对 ShedLock 的基本配置进行了简单介绍,最后再看看防止并行运行的核心代码,其中源码位于:DefaultLockingTaskExecutor.java,其中核心方法如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
@Override
@NonNull
public <T> TaskResult<T> executeWithLock(@NonNull TaskWithResult<T> task, @NonNull LockConfiguration lockConfig) throws Throwable {
Optional<SimpleLock> lock = lockProvider.lock(lockConfig);
String lockName = lockConfig.getName();

if (alreadyLockedBy(lockName)) {
logger.debug("Already locked '{}'", lockName);
return TaskResult.result(task.call());
} else if (lock.isPresent()) {
try {
LockAssert.startLock(lockName);
logger.debug("Locked '{}', lock will be held at most until {}", lockName, lockConfig.getLockAtMostUntil());
return TaskResult.result(task.call());
} finally {
LockAssert.endLock();
lock.get().unlock();
if (logger.isDebugEnabled()) {
Instant lockAtLeastUntil = lockConfig.getLockAtLeastUntil();
Instant now = ClockProvider.now();
if (lockAtLeastUntil.isAfter(now)) {
logger.debug("Task finished, lock '{}' will be released at {}", lockName, lockAtLeastUntil);
} else {
logger.debug("Task finished, lock '{}' released", lockName);
}
}
}
} else {
logger.debug("Not executing '{}'. It's locked.", lockName);
return TaskResult.notExecuted();
}
}

关于 LockAssert 断言的使用,有用户提了 issue: significance of LockAssert,作者也未给出明确的原因,我们先忽略断言的部分代码,上述核心逻辑可以精简如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
@Override
@NonNull
public <T> TaskResult<T> executeWithLock(@NonNull TaskWithResult<T> task, @NonNull LockConfiguration lockConfig) throws Throwable {
Optional<SimpleLock> lock = lockProvider.lock(lockConfig);
String lockName = lockConfig.getName();

if (lock.isPresent()) {
try {
logger.debug("Locked '{}', lock will be held at most until {}", lockName, lockConfig.getLockAtMostUntil());
return TaskResult.result(task.call());
} finally {
lock.get().unlock();
}
} else {
logger.debug("Not executing '{}'. It's locked.", lockName);
return TaskResult.notExecuted();
}
}

可以看出,尝试获取锁,如果获取到了锁,则执行任务,任务执行后释放锁,如果没有获取到锁,则说明该方法当前已由其他节点调用,该节点不再调用该方法。

以上即为 ShedLock 的核心逻辑,对于锁的实现部分,可以自行参考各种实现的源码,此处不再一一分析。

References

Spring Scheduled Task running in clustered environment