K8s controller workqueue 源码学习

在写 K8s controller 的过程中,我们一定会用到 queue,这也是 K8s controller 推荐的范式。本文从源码剖析 K8s client-go 中 workqueue 的底层实现。

最底层:满足特定需求的基础 workqueue

k8s.io/client-go/util/workqueue 最基础的 queue 具备以下能力:

  1. FIFO,元素被处理的顺序与其加入队列的顺序一致
  2. 严格限制一个元素不会被并行多次处理。如果一个元素在被处理前多次 enqueue,也会只被处理一次
  3. 支持多生产者和消费者。即便一个元素正在被处理,也支持将其重新加入队列
  4. 队列关闭通知。
// Package workqueue provides a simple queue that supports the following
// features:
//  * Fair: items processed in the order in which they are added.
//  * Stingy: a single item will not be processed multiple times concurrently,
//      and if an item is added multiple times before it can be processed, it
//      will only be processed once.
//  * Multiple consumers and producers. In particular, it is allowed for an
//      item to be reenqueued while it is being processed.
//  * Shutdown notifications.
type Interface interface {
        Add(item interface{})
        Len() int
        Get() (item interface{}, shutdown bool)
        Done(item interface{})
        ShutDown()
        ShuttingDown() bool
}

workqueue 具体数据结构比较重要的几个字段如下,支持了 queue 的基础能力

  1. queue 数组,定义元素被处理的顺序
  2. processing,map 结构,当前正在被处理的元素集合
  3. dirty,map 结构,所有将要被处理的元素集合
// Type is a work queue (see the package comment).
type Type struct {
        // queue defines the order in which we will work on items. Every
        // element of queue should be in the dirty set and not in the
        // processing set.
        queue []t
        // dirty defines all of the items that need to be processed.
        dirty set
        // Things that are currently being processed are in the processing set.
        // These things may be simultaneously in the dirty set. When we finish
        // processing something and remove it from this set, we'll check if
        // it's in the dirty set, and if so, add it to the queue.
        processing set
        cond *sync.Cond
        shuttingDown bool
        metrics queueMetrics
        unfinishedWorkUpdatePeriod time.Duration
        clock                      clock.Clock
}

核心方法实现细节:

  1. Add 标记一个元素需要被处理。先检查当前元素是否已在待处理集合中,已存在则直接返回。这也就实现了目标中的:“严格限制一个元素不会被并行多次处理”。元素会被先被加入 dirty,如果同样元素已经正在处理中,则直接返回(下面会讲到,等到该元素被处理完后,再加入 queue 数组)。否则将其加入 queue 数组中,标记处理顺序。
// Add marks item as needing processing.
func (q *Type) Add(item interface{}) {
        q.cond.L.Lock()
        defer q.cond.L.Unlock()
        if q.shuttingDown {
                return
        }
        if q.dirty.has(item) {
                return
        }

        q.metrics.add(item)

        q.dirty.insert(item)
        if q.processing.has(item) {
                return
        }

        q.queue = append(q.queue, item)
        q.cond.Signal()
}
  1. Get 是一个阻塞性方法直到发现队列内有元素需要被处理。首先先从 queue 内取出一个元素,然后对 queue 做重新赋值。接下来将该元素插入 processing map 且移除 dirty map,标识该元素正在被处理。Get 方法拿到元素后,无论处理结果如何,一定要调用 Done 方法,如果不调用 Done 会导致 processing 集合越来越大可能引发内存泄漏。
// Get blocks until it can return an item to be processed. If shutdown = true,
// the caller should end their goroutine. You must call Done with item when you
// have finished processing it.
func (q *Type) Get() (item interface{}, shutdown bool) {
        q.cond.L.Lock()
        defer q.cond.L.Unlock()
        for len(q.queue) == 0 && !q.shuttingDown {
                q.cond.Wait()
        }
        if len(q.queue) == 0 {
                // We must be shutting down.
                return nil, true
        }

        item, q.queue = q.queue[0], q.queue[1:]

        q.metrics.get(item)

        q.processing.insert(item)
        q.dirty.delete(item)

        return item, false
}
  1. Done 标记一个元素已经被处理完成。首先先将元素从 processing map 中移除,然后检查该元素是否在 dirty 集合中存在,如果存在则将元素重新 enqueue。这也就保证了一个元素不会同时被多个 consumer 处理,当一个元素被执行结束之后,再将重复的元素加入 queue 数组。
// Done marks item as done processing, and if it has been marked as dirty again
// while it was being processed, it will be re-added to the queue for
// re-processing.
func (q *Type) Done(item interface{}) {
        q.cond.L.Lock()
        defer q.cond.L.Unlock()

        q.metrics.done(item)

        q.processing.delete(item)
        if q.dirty.has(item) {
                q.queue = append(q.queue, item)
                q.cond.Signal()
        }
}

第二层:延迟队列 delaying queue

Delaying queue 在 workqueue 的基础上做了封装,支持延迟 enqueue,目的是让失败后重新 enqueue 操作不要过于频繁,避免无限循环。增加 AddAfter 方法,支持在指定时长之后将元素加入队列。

// DelayingInterface is an Interface that can Add an item at a later time. This makes it easier to
// requeue items after failures without ending up in a hot-loop.
type DelayingInterface interface {
        Interface
        // AddAfter adds an item to the workqueue after the indicated duration has passed
        AddAfter(item interface{}, duration time.Duration)
}

