




package main

import (
	"flag"
	"fmt"
	"path/filepath"
	"time"

	v1 "k8s.io/api/apps/v1"
	"k8s.io/apimachinery/pkg/labels"
	"k8s.io/client-go/informers"
	"k8s.io/client-go/kubernetes"
	"k8s.io/client-go/rest"
	"k8s.io/client-go/tools/cache"
	"k8s.io/client-go/tools/clientcmd"
	"k8s.io/client-go/util/homedir"
)

func main() {
var err error
var config *rest.Config

var kubeconfig *string

if home := homedir.HomeDir(); home != "" {
kubeconfig = flag.String("kubeconfig", filepath.Join(home, ".kube", "config"), "[可选] kubeconfig 绝对路径")
} else {
kubeconfig = flag.String("kubeconfig", "", "kubeconfig 绝对路径")
// 初始化 rest.Config 对象
if config, err = rest.InClusterConfig(); err != nil {
if config, err = clientcmd.BuildConfigFromFlags("", *kubeconfig); err != nil {
// 创建 Clientset 对象
clientset, err := kubernetes.NewForConfig(config)
if err != nil {

// 初始化 informer factory(为了测试方便这里设置每30s重新 List 一次)
informerFactory := informers.NewSharedInformerFactory(clientset, time.Second*30)
// 对 Deployment 监听
deployInformer := informerFactory.Apps().V1().Deployments()
// 创建 Informer(相当于注册到工厂中去,这样下面启动的时候就会去 List & Watch 对应的资源)
informer := deployInformer.Informer()
// 创建 Lister
deployLister := deployInformer.Lister()
// 注册事件处理程序
AddFunc: onAdd,
UpdateFunc: onUpdate,
DeleteFunc: onDelete,

stopper := make(chan struct{})
defer close(stopper)

// 启动 informer,List & Watch
// 等待所有启动的 Informer 的缓存被同步

// 从本地缓存中获取 default 中的所有 deployment 列表
deployments, err := deployLister.Deployments("default").List(labels.Everything())
if err != nil {
for idx, deploy := range deployments {
fmt.Printf("%d -> %s\n", idx+1, deploy.Name)

func onAdd(obj interface{}) {
deploy := obj.(*v1.Deployment)
fmt.Println("add a deployment:", deploy.Name)

func onUpdate(old, new interface{}) {
oldDeploy := old.(*v1.Deployment)
newDeploy := new.(*v1.Deployment)
fmt.Println("update deployment:", oldDeploy.Name, newDeploy.Name)

func onDelete(obj interface{}) {
deploy := obj.(*v1.Deployment)
fmt.Println("delete a deployment:", deploy.Name)
  • 首先通过kubernetes.NewForConfig func创建clientSet对象,Informer需要通过ClientSet与kube-apiserver进行交互

  • Informer是一个持久运行的协程





  • Delta:记录资源对象及其操作类型(Added、Updated、Deleted、Replaced、Sync)
  • FIFO:先进先出的队列,有队列的基础方法



  • 周期性处理DeltaFIFO中的数据,驱动数据向下链路分发



  • 在客户端内存中维护了K8s资源对象的缓存,减少对kube-apiserver的请求次数
  • 允许为K8s资源定义索引,实现基于索引的快速检索



  • processor在分发资源对象的时候,会特别识别Sync事件,并把这个事件分发给等待同步事件的监听器



  • 使用ResourceEventHandler定义感知资源对象的Added、Updated、Deleted
  • 根据收到的processor的事件,做OnAdd、OnUpdate、OnDelete



  • K8s许多控制器在接收到对应的资源对象变更时,会使用workqueue把对应的资源对象key存入workqueue,并且启动独立的goroutine获取队列中的数据



  • Informer启动的时候,使用NewReflector func实例化Reflector对象,实例化需要传入ListerWatcher对象,它拥有List和Watch func


ListAndWatch在首次启动或被wait.BackoffUntil重新拉起时,使用list func获取该资源下所有对象的数据,并将其存储至DeltaFIFO


  1. pager.New func构造出查询列表的分页数据对象ListPager
    • ListPager是Client-go中的工具类,采用分页的形式,使用用户自定义的分页查询函数(默认分页大小为500)
  2. pager.List用于获取资源的所有对象数据
    • 优先采用多批次的分页数据
    • 如果分页查询失败,降级为全量查询,通过options的ResourceVersion字段控制(ResourceVersion为"0"时可以从watch cache获取数据,为""时直接从etcd获取最新数据)
    • pager.List有点类似于断点续传,因网络原因断开后依旧能继续传输
  3. listMetaInterface.GetResourceVersion用于获取ResourceVersion
    • ResourceVersion是标识符,kube-apiserver每次更新都会更新ResourceVersion
  4. meta.ExtractList用于把资源数据转换为资源对象列表,将runtime.Object对象转换为[]runtime.Object列表
  5. r.syncWith用于把资源对象列表中的资源对象和Resource Version存储到DeltaFIFO,并且替换已经存在的资源对象
  6. r.setLastSyncResourceVersion用于设置最新的ResourceVersion
// list simply lists all items and records a resource version obtained from the server at the moment of the call.
// the resource version can be used for further progress notification (aka. watch).
// NOTE(review): this is an abridged excerpt of client-go's Reflector.list and does not compile
// as shown: closing braces and several statements are missing (e.g. nothing visible ever sends
// on or closes listCh, although the select below waits on it).
func (r *Reflector) list(stopCh <-chan struct{}) error {
var resourceVersion string
options := metav1.ListOptions{ResourceVersion: r.relistResourceVersion()}

initTrace := trace.New("Reflector ListAndWatch", trace.Field{Key: "name", Value: r.name})
defer initTrace.LogIfLong(10 * time.Second)
var list runtime.Object
var paginatedResult bool
var err error
// listCh is waited on below as the "list finished" signal; panicCh carries a value
// recovered from a panic inside the list goroutine back to this goroutine.
listCh := make(chan struct{}, 1)
panicCh := make(chan interface{}, 1)
go func() {
defer func() {
if r := recover(); r != nil {
panicCh <- r
// Attempt to gather list in chunks, if supported by listerWatcher, if not, the first
// list request will return the full response.
pager := pager.New(pager.SimplePageFunc(func(opts metav1.ListOptions) (runtime.Object, error) {
return r.listerWatcher.List(opts)
// Choose the page size: an explicit WatchListPageSize wins; otherwise keep the default
// if an earlier initial list came back paginated; otherwise disable paging when
// ResourceVersion is neither "" nor "0" so the request can be served from the watch cache.
switch {
case r.WatchListPageSize != 0:
pager.PageSize = r.WatchListPageSize
case r.paginatedResult:
// We got a paginated result initially. Assume this resource and server honor
// paging requests (i.e. watch cache is probably disabled) and leave the default
// pager size set.
case options.ResourceVersion != "" && options.ResourceVersion != "0":
// User didn't explicitly request pagination.
// With ResourceVersion != "", we have a possibility to list from watch cache,
// but we do that (for ResourceVersion != "0") only if Limit is unset.
// To avoid thundering herd on etcd (e.g. on master upgrades), we explicitly
// switch off pagination to force listing from watch cache (if enabled).
// With the existing semantic of RV (result is at least as fresh as provided RV),
// this is correct and doesn't lead to going back in time.
// We also don't turn off pagination for ResourceVersion="0", since watch cache
// is ignoring Limit in that case anyway, and if watch cache is not enabled
// we don't introduce regression.
pager.PageSize = 0

list, paginatedResult, err = pager.List(context.Background(), options)
if isExpiredError(err) || isTooLargeResourceVersionError(err) {
// Retry immediately if the resource version used to list is unavailable.
// The pager already falls back to full list if paginated list calls fail due to an "Expired" error on
// continuation pages, but the pager might not be enabled, the full list might fail because the
// resource version it is listing at is expired or the cache may not yet be synced to the provided
// resource version. So we need to fallback to resourceVersion="" in all to recover and ensure
// the reflector makes forward progress.
list, paginatedResult, err = pager.List(context.Background(), metav1.ListOptions{ResourceVersion: r.relistResourceVersion()})
select {
case <-stopCh:
return nil
// NOTE(review): the body of the panicCh case is not shown in this excerpt; presumably
// the recovered value is re-panicked in this goroutine — confirm against upstream.
case r := <-panicCh:
case <-listCh:
initTrace.Step("Objects listed", trace.Field{Key: "error", Value: err})
if err != nil {
klog.Warningf("%s: failed to list %v: %v", r.name, r.expectedTypeName, err)
return fmt.Errorf("failed to list %v: %w", r.expectedTypeName, err)

// We check if the list was paginated and if so set the paginatedResult based on that.
// However, we want to do that only for the initial list (which is the only case
// when we set ResourceVersion="0"). The reasoning behind it is that later, in some
// situations we may force listing directly from etcd (by setting ResourceVersion="")
// which will return paginated result, even if watch cache is enabled. However, in
// that case, we still want to prefer sending requests to watch cache if possible.
// Paginated result returned for request with ResourceVersion="0" mean that watch
// cache is disabled and there are a lot of objects of a given type. In such case,
// there is no need to prefer listing from watch cache.
if options.ResourceVersion == "0" && paginatedResult {
r.paginatedResult = true

r.setIsLastSyncResourceVersionUnavailable(false) // list was successful
listMetaInterface, err := meta.ListAccessor(list)
if err != nil {
return fmt.Errorf("unable to understand list result %#v: %v", list, err)
resourceVersion = listMetaInterface.GetResourceVersion()
initTrace.Step("Resource version extracted")
items, err := meta.ExtractList(list)
if err != nil {
return fmt.Errorf("unable to understand list result %#v (%v)", list, err)
initTrace.Step("Objects extracted")
// Push the listed objects into the store (DeltaFIFO) together with resourceVersion.
if err := r.syncWith(items, resourceVersion); err != nil {
return fmt.Errorf("unable to sync list result: %v", err)
initTrace.Step("SyncWith done")
// NOTE(review): the r.setLastSyncResourceVersion(resourceVersion) call that this trace
// step reports is not visible in this excerpt.
initTrace.Step("Resource version updated")
return nil


Watch操作会与kube-apiserver建立长连接(支持HTTP/1.1 chunked、WebSocket、HTTP/2(优先选择)),接收kube-apiserver发送过来的资源变更事件

  • 使用HTTP/2时,Watch操作使用Frame(帧)进行传输,类似很多RPC协议

  • Frame使用二进制编码,通过帧头固定位置的字节描述Body长度就可以读取Body,直到遇到带有END_STREAM标志的帧

  • Ref:https://github.com/kubernetes/client-go/blob/4f1c2e7b9711846cf31255fd709616c66cb3cfea/tools/cache/reflector.go#L315

    // Excerpt from Reflector.ListAndWatch: open a watch starting at the last synced
    // resource version and hand the resulting stream to watchHandler.
    // NOTE(review): abridged — timeoutSeconds, resyncerrc and retry are declared outside
    // this excerpt, closing braces are missing, and the final klog.Warningf presumably
    // sits under a `default:` label that was lost (as shown it follows the
    // IsInternalError case) — confirm against upstream.
    options := metav1.ListOptions{
    ResourceVersion: r.LastSyncResourceVersion(),
    // We want to avoid situations of hanging watchers. Stop any watchers that do not
    // receive any events within the timeout window.
    TimeoutSeconds: &timeoutSeconds,
    // To reduce load on kube-apiserver on watch restarts, you may enable watch bookmarks.
    // Reflector doesn't assume bookmarks are returned at all (if the server do not support
    // watch bookmarks, it will ignore this field).
    AllowWatchBookmarks: true,

    // start the clock before sending the request, since some proxies won't flush headers until after the first watch event is sent
    start := r.clock.Now()
    w, err := r.listerWatcher.Watch(options)
    if err != nil {
    // If this is "connection refused" error, it means that most likely apiserver is not responsive.
    // It doesn't make sense to re-list all objects because most likely we will be able to restart
    // watch where we ended.
    // If that's the case begin exponentially backing off and resend watch request.
    // Do the same for "429" errors.
    if utilnet.IsConnectionRefused(err) || apierrors.IsTooManyRequests(err) {
    return err

    // Consume watch events until the watch ends; errors other than errorStopRequested are logged.
    err = watchHandler(start, w, r.store, r.expectedType, r.expectedGVK, r.name, r.expectedTypeName, r.setLastSyncResourceVersion, r.clock, resyncerrc, stopCh)
    if err != nil {
    if err != errorStopRequested {
    switch {
    case isExpiredError(err):
    // Don't set LastSyncResourceVersionUnavailable - LIST call with ResourceVersion=RV already
    // has a semantic that it returns data at least as fresh as provided RV.
    // So first try to LIST with setting RV to resource version of last observed object.
    klog.V(4).Infof("%s: watch of %v closed with: %v", r.name, r.expectedTypeName, err)
    case apierrors.IsTooManyRequests(err):
    klog.V(2).Infof("%s: watch of %v returned 429 - backing off", r.name, r.expectedTypeName)
    case apierrors.IsInternalError(err) && retry.ShouldRetry():
    klog.V(2).Infof("%s: retrying watch of %v internal error: %v", r.name, r.expectedTypeName, err)
    klog.Warningf("%s: watch of %v ended with: %v", r.name, r.expectedTypeName, err)
    return nil

WatchHandler func处理资源对象的变更事件,根据Added、Modified、Deleted事件把对应的资源对象更新到DeltaFIFO中,并且更新ResourceVersion

  • Ref:https://github.com/kubernetes/client-go/blob/4f1c2e7b9711846cf31255fd709616c66cb3cfea/tools/cache/reflector.go#L473

    // watchHandler watches w and sets setLastSyncResourceVersion
    // It applies each Added/Modified/Deleted event to store and returns when stopCh is
    // closed (errorStopRequested), a value arrives on errc, or the watch reports an Error
    // event. A watch that closes within one second having seen no events is reported
    // as an error.
    // NOTE(review): abridged excerpt — closing braces are missing, `break loop` refers to a
    // `loop` label that is not visible, and in the visible code eventCount is never
    // incremented and neither resourceVersion nor setLastSyncResourceVersion is used.
    func watchHandler(start time.Time,
    w watch.Interface,
    store Store,
    expectedType reflect.Type,
    expectedGVK *schema.GroupVersionKind,
    name string,
    expectedTypeName string,
    setLastSyncResourceVersion func(string),
    clock clock.Clock,
    errc chan error,
    stopCh <-chan struct{},
    ) error {
    eventCount := 0

    // Stopping the watcher should be idempotent and if we return from this function there's no way
    // we're coming back in with the same watch interface.
    defer w.Stop()

    for {
    select {
    case <-stopCh:
    return errorStopRequested
    case err := <-errc:
    return err
    case event, ok := <-w.ResultChan():
    if !ok {
    break loop
    if event.Type == watch.Error {
    return apierrors.FromObject(event.Object)
    // Type and GVK mismatches are reported via HandleError.
    if expectedType != nil {
    if e, a := expectedType, reflect.TypeOf(event.Object); e != a {
    utilruntime.HandleError(fmt.Errorf("%s: expected type %v, but watch event object had type %v", name, e, a))
    if expectedGVK != nil {
    if e, a := *expectedGVK, event.Object.GetObjectKind().GroupVersionKind(); e != a {
    utilruntime.HandleError(fmt.Errorf("%s: expected gvk %v, but watch event object had gvk %v", name, e, a))
    meta, err := meta.Accessor(event.Object)
    if err != nil {
    utilruntime.HandleError(fmt.Errorf("%s: unable to understand watch event %#v", name, event))
    resourceVersion := meta.GetResourceVersion()
    // Translate the watch event into the matching store (DeltaFIFO) operation.
    switch event.Type {
    case watch.Added:
    err := store.Add(event.Object)
    if err != nil {
    utilruntime.HandleError(fmt.Errorf("%s: unable to add watch event object (%#v) to store: %v", name, event.Object, err))
    case watch.Modified:
    err := store.Update(event.Object)
    if err != nil {
    utilruntime.HandleError(fmt.Errorf("%s: unable to update watch event object (%#v) to store: %v", name, event.Object, err))
    case watch.Deleted:
    // TODO: Will any consumers need access to the "last known
    // state", which is passed in event.Object? If so, may need
    // to change this.
    err := store.Delete(event.Object)
    if err != nil {
    utilruntime.HandleError(fmt.Errorf("%s: unable to delete watch event object (%#v) from store: %v", name, event.Object, err))
    case watch.Bookmark:
    // A `Bookmark` means watch has synced here, just update the resourceVersion
    // NOTE(review): as shown, this HandleError would run for every Bookmark event; it looks
    // like a `default:` label was lost above it, and the body of the ResourceVersionUpdater
    // branch below is not visible — confirm against upstream.
    utilruntime.HandleError(fmt.Errorf("%s: unable to understand watch event %#v", name, event))
    if rvu, ok := store.(ResourceVersionUpdater); ok {

    watchDuration := clock.Since(start)
    if watchDuration < 1*time.Second && eventCount == 0 {
    return fmt.Errorf("very short watch: %s: Unexpected watch close - watch lasted less than a second and no items received", name)
    klog.V(4).Infof("%s: Watch close - %v total %v items received", name, expectedTypeName, eventCount)
    return nil


ListAndWatch func还启动了一个独立的goroutine,用于定时把底层存储(Indexer)中的资源对象同步到DeltaFIFO中,同步的时间周期为创建Reflector时指定的resyncPeriod

// Excerpt from Reflector.ListAndWatch: a background goroutine that, each time
// resyncCh fires, calls r.store.Resync() when ShouldResync allows it, reports a
// Resync failure on resyncerrc, and re-arms resyncCh via r.resyncChan().
// NOTE(review): abridged — closing braces are missing and the <-stopCh / <-cancelCh
// cases show no body here (presumably they return from the goroutine; confirm).
go func() {
resyncCh, cleanup := r.resyncChan()
defer func() {
cleanup() // Call the last one written into cleanup
for {
select {
case <-resyncCh:
case <-stopCh:
case <-cancelCh:
// Decide whether the resource objects need to be resynced now.
if r.ShouldResync == nil || r.ShouldResync() {
klog.V(4).Infof("%s: forcing resync", r.name)
if err := r.store.Resync(); err != nil {
resyncerrc <- err
resyncCh, cleanup = r.resyncChan()
  • Ref:https://github.com/kubernetes/client-go/blob/4f1c2e7b9711846cf31255fd709616c66cb3cfea/tools/cache/delta_fifo.go#L719

    func (f *DeltaFIFO) syncKeyLocked(key string) error {
    // f.knownObjects是Indexer底层存储对象
    obj, exists, err := f.knownObjects.GetByKey(key)
    if err != nil {
    klog.Errorf("Unexpected error %v during lookup of key %v, unable to queue object for sync", err, key)
    return nil
    } else if !exists {
    klog.Infof("Key %v does not exist in known objects store, unable to queue object for sync", key)
    return nil

    // If we are doing Resync() and there is already an event queued for that object,
    // we ignore the Resync for it. This is to avoid the race, in which the resync
    // comes with the previous value of object (since queueing an event for the object
    // doesn't trigger changing the underlying store <knownObjects>.
    id, err := f.KeyOf(obj)
    if err != nil {
    return KeyError{obj, err}
    if len(f.items[id]) > 0 {
    return nil
    // 把资源对象的同步操作写入DeltaFIFO
    if err := f.queueActionLocked(Sync, obj); err != nil {
    return fmt.Errorf("couldn't queue object: %v", err)
    return nil


  • DeltaFIFO队列和普通队列不一样,它存储的不是资源对象本身,而是资源对象相关的操作记录(Deltas)

  • Ref:https://github.com/kubernetes/client-go/blob/4f1c2e7b9711846cf31255fd709616c66cb3cfea/tools/cache/delta_fifo.go#L101

    // DeltaFIFO is like FIFO, but differs in two ways. One is that the
    // accumulator associated with a given object's key is not that object
    // but rather a Deltas, which is a slice of Delta values for that
    // object. Applying an object to a Deltas means to append a Delta
    // except when the potentially appended Delta is a Deleted and the
    // Deltas already ends with a Deleted. In that case the Deltas does
    // not grow, although the terminal Deleted will be replaced by the new
    // Deleted if the older Deleted's object is a
    // DeletedFinalStateUnknown.
    // The other difference is that DeltaFIFO has two additional ways that
    // an object can be applied to an accumulator: Replaced and Sync.
    // If EmitDeltaTypeReplaced is not set to true, Sync will be used in
    // replace events for backwards compatibility. Sync is used for periodic
    // resync events.
    // DeltaFIFO is a producer-consumer queue, where a Reflector is
    // intended to be the producer, and the consumer is whatever calls
    // the Pop() method.
    // DeltaFIFO solves this use case:
    // - You want to process every object change (delta) at most once.
    // - When you process an object, you want to see everything
    // that's happened to it since you last processed it.
    // - You want to process the deletion of some of the objects.
    // - You might want to periodically reprocess objects.
    // DeltaFIFO's Pop(), Get(), and GetByKey() methods return
    // interface{} to satisfy the Store/Queue interfaces, but they
    // will always return an object of type Deltas. List() returns
    // the newest object from each accumulator in the FIFO.
    // A DeltaFIFO's knownObjects KeyListerGetter provides the abilities
    // to list Store keys and to get objects by Store key. The objects in
    // question are called "known objects" and this set of objects
    // modifies the behavior of the Delete, Replace, and Resync methods
    // (each in a different way).
    // A note on threading: If you call Pop() in parallel from multiple
    // threads, you could end up with multiple threads processing slightly
    // different versions of the same object.
    type DeltaFIFO struct {
    // lock/cond protects access to 'items' and 'queue'.
    lock sync.RWMutex
    cond sync.Cond

    // `items` maps a key to a Deltas.
    // Each such Deltas has at least one Delta.
    items map[string]Deltas

    // `queue` maintains FIFO order of keys for consumption in Pop().
    // There are no duplicates in `queue`.
    // A key is in `queue` if and only if it is in `items`.
    queue []string

    // populated is true if the first batch of items inserted by Replace() has been populated
    // or Delete/Add/Update/AddIfNotPresent was called first.
    populated bool
    // initialPopulationCount is the number of items inserted by the first call of Replace()
    initialPopulationCount int

    // keyFunc is used to make the key used for queued item
    // insertion and retrieval, and should be deterministic.
    keyFunc KeyFunc

    // knownObjects list keys that are "known" --- affecting Delete(),
    // Replace(), and Resync()
    knownObjects KeyListerGetter

    // Used to indicate a queue is closed so a control loop can exit when a queue is empty.
    // Currently, not used to gate any of CRUD operations.
    closed bool

    // emitDeltaTypeReplaced is whether to emit the Replaced or Sync
    // DeltaType when Replace() is called (to preserve backwards compat).
    emitDeltaTypeReplaced bool

    // Called with every object if non-nil.
    transformer TransformFunc

  • queue字段存储资源对象的key,key通过KeyOf func计算得到

  • items字段通过map存储,value是操作记录的切片


// queueActionLocked appends to the delta list for the object.
// Caller must lock first.
func (f *DeltaFIFO) queueActionLocked(actionType DeltaType, obj interface{}) error {
// 1.计算出计算资源key,默认为cache,MetaNamespaceKeyFunc
// 命名空间对象使用:<namespcae>/<name>作为key
// 全局对象使用<name>作为key
id, err := f.KeyOf(obj)
if err != nil {
return KeyError{obj, err}

// Every object comes through this code path once, so this is a good
// place to call the transform func. If obj is a
// DeletedFinalStateUnknown tombstone, then the containted inner object
// will already have gone through the transformer, but we document that
// this can happen. In cases involving Replace(), such an object can
// come through multiple times.
// 构造Delta记录,添加到items中以对象key作为主键添加到切片末尾
// 如果最后的俩个类型为Deleted 会进行压缩合并,只需要一条有效操作记录
if f.transformer != nil {
var err error
obj, err = f.transformer(obj)
if err != nil {
return err

oldDeltas := f.items[id]
newDeltas := append(oldDeltas, Delta{actionType, obj})
newDeltas = dedupDeltas(newDeltas)
// 更新构造之后的Delta操作记录并通过f.cond.Broadcast func唤醒所有等待数据而阻塞的消费者
if len(newDeltas) > 0 {
if _, exists := f.items[id]; !exists {
f.queue = append(f.queue, id)
f.items[id] = newDeltas
} else {
// This never happens, because dedupDeltas never returns an empty list
// when given a non-empty list (as it is here).
// If somehow it happens anyway, deal with it but complain.
if oldDeltas == nil {
klog.Errorf("Impossible dedupDeltas for id=%q: oldDeltas=%#+v, obj=%#+v; ignoring", id, oldDeltas, obj)
return nil
klog.Errorf("Impossible dedupDeltas for id=%q: oldDeltas=%#+v, obj=%#+v; breaking invariant by storing empty Deltas", id, oldDeltas, obj)
f.items[id] = newDeltas
return fmt.Errorf("Impossible dedupDeltas for id=%q: oldDeltas=%#+v, obj=%#+v; broke DeltaFIFO invariant by storing empty Deltas", id, oldDeltas, obj)
return nil



// Pop blocks until the queue has some items, and then returns one. If
// multiple items are ready, they are returned in the order in which they were
// added/updated. The item is removed from the queue (and the store) before it
// is returned, so if you don't successfully process it, you need to add it back
// with AddIfNotPresent().
// process function is called under lock, so it is safe to update data structures
// in it that need to be in sync with the queue (e.g. knownKeys). The PopProcessFunc
// may return an instance of ErrRequeue with a nested error to indicate the current
// item should be requeued (equivalent to calling AddIfNotPresent under the lock).
// process should avoid expensive I/O operation so that other queue operations, i.e.
// Add() and Get(), won't be blocked for too long.
// Pop returns a 'Deltas', which has a complete list of all the things
// that happened to the object (deltas) while it was sitting in the queue.
func (f *DeltaFIFO) Pop(process PopProcessFunc) (interface{}, error) {
defer f.lock.Unlock()
for {
// 1. 如果队列为空,通过.cond.wait等待,只有收到cond.Broadcast才说明有数据添加,接触当前状态
for len(f.queue) == 0 {
// When the queue is empty, invocation of Pop() is blocked until new item is enqueued.
// When Close() is called, the f.closed is set and the condition is broadcasted.
// Which causes this loop to continue and return from the Pop().
if f.closed {
return nil, ErrFIFOClosed

id := f.queue[0]
f.queue = f.queue[1:]
depth := len(f.queue)
if f.initialPopulationCount > 0 {
item, ok := f.items[id]
if !ok {
// This should never happen
klog.Errorf("Inconceivable! %q was in f.queue but not f.items; ignoring.", id)
delete(f.items, id)
// Only log traces if the queue depth is greater than 10 and it takes more than
// 100 milliseconds to process one item from the queue.
// Queue depth never goes high because processing an item is locking the queue,
// and new items can't be added until processing finish.
// https://github.com/kubernetes/kubernetes/issues/103789
if depth > 10 {
trace := utiltrace.New("DeltaFIFO Pop Process",
utiltrace.Field{Key: "ID", Value: id},
utiltrace.Field{Key: "Depth", Value: depth},
utiltrace.Field{Key: "Reason", Value: "slow event handlers blocking the queue"})
defer trace.LogIfLong(100 * time.Millisecond)
err := process(item)
// 如果错误,使用 f.addIfNotPresent 重新存入队列
if e, ok := err.(ErrRequeue); ok {
f.addIfNotPresent(id, item)
err = e.Err
// Don't need to copyDeltas here, because we're transferring
// ownership to the caller.
return item, err


  • Ref:https://github.com/kubernetes/client-go/blob/ae04abdbe91989de6c3c08ca27f5880ca07b9f51/tools/cache/controller.go#L541

    // Multiplexes updates in the form of a list of Deltas into a Store, and informs
    // a given handler of events OnUpdate, OnAdd, OnDelete
    func processDeltas(
    // Object which receives event notifications from the given deltas
    handler ResourceEventHandler,
    clientState Store,
    deltas Deltas,
    isInInitialList bool,
    ) error {
    // from oldest to newest
    for _, d := range deltas {
    obj := d.Object

    switch d.Type {
    case Sync, Replaced, Added, Updated:
    // 判断资源是否存在
    // 如果存在,就把资源添加到Indexer
    // 不存在,就从Indexer删除
    if old, exists, err := clientState.Get(obj); err == nil && exists {
    if err := clientState.Update(obj); err != nil {
    return err
    handler.OnUpdate(old, obj)
    } else {
    if err := clientState.Add(obj); err != nil {
    return err
    handler.OnAdd(obj, isInInitialList)
    case Deleted:
    // 如果是删除,之间删除对象从Indexder
    if err := clientState.Delete(obj); err != nil {
    return err
    return nil





  • 拥有Add、Update、Delete、List、Get、Replace、Resync等操作,还有索引函数(Indexers)和索引表(Indices)
  • items是存储的资源对象,其中items的key通过keyFunc计算得到,默认使用MetaNamespaceKeyFunc实现,计算得出<namespace>/<name>格式的key
  • indexers是一个map数据结构,其中,key为用于检索资源对象的索引名称,value为该索引对应的索引函数
  • indices是一个两层的map数据结构,里面保存了每个索引经过索引函数得到的索引值,以及该索引值匹配的所有资源对象主键(objkey)列表
  • lock读写锁控制,并发的安全访问



  • Ref:https://github.com/kubernetes/client-go/blob/4f1c2e7b9711846cf31255fd709616c66cb3cfea/tools/cache/index.go#L35

    // Indexer extends Store with multiple indices and restricts each
    // accumulator to simply hold the current object (and be empty after
    // Delete).
    // There are three kinds of strings here:
    // 1. a storage key, as defined in the Store interface,
    // 2. a name of an index, and
    // 3. an "indexed value", which is produced by an IndexFunc and
    // can be a field value or any other string computed from the object.
    type Indexer interface {
    // Index returns the stored objects whose set of indexed values
    // intersects the set of indexed values of the given object, for
    // the named index
    // 根据指定的索引名称和示例对象,返回索引名称对象的索引值的所有资源对象列表
    Index(indexName string, obj interface{}) ([]interface{}, error)
    // IndexKeys returns the storage keys of the stored objects whose
    // set of indexed values for the named index includes the given
    // indexed value
    // 给定索引名称和索引值,查询满足条件的所有资源对象主键列表
    IndexKeys(indexName, indexedValue string) ([]string, error)
    // ListIndexFuncValues returns all the indexed values of the given index
    // 根据指定索引名称返回所有的索引值和名称
    ListIndexFuncValues(indexName string) []string
    // ByIndex returns the stored objects whose set of indexed values
    // for the named index includes the given indexed value
    // 给定索引名称如索引值,查询满足条件的资源对象列表
    ByIndex(indexName, indexedValue string) ([]interface{}, error)
    // GetIndexers return the indexers
    // 返回所有索引对象
    GetIndexers() Indexers

    // AddIndexers adds more indexers to this store. If you call this after you already have data
    // in the store, the results are undefined.
    // 在填充ThreadSafeMap中的缓存资源对象前添加更多的索引对象
    AddIndexers(newIndexers Indexers) error


package main

import (
	"fmt"
	"strings"

	v1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/client-go/tools/cache"
)

// UserIndexFunc is the IndexFunc behind the "byUser" index: it returns the
// comma-separated user names stored in a Pod's "users" annotation.
// A Pod without that annotation is not indexed, and a non-Pod object is an error.
func UserIndexFunc(obj interface{}) ([]string, error) {
	pod, ok := obj.(*v1.Pod)
	if !ok {
		return nil, fmt.Errorf("UserIndexFunc: expected *v1.Pod, got %T", obj)
	}
	userString := pod.Annotations["users"]
	if userString == "" {
		// strings.Split("", ",") would return [""], indexing the pod under an empty user.
		return nil, nil
	}
	return strings.Split(userString, ","), nil
}

// main builds an Indexer with a "byUser" index, adds three pods and prints
// the names of the pods whose "users" annotation contains "user1".
func main() {
	indexer := cache.NewIndexer(cache.MetaNamespaceKeyFunc, cache.Indexers{"byUser": UserIndexFunc})
	pod1 := &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "one", Annotations: map[string]string{"users": "user1"}}}
	pod2 := &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "two", Annotations: map[string]string{"users": "user2"}}}
	pod3 := &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "three", Annotations: map[string]string{"users": "user2"}}}

	// The pods must be stored before querying; Add also updates the indices.
	for _, pod := range []*v1.Pod{pod1, pod2, pod3} {
		if err := indexer.Add(pod); err != nil {
			panic(err)
		}
	}

	pods, err := indexer.ByIndex("byUser", "user1")
	if err != nil {
		panic(err)
	}
	for _, pod := range pods {
		fmt.Println(pod.(*v1.Pod).Name)
	}
}



  • processor是资源对象操作记录的处理器,分发到不同的监听器中


  • Ref:https://github.com/kubernetes/client-go/blob/4f1c2e7b9711846cf31255fd709616c66cb3cfea/tools/cache/shared_informer.go#L617

    // sharedProcessor has a collection of processorListener and can
    // distribute a notification object to its listeners. There are two
    // kinds of distribute operations. The sync distributions go to a
    // subset of the listeners that (a) is recomputed in the occasional
    // calls to shouldResync and (b) every listener is initially put in.
    // The non-sync distributions go to every listener.
    type sharedProcessor struct {
    listenersStarted bool
    listenersLock sync.RWMutex
    listeners []*processorListener
    syncingListeners []*processorListener
    clock clock.Clock
    wg wait.Group
    • listeners为注册到processor的监听器

    • syncingListeners为所有进入同步周期的监听器

    • 监听器默认会被同时添加到这两个监听器列表中

    • 每一个listener监听器都是一个processorListener对象

    • Informer机制使用方可以使用AddEventHandler func或AddEventHandlerWithResyncPeriod func把新的监听器注册到processor中

      • AddEventHandler 在添加监听器时会使用Reflector的同步周期作为监听器的同步周期
      • AddEventHandlerWithResyncPeriod func可以为监听器指定专属的同步周期
    • Ref:https://github.com/kubernetes/client-go/blob/4f1c2e7b9711846cf31255fd709616c66cb3cfea/tools/cache/shared_informer.go#L514

      // AddEventHandlerWithResyncPeriod registers handler with its own resync period.
      // A positive period below minimumResyncPeriod is raised to it. A period below the
      // informer's resyncCheckPeriod is raised to resyncCheckPeriod if the informer has
      // already started; otherwise resyncCheckPeriod is lowered to match it. When joining
      // a running informer, the new listener is sent a synthetic add notification for
      // every object currently in s.indexer.
      // NOTE(review): abridged excerpt — the s.startedLock.Lock() and s.blockDeltas.Lock()
      // calls paired with the deferred Unlocks, the return after the "stopped" log, and the
      // code that registers listener with the processor are not visible here.
      func (s *sharedIndexInformer) AddEventHandlerWithResyncPeriod(handler ResourceEventHandler, resyncPeriod time.Duration) {
      defer s.startedLock.Unlock()

      if s.stopped {
      klog.V(2).Infof("Handler %v was not added to shared informer because it has stopped already", handler)

      if resyncPeriod > 0 {
      if resyncPeriod < minimumResyncPeriod {
      klog.Warningf("resyncPeriod %v is too small. Changing it to the minimum allowed value of %v", resyncPeriod, minimumResyncPeriod)
      resyncPeriod = minimumResyncPeriod

      if resyncPeriod < s.resyncCheckPeriod {
      if s.started {
      klog.Warningf("resyncPeriod %v is smaller than resyncCheckPeriod %v and the informer has already started. Changing it to %v", resyncPeriod, s.resyncCheckPeriod, s.resyncCheckPeriod)
      resyncPeriod = s.resyncCheckPeriod
      } else {
      // if the event handler's resyncPeriod is smaller than the current resyncCheckPeriod, update
      // resyncCheckPeriod to match resyncPeriod and adjust the resync periods of all the listeners
      // accordingly
      s.resyncCheckPeriod = resyncPeriod

      listener := newProcessListener(handler, resyncPeriod, determineResyncPeriod(resyncPeriod, s.resyncCheckPeriod), s.clock.Now(), initialBufferSize)

      if !s.started {

      // in order to safely join, we have to
      // 1. stop sending add/update/delete notifications
      // 2. do a list against the store
      // 3. send synthetic "Add" events to the new handler
      // 4. unblock
      defer s.blockDeltas.Unlock()

      for _, item := range s.indexer.List() {
      listener.add(addNotification{newObj: item})


processorListener是处理资源对象变更通知的监听器:它接收由HandleDeltas func从DeltaFIFO中取出并分发过来的资源对象操作记录,交给ResourceEventHandler进行处理,可以处理以下的事件:

  • addNotification:新增资源对象通知消息,其中包含完整的新添加的资源对象newObj
  • updateNotification:更新资源对象通知消息,其中包含更新后的资源对象newObj和旧资源对象oldObj
  • deleteNotification:删除资源对象通知消息,其中包含完整的旧资源对象oldObj


  • processorListener会在一个独立的goroutine执行pop,同时监听输入addCh和输出nextCh
    • 把输入写到环形缓存队列中,pendingNotifications
    • 读取环形缓存对象,写入输出nextCh
  • processorListener在另外一个goroutine执行run,不断轮询nextCh中的事件,根据事件类型调用ResourceEventHandler对应的处理函数



  • 有序:按照添加顺序处理元素
  • 去重:相同元素在同一时间不会被重复处理
  • 并发:支持多个生产者和消费者
  • 标记机制:支持标记功能,标记一个元素是否被处理,也允许元素在处理时重新排队
  • 通知策略:ShutDown 方法通过信号量通知队列不再接收新元素,并且通知metric goroutine退出
  • 限速:支持限速策略,在元素存入队列时进行速率限制,例如限制同一元素重新排队的频率
  • Metric:支持metric指标,可用于Prometheus监控
  • 支持三种队列
    • Interface:FIFO通用队列接口,先进先出,并且支持去重机制
    • DelayingInterface:延迟队列接口,基于Interface,延迟一段时间后再把元素插入队列
    • RateLimitingInterface:限速队列接口,基于DelayingInterface,支持插入队列时限速




  • Ref:https://github.com/kubernetes/client-go/blob/4f1c2e7b9711846cf31255fd709616c66cb3cfea/util/workqueue/queue.go#L72

    // Type is a work queue (see the package comment).
    type Type struct {
    // queue defines the order in which we will work on items. Every
    // element of queue should be in the dirty set and not in the
    // processing set.
    // 实际存储队列元素
    queue []t

    // dirty defines all of the items that need to be processed.
    // 保证去重
    dirty set

    // Things that are currently being processed are in the processing set.
    // These things may be simultaneously in the dirty set. When we finish
    // processing something and remove it from this set, we'll check if
    // it's in the dirty set, and if so, add it to the queue.
    // 标记是否正在处理
    processing set

    cond *sync.Cond

    shuttingDown bool
    drain bool

    metrics queueMetrics

    unfinishedWorkUpdatePeriod time.Duration
    clock clock.WithTicker


在通用队列上添加了AddAfter func


  • Ref:https://github.com/kubernetes/client-go/blob/4f1c2e7b9711846cf31255fd709616c66cb3cfea/util/workqueue/delaying_queue.go#L75

    // delayingType wraps an Interface and provides delayed re-enquing
    type delayingType struct {

    // clock tracks time for delayed firing
    clock clock.Clock

    // stopCh lets us signal a shutdown to the waiting loop
    stopCh chan struct{}
    // stopOnce guarantees we only signal shutdown a single time
    stopOnce sync.Once

    // heartbeat ensures we wait no more than maxWait before firing
    heartbeat clock.Ticker

    // waitingForAddCh is a buffered channel that feeds waitingForAdd
    waitingForAddCh chan *waitFor

    // metrics counts the number of retries
    metrics retryMetrics


在延迟队列基础上,添加了AddRateLimited、Forget、NumRequeues func,提供了四种限速策略

  • Ref:https://github.com/kubernetes/client-go/blob/4f1c2e7b9711846cf31255fd709616c66cb3cfea/util/workqueue/rate_limiting_queue.go#L20

    // RateLimitingInterface is an interface that rate limits items being added to the queue.
    type RateLimitingInterface interface {

    // AddRateLimited adds an item to the workqueue after the rate limiter says it's ok
    AddRateLimited(item interface{})

    // Forget indicates that an item is finished being retried. Doesn't matter whether it's for perm failing
    // or for success, we'll stop the rate limiter from tracking it. This only clears the `rateLimiter`, you
    // still have to call `Done` on the queue.
    Forget(item interface{})

    // NumRequeues returns back how many times the item was requeued
    NumRequeues(item interface{}) int



  • Ref:https://github.com/kubernetes/client-go/blob/4f1c2e7b9711846cf31255fd709616c66cb3cfea/util/workqueue/default_rate_limiters.go#L27

    // RateLimiter decides how long each item must wait before it is (re)added.
    type RateLimiter interface {
    	// When gets an item and gets to decide how long that item should wait
    	When(item interface{}) time.Duration
    	// Forget indicates that an item is finished being retried. Doesn't matter whether it's for failing
    	// or for success, we'll stop tracking it
    	// (i.e. the item's requeue count is cleared).
    	Forget(item interface{})
    	// NumRequeues returns back how many failures the item has had
    	NumRequeues(item interface{}) int
    }



  • 令牌桶算法内部实现了一个存放token 的桶

  • 初始化,桶中有一定数量的token,此后固定速率添加token到桶中,直至桶满,多余token被丢弃

  • 每个元素都需要从令牌桶中获取token,只有拿到token时元素才能通过

  • Ref:https://github.com/kubernetes/client-go/blob/4f1c2e7b9711846cf31255fd709616c66cb3cfea/util/workqueue/default_rate_limiters.go#L39

    // DefaultControllerRateLimiter is a no-arg constructor for a default rate limiter for a workqueue.  It has
    // both overall and per-item rate limiting. The overall is a token bucket and the per-item is exponential
    func DefaultControllerRateLimiter() RateLimiter {
    return NewMaxOfRateLimiter(
    NewItemExponentialFailureRateLimiter(5*time.Millisecond, 1000*time.Second),
    // 10 qps, 100 bucket size. This is only for retry speed and its only the overall factor (not per item)
    // 获取令牌桶
    &BucketRateLimiter{Limiter: rate.NewLimiter(rate.Limit(10), 100)},



  • 在同一个限速周期内,如果不存在相同元素,延迟时间为baseDelay;如果存在相同元素,其延迟时间按指数级增大(baseDelay*2^失败次数,上限为maxDelay)

  • Ref:https://github.com/kubernetes/client-go/blob/4f1c2e7b9711846cf31255fd709616c66cb3cfea/util/workqueue/default_rate_limiters.go#L89

    func (r *ItemExponentialFailureRateLimiter) When(item interface{}) time.Duration {
    defer r.failuresLock.Unlock()

    exp := r.failures[item]
    r.failures[item] = r.failures[item] + 1

    // The backoff is capped such that 'calculated' value never overflows.
    backoff := float64(r.baseDelay.Nanoseconds()) * math.Pow(2, float64(exp))
    if backoff > math.MaxInt64 {
    return r.maxDelay

    calculated := time.Duration(backoff)
    if calculated > r.maxDelay {
    return r.maxDelay

    return calculated





// DefaultControllerRateLimiter is a no-arg constructor for a default rate limiter for a workqueue.  It has
// both overall and per-item rate limiting. The overall is a token bucket and the per-item is exponential
func DefaultControllerRateLimiter() RateLimiter {
// 生产一个排队指数算法和令牌桶算法作为内部限速器的混合限速器,采用悲观策略,使用俩种延迟策略中最大的限速值
return NewMaxOfRateLimiter(
NewItemExponentialFailureRateLimiter(5*time.Millisecond, 1000*time.Second),
// 10 qps, 100 bucket size. This is only for retry speed and its only the overall factor (not per item)
&BucketRateLimiter{Limiter: rate.NewLimiter(rate.Limit(10), 100)},