Poison

RateLimiter

我之前曾使用过阿里云离线版的 IP 地理位置库,在该 SDK 中,使用了 RateLimiter 去对用户的调用速率进行限制,记得上限为 15w,早期的版本采用了 tryAcquire 方法去尝试获取许可,即使用的非阻塞版本,该问题导致我们集成至 Spark 集群后,在进行离线计算时因为超过 QPS 上限使任务失败,后面向他们反馈该问题后,他们将限速实现调整为了基于 acquire 方法的阻塞版本,在限速的基础上支持了离线计算环境下的正常运行,本文简要记录 RateLimiter 的限流实现机制。

首先根据 RateLimiter 的官方文档我们知道,RateLimiter 支持以可配置的速率分发许可,它支持并发调用,且将限制来自所有线程的总调用率,但是不保证公平性。

如果没有额外的配置,许可证将以固定的速率分发,以每秒许可证的数量来定义。也可以将 RateLimiter 配置为具有预热期,在此期间,每秒发出的许可稳步增加,直到达到稳定的速率。

同时,文档中举了两个例子,一个是控制提交任务的速率,使其不超过 2 个每秒,代码如下:

1
2
3
4
5
6
7
final RateLimiter rateLimiter = RateLimiter.create(2.0); // rate is "2 permits per second"
void submitTasks(List<Runnable> tasks, Executor executor) {
for (Runnable task : tasks) {
rateLimiter.acquire(); // may wait
executor.execute(task);
}
}

另一个例子是限制生成数据流的速度,使其不超过 5000 个字节每秒,其中每个字节使用一个许可,代码如下:

1
2
3
4
5
6

final RateLimiter rateLimiter = RateLimiter.create(5000.0); // rate = 5000 permits per second
void submitPacket(byte[] packet) {
rateLimiter.acquire(packet.length);
networkService.send(packet);
}

需要注意的是,当前请求的许可数量永远不会影响请求本身的限流(对 acquire(1) 的调用和对 acquire(1000) 的调用将导致完全相同的限流,如果有的话),但它会影响下一个请求的限流。即,如果一个昂贵的任务到达空闲的 RateLimiter,它将立即被放行,但下一个请求将经历额外的限流,从而支付昂贵任务的成本。

以上是 RateLimiter 的 Java Doc 中的描述,我们再看看它的子类 SmoothRateLimiter,该类是一个抽象类,在该类的源码中,对 RateLimiter 的设计思想进行了阐述,源码可以参见:SmoothRateLimiter.java,此处进行简单的解释:

RateLimiter 的主要特点在于它的 “稳定速率”,即正常条件下允许的最大速度。这是通过限制传入的请求来强制执行的。

最简单的维持 QPS 的方式是追踪最后一次放行请求时的时间戳,并确保从那时起已经过去了 (1/QPS) 秒。比如,对于一个 QPS = 5 的限速器(5 个令牌每秒),如果我们确保在最后一次放行请求的 200ms 内没有放行过请求,那么我们就达到了预期的速度。如果一个请求在最后一次放行请求的 100ms 后到达,那么我们需要等待 100ms 才能获得许可。按照这个速度,提供 15 个新的许可需要 3 秒。

需要意识要这样的 RateLimiter 对过去这段时间的记忆非常肤浅,因为它只记住了最后一次请求。如果 RateLimiter 长时间未被使用,那么一个新请求到达时应该被立刻放行吗?以上的 RateLimiter 实现会忘记过去长时间的空闲。这将可能导致对资源的未充分利用或过分利用,取决于真实场景下不满足预期速率的后果。

过去这段时间的的未充分利用可能意味着可用资源的过剩。此时,RateLimiter 应该加速一段时间,以利用这些资源。这一点非常重要,比如当 RateLimiter 用于网络限速,过去未充分利用往往意味着存在几乎为空的缓冲区,此时可以立即填充。

另一方面,过去这段时间的未充分利用可能意味着负责处理请求的服务对未来的请求准备不足,即不能支持预期的速率。比如,当缓存项大量失效时,此时需要触发昂贵的操作,所以不能达到预期的速率,或者说服务器刚刚启动,忙于让自己达到预期的速度。

