0%

限流及Google Guava RateLimiter解读

前言

限流,是服务或者应用对自身保护的一种手段,通过限制或者拒绝调用方的流量,来保证自身的负载。限流是保护高并发系统的三把利器之一,另外两个是缓存和降级。限流在很多场景中用来限制并发和请求量,比如说秒杀抢购,保护自身系统和下游系统不被巨型流量冲垮等。

1. 限流基础

1.1 限流的意义

其实很好理解的一个问题,为什么要限流,自然就流量过大了呗,一个对外服务有很多场景都会流量增大:

  • 业务用户量不断攀升
  • 各种促销
  • 网络爬虫
  • 恶意刷单

注意这个”大”,1000QPS(permits per second)大吗?5000QPS大吗?10000QPS大么?没有答案,因为没有标准,因此,”大”一定是和正常流量相比的大。流量一大,服务器扛不住,扛不住就挂了,挂了没法提供对外服务导致业务直接熔断。怎么办,最直接的办法就是从源头把流量限制下来,例如服务器只有支撑1000QPS的处理能力,那就每秒放1000个请求,自然保证了服务器的稳定,这就是限流。

1.2 常用的限流方式

  1. 限制总并发数(比如数据库连接池、线程池)
  2. 限制瞬时并发数(如nginx的limitconn模块,用来限制瞬时并发连接数,Java的Semaphore也可以实现)
  3. 限制时间窗口内的平均速率(如Guava的RateLimiter、nginx的limitreq模块,限制每秒的平均速率)
  4. 其他还有如限制远程接口调用速率、限制MQ的消费速率。另外还可以根据网络连接数、网络流量、CPU或内存负载等来限流。

2. 限流算法

常用的限流算法有两种:漏桶算法和令牌桶算法

2.1 漏桶算法

漏桶算法的原理比较简单,水(请求)先进入到漏桶里,人为设置一个最大出水速率,漏桶以<=出水速率的速度出水,当水流入速度过大会直接溢出(拒绝服务):

漏桶算法

从上图中,我们可以看到,就像一个漏斗一样,进来的水量就好像访问流量一样,而出去的水量就像是我们的系统处理请求一样。当访问流量过大时,这个漏斗中就会积水,如果水太多了就会溢出。

漏桶算法的实现往往依赖于队列,请求到达如果队列未满则直接放入队列,然后有一个处理器按照固定频率从队列头取出请求进行处理。如果请求量大,则会导致队列满,那么新来的请求就会被抛弃。

漏桶算法

因此,这个算法的核心为:

  • 存下请求
  • 匀速处理
  • 多于丢弃

因此这是一种强行限制请求速率的方式,但是缺点非常明显,主要有两点:

  1. 无法面对突发的大流量—-比如请求处理速率为1000,容量为5000,来了一波2000/s的请求持续10s,那么后5s的请求将全部直接被丢弃,服务器拒绝服务,但是实际上网络中突发一波大流量尤其是短时间的大流量是非常正常的,超过容量就拒绝,非常简单粗暴
  2. 无法有效利用网络资源—-比如虽然服务器的处理能力是1000/s,但这不是绝对的,这个1000只是一个宏观服务器处理能力的数字,实际上一共5秒,每秒请求量分别为1200、1300、1200、500、800,平均下来qps也是1000/s,但是这个量对服务器来说完全是可以接受的,但是因为限制了速率是1000/s,因此前面的三秒,每秒只能处理掉1000个请求而一共打回了700个请求,白白浪费了服务器资源

所以,通常来说利用漏桶算法来限流,实际场景下用得不多。

2.2 令牌桶算法

令牌桶算法是网络流量整形(Traffic Shaping)和限流(Rate Limiting)中最常使用的一种算法,它可用于控制发送到网络上数据的数量并允许突发数据的发送。

从某种意义上来说,令牌桶算法是对漏桶算法的一种改进,主要在于令牌桶算法能够在限制调用的平均速率的同时还允许一定程度的突发调用。与漏桶算法不同的是,令牌桶算法是一个存放固定容量令牌的桶,按照固定速率往桶里添加令牌。桶中存放的令牌数有最大上限,超出之后就被丢弃或者拒绝。当流量或者网络请求到达时,每个请求都要获取一个令牌,如果能够获取到,则直接处理,并且令牌桶删除一个令牌。如果获取不到,该请求就要被限流,拒绝服务,要么直接丢弃,要么在缓冲区等待。

令牌桶算法