Delaying queue 在实现上增加了 waitingForAddCh channel,buffer 长度 1000。新增的 AddAfter 方法接收 item, duration 两个参数,将两个元素组成 waitFor 结构投递到 waitingForAddCh 中。 Deleying queue 创建后会启动一个异步的 loop 函数,将 waitFor 元素完成等待时长后加入队列。

// delayingType wraps an Interface and provides delayed re-enquing
type delayingType struct {
        Interface

        // clock tracks time for delayed firing
        clock clock.Clock

        // stopCh lets us signal a shutdown to the waiting loop
        stopCh chan struct{}
        // stopOnce guarantees we only signal shutdown a single time
        stopOnce sync.Once

        // heartbeat ensures we wait no more than maxWait before firing
        heartbeat clock.Ticker

        // waitingForAddCh is a buffered channel that feeds waitingForAdd
        waitingForAddCh chan *waitFor

        // metrics counts the number of retries
        metrics retryMetrics
}
// AddAfter adds the given item to the work queue after the given delay
func (q *delayingType) AddAfter(item interface{}, duration time.Duration) {
        // don't add if we're already shutting down
        if q.ShuttingDown() {
                return
        }

        q.metrics.retry()

        // immediately add things with no delay
        if duration <= 0 {
                q.Add(item)
                return
        }

        select {
        case <-q.stopCh:
                // unblock if ShutDown() is called
        case q.waitingForAddCh <- &waitFor{data: item, readyAt: q.clock.Now().Add(duration)}:
        }
}

第三层:限速队列 rate limiting queue

限速队列是在 delaying queue 基础之上组合了 ratelimiter 构造的队列,顾名思义它可以控制元素的 enqueue 速率。在 delaying queue 的基础上增加了 AddRateLimitedForgetNumRequeues 方法。

// RateLimitingInterface is an interface that rate limits items being added to the queue.
type RateLimitingInterface interface {
        DelayingInterface

        // AddRateLimited adds an item to the workqueue after the rate limiter says it's ok
        AddRateLimited(item interface{})

        // Forget indicates that an item is finished being retried.  Doesn't matter whether it's for perm failing
        // or for success, we'll stop the rate limiter from tracking it.  This only clears the `rateLimiter`, you
        // still have to call `Done` on the queue.
        Forget(item interface{})

        // NumRequeues returns back how many times the item was requeued
        NumRequeues(item interface{}) int
}

Ratelimiter 是限速队列的一个关键模块,默认的 ratelimiter 由两类 ratelimiter 共同组成,一个是 BucketRateLimiter 基于令牌桶实现,控制入队的 qps & burst(默认 qps 10 burst 100) 。另一个是 ItemExponentialFailureRateLimiter 指数回退限速器,用于计算元素下一次入队的延迟间隔。

func DefaultControllerRateLimiter() RateLimiter {
        return NewMaxOfRateLimiter(
                NewItemExponentialFailureRateLimiter(5*time.Millisecond, 1000*time.Second),
                // 10 qps, 100 bucket size.  This is only for retry speed and its only the overall factor (not per item)
                &BucketRateLimiter{Limiter: rate.NewLimiter(rate.Limit(10), 100)},
        )
}

AddRateLimited 方法会先让 ratelimiter 计算出该元素的延迟时长,然后调用 delaying queue 提供的 AddAfter 方法。ItemExponentialFailureRateLimiter 在 When 方法中会基于元素 enqueue 次数计算出延迟入队时长,计算方法是 baseDelay 值 * (2 ^ 重复 enqueue 次数)。所以 ratelimiter 结构体中有 failures map[interface{}]int 来记录重复入队次数。

// AddRateLimited AddAfter's the item based on the time when the rate limiter says it's ok
func (q *rateLimitingType) AddRateLimited(item interface{}) {
        q.DelayingInterface.AddAfter(item, q.rateLimiter.When(item))
}
func (r *ItemExponentialFailureRateLimiter) When(item interface{}) time.Duration {
        r.failuresLock.Lock()
        defer r.failuresLock.Unlock()

        exp := r.failures[item]
        r.failures[item] = r.failures[item] + 1

        // The backoff is capped such that 'calculated' value never overflows.
        backoff := float64(r.baseDelay.Nanoseconds()) * math.Pow(2, float64(exp))
        if backoff > math.MaxInt64 {
                return r.maxDelay
        }

        calculated := time.Duration(backoff)
        if calculated > r.maxDelay {
                return r.maxDelay
        }

        return calculated
}

当一个元素重复处理失败,多次 enqueue 后终于被处理成功了。一定要调用 Forget 方法,清除失败次数统计记录,不然下一次执行 AddRateLimited 会基于之前的记录计算 enqueue 延迟。

func (q *rateLimitingType) Forget(item interface{}) {
        q.rateLimiter.Forget(item)
}
func (r *ItemFastSlowRateLimiter) Forget(item interface{}) {
        r.failuresLock.Lock()
        defer r.failuresLock.Unlock()

        delete(r.failures, item)
}
NumRequeues() 用于正在失败重试的元素个数。
func (r *ItemExponentialFailureRateLimiter) NumRequeues(item interface{}) int {
        r.failuresLock.Lock()
        defer r.failuresLock.Unlock()

        return r.failures[item]
}
© 2022 - 2024 · Stay foolish · Theme Simpleness Powered by Hugo ·