Kratos 源码分析:熔断

引言

熔断器是为了当依赖的服务已经出现故障时,主动阻止对依赖服务的请求。保证自身服务的正常运行不受依赖服务影响,防止雪崩效应。

为什么需要熔断器

  1. 虽然服务端限流虽然可以帮助我们抗住一定的压力,但是拒绝请求本身也有成本的。
  2. 如果我们的本来流量可以支撑 1w rps,加了限流可以支撑在 10w rps 的情况下仍然可以提供 1w rps 的有效请求,但是流量突然再翻了 10 倍,来到 100w rps 那么服务该挂还是得挂。
  3. 限流是一种被动的保护机制,而熔断是一种主动的保护机制。

熔断器的原理

熔断器的三种状态

  1. 关闭状态(CLOSED):熔断器关闭时,请求会正常通过,熔断器会统计一段时间内的请求成功率,如果请求成功率低于设定的阈值,熔断器会进入打开状态。
  2. 打开状态(OPEN):熔断器打开时,请求会被熔断器拒绝,直接返回错误,熔断器会统计一段时间内的请求成功率,如果请求成功率高于设定的阈值,熔断器会进入半开状态。
  3. 半开状态(HALF-OPEN):熔断器半开时,熔断器会允许部分请求通过,熔断器会统计一段时间内的请求成功率,如果请求成功率高于设定的阈值,熔断器会进入关闭状态,否则会继续保持打开状态。

Kratos 实现分析

CircuitBreaker 接口

1
2
3
4
5
type CircuitBreaker interface {
	Allow() error
	MarkSuccess()
	MarkFailed()
}
  1. Allow()
  • 判断熔断器是否允许通过
  1. MarkSuccess()
  • 熔断器成功的回调
  1. MarkFailed()
  • 熔断器失败的回调

Group 结构体

1
2
3
4
5
6
type Group struct {
   mutex sync.Mutex
   val   atomic.Value
 
   New func() CircuitBreaker
}
  1. mutex
  • 互斥锁,使val这个map不产生数据竞争
  1. val
  • map,存储name -> CircuitBreaker
  1. New
  • 生成一个CircuitBreaker

Get方法

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
// Get .
func (g *Group) Get(name string) CircuitBreaker {
	m, ok := g.val.Load().(map[string]CircuitBreaker)
	if ok {
		breaker, ok := m[name]
		if ok {
			return breaker // 很具name从val拿出 breaker 如果存在返回
		}
	}
	// slowpath for group don`t have specified name breaker.
	g.mutex.Lock()
	nm := make(map[string]CircuitBreaker, len(m)+1)
	for k, v := range m {
		nm[k] = v
	}
	breaker := g.New()
	nm[name] = breaker // 如果不存在 生成一个 并放入map 并返回
	g.val.Store(nm)
	g.mutex.Unlock()
	return breaker
}

Breaker 结构体

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
// Breaker is a sre CircuitBreaker pattern.
type Breaker struct {
   stat window.RollingCounter
   r    *rand.Rand
   // rand.New(...) returns a non thread safe object
   randLock sync.Mutex
 
   // Reducing the k will make adaptive throttling behave more aggressively,
   // Increasing the k will make adaptive throttling behave less aggressively.
   k       float64
   request int64
 
   state int32
}
  1. stat
  • 滑动窗口,记录成功失败
  1. r
  • 随机数
  1. randLock
  • 读写锁
  1. k 成功系数
  • total(总数) = success * k
  1. request 请求数
  • 当总数 < request时,不判断是否熔断
  1. state
  • 熔断器状态 打开或者关闭

Allow()方法

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
// Allow request if error returns nil.
func (b *Breaker) Allow() error {
	success, total := b.summary() // 从活动窗口获取成功数和总数
	k := b.k * float64(success) // 根据k成功系数 获取
 
	// check overflow requests = K * success
	if total < b.request || float64(total) < k { // 如果总数<request 或者  总数 < k 
		if atomic.LoadInt32(&b.state) == StateOpen { 
			atomic.CompareAndSwapInt32(&b.state, StateOpen, StateClosed) // 如果state是打开 关闭
		}
		return nil
	}
	if atomic.LoadInt32(&b.state) == StateClosed { 
		atomic.CompareAndSwapInt32(&b.state, StateClosed, StateOpen) // 如果state是关闭 打开
	}
	dr := math.Max(0, (float64(total)-k)/float64(total+1)) // 获取系数,当k越大 dr越小
	drop := b.trueOnProba(dr)// trueOnProba 获取水机数,当dr越大,drop越大
	if drop { // 如果是 拒绝请求
		return circuitbreaker.ErrNotAllowed
	}
	return nil
}
 
func (b *Breaker) trueOnProba(proba float64) (truth bool) {
	b.randLock.Lock()
	truth = b.r.Float64() < proba
	b.randLock.Unlock()
	return
}

使用trueOnProba的原因是,当熔断器关闭时,随机让一部分请求通过,当success越大,请求的通过的数量就越多。用这些数据成功与否,放入窗口统计,当成功数达到要求时,就可以关闭熔断器了。

MarkSuccess()以及MarkFailed()方法

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
// MarkSuccess mark requeest is success.
func (b *Breaker) MarkSuccess() {
	b.stat.Add(1) // 成功数+1
}
 
// MarkFailed mark request is failed.
func (b *Breaker) MarkFailed() {
	// NOTE: when client reject requets locally, continue add counter let the
	// drop ratio higher.
	b.stat.Add(0) // 失败数+1
}

参考