Studying the source code of the K8s controller workqueue
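When writing a K8s controller you will almost certainly use a queue, since that is the pattern recommended for K8s controllers. This article walks through the source code of workqueue in K8s client-go to show how it is implemented underneath.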
Bottom layer: a basic workqueue with specific guarantees
k8s.io/client-go/util/workqueue
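The most basic queue provides the following capabilities:
- FIFO: items are processed in the order in which they were added to the queue
- No concurrent processing of the same item: an item is never processed by more than one consumer at a time, and if it is enqueued several times before it is processed, it is processed only once
- Multiple producers and consumers; in particular, an item may be re-enqueued while it is being processed
- Shutdown notifications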
// Package workqueue provides a simple queue that supports the following
// features:
// * Fair: items processed in the order in which they are added.
// * Stingy: a single item will not be processed multiple times concurrently,
// and if an item is added multiple times before it can be processed, it
// will only be processed once.
// * Multiple consumers and producers. In particular, it is allowed for an
// item to be reenqueued while it is being processed.
// * Shutdown notifications.
type Interface interface {
Add(item interface{})
Len() int
Get() (item interface{}, shutdown bool)
Done(item interface{})
ShutDown()
ShuttingDown() bool
}
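The most important fields of the concrete workqueue type, which together provide these basic capabilities, are:
- queue: a slice that defines the order in which items are processed
- processing: a set (backed by a map) of the items currently being processed
- dirty: a set (backed by a map) of all items that need to be processed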
// Type is a work queue (see the package comment).
type Type struct {
// queue defines the order in which we will work on items. Every
// element of queue should be in the dirty set and not in the
// processing set.
queue []t
// dirty defines all of the items that need to be processed.
dirty set
// Things that are currently being processed are in the processing set.
// These things may be simultaneously in the dirty set. When we finish
// processing something and remove it from this set, we'll check if
// it's in the dirty set, and if so, add it to the queue.
processing set
cond *sync.Cond
shuttingDown bool
metrics queueMetrics
unfinishedWorkUpdatePeriod time.Duration
clock clock.Clock
}
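To see how these three fields cooperate, here is a minimal single-goroutine model. `miniQueue` is a hypothetical type, not client-go code: it drops the lock, the condition variable, metrics and shutdown, and uses string items, but its Add, Get and Done follow the same rules as the methods walked through below.

```go
package main

import "fmt"

// miniQueue is a hypothetical, single-goroutine model of the three fields in
// workqueue.Type. It has no lock and never blocks; it only shows how queue,
// dirty and processing interact.
type miniQueue struct {
	queue      []string            // processing order
	dirty      map[string]struct{} // items waiting to be processed
	processing map[string]struct{} // items currently handed out by Get
}

func newMiniQueue() *miniQueue {
	return &miniQueue{
		dirty:      map[string]struct{}{},
		processing: map[string]struct{}{},
	}
}

// Add mirrors Type.Add: an item already in dirty is dropped, and an item that
// is being processed is only marked dirty, not appended to queue.
func (q *miniQueue) Add(item string) {
	if _, ok := q.dirty[item]; ok {
		return
	}
	q.dirty[item] = struct{}{}
	if _, ok := q.processing[item]; ok {
		return
	}
	q.queue = append(q.queue, item)
}

// Get mirrors Type.Get without blocking: ok is false when queue is empty.
func (q *miniQueue) Get() (item string, ok bool) {
	if len(q.queue) == 0 {
		return "", false
	}
	item, q.queue = q.queue[0], q.queue[1:]
	q.processing[item] = struct{}{}
	delete(q.dirty, item)
	return item, true
}

// Done mirrors Type.Done: if the item was added again while it was being
// processed, it goes back onto queue now.
func (q *miniQueue) Done(item string) {
	delete(q.processing, item)
	if _, ok := q.dirty[item]; ok {
		q.queue = append(q.queue, item)
	}
}

func main() {
	q := newMiniQueue()
	q.Add("a")
	q.Add("a")           // dropped: "a" is already dirty
	fmt.Println(q.queue) // [a]

	item, _ := q.Get()   // "a" moves into processing
	q.Add("a")           // only marked dirty while it is being processed
	fmt.Println(q.queue) // []
	q.Done(item)         // "a" was dirty again, so it is re-queued
	fmt.Println(q.queue) // [a]
}
```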
Implementation details of the core methods:
Add
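Add marks an item as needing processing. It first checks whether the item is already in the dirty set (waiting to be processed) and returns immediately if so; this is what makes several enqueues before processing collapse into a single processing. Otherwise the item is inserted into dirty. If the same item is currently being processed, Add returns at this point without appending it to the queue slice (as described below, it is appended once the current processing finishes); this is what guarantees that an item is never processed concurrently. Otherwise the item is appended to the queue slice, which fixes its processing order, and one waiting consumer is woken with cond.Signal.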
// Add marks item as needing processing.
func (q *Type) Add(item interface{}) {
q.cond.L.Lock()
defer q.cond.L.Unlock()
if q.shuttingDown {
return
}
if q.dirty.has(item) {
return
}
q.metrics.add(item)
q.dirty.insert(item)
if q.processing.has(item) {
return
}
q.queue = append(q.queue, item)
q.cond.Signal()
}
Get
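Get blocks until there is an item in the queue to process (or the queue is shutting down). It takes the first item from queue and reslices queue to drop it, then inserts the item into processing and removes it from dirty, marking it as in progress. Once Get has returned an item, you must call Done on it, whether processing succeeded or not. If Done is never called, the item stays in processing forever: later Adds of the same item only mark it dirty, so it is never processed again, and the processing set keeps growing, which can amount to a memory leak.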
// Get blocks until it can return an item to be processed. If shutdown = true,
// the caller should end their goroutine. You must call Done with item when you
// have finished processing it.
func (q *Type) Get() (item interface{}, shutdown bool) {
q.cond.L.Lock()
defer q.cond.L.Unlock()
for len(q.queue) == 0 && !q.shuttingDown {
q.cond.Wait()
}
if len(q.queue) == 0 {
// We must be shutting down.
return nil, true
}
item, q.queue = q.queue[0], q.queue[1:]
q.metrics.get(item)
q.processing.insert(item)
q.dirty.delete(item)
return item, false
}
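The blocking part of Get is the classic condition-variable pattern. The sketch below keeps only that part: `condQueue` is a hypothetical stripped-down type with no dirty or processing sets. It assumes, as client-go does, that ShutDown wakes all waiters with Broadcast so that every blocked Get can return shutdown = true.

```go
package main

import (
	"fmt"
	"sync"
)

// condQueue is a hypothetical, stripped-down queue that keeps only the
// blocking behaviour of Type.Get: no dirty/processing sets, no metrics.
type condQueue struct {
	cond         *sync.Cond
	queue        []string
	shuttingDown bool
}

func newCondQueue() *condQueue {
	return &condQueue{cond: sync.NewCond(&sync.Mutex{})}
}

func (q *condQueue) Add(item string) {
	q.cond.L.Lock()
	defer q.cond.L.Unlock()
	if q.shuttingDown {
		return
	}
	q.queue = append(q.queue, item)
	q.cond.Signal() // wake one goroutine blocked in Get
}

// Get re-checks the condition in a loop: by the time a waiter wakes up,
// another consumer may already have taken the item.
func (q *condQueue) Get() (item string, shutdown bool) {
	q.cond.L.Lock()
	defer q.cond.L.Unlock()
	for len(q.queue) == 0 && !q.shuttingDown {
		q.cond.Wait()
	}
	if len(q.queue) == 0 {
		return "", true // shutting down and nothing left
	}
	item, q.queue = q.queue[0], q.queue[1:]
	return item, false
}

func (q *condQueue) ShutDown() {
	q.cond.L.Lock()
	defer q.cond.L.Unlock()
	q.shuttingDown = true
	q.cond.Broadcast() // wake every blocked consumer so it can exit
}

func main() {
	q := newCondQueue()
	got := make(chan string)
	go func() {
		item, _ := q.Get() // blocks until the Add below
		got <- item
	}()
	q.Add("a")
	fmt.Println(<-got) // a

	q.ShutDown()
	_, shutdown := q.Get()
	fmt.Println(shutdown) // true
}
```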
Done
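Done marks an item as finished. It first removes the item from processing, then checks whether the item is in the dirty set; if it is (because it was added again while being processed), the item is appended to queue again. This is what keeps an item from being handled by several consumers at once: a duplicate only enters the queue slice after the current processing has finished.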
// Done marks item as done processing, and if it has been marked as dirty again
// while it was being processed, it will be re-added to the queue for
// re-processing.
func (q *Type) Done(item interface{}) {
q.cond.L.Lock()
defer q.cond.L.Unlock()
q.metrics.done(item)
q.processing.delete(item)
if q.dirty.has(item) {
q.queue = append(q.queue, item)
q.cond.Signal()
}
}
Second layer: the delaying queue
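The delaying queue wraps the basic workqueue and adds delayed enqueueing, so that re-enqueueing an item after a failure does not happen too often and the controller does not end up in a hot loop. It adds an AddAfter method, which adds an item to the queue once the given duration has passed.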
// DelayingInterface is an Interface that can Add an item at a later time. This makes it easier to
// requeue items after failures without ending up in a hot-loop.
type DelayingInterface interface {
Interface
// AddAfter adds an item to the workqueue after the indicated duration has passed
AddAfter(item interface{}, duration time.Duration)
}
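In its implementation, the delaying queue adds a waitingForAddCh channel with a buffer of 1000. The new AddAfter method takes two arguments, item and duration. If duration is not positive it calls Add directly; otherwise it wraps the item and its ready time (now + duration) in a waitFor struct and sends it to waitingForAddCh. When a delaying queue is created, it starts a background loop goroutine that adds each waitFor item to the queue once its wait time has elapsed.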
// delayingType wraps an Interface and provides delayed re-enquing
type delayingType struct {
Interface
// clock tracks time for delayed firing
clock clock.Clock
// stopCh lets us signal a shutdown to the waiting loop
stopCh chan struct{}
// stopOnce guarantees we only signal shutdown a single time
stopOnce sync.Once
// heartbeat ensures we wait no more than maxWait before firing
heartbeat clock.Ticker
// waitingForAddCh is a buffered channel that feeds waitingForAdd
waitingForAddCh chan *waitFor
// metrics counts the number of retries
metrics retryMetrics
}
// AddAfter adds the given item to the work queue after the given delay
func (q *delayingType) AddAfter(item interface{}, duration time.Duration) {
// don't add if we're already shutting down
if q.ShuttingDown() {
return
}
q.metrics.retry()
// immediately add things with no delay
if duration <= 0 {
q.Add(item)
return
}
select {
case <-q.stopCh:
// unblock if ShutDown() is called
case q.waitingForAddCh <- &waitFor{data: item, readyAt: q.clock.Now().Add(duration)}:
}
}
Third layer: the rate limiting queue
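The rate limiting queue is built by combining the delaying queue with a rate limiter; as the name suggests, it controls the rate at which items are enqueued. On top of the delaying queue it adds the AddRateLimited, Forget, and NumRequeues methods.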
// RateLimitingInterface is an interface that rate limits items being added to the queue.
type RateLimitingInterface interface {
DelayingInterface
// AddRateLimited adds an item to the workqueue after the rate limiter says it's ok
AddRateLimited(item interface{})
// Forget indicates that an item is finished being retried. Doesn't matter whether it's for perm failing
// or for success, we'll stop the rate limiter from tracking it. This only clears the `rateLimiter`, you
// still have to call `Done` on the queue.
Forget(item interface{})
// NumRequeues returns back how many times the item was requeued
NumRequeues(item interface{}) int
}
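The rate limiter is the key component of the rate limiting queue. The default one, returned by DefaultControllerRateLimiter, combines two limiters with NewMaxOfRateLimiter, which uses the longer of the two delays. One is ItemExponentialFailureRateLimiter, a per-item exponential backoff limiter that computes the delay before an item's next enqueue (base delay 5ms, capped at 1000s). The other is BucketRateLimiter, a token bucket that limits the overall enqueue rate rather than the rate per item (qps 10, burst 100 by default).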
func DefaultControllerRateLimiter() RateLimiter {
return NewMaxOfRateLimiter(
NewItemExponentialFailureRateLimiter(5*time.Millisecond, 1000*time.Second),
// 10 qps, 100 bucket size. This is only for retry speed and its only the overall factor (not per item)
&BucketRateLimiter{Limiter: rate.NewLimiter(rate.Limit(10), 100)},
)
}
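AddRateLimited first asks the rate limiter for the item's delay and then calls the delaying queue's AddAfter. ItemExponentialFailureRateLimiter computes that delay in its When method from the number of times the item has already been requeued: baseDelay * 2^(number of previous requeues), capped at maxDelay. This is why the limiter struct keeps a failures map[interface{}]int that records how many times each item has been requeued.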
// AddRateLimited AddAfter's the item based on the time when the rate limiter says it's ok
func (q *rateLimitingType) AddRateLimited(item interface{}) {
q.DelayingInterface.AddAfter(item, q.rateLimiter.When(item))
}
func (r *ItemExponentialFailureRateLimiter) When(item interface{}) time.Duration {
r.failuresLock.Lock()
defer r.failuresLock.Unlock()
exp := r.failures[item]
r.failures[item] = r.failures[item] + 1
// The backoff is capped such that 'calculated' value never overflows.
backoff := float64(r.baseDelay.Nanoseconds()) * math.Pow(2, float64(exp))
if backoff > math.MaxInt64 {
return r.maxDelay
}
calculated := time.Duration(backoff)
if calculated > r.maxDelay {
return r.maxDelay
}
return calculated
}
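When an item has failed repeatedly and is finally processed successfully after several requeues, you must call Forget to clear its failure count; otherwise the next AddRateLimited for that item will compute its delay from the stale count.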
func (q *rateLimitingType) Forget(item interface{}) {
q.rateLimiter.Forget(item)
}
func (r *ItemExponentialFailureRateLimiter) Forget(item interface{}) {
r.failuresLock.Lock()
defer r.failuresLock.Unlock()
delete(r.failures, item)
}
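NumRequeues returns how many times the given item has been requeued, that is, its current failure count in the rate limiter (not the number of items being retried).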
func (r *ItemExponentialFailureRateLimiter) NumRequeues(item interface{}) int {
r.failuresLock.Lock()
defer r.failuresLock.Unlock()
return r.failures[item]
}