K8s-kube-apiserver(List-Watch实现原理)

基于 Kubernetes 1.25 版本

List-Watch是K8s的核心机制,基于List-Watch实现了资源变化的感知

  • List调用ListAPI列出所有资源,基于HTTP短连接实现
  • Watch调用WatchAPI监听资源变更事件,基于HTTP长连接实现

长连接通信协议

支持Watch操作必须实现客户端和服务端的长连接。

kube-apiserver的Watch支持3种类型的长连接

  • HTTP/1.1 Chunked Transfer Encoding
  • HTTP/2
  • WebSocket

HTTP/1.1 Chunked Transfer Encoding

HTTP/1.1 Chunked Transfer Encoding(分块传输编码)是HTTP中数据传输的一种机制,允许把服务端发送给客户端的数据分为多个部分,分批次发送

# 验证支持HTTP/1.1 Chunked
curl -ik --http1.1 \
--cert /etc/kubernetes/pki/apiserver-kubelet-client.crt \
--key /etc/kubernetes/pki/apiserver-kubelet-client.key \
https://127.0.0.1:6443/api/v1/pods?watch=true

HTTP/2

HTTP/2 以 SPDY 协议为基础设计,支持在单个 TCP 连接上多路复用多个请求流

# 验证支持HTTP2
curl -ik --http2 \
--cert /etc/kubernetes/pki/apiserver-kubelet-client.crt \
--key /etc/kubernetes/pki/apiserver-kubelet-client.key \
https://127.0.0.1:6443/api/v1/pods?watch=true

WebSocket

WebSocket是一种网络传输协议,可以在单个TCP连接上进行双向同时通信

WebSocket可以通过Upgrade Header从HTTP1.1升级到WebSocket协议

  • WebSocket使用 ws 和 wss 作为统一资源标识符(URI scheme),分别对应明文连接和 TLS 加密连接
# 验证支持WebSocket
curl -ik --http1.1 \
--header "Connection: Upgrade" \
--header "Upgrade: websocket" \
--header "Sec-WebSocket-Key: xxx" \
--header "Sec-WebSocket-Version: 13" \
--header "Host: 127.0.0.1:6443" \
--header "Origin: https://127.0.0.1:6443" \
--cert /etc/kubernetes/pki/apiserver-kubelet-client.crt \
--key /etc/kubernetes/pki/apiserver-kubelet-client.key \
https://127.0.0.1:6443/api/v1/pods?watch=true

List-Watch核心原理

kube-apiserver的List-Watch是在etcd List-Watch基础上的封装,类似于etcd的代理层

Informer启动之后,内部Controller会运行Reflector。Reflector的Run方法通过wait.BackoffUntil的方式重复执行r.ListAndWatch(确保在Watch连接断开之后,能自动重新执行List-Watch)

  • Ref:https://github.com/kubernetes/client-go/blob/4f1c2e7b9711846cf31255fd709616c66cb3cfea/tools/cache/reflector.go#L220

    // Run keeps the reflector alive until stopCh is closed. Each attempt calls
    // r.ListAndWatch, which performs a full listing followed by a watch for
    // subsequent deltas. When an attempt returns, wait.BackoffUntil schedules
    // the next attempt using r.backoffManager.
    //
    // Errors from ListAndWatch are not returned to the caller. They are handed
    // to r.watchErrorHandler instead.
    func (r *Reflector) Run(stopCh <-chan struct{}) {
        klog.V(3).Infof("Starting reflector %s (%s) from %s", r.expectedTypeName, r.resyncPeriod, r.name)

        // attempt runs a single list-then-watch cycle and reports any failure.
        attempt := func() {
            err := r.ListAndWatch(stopCh)
            if err == nil {
                return
            }
            r.watchErrorHandler(r, err)
        }
        // The literal true selects "sliding" mode. Per the k8s.io/apimachinery wait
        // package docs, this means the delay is measured from when attempt returns.
        wait.BackoffUntil(attempt, r.backoffManager, true, stopCh)

        klog.V(3).Infof("Stopping reflector %s (%s) from %s", r.expectedTypeName, r.resyncPeriod, r.name)
    }

