K8s-kubelet(PLEG核心原理)

基于1.25

PLEG是kubelet的一个重要组件,负责监控kubelet管理的节点运行的Pod的生命周期,并生成于生命周期相关的事件

PLEG产生原因

在K8s中,kubelet负责维护和管理每个节点上的Pod,不断的调谐Pod的状态以使得符合Spec。

  • 为了实现这个目标,kubelet同时需要支持对Pod Spec和Container Status 的事件监听。对于前者kubelet通过watch不同源的对PodSpec事件实现,对于后者,PLEG之前,不断需要Pod处理协程不断的周期性拉取最新状态,尝试了大量轮询压力。
  • 在kubeletv1.2.0版本引入了PLEG,目标是改善kubelet的可拓展性
    • 减少不必要的处理操作(当状态为发生变化时,不执行无效的调谐操作)
    • 减少对底层容器运行的并发请求,以减轻容器运行时的压力

PLEG架构设计

PLEG主要包含俩个核心工作,一是感受容器变化,生成Pod事件,俩是维持一份最新的Pod Status Cache数据供其他组件读取。

  • kubelet同时接收俩个方向的事件,Pod Spec有kube-apiserver、File、HTTP三大来源,而Pod Status来自于PLEG
  • 不论收到Pod Spec还是PLEG都会触发Pod Worker调谐
  • Pod Worker在执行调谐过程中,会读取PLEG维护的Pod Status,避免直接像容器运行时发起请求,降低容器运行时压力,提高状态读取效率。
  • PLEG在设计上支持俩种方法获取容器运行时的状态变化
    • 执行周期性的relist操作,读取所有容器列表并与内部Cache中旧状态进进行比较,生成变化事件
    • 监听来自上游容器状态事件生成器(Container State Event Generator)发来的事件,并且对事件进行处理和转换,生成比那花事件

在v1.25中的kubelet,PLEG仅实现了基于周期性relist方式容器事件生成,并未实现对接上游容器状态事件生成器的功能。

在v1.26中,kubelet引入了Evented PLEG,并且在v1.24 版本进入beta,实现了对接上游容器状态事件生成器的功能以支持对上游容器运行时的事件监听,减少relist的开销,并且提高事件响应速度。由于Evented PLEG依赖CRI Runtime的支持,默认处于关闭状态,因此需要显式开启EventedPLEG featrue gate才能使用该功能

PLEG原理剖析

PLEG在kubelet实例化阶段(NewMainKubelet)完成初始化,在kubelet启动主syncLoop前开始执行。由于EventedPLEG在v1.26引入,默认关闭,所以目前介绍基于relist 的GenericPLEG

实际上,为了避免容器运行时的监听事件丢失,在开启EventedPLEG的情况下,GenericPLEG也会同时被启动,周期性地通过relist func是同步全量Pod Status,只是其relistPeriod会从1s调整到5s

GenericPLEG的实现

