一、写在最前

大张旗鼓
的双12已从前小半个月了,法式猿的我坐在办公桌上思量,双12这么大的拜候量,这群电商是怎样扛住的,接口分分钟会变得不可用,并激发连锁反应招致全部
零碎溃散。好吃懒做的小编,被可怕的好奇心驱使着去调研流量把持算法。好奇心害死猫,才有了这篇文章。

二、流量把持算法简介

流量把持在盘算机规模称为过载庇护。何为过载庇护?所谓“过载”,即需求超过了负载威力;而“庇护”则是指当“过载”发生

了,采用须要的措施庇护本身不受“损伤”。在盘算机规模,尤其是分布式零碎规模,“过载庇护”是一个重要的观点。一个不具有
“过载庇护”功效的零碎,是十分风险和脆弱的,很也许由于霎时的压力激增,引起“雪崩效应”,招致零碎的各个局部都同时溃散,中止办事。这就似乎在不保险丝的庇护下,电压突然变高,招致所有的电器都邑被破坏
同样,“过载庇护”功效是零碎的“保险丝”。

如今互联网规模,也自创了这一思路扛住双12, 把持网络数据传输的速率,使流量以比拟均匀的速率向外发送。 终究
完成优化性能,淘汰延迟和普及带宽等。

三、经常运用的限流算法

经常运用的限流算法有两种:漏桶算法和令牌桶算法。本篇文章将先容本身造轮子限流算法、漏桶算法和令牌桶算法。

3.1 本身造轮子限流算法

作为一名小白,我是不愿意本身造轮子的,然而真要早轮子,有一个简略粗暴的思路:
1)设置单元时光T(如10s)内的最大拜候量ReqMax,在单元时光T内维护计数器Count;
2)当乞求抵达时,判别时光能否进入下一个单元时光;
3)若是是,则重置计数器为0;
4)若是不是,计数器Count++,并判别计数器Count能否超过最大拜候量ReqMax,如超过,则谢绝拜候。

long timeStamp = getNowTime();
int reqCount = 0; 
const int maxReqCount = 10000;//时光周期内最大乞求数 
const long effectiveDuration = 10;//时光把持周期  

public static bool control(){
     long now = getNowTime();
     if (now < timeStamp + effectiveDuration){//在时光把持范围内
         reqCount++; 
         return reqCount > maxReqCount;//以后时光范围内超过最大乞求把持数
     }else{
         timeStamp = now;//超时后重置
         reqCount = 0; 

         return true;
     } 
} 

public static int getNowTime(){
    long time = System.currentTimeMillis();
    return   (int) (time/1000);
}

该算法完成看似确实完美的完成了“单元时光内最大拜候量把持”,但它在两个单元时光的临界值上的处置是有缺陷的。如:设需求把持的最大乞求数为1w, 在第一个单元时光(0-10s)的最初一秒(即第9s)里抵达的乞求数为1w,接下来第二个单元时光(10-20s)的第一秒(即第10s)里抵达乞求数也是1w,由于超时重置发生

在两个单元时光之间,以是这2w个乞求都将经由进程把持,也等于说在2s里处置2w个乞求,与咱们设置的10s里1w个乞求的设想是相违背。

学术一点的说法是该算法处置乞求不敷滑润,不克不及很好的餍足限流需求。

3.2 漏桶算法

漏桶算法思路很简略,乞求先进入到漏桶里,漏桶以固定的速率出水,也等于处置乞求,当水加的过快,则会间接溢出,也等于谢绝乞求,能够看出漏桶算法能强行制约数据的传输速率。

漏桶算法

long timeStamp = getNowTime(); 
int capacity = 10000;// 桶的容量
int rate = 1;//水漏出的速率 
int water = 100;//以后水量  

public static bool control() {   
    //先履行
漏水,由于rate是固定的,以是能够认为“时光距离*rate”即为漏出的水量
    long  now = getNowTime();
    water = Math.max(0, water - (now - timeStamp) * rate);
    timeStamp = now;

    if (water < capacity) { // 水还未满,加水
        water ++; 
        return true; 
    } else { 
        return false;//水满,谢绝加水
   } 
} 

该算法很好的解决了时光鸿沟处置不敷滑润的问题,由于在每次乞求进桶前都将履行
“漏水”的驾御,再无鸿沟问题。

