Kube-controller-manager(DaemonSet)

Based on Kubernetes 1.25

The DaemonSet Controller manages DaemonSet objects

  • On every node selected by the DaemonSet object, there is eventually exactly one available Pod

A DaemonSet object consists mainly of two parts: the Pod template and the update strategy
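
As a point of reference, the sketch below constructs a minimal DaemonSet object in Go and shows those two parts side by side; the names, image and field values are made up for the illustration, not taken from any real manifest:

    package main

    import (
        "fmt"

        appsv1 "k8s.io/api/apps/v1"
        corev1 "k8s.io/api/core/v1"
        metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
        "k8s.io/apimachinery/pkg/util/intstr"
    )

    func main() {
        maxUnavailable := intstr.FromInt(1)
        labels := map[string]string{"app": "node-agent"} // illustrative selector labels

        ds := appsv1.DaemonSet{
            ObjectMeta: metav1.ObjectMeta{Name: "node-agent", Namespace: "default"},
            Spec: appsv1.DaemonSetSpec{
                Selector: &metav1.LabelSelector{MatchLabels: labels},
                // Part 1: the Pod template, stamped out once per selected node.
                Template: corev1.PodTemplateSpec{
                    ObjectMeta: metav1.ObjectMeta{Labels: labels},
                    Spec: corev1.PodSpec{
                        Containers: []corev1.Container{{Name: "agent", Image: "example.com/node-agent:v1"}},
                    },
                },
                // Part 2: the update strategy that the controller branches on.
                UpdateStrategy: appsv1.DaemonSetUpdateStrategy{
                    Type: appsv1.RollingUpdateDaemonSetStrategyType,
                    RollingUpdate: &appsv1.RollingUpdateDaemonSet{
                        MaxUnavailable: &maxUnavailable,
                    },
                },
            },
        }

        fmt.Printf("DaemonSet %s uses the %s strategy\n", ds.Name, ds.Spec.UpdateStrategy.Type)
    }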

Controller initialization

  • DaemonSet objects: watches Add, Update, and Delete events
  • Pod objects
  • Node objects
  • ControllerRevision objects: handles synchronization of ControllerRevision objects, which record the historical versions of the DaemonSet's Pod template (the next bullet sketches how these watches are typically wired up)
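  • For reference, a minimal sketch of the registration pattern using client-go informers and a work queue; the function name and the simplified handler bodies are assumptions for illustration, not the controller's actual registration code in daemon_controller.go:

    package daemonsketch

    import (
        appsinformers "k8s.io/client-go/informers/apps/v1"
        coreinformers "k8s.io/client-go/informers/core/v1"
        "k8s.io/client-go/tools/cache"
        "k8s.io/client-go/util/workqueue"
    )

    // registerHandlers wires the watched resources to a single work queue. In the
    // real controller the Pod/Node/ControllerRevision handlers first resolve the
    // owning or affected DaemonSet(s); here every event simply enqueues a key.
    func registerHandlers(
        dsInformer appsinformers.DaemonSetInformer,
        podInformer coreinformers.PodInformer,
        nodeInformer coreinformers.NodeInformer,
        revInformer appsinformers.ControllerRevisionInformer,
        queue workqueue.RateLimitingInterface,
    ) {
        enqueue := func(obj interface{}) {
            if key, err := cache.DeletionHandlingMetaNamespaceKeyFunc(obj); err == nil {
                queue.Add(key)
            }
        }
        update := func(_, cur interface{}) { enqueue(cur) }

        // DaemonSet objects: Add/Update/Delete all trigger a reconcile of that DS.
        dsInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
            AddFunc: enqueue, UpdateFunc: update, DeleteFunc: enqueue,
        })
        // Pod, Node and ControllerRevision events also feed the same queue.
        podInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
            AddFunc: enqueue, UpdateFunc: update, DeleteFunc: enqueue,
        })
        nodeInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
            AddFunc: enqueue, UpdateFunc: update,
        })
        revInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
            AddFunc: enqueue, UpdateFunc: update, DeleteFunc: enqueue,
        })
    }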

Main execution logic

  1. dsc.dsLister.DaemonSets

    Fetch the DaemonSet object: the DS Controller retrieves the full DaemonSet object through the DS Informer's Lister

  2. dsc.nodeLister.List

    List all Node objects in the cluster

  3. ds.DeletionTimestamp != nil

    Check whether the object is being deleted; if it is, no reconcile is needed

  4. dsc.expectations.SatisfiedExpectations

    Decide whether to run this reconcile.

    The DS Controller uses the Expectation mechanism to check whether all Pod creations and deletions requested by the previous reconcile have been processed by kube-apiserver (a simplified sketch of this mechanism appears after this list)

  5. dsc.manage

    Reconcile the number of Pods.

    The DS Controller counts the Pods running on each node and, using the Pods' tolerations and affinity together with the nodes' taints and labels, works out which new Pods must be created and which Pods must be deleted

  6. switch ds.Spec.UpdateStrategy.Type

    Apply the update according to the DaemonSet's update strategy

    • If the update strategy is OnDelete: the DS Controller takes no action; the user must delete Pods manually, and the controller creates replacement Pods during the next reconcile

    • If the update strategy is RollingUpdate: the DS Controller handles both deleting and creating Pods

    • Ref:https://github.com/kubernetes/kubernetes/blob/88e994f6bf8fc88114c5b733e09afea339bea66d/pkg/controller/daemon/daemon_controller.go#L923

      func (dsc *DaemonSetsController) updateDaemonSet(ctx context.Context, ds *apps.DaemonSet, nodeList []*v1.Node, hash, key string, old []*apps.ControllerRevision) error {
          err := dsc.manage(ctx, ds, nodeList, hash)
          if err != nil {
              return err
          }

          // Process rolling updates if we're ready.
          if dsc.expectations.SatisfiedExpectations(key) {
              switch ds.Spec.UpdateStrategy.Type {
              case apps.OnDeleteDaemonSetStrategyType:
              case apps.RollingUpdateDaemonSetStrategyType:
                  err = dsc.rollingUpdate(ctx, ds, nodeList, hash)
              }
              if err != nil {
                  return err
              }
          }

          err = dsc.cleanupHistory(ctx, ds, old)
          if err != nil {
              return fmt.Errorf("failed to clean up revisions of DaemonSet: %w", err)
          }

          return nil
      }
  7. updateDaemonSetStatus

    Update the status of the DaemonSet object

    • The DS Controller computes, among other things, the number of active Pods in the Ready state and the number in the Available state, and calls kube-apiserver to update the status
    • In addition, as with the ReplicaSet Controller, if the number of Ready Pods differs from the number of Available Pods and MinReadySeconds is non-zero, the DS Controller re-enqueues the current DaemonSet into the work queue after a MinReadySeconds delay
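
As mentioned in step 4, the DS Controller only recomputes a diff once the previous reconcile's requests have been observed. The sketch below is a heavily simplified, illustrative version of that expectation bookkeeping; the type and method names are invented here, and the real controller uses a shared ControllerExpectations helper instead:

    package main

    import (
        "fmt"
        "sync"
    )

    // expectation records how many creates and deletes a reconcile has requested
    // but not yet seen confirmed through the Pod informer.
    type expectation struct {
        adds, dels int
    }

    type expectations struct {
        mu sync.Mutex
        m  map[string]*expectation // key: namespace/name of the DaemonSet
    }

    func newExpectations() *expectations {
        return &expectations{m: map[string]*expectation{}}
    }

    // Expect is called before issuing create/delete requests in a reconcile.
    func (e *expectations) Expect(key string, adds, dels int) {
        e.mu.Lock()
        defer e.mu.Unlock()
        e.m[key] = &expectation{adds: adds, dels: dels}
    }

    // ObserveAdd and ObserveDelete are called from the Pod informer handlers.
    func (e *expectations) ObserveAdd(key string)    { e.observe(key, -1, 0) }
    func (e *expectations) ObserveDelete(key string) { e.observe(key, 0, -1) }

    func (e *expectations) observe(key string, dAdd, dDel int) {
        e.mu.Lock()
        defer e.mu.Unlock()
        if exp, ok := e.m[key]; ok {
            exp.adds += dAdd
            exp.dels += dDel
        }
    }

    // Satisfied reports whether everything requested earlier has been observed,
    // i.e. whether a new reconcile may safely compute a fresh diff.
    func (e *expectations) Satisfied(key string) bool {
        e.mu.Lock()
        defer e.mu.Unlock()
        exp, ok := e.m[key]
        return !ok || (exp.adds <= 0 && exp.dels <= 0)
    }

    func main() {
        exp := newExpectations()
        exp.Expect("default/node-agent", 2, 0)           // the reconcile asked for 2 creates
        fmt.Println(exp.Satisfied("default/node-agent")) // false: creates not observed yet
        exp.ObserveAdd("default/node-agent")
        exp.ObserveAdd("default/node-agent")
        fmt.Println(exp.Satisfied("default/node-agent")) // true: safe to reconcile again
    }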

Reconciling the number of Pods

The DS Controller synchronizes the Pods on every node by calling the manage func: it creates the missing Pods on nodes that should run one, and deletes failed and redundant Pods

The main flow of manage is as follows:

  1. Count the Pods running on each node

    Via getNodesToDaemonPods, the DS Controller builds a mapping from each node to the set of daemon Pods running on it, recording which Pods every node runs

  2. Work out which Pods to create and delete on each node

    The DS Controller iterates over the nodes and calls podsShouldBeOnNode to determine whether the current node needs a new Pod and which of its Pods must be deleted, storing the results in the nodesNeedingDaemonPods and podsToDelete slices

    • podsShouldBeOnNode:
      • First decides whether the node should run the Pod at all
        • The node should run a Pod: if the node does not yet have a Pod of this DaemonSet, one should be created. The node is considered unable to run the Pod when it cannot satisfy the DaemonSet's NodeName requirement, cannot satisfy the Pod's node affinity, or carries a taint the Pod does not tolerate
        • The node should not keep running the Pod: if the node already has a Pod of this DaemonSet, the Pod stops running there only when the node has a taint the Pod does not tolerate and that taint's effect is NoExecute; in other cases, only the scheduling of new Pods onto the node is blocked

    The DS Controller then checks the Pods as follows (a condensed sketch of this classification appears after this list):

    • If a node should run the daemon Pod but no such Pod is running on it, the node is added to the nodesNeedingDaemonPods slice
    • If a node should not keep running the daemon Pod but such Pods are running on it, those Pods are added to the podsToDelete slice
    • If a node should keep running the daemon Pod, all of its daemon Pods are inspected:
      • Pods in the Failed phase are added to the podsToDelete slice
      • If the DaemonSet's MaxSurge is 0, only the oldest Pod is kept and the remaining Pods are added to the podsToDelete slice
  3. Create and delete the Pods

    Once the lists of Pods to create and delete are complete, the DS Controller calls the syncNodes func, which actually calls kube-apiserver to create and delete Pods; creations are issued in a slow-start fashion (a standalone sketch of this slow-start batching appears after the manage source excerpt below)
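
  • A condensed, illustrative sketch of the per-node classification described above; the real podsShouldBeOnNode also deals with hashes, MaxSurge and backoff, so the helper below is only an assumption-level simplification:

    package main

    import "fmt"

    // podInfo is a pared-down view of a daemon Pod for this illustration.
    type podInfo struct {
        name   string
        failed bool
    }

    // classifyNode mirrors the decision described above: given whether the node
    // should run a daemon Pod and whether it may keep the one it has, report
    // whether a Pod must be created on it and which Pods should be deleted.
    func classifyNode(shouldRun, shouldContinue bool, daemonPods []podInfo) (needsPod bool, podsToDelete []string) {
        switch {
        case shouldRun && len(daemonPods) == 0:
            // Should run a Pod but has none: schedule a creation on this node.
            needsPod = true
        case !shouldContinue && len(daemonPods) > 0:
            // Should not keep running the Pod: delete everything it has.
            for _, p := range daemonPods {
                podsToDelete = append(podsToDelete, p.name)
            }
        case shouldContinue:
            // May keep running: drop failed Pods and keep a single healthy Pod.
            kept := false
            for _, p := range daemonPods {
                if p.failed || kept {
                    podsToDelete = append(podsToDelete, p.name)
                    continue
                }
                kept = true
            }
        }
        return needsPod, podsToDelete
    }

    func main() {
        needs, del := classifyNode(true, true, []podInfo{{name: "ds-abc", failed: true}, {name: "ds-def"}})
        fmt.Println(needs, del) // false [ds-abc]
    }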

  • Ref:https://github.com/kubernetes/kubernetes/blob/88e994f6bf8fc88114c5b733e09afea339bea66d/pkg/controller/daemon/daemon_controller.go#L946

    // manage manages the scheduling and running of Pods of ds on nodes.
    // After figuring out which nodes should run a Pod of ds but not yet running one and
    // which nodes should not run a Pod of ds but currently running one, it calls function
    // syncNodes with a list of pods to remove and a list of nodes to run a Pod of ds.
    func (dsc *DaemonSetsController) manage(ctx context.Context, ds *apps.DaemonSet, nodeList []*v1.Node, hash string) error {
        // Find out the pods which are created for the nodes by DaemonSet.
        nodeToDaemonPods, err := dsc.getNodesToDaemonPods(ctx, ds)
        if err != nil {
            return fmt.Errorf("couldn't get node to daemon pod mapping for daemon set %q: %v", ds.Name, err)
        }

        // For each node, if the node is running the daemon pod but isn't supposed to, kill the daemon
        // pod. If the node is supposed to run the daemon pod, but isn't, create the daemon pod on the node.
        var nodesNeedingDaemonPods, podsToDelete []string
        for _, node := range nodeList {
            nodesNeedingDaemonPodsOnNode, podsToDeleteOnNode := dsc.podsShouldBeOnNode(
                node, nodeToDaemonPods, ds, hash)

            nodesNeedingDaemonPods = append(nodesNeedingDaemonPods, nodesNeedingDaemonPodsOnNode...)
            podsToDelete = append(podsToDelete, podsToDeleteOnNode...)
        }

        // Remove unscheduled pods assigned to not existing nodes when daemonset pods are scheduled by scheduler.
        // If node doesn't exist then pods are never scheduled and can't be deleted by PodGCController.
        podsToDelete = append(podsToDelete, getUnscheduledPodsWithoutNode(nodeList, nodeToDaemonPods)...)

        // Label new pods using the hash label value of the current history when creating them
        if err = dsc.syncNodes(ctx, ds, podsToDelete, nodesNeedingDaemonPods, hash); err != nil {
            return err
        }

        return nil
    }
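
  • The "slow start" mentioned in step 3 can be pictured with the standalone sketch below: creations are issued in exponentially growing batches and stop as soon as a batch reports an error. This is an illustrative re-implementation in the spirit of the controller's batching, not its actual helper:

    package main

    import (
        "fmt"
        "sync"
    )

    // slowStart runs `count` create calls in batches of 1, 2, 4, ... and stops
    // doubling as soon as a batch fails, so a broken kube-apiserver or quota
    // problem does not trigger a flood of doomed requests.
    func slowStart(count, initialBatchSize int, create func(i int) error) (successes int, err error) {
        remaining := count
        next := 0
        for batchSize := minInt(remaining, initialBatchSize); batchSize > 0; batchSize = minInt(2*batchSize, remaining) {
            var (
                wg   sync.WaitGroup
                mu   sync.Mutex
                errs []error
            )
            for i := 0; i < batchSize; i++ {
                wg.Add(1)
                go func(i int) {
                    defer wg.Done()
                    if e := create(i); e != nil {
                        mu.Lock()
                        errs = append(errs, e)
                        mu.Unlock()
                    }
                }(next + i)
            }
            wg.Wait()
            successes += batchSize - len(errs)
            next += batchSize
            remaining -= batchSize
            if len(errs) > 0 {
                return successes, errs[0]
            }
        }
        return successes, nil
    }

    func minInt(a, b int) int {
        if a < b {
            return a
        }
        return b
    }

    func main() {
        n, err := slowStart(10, 1, func(i int) error {
            fmt.Println("creating pod", i)
            return nil
        })
        fmt.Println(n, err) // 10 <nil>
    }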

Update strategies

A DaemonSet supports two Pod update strategies: OnDelete and RollingUpdate

OnDelete

The DS Controller does not touch the Pods on its own: to update the Pods running on the nodes, the user has to delete them manually; once a Pod is deleted, the DS Controller creates a Pod of the new version during the manage phase of the next sync

RollingUpdate

The DS Controller gradually replaces all of the DaemonSet's Pods, mainly through the rollingUpdate func

  • Ref:https://github.com/kubernetes/kubernetes/blob/88e994f6bf8fc88114c5b733e09afea339bea66d/pkg/controller/daemon/update.go#L44

    // rollingUpdate identifies the set of old pods to delete, or additional pods to create on nodes,
    // remaining within the constraints imposed by the update strategy.
    func (dsc *DaemonSetsController) rollingUpdate(ctx context.Context, ds *apps.DaemonSet, nodeList []*v1.Node, hash string) error {
        nodeToDaemonPods, err := dsc.getNodesToDaemonPods(ctx, ds)
        if err != nil {
            return fmt.Errorf("couldn't get node to daemon pod mapping for daemon set %q: %v", ds.Name, err)
        }
        maxSurge, maxUnavailable, desiredNumberScheduled, err := dsc.updatedDesiredNodeCounts(ctx, ds, nodeList, nodeToDaemonPods)
        if err != nil {
            return fmt.Errorf("couldn't get unavailable numbers: %v", err)
        }

        now := dsc.failedPodsBackoff.Clock.Now()

        // When not surging, we delete just enough pods to stay under the maxUnavailable limit, if any
        // are necessary, and let the core loop create new instances on those nodes.
        //
        // Assumptions:
        // * Expect manage loop to allow no more than one pod per node
        // * Expect manage loop will create new pods
        // * Expect manage loop will handle failed pods
        // * Deleted pods do not count as unavailable so that updates make progress when nodes are down
        // Invariants:
        // * The number of new pods that are unavailable must be less than maxUnavailable
        // * A node with an available old pod is a candidate for deletion if it does not violate other invariants
        //
        if maxSurge == 0 {
            var numUnavailable int
            var allowedReplacementPods []string
            var candidatePodsToDelete []string
            for nodeName, pods := range nodeToDaemonPods {
                newPod, oldPod, ok := findUpdatedPodsOnNode(ds, pods, hash)
                if !ok {
                    // let the manage loop clean up this node, and treat it as an unavailable node
                    klog.V(3).Infof("DaemonSet %s/%s has excess pods on node %s, skipping to allow the core loop to process", ds.Namespace, ds.Name, nodeName)
                    numUnavailable++
                    continue
                }
                switch {
                case oldPod == nil && newPod == nil, oldPod != nil && newPod != nil:
                    // the manage loop will handle creating or deleting the appropriate pod, consider this unavailable
                    numUnavailable++
                case newPod != nil:
                    // this pod is up to date, check its availability
                    if !podutil.IsPodAvailable(newPod, ds.Spec.MinReadySeconds, metav1.Time{Time: now}) {
                        // an unavailable new pod is counted against maxUnavailable
                        numUnavailable++
                    }
                default:
                    // this pod is old, it is an update candidate
                    switch {
                    case !podutil.IsPodAvailable(oldPod, ds.Spec.MinReadySeconds, metav1.Time{Time: now}):
                        // the old pod isn't available, so it needs to be replaced
                        klog.V(5).Infof("DaemonSet %s/%s pod %s on node %s is out of date and not available, allowing replacement", ds.Namespace, ds.Name, oldPod.Name, nodeName)
                        // record the replacement
                        if allowedReplacementPods == nil {
                            allowedReplacementPods = make([]string, 0, len(nodeToDaemonPods))
                        }
                        allowedReplacementPods = append(allowedReplacementPods, oldPod.Name)
                    case numUnavailable >= maxUnavailable:
                        // no point considering any other candidates
                        continue
                    default:
                        klog.V(5).Infof("DaemonSet %s/%s pod %s on node %s is out of date, this is a candidate to replace", ds.Namespace, ds.Name, oldPod.Name, nodeName)
                        // record the candidate
                        if candidatePodsToDelete == nil {
                            candidatePodsToDelete = make([]string, 0, maxUnavailable)
                        }
                        candidatePodsToDelete = append(candidatePodsToDelete, oldPod.Name)
                    }
                }
            }

            // use any of the candidates we can, including the allowedReplacemnntPods
            klog.V(5).Infof("DaemonSet %s/%s allowing %d replacements, up to %d unavailable, %d new are unavailable, %d candidates", ds.Namespace, ds.Name, len(allowedReplacementPods), maxUnavailable, numUnavailable, len(candidatePodsToDelete))
            remainingUnavailable := maxUnavailable - numUnavailable
            if remainingUnavailable < 0 {
                remainingUnavailable = 0
            }
            if max := len(candidatePodsToDelete); remainingUnavailable > max {
                remainingUnavailable = max
            }
            oldPodsToDelete := append(allowedReplacementPods, candidatePodsToDelete[:remainingUnavailable]...)

            return dsc.syncNodes(ctx, ds, oldPodsToDelete, nil, hash)
        }

        // When surging, we create new pods whenever an old pod is unavailable, and we can create up
        // to maxSurge extra pods
        //
        // Assumptions:
        // * Expect manage loop to allow no more than two pods per node, one old, one new
        // * Expect manage loop will create new pods if there are no pods on node
        // * Expect manage loop will handle failed pods
        // * Deleted pods do not count as unavailable so that updates make progress when nodes are down
        // Invariants:
        // * A node with an unavailable old pod is a candidate for immediate new pod creation
        // * An old available pod is deleted if a new pod is available
        // * No more than maxSurge new pods are created for old available pods at any one time
        //
        var oldPodsToDelete []string          // these pods are already updated or unavailable on sunsetted node
        var shouldNotRunPodsToDelete []string // candidate pods to be deleted on sunsetted nodes
        var candidateNewNodes []string
        var allowedNewNodes []string
        var numSurge int
        var numAvailable int

        for nodeName, pods := range nodeToDaemonPods {
            newPod, oldPod, ok := findUpdatedPodsOnNode(ds, pods, hash)
            if !ok {
                // let the manage loop clean up this node, and treat it as a surge node
                klog.V(3).Infof("DaemonSet %s/%s has excess pods on node %s, skipping to allow the core loop to process", ds.Namespace, ds.Name, nodeName)
                numSurge++
                continue
            }

            // first count availability for all the nodes (even the ones that we are sunsetting due to scheduling constraints)
            if oldPod != nil {
                if podutil.IsPodAvailable(oldPod, ds.Spec.MinReadySeconds, metav1.Time{Time: now}) {
                    numAvailable++
                }
            } else if newPod != nil {
                if podutil.IsPodAvailable(newPod, ds.Spec.MinReadySeconds, metav1.Time{Time: now}) {
                    numAvailable++
                }
            }

            switch {
            case oldPod == nil:
                // we don't need to do anything to this node, the manage loop will handle it
            case newPod == nil:
                // this is a surge candidate
                switch {
                case !podutil.IsPodAvailable(oldPod, ds.Spec.MinReadySeconds, metav1.Time{Time: now}):
                    node, err := dsc.nodeLister.Get(nodeName)
                    if err != nil {
                        return fmt.Errorf("couldn't get node for nodeName %q: %v", nodeName, err)
                    }
                    if shouldRun, _ := NodeShouldRunDaemonPod(node, ds); !shouldRun {
                        klog.FromContext(ctx).V(5).Info("DaemonSet pod on node is not available and does not match scheduling constraints, remove old pod", "daemonset", klog.KObj(ds), "node", nodeName, "oldPod", klog.KObj(oldPod))
                        oldPodsToDelete = append(oldPodsToDelete, oldPod.Name)
                        continue
                    }
                    // the old pod isn't available, allow it to become a replacement
                    klog.V(5).Infof("Pod %s on node %s is out of date and not available, allowing replacement", ds.Namespace, ds.Name, oldPod.Name, nodeName)
                    // record the replacement
                    if allowedNewNodes == nil {
                        allowedNewNodes = make([]string, 0, len(nodeToDaemonPods))
                    }
                    allowedNewNodes = append(allowedNewNodes, nodeName)
                default:
                    node, err := dsc.nodeLister.Get(nodeName)
                    if err != nil {
                        return fmt.Errorf("couldn't get node for nodeName %q: %v", nodeName, err)
                    }
                    if shouldRun, _ := NodeShouldRunDaemonPod(node, ds); !shouldRun {
                        shouldNotRunPodsToDelete = append(shouldNotRunPodsToDelete, oldPod.Name)
                        continue
                    }
                    if numSurge >= maxSurge {
                        // no point considering any other candidates
                        continue
                    }
                    klog.FromContext(ctx).V(5).Info("DaemonSet pod on node is out of date, this is a surge candidate", "daemonset", klog.KObj(ds), "pod", klog.KObj(oldPod), "node", klog.KRef("", nodeName))
                    // record the candidate
                    if candidateNewNodes == nil {
                        candidateNewNodes = make([]string, 0, maxSurge)
                    }
                    candidateNewNodes = append(candidateNewNodes, nodeName)
                }
            default:
                // we have already surged onto this node, determine our state
                if !podutil.IsPodAvailable(newPod, ds.Spec.MinReadySeconds, metav1.Time{Time: now}) {
                    // we're waiting to go available here
                    numSurge++
                    continue
                }
                // we're available, delete the old pod
                klog.V(5).Infof("DaemonSet %s/%s pod %s on node %s is available, remove %s", ds.Namespace, ds.Name, newPod.Name, nodeName, oldPod.Name)
                oldPodsToDelete = append(oldPodsToDelete, oldPod.Name)
            }
        }

        // use any of the candidates we can, including the allowedNewNodes
        klog.V(5).Infof("DaemonSet %s/%s allowing %d replacements, surge up to %d, %d are in progress, %d candidates", ds.Namespace, ds.Name, len(allowedNewNodes), maxSurge, numSurge, len(candidateNewNodes))
        remainingSurge := maxSurge - numSurge

        // With maxSurge, the application owner expects 100% availability.
        // When the scheduling constraint change from node A to node B, we do not want the application to stay
        // without any available pods. Only delete a pod on node A when a pod on node B becomes available.
        if deletablePodsNumber := numAvailable - desiredNumberScheduled; deletablePodsNumber > 0 {
            if shouldNotRunPodsToDeleteNumber := len(shouldNotRunPodsToDelete); deletablePodsNumber > shouldNotRunPodsToDeleteNumber {
                deletablePodsNumber = shouldNotRunPodsToDeleteNumber
            }
            for _, podToDeleteName := range shouldNotRunPodsToDelete[:deletablePodsNumber] {
                podToDelete, err := dsc.podLister.Pods(ds.Namespace).Get(podToDeleteName)
                if err != nil {
                    if errors.IsNotFound(err) {
                        continue
                    }
                    return fmt.Errorf("couldn't get pod which should be deleted due to scheduling constraints %q: %v", podToDeleteName, err)
                }
                klog.FromContext(ctx).V(5).Info("DaemonSet pod on node should be deleted due to scheduling constraints", "daemonset", klog.KObj(ds), "pod", klog.KObj(podToDelete), "node", podToDelete.Spec.NodeName)
                oldPodsToDelete = append(oldPodsToDelete, podToDeleteName)
            }
        }

        if remainingSurge < 0 {
            remainingSurge = 0
        }
        if max := len(candidateNewNodes); remainingSurge > max {
            remainingSurge = max
        }
        newNodesToCreate := append(allowedNewNodes, candidateNewNodes[:remainingSurge]...)

        return dsc.syncNodes(ctx, ds, oldPodsToDelete, newNodesToCreate, hash)
    }
  1. dsc.getNodesToDaemonPods

    The DS Controller counts the Pods running on each node

  2. dsc.updatedDesiredNodeCounts

    Compute maxSurge and maxUnavailable (along with the desired number of scheduled Pods)

  3. if maxSurge == 0

    Perform the Pod update; the handling differs depending on whether maxSurge is 0

    • maxSurge == 0: no Pods beyond the desired count are allowed to run, so a new Pod is not created until the old one has been deleted; rollingUpdate only deletes old Pods, and the new Pods are created by the manage phase (a condensed sketch of this deletion budgeting appears after this list)
      • The node has both an old and a new Pod, or neither: nothing is done here; creating the new Pod and deleting the old one are handled in manage
      • The node has only the new Pod: nothing is done (an unavailable new Pod still counts against maxUnavailable)
      • The node has only the old Pod: the Pod is added to the list of Pods to delete, and Pods that are not Available are deleted first; once the list is complete, the DS Controller deletes them through the syncNodes func
    • maxSurge != 0: the DS Controller may start new Pods on nodes while the old Pods are still present, surging up to maxSurge extra Pods; nodes whose old Pod is not Available are handled first
      • The node has no old Pod: nothing is done; manage takes care of it
      • The node has only an old Pod and no new Pod: the node is added to the list of nodes that need a new Pod; nodes whose old Pod is not Available get a new Pod first, and the number of surged Pods never exceeds maxSurge
      • The node has both an old and a new Pod: if the new Pod is Available, the old Pod is added to the list of Pods to delete
  4. dsc.syncNodes

    After this bookkeeping is complete, dsc.syncNodes is called to delete and create the Pods through kube-apiserver
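
  • As noted in the maxSurge == 0 case above, deletions of healthy old Pods are budgeted against maxUnavailable, while old Pods that are already unavailable are always replaced. A standalone, illustrative helper (all names invented for the example) showing that budgeting:

    package main

    import "fmt"

    // pickPodsToDelete mirrors the budgeting in the maxSurge == 0 branch:
    // unavailable old Pods (allowedReplacements) are always deleted, and available
    // old Pods (candidates) are deleted only while the number of unavailable Pods
    // stays within maxUnavailable.
    func pickPodsToDelete(allowedReplacements, candidates []string, numUnavailable, maxUnavailable int) []string {
        remaining := maxUnavailable - numUnavailable
        if remaining < 0 {
            remaining = 0
        }
        if remaining > len(candidates) {
            remaining = len(candidates)
        }
        return append(append([]string{}, allowedReplacements...), candidates[:remaining]...)
    }

    func main() {
        toDelete := pickPodsToDelete(
            []string{"ds-old-broken"},        // old Pod that is already unavailable
            []string{"ds-old-a", "ds-old-b"}, // healthy old Pods, update candidates
            1,                                // Pods currently counted as unavailable
            2,                                // maxUnavailable from the rolling update strategy
        )
        fmt.Println(toDelete) // [ds-old-broken ds-old-a]
    }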