整个的过程是这样的:

  • 系统以恒定的速率产生令牌,然后将令牌放入令牌桶中
  • 令牌桶有一个容量,当令牌桶满了的时候,再向其中放入的令牌就会被丢弃
  • 每次一个请求过来,需要从令牌桶中获取一个令牌,假设有令牌,那么提供服务;假设没有令牌,那么拒绝服务

那么,我们再看一下,为什么令牌桶算法可以防止一定程度的突发流量呢?可以这么理解,假设我们想要的速率是1000QPS,那么往桶中放令牌的速度就是1000个/s,假设第1秒只有800个请求,那意味着第2秒可以容许1200个请求,这就是一定程度突发流量的意思,反之我们看漏桶算法,第一秒只有800个请求,那么全部放过,第二秒这1200个请求将会被打回200个。

注意上面多次提到一定程度这四个字,这也是我认为令牌桶算法最需要注意的一个点。假设还是1000QPS的速率,那么5秒钟放1000个令牌,第1秒钟800个请求过来,第2~4秒没有请求,那么按照令牌桶算法,第5秒钟可以接受4200个请求,但是实际上这已经远远超出了系统的承载能力,因此使用令牌桶算法特别注意设置桶中令牌的上限即可。

总而言之,作为对漏桶算法的改进,令牌桶算法在限流场景下被使用更加广泛。

2.3 令牌桶和漏桶对比

  • 令牌桶是按照固定速率往桶中添加令牌,请求是否被处理需要看桶中令牌是否足够,当令牌数减为零时则拒绝新的请求;漏桶则是按照常量固定速率流出请求,流入请求速率任意,当流入的请求数累积到漏桶容量时,则新流入的请求被拒绝;
  • 令牌桶限制的是平均流入速率,允许突发请求,只要有令牌就可以处理,支持一次拿3个令牌,4个令牌;漏桶限制的是常量流出速率,即流出速率是一个固定常量值,比如都是1的速率流出,而不能一次是1,下次又是2,从而平滑突发流入速率;
  • 令牌桶允许一定程度的突发,而漏桶主要目的是平滑流出速率。

3. RateLimiter

3.1 平滑突发限流SmoothBursty

RateLimiter 的使用方法实例如下,使用 RateLimiter的静态方法创建一个限流器,设置每秒放置的令牌数为5个。返回的RateLimiter对象可以保证1秒内不会给超过5个令牌,并且以固定速率进行放置,达到平滑输出的效果。

1
2
3
4
5
6
public void testSmoothBursty() {
RateLimiter r = RateLimiter.create(5);
while (true) {
System.out.println("get 1 tokens: " + r.acquire() + "s");
}
}

create方法用于构建既定速度的实例,acquire方法使用阻塞方式获取令牌。

源代码中涉及到以下属性:

  • SleepingStopwatch 基础计时器
  • storedPermits 当前存储的令牌数量
  • maxPermits 最大能存储令牌数量
  • stableIntervalMicros 两个单元请求之间的间隔,以我们稳定的速度。 例如,每秒5个许可的稳定速率具有200ms的稳定间隔。
  • nextFreeTicketMicros 下一个请求(无论其大小)将被批准的时间。由于RateLimiter允许预消费,上次请求预消费令牌后,下次请求需要等待相应的时间到nextFreeTicketMicros时刻才可以获取令牌。

RateLimiter的原理就是每次调用acquire时用当前时间和nextFreeTicketMicros进行比较,根据二者的间隔和添加单位令牌的时间间隔 stableIntervalMicros来刷新存储令牌数storedPermits。然后acquire会进行休眠,直到nextFreeTicketMicros。

3.1.1 create方法

create的源代码如下,创建具有指定稳定吞吐量的RateLimiter,以“permits per second”(通常称为QPS,每秒查询)给出,上述实例中,QPS = 5。并创建一个计时器SleepingStopwatch。

创建的RateLimiter确保在给定的每秒内平均发出的许可数不超过QPS,而持续的请求则每秒平滑地传播。 当传入的请求速率超过QPS时,速率限制器将每(1.0 / QPS)秒释放一个许可。 当未使用速率限制器时,将允许突发最高允许QPS的许可,随后的请求将以QPS的稳定速率进行平滑限制。

1
2
3
4
5
6
7
8
9
10
public static RateLimiter create(double permitsPerSecond) {
return create(permitsPerSecond, SleepingStopwatch.createFromSystemTimer());
}

@VisibleForTesting
static RateLimiter create(double permitsPerSecond, SleepingStopwatch stopwatch) {
RateLimiter rateLimiter = new SmoothBursty(stopwatch, 1.0 /* maxBurstSeconds */);
rateLimiter.setRate(permitsPerSecond);
return rateLimiter;
}

