K8s-kubelet(Pod生命周期管理)

基于1.25

kubelet以Pod为基本处理单元,负责Pod从创建到消亡的整个生命周期

在1.21中Unknown状态已经被标记为弃用。

CRI

kubelet通过CRI RPC管理容器的生命周期,执行容器的lifecycle hook和 startup/liveness/readiness的健康检查,同时根据Pod的重启策略在容器失败退出后自动重启容器,CRI是kubelet管理Pod和容器的基础

  • Ref:https://github.com/kubernetes/cri-api/blob/2c8d015e0d408208ca8843c1d6e2e2fce1e5dd94/pkg/apis/runtime/v1/api.proto#L34

    // RuntimeService: the CRI gRPC service through which the kubelet manages pod
    // sandboxes and containers on a (remote) container runtime. Quoted from
    // k8s.io/cri-api pkg/apis/runtime/v1/api.proto. The RPCs below are grouped as:
    // version, pod sandbox lifecycle, container lifecycle, exec/attach/port-forward
    // streaming, stats, runtime config/status, checkpointing and events.
    //
    // Runtime service defines the public APIs for remote container runtimes
    service RuntimeService {
        // Version returns the runtime name, runtime version, and runtime API version.
        rpc Version(VersionRequest) returns (VersionResponse) {}

        // ---- Pod sandbox lifecycle ----

        // RunPodSandbox creates and starts a pod-level sandbox. Runtimes must ensure
        // the sandbox is in the ready state on success.
        rpc RunPodSandbox(RunPodSandboxRequest) returns (RunPodSandboxResponse) {}
        // StopPodSandbox stops any running process that is part of the sandbox and
        // reclaims network resources (e.g., IP addresses) allocated to the sandbox.
        // If there are any running containers in the sandbox, they must be forcibly
        // terminated.
        // This call is idempotent, and must not return an error if all relevant
        // resources have already been reclaimed. kubelet will call StopPodSandbox
        // at least once before calling RemovePodSandbox. It will also attempt to
        // reclaim resources eagerly, as soon as a sandbox is not needed. Hence,
        // multiple StopPodSandbox calls are expected.
        rpc StopPodSandbox(StopPodSandboxRequest) returns (StopPodSandboxResponse) {}
        // RemovePodSandbox removes the sandbox. If there are any running containers
        // in the sandbox, they must be forcibly terminated and removed.
        // This call is idempotent, and must not return an error if the sandbox has
        // already been removed.
        rpc RemovePodSandbox(RemovePodSandboxRequest) returns (RemovePodSandboxResponse) {}
        // PodSandboxStatus returns the status of the PodSandbox. If the PodSandbox is not
        // present, returns an error.
        rpc PodSandboxStatus(PodSandboxStatusRequest) returns (PodSandboxStatusResponse) {}
        // ListPodSandbox returns a list of PodSandboxes.
        rpc ListPodSandbox(ListPodSandboxRequest) returns (ListPodSandboxResponse) {}

        // ---- Container lifecycle ----

        // CreateContainer creates a new container in specified PodSandbox
        rpc CreateContainer(CreateContainerRequest) returns (CreateContainerResponse) {}
        // StartContainer starts the container.
        rpc StartContainer(StartContainerRequest) returns (StartContainerResponse) {}
        // StopContainer stops a running container with a grace period (i.e., timeout).
        // This call is idempotent, and must not return an error if the container has
        // already been stopped.
        // The runtime must forcibly kill the container after the grace period is
        // reached.
        rpc StopContainer(StopContainerRequest) returns (StopContainerResponse) {}
        // RemoveContainer removes the container. If the container is running, the
        // container must be forcibly removed.
        // This call is idempotent, and must not return an error if the container has
        // already been removed.
        rpc RemoveContainer(RemoveContainerRequest) returns (RemoveContainerResponse) {}
        // ListContainers lists all containers by filters.
        rpc ListContainers(ListContainersRequest) returns (ListContainersResponse) {}
        // ContainerStatus returns status of the container. If the container is not
        // present, returns an error.
        rpc ContainerStatus(ContainerStatusRequest) returns (ContainerStatusResponse) {}
        // UpdateContainerResources updates ContainerConfig of the container synchronously.
        // If runtime fails to transactionally update the requested resources, an error is returned.
        rpc UpdateContainerResources(UpdateContainerResourcesRequest) returns (UpdateContainerResourcesResponse) {}
        // ReopenContainerLog asks runtime to reopen the stdout/stderr log file
        // for the container. This is often called after the log file has been
        // rotated. If the container is not running, container runtime can choose
        // to either create a new log file and return nil, or return an error.
        // Once it returns error, new container log file MUST NOT be created.
        rpc ReopenContainerLog(ReopenContainerLogRequest) returns (ReopenContainerLogResponse) {}

        // ---- Exec / attach / port-forward ----

        // ExecSync runs a command in a container synchronously.
        rpc ExecSync(ExecSyncRequest) returns (ExecSyncResponse) {}
        // Exec prepares a streaming endpoint to execute a command in the container.
        rpc Exec(ExecRequest) returns (ExecResponse) {}
        // Attach prepares a streaming endpoint to attach to a running container.
        rpc Attach(AttachRequest) returns (AttachResponse) {}
        // PortForward prepares a streaming endpoint to forward ports from a PodSandbox.
        rpc PortForward(PortForwardRequest) returns (PortForwardResponse) {}

        // ---- Stats ----

        // ContainerStats returns stats of the container. If the container does not
        // exist, the call returns an error.
        rpc ContainerStats(ContainerStatsRequest) returns (ContainerStatsResponse) {}
        // ListContainerStats returns stats of all running containers.
        rpc ListContainerStats(ListContainerStatsRequest) returns (ListContainerStatsResponse) {}

        // PodSandboxStats returns stats of the pod sandbox. If the pod sandbox does not
        // exist, the call returns an error.
        rpc PodSandboxStats(PodSandboxStatsRequest) returns (PodSandboxStatsResponse) {}
        // ListPodSandboxStats returns stats of the pod sandboxes matching a filter.
        rpc ListPodSandboxStats(ListPodSandboxStatsRequest) returns (ListPodSandboxStatsResponse) {}

        // ---- Runtime configuration and status ----

        // UpdateRuntimeConfig updates the runtime configuration based on the given request.
        rpc UpdateRuntimeConfig(UpdateRuntimeConfigRequest) returns (UpdateRuntimeConfigResponse) {}

        // Status returns the status of the runtime.
        rpc Status(StatusRequest) returns (StatusResponse) {}

        // ---- Checkpointing and events ----

        // CheckpointContainer checkpoints a container
        rpc CheckpointContainer(CheckpointContainerRequest) returns (CheckpointContainerResponse) {}

        // GetContainerEvents gets container events from the CRI runtime
        // (server-streaming RPC: the response is a stream of ContainerEventResponse).
        rpc GetContainerEvents(GetEventsRequest) returns (stream ContainerEventResponse) {}

    }

Pod启动流程

syncLoop监听到Pod创建事件,触发执行HandlePodAdditions Handler

kubelet启动后会运行syncLoop主调谐循环,各类事件最终都交由syncLoopIteration处理;对于Pod创建(ADD)事件,会执行HandlePodAdditions,将任务下发给podWorkers异步执行

  • Ref:https://github.com/kubernetes/kubernetes/blob/88e994f6bf8fc88114c5b733e09afea339bea66d/pkg/kubelet/config/apiserver.go#L37

    // NewSourceApiserver registers the apiserver as a pod config source.
    //
    // It builds a list/watch over pods in all namespaces, restricted by a field
    // selector to pods whose spec.nodeName equals nodeName. Watching does not start
    // immediately: a background goroutine polls nodeHasSynced every
    // WaitForAPIServerSyncPeriod and only then feeds the watch into updates via
    // newSourceApiserverFromLW. Because the waiting happens in that goroutine,
    // this function returns without blocking.
    func NewSourceApiserver(c clientset.Interface, nodeName types.NodeName, nodeHasSynced func() bool, updates chan<- interface{}) {
        restClient := c.CoreV1().RESTClient()
        onThisNode := fields.OneTermEqualSelector("spec.nodeName", string(nodeName))
        lw := cache.NewListWatchFromClient(restClient, "pods", metav1.NamespaceAll, onThisNode)

        // Pods must not be watched until the node itself has synced with the apiserver.
        klog.InfoS("Waiting for node sync before watching apiserver pods")
        go func() {
            for !nodeHasSynced() {
                time.Sleep(WaitForAPIServerSyncPeriod)
                klog.V(4).InfoS("node sync has not completed yet")
            }
            klog.V(4).InfoS("node sync completed")

            klog.InfoS("Watching apiserver")
            newSourceApiserverFromLW(lw, updates)
        }()
    }

    NewSourceApiserver负责创建kube-apiserver事件监听源。此处通过设置fieldSelector,确保收到的Pod仅与本节点相关:只有Pod的spec.nodeName与当前kubelet设置的nodeName匹配时,该Pod才会被接收并处理。

    • Reflector所使用的Store的存储接口被设置为send函数,因此在收到Pod事件后,Event会被直接发送到updates Channel

    来自不同的事件源的Event首先会被PodConfig做Merge聚合处理,同时确保事件按照正确的顺序推送给syncLoopIteration

    • Ref:https://github.com/kubernetes/kubernetes/blob/88e994f6bf8fc88114c5b733e09afea339bea66d/pkg/kubelet/config/config.go#L212

      // merge folds one change from the config source `source` into the cached
      // pod set s.pods[source] and classifies every affected pod.
      //
      // change is asserted to kubetypes.PodUpdate with a single-value type
      // assertion, so any other dynamic type panics. The function returns five
      // PodUpdate values, one per Op (ADD, UPDATE, DELETE, REMOVE, RECONCILE),
      // each carrying the pods that fell into that category. The classification
      // comes from comparing against the cache, not from update.Op. s.podLock is
      // held for the whole call.
      func (s *podStorage) merge(source string, change interface{}) (adds, updates, deletes, removes, reconciles *kubetypes.PodUpdate) {
          s.podLock.Lock()
          defer s.podLock.Unlock()

          // One bucket per outgoing Op; the updatePodsFunc closure appends to these.
          addPods := []*v1.Pod{}
          updatePods := []*v1.Pod{}
          deletePods := []*v1.Pod{}
          removePods := []*v1.Pod{}
          reconcilePods := []*v1.Pod{}

          pods := s.pods[source]
          if pods == nil {
              pods = make(map[types.UID]*v1.Pod)
          }

          // updatePodFunc is the local function which updates the pod cache *oldPods* with new pods *newPods*.
          // After updated, new pod will be stored in the pod cache *pods*.
          // Notice that *pods* and *oldPods* could be the same cache.
          updatePodsFunc := func(newPods []*v1.Pod, oldPods, pods map[types.UID]*v1.Pod) {
              filtered := filterInvalidPods(newPods, source, s.recorder)
              for _, ref := range filtered {
                  // Annotate the pod with the source before any comparison.
                  if ref.Annotations == nil {
                      ref.Annotations = make(map[string]string)
                  }
                  ref.Annotations[kubetypes.ConfigSourceAnnotationKey] = source
                  if existing, found := oldPods[ref.UID]; found {
                      // Known UID: the cached object (not ref) stays in the cache.
                      // NOTE(review): checkAndUpdatePod presumably copies changes from ref
                      // into existing in place — confirm.
                      pods[ref.UID] = existing
                      needUpdate, needReconcile, needGracefulDelete := checkAndUpdatePod(existing, ref)
                      // A pod lands in at most one bucket; the first true flag wins.
                      if needUpdate {
                          updatePods = append(updatePods, existing)
                      } else if needReconcile {
                          reconcilePods = append(reconcilePods, existing)
                      } else if needGracefulDelete {
                          deletePods = append(deletePods, existing)
                      }
                      continue
                  }
                  // Unknown UID for this source: a brand-new pod.
                  recordFirstSeenTime(ref)
                  pods[ref.UID] = ref
                  addPods = append(addPods, ref)
              }
          }

          update := change.(kubetypes.PodUpdate)
          switch update.Op {
          case kubetypes.ADD, kubetypes.UPDATE, kubetypes.DELETE:
              // All three ops are merged identically into the existing cache;
              // update.Op only selects the log line.
              if update.Op == kubetypes.ADD {
                  klog.V(4).InfoS("Adding new pods from source", "source", source, "pods", klog.KObjs(update.Pods))
              } else if update.Op == kubetypes.DELETE {
                  klog.V(4).InfoS("Gracefully deleting pods from source", "source", source, "pods", klog.KObjs(update.Pods))
              } else {
                  klog.V(4).InfoS("Updating pods from source", "source", source, "pods", klog.KObjs(update.Pods))
              }
              updatePodsFunc(update.Pods, pods, pods)

          case kubetypes.REMOVE:
              klog.V(4).InfoS("Removing pods from source", "source", source, "pods", klog.KObjs(update.Pods))
              for _, value := range update.Pods {
                  if existing, found := pods[value.UID]; found {
                      // this is a delete
                      delete(pods, value.UID)
                      removePods = append(removePods, existing)
                      continue
                  }
                  // this is a no-op
              }

          case kubetypes.SET:
              // SET is treated as the full pod set for this source: pods are
              // re-merged into a fresh map, and any cached pod missing from
              // update.Pods is reported as removed.
              klog.V(4).InfoS("Setting pods for source", "source", source)
              s.markSourceSet(source)
              // Clear the old map entries by just creating a new map
              oldPods := pods
              pods = make(map[types.UID]*v1.Pod)
              updatePodsFunc(update.Pods, oldPods, pods)
              for uid, existing := range oldPods {
                  if _, found := pods[uid]; !found {
                      // this is a delete
                      removePods = append(removePods, existing)
                  }
              }

          default:
              klog.InfoS("Received invalid update type", "type", update)

          }

          s.pods[source] = pods

          // NOTE(review): copyPods presumably hands out copies so callers cannot
          // mutate the cached pod objects — confirm.
          adds = &kubetypes.PodUpdate{Op: kubetypes.ADD, Pods: copyPods(addPods), Source: source}
          updates = &kubetypes.PodUpdate{Op: kubetypes.UPDATE, Pods: copyPods(updatePods), Source: source}
          deletes = &kubetypes.PodUpdate{Op: kubetypes.DELETE, Pods: copyPods(deletePods), Source: source}
          removes = &kubetypes.PodUpdate{Op: kubetypes.REMOVE, Pods: copyPods(removePods), Source: source}
          reconciles = &kubetypes.PodUpdate{Op: kubetypes.RECONCILE, Pods: copyPods(reconcilePods), Source: source}

          return adds, updates, deletes, removes, reconciles
      }

      PodConfig通过内部Cache缓存已经发现的Pod;当有change事件时,通过对比新老数据,将Pod事件分为ADD、UPDATE、DELETE、REMOVE、RECONCILE几种

      syncLoopIteration接收并处理PodUpdate事件

      • Ref:https://github.com/kubernetes/kubernetes/blob/88e994f6bf8fc88114c5b733e09afea339bea66d/pkg/kubelet/kubelet.go#L2083

        // syncLoopIteration waits for a single event on one of its input channels
        // and dispatches it to handler:
        //   - configCh:       pod changes from config sources, routed by u.Op
        //   - plegCh:         *pleg.PodLifecycleEvent values for individual pods
        //   - syncCh:         tick that syncs the pods returned by kl.getPodsToSync
        //   - probe managers: liveness / readiness / startup probe results
        //   - housekeepingCh: tick that runs handler.HandlePodCleanups
        // When several channels are ready at once, Go's select chooses one at
        // random, so no source has priority over another.
        //
        // It returns false only when configCh has been closed ("exiting the sync
        // loop"); in every other case it returns true.
        func (kl *Kubelet) syncLoopIteration(configCh <-chan kubetypes.PodUpdate, handler SyncHandler,
            syncCh <-chan time.Time, housekeepingCh <-chan time.Time, plegCh <-chan *pleg.PodLifecycleEvent) bool {
            select {
            case u, open := <-configCh:
                // Update from a config source; dispatch it to the right handler
                // callback.
                if !open {
                    klog.ErrorS(nil, "Update channel is closed, exiting the sync loop")
                    return false
                }

                switch u.Op {
                case kubetypes.ADD:
                    klog.V(2).InfoS("SyncLoop ADD", "source", u.Source, "pods", klog.KObjs(u.Pods))
                    // After restarting, kubelet will get all existing pods through
                    // ADD as if they are new pods. These pods will then go through the
                    // admission process and *may* be rejected. This can be resolved
                    // once we have checkpointing.
                    handler.HandlePodAdditions(u.Pods)
                case kubetypes.UPDATE:
                    klog.V(2).InfoS("SyncLoop UPDATE", "source", u.Source, "pods", klog.KObjs(u.Pods))
                    handler.HandlePodUpdates(u.Pods)
                case kubetypes.REMOVE:
                    klog.V(2).InfoS("SyncLoop REMOVE", "source", u.Source, "pods", klog.KObjs(u.Pods))
                    handler.HandlePodRemoves(u.Pods)
                case kubetypes.RECONCILE:
                    klog.V(4).InfoS("SyncLoop RECONCILE", "source", u.Source, "pods", klog.KObjs(u.Pods))
                    handler.HandlePodReconcile(u.Pods)
                case kubetypes.DELETE:
                    klog.V(2).InfoS("SyncLoop DELETE", "source", u.Source, "pods", klog.KObjs(u.Pods))
                    // DELETE is treated as a UPDATE because of graceful deletion.
                    handler.HandlePodUpdates(u.Pods)
                case kubetypes.SET:
                    // TODO: Do we want to support this?
                    klog.ErrorS(nil, "Kubelet does not support snapshot update")
                default:
                    klog.ErrorS(nil, "Invalid operation type received", "operation", u.Op)
                }

                // NOTE(review): presumably marks u.Source as having delivered updates;
                // housekeeping below is skipped until kl.sourcesReady.AllReady().
                kl.sourcesReady.AddSource(u.Source)

            case e := <-plegCh:
                if e.Type == pleg.ContainerStarted {
                    // record the most recent time we observed a container start for this pod.
                    // this lets us selectively invalidate the runtimeCache when processing a delete for this pod
                    // to make sure we don't miss handling graceful termination for containers we reported as having started.
                    kl.lastContainerStartedTime.Add(e.ID, time.Now())
                }
                if isSyncPodWorthy(e) {
                    // PLEG event for a pod; sync it.
                    if pod, ok := kl.podManager.GetPodByUID(e.ID); ok {
                        klog.V(2).InfoS("SyncLoop (PLEG): event for pod", "pod", klog.KObj(pod), "event", e)
                        handler.HandlePodSyncs([]*v1.Pod{pod})
                    } else {
                        // If the pod no longer exists, ignore the event.
                        klog.V(4).InfoS("SyncLoop (PLEG): pod does not exist, ignore irrelevant event", "event", e)
                    }
                }

                // A dead container triggers cleanup only when the event carries its
                // container ID as a string in e.Data.
                if e.Type == pleg.ContainerDied {
                    if containerID, ok := e.Data.(string); ok {
                        kl.cleanUpContainersInPod(e.ID, containerID)
                    }
                }
            case <-syncCh:
                // Sync pods waiting for sync
                podsToSync := kl.getPodsToSync()
                if len(podsToSync) == 0 {
                    // break leaves the select statement (there is no enclosing loop here).
                    break
                }
                klog.V(4).InfoS("SyncLoop (SYNC) pods", "total", len(podsToSync), "pods", klog.KObjs(podsToSync))
                handler.HandlePodSyncs(podsToSync)
            case update := <-kl.livenessManager.Updates():
                // Only liveness failures are acted on; successes are ignored here.
                if update.Result == proberesults.Failure {
                    handleProbeSync(kl, update, handler, "liveness", "unhealthy")
                }
            case update := <-kl.readinessManager.Updates():
                ready := update.Result == proberesults.Success
                kl.statusManager.SetContainerReadiness(update.PodUID, update.ContainerID, ready)

                // status stays "" when the container is not ready.
                status := ""
                if ready {
                    status = "ready"
                }
                handleProbeSync(kl, update, handler, "readiness", status)
            case update := <-kl.startupManager.Updates():
                started := update.Result == proberesults.Success
                kl.statusManager.SetContainerStartup(update.PodUID, update.ContainerID, started)

                status := "unhealthy"
                if started {
                    status = "started"
                }
                handleProbeSync(kl, update, handler, "startup", status)
            case <-housekeepingCh:
                if !kl.sourcesReady.AllReady() {
                    // If the sources aren't ready or volume manager has not yet synced the states,
                    // skip housekeeping, as we may accidentally delete pods from unready sources.
                    klog.V(4).InfoS("SyncLoop (housekeeping, skipped): sources aren't ready yet")
                } else {
                    start := time.Now()
                    klog.V(4).InfoS("SyncLoop (housekeeping)")
                    if err := handler.HandlePodCleanups(); err != nil {
                        klog.ErrorS(err, "Failed cleaning pods")
                    }
                    duration := time.Since(start)
                    // NOTE(review): the message hard-codes "15s"; it must be kept in sync
                    // with housekeepingWarningDuration by hand.
                    if duration > housekeepingWarningDuration {
                        klog.ErrorS(fmt.Errorf("housekeeping took too long"), "Housekeeping took longer than 15s", "seconds", duration.Seconds())
                    }
                    klog.V(4).InfoS("SyncLoop (housekeeping) end")
                }
            }
            return true
        }

新建Pod对应的ADD事件,直接由HandlePodAdditions处理

  • Ref:https://github.com/kubernetes/kubernetes/blob/88e994f6bf8fc88114c5b733e09afea339bea66d/pkg/kubelet/kubelet.go#L2238


    // HandlePodAdditions is the callback in SyncHandler for pods being added from
    // a config source.
    //
    // Pods are handled oldest-first (by creation time). For each pod:
    //   - it is always recorded in the pod manager;
    //   - mirror pods are passed to handleMirrorPod and go no further;
    //   - unless termination was already requested, the pod goes through node-level
    //     admission (canAdmitPod), and a rejected pod is passed to rejectPod and
    //     not dispatched;
    //   - the remaining pods are dispatched to the pod workers as SyncPodCreate.
    func (kl *Kubelet) HandlePodAdditions(pods []*v1.Pod) {
        start := kl.clock.Now()
        // Sort in creation order so each pod is admitted against the pods created
        // before it. Note that sort.Sort reorders the caller's slice in place.
        sort.Sort(sliceutils.PodsByCreationTime(pods))
        for _, pod := range pods {
            // Snapshot taken BEFORE AddPod, so the pod being admitted is not part of
            // the set it is checked against; earlier pods from this loop are.
            existingPods := kl.podManager.GetPods()
            // Always add the pod to the pod manager. Kubelet relies on the pod
            // manager as the source of truth for the desired state. If a pod does
            // not exist in the pod manager, it means that it has been deleted in
            // the apiserver and no action (other than cleanup) is required.
            kl.podManager.AddPod(pod)

            if kubetypes.IsMirrorPod(pod) {
                kl.handleMirrorPod(pod, start)
                continue
            }

            // Only go through the admission process if the pod is not requested
            // for termination by another part of the kubelet. If the pod is already
            // using resources (previously admitted), the pod worker is going to be
            // shutting it down. If the pod hasn't started yet, we know that when
            // the pod worker is invoked it will also avoid setting up the pod, so
            // we simply avoid doing any work.
            if !kl.podWorkers.IsPodTerminationRequested(pod.UID) {
                // We failed pods that we rejected, so activePods include all admitted
                // pods that are alive.
                activePods := kl.filterOutInactivePods(existingPods)

                // Check if we can admit the pod; if not, reject it.
                if ok, reason, message := kl.canAdmitPod(activePods, pod); !ok {
                    kl.rejectPod(pod, reason, message)
                    continue
                }
            }
            // NOTE(review): the second return value is discarded; presumably mirrorPod
            // is nil when the pod has no mirror pod — confirm.
            mirrorPod, _ := kl.podManager.GetMirrorPodByPod(pod)
            kl.dispatchWork(pod, kubetypes.SyncPodCreate, mirrorPod, start)
        }
    }

新建Pod对应的ADD事件类型,直接由HandlePodAdditions处理。

HandlePodAdditions:通过dispatchWork->kl.podWorkers.UpdatePod的调用链,将Pod创建请求发送给podWorkers做进一步的处理

  • Ref:https://github.com/kubernetes/kubernetes/blob/88e994f6bf8fc88114c5b733e09afea339bea66d/pkg/kubelet/kubelet.go#L2238

    // HandlePodAdditions is the callback in SyncHandler for pods being added from
    // a config source.
    //
    // Pods are handled oldest-first (by creation time). For each pod:
    //   - it is always recorded in the pod manager;
    //   - mirror pods are passed to handleMirrorPod and go no further;
    //   - unless termination was already requested, the pod goes through node-level
    //     admission (canAdmitPod), and a rejected pod is passed to rejectPod and
    //     not dispatched;
    //   - the remaining pods are dispatched via dispatchWork as SyncPodCreate.
    func (kl *Kubelet) HandlePodAdditions(pods []*v1.Pod) {
        start := kl.clock.Now()
        // Sort in creation order so each pod is admitted against the pods created
        // before it. Note that sort.Sort reorders the caller's slice in place.
        sort.Sort(sliceutils.PodsByCreationTime(pods))
        for _, pod := range pods {
            // Snapshot taken BEFORE AddPod, so the pod being admitted is not part of
            // the set it is checked against; earlier pods from this loop are.
            existingPods := kl.podManager.GetPods()
            // Always add the pod to the pod manager. Kubelet relies on the pod
            // manager as the source of truth for the desired state. If a pod does
            // not exist in the pod manager, it means that it has been deleted in
            // the apiserver and no action (other than cleanup) is required.
            kl.podManager.AddPod(pod)

            if kubetypes.IsMirrorPod(pod) {
                kl.handleMirrorPod(pod, start)
                continue
            }

            // Only go through the admission process if the pod is not requested
            // for termination by another part of the kubelet. If the pod is already
            // using resources (previously admitted), the pod worker is going to be
            // shutting it down. If the pod hasn't started yet, we know that when
            // the pod worker is invoked it will also avoid setting up the pod, so
            // we simply avoid doing any work.
            if !kl.podWorkers.IsPodTerminationRequested(pod.UID) {
                // We failed pods that we rejected, so activePods include all admitted
                // pods that are alive.
                activePods := kl.filterOutInactivePods(existingPods)

                // Check if we can admit the pod; if not, reject it.
                if ok, reason, message := kl.canAdmitPod(activePods, pod); !ok {
                    kl.rejectPod(pod, reason, message)
                    continue
                }
            }
            // NOTE(review): the second return value is discarded; presumably mirrorPod
            // is nil when the pod has no mirror pod — confirm.
            mirrorPod, _ := kl.podManager.GetMirrorPodByPod(pod)
            // Hand off to the pod workers; from here on, pod setup is asynchronous.
            kl.dispatchWork(pod, kubetypes.SyncPodCreate, mirrorPod, start)
        }
    }

在处理新增Pod前,首先会进行排序操作,确保创建时间早的Pod总是能被优先处理。

  • 对于Static Pod的Mirror Pod,kubelet将忽略其Spec的变化,总是以File或HTTP源定义的PodSpec为准来更新Mirror Pod。因此,用户无法通过删除或者修改Mirror Pod的方式来修改或者重启Static Pod
  • kubelet在创建Mirror Pod时,会为其设置一个key为kubernetes.io/config.mirror、value为Static Pod哈希值的Annotation,该值不允许被更新或修改;在执行syncPod时,kubelet通过比较该哈希值来判断是否需要重建Mirror Pod。

在真正调用dispatchWork分发任务之前,还会通过canAdmitPod执行节点级准入控制

kubelet默认注册的准入控制器如下:

准入控制器 功能描述
EvictionAdmitHandler 当节点面临资源压力时,拒绝可能影响节点稳定性的Pod运行,具体规则如下:1. Critical Pod不考虑资源压力,将被直接允许 2. 对于仅存在内存压力的情况,Guaranteed和Burstable类型的Pod将被允许 3. 对于仅存在内存压力的场景,当BestEffort类型的Pod容忍了内存压力污点时,将被允许 4. 在其他情况下,Pod将被拒绝
SysctlsAllowlist 检查Pod的SecurityContext中定义的sysctls是否在allowlist白名单中,默认允许的安全sysctls如下(可以通过--allowed-unsafe-sysctls允许更多):1. kernel.shm_rmid_forced 2. net.ipv4.ip_local_port_range 3. net.ipv4.tcp_syncookies 4. net.ipv4.ping_group_range 5. net.ipv4.ip_unprivileged_port_start
AllocateResourcesPodAdmitHandler 检查节点的Device、CPU、Memory资源是否满足Pod需求。启用TopologyManager特性门控(默认开启)时,还会检查资源的拓扑分布是否满足所配置的Policy策略
PredicateAdmitHandler 从调度的角度评估Pod能否通过准入,检查内容包括noderesources、nodeport、nodeAffinity、nodeName、taint/toleration。当Critical Pod被该准入控制器拒绝且原因是资源不足时,将首先尝试驱逐低优先级Pod来释放所需资源,以满足调度要求。此准入控制器还会检查Pod的OS Label(kubernetes.io/os)以及字段(Spec.OS.Name)是否与当前kubelet运行的OS匹配
ShutdownAdmitHandler 当节点正在关机(shutting down)时,拒绝所有Pod

dispatchWork的主要工作就是将Pod(创建、更新、删除、同步)事件下发给podWorkers做异步处理。

  • Ref:https://github.com/kubernetes/kubernetes/blob/88e994f6bf8fc88114c5b733e09afea339bea66d/pkg/kubelet/kubelet.go#L2212

    // dispatchWork hands pod to its pod worker, which performs the sync of the
    // given syncType asynchronously; this call only enqueues the request.
    // If the pod has already completed termination, the worker performs no action.
    // For SyncPodCreate, the pod's container count is additionally observed in
    // metrics.ContainersPerPodCount.
    func (kl *Kubelet) dispatchWork(pod *v1.Pod, syncType kubetypes.SyncPodType, mirrorPod *v1.Pod, start time.Time) {
        req := UpdatePodOptions{
            Pod:        pod,
            MirrorPod:  mirrorPod,
            UpdateType: syncType,
            StartTime:  start,
        }
        kl.podWorkers.UpdatePod(req)

        if syncType != kubetypes.SyncPodCreate {
            return
        }
        // Only newly created pods contribute to the containers-per-pod metric.
        metrics.ContainersPerPodCount.Observe(float64(len(pod.Spec.Containers)))
    }

podWorkers收到Update事件,采用独立协程异步调用syncPod来处理

podWorkers会为每个Pod创建一个独立的任务Channel和一个goroutine,每个Pod处理协程都会阻塞式等待Channel中任务,并且对获取的任务进行处理。podWorkers则负责将Pod任务发送到对应的Channel中

  • Ref:https://github.com/kubernetes/kubernetes/blob/88e994f6bf8fc88114c5b733e09afea339bea66d/pkg/kubelet/pod_workers.go#L557


    // UpdatePod carries a configuration change or termination state to a pod. A pod is either runnable,
    // terminating, or terminated, and will transition to terminating if deleted on the apiserver, it is
    // discovered to have a terminal phase (Succeeded or Failed), or if it is evicted by the kubelet.
    //
    // Outline of the steps below:
    //  1. Normalize the request: an update that has RunningPod but no Pod describes an
    //     orphaned runtime pod, and is accepted only as a SyncPodKill.
    //  2. Under p.podLock, look up (or create) the podSyncStatus for the pod UID.
    //  3. Decide whether this update moves the pod into the terminating state.
    //  4. Choose the work type: TerminatedPodWork, TerminatingPodWork or SyncPodWork.
    //  5. On the first update for a UID, create its buffered channel and start a
    //     goroutine running managePodLoop on it.
    //  6. If the worker is idle, send the work right away; otherwise store it as the
    //     latest undelivered update and, if the pod just became terminating or its
    //     grace period was shortened, cancel the sync that is in progress.
    func (p *podWorkers) UpdatePod(options UpdatePodOptions) {
        // handle when the pod is an orphan (no config) and we only have runtime status by running only
        // the terminating part of the lifecycle
        pod := options.Pod
        var isRuntimePod bool
        if options.RunningPod != nil {
            if options.Pod == nil {
                pod = options.RunningPod.ToAPIPod()
                if options.UpdateType != kubetypes.SyncPodKill {
                    klog.InfoS("Pod update is ignored, runtime pods can only be killed", "pod", klog.KObj(pod), "podUID", pod.UID)
                    return
                }
                options.Pod = pod
                isRuntimePod = true
            } else {
                // Both were supplied: Pod wins and RunningPod is dropped.
                options.RunningPod = nil
                klog.InfoS("Pod update included RunningPod which is only valid when Pod is not specified", "pod", klog.KObj(options.Pod), "podUID", options.Pod.UID)
            }
        }
        uid := pod.UID

        // The per-UID maps below (podSyncStatuses, podUpdates, lastUndeliveredWorkUpdate, ...)
        // are only touched while podLock is held.
        p.podLock.Lock()
        defer p.podLock.Unlock()

        // decide what to do with this pod - we are either setting it up, tearing it down, or ignoring it
        now := time.Now()
        status, ok := p.podSyncStatuses[uid]
        if !ok {
            klog.V(4).InfoS("Pod is being synced for the first time", "pod", klog.KObj(pod), "podUID", pod.UID)
            status = &podSyncStatus{
                syncedAt: now,
                fullname: kubecontainer.GetPodFullName(pod),
            }
            // if this pod is being synced for the first time, we need to make sure it is an active pod
            if !isRuntimePod && (pod.Status.Phase == v1.PodFailed || pod.Status.Phase == v1.PodSucceeded) {
                // check to see if the pod is not running and the pod is terminal.
                // If this succeeds then record in the podWorker that it is terminated.
                // A cache lookup error is not fatal: the pod just keeps the fresh status above.
                if statusCache, err := p.podCache.Get(pod.UID); err == nil {
                    if isPodStatusCacheTerminal(statusCache) {
                        status = &podSyncStatus{
                            terminatedAt:       now,
                            terminatingAt:      now,
                            syncedAt:           now,
                            startedTerminating: true,
                            finished:           true,
                            fullname:           kubecontainer.GetPodFullName(pod),
                        }
                    }
                }
            }
            p.podSyncStatuses[uid] = status
        }

        // if an update is received that implies the pod should be running, but we are already terminating a pod by
        // that UID, assume that two pods with the same UID were created in close temporal proximity (usually static
        // pod but it's possible for an apiserver to extremely rarely do something similar) - flag the sync status
        // to indicate that after the pod terminates it should be reset to "not running" to allow a subsequent add/update
        // to start the pod worker again
        if status.IsTerminationRequested() {
            if options.UpdateType == kubetypes.SyncPodCreate {
                status.restartRequested = true
                klog.V(4).InfoS("Pod is terminating but has been requested to restart with same UID, will be reconciled later", "pod", klog.KObj(pod), "podUID", pod.UID)
                return
            }
        }

        // once a pod is terminated by UID, it cannot reenter the pod worker (until the UID is purged by housekeeping)
        if status.IsFinished() {
            klog.V(4).InfoS("Pod is finished processing, no further updates", "pod", klog.KObj(pod), "podUID", pod.UID)
            return
        }

        // check for a transition to terminating
        // becameTerminating is set only by the update that starts termination; it is
        // used at the end of this function to cancel an in-flight sync.
        var becameTerminating bool
        if !status.IsTerminationRequested() {
            switch {
            case isRuntimePod:
                klog.V(4).InfoS("Pod is orphaned and must be torn down", "pod", klog.KObj(pod), "podUID", pod.UID)
                status.deleted = true
                status.terminatingAt = now
                becameTerminating = true
            case pod.DeletionTimestamp != nil:
                klog.V(4).InfoS("Pod is marked for graceful deletion, begin teardown", "pod", klog.KObj(pod), "podUID", pod.UID)
                status.deleted = true
                status.terminatingAt = now
                becameTerminating = true
            case pod.Status.Phase == v1.PodFailed, pod.Status.Phase == v1.PodSucceeded:
                klog.V(4).InfoS("Pod is in a terminal phase (success/failed), begin teardown", "pod", klog.KObj(pod), "podUID", pod.UID)
                status.terminatingAt = now
                becameTerminating = true
            case options.UpdateType == kubetypes.SyncPodKill:
                if options.KillPodOptions != nil && options.KillPodOptions.Evict {
                    klog.V(4).InfoS("Pod is being evicted by the kubelet, begin teardown", "pod", klog.KObj(pod), "podUID", pod.UID)
                    status.evicted = true
                } else {
                    klog.V(4).InfoS("Pod is being removed by the kubelet, begin teardown", "pod", klog.KObj(pod), "podUID", pod.UID)
                }
                status.terminatingAt = now
                becameTerminating = true
            }
        }

        // once a pod is terminating, all updates are kills and the grace period can only decrease
        var workType PodWorkType
        var wasGracePeriodShortened bool
        switch {
        case status.IsTerminated():
            // A terminated pod may still be waiting for cleanup - if we receive a runtime pod kill request
            // due to housekeeping seeing an older cached version of the runtime pod simply ignore it until
            // after the pod worker completes.
            if isRuntimePod {
                klog.V(3).InfoS("Pod is waiting for termination, ignoring runtime-only kill until after pod worker is fully terminated", "pod", klog.KObj(pod), "podUID", pod.UID)
                return
            }

            workType = TerminatedPodWork

            // Termination has already happened: release any waiter on CompletedCh
            // immediately and drop the kill options.
            if options.KillPodOptions != nil {
                if ch := options.KillPodOptions.CompletedCh; ch != nil {
                    close(ch)
                }
            }
            options.KillPodOptions = nil

        case status.IsTerminationRequested():
            workType = TerminatingPodWork
            if options.KillPodOptions == nil {
                options.KillPodOptions = &KillPodOptions{}
            }

            // The caller's completion channel and status callback are not acted on
            // here; they are queued on the pod's sync status.
            if ch := options.KillPodOptions.CompletedCh; ch != nil {
                status.notifyPostTerminating = append(status.notifyPostTerminating, ch)
            }
            if fn := options.KillPodOptions.PodStatusFunc; fn != nil {
                status.statusPostTerminating = append(status.statusPostTerminating, fn)
            }

            gracePeriod, gracePeriodShortened := calculateEffectiveGracePeriod(status, pod, options.KillPodOptions)

            wasGracePeriodShortened = gracePeriodShortened
            status.gracePeriod = gracePeriod
            // always set the grace period for syncTerminatingPod so we don't have to recalculate,
            // will never be zero.
            options.KillPodOptions.PodTerminationGracePeriodSecondsOverride = &gracePeriod

        default:
            workType = SyncPodWork

            // KillPodOptions is not valid for sync actions outside of the terminating phase
            if options.KillPodOptions != nil {
                if ch := options.KillPodOptions.CompletedCh; ch != nil {
                    close(ch)
                }
                options.KillPodOptions = nil
            }
        }

        // the desired work we want to be performing
        work := podWork{
            WorkType: workType,
            Options:  options,
        }

        // start the pod worker goroutine if it doesn't exist
        podUpdates, exists := p.podUpdates[uid]
        if !exists {
            // We need to have a buffer here, because checkForUpdates() method that
            // puts an update into channel is called from the same goroutine where
            // the channel is consumed. However, it is guaranteed that in such case
            // the channel is empty, so buffer of size 1 is enough.
            podUpdates = make(chan podWork, 1)
            p.podUpdates[uid] = podUpdates

            // ensure that static pods start in the order they are received by UpdatePod
            if kubetypes.IsStaticPod(pod) {
                p.waitingToStartStaticPodsByFullname[status.fullname] =
                    append(p.waitingToStartStaticPodsByFullname[status.fullname], uid)
            }

            // allow testing of delays in the pod update channel
            var outCh <-chan podWork
            if p.workerChannelFn != nil {
                outCh = p.workerChannelFn(uid, podUpdates)
            } else {
                outCh = podUpdates
            }

            // Creating a new pod worker either means this is a new pod, or that the
            // kubelet just restarted. In either case the kubelet is willing to believe
            // the status of the pod for the first pod worker sync. See corresponding
            // comment in syncPod.
            go func() {
                defer runtime.HandleCrash()
                p.managePodLoop(outCh)
            }()
        }

        // dispatch a request to the pod worker if none are running
        if !status.IsWorking() {
            status.working = true
            podUpdates <- work
            return
        }

        // capture the maximum latency between a requested update and when the pod
        // worker observes it
        if undelivered, ok := p.lastUndeliveredWorkUpdate[pod.UID]; ok {
            // track the max latency between when a config change is requested and when it is realized
            // NOTE: this undercounts the latency when multiple requests are queued, but captures max latency
            if !undelivered.Options.StartTime.IsZero() && undelivered.Options.StartTime.Before(work.Options.StartTime) {
                work.Options.StartTime = undelivered.Options.StartTime
            }
        }

        // always sync the most recent data
        // Any earlier undelivered update for this UID is overwritten here; only its
        // older StartTime survives, via the block above.
        p.lastUndeliveredWorkUpdate[pod.UID] = work

        if (becameTerminating || wasGracePeriodShortened) && status.cancelFn != nil {
            klog.V(3).InfoS("Cancelling current pod sync", "pod", klog.KObj(pod), "podUID", pod.UID, "updateType", work.WorkType)
            status.cancelFn()
            return
        }
    }
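    1. UpdatePod首先构造出work任务对象。WorkType支持三种类型,即SyncPodWork、TerminatingPodWork和TerminatedPodWork,分别对应kl.syncPod、kl.syncTerminatingPod和kl.syncTerminatedPod
    2. 根据Pod UID检查是否已经存在对应的work Channel,如果没有,则为该Pod创建一个Channel,并启动一个执行managePodLoop的goroutine
    3. 如果该Pod的worker当前空闲,则直接将work任务发送到该Pod的任务Channel;否则将其记录到lastUndeliveredWorkUpdate中等待后续处理,并且当Pod转为Terminating或优雅停机时间被缩短时,通过cancelFn取消当前正在执行的同步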

执行容器创建准备工作(包括检查网络、设置cgroup、挂载磁盘等)

podWorkers的syncPodFn实际上就是kl.syncPod

  • Ref:https://github.com/kubernetes/kubernetes/blob/88e994f6bf8fc88114c5b733e09afea339bea66d/pkg/kubelet/kubelet.go#L1522

    // syncPod is the sync function the pod worker runs for SyncPodWork, i.e. for pods that are
    // expected to be running. In order it:
    //   - generates the API pod status and returns early (isTerminal=true) if its phase is
    //     Succeeded or Failed;
    //   - runs soft admission via canRunPod and, if the pod is not admitted, kills its
    //     containers and returns an error;
    //   - refuses non-host-network pods while the network plugin is not ready;
    //   - registers referenced secrets/configmaps (unless termination was requested);
    //   - ensures the pod-level cgroup;
    //   - reconciles the mirror pod of a static pod;
    //   - creates the pod data directories and waits for volumes to attach/mount;
    //   - adds the pod to the probe manager;
    //   - delegates container work to containerRuntime.SyncPod.
    //
    // isTerminal is true only when the generated phase is Succeeded or Failed. A non-nil err
    // reports a failed sync attempt back to the pod worker (see the "back off" comment below).
    // NOTE(review): ctx is not referenced anywhere in this body.
    func (kl *Kubelet) syncPod(ctx context.Context, updateType kubetypes.SyncPodType, pod, mirrorPod *v1.Pod, podStatus *kubecontainer.PodStatus) (isTerminal bool, err error) {
    klog.V(4).InfoS("syncPod enter", "pod", klog.KObj(pod), "podUID", pod.UID)
    defer func() {
    klog.V(4).InfoS("syncPod exit", "pod", klog.KObj(pod), "podUID", pod.UID, "isTerminal", isTerminal)
    }()

    // Latency measurements for the main workflow are relative to the
    // first time the pod was seen by kubelet.
    var firstSeenTime time.Time
    if firstSeenTimeStr, ok := pod.Annotations[kubetypes.ConfigFirstSeenAnnotationKey]; ok {
    firstSeenTime = kubetypes.ConvertToTimestamp(firstSeenTimeStr).Get()
    }

    // Record pod worker start latency if being created
    // TODO: make pod workers record their own latencies
    if updateType == kubetypes.SyncPodCreate {
    if !firstSeenTime.IsZero() {
    // This is the first time we are syncing the pod. Record the latency
    // since kubelet first saw the pod if firstSeenTime is set.
    metrics.PodWorkerStartDuration.Observe(metrics.SinceInSeconds(firstSeenTime))
    } else {
    klog.V(3).InfoS("First seen time not recorded for pod",
    "podUID", pod.UID,
    "pod", klog.KObj(pod))
    }
    }

    // Generate final API pod status with pod and status manager status
    apiPodStatus := kl.generateAPIPodStatus(pod, podStatus)
    // The pod IP may be changed in generateAPIPodStatus if the pod is using host network. (See #24576)
    // TODO(random-liu): After writing pod spec into container labels, check whether pod is using host network, and
    // set pod IP to hostIP directly in runtime.GetPodStatus
    podStatus.IPs = make([]string, 0, len(apiPodStatus.PodIPs))
    for _, ipInfo := range apiPodStatus.PodIPs {
    podStatus.IPs = append(podStatus.IPs, ipInfo.IP)
    }
    if len(podStatus.IPs) == 0 && len(apiPodStatus.PodIP) > 0 {
    podStatus.IPs = []string{apiPodStatus.PodIP}
    }

    // If the pod is terminal, we don't need to continue to setup the pod
    if apiPodStatus.Phase == v1.PodSucceeded || apiPodStatus.Phase == v1.PodFailed {
    kl.statusManager.SetPodStatus(pod, apiPodStatus)
    isTerminal = true
    return isTerminal, nil
    }

    // If the pod should not be running, we request the pod's containers be stopped. This is not the same
    // as termination (we want to stop the pod, but potentially restart it later if soft admission allows
    // it later). Set the status and phase appropriately
    runnable := kl.canRunPod(pod)
    if !runnable.Admit {
    // Pod is not runnable; and update the Pod and Container statuses to why.
    if apiPodStatus.Phase != v1.PodFailed && apiPodStatus.Phase != v1.PodSucceeded {
    apiPodStatus.Phase = v1.PodPending
    }
    apiPodStatus.Reason = runnable.Reason
    apiPodStatus.Message = runnable.Message
    // Waiting containers are not creating.
    const waitingReason = "Blocked"
    for _, cs := range apiPodStatus.InitContainerStatuses {
    if cs.State.Waiting != nil {
    cs.State.Waiting.Reason = waitingReason
    }
    }
    for _, cs := range apiPodStatus.ContainerStatuses {
    if cs.State.Waiting != nil {
    cs.State.Waiting.Reason = waitingReason
    }
    }
    }

    // Record the time it takes for the pod to become running
    // since kubelet first saw the pod if firstSeenTime is set.
    // NOTE(review): `&&` binds tighter than `||`, so this condition reads as
    // `!ok || (Pending -> Running && firstSeenTime is set)`. When the status manager has
    // no entry for the pod (!ok), the metric is observed even if firstSeenTime is zero.
    existingStatus, ok := kl.statusManager.GetPodStatus(pod.UID)
    if !ok || existingStatus.Phase == v1.PodPending && apiPodStatus.Phase == v1.PodRunning &&
    !firstSeenTime.IsZero() {
    metrics.PodStartDuration.Observe(metrics.SinceInSeconds(firstSeenTime))
    }

    kl.statusManager.SetPodStatus(pod, apiPodStatus)

    // Pods that are not runnable must be stopped - return a typed error to the pod worker
    if !runnable.Admit {
    klog.V(2).InfoS("Pod is not runnable and must have running containers stopped", "pod", klog.KObj(pod), "podUID", pod.UID, "message", runnable.Message)
    var syncErr error
    p := kubecontainer.ConvertPodStatusToRunningPod(kl.getRuntime().Type(), podStatus)
    if err := kl.killPod(pod, p, nil); err != nil {
    kl.recorder.Eventf(pod, v1.EventTypeWarning, events.FailedToKillPod, "error killing pod: %v", err)
    syncErr = fmt.Errorf("error killing pod: %v", err)
    utilruntime.HandleError(syncErr)
    } else {
    // There was no error killing the pod, but the pod cannot be run.
    // Return an error to signal that the sync loop should back off.
    syncErr = fmt.Errorf("pod cannot be run: %s", runnable.Message)
    }
    return false, syncErr
    }

    // If the network plugin is not ready, only start the pod if it uses the host network
    if err := kl.runtimeState.networkErrors(); err != nil && !kubecontainer.IsHostNetworkPod(pod) {
    kl.recorder.Eventf(pod, v1.EventTypeWarning, events.NetworkNotReady, "%s: %v", NetworkNotReadyErrorMsg, err)
    return false, fmt.Errorf("%s: %v", NetworkNotReadyErrorMsg, err)
    }

    // ensure the kubelet knows about referenced secrets or configmaps used by the pod
    if !kl.podWorkers.IsPodTerminationRequested(pod.UID) {
    if kl.secretManager != nil {
    kl.secretManager.RegisterPod(pod)
    }
    if kl.configMapManager != nil {
    kl.configMapManager.RegisterPod(pod)
    }
    }

    // Create Cgroups for the pod and apply resource parameters
    // to them if cgroups-per-qos flag is enabled.
    pcm := kl.containerManager.NewPodContainerManager()
    // If pod has already been terminated then we need not create
    // or update the pod's cgroup
    // TODO: once context cancellation is added this check can be removed
    if !kl.podWorkers.IsPodTerminationRequested(pod.UID) {
    // When the kubelet is restarted with the cgroups-per-qos
    // flag enabled, all the pod's running containers
    // should be killed intermittently and brought back up
    // under the qos cgroup hierarchy.
    // Check if this is the pod's first sync
    firstSync := true
    for _, containerStatus := range apiPodStatus.ContainerStatuses {
    if containerStatus.State.Running != nil {
    firstSync = false
    break
    }
    }
    // Don't kill containers in pod if pod's cgroups already
    // exists or the pod is running for the first time
    podKilled := false
    if !pcm.Exists(pod) && !firstSync {
    p := kubecontainer.ConvertPodStatusToRunningPod(kl.getRuntime().Type(), podStatus)
    if err := kl.killPod(pod, p, nil); err == nil {
    podKilled = true
    } else {
    klog.ErrorS(err, "KillPod failed", "pod", klog.KObj(pod), "podStatus", podStatus)
    }
    }
    // Create and Update pod's Cgroups
    // Don't create cgroups for run once pod if it was killed above
    // The current policy is not to restart the run once pods when
    // the kubelet is restarted with the new flag as run once pods are
    // expected to run only once and if the kubelet is restarted then
    // they are not expected to run again.
    // We don't create and apply updates to cgroup if its a run once pod and was killed above
    if !(podKilled && pod.Spec.RestartPolicy == v1.RestartPolicyNever) {
    if !pcm.Exists(pod) {
    // A QoS cgroup update failure is only logged; pod cgroup creation is still attempted.
    if err := kl.containerManager.UpdateQOSCgroups(); err != nil {
    klog.V(2).InfoS("Failed to update QoS cgroups while syncing pod", "pod", klog.KObj(pod), "err", err)
    }
    if err := pcm.EnsureExists(pod); err != nil {
    kl.recorder.Eventf(pod, v1.EventTypeWarning, events.FailedToCreatePodContainer, "unable to ensure pod container exists: %v", err)
    return false, fmt.Errorf("failed to ensure that the pod: %v cgroups exist and are correctly applied: %v", pod.UID, err)
    }
    }
    }
    }

    // Create Mirror Pod for Static Pod if it doesn't already exist
    if kubetypes.IsStaticPod(pod) {
    deleted := false
    if mirrorPod != nil {
    if mirrorPod.DeletionTimestamp != nil || !kl.podManager.IsMirrorPodOf(mirrorPod, pod) {
    // The mirror pod is semantically different from the static pod. Remove
    // it. The mirror pod will get recreated later.
    klog.InfoS("Trying to delete pod", "pod", klog.KObj(pod), "podUID", mirrorPod.ObjectMeta.UID)
    podFullName := kubecontainer.GetPodFullName(pod)
    var err error
    deleted, err = kl.podManager.DeleteMirrorPod(podFullName, &mirrorPod.ObjectMeta.UID)
    if deleted {
    klog.InfoS("Deleted mirror pod because it is outdated", "pod", klog.KObj(mirrorPod))
    } else if err != nil {
    klog.ErrorS(err, "Failed deleting mirror pod", "pod", klog.KObj(mirrorPod))
    }
    }
    }
    if mirrorPod == nil || deleted {
    // A lookup error is treated the same as a node being deleted: no mirror pod is created.
    node, err := kl.GetNode()
    if err != nil || node.DeletionTimestamp != nil {
    klog.V(4).InfoS("No need to create a mirror pod, since node has been removed from the cluster", "node", klog.KRef("", string(kl.nodeName)))
    } else {
    klog.V(4).InfoS("Creating a mirror pod for static pod", "pod", klog.KObj(pod))
    if err := kl.podManager.CreateMirrorPod(pod); err != nil {
    klog.ErrorS(err, "Failed creating a mirror pod for", "pod", klog.KObj(pod))
    }
    }
    }
    }

    // Make data directories for the pod
    if err := kl.makePodDataDirs(pod); err != nil {
    kl.recorder.Eventf(pod, v1.EventTypeWarning, events.FailedToMakePodDataDirectories, "error making pod data directories: %v", err)
    klog.ErrorS(err, "Unable to make pod data directories for pod", "pod", klog.KObj(pod))
    return false, err
    }

    // Volume manager will not mount volumes for terminating pods
    // TODO: once context cancellation is added this check can be removed
    if !kl.podWorkers.IsPodTerminationRequested(pod.UID) {
    // Wait for volumes to attach/mount
    if err := kl.volumeManager.WaitForAttachAndMount(pod); err != nil {
    kl.recorder.Eventf(pod, v1.EventTypeWarning, events.FailedMountVolume, "Unable to attach or mount volumes: %v", err)
    klog.ErrorS(err, "Unable to attach or mount volumes for pod; skipping pod", "pod", klog.KObj(pod))
    return false, err
    }
    }

    // Fetch the pull secrets for the pod
    pullSecrets := kl.getPullSecretsForPod(pod)

    // Ensure the pod is being probed
    kl.probeManager.AddPod(pod)

    // Call the container runtime's SyncPod callback
    result := kl.containerRuntime.SyncPod(pod, podStatus, pullSecrets, kl.backOff)
    kl.reasonCache.Update(pod.UID, result)
    if err := result.Error(); err != nil {
    // Do not return error if the only failures were pods in backoff
    // Any SyncResult whose error is something other than CrashLoopBackOff or
    // ImagePullBackOff makes the whole sync fail with the aggregated error.
    for _, r := range result.SyncResults {
    if r.Error != kubecontainer.ErrCrashLoopBackOff && r.Error != images.ErrImagePullBackOff {
    // Do not record an event here, as we keep all event logging for sync pod failures
    // local to container runtime, so we get better errors.
    return false, err
    }
    }

    return false, nil
    }

    return false, nil
    }

syncPod 主要步骤如下:

  1. 通过canRunPod检查是否能够在当前节点启动。canRunPod内部遍历softAdmitHandlers准入控制器列表,分别调用各准入控制器Admit func。

    • 如果有任意一个准入控制器返回拒绝,则拒绝运行Pod
  2. 通过runtimeState.networkErrors检查网络插件是否就绪

    • 如果网络插件未就绪,则只启动Host Network类型的Pod。网络插件状态由容器运行时检测并反馈:kubelet调用CRI的Status接口,根据StatusResponse中RuntimeStatus的Conditions里NetworkReady条件是否为true来确定网络插件是否就绪
  3. 通过secretManager.RegisterPod和configMapManager.RegisterPod将当前Pod注册到相应的Manager,从而将Pod引用的Secret和ConfigMap对象纳入管理。Manager通过watch、TTL等机制维护kubelet本地缓存,便于挂载时快速读取,避免每次同步Pod都向kube-apiserver发起请求(Pod已被请求终止时跳过注册)

  4. 若--cgroups-per-qos参数被启用(默认启用),则通过pcm.EnsureExists为Pod设置Pod级别的cgroup资源限制,而QoS级别的cgroup层级由containerManager.UpdateQOSCgroups处理。此外,QOSContainerManager还会通过内部协程每隔1min执行一次UpdateCgroups同步操作

    • Ref:https://github.com/kubernetes/kubernetes/blob/88e994f6bf8fc88114c5b733e09afea339bea66d/pkg/kubelet/cm/pod_container_manager_linux.go#L72

      // EnsureExists takes a pod as argument and makes sure that
      // pod cgroup exists if qos cgroup hierarchy flag is enabled.
      // If the pod level container doesn't already exist it is created.
      //
      // Only the "does not exist yet" case does any work. An existing pod cgroup is left
      // untouched, and its resource parameters are not updated here.
      //
      // When creating, the resource parameters come from ResourceConfigForPod, which is given:
      //   - the CPU limit enforcement setting;
      //   - the CFS quota period;
      //   - whether memory QoS is enforced.
      // Memory QoS is enforced only when the MemoryQoS feature gate is enabled AND the host is
      // in cgroup v2 unified mode. A pod-level PIDs limit is applied only when
      // m.podPidsLimit > 0.
      //
      // Returns an error wrapping the cgroupManager.Create failure; otherwise nil.
      func (m *podContainerManagerImpl) EnsureExists(pod *v1.Pod) error {
      podContainerName, _ := m.GetPodContainerName(pod)
      // check if container already exist
      alreadyExists := m.Exists(pod)
      if !alreadyExists {
      enforceMemoryQoS := false
      if utilfeature.DefaultFeatureGate.Enabled(kubefeatures.MemoryQoS) &&
      libcontainercgroups.IsCgroup2UnifiedMode() {
      enforceMemoryQoS = true
      }
      // Create the pod container
      containerConfig := &CgroupConfig{
      Name: podContainerName,
      ResourceParameters: ResourceConfigForPod(pod, m.enforceCPULimits, m.cpuCFSQuotaPeriod, enforceMemoryQoS),
      }
      if m.podPidsLimit > 0 {
      containerConfig.ResourceParameters.PidsLimit = &m.podPidsLimit
      }
      if enforceMemoryQoS {
      klog.V(4).InfoS("MemoryQoS config for pod", "pod", klog.KObj(pod), "unified", containerConfig.ResourceParameters.Unified)
      }
      if err := m.cgroupManager.Create(containerConfig); err != nil {
      return fmt.Errorf("failed to create container for %v : %v", podContainerName, err)
      }
      }
      return nil
      }

  • 这里没有直接出现Memory Requests,这是否意味着设置Memory Request并不重要?并非如此:requests主要在调度阶段被关注,而limits主要在运行阶段生效(另外,启用MemoryQoS特性后,memory requests也会被用于设置cgroup v2的memory.min)

kubelet通过EnsureExists实现了Pod级别的cgroup资源限制,而更低层级的Container级别的cgroup资源限制则借助CRI来完成

通过makePodDataDirs 为Pod准备以下文件夹:

  • /var/lib/kubelet/pods/<pod_uid>
  • /var/lib/kubelet/pods/<pod_uid>/volumes
  • /var/lib/kubelet/pods/<pod_uid>/plugins

对于使用了存储卷的Pod,通过volumeManager.WaitForAttachAndMount等待VolumeManager完成存储卷的Attach和Mount操作(已被请求终止的Pod会跳过该步骤)。挂载后的数据卷会体现在宿主机的/var/lib/kubelet/pods/<pod_uid>/volumes/目录下,在创建容器时再通过容器运行时挂载到容器中

  • 启动条件就绪后,通过probeManager.AddPod将Pod加入健康检查探测,再通过containerRuntime.SyncPod创建容器

调用CRI创建Sandbox隔离环境(包含调用CNI设置网络)

containerRuntime.SyncPod主要执行以下几步:

  1. 计算Sandbox隔离环境是否发生了变化,如Pod的网络从容器网络切换为主机网络会导致Sandbox隔离环境改变
  2. 如果Sandbox隔离环境发生了变化,则重建Sandbox隔离环境
  3. 根据计算结果,终止不应该保持继续运行的容器
  4. 如果需要,为Pod创建新的Sandbox隔离环境
  5. 创建临时容器
  6. 创建初始化容器
  7. 创建普通业务容器

对于Pod启动而言,最核心的步骤是创建Sandbox隔离环境和运行普通容器。创建Sandbox隔离环境的过程如下:

  1. 首先,通过generatePodSandboxConfig生成runtimeapi.PodSandboxConfig对象,其中包含创建Sandbox隔离环境所需的信息,如Pod的基本元信息、日志目录等

  2. 为Pod创建日志目录,默认为/var/log/pods/<namespace>_<pod_name>_<pod_uid>

  3. 调用RunPodSandbox创建并启动Sandbox容器

    • 不同的CRI实现(如containerd、CRI-O)对RunPodSandbox的具体实现各不相同

    • 整体流程:

        1. 确保Sandbox容器镜像存在
        2. 根据传入的PodSandboxConfig创建Container对象
        3. 为Sandbox容器配置网络环境,包含调用CNI的ADD方法为Sandbox容器分配IP地址
        4. 调用底层OCI Runtime(如runc)启动Sandbox容器

        CNI定义了容器IP地址分配和回收的标准接口,最核心的是ADD(将容器加入网络并分配IP)和DEL(将容器移出网络并释放IP)两个操作

  • Ref:https://github.com/kubernetes/kubernetes/blob/88e994f6bf8fc88114c5b733e09afea339bea66d/pkg/kubelet/kuberuntime/kuberuntime_sandbox.go#L40

    // createPodSandbox creates a pod sandbox and returns (podSandBoxID, message, error).
    //
    // Steps:
    //   1. build the CRI PodSandboxConfig (generatePodSandboxConfig);
    //   2. create the pod log directory named by podSandboxConfig.LogDirectory (mode 0755);
    //   3. resolve the RuntimeClass handler when a runtimeClassManager is configured
    //      (an empty handler is passed otherwise);
    //   4. call the CRI RunPodSandbox RPC.
    //
    // On any failure the sandbox ID is "", and message is a formatted, human-readable
    // description of the failure that includes the pod. On success, message is "".
    func (m *kubeGenericRuntimeManager) createPodSandbox(pod *v1.Pod, attempt uint32) (string, string, error) {
    podSandboxConfig, err := m.generatePodSandboxConfig(pod, attempt)
    if err != nil {
    message := fmt.Sprintf("Failed to generate sandbox config for pod %q: %v", format.Pod(pod), err)
    klog.ErrorS(err, "Failed to generate sandbox config for pod", "pod", klog.KObj(pod))
    return "", message, err
    }

    // Create pod logs directory
    err = m.osInterface.MkdirAll(podSandboxConfig.LogDirectory, 0755)
    if err != nil {
    message := fmt.Sprintf("Failed to create log directory for pod %q: %v", format.Pod(pod), err)
    klog.ErrorS(err, "Failed to create log directory for pod", "pod", klog.KObj(pod))
    return "", message, err
    }

    // The runtime handler selects the low-level runtime configured for the pod's RuntimeClass.
    runtimeHandler := ""
    if m.runtimeClassManager != nil {
    runtimeHandler, err = m.runtimeClassManager.LookupRuntimeHandler(pod.Spec.RuntimeClassName)
    if err != nil {
    message := fmt.Sprintf("Failed to create sandbox for pod %q: %v", format.Pod(pod), err)
    return "", message, err
    }
    if runtimeHandler != "" {
    klog.V(2).InfoS("Running pod with runtime handler", "pod", klog.KObj(pod), "runtimeHandler", runtimeHandler)
    }
    }

    podSandBoxID, err := m.runtimeService.RunPodSandbox(podSandboxConfig, runtimeHandler)
    if err != nil {
    message := fmt.Sprintf("Failed to create sandbox for pod %q: %v", format.Pod(pod), err)
    klog.ErrorS(err, "Failed to create sandbox for pod", "pod", klog.KObj(pod))
    return "", message, err
    }

    return podSandBoxID, "", nil
    }

调用CRI创建普通容器

在Sandbox隔离环境创建之后,kubelet接着启动普通容器,普通容器包含Ephemeral临时容器、Init容器以及Normal业务容器。

上面的三类容器都通过start func启动

  • Ref:https://github.com/kubernetes/kubernetes/blob/88e994f6bf8fc88114c5b733e09afea339bea66d/pkg/kubelet/kuberuntime/kuberuntime_manager.go#L848

    // Excerpt (incomplete): presumably the body of the `start` helper used by SyncPod to start
    // one container of a given type. The trailing `}` closes that enclosing helper, which is
    // not shown here. On a start failure it:
    //   - increments the error metrics, labelled with err.Error();
    //   - records the failure in startContainerResult;
    //   - logs the error (ErrImagePullBackOff is demoted to V(3) to avoid log spam);
    //   - returns the error.
    if msg, err := m.startContainer(podSandboxID, podSandboxConfig, spec, pod, podStatus, pullSecrets, podIP, podIPs); err != nil {
    // startContainer() returns well-defined error codes that have reasonable cardinality for metrics and are
    // useful to cluster administrators to distinguish "server errors" from "user errors".
    metrics.StartedContainersErrorsTotal.WithLabelValues(metricLabel, err.Error()).Inc()
    if sc.HasWindowsHostProcessRequest(pod, spec.container) {
    metrics.StartedHostProcessContainersErrorsTotal.WithLabelValues(metricLabel, err.Error()).Inc()
    }
    startContainerResult.Fail(err, msg)
    // known errors that are logged in other places are logged at higher levels here to avoid
    // repetitive log spam
    switch {
    case err == images.ErrImagePullBackOff:
    klog.V(3).InfoS("Container start failed in pod", "containerType", typeName, "container", spec.container, "pod", klog.KObj(pod), "containerMessage", msg, "err", err)
    default:
    utilruntime.HandleError(fmt.Errorf("%v %+v start failed in pod %v: %v: %s", typeName, spec.container, format.Pod(pod), err, msg))
    }
    return err
    }

    return nil
    }
  • Ref:https://github.com/kubernetes/kubernetes/blob/16da2955d0ffeb7fcdfd7148ef2fb6c1ce1a9ef5/pkg/kubelet/kuberuntime/kuberuntime_container.go#L196(注:该段代码取自比1.25更新的版本,其签名中多了ctx和imageVolumes参数,与上面1.25版本中的调用方式略有不同)

    // startContainer starts a container and returns a message indicates why it is failed on error.
    // It starts the container through the following steps:
    // * pull the image
    // * create the container
    // * start the container
    // * run the post start lifecycle hooks (if applicable)
    //
    // On success it returns ("", nil). On failure the string is a short message describing
    // the failure. The error is one of:
    //   - ErrCreateContainerConfig, ErrPreCreateHook, ErrCreateContainer, ErrPreStartHook,
    //     kubecontainer.ErrRunContainer or ErrPostStartHook, depending on the failing step;
    //   - the error from runtime-handler lookup or EnsureImageExists, passed through as-is.
    func (m *kubeGenericRuntimeManager) startContainer(ctx context.Context, podSandboxID string, podSandboxConfig *runtimeapi.PodSandboxConfig, spec *startSpec, pod *v1.Pod, podStatus *kubecontainer.PodStatus, pullSecrets []v1.Secret, podIP string, podIPs []string, imageVolumes kubecontainer.ImageVolumes) (string, error) {
    container := spec.container

    // Step 1: pull the image.
    podRuntimeHandler, err := m.getPodRuntimeHandler(pod)
    if err != nil {
    return "", err
    }

    // A failure to build the object reference is only logged; image pulling still proceeds.
    ref, err := kubecontainer.GenerateContainerRef(pod, container)
    if err != nil {
    klog.ErrorS(err, "Couldn't make a ref to pod", "pod", klog.KObj(pod), "containerName", container.Name)
    }

    imageRef, msg, err := m.imagePuller.EnsureImageExists(ctx, ref, pod, container.Image, pullSecrets, podSandboxConfig, podRuntimeHandler, container.ImagePullPolicy)
    if err != nil {
    s, _ := grpcstatus.FromError(err)
    m.recordContainerEvent(pod, container, "", v1.EventTypeWarning, events.FailedToCreateContainer, "Error: %v", s.Message())
    return msg, err
    }

    // Step 2: create the container.
    // For a new container, the RestartCount should be 0
    // If the runtime still reports a previous instance of this container, this attempt's
    // restart count is that instance's count plus one.
    restartCount := 0
    containerStatus := podStatus.FindContainerStatusByName(container.Name)
    if containerStatus != nil {
    restartCount = containerStatus.RestartCount + 1
    } else {
    // The container runtime keeps state on container statuses and
    // what the container restart count is. When nodes are rebooted
    // some container runtimes clear their state which causes the
    // restartCount to be reset to 0. This causes the logfile to
    // start at 0.log, which either overwrites or appends to the
    // already existing log.
    //
    // We are checking to see if the log directory exists, and find
    // the latest restartCount by checking the log name -
    // {restartCount}.log - and adding 1 to it.
    logDir := BuildContainerLogsDirectory(m.podLogsDirectory, pod.Namespace, pod.Name, pod.UID, container.Name)
    restartCount, err = calcRestartCountByLogDir(logDir)
    if err != nil {
    klog.InfoS("Cannot calculate restartCount from the log directory", "logDir", logDir, "err", err)
    restartCount = 0
    }
    }

    target, err := spec.getTargetID(podStatus)
    if err != nil {
    s, _ := grpcstatus.FromError(err)
    m.recordContainerEvent(pod, container, "", v1.EventTypeWarning, events.FailedToCreateContainer, "Error: %v", s.Message())
    return s.Message(), ErrCreateContainerConfig
    }

    containerConfig, cleanupAction, err := m.generateContainerConfig(ctx, container, pod, restartCount, podIP, imageRef, podIPs, target, imageVolumes)
    // cleanupAction is deferred before the error check, so it runs on both the success
    // and the failure paths of this function.
    if cleanupAction != nil {
    defer cleanupAction()
    }
    if err != nil {
    s, _ := grpcstatus.FromError(err)
    m.recordContainerEvent(pod, container, "", v1.EventTypeWarning, events.FailedToCreateContainer, "Error: %v", s.Message())
    return s.Message(), ErrCreateContainerConfig
    }

    err = m.internalLifecycle.PreCreateContainer(pod, container, containerConfig)
    if err != nil {
    s, _ := grpcstatus.FromError(err)
    m.recordContainerEvent(pod, container, "", v1.EventTypeWarning, events.FailedToCreateContainer, "Internal PreCreateContainer hook failed: %v", s.Message())
    return s.Message(), ErrPreCreateHook
    }

    containerID, err := m.runtimeService.CreateContainer(ctx, podSandboxID, containerConfig, podSandboxConfig)
    if err != nil {
    s, _ := grpcstatus.FromError(err)
    m.recordContainerEvent(pod, container, containerID, v1.EventTypeWarning, events.FailedToCreateContainer, "Error: %v", s.Message())
    return s.Message(), ErrCreateContainer
    }
    err = m.internalLifecycle.PreStartContainer(pod, container, containerID)
    if err != nil {
    s, _ := grpcstatus.FromError(err)
    m.recordContainerEvent(pod, container, containerID, v1.EventTypeWarning, events.FailedToStartContainer, "Internal PreStartContainer hook failed: %v", s.Message())
    return s.Message(), ErrPreStartHook
    }
    m.recordContainerEvent(pod, container, containerID, v1.EventTypeNormal, events.CreatedContainer, "Created container: %v", container.Name)

    // Step 3: start the container.
    err = m.runtimeService.StartContainer(ctx, containerID)
    if err != nil {
    s, _ := grpcstatus.FromError(err)
    m.recordContainerEvent(pod, container, containerID, v1.EventTypeWarning, events.FailedToStartContainer, "Error: %v", s.Message())
    return s.Message(), kubecontainer.ErrRunContainer
    }
    m.recordContainerEvent(pod, container, containerID, v1.EventTypeNormal, events.StartedContainer, "Started container %v", container.Name)

    // Symlink container logs to the legacy container log location for cluster logging
    // support.
    // TODO(random-liu): Remove this after cluster logging supports CRI container log path.
    containerMeta := containerConfig.GetMetadata()
    sandboxMeta := podSandboxConfig.GetMetadata()
    legacySymlink := legacyLogSymlink(containerID, containerMeta.Name, sandboxMeta.Name,
    sandboxMeta.Namespace)
    containerLog := filepath.Join(podSandboxConfig.LogDirectory, containerConfig.LogPath)
    // only create legacy symlink if containerLog path exists (or the error is not IsNotExist).
    // Because if containerLog path does not exist, only dangling legacySymlink is created.
    // This dangling legacySymlink is later removed by container gc, so it does not make sense
    // to create it in the first place. it happens when journald logging driver is used with docker.
    if _, err := m.osInterface.Stat(containerLog); !os.IsNotExist(err) {
    if err := m.osInterface.Symlink(containerLog, legacySymlink); err != nil {
    klog.ErrorS(err, "Failed to create legacy symbolic link", "path", legacySymlink,
    "containerID", containerID, "containerLogPath", containerLog)
    }
    }

    // Step 4: execute the post start hook.
    // A failing PostStart hook kills the container that was just started and returns
    // ErrPostStartHook. A failure of that kill is only logged.
    if container.Lifecycle != nil && container.Lifecycle.PostStart != nil {
    kubeContainerID := kubecontainer.ContainerID{
    Type: m.runtimeName,
    ID: containerID,
    }
    msg, handlerErr := m.runner.Run(ctx, kubeContainerID, pod, container, container.Lifecycle.PostStart)
    if handlerErr != nil {
    klog.ErrorS(handlerErr, "Failed to execute PostStartHook", "pod", klog.KObj(pod),
    "podUID", pod.UID, "containerName", container.Name, "containerID", kubeContainerID.String())
    // do not record the message in the event so that secrets won't leak from the server.
    m.recordContainerEvent(pod, container, kubeContainerID.ID, v1.EventTypeWarning, events.FailedPostStartHook, "PostStartHook failed")
    if err := m.killContainer(ctx, pod, kubeContainerID, container.Name, "FailedPostStartHook", reasonFailedPostStartHook, nil, nil); err != nil {
    klog.ErrorS(err, "Failed to kill container", "pod", klog.KObj(pod),
    "podUID", pod.UID, "containerName", container.Name, "containerID", kubeContainerID.String())
    }
    return msg, ErrPostStartHook
    }
    }

    return "", nil
    }

    普通容器共享Sandbox隔离环境的Network、IPC、UTS等命名空间,创建普通容器的过程如下:

    1. 确保容器镜像存在,如果不存在,则根据ImagePullPolicy尝试拉取
    2. 调用CRI的CreateContainer创建容器,并且绑定Sandbox隔离环境
    3. 调用CRI的StartContainer启动容器,底层通过OCI runtime完成
    4. 如果容器配置了PostStart Hook,则通过runner.Run调用执行

Pod驱逐流程

为了保证节点的稳定性以及Pod QoS,kubelet会在节点资源不足的情况下,按照一定的策略驱逐Pod,确保节点上的资源得到最佳利用

kubelet驱逐Pod的场景主要包含两类:

  • Critical Pod被调度到节点时,因资源竞争,触发对低优先级Pod的驱逐
  • 在节点资源紧张的时候,为了保证节点的稳定性,触发对低优先级的Pod的驱逐

Critical Pod被调度到节点时,因资源竞争,触发对低优先级Pod的驱逐

触发驱逐的核心逻辑是:CriticalPodAdmissionHandler

  • Ref:https://github.com/kubernetes/kubernetes/blob/3ec97a445f036a38bfec6291dee661954138bac9/pkg/kubelet/preemption/preemption.go#L63

    // HandleAdmissionFailure gracefully handles admission rejection, and, in some cases,
    // to allow admission of the pod despite its previous failure.
    //
    // Behavior:
    //   - Non-critical pods: failureReasons is returned unchanged.
    //   - Critical pods with at least one non-resource reason: only the non-resource reasons
    //     are returned, and no eviction is attempted.
    //   - Critical pods whose reasons are all InsufficientResourceError: the function tries to
    //     evict other pods to free the missing amounts. It returns (nil, nil) when preemption
    //     succeeds (the pod may be admitted), or (nil, err) when it fails.
    func (c *CriticalPodAdmissionHandler) HandleAdmissionFailure(admitPod *v1.Pod, failureReasons []lifecycle.PredicateFailureReason) ([]lifecycle.PredicateFailureReason, error) {
    if !kubetypes.IsCriticalPod(admitPod) {
    return failureReasons, nil
    }
    // InsufficientResourceError is not a reason to reject a critical pod.
    // Instead of rejecting, we free up resources to admit it, if no other reasons for rejection exist.
    nonResourceReasons := []lifecycle.PredicateFailureReason{}
    resourceReasons := []*admissionRequirement{}
    for _, reason := range failureReasons {
    if r, ok := reason.(*lifecycle.InsufficientResourceError); ok {
    resourceReasons = append(resourceReasons, &admissionRequirement{
    resourceName: r.ResourceName,
    quantity: r.GetInsufficientAmount(),
    })
    } else {
    nonResourceReasons = append(nonResourceReasons, reason)
    }
    }
    if len(nonResourceReasons) > 0 {
    // Return only reasons that are not resource related, since critical pods cannot fail admission for resource reasons.
    return nonResourceReasons, nil
    }
    err := c.evictPodsToFreeRequests(admitPod, admissionRequirementList(resourceReasons))
    // if no error is returned, preemption succeeded and the pod is safe to admit.
    return nil, err
    }

kubelet选择最低优先级的Pod作为驱逐对象遵循一定的策略

  • Ref:https://github.com/kubernetes/kubernetes/blob/3ec97a445f036a38bfec6291dee661954138bac9/pkg/kubelet/preemption/preemption.go#L130

    // getPodsToPreempt returns a list of pods that could be preempted to free requests >= requirements
    //
    // The candidates are grouped by QoS class via sortPodsByQOS. The function first checks
    // that evicting every candidate would satisfy the requirements, and returns an error if
    // not. It then picks victims from the most protected class down:
    //   - Guaranteed pods: only those still needed after hypothetically evicting ALL
    //     BestEffort and Burstable pods;
    //   - Burstable pods: those needed after ALL BestEffort pods plus the chosen
    //     Guaranteed pods;
    //   - BestEffort pods: those needed after the chosen Burstable and Guaranteed pods.
    // The returned slice is ordered BestEffort, then Burstable, then Guaranteed.
    func getPodsToPreempt(pod *v1.Pod, pods []*v1.Pod, requirements admissionRequirementList) ([]*v1.Pod, error) {
    bestEffortPods, burstablePods, guaranteedPods := sortPodsByQOS(pod, pods)

    // make sure that pods exist to reclaim the requirements
    unableToMeetRequirements := requirements.subtract(append(append(bestEffortPods, burstablePods...), guaranteedPods...)...)
    if len(unableToMeetRequirements) > 0 {
    return nil, fmt.Errorf("no set of running pods found to reclaim resources: %v", unableToMeetRequirements.toString())
    }
    // find the guaranteed pods we would need to evict if we already evicted ALL burstable and besteffort pods.
    guaranteedToEvict, err := getPodsToPreemptByDistance(guaranteedPods, requirements.subtract(append(bestEffortPods, burstablePods...)...))
    if err != nil {
    return nil, err
    }
    // Find the burstable pods we would need to evict if we already evicted ALL besteffort pods, and the required guaranteed pods.
    burstableToEvict, err := getPodsToPreemptByDistance(burstablePods, requirements.subtract(append(bestEffortPods, guaranteedToEvict...)...))
    if err != nil {
    return nil, err
    }
    // Find the besteffort pods we would need to evict if we already evicted the required guaranteed and burstable pods.
    bestEffortToEvict, err := getPodsToPreemptByDistance(bestEffortPods, requirements.subtract(append(burstableToEvict, guaranteedToEvict...)...))
    if err != nil {
    return nil, err
    }
    return append(append(bestEffortToEvict, burstableToEvict...), guaranteedToEvict...), nil
    }

    具体计算逻辑如下:

    1. 筛选出节点上优先级低于当前待运行Pod的Pod列表(Critical Pod优先级高于非Critical Pod,Priority值大的Pod优先级高于Priority值小的Pod)

    2. 将筛选出的Pod列表,按照Pod的QoS分为三类,即BestEffort、Burstable和Guaranteed

    3. 判断驱逐当前统计出的可驱逐Pod列表中的Pod能释放的资源总和能否满足待运行的Pod的需求,如果不能,则返回错误,不触发后续驱逐操作

    4. 假设所有的BestEffort Pod和Burstable Pod已经全部被驱逐,占用的资源得到释放,则计算释放剩余资源需要驱逐的Guaranteed Pod列表。在选择Pod时,遍历候选的Pod列表,依次计算驱逐每个Pod能释放的资源与所需资源之间的距离,距离越小表示驱逐当前Pod越能满足所需资源。每次都选择距离最小的Pod作为驱逐对象,并将其添加到待驱逐列表,直到满足资源需求。

      • 当同时存在多个Pod能释放的资源与所需要的资源的距离计算结果相同,优先选择资源占用更小的Pod
    5. 假设所有的BestEffort Pod和计算出的需要驱逐的Guaranteed Pod已经全部被驱逐,占用的资源得到释放,则计算释放剩余资源需要驱逐的Burstable Pod列表

    6. 假设需要驱逐的Guaranteed Pod和Burstable Pod已经全部被驱逐,占用的资源得到释放,则计算释放剩余资源需要驱逐的BestEffort Pod列表

    7. 将前面几步计算得出的需驱逐Pod合并,按照BestEffort、Burstable、Guaranteed的顺序排列,作为最终的待驱逐Pod列表

      总体而言,选择待驱逐的Pod仍然遵循QoS等级,按照BestEffort、Burstable、Guaranteed的顺序,同时采用资源距离算法尽可能减少被驱逐Pod的数量

    在节点资源紧张的时候,为了保证节点的稳定性,触发对低优先级的Pod的驱逐

    kubelet使用EvictionManager监控节点的资源使用情况,包含内存、存储、PID等,当节点存在资源压力时,主动驱逐Pod释放资源,确保节点工作稳定

EvictionManager每间隔10s轮询Eviction Signal驱逐信号,并与设置的Thresholds阈值比较。当满足驱逐条件时,从Active Pods中按照一定的规则策略选择需要驱逐的Pod进行驱逐。因节点资源压力导致的驱逐在每个执行周期最多驱逐一个Pod,不会批量驱逐;驱逐发生后,会先等待被驱逐的Pod清理完毕,再进入下一轮检测。

为了提高对内存变化的感知速度,EvictionManager支持基于事件的内存用量监听:启用--kernel-memcg-notification后,通过MemoryThresholdNotifier监听cgroup的memcg事件,以事件触发的方式弥补每隔10s轮询带来的延迟

kubelet支持两种驱逐类型:硬驱逐和软驱逐

硬驱逐相关参数

  • --eviction-hard:硬驱逐阈值。当资源压力达到阈值时立即触发硬驱逐,此时Pod的优雅停机时间会被忽略,直接发送Kill信号。Linux上的默认值为imagefs.available<15%,memory.available<100Mi,nodefs.available<10%,nodefs.inodesFree<5%

软驱逐相关参数

  • --eviction-soft:软驱逐阈值。当资源达到阈值时触发软驱逐,此时不会立即终止Pod,而是等待--eviction-soft-grace-period时间后,若资源压力仍然满足阈值,才会执行Pod驱逐
  • --eviction-soft-grace-period:软驱逐的宽限期,即达到软驱逐阈值后需持续多久才触发驱逐,用于防止因为抖动导致驱逐
  • --eviction-max-pod-grace-period:软驱逐时允许的最大Pod优雅停机时间。按设计,如果待驱逐的Pod设置了TerminationGracePeriodSeconds,应取Pod的优雅停机时间与该参数值中的较小者;而实际上kubelet直接使用了该参数的值,这是一个已知问题(issue #64530)

公共参数

  • --eviction-minimum-reclaim:驱逐最小回收量。在某些情况下,驱逐Pod只会回收少量的紧缺资源,这可能导致kubelet反复达到配置的驱逐条件并多次触发驱逐,可以通过该配置为每类资源设置最小回收量
  • --eviction-pressure-transition-period:节点状态转换等待时间。在某些情况下,节点会在软驱逐阈值附近上下振荡,导致上报的节点压力状态在true和false之间反复切换,进而做出错误的驱逐决策。为了防止振荡,可以通过该参数设置节点压力状态转换前必须等待的时间,默认5min
  • --kernel-memcg-notification:启用内核的memcg通知机制。在默认情况下,kubelet轮询cAdvisor以定期收集内存使用情况统计信息,这可能不及时。通过启用memcg通知,可以在内存超过阈值的时候,kubelet立即感知到压力,触发驱逐

工作机制上,EvictionManager通过内部协程持续监测节点资源使用情况,默认10s一次探测:

  • Ref:https://github.com/kubernetes/kubernetes/blob/88e994f6bf8fc88114c5b733e09afea339bea66d/pkg/kubelet/eviction/eviction_manager.go#L178

    // Start brings up the eviction manager.
    //
    // When kernel memcg notification is enabled, it first attaches a
    // memory-threshold notifier to every memory-availability threshold, so that
    // crossing one triggers an immediate synchronize pass. It then launches a
    // goroutine that calls synchronize forever. After a pass that evicted pods,
    // it blocks until those pods are cleaned up; otherwise it sleeps for
    // monitoringInterval before the next pass.
    func (m *managerImpl) Start(diskInfoProvider DiskInfoProvider, podFunc ActivePodsFunc, podCleanedUpFunc PodCleanedUpFunc, monitoringInterval time.Duration) {
    	// Callback handed to each notifier; runs a full synchronize pass.
    	onThresholdCrossed := func(msg string) {
    		klog.InfoS(msg)
    		m.synchronize(diskInfoProvider, podFunc)
    	}

    	if m.config.KernelMemcgNotification {
    		for _, t := range m.config.Thresholds {
    			// Notifiers are only created for the two memory-availability signals.
    			if t.Signal != evictionapi.SignalMemoryAvailable && t.Signal != evictionapi.SignalAllocatableMemoryAvailable {
    				continue
    			}
    			n, err := NewMemoryThresholdNotifier(t, m.config.PodCgroupRoot, &CgroupNotifierFactory{}, onThresholdCrossed)
    			if err != nil {
    				// Best effort: the threshold is still checked by the
    				// periodic loop below, just with polling latency.
    				klog.InfoS("Eviction manager: failed to create memory threshold notifier", "err", err)
    				continue
    			}
    			go n.Start()
    			m.thresholdNotifiers = append(m.thresholdNotifiers, n)
    		}
    	}

    	// Periodic monitoring loop; it has no exit path.
    	go func() {
    		for {
    			evicted := m.synchronize(diskInfoProvider, podFunc)
    			if evicted == nil {
    				time.Sleep(monitoringInterval)
    				continue
    			}
    			klog.InfoS("Eviction manager: pods evicted, waiting for pod to be cleaned up", "pods", klog.KObjs(evicted))
    			m.waitForPodsCleanup(podCleanedUpFunc, evicted)
    		}
    	}()
    }

synchronize是驱逐探测的核心逻辑;

  • Ref:https://github.com/kubernetes/kubernetes/blob/88e994f6bf8fc88114c5b733e09afea339bea66d/pkg/kubelet/eviction/eviction_manager.go#L232

    // synchronize is the main control loop that enforces eviction thresholds.
    //
    // A single pass proceeds as follows:
    //   - Collect summary stats and derive signal observations.
    //   - Determine which thresholds are met. This includes previously met
    //     thresholds whose min-reclaim is not yet satisfied, and keeps only
    //     those whose grace period has elapsed.
    //   - Record node conditions and internal state under the manager lock.
    //   - Try to relieve pressure, in order: local-storage eviction (when
    //     localStorageCapacityIsolation is on), then node-level reclaim (funcs
    //     built from the image and container GC), and finally eviction of the
    //     highest-ranked active pod.
    //
    // Returns the evicted pods, or nil if no pod was evicted. The threshold
    // path evicts at most one pod per call; the local-storage path may return
    // several.
    func (m *managerImpl) synchronize(diskInfoProvider DiskInfoProvider, podFunc ActivePodsFunc) []*v1.Pod {
    // if we have nothing to do, just return
    thresholds := m.config.Thresholds
    if len(thresholds) == 0 && !m.localStorageCapacityIsolation {
    return nil
    }

    klog.V(3).InfoS("Eviction manager: synchronize housekeeping")
    // build the ranking functions (if not yet known)
    // TODO: have a function in cadvisor that lets us know if global housekeeping has completed
    // This is computed lazily, once. If HasDedicatedImageFs fails, the whole
    // pass is skipped and the check is retried on the next call.
    if m.dedicatedImageFs == nil {
    // note: despite its name, `ok` is an error value here, not a bool.
    hasImageFs, ok := diskInfoProvider.HasDedicatedImageFs()
    if ok != nil {
    return nil
    }
    m.dedicatedImageFs = &hasImageFs
    m.signalToRankFunc = buildSignalToRankFunc(hasImageFs)
    m.signalToNodeReclaimFuncs = buildSignalToNodeReclaimFuncs(m.imageGC, m.containerGC, hasImageFs)
    }

    activePods := podFunc()
    updateStats := true
    summary, err := m.summaryProvider.Get(updateStats)
    if err != nil {
    klog.ErrorS(err, "Eviction manager: failed to get summary stats")
    return nil
    }

    // Re-arm the memcg notifiers from fresh stats, at most once per
    // notifierRefreshInterval; failures are logged and otherwise ignored.
    if m.clock.Since(m.thresholdsLastUpdated) > notifierRefreshInterval {
    m.thresholdsLastUpdated = m.clock.Now()
    for _, notifier := range m.thresholdNotifiers {
    if err := notifier.UpdateThreshold(summary); err != nil {
    klog.InfoS("Eviction manager: failed to update notifier", "notifier", notifier.Description(), "err", err)
    }
    }
    }

    // make observations and get a function to derive pod usage stats relative to those observations.
    observations, statsFunc := makeSignalObservations(summary)
    debugLogObservations("observations", observations)

    // determine the set of thresholds met independent of grace period
    thresholds = thresholdsMet(thresholds, observations, false)
    debugLogThresholdsWithObservation("thresholds - ignoring grace period", thresholds, observations)

    // determine the set of thresholds previously met that have not yet satisfied the associated min-reclaim
    // (this keeps a threshold active until usage drops below threshold + min-reclaim)
    if len(m.thresholdsMet) > 0 {
    thresholdsNotYetResolved := thresholdsMet(m.thresholdsMet, observations, true)
    thresholds = mergeThresholds(thresholds, thresholdsNotYetResolved)
    }
    debugLogThresholdsWithObservation("thresholds - reclaim not satisfied", thresholds, observations)

    // track when a threshold was first observed
    now := m.clock.Now()
    thresholdsFirstObservedAt := thresholdsFirstObservedAt(thresholds, m.thresholdsFirstObservedAt, now)

    // the set of node conditions that are triggered by currently observed thresholds
    nodeConditions := nodeConditions(thresholds)
    if len(nodeConditions) > 0 {
    klog.V(3).InfoS("Eviction manager: node conditions - observed", "nodeCondition", nodeConditions)
    }

    // track when a node condition was last observed
    nodeConditionsLastObservedAt := nodeConditionsLastObservedAt(nodeConditions, m.nodeConditionsLastObservedAt, now)

    // node conditions report true if it has been observed within the transition period window
    // (PressureTransitionPeriod damps condition flapping around a threshold)
    nodeConditions = nodeConditionsObservedSince(nodeConditionsLastObservedAt, m.config.PressureTransitionPeriod, now)
    if len(nodeConditions) > 0 {
    klog.V(3).InfoS("Eviction manager: node conditions - transition period not met", "nodeCondition", nodeConditions)
    }

    // determine the set of thresholds we need to drive eviction behavior (i.e. all grace periods are met)
    thresholds = thresholdsMetGracePeriod(thresholdsFirstObservedAt, now)
    debugLogThresholdsWithObservation("thresholds - grace periods satisfied", thresholds, observations)

    // update internal state
    // The manager lock guards these fields. NOTE(review): presumably they are
    // read concurrently by other manager methods (e.g. pressure queries) —
    // confirm.
    m.Lock()
    m.nodeConditions = nodeConditions
    m.thresholdsFirstObservedAt = thresholdsFirstObservedAt
    m.nodeConditionsLastObservedAt = nodeConditionsLastObservedAt
    m.thresholdsMet = thresholds

    // determine the set of thresholds whose stats have been updated since the last sync
    // (avoids acting twice on the same stale observation)
    thresholds = thresholdsUpdatedStats(thresholds, observations, m.lastObservations)
    debugLogThresholdsWithObservation("thresholds - updated stats", thresholds, observations)

    m.lastObservations = observations
    m.Unlock()

    // evict pods if there is a resource usage violation from local volume temporary storage
    // If eviction happens in localStorageEviction function, skip the rest of eviction action
    if m.localStorageCapacityIsolation {
    if evictedPods := m.localStorageEviction(activePods, statsFunc); len(evictedPods) > 0 {
    return evictedPods
    }
    }

    if len(thresholds) == 0 {
    klog.V(3).InfoS("Eviction manager: no resources are starved")
    return nil
    }

    // rank the thresholds by eviction priority
    // Only a single threshold (and its resource) is acted on per pass.
    sort.Sort(byEvictionPriority(thresholds))
    thresholdToReclaim, resourceToReclaim, foundAny := getReclaimableThreshold(thresholds)
    if !foundAny {
    return nil
    }
    klog.InfoS("Eviction manager: attempting to reclaim", "resourceName", resourceToReclaim)

    // record an event about the resources we are now attempting to reclaim via eviction
    m.recorder.Eventf(m.nodeRef, v1.EventTypeWarning, "EvictionThresholdMet", "Attempting to reclaim %s", resourceToReclaim)

    // check if there are node-level resources we can reclaim to reduce pressure before evicting end-user pods.
    if m.reclaimNodeLevelResources(thresholdToReclaim.Signal, resourceToReclaim) {
    klog.InfoS("Eviction manager: able to reduce resource pressure without evicting pods.", "resourceName", resourceToReclaim)
    return nil
    }

    klog.InfoS("Eviction manager: must evict pod(s) to reclaim", "resourceName", resourceToReclaim)

    // rank the pods for eviction
    rank, ok := m.signalToRankFunc[thresholdToReclaim.Signal]
    if !ok {
    klog.ErrorS(nil, "Eviction manager: no ranking function for signal", "threshold", thresholdToReclaim.Signal)
    return nil
    }

    // the only candidates viable for eviction are those pods that had anything running.
    if len(activePods) == 0 {
    klog.ErrorS(nil, "Eviction manager: eviction thresholds have been met, but no pods are active to evict")
    return nil
    }

    // rank the running pods for eviction for the specified resource
    // rank reorders activePods in place; index 0 is tried first below.
    rank(activePods, statsFunc)

    klog.InfoS("Eviction manager: pods ranked for eviction", "pods", klog.KObjs(activePods))

    //record age of metrics for met thresholds that we are using for evictions.
    for _, t := range thresholds {
    timeObserved := observations[t.Signal].time
    if !timeObserved.IsZero() {
    metrics.EvictionStatsAge.WithLabelValues(string(t.Signal)).Observe(metrics.SinceInSeconds(timeObserved.Time))
    }
    }

    // we kill at most a single pod during each eviction interval
    for i := range activePods {
    pod := activePods[i]
    // The grace-period override depends only on the threshold, not the pod:
    //   - hard thresholds: 0 (presumably immediate termination);
    //   - soft thresholds: MaxPodGracePeriodSeconds, passed through as-is.
    // NOTE(review): whether evictPod caps this against the pod's own
    // terminationGracePeriodSeconds is not visible here — see kubernetes
    // issue #64530.
    gracePeriodOverride := int64(0)
    if !isHardEvictionThreshold(thresholdToReclaim) {
    gracePeriodOverride = m.config.MaxPodGracePeriodSeconds
    }
    message, annotations := evictionMessage(resourceToReclaim, pod, statsFunc)
    // The first successful eviction ends the pass; on failure, try the
    // next-ranked pod.
    if m.evictPod(pod, gracePeriodOverride, message, annotations) {
    metrics.Evictions.WithLabelValues(string(thresholdToReclaim.Signal)).Inc()
    return []*v1.Pod{pod}
    }
    }
    klog.InfoS("Eviction manager: unable to evict any pods from the node")
    return nil
    }

    通过makeSignalObservations计算当前观测到的驱逐信号(资源使用),kubelet支持的驱逐信号如下

    驱逐信号 描述 计算公式
    memory.available 节点可用内存 memory.available=node.status.capacity[memory]-node.status.memory.workingSet
    allocatableMemory.available 留给Pod用的可用内存,仅当Node Allocatable Enforcements包含pods时起作用,默认为enforceNodeAllocatable=["pods"] allocatableMemory.available:=pod.allocatable-pod.workingSet
    nodefs.available kubelet使用的文件系统的可用容量 nodefs.available:=node.stats.fs.available
    nodefs.inodesFree kubelet使用的文件系统的可用inodes数量 nodefs.inodesFree:=node.stats.fs.inodesFree
    imagefs.available 容器运行时用来存放镜像以及容器可写层的文件系统可用容量 imagefs.available:=node.stats.runtime.imagefs.available
    imagefs.inodesFree 容器运行时用来存放镜像以及容器可写层的文件系统的可用inodes数量 imagefs.inodesFree:=node.stats.runtime.imagefs.inodesFree
    pid.available 留给分配Pod使用的可用PID pid.available:=node.stats.rlimit.maxpid-node.stats.rlimit.curproc

    Pod可分配资源并不一定与节点剩余可用资源一致,为了保证系统的稳定性,一般会为系统守护进程保留资源。

    • Pod可分配资源(Node Allocatable):节点资源总量(capacity)-kube-reserved-system-reserved-eviction-threshold
      • kube-reserved:为运行kubelet、Container Runtime等K8s系统组件保留的资源
      • system-reserved:为运行操作系统内核、sshd等系统服务保留的资源
      • eviction-threshold:为硬驱逐阈值预留的资源