K8s-List-watch机制

BackGround

在 Kubernetes 中,有5个主要的组件,分别是 master 节点上的 kube-api-server、kube-controller-manager 和 kube-scheduler,node 节点上的 kubelet 和kube-proxy 。这其中 kube-apiserver 是对外和对内提供资源的声明式 API 的组件,其它4个组件都需要和它交互。为了保证消息的实时性,有两种方式:

  • 客户端组件 (kubelet, scheduler, controller-manager 等) 轮询 apiserver
  • apiserver 通知客户端

为了降低kube-apiserver的压力,就是通过list-watch机制。其实list-watch就是client监听K8s资源变化

list-watach 机制需要满足以下需求:

  1. 实时性 (即数据变化时,相关组件越快感知越好)
  2. 保证消息的顺序性 (即消息要按发生先后顺序送达目的组件。很难想象在Pod创建消息前收到该Pod删除消息时组件应该怎么处理)
  3. 保证消息不丢失或者有可靠的重新获取机制 (比如 kubelet 和 kube-apiserver 间网络闪断,需要保证网络恢复后kubelet可以收到网络闪断期间产生的消息)

list-watch机制

Etcd存储集群的数据信息,apiserver作为统一入口,任何对数据的操作都必须经过apiserver。客户端(kubelet/scheduler/controller-manager)通过list-watch监听apiserver中资源(pod/rs/rc等等)的create,updatedelete事件,并针对事件类型调用相应的事件处理函数

List:是k8s 资源的List API,基于HTTP短链接

Watch:调用资源的 watch API,基于HTTP长链接

Informer机制

K8s的Informer封装了list-watch API。编写事件处理函数,AddFunc, UpdateFunc 和 DeleteFunc 等。如下图所示,informer 首先通过 list API 罗列资源,然后调用 watch API 监听资源的变更事件,并将结果放入到一个 FIFO 队列,队列的另一头有协程从中取出事件,并调用对应的注册函数处理事件。Informer 还维护了一个只读的 Map Store 缓存,主要为了提升查询的效率,降低 apiserver 的负载

完整的Informer

Reflector 从 API Server 中通过 List&Watch 得到资源的状态变化,把数据塞到 Delta Fifo 队列里(Reflector 相当于生产者),由 Informer 进行消费。更新时在回调里可以获得新值和旧值,旧值从 Indexer(store) 中获取

  • FIFO :先入先出队列,拥有队列基本方法(ADD,UPDATE,DELETE,LIST,POP,CLOSE 等)
  • Delta : 存储对象的行为(变化)类型(Added,Updated,Deleted,Sync 等)

如果要对一个资源支持多种监听方式,需要使用到 SharedInformer(SharedIndexInformer)

  • 支持多个EventHandler . 可以认为是支持多个消费者,多个消费者之间共享 Indexer, Reflector 统一下发数据统一处理
  • 内置一个 Indexer(有一个叫做 threadSafeMap 的 struct 来实现 cache/thread_safe_store.go)

里面有个属性 sharedProcessor,用于协调和管理若干个处理器对象 processorListener(这是真正干活的对象)

  • run():阻塞运行
  • pop():好比不断从队列里取数据,完成对应的回调操作
  • addCh:一个 channel,外部向它插入数据

如果要对多个资源支持多种监听方式,需要使用到 SharedInformerFactory,里面有个属性 informers 包含多个 SharedIndexInformer 对象

Client-go

type DepHandler struct{}
func (this *DepHandler) OnAdd(obj interface{}) {}
func (this *DepHandler) OnUpdate(oldObj, newObj interface{}) {
if dep, ok := newObj.(*v1.Deployment); ok {
fmt.Println(dep.Name)
}
}
func (this *DepHandler) OnDelete(obj interface{}) {}

func main() {
_, c := cache.NewInformer(
// 监听 default 命名空间中 deployment 的变化
cache.NewListWatchFromClient(K8sClient.AppsV1().RESTClient(),
"deployments", "default", fields.Everything()),
&v1.Deployment{},
0, // 重新同步时间
&DepHandler{}, // 实现类
)
c.Run(wait.NeverStop)
select {}
}

SharedInformerFactory

// sharedInformerFactory 用来构造各种 Informer 的工厂对象,它可以共享多个 informer 资源
informerFactory := informers.NewSharedInformerFactory(K8sClient, 0)

// 构建一个 deployment informer
depInformer := informerFactory.Apps().V1().Deployments()
depInformer.Informer().AddEventHandler(&DepHandler{})

informerFactory.Start(wait.NeverStop)
select {}

Example

// 全局对象,存储所有deployments
var DepMapImpl *DeploymentMap

func init() {
DepMapImpl = &DeploymentMap{Data: new(sync.Map)}
}

type DeploymentMap struct {
Data *sync.Map // key:namespace value:[]*v1.Deployments
}

// 添加
func (this *DeploymentMap) Add(deployment *v1.Deployment) {
if depList, ok := this.Data.Load(deployment.Namespace); ok {
depList = append(depList.([]*v1.Deployment), deployment)
this.Data.Store(deployment.Namespace, depList)
} else {
this.Data.Store(deployment.Namespace, []*v1.Deployment{deployment})
}
}

// 获取列表
func (this *DeploymentMap) ListByNs(namespace string) ([]*v1.Deployment, error) {
if depList, ok := this.Data.Load(namespace); ok {
return depList.([]*v1.Deployment), nil
}

return nil, fmt.Errorf("record not found")
}

// 更新
func (this *DeploymentMap) Update(deployment *v1.Deployment) error {
if depList, ok := this.Data.Load(deployment.Namespace); ok {
depList := depList.([]*v1.Deployment)
for i, dep := range depList {
if dep.Name == deployment.Name {
depList[i] = deployment
break
}
}
return nil
}

return fmt.Errorf("deployment [%s] not found", deployment.Name)
}

// 删除
func (this *DeploymentMap) Delete(deployment *v1.Deployment) {
if depList, ok := this.Data.Load(deployment.Namespace); ok {
depList := depList.([]*v1.Deployment)
for i, dep := range depList {
if dep.Name == deployment.Name {
newDepList := append(depList[:i], depList[i+1:]...)
this.Data.Store(deployment.Namespace, newDepList)
break
}
}
}
}

// informer实现
type DepHandler struct{}
func (this *DepHandler) OnAdd(obj interface{}) {
DepMapImpl.Add(obj.(*v1.Deployment))
}
func (this *DepHandler) OnUpdate(oldObj, newObj interface{}) {
err := DepMapImpl.Update(newObj.(*v1.Deployment))
if err != nil {
log.Println(err)
}
}
func (this *DepHandler) OnDelete(obj interface{}) {
DepMapImpl.Delete(obj.(*v1.Deployment))
}

// 执行监听
func InitDeployments() {
informerFactory := informers.NewSharedInformerFactory(K8sClient, 0)

depInformer := informerFactory.Apps().V1().Deployments()
depInformer.Informer().AddEventHandler(&DepHandler{})

informerFactory.Start(wait.NeverStop)
}