maxBurstSeconds固定为1,说明令牌桶中所能存储的的最大令牌数是1*QPS。

接着调用setRate方法,该方法初始化一些重要的参数:

1
2
3
4
5
6
7
public final void setRate(double permitsPerSecond) {
checkArgument(
permitsPerSecond > 0.0 && !Double.isNaN(permitsPerSecond), "rate must be positive");
synchronized (mutex()) {
doSetRate(permitsPerSecond, stopwatch.readMicros());
}
}

接着在同步代码块的互斥锁synchronized (mutex())中调用doSetRate,主要实现在SmoothRateLimiter中,初始化stableIntervalMicros,该字段表示1/QPS,即生产令牌的速率。:

1
2
3
4
5
6
7
@Override
final void doSetRate(double permitsPerSecond, long nowMicros) {
resync(nowMicros);
double stableIntervalMicros = SECONDS.toMicros(1L) / permitsPerSecond;
this.stableIntervalMicros = stableIntervalMicros;
doSetRate(permitsPerSecond, stableIntervalMicros);
}

其中resync方法是一个关键的方法,在请求令牌时也会用到,后面会详细说明。

1
2
3
4
5
6
7
8
void resync(long nowMicros) {
// if nextFreeTicket is in the past, resync to now
if (nowMicros > nextFreeTicketMicros) {
double newPermits = (nowMicros - nextFreeTicketMicros) / coolDownIntervalMicros();
storedPermits = min(maxPermits, storedPermits + newPermits);
nextFreeTicketMicros = nowMicros;
}
}

从中可以看出,如果nowMicros大于nextFreeTicketMicros(初始为0L),会重新计算nextFreeTicketMicros和storedPermit的值。

接着调用doSetRate方法,该方法在SmoothBursty类中。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
@Override
void doSetRate(double permitsPerSecond, double stableIntervalMicros) {
double oldMaxPermits = this.maxPermits;
maxPermits = maxBurstSeconds * permitsPerSecond;
if (oldMaxPermits == Double.POSITIVE_INFINITY) {
// if we don't special-case this, we would get storedPermits == NaN, below
storedPermits = maxPermits;
} else {
storedPermits =
(oldMaxPermits == 0.0)
? 0.0 // initial state
: storedPermits * maxPermits / oldMaxPermits;
}
}

初始化maxPermits和storePermits,后者永远不会大于前者。

到此,rateLimiter初始化完成。除了resync方法,在不重新设置rate的情况,其他方法不在处理请求时用到,暂时忽略。

3.1.2 acquire方法

acquire方法的源代码如下,在实例代码中,调用acquire()方法,申请令牌,无参数表示申请一个。接着调用acquire(int permits)方法。

1
2
3
4
5
6
7
8
9
10
11
12
13
@CanIgnoreReturnValue
public double acquire() {
return acquire(1);
}

@CanIgnoreReturnValue
public double acquire(int permits) {
// 计算获取令牌所需等待的时间
long microsToWait = reserve(permits);
// 进行线程sleep
stopwatch.sleepMicrosUninterruptibly(microsToWait);
return 1.0 * microsToWait / SECONDS.toMicros(1L);
}

acquire中首先调用reserve,reserve方法返回获取令牌所需要等待的时间,stopwatch阻塞当前线程,然后使用不SleepStopwatch休眠microsToWait时间,最后返回线程休眠的秒数。

如果microsToWait为0,表示立即返回。reserve需要获取锁才可以操作,这也是令牌桶线程安全的原因,以下操作都在同步代码块中。

1
2
3
4
5
6
7
final long reserve(int permits) {
checkPermits(permits);
// 由于涉及并发操作,所以使用synchronized进行并发操作
synchronized (mutex()) {
return reserveAndGetWaitLength(permits, stopwatch.readMicros());
}
}

在reserve中首先调用checkPermits方法对参数进行验证。然后在reserve中继续调用reserveAndGetWaitLength方法,获取可以使用令牌的时间。

1
2
3
4
5
6
final long reserveAndGetWaitLength(int permits, long nowMicros) {
// 计算能够获取到目标数量令牌时的时间
long momentAvailable = reserveEarliestAvailable(permits, nowMicros);
// 两个时间相减,获得需要等待的时间
return max(momentAvailable - nowMicros, 0);
}

在reserveAndGetWaitLength方法中,首先调用reserveEarliestAvailable,方法名说明了返回值的意义:即返回满足当前请求的最早的时钟,该值大于等于nowMicros。