为了处理以上提到的这两种情况,我们添加了一个额外的维度,即 “过去未充分利用” 的维度,由变量 storedPermits 建模。当不存在未充分利用时,该变量为 0,它可以增长到 maxStoredPermits 以实现足够大的未充分利用。所以,当调用 acquire(permits) 尝试获取许可时,请求的许可来自:

  • storedPermits (如果存在)
  • freshPermits (storedPermits 抵扣后还需申请的许可数量)

用一个例子来解释这是如何工作的:

对于每秒产生 1 个令牌的 RateLimiter,当 RateLimiter 未使用时,每一秒将分配一个令牌,即 storedPermits 每秒加 1。假设我们 10 秒未使用 RateLimiter,那么 storedPermits 为 10(假设 maxStoredPermits >= 10)。此时,请求 acquire(3) 到达,即请求获取 3 个令牌。我们从 storedPermits 获取令牌并处理此请求,即将 storedPermits 中的令牌消耗 3 个,storedPermits 减少至 7。紧接着,假定请求 acquire(10) 到达,该请求需要获取 10 个令牌,我们从 storedPermits 中消耗 7 个,然后我们还差 3 个令牌,这 3 个令牌需要从 RateLimiter 中获取,这 3 个令牌就是刚提到的 freshPermits

我们已经知道提供 3 个新的令牌需要多长时间:如果速率是每秒 1 个令牌,那么提供 3 个新的令牌需要 3 秒。但是提供 7 个存储的令牌需要多长时间呢?根据我们上面的解释,答案不止一个。

如果我们倾向于处理资源未充分利用的情况,那么我们希望更快使用存储的令牌 storedPermits,因为未充分利用就等于存在可供获取的免费资源,此时从 storedPermits 中获取 7 个令牌需要的时间为 0s,这也是 SmoothBursty 类中 storedPermitsToWaitTime 的实现:

1
2
3
4
@Override
long storedPermitsToWaitTime(double storedPermits, double permitsToTake) {
return 0L;
}

如果我们倾向于处理请求溢出即处理资源不足的情况,那么我们应该更快使用 freshPermits,并将之前存储的令牌数 storedPermits 转换为时间,以平滑处理请求,这也是 SmoothWarmingUp 类中 storedPermitsToWaitTime 的实现,此处暂不粘贴源码,后面会详细分析。

最后,考虑一个每秒分配一个令牌的 RateLimiter,目前完全未使用过,当一个 acquire(100) 请求分配 100 个令牌时,此时我们应该等待 100 秒后再放行请求吗?为什么什么都不做就等待?一个更好的实现方式是立即允许请求,就像 acquire(1) 一样,然后推迟后续的请求。在当前版本的实现中,我们允许启动 RateLimiter 后就立即执行任务,并将后续的请求推迟 100 秒,因此我们允许任务立即完成而不是无所事事地等待。

以上的实现方式会导致 RateLimiter 不记得最后一次请求的时间,而是记住期望的下次请求的时间。这个期望的下次请求的时间可以立即告诉我们带特定超时时间的 tryAcquire(timeout) 方法调用是否能够获得足够的令牌。当我们观察到期望的下次请求的时间实际上是过去的时间时,那么那个时间点到当前时刻的时间差就是 RateLimiter 未被使用的时间,这段时间就可以被转换为 storedPermits,即我们在实际请求到达时才根据时间差去增加 storedPermits

以上是 SmoothRateLimiter 抽象类中对 RateLimiter 设计思想的阐述。简要总结一下,RateLimiter 使用的类似令牌桶的限流算法,然后内部使用 storedPermits 存储可以使用的令牌,令牌并不是随着时间的流逝实时往令牌桶中放入,而是在下次请求到达时,计算时间差一次性补充这段时间内生成的令牌,然后使用 maxPermits 存储了最大允许暂存的令牌数,即令牌桶中最多允许存储多少个令牌。当存在 storedPermits 往往意味着两种情况,一种是资源过剩,即没有请求来申请令牌,导致存储了令牌,另一种是资源过载,即请求方申请了令牌后不能达到设定的速率,这往往意味着计算资源不足以处理请求。所以 RateLimiter 提供了两种实现,即 SmoothBurstySmoothWarmingUp,类图如下:
RateLimiter

我们先看相对简单的 SmoothBursty 实现,英文 bursty 含义为爆裂、爆炸,顾名思义,猜测该实现是支持长时间未使用后进行突发请求的,该类的注释如下:

