1.概述
在高并发服务中,限流是一种对请求或并发数进行限制的技术手段。当服务资源或处理能力有限时,限流可以对调用服务的上游请求进行限制,或对下游访问进行限制,以防止自身或下游服务因资源耗尽而影响服务。
2.常见限流算法
2.1 固定窗口限流法
在固定时间窗口(单位时间)内限制请求的数量。该算法将时间分成固定的窗口,并在每个窗口内限制请求的数量。具体来说,算法将请求按照时间顺序放入时间窗口中,并计算该时间窗口内的请求数量,如果请求数量超出了限制,则拒绝该请求。
单体 固定窗口限流
//单体
type FixedWindow struct {
mu sync.Mutex
counter int
limit int
windowEnd time.Time
window time.Duration
}
func NewFixedWindow(limit int, window time.Duration) *FixedWindow {
return &FixedWindow{
limit: limit,
window: window,
windowEnd: time.Now().Add(window),
}
}
func (fw *FixedWindow) Allow() bool {
fw.mu.Lock()
defer fw.mu.Unlock()
now := time.Now()
if now.After(fw.windowEnd) {
fw.counter = 0
fw.windowEnd = now.Add(fw.window)
}
if fw.counter >= fw.limit {
return false
}
fw.counter++
return true
}
redis 固定窗口限流
//方式一
type FixedWindow struct {
client *redis.Client
key string
maxRequests int64
windowSize time.Duration
}
func NewFixedWindow(client *redis.Client, key string, maxRequests int64, windowSize time.Duration) *FixedWindow {
return &FixedWindow{
client: client,
key: key,
maxRequests: maxRequests,
windowSize: windowSize,
}
}
func (fw *FixedWindow) Allow(ctx context.Context) (bool, error) {
now := time.Now().Unix()
windowStart := now - int64(fw.windowSize.Seconds())
pipe := fw.client.TxPipeline()
pipe.ZRemRangeByScore(ctx, fw.key, "0", fmt.Sprintf("%d", windowStart))
count := pipe.ZCard(ctx, fw.key)
pipe.ZAdd(ctx, fw.key, &redis.Z{Score: float64(now), Member: now})
pipe.Expire(ctx, fw.key, fw.windowSize*2)
if _, err := pipe.Exec(ctx); err != nil {
return false, err
}
return count.Val() < fw.maxRequests, nil
}
//方式二
type CounterRateLimiter struct {
client *redis.Client
key string
limit int64
window time.Duration
}
func NewCounterRateLimiter(client *redis.Client, key string, limit int64, window time.Duration) *CounterRateLimiter {
return &CounterRateLimiter{
client: client,
key: key,
limit: limit,
window: window,
}
}
func (rl *CounterRateLimiter) Allow(ctx context.Context) (bool, error) {
// 使用Lua脚本保证原子性操作
script := `
local current = redis.call('GET', KEYS[1])
if current == false then
redis.call('SETEX', KEYS[1], ARGV[2], 1)
return 1
elseif tonumber(current) < tonumber(ARGV[1]) then
redis.call('INCR', KEYS[1])
return 1
else
return 0
end
`
res, err := rl.client.Eval(ctx, script, []string{rl.key}, rl.limit, int(rl.window.Seconds())).Result()
if err != nil {
return false, err
}
return res.(int64) == 1, nil
}
这个算法的优点是原理和实现相对比较简单。它的缺点是无法实现平滑地限流,应对段时间内的突发流量,在固定窗口内可能流量集中在某一时间段内。这也造成了一个临界问题,就是有可能流量集中在上一个时间窗口的末尾和下一个时间窗口的开始,造成在短时间内消耗了两个时间窗口允许的流量。
2.2 滑动窗口限流法
它将单位时间周期分为n个小周期,分别记录每个小周期内接口的访问次数,并且根据时间滑动删除过期的小周期。它可以解决固定窗口临界值的问题。
这个算法的解决了固定窗口算法临界位置突发流量的问题,能够更好地应对突发流量的情况,实现更精确的限流控制。但是它需要记录滑动时间窗口内的每个小周期请求次数或每个请求时间列表,会消耗更多的内存,同时需要较多时间开销用于处理窗口滑动、请求计数、过时请求移除。
2.3 漏桶限流算法
漏桶限流算法(Leaky Bucket Algorithm)是一种流量控制算法,用于控制流入网络的数据速率,以防止网络拥塞。它的思想是将数据包看作是水滴,漏桶看作是一个固定容量的水桶,数据包像水滴一样从桶的顶部流入桶中,并通过桶底的一个小孔以一定的速度流出,从而限制了数据包的流量。
优点:可以平滑限制请求的处理速度,避免瞬间请求过多导致系统崩溃或者雪崩。
缺点:流出速率是固定的,即使流量比较低的情况下也无法有效地提高资源消耗速度,资源利用率低,对突发流量无法快速处理,造成饥饿问题。
面对突发流量的时候,漏桶算法还是循规蹈矩地处理请求,这不是我们想看到的。
2.4 令牌桶算法
令牌桶算法用于限制单位时间内请求的数量。该算法维护一个固定容量的令牌桶,每秒钟会向令牌桶中放入一定数量的令牌。当有请求到来时,如果令牌桶中有足够的令牌,则请求被允许通过并从令牌桶中消耗一个令牌,否则请求被拒绝。
令牌桶算法简易实现
package tokenbucket
import (
"sync"
"time"
)
// TokenBucket 令牌桶结构体
type TokenBucket struct {
capacity int64 // 桶的容量
tokens int64 // 当前令牌数量
fillInterval time.Duration // 填充时间间隔
fillPerInterval int64 // 每次填充的令牌数量
lastFillTime time.Time // 上次填充时间
mu sync.Mutex // 互斥锁
}
// NewTokenBucket 创建一个新的令牌桶
// capacity: 桶的总容量
// fillInterval: 填充时间间隔
// fillPerInterval: 每次填充的令牌数量
func NewTokenBucket(capacity, fillPerInterval int64, fillInterval time.Duration) *TokenBucket {
return &TokenBucket{
capacity: capacity,
tokens: capacity,
fillInterval: fillInterval,
fillPerInterval: fillPerInterval,
lastFillTime: time.Now(),
}
}
// Allow 检查是否允许请求通过(消耗一个令牌)
func (tb *TokenBucket) Allow() bool {
return tb.AllowN(1)
}
// AllowN 检查是否允许请求通过(消耗n个令牌)
func (tb *TokenBucket) AllowN(n int64) bool {
tb.mu.Lock()
defer tb.mu.Unlock()
// 先填充令牌
now := time.Now()
elapsed := now.Sub(tb.lastFillTime)
fillTimes := int64(elapsed / tb.fillInterval)
if fillTimes > 0 {
tb.tokens += fillTimes * tb.fillPerInterval
if tb.tokens > tb.capacity {
tb.tokens = tb.capacity
}
tb.lastFillTime = now
}
// 检查令牌是否足够
if tb.tokens >= n {
tb.tokens -= n
return true
}
return false
}
// Wait 阻塞直到获取到令牌
func (tb *TokenBucket) Wait() {
tb.WaitN(1)
}
// WaitN 阻塞直到获取到n个令牌
func (tb *TokenBucket) WaitN(n int64) {
for !tb.AllowN(n) {
time.Sleep(tb.fillInterval / time.Duration(tb.fillPerInterval))
}
}
优点:令牌桶算法可以处理突发流量,可以在短时间内提供更多的处理能力,以处理突发流量。
我们可以控制令牌的放入速度来控制API的处理速度,也可以控制令牌桶的大小(可以是机器的CPU/内存能处理的上限)。
缺点:实现相对复杂,不过有已经实现的工具可以直接拿来用。例如Guava的RateLimiter限流组件,就是基于令牌桶算法实现的。
3.对比总结
算法
突发处理能力
实现复杂度
内存消耗
输出平滑性
适用场景
固定窗口
⚠️ 边界突发
⭐
很低
低
简单限流、临时性保护,对精度要求不高
滑动窗口
❌ 不允许突发
⭐⭐
高
高
高精度限流、金融交易等严格场景
漏桶
❌ 不允许突发
⭐⭐
低
高
需要平滑输出,对实时性要求不高的场景
令牌桶
✅ 允许突发
⭐⭐⭐
低
中
需要灵活控制请求速率、允许合理突发的场景
参考:
https://heapdump.cn/article/5480577
https://huanglianjing.com/posts