reserveEarliestAvailable是刷新令牌数和下次获取令牌时间 nextFreeTicketMicros的关键函数。它有三个步骤,一是调用resync函数增加(刷新)令牌数,二是计算预支付令牌所需额外等待的时间,三是更新下次获取令牌时间 nextFreeTicketMicros和存储令牌数 storedPermits。

这里涉及 RateLimiter的一个特性,也就是可以预先支付令牌,并且所需等待的时间在下次获取令牌时再实际执行。

如何保证这一点的呢?我们看reserveEarliestAvailable方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
@Override
final long reserveEarliestAvailable(int requiredPermits, long nowMicros) {
// 刷新令牌数,相当于每次acquire时在根据时间进行令牌的刷新
resync(nowMicros);
long returnValue = nextFreeTicketMicros;
// 获取当前已有的令牌数和需要获取的目标令牌数进行比较,计算出可以目前即可得到的令牌数
double storedPermitsToSpend = min(requiredPermits, this.storedPermits);
// freshPermits是需要预先支付的令牌,也就是目标令牌数减去目前即可得到的令牌数
double freshPermits = requiredPermits - storedPermitsToSpend;
// 因为会突然涌入大量请求,而现有令牌数又不够用,因此会预先支付一定的令牌数
// waitMicros即是产生预先支付令牌的数量时间,则将下次要添加令牌的时间应该计算时间加上watiMicros
long waitMicros =
storedPermitsToWaitTime(this.storedPermits, storedPermitsToSpend)
+ (long) (freshPermits * stableIntervalMicros);
// 更新nextFreeTicketMicros,本次预先支付的令牌所需等待的时间让下一次请求来实际等待
this.nextFreeTicketMicros = LongMath.saturatedAdd(nextFreeTicketMicros, waitMicros);
// 更新令牌数,最低数量为0
this.storedPermits -= storedPermitsToSpend;
// 返回旧的nextFreeTicketMicros数值,无需为预支付的令牌多加等待时间
return returnValue;
}

void resync(long nowMicros) {
// if nextFreeTicket is in the past, resync to now
// 当前时间晚于nextFreeTicketMicros,所以刷新令牌和nextFreeTicketMicros
if (nowMicros > nextFreeTicketMicros) {
// coolDownIntervalMicros函数获取每机秒生成一个令牌,SmoothWarmingUp和SmoothBuresty的实现不同
// SmoothBuresty的coolDownIntervalMicros直接返回stableIntervalMicros
// 当前时间减去要更新令牌的时间获取时间间隔,再除以添加令牌时间间隔获取这段时间内要添加的令牌数
double newPermits = (nowMicros - nextFreeTicketMicros) / coolDownIntervalMicros();
storedPermits = min(maxPermits, storedPermits + newPermits);
// 如果当前时间早于nextFreeTicketMicros,则获取令牌的线程要一直等待到nextFreeTicketMicros
// 该线程获取令牌所需额外等待的时间由下一次获取的线程来代替等待
nextFreeTicketMicros = nowMicros;
}
}

resync函数用于增加(刷新)存储令牌,核心逻辑就是 (nowMicros-nextFreeTicketMicros)/stableIntervalMicros。当前时间大于 nextFreeTicketMicros时进行刷新,否则直接返回。

这十多行代码是整个算法实现的核心,重点说明:

  1. 首先调用resync(nowMicros),更新storedPermits(当前存储的令牌数量)和nextFreeTicketMicros(下一个请求将被批准的时间)。如果nowMicros在nextFreeTicketMicros之后,nextFreeTicketMicros=nowMicros,并往storedPermits中增加这段时间能产生的令牌。
    返回值设置为当前的nextFreeTicketMicros。为什么要这样设置呢?因为如果nowMicros大于nextFreeTicketMicros,说明令牌桶肯定能满足需求(无论请求的令牌数目是多少),而resync方法已经修改了nextFreeTicketMicros值为nowMicros值,逐层返回给调用者时,等待时间为0,线程无需等待;反之,如果nowMicros小于等于nextFreeTicketMicros,说明请求过快,线程需要等待,等待的时间就是nextFreeTicketMicros-nowMicros。
  2. 接下来,storedPermitsToSpend代表令牌桶中已有的令牌数,可以用于当前的请求。但未必满足需求。
  3. 其次,freshPermits代表需要新生成的令牌数。如果storedPermits已经满足需求,则freshPermits为0。
  4. 再次,计算新生成令牌需要花费的时间waitMicros,这些需要后来者偿还。
  5. 然后修改nextFreeTicketMicros的值。
  6. 最后修改storedPermits值。