1
2
3
4
5
6
7
8
9
/**
* This implements a "bursty" RateLimiter, where storedPermits are translated to zero throttling.
* The maximum number of permits that can be saved (when the RateLimiter is unused) is defined in
* terms of time, in this sense: if a RateLimiter is 2qps, and this time is specified as 10
* seconds, we can save up to 2 * 10 = 20 permits.
*/
static final class SmoothBursty extends SmoothRateLimiter {
// omitted
}

根据注释我们知道,在 SmoothBursty 实现中,存储的令牌 storedPermits 被拿去使用时无需任何等待,直接取走这部分令牌使用即可。

我们从创建一个 RateLimiter 看起,当调用 RateLimiter.create(1) 时,实现如下:

1
2
3
4
5
6
7
8
9
public static RateLimiter create(double permitsPerSecond) {
return create(permitsPerSecond, SleepingStopwatch.createFromSystemTimer());
}

static RateLimiter create(double permitsPerSecond, SleepingStopwatch stopwatch) {
RateLimiter rateLimiter = new SmoothBursty(stopwatch, 1.0 /* maxBurstSeconds */);
rateLimiter.setRate(permitsPerSecond);
return rateLimiter;
}

即底层创建了一个 SmoothBursty 实例,并对该实例设置上我们所需的 QPS,即这里面的 permitsPerSecond 变量。在实例化 SmoothBursty 类时,调用了如下构造函数:

1
2
3
4
5
6
7
8
9
static final class SmoothBursty extends SmoothRateLimiter {
/** The work (permits) of how many seconds can be saved up if this RateLimiter is unused? */
final double maxBurstSeconds;

SmoothBursty(SleepingStopwatch stopwatch, double maxBurstSeconds) {
super(stopwatch);
this.maxBurstSeconds = maxBurstSeconds;
}
}

根据代码及注释我们知道,默认的 RateLimiter.create 方法创建的 SmoothBursty 实例只能允许存储 1 秒的许可数。我们继续看 rateLimiter.setRate(permitsPerSecond) 的实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
public abstract class RateLimiter {
// Can't be initialized in the constructor because mocks don't call the constructor.
@CheckForNull private volatile Object mutexDoNotUseDirectly;

private Object mutex() {
Object mutex = mutexDoNotUseDirectly;
if (mutex == null) {
synchronized (this) {
mutex = mutexDoNotUseDirectly;
if (mutex == null) {
mutexDoNotUseDirectly = mutex = new Object();
}
}
}
return mutex;
}

public final void setRate(double permitsPerSecond) {
checkArgument(
permitsPerSecond > 0.0 && !Double.isNaN(permitsPerSecond), "rate must be positive");
synchronized (mutex()) {
doSetRate(permitsPerSecond, stopwatch.readMicros());
}
}

abstract void doSetRate(double permitsPerSecond, long nowMicros);
}

RateLimiter 源码中,可以看到 setRate 方法实现使用了 synchronized 加锁,这也印证了文首提到的支持并发,支持限制来自所有线程的总调用率。且锁的对象的创建采用的使用了本地变量的双重检查加锁实现,这部分可以参考我之前的文章:Double Checked Locking。获取到锁后,调用了子类 SmoothRateLimiter 抽象类的 doSetRate 方法进行速率设置,其实现如下:

1
2
3
4
5
6
7
8
9
@Override
final void doSetRate(double permitsPerSecond, long nowMicros) {
resync(nowMicros);
double stableIntervalMicros = SECONDS.toMicros(1L) / permitsPerSecond;
this.stableIntervalMicros = stableIntervalMicros;
doSetRate(permitsPerSecond, stableIntervalMicros);
}

abstract void doSetRate(double permitsPerSecond, double stableIntervalMicros);

最后再调用子类 SmoothBursty 中的 doSetRate(double permitsPerSecond, double stableIntervalMicros) 进行速率设置,实现如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
@Override
void doSetRate(double permitsPerSecond, double stableIntervalMicros) {
double oldMaxPermits = this.maxPermits;
maxPermits = maxBurstSeconds * permitsPerSecond;
if (oldMaxPermits == Double.POSITIVE_INFINITY) {
// if we don't special-case this, we would get storedPermits == NaN, below
storedPermits = maxPermits;
} else {
storedPermits =
(oldMaxPermits == 0.0)
? 0.0 // initial state
: storedPermits * maxPermits / oldMaxPermits;
}
}