然而对良多场景来说,除了要求能够制约数据的均匀传输速率外,还要求许可某种水平的突发传输。这时漏桶算法也许就不合适了,令牌桶算法更为合适

3.3 令牌桶算法

令牌桶算法的原理是零碎会以一个恒定的速率往桶里放入令牌,而若是乞求需求被处置,则需求先从桶里获得
一个令牌,当桶里不令牌可取时,则谢绝办事。

令牌桶算法

3.3.1 原理

令牌桶是网络设备的外部

暮气存储池,而令牌则是以给定速率添补令牌桶的虚拟信息包。每一个抵达的令牌都邑从数据队列领出照应的数据包举行发送,发送完数据后令牌被删除。

乞求注解(RFC)中界说了两种令牌桶算法——单速率三色标识表记标帜算法和双速率三色标识表记标帜算法,其评价了局都是为报文打上红、黄、绿三色标识表记标帜。QoS会依照报文的色彩
,设置报文的抛弃优先级,其中单速率三色标识表记标帜比拟关心报文尺寸的突发,而双速率三色标识表记标帜则关注速率上的突发,两种算法都可工作于色盲模式和非色盲模式。如下结合这两种工作模式先容一下RFC中所描绘的这两种算法。

1)单速率三色标识表记标帜算法
网络工程师义务小组(IETF)的RFC文件界说了单速率三色标识表记标帜算法,评价依照
如下3个参数:许诺拜候速率(CIR),即向令牌桶中添补令牌的速率;许诺突发尺寸(CBS),即令牌桶的容量,每次突发所许可的最大流量尺寸(注:设置的突发尺寸必需大于最大报文长度);超额突发尺寸(EBS)。

一般采用双桶布局:C桶和E桶。Tc默示C桶中的令牌数,Te默示E桶中令牌数,两桶的总容量别离为CBS和EBS。初始形态时两桶是满的,即Tc和Te初始值别离等于CBS和EBS。令牌的发生

速率是CIR,通常是先往C桶中增加令牌,等C桶满了,再往E桶中增加令牌,当两桶都被填满时,新发生

的令牌将会被抛弃。

色盲模式下,假设抵达的报文长度为B。若报文长度B小于C桶中的令牌数Tc,则报文被标识表记标帜为绿色,且C桶中的令牌数淘汰B;若Tc<B <Te,则标识表记标帜为黄色,E和C桶中的令牌数均淘汰B;若B >Te,标识表记标帜为白色,两桶总令牌数都不淘汰。

在非色盲模式下,若报文已被标识表记标帜为绿色或B <Tc,则报文被标识表记标帜为绿色,Tc淘汰B;若报文已被标识表记标帜为黄色或Tc<B <Te,则标识表记标帜为黄色,且Te淘汰B;若报文已被标识表记标帜为白色或B >Te,则标识表记标帜为白色,Tc和Te都不淘汰。

2)双速率三色标识表记标帜算法
IETF的RFC文件界说了双速率三色算法,次要是依照4种流量参数来评价:CIR、CBS、峰值信息速率(PIR),峰值突发尺寸(PBS)。前两种参数与单速率三色算法中的含义相反,PIR这个参数只在交换机上才有,路由器不这个参数。该值必需不小于CIR的设置值,若是大于CIR,则速率制约在CIR于PRI之间的一个值。

与单速率三色标识表记标帜算法差别,双速率三色标识表记标帜算法的两个令牌桶C桶和P桶添补令牌的速率差别,C桶添补速率为CIR,P桶为PIR;两桶的容量别离为CBS和PBS。用Tc和Tp默示两桶中的令牌数目,初始形态时两桶是满的,即Tc和Tp初始值别离等于CBS和PBS。

色盲模式下,若是抵达的报文速率大于PIR,超过Tp+Tc局部没法失掉令牌,报文被标识表记标帜为白色,未超过Tp+Tc而从P桶中获得
令牌的报文标识表记标帜为黄色,从C桶中获得
令牌的报文被标识表记标帜为绿色;当报文速率小于PIR,大于CIR时,报文不会得不到令牌,但超过Tp局部报文将从P桶中获得
令牌,被标识表记标帜为黄色报文,从C桶中获得
令牌的报文被标识表记标帜为绿色;当报文速率小于CIR时,报文所需令牌数不会超过Tc,只从C桶中获得
令牌,以是只会被标识表记标帜为绿色报文。