至此整个处理过程结束。

3.1.3 自问自答

  1. 问题1
  • Q:调用resync更新令牌数量时,若当前时间nowMicros早于nextFreeTicketMicros时(nowMicros < nextFreeTicketMicros),程序直接返回,不更新nextFreeTicketMicros。但在reserveEarliestAvailable函数中,并没有拦截后续代码的执行,如何实现限流的呢?

  • A:由于RateLimiter允许一定程度的突发,这里涉及 RateLimiter的一个特性,也就是可以预先支付令牌,并且所需等待的时间在下次获取令牌时再实际执行。当nowMicros < nextFreeTicketMicros时,resync函数并未对nextFreeTicketMicros进行更新(nextFreeTicketMicros = nowMicros),reserveEarliestAvailable返回的时间nextFreeTicketMicros > nowMicros,在reserveAndGetWaitLength函数中表现为momentAvailable > nowMicros,即microsToWait > 0,即acquire函数会对当前线程休眠max(momentAvailable - nowMicros, 0)时间,从而实现了等待效果。并且由于预支了令牌,reserveEarliestAvailable会同步更新当前的nextFreeTicketMicros值。

    以如下代码为实例:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
        public static void test1() {
    RateLimiter rateLimiter = RateLimiter.create(2);
    for (int i = 0; i < 5; i++) {
    System.out.println("get 1 tokens: " + rateLimiter.acquire() + "s");
    }
    }
    /* output
    get 1 tokens: 0.0s
    get 1 tokens: 0.492575s
    get 1 tokens: 0.494368s
    get 1 tokens: 0.495195s
    get 1 tokens: 0.494448s
    */

    新建RateLimiter对象后,初始令牌数为0,第一个acquire()请求来时,预支了1个令牌,因此第二个acquire()需要偿还第一个acquire()请求预支的令牌后,再预支1个令牌,并且随着时间的推进,新增的令牌数用于赶不上消耗的令牌数,后续每一个acquire()请求都需要偿还先前一次预支的令牌时间。

  1. 问题2
  • Q:怎么理解RateLimiter为了应对突发流量的预先支付令牌特性?

  • A:以如下代码作为实例来理解预先支付令牌特性:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    public static void test2() {
    RateLimiter rateLimiter = RateLimiter.create(2);
    System.out.println("get 4 tokens: " + rateLimiter.acquire(4) + "s");
    System.out.println("get 1 tokens: " + rateLimiter.acquire(1) + "s");
    System.out.println("get 1 tokens: " + rateLimiter.acquire(1) + "s");
    }
    /* output
    get 4 tokens: 0.0s
    get 1 tokens: 1.991897s
    get 1 tokens: 0.493449s
    */

    上述代码中,首先创建了一个permitsPerSecond = 2的RateLimiter对象,创建完成后拥有0个令牌。

    在发起acquire(4)请求时,本实例的stableIntervalMicros较大,此时还未来得及产生新的令牌,因此storedPermits = 0;在reserveEarliestAvailable方法中,可用令牌数storedPermitsToSpend = 0,预支令牌数freshPermits = 4,所以waitMicros为产生4个新令牌所需时间(在当前模式下storedPermitsToWaitTime恒为0),在nextFreeTicketMicros上加上waitMicros形成新的nextFreeTicketMicros,之后更新storedPermits = 0,但是reserveEarliestAvailable仍然返回的是旧的nextFreeTicketMicros数值,即nowMicros,因此获取4个令牌无需等待。

    在发起acquire(1)请求时,nextFreeTicketMicros为acquire(4)请求时的时间+预支4个令牌所需要的时间(1秒),因此nowMicros < nextFreeTicketMicros,不更新令牌数storedPermits和nextFreeTicketMicros,此时令牌数storedPermits = 0。在reserveEarliestAvailable中,storedPermitsToSpend = 1, freshPermits = 1,因此需要预支1个令牌,但实际reserveEarliestAvailable返回的是旧的nextFreeTicketMicros数值,即产生4个新令牌所需时间,即2秒。因此在发起acquire(1)请求时,程序需sleep约2秒。

    在发起第二个acquire(1)请求时,偿还上一次acquire(1)请求时预支的1个令牌产生需要的时间,即0.5秒。

    实例2:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    public static void test3() {
    RateLimiter rateLimiter = RateLimiter.create(2);
    System.out.println("get 2 tokens: " + rateLimiter.acquire(2) + "s");
    try {
    Thread.sleep(1000);
    } catch (Exception e) {}
    System.out.println("get 1 tokens: " + rateLimiter.acquire(1) + "s");
    System.out.println("get 1 tokens: " + rateLimiter.acquire(1) + "s");
    }
    /* output
    get 2 tokens: 0.0s
    get 1 tokens: 0.0s
    get 1 tokens: 0.493068s
    */

    经过上面的说明,实例2就很好理解了。acquire(2)请求首先预支了2个令牌,之后程序sleep1秒;在第一个acquire(1)时,nowMicros = nextFreeTicketMicros,无需sleep程序等待令牌,storedPermits = 0,预支1个令牌,更新nextFreeTicketMicros增加产生1个令牌的时间;在第二个acquire(1)时,还未满足增加产生1个令牌的时间,因此需要偿还前一次预支1个令牌的时间。

  1. 问题3
  • Q:使用create函数创建后,限流器中存储的令牌数量为多少,即storedPermits为多少?
  • A:等于0。虽然在创建过程中,由于nowMicros > nextFreeTicketMicros(=0),且maxBurstSeconds默认为1.0L,因此在doSetRate(double permitsPerSecond, long nowMicros)中调用resync得到的storedPermits == maxPermits == 2,但是后来又调用了SmoothBursty.doSetRate(permitsPerSecond, stableIntervalMicros)了方法,将storedPermits重新设置为0。