结合上面几块代码来看的话,我们知道其中的主要逻辑为根据 permitsPerSecond 计算出令牌分配的间隔 stableIntervalMicros,然后设置初始的 storedPermits0.0,注意 setRate 方法可以在 RateLimiter 创建后多次调用,所以后续调用时会根据 permitsPerSecond 的值对当前的 storedPermits 进行等比例缩放。在设置 stableIntervalMicros 变量的值之前,调用了 resync(nowMicros) 方法,该方法的源码如下:

1
2
3
4
5
6
7
8
9
/** Updates {@code storedPermits} and {@code nextFreeTicketMicros} based on the current time. */
void resync(long nowMicros) {
// if nextFreeTicket is in the past, resync to now
if (nowMicros > nextFreeTicketMicros) {
double newPermits = (nowMicros - nextFreeTicketMicros) / coolDownIntervalMicros();
storedPermits = min(maxPermits, storedPermits + newPermits);
nextFreeTicketMicros = nowMicros;
}
}

根据源码及注释可知,该方法的作用为更新 storedPermitsnextFreeTicketMicros 变量的值,这也是我们之前提到的令牌分配实现,并不存在一个线程持续地往令牌桶中放入令牌,而是在申请令牌请求到达时,根据时间差除以 coolDownIntervalMicros() 计算出这段时间生成的令牌数并添加至变量 storedPermits 中。coolDownIntervalMicros() 方法在 SmoothBursty 中的实现为:

1
2
3
4
@Override
double coolDownIntervalMicros() {
return stableIntervalMicros;
}

即直接使用的 stableIntervalMicros,即 1 / QPS,由 double stableIntervalMicros = SECONDS.toMicros(1L) / permitsPerSecond 计算而来。以上就是 SmoothBursty 实例创建的主要逻辑,我们再看看调用 acquire(permits) 方法获取许可时的实现逻辑,源码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
/**
* Acquires a single permit from this {@code RateLimiter}, blocking until the request can be
* granted. Tells the amount of time slept, if any.
*
* <p>This method is equivalent to {@code acquire(1)}.
*
* @return time spent sleeping to enforce rate, in seconds; 0.0 if not rate-limited
* @since 16.0 (present in 13.0 with {@code void} return type})
*/
public double acquire() {
return acquire(1);
}

public double acquire(int permits) {
long microsToWait = reserve(permits);
stopwatch.sleepMicrosUninterruptibly(microsToWait);
return 1.0 * microsToWait / SECONDS.toMicros(1L);
}

/**
* Reserves the given number of permits from this {@code RateLimiter} for future use, returning
* the number of microseconds until the reservation can be consumed.
*
* @return time in microseconds to wait until the resource can be acquired, never negative
*/
final long reserve(int permits) {
checkPermits(permits);
synchronized (mutex()) {
return reserveAndGetWaitLength(permits, stopwatch.readMicros());
}
}

/**
* Reserves next ticket and returns the wait time that the caller must wait for.
*
* @return the required wait time, never negative
*/
final long reserveAndGetWaitLength(int permits, long nowMicros) {
long momentAvailable = reserveEarliestAvailable(permits, nowMicros);
return max(momentAvailable - nowMicros, 0);
}

跟随以上的调用链可知,主要实现逻辑为使用 reserve 计算出需要等待的微秒数 microsToWait,然后进行睡眠,最后返回等待的时长。其中 reserve 方法实现会调用至 SmoothRateLimiterreserveEarliestAvailable 方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
@Override
final long reserveEarliestAvailable(int requiredPermits, long nowMicros) {
resync(nowMicros);
long returnValue = nextFreeTicketMicros;
double storedPermitsToSpend = min(requiredPermits, this.storedPermits);
double freshPermits = requiredPermits - storedPermitsToSpend;
long waitMicros =
storedPermitsToWaitTime(this.storedPermits, storedPermitsToSpend)
+ (long) (freshPermits * stableIntervalMicros);

this.nextFreeTicketMicros = LongMath.saturatedAdd(nextFreeTicketMicros, waitMicros);
this.storedPermits -= storedPermitsToSpend;
return returnValue;
}