在非色盲模式下,若是报文已被标识表记标帜为白色或超过Tp+Tc局部没法失掉令牌的报文,被标识表记标帜为白色;若是标识表记标帜为黄色或超过Tc未超过Tp局部报文记为黄色;若是报文被标识表记标帜为绿或未超过Tc局部报文,被标识表记标帜为绿色。

3.3.2 算法描绘与完成
  • 假如用户设置的均匀发送速率为r,则每隔1/r秒一个令牌被插手到桶中(每秒会有r个令牌放入桶中);
  • 假设桶中至多能够寄存b个令牌。若是令牌抵达时令牌桶已满了,那末
    这个令牌会被抛弃;
  • 当一个n个字节的数据包抵达时,就从令牌桶中删除n个令牌(差别巨细的数据包,耗损的令牌数目不同样),而且数据包被发送到网络;
  • 若是令牌桶中少于n个令牌,那末
    不会删除令牌,而且认为这个数据包在流量制约之外(n个字节,需求n个令牌。该数据包将被缓存或抛弃);
  • 算法许可最长b个字节的突发,但从长期运转了局看,数据包的速率被制约成常量r。对在流量制约外的数据包能够以差别的体式格局处置:
    1)它们能够被抛弃;
    2)它们能够排放在队列中以方便令牌桶中积累
    了足够多的令牌时再传输;
    3)它们能够接续发送,但需求做特殊标识表记标帜,网络过载的时分将这些特殊标识表记标帜的包抛弃。
long timeStamp=getNowTime(); 
int capacity; // 桶的容量 
int rate ;//令牌放入速率
 int tokens;//以后水量  

bool control() {
   //先履行
增加令牌的驾御
   long  now = getNowTime();
   tokens = max(capacity, tokens+ (now - timeStamp)*rate); 
   timeStamp = now;   //令牌已用完,谢绝拜候

   if(tokens<1){
     return false;
   }else{//还有令牌,支付令牌
     tokens--;
     retun true;
   }
 } 

令牌桶算法是网络流量整形和速率制约中最常运用的一种算法。典型情形下,令牌桶算法用来把持发送到网络上的数据的数目,并许可突发数据的发送。

巨细固定的令牌桶可自行以恒定的速率源源不竭地发生

令牌。若是令牌不被耗损,或被耗损的速率小于发生

的速率,令牌就会不竭地增多,直到把桶填满。后面再发生

的令牌就会从桶中溢出。最初桶中能够保存的最大令牌数永远不会超过桶的巨细。

传递到令牌桶的数据包需求耗损令牌。差别巨细的数据包,耗损的令牌数目不同样。令牌桶这类把持机制基于令牌桶中能否具有令牌来唆使
什么时分能够发送流量。令牌桶中的每一个令牌都代表一个字节。若是令牌桶中具有令牌,则许可发送流量;而若是令牌桶中不具有令牌,则不许可发送流量。因而,若是突发门限被合理地设置而且令牌桶中有足够的令牌,那末
流量就能够以峰值速率发送。

四、限流工具类RateLimiter

谷歌开源工具包guava供应了限流工具类RateLimiter,该类基于“令牌桶算法”,十分不便运用。RateLimiter经经常运用于制约对一些物理资源或逻辑资源的拜候速率。它支持两种获得
permits接口,一种是若是拿不到即时前往false,一种会壅塞等候一段时光看能不克不及拿到。

4.1 RateLimiter test
//多义务履行
,但每秒履行
不超过2个义务
final RateLimiter rateLimiter = RateLimiter.create(2.0);
void submitTasks(List<Runnable> tasks, Executor executor) {
    for (Runnable task : tasks) {
        rateLimiter.acquire(); // may wait
        executor.execute(task);
    }
}
//以每秒5kb内的速率发送
final RateLimiter rateLimiter = RateLimiter.create(5000.0);
void submitPacket(byte[] packet) {
    rateLimiter.acquire(packet.length);
    networkService.send(packet);
}
//以非壅塞的方式抵达降级
if(limiter.tryAcquire()) { //未乞求到limiter则即时前往false
    doSomething();
}else{
    doSomethingElse();
}
4.2 次要接口

RateLimiter其实是一个abstract类,然而它供应了几个static体式格局用于创立RateLimiter:

