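K8s etcd Core Storage Implementation

Based on Kubernetes 1.25

What is etcd

etcd is the default persistent storage backend for K8s. It mainly holds cluster configuration and state.

  • etcd is a distributed key-value store cluster
  • It provides reliable, strongly consistent data access based on the Raft protocol

Architecture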

REST Storage

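genericregistry.Store

genericregistry.Store wraps the CRUD operations on resource objects and supports conflict checks based on ResourceVersion.

  • It runs Before hooks before a resource object is stored and After hooks after it is stored, and it handles DryRun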

storage.Interface

The low-level generic storage interface. It defines the operations on resources: Create | Delete | Watch | Get | Count | GetList | GuaranteedUpdate | Versioner

cacherstorage.Cacher

To reduce read pressure on etcd, a cache layer sits in front of it. The Cacher can be turned on or off with kube-apiserver's --watch-cache flag and is enabled by default.

Underlying Storage

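The backend store. K8s uses etcd3 by default, and storage.Interface is implemented on top of the etcd client library.

RESTStorage: the resource storage interface

In K8s, every resource is exposed through kube-apiserver's RESTful API. A resource must implement RESTStorage before it can be registered in an APIGroup.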

  • Ref:https://github.com/kubernetes/apiserver/blob/ba592e4ccd41a320ceb91bab90eebee3bb4a4f33/pkg/registry/rest/rest.go#L55

    // Storage is a generic interface for RESTful storage services.
    // Resources which are exported to the RESTful API of apiserver need to implement this interface. It is expected
    // that objects may implement any of the below interfaces.
    type Storage interface {
        // New returns an empty object that can be used with Create and Update after request data has been put into it.
        // This object must be a pointer type for use with Codec.DecodeInto([]byte, runtime.Object)
        New() runtime.Object

        // Destroy cleans up its resources on shutdown.
        // Destroy has to be implemented in thread-safe way and be prepared
        // for being called more than once.
        Destroy()
    }

A storage may also offer the ability to observe changes to resource objects in real time:

  • Ref:https://github.com/kubernetes/apiserver/blob/ba592e4ccd41a320ceb91bab90eebee3bb4a4f33/pkg/registry/rest/rest.go#L276

    // Watcher should be implemented by all Storage objects that
    // want to offer the ability to watch for changes through the watch api.
    type Watcher interface {
        // 'label' selects on labels; 'field' selects on the object's fields. Not all fields
        // are supported; an error should be returned if 'field' tries to select on a field that
        // isn't supported. 'resourceVersion' allows for continuing/starting a watch at a
        // particular version.
        Watch(ctx context.Context, options *metainternalversion.ListOptions) (watch.Interface, error)
    }

In K8s, the RESTStorage implementation for each resource usually lives in pkg/registry/<group>/<resource>/storage/storage.go, and the NewREST or NewStorage function instantiates it.

  • Deployment Ref:https://github.com/kubernetes/kubernetes/blob/88e994f6bf8fc88114c5b733e09afea339bea66d/pkg/registry/apps/deployment/storage/storage.go#L88

    // DeploymentStorage includes dummy storage for Deployments and for Scale subresource.
    type DeploymentStorage struct {
        Deployment *REST
        Status     *StatusREST
        Scale      *ScaleREST
        Rollback   *RollbackREST
    }

    // NewStorage returns new instance of DeploymentStorage.
    func NewStorage(optsGetter generic.RESTOptionsGetter) (DeploymentStorage, error) {
        deploymentRest, deploymentStatusRest, deploymentRollbackRest, err := NewREST(optsGetter)
        if err != nil {
            return DeploymentStorage{}, err
        }

        return DeploymentStorage{
            Deployment: deploymentRest,
            Status:     deploymentStatusRest,
            Scale:      &ScaleREST{store: deploymentRest.Store},
            Rollback:   deploymentRollbackRest,
        }, nil
    }

    // REST implements a RESTStorage for Deployments.
    type REST struct {
        *genericregistry.Store
    }

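genericregistry.Store: wrapping common operations

genericregistry.Store abstracts and wraps the operations that are common to ordinary resource objects:

  • Covers the usual resource storage needs
  • Automatically checks for version conflicts when a resource is updated
  • Hook functions before and after storage
  • Supports server-side DryRun mode

Standard storage implementation

genericregistry.Store implements both the StandardStorage and TableConvertor interfaces.

Version conflict detection

genericregistry.Store uses optimistic locking when it updates a resource: it decides whether the update conflicts by checking whether the object's ResourceVersion has changed between the read and the write.

  • Under concurrency, optimistic locking effectively prevents dirty data such as lost updates. Every update to a K8s resource object produces a new, larger ResourceVersion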

  • Ref:https://github.com/kubernetes/apiserver/blob/ba592e4ccd41a320ceb91bab90eebee3bb4a4f33/pkg/registry/generic/registry/store.go#L520

    // Update performs an atomic update and set of the object. Returns the result of the update
    // or an error. If the registry allows create-on-update, the create flow will be executed.
    // A bool is returned along with the object and any errors, to indicate object creation.
    func (e *Store) Update(ctx context.Context, name string, objInfo rest.UpdatedObjectInfo, createValidation rest.ValidateObjectFunc, updateValidation rest.ValidateObjectUpdateFunc, forceAllowCreate bool, options *metav1.UpdateOptions) (runtime.Object, bool, error) {
        key, err := e.KeyFunc(ctx, name)
        if err != nil {
            return nil, false, err
        }
        ...

Common hook functions

  • Ref:https://github.com/kubernetes/apiserver/blob/ba592e4ccd41a320ceb91bab90eebee3bb4a4f33/pkg/registry/generic/registry/store.go#L96

    type Store struct {
    // NewFunc returns a new instance of the type this registry returns for a
    // GET of a single object, e.g.:
    //
    // curl GET /apis/group/version/namespaces/my-ns/myresource/name-of-object
    NewFunc func() runtime.Object

    // NewListFunc returns a new list of the type this registry; it is the
    // type returned when the resource is listed, e.g.:
    //
    // curl GET /apis/group/version/namespaces/my-ns/myresource
    NewListFunc func() runtime.Object

    // DefaultQualifiedResource is the pluralized name of the resource.
    // This field is used if there is no request info present in the context.
    // See qualifiedResourceFromContext for details.
    DefaultQualifiedResource schema.GroupResource

    // KeyRootFunc returns the root etcd key for this resource; should not
    // include trailing "/". This is used for operations that work on the
    // entire collection (listing and watching).
    //
    // KeyRootFunc and KeyFunc must be supplied together or not at all.
    KeyRootFunc func(ctx context.Context) string

    // KeyFunc returns the key for a specific object in the collection.
    // KeyFunc is called for Create/Update/Get/Delete. Note that 'namespace'
    // can be gotten from ctx.
    //
    // KeyFunc and KeyRootFunc must be supplied together or not at all.
    KeyFunc func(ctx context.Context, name string) (string, error)

    // ObjectNameFunc returns the name of an object or an error.
    ObjectNameFunc func(obj runtime.Object) (string, error)

    // TTLFunc returns the TTL (time to live) that objects should be persisted
    // with. The existing parameter is the current TTL or the default for this
    // operation. The update parameter indicates whether this is an operation
    // against an existing object.
    //
    // Objects that are persisted with a TTL are evicted once the TTL expires.
    TTLFunc func(obj runtime.Object, existing uint64, update bool) (uint64, error)

    // PredicateFunc returns a matcher corresponding to the provided labels
    // and fields. The SelectionPredicate returned should return true if the
    // object matches the given field and label selectors.
    PredicateFunc func(label labels.Selector, field fields.Selector) storage.SelectionPredicate

    // EnableGarbageCollection affects the handling of Update and Delete
    // requests. Enabling garbage collection allows finalizers to do work to
    // finalize this object before the store deletes it.
    //
    // If any store has garbage collection enabled, it must also be enabled in
    // the kube-controller-manager.
    EnableGarbageCollection bool

    // DeleteCollectionWorkers is the maximum number of workers in a single
    // DeleteCollection call. Delete requests for the items in a collection
    // are issued in parallel.
    DeleteCollectionWorkers int

    // Decorator is an optional exit hook on an object returned from the
    // underlying storage. The returned object could be an individual object
    // (e.g. Pod) or a list type (e.g. PodList). Decorator is intended for
    // integrations that are above storage and should only be used for
    // specific cases where storage of the value is not appropriate, since
    // they cannot be watched.
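    // Exit hook: after an object is fetched from the underlying storage, it gets a final adjustment before being returned to the caller.
    // Only suited to special cases, e.g. when the stored object cannot directly satisfy the caller (values added this way cannot be watched).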
    Decorator func(runtime.Object)

    // CreateStrategy implements resource-specific behavior during creation.
    CreateStrategy rest.RESTCreateStrategy
    // BeginCreate is an optional hook that returns a "transaction-like"
    // commit/revert function which will be called at the end of the operation,
    // but before AfterCreate and Decorator, indicating via the argument
    // whether the operation succeeded. If this returns an error, the function
    // is not called. Almost nobody should use this hook.
    // Hook that runs before the object is stored
    BeginCreate BeginCreateFunc
    // AfterCreate implements a further operation to run after a resource is
    // created and before it is decorated, optional.
    // Hook that runs after the object is stored
    AfterCreate AfterCreateFunc

    // UpdateStrategy implements resource-specific behavior during updates.
    UpdateStrategy rest.RESTUpdateStrategy
    // BeginUpdate is an optional hook that returns a "transaction-like"
    // commit/revert function which will be called at the end of the operation,
    // but before AfterUpdate and Decorator, indicating via the argument
    // whether the operation succeeded. If this returns an error, the function
    // is not called. Almost nobody should use this hook.
    BeginUpdate BeginUpdateFunc
    // AfterUpdate implements a further operation to run after a resource is
    // updated and before it is decorated, optional.
    AfterUpdate AfterUpdateFunc

    // DeleteStrategy implements resource-specific behavior during deletion.
    DeleteStrategy rest.RESTDeleteStrategy
    // AfterDelete implements a further operation to run after a resource is
    // deleted and before it is decorated, optional.
    AfterDelete AfterDeleteFunc
    // ReturnDeletedObject determines whether the Store returns the object
    // that was deleted. Otherwise, return a generic success status response.
    ReturnDeletedObject bool
    // ShouldDeleteDuringUpdate is an optional function to determine whether
    // an update from existing to obj should result in a delete.
    // If specified, this is checked in addition to standard finalizer,
    // deletionTimestamp, and deletionGracePeriodSeconds checks.
    ShouldDeleteDuringUpdate func(ctx context.Context, key string, obj, existing runtime.Object) bool

    // TableConvertor is an optional interface for transforming items or lists
    // of items into tabular output. If unset, the default will be used.
    TableConvertor rest.TableConvertor

    // ResetFieldsStrategy provides the fields reset by the strategy that
    // should not be modified by the user.
    ResetFieldsStrategy rest.ResetFieldsStrategy

    // Storage is the interface for the underlying storage for the
    // resource. It is wrapped into a "DryRunnableStorage" that will
    // either pass-through or simply dry-run.
    Storage DryRunnableStorage
    // StorageVersioner outputs the <group/version/kind> an object will be
    // converted to before persisted in etcd, given a list of possible
    // kinds of the object.
    // If the StorageVersioner is nil, apiserver will leave the
    // storageVersionHash as empty in the discovery document.
    StorageVersioner runtime.GroupVersioner

    // DestroyFunc cleans up clients used by the underlying Storage; optional.
    // If set, DestroyFunc has to be implemented in thread-safe way and
    // be prepared for being called more than once.
    DestroyFunc func()
    }

How DryRun works

The --dry-run flag of kubectl lets us check a configuration for errors or conflicts before the operation is actually executed.

storage.Interface: the generic storage interface

  • Ref:https://github.com/kubernetes/apiserver/blob/ba592e4ccd41a320ceb91bab90eebee3bb4a4f33/pkg/storage/interfaces.go#L159


    // Interface offers a common interface for object marshaling/unmarshaling operations and
    // hides all the storage-related operations behind it.
    type Interface interface {
    // Returns Versioner associated with this interface.
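    // Resource version manager: reads version information from objects returned by the underlying storage and writes the ResourceVersion into resource objects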
    Versioner() Versioner

    // Create adds a new object at a key unless it already exists. 'ttl' is time-to-live
    // in seconds (0 means forever). If no error is returned and out is not nil, out will be
    // set to the read value from database.
    // Creates a resource object. A TTL may be passed in; 0 means never expire. Event objects are kept for 1h by default (a 1h TTL is set)
    Create(ctx context.Context, key string, obj, out runtime.Object, ttl uint64) error

    // Delete removes the specified key and returns the value that existed at that spot.
    // If key didn't exist, it will return NotFound storage error.
    // If 'cachedExistingObject' is non-nil, it can be used as a suggestion about the
    // current version of the object to avoid read operation from storage to get it.
    // However, the implementations have to retry in case suggestion is stale.
    // Deletes a resource object
    Delete(
    ctx context.Context, key string, out runtime.Object, preconditions *Preconditions,
    validateDeletion ValidateObjectFunc, cachedExistingObject runtime.Object) error

    // Watch begins watching the specified key. Events are decoded into API objects,
    // and any items selected by 'p' are sent down to returned watch.Interface.
    // resourceVersion may be used to specify what version to begin watching,
    // which should be the current resourceVersion, and no longer rv+1
    // (e.g. reconnecting without missing any updates).
    // If resource version is "0", this interface will get current object at given key
    // and send it in an "ADDED" event, before watch starts.
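    // Watches resource objects at the given key through the Watch mechanism (the key is treated as a prefix when opts.Recursive is set)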
    Watch(ctx context.Context, key string, opts ListOptions) (watch.Interface, error)

    // Get unmarshals object found at key into objPtr. On a not found error, will either
    // return a zero object of the requested type, or an error, depending on 'opts.ignoreNotFound'.
    // Treats empty responses and nil response nodes exactly like a not found error.
    // The returned contents may be delayed, but it is guaranteed that they will
    // match 'opts.ResourceVersion' according 'opts.ResourceVersionMatch'.
    // Gets a resource object
    Get(ctx context.Context, key string, opts GetOptions, objPtr runtime.Object) error

    // GetList unmarshalls objects found at key into a *List api object (an object
    // that satisfies runtime.IsList definition).
    // If 'opts.Recursive' is false, 'key' is used as an exact match. If `opts.Recursive'
    // is true, 'key' is used as a prefix.
    // The returned contents may be delayed, but it is guaranteed that they will
    // match 'opts.ResourceVersion' according 'opts.ResourceVersionMatch'.
    // Gets a list of resource objects
    GetList(ctx context.Context, key string, opts ListOptions, listObj runtime.Object) error

    // GuaranteedUpdate keeps calling 'tryUpdate()' to update key 'key' (of type 'destination')
    // retrying the update until success if there is index conflict.
    // Note that object passed to tryUpdate may change across invocations of tryUpdate() if
    // other writers are simultaneously updating it, so tryUpdate() needs to take into account
    // the current contents of the object when deciding how the update object should look.
    // If the key doesn't exist, it will return NotFound storage error if ignoreNotFound=false
    // else `destination` will be set to the zero value of it's type.
    // If the eventual successful invocation of `tryUpdate` returns an output with the same serialized
    // contents as the input, it won't perform any update, but instead set `destination` to an object with those
    // contents.
    // If 'cachedExistingObject' is non-nil, it can be used as a suggestion about the
    // current version of the object to avoid read operation from storage to get it.
    // However, the implementations have to retry in case suggestion is stale.
    //
    // Example:
    //
    // s := /* implementation of Interface */
    // err := s.GuaranteedUpdate(
    // "myKey", &MyType{}, true, preconditions,
    // func(input runtime.Object, res ResponseMeta) (runtime.Object, *uint64, error) {
    // // Before each invocation of the user defined function, "input" is reset to
    // // current contents for "myKey" in database.
    // curr := input.(*MyType) // Guaranteed to succeed.
    //
    // // Make the modification
    // curr.Counter++
    //
    // // Return the modified object - return an error to stop iterating. Return
    // // a uint64 to alter the TTL on the object, or nil to keep it the same value.
    // return cur, nil, nil
    // }, cachedExistingObject
    // )
    // Updates a resource object by calling the supplied tryUpdate func, retrying on conflict
    GuaranteedUpdate(
    ctx context.Context, key string, destination runtime.Object, ignoreNotFound bool,
    preconditions *Preconditions, tryUpdate UpdateFunc, cachedExistingObject runtime.Object) error

    // Count returns number of different entries under the key (generally being path prefix).
    // Returns the number of entries under the given key
    Count(key string) (int64, error)
    }

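Cacher Storage: the cache layer

Cacher Storage reduces the load on the database service, lowers the number of connections, and so on.

Cacher Storage cache architecture

The Cacher Storage layer caches the underlying data. This speeds up responses for data held in the etcd cluster and reduces the number of connections to it, while keeping the cached data consistent with the data in the etcd cluster.

Operations covered by the Cacher Storage layer:

  • Get, GetList, Watch, GuaranteedUpdate, Delete and similar operations
  • Not covered: Create, Count, Versioner and similar operations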

watchCache

watchCache is the part of the cache architecture that listens for etcd cluster events through the Underlying Storage, using the ListAndWatch func of the Reflector framework.

  • Ref:https://github.com/kubernetes/apiserver/blob/ba592e4ccd41a320ceb91bab90eebee3bb4a4f33/pkg/storage/cacher/cacher.go#L439

    func (c *Cacher) startCaching(stopChannel <-chan struct{}) {
        // The 'usable' lock is always 'RLock'able when it is safe to use the cache.
        // It is safe to use the cache after a successful list until a disconnection.
        // We start with usable (write) locked. The below OnReplace function will
        // unlock it after a successful list. The below defer will then re-lock
        // it when this function exits (always due to disconnection), only if
        // we actually got a successful list. This cycle will repeat as needed.
        successfulList := false
        c.watchCache.SetOnReplace(func() {
            successfulList = true
            c.ready.set(true)
            klog.V(1).Infof("cacher (%v): initialized", c.objectType.String())
            metrics.WatchCacheInitializations.WithLabelValues(c.objectType.String()).Inc()
        })
        defer func() {
            if successfulList {
                c.ready.set(false)
            }
        }()

        c.terminateAllWatchers()
        // Note that since onReplace may be not called due to errors, we explicitly
        // need to retry it on errors under lock.
        // Also note that startCaching is called in a loop, so there's no need
        // to have another loop here.
        if err := c.reflector.ListAndWatch(stopChannel); err != nil {
            klog.Errorf("cacher (%v): unexpected ListAndWatch error: %v; reinitializing...", c.objectType.String(), err)
        }
    }

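The Reflector framework delivers events through callbacks, for which watchCache implements the Add, Update and Delete functions. Each event goes to three places:

  • w.store: the object from the received event is stored in the local cache. The data structure is cache.Indexer, which works the same way as the Indexer in client-go

  • w.cache: a ring buffer. Events are stored in this sliding window, which serves cached data for Watch operations so that events are not lost when a Watch connection drops because of network or other problems

  • w.eventHandler: hands the event to the Cacher, which in turn fans it out to every Watcher that currently has a connection. The fan-out is non-blocking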

  • Ref:https://github.com/kubernetes/apiserver/blob/ba592e4ccd41a320ceb91bab90eebee3bb4a4f33/pkg/storage/cacher/watch_cache.go#L282

    // processEvent is safe as long as there is at most one call to it in flight
    // at any point in time.
    func (w *watchCache) processEvent(event watch.Event, resourceVersion uint64, updateFunc func(*storeElement) error) error {
    key, err := w.keyFunc(event.Object)
    if err != nil {
    return fmt.Errorf("couldn't compute key: %v", err)
    }
    elem := &storeElement{Key: key, Object: event.Object}
    elem.Labels, elem.Fields, err = w.getAttrsFunc(event.Object)
    if err != nil {
    return err
    }

    wcEvent := &watchCacheEvent{
    Type: event.Type,
    Object: elem.Object,
    ObjLabels: elem.Labels,
    ObjFields: elem.Fields,
    Key: key,
    ResourceVersion: resourceVersion,
    RecordTime: w.clock.Now(),
    }

    if err := func() error {
    // TODO: We should consider moving this lock below after the watchCacheEvent
    // is created. In such situation, the only problematic scenario is Replace(
    // happening after getting object from store and before acquiring a lock.
    // Maybe introduce another lock for this purpose.
    w.Lock()
    defer w.Unlock()

    previous, exists, err := w.store.Get(elem)
    if err != nil {
    return err
    }
    if exists {
    previousElem := previous.(*storeElement)
    wcEvent.PrevObject = previousElem.Object
    wcEvent.PrevObjLabels = previousElem.Labels
    wcEvent.PrevObjFields = previousElem.Fields
    }

    w.updateCache(wcEvent)
    w.resourceVersion = resourceVersion
    defer w.cond.Broadcast()

    return updateFunc(elem)
    }(); err != nil {
    return err
    }

    // Avoid calling event handler under lock.
    // This is safe as long as there is at most one call to Add/Update/Delete and
    // UpdateResourceVersion in flight at any point in time, which is true now,
    // because reflector calls them synchronously from its main thread.
    if w.eventHandler != nil {
    w.eventHandler(wcEvent)
    }
    return nil
    }
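The three destinations of an event can be seen more easily in a reduced form. The sketch below is an assumption-laden miniature of processEvent: miniWatchCache and Event are hypothetical, objects are plain strings, and an unbounded slice stands in for the ring buffer.

```go
package main

import (
	"fmt"
	"sync"
)

// Event is a toy watch event; objects are plain strings here.
type Event struct {
	Type            string // "ADDED", "MODIFIED" or "DELETED"
	Key             string
	Object          string
	PrevObject      string
	ResourceVersion uint64
}

// miniWatchCache is a hypothetical miniature of watchCache.
type miniWatchCache struct {
	mu              sync.RWMutex
	store           map[string]string // current state, like w.store
	history         []Event           // event history, like w.cache (unbounded in this sketch)
	resourceVersion uint64
	eventHandler    func(Event) // forwards to the Cacher, like w.eventHandler
}

func newMiniWatchCache(handler func(Event)) *miniWatchCache {
	return &miniWatchCache{store: map[string]string{}, eventHandler: handler}
}

// processEvent records the event in the history and the store under the lock,
// then calls the handler outside the lock.
func (w *miniWatchCache) processEvent(ev Event) {
	func() {
		w.mu.Lock()
		defer w.mu.Unlock()
		// Remember the previous object so watchers can evaluate old and new state.
		if prev, ok := w.store[ev.Key]; ok {
			ev.PrevObject = prev
		}
		w.history = append(w.history, ev)
		w.resourceVersion = ev.ResourceVersion
		if ev.Type == "DELETED" {
			delete(w.store, ev.Key)
		} else {
			w.store[ev.Key] = ev.Object
		}
	}()
	// Avoid calling the event handler under the lock.
	if w.eventHandler != nil {
		w.eventHandler(ev)
	}
}

func main() {
	w := newMiniWatchCache(func(ev Event) {
		fmt.Printf("%s %s rv=%d prev=%q\n", ev.Type, ev.Key, ev.ResourceVersion, ev.PrevObject)
	})
	w.processEvent(Event{Type: "ADDED", Key: "pods/a", Object: "v1", ResourceVersion: 1})
	w.processEvent(Event{Type: "MODIFIED", Key: "pods/a", Object: "v2", ResourceVersion: 2})
}
```

As in the original, this is only safe while a single caller invokes processEvent at a time, which the Reflector guarantees by calling the callbacks synchronously.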

Cacher

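After the Cacher receives an event from the watchCache callback, it iterates over all currently connected Watchers and delivers the event to each of them in turn, without blocking.

Non-blocking here is relative: it is the preferred path, not the only one.

To tolerate brief connection problems, when the non-blocking send fails K8s first sets a timer and keeps trying to deliver the event within that time budget. Only if the event still cannot be delivered when the timer expires does it close the corresponding Watcher connection.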

cacheWatcher

kube-apiserver creates a separate cacheWatcher instance for each client that issues a watch request, and that instance receives the Watch events.

  • Ref:https://github.com/kubernetes/apiserver/blob/ba592e4ccd41a320ceb91bab90eebee3bb4a4f33/pkg/storage/cacher/cacher.go#L473

    // Watch implements storage.Interface.
    func (c *Cacher) Watch(ctx context.Context, key string, opts storage.ListOptions) (watch.Interface, error) {
    ...
    // Create a watcher here to reduce memory allocations under lock,
    // given that memory allocation may trigger GC and block the thread.
    // Also note that emptyFunc is a placeholder, until we will be able
    // to compute watcher.forget function (which has to happen under lock).
    watcher := newCacheWatcher(chanSize, filterWithAttrsFunction(key, pred), emptyFunc, c.versioner, deadline, pred.AllowWatchBookmarks, c.objectType, identifier)

    // We explicitly use thread unsafe version and do locking ourself to ensure that
    // no new events will be processed in the meantime. The watchCache will be unlocked
    // on return from this function.
    // Note that we cannot do it under Cacher lock, to avoid a deadlock, since the
    // underlying watchCache is calling processEvent under its lock.
    c.watchCache.RLock()
    defer c.watchCache.RUnlock()
    cacheInterval, err := c.watchCache.getAllEventsSinceLocked(watchRV)
    if err != nil {
    // To match the uncached watch implementation, once we have passed authn/authz/admission,
    // and successfully parsed a resource version, other errors must fail with a watch event of type ERROR,
    // rather than a directly returned error.
    return newErrWatcher(err), nil
    }

    func() {
    c.Lock()
    defer c.Unlock()
    // Update watcher.forget function once we can compute it.
    watcher.forget = forgetWatcher(c, watcher, c.watcherIdx, triggerValue, triggerSupported)
    c.watchers.addWatcher(watcher, c.watcherIdx, triggerValue, triggerSupported)

    // Add it to the queue only when the client support watch bookmarks.
    if watcher.allowWatchBookmarks {
    c.bookmarkWatchers.addWatcher(watcher)
    }
    c.watcherIdx++
    }()
    ...
    }

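When a client issues a Watch request, the newCacheWatcher func instantiates a cacheWatcher object, which is registered under an ID taken from c.watcherIdx. The ID is unique within its Cacher, starts at 0, and is incremented by 1 for each new Watch request. It is reset when kube-apiserver restarts.

When a cacheWatcher is initialized, a goroutine is started that ends up calling the watcher's process func. It listens on the c.input channel: while the channel is empty it blocks, and when data arrives the event is exposed through ResultChan. Only events whose ResourceVersion is greater than the requested ResourceVersion are sent.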

  • Ref:https://github.com/kubernetes/apiserver/blob/ba592e4ccd41a320ceb91bab90eebee3bb4a4f33/pkg/storage/cacher/cacher.go#L1470

    func (c *cacheWatcher) process(ctx context.Context, resourceVersion uint64) {
        // At this point we already start processing incoming watch events.
        // However, the init event can still be processed because their serialization
        // and sending to the client happens asynchrnously.
        // TODO: As describe in the KEP, we would like to estimate that by delaying
        // the initialization signal proportionally to the number of events to
        // process, but we're leaving this to the tuning phase.
        utilflowcontrol.WatchInitialized(ctx)

        for {
            select {
            case event, ok := <-c.input:
                if !ok {
                    return
                }
                // only send events newer than resourceVersion
                if event.ResourceVersion > resourceVersion {
                    c.sendWatchCacheEvent(event)
                }
            case <-ctx.Done():
                return
            }
        }
    }

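ResourceVersion: the resource version number

ResourceVersion is a very important element of K8s resource objects:

  • It is used for conflict detection on resource updates
  • It prevents events from being lost across the List and Watch phases
  • Every modification of a resource object in etcd updates its ResourceVersion
  • client-go can tell from the ResourceVersion whether a resource object has changed
  • If a Watch is interrupted unexpectedly, it restarts from the last ResourceVersion, which keeps the Watch continuous

K8s does not implement ResourceVersion on its own. It relies on etcd's global index, which has two core forms, createdIndex and modifiedIndex. These are the etcd v2 names; in etcd v3 the corresponding per-key fields are CreateRevision and ModRevision.

  • createdIndex: a globally unique, monotonically increasing positive integer that is incremented each time a key is created in the etcd cluster
  • modifiedIndex: similar to createdIndex, but incremented each time a key in the etcd cluster is modified
  • Both createdIndex and modifiedIndex are updated atomically

Taking storage.Interface Get as an example, the object's version comes from kv.ModRevision: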

  • Ref:https://github.com/kubernetes/apiserver/blob/ba592e4ccd41a320ceb91bab90eebee3bb4a4f33/pkg/storage/etcd3/store.go#L130

    // Get implements storage.Interface.Get.
    func (s *store) Get(ctx context.Context, key string, opts storage.GetOptions, out runtime.Object) error {
    preparedKey, err := s.prepareKey(key)
    if err != nil {
    return err
    }
    startTime := time.Now()
    getResp, err := s.client.KV.Get(ctx, preparedKey)
    metrics.RecordEtcdRequestLatency("get", getTypeName(out), startTime)
    if err != nil {
    return err
    }
    if err = s.validateMinimumResourceVersion(opts.ResourceVersion, uint64(getResp.Header.Revision)); err != nil {
    return err
    }

    if len(getResp.Kvs) == 0 {
    if opts.IgnoreNotFound {
    return runtime.SetZeroValue(out)
    }
    return storage.NewKeyNotFoundError(preparedKey, 0)
    }
    kv := getResp.Kvs[0]

    data, _, err := s.transformer.TransformFromStorage(ctx, kv.Value, authenticatedDataString(preparedKey))
    if err != nil {
    return storage.NewInternalError(err.Error())
    }

    return decode(s.codec, s.versioner, data, out, kv.ModRevision)
    }
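How a per-key ModRevision follows from one global counter can be sketched with a toy store modelled on etcd v3 semantics. revisionStore, KeyValue and resourceVersion are hypothetical names for illustration, and the sketch is single-goroutine only.

```go
package main

import (
	"fmt"
	"strconv"
)

// KeyValue mirrors the revision fields of an etcd v3 key-value pair.
type KeyValue struct {
	Value          string
	CreateRevision int64 // store revision when the key was created
	ModRevision    int64 // store revision of the last modification
}

// revisionStore is a toy store with one global, monotonically increasing revision.
type revisionStore struct {
	revision int64
	kvs      map[string]KeyValue
}

func newRevisionStore() *revisionStore {
	return &revisionStore{kvs: map[string]KeyValue{}}
}

// Put bumps the global revision on every write and stamps the key with it.
func (s *revisionStore) Put(key, value string) KeyValue {
	s.revision++
	kv, exists := s.kvs[key]
	if !exists {
		kv.CreateRevision = s.revision
	}
	kv.Value = value
	kv.ModRevision = s.revision
	s.kvs[key] = kv
	return kv
}

// resourceVersion derives the string an API object would carry from ModRevision.
func resourceVersion(kv KeyValue) string {
	return strconv.FormatInt(kv.ModRevision, 10)
}

func main() {
	s := newRevisionStore()
	s.Put("/registry/pods/default/a", "v1")       // revision 1
	s.Put("/registry/pods/default/b", "v1")       // revision 2
	a := s.Put("/registry/pods/default/a", "v2") // revision 3
	fmt.Println(a.CreateRevision, a.ModRevision, resourceVersion(a))
}
```

Because the counter is shared by all keys, an object's ResourceVersion can jump by more than one between two of its own updates, so clients should only compare versions for equality or pass them back to the server.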

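The watchCache sliding window

watchCache receives the Reflector's event callbacks and dispatches each event to three places, one of which is the cache sliding window.

Common cache algorithms

  1. FIFO
    • Characteristic: first in, first out; simple to implement
    • Data structure: queue
    • Eviction: when the cache is full, remove the entry that entered first
  2. LRU
    • Characteristic: time-based; the entry that has gone unused the longest is removed first
    • Data structure: linked list and HashMap
    • Eviction: based on the time of last use, remove the least recently used entry first
  3. LFU
    • Characteristic: count-based; the entry with the fewest accesses is removed first
    • Data structure: linked list and HashMap
    • Eviction: based on how often entries are used, remove the least frequently used entry first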
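For contrast with the FIFO-like window that watchCache uses, here is a minimal LRU sketch built from the linked list and hash map combination listed above. The LRU type and its methods are illustrative names; the sketch assumes a capacity of at least 1 and is not safe for concurrent use.

```go
package main

import (
	"container/list"
	"fmt"
)

type pair struct {
	key   string
	value int
}

// LRU keeps the most recently used entry at the front of the list.
type LRU struct {
	capacity int
	order    *list.List               // front = most recent, back = least recent
	items    map[string]*list.Element // key -> node in order
}

func NewLRU(capacity int) *LRU {
	return &LRU{capacity: capacity, order: list.New(), items: map[string]*list.Element{}}
}

// Get returns the value and marks the entry as recently used.
func (c *LRU) Get(key string) (int, bool) {
	el, ok := c.items[key]
	if !ok {
		return 0, false
	}
	c.order.MoveToFront(el)
	return el.Value.(pair).value, true
}

// Put inserts or updates an entry, evicting the least recently used one when full.
func (c *LRU) Put(key string, value int) {
	if el, ok := c.items[key]; ok {
		el.Value = pair{key, value}
		c.order.MoveToFront(el)
		return
	}
	if c.order.Len() >= c.capacity {
		oldest := c.order.Back()
		c.order.Remove(oldest)
		delete(c.items, oldest.Value.(pair).key)
	}
	c.items[key] = c.order.PushFront(pair{key, value})
}

func main() {
	c := NewLRU(2)
	c.Put("a", 1)
	c.Put("b", 2)
	c.Get("a")    // "a" is now the most recently used entry
	c.Put("c", 3) // evicts "b", not "a"
	_, hasA := c.Get("a")
	_, hasB := c.Get("b")
	fmt.Println(hasA, hasB)
}
```

A FIFO cache would have evicted "a" here, since it entered first; LRU keeps it because it was read after "b" was written.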

watchCache implementation

watchCache uses a cache sliding window, which behaves much like FIFO.

  • Ref:https://github.com/kubernetes/apiserver/blob/ba592e4ccd41a320ceb91bab90eebee3bb4a4f33/pkg/storage/cacher/watch_cache.go#L136

    // watchCache implements a Store interface.
    // However, it depends on the elements implementing runtime.Object interface.
    //
    // watchCache is a "sliding window" (with a limited capacity) of objects
    // observed from a watch.
    type watchCache struct {
    sync.RWMutex
    // Condition on which lists are waiting for the fresh enough
    // resource version.
    cond *sync.Cond

    // Maximum size of history window.
    // Size of the cache sliding window, 100 by default
    // It can be set with the --default-watch-cache-size flag
    // A value of 0 disables watchCache
    capacity int

    // upper bound of capacity since event cache has a dynamic size.
    upperBoundCapacity int

    // lower bound of capacity since event cache has a dynamic size.
    lowerBoundCapacity int

    // keyFunc is used to get a key in the underlying storage for a given object.
    keyFunc func(runtime.Object) (string, error)

    // getAttrsFunc is used to get labels and fields of an object.
    getAttrsFunc func(runtime.Object) (labels.Set, fields.Set, error)

    // cache is used a cyclic buffer - its first element (with the smallest
    // resourceVersion) is defined by startIndex, its last element is defined
    // by endIndex (if cache is full it will be startIndex + capacity).
    // Both startIndex and endIndex can be greater than buffer capacity -
    // you should always apply modulo capacity to get an index in cache array.
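    // The cache sliding window: a fixed-size array used as a ring buffer that slides forward. When the window is full, the event that entered it first is removed
    cache []*watchCacheEvent
    // Start index
    startIndex int
    // End index
    endIndex int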
    endIndex int

    // store will effectively support LIST operation from the "end of cache
    // history" i.e. from the moment just after the newest cached watched event.
    // It is necessary to effectively allow clients to start watching at now.
    // NOTE: We assume that <store> is thread-safe.
    store cache.Indexer

    // ResourceVersion up to which the watchCache is propagated.
    resourceVersion uint64

    // ResourceVersion of the last list result (populated via Replace() method).
    listResourceVersion uint64

    // This handler is run at the end of every successful Replace() method.
    onReplace func()

    // This handler is run at the end of every Add/Update/Delete method
    // and additionally gets the previous value of the object.
    eventHandler func(*watchCacheEvent)

    // for testing timeouts.
    clock clock.Clock

    // An underlying storage.Versioner.
    versioner storage.Versioner

    // cacher's objectType.
    objectType reflect.Type

    // For testing cache interval invalidation.
    indexValidator indexValidator
    }

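Watch resumption after an interruption, which ensures that no events are lost:

  1. actor1's watch is interrupted unexpectedly. The last event it received before the interruption was RV5
  2. On recovery it requests a Watch with RV5
  3. watchCache returns RV6, RV7 and RV8 from its history to actor1 in one go
  4. actor2 watches with RV0 and first receives the current state of the objects held in the store, then continues watching from that point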
  • Ref:https://github.com/kubernetes/apiserver/blob/ba592e4ccd41a320ceb91bab90eebee3bb4a4f33/pkg/storage/cacher/watch_cache.go#L645

    // getAllEventsSinceLocked returns a watchCacheInterval that can be used to
    // retrieve events since a certain resourceVersion. This function assumes to
    // be called under the watchCache lock.
    func (w *watchCache) getAllEventsSinceLocked(resourceVersion uint64) (*watchCacheInterval, error) {
    size := w.endIndex - w.startIndex
    var oldest uint64
    switch {
    case w.listResourceVersion > 0 && w.startIndex == 0:
    // If no event was removed from the buffer since last relist, the oldest watch
    // event we can deliver is one greater than the resource version of the list.
    oldest = w.listResourceVersion + 1
    case size > 0:
    // If the previous condition is not satisfied: either some event was already
    // removed from the buffer or we've never completed a list (the latter can
    // only happen in unit tests that populate the buffer without performing
    // list/replace operations), the oldest watch event we can deliver is the first
    // one in the buffer.
    oldest = w.cache[w.startIndex%w.capacity].ResourceVersion
    default:
    return nil, fmt.Errorf("watch cache isn't correctly initialized")
    }

    if resourceVersion == 0 {
    // resourceVersion = 0 means that we don't require any specific starting point
    // and we would like to start watching from ~now.
    // However, to keep backward compatibility, we additionally need to return the
    // current state and only then start watching from that point.
    //
    // TODO: In v2 api, we should stop returning the current state - #13969.
    ci, err := newCacheIntervalFromStore(w.resourceVersion, w.store, w.getAttrsFunc)
    if err != nil {
    return nil, err
    }
    return ci, nil
    }
    if resourceVersion < oldest-1 {
    return nil, errors.NewResourceExpired(fmt.Sprintf("too old resource version: %d (%d)", resourceVersion, oldest-1))
    }

    // Binary search the smallest index at which resourceVersion is greater than the given one.
    f := func(i int) bool {
    return w.cache[(w.startIndex+i)%w.capacity].ResourceVersion > resourceVersion
    }
    first := sort.Search(size, f)
    indexerFunc := func(i int) *watchCacheEvent {
    return w.cache[i%w.capacity]
    }
    ci := newCacheInterval(w.startIndex+first, w.endIndex, indexerFunc, w.indexValidator, &w.RWMutex)
    return ci, nil
    }

Underlying Storage: the underlying storage object

Underlying Storage is the backend store, the object that actually talks to etcd.

  • Ref:https://github.com/kubernetes/apiserver/blob/ba592e4ccd41a320ceb91bab90eebee3bb4a4f33/pkg/storage/storagebackend/factory/factory.go#L32

    // Create creates a storage backend based on given config.
    func Create(c storagebackend.ConfigForResource, newFunc func() runtime.Object) (storage.Interface, DestroyFunc, error) {
        switch c.Type {
        case storagebackend.StorageTypeETCD2:
            return nil, nil, fmt.Errorf("%s is no longer a supported storage backend", c.Type)
        case storagebackend.StorageTypeUnset, storagebackend.StorageTypeETCD3:
            return newETCD3Storage(c, newFunc)
        default:
            return nil, nil, fmt.Errorf("unknown storage type: %s", c.Type)
        }
    }
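  • By default, every resource object except custom resources is stored in etcd in binary Protobuf encoding (application/vnd.kubernetes.protobuf)

    • Besides the default Protobuf encoding, the application/json and application/yaml encodings are also supported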

  • Ref:https://github.com/kubernetes/apiserver/blob/ba592e4ccd41a320ceb91bab90eebee3bb4a4f33/pkg/storage/etcd3/store.go#L130

    // Get implements storage.Interface.Get.
    func (s *store) Get(ctx context.Context, key string, opts storage.GetOptions, out runtime.Object) error {
        preparedKey, err := s.prepareKey(key)
        if err != nil {
            return err
        }
        startTime := time.Now()
        getResp, err := s.client.KV.Get(ctx, preparedKey)
        metrics.RecordEtcdRequestLatency("get", getTypeName(out), startTime)
        if err != nil {
            return err
        }
        if err = s.validateMinimumResourceVersion(opts.ResourceVersion, uint64(getResp.Header.Revision)); err != nil {
            return err
        }

        if len(getResp.Kvs) == 0 {
            if opts.IgnoreNotFound {
                return runtime.SetZeroValue(out)
            }
            return storage.NewKeyNotFoundError(preparedKey, 0)
        }
        kv := getResp.Kvs[0]

        data, _, err := s.transformer.TransformFromStorage(ctx, kv.Value, authenticatedDataString(preparedKey))
        if err != nil {
            return storage.NewInternalError(err.Error())
        }

        return decode(s.codec, s.versioner, data, out, kv.ModRevision)
    }

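Flow:

  1. s.client.KV.Get fetches the resource object's stored data from the etcd cluster
  2. After s.transformer.TransformFromStorage restores the stored bytes, the protobufSerializer codec (codec.Decode) decodes the binary data and writes the result into objPtr
  3. versioner.UpdateObject fills in the resource object's version, taken from kv.ModRevision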
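
The flow above can be sketched with the standard library alone. This is an illustration under loud assumptions: a map stands in for etcd, encoding/json stands in for the Protobuf codec, and kvEntry, fakeStore, pod and get are hypothetical names, not apiserver types.

```go
package main

import (
	"encoding/json"
	"errors"
	"fmt"
	"strconv"
)

// kvEntry mimics the part of an etcd key-value pair the flow relies on.
type kvEntry struct {
	Value       []byte
	ModRevision int64
}

// pod is a hypothetical, heavily reduced resource object.
type pod struct {
	Name            string `json:"name"`
	ResourceVersion string `json:"-"` // not persisted; filled from ModRevision on read
}

var errNotFound = errors.New("key not found")

// fakeStore stands in for the etcd-backed store (assumption: a plain map).
type fakeStore struct {
	data map[string]kvEntry
}

// get follows the three steps: fetch the raw bytes, decode them into out,
// then stamp out with the revision at which the key was last modified.
func (s *fakeStore) get(key string, out *pod) error {
	kv, ok := s.data[key] // step 1: read from the backend
	if !ok {
		return errNotFound
	}
	if err := json.Unmarshal(kv.Value, out); err != nil { // step 2: decode
		return err
	}
	out.ResourceVersion = strconv.FormatInt(kv.ModRevision, 10) // step 3: set the version
	return nil
}

func main() {
	s := &fakeStore{data: map[string]kvEntry{
		"/registry/pods/default/web": {Value: []byte(`{"name":"web"}`), ModRevision: 42},
	}}
	var p pod
	if err := s.get("/registry/pods/default/web", &p); err != nil {
		fmt.Println("get failed:", err)
		return
	}
	fmt.Println(p.Name, p.ResourceVersion) // prints "web 42"
}
```

The point of the sketch is that the version is not part of the stored payload: it is derived from the backend's revision on every read.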

By default, K8s talks to the etcd cluster over gRPC.

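Codec: data encoding/decoding

  1. Instantiate the Scheme
  2. Instantiate the etcd v3 client with clientv3.New
  3. Instantiate a runtime.Codec with the newCodec func
  4. Decode the stored data into a resource object with runtime.Decode
  5. Convert the object to JSON with runtime.Encode and print it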

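Strategy preprocessing

In K8s, every resource type has its own preprocessing step, the Strategy. It runs before a resource object is created, updated, or deleted, that is, before the change is persisted to storage.

Create Strategy preprocessing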

  • Ref:https://github.com/kubernetes/apiserver/blob/ba592e4ccd41a320ceb91bab90eebee3bb4a4f33/pkg/registry/rest/create.go#L42

    // RESTCreateStrategy defines the minimum validation, accepted input, and
    // name generation behavior to create an object that follows Kubernetes
    // API conventions.
    type RESTCreateStrategy interface {
        // Gets and checks the resource object's APIVersion and Kind
        runtime.ObjectTyper
        // The name generator is used when the standard GenerateName field is set.
        // The NameGenerator will be invoked prior to validation.
        // Supports GenerateName: a name is generated automatically from the prefix
        names.NameGenerator

        // NamespaceScoped returns true if the object must be within a namespace.
        // Reports whether the resource is namespace-scoped
        NamespaceScoped() bool
        // PrepareForCreate is invoked on create before validation to normalize
        // the object. For example: remove fields that are not to be persisted,
        // sort order-insensitive list fields, etc. This should not remove fields
        // whose presence would be considered a validation error.
        //
        // Often implemented as a type check and an initialization or clearing of
        // status. Clear the status because status changes are internal. External
        // callers of an api (users) should not be setting an initial status on
        // newly created objects.
        // Defines the work done before the resource object is created
        PrepareForCreate(ctx context.Context, obj runtime.Object)
        // Validate returns an ErrorList with validation errors or nil. Validate
        // is invoked after default fields in the object have been filled in
        // before the object is persisted. This method should not mutate the
        // object.
        // Checks that the object is valid
        Validate(ctx context.Context, obj runtime.Object) field.ErrorList
        // WarningsOnCreate returns warnings to the client performing a create.
        // WarningsOnCreate is invoked after default fields in the object have been filled in
        // and after Validate has passed, before Canonicalize is called, and the object is persisted.
        // This method must not mutate the object.
        //
        // Be brief; limit warnings to 120 characters if possible.
        // Don't include a "Warning:" prefix in the message (that is added by clients on output).
        // Warnings returned about a specific field should be formatted as "path.to.field: message".
        // For example: `spec.imagePullSecrets[0].name: invalid empty name ""`
        //
        // Use warning messages to describe problems the client making the API request should correct or be aware of.
        // For example:
        // - use of deprecated fields/labels/annotations that will stop working in a future release
        // - use of obsolete fields/labels/annotations that are non-functional
        // - malformed or invalid specifications that prevent successful handling of the submitted object,
        // but are not rejected by validation for compatibility reasons
        //
        // Warnings should not be returned for fields which cannot be resolved by the caller.
        // For example, do not warn about spec fields in a subresource creation request.
        // Defines the warnings returned when the resource object is created
        WarningsOnCreate(ctx context.Context, obj runtime.Object) []string
        // Canonicalize allows an object to be mutated into a canonical form. This
        // ensures that code that operates on these objects can rely on the common
        // form for things like comparison. Canonicalize is invoked after
        // validation has succeeded but before the object has been persisted.
        // This method may mutate the object. Often implemented as a type check or
        // empty method.
        // Normalizes the object before it is persisted so that stored data has a common form
        Canonicalize(obj runtime.Object)
    }
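
To show how these hooks fit together, here is a minimal sketch of the order described by the interface comments: PrepareForCreate, then Validate, then WarningsOnCreate, then Canonicalize. The interface is reduced to plain Go types, and createStrategy, widget, widgetStrategy and beforeCreate are hypothetical names. The real create path also handles object metadata and name generation, which this sketch leaves out.

```go
package main

import (
	"errors"
	"fmt"
	"strings"
)

// widget is a hypothetical resource object.
type widget struct {
	Name   string
	Status string
	Labels []string
}

// createStrategy is a reduced, hypothetical version of RESTCreateStrategy.
type createStrategy interface {
	PrepareForCreate(w *widget)
	Validate(w *widget) []error
	WarningsOnCreate(w *widget) []string
	Canonicalize(w *widget)
}

type widgetStrategy struct{}

// PrepareForCreate clears status: callers must not set it on new objects.
func (widgetStrategy) PrepareForCreate(w *widget) { w.Status = "" }

// Validate reports problems without mutating the object.
func (widgetStrategy) Validate(w *widget) []error {
	if w.Name == "" {
		return []error{errors.New("name: required value")}
	}
	return nil
}

// WarningsOnCreate returns non-fatal hints for the client.
func (widgetStrategy) WarningsOnCreate(w *widget) []string {
	if len(w.Labels) == 0 {
		return []string{"labels: no labels set"}
	}
	return nil
}

// Canonicalize lowercases labels so later comparisons see one form.
func (widgetStrategy) Canonicalize(w *widget) {
	for i, l := range w.Labels {
		w.Labels[i] = strings.ToLower(l)
	}
}

// beforeCreate runs the hooks in order and stops at the first validation failure.
func beforeCreate(s createStrategy, w *widget) ([]string, error) {
	s.PrepareForCreate(w)
	if errs := s.Validate(w); len(errs) > 0 {
		return nil, fmt.Errorf("validation failed: %v", errs)
	}
	warnings := s.WarningsOnCreate(w)
	s.Canonicalize(w)
	return warnings, nil
}

func main() {
	w := &widget{Name: "a", Status: "Ready", Labels: []string{"Tier-One"}}
	warnings, err := beforeCreate(widgetStrategy{}, w)
	fmt.Println(w.Status == "", w.Labels, warnings, err)
}
```

Only PrepareForCreate and Canonicalize mutate the object; Validate and WarningsOnCreate are read-only, which matches the "should not mutate" notes in the interface comments.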

Update Strategy preprocessing

  • Ref:https://github.com/kubernetes/apiserver/blob/ba592e4ccd41a320ceb91bab90eebee3bb4a4f33/pkg/registry/rest/update.go#L41

    // RESTUpdateStrategy defines the minimum validation, accepted input, and
    // name generation behavior to update an object that follows Kubernetes
    // API conventions. A resource may have many UpdateStrategies, depending on
    // the call pattern in use.
    type RESTUpdateStrategy interface {
        runtime.ObjectTyper
        // NamespaceScoped returns true if the object must be within a namespace.
        // Reports whether the resource is namespace-scoped
        NamespaceScoped() bool
        // AllowCreateOnUpdate returns true if the object can be created by a PUT.
        AllowCreateOnUpdate() bool
        // PrepareForUpdate is invoked on update before validation to normalize
        // the object. For example: remove fields that are not to be persisted,
        // sort order-insensitive list fields, etc. This should not remove fields
        // whose presence would be considered a validation error.
        PrepareForUpdate(ctx context.Context, obj, old runtime.Object)
        // ValidateUpdate is invoked after default fields in the object have been
        // filled in before the object is persisted. This method should not mutate
        // the object.
        ValidateUpdate(ctx context.Context, obj, old runtime.Object) field.ErrorList
        // WarningsOnUpdate returns warnings to the client performing the update.
        // WarningsOnUpdate is invoked after default fields in the object have been filled in
        // and after ValidateUpdate has passed, before Canonicalize is called, and before the object is persisted.
        // This method must not mutate either object.
        //
        // Be brief; limit warnings to 120 characters if possible.
        // Don't include a "Warning:" prefix in the message (that is added by clients on output).
        // Warnings returned about a specific field should be formatted as "path.to.field: message".
        // For example: `spec.imagePullSecrets[0].name: invalid empty name ""`
        //
        // Use warning messages to describe problems the client making the API request should correct or be aware of.
        // For example:
        // - use of deprecated fields/labels/annotations that will stop working in a future release
        // - use of obsolete fields/labels/annotations that are non-functional
        // - malformed or invalid specifications that prevent successful handling of the submitted object,
        // but are not rejected by validation for compatibility reasons
        //
        // Warnings should not be returned for fields which cannot be resolved by the caller.
        // For example, do not warn about spec fields in a status update.
        WarningsOnUpdate(ctx context.Context, obj, old runtime.Object) []string
        // Canonicalize allows an object to be mutated into a canonical form. This
        // ensures that code that operates on these objects can rely on the common
        // form for things like comparison. Canonicalize is invoked after
        // validation has succeeded but before the object has been persisted.
        // This method may mutate the object.
        Canonicalize(obj runtime.Object)
        // AllowUnconditionalUpdate returns true if the object can be updated
        // unconditionally (irrespective of the latest resource version), when
        // there is no resource version specified in the object.
        AllowUnconditionalUpdate() bool
    }
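
The effect of AllowUnconditionalUpdate on the resource version check can be illustrated with a small sketch. It is an assumption-laden simplification, not the generic registry code: checkUpdate and both error values are hypothetical, and a resource version of 0 stands for "not specified in the object".

```go
package main

import (
	"errors"
	"fmt"
)

var (
	errVersionRequired = errors.New("resourceVersion must be specified for an update")
	errConflict        = errors.New("object has been modified")
)

// checkUpdate decides whether an update carrying resource version incoming
// (0 means "not specified") may overwrite an object stored at version stored.
func checkUpdate(stored, incoming uint64, allowUnconditional bool) error {
	if incoming == 0 {
		if allowUnconditional {
			return nil // no version given and the strategy allows it: last write wins
		}
		return errVersionRequired
	}
	if incoming != stored {
		return errConflict // the caller worked from a stale copy
	}
	return nil
}

func main() {
	fmt.Println(checkUpdate(7, 0, true))  // prints "<nil>"
	fmt.Println(checkUpdate(7, 0, false)) // prints "resourceVersion must be specified for an update"
	fmt.Println(checkUpdate(7, 6, true))  // prints "object has been modified"
}
```

Note that an explicit but stale version is rejected even when unconditional updates are allowed; the flag only matters when no version is given.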

Delete Strategy preprocessing

  • Ref:https://github.com/kubernetes/apiserver/blob/ba592e4ccd41a320ceb91bab90eebee3bb4a4f33/pkg/registry/rest/delete.go#L75

    // BeforeDelete tests whether the object can be gracefully deleted.
    // If graceful is set, the object should be gracefully deleted. If gracefulPending
    // is set, the object has already been gracefully deleted (and the provided grace
    // period is longer than the time to deletion). An error is returned if the
    // condition cannot be checked or the gracePeriodSeconds is invalid. The options
    // argument may be updated with default values if graceful is true. Second place
    // where we set deletionTimestamp is pkg/registry/generic/registry/store.go.
    // This function is responsible for setting deletionTimestamp during gracefulDeletion,
    // other one for cascading deletions.
    func BeforeDelete(strategy RESTDeleteStrategy, ctx context.Context, obj runtime.Object, options *metav1.DeleteOptions) (graceful, gracefulPending bool, err error) {
        objectMeta, gvk, kerr := objectMetaAndKind(strategy, obj)
        if kerr != nil {
            return false, false, kerr
        }
        if errs := validation.ValidateDeleteOptions(options); len(errs) > 0 {
            return false, false, errors.NewInvalid(schema.GroupKind{Group: metav1.GroupName, Kind: "DeleteOptions"}, "", errs)
        }
        // Checking the Preconditions here to fail early. They'll be enforced later on when we actually do the deletion, too.
        if options.Preconditions != nil {
            if options.Preconditions.UID != nil && *options.Preconditions.UID != objectMeta.GetUID() {
                return false, false, errors.NewConflict(schema.GroupResource{Group: gvk.Group, Resource: gvk.Kind}, objectMeta.GetName(), fmt.Errorf("the UID in the precondition (%s) does not match the UID in record (%s). The object might have been deleted and then recreated", *options.Preconditions.UID, objectMeta.GetUID()))
            }
            if options.Preconditions.ResourceVersion != nil && *options.Preconditions.ResourceVersion != objectMeta.GetResourceVersion() {
                return false, false, errors.NewConflict(schema.GroupResource{Group: gvk.Group, Resource: gvk.Kind}, objectMeta.GetName(), fmt.Errorf("the ResourceVersion in the precondition (%s) does not match the ResourceVersion in record (%s). The object might have been modified", *options.Preconditions.ResourceVersion, objectMeta.GetResourceVersion()))
            }
        }

        // Negative values will be treated as the value `1s` on the delete path.
        if gracePeriodSeconds := options.GracePeriodSeconds; gracePeriodSeconds != nil && *gracePeriodSeconds < 0 {
            options.GracePeriodSeconds = utilpointer.Int64(1)
        }
        if deletionGracePeriodSeconds := objectMeta.GetDeletionGracePeriodSeconds(); deletionGracePeriodSeconds != nil && *deletionGracePeriodSeconds < 0 {
            objectMeta.SetDeletionGracePeriodSeconds(utilpointer.Int64(1))
        }

        gracefulStrategy, ok := strategy.(RESTGracefulDeleteStrategy)
        if !ok {
            // If we're not deleting gracefully there's no point in updating Generation, as we won't update
            // the object before deleting it.
            return false, false, nil
        }
        // if the object is already being deleted, no need to update generation.
        if objectMeta.GetDeletionTimestamp() != nil {
            // if we are already being deleted, we may only shorten the deletion grace period
            // this means the object was gracefully deleted previously but deletionGracePeriodSeconds was not set,
            // so we force deletion immediately
            // IMPORTANT:
            // The deletion operation happens in two phases.
            // 1. Update to set DeletionGracePeriodSeconds and DeletionTimestamp
            // 2. Delete the object from storage.
            // If the update succeeds, but the delete fails (network error, internal storage error, etc.),
            // a resource was previously left in a state that was non-recoverable. We
            // check if the existing stored resource has a grace period as 0 and if so
            // attempt to delete immediately in order to recover from this scenario.
            if objectMeta.GetDeletionGracePeriodSeconds() == nil || *objectMeta.GetDeletionGracePeriodSeconds() == 0 {
                return false, false, nil
            }
            // only a shorter grace period may be provided by a user
            if options.GracePeriodSeconds != nil {
                period := int64(*options.GracePeriodSeconds)
                if period >= *objectMeta.GetDeletionGracePeriodSeconds() {
                    return false, true, nil
                }
                newDeletionTimestamp := metav1.NewTime(
                    objectMeta.GetDeletionTimestamp().Add(-time.Second * time.Duration(*objectMeta.GetDeletionGracePeriodSeconds())).
                        Add(time.Second * time.Duration(*options.GracePeriodSeconds)))
                objectMeta.SetDeletionTimestamp(&newDeletionTimestamp)
                objectMeta.SetDeletionGracePeriodSeconds(&period)
                return true, false, nil
            }
            // graceful deletion is pending, do nothing
            options.GracePeriodSeconds = objectMeta.GetDeletionGracePeriodSeconds()
            return false, true, nil
        }

        // `CheckGracefulDelete` will be implemented by specific strategy
        if !gracefulStrategy.CheckGracefulDelete(ctx, obj, options) {
            return false, false, nil
        }

        if options.GracePeriodSeconds == nil {
            return false, false, errors.NewInternalError(fmt.Errorf("options.GracePeriodSeconds should not be nil"))
        }

        now := metav1.NewTime(metav1.Now().Add(time.Second * time.Duration(*options.GracePeriodSeconds)))
        objectMeta.SetDeletionTimestamp(&now)
        objectMeta.SetDeletionGracePeriodSeconds(options.GracePeriodSeconds)
        // If it's the first graceful deletion we are going to set the DeletionTimestamp to non-nil.
        // Controllers of the object that's being deleted shouldn't take any nontrivial actions, hence its behavior changes.
        // Thus we need to bump object's Generation (if set). This handles generation bump during graceful deletion.
        // The bump for objects that don't support graceful deletion is handled in pkg/registry/generic/registry/store.go.
        if objectMeta.GetGeneration() > 0 {
            objectMeta.SetGeneration(objectMeta.GetGeneration() + 1)
        }

        return true, false, nil
    }