整合限流策略

限流

  • 限流又称为流量控制(流控),通常是指限制到达系统的并发请求数

常用的限流策略

漏桶法

  • 漏桶法限流很好理解,假设我们有一个水桶按固定的速率向下方滴落一滴水,无论有多少请求,请求的速率有多大,都按照固定的速率流出,对应到系统中就是按照固定的速率处理请求

uber-go/ratelimit实现

Example uber-go/ratelimit

// 漏桶法
// 定义全局限流器对象
var rateLimit ratelimit.Limiter
// 在 gin.HandlerFunc 加入限流逻辑
func leakyBucket() gin.HandlerFunc {
prev := time.Now()
return func(c *gin.Context) {
now := rateLimit.Take()
fmt.Println(now.Sub(prev)) // 为了打印时间间隔
prev = now // 记录上一次的时间,没有这个打印的会有问题
}
}
func main() {
rateLimit = ratelimit.New(10)
r := gin.Default()
r.GET("/ping", leakyBucket(), func(c *gin.Context) {
c.JSON(200, true)
})
r.Run() // listen and serve on 0.0.0.0:8080 (for windows "localhost:8080")
}

// type limiter struct {
// sync.Mutex // 锁
// last time.Time // 上一次的时刻
// sleepFor time.Duration // 需要等待的时间
// perRequest time.Duration // 每次的时间间隔
// maxSlack time.Duration // 最大的富余量
// clock Clock // 时钟
// }

令牌桶算法

  • 令牌桶其实和漏桶的原理类似,令牌桶就是想象有一个固定大小的桶,系统会以恒定速率向桶中放 Token,桶满则暂时不放

github.com/juju/ratelimit 实现

  • github.com/juju/ratelimit
  • 创建令牌桶:
    // 创建指定填充速率和容量大小的令牌桶
    func NewBucket(fillInterval time.Duration, capacity int64) *Bucket
    // 创建指定填充速率、容量大小和每次填充的令牌数的令牌桶
    func NewBucketWithQuantum(fillInterval time.Duration, capacity, quantum int64) *Bucket
    // 创建填充速度为指定速率和容量大小的令牌桶
    // NewBucketWithRate(0.1, 200) 表示每秒填充20个令牌
    func NewBucketWithRate(rate float64, capacity int64) *Bucket
  • 取出令牌:
    // 取token(非阻塞)
    func (tb *Bucket) Take(count int64) time.Duration
    func (tb *Bucket) TakeAvailable(count int64) int64

    // 最多等maxWait时间取token
    func (tb *Bucket) TakeMaxDuration(count int64, maxWait time.Duration) (time.Duration, bool)

    // 取token(阻塞)
    func (tb *Bucket) Wait(count int64)
    func (tb *Bucket) WaitMaxDuration(count int64, maxWait time.Duration) bool
  • 当前令牌数 = 上一次剩余的令牌数 + (本次取令牌的时刻-上一次取令牌的时刻)/放置令牌的时间间隔 * 每次放置的令牌数

Example github.com/juju/ratelimit

import("github.com/juju/ratelimit")
// 令牌桶方法
func rateLimit(fillIntrerval time.Duration,cap int64) func(ctx.*gin.Content){
rl:ratelimit.NewBucket(fillIntrerval,cap)
return func(c *gin.Content){
if rl.TakeAvailable(1)==1{
c.Next()
return
}
}
c.String(http.StatusOK,"rate limit ......")
c.Abort()
}

固定窗口

  • 每开启一个新的窗口,在窗口时间大小内,可以通过窗口请求上限个请求
  • 该算法主要是会存在临界问题,如果流量都集中在两个窗口的交界处,那么突发流量会是设置上限的两倍
package limiter

import (
"sync"
"time"
)

// FixedWindowLimiter 固定窗口限流器
type FixedWindowLimiter struct {
limit int // 窗口请求上限
window time.Duration // 窗口时间大小
counter int // 计数器
lastTime time.Time // 上一次请求的时间
mutex sync.Mutex // 避免并发问题
}

func NewFixedWindowLimiter(limit int, window time.Duration) *FixedWindowLimiter {
return &FixedWindowLimiter{
limit: limit,
window: window,
lastTime: time.Now(),
}
}

func (l *FixedWindowLimiter) TryAcquire() bool {
l.mutex.Lock()
defer l.mutex.Unlock()
// 获取当前时间
now := time.Now()
// 如果当前窗口失效,计数器清0,开启新的窗口
if now.Sub(l.lastTime) > l.window {
l.counter = 0
l.lastTime = now
}
// 若到达窗口请求上限,请求失败
if l.counter >= l.limit {
return false
}
// 若没到窗口请求上限,计数器+1,请求成功
l.counter++
return true
}

滑动窗口

  • 滑动窗口类似于固定窗口,它只是把大窗口切分成多个小窗口,每次向右移动一个小窗口,它可以避免两倍的突发流量。
  • 固定窗口可以说是滑动窗口的一种特殊情况,只要滑动窗口里面的小窗口和大窗口大小一
package limiter

import (
"errors"
"sync"
"time"
)

// SlidingWindowLimiter 滑动窗口限流器
type SlidingWindowLimiter struct {
limit int // 窗口请求上限
window int64 // 窗口时间大小
smallWindow int64 // 小窗口时间大小
smallWindows int64 // 小窗口数量
counters map[int64]int // 小窗口计数器
mutex sync.Mutex // 避免并发问题
}

// NewSlidingWindowLimiter 创建滑动窗口限流器
func NewSlidingWindowLimiter(limit int, window, smallWindow time.Duration) (*SlidingWindowLimiter, error) {
// 窗口时间必须能够被小窗口时间整除
if window%smallWindow != 0 {
return nil, errors.New("window cannot be split by integers")
}

return &SlidingWindowLimiter{
limit: limit,
window: int64(window),
smallWindow: int64(smallWindow),
smallWindows: int64(window / smallWindow),
counters: make(map[int64]int),
}, nil
}

func (l *SlidingWindowLimiter) TryAcquire() bool {
l.mutex.Lock()
defer l.mutex.Unlock()

// 获取当前小窗口值
currentSmallWindow := time.Now().UnixNano() / l.smallWindow * l.smallWindow
// 获取起始小窗口值
startSmallWindow := currentSmallWindow - l.smallWindow*(l.smallWindows-1)

// 计算当前窗口的请求总数
var count int
for smallWindow, counter := range l.counters {
if smallWindow < startSmallWindow {
delete(l.counters, smallWindow)
} else {
count += counter
}
}

// 若到达窗口请求上限,请求失败
if count >= l.limit {
return false
}
// 若没到窗口请求上限,当前小窗口计数器+1,请求成功
l.counters[currentSmallWindow]++
return true
}

gin框架中使用限流中间件

package middlewares

import (
"net/http"
"time"

"github.com/gin-gonic/gin"
"github.com/juju/ratelimit"
)
// 令牌桶作为限流策略
func RateLimitMiddleware(fillInterval time.Duration, cap int64) func(c *gin.Context) {
bucket := ratelimit.NewBucket(fillInterval, cap)
return func(c *gin.Context) {
// 如果取不到令牌就中断本次请求返回 rate limit...
if bucket.TakeAvailable(1) < 1 {
c.String(http.StatusOK, "rate limit...")
c.Abort()
return
}
c.Next()
}