/**
* 创立一个不变输入令牌的RateLimiter,包管了均匀每秒不超过permitsPerSecond个乞求
* 当乞求到来的速率超过了permitsPerSecond,包管每秒只处置permitsPerSecond个乞求
* 当这个RateLimiter运用缺乏


不置可否(即乞求到来速率小于permitsPerSecond),会囤积至多permitsPerSecond个乞求
*/
public static RateLimiter create(double permitsPerSecond);

/**
* 创立一个不变输入令牌的RateLimiter,包管了均匀每秒不超过permitsPerSecond个乞求
* 还包含一个热身期(warmup period),热身期内,RateLimiter会滑润的将其释放令牌的速率加大,直到起抵达最大速率
* 同样,若是RateLimiter在热身期缺乏


不置可否够的乞求(unused),则起速率会逐步降低到冷却形态
* 
* 设计这个的企图是为了餍足那种资源供应方需求热身时光,而不是每次拜候都能供应不变速率的办事的情形(比方带缓存办事,需求按期刷新缓存的)
* 参数warmupPeriod和unit决议了其从冷却形态抵达最大速率的时光
*/
public static RateLimiter create(double permitsPerSecond, long warmupPeriod, TimeUnit unit);

供应了两个获得
令牌的体式格局,不带参数默示获得
一个令牌。若是不令牌则一向等候,前往等候的时光(单元为秒),不被限流则间接前往0.0:

public double acquire();

public double acquire(int permits);

测验考试获得
令牌,分为待超时时光和不带超时时光两种:

public boolean tryAcquire();
//测验考试获得
一个令牌,即时前往
public boolean tryAcquire(int permits);
public boolean tryAcquire(long timeout, TimeUnit unit);
//测验考试获得
permits个令牌,带超时时光
public boolean tryAcquire(int permits, long timeout, TimeUnit unit);
4.3 RateLimiter的设计

RateLimiter的次要功效等于供应一个不变的速率,完成体式格局等于经由进程制约乞求流入的速率,比方盘算乞求等候合适的时光阈值。

完成QPS速率的最简略的体式格局等于记住上一次乞求的最初受权时光,而后包管1/QPS秒内不许可乞求进入。比方QPS=5,若是咱们包管最初一个被受权乞求以后
的200ms的时光内不乞求被受权,那末
咱们就抵达了预期的速率。若是一个乞求如今曩昔然而最初一个被受权乞求是在100ms以前,那末
咱们就要求以后这个乞求等候100ms。依照这个思路,乞求15个新令牌(许可证)就需求3秒。

有一点很重要:上面这个设计思路的RateLimiter记忆十分的浅,它的脑容量十分的小,只记得上一次被受权的乞求的时光。若是RateLimiter的一个被受权乞求q以前很长一段时光不被运用会怎样样?这个RateLimiter会立马遗忘从前这一段时光的哄骗缺乏

不置可否,而只记得刚刚的乞求q。

从前一段时光的哄骗缺乏

不置可否意味着有过剩的资源是能够哄骗的。这类情形下,RateLimiter应当加把劲(speed up for a while)将这些过剩的资源哄骗起来。比方在向网络中发生

数据的场景(限流),从前一段时光的哄骗缺乏

不置可否也许意味着网卡缓冲区是空的,这类场景下,咱们是能够减速发送来将这些进程的资源哄骗起来。

另一方面,从前一段时光的哄骗缺乏

不置可否也许意味着处置乞求的办事器对行将到来的乞求是预备缺乏

不置可否的(less ready for future requests),比方由于很长一段时光不乞求以后办事器的cache是陈旧的,进而招致行将到来的乞求会触发一个低廉的驾御(比方重新刷新全量的缓存)。

为了处置这类情形,RateLimiter中增加了一个维度的信息,等于从前一段时光的哄骗缺乏

不置可否(past underutilization),代码中运用storedPermits变量默示。当不哄骗缺乏

不置可否这个变量为0,最大能抵达maxStoredPermits(maxStoredPermits默示完全不哄骗)。因而,乞求的令牌也许从两个中央来:

  • 从前残存的令牌(stored permits, 也许不)
  • 现有的令牌(fresh permits,以后这段时光还没用完的令牌)

咱们将经由进程一个例子来解释它是怎样工作的:
对一个每秒发生

