K8s-kubelet(Pod生命周期管理)

基于1.25

kubelet以Pod为基本处理单元,负责Pod从创建到消亡的整个生命周期

在1.21中,Pod的Unknown阶段(phase)已被标记为弃用。

CRI

kubelet通过CRI RPC管理容器的生命周期,执行容器的lifecycle hook和 startup/liveness/readiness的健康检查,同时根据Pod的重启策略在容器失败退出后自动重启容器,CRI是kubelet管理Pod和容器的基础

  • Ref:https://github.com/kubernetes/cri-api/blob/2c8d015e0d408208ca8843c1d6e2e2fce1e5dd94/pkg/apis/runtime/v1/api.proto#L34

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    59
    60
    61
    62
    63
    64
    65
    66
    67
    68
    69
    70
    71
    72
    73
    74
    75
    76
    77
    78
    79
    80
    81
    82
    83
    84
    85
    86
    87
    88
    89
    90
    91
    92
    93
    // RuntimeService (CRI runtime/v1) is the gRPC service the kubelet calls on the
    // container runtime. Its RPCs fall into these groups, in declaration order:
    // runtime version; pod sandbox lifecycle; container lifecycle; streaming
    // endpoints (exec/attach/port-forward); container and sandbox stats; runtime
    // config and status; container checkpointing; container event streaming.
    // Runtime service defines the public APIs for remote container runtimes
    service RuntimeService {
    // Version returns the runtime name, runtime version, and runtime API version.
    rpc Version(VersionRequest) returns (VersionResponse) {}

    // ---- Pod sandbox lifecycle ----
    // RunPodSandbox creates and starts a pod-level sandbox. Runtimes must ensure
    // the sandbox is in the ready state on success.
    rpc RunPodSandbox(RunPodSandboxRequest) returns (RunPodSandboxResponse) {}
    // StopPodSandbox stops any running process that is part of the sandbox and
    // reclaims network resources (e.g., IP addresses) allocated to the sandbox.
    // If there are any running containers in the sandbox, they must be forcibly
    // terminated.
    // This call is idempotent, and must not return an error if all relevant
    // resources have already been reclaimed. kubelet will call StopPodSandbox
    // at least once before calling RemovePodSandbox. It will also attempt to
    // reclaim resources eagerly, as soon as a sandbox is not needed. Hence,
    // multiple StopPodSandbox calls are expected.
    rpc StopPodSandbox(StopPodSandboxRequest) returns (StopPodSandboxResponse) {}
    // RemovePodSandbox removes the sandbox. If there are any running containers
    // in the sandbox, they must be forcibly terminated and removed.
    // This call is idempotent, and must not return an error if the sandbox has
    // already been removed.
    rpc RemovePodSandbox(RemovePodSandboxRequest) returns (RemovePodSandboxResponse) {}
    // PodSandboxStatus returns the status of the PodSandbox. If the PodSandbox is not
    // present, returns an error.
    rpc PodSandboxStatus(PodSandboxStatusRequest) returns (PodSandboxStatusResponse) {}
    // ListPodSandbox returns a list of PodSandboxes.
    rpc ListPodSandbox(ListPodSandboxRequest) returns (ListPodSandboxResponse) {}

    // ---- Container lifecycle ----
    // CreateContainer creates a new container in specified PodSandbox
    rpc CreateContainer(CreateContainerRequest) returns (CreateContainerResponse) {}
    // StartContainer starts the container.
    rpc StartContainer(StartContainerRequest) returns (StartContainerResponse) {}
    // StopContainer stops a running container with a grace period (i.e., timeout).
    // This call is idempotent, and must not return an error if the container has
    // already been stopped.
    // The runtime must forcibly kill the container after the grace period is
    // reached.
    rpc StopContainer(StopContainerRequest) returns (StopContainerResponse) {}
    // RemoveContainer removes the container. If the container is running, the
    // container must be forcibly removed.
    // This call is idempotent, and must not return an error if the container has
    // already been removed.
    rpc RemoveContainer(RemoveContainerRequest) returns (RemoveContainerResponse) {}
    // ListContainers lists all containers by filters.
    rpc ListContainers(ListContainersRequest) returns (ListContainersResponse) {}
    // ContainerStatus returns status of the container. If the container is not
    // present, returns an error.
    rpc ContainerStatus(ContainerStatusRequest) returns (ContainerStatusResponse) {}
    // UpdateContainerResources updates ContainerConfig of the container synchronously.
    // If runtime fails to transactionally update the requested resources, an error is returned.
    rpc UpdateContainerResources(UpdateContainerResourcesRequest) returns (UpdateContainerResourcesResponse) {}
    // ReopenContainerLog asks runtime to reopen the stdout/stderr log file
    // for the container. This is often called after the log file has been
    // rotated. If the container is not running, container runtime can choose
    // to either create a new log file and return nil, or return an error.
    // Once it returns error, new container log file MUST NOT be created.
    rpc ReopenContainerLog(ReopenContainerLogRequest) returns (ReopenContainerLogResponse) {}

    // ---- Command execution and streaming endpoints ----
    // ExecSync runs a command in a container synchronously.
    rpc ExecSync(ExecSyncRequest) returns (ExecSyncResponse) {}
    // Exec prepares a streaming endpoint to execute a command in the container.
    rpc Exec(ExecRequest) returns (ExecResponse) {}
    // Attach prepares a streaming endpoint to attach to a running container.
    rpc Attach(AttachRequest) returns (AttachResponse) {}
    // PortForward prepares a streaming endpoint to forward ports from a PodSandbox.
    rpc PortForward(PortForwardRequest) returns (PortForwardResponse) {}

    // ---- Container stats ----
    // ContainerStats returns stats of the container. If the container does not
    // exist, the call returns an error.
    rpc ContainerStats(ContainerStatsRequest) returns (ContainerStatsResponse) {}
    // ListContainerStats returns stats of all running containers.
    rpc ListContainerStats(ListContainerStatsRequest) returns (ListContainerStatsResponse) {}

    // ---- Pod sandbox stats ----
    // PodSandboxStats returns stats of the pod sandbox. If the pod sandbox does not
    // exist, the call returns an error.
    rpc PodSandboxStats(PodSandboxStatsRequest) returns (PodSandboxStatsResponse) {}
    // ListPodSandboxStats returns stats of the pod sandboxes matching a filter.
    rpc ListPodSandboxStats(ListPodSandboxStatsRequest) returns (ListPodSandboxStatsResponse) {}

    // ---- Runtime configuration and status ----
    // UpdateRuntimeConfig updates the runtime configuration based on the given request.
    rpc UpdateRuntimeConfig(UpdateRuntimeConfigRequest) returns (UpdateRuntimeConfigResponse) {}

    // Status returns the status of the runtime.
    rpc Status(StatusRequest) returns (StatusResponse) {}

    // CheckpointContainer checkpoints a container
    rpc CheckpointContainer(CheckpointContainerRequest) returns (CheckpointContainerResponse) {}

    // GetContainerEvents gets container events from the CRI runtime.
    // Unlike the other RPCs it returns a server-side stream of events.
    rpc GetContainerEvents(GetEventsRequest) returns (stream ContainerEventResponse) {}

    }

Pod启动流程

syncLoop监听到Pod创建事件后,触发执行HandlePodAdditions处理函数

kubelet启动后会运行syncLoop主调谐循环,该循环不断调用syncLoopIteration;对于Pod创建(ADD)事件,syncLoopIteration会执行HandlePodAdditions,将任务下发给podWorkers异步执行

  • Ref:https://github.com/kubernetes/kubernetes/blob/88e994f6bf8fc88114c5b733e09afea339bea66d/pkg/kubelet/config/apiserver.go#L37

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    // NewSourceApiserver creates a config source that watches and pulls from the apiserver.
    //
    // Only pods whose spec.nodeName equals nodeName are listed and watched. The watch is
    // started in a background goroutine, and only after nodeHasSynced reports true;
    // until then that goroutine polls nodeHasSynced every WaitForAPIServerSyncPeriod.
    // Pod events are delivered on updates via newSourceApiserverFromLW.
    func NewSourceApiserver(c clientset.Interface, nodeName types.NodeName, nodeHasSynced func() bool, updates chan<- interface{}) {
        restClient := c.CoreV1().RESTClient()
        onThisNode := fields.OneTermEqualSelector("spec.nodeName", string(nodeName))
        lw := cache.NewListWatchFromClient(restClient, "pods", metav1.NamespaceAll, onThisNode)

        // Watching pods is deferred until the node has synced with the apiserver.
        klog.InfoS("Waiting for node sync before watching apiserver pods")
        go func() {
            for !nodeHasSynced() {
                time.Sleep(WaitForAPIServerSyncPeriod)
                klog.V(4).InfoS("node sync has not completed yet")
            }
            klog.V(4).InfoS("node sync completed")

            klog.InfoS("Watching apiserver")
            newSourceApiserverFromLW(lw, updates)
        }()
    }

    NewSourceApiserver 负责创建kube-apiserver事件监听源。此处通过设置fieldSelector(spec.nodeName)确保收到的Pod仅与本节点相关,即Pod的spec.nodeName必须和当前kubelet设置的nodeName匹配,才会被接收并处理。此外,对apiserver中Pod的监听会等到节点同步(nodeHasSynced)完成后才开始。

    • Reflector所使用的Store被设置为一个send回调,收到Pod事件后即通过它将Event直接发送到updates Channel

    来自不同事件源的Event首先会由PodConfig进行Merge聚合处理,同时确保事件按照正确的顺序推送给syncLoopIteration

    • Ref:https://github.com/kubernetes/kubernetes/blob/88e994f6bf8fc88114c5b733e09afea339bea66d/pkg/kubelet/config/config.go#L212

      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      13
      14
      15
      16
      17
      18
      19
      20
      21
      22
      23
      24
      25
      26
      27
      28
      29
      30
      31
      32
      33
      34
      35
      36
      37
      38
      39
      40
      41
      42
      43
      44
      45
      46
      47
      48
      49
      50
      51
      52
      53
      54
      55
      56
      57
      58
      59
      60
      61
      62
      63
      64
      65
      66
      67
      68
      69
      70
      71
      72
      73
      74
      75
      76
      77
      78
      79
      80
      81
      82
      83
      84
      85
      86
      87
      88
      89
      90
      91
      92
      93
      94
      95
      96
      97
      // merge folds one update (change) from the given config source into the per-source
      // pod cache s.pods[source] and returns five PodUpdate batches:
      //   - adds:       pods not previously cached for this source,
      //   - updates / reconciles / deletes: already-cached pods for which checkAndUpdatePod
      //     reported needUpdate / needReconcile / needGracefulDelete (at most one per pod),
      //   - removes:    pods dropped from the cache (REMOVE op, or missing from a SET snapshot).
      // Every batch carries copies (copyPods) of the cached pods, so callers do not alias the cache.
      // change must be a kubetypes.PodUpdate; the unchecked type assertion below panics otherwise.
      // s.podLock is held for the whole call.
      func (s *podStorage) merge(source string, change interface{}) (adds, updates, deletes, removes, reconciles *kubetypes.PodUpdate) {
      s.podLock.Lock()
      defer s.podLock.Unlock()

      addPods := []*v1.Pod{}
      updatePods := []*v1.Pod{}
      deletePods := []*v1.Pod{}
      removePods := []*v1.Pod{}
      reconcilePods := []*v1.Pod{}

      // Cache of pods already seen from this source, keyed by UID.
      pods := s.pods[source]
      if pods == nil {
      pods = make(map[types.UID]*v1.Pod)
      }

      // updatePodsFunc is the local function which updates the pod cache *oldPods* with new pods *newPods*.
      // After updated, new pod will be stored in the pod cache *pods*.
      // Notice that *pods* and *oldPods* could be the same cache.
      // Invalid pods are dropped first by filterInvalidPods (which is given s.recorder).
      updatePodsFunc := func(newPods []*v1.Pod, oldPods, pods map[types.UID]*v1.Pod) {
      filtered := filterInvalidPods(newPods, source, s.recorder)
      for _, ref := range filtered {
      // Annotate the pod with the source before any comparison.
      if ref.Annotations == nil {
      ref.Annotations = make(map[string]string)
      }
      ref.Annotations[kubetypes.ConfigSourceAnnotationKey] = source
      if existing, found := oldPods[ref.UID]; found {
      // Known pod: keep the cached object. checkAndUpdatePod compares it with ref;
      // presumably it copies the relevant changes into existing — confirm in its source.
      pods[ref.UID] = existing
      needUpdate, needReconcile, needGracefulDelete := checkAndUpdatePod(existing, ref)
      if needUpdate {
      updatePods = append(updatePods, existing)
      } else if needReconcile {
      reconcilePods = append(reconcilePods, existing)
      } else if needGracefulDelete {
      deletePods = append(deletePods, existing)
      }
      continue
      }
      // First sighting of this UID: record when it was first seen and report it as an add.
      recordFirstSeenTime(ref)
      pods[ref.UID] = ref
      addPods = append(addPods, ref)
      }
      }

      update := change.(kubetypes.PodUpdate)
      switch update.Op {
      // ADD, UPDATE and DELETE are handled identically (the Op only selects the log line):
      // the classification into add/update/reconcile/delete comes from updatePodsFunc.
      case kubetypes.ADD, kubetypes.UPDATE, kubetypes.DELETE:
      if update.Op == kubetypes.ADD {
      klog.V(4).InfoS("Adding new pods from source", "source", source, "pods", klog.KObjs(update.Pods))
      } else if update.Op == kubetypes.DELETE {
      klog.V(4).InfoS("Gracefully deleting pods from source", "source", source, "pods", klog.KObjs(update.Pods))
      } else {
      klog.V(4).InfoS("Updating pods from source", "source", source, "pods", klog.KObjs(update.Pods))
      }
      updatePodsFunc(update.Pods, pods, pods)

      case kubetypes.REMOVE:
      // Drop each listed pod from the cache; UIDs that are not cached are ignored.
      klog.V(4).InfoS("Removing pods from source", "source", source, "pods", klog.KObjs(update.Pods))
      for _, value := range update.Pods {
      if existing, found := pods[value.UID]; found {
      // this is a delete
      delete(pods, value.UID)
      removePods = append(removePods, existing)
      continue
      }
      // this is a no-op
      }

      case kubetypes.SET:
      // SET carries the full pod list for the source: rebuild the cache from it and report
      // every previously cached pod that is absent from the new list as removed.
      klog.V(4).InfoS("Setting pods for source", "source", source)
      s.markSourceSet(source)
      // Clear the old map entries by just creating a new map
      oldPods := pods
      pods = make(map[types.UID]*v1.Pod)
      updatePodsFunc(update.Pods, oldPods, pods)
      for uid, existing := range oldPods {
      if _, found := pods[uid]; !found {
      // this is a delete
      removePods = append(removePods, existing)
      }
      }

      default:
      klog.InfoS("Received invalid update type", "type", update)

      }

      // Store the (possibly replaced) cache back for this source.
      s.pods[source] = pods

      adds = &kubetypes.PodUpdate{Op: kubetypes.ADD, Pods: copyPods(addPods), Source: source}
      updates = &kubetypes.PodUpdate{Op: kubetypes.UPDATE, Pods: copyPods(updatePods), Source: source}
      deletes = &kubetypes.PodUpdate{Op: kubetypes.DELETE, Pods: copyPods(deletePods), Source: source}
      removes = &kubetypes.PodUpdate{Op: kubetypes.REMOVE, Pods: copyPods(removePods), Source: source}
      reconciles = &kubetypes.PodUpdate{Op: kubetypes.RECONCILE, Pods: copyPods(reconcilePods), Source: source}

      return adds, updates, deletes, removes, reconciles
      }

      PodConfig通过内部Cache记录已经发现的Pod;当有变更(change)事件时,通过对比新老数据,将Pod事件分为ADD、UPDATE、DELETE、REMOVE、RECONCILE几种

      syncLoopIteration接收并处理PodUpdate事件

      • Ref:https://github.com/kubernetes/kubernetes/blob/88e994f6bf8fc88114c5b733e09afea339bea66d/pkg/kubelet/kubelet.go#L2083

        1
        2
        3
        4
        5
        6
        7
        8
        9
        10
        11
        12
        13
        14
        15
        16
        17
        18
        19
        20
        21
        22
        23
        24
        25
        26
        27
        28
        29
        30
        31
        32
        33
        34
        35
        36
        37
        38
        39
        40
        41
        42
        43
        44
        45
        46
        47
        48
        49
        50
        51
        52
        53
        54
        55
        56
        57
        58
        59
        60
        61
        62
        63
        64
        65
        66
        67
        68
        69
        70
        71
        72
        73
        74
        75
        76
        77
        78
        79
        80
        81
        82
        83
        84
        85
        86
        87
        88
        89
        90
        91
        92
        93
        94
        95
        96
        97
        98
        99
        100
        101
        102
        103
        104
        105
        106
        107
        108
        109
        110
        111
        112
        113
        114
        // syncLoopIteration blocks until one of its input channels is ready, handles that
        // single event, and returns. If several channels are ready at once, Go's select
        // picks one of them at random. It returns false only when configCh has been closed
        // (the caller should stop looping) and true in every other case.
        //   - configCh: pod updates from config sources, dispatched to handler by Op.
        //   - plegCh: pod lifecycle events; pods still known to podManager are re-synced,
        //     and ContainerDied events trigger cleanUpContainersInPod.
        //   - syncCh: periodic tick; pods returned by getPodsToSync are re-synced.
        //   - liveness/readiness/startup manager updates: probe results.
        //   - housekeepingCh: periodic HandlePodCleanups, skipped until all sources are ready.
        func (kl *Kubelet) syncLoopIteration(configCh <-chan kubetypes.PodUpdate, handler SyncHandler,
        syncCh <-chan time.Time, housekeepingCh <-chan time.Time, plegCh <-chan *pleg.PodLifecycleEvent) bool {
        select {
        case u, open := <-configCh:
        // Update from a config source; dispatch it to the right handler
        // callback.
        if !open {
        klog.ErrorS(nil, "Update channel is closed, exiting the sync loop")
        return false
        }

        switch u.Op {
        case kubetypes.ADD:
        klog.V(2).InfoS("SyncLoop ADD", "source", u.Source, "pods", klog.KObjs(u.Pods))
        // After restarting, kubelet will get all existing pods through
        // ADD as if they are new pods. These pods will then go through the
        // admission process and *may* be rejected. This can be resolved
        // once we have checkpointing.
        handler.HandlePodAdditions(u.Pods)
        case kubetypes.UPDATE:
        klog.V(2).InfoS("SyncLoop UPDATE", "source", u.Source, "pods", klog.KObjs(u.Pods))
        handler.HandlePodUpdates(u.Pods)
        case kubetypes.REMOVE:
        klog.V(2).InfoS("SyncLoop REMOVE", "source", u.Source, "pods", klog.KObjs(u.Pods))
        handler.HandlePodRemoves(u.Pods)
        case kubetypes.RECONCILE:
        klog.V(4).InfoS("SyncLoop RECONCILE", "source", u.Source, "pods", klog.KObjs(u.Pods))
        handler.HandlePodReconcile(u.Pods)
        case kubetypes.DELETE:
        klog.V(2).InfoS("SyncLoop DELETE", "source", u.Source, "pods", klog.KObjs(u.Pods))
        // DELETE is treated as a UPDATE because of graceful deletion.
        handler.HandlePodUpdates(u.Pods)
        case kubetypes.SET:
        // TODO: Do we want to support this?
        klog.ErrorS(nil, "Kubelet does not support snapshot update")
        default:
        klog.ErrorS(nil, "Invalid operation type received", "operation", u.Op)
        }

        // Record that this source has delivered at least one update; the housekeeping
        // branch below only runs once sourcesReady.AllReady() is true.
        kl.sourcesReady.AddSource(u.Source)

        case e := <-plegCh:
        if e.Type == pleg.ContainerStarted {
        // record the most recent time we observed a container start for this pod.
        // this lets us selectively invalidate the runtimeCache when processing a delete for this pod
        // to make sure we don't miss handling graceful termination for containers we reported as having started.
        kl.lastContainerStartedTime.Add(e.ID, time.Now())
        }
        if isSyncPodWorthy(e) {
        // PLEG event for a pod; sync it.
        if pod, ok := kl.podManager.GetPodByUID(e.ID); ok {
        klog.V(2).InfoS("SyncLoop (PLEG): event for pod", "pod", klog.KObj(pod), "event", e)
        handler.HandlePodSyncs([]*v1.Pod{pod})
        } else {
        // If the pod no longer exists, ignore the event.
        klog.V(4).InfoS("SyncLoop (PLEG): pod does not exist, ignore irrelevant event", "event", e)
        }
        }

        // For ContainerDied events, e.Data is expected to carry the container ID string.
        if e.Type == pleg.ContainerDied {
        if containerID, ok := e.Data.(string); ok {
        kl.cleanUpContainersInPod(e.ID, containerID)
        }
        }
        case <-syncCh:
        // Sync pods waiting for sync
        podsToSync := kl.getPodsToSync()
        // This break leaves the select (not a loop); the function then returns true.
        if len(podsToSync) == 0 {
        break
        }
        klog.V(4).InfoS("SyncLoop (SYNC) pods", "total", len(podsToSync), "pods", klog.KObjs(podsToSync))
        handler.HandlePodSyncs(podsToSync)
        case update := <-kl.livenessManager.Updates():
        // Only liveness failures trigger a pod sync.
        if update.Result == proberesults.Failure {
        handleProbeSync(kl, update, handler, "liveness", "unhealthy")
        }
        case update := <-kl.readinessManager.Updates():
        // Readiness is written to the status manager for every result; the status label
        // passed to handleProbeSync is "ready" on success and empty otherwise.
        ready := update.Result == proberesults.Success
        kl.statusManager.SetContainerReadiness(update.PodUID, update.ContainerID, ready)

        status := ""
        if ready {
        status = "ready"
        }
        handleProbeSync(kl, update, handler, "readiness", status)
        case update := <-kl.startupManager.Updates():
        // Startup state is written to the status manager; label is "started" or "unhealthy".
        started := update.Result == proberesults.Success
        kl.statusManager.SetContainerStartup(update.PodUID, update.ContainerID, started)

        status := "unhealthy"
        if started {
        status = "started"
        }
        handleProbeSync(kl, update, handler, "startup", status)
        case <-housekeepingCh:
        if !kl.sourcesReady.AllReady() {
        // If the sources aren't ready or volume manager has not yet synced the states,
        // skip housekeeping, as we may accidentally delete pods from unready sources.
        klog.V(4).InfoS("SyncLoop (housekeeping, skipped): sources aren't ready yet")
        } else {
        start := time.Now()
        klog.V(4).InfoS("SyncLoop (housekeeping)")
        if err := handler.HandlePodCleanups(); err != nil {
        klog.ErrorS(err, "Failed cleaning pods")
        }
        duration := time.Since(start)
        // NOTE(review): the message hard-codes "15s"; it is only accurate while
        // housekeepingWarningDuration is 15s — confirm if that constant changes.
        if duration > housekeepingWarningDuration {
        klog.ErrorS(fmt.Errorf("housekeeping took too long"), "Housekeeping took longer than 15s", "seconds", duration.Seconds())
        }
        klog.V(4).InfoS("SyncLoop (housekeeping) end")
        }
        }
        return true
        }

新建Pod对应的ADD事件,直接由HandlePodAdditions处理

  • Ref:https://github.com/kubernetes/kubernetes/blob/88e994f6bf8fc88114c5b733e09afea339bea66d/pkg/kubelet/kubelet.go#L2238

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40

    // HandlePodAdditions is the SyncHandler callback for pods that a config source
    // reports as added. Pods are sorted with sliceutils.PodsByCreationTime and then,
    // one at a time: registered with the pod manager; routed to handleMirrorPod if
    // they are mirror pods; otherwise checked by node-level admission (unless their
    // termination was already requested) and dispatched as a SyncPodCreate.
    func (kl *Kubelet) HandlePodAdditions(pods []*v1.Pod) {
        start := kl.clock.Now()
        sort.Sort(sliceutils.PodsByCreationTime(pods))
        for _, p := range pods {
            // Snapshot the known pods BEFORE registering p, so admission does not
            // count p against itself.
            known := kl.podManager.GetPods()

            // The pod manager is the kubelet's source of truth for desired state: a
            // pod missing from it is treated as deleted in the apiserver. Register
            // every incoming pod unconditionally.
            kl.podManager.AddPod(p)

            if kubetypes.IsMirrorPod(p) {
                kl.handleMirrorPod(p, start)
                continue
            }

            // Skip admission when another part of the kubelet already requested
            // termination: the pod worker will shut the pod down, or never set it up.
            needsAdmission := !kl.podWorkers.IsPodTerminationRequested(p.UID)
            if needsAdmission {
                // Rejected pods are failed, so the filtered list holds only admitted
                // pods that are still alive.
                active := kl.filterOutInactivePods(known)
                admitted, reason, message := kl.canAdmitPod(active, p)
                if !admitted {
                    kl.rejectPod(p, reason, message)
                    continue
                }
            }

            mirror, _ := kl.podManager.GetMirrorPodByPod(p)
            kl.dispatchWork(p, kubetypes.SyncPodCreate, mirror, start)
        }
    }

新建Pod对应的ADD事件类型,直接由HandlePodAdditions处理。

HandlePodAdditions:通过dispatchWork -> kl.podWorkers.UpdatePod的调用链,将Pod创建请求发送给podWorkers做进一步的处理

  • Ref:https://github.com/kubernetes/kubernetes/blob/88e994f6bf8fc88114c5b733e09afea339bea66d/pkg/kubelet/kubelet.go#L2238

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    // HandlePodAdditions is the SyncHandler callback for pods that a config source
    // reports as added. Pods are sorted with sliceutils.PodsByCreationTime and then,
    // one at a time: registered with the pod manager; routed to handleMirrorPod if
    // they are mirror pods; otherwise checked by node-level admission (unless their
    // termination was already requested) and dispatched as a SyncPodCreate.
    func (kl *Kubelet) HandlePodAdditions(pods []*v1.Pod) {
        start := kl.clock.Now()
        sort.Sort(sliceutils.PodsByCreationTime(pods))
        for _, p := range pods {
            // Snapshot the known pods BEFORE registering p, so admission does not
            // count p against itself.
            known := kl.podManager.GetPods()

            // The pod manager is the kubelet's source of truth for desired state: a
            // pod missing from it is treated as deleted in the apiserver. Register
            // every incoming pod unconditionally.
            kl.podManager.AddPod(p)

            if kubetypes.IsMirrorPod(p) {
                kl.handleMirrorPod(p, start)
                continue
            }

            // Skip admission when another part of the kubelet already requested
            // termination: the pod worker will shut the pod down, or never set it up.
            needsAdmission := !kl.podWorkers.IsPodTerminationRequested(p.UID)
            if needsAdmission {
                // Rejected pods are failed, so the filtered list holds only admitted
                // pods that are still alive.
                active := kl.filterOutInactivePods(known)
                admitted, reason, message := kl.canAdmitPod(active, p)
                if !admitted {
                    kl.rejectPod(p, reason, message)
                    continue
                }
            }

            mirror, _ := kl.podManager.GetMirrorPodByPod(p)
            kl.dispatchWork(p, kubetypes.SyncPodCreate, mirror, start)
        }
    }

在处理新增Pod前,首先会进行排序操作,确保创建时间早的Pod总是能被优先处理。

  • 对于Static Pod的Mirror Pod,kubelet将忽略其Spec的变化,总是以File或HTTP源定义的PodSpec为准来更新Mirror Pod。因此,用户无法通过删除或者修改Mirror Pod的方式来修改或者重启Static Pod
  • kubelet在创建Mirror Pod时,会为其设置一个key为kubernetes.io/config.mirror、value为Static Pod哈希值的Annotation,该值不允许被更新或修改;在执行SyncPod时,kubelet通过比较该哈希值来判断是否需要重建Mirror Pod。

在真正调用dispatchWork分发任务之前,还会通过canAdmitPod执行节点级准入控制(admit handler)检查

kubelet默认注册的准入控制器如下:

准入控制器 功能描述
EvictionAdmitHandler 当节点面临资源压力时,拒绝可能影响节点稳定性的Pod运行,具体规则如下:1. Critical Pod不考虑资源压力,直接允许 2. 对于仅存在内存压力的情况,Guaranteed和Burstable类型的Pod将被允许 3. 对于仅存在内存压力的情况,当BestEffort类型的Pod容忍了内存压力污点时,将被允许 4. 在其他情况下,Pod将被拒绝
SysctlsAllowlist 检查Pod的SecurityContext中定义的sysctls是否在allowlist白名单中,默认允许的sysctls如下(可以通过--allowed-unsafe-sysctls放开更多):1. kernel.shm_rmid_forced 2. net.ipv4.ip_local_port_range 3. net.ipv4.tcp_syncookies 4. net.ipv4.ping_group_range 5. net.ipv4.ip_unprivileged_port_start
AllocateResourcesPodAdmitHandler 检查节点的Device/CPU/Memory资源是否满足Pod需求。当启用TopologyManager特性门控(默认开启)时,还会检查资源的拓扑分布是否满足所配置的Policy策略
PredicateAdmitHandler 从调度特性的角度评估Pod能否通过准入,检查内容包括noderesources、nodeport、nodeAffinity、nodeName、taint/toleration。当Critical Pod被该准入控制器拒绝且原因是资源不足时,会先尝试驱逐低优先级Pod以释放所需资源,从而满足调度要求。此准入控制器还会检查Pod的OS Label(kubernetes.io/os)以及字段(Spec.OS.Name)是否与当前kubelet运行的OS匹配
ShutdownAdmitHandler 当节点正在关机(shutting down)时,拒绝所有Pod

dispatchWork的主要工作是将Pod的(创建、更新、删除、同步)事件下发给podWorkers做异步处理。

  • Ref:https://github.com/kubernetes/kubernetes/blob/88e994f6bf8fc88114c5b733e09afea339bea66d/pkg/kubelet/kubelet.go#L2212

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    // dispatchWork hands pod to the pod workers, which sync it asynchronously.
    // If the pod has completed termination, the worker ignores the update, so
    // dispatchWork effectively performs no action.
    //
    // For SyncPodCreate only, the pod's container count is also recorded in the
    // ContainersPerPodCount metric.
    func (kl *Kubelet) dispatchWork(pod *v1.Pod, syncType kubetypes.SyncPodType, mirrorPod *v1.Pod, start time.Time) {
        work := UpdatePodOptions{
            Pod:        pod,
            MirrorPod:  mirrorPod,
            UpdateType: syncType,
            StartTime:  start,
        }
        kl.podWorkers.UpdatePod(work)

        if syncType != kubetypes.SyncPodCreate {
            return
        }
        metrics.ContainersPerPodCount.Observe(float64(len(pod.Spec.Containers)))
    }

podWorkers收到Update事件后,采用独立协程异步调用syncPod进行处理

podWorkers会为每个Pod创建一个独立的任务Channel和一个goroutine,每个Pod处理协程都会阻塞式地等待Channel中的任务,并对获取到的任务进行处理;podWorkers则负责将Pod任务发送到对应的Channel中

  • Ref:https://github.com/kubernetes/kubernetes/blob/88e994f6bf8fc88114c5b733e09afea339bea66d/pkg/kubelet/pod_workers.go#L557

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    59
    60
    61
    62
    63
    64
    65
    66
    67
    68
    69
    70
    71
    72
    73
    74
    75
    76
    77
    78
    79
    80
    81
    82
    83
    84
    85
    86
    87
    88
    89
    90
    91
    92
    93
    94
    95
    96
    97
    98
    99
    100
    101
    102
    103
    104
    105
    106
    107
    108
    109
    110
    111
    112
    113
    114
    115
    116
    117
    118
    119
    120
    121
    122
    123
    124
    125
    126
    127
    128
    129
    130
    131
    132
    133
    134
    135
    136
    137
    138
    139
    140
    141
    142
    143
    144
    145
    146
    147
    148
    149
    150
    151
    152
    153
    154
    155
    156
    157
    158
    159
    160
    161
    162
    163
    164
    165
    166
    167
    168
    169
    170
    171
    172
    173
    174
    175
    176
    177
    178
    179
    180
    181
    182
    183
    184
    185
    186
    187
    188
    189
    190
    191
    192
    193
    194
    195
    196
    197
    198
    199
    200
    201
    202
    203
    204
    205
    206
    207
    208
    209
    210
    211
    212
    213
    214
    215
    216
    217
    218
    219
    220
    221
    222
    223
    224
    225
    226
    227

    // UpdatePod carries a configuration change or termination state to a pod. A pod is either runnable,
    // terminating, or terminated, and will transition to terminating if deleted on the apiserver, it is
    // discovered to have a terminal phase (Succeeded or Failed), or if it is evicted by the kubelet.
    func (p *podWorkers) UpdatePod(options UpdatePodOptions) {
    // handle when the pod is an orphan (no config) and we only have runtime status by running only
    // the terminating part of the lifecycle
    pod := options.Pod
    var isRuntimePod bool
    if options.RunningPod != nil {
    if options.Pod == nil {
    pod = options.RunningPod.ToAPIPod()
    if options.UpdateType != kubetypes.SyncPodKill {
    klog.InfoS("Pod update is ignored, runtime pods can only be killed", "pod", klog.KObj(pod), "podUID", pod.UID)
    return
    }
    options.Pod = pod
    isRuntimePod = true
    } else {
    options.RunningPod = nil
    klog.InfoS("Pod update included RunningPod which is only valid when Pod is not specified", "pod", klog.KObj(options.Pod), "podUID", options.Pod.UID)
    }
    }
    uid := pod.UID

    p.podLock.Lock()
    defer p.podLock.Unlock()

    // decide what to do with this pod - we are either setting it up, tearing it down, or ignoring it
    now := time.Now()
    status, ok := p.podSyncStatuses[uid]
    if !ok {
    klog.V(4).InfoS("Pod is being synced for the first time", "pod", klog.KObj(pod), "podUID", pod.UID)
    status = &podSyncStatus{
    syncedAt: now,
    fullname: kubecontainer.GetPodFullName(pod),
    }
    // if this pod is being synced for the first time, we need to make sure it is an active pod
    if !isRuntimePod && (pod.Status.Phase == v1.PodFailed || pod.Status.Phase == v1.PodSucceeded) {
    // check to see if the pod is not running and the pod is terminal.
    // If this succeeds then record in the podWorker that it is terminated.
    if statusCache, err := p.podCache.Get(pod.UID); err == nil {
    if isPodStatusCacheTerminal(statusCache) {
    status = &podSyncStatus{
    terminatedAt: now,
    terminatingAt: now,
    syncedAt: now,
    startedTerminating: true,
    finished: true,
    fullname: kubecontainer.GetPodFullName(pod),
    }
    }
    }
    }
    p.podSyncStatuses[uid] = status
    }

    // if an update is received that implies the pod should be running, but we are already terminating a pod by
    // that UID, assume that two pods with the same UID were created in close temporal proximity (usually static
    // pod but it's possible for an apiserver to extremely rarely do something similar) - flag the sync status
    // to indicate that after the pod terminates it should be reset to "not running" to allow a subsequent add/update
    // to start the pod worker again
    if status.IsTerminationRequested() {
    if options.UpdateType == kubetypes.SyncPodCreate {
    status.restartRequested = true
    klog.V(4).InfoS("Pod is terminating but has been requested to restart with same UID, will be reconciled later", "pod", klog.KObj(pod), "podUID", pod.UID)
    return
    }
    }

    // once a pod is terminated by UID, it cannot reenter the pod worker (until the UID is purged by housekeeping)
    if status.IsFinished() {
    klog.V(4).InfoS("Pod is finished processing, no further updates", "pod", klog.KObj(pod), "podUID", pod.UID)
    return
    }

    // check for a transition to terminating
    var becameTerminating bool
    if !status.IsTerminationRequested() {
    switch {
    case isRuntimePod:
    klog.V(4).InfoS("Pod is orphaned and must be torn down", "pod", klog.KObj(pod), "podUID", pod.UID)
    status.deleted = true
    status.terminatingAt = now
    becameTerminating = true
    case pod.DeletionTimestamp != nil:
    klog.V(4).InfoS("Pod is marked for graceful deletion, begin teardown", "pod", klog.KObj(pod), "podUID", pod.UID)
    status.deleted = true
    status.terminatingAt = now
    becameTerminating = true
    case pod.Status.Phase == v1.PodFailed, pod.Status.Phase == v1.PodSucceeded:
    klog.V(4).InfoS("Pod is in a terminal phase (success/failed), begin teardown", "pod", klog.KObj(pod), "podUID", pod.UID)
    status.terminatingAt = now
    becameTerminating = true
    case options.UpdateType == kubetypes.SyncPodKill:
    if options.KillPodOptions != nil && options.KillPodOptions.Evict {
    klog.V(4).InfoS("Pod is being evicted by the kubelet, begin teardown", "pod", klog.KObj(pod), "podUID", pod.UID)
    status.evicted = true
    } else {
    klog.V(4).InfoS("Pod is being removed by the kubelet, begin teardown", "pod", klog.KObj(pod), "podUID", pod.UID)
    }
    status.terminatingAt = now
    becameTerminating = true
    }
    }

    // once a pod is terminating, all updates are kills and the grace period can only decrease
    var workType PodWorkType
    var wasGracePeriodShortened bool
    switch {
    case status.IsTerminated():
    // A terminated pod may still be waiting for cleanup - if we receive a runtime pod kill request
    // due to housekeeping seeing an older cached version of the runtime pod simply ignore it until
    // after the pod worker completes.
    if isRuntimePod {
    klog.V(3).InfoS("Pod is waiting for termination, ignoring runtime-only kill until after pod worker is fully terminated", "pod", klog.KObj(pod), "podUID", pod.UID)
    return
    }

    workType = TerminatedPodWork

    if options.KillPodOptions != nil {
    if ch := options.KillPodOptions.CompletedCh; ch != nil {
    close(ch)
    }
    }
    options.KillPodOptions = nil

    case status.IsTerminationRequested():
    workType = TerminatingPodWork
    if options.KillPodOptions == nil {
    options.KillPodOptions = &KillPodOptions{}
    }

    if ch := options.KillPodOptions.CompletedCh; ch != nil {
    status.notifyPostTerminating = append(status.notifyPostTerminating, ch)
    }
    if fn := options.KillPodOptions.PodStatusFunc; fn != nil {
    status.statusPostTerminating = append(status.statusPostTerminating, fn)
    }

    gracePeriod, gracePeriodShortened := calculateEffectiveGracePeriod(status, pod, options.KillPodOptions)

    wasGracePeriodShortened = gracePeriodShortened
    status.gracePeriod = gracePeriod
    // always set the grace period for syncTerminatingPod so we don't have to recalculate,
    // will never be zero.
    options.KillPodOptions.PodTerminationGracePeriodSecondsOverride = &gracePeriod

    default:
    workType = SyncPodWork

    // KillPodOptions is not valid for sync actions outside of the terminating phase
    if options.KillPodOptions != nil {
    if ch := options.KillPodOptions.CompletedCh; ch != nil {
    close(ch)
    }
    options.KillPodOptions = nil
    }
    }

    // the desired work we want to be performing
    work := podWork{
    WorkType: workType,
    Options: options,
    }

    // start the pod worker goroutine if it doesn't exist
    podUpdates, exists := p.podUpdates[uid]
    if !exists {
    // We need to have a buffer here, because checkForUpdates() method that
    // puts an update into channel is called from the same goroutine where
    // the channel is consumed. However, it is guaranteed that in such case
    // the channel is empty, so buffer of size 1 is enough.
    podUpdates = make(chan podWork, 1)
    p.podUpdates[uid] = podUpdates

    // ensure that static pods start in the order they are received by UpdatePod
    if kubetypes.IsStaticPod(pod) {
    p.waitingToStartStaticPodsByFullname[status.fullname] =
    append(p.waitingToStartStaticPodsByFullname[status.fullname], uid)
    }

    // allow testing of delays in the pod update channel
    var outCh <-chan podWork
    if p.workerChannelFn != nil {
    outCh = p.workerChannelFn(uid, podUpdates)
    } else {
    outCh = podUpdates
    }

    // Creating a new pod worker either means this is a new pod, or that the
    // kubelet just restarted. In either case the kubelet is willing to believe
    // the status of the pod for the first pod worker sync. See corresponding
    // comment in syncPod.
    go func() {
    defer runtime.HandleCrash()
    p.managePodLoop(outCh)
    }()
    }

    // dispatch a request to the pod worker if none are running
    if !status.IsWorking() {
    status.working = true
    podUpdates <- work
    return
    }

    // capture the maximum latency between a requested update and when the pod
    // worker observes it
    if undelivered, ok := p.lastUndeliveredWorkUpdate[pod.UID]; ok {
    // track the max latency between when a config change is requested and when it is realized
    // NOTE: this undercounts the latency when multiple requests are queued, but captures max latency
    if !undelivered.Options.StartTime.IsZero() && undelivered.Options.StartTime.Before(work.Options.StartTime) {
    work.Options.StartTime = undelivered.Options.StartTime
    }
    }

    // always sync the most recent data
    p.lastUndeliveredWorkUpdate[pod.UID] = work

    if (becameTerminating || wasGracePeriodShortened) && status.cancelFn != nil {
    klog.V(3).InfoS("Cancelling current pod sync", "pod", klog.KObj(pod), "podUID", pod.UID, "updateType", work.WorkType)
    status.cancelFn()
    return
    }
    }
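    1. UpdatePod首先构造出work任务对象。WorkType支持三种类型:SyncPodWork、TerminatingPodWork和TerminatedPodWork,分别对应kl.syncPod、kl.syncTerminatingPod和kl.syncTerminatedPod
    2. 根据Pod UID检查是否已经存在对应的work Channel(p.podUpdates[uid]);如果没有,则为Pod创建一个缓冲大小为1的Channel,并启动一个运行managePodLoop的goroutine
    3. 如果该Pod的worker当前空闲(未处于working状态),直接将work任务发送到Pod的Channel;否则将其记录到lastUndeliveredWorkUpdate中(只保留最新一次更新)。此时若Pod刚进入终止流程或宽限期被缩短,还会调用cancelFn取消正在进行的同步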

执行容器创建准备工作(包括检查网络、设置cgroup、挂载磁盘等)

podWorkers的syncPodFn实际上就是kl.syncPod

  • Ref:https://github.com/kubernetes/kubernetes/blob/88e994f6bf8fc88114c5b733e09afea339bea66d/pkg/kubelet/kubelet.go#L1522

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    59
    60
    61
    62
    63
    64
    65
    66
    67
    68
    69
    70
    71
    72
    73
    74
    75
    76
    77
    78
    79
    80
    81
    82
    83
    84
    85
    86
    87
    88
    89
    90
    91
    92
    93
    94
    95
    96
    97
    98
    99
    100
    101
    102
    103
    104
    105
    106
    107
    108
    109
    110
    111
    112
    113
    114
    115
    116
    117
    118
    119
    120
    121
    122
    123
    124
    125
    126
    127
    128
    129
    130
    131
    132
    133
    134
    135
    136
    137
    138
    139
    140
    141
    142
    143
    144
    145
    146
    147
    148
    149
    150
    151
    152
    153
    154
    155
    156
    157
    158
    159
    160
    161
    162
    163
    164
    165
    166
    167
    168
    169
    170
    171
    172
    173
    174
    175
    176
    177
    178
    179
    180
    181
    182
    183
    184
    185
    186
    187
    188
    189
    190
    191
    192
    193
    194
    195
    196
    197
    198
    199
    200
    201
    202
    203
    204
    205
    206
    207
    208
    209
    210
    211
    212
    213
    214
    215
    216
    217
    218
    219
    220
    221
    222
    223
    224
    225
    226
    227
    228
    229
    230
    231
    232
    233
    234
    235
    236
    237
    238
    // syncPod reconciles a single pod that should be running toward its desired
    // state. In order, it:
    //   - records pod-worker start latency for newly created pods,
    //   - generates the API pod status and returns early with isTerminal=true
    //     when the pod phase is Succeeded or Failed,
    //   - stops the pod's containers and returns an error when canRunPod does
    //     not admit the pod,
    //   - refuses to start non-host-network pods while the network is not ready,
    //   - registers referenced secrets/config maps, ensures the pod cgroup, the
    //     mirror pod (static pods only), the pod data directories and the
    //     volume mounts,
    //   - adds the pod to the probe manager and delegates container creation
    //     to containerRuntime.SyncPod.
    // isTerminal is true only when the pod reached a terminal phase. A non-nil
    // error signals the sync loop to back off (see the not-runnable branch).
    func (kl *Kubelet) syncPod(ctx context.Context, updateType kubetypes.SyncPodType, pod, mirrorPod *v1.Pod, podStatus *kubecontainer.PodStatus) (isTerminal bool, err error) {
    klog.V(4).InfoS("syncPod enter", "pod", klog.KObj(pod), "podUID", pod.UID)
    defer func() {
    klog.V(4).InfoS("syncPod exit", "pod", klog.KObj(pod), "podUID", pod.UID, "isTerminal", isTerminal)
    }()

    // Latency measurements for the main workflow are relative to the
    // first time the pod was seen by kubelet.
    var firstSeenTime time.Time
    if firstSeenTimeStr, ok := pod.Annotations[kubetypes.ConfigFirstSeenAnnotationKey]; ok {
    firstSeenTime = kubetypes.ConvertToTimestamp(firstSeenTimeStr).Get()
    }

    // Record pod worker start latency if being created
    // TODO: make pod workers record their own latencies
    if updateType == kubetypes.SyncPodCreate {
    if !firstSeenTime.IsZero() {
    // This is the first time we are syncing the pod. Record the latency
    // since kubelet first saw the pod if firstSeenTime is set.
    metrics.PodWorkerStartDuration.Observe(metrics.SinceInSeconds(firstSeenTime))
    } else {
    klog.V(3).InfoS("First seen time not recorded for pod",
    "podUID", pod.UID,
    "pod", klog.KObj(pod))
    }
    }

    // Generate final API pod status with pod and status manager status
    apiPodStatus := kl.generateAPIPodStatus(pod, podStatus)
    // The pod IP may be changed in generateAPIPodStatus if the pod is using host network. (See #24576)
    // TODO(random-liu): After writing pod spec into container labels, check whether pod is using host network, and
    // set pod IP to hostIP directly in runtime.GetPodStatus
    // Copy the API-level IPs back into the runtime status so the later
    // containerRuntime.SyncPod call sees the same IPs.
    podStatus.IPs = make([]string, 0, len(apiPodStatus.PodIPs))
    for _, ipInfo := range apiPodStatus.PodIPs {
    podStatus.IPs = append(podStatus.IPs, ipInfo.IP)
    }
    if len(podStatus.IPs) == 0 && len(apiPodStatus.PodIP) > 0 {
    podStatus.IPs = []string{apiPodStatus.PodIP}
    }

    // If the pod is terminal, we don't need to continue to setup the pod
    // (isTerminal=true tells the caller the pod reached Succeeded/Failed).
    if apiPodStatus.Phase == v1.PodSucceeded || apiPodStatus.Phase == v1.PodFailed {
    kl.statusManager.SetPodStatus(pod, apiPodStatus)
    isTerminal = true
    return isTerminal, nil
    }

    // If the pod should not be running, we request the pod's containers be stopped. This is not the same
    // as termination (we want to stop the pod, but potentially restart it later if soft admission allows
    // it later). Set the status and phase appropriately
    runnable := kl.canRunPod(pod)
    if !runnable.Admit {
    // Pod is not runnable; and update the Pod and Container statuses to why.
    // NOTE(review): terminal phases already returned above, so this
    // condition is always true at this point.
    if apiPodStatus.Phase != v1.PodFailed && apiPodStatus.Phase != v1.PodSucceeded {
    apiPodStatus.Phase = v1.PodPending
    }
    apiPodStatus.Reason = runnable.Reason
    apiPodStatus.Message = runnable.Message
    // Waiting containers are not creating.
    // cs is a per-iteration copy, but State.Waiting is a pointer, so the
    // Reason written through it is visible in apiPodStatus.
    const waitingReason = "Blocked"
    for _, cs := range apiPodStatus.InitContainerStatuses {
    if cs.State.Waiting != nil {
    cs.State.Waiting.Reason = waitingReason
    }
    }
    for _, cs := range apiPodStatus.ContainerStatuses {
    if cs.State.Waiting != nil {
    cs.State.Waiting.Reason = waitingReason
    }
    }
    }

    // Record the time it takes for the pod to become running
    // since kubelet first saw the pod if firstSeenTime is set.
    // NOTE(review): && binds tighter than ||, so this reads
    // `!ok || (Pending->Running && firstSeenTime set)`; when there is no
    // cached status the metric is observed even if firstSeenTime is zero
    // and the pod is not Running yet. Confirm this is intended.
    existingStatus, ok := kl.statusManager.GetPodStatus(pod.UID)
    if !ok || existingStatus.Phase == v1.PodPending && apiPodStatus.Phase == v1.PodRunning &&
    !firstSeenTime.IsZero() {
    metrics.PodStartDuration.Observe(metrics.SinceInSeconds(firstSeenTime))
    }

    kl.statusManager.SetPodStatus(pod, apiPodStatus)

    // Pods that are not runnable must be stopped - return a typed error to the pod worker
    if !runnable.Admit {
    klog.V(2).InfoS("Pod is not runnable and must have running containers stopped", "pod", klog.KObj(pod), "podUID", pod.UID, "message", runnable.Message)
    var syncErr error
    p := kubecontainer.ConvertPodStatusToRunningPod(kl.getRuntime().Type(), podStatus)
    if err := kl.killPod(pod, p, nil); err != nil {
    kl.recorder.Eventf(pod, v1.EventTypeWarning, events.FailedToKillPod, "error killing pod: %v", err)
    syncErr = fmt.Errorf("error killing pod: %v", err)
    utilruntime.HandleError(syncErr)
    } else {
    // There was no error killing the pod, but the pod cannot be run.
    // Return an error to signal that the sync loop should back off.
    syncErr = fmt.Errorf("pod cannot be run: %s", runnable.Message)
    }
    return false, syncErr
    }

    // If the network plugin is not ready, only start the pod if it uses the host network
    if err := kl.runtimeState.networkErrors(); err != nil && !kubecontainer.IsHostNetworkPod(pod) {
    kl.recorder.Eventf(pod, v1.EventTypeWarning, events.NetworkNotReady, "%s: %v", NetworkNotReadyErrorMsg, err)
    return false, fmt.Errorf("%s: %v", NetworkNotReadyErrorMsg, err)
    }

    // ensure the kubelet knows about referenced secrets or configmaps used by the pod
    if !kl.podWorkers.IsPodTerminationRequested(pod.UID) {
    if kl.secretManager != nil {
    kl.secretManager.RegisterPod(pod)
    }
    if kl.configMapManager != nil {
    kl.configMapManager.RegisterPod(pod)
    }
    }

    // Create Cgroups for the pod and apply resource parameters
    // to them if cgroups-per-qos flag is enabled.
    pcm := kl.containerManager.NewPodContainerManager()
    // If pod has already been terminated then we need not create
    // or update the pod's cgroup
    // TODO: once context cancellation is added this check can be removed
    if !kl.podWorkers.IsPodTerminationRequested(pod.UID) {
    // When the kubelet is restarted with the cgroups-per-qos
    // flag enabled, all the pod's running containers
    // should be killed intermittently and brought back up
    // under the qos cgroup hierarchy.
    // Check if this is the pod's first sync
    firstSync := true
    for _, containerStatus := range apiPodStatus.ContainerStatuses {
    if containerStatus.State.Running != nil {
    firstSync = false
    break
    }
    }
    // Don't kill containers in pod if pod's cgroups already
    // exists or the pod is running for the first time
    // (i.e. kill only when containers are running but the pod cgroup is missing).
    podKilled := false
    if !pcm.Exists(pod) && !firstSync {
    p := kubecontainer.ConvertPodStatusToRunningPod(kl.getRuntime().Type(), podStatus)
    if err := kl.killPod(pod, p, nil); err == nil {
    podKilled = true
    } else {
    klog.ErrorS(err, "KillPod failed", "pod", klog.KObj(pod), "podStatus", podStatus)
    }
    }
    // Create and Update pod's Cgroups
    // Don't create cgroups for run once pod if it was killed above
    // The current policy is not to restart the run once pods when
    // the kubelet is restarted with the new flag as run once pods are
    // expected to run only once and if the kubelet is restarted then
    // they are not expected to run again.
    // We don't create and apply updates to cgroup if its a run once pod and was killed above
    if !(podKilled && pod.Spec.RestartPolicy == v1.RestartPolicyNever) {
    if !pcm.Exists(pod) {
    if err := kl.containerManager.UpdateQOSCgroups(); err != nil {
    klog.V(2).InfoS("Failed to update QoS cgroups while syncing pod", "pod", klog.KObj(pod), "err", err)
    }
    if err := pcm.EnsureExists(pod); err != nil {
    kl.recorder.Eventf(pod, v1.EventTypeWarning, events.FailedToCreatePodContainer, "unable to ensure pod container exists: %v", err)
    return false, fmt.Errorf("failed to ensure that the pod: %v cgroups exist and are correctly applied: %v", pod.UID, err)
    }
    }
    }
    }

    // Create Mirror Pod for Static Pod if it doesn't already exist
    if kubetypes.IsStaticPod(pod) {
    deleted := false
    if mirrorPod != nil {
    if mirrorPod.DeletionTimestamp != nil || !kl.podManager.IsMirrorPodOf(mirrorPod, pod) {
    // The mirror pod is semantically different from the static pod. Remove
    // it. The mirror pod will get recreated later.
    klog.InfoS("Trying to delete pod", "pod", klog.KObj(pod), "podUID", mirrorPod.ObjectMeta.UID)
    podFullName := kubecontainer.GetPodFullName(pod)
    var err error
    deleted, err = kl.podManager.DeleteMirrorPod(podFullName, &mirrorPod.ObjectMeta.UID)
    if deleted {
    klog.InfoS("Deleted mirror pod because it is outdated", "pod", klog.KObj(mirrorPod))
    } else if err != nil {
    klog.ErrorS(err, "Failed deleting mirror pod", "pod", klog.KObj(mirrorPod))
    }
    }
    }
    // Create the mirror pod when none exists or the stale one was just
    // deleted; skip when the node cannot be fetched or is being deleted.
    // Creation failures are only logged and do not fail the sync.
    if mirrorPod == nil || deleted {
    node, err := kl.GetNode()
    if err != nil || node.DeletionTimestamp != nil {
    klog.V(4).InfoS("No need to create a mirror pod, since node has been removed from the cluster", "node", klog.KRef("", string(kl.nodeName)))
    } else {
    klog.V(4).InfoS("Creating a mirror pod for static pod", "pod", klog.KObj(pod))
    if err := kl.podManager.CreateMirrorPod(pod); err != nil {
    klog.ErrorS(err, "Failed creating a mirror pod for", "pod", klog.KObj(pod))
    }
    }
    }
    }

    // Make data directories for the pod
    if err := kl.makePodDataDirs(pod); err != nil {
    kl.recorder.Eventf(pod, v1.EventTypeWarning, events.FailedToMakePodDataDirectories, "error making pod data directories: %v", err)
    klog.ErrorS(err, "Unable to make pod data directories for pod", "pod", klog.KObj(pod))
    return false, err
    }

    // Volume manager will not mount volumes for terminating pods
    // TODO: once context cancellation is added this check can be removed
    if !kl.podWorkers.IsPodTerminationRequested(pod.UID) {
    // Wait for volumes to attach/mount
    if err := kl.volumeManager.WaitForAttachAndMount(pod); err != nil {
    kl.recorder.Eventf(pod, v1.EventTypeWarning, events.FailedMountVolume, "Unable to attach or mount volumes: %v", err)
    klog.ErrorS(err, "Unable to attach or mount volumes for pod; skipping pod", "pod", klog.KObj(pod))
    return false, err
    }
    }

    // Fetch the pull secrets for the pod
    pullSecrets := kl.getPullSecretsForPod(pod)

    // Ensure the pod is being probed
    kl.probeManager.AddPod(pod)

    // Call the container runtime's SyncPod callback
    result := kl.containerRuntime.SyncPod(pod, podStatus, pullSecrets, kl.backOff)
    kl.reasonCache.Update(pod.UID, result)
    if err := result.Error(); err != nil {
    // Do not return error if the only failures were pods in backoff
    // NOTE(review): any SyncResult whose Error is neither back-off
    // sentinel (including a nil Error, if successful sub-results are
    // present in SyncResults) makes this return the aggregate error.
    // Confirm whether successful results can appear here.
    for _, r := range result.SyncResults {
    if r.Error != kubecontainer.ErrCrashLoopBackOff && r.Error != images.ErrImagePullBackOff {
    // Do not record an event here, as we keep all event logging for sync pod failures
    // local to container runtime, so we get better errors.
    return false, err
    }
    }

    return false, nil
    }

    return false, nil
    }

syncPod 主要步骤如下:

  1. 通过canRunPod检查是否能够在当前节点启动。canRunPod内部遍历softAdmitHandlers准入控制器列表,分别调用各准入控制器Admit func。

    • 如果有任意一个准入控制器返回拒绝,则拒绝运行Pod
  2. 通过runtimeState.networkErrors检查网络插件是否就绪

    • 如果网络插件未就绪,只启动使用Host Network的Pod。网络插件状态由容器运行时检测并反馈:kubelet调用CRI的Status接口,通过判断StatusResponse中RuntimeStatus的Conditions是否包含NetworkReady,来确定网络插件是否就绪
  3. 通过secretManager.RegisterPod和configMapManager.RegisterPod将当前Pod注册到相应的manager,即把Pod引用的Secret和ConfigMap对象登记到Manager中。Manager通过watch、TTL等机制维护kubelet本地缓存,便于挂载时快速读取,避免每次同步Pod都向kube-apiserver发起请求(如果Pod已被请求终止,则跳过注册)

  4. 如果--cgroups-per-qos参数被启用(默认启用),通过pcm.EnsureExists为Pod设置Pod级别的cgroup资源限制;QoS级别的cgroup层级则由containerManager.UpdateQOSCgroups处理(仅在Pod级cgroup尚不存在时调用)。此外,QOSContainerManager通过内部协程每隔1min执行一次UpdateCgroups同步操作

    • Ref:https://github.com/kubernetes/kubernetes/blob/88e994f6bf8fc88114c5b733e09afea339bea66d/pkg/kubelet/cm/pod_container_manager_linux.go#L72

      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      13
      14
      15
      16
      17
      18
      19
      20
      21
      22
      23
      24
      25
      26
      27
      28
      29
      30
      // EnsureExists guarantees that the pod-level cgroup for pod exists when the
      // QoS cgroup hierarchy is in use. A missing cgroup is created with the
      // resource parameters derived from the pod spec (plus the manager's pids
      // limit, if positive); an existing cgroup is left untouched.
      func (m *podContainerManagerImpl) EnsureExists(pod *v1.Pod) error {
      	podContainerName, _ := m.GetPodContainerName(pod)
      	if m.Exists(pod) {
      		// Already present: nothing to create or update.
      		return nil
      	}

      	// Memory QoS is enforced only when the MemoryQoS feature gate is on and
      	// the host runs the cgroup v2 unified hierarchy.
      	enforceMemoryQoS := utilfeature.DefaultFeatureGate.Enabled(kubefeatures.MemoryQoS) &&
      		libcontainercgroups.IsCgroup2UnifiedMode()

      	resources := ResourceConfigForPod(pod, m.enforceCPULimits, m.cpuCFSQuotaPeriod, enforceMemoryQoS)
      	if m.podPidsLimit > 0 {
      		resources.PidsLimit = &m.podPidsLimit
      	}
      	cgroupCfg := &CgroupConfig{
      		Name:               podContainerName,
      		ResourceParameters: resources,
      	}

      	if enforceMemoryQoS {
      		klog.V(4).InfoS("MemoryQoS config for pod", "pod", klog.KObj(pod), "unified", cgroupCfg.ResourceParameters.Unified)
      	}
      	if err := m.cgroupManager.Create(cgroupCfg); err != nil {
      		return fmt.Errorf("failed to create container for %v : %v", podContainerName, err)
      	}
      	return nil
      }

  • 这里为什么没有Memory Requests?是否设置Memory Request并不重要吗?并非如此:requests主要在调度阶段被使用,而limits主要在运行阶段通过cgroup生效。(例外:启用MemoryQoS特性且节点使用cgroup v2时,ResourceConfigForPod会根据memory requests设置memory.min)

kubelet通过EnsureExists实现了Pod级别的cgroup资源限制,更低层级的Container级别的cgroup资源借助CRI来完成

通过makePodDataDirs 为Pod准备以下文件夹:

  • /var/lib/kubelet/pods/<pod_uid>
  • /var/lib/kubelet/pods/<pod_uid>/volumes
  • /var/lib/kubelet/pods/<pod_uid>/plugins

对于使用了存储卷的Pod,通过volumeManager.WaitForAttachAndMount等待VolumeManager完成对存储卷的Attach和Mount操作。挂载后的数据卷位于宿主机的/var/lib/kubelet/pods/<pod_uid>/volumes/目录下,在创建容器时由容器运行时挂载到容器中

  • 启动条件就绪后,通过probeManager.AddPod将Pod加入健康检查探测,再调用containerRuntime.SyncPod创建容器

调用CRI创建Sandbox隔离环境(包含调用CNI设置网络)

containerRuntime.SyncPod主要执行以下几步:

  1. 计算Sandbox隔离环境是否发生了变化,如Pod的网络从容器网络切换为主机网络会导致Sandbox隔离环境改变
  2. 如果Sandbox隔离环境发生了变化,则重建Sandbox隔离环境
  3. 根据计算结果,终止不应该保持继续运行的容器
  4. 如果需要,为Pod创建新的Sandbox隔离环境
  5. 创建临时容器
  6. 创建初始化容器
  7. 创建普通业务容器

对于Pod启动而言,较为核心的就是创建Sandbox隔离环境和运行普通容器环境。

  1. 首先,通过generatePodSandboxConfig生成runtimeapi.PodSandboxConfig对象,内部包含创建Sandbox隔离环境所需要的信息,如基本元信息

  2. 为Pod创建日志目录,默认为/var/log/pods/<namespace>_<pod_name>_<pod_uid>

  3. 调用RunPodSandbox创建并启动Sandbox容器

    • 不同的CRI实现对RunPodSandbox的具体实现各不相同

    • 整体流程:

        1. 确保Sandbox容器镜像存在
        2. 根据传入的PodSandboxConfig创建Container对象
        3. 为Sandbox容器配置网络环境,包含调用CNI的ADD方法为Sandbox容器分配IP地址
        4. 调用底层OCI Runtime(如runc)启动Sandbox容器

        CNI定义了容器IP地址分配和回收的标准接口,最核心的是ADD(IP分配)和DEL(IP释放)两个操作

  • Ref:https://github.com/kubernetes/kubernetes/blob/88e994f6bf8fc88114c5b733e09afea339bea66d/pkg/kubelet/kuberuntime/kuberuntime_sandbox.go#L40

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    // createPodSandbox asks the container runtime to create and start the
    // sandbox for pod and returns (podSandBoxID, message, error).
    //
    // On success the message is empty. On failure the ID is empty, the message
    // is a human-readable description of the failing step, and error is the
    // underlying cause. The steps are:
    //   1. build the sandbox config,
    //   2. create the pod log directory named in that config,
    //   3. resolve the runtime handler from the pod's RuntimeClass (only when a
    //      runtimeClassManager is configured),
    //   4. call RunPodSandbox.
    func (m *kubeGenericRuntimeManager) createPodSandbox(pod *v1.Pod, attempt uint32) (string, string, error) {
    	sandboxConfig, cfgErr := m.generatePodSandboxConfig(pod, attempt)
    	if cfgErr != nil {
    		msg := fmt.Sprintf("Failed to generate sandbox config for pod %q: %v", format.Pod(pod), cfgErr)
    		klog.ErrorS(cfgErr, "Failed to generate sandbox config for pod", "pod", klog.KObj(pod))
    		return "", msg, cfgErr
    	}

    	// Create the pod logs directory recorded in the sandbox config.
    	if mkdirErr := m.osInterface.MkdirAll(sandboxConfig.LogDirectory, 0755); mkdirErr != nil {
    		msg := fmt.Sprintf("Failed to create log directory for pod %q: %v", format.Pod(pod), mkdirErr)
    		klog.ErrorS(mkdirErr, "Failed to create log directory for pod", "pod", klog.KObj(pod))
    		return "", msg, mkdirErr
    	}

    	// Without a runtimeClassManager the handler stays empty.
    	handler := ""
    	if m.runtimeClassManager != nil {
    		resolved, lookupErr := m.runtimeClassManager.LookupRuntimeHandler(pod.Spec.RuntimeClassName)
    		if lookupErr != nil {
    			// Unlike the other failure paths, this one is not logged here.
    			return "", fmt.Sprintf("Failed to create sandbox for pod %q: %v", format.Pod(pod), lookupErr), lookupErr
    		}
    		handler = resolved
    		if handler != "" {
    			klog.V(2).InfoS("Running pod with runtime handler", "pod", klog.KObj(pod), "runtimeHandler", handler)
    		}
    	}

    	sandboxID, runErr := m.runtimeService.RunPodSandbox(sandboxConfig, handler)
    	if runErr != nil {
    		msg := fmt.Sprintf("Failed to create sandbox for pod %q: %v", format.Pod(pod), runErr)
    		klog.ErrorS(runErr, "Failed to create sandbox for pod", "pod", klog.KObj(pod))
    		return "", msg, runErr
    	}
    	return sandboxID, "", nil
    }

调用CRI创建普通容器

在Sandbox隔离环境创建之后,kubelet接着启动普通容器,普通容器包含Ephemeral临时容器、Init容器以及Normal业务容器。

上面三类容器都通过start函数启动,start内部调用m.startContainer

  • Ref:https://github.com/kubernetes/kubernetes/blob/88e994f6bf8fc88114c5b733e09afea339bea66d/pkg/kubelet/kuberuntime/kuberuntime_manager.go#L848

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    if msg, err := m.startContainer(podSandboxID, podSandboxConfig, spec, pod, podStatus, pullSecrets, podIP, podIPs); err != nil {
    // startContainer() returns well-defined error codes that have reasonable cardinality for metrics and are
    // useful to cluster administrators to distinguish "server errors" from "user errors".
    metrics.StartedContainersErrorsTotal.WithLabelValues(metricLabel, err.Error()).Inc()
    if sc.HasWindowsHostProcessRequest(pod, spec.container) {
    metrics.StartedHostProcessContainersErrorsTotal.WithLabelValues(metricLabel, err.Error()).Inc()
    }
    startContainerResult.Fail(err, msg)
    // known errors that are logged in other places are logged at higher levels here to avoid
    // repetitive log spam
    switch {
    case err == images.ErrImagePullBackOff:
    klog.V(3).InfoS("Container start failed in pod", "containerType", typeName, "container", spec.container, "pod", klog.KObj(pod), "containerMessage", msg, "err", err)
    default:
    utilruntime.HandleError(fmt.Errorf("%v %+v start failed in pod %v: %v: %s", typeName, spec.container, format.Pod(pod), err, msg))
    }
    return err
    }

    return nil
    }
  • Ref:https://github.com/kubernetes/kubernetes/blob/16da2955d0ffeb7fcdfd7148ef2fb6c1ce1a9ef5/pkg/kubelet/kuberuntime/kuberuntime_container.go#L196(注意:该链接对应的是较新版本的代码,其中startContainer的签名多了ctx和imageVolumes参数,因此与上面调用处传入的参数列表并不一致)

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    59
    60
    61
    62
    63
    64
    65
    66
    67
    68
    69
    70
    71
    72
    73
    74
    75
    76
    77
    78
    79
    80
    81
    82
    83
    84
    85
    86
    87
    88
    89
    90
    91
    92
    93
    94
    95
    96
    97
    98
    99
    100
    101
    102
    103
    104
    105
    106
    107
    108
    109
    110
    111
    112
    113
    114
    115
    116
    117
    118
    119
    120
    121
    122
    123
    124
    125
    126
    127
    128
    129
    130
    131
    132
    133
    134
    135
    136
    137
    138
    139
    140
    // startContainer starts a container and returns a message indicates why it is failed on error.
    // It starts the container through the following steps:
    // * pull the image
    // * create the container
    // * start the container
    // * run the post start lifecycle hooks (if applicable)
    func (m *kubeGenericRuntimeManager) startContainer(ctx context.Context, podSandboxID string, podSandboxConfig *runtimeapi.PodSandboxConfig, spec *startSpec, pod *v1.Pod, podStatus *kubecontainer.PodStatus, pullSecrets []v1.Secret, podIP string, podIPs []string, imageVolumes kubecontainer.ImageVolumes) (string, error) {
    container := spec.container

    // Step 1: pull the image.
    podRuntimeHandler, err := m.getPodRuntimeHandler(pod)
    if err != nil {
    return "", err
    }

    ref, err := kubecontainer.GenerateContainerRef(pod, container)
    if err != nil {
    klog.ErrorS(err, "Couldn't make a ref to pod", "pod", klog.KObj(pod), "containerName", container.Name)
    }

    imageRef, msg, err := m.imagePuller.EnsureImageExists(ctx, ref, pod, container.Image, pullSecrets, podSandboxConfig, podRuntimeHandler, container.ImagePullPolicy)
    if err != nil {
    s, _ := grpcstatus.FromError(err)
    m.recordContainerEvent(pod, container, "", v1.EventTypeWarning, events.FailedToCreateContainer, "Error: %v", s.Message())
    return msg, err
    }

    // Step 2: create the container.
    // For a new container, the RestartCount should be 0
    restartCount := 0
    containerStatus := podStatus.FindContainerStatusByName(container.Name)
    if containerStatus != nil {
    restartCount = containerStatus.RestartCount + 1
    } else {
    // The container runtime keeps state on container statuses and
    // what the container restart count is. When nodes are rebooted
    // some container runtimes clear their state which causes the
    // restartCount to be reset to 0. This causes the logfile to
    // start at 0.log, which either overwrites or appends to the
    // already existing log.
    //
    // We are checking to see if the log directory exists, and find
    // the latest restartCount by checking the log name -
    // {restartCount}.log - and adding 1 to it.
    logDir := BuildContainerLogsDirectory(m.podLogsDirectory, pod.Namespace, pod.Name, pod.UID, container.Name)
    restartCount, err = calcRestartCountByLogDir(logDir)
    if err != nil {
    klog.InfoS("Cannot calculate restartCount from the log directory", "logDir", logDir, "err", err)
    restartCount = 0
    }
    }

    target, err := spec.getTargetID(podStatus)
    if err != nil {
    s, _ := grpcstatus.FromError(err)
    m.recordContainerEvent(pod, container, "", v1.EventTypeWarning, events.FailedToCreateContainer, "Error: %v", s.Message())
    return s.Message(), ErrCreateContainerConfig
    }

    containerConfig, cleanupAction, err := m.generateContainerConfig(ctx, container, pod, restartCount, podIP, imageRef, podIPs, target, imageVolumes)
    if cleanupAction != nil {
    defer cleanupAction()
    }
    if err != nil {
    s, _ := grpcstatus.FromError(err)
    m.recordContainerEvent(pod, container, "", v1.EventTypeWarning, events.FailedToCreateContainer, "Error: %v", s.Message())
    return s.Message(), ErrCreateContainerConfig
    }

    err = m.internalLifecycle.PreCreateContainer(pod, container, containerConfig)
    if err != nil {
    s, _ := grpcstatus.FromError(err)
    m.recordContainerEvent(pod, container, "", v1.EventTypeWarning, events.FailedToCreateContainer, "Internal PreCreateContainer hook failed: %v", s.Message())
    return s.Message(), ErrPreCreateHook
    }

    containerID, err := m.runtimeService.CreateContainer(ctx, podSandboxID, containerConfig, podSandboxConfig)
    if err != nil {
    s, _ := grpcstatus.FromError(err)
    m.recordContainerEvent(pod, container, containerID, v1.EventTypeWarning, events.FailedToCreateContainer, "Error: %v", s.Message())
    return s.Message(), ErrCreateContainer
    }
    err = m.internalLifecycle.PreStartContainer(pod, container, containerID)
    if err != nil {
    s, _ := grpcstatus.FromError(err)
    m.recordContainerEvent(pod, container, containerID, v1.EventTypeWarning, events.FailedToStartContainer, "Internal PreStartContainer hook failed: %v", s.Message())
    return s.Message(), ErrPreStartHook
    }
    m.recordContainerEvent(pod, container, containerID, v1.EventTypeNormal, events.CreatedContainer, "Created container: %v", container.Name)

    // Step 3: start the container.
    err = m.runtimeService.StartContainer(ctx, containerID)
    if err != nil {
    s, _ := grpcstatus.FromError(err)
    m.recordContainerEvent(pod, container, containerID, v1.EventTypeWarning, events.FailedToStartContainer, "Error: %v", s.Message())
    return s.Message(), kubecontainer.ErrRunContainer
    }
    m.recordContainerEvent(pod, container, containerID, v1.EventTypeNormal, events.StartedContainer, "Started container %v", container.Name)

    // Symlink container logs to the legacy container log location for cluster logging
    // support.
    // TODO(random-liu): Remove this after cluster logging supports CRI container log path.
    containerMeta := containerConfig.GetMetadata()
    sandboxMeta := podSandboxConfig.GetMetadata()
    legacySymlink := legacyLogSymlink(containerID, containerMeta.Name, sandboxMeta.Name,
    sandboxMeta.Namespace)
    containerLog := filepath.Join(podSandboxConfig.LogDirectory, containerConfig.LogPath)
    // only create legacy symlink if containerLog path exists (or the error is not IsNotExist).
    // Because if containerLog path does not exist, only dangling legacySymlink is created.
    // This dangling legacySymlink is later removed by container gc, so it does not make sense
    // to create it in the first place. it happens when journald logging driver is used with docker.
    if _, err := m.osInterface.Stat(containerLog); !os.IsNotExist(err) {
    if err := m.osInterface.Symlink(containerLog, legacySymlink); err != nil {
    klog.ErrorS(err, "Failed to create legacy symbolic link", "path", legacySymlink,
    "containerID", containerID, "containerLogPath", containerLog)
    }
    }

    // Step 4: execute the post start hook.
    if container.Lifecycle != nil && container.Lifecycle.PostStart != nil {
    kubeContainerID := kubecontainer.ContainerID{
    Type: m.runtimeName,
    ID: containerID,
    }
    msg, handlerErr := m.runner.Run(ctx, kubeContainerID, pod, container, container.Lifecycle.PostStart)
    if handlerErr != nil {
    klog.ErrorS(handlerErr, "Failed to execute PostStartHook", "pod", klog.KObj(pod),
    "podUID", pod.UID, "containerName", container.Name, "containerID", kubeContainerID.String())
    // do not record the message in the event so that secrets won't leak from the server.
    m.recordContainerEvent(pod, container, kubeContainerID.ID, v1.EventTypeWarning, events.FailedPostStartHook, "PostStartHook failed")
    if err := m.killContainer(ctx, pod, kubeContainerID, container.Name, "FailedPostStartHook", reasonFailedPostStartHook, nil, nil); err != nil {
    klog.ErrorS(err, "Failed to kill container", "pod", klog.KObj(pod),
    "podUID", pod.UID, "containerName", container.Name, "containerID", kubeContainerID.String())
    }
    return msg, ErrPostStartHook
    }
    }

    return "", nil
    }

    由于普通容器共享Sandbox隔离环境的Network、IPC、UTS等命名空间,创建普通容器的过程如下:

    1. 确保运行容器所需的镜像存在,如果不存在,尝试主动拉取
    2. 生成容器配置并执行内部的PreCreateContainer钩子后,调用CRI的CreateContainer创建容器,并将其绑定到Sandbox隔离环境
    3. 调用CRI的StartContainer启动容器,底层通过OCI runtime完成
    4. 如果容器配置了PostStart Hook,则通过runner.Run执行;若执行失败,会记录FailedPostStartHook事件并杀死该容器

Pod驱逐流程

为了保证节点的稳定性以及Pod QoS,kubelet会在节点资源不足的情况下,按照一定的策略驱逐Pod,确保节点上的资源得到最佳利用

kubelet驱逐Pod的场景主要包含两类:

  • Critical Pod被调度到节点,因资源竞争,触发对低优先级Pod的驱逐
  • 在节点资源紧张的时候,为了保证节点的稳定性,触发对低优先级Pod的驱逐

Critical Pod被调度到节点,因资源竞争,触发对低优先级Pod的驱逐

触发驱逐的核心逻辑是:CriticalPodAdmissionHandler

  • Ref:https://github.com/kubernetes/kubernetes/blob/3ec97a445f036a38bfec6291dee661954138bac9/pkg/kubelet/preemption/preemption.go#L63

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    // HandleAdmissionFailure is called after a pod has failed admission.
    //
    // Non-critical pods are rejected with their original failure reasons.
    // For critical pods, insufficient-resource failures are not treated as fatal.
    // When they are the only failures, other pods are evicted through
    // evictPodsToFreeRequests to free the missing resources, and the pod may be
    // admitted once that succeeds.
    //
    // It returns the reasons that still block admission (nil when the pod may be
    // admitted) together with any error produced while evicting pods.
    func (c *CriticalPodAdmissionHandler) HandleAdmissionFailure(admitPod *v1.Pod, failureReasons []lifecycle.PredicateFailureReason) ([]lifecycle.PredicateFailureReason, error) {
        // Only critical pods may free resources by preempting others.
        if !kubetypes.IsCriticalPod(admitPod) {
            return failureReasons, nil
        }

        // Partition the reasons: resource shortages can be resolved by preemption,
        // anything else cannot.
        blocking := []lifecycle.PredicateFailureReason{}
        shortages := []*admissionRequirement{}
        for _, reason := range failureReasons {
            switch insufficient := reason.(type) {
            case *lifecycle.InsufficientResourceError:
                shortages = append(shortages, &admissionRequirement{
                    resourceName: insufficient.ResourceName,
                    quantity:     insufficient.GetInsufficientAmount(),
                })
            default:
                blocking = append(blocking, reason)
            }
        }

        // Critical pods never fail admission for resource reasons, so only the
        // non-resource reasons are reported back.
        if len(blocking) != 0 {
            return blocking, nil
        }

        // A nil error means preemption succeeded and the pod is safe to admit.
        return nil, c.evictPodsToFreeRequests(admitPod, admissionRequirementList(shortages))
    }

kubelet选择最低优先级的Pod作为驱逐对象遵循一定的策略

  • Ref:https://github.com/kubernetes/kubernetes/blob/3ec97a445f036a38bfec6291dee661954138bac9/pkg/kubelet/preemption/preemption.go#L130

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    // getPodsToPreempt chooses, among the candidate pods, a set whose freed requests
    // cover requirements (freed requests >= requirements).
    //
    // The candidates are first split into BestEffort, Burstable and Guaranteed
    // buckets by sortPodsByQOS. Selection then favours the lower QoS classes:
    //   - Guaranteed pods are chosen only for what BestEffort and Burstable pods
    //     together cannot free.
    //   - Burstable pods are chosen only for what BestEffort pods plus the chosen
    //     Guaranteed pods cannot free.
    //   - BestEffort pods cover whatever is still missing.
    //
    // The result is ordered BestEffort, Burstable, Guaranteed. An error is returned
    // when even evicting every candidate would not satisfy requirements, or when a
    // per-class selection fails.
    func getPodsToPreempt(pod *v1.Pod, pods []*v1.Pod, requirements admissionRequirementList) ([]*v1.Pod, error) {
        bestEffortPods, burstablePods, guaranteedPods := sortPodsByQOS(pod, pods)

        // joined concatenates the given pod lists into a newly allocated slice.
        joined := func(lists ...[]*v1.Pod) []*v1.Pod {
            total := 0
            for _, l := range lists {
                total += len(l)
            }
            all := make([]*v1.Pod, 0, total)
            for _, l := range lists {
                all = append(all, l...)
            }
            return all
        }

        // Give up early if evicting every candidate still would not be enough.
        if shortfall := requirements.subtract(joined(bestEffortPods, burstablePods, guaranteedPods)...); len(shortfall) > 0 {
            return nil, fmt.Errorf("no set of running pods found to reclaim resources: %v", shortfall.toString())
        }

        // Guaranteed: only what remains once every BestEffort and Burstable pod is gone.
        guaranteedToEvict, err := getPodsToPreemptByDistance(guaranteedPods, requirements.subtract(joined(bestEffortPods, burstablePods)...))
        if err != nil {
            return nil, err
        }

        // Burstable: only what remains once every BestEffort pod and the chosen Guaranteed pods are gone.
        burstableToEvict, err := getPodsToPreemptByDistance(burstablePods, requirements.subtract(joined(bestEffortPods, guaranteedToEvict)...))
        if err != nil {
            return nil, err
        }

        // BestEffort: whatever the chosen Burstable and Guaranteed pods leave uncovered.
        bestEffortToEvict, err := getPodsToPreemptByDistance(bestEffortPods, requirements.subtract(joined(burstableToEvict, guaranteedToEvict)...))
        if err != nil {
            return nil, err
        }

        return joined(bestEffortToEvict, burstableToEvict, guaranteedToEvict), nil
    }

    具体计算逻辑如下:

    1. 筛选出节点上优先级低于待准入Pod的Pod列表(Critical Pod的优先级高于非Critical Pod;Priority值大的Pod优先级高于Priority值小的Pod)

    2. 将筛选出的Pod列表,按照Pod的QoS分为三类,即BestEffort、Burstable和Guaranteed

    3. 判断驱逐全部候选Pod所能释放的资源总和能否满足待准入Pod的资源需求,如果不能,则返回错误,不触发后续驱逐操作

    4. 假设所有的BestEffort Pod和Burstable Pod已经全部被驱逐,占用的资源得到释放,则计算满足剩余资源需求所需驱逐的Guaranteed Pod列表。在选择Pod时,遍历候选的Pod列表,依次计算驱逐每个Pod能释放的资源和所需资源的距离,距离越小表示驱逐当前的Pod越能满足所需资源。每次都选择距离最小的Pod作为驱逐对象,并且添加到待驱逐列表。

      • 当同时存在多个Pod能释放的资源与所需要的资源的距离计算结果相同,优先选择资源占用更小的Pod
    5. 假设所有的BestEffort Pod和计算出的需要驱逐的Guaranteed Pod已经全部被驱逐,占用的资源得到释放,则计算释放剩余资源需要驱逐的Burstable Pod列表

    6. 假设需要驱逐的Guaranteed Pod和Burstable Pod已经全部被驱逐,占用的资源得到释放,则计算释放剩余资源需要驱逐的BestEffort Pod列表

    7. 将前面几步计算得出的待驱逐Pod合并,按照BestEffort、Burstable、Guaranteed的顺序排列,作为最终的待驱逐Pod列表

      整体而言,选择待驱逐的Pod仍然遵循QoS等级,按照BestEffort、Burstable、Guaranteed的顺序进行,同时采用资源距离算法尽可能减少被驱逐Pod的数量

    在节点资源紧张的时候,为了保证节点的稳定性,触发对低优先级的Pod的驱逐

    kubelet使用EvictionManager监控节点的资源使用情况,包含内存、存储、PID等,当节点存在资源压力时,主动驱逐Pod释放资源,确保节点稳定工作

EvictionManager每隔10s轮询一次驱逐信号(Eviction Signal),并与配置的阈值(Thresholds)比较。当满足驱逐条件时,从Active Pods中按照一定的规则选择需要驱逐的Pod进行驱逐。因节点资源压力导致的驱逐在每个执行周期最多驱逐一个Pod,不会批量驱逐。

为了提高对内存变化的感知速度,在启用--kernel-memcg-notification时,EvictionManager会为内存类驱逐阈值(memory.available、allocatableMemory.available)创建MemoryThresholdNotifier,监听cgroup memcg事件。内存越过阈值时会立即触发一次synchronize,以事件驱动的方式降低10s轮询带来的延迟

kubelet支持两种驱逐类型:硬驱逐和软驱逐

硬驱逐相关参数

  • --eviction-hard:硬驱逐阈值。当资源压力达到阈值时立即触发硬驱逐,此时会忽略Pod的优雅停机时间(grace period覆盖为0),直接终止Pod。Linux上默认值为memory.available<100Mi,nodefs.available<10%,imagefs.available<15%,nodefs.inodesFree<5%

软驱逐相关参数

  • --eviction-soft:软驱逐阈值。当资源达到阈值时触发软驱逐,此时不会立即终止Pod,而是等待--eviction-soft-grace-period指定的时间;若届时资源压力仍然满足阈值条件,才会执行Pod驱逐
  • --eviction-soft-grace-period:软驱逐的宽限时间,防止因资源用量短暂抖动导致驱逐
  • --eviction-max-pod-grace-period:软驱逐时终止Pod的最大优雅停机时间。按设计,如果待驱逐的Pod设置了TerminationGracePeriodSeconds,应取Pod自身的优雅停机时间与该参数中的较小值;但实际上kubelet直接使用了该参数的值,这是一个已知问题(issue #64530)

公共参数

  • --eviction-minimum-reclaim:驱逐最小回收量。在某些情况下,驱逐Pod只能回收少量紧缺资源,这可能导致kubelet反复达到驱逐阈值并多次触发驱逐。可以通过该参数为每类资源配置最小回收量
  • --eviction-pressure-transition-period:节点压力状态转换等待时间。在某些情况下,节点资源用量会在软驱逐阈值上下振荡,导致节点压力状态在true和false之间反复切换,进而做出错误的驱逐决策。为了防止振荡,可以通过该参数设置节点退出压力状态前需要等待的时间,默认5m
  • --kernel-memcg-notification:启用内核的memcg通知机制。在默认情况下,kubelet轮询cAdvisor以定期收集内存使用情况统计信息,这可能不及时。通过启用memcg通知,可以在内存超过阈值的时候,kubelet立即感知到压力,触发驱逐

工作机制上,EvictionManager通过内部协程持续监测节点资源使用情况,默认10s一次探测:

  • Ref:https://github.com/kubernetes/kubernetes/blob/88e994f6bf8fc88114c5b733e09afea339bea66d/pkg/kubelet/eviction/eviction_manager.go#L178

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    // Start launches the eviction manager's control loop, which observes and
    // responds to low compute resources.
    //
    // When kernel memcg notification is enabled, a MemoryThresholdNotifier is
    // created and started for every memory.available or allocatableMemory.available
    // threshold. Crossing such a threshold logs the notifier's message and runs
    // synchronize right away. Notifier creation failures are logged and skipped.
    //
    // A background goroutine then calls synchronize forever. After an eviction it
    // waits for the evicted pods to be cleaned up; otherwise it sleeps for
    // monitoringInterval. Start returns immediately, and the loop has no stop
    // mechanism.
    func (m *managerImpl) Start(diskInfoProvider DiskInfoProvider, podFunc ActivePodsFunc, podCleanedUpFunc PodCleanedUpFunc, monitoringInterval time.Duration) {
        // Callback handed to the memcg notifiers.
        // NOTE(review): this can presumably run synchronize concurrently with the
        // monitoring loop below — confirm synchronize tolerates that.
        onThresholdCrossed := func(message string) {
            klog.InfoS(message)
            m.synchronize(diskInfoProvider, podFunc)
        }

        if m.config.KernelMemcgNotification {
            for _, threshold := range m.config.Thresholds {
                isMemorySignal := threshold.Signal == evictionapi.SignalMemoryAvailable ||
                    threshold.Signal == evictionapi.SignalAllocatableMemoryAvailable
                if !isMemorySignal {
                    continue
                }
                notifier, err := NewMemoryThresholdNotifier(threshold, m.config.PodCgroupRoot, &CgroupNotifierFactory{}, onThresholdCrossed)
                if err != nil {
                    klog.InfoS("Eviction manager: failed to create memory threshold notifier", "err", err)
                    continue
                }
                go notifier.Start()
                m.thresholdNotifiers = append(m.thresholdNotifiers, notifier)
            }
        }

        // Periodic monitoring loop.
        go func() {
            for {
                evictedPods := m.synchronize(diskInfoProvider, podFunc)
                if evictedPods == nil {
                    time.Sleep(monitoringInterval)
                    continue
                }
                klog.InfoS("Eviction manager: pods evicted, waiting for pod to be cleaned up", "pods", klog.KObjs(evictedPods))
                m.waitForPodsCleanup(podCleanedUpFunc, evictedPods)
            }
        }()
    }

synchronize是驱逐探测的核心逻辑:

  • Ref:https://github.com/kubernetes/kubernetes/blob/88e994f6bf8fc88114c5b733e09afea339bea66d/pkg/kubelet/eviction/eviction_manager.go#L232

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    59
    60
    61
    62
    63
    64
    65
    66
    67
    68
    69
    70
    71
    72
    73
    74
    75
    76
    77
    78
    79
    80
    81
    82
    83
    84
    85
    86
    87
    88
    89
    90
    91
    92
    93
    94
    95
    96
    97
    98
    99
    100
    101
    102
    103
    104
    105
    106
    107
    108
    109
    110
    111
    112
    113
    114
    115
    116
    117
    118
    119
    120
    121
    122
    123
    124
    125
    126
    127
    128
    129
    130
    131
    132
    133
    134
    135
    136
    137
    138
    139
    140
    141
    142
    143
    144
    145
    146
    147
    148
    149
    150
    151
    152
    153
    154
    155
    156
    157
    158
    159
    160
    161
    162
    163
    164
    165
    // synchronize is the main control loop that enforces eviction thresholds.
    //
    // Each call performs the following steps:
    //  1. Detect, once, whether the node has a dedicated image filesystem, and
    //     build the per-signal pod ranking functions and node-level reclaim
    //     functions (the latter from the image and container garbage collectors).
    //  2. Fetch fresh summary stats and derive signal observations. The memcg
    //     threshold notifiers are refreshed at most once per
    //     notifierRefreshInterval.
    //  3. Compute the met thresholds, including previously met ones whose
    //     min-reclaim is not yet satisfied.
    //  4. Derive node pressure conditions, honouring PressureTransitionPeriod.
    //  5. Keep only thresholds whose grace period has elapsed and whose stats
    //     were updated since the last sync.
    //  6. If local storage capacity isolation is enabled, evict pods that violate
    //     local storage limits, and return early when any were evicted.
    //  7. Otherwise, sort the thresholds by eviction priority, try node-level
    //     reclaim for the chosen one, and only if that is not enough rank the
    //     active pods and evict at most one of them.
    //
    // It returns the pods evicted during this call, or nil if none were evicted.
    func (m *managerImpl) synchronize(diskInfoProvider DiskInfoProvider, podFunc ActivePodsFunc) []*v1.Pod {
        // if we have nothing to do, just return
        thresholds := m.config.Thresholds
        if len(thresholds) == 0 && !m.localStorageCapacityIsolation {
            return nil
        }

        klog.V(3).InfoS("Eviction manager: synchronize housekeeping")
        // build the ranking functions (if not yet known)
        // TODO: have a function in cadvisor that lets us know if global housekeeping has completed
        if m.dedicatedImageFs == nil {
            hasImageFs, err := diskInfoProvider.HasDedicatedImageFs()
            if err != nil {
                // Previously this error was silently dropped. Log it so that a
                // persistent failure is visible. dedicatedImageFs stays nil, so the
                // next call retries the detection.
                klog.InfoS("Eviction manager: unable to determine whether the node has a dedicated image filesystem, will retry", "err", err)
                return nil
            }
            m.dedicatedImageFs = &hasImageFs
            m.signalToRankFunc = buildSignalToRankFunc(hasImageFs)
            m.signalToNodeReclaimFuncs = buildSignalToNodeReclaimFuncs(m.imageGC, m.containerGC, hasImageFs)
        }

        activePods := podFunc()
        updateStats := true
        summary, err := m.summaryProvider.Get(updateStats)
        if err != nil {
            klog.ErrorS(err, "Eviction manager: failed to get summary stats")
            return nil
        }

        // Refresh the memcg notifiers' thresholds from the latest stats, rate-limited
        // to once per notifierRefreshInterval.
        if m.clock.Since(m.thresholdsLastUpdated) > notifierRefreshInterval {
            m.thresholdsLastUpdated = m.clock.Now()
            for _, notifier := range m.thresholdNotifiers {
                if err := notifier.UpdateThreshold(summary); err != nil {
                    klog.InfoS("Eviction manager: failed to update notifier", "notifier", notifier.Description(), "err", err)
                }
            }
        }

        // make observations and get a function to derive pod usage stats relative to those observations.
        observations, statsFunc := makeSignalObservations(summary)
        debugLogObservations("observations", observations)

        // determine the set of thresholds met independent of grace period
        thresholds = thresholdsMet(thresholds, observations, false)
        debugLogThresholdsWithObservation("thresholds - ignoring grace period", thresholds, observations)

        // determine the set of thresholds previously met that have not yet satisfied the associated min-reclaim
        if len(m.thresholdsMet) > 0 {
            thresholdsNotYetResolved := thresholdsMet(m.thresholdsMet, observations, true)
            thresholds = mergeThresholds(thresholds, thresholdsNotYetResolved)
        }
        debugLogThresholdsWithObservation("thresholds - reclaim not satisfied", thresholds, observations)

        // track when a threshold was first observed
        now := m.clock.Now()
        thresholdsFirstObservedAt := thresholdsFirstObservedAt(thresholds, m.thresholdsFirstObservedAt, now)

        // the set of node conditions that are triggered by currently observed thresholds
        nodeConditions := nodeConditions(thresholds)
        if len(nodeConditions) > 0 {
            klog.V(3).InfoS("Eviction manager: node conditions - observed", "nodeCondition", nodeConditions)
        }

        // track when a node condition was last observed
        nodeConditionsLastObservedAt := nodeConditionsLastObservedAt(nodeConditions, m.nodeConditionsLastObservedAt, now)

        // node conditions report true if it has been observed within the transition period window
        nodeConditions = nodeConditionsObservedSince(nodeConditionsLastObservedAt, m.config.PressureTransitionPeriod, now)
        if len(nodeConditions) > 0 {
            klog.V(3).InfoS("Eviction manager: node conditions - transition period not met", "nodeCondition", nodeConditions)
        }

        // determine the set of thresholds we need to drive eviction behavior (i.e. all grace periods are met)
        thresholds = thresholdsMetGracePeriod(thresholdsFirstObservedAt, now)
        debugLogThresholdsWithObservation("thresholds - grace periods satisfied", thresholds, observations)

        // update internal state
        m.Lock()
        m.nodeConditions = nodeConditions
        m.thresholdsFirstObservedAt = thresholdsFirstObservedAt
        m.nodeConditionsLastObservedAt = nodeConditionsLastObservedAt
        m.thresholdsMet = thresholds

        // determine the set of thresholds whose stats have been updated since the last sync
        thresholds = thresholdsUpdatedStats(thresholds, observations, m.lastObservations)
        debugLogThresholdsWithObservation("thresholds - updated stats", thresholds, observations)

        m.lastObservations = observations
        m.Unlock()

        // evict pods if there is a resource usage violation from local volume temporary storage
        // If eviction happens in localStorageEviction function, skip the rest of eviction action
        if m.localStorageCapacityIsolation {
            if evictedPods := m.localStorageEviction(activePods, statsFunc); len(evictedPods) > 0 {
                return evictedPods
            }
        }

        if len(thresholds) == 0 {
            klog.V(3).InfoS("Eviction manager: no resources are starved")
            return nil
        }

        // rank the thresholds by eviction priority
        sort.Sort(byEvictionPriority(thresholds))
        thresholdToReclaim, resourceToReclaim, foundAny := getReclaimableThreshold(thresholds)
        if !foundAny {
            return nil
        }
        klog.InfoS("Eviction manager: attempting to reclaim", "resourceName", resourceToReclaim)

        // record an event about the resources we are now attempting to reclaim via eviction
        m.recorder.Eventf(m.nodeRef, v1.EventTypeWarning, "EvictionThresholdMet", "Attempting to reclaim %s", resourceToReclaim)

        // check if there are node-level resources we can reclaim to reduce pressure before evicting end-user pods.
        if m.reclaimNodeLevelResources(thresholdToReclaim.Signal, resourceToReclaim) {
            klog.InfoS("Eviction manager: able to reduce resource pressure without evicting pods.", "resourceName", resourceToReclaim)
            return nil
        }

        klog.InfoS("Eviction manager: must evict pod(s) to reclaim", "resourceName", resourceToReclaim)

        // rank the pods for eviction
        rank, ok := m.signalToRankFunc[thresholdToReclaim.Signal]
        if !ok {
            klog.ErrorS(nil, "Eviction manager: no ranking function for signal", "threshold", thresholdToReclaim.Signal)
            return nil
        }

        // the only candidates viable for eviction are those pods that had anything running.
        if len(activePods) == 0 {
            klog.ErrorS(nil, "Eviction manager: eviction thresholds have been met, but no pods are active to evict")
            return nil
        }

        // rank the running pods for eviction for the specified resource
        rank(activePods, statsFunc)

        klog.InfoS("Eviction manager: pods ranked for eviction", "pods", klog.KObjs(activePods))

        //record age of metrics for met thresholds that we are using for evictions.
        for _, t := range thresholds {
            timeObserved := observations[t.Signal].time
            if !timeObserved.IsZero() {
                metrics.EvictionStatsAge.WithLabelValues(string(t.Signal)).Observe(metrics.SinceInSeconds(timeObserved.Time))
            }
        }

        // The grace period override depends only on the threshold, so compute it once.
        // Hard thresholds use an override of 0. Soft thresholds use
        // MaxPodGracePeriodSeconds. The pod's own terminationGracePeriodSeconds is not
        // consulted here (kubernetes issue #64530).
        gracePeriodOverride := int64(0)
        if !isHardEvictionThreshold(thresholdToReclaim) {
            gracePeriodOverride = m.config.MaxPodGracePeriodSeconds
        }

        // we kill at most a single pod during each eviction interval
        for i := range activePods {
            pod := activePods[i]
            message, annotations := evictionMessage(resourceToReclaim, pod, statsFunc)
            if m.evictPod(pod, gracePeriodOverride, message, annotations) {
                metrics.Evictions.WithLabelValues(string(thresholdToReclaim.Signal)).Inc()
                return []*v1.Pod{pod}
            }
        }
        klog.InfoS("Eviction manager: unable to evict any pods from the node")
        return nil
    }

    通过makeSignalObservations计算当前观测到的驱逐信号(资源使用情况),kubelet支持的驱逐信号如下

    驱逐信号 描述 计算公式
    memory.available 节点可用内存 memory.available:=node.status.capacity[memory]-node.stats.memory.workingSet
    allocatableMemory.available 留给Pod用的可用内存,仅当Node Allocatable Enforcements包含pods时起作用,默认为enforceNodeAllocatable=[“pods”] allocatableMemory.available:=pod.allocatable-pod.workingSet
    nodefs.available kubelet使用的文件系统的可用容量 nodefs.available:=node.stats.fs.available
    nodefs.inodesFree kubelet使用的文件系统的可用inodes数量 nodefs.inodesFree:=node.stats.fs.inodesFree
    imagefs.available 容器运行时用来存放镜像以及容器可写层的文件系统可用容量 imagefs.available:=node.stats.runtime.imagefs.available
    imagefs.inodesFree 容器运行时用来存放镜像以及容器可写层的文件系统的可用inodes数量 imagefs.inodesFree:=node.stats.runtime.imagefs.inodesFree
    pid.available 节点上可用的PID数量 pid.available:=node.stats.rlimit.maxpid-node.stats.rlimit.curproc

    Pod可分配资源并不一定与节点剩余可用资源一致,为了保证系统的稳定性,一般会为系统守护进程保留资源。

    • Pod可分配资源(Allocatable):节点总资源(Capacity)-kube-reserved-system-reserved-eviction-threshold
      • kube-reserved:运行kubelet、Container Runtime等K8s系统服务保留资源
      • system-reserved:运行操作系统内核、sshd等系统服务需要保留的资源
      • eviction-threshold:驱逐设置的保留资源