ListAndWatch 首先执行 List,把全量数据同步到 DeltaFIFO 队列中;然后以 List 得到的最新 ResourceVersion 为起点开始 Watch 后续的资源变化事件,并通过 Add/Update/Delete 的方式传递给 DeltaFIFO 处理

  • Ref:https://github.com/kubernetes/client-go/blob/4f1c2e7b9711846cf31255fd709616c66cb3cfea/tools/cache/reflector.go#L256


    // ListAndWatch first lists all items and gets the resource version at the moment
    // of the call, and then uses that resource version to start watching.
    //
    // Flow:
    //   1. r.list performs the initial full LIST. If it fails, its error is returned as-is.
    //   2. A background goroutine waits for resync ticks. On each tick it calls
    //      r.store.Resync() when r.ShouldResync is unset or returns true. The first
    //      resync failure is reported on resyncerrc.
    //   3. The main loop repeatedly opens a WATCH starting from
    //      r.LastSyncResourceVersion() and hands it to watchHandler.
    //
    // It returns a non-nil error only in two cases:
    //   - the initial list fails;
    //   - the Watch request itself fails with an error that is neither
    //     "connection refused" nor 429.
    // It returns nil in two cases:
    //   - stopCh is closed;
    //   - a started watch ends with an error that is not retried in place.
    // In every case Run's wait.BackoffUntil loop decides whether to call
    // ListAndWatch again, and the next call starts with a fresh LIST.
    func (r *Reflector) ListAndWatch(stopCh <-chan struct{}) error {
    klog.V(3).Infof("Listing and watching %v from %s", r.expectedTypeName, r.name)

    // Initial full LIST. Its effects happen inside r.list, which is not shown here.
    // NOTE(review): presumably it replaces r.store's contents and records the listed
    // resource version via r.setLastSyncResourceVersion. Verify against r.list.
    err := r.list(stopCh)
    if err != nil {
    return err
    }

    // resyncerrc is buffered (capacity 1), so the resync goroutine can send one error
    // and exit without blocking. The channel is passed to watchHandler below.
    resyncerrc := make(chan error, 1)
    // Closing cancelCh when ListAndWatch returns tells the resync goroutine to exit.
    cancelCh := make(chan struct{})
    defer close(cancelCh)
    // Periodic resync goroutine. It exits on stopCh, on cancelCh, or after the
    // first r.store.Resync() error.
    go func() {
    resyncCh, cleanup := r.resyncChan()
    defer func() {
    // Wrapped in a closure so that the cleanup assigned most recently (it is
    // reassigned at the bottom of the loop) is the one that runs.
    cleanup()
    }()
    for {
    select {
    case <-resyncCh:
    case <-stopCh:
    return
    case <-cancelCh:
    return
    }
    if r.ShouldResync == nil || r.ShouldResync() {
    klog.V(4).Infof("%s: forcing resync", r.name)
    if err := r.store.Resync(); err != nil {
    // Surface the failure to watchHandler through resyncerrc and stop resyncing.
    // NOTE(review): presumably watchHandler ends the current watch with this error.
    resyncerrc <- err
    return
    }
    }
    // Call the current cleanup, then get a fresh resync channel for the next period.
    // NOTE(review): cleanup presumably stops the timer behind the old resyncCh.
    cleanup()
    resyncCh, cleanup = r.resyncChan()
    }
    }()

    // Budget for retrying watches that end with apierrors.IsInternalError, in place and
    // without re-listing (see the IsInternalError case below).
    // NOTE(review): the argument semantics are defined by NewRetryWithDeadline (not
    // shown). Presumably it keeps retrying for up to r.MaxInternalErrorRetryDuration.
    retry := NewRetryWithDeadline(r.MaxInternalErrorRetryDuration, time.Minute, apierrors.IsInternalError, r.clock)
    for {
    // give the stopCh a chance to stop the loop, even in case of continue statements further down on errors
    select {
    case <-stopCh:
    return nil
    default:
    }

    // Server-side watch timeout, chosen between minWatchTimeout and 2*minWatchTimeout
    // (rand.Float64 returns a value in [0,1); int64 truncates to whole seconds).
    // Presumably randomized so that many reflectors do not reconnect in lockstep.
    timeoutSeconds := int64(minWatchTimeout.Seconds() * (rand.Float64() + 1.0))
    options := metav1.ListOptions{
    // Resume from the most recently recorded resource version. watchHandler is
    // given r.setLastSyncResourceVersion below so it can advance this value.
    ResourceVersion: r.LastSyncResourceVersion(),
    // We want to avoid situations of hanging watchers. Stop any watchers that do not
    // receive any events within the timeout window.
    TimeoutSeconds: &timeoutSeconds,
    // To reduce load on kube-apiserver on watch restarts, you may enable watch bookmarks.
    // Reflector doesn't assume bookmarks are returned at all (if the server do not support
    // watch bookmarks, it will ignore this field).
    AllowWatchBookmarks: true,
    }

    // start the clock before sending the request, since some proxies won't flush headers until after the first watch event is sent
    start := r.clock.Now()
    w, err := r.listerWatcher.Watch(options)
    if err != nil {
    // If this is "connection refused" error, it means that most likely apiserver is not responsive.
    // It doesn't make sense to re-list all objects because most likely we will be able to restart
    // watch where we ended.
    // If that's the case begin exponentially backing off and resend watch request.
    // Do the same for "429" errors.
    if utilnet.IsConnectionRefused(err) || apierrors.IsTooManyRequests(err) {
    <-r.initConnBackoffManager.Backoff().C()
    continue
    }
    // Any other failure to start the watch aborts this call. Run passes the error to
    // watchErrorHandler and, after backoff, calls ListAndWatch again (re-list).
    return err
    }

    // Consume watch events until this watch ends. watchHandler receives the store, the
    // expected type/GVK, the resource-version setter, resyncerrc and stopCh.
    err = watchHandler(start, w, r.store, r.expectedType, r.expectedGVK, r.name, r.expectedTypeName, r.setLastSyncResourceVersion, r.clock, resyncerrc, stopCh)
    // Record the outcome so that retry.ShouldRetry() below can decide whether an
    // internal error may still be retried in place.
    retry.After(err)
    if err != nil {
    // NOTE(review): errorStopRequested is defined elsewhere. Presumably watchHandler
    // returns it when stopCh closes; that case ends quietly, without logging.
    if err != errorStopRequested {
    switch {
    case isExpiredError(err):
    // Don't set LastSyncResourceVersionUnavailable - LIST call with ResourceVersion=RV already
    // has a semantic that it returns data at least as fresh as provided RV.
    // So first try to LIST with setting RV to resource version of last observed object.
    klog.V(4).Infof("%s: watch of %v closed with: %v", r.name, r.expectedTypeName, err)
    case apierrors.IsTooManyRequests(err):
    klog.V(2).Infof("%s: watch of %v returned 429 - backing off", r.name, r.expectedTypeName)
    <-r.initConnBackoffManager.Backoff().C()
    continue
    case apierrors.IsInternalError(err) && retry.ShouldRetry():
    klog.V(2).Infof("%s: retrying watch of %v internal error: %v", r.name, r.expectedTypeName, err)
    continue
    default:
    klog.Warningf("%s: watch of %v ended with: %v", r.name, r.expectedTypeName, err)
    }
    }
    // Reaching here ends this ListAndWatch call with nil. Unless stopCh is closed,
    // Run calls it again, which starts with a fresh LIST.
    return nil
    }
    // err == nil: this watch ended without error (NOTE(review): presumably because
    // the server-side timeout elapsed). Loop around and open a new watch without
    // re-listing.
    }
    }