一个令牌的RateLimiter,每有一个不运用令牌的一秒,咱们就将storedPermits加1,若是RateLimiter在10秒都不运用,则storedPermits变为10.0。这个时分,一个乞求到来并乞求三个令牌(acquire(3)),咱们将从storedPermits中的令牌为其办事,storedPermits变为7.0。这个乞求以后
立马又有一个乞求到来并乞求10个令牌,咱们将从storedPermits残存的7个令牌给这个乞求,剩下还需求三个令牌,咱们将从RateLimiter新发生

的令牌中获得
。咱们已晓得,RateLimiter每秒新发生

1个令牌,等于说上面这个乞求还需求的3个乞求就要求其等候3秒。

设想一个RateLimiter每秒发生

一个令牌,如今完全不运用(处于初始形态),制约一个低廉的乞求acquire(100)曩昔。若是咱们挑选让这个乞求等候100秒再许可其履行
,这明显
很荒诞
。咱们为何
什么也不做而只是傻傻的等候100秒,一个更好的做法是许可这个乞求即时履行
(和acquire(1)不区分),而后将随后到来的乞求推迟到正确的时光点。这类战略,咱们许可这个低廉的义务即时履行
,并将随后到来的乞求推迟100秒。这类战略等于让义务的履行
和等候同时举行。

一个重要的结论:RateLimiter不会记最初一个乞求,而是即下一个乞求许可履行
的时光。这也能够很直白的告诉咱们抵达下一个调度时光点的时光距离。而后定一个一段时光未运用的Ratelimiter也很简略:下一个调度时光点已从前,这个时光点和如今时光的差等于Ratelimiter多久不被运用,咱们会将这一段时光翻译成storedPermits。所有,若是每秒钟发生

一个令牌(rate==1),而且恰恰每秒来一个乞求,那末
storedPermits就不会增进。

4.4 次要码源

分析一下RateLimiter怎样完成限流:

public double acquire() {
    return acquire(1);
}
public double acquire(int permits) {
    long microsToWait = reserve(permits);
    stopwatch.sleepMicrosUninterruptibly(microsToWait);
    return 1.0 * microsToWait / SECONDS.toMicros(1L);
}
final long reserve(int permits) {
    checkPermits(permits);
    synchronized (mutex()) {    //应答并发情形需求同步
      return reserveAndGetWaitLength(permits, stopwatch.readMicros());
    }
}
final long reserveAndGetWaitLength(int permits, long nowMicros) {
    long momentAvailable = reserveEarliestAvailable(permits, nowMicros);
    return max(momentAvailable - nowMicros, 0);
}

上面体式格局来自RateLimiter的具体完成类SmoothRateLimiter:

final long reserveEarliestAvailable(int requiredPermits, long nowMicros) {
    resync(nowMicros);  //补充令牌
    long returnValue = nextFreeTicketMicros;
    //此次乞求耗损的令牌数目
    double storedPermitsToSpend = min(requiredPermits, this.storedPermits);
    double freshPermits = requiredPermits - storedPermitsToSpend;

    long waitMicros = storedPermitsToWaitTime(this.storedPermits, storedPermitsToSpend)
        + (long) (freshPermits * stableIntervalMicros);

    this.nextFreeTicketMicros = nextFreeTicketMicros + waitMicros;
    this.storedPermits -= storedPermitsToSpend;
    return returnValue;
}
private void resync(long nowMicros) {
    // if nextFreeTicket is in the past, resync to now
    if (nowMicros > nextFreeTicketMicros) {
        storedPermits = min(maxPermits,
        storedPermits + (nowMicros - nextFreeTicketMicros) / stableIntervalMicros);
        nextFreeTicketMicros = nowMicros;
    }
}

别的,对storedPermits的运用,RateLimiter具有两种战略,两者区分次要体如今运用storedPermits时分需求等候的时光。这个逻辑由storedPermitsToWaitTime函数完成:

/**
 * Translates a specified portion of our currently stored permits which we want to
 * spend/acquire, into a throttling time. Conceptually, this evaluates the integral
 * of the underlying function we use, for the range of
 * [(storedPermits - permitsToTake), storedPermits].
 *
 * <p>This always holds: {@code 0 <= permitsToTake <= storedPermits}
 */
abstract long storedPermitsToWaitTime(double storedPermits, double permitsToTake);

具有两种战略等于为了应答咱们上面讲到的,具有资源运用缺乏