3.2 平滑预热限流SmoothWarmingUp

RateLimiter的 SmoothWarmingUp是带有预热期的平滑限流,它启动后会有一段预热期,逐步将分发频率提升到配置的速率,就像车辆的启动阶段,先从1档起步,逐渐加快,2档,3档一直到最快的6档。

比如下面代码中的例子,创建一个平均分发令牌速率为2,预热期为3分钟。由于设置了预热时间是3秒,令牌桶一开始并不会0.5秒发一个令牌,而是形成一个平滑线性下降的坡度,频率越来越高,在3秒钟之内达到原本设置的频率,以后就以固定的频率输出。这种功能适合系统刚启动需要一点时间来“热身”的场景。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
public static void test4() {
RateLimiter r = RateLimiter.create(2, 3, TimeUnit.SECONDS);
for (int i = 0; i < 2; i++) {
{
System.out.println("get 1 tokens: " + r.acquire(1) + "s");
System.out.println("get 1 tokens: " + r.acquire(1) + "s");
System.out.println("get 1 tokens: " + r.acquire(1) + "s");
System.out.println("get 1 tokens: " + r.acquire(1) + "s");
System.out.println("end");
/*output:
get 1 tokens: 0.0s
get 1 tokens: 1.325543s
get 1 tokens: 0.993067s
get 1 tokens: 0.661552s 上边三次获取的时间相加正好为3秒
end
get 1 tokens: 0.497426s 正常速率0.5秒一个令牌
get 1 tokens: 0.49525s
get 1 tokens: 0.500081s
get 1 tokens: 0.499846s
end
*/
}
}
}

SmoothWarmingUp实现预热缓冲的关键在于其分发令牌的速率会随时间和令牌数而改变,速率会先慢后快。表现形式如下,令牌刷新的时间间隔由长逐渐变短。等存储令牌数从maxPermits到达thresholdPermits时,发放令牌的时间价格也由coldInterval降低到了正常的stableInterval。

SmoothWarmingUp

X轴代表令牌桶存储的token数,Y轴代表限流的速率,单位是一个token的生成速率。XY代表了坐标轴围成的矩形面积,也就是(token生产速率)*(token数量),它有什么含义呢?是的,它代表了生产n个token的时长,这里使用了积分进行计算。右边的梯型面积表示了热身区的token生产总时长,左下边的长方形面积表示稳定期的token生产时长。

系统刚启动状态,初始存储的令牌数量storedPermits = maxPermits,此时处于冷状态,因此上图应该从右往左理解。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
public static RateLimiter create(double permitsPerSecond, long warmupPeriod, TimeUnit unit) {
checkArgument(warmupPeriod >= 0, "warmupPeriod must not be negative: %s", warmupPeriod);
return create(
permitsPerSecond, warmupPeriod, unit, 3.0, SleepingStopwatch.createFromSystemTimer());
}

@VisibleForTesting
static RateLimiter create(
double permitsPerSecond,
long warmupPeriod,
TimeUnit unit,
double coldFactor,
SleepingStopwatch stopwatch) {
RateLimiter rateLimiter = new SmoothWarmingUp(stopwatch, warmupPeriod, unit, coldFactor);
rateLimiter.setRate(permitsPerSecond);
return rateLimiter;
}

