K8s-kubelet(Pod生命周期管理)
基于Kubernetes 1.25版本源码
kubelet以Pod为基本处理单元,负责Pod从创建到消亡的整个生命周期
Pod的Unknown阶段(phase)在1.21中已经被标记为弃用。
CRI
kubelet通过CRI RPC管理容器的生命周期,执行容器的lifecycle hook和startup/liveness/readiness健康检查,并根据Pod的重启策略在容器失败退出后自动重启容器。CRI是kubelet管理Pod和容器的基础。
-
// RuntimeService defines the public APIs for remote container runtimes.
// kubelet uses these RPCs to manage pod sandboxes and containers.
service RuntimeService {
// Version returns the runtime name, runtime version, and runtime API version.
rpc Version(VersionRequest) returns (VersionResponse) {}
// RunPodSandbox creates and starts a pod-level sandbox. Runtimes must ensure
// the sandbox is in the ready state on success.
rpc RunPodSandbox(RunPodSandboxRequest) returns (RunPodSandboxResponse) {}
// StopPodSandbox stops any running process that is part of the sandbox and
// reclaims network resources (e.g., IP addresses) allocated to the sandbox.
// If there are any running containers in the sandbox, they must be forcibly
// terminated.
// This call is idempotent, and must not return an error if all relevant
// resources have already been reclaimed. kubelet will call StopPodSandbox
// at least once before calling RemovePodSandbox. It will also attempt to
// reclaim resources eagerly, as soon as a sandbox is not needed. Hence,
// multiple StopPodSandbox calls are expected.
rpc StopPodSandbox(StopPodSandboxRequest) returns (StopPodSandboxResponse) {}
// RemovePodSandbox removes the sandbox. If there are any running containers
// in the sandbox, they must be forcibly terminated and removed.
// This call is idempotent, and must not return an error if the sandbox has
// already been removed.
rpc RemovePodSandbox(RemovePodSandboxRequest) returns (RemovePodSandboxResponse) {}
// PodSandboxStatus returns the status of the PodSandbox. If the PodSandbox is not
// present, returns an error.
rpc PodSandboxStatus(PodSandboxStatusRequest) returns (PodSandboxStatusResponse) {}
// ListPodSandbox returns a list of PodSandboxes.
rpc ListPodSandbox(ListPodSandboxRequest) returns (ListPodSandboxResponse) {}
// CreateContainer creates a new container in the specified PodSandbox.
rpc CreateContainer(CreateContainerRequest) returns (CreateContainerResponse) {}
// StartContainer starts the container.
rpc StartContainer(StartContainerRequest) returns (StartContainerResponse) {}
// StopContainer stops a running container with a grace period (i.e., timeout).
// This call is idempotent, and must not return an error if the container has
// already been stopped.
// The runtime must forcibly kill the container after the grace period is
// reached.
rpc StopContainer(StopContainerRequest) returns (StopContainerResponse) {}
// RemoveContainer removes the container. If the container is running, the
// container must be forcibly removed.
// This call is idempotent, and must not return an error if the container has
// already been removed.
rpc RemoveContainer(RemoveContainerRequest) returns (RemoveContainerResponse) {}
// ListContainers lists all containers by filters.
rpc ListContainers(ListContainersRequest) returns (ListContainersResponse) {}
// ContainerStatus returns status of the container. If the container is not
// present, returns an error.
rpc ContainerStatus(ContainerStatusRequest) returns (ContainerStatusResponse) {}
// UpdateContainerResources updates ContainerConfig of the container synchronously.
// If runtime fails to transactionally update the requested resources, an error is returned.
rpc UpdateContainerResources(UpdateContainerResourcesRequest) returns (UpdateContainerResourcesResponse) {}
// ReopenContainerLog asks runtime to reopen the stdout/stderr log file
// for the container. This is often called after the log file has been
// rotated. If the container is not running, container runtime can choose
// to either create a new log file and return nil, or return an error.
// Once it returns error, new container log file MUST NOT be created.
rpc ReopenContainerLog(ReopenContainerLogRequest) returns (ReopenContainerLogResponse) {}
// ExecSync runs a command in a container synchronously.
rpc ExecSync(ExecSyncRequest) returns (ExecSyncResponse) {}
// Exec prepares a streaming endpoint to execute a command in the container.
rpc Exec(ExecRequest) returns (ExecResponse) {}
// Attach prepares a streaming endpoint to attach to a running container.
rpc Attach(AttachRequest) returns (AttachResponse) {}
// PortForward prepares a streaming endpoint to forward ports from a PodSandbox.
rpc PortForward(PortForwardRequest) returns (PortForwardResponse) {}
// ContainerStats returns stats of the container. If the container does not
// exist, the call returns an error.
rpc ContainerStats(ContainerStatsRequest) returns (ContainerStatsResponse) {}
// ListContainerStats returns stats of all running containers.
rpc ListContainerStats(ListContainerStatsRequest) returns (ListContainerStatsResponse) {}
// PodSandboxStats returns stats of the pod sandbox. If the pod sandbox does not
// exist, the call returns an error.
rpc PodSandboxStats(PodSandboxStatsRequest) returns (PodSandboxStatsResponse) {}
// ListPodSandboxStats returns stats of the pod sandboxes matching a filter.
rpc ListPodSandboxStats(ListPodSandboxStatsRequest) returns (ListPodSandboxStatsResponse) {}
// UpdateRuntimeConfig updates the runtime configuration based on the given request.
rpc UpdateRuntimeConfig(UpdateRuntimeConfigRequest) returns (UpdateRuntimeConfigResponse) {}
// Status returns the status of the runtime.
rpc Status(StatusRequest) returns (StatusResponse) {}
// CheckpointContainer checkpoints a container.
rpc CheckpointContainer(CheckpointContainerRequest) returns (CheckpointContainerResponse) {}
// GetContainerEvents gets container events from the CRI runtime.
// The events are returned as a server-side stream of ContainerEventResponse messages.
rpc GetContainerEvents(GetEventsRequest) returns (stream ContainerEventResponse) {}
}
Pod启动流程
syncLoop监听到Pod创建事件后,触发执行HandlePodAdditions
kubelet启动后会运行syncLoop主调谐循环,各类事件最终都在syncLoopIteration中处理;对于Pod创建(ADD)事件,会执行HandlePodAdditions,将任务下发给podWorkers异步执行
-
// NewSourceApiserver creates a config source that watches and pulls from the apiserver.
//
// The ListWatch uses a field selector on spec.nodeName, so only pods bound to
// nodeName are listed and watched. The function returns immediately: a
// background goroutine waits until nodeHasSynced reports true (polling every
// WaitForAPIServerSyncPeriod, with no timeout) and then hands lw and updates to
// newSourceApiserverFromLW, which presumably pushes pod updates into updates.
func NewSourceApiserver(c clientset.Interface, nodeName types.NodeName, nodeHasSynced func() bool, updates chan<- interface{}) {
// List/watch "pods" in all namespaces, restricted to pods scheduled onto this node.
lw := cache.NewListWatchFromClient(c.CoreV1().RESTClient(), "pods", metav1.NamespaceAll, fields.OneTermEqualSelector("spec.nodeName", string(nodeName)))
// The Reflector responsible for watching pods at the apiserver should be run only after
// the node sync with the apiserver has completed.
klog.InfoS("Waiting for node sync before watching apiserver pods")
go func() {
// Block this goroutine until the node is synced; no timeout or cancellation is applied.
for {
if nodeHasSynced() {
klog.V(4).InfoS("node sync completed")
break
}
time.Sleep(WaitForAPIServerSyncPeriod)
klog.V(4).InfoS("node sync has not completed yet")
}
klog.InfoS("Watching apiserver")
newSourceApiserverFromLW(lw, updates)
}()
}NewSourceApiserver 负责创建kube-apiserver事件监听源。此处通过设置fieldSelector确保收到的Pod仅于本节点相关,即Pod的spec.nodeName必须和当前的kubelet设置的nodeName匹配,才能接受并处理。
- 收到Pod事件后,Reflector所使用的Store被替换为以send函数为回调的存储(UndeltaStore),每次变化都会把当前的全量Pod列表以SET操作直接发送到updates Channel
来自不同的事件源的Event首先会被PodConfig做Merge聚合处理,同时确保事件按照正确的顺序推送给syncLoopIteration
-
// merge applies one change from the given config source to that source's pod
// cache (s.pods[source]) and classifies the affected pods into five updates:
// ADD (pods not seen before from this source), UPDATE, DELETE (graceful
// deletion), REMOVE (pods dropped from the source) and RECONCILE. Every
// returned PodUpdate is non-nil, is tagged with source, and carries copies of
// the pods (copyPods) rather than the cached objects.
//
// change must be a kubetypes.PodUpdate; the unchecked type assertion below
// panics otherwise. s.podLock is held for the whole call.
func (s *podStorage) merge(source string, change interface{}) (adds, updates, deletes, removes, reconciles *kubetypes.PodUpdate) {
s.podLock.Lock()
defer s.podLock.Unlock()
addPods := []*v1.Pod{}
updatePods := []*v1.Pod{}
deletePods := []*v1.Pod{}
removePods := []*v1.Pod{}
reconcilePods := []*v1.Pod{}
pods := s.pods[source]
if pods == nil {
pods = make(map[types.UID]*v1.Pod)
}
// updatePodsFunc is the local function which updates the pod cache *oldPods* with new pods *newPods*.
// After updated, new pod will be stored in the pod cache *pods*.
// Notice that *pods* and *oldPods* could be the same cache.
updatePodsFunc := func(newPods []*v1.Pod, oldPods, pods map[types.UID]*v1.Pod) {
// Only pods returned by filterInvalidPods are considered; the others are ignored.
filtered := filterInvalidPods(newPods, source, s.recorder)
for _, ref := range filtered {
// Annotate the pod with the source before any comparison.
if ref.Annotations == nil {
ref.Annotations = make(map[string]string)
}
ref.Annotations[kubetypes.ConfigSourceAnnotationKey] = source
if existing, found := oldPods[ref.UID]; found {
// Known UID: the cached object (not ref) is kept in the new cache;
// checkAndUpdatePod is presumably responsible for copying changes from
// ref into existing. At most one list is appended to, with update taking
// precedence over reconcile, and reconcile over graceful delete.
pods[ref.UID] = existing
needUpdate, needReconcile, needGracefulDelete := checkAndUpdatePod(existing, ref)
if needUpdate {
updatePods = append(updatePods, existing)
} else if needReconcile {
reconcilePods = append(reconcilePods, existing)
} else if needGracefulDelete {
deletePods = append(deletePods, existing)
}
continue
}
// First time this UID is seen from this source: stamp the first-seen
// time and report the pod as an ADD.
recordFirstSeenTime(ref)
pods[ref.UID] = ref
addPods = append(addPods, ref)
}
}
update := change.(kubetypes.PodUpdate)
switch update.Op {
case kubetypes.ADD, kubetypes.UPDATE, kubetypes.DELETE:
// These three ops differ only in logging: the pods are merged into the
// existing cache (pods is both the old and the new cache) and classified by
// updatePodsFunc. A DELETE op removes nothing from the cache here.
if update.Op == kubetypes.ADD {
klog.V(4).InfoS("Adding new pods from source", "source", source, "pods", klog.KObjs(update.Pods))
} else if update.Op == kubetypes.DELETE {
klog.V(4).InfoS("Gracefully deleting pods from source", "source", source, "pods", klog.KObjs(update.Pods))
} else {
klog.V(4).InfoS("Updating pods from source", "source", source, "pods", klog.KObjs(update.Pods))
}
updatePodsFunc(update.Pods, pods, pods)
case kubetypes.REMOVE:
klog.V(4).InfoS("Removing pods from source", "source", source, "pods", klog.KObjs(update.Pods))
for _, value := range update.Pods {
if existing, found := pods[value.UID]; found {
// this is a delete
delete(pods, value.UID)
removePods = append(removePods, existing)
continue
}
// this is a no-op
}
case kubetypes.SET:
// SET carries the complete pod list for this source: the cache is rebuilt
// from the snapshot, and every previously cached pod that is absent from
// the snapshot is reported as REMOVE.
klog.V(4).InfoS("Setting pods for source", "source", source)
s.markSourceSet(source)
// Clear the old map entries by just creating a new map
oldPods := pods
pods = make(map[types.UID]*v1.Pod)
updatePodsFunc(update.Pods, oldPods, pods)
for uid, existing := range oldPods {
if _, found := pods[uid]; !found {
// this is a delete
removePods = append(removePods, existing)
}
}
default:
klog.InfoS("Received invalid update type", "type", update)
}
s.pods[source] = pods
adds = &kubetypes.PodUpdate{Op: kubetypes.ADD, Pods: copyPods(addPods), Source: source}
updates = &kubetypes.PodUpdate{Op: kubetypes.UPDATE, Pods: copyPods(updatePods), Source: source}
deletes = &kubetypes.PodUpdate{Op: kubetypes.DELETE, Pods: copyPods(deletePods), Source: source}
removes = &kubetypes.PodUpdate{Op: kubetypes.REMOVE, Pods: copyPods(removePods), Source: source}
reconciles = &kubetypes.PodUpdate{Op: kubetypes.RECONCILE, Pods: copyPods(reconcilePods), Source: source}
return adds, updates, deletes, removes, reconciles
}PodConfig通过内部的Cache已经发现的Pod,当有change的事件,通过对比新老数据,判断Pod事件分为ADD、UPDATE、DELETE、REMOVE、RECONFILE几种
syncLoopIteration接收并处理PodUpdate事件
-
// syncLoopIteration blocks until one of its input channels is ready, handles
// that single event, and returns. When several channels are ready at once, Go's
// select picks one at random, so no priority between sources is implied here.
//
// Sources handled:
//   - configCh: pod changes from config sources, dispatched by u.Op to handler.
//   - plegCh: pod lifecycle events from the runtime; pods with a sync-worthy
//     event are re-synced, and dead containers are cleaned up.
//   - syncCh: periodic tick; pods returned by kl.getPodsToSync are re-synced.
//   - liveness/readiness/startup manager updates: probe results.
//   - housekeepingCh: periodic tick for HandlePodCleanups, skipped until all
//     config sources are ready.
//
// It returns false only when configCh has been closed, telling the caller to
// exit the sync loop; otherwise it returns true.
func (kl *Kubelet) syncLoopIteration(configCh <-chan kubetypes.PodUpdate, handler SyncHandler,
syncCh <-chan time.Time, housekeepingCh <-chan time.Time, plegCh <-chan *pleg.PodLifecycleEvent) bool {
select {
case u, open := <-configCh:
// Update from a config source; dispatch it to the right handler
// callback.
if !open {
klog.ErrorS(nil, "Update channel is closed, exiting the sync loop")
return false
}
switch u.Op {
case kubetypes.ADD:
klog.V(2).InfoS("SyncLoop ADD", "source", u.Source, "pods", klog.KObjs(u.Pods))
// After restarting, kubelet will get all existing pods through
// ADD as if they are new pods. These pods will then go through the
// admission process and *may* be rejected. This can be resolved
// once we have checkpointing.
handler.HandlePodAdditions(u.Pods)
case kubetypes.UPDATE:
klog.V(2).InfoS("SyncLoop UPDATE", "source", u.Source, "pods", klog.KObjs(u.Pods))
handler.HandlePodUpdates(u.Pods)
case kubetypes.REMOVE:
klog.V(2).InfoS("SyncLoop REMOVE", "source", u.Source, "pods", klog.KObjs(u.Pods))
handler.HandlePodRemoves(u.Pods)
case kubetypes.RECONCILE:
klog.V(4).InfoS("SyncLoop RECONCILE", "source", u.Source, "pods", klog.KObjs(u.Pods))
handler.HandlePodReconcile(u.Pods)
case kubetypes.DELETE:
klog.V(2).InfoS("SyncLoop DELETE", "source", u.Source, "pods", klog.KObjs(u.Pods))
// DELETE is treated as a UPDATE because of graceful deletion.
handler.HandlePodUpdates(u.Pods)
case kubetypes.SET:
// NOTE(review): the PodConfig merge step appears to split SET snapshots
// into ADD/UPDATE/DELETE/REMOVE/RECONCILE updates, so SET is not
// expected to reach this point — confirm.
// TODO: Do we want to support this?
klog.ErrorS(nil, "Kubelet does not support snapshot update")
default:
klog.ErrorS(nil, "Invalid operation type received", "operation", u.Op)
}
// Record that this source has delivered an update; the housekeeping case
// below is skipped until sourcesReady.AllReady() is true.
kl.sourcesReady.AddSource(u.Source)
case e := <-plegCh:
if e.Type == pleg.ContainerStarted {
// record the most recent time we observed a container start for this pod.
// this lets us selectively invalidate the runtimeCache when processing a delete for this pod
// to make sure we don't miss handling graceful termination for containers we reported as having started.
kl.lastContainerStartedTime.Add(e.ID, time.Now())
}
if isSyncPodWorthy(e) {
// PLEG event for a pod; sync it.
if pod, ok := kl.podManager.GetPodByUID(e.ID); ok {
klog.V(2).InfoS("SyncLoop (PLEG): event for pod", "pod", klog.KObj(pod), "event", e)
handler.HandlePodSyncs([]*v1.Pod{pod})
} else {
// If the pod no longer exists, ignore the event.
klog.V(4).InfoS("SyncLoop (PLEG): pod does not exist, ignore irrelevant event", "event", e)
}
}
// For ContainerDied events, e.Data is expected to carry the container ID as a string.
if e.Type == pleg.ContainerDied {
if containerID, ok := e.Data.(string); ok {
kl.cleanUpContainersInPod(e.ID, containerID)
}
}
case <-syncCh:
// Sync pods waiting for sync
podsToSync := kl.getPodsToSync()
if len(podsToSync) == 0 {
// break leaves the select statement; there is no enclosing loop here.
break
}
klog.V(4).InfoS("SyncLoop (SYNC) pods", "total", len(podsToSync), "pods", klog.KObjs(podsToSync))
handler.HandlePodSyncs(podsToSync)
case update := <-kl.livenessManager.Updates():
// Only liveness failures trigger a sync; successful results are not acted on here.
if update.Result == proberesults.Failure {
handleProbeSync(kl, update, handler, "liveness", "unhealthy")
}
case update := <-kl.readinessManager.Updates():
// Readiness is recorded in the status manager for both outcomes, and the pod
// is always re-synced; the reported status is "" when not ready.
ready := update.Result == proberesults.Success
kl.statusManager.SetContainerReadiness(update.PodUID, update.ContainerID, ready)
status := ""
if ready {
status = "ready"
}
handleProbeSync(kl, update, handler, "readiness", status)
case update := <-kl.startupManager.Updates():
started := update.Result == proberesults.Success
kl.statusManager.SetContainerStartup(update.PodUID, update.ContainerID, started)
status := "unhealthy"
if started {
status = "started"
}
handleProbeSync(kl, update, handler, "startup", status)
case <-housekeepingCh:
// NOTE(review): the comment below also mentions the volume manager, but only
// kl.sourcesReady.AllReady() is checked here.
if !kl.sourcesReady.AllReady() {
// If the sources aren't ready or volume manager has not yet synced the states,
// skip housekeeping, as we may accidentally delete pods from unready sources.
klog.V(4).InfoS("SyncLoop (housekeeping, skipped): sources aren't ready yet")
} else {
start := time.Now()
klog.V(4).InfoS("SyncLoop (housekeeping)")
if err := handler.HandlePodCleanups(); err != nil {
klog.ErrorS(err, "Failed cleaning pods")
}
duration := time.Since(start)
// NOTE(review): the message hard-codes "15s"; it is only accurate if
// housekeepingWarningDuration is 15s — confirm.
if duration > housekeepingWarningDuration {
klog.ErrorS(fmt.Errorf("housekeeping took too long"), "Housekeeping took longer than 15s", "seconds", duration.Seconds())
}
klog.V(4).InfoS("SyncLoop (housekeeping) end")
}
}
return true
}
-
新建Pod对应的ADD事件,直接由HandlePodAdditions处理
-
// HandlePodAdditions is the callback in SyncHandler for pods being added from
// a config source.
//
// Pods are sorted in place by creation time, so older pods are admitted first.
// For each pod:
//   1. The pod is added to the pod manager.
//   2. Mirror pods are passed to handleMirrorPod and not dispatched.
//   3. Unless termination was already requested for the UID, the pod must
//      pass canAdmitPod against the currently active pods. A rejected pod is
//      handed to rejectPod and skipped.
//   4. The pod is dispatched to the pod workers as SyncPodCreate.
func (kl *Kubelet) HandlePodAdditions(pods []*v1.Pod) {
start := kl.clock.Now()
// Note: this sorts the caller's slice in place.
sort.Sort(sliceutils.PodsByCreationTime(pods))
for _, pod := range pods {
// Read before AddPod, so the pod being admitted is not part of existingPods.
// It is re-read on every iteration so that pods added earlier in this batch
// are visible to later admission checks.
existingPods := kl.podManager.GetPods()
// Always add the pod to the pod manager. Kubelet relies on the pod
// manager as the source of truth for the desired state. If a pod does
// not exist in the pod manager, it means that it has been deleted in
// the apiserver and no action (other than cleanup) is required.
kl.podManager.AddPod(pod)
if kubetypes.IsMirrorPod(pod) {
kl.handleMirrorPod(pod, start)
continue
}
// Only go through the admission process if the pod is not requested
// for termination by another part of the kubelet. If the pod is already
// using resources (previously admitted), the pod worker is going to be
// shutting it down. If the pod hasn't started yet, we know that when
// the pod worker is invoked it will also avoid setting up the pod, so
// we simply avoid doing any work.
if !kl.podWorkers.IsPodTerminationRequested(pod.UID) {
// We failed pods that we rejected, so activePods include all admitted
// pods that are alive.
activePods := kl.filterOutInactivePods(existingPods)
// Check if we can admit the pod; if not, reject it.
if ok, reason, message := kl.canAdmitPod(activePods, pod); !ok {
kl.rejectPod(pod, reason, message)
continue
}
}
// The mirror pod lookup result is passed through as-is; it may be nil.
mirrorPod, _ := kl.podManager.GetMirrorPodByPod(pod)
kl.dispatchWork(pod, kubetypes.SyncPodCreate, mirrorPod, start)
}
}
新建Pod对应的ADD事件类型,直接由HandlePodAdditions处理。
HandlePodAdditions:通过dispatchWork->kl.podWorkers.UpdatePod的调用链,将Pod创建请求发送给podWorkers做进一步的处理
-
// HandlePodAdditions is the callback in SyncHandler for pods being added from
// a config source.
//
// Pods are sorted in place by creation time, so older pods are admitted first.
// For each pod:
//   1. The pod is added to the pod manager.
//   2. Mirror pods are passed to handleMirrorPod and not dispatched.
//   3. Unless termination was already requested for the UID, the pod must
//      pass canAdmitPod against the currently active pods. A rejected pod is
//      handed to rejectPod and skipped.
//   4. The pod is dispatched to the pod workers as SyncPodCreate.
func (kl *Kubelet) HandlePodAdditions(pods []*v1.Pod) {
start := kl.clock.Now()
// Note: this sorts the caller's slice in place.
sort.Sort(sliceutils.PodsByCreationTime(pods))
for _, pod := range pods {
// Read before AddPod, so the pod being admitted is not part of existingPods.
// It is re-read on every iteration so that pods added earlier in this batch
// are visible to later admission checks.
existingPods := kl.podManager.GetPods()
// Always add the pod to the pod manager. Kubelet relies on the pod
// manager as the source of truth for the desired state. If a pod does
// not exist in the pod manager, it means that it has been deleted in
// the apiserver and no action (other than cleanup) is required.
kl.podManager.AddPod(pod)
if kubetypes.IsMirrorPod(pod) {
kl.handleMirrorPod(pod, start)
continue
}
// Only go through the admission process if the pod is not requested
// for termination by another part of the kubelet. If the pod is already
// using resources (previously admitted), the pod worker is going to be
// shutting it down. If the pod hasn't started yet, we know that when
// the pod worker is invoked it will also avoid setting up the pod, so
// we simply avoid doing any work.
if !kl.podWorkers.IsPodTerminationRequested(pod.UID) {
// We failed pods that we rejected, so activePods include all admitted
// pods that are alive.
activePods := kl.filterOutInactivePods(existingPods)
// Check if we can admit the pod; if not, reject it.
if ok, reason, message := kl.canAdmitPod(activePods, pod); !ok {
kl.rejectPod(pod, reason, message)
continue
}
}
// The mirror pod lookup result is passed through as-is; it may be nil.
mirrorPod, _ := kl.podManager.GetMirrorPodByPod(pod)
kl.dispatchWork(pod, kubetypes.SyncPodCreate, mirrorPod, start)
}
}
在处理新增Pod前,首先会进行排序操作,确保创建时间早的Pod总是能被优先处理。
- 对于Static Pod的Mirror Pod,kubelet将忽略其Spec的变化,总是以File或HTTP源定义的PodSpec为准来更新Mirror Pod。因此,用户无法通过删除或者修改Mirror Pod的方式来修改或重启Static Pod
- kubelet在创建Mirror Pod时,会为其设置一个key为kubernetes.io/config.mirror、value为Static Pod哈希值的Annotation,该值不允许被更新或修改;在执行SyncPod时,kubelet通过比较该哈希值来判断是否需要重建Mirror Pod。
在真正调用dispatchWork分发任务之前,还会通过canAdmitPod执行节点级准入控制器
kubelet默认注册的准入控制器如下:
准入控制器 | 功能描述 |
---|---|
EvictionAdmitHandler | 当节点面临资源压力时,拒绝可能影响节点稳定性的Pod运行,具体规则如下:1. Critical Pod不考虑资源压力,直接允许;2. 仅存在内存压力时,Guaranteed和Burstable类型的Pod将被允许;3. 仅存在内存压力时,若BestEffort类型的Pod容忍了内存压力污点(node.kubernetes.io/memory-pressure),也将被允许;4. 其他情况下,Pod将被拒绝 |
SysctlsAllowlist | 检查Pod的SecurityContext中定义的sysctls是否在allowlist白名单中,默认允许的sysctls如下(可以通过kubelet的--allowed-unsafe-sysctls参数放开更多):1. kernel.shm_rmid_forced 2. net.ipv4.ip_local_port_range 3. net.ipv4.tcp_syncookies 4. net.ipv4.ping_group_range 5. net.ipv4.ip_unprivileged_port_start |
AllocateResourcesPodAdmitHandler | 检查节点的Device、CPU、Memory资源是否满足Pod需求。当启用TopologyManager特性门控(默认开启)时,还会合并各资源的拓扑提示,判断资源的拓扑分布是否满足所配置的Policy策略 |
PredicateAdmitHandler | 从调度特性的角度评估Pod能否通过准入,检查内容包括noderesources、nodeport、nodeAffinity、nodeName、taint/toleration。当Critical Pod被该准入控制器拒绝且原因是资源不足时,会先尝试驱逐低优先级Pod来释放所需资源,以满足调度要求。此准入控制器还会检查Pod的OS Label(kubernetes.io/os)以及字段(Spec.OS.Name)是否与当前kubelet运行的OS匹配 |
ShutdownAdmitHandler | 当节点正在关机(shutting down)时,拒绝所有Pod |
dispatchWork的主要工作就是将Pod(创建、更新、删除、同步)事件下发给podWorkers做异步处理。
-
// dispatchWork hands a pod event to the pod workers, which sync the pod
// asynchronously; the call does not wait for the sync to happen. If the pod
// has already finished termination, the pod workers ignore the update.
// For SyncPodCreate events it additionally records how many containers the
// new pod declares.
func (kl *Kubelet) dispatchWork(pod *v1.Pod, syncType kubetypes.SyncPodType, mirrorPod *v1.Pod, start time.Time) {
	opts := UpdatePodOptions{
		UpdateType: syncType,
		Pod:        pod,
		MirrorPod:  mirrorPod,
		StartTime:  start,
	}
	// The actual sync runs in the pod's worker goroutine, not here.
	kl.podWorkers.UpdatePod(opts)

	// Only newly created pods contribute to the containers-per-pod histogram.
	if syncType != kubetypes.SyncPodCreate {
		return
	}
	metrics.ContainersPerPodCount.Observe(float64(len(pod.Spec.Containers)))
}
podWorkers收到Update事件后,在独立的协程中异步调用syncPod等同步函数进行处理
podWorkers会为每个Pod创建一个独立的任务Channel和一个goroutine,每个Pod处理协程都会阻塞式等待Channel中任务,并且对获取的任务进行处理。podWorkers则负责将Pod任务发送到对应的Channel中
-
// UpdatePod carries a configuration change or termination state to a pod. A pod is either runnable,
// terminating, or terminated, and will transition to terminating if deleted on the apiserver, it is
// discovered to have a terminal phase (Succeeded or Failed), or if it is evicted by the kubelet.
//
// The whole call runs under p.podLock. The sync itself is not performed here.
// Work is either sent to the pod's buffered channel, when no work is in flight
// for the pod, or stored in p.lastUndeliveredWorkUpdate, where a newer update
// replaces an older undelivered one. The per-pod worker goroutine running
// managePodLoop is started on the first update for a UID.
func (p *podWorkers) UpdatePod(options UpdatePodOptions) {
// handle when the pod is an orphan (no config) and we only have runtime status by running only
// the terminating part of the lifecycle
pod := options.Pod
var isRuntimePod bool
if options.RunningPod != nil {
if options.Pod == nil {
// Orphaned runtime pod: synthesize an API pod from it; only kill updates are accepted.
pod = options.RunningPod.ToAPIPod()
if options.UpdateType != kubetypes.SyncPodKill {
klog.InfoS("Pod update is ignored, runtime pods can only be killed", "pod", klog.KObj(pod), "podUID", pod.UID)
return
}
options.Pod = pod
isRuntimePod = true
} else {
// Both were given: RunningPod is dropped and the update proceeds with options.Pod.
options.RunningPod = nil
klog.InfoS("Pod update included RunningPod which is only valid when Pod is not specified", "pod", klog.KObj(options.Pod), "podUID", options.Pod.UID)
}
}
uid := pod.UID
p.podLock.Lock()
defer p.podLock.Unlock()
// decide what to do with this pod - we are either setting it up, tearing it down, or ignoring it
now := time.Now()
status, ok := p.podSyncStatuses[uid]
if !ok {
klog.V(4).InfoS("Pod is being synced for the first time", "pod", klog.KObj(pod), "podUID", pod.UID)
status = &podSyncStatus{
syncedAt: now,
fullname: kubecontainer.GetPodFullName(pod),
}
// if this pod is being synced for the first time, we need to make sure it is an active pod
if !isRuntimePod && (pod.Status.Phase == v1.PodFailed || pod.Status.Phase == v1.PodSucceeded) {
// check to see if the pod is not running and the pod is terminal.
// If this succeeds then record in the podWorker that it is terminated.
if statusCache, err := p.podCache.Get(pod.UID); err == nil {
if isPodStatusCacheTerminal(statusCache) {
status = &podSyncStatus{
terminatedAt: now,
terminatingAt: now,
syncedAt: now,
startedTerminating: true,
finished: true,
fullname: kubecontainer.GetPodFullName(pod),
}
}
}
}
p.podSyncStatuses[uid] = status
}
// if an update is received that implies the pod should be running, but we are already terminating a pod by
// that UID, assume that two pods with the same UID were created in close temporal proximity (usually static
// pod but it's possible for an apiserver to extremely rarely do something similar) - flag the sync status
// to indicate that after the pod terminates it should be reset to "not running" to allow a subsequent add/update
// to start the pod worker again
if status.IsTerminationRequested() {
if options.UpdateType == kubetypes.SyncPodCreate {
status.restartRequested = true
klog.V(4).InfoS("Pod is terminating but has been requested to restart with same UID, will be reconciled later", "pod", klog.KObj(pod), "podUID", pod.UID)
return
}
}
// once a pod is terminated by UID, it cannot reenter the pod worker (until the UID is purged by housekeeping)
if status.IsFinished() {
klog.V(4).InfoS("Pod is finished processing, no further updates", "pod", klog.KObj(pod), "podUID", pod.UID)
return
}
// check for a transition to terminating
// (orphaned runtime pod, deletion timestamp set, terminal phase, or an explicit kill)
var becameTerminating bool
if !status.IsTerminationRequested() {
switch {
case isRuntimePod:
klog.V(4).InfoS("Pod is orphaned and must be torn down", "pod", klog.KObj(pod), "podUID", pod.UID)
status.deleted = true
status.terminatingAt = now
becameTerminating = true
case pod.DeletionTimestamp != nil:
klog.V(4).InfoS("Pod is marked for graceful deletion, begin teardown", "pod", klog.KObj(pod), "podUID", pod.UID)
status.deleted = true
status.terminatingAt = now
becameTerminating = true
case pod.Status.Phase == v1.PodFailed, pod.Status.Phase == v1.PodSucceeded:
klog.V(4).InfoS("Pod is in a terminal phase (success/failed), begin teardown", "pod", klog.KObj(pod), "podUID", pod.UID)
status.terminatingAt = now
becameTerminating = true
case options.UpdateType == kubetypes.SyncPodKill:
if options.KillPodOptions != nil && options.KillPodOptions.Evict {
klog.V(4).InfoS("Pod is being evicted by the kubelet, begin teardown", "pod", klog.KObj(pod), "podUID", pod.UID)
status.evicted = true
} else {
klog.V(4).InfoS("Pod is being removed by the kubelet, begin teardown", "pod", klog.KObj(pod), "podUID", pod.UID)
}
status.terminatingAt = now
becameTerminating = true
}
}
// once a pod is terminating, all updates are kills and the grace period can only decrease
var workType PodWorkType
var wasGracePeriodShortened bool
switch {
case status.IsTerminated():
// A terminated pod may still be waiting for cleanup - if we receive a runtime pod kill request
// due to housekeeping seeing an older cached version of the runtime pod simply ignore it until
// after the pod worker completes.
if isRuntimePod {
klog.V(3).InfoS("Pod is waiting for termination, ignoring runtime-only kill until after pod worker is fully terminated", "pod", klog.KObj(pod), "podUID", pod.UID)
return
}
workType = TerminatedPodWork
// Kill options are not carried into TerminatedPodWork; a caller's
// CompletedCh is closed immediately instead of being stored.
if options.KillPodOptions != nil {
if ch := options.KillPodOptions.CompletedCh; ch != nil {
close(ch)
}
}
options.KillPodOptions = nil
case status.IsTerminationRequested():
workType = TerminatingPodWork
if options.KillPodOptions == nil {
options.KillPodOptions = &KillPodOptions{}
}
// Completion channels and status callbacks are accumulated on the sync
// status (presumably consumed once termination finishes) rather than
// closed or called here.
if ch := options.KillPodOptions.CompletedCh; ch != nil {
status.notifyPostTerminating = append(status.notifyPostTerminating, ch)
}
if fn := options.KillPodOptions.PodStatusFunc; fn != nil {
status.statusPostTerminating = append(status.statusPostTerminating, fn)
}
gracePeriod, gracePeriodShortened := calculateEffectiveGracePeriod(status, pod, options.KillPodOptions)
wasGracePeriodShortened = gracePeriodShortened
status.gracePeriod = gracePeriod
// always set the grace period for syncTerminatingPod so we don't have to recalculate,
// will never be zero.
options.KillPodOptions.PodTerminationGracePeriodSecondsOverride = &gracePeriod
default:
workType = SyncPodWork
// KillPodOptions is not valid for sync actions outside of the terminating phase
if options.KillPodOptions != nil {
if ch := options.KillPodOptions.CompletedCh; ch != nil {
close(ch)
}
options.KillPodOptions = nil
}
}
// the desired work we want to be performing
work := podWork{
WorkType: workType,
Options: options,
}
// start the pod worker goroutine if it doesn't exist
podUpdates, exists := p.podUpdates[uid]
if !exists {
// We need to have a buffer here, because checkForUpdates() method that
// puts an update into channel is called from the same goroutine where
// the channel is consumed. However, it is guaranteed that in such case
// the channel is empty, so buffer of size 1 is enough.
podUpdates = make(chan podWork, 1)
p.podUpdates[uid] = podUpdates
// ensure that static pods start in the order they are received by UpdatePod
if kubetypes.IsStaticPod(pod) {
p.waitingToStartStaticPodsByFullname[status.fullname] =
append(p.waitingToStartStaticPodsByFullname[status.fullname], uid)
}
// allow testing of delays in the pod update channel
var outCh <-chan podWork
if p.workerChannelFn != nil {
outCh = p.workerChannelFn(uid, podUpdates)
} else {
outCh = podUpdates
}
// Creating a new pod worker either means this is a new pod, or that the
// kubelet just restarted. In either case the kubelet is willing to believe
// the status of the pod for the first pod worker sync. See corresponding
// comment in syncPod.
go func() {
defer runtime.HandleCrash()
p.managePodLoop(outCh)
}()
}
// dispatch a request to the pod worker if none are running
if !status.IsWorking() {
status.working = true
podUpdates <- work
return
}
// capture the maximum latency between a requested update and when the pod
// worker observes it
if undelivered, ok := p.lastUndeliveredWorkUpdate[pod.UID]; ok {
// track the max latency between when a config change is requested and when it is realized
// NOTE: this undercounts the latency when multiple requests are queued, but captures max latency
if !undelivered.Options.StartTime.IsZero() && undelivered.Options.StartTime.Before(work.Options.StartTime) {
work.Options.StartTime = undelivered.Options.StartTime
}
}
// always sync the most recent data
p.lastUndeliveredWorkUpdate[pod.UID] = work
// Interrupt the in-flight sync (via its cancel function) when the pod just began
// terminating or its grace period was shortened, so the stored work is not delayed.
if (becameTerminating || wasGracePeriodShortened) && status.cancelFn != nil {
klog.V(3).InfoS("Cancelling current pod sync", "pod", klog.KObj(pod), "podUID", pod.UID, "updateType", work.WorkType)
status.cancelFn()
return
}
}- UpdatePod首先构造出work任务对象,WorkType支持三种类型,即SyncPodWork、TerminatingPodWork和TerminatePodWork分别对应kl.syncPod\kl.syncTerminatingPod\kl.syncTerminatedPod
- 根据Pod UID检查是否已经存在对应的work Channel,如果没有则为该Pod创建一个Channel和goroutine
- 将work任务发送给Pod的任务的Channel
执行容器创建准备工作(包括检查网络、设置cgroup、挂载磁盘等)
podWorkers的syncPodFn实际上就是kl.syncPod
-
func (kl *Kubelet) syncPod(ctx context.Context, updateType kubetypes.SyncPodType, pod, mirrorPod *v1.Pod, podStatus *kubecontainer.PodStatus) (isTerminal bool, err error) {
klog.V(4).InfoS("syncPod enter", "pod", klog.KObj(pod), "podUID", pod.UID)
defer func() {
klog.V(4).InfoS("syncPod exit", "pod", klog.KObj(pod), "podUID", pod.UID, "isTerminal", isTerminal)
}()
// Latency measurements for the main workflow are relative to the
// first time the pod was seen by kubelet.
var firstSeenTime time.Time
if firstSeenTimeStr, ok := pod.Annotations[kubetypes.ConfigFirstSeenAnnotationKey]; ok {
firstSeenTime = kubetypes.ConvertToTimestamp(firstSeenTimeStr).Get()
}
// Record pod worker start latency if being created
// TODO: make pod workers record their own latencies
if updateType == kubetypes.SyncPodCreate {
if !firstSeenTime.IsZero() {
// This is the first time we are syncing the pod. Record the latency
// since kubelet first saw the pod if firstSeenTime is set.
metrics.PodWorkerStartDuration.Observe(metrics.SinceInSeconds(firstSeenTime))
} else {
klog.V(3).InfoS("First seen time not recorded for pod",
"podUID", pod.UID,
"pod", klog.KObj(pod))
}
}
// Generate final API pod status with pod and status manager status
apiPodStatus := kl.generateAPIPodStatus(pod, podStatus)
// The pod IP may be changed in generateAPIPodStatus if the pod is using host network. (See #24576)
// TODO(random-liu): After writing pod spec into container labels, check whether pod is using host network, and
// set pod IP to hostIP directly in runtime.GetPodStatus
podStatus.IPs = make([]string, 0, len(apiPodStatus.PodIPs))
for _, ipInfo := range apiPodStatus.PodIPs {
podStatus.IPs = append(podStatus.IPs, ipInfo.IP)
}
if len(podStatus.IPs) == 0 && len(apiPodStatus.PodIP) > 0 {
podStatus.IPs = []string{apiPodStatus.PodIP}
}
// If the pod is terminal, we don't need to continue to setup the pod
if apiPodStatus.Phase == v1.PodSucceeded || apiPodStatus.Phase == v1.PodFailed {
kl.statusManager.SetPodStatus(pod, apiPodStatus)
isTerminal = true
return isTerminal, nil
}
// If the pod should not be running, we request the pod's containers be stopped. This is not the same
// as termination (we want to stop the pod, but potentially restart it later if soft admission allows
// it later). Set the status and phase appropriately
runnable := kl.canRunPod(pod)
if !runnable.Admit {
// Pod is not runnable; and update the Pod and Container statuses to why.
if apiPodStatus.Phase != v1.PodFailed && apiPodStatus.Phase != v1.PodSucceeded {
apiPodStatus.Phase = v1.PodPending
}
apiPodStatus.Reason = runnable.Reason
apiPodStatus.Message = runnable.Message
// Waiting containers are not creating.
const waitingReason = "Blocked"
for _, cs := range apiPodStatus.InitContainerStatuses {
if cs.State.Waiting != nil {
cs.State.Waiting.Reason = waitingReason
}
}
for _, cs := range apiPodStatus.ContainerStatuses {
if cs.State.Waiting != nil {
cs.State.Waiting.Reason = waitingReason
}
}
}
// Record the time it takes for the pod to become running
// since kubelet first saw the pod if firstSeenTime is set.
existingStatus, ok := kl.statusManager.GetPodStatus(pod.UID)
if !ok || existingStatus.Phase == v1.PodPending && apiPodStatus.Phase == v1.PodRunning &&
!firstSeenTime.IsZero() {
metrics.PodStartDuration.Observe(metrics.SinceInSeconds(firstSeenTime))
}
kl.statusManager.SetPodStatus(pod, apiPodStatus)
// Pods that are not runnable must be stopped - return a typed error to the pod worker
if !runnable.Admit {
klog.V(2).InfoS("Pod is not runnable and must have running containers stopped", "pod", klog.KObj(pod), "podUID", pod.UID, "message", runnable.Message)
var syncErr error
p := kubecontainer.ConvertPodStatusToRunningPod(kl.getRuntime().Type(), podStatus)
if err := kl.killPod(pod, p, nil); err != nil {
kl.recorder.Eventf(pod, v1.EventTypeWarning, events.FailedToKillPod, "error killing pod: %v", err)
syncErr = fmt.Errorf("error killing pod: %v", err)
utilruntime.HandleError(syncErr)
} else {
// There was no error killing the pod, but the pod cannot be run.
// Return an error to signal that the sync loop should back off.
syncErr = fmt.Errorf("pod cannot be run: %s", runnable.Message)
}
return false, syncErr
}
// If the network plugin is not ready, only start the pod if it uses the host network
if err := kl.runtimeState.networkErrors(); err != nil && !kubecontainer.IsHostNetworkPod(pod) {
kl.recorder.Eventf(pod, v1.EventTypeWarning, events.NetworkNotReady, "%s: %v", NetworkNotReadyErrorMsg, err)
return false, fmt.Errorf("%s: %v", NetworkNotReadyErrorMsg, err)
}
// ensure the kubelet knows about referenced secrets or configmaps used by the pod
if !kl.podWorkers.IsPodTerminationRequested(pod.UID) {
if kl.secretManager != nil {
kl.secretManager.RegisterPod(pod)
}
if kl.configMapManager != nil {
kl.configMapManager.RegisterPod(pod)
}
}
// Create Cgroups for the pod and apply resource parameters
// to them if cgroups-per-qos flag is enabled.
pcm := kl.containerManager.NewPodContainerManager()
// If pod has already been terminated then we need not create
// or update the pod's cgroup
// TODO: once context cancellation is added this check can be removed
if !kl.podWorkers.IsPodTerminationRequested(pod.UID) {
// When the kubelet is restarted with the cgroups-per-qos
// flag enabled, all the pod's running containers
// should be killed intermittently and brought back up
// under the qos cgroup hierarchy.
// Check if this is the pod's first sync
firstSync := true
for _, containerStatus := range apiPodStatus.ContainerStatuses {
if containerStatus.State.Running != nil {
firstSync = false
break
}
}
// Don't kill containers in pod if pod's cgroups already
// exists or the pod is running for the first time
podKilled := false
if !pcm.Exists(pod) && !firstSync {
p := kubecontainer.ConvertPodStatusToRunningPod(kl.getRuntime().Type(), podStatus)
if err := kl.killPod(pod, p, nil); err == nil {
podKilled = true
} else {
klog.ErrorS(err, "KillPod failed", "pod", klog.KObj(pod), "podStatus", podStatus)
}
}
// Create and Update pod's Cgroups
// Don't create cgroups for run once pod if it was killed above
// The current policy is not to restart the run once pods when
// the kubelet is restarted with the new flag as run once pods are
// expected to run only once and if the kubelet is restarted then
// they are not expected to run again.
// We don't create and apply updates to cgroup if its a run once pod and was killed above
if !(podKilled && pod.Spec.RestartPolicy == v1.RestartPolicyNever) {
if !pcm.Exists(pod) {
if err := kl.containerManager.UpdateQOSCgroups(); err != nil {
klog.V(2).InfoS("Failed to update QoS cgroups while syncing pod", "pod", klog.KObj(pod), "err", err)
}
if err := pcm.EnsureExists(pod); err != nil {
kl.recorder.Eventf(pod, v1.EventTypeWarning, events.FailedToCreatePodContainer, "unable to ensure pod container exists: %v", err)
return false, fmt.Errorf("failed to ensure that the pod: %v cgroups exist and are correctly applied: %v", pod.UID, err)
}
}
}
}
// Create Mirror Pod for Static Pod if it doesn't already exist
if kubetypes.IsStaticPod(pod) {
deleted := false
if mirrorPod != nil {
if mirrorPod.DeletionTimestamp != nil || !kl.podManager.IsMirrorPodOf(mirrorPod, pod) {
// The mirror pod is semantically different from the static pod. Remove
// it. The mirror pod will get recreated later.
klog.InfoS("Trying to delete pod", "pod", klog.KObj(pod), "podUID", mirrorPod.ObjectMeta.UID)
podFullName := kubecontainer.GetPodFullName(pod)
var err error
deleted, err = kl.podManager.DeleteMirrorPod(podFullName, &mirrorPod.ObjectMeta.UID)
if deleted {
klog.InfoS("Deleted mirror pod because it is outdated", "pod", klog.KObj(mirrorPod))
} else if err != nil {
klog.ErrorS(err, "Failed deleting mirror pod", "pod", klog.KObj(mirrorPod))
}
}
}
if mirrorPod == nil || deleted {
node, err := kl.GetNode()
if err != nil || node.DeletionTimestamp != nil {
klog.V(4).InfoS("No need to create a mirror pod, since node has been removed from the cluster", "node", klog.KRef("", string(kl.nodeName)))
} else {
klog.V(4).InfoS("Creating a mirror pod for static pod", "pod", klog.KObj(pod))
if err := kl.podManager.CreateMirrorPod(pod); err != nil {
klog.ErrorS(err, "Failed creating a mirror pod for", "pod", klog.KObj(pod))
}
}
}
}
// Make data directories for the pod
if err := kl.makePodDataDirs(pod); err != nil {
kl.recorder.Eventf(pod, v1.EventTypeWarning, events.FailedToMakePodDataDirectories, "error making pod data directories: %v", err)
klog.ErrorS(err, "Unable to make pod data directories for pod", "pod", klog.KObj(pod))
return false, err
}
// Volume manager will not mount volumes for terminating pods
// TODO: once context cancellation is added this check can be removed
if !kl.podWorkers.IsPodTerminationRequested(pod.UID) {
// Wait for volumes to attach/mount
if err := kl.volumeManager.WaitForAttachAndMount(pod); err != nil {
kl.recorder.Eventf(pod, v1.EventTypeWarning, events.FailedMountVolume, "Unable to attach or mount volumes: %v", err)
klog.ErrorS(err, "Unable to attach or mount volumes for pod; skipping pod", "pod", klog.KObj(pod))
return false, err
}
}
// Fetch the pull secrets for the pod
pullSecrets := kl.getPullSecretsForPod(pod)
// Ensure the pod is being probed
kl.probeManager.AddPod(pod)
// Call the container runtime's SyncPod callback
result := kl.containerRuntime.SyncPod(pod, podStatus, pullSecrets, kl.backOff)
kl.reasonCache.Update(pod.UID, result)
if err := result.Error(); err != nil {
// Do not return error if the only failures were pods in backoff
for _, r := range result.SyncResults {
if r.Error != kubecontainer.ErrCrashLoopBackOff && r.Error != images.ErrImagePullBackOff {
// Do not record an event here, as we keep all event logging for sync pod failures
// local to container runtime, so we get better errors.
return false, err
}
}
return false, nil
}
return false, nil
}
syncPod 主要步骤如下:
通过canRunPod检查是否能够在当前节点启动。canRunPod内部遍历softAdmitHandlers准入控制器列表,分别调用各准入控制器Admit func。
- 如果有任意一个准入控制器返回拒绝,则拒绝运行Pod
通过runtimeState.networkErrors检查网络插件是否就绪
- 如果网络插件未就绪,只启动Host Network类型的Pod。网络插件状态由容器运行时检测并反馈:kubelet调用CRI的Status接口,通过判断StatusResponse中RuntimeStatus的Conditions里NetworkReady条件是否为true来确定网络插件是否就绪
通过secretManager.RegisterPod和configMapManager.RegisterPod将当前Pod注册到相应的Manager,从而把Pod引用的Secret和ConfigMap对象注册到Manager中;Manager通过watch、TTL等机制维护kubelet本地缓存,方便在Pod挂载时快速读取,避免每次同步Pod都向kube-apiserver发起请求
当`--cgroups-per-qos`参数被启用时(默认启用),通过pcm.EnsureExists为Pod设置Pod级别的cgroup资源限制,而QoS级别的cgroup层级通过containerManager.UpdateQOSCgroups处理。QOSContainerManager通过内部协程每隔1min执行一次UpdateCgroups同步操作。
-
// EnsureExists takes a pod as argument and makes sure that
// pod cgroup exists if qos cgroup hierarchy flag is enabled.
// If the pod level container doesn't already exist it is created.
//
// The cgroup's resource parameters come from ResourceConfigForPod. Memory QoS
// enforcement is requested from ResourceConfigForPod only when the MemoryQoS
// feature gate is enabled and the host runs the cgroup v2 unified hierarchy.
// A positive podPidsLimit is applied as the cgroup's PidsLimit.
//
// It returns nil if the cgroup already exists or was created, and otherwise
// an error wrapping the cgroup manager's error.
func (m *podContainerManagerImpl) EnsureExists(pod *v1.Pod) error {
	// Nothing to do when the pod-level cgroup is already present.
	if m.Exists(pod) {
		return nil
	}
	// The second return value of GetPodContainerName is not needed here.
	podContainerName, _ := m.GetPodContainerName(pod)

	// Memory QoS needs both the feature gate and cgroup v2 (unified mode).
	enforceMemoryQoS := utilfeature.DefaultFeatureGate.Enabled(kubefeatures.MemoryQoS) &&
		libcontainercgroups.IsCgroup2UnifiedMode()

	// Create the pod container
	containerConfig := &CgroupConfig{
		Name:               podContainerName,
		ResourceParameters: ResourceConfigForPod(pod, m.enforceCPULimits, m.cpuCFSQuotaPeriod, enforceMemoryQoS),
	}
	if m.podPidsLimit > 0 {
		containerConfig.ResourceParameters.PidsLimit = &m.podPidsLimit
	}
	if enforceMemoryQoS {
		klog.V(4).InfoS("MemoryQoS config for pod", "pod", klog.KObj(pod), "unified", containerConfig.ResourceParameters.Unified)
	}
	if err := m.cgroupManager.Create(containerConfig); err != nil {
		// %w (instead of %v) keeps the underlying cgroup error inspectable with
		// errors.Is / errors.As; the rendered message text is unchanged.
		return fmt.Errorf("failed to create container for %v : %w", podContainerName, err)
	}
	return nil
}
-
- 这里没有直接使用Memory Requests,是否说明设置Memory Requests并不重要?并非如此:requests主要在调度阶段被关注,而limits主要在运行阶段生效(例外:开启MemoryQoS特性且使用cgroup v2时,memory requests会被用于设置Pod cgroup的memory.min)
kubelet通过EnsureExists实现了Pod级别的cgroup资源限制,更低层级的Container级别的cgroup资源借助CRI来完成
通过makePodDataDirs 为Pod准备以下文件夹:
/var/lib/kubelet/pods/<pod_uid>
/var/lib/kubelet/pods/<pod_uid>/volumes
/var/lib/kubelet/pods/<pod_uid>/plugins
对于使用了存储卷的Pod,通过volumeManager.WaitForAttachAndMount等待VolumeManager完成对存储卷的Attach和Mount操作,挂载后的数据卷位于宿主机的/var/lib/kubelet/pods/<pod_uid>/volumes/目录下
- 启动条件就绪,通过probeManager.AddPod将Pod加入到健康检查探测,再通过SyncPod创建容器
调用CRI创建Sandbox隔离环境(包含调用CNI设置网络)
containerRuntime.SyncPod的执行主要包括以下几步:
- 计算Sandbox隔离环境是否发生了变化,如Pod的网络从容器网络切换为主机网络会导致Sandbox隔离环境改变
- 如果Sandbox隔离环境发生了变化,则重建Sandbox隔离环境
- 根据计算结果,终止不应该保持继续运行的容器
- 如果需要,为Pod创建新的Sandbox隔离环境
- 创建临时容器
- 创建初始化容器
- 创建普通业务容器
对于Pod启动而言,较为核心的就是创建Sandbox隔离环境和运行普通容器环境。
首先,通过generatePodSandboxConfig生成runtimeapi.PodSandboxConfig对象,内部包含创建Sandbox隔离环境所需要的信息,如基本元信息
为Pod创建日志目录,默认为
/var/log/pods/<namespace>_<pod_name>_<pod_uid>
调用RunPodSandbox创建并启动Sandbox容器
不同的CRI实现对RunPodSandbox的具体实现各不相同,但整体流程相似
整体流程:
- 确保Sandbox容器镜像存在
- 根据传入的PodSandboxConfig创建Container对象
- 为Sandbox容器配置网络环境,包含调用CNI的ADD方法为Sandbox容器分配IP地址
- 调用底层OCI Runtime(如runc)启动Sandbox容器
CNI定义了容器IP地址分配和回收的标准接口,最核心的是ADD(IP分配)和DEL(IP释放)两个操作
-
// createPodSandbox creates a pod sandbox and returns (podSandBoxID, message, error).
//
// The work happens in four steps, each of which aborts on failure:
//  1. build the runtimeapi.PodSandboxConfig via generatePodSandboxConfig;
//  2. create the sandbox's log directory (LogDirectory, mode 0755);
//  3. if a runtimeClassManager is configured, resolve the runtime handler for
//     pod.Spec.RuntimeClassName;
//  4. ask the CRI runtime service to run the sandbox.
//
// On failure the sandbox ID is "" and message is a human-readable
// description of the step that failed; err is returned unwrapped.
func (m *kubeGenericRuntimeManager) createPodSandbox(pod *v1.Pod, attempt uint32) (string, string, error) {
	sandboxConfig, err := m.generatePodSandboxConfig(pod, attempt)
	if err != nil {
		klog.ErrorS(err, "Failed to generate sandbox config for pod", "pod", klog.KObj(pod))
		return "", fmt.Sprintf("Failed to generate sandbox config for pod %q: %v", format.Pod(pod), err), err
	}

	// Create pod logs directory
	if mkdirErr := m.osInterface.MkdirAll(sandboxConfig.LogDirectory, 0755); mkdirErr != nil {
		klog.ErrorS(mkdirErr, "Failed to create log directory for pod", "pod", klog.KObj(pod))
		return "", fmt.Sprintf("Failed to create log directory for pod %q: %v", format.Pod(pod), mkdirErr), mkdirErr
	}

	// An empty handler lets the runtime fall back to its default.
	var handler string
	if m.runtimeClassManager != nil {
		handler, err = m.runtimeClassManager.LookupRuntimeHandler(pod.Spec.RuntimeClassName)
		if err != nil {
			return "", fmt.Sprintf("Failed to create sandbox for pod %q: %v", format.Pod(pod), err), err
		}
		if handler != "" {
			klog.V(2).InfoS("Running pod with runtime handler", "pod", klog.KObj(pod), "runtimeHandler", handler)
		}
	}

	sandboxID, err := m.runtimeService.RunPodSandbox(sandboxConfig, handler)
	if err != nil {
		klog.ErrorS(err, "Failed to create sandbox for pod", "pod", klog.KObj(pod))
		return "", fmt.Sprintf("Failed to create sandbox for pod %q: %v", format.Pod(pod), err), err
	}
	return sandboxID, "", nil
}
调用CRI创建普通容器
在Sandbox隔离环境创建之后,kubelet接着启动普通容器,普通容器包含Ephemeral临时容器、Init容器以及Normal业务容器。
上面三类容器都通过start函数启动(内部调用startContainer)
-
if msg, err := m.startContainer(podSandboxID, podSandboxConfig, spec, pod, podStatus, pullSecrets, podIP, podIPs); err != nil {
// startContainer() returns well-defined error codes that have reasonable cardinality for metrics and are
// useful to cluster administrators to distinguish "server errors" from "user errors".
metrics.StartedContainersErrorsTotal.WithLabelValues(metricLabel, err.Error()).Inc()
if sc.HasWindowsHostProcessRequest(pod, spec.container) {
metrics.StartedHostProcessContainersErrorsTotal.WithLabelValues(metricLabel, err.Error()).Inc()
}
startContainerResult.Fail(err, msg)
// known errors that are logged in other places are logged at higher levels here to avoid
// repetitive log spam
switch {
case err == images.ErrImagePullBackOff:
klog.V(3).InfoS("Container start failed in pod", "containerType", typeName, "container", spec.container, "pod", klog.KObj(pod), "containerMessage", msg, "err", err)
default:
utilruntime.HandleError(fmt.Errorf("%v %+v start failed in pod %v: %v: %s", typeName, spec.container, format.Pod(pod), err, msg))
}
return err
}
return nil
} -
// startContainer starts a container and returns a message indicating why it failed, if it did.
// It starts the container through the following steps:
// * pull the image
// * create the container
// * start the container
// * run the post start lifecycle hooks (if applicable)
//
// spec.container is the container to start; podSandboxID and podSandboxConfig
// identify the sandbox it is created in. podStatus is consulted for the
// restart count, pullSecrets are handed to the image puller, and podIP,
// podIPs and imageVolumes are forwarded to generateContainerConfig.
//
// On success it returns ("", nil). On failure it returns a message plus an
// error: the error from getPodRuntimeHandler or the image puller as-is, or
// one of ErrCreateContainerConfig, ErrPreCreateHook, ErrCreateContainer,
// ErrPreStartHook, kubecontainer.ErrRunContainer, ErrPostStartHook.
// When the PostStart hook fails, the already-started container is killed
// before returning.
func (m *kubeGenericRuntimeManager) startContainer(ctx context.Context, podSandboxID string, podSandboxConfig *runtimeapi.PodSandboxConfig, spec *startSpec, pod *v1.Pod, podStatus *kubecontainer.PodStatus, pullSecrets []v1.Secret, podIP string, podIPs []string, imageVolumes kubecontainer.ImageVolumes) (string, error) {
container := spec.container
// Step 1: pull the image.
// The runtime handler is looked up first because EnsureImageExists takes it as an argument.
podRuntimeHandler, err := m.getPodRuntimeHandler(pod)
if err != nil {
return "", err
}
ref, err := kubecontainer.GenerateContainerRef(pod, container)
if err != nil {
// Not fatal: the failure is only logged, and ref is still passed to EnsureImageExists below.
klog.ErrorS(err, "Couldn't make a ref to pod", "pod", klog.KObj(pod), "containerName", container.Name)
}
imageRef, msg, err := m.imagePuller.EnsureImageExists(ctx, ref, pod, container.Image, pullSecrets, podSandboxConfig, podRuntimeHandler, container.ImagePullPolicy)
if err != nil {
// Only the status message (not the raw error) is recorded in the event.
s, _ := grpcstatus.FromError(err)
m.recordContainerEvent(pod, container, "", v1.EventTypeWarning, events.FailedToCreateContainer, "Error: %v", s.Message())
return msg, err
}
// Step 2: create the container.
// For a new container, the RestartCount should be 0
restartCount := 0
containerStatus := podStatus.FindContainerStatusByName(container.Name)
if containerStatus != nil {
// A previous status with this container name exists, so this start is a restart.
restartCount = containerStatus.RestartCount + 1
} else {
// The container runtime keeps state on container statuses and
// what the container restart count is. When nodes are rebooted
// some container runtimes clear their state which causes the
// restartCount to be reset to 0. This causes the logfile to
// start at 0.log, which either overwrites or appends to the
// already existing log.
//
// We are checking to see if the log directory exists, and find
// the latest restartCount by checking the log name -
// {restartCount}.log - and adding 1 to it.
logDir := BuildContainerLogsDirectory(m.podLogsDirectory, pod.Namespace, pod.Name, pod.UID, container.Name)
restartCount, err = calcRestartCountByLogDir(logDir)
if err != nil {
klog.InfoS("Cannot calculate restartCount from the log directory", "logDir", logDir, "err", err)
restartCount = 0
}
}
// NOTE(review): target is resolved by spec.getTargetID; presumably it is the
// target container used by ephemeral containers — confirm against startSpec.
target, err := spec.getTargetID(podStatus)
if err != nil {
s, _ := grpcstatus.FromError(err)
m.recordContainerEvent(pod, container, "", v1.EventTypeWarning, events.FailedToCreateContainer, "Error: %v", s.Message())
return s.Message(), ErrCreateContainerConfig
}
containerConfig, cleanupAction, err := m.generateContainerConfig(ctx, container, pod, restartCount, podIP, imageRef, podIPs, target, imageVolumes)
// cleanupAction is deferred before err is checked, so it runs on every
// return path below, including the generateContainerConfig error return.
if cleanupAction != nil {
defer cleanupAction()
}
if err != nil {
s, _ := grpcstatus.FromError(err)
m.recordContainerEvent(pod, container, "", v1.EventTypeWarning, events.FailedToCreateContainer, "Error: %v", s.Message())
return s.Message(), ErrCreateContainerConfig
}
// Run the internal PreCreateContainer hook, which receives containerConfig
// before the CRI CreateContainer call.
err = m.internalLifecycle.PreCreateContainer(pod, container, containerConfig)
if err != nil {
s, _ := grpcstatus.FromError(err)
m.recordContainerEvent(pod, container, "", v1.EventTypeWarning, events.FailedToCreateContainer, "Internal PreCreateContainer hook failed: %v", s.Message())
return s.Message(), ErrPreCreateHook
}
// CRI CreateContainer: create the container inside the sandbox (it is started in Step 3).
containerID, err := m.runtimeService.CreateContainer(ctx, podSandboxID, containerConfig, podSandboxConfig)
if err != nil {
// containerID is whatever CreateContainer returned alongside the error (possibly empty).
s, _ := grpcstatus.FromError(err)
m.recordContainerEvent(pod, container, containerID, v1.EventTypeWarning, events.FailedToCreateContainer, "Error: %v", s.Message())
return s.Message(), ErrCreateContainer
}
err = m.internalLifecycle.PreStartContainer(pod, container, containerID)
if err != nil {
s, _ := grpcstatus.FromError(err)
m.recordContainerEvent(pod, container, containerID, v1.EventTypeWarning, events.FailedToStartContainer, "Internal PreStartContainer hook failed: %v", s.Message())
return s.Message(), ErrPreStartHook
}
m.recordContainerEvent(pod, container, containerID, v1.EventTypeNormal, events.CreatedContainer, "Created container: %v", container.Name)
// Step 3: start the container.
err = m.runtimeService.StartContainer(ctx, containerID)
if err != nil {
s, _ := grpcstatus.FromError(err)
m.recordContainerEvent(pod, container, containerID, v1.EventTypeWarning, events.FailedToStartContainer, "Error: %v", s.Message())
return s.Message(), kubecontainer.ErrRunContainer
}
m.recordContainerEvent(pod, container, containerID, v1.EventTypeNormal, events.StartedContainer, "Started container %v", container.Name)
// Symlink container logs to the legacy container log location for cluster logging
// support.
// TODO(random-liu): Remove this after cluster logging supports CRI container log path.
containerMeta := containerConfig.GetMetadata()
sandboxMeta := podSandboxConfig.GetMetadata()
legacySymlink := legacyLogSymlink(containerID, containerMeta.Name, sandboxMeta.Name,
sandboxMeta.Namespace)
containerLog := filepath.Join(podSandboxConfig.LogDirectory, containerConfig.LogPath)
// only create legacy symlink if containerLog path exists (or the error is not IsNotExist).
// Because if containerLog path does not exist, only dangling legacySymlink is created.
// This dangling legacySymlink is later removed by container gc, so it does not make sense
// to create it in the first place. it happens when journald logging driver is used with docker.
if _, err := m.osInterface.Stat(containerLog); !os.IsNotExist(err) {
// Symlink failure is logged only; the container is already running.
if err := m.osInterface.Symlink(containerLog, legacySymlink); err != nil {
klog.ErrorS(err, "Failed to create legacy symbolic link", "path", legacySymlink,
"containerID", containerID, "containerLogPath", containerLog)
}
}
// Step 4: execute the post start hook.
if container.Lifecycle != nil && container.Lifecycle.PostStart != nil {
kubeContainerID := kubecontainer.ContainerID{
Type: m.runtimeName,
ID: containerID,
}
msg, handlerErr := m.runner.Run(ctx, kubeContainerID, pod, container, container.Lifecycle.PostStart)
if handlerErr != nil {
klog.ErrorS(handlerErr, "Failed to execute PostStartHook", "pod", klog.KObj(pod),
"podUID", pod.UID, "containerName", container.Name, "containerID", kubeContainerID.String())
// do not record the message in the event so that secrets won't leak from the server.
m.recordContainerEvent(pod, container, kubeContainerID.ID, v1.EventTypeWarning, events.FailedPostStartHook, "PostStartHook failed")
// The container was already started in Step 3; kill it so a failed
// PostStart hook does not leave it running. A kill failure is only logged.
if err := m.killContainer(ctx, pod, kubeContainerID, container.Name, "FailedPostStartHook", reasonFailedPostStartHook, nil, nil); err != nil {
klog.ErrorS(err, "Failed to kill container", "pod", klog.KObj(pod),
"podUID", pod.UID, "containerName", container.Name, "containerID", kubeContainerID.String())
}
return msg, ErrPostStartHook
}
}
return "", nil
}由于普通容器共享Sandbox隔离环境的Network、IPC、UTS等命名空间,创建普通容器等过程如下:
- 确保运行容器镜像存在,如果不存在,尝试主动拉取
- 调用CRI的CreateContainer创建容器,并且绑定Sandbox隔离环境
- 调用CRI的StartContainer启动容器,底层通过OCI runtime完成
- 如果容器配置了PostStart Hook,则通过runner.Run调用执行
Pod驱逐流程
为了保证节点的稳定性以及Pod QoS,kubelet会在节点资源不足的情况下,按照一定的策略驱逐Pod,确保节点上的资源得到最佳利用
kubelet驱逐Pod的场景主要包含两类:
- Critical Pod被调度到节点,因资源竞争,触发对低优先级Pod的驱逐
- 在节点资源紧张的时候,为了保证节点的稳定性,触发对低优先级的Pod的驱逐
Critical Pod被调度到节点,因资源竞争,触发对低优先级Pod的驱逐
触发驱逐的核心逻辑是:CriticalPodAdmissionHandler
-
// HandleAdmissionFailure gracefully handles admission rejection, and, in some cases,
// to allow admission of the pod despite its previous failure.
func (c *CriticalPodAdmissionHandler) HandleAdmissionFailure(admitPod *v1.Pod, failureReasons []lifecycle.PredicateFailureReason) ([]lifecycle.PredicateFailureReason, error) {
	// Non-critical pods keep their original rejection untouched.
	if !kubetypes.IsCriticalPod(admitPod) {
		return failureReasons, nil
	}

	// Split the failures: resource shortfalls can be cured by preempting other
	// pods, anything else cannot.
	var otherReasons []lifecycle.PredicateFailureReason
	shortfalls := []*admissionRequirement{}
	for _, reason := range failureReasons {
		insufficient, isResourceShortfall := reason.(*lifecycle.InsufficientResourceError)
		if !isResourceShortfall {
			otherReasons = append(otherReasons, reason)
			continue
		}
		shortfalls = append(shortfalls, &admissionRequirement{
			resourceName: insufficient.ResourceName,
			quantity:     insufficient.GetInsufficientAmount(),
		})
	}

	// Any non-resource failure still rejects the pod; report only those.
	if len(otherReasons) != 0 {
		return otherReasons, nil
	}

	// Only resource shortfalls remain: evict pods to cover them. A nil error
	// means the critical pod can now be admitted.
	return nil, c.evictPodsToFreeRequests(admitPod, admissionRequirementList(shortfalls))
}
kubelet选择最低优先级的Pod作为驱逐对象遵循一定的策略
-
// getPodsToPreempt returns a list of pods that could be preempted to free requests >= requirements
func getPodsToPreempt(pod *v1.Pod, pods []*v1.Pod, requirements admissionRequirementList) ([]*v1.Pod, error) {
	bestEffortPods, burstablePods, guaranteedPods := sortPodsByQOS(pod, pods)

	// join concatenates pod groups into a freshly allocated slice, so building
	// the argument lists below never writes into any group's backing array.
	join := func(groups ...[]*v1.Pod) []*v1.Pod {
		total := 0
		for _, g := range groups {
			total += len(g)
		}
		joined := make([]*v1.Pod, 0, total)
		for _, g := range groups {
			joined = append(joined, g...)
		}
		return joined
	}

	// Bail out early if even evicting every candidate cannot cover the requirements.
	if shortfall := requirements.subtract(join(bestEffortPods, burstablePods, guaranteedPods)...); len(shortfall) > 0 {
		return nil, fmt.Errorf("no set of running pods found to reclaim resources: %v", shortfall.toString())
	}

	// Guaranteed victims: assume all BestEffort and Burstable pods are already gone.
	guaranteedToEvict, err := getPodsToPreemptByDistance(guaranteedPods, requirements.subtract(join(bestEffortPods, burstablePods)...))
	if err != nil {
		return nil, err
	}

	// Burstable victims: assume all BestEffort pods and the chosen Guaranteed pods are gone.
	burstableToEvict, err := getPodsToPreemptByDistance(burstablePods, requirements.subtract(join(bestEffortPods, guaranteedToEvict)...))
	if err != nil {
		return nil, err
	}

	// BestEffort victims: assume the chosen Burstable and Guaranteed pods are gone.
	bestEffortToEvict, err := getPodsToPreemptByDistance(bestEffortPods, requirements.subtract(join(burstableToEvict, guaranteedToEvict)...))
	if err != nil {
		return nil, err
	}

	// Victims are listed lowest QoS class first.
	victims := append(append(bestEffortToEvict, burstableToEvict...), guaranteedToEvict...)
	return victims, nil
}具体计算逻辑如下:
筛选出节点上优先级低于待准入Critical Pod的Pod列表(Critical Pod的优先级高于非Critical Pod;Priority值大的Pod优先级高于Priority值小的Pod)
将筛选出的Pod列表,按照Pod的QoS分为三类,即BestEffort、Burstable和Guaranteed
判断驱逐当前统计出的可驱逐Pod列表中的Pod能释放的资源总和能否满足待运行的Pod的需求,如果不能,则返回错误,不触发后续驱逐操作
假设所有的BestEffort Pod和Burstable Pod已经全部被驱逐,占用的资源得到释放,则计算满足剩余资源需求所需驱逐的Guaranteed Pod列表。在选择Pod时,遍历候选Pod列表,依次计算驱逐每个Pod能释放的资源与所需资源之间的距离,距离越小表示驱逐当前Pod越能满足所需资源。每次都选择距离最小的Pod作为驱逐对象,并将其添加到待驱逐列表。
- 当同时存在多个Pod能释放的资源与所需要的资源的距离计算结果相同,优先选择资源占用更小的Pod
假设所有的BestEffort Pod和计算出的需要驱逐的Guaranteed Pod已经全部被驱逐,占用的资源得到释放,则计算释放剩余资源需要驱逐的Burstable Pod列表
假设需要驱逐的Guaranteed Pod和Burstable Pod已经全部被驱逐,占用的资源得到释放,则计算释放剩余资源需要驱逐的BestEffort Pod列表
将前面几步计算得出的待驱逐Pod合并,按照BestEffort、Burstable、Guaranteed的顺序排列,作为最终的待驱逐Pod列表
整体而言,选择待驱逐的Pod仍然遵循QoS等级,按照BestEffort、Burstable、Guaranteed的顺序,同时采用资源距离算法尽可能减少驱逐Pod的数量
在节点资源紧张的时候,为了保证节点的稳定性,触发对低优先级的Pod的驱逐
kubelet使用EvictionManager监控节点的资源使用情况,包含内存、存储、PID等,当节点存在资源压力时,主动驱逐Pod释放资源,确保节点稳定工作
EvictionManager每隔10s轮询Eviction Signal驱逐信号,并与设置的Thresholds阈值比较;当满足驱逐条件时,从Active Pods中按照一定的规则策略选择需要驱逐的Pod进行驱逐。因节点资源压力导致的驱逐在每个执行周期最多驱逐一个Pod,不会批量驱逐。
为了提高对内存变化的感知速度,EvictionManager使用内存用量监听,使用MemoryThresholdNotifier监听cgroups memcg event事件,通过基于事件触发的模式降低每隔10s轮询的延迟
kubelet支持两种驱逐类型:硬驱逐和软驱逐
硬驱逐相关参数
--eviction-hard
:硬驱逐阈值,当资源压力达到阈值时立即触发硬驱逐,此时Pod的优雅停机时间会被忽略(被覆盖为0),直接发送Kill信号。默认值为memory.available<100Mi,nodefs.available<10%,nodefs.inodesFree<5%,imagefs.available<15%
软驱逐相关参数
--eviction-soft
:软驱逐阈值,当资源压力达到阈值时触发软驱逐,此时不会立即终止Pod,而是等待--eviction-soft-grace-period
时间后,若资源压力仍然存在,才会执行Pod驱逐
--eviction-soft-grace-period
:软驱逐的优雅等待时间,防止因资源用量抖动导致误驱逐
--eviction-max-pod-grace-period
:软驱逐的最大优雅停机等待时间。如果待驱逐的Pod设置了TerminationGracePeriodSeconds,理论上应取Pod优雅停机等待时间与该参数两者中的较小值,而实际上代码直接使用了该参数(MaxPodGracePeriodSeconds)的值,这是一个已知问题(issue#64530)
公共参数
--eviction-minimum-reclaim
:驱逐最小回收量。在某些情况下,驱逐Pod只会回收少量的紧缺资源,这可能导致kubelet反复达到配置的驱逐条件并多次触发驱逐,可以通过该参数为每类资源设置最小回收量
--eviction-pressure-transition-period
:节点状态转换等待时间。在某些情况下,节点会在软驱逐阈值附近上下振荡,导致上报的节点状态在true和false之间反复转换,进而导致错误的驱逐决策。为了防止振荡,可以通过该参数设置节点状态转换前的等待时间,默认5min
--kernel-memcg-notification
:启用内核的memcg通知机制。在默认情况下,kubelet轮询cAdvisor以定期收集内存使用情况统计信息,这可能不够及时。启用memcg通知后,内存超过阈值时kubelet可以立即感知到压力并触发驱逐
工作机制上,EvictionManager通过内部协程持续监测节点资源使用情况,默认10s一次探测:
-
// Start starts the control loop to observe and response to low compute resources.
// Start launches the eviction manager in the background.
//
// When kernel memcg notification is enabled, a memory threshold notifier is
// started for every SignalMemoryAvailable / SignalAllocatableMemoryAvailable
// threshold; crossing such a threshold logs the notifier's message and runs
// synchronize immediately. Notifier creation failures are logged and skipped.
//
// Independently, a polling goroutine calls synchronize forever: after an
// eviction it waits for the evicted pods to be cleaned up, otherwise it
// sleeps for monitoringInterval. That goroutine has no stop mechanism.
func (m *managerImpl) Start(diskInfoProvider DiskInfoProvider, podFunc ActivePodsFunc, podCleanedUpFunc PodCleanedUpFunc, monitoringInterval time.Duration) {
	onThresholdCrossed := func(message string) {
		klog.InfoS(message)
		m.synchronize(diskInfoProvider, podFunc)
	}

	if m.config.KernelMemcgNotification {
		for _, threshold := range m.config.Thresholds {
			isMemorySignal := threshold.Signal == evictionapi.SignalMemoryAvailable ||
				threshold.Signal == evictionapi.SignalAllocatableMemoryAvailable
			if !isMemorySignal {
				continue
			}
			notifier, err := NewMemoryThresholdNotifier(threshold, m.config.PodCgroupRoot, &CgroupNotifierFactory{}, onThresholdCrossed)
			if err != nil {
				klog.InfoS("Eviction manager: failed to create memory threshold notifier", "err", err)
				continue
			}
			go notifier.Start()
			m.thresholdNotifiers = append(m.thresholdNotifiers, notifier)
		}
	}

	// Polling loop: the fallback (and, without memcg notification, the only)
	// trigger for synchronize.
	go func() {
		for {
			evictedPods := m.synchronize(diskInfoProvider, podFunc)
			if evictedPods == nil {
				time.Sleep(monitoringInterval)
				continue
			}
			klog.InfoS("Eviction manager: pods evicted, waiting for pod to be cleaned up", "pods", klog.KObjs(evictedPods))
			m.waitForPodsCleanup(podCleanedUpFunc, evictedPods)
		}
	}()
}
synchronize是驱逐探测的核心逻辑:
-
// synchronize is the main control loop that enforces eviction thresholds.
// Returns the pod that was killed, or nil if no pod was killed.
// synchronize is the main control loop that enforces eviction thresholds.
// It returns the pods that were evicted during this pass, or nil if no pod
// was evicted.
//
// One pass:
//   - fetches fresh summary stats and turns them into signal observations;
//   - computes which thresholds are met, which previously met thresholds have
//     not yet reclaimed their minimum, which node conditions apply, and which
//     thresholds have satisfied their grace period, and stores that state on
//     m while holding m's lock;
//   - when local storage capacity isolation is enabled, returns early with any
//     pods evicted by localStorageEviction;
//   - otherwise picks the highest-priority reclaimable threshold, tries
//     node-level reclaim first, and only then ranks the active pods and evicts
//     at most one of them.
func (m *managerImpl) synchronize(diskInfoProvider DiskInfoProvider, podFunc ActivePodsFunc) []*v1.Pod {
	// if we have nothing to do, just return
	thresholds := m.config.Thresholds
	if len(thresholds) == 0 && !m.localStorageCapacityIsolation {
		return nil
	}

	klog.V(3).InfoS("Eviction manager: synchronize housekeeping")
	// build the ranking functions (if not yet known)
	// TODO: have a function in cadvisor that lets us know if global housekeeping has completed
	if m.dedicatedImageFs == nil {
		hasImageFs, err := diskInfoProvider.HasDedicatedImageFs()
		if err != nil {
			// m.dedicatedImageFs stays nil, so the lookup is retried on the next
			// call; log it so a persistent failure (which blocks every eviction
			// pass) is visible instead of being silently swallowed.
			klog.ErrorS(err, "Eviction manager: failed to get HasDedicatedImageFs")
			return nil
		}
		m.dedicatedImageFs = &hasImageFs
		m.signalToRankFunc = buildSignalToRankFunc(hasImageFs)
		m.signalToNodeReclaimFuncs = buildSignalToNodeReclaimFuncs(m.imageGC, m.containerGC, hasImageFs)
	}

	activePods := podFunc()
	updateStats := true
	summary, err := m.summaryProvider.Get(updateStats)
	if err != nil {
		klog.ErrorS(err, "Eviction manager: failed to get summary stats")
		return nil
	}

	// Refresh the memcg threshold notifiers from the latest stats at most once
	// per notifierRefreshInterval.
	if m.clock.Since(m.thresholdsLastUpdated) > notifierRefreshInterval {
		m.thresholdsLastUpdated = m.clock.Now()
		for _, notifier := range m.thresholdNotifiers {
			if err := notifier.UpdateThreshold(summary); err != nil {
				klog.InfoS("Eviction manager: failed to update notifier", "notifier", notifier.Description(), "err", err)
			}
		}
	}

	// make observations and get a function to derive pod usage stats relative to those observations.
	observations, statsFunc := makeSignalObservations(summary)
	debugLogObservations("observations", observations)

	// determine the set of thresholds met independent of grace period
	thresholds = thresholdsMet(thresholds, observations, false)
	debugLogThresholdsWithObservation("thresholds - ignoring grace period", thresholds, observations)

	// determine the set of thresholds previously met that have not yet satisfied the associated min-reclaim
	if len(m.thresholdsMet) > 0 {
		thresholdsNotYetResolved := thresholdsMet(m.thresholdsMet, observations, true)
		thresholds = mergeThresholds(thresholds, thresholdsNotYetResolved)
	}
	debugLogThresholdsWithObservation("thresholds - reclaim not satisfied", thresholds, observations)

	// track when a threshold was first observed
	now := m.clock.Now()
	thresholdsFirstObservedAt := thresholdsFirstObservedAt(thresholds, m.thresholdsFirstObservedAt, now)

	// the set of node conditions that are triggered by currently observed thresholds
	nodeConditions := nodeConditions(thresholds)
	if len(nodeConditions) > 0 {
		klog.V(3).InfoS("Eviction manager: node conditions - observed", "nodeCondition", nodeConditions)
	}

	// track when a node condition was last observed
	nodeConditionsLastObservedAt := nodeConditionsLastObservedAt(nodeConditions, m.nodeConditionsLastObservedAt, now)

	// node conditions report true if it has been observed within the transition period window
	nodeConditions = nodeConditionsObservedSince(nodeConditionsLastObservedAt, m.config.PressureTransitionPeriod, now)
	if len(nodeConditions) > 0 {
		klog.V(3).InfoS("Eviction manager: node conditions - transition period not met", "nodeCondition", nodeConditions)
	}

	// determine the set of thresholds we need to drive eviction behavior (i.e. all grace periods are met)
	thresholds = thresholdsMetGracePeriod(thresholdsFirstObservedAt, now)
	debugLogThresholdsWithObservation("thresholds - grace periods satisfied", thresholds, observations)

	// update internal state
	m.Lock()
	m.nodeConditions = nodeConditions
	m.thresholdsFirstObservedAt = thresholdsFirstObservedAt
	m.nodeConditionsLastObservedAt = nodeConditionsLastObservedAt
	m.thresholdsMet = thresholds

	// determine the set of thresholds whose stats have been updated since the last sync
	thresholds = thresholdsUpdatedStats(thresholds, observations, m.lastObservations)
	debugLogThresholdsWithObservation("thresholds - updated stats", thresholds, observations)

	m.lastObservations = observations
	m.Unlock()

	// evict pods if there is a resource usage violation from local volume temporary storage
	// If eviction happens in localStorageEviction function, skip the rest of eviction action
	if m.localStorageCapacityIsolation {
		if evictedPods := m.localStorageEviction(activePods, statsFunc); len(evictedPods) > 0 {
			return evictedPods
		}
	}

	if len(thresholds) == 0 {
		klog.V(3).InfoS("Eviction manager: no resources are starved")
		return nil
	}

	// rank the thresholds by eviction priority
	sort.Sort(byEvictionPriority(thresholds))
	thresholdToReclaim, resourceToReclaim, foundAny := getReclaimableThreshold(thresholds)
	if !foundAny {
		return nil
	}
	klog.InfoS("Eviction manager: attempting to reclaim", "resourceName", resourceToReclaim)

	// record an event about the resources we are now attempting to reclaim via eviction
	m.recorder.Eventf(m.nodeRef, v1.EventTypeWarning, "EvictionThresholdMet", "Attempting to reclaim %s", resourceToReclaim)

	// check if there are node-level resources we can reclaim to reduce pressure before evicting end-user pods.
	if m.reclaimNodeLevelResources(thresholdToReclaim.Signal, resourceToReclaim) {
		klog.InfoS("Eviction manager: able to reduce resource pressure without evicting pods.", "resourceName", resourceToReclaim)
		return nil
	}

	klog.InfoS("Eviction manager: must evict pod(s) to reclaim", "resourceName", resourceToReclaim)

	// rank the pods for eviction
	rank, ok := m.signalToRankFunc[thresholdToReclaim.Signal]
	if !ok {
		klog.ErrorS(nil, "Eviction manager: no ranking function for signal", "threshold", thresholdToReclaim.Signal)
		return nil
	}

	// the only candidates viable for eviction are those pods that had anything running.
	if len(activePods) == 0 {
		klog.ErrorS(nil, "Eviction manager: eviction thresholds have been met, but no pods are active to evict")
		return nil
	}

	// rank the running pods for eviction for the specified resource
	rank(activePods, statsFunc)

	klog.InfoS("Eviction manager: pods ranked for eviction", "pods", klog.KObjs(activePods))

	// record age of metrics for met thresholds that we are using for evictions.
	for _, t := range thresholds {
		timeObserved := observations[t.Signal].time
		if !timeObserved.IsZero() {
			metrics.EvictionStatsAge.WithLabelValues(string(t.Signal)).Observe(metrics.SinceInSeconds(timeObserved.Time))
		}
	}

	// we kill at most a single pod during each eviction interval
	for i := range activePods {
		pod := activePods[i]
		// Hard thresholds evict with a zero grace period; soft thresholds use
		// the configured MaxPodGracePeriodSeconds.
		gracePeriodOverride := int64(0)
		if !isHardEvictionThreshold(thresholdToReclaim) {
			gracePeriodOverride = m.config.MaxPodGracePeriodSeconds
		}
		message, annotations := evictionMessage(resourceToReclaim, pod, statsFunc)
		// If evictPod reports false, move on to the next ranked pod.
		if m.evictPod(pod, gracePeriodOverride, message, annotations) {
			metrics.Evictions.WithLabelValues(string(thresholdToReclaim.Signal)).Inc()
			return []*v1.Pod{pod}
		}
	}
	klog.InfoS("Eviction manager: unable to evict any pods from the node")
	return nil
}通过makeSignalObserations计算当前观测到的驱逐信号(资源使用),kubelet支持的驱逐信号如下
| 驱逐信号 | 描述 | 计算公式 |
| --- | --- | --- |
| memory.available | 节点可用内存 | memory.available := node.status.capacity[memory] - node.stats.memory.workingSet |
| allocatableMemory.available | 留给Pod使用的可用内存,仅当Node Allocatable Enforcements包含pods时起作用(默认enforceNodeAllocatable=["pods"]) | allocatableMemory.available := pod.allocatable - pod.workingSet |
| nodefs.available | kubelet使用的文件系统的可用容量 | nodefs.available := node.stats.fs.available |
| nodefs.inodesFree | kubelet使用的文件系统的可用inodes数量 | nodefs.inodesFree := node.stats.fs.inodesFree |
| imagefs.available | 容器运行时用来存放镜像以及容器可写层的文件系统的可用容量 | imagefs.available := node.stats.runtime.imagefs.available |
| imagefs.inodesFree | 容器运行时用来存放镜像以及容器可写层的文件系统的可用inodes数量 | imagefs.inodesFree := node.stats.runtime.imagefs.inodesFree |
| pid.available | 留给Pod分配使用的可用PID数量 | pid.available := node.stats.rlimit.maxpid - node.stats.rlimit.curproc |

Pod可分配资源并不一定与节点剩余可用资源一致,为了保证系统的稳定性,一般会为系统守护进程保留资源。
- Pod可用资源 = 节点总资源 - kube-reserved - system-reserved - eviction-threshold
- kube-reserved:运行kubelet、Container Runtime等K8s系统服务保留资源
- system-reserved:运行操作系统内核、sshd等系统服务需要保留的资源
- eviction-threshold:为驱逐阈值预留的资源
- Pod可用资源:节点总可用资源-