以上的逻辑比较简单,主要的关键之处在于本次需要等待的时间是使用上一次请求时计算出的下一次预估请求时间 nextFreeTicketMicros 与当前时间 nowMicros 之差。而根据本次需要申请的令牌数,会将下一次请求的预估时间 nextFreeTicketMicros 计算出来,供下一次请求使用,而计算的逻辑即为先看有多少个存储的令牌可以使用,如果完全够用,则下次的请求时间 nextFreeTicketMicros 不会变化,而如果不够用则先将 storedPermits 全部抵扣,同时计算出需要新获取的令牌数 freshPermits,根据缺少的令牌数 freshPermits 计算出需要等待的时间 waitMicros,因为 SmoothBursty 中的 storedPermitsToWaitTime 方法实现为返回 0,那么实际上 SmoothBursty 中的 waitMicros 计算逻辑即为 freshPermits * stableIntervalMicros,将 waitMicros 累加至 nextFreeTicketMicros 即得出新的 nextFreeTicketMicros 值供下一次请求使用。

以上即为 SmoothBursty 的核心实现,个人认为整体思路还是不算复杂,其中 nextFreeTicketMicros 的作用可能需要加深理解。

下面再看看 SmoothWarmingUp 的实现,上面的 SmoothBursty 实现适用于资源过剩的场景,而 SmoothWarmingUp 则适用于资源过载的场景,根据该类的名称也可以知道是用于预热场景的,首先依然是看类上的注释:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
/**
* This implements the following function where coldInterval = coldFactor * stableInterval.
*
* <pre>
* ^ throttling
* |
* cold + /
* interval | /.
* | / .
* | / . ← "warmup period" is the area of the trapezoid between
* | / . thresholdPermits and maxPermits
* | / .
* | / .
* | / .
* stable +----------/ WARM .
* interval | . UP .
* | . PERIOD.
* | . .
* 0 +----------+-------+--------------→ storedPermits
* 0 thresholdPermits maxPermits
* </pre>
*
* Before going into the details of this particular function, let's keep in mind the basics:
*
* <ol>
* <li>The state of the RateLimiter (storedPermits) is a vertical line in this figure.
* <li>When the RateLimiter is not used, this goes right (up to maxPermits)
* <li>When the RateLimiter is used, this goes left (down to zero), since if we have
* storedPermits, we serve from those first
* <li>When _unused_, we go right at a constant rate! The rate at which we move to the right is
* chosen as maxPermits / warmupPeriod. This ensures that the time it takes to go from 0 to
* maxPermits is equal to warmupPeriod.
* <li>When _used_, the time it takes, as explained in the introductory class note, is equal to
* the integral of our function, between X permits and X-K permits, assuming we want to
* spend K saved permits.
* </ol>
*
* <p>In summary, the time it takes to move to the left (spend K permits), is equal to the area of
* the function of width == K.
*
* <p>Assuming we have saturated demand, the time to go from maxPermits to thresholdPermits is
* equal to warmupPeriod. And the time to go from thresholdPermits to 0 is warmupPeriod/2. (The
* reason that this is warmupPeriod/2 is to maintain the behavior of the original implementation
* where coldFactor was hard coded as 3.)
*
* <p>It remains to calculate thresholdsPermits and maxPermits.
*
* <ul>
* <li>The time to go from thresholdPermits to 0 is equal to the integral of the function
* between 0 and thresholdPermits. This is thresholdPermits * stableIntervals. By (5) it is
* also equal to warmupPeriod/2. Therefore
* <blockquote>
* thresholdPermits = 0.5 * warmupPeriod / stableInterval
* </blockquote>
* <li>The time to go from maxPermits to thresholdPermits is equal to the integral of the
* function between thresholdPermits and maxPermits. This is the area of the pictured
* trapezoid, and it is equal to 0.5 * (stableInterval + coldInterval) * (maxPermits -
* thresholdPermits). It is also equal to warmupPeriod, so
* <blockquote>
* maxPermits = thresholdPermits + 2 * warmupPeriod / (stableInterval + coldInterval)
* </blockquote>
* </ul>
*/
static final class SmoothWarmingUp extends SmoothRateLimiter {
// omitted
}

根据类上的注释,我们知道引入了 coldInterval 冷却时间这个变量,该变量的计算公式为:coldInterval = coldFactor * stableInterval,且在源码中,coldFactor 为固定值 3.0,所以可以认为:coldInterval = 3 * stableInterval。在注释提供的坐标图中,我们知道 storedPermits 位于 x 轴,当 RateLimiter 未使用时,随着时间流逝,storedPermits 向右增加,直到 maxPermits,当 RateLimiter 使用时,storedPermits 向左减少。当未使用时,我们以恒定的速度向右移动,向右移动的速率为 maxPermits / warmupPeriod,这确保 storedPermits0 增大至 maxPermits 的耗时为 warmupPeriod,注意 warmupPeriod 预热时间由我们创建 SmoothWarmingUp 实例时自行指定。当使用时,申请 k 个令牌所需的时间为 k 个令牌在 x 轴中这段长度对应的上侧图形的面积。