以上述例子说明,RateLimiter r = RateLimiter.create(2, 3, TimeUnit.SECONDS); 2表示QPS,3代表warmupPeriod为3秒,冷却因子coldFactor默认为3.0。

因此稳定速率stable interval = 1/QPS = 0.5秒/个,冷却速率cold interval = coldFactor * stable interval = 1.5秒/个。

通过如下代码设置初始化令牌桶参数:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
@Override
final void doSetRate(double permitsPerSecond, long nowMicros) {
resync(nowMicros);
double stableIntervalMicros = SECONDS.toMicros(1L) / permitsPerSecond;
this.stableIntervalMicros = stableIntervalMicros;
doSetRate(permitsPerSecond, stableIntervalMicros);
}

@Override
void doSetRate(double permitsPerSecond, double stableIntervalMicros) {
double oldMaxPermits = maxPermits;
double coldIntervalMicros = stableIntervalMicros * coldFactor;
thresholdPermits = 0.5 * warmupPeriodMicros / stableIntervalMicros;
maxPermits =
thresholdPermits + 2.0 * warmupPeriodMicros / (stableIntervalMicros + coldIntervalMicros);
slope = (coldIntervalMicros - stableIntervalMicros) / (maxPermits - thresholdPermits);
if (oldMaxPermits == Double.POSITIVE_INFINITY) {
// if we don't special-case this, we would get storedPermits == NaN, below
storedPermits = 0.0;
} else {
storedPermits =
(oldMaxPermits == 0.0)
? maxPermits // initial state is cold
: storedPermits * maxPermits / oldMaxPermits;
}
}

其中,

  • 稳定速率stableIntervalMicros = 0.5秒/个
  • 冷却速率coldIntervalMicros = stableIntervalMicros * coldFactor = 1.5秒/个
  • 临界令牌数thresholdPermits = 0.5 * warmupPeriodMicros / stableIntervalMicros = 0.5 * 3秒 / 0.5秒/个 = 3个
  • 最大令牌数maxPermits =
        thresholdPermits + 2.0 * warmupPeriodMicros / (stableIntervalMicros + coldIntervalMicros) = 3个 + 2.0 * 3秒 / (0.5秒/个 + 1.5秒/个)= 6.0个
  • 预热期间得斜率slope = (coldIntervalMicros - stableIntervalMicros) / (maxPermits - thresholdPermits) = (1.5秒/个 - 0.5秒/个) / (6个 - 3个) = 1/3(秒/个^2)
  • 初始存储的令牌数量storedPermits = maxPermits = 6个

右侧梯形面积:
S = (coldIntervalMicros + stableIntervalMicros) * (maxPermits - thresholdPermits) / 2

其积分意义为令牌数量从临界令牌数thresholdPermits增长到最大令牌数maxPermits的时间,即从冷却速率coldIntervalMicros变为稳定速率stableIntervalMicros的时间,即预热时间,即

warmupPeriodMicros = (coldIntervalMicros + stableIntervalMicros) * (maxPermits - thresholdPermits) / 2

整理得:

最大令牌数maxPermits =
thresholdPermits + 2.0 * warmupPeriodMicros / (stableIntervalMicros + coldIntervalMicros)

在平滑预热限流SmoothWarmingUp中,与平滑突发限流SmoothBursty的获取令牌函数acquire的区别在于,storedPermitsToWaitTime函数不同,平滑突发限流SmoothBursty的storedPermitsToWaitTime函数返回值恒为0,而平滑预热限流SmoothWarmingUp的源代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
@Override
// SmoothWarmingUp,等待时间就是计算上图中梯形或者正方形的面积
long storedPermitsToWaitTime(double storedPermits, double permitsToTake) {
// 当前permits超出阈值的部分
double availablePermitsAboveThreshold = storedPermits - thresholdPermits;
long micros = 0;
// measuring the integral on the right part of the function (the climbing line)
// 如果当前存储的令牌数超出thresholdPermits
if (availablePermitsAboveThreshold > 0.0) {
// 在阈值右侧并且需要被消耗的令牌数量
double permitsAboveThresholdToTake = min(availablePermitsAboveThreshold, permitsToTake);
// TODO(cpovirk): Figure out a good name for this variable.
// 梯形的上底+下底
double length =
permitsToTime(availablePermitsAboveThreshold)
+ permitsToTime(availablePermitsAboveThreshold - permitsAboveThresholdToTake);
// 右侧梯形部分的面积
micros = (long) (permitsAboveThresholdToTake * length / 2.0);
// 减去已经获取的在阈值右侧的令牌数
permitsToTake -= permitsAboveThresholdToTake;
}
// measuring the integral on the left part of the function (the horizontal line)
// 左侧平稳时期的面积,正好是长乘宽
micros += (long) (stableIntervalMicros * permitsToTake);
return micros;
}

