Google SRE弹性熔断算法(Client-Side Throttling)原理分析

在微服务架构中,单个服务故障可能引发雪崩效应。传统熔断器(如Hystrix)通过固定阈值触发熔断,虽然能防止系统崩溃,但也存在两大痛点:

  • 过度熔断:静态阈值无法适应动态负载,可能误伤健康服务
  • 恢复震荡:从熔断状态恢复时易出现流量突刺,导致二次熔断

Google SRE弹性熔断算法(Client-Side Throttling)是一种基于请求成功率动态调整熔断策略的自适应算法,相比传统熔断器的固定阈值模式,其核心思想是通过请求成功率动态计算丢弃概率,实现更平滑的流量控制。以下是其实现逻辑的详细分析:

Google SRE

一、算法核心思想

1.1 数学模型:用概率说话
算法的核心公式:

$P = \max\left(0, \frac{\text{Requests} - K \times \text{Accepts}}{\text{Requests} + 1}\right)$

  • Requests:客户端总请求数(成功+失败)
  • Accepts:服务端成功处理的请求数
  • K值:敏感系数(通常1.5-2.0),控制熔断激进程度

当 P > 0, 时,每个请求以概率P被丢弃,而非传统熔断器的”全开全关”模式。

该公式的解释如下: 当 requests−K∗accepts>=0时,概率 p==0,客户端不会主动丢弃请求;反之,则概率 p,会随着 accepts值的变小而增加,即成功接受的请求数越少,本地丢弃请求的概率就越高。通俗点说,Client 可以发送请求直到 requests=K∗accepts, 一旦超过限制, 按照概率进行截流。

算法动态调节过程模拟:

1
2
3
4
5
6
7
# 示例:K=2时的状态变化
时间轴 请求状态 Requests Accepts P值计算 丢弃概率
------------------------------------------------------------
t1 成功 1 1 (1-2*1)/(1+1)= -0.50%
t2 失败 2 1 (2-2*1)/(2+1)= 0/30%
t3 失败 3 1 (3-2*1)/(3+1)= 0.2525%
t4 成功 4 2 (4-2*2)/(4+1)= 00%

从 Google 的文档描述中,该算法在实际中使用效果极为良好,可以使整体上保持一个非常稳定的请求速率。对于后端而言,调整 K 值可以使得自适应限流算法适配不同的后端。

  • 降低 K 值会使自适应限流算法更加激进(允许客户端在算法启动时拒绝更多本地请求)
  • 增加 K 值会使自适应限流算法不再那么激进(允许服务端在算法启动时尝试接收更多的请求,与上面相反)

二、工程实现

  1. 滑动窗口代码实现
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    // ================== 滑动窗口实现 ==================
    type RollingWindow struct {
    buckets []int64 // 桶数组(存储成功数)
    size int // 桶数量
    width int64 // 单个桶时间宽度(秒)
    lastPos int // 当前桶位置
    mu sync.Mutex // 互斥锁
    stopCh chan struct{}
    }

    func NewRollingWindow(size int, windowDuration time.Duration) *RollingWindow {
    rw := &RollingWindow{
    size: size,
    buckets: make([]int64, size),
    width: int64(windowDuration.Seconds()) / int64(size),
    stopCh: make(chan struct{}),
    }
    go rw.backgroundUpdater()
    return rw
    }

    func (rw *RollingWindow) backgroundUpdater() {
    ticker := time.NewTicker(time.Duration(rw.width))
    defer ticker.Stop()

    for {
    select {
    case <-ticker.C:
    rw.mu.Lock()
    rw.lastPos = (rw.lastPos + 1) % rw.size
    atomic.StoreInt64(&rw.buckets[rw.lastPos], 0)
    rw.mu.Unlock()
    case <-rw.stopCh:
    return
    }
    }
    }

    func (rw *RollingWindow) AddSuccess() {
    atomic.AddInt64(&rw.buckets[rw.lastPos], 1)
    }

    func (rw *RollingWindow) Sum() int64 {
    sum := int64(0)
    for i := 0; i < rw.size; i++ {
    sum += atomic.LoadInt64(&rw.buckets[i])
    }
    return sum
    }
  2. 熔断器实现
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    59
    60
    61
    62
    63
    64
    65
    66
    67
    68
    69
    70
    71
    72
    73
    74
    75
    76
    77
    78
    79
    80
    81
    // ================== 熔断器实现 ==================
    type GoogleBreaker struct {
    k float64 // 敏感系数
    requests int64 // 总请求数(原子操作)
    window *RollingWindow
    rand *rand.Rand
    probaGuard sync.Mutex
    }

    var ErrServiceUnavailable = errors.New("service unavailable")

    func NewGoogleBreaker(k float64, windowSize int, windowDuration time.Duration) *GoogleBreaker {
    return &GoogleBreaker{
    k: k,
    window: NewRollingWindow(windowSize, windowDuration),
    rand: rand.New(rand.NewSource(time.Now().UnixNano())),
    }
    }

    func (b *GoogleBreaker) Do(req func() error) error {
    // 1. 检查是否触发熔断
    if err := b.accept(); err != nil {
    return err
    }

    // 2. 执行请求并统计结果
    atomic.AddInt64(&b.requests, 1)
    err := req()
    if err != nil && isFailure(err) {
    return err
    }

    // 3. 请求成功时更新滑动窗口
    b.window.AddSuccess()
    return nil
    }

    func (b *GoogleBreaker) accept() error {
    // 获取当前统计值
    accepts := b.window.Sum()
    requests := atomic.LoadInt64(&b.requests)

    // 计算丢弃概率
    ratio := float64(requests-int64(b.k*float64(accepts))) / float64(requests+1)
    if ratio <= 0 {
    return nil
    }

    // 概率判断
    b.probaGuard.Lock()
    defer b.probaGuard.Unlock()
    if b.rand.Float64() < ratio {
    return ErrServiceUnavailable
    }
    return nil
    }

    // ================== 辅助函数 ==================
    func isFailure(err error) bool {
    // 根据业务需求定义哪些错误需要触发熔断
    // 示例:仅网络错误触发熔断
    _, ok := err.(net.Error)
    return ok
    }

    // ================== 测试用例 ==================
    func TestBreaker(t *testing.T) {
    breaker := NewGoogleBreaker(1.5, 10, 10*time.Second)

    // 正常请求
    for i := 0; i < 100; i++ {
    err := breaker.Do(func() error {
    // 模拟业务逻辑
    if rand.Intn(100) < 20 { // 20%错误率
    return &net.OpError{}
    }
    return nil
    })
    t.Logf("Request %d: %v", i, err)
    }
    }

Google SRE弹性熔断算法(Client-Side Throttling)原理分析
https://smartmalphite.github.io/2024/08/25/SystemDesign/throttling/
作者
Enbo Wang
发布于
2024年8月25日
许可协议