不置可否大致分为两种情形:

  • (1)资源确实运用缺乏

    不置可否,这些残存的资源咱们私海能够运用的;

  • (2)供应资源的办事从前还没预备好,比方办事刚启动等。

为此,RateLimiter现实上由两种完成战略,其完成别离见SmoothBursty和SmoothWarmingUp。两者次要的区分等于storedPermitsToWaitTime完成以及maxPermits数目的盘算。

4.4.1 SmoothBursty

SmoothBursty运用storedPermits不需求额外等候时光。而且默许maxBurstSeconds未1,因而maxPermits为permitsPerSecond,即至多能够存储1秒的残存令牌,比方QPS=5,则maxPermits=5。

上面这个RateLimiter的入口等于用来创立SmoothBursty类型的RateLimiter:

public static RateLimiter create(double permitsPerSecond)
/**
     * This implements a "bursty" RateLimiter, where storedPermits are translated to
     * zero throttling. The maximum number of permits that can be saved (when the RateLimiter is
     * unused) is defined in terms of time, in this sense: if a RateLimiter is 2qps, and this
     * time is specified as 10 seconds, we can save up to 2 * 10 = 20 permits.
     */
    static final class SmoothBursty extends SmoothRateLimiter {
        /** The work (permits) of how many seconds can be saved up if this RateLimiter is unused? */
        final double maxBurstSeconds;

        SmoothBursty(SleepingStopwatch stopwatch, double maxBurstSeconds) {
            super(stopwatch);
            this.maxBurstSeconds = maxBurstSeconds;
        }

        void doSetRate(double permitsPerSecond, double stableIntervalMicros) {
            double oldMaxPermits = this.maxPermits;
            maxPermits = maxBurstSeconds * permitsPerSecond;
            System.out.println("maxPermits=" + maxPermits);
            if (oldMaxPermits == Double.POSITIVE_INFINITY) {
                // if we don't special-case this, we would get storedPermits == NaN, below
                storedPermits = maxPermits;
            } else {
                storedPermits = (oldMaxPermits == 0.0)
                        ? 0.0 // initial state
                        : storedPermits * maxPermits / oldMaxPermits;
            }
        }

        long storedPermitsToWaitTime(double storedPermits, double permitsToTake) {
            return 0L;
        }
    }

一个简略的运用示企图及解释,上面私海一个QPS=4的SmoothBursty:

(1)t=0,这时storedPermits=0,乞求1个令牌,等候时光=0;
(2)t=1,这时storedPermits=3,乞求3个令牌,等候时光=0;
(3)t=2,这时storedPermits=4,乞求10个令牌,等候时光=0,超前运用了2个令牌;
(4)t=3,这时storedPermits=0,乞求1个令牌,等候时光=0.5。

代码的输入:

maxPermits=4.0, storedPermits=7.2E-4, stableIntervalMicros=250000.0, nextFreeTicketMicros=1472
acquire(1), sleepSecond=0.0

maxPermits=4.0, storedPermits=3.012212, stableIntervalMicros=250000.0, nextFreeTicketMicros=1004345
acquire(3), sleepSecond=0.0

maxPermits=4.0, storedPermits=4.0, stableIntervalMicros=250000.0, nextFreeTicketMicros=2004668
acquire(10), sleepSecond=0.0

maxPermits=4.0, storedPermits=0.0, stableIntervalMicros=250000.0, nextFreeTicketMicros=3504668
acquire(1), sleepSecond=0.499591

4.4.2 SmoothWarmingUp
static final class SmoothWarmingUp extends SmoothRateLimiter {
        private final long warmupPeriodMicros;
        /**
         * The slope of the line from the stable interval (when permits == 0), to the cold interval
         * (when permits == maxPermits)
         */
        private double slope;
        private double halfPermits;

        SmoothWarmingUp(SleepingStopwatch stopwatch, long warmupPeriod, TimeUnit timeUnit) {
            super(stopwatch);
            this.warmupPeriodMicros = timeUnit.toMicros(warmupPeriod);
        }

