client-go indexer 内部原理及使用
当我们使用 client-go 去 listWatch 一个资源对象时,其中 informer 内部主要执行以下几个步骤:
- reflector 先 List 全量资源,获取到 resourceVersion 后,开始 watch resourceVersion 之后的事件
- 每个 watch 事件(包含事件类型、资源对象)被加入 DeltaFIFO 队列
- Informer 从 DeltaFIFO 取出 delta 事件开始处理,更新 indexer 内数据
- Informer 根据事件类型,触发用户定义的处理函数 Indexer 是 client-go informer 中负责存储数据的模块,是真正最占用内存的模块。本文着重于步骤三,分析 indexer 是如何存储数据以及检索数据的。
术语对齐
Indexer 存储涉及多类 K/V,为了表达清晰,文章开始前先对一些特定名词做清晰定义:
- 资源对象 Key:一个资源对象默认的唯一 key (通常是 namespace/name)
- 索引函数:indexer 支持用户自定义索引计算函数
type IndexFunc func(obj interface{}) ([]string, error)
,满足用户按照自己的需求索引资源对象。 - 索引函数名:每一个索引函数都有一个唯一名字
- 索引值:索引函数会针对资源对象计算出一个或多个索引 key,本文内称作索引值
Indexer 内部实现
Indexer 在 Store interface(负责数据的增删改查)基础上增加了索引机制。用户可以自定义索引函数,然后按索引函数计算出的索引值来查询。从 interface 定义可以看出,自定义的 index 相关数据仅支持查询,不支持增删改。因为自定义 index 的相关增删改均是在 Store 相关操作后顺带执行。
type Indexer interface {
// 基础的增删改查
Store
// 返回与 obj 在 indexName 索引函数下有交集的资源对象集合
Index(indexName string, obj interface{}) ([]interface{}, error)
// 返回 indexName 索引函数下,索引值包含 indexedValue 的资源对象 key
IndexKeys(indexName, indexedValue string) ([]string, error)
// 返回 indexName 索引函数下,所有的索引值。
ListIndexFuncValues(indexName string) []string
// 返回 indexName 索引函数下,索引值为 indexedValue 的所有资源对象
ByIndex(indexName, indexedValue string) ([]interface{}, error)
GetIndexers() Indexers
AddIndexers(newIndexers Indexers) error
}
client-go library 中默认使用的 indexer interface 实现是 threadSafeMap。其数据结构十分简单:
- Lock 读写锁,为了实现线程安全(正如它的名字那样)
- Items 是一个 map,通过 K/V 来存储基础的资源对象。比如 test-namespace/test-pod => v1.Pod{xx}
- Index 主要实现索引相关的接口
// threadSafeMap implements ThreadSafeStore
type threadSafeMap struct {
lock sync.RWMutex
items map[string]interface{}
// index implements the indexing functionality
index *storeIndex
}
下一步进一步分析 storeIndex,主要由两个字段组成:
- Indexers 是一个索引函数存储,Key 为索引函数名,value 为 func(obj interface{}) ([]string, error) 索引函数,索引函数用来计算出一个资源对象的索引值,一个索引函数可以对一个资源对象计算出多个索引值
- indices 是一个 map[string]Index, 其中 Index 是一个 map,value 为 map[string]sets.String 。外层 map 中的 key 用来存放索引函数名,内层 map 中 key 用来存放索引函数值,value 为资源对象 key。一句话来讲就是:indices 用来存储每个索引函数构建出的索引表。
// storeIndex implements the indexing functionality for Store interface
type storeIndex struct {
// indexers maps a name to an IndexFunc
indexers Indexers
// indices maps a name to an Index
indices Indices
}
总结下来,indexer 从存储结构上看有 3 个 map:
- items 存储真正的资源对象,承载最基础的 KV 检索能力
- storeIndex.indexers 存储索引函数名到索引函数的映射
- indexers.indices 是一个嵌套 map,外层 map 为索引函数名,内层 map 为该索引函数计算出的索引值到资源对象 key 的映射,是一个二级检索
Indexer 使用
基础使用
添加 indexer,定义名为 byHostIP 的 indexerFunc ,将 pod.Status.HostIP 作为索引值。
// 1. 创建 SharedInformerFactory
factory := informers.NewSharedInformerFactory(clientset, 30*time.Second)
// 2. 获取 Pod Informer
podInformer := factory.Core().V1().Pods().Informer()
// 3. 添加自定义 IndexerFunc,索引 key 为 pod.Status.HostIP
indexName := "byHostName"
err = podInformer.AddIndexers(cache.Indexers{
indexName: func(obj interface{}) ([]string, error) {
pod, ok := obj.(*v1.Pod)
if !ok {
return []string{}, nil
}
if pod.Status.HostIP != "" {
// 使用 HostIP 作为索引
return []string{pod.Status.HostIP}, nil
}
return []string{}, nil
},
})
当需要查看一个 node 上有哪些 pod 时,调用 indexer.ByIndex 传入 HostIP 可拿到 node 上的所有 pod。
// 使用索引获取 Pod
indexer := podInformer.GetIndexer()
pods, err := indexer.ByIndex(indexName, "172.18.0.2")
if err != nil {
panic(err)
}
fmt.Printf("Pods on HostIP 172.18.0.2: %d\n", len(pods))
注意点
通过 indexer 获取的数据仅可用于只读,不可随意修改。修改 indexer 获取的数据是非线程安全的,且修改后不会自动重新构建索引,会破坏了索引的有效性。
总结
Indexer 作为 informer 缓存资源对象的模块,其实现并不复杂。自定义索引函数机制使得数据检索更加灵活与便捷。在合适的场景下构建索引函数可以提升开发效率、和检索效率。