4. Demo

为了方便理解,编写了一个Demo,构建一个线程池,循环执行线程池中的线程,不过执行线程之前首先要获取到令牌。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
package guava;

import com.google.common.util.concurrent.RateLimiter;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

/**
* @author liudongjin
* @date 2021/2/25 14:55
*/
public class SmoothBurstyDemo {
private static AccessLimitService accessLimitService;

public static void main(String[] args) {
accessLimitService = new AccessLimitService(2);
double beginTime = System.currentTimeMillis();
// System.out.println(beginTime);
List<Runnable> tasks = new ArrayList<>();
for (int i = 0; i < 10; i++) {
tasks.add(new UserRequest(i));
}
ExecutorService threadPool = Executors.newCachedThreadPool();
for (Runnable runnable : tasks) {
try {
Thread.sleep(100);
} catch (Exception e) {
}

System.out.print("Time: " + (System.currentTimeMillis() - beginTime) + " ms");
if (accessLimitService.tryAcquire()) {
System.out.println("获取到令牌");
threadPool.execute(runnable);
} else {
System.out.println("未获取到令牌,无法执行");
}
}
try {
threadPool.shutdown();
if(!threadPool.awaitTermination(5 * 1000, TimeUnit.MILLISECONDS)){
threadPool.shutdownNow();
}
} catch (InterruptedException e) {
threadPool.shutdownNow();
}
}
}

class AccessLimitService {
private double permitsPerSecond = 10;
private RateLimiter seckillRateLimiter;

AccessLimitService() {
this.seckillRateLimiter = RateLimiter.create(permitsPerSecond);
}

AccessLimitService(double permitsPerSecond) {
this.seckillRateLimiter = RateLimiter.create(permitsPerSecond);
}

public boolean tryAcquire() {
return seckillRateLimiter.tryAcquire();
}
}

class UserRequest implements Runnable {
private int id;

public UserRequest(int id) {
this.id = id;
}

@Override
public void run() {
System.out.println("运行线程:" + id);
}
}

output:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
Time: 104.0 ms获取到令牌
运行线程:0
Time: 211.0 ms未获取到令牌,无法执行
Time: 311.0 ms未获取到令牌,无法执行
Time: 413.0 ms未获取到令牌,无法执行
Time: 516.0 ms获取到令牌
运行线程:4
Time: 620.0 ms未获取到令牌,无法执行
Time: 722.0 ms未获取到令牌,无法执行
Time: 822.0 ms未获取到令牌,无法执行
Time: 922.0 ms未获取到令牌,无法执行
Time: 1022.0 ms获取到令牌
运行线程:9

Process finished with exit code 0

5. 总结

RateLimiter是线程安全的,所以在并发环境中可以直接使用,而无需额外的lock或者同步。

考虑到RateLimiter内部的同步锁,我们通常在实际业务开发中,每个资源(比如URL)使用各自的RateLimiter而不是公用一个,占用的内存也不大。

这个限流器内部无额外的线程,也没有其他的数据结构用来存储tickets实体,所以它非常的轻量级,这也是优势所在。

RateLimiter最大的问题,就是acquire方法总会成功,内部的tickets时间点会向后推移; 如果并发很高,严重超过rate阈值时,后续被限流的请求,其等待时间将会基于时间线累加,导致等待时间不可控,这和信号量同病相怜。

为了避免上面的问题,我们通常先使用tryAcquired检测,如果可行再去acquire;如果令牌不足,适当拒绝。所以 基于RateLimiter,并没有内置的拒绝策略,这一点需要我们额外开发。

我们不能简单依赖于acquire方法,来实现限流等待,否则这可能带来严重问题。我们通常需要封装RateLimiter,并使用额外的属性记录其是否“处于限流状态”、“已经推延的tickets时间点”,如果“已经推延的时间点非常遥远”且超过可接受范围,则直接拒绝请求。简单来说,封装acquire方法,增加对请求可能等待时间的判断,如果超长,则直接拒绝。

RateLimiter存在一个很大的问题,就是几乎没法扩展:子类均为protected。反射除外哦。

参考