        @Override
        void doSetRate(double permitsPerSecond, double stableIntervalMicros) {
            double oldMaxPermits = maxPermits;
            maxPermits = warmupPeriodMicros / stableIntervalMicros;
            halfPermits = maxPermits / 2.0;
            // Stable interval is x, cold is 3x, so on average it's 2x. Double the time -> halve the rate
            double coldIntervalMicros = stableIntervalMicros * 3.0;
            slope = (coldIntervalMicros - stableIntervalMicros) / halfPermits;
            if (oldMaxPermits == Double.POSITIVE_INFINITY) {
                // if we don't special-case this, we would get storedPermits == NaN, below
                storedPermits = 0.0;
            } else {
                storedPermits = (oldMaxPermits == 0.0)
                        ? maxPermits // initial state is cold
                        : storedPermits * maxPermits / oldMaxPermits;
            }
        }

        @Override
        long storedPermitsToWaitTime(double storedPermits, double permitsToTake) {
            double availablePermitsAboveHalf = storedPermits - halfPermits;
            long micros = 0;
            // measuring the integral on the right part of the function (the climbing line)
            if (availablePermitsAboveHalf > 0.0) {
                double permitsAboveHalfToTake = min(availablePermitsAboveHalf, permitsToTake);
                micros = (long) (permitsAboveHalfToTake * (permitsToTime(availablePermitsAboveHalf)
                        + permitsToTime(availablePermitsAboveHalf - permitsAboveHalfToTake)) / 2.0);
                permitsToTake -= permitsAboveHalfToTake;
            }
            // measuring the integral on the left part of the function (the horizontal line)
            micros += (stableIntervalMicros * permitsToTake);
            return micros;
        }

        private double permitsToTime(double permits) {
            return stableIntervalMicros + permits * slope;
        }
    }

maxPermits等于热身(warmup)时期能发生

的令牌数,比方QPS=4,warmup为2秒,则maxPermits=8。halfPermits为maxPermits的一半。

参考注释中的神图:

 *          ^ throttling
 *          |
 * 3*stable +                  /
 * interval |                 /.
 *  (cold)  |                / .
 *          |               /  .   <-- "warmup period" is the area of the trapezoid between
 * 2*stable +              /   .       halfPermits and maxPermits
 * interval |             /    .
 *          |            /     .
 *          |           /      .
 *   stable +----------/  WARM . }
 * interval |          .   UP  . } <-- this rectangle (from 0 to maxPermits, and
 *          |          . PERIOD. }     height == stableInterval) defines the cooldown period,
 *          |          .       . }     and we want cooldownPeriod == warmupPeriod
 *          |---------------------------------> storedPermits
 *              (halfPermits) (maxPermits)
 *

上面是咱们QPS=4,warmup为2秒时分对应的图。

maxPermits=8,halfPermits=4,和SmoothBursty相反的乞求序列:

(1)t=0,这时storedPermits=8,乞求1个令牌,运用1个storedPermits耗损时光=1×(0.75+0.625)/2=0.6875秒;
(2)t=1,这时storedPermits=8,乞求3个令牌,运用3个storedPermits耗损时光=3×(0.75+0.375)/2=1.6875秒(留意已超过1秒了,意味着下次发生

新Permit时光为2.6875);
(3)t=2,这时storedPermits=5,乞求10个令牌,运用5个storedPermits耗损时光=1×(0.375+0.25)/2+4*0.25=1.3125秒,再加上额外乞求的5个新发生

的Permit需求耗损=5*0.25=1.25秒,即统共需求耗时2.5625秒,则下一次发生

新的Permit时光为2.6875+2.5625=5.25,留意以后乞求私海2.6875才前往的,以前一向壅塞;
(4)t=3,由于前一个乞求壅塞到2.6875,现实这个乞求3.6875才抵达RateLimiter,乞求1个令牌,storedPermits=0,下一次发生

新Permit时光为5.25,因而统共需求等候5.25-3.6875=1.5625秒。

现实履行
了局:

warmupPeriodMicros=2000000
stableIntervalMicros=250000.0, maxPermits=8.0, halfPermits=4.0, coldIntervalMicros=750000.0, slope=125000.0, storedPermits=8.0

maxPermits=8.0, storedPermits=8.0, stableIntervalMicros=250000.0, nextFreeTicketMicros=1524
acquire(1), sleepSecond=0.0

maxPermits=8.0, storedPermits=8.0, stableIntervalMicros=250000.0, nextFreeTicketMicros=1001946
acquire(3), sleepSecond=0.0

maxPermits=8.0, storedPermits=5.0, stableIntervalMicros=250000.0, nextFreeTicketMicros=2689446
acquire(10), sleepSecond=0.687186

