實際開發中,當業務流量過大時,為了保護下游服務,我們通常會做一些預防性的工作,今天我們就一起來聊聊限流!
在實際應用中,每個系統或者服務都有其處理能力的極限(瓶頸),即便是微服務中有集群和分布式的夾持,也不能保證系統能應對任何大小的流量,因此,系統為了自保,需要對處理能力范圍以外的流量進行“特殊照顧”(比如,丟棄請求或者延遲處理),從而避免系統卡死、崩潰或不可用等情況,保證系統整體服務可用。
令牌桶算法(Token Bucket Algorithm)是計算機網絡和電信領域中常用的一種簡單方法,用于流量整形和速率限制。它旨在控制系統在某個時間段內可以發送或接收的數據量,確保流量符合指定的速率。
令牌桶算法的核心思路:系統按照固定速度往桶里加入令牌,如果桶滿則停止添加。當有請求到來時,會嘗試從桶里拿走一個令牌,取到令牌才能繼續進行請求處理,沒有令牌就拒絕服務。示意圖如下:
令牌桶法的幾個特點:
令牌桶算法主要用于應對突發流量的場景,在 Java語言中使用最多的是 Google的 Guava RateLimiter,下面舉幾個例子來說明它是如何應對突發流量:
示例1
import java.util.concurrent.TimeUnit;public class RateLimit { public static void main(String[] args) { RateLimiter limiter = RateLimiter.create(5); // 每秒創建5個令牌 System.out.println("acquire(5), wait " + limiter.acquire(5) + " s"); // 全部取走 5個令牌 System.out.println("acquire(1), wait " + limiter.acquire(1) + " s");// 獲取1個令牌 boolean result = limiter.tryAcquire(1, 0, TimeUnit.SECONDS); // 嘗試獲取1個令牌,獲取不到則直接返回 System.out.println("tryAcquire(1), result: " + result); }}
示例代碼運行結果如下:
acquire(5), wait 0.0 sacquire(1), wait 0.971544 stryAcquire(1), result: false
桶中共有 5個令牌,acquire(5)返回0 代表令牌充足無需等待,當桶中令牌不足,acquire(1)等待一段時間才獲取到,當令牌不足時,tryAcquire(1)不等待直接返回。
示例2
import com.google.common.util.concurrent.RateLimiter;public class RateLimit { public static void main(String[] args) { RateLimiter limiter = RateLimiter.create(5); System.out.println("acquire(10), wait " + limiter.acquire(10) + " s"); System.out.println("acquire(1), wait " + limiter.acquire(1) + " s"); }}
示例代碼運行結果如下:
acquire(10), wait 0.0 sacquire(1), wait 1.974268 s
桶中共有 5個令牌,acquire(10)返回0,和示例似乎有點沖突,其實,這里返回0 代表應對了突發流量,但是 acquire(1)
卻等待了 1.974268秒,這代表 acquire(1)的等待是時間包含了應對突然流量多出來的 5個請求,即 1.974268 = 1 + 0.974268。
為了更好的驗證示例2的猜想,我們看示例3:
示例3
import com.google.common.util.concurrent.RateLimiter;import java.util.concurrent.TimeUnit;public class RateLimit { public static void main(String[] args) throws InterruptedException { RateLimiter limiter = RateLimiter.create(5); System.out.println("acquire(5), wait " + limiter.acquire(5) + " s"); TimeUnit.SECONDS.sleep(1); System.out.println("acquire(5), wait " + limiter.acquire(5) + " s"); System.out.println("acquire(1), wait " + limiter.acquire(1) + " s"); }}
示例代碼運行結果如下:
acquire(5), wait 0.0 sacquire(5), wait 0.0 sacquire(1), wait 0.966104 s
桶中共有 5個令牌,acquire(5)返回0 代表令牌充足無需等待,接著睡眠 1s,這樣系統又可以增加5個令牌,因此,再次 acquire(5)令牌充足返回0 無需等待,acquire(1)需要等待一段時間才能獲取令牌。
漏桶算法(Leaky Bucket Algorithm)的核心思路是:水(請求)進入固定容量的漏桶,漏桶的水以固定的速度流出,當水流入漏桶的速度過大導致漏桶滿而直接溢出,然后拒絕請求。示意圖如下:
下面為一個 Java版本的漏桶算法示例:
import java.util.concurrent.*;public class LeakyBucket { private final int capacity; // 桶的容量 private final int rate; // 出水速率 private int water; // 漏斗中的水量 private long lastLeakTime; // 上一次漏水的時間 public LeakyBucket(int capacity, int rate) { this.capacity = capacity; this.rate = rate; this.water = 0; this.lastLeakTime = System.currentTimeMillis(); } public synchronized boolean allowRequest(int tokens) { leak(); // 漏水 if (water + tokens <= capacity) { water += tokens; // 漏斗容量未滿,可以加水 return true; } else { return false; // 漏斗容量已滿,無法加水 } } private void leak() { long currentTime = System.currentTimeMillis(); long timeElapsed = currentTime - lastLeakTime; int waterToLeak = (int) (timeElapsed * rate / 1000); // 計算經過的時間內應該漏掉的水量 water = Math.max(0, water - waterToLeak); // 漏水 lastLeakTime = currentTime; // 更新上一次漏水時間 } public static void main(String[] args) { LeakyBucket bucket = new LeakyBucket(10, 2); // 容量為10,速率為2令牌/秒 int[] packets = {2, 3, 1, 5, 2, 10}; // 要發送的數據包大小 for (int packet : packets) { if (bucket.allowRequest(packet)) { System.out.println("發送 " + packet + " 字節的數據包"); } else { System.out.println("漏桶已滿,無法發送數據包"); } try { TimeUnit.SECONDS.sleep(1); // 模擬發送間隔 } catch (InterruptedException e) { e.printStackTrace(); } } }}
漏桶算法的幾個特點:
計數器是最簡單的限流方式,主要用來限制總并發數,主要通過一個支持原子操作的計數器來累計 1秒內的請求次數,當 秒內計數達到限流閾值時觸發拒絕策略。每過 1秒,計數器重置為 0開始重新計數。比如數據庫連接池大小、線程池大小、程序訪問并發數等都是使用計數器算法。
如下代碼就是一個Java版本的計數器算法示例,通過一個原子計算器 AtomicInteger來記錄總數,如果請求數大于總數就拒絕請求,否則正常處理請求:
import java.util.concurrent.atomic.AtomicInteger;public class CounterRateLimiter { private final int limit; // 限流閾值 private final long windowSizeMs; // 時間窗口大小(毫秒) private AtomicInteger counter; // 請求計數器 private long lastResetTime; // 上次重置計數器的時間 public CounterRateLimiter(int limit, long windowSizeMs) { this.limit = limit; this.windowSizeMs = windowSizeMs; this.counter = new AtomicInteger(0); this.lastResetTime = System.currentTimeMillis(); } public boolean allowRequest() { long currentTime = System.currentTimeMillis(); // 如果當前時間超出了時間窗口,重置計數器 if (currentTime - lastResetTime > windowSizeMs) { counter.set(0); lastResetTime = currentTime; } // 檢查計數器是否超過了限流閾值 return counter.incrementAndGet() <= limit; } public static void main(String[] args) { CounterRateLimiter rateLimiter = new CounterRateLimiter(3, 1000); // 每秒最多處理3個請求 for (int i = 0; i < 10; i++) { if (rateLimiter.allowRequest()) { System.out.println("允許請求 " + (i + 1)); } else { System.out.println("限流,拒絕請求 " + (i + 1)); } try { Thread.sleep(200); // 模擬請求間隔 } catch (InterruptedException e) { e.printStackTrace(); } } }}
滑動窗口算法是一種常用于限流和統計的算法。它基于一個固定大小的時間窗口,在這個時間窗口內統計請求的數量,并根據設定的閾值來控制流量。比如,TCP協議就使用了該算法
以下是一個簡單的 Java 示例實現滑動窗口算法:
import java.util.concurrent.atomic.AtomicInteger;public class SlidingWindowRateLimiter { private final int limit; // 限流閾值 private final long windowSizeMs; // 時間窗口大小(毫秒) private final AtomicInteger[] window; // 滑動窗口 private long lastUpdateTime; // 上次更新窗口的時間 private int pointer; // 指向當前時間窗口的指針 public SlidingWindowRateLimiter(int limit, long windowSizeMs, int granularity) { this.limit = limit; this.windowSizeMs = windowSizeMs; this.window = new AtomicInteger[granularity]; for (int i = 0; i < granularity; i++) { window[i] = new AtomicInteger(0); } this.lastUpdateTime = System.currentTimeMillis(); this.pointer = 0; } public synchronized boolean allowRequest() { long currentTime = System.currentTimeMillis(); // 計算時間窗口的起始位置 long windowStart = currentTime - windowSizeMs + 1; // 更新窗口中過期的計數器 while (lastUpdateTime < windowStart) { lastUpdateTime++; window[pointer].set(0); pointer = (pointer + 1) % window.length; } // 檢查窗口內的總計數是否超過限流閾值 int totalRequests = 0; for (AtomicInteger counter : window) { totalRequests += counter.get(); } if (totalRequests >= limit) { return false; // 超過限流閾值,拒絕請求 } else { window[pointer].incrementAndGet(); // 記錄新的請求 return true; // 允許請求 } } public static void main(String[] args) { SlidingWindowRateLimiter rateLimiter = new SlidingWindowRateLimiter(10, 1000, 10); // 每秒最多處理10個請求 for (int i = 0; i < 20; i++) { if (rateLimiter.allowRequest()) { System.out.println("允許請求 " + (i + 1)); } else { System.out.println("限流,拒絕請求 " + (i + 1)); } try { Thread.sleep(100); // 模擬請求間隔 } catch (InterruptedException e) { e.printStackTrace(); } } }}
Redis + Lua屬于分布式環境下的限流方案,主要利用的是Lua在 Redis中運行能保證原子性。如下示例為一個簡單的Lua限流腳本:
local key = KEYS[1]local limit = tonumber(ARGV[1])local current = tonumber(redis.call('get', key) or "0")if current + 1 > limit then return 0else redis.call("INCRBY", key, 1) redis.call("EXPIRE", key, 1) return 1end
腳本解釋:
當我們自己無法實現比較好的限流方案時,成熟的三方框架就是我們比較好的選擇,下面列出兩個 Java語言比較優秀的框架。
(1) resilience4j
resilience4j 是一個輕量級的容錯庫,提供了限流、熔斷、重試等功能。限流模塊 RateLimiter 提供了靈活的限流配置,其優點如下:
(2) Sentinel
Sentinel 是阿里巴巴開源的一個功能全面的流量防護框架,提供限流、熔斷、系統負載保護等多種功能。其優點如下:
本文講述了以下幾種限流方式:
上面的限流方式,主要是針對服務器進行限流,除此之外,我們也可以對客戶端進行限流, 比如驗證碼,答題,排隊等方式。
另外,我們也會在一些中間件上進行限流,比如Apache、Tomcat、Nginx等。
在實際的開發中,限流場景略有差異,限流的維度也不一樣,比如,有的場景需要根據請求的 URL來限流,有的會對 IP地址進行限流、另外,設備ID、用戶ID 也是限流常用的維度,因此,我們需要結合真實業務場景靈活的使用限流方案。
本文鏈接:http://www.tebozhan.com/showinfo-26-92106-0.html為什么要限流?常見的限流算法有哪些?
聲明:本網頁內容旨在傳播知識,若有侵權等問題請及時與本網聯系,我們將在第一時間刪除處理。郵件:2376512515@qq.com