relist核心流程如下:

  1. 通过runtime.GetPods func调用容器运行时读取Pod列表
  2. updateRelistTime更新本次执行relist操作的时间戳,PLEG健康检查通过该时间戳距离当前时间是否超过3min判断是否超过3min判断PLGE的的工作状态。当relist操作耗时过长,PLEG将呈现unhealthy状态,即出现“PLEG is not healthy”问题,继而导致节点NotReady
  • Ref:https://github.com/kubernetes/kubernetes/blob/88e994f6bf8fc88114c5b733e09afea339bea66d/pkg/kubelet/pleg/generic.go#L190

    // relist queries the container runtime for list of pods/containers, compare
    // with the internal pods/containers, and generates events accordingly.
    func (g *GenericPLEG) relist() {
    klog.V(5).InfoS("GenericPLEG: Relisting")

    if lastRelistTime := g.getRelistTime(); !lastRelistTime.IsZero() {
    metrics.PLEGRelistInterval.Observe(metrics.SinceInSeconds(lastRelistTime))
    }

    timestamp := g.clock.Now()
    defer func() {
    metrics.PLEGRelistDuration.Observe(metrics.SinceInSeconds(timestamp))
    }()

    // Get all the pods.
    podList, err := g.runtime.GetPods(true)
    if err != nil {
    klog.ErrorS(err, "GenericPLEG: Unable to retrieve pods")
    return
    }

    g.updateRelistTime(timestamp)

    pods := kubecontainer.Pods(podList)
    // update running pod and container count
    updateRunningPodAndContainerMetrics(pods)
    g.podRecords.setCurrent(pods)

    // Compare the old and the current pods, and generate events.
    eventsByPodID := map[types.UID][]*PodLifecycleEvent{}
    for pid := range g.podRecords {
    oldPod := g.podRecords.getOld(pid)
    pod := g.podRecords.getCurrent(pid)
    // Get all containers in the old and the new pod.
    allContainers := getContainersFromPods(oldPod, pod)
    for _, container := range allContainers {
    events := computeEvents(oldPod, pod, &container.ID)
    for _, e := range events {
    updateEvents(eventsByPodID, e)
    }
    }
    }

    var needsReinspection map[types.UID]*kubecontainer.Pod
    if g.cacheEnabled() {
    needsReinspection = make(map[types.UID]*kubecontainer.Pod)
    }

    // If there are events associated with a pod, we should update the
    // podCache.
    for pid, events := range eventsByPodID {
    pod := g.podRecords.getCurrent(pid)
    if g.cacheEnabled() {
    // updateCache() will inspect the pod and update the cache. If an
    // error occurs during the inspection, we want PLEG to retry again
    // in the next relist. To achieve this, we do not update the
    // associated podRecord of the pod, so that the change will be
    // detect again in the next relist.
    // TODO: If many pods changed during the same relist period,
    // inspecting the pod and getting the PodStatus to update the cache
    // serially may take a while. We should be aware of this and
    // parallelize if needed.
    if err := g.updateCache(pod, pid); err != nil {
    // Rely on updateCache calling GetPodStatus to log the actual error.
    klog.V(4).ErrorS(err, "PLEG: Ignoring events for pod", "pod", klog.KRef(pod.Namespace, pod.Name))

    // make sure we try to reinspect the pod during the next relisting
    needsReinspection[pid] = pod

    continue
    } else {
    // this pod was in the list to reinspect and we did so because it had events, so remove it
    // from the list (we don't want the reinspection code below to inspect it a second time in
    // this relist execution)
    delete(g.podsToReinspect, pid)
    }
    }
    // Update the internal storage and send out the events.
    g.podRecords.update(pid)

    // Map from containerId to exit code; used as a temporary cache for lookup
    containerExitCode := make(map[string]int)

    for i := range events {
    // Filter out events that are not reliable and no other components use yet.
    if events[i].Type == ContainerChanged {
    continue
    }
    select {
    case g.eventChannel <- events[i]:
    default:
    metrics.PLEGDiscardEvents.Inc()
    klog.ErrorS(nil, "Event channel is full, discard this relist() cycle event")
    }
    // Log exit code of containers when they finished in a particular event
    if events[i].Type == ContainerDied {
    // Fill up containerExitCode map for ContainerDied event when first time appeared
    if len(containerExitCode) == 0 && pod != nil && g.cache != nil {
    // Get updated podStatus
    status, err := g.cache.Get(pod.ID)
    if err == nil {
    for _, containerStatus := range status.ContainerStatuses {
    containerExitCode[containerStatus.ID.ID] = containerStatus.ExitCode
    }
    }
    }
    if containerID, ok := events[i].Data.(string); ok {
    if exitCode, ok := containerExitCode[containerID]; ok && pod != nil {
    klog.V(2).InfoS("Generic (PLEG): container finished", "podID", pod.ID, "containerID", containerID, "exitCode", exitCode)
    }
    }
    }
    }
    }

    if g.cacheEnabled() {
    // reinspect any pods that failed inspection during the previous relist
    if len(g.podsToReinspect) > 0 {
    klog.V(5).InfoS("GenericPLEG: Reinspecting pods that previously failed inspection")
    for pid, pod := range g.podsToReinspect {
    if err := g.updateCache(pod, pid); err != nil {
    // Rely on updateCache calling GetPodStatus to log the actual error.
    klog.V(5).ErrorS(err, "PLEG: pod failed reinspection", "pod", klog.KRef(pod.Namespace, pod.Name))
    needsReinspection[pid] = pod
    }
    }
    }

    // Update the cache timestamp. This needs to happen *after*
    // all pods have been properly updated in the cache.
    g.cache.UpdateTime(timestamp)
    }

    // make sure we retain the list of pods that need reinspecting the next time relist is called
    g.podsToReinspect = needsReinspection
    }

GenericPLEG健康状态检查如下:

  • Ref:https://github.com/kubernetes/kubernetes/blob/88e994f6bf8fc88114c5b733e09afea339bea66d/pkg/kubelet/pleg/generic.go#L136

    // Healthy check if PLEG work properly.
    // relistThreshold is the maximum interval between two relist.
    func (g *GenericPLEG) Healthy() (bool, error) {
    relistTime := g.getRelistTime()
    if relistTime.IsZero() {
    return false, fmt.Errorf("pleg has yet to be successful")
    }
    // Expose as metric so you can alert on `time()-pleg_last_seen_seconds > nn`
    metrics.PLEGLastSeen.Set(float64(relistTime.Unix()))
    elapsed := g.clock.Since(relistTime)
    if elapsed > relistThreshold {
    return false, fmt.Errorf("pleg was last seen active %v ago; threshold is %v", elapsed, relistThreshold)
    }
    return true, nil
    }