maxPermits=8.0, storedPermits=0.0, stableIntervalMicros=250000.0, nextFreeTicketMicros=5251946
acquire(1), sleepSecond=1.559174

五、Guava并发:ListenableFuture与RateLimiter示例

5.1观点

ListenableFuture望文生义等于能够监听的Future,它是对java原生Future的扩大
加强。咱们晓得Future默示一个异步盘算义务,当义务完成时能够失掉盘算了局。若是咱们希望一旦盘算完成就拿到了局展现
给用户或做别的的盘算,就必需运用另一个线程不竭的查询盘算形态。如许做,代码庞杂,而且效力
低下。运用ListenableFuture Guava帮咱们检测Future能否完成了,若是完成就自动挪用回调函数,如许能够淘汰并发法式的庞杂度。

保举运用第二种体式格局,由于第二种体式格局能够间接失掉Future的前往值,或处置过错情形。本质上第二种体式格局是经由进程调动第一种体式格局完成的,做了进一步的封装。

别的ListenableFuture还有其余几种内置完成:
1)SettableFuture:不需求完成一个体式格局来盘算前往值,而只需求前往一个固定值来做为前往值,能够经由进程法式设置此Future的前往值或异样信息;
2)CheckedFuture: 这是一个继续自ListenableFuture接口,他供应了checkedGet()体式格局,此体式格局在Future履行
发生

异样时,能够抛出指定类型的异样。

RateLimiter类似于JDK的信号量Semphore,他用来制约对资源并发拜候的线程数

import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import com.谷歌.common.util.concurrent.FutureCallback;
import com.谷歌.common.util.concurrent.Futures;
import com.谷歌.common.util.concurrent.ListenableFuture;
import com.谷歌.common.util.concurrent.ListeningExecutorService;
import com.谷歌.common.util.concurrent.MoreExecutors;
import com.谷歌.common.util.concurrent.RateLimiter;

public class ListenableFutureDemo {
    public static void main(String[] args) {
        testRateLimiter();
        testListenableFuture();
    }

    /**
     * RateLimiter类似于JDK的信号量Semphore,他用来制约对资源并发拜候的线程数
     */
    public static void testRateLimiter() {
        ListeningExecutorService executorService = MoreExecutors
                .listeningDecorator(Executors.newCachedThreadPool());
        RateLimiter limiter = RateLimiter.create(5.0); // 每秒不超过4个义务被提交

        for (int i = 0; i < 10; i++) {
            limiter.acquire(); // 乞求RateLimiter, 超过permits会被壅塞
            final ListenableFuture<Integer> listenableFuture = executorService
                    .submit(new Task("is "+ i));
        }
    }

    public static void testListenableFuture() {
        ListeningExecutorService executorService = MoreExecutors
                .listeningDecorator(Executors.newCachedThreadPool());

        final ListenableFuture<Integer> listenableFuture = executorService
                .submit(new Task("testListenableFuture"));

        //同步获得
挪用了局
        try {
            System.out.println(listenableFuture.get());
        } catch (InterruptedException e1) {
            e1.printStackTrace();
        } catch (ExecutionException e1) {
            e1.printStackTrace();
        }

        //第一种体式格局
        listenableFuture.addListener(new Runnable() {
            @Override
            public void run() {
                try {
                    System.out.println("get listenable future's result "
                            + listenableFuture.get());
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } catch (ExecutionException e) {
                    e.printStackTrace();
                }
            }
        }, executorService);

        //第二种体式格局
        Futures.addCallback(listenableFuture, new FutureCallback<Integer>() {
            @Override
            public void onSuccess(Integer result) {
                System.out
                       .println("get listenable future's result with callback "
                               + result);
            }

            @Override
            public void onFailure(Throwable t) {
                t.printStackTrace();
            }
        });
    }
} 

class Task implements Callable<Integer> {
    String str;
    public Task(String str){
        this.str = str;
    }

    @Override
    public Integer call() throws Exception {
        System.out.println("call execute.." + str);
        TimeUnit.SECONDS.sleep(1);
        return 7;
    }
}

pom.xml依赖

<dependency>
      <groupId>com.谷歌.guava</groupId>
      <artifactId>guava</artifactId>
      <version>14.0.1</version>
</dependency>

更多精彩,尽在https://popnsprinkle.com