根据上侧的图形可知,当我们申请令牌时,从 maxPermitsthresholdPermits 所需的时间为 warmupPeriod,从 thresholdPermits0 的时间为 warmupPeriod / 2。这里这个 1/2 的关系是如何得到的呢?在高版本的 Guava 实现中没有很详细的解释,根据查询该类的提交记录:Modify SmoothRateLimiter so that the ratio between maximum-permits-pe… · google/guava@0b4e7e5 · GitHub 可知,存在一个潜在的条件,即我们希望 cooldownPeriod == warmupPeriod,而 cooldownPeriod 的面积即为从 0maxPermits 上侧长方形的面积,即 cooldownPeriod = maxPermits * stableInterval,而根据图我们知道 warmupPeriod 的面积的等于下方小长方形面积 (maxPermits - thresholdPermits) * stableInterval 加上上方三角形面积 (maxPermits - thresholdPermits) * (coldInterval - stableInterval) / 2,因为 coldInterval = 3 * stableInterval,可以得到三角形面积为 (maxPermits - thresholdPermits) * stableInterval,即在 WARM UP PERIOD 这个梯形中,上侧的三角形面积与下侧的小长方形面积是相等的,且我们希望 cooldownPeriod == warmupPeriod,那么易知左侧小长方形的面积与三角形面积相等,因为它们各自与 (maxPermits - thresholdPermits) * stableInterval 这个小长方形面积相加都等于 WARM UP PERIOD 这块梯形的面积。所以,我们就得到了从 thresholdPermits0 的时间为 warmupPeriod / 2 这一推论。

剩下的就是计算 thresholdsPermitsmaxPermits 的值了。

根据我们刚才的推论可知,从 thresholdPermits0 的时间为 warmupPeriod / 2,即这块长方形面积 thresholdPermits * stableInterval = warmupPeriod / 2,所以可以得到 thresholdPermits = 0.5 * warmupPeriod / stableInterval

对于图中 WARM UP PERIOD 这块梯形的面积,根据梯形面积公式:(上底 + 下底)* 高 / 2 可知 warmupPeriod = (stableInterval + coldInterval) * (maxPermits - thresholdPermits) / 2,所以可以得到 maxPermits = thresholdPermits + 2 * warmupPeriod / (stableInterval + coldInterval)

以上就是 SmoothWarmingUp 类上的注释,理解了该注释后,再来看源码就非常简单了,首先我们知道 storedPermits 向右移动的速度即令牌的生成速度为 maxPermits / warmupPeriod,那么我们根据时间间隔计算令牌产生了多少个时用到的 coolDownIntervalMicros()SmoothWarmingUp 中的实现如下:

1
2
3
4
@Override
double coolDownIntervalMicros() {
return warmupPeriodMicros / maxPermits;
}

该方法被我们之前提到的 resync 方法所使用,用于计算两次获取令牌的时间点之前应该生成多少个令牌。

再看 RateLimiter 用于创建 SmoothWarmingUp 实例的静态方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
/**
* Creates a {@code RateLimiter} with the specified stable throughput, given as "permits per
* second" (commonly referred to as <i>QPS</i>, queries per second), and a <i>warmup period</i>,
* during which the {@code RateLimiter} smoothly ramps up its rate, until it reaches its maximum
* rate at the end of the period (as long as there are enough requests to saturate it). Similarly,
* if the {@code RateLimiter} is left <i>unused</i> for a duration of {@code warmupPeriod}, it
* will gradually return to its "cold" state, i.e. it will go through the same warming up process
* as when it was first created.
*
* <p>The returned {@code RateLimiter} is intended for cases where the resource that actually
* fulfills the requests (e.g., a remote server) needs "warmup" time, rather than being
* immediately accessed at the stable (maximum) rate.
*
* <p>The returned {@code RateLimiter} starts in a "cold" state (i.e. the warmup period will
* follow), and if it is left unused for long enough, it will return to that state.
*
* @param permitsPerSecond the rate of the returned {@code RateLimiter}, measured in how many
* permits become available per second
* @param warmupPeriod the duration of the period where the {@code RateLimiter} ramps up its rate,
* before reaching its stable (maximum) rate
* @param unit the time unit of the warmupPeriod argument
* @throws IllegalArgumentException if {@code permitsPerSecond} is negative or zero or {@code
* warmupPeriod} is negative
*/
@SuppressWarnings("GoodTime") // should accept a java.time.Duration
public static RateLimiter create(double permitsPerSecond, long warmupPeriod, TimeUnit unit) {
checkArgument(warmupPeriod >= 0, "warmupPeriod must not be negative: %s", warmupPeriod);
return create(
permitsPerSecond, warmupPeriod, unit, 3.0, SleepingStopwatch.createFromSystemTimer());
}

