client-go indexer 内部原理及使用

当我们使用 client-go 去 listWatch 一个资源对象时,其中 informer 内部主要执行以下几个步骤:

  1. reflector 先 List 全量资源,获取到 resourceVersion 后,开始 watch resourceVersion 之后的事件
  2. 每个 watch 事件(包含事件类型、资源对象)被加入 DeltaFIFO 队列
  3. Informer 从 DeltaFIFO 取出 delta 事件开始处理,更新 indexer 内数据
  4. Informer 根据事件类型,触发用户定义的处理函数 Indexer 是 client-go informer 中负责存储数据的模块,是真正最占用内存的模块。本文着重于步骤三,分析 indexer 是如何存储数据以及检索数据的。

The working mechanism of an informer

术语对齐

Indexer 存储涉及多类 K/V,为了表达清晰,文章开始前先对一些特定名词做清晰定义:

  1. 资源对象 Key:一个资源对象默认的唯一 key (通常是 namespace/name)
  2. 索引函数:indexer 支持用户自定义索引计算函数 type IndexFunc func(obj interface{}) ([]string, error),满足用户按照自己的需求索引资源对象。
  3. 索引函数名:每一个索引函数都有一个唯一名字
  4. 索引值:索引函数会针对资源对象计算出一个或多个索引 key,本文内称作索引值

Indexer 内部实现

Indexer 在 Store interface(负责数据的增删改查)基础上增加了索引机制。用户可以自定义索引函数,然后按索引函数计算出的索引值来查询。从 interface 定义可以看出,自定义的 index 相关数据仅支持查询,不支持增删改。因为自定义 index 的相关增删改均是在 Store 相关操作后顺带执行。

type Indexer interface {
        // 基础的增删改查
        Store
        // 返回与 obj 在 indexName 索引函数下有交集的资源对象集合
        Index(indexName string, obj interface{}) ([]interface{}, error)
        // 返回 indexName 索引函数下,索引值包含 indexedValue 的资源对象 key
        IndexKeys(indexName, indexedValue string) ([]string, error)
        // 返回 indexName 索引函数下,所有的索引值。
        ListIndexFuncValues(indexName string) []string
        // 返回 indexName 索引函数下,索引值为 indexedValue 的所有资源对象
        ByIndex(indexName, indexedValue string) ([]interface{}, error)
        GetIndexers() Indexers
        AddIndexers(newIndexers Indexers) error
}

client-go library 中默认使用的 indexer interface 实现是 threadSafeMap。其数据结构十分简单:

  1. Lock 读写锁,为了实现线程安全(正如它的名字那样)
  2. Items 是一个 map,通过 K/V 来存储基础的资源对象。比如 test-namespace/test-pod => v1.Pod{xx}
  3. Index 主要实现索引相关的接口
// threadSafeMap implements ThreadSafeStore
type threadSafeMap struct {
        lock  sync.RWMutex
        items map[string]interface{}
        // index implements the indexing functionality
        index *storeIndex
}

下一步进一步分析 storeIndex,主要由两个字段组成:

  1. Indexers 是一个索引函数存储,Key 为索引函数名,value 为 func(obj interface{}) ([]string, error) 索引函数,索引函数用来计算出一个资源对象的索引值,一个索引函数可以对一个资源对象计算出多个索引值
  2. indices 是一个 map[string]Index, 其中 Index 是一个 map,value 为 map[string]sets.String 。外层 map 中的 key 用来存放索引函数名,内层 map 中 key 用来存放索引函数值,value 为资源对象 key。一句话来讲就是:indices 用来存储每个索引函数构建出的索引表。
// storeIndex implements the indexing functionality for Store interface
type storeIndex struct {
        // indexers maps a name to an IndexFunc
        indexers Indexers
        // indices maps a name to an Index
        indices Indices
}

总结下来,indexer 从存储结构上看有 3 个 map:

Indexer 使用

基础使用

添加 indexer,定义名为 byHostIP 的 indexerFunc ,将 pod.Status.HostIP 作为索引值。

// 1. 创建 SharedInformerFactory
factory := informers.NewSharedInformerFactory(clientset, 30*time.Second)

// 2. 获取 Pod Informer
podInformer := factory.Core().V1().Pods().Informer()

// 3. 添加自定义 IndexerFunc,索引 key 为 pod.Status.HostIP
indexName := "byHostName"
err = podInformer.AddIndexers(cache.Indexers{
    indexName: func(obj interface{}) ([]string, error) {
        pod, ok := obj.(*v1.Pod)
        if !ok {
            return []string{}, nil
        }
        if pod.Status.HostIP != "" {
            // 使用 HostIP 作为索引
            return []string{pod.Status.HostIP}, nil
        }
        return []string{}, nil
    },
})

当需要查看一个 node 上有哪些 pod 时,调用 indexer.ByIndex 传入 HostIP 可拿到 node 上的所有 pod。

// 使用索引获取 Pod
indexer := podInformer.GetIndexer()
pods, err := indexer.ByIndex(indexName, "172.18.0.2")
if err != nil {
    panic(err)
}
fmt.Printf("Pods on HostIP 172.18.0.2: %d\n", len(pods))

注意点

通过 indexer 获取的数据仅可用于只读,不可随意修改。修改 indexer 获取的数据是非线程安全的,且修改后不会自动重新构建索引,会破坏了索引的有效性。

总结

Indexer 作为 informer 缓存资源对象的模块,其实现并不复杂。自定义索引函数机制使得数据检索更加灵活与便捷。在合适的场景下构建索引函数可以提升开发效率、和检索效率。

© 2022 - 2025 · Stay foolish · Theme Simpleness Powered by Hugo ·