上一篇文章介紹了 如何實現計數器限流。主要有兩種實現方式,分別是固定窗口和滑動窗口,并且分析了 go-zero 采用固定窗口方式實現的源碼。
但是采用固定窗口實現的限流器會有兩個問題:
這篇文章來介紹一下令牌桶算法,可以很好解決以上兩個問題。
算法概念如下:
圖片
令牌桶算法既能夠將所有的請求平均分布到時間區間內,又能接受服務器能夠承受范圍內的突發請求,因此是目前使用較為廣泛的一種限流算法。
源碼分析我們還是以 go-zero 項目為例,首先來看生成令牌的部分,依然是使用 Redis 來實現。
// core/limit/tokenlimit.go// 生成 token 速率script = `local rate = tonumber(ARGV[1])// 通容量local capacity = tonumber(ARGV[2])// 當前時間戳local now = tonumber(ARGV[3])// 請求數量local requested = tonumber(ARGV[4])// 需要多少秒才能把桶填滿local fill_time = capacity/rate// 向下取整,ttl 為填滿時間 2 倍local ttl = math.floor(fill_time*2)// 當前桶剩余容量,如果為 nil,說明第一次使用,賦值為桶最大容量local last_tokens = tonumber(redis.call("get", KEYS[1]))if last_tokens == nil then last_tokens = capacityend// 上次請求時間戳,如果為 nil 則賦值 0local last_refreshed = tonumber(redis.call("get", KEYS[2]))if last_refreshed == nil then last_refreshed = 0end// 距離上一次請求的時間跨度local delta = math.max(0, now-last_refreshed)// 距離上一次請求的時間跨度能生成的 token 數量和桶內剩余 token 數量的和// 與桶容量比較,取二者的小值local filled_tokens = math.min(capacity, last_tokens+(delta*rate))// 判斷請求數量和桶內 token 數量的大小local allowed = filled_tokens >= requested// 被請求消耗掉之后,更新剩余 token 數量local new_tokens = filled_tokensif allowed then new_tokens = filled_tokens - requestedend// 更新 redis tokenredis.call("setex", KEYS[1], ttl, new_tokens)// 更新 redis 刷新時間redis.call("setex", KEYS[2], ttl, now)return allowed`
Redis 中主要保存兩個 key,分別是 token 數量和刷新時間。
核心思想就是比較兩次請求時間間隔內生成的 token 數量 + 桶內剩余 token 數量,和請求量之間的大小,如果滿足則允許,否則則不允許。
限流器初始化:
// A TokenLimiter controls how frequently events are allowed to happen with in one second.type TokenLimiter struct { // 生成 token 速率 rate int // 桶容量 burst int store *redis.Redis // 桶 key tokenKey string // 桶刷新時間 key timestampKey string rescueLock sync.Mutex // redis 健康標識 redisAlive uint32 // redis 健康監控啟動狀態 monitorStarted bool // 內置單機限流器 rescueLimiter *xrate.Limiter}// NewTokenLimiter returns a new TokenLimiter that allows events up to rate and permits// bursts of at most burst tokens.func NewTokenLimiter(rate, burst int, store *redis.Redis, key string) *TokenLimiter { tokenKey := fmt.Sprintf(tokenFormat, key) timestampKey := fmt.Sprintf(timestampFormat, key) return &TokenLimiter{ rate: rate, burst: burst, store: store, tokenKey: tokenKey, timestampKey: timestampKey, redisAlive: 1, rescueLimiter: xrate.NewLimiter(xrate.Every(time.Second/time.Duration(rate)), burst), }}
其中有一個變量 rescueLimiter,這是一個進程內的限流器。如果 Redis 發生故障了,那么就使用這個,算是一個保障,盡量避免系統被突發流量拖垮。
圖片
提供了四個可調用方法:
// Allow is shorthand for AllowN(time.Now(), 1).func (lim *TokenLimiter) Allow() bool { return lim.AllowN(time.Now(), 1)}// AllowCtx is shorthand for AllowNCtx(ctx,time.Now(), 1) with incoming context.func (lim *TokenLimiter) AllowCtx(ctx context.Context) bool { return lim.AllowNCtx(ctx, time.Now(), 1)}// AllowN reports whether n events may happen at time now.// Use this method if you intend to drop / skip events that exceed the rate.// Otherwise, use Reserve or Wait.func (lim *TokenLimiter) AllowN(now time.Time, n int) bool { return lim.reserveN(context.Background(), now, n)}// AllowNCtx reports whether n events may happen at time now with incoming context.// Use this method if you intend to drop / skip events that exceed the rate.// Otherwise, use Reserve or Wait.func (lim *TokenLimiter) AllowNCtx(ctx context.Context, now time.Time, n int) bool { return lim.reserveN(ctx, now, n)}
最終調用的都是 reverveN 方法:
func (lim *TokenLimiter) reserveN(ctx context.Context, now time.Time, n int) bool { // 判斷 Redis 健康狀態,如果 Redis 故障,則使用進程內限流器 if atomic.LoadUint32(&lim.redisAlive) == 0 { return lim.rescueLimiter.AllowN(now, n) } // 執行限流腳本 resp, err := lim.store.EvalCtx(ctx, script, []string{ lim.tokenKey, lim.timestampKey, }, []string{ strconv.Itoa(lim.rate), strconv.Itoa(lim.burst), strconv.FormatInt(now.Unix(), 10), strconv.Itoa(n), }) // redis allowed == false // Lua boolean false -> r Nil bulk reply if err == redis.Nil { return false } if errors.Is(err, context.DeadlineExceeded) || errors.Is(err, context.Canceled) { logx.Errorf("fail to use rate limiter: %s", err) return false } if err != nil { logx.Errorf("fail to use rate limiter: %s, use in-process limiter for rescue", err) // 如果有異常的話,會啟動進程內限流 lim.startMonitor() return lim.rescueLimiter.AllowN(now, n) } code, ok := resp.(int64) if !ok { logx.Errorf("fail to eval redis script: %v, use in-process limiter for rescue", resp) lim.startMonitor() return lim.rescueLimiter.AllowN(now, n) } // redis allowed == true // Lua boolean true -> r integer reply with value of 1 return code == 1}
最后看一下進程內限流的啟動與恢復:
func (lim *TokenLimiter) startMonitor() { lim.rescueLock.Lock() defer lim.rescueLock.Unlock() // 需要加鎖保護,如果程序已經啟動了,直接返回,不要重復啟動 if lim.monitorStarted { return } lim.monitorStarted = true atomic.StoreUint32(&lim.redisAlive, 0) go lim.waitForRedis()}func (lim *TokenLimiter) waitForRedis() { ticker := time.NewTicker(pingInterval) // 更新監控進程的狀態 defer func() { ticker.Stop() lim.rescueLock.Lock() lim.monitorStarted = false lim.rescueLock.Unlock() }() for range ticker.C { // 對 redis 進行健康監測,如果 redis 服務恢復了 // 則更新 redisAlive 標識,并退出 goroutine if lim.store.Ping() { atomic.StoreUint32(&lim.redisAlive, 1) return } }}
參考文章:
本文鏈接:http://www.tebozhan.com/showinfo-26-5770-0.htmlGo-Zero 是如何實現令牌桶限流的?
聲明:本網頁內容旨在傳播知識,若有侵權等問題請及時與本網聯系,我們將在第一時間刪除處理。郵件:2376512515@qq.com
上一篇: 「Go面經」算法 并發模型 緩存落盤 etcd actor模型
下一篇: 阿里云還會繼續降價嗎?