@VisibleForTesting
static RateLimiter create(
double permitsPerSecond,
long warmupPeriod,
TimeUnit unit,
double coldFactor,
SleepingStopwatch stopwatch) {
RateLimiter rateLimiter = new SmoothWarmingUp(stopwatch, warmupPeriod, unit, coldFactor);
rateLimiter.setRate(permitsPerSecond);
return rateLimiter;
}

可知,大体逻辑与 SmoothBursty 一致,只是设置了固定的 coldFactor3.0,然后跟随调用链至 doSetRate 方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
@Override
void doSetRate(double permitsPerSecond, double stableIntervalMicros) {
double oldMaxPermits = maxPermits;
double coldIntervalMicros = stableIntervalMicros * coldFactor;
thresholdPermits = 0.5 * warmupPeriodMicros / stableIntervalMicros;
maxPermits =
thresholdPermits + 2.0 * warmupPeriodMicros / (stableIntervalMicros + coldIntervalMicros);
slope = (coldIntervalMicros - stableIntervalMicros) / (maxPermits - thresholdPermits);
if (oldMaxPermits == Double.POSITIVE_INFINITY) {
// if we don't special-case this, we would get storedPermits == NaN, below
storedPermits = 0.0;
} else {
storedPermits =
(oldMaxPermits == 0.0)
? maxPermits // initial state is cold
: storedPermits * maxPermits / oldMaxPermits;
}
}

可知,根据我们传入的 warmupPeriod 对应的 warmupPeriodMicros 计算出 thresholdPermitsmaxPermits 的值,源码中的公式我们已经进行了推导,此处不再复述,除此之外还计算了斜线的斜率供后续计算面积使用。

然后我们再看看调用 acquire(permits) 方法获取许可时的实现逻辑,大体逻辑与 SmoothBursty 相同,不同点在于从 storedPermits 中获取许可时,SmoothBursty 的实现中从 storedPermits 中获取许可无需等待,而再当前的 SmoothWarmingUp 实现,因为存在 storedPermits 说明系统过载,那么获取许可的时候存储的令牌越多我们就应该以越慢的速度获取许可,storedPermits 转换为等待时间的实现如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
@Override
long storedPermitsToWaitTime(double storedPermits, double permitsToTake) {
double availablePermitsAboveThreshold = storedPermits - thresholdPermits;
long micros = 0;
// measuring the integral on the right part of the function (the climbing line)
if (availablePermitsAboveThreshold > 0.0) {
double permitsAboveThresholdToTake = min(availablePermitsAboveThreshold, permitsToTake);
// TODO(cpovirk): Figure out a good name for this variable.
double length =
permitsToTime(availablePermitsAboveThreshold)
+ permitsToTime(availablePermitsAboveThreshold - permitsAboveThresholdToTake);
micros = (long) (permitsAboveThresholdToTake * length / 2.0);
permitsToTake -= permitsAboveThresholdToTake;
}
// measuring the integral on the left part of the function (the horizontal line)
micros += (long) (stableIntervalMicros * permitsToTake);
return micros;
}

private double permitsToTime(double permits) {
return stableIntervalMicros + permits * slope;
}

permitsToTime 方法的作用为根据 x 轴的 permits 计算对应的 y 轴上的等待时间的值。storedPermitsToWaitTime 整体的思路即为根据可用的 storedPermits 算出上方图形的面积,即为需要等待的时间,根据注释就能理解其含义,此处不再赘述。

以上,就是 RateLimiter 的核心实现,个人认为 SmoothWarmingUp 实现不看注释还是不容易理解,在理解其设计思想后再看代码就不再复杂了。