Kube-controller-manager (DaemonSet)

Based on Kubernetes 1.25.

The DaemonSet Controller manages DaemonSet objects.

  • On every node selected by a DaemonSet, the controller eventually keeps exactly one available Pod

A DaemonSet object mainly consists of two parts: the Pod template and the update strategy.
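
For orientation, here is a minimal sketch of such an object showing those two parts. It is a hand-written illustration, not taken from the controller source; the name node-agent and the image are made up:

    package example

    import (
        appsv1 "k8s.io/api/apps/v1"
        corev1 "k8s.io/api/core/v1"
        metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    )

    // exampleDaemonSet sketches the two parts the text refers to: the Pod template
    // (what runs on every selected node) and the update strategy.
    func exampleDaemonSet() *appsv1.DaemonSet {
        return &appsv1.DaemonSet{
            ObjectMeta: metav1.ObjectMeta{Name: "node-agent", Namespace: "kube-system"}, // hypothetical name
            Spec: appsv1.DaemonSetSpec{
                Selector: &metav1.LabelSelector{MatchLabels: map[string]string{"app": "node-agent"}},
                // Part 1: the Pod template.
                Template: corev1.PodTemplateSpec{
                    ObjectMeta: metav1.ObjectMeta{Labels: map[string]string{"app": "node-agent"}},
                    Spec: corev1.PodSpec{
                        Containers: []corev1.Container{{Name: "agent", Image: "example.com/agent:v1"}},
                    },
                },
                // Part 2: the update strategy (OnDelete or RollingUpdate).
                UpdateStrategy: appsv1.DaemonSetUpdateStrategy{
                    Type: appsv1.RollingUpdateDaemonSetStrategyType,
                },
            },
        }
    }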

Controller initialization

  • DaemonSet objects: watches Add, Update, and Delete events
  • Pod objects
  • Node objects
  • ControllerRevision objects: handles synchronization of ControllerRevision objects, which record the historical revisions of the DaemonSet's Pod template
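
A minimal sketch of how such watches are typically wired up with client-go shared informers. This is illustrative only; the actual wiring in NewDaemonSetsController registers dedicated handler functions and enqueue logic for each of these resources:

    package example

    import (
        appsinformers "k8s.io/client-go/informers/apps/v1"
        "k8s.io/client-go/tools/cache"
    )

    // registerDaemonSetHandlers sketches how a controller subscribes to the Add,
    // Update and Delete events of DaemonSet objects through a shared informer.
    // enqueue is assumed to push the object's namespace/name key onto a work queue.
    func registerDaemonSetHandlers(dsInformer appsinformers.DaemonSetInformer, enqueue func(obj interface{})) {
        dsInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
            AddFunc: enqueue,
            UpdateFunc: func(oldObj, newObj interface{}) {
                // On update, re-queue the new version of the object for reconciliation.
                enqueue(newObj)
            },
            DeleteFunc: enqueue,
        })
    }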

Main execution logic

  1. dsc.dsLister.DaemonSets

    Get the DaemonSet object: the DS Controller retrieves the complete DaemonSet object through the DS Informer's Lister

  2. dsc.nodeLister.List

    List all Node objects in the cluster

  3. ds.DeletionTimestamp != nil

    Check whether the object is being deleted. If it is, no reconciliation is needed

  4. dsc.expectations.SatisfiedExpectations

    Decide whether to run the reconciliation.

    The DS Controller uses the Expectations mechanism to check whether all Pod creations and deletions requested by the previous reconciliation have been processed by kube-apiserver

  5. dsc.manage

    Reconcile the number of Pods

    The DS Controller counts the Pods running on each node and, based on the Pods' Tolerations and Affinity and the nodes' Taints and Labels, works out which new Pods need to be created and which Pods need to be deleted

  6. switch ds.Spec.UpdateStrategy.Type

    Update the Pods according to the update strategy

    • If the update strategy is OnDelete: the DS Controller takes no action; the user must delete the Pods manually, and the controller creates new Pods in the next reconciliation

    • If the update strategy is RollingUpdate: the DS Controller handles both deleting and creating the Pods

    • Ref:https://github.com/kubernetes/kubernetes/blob/88e994f6bf8fc88114c5b733e09afea339bea66d/pkg/controller/daemon/daemon_controller.go#L923

      func (dsc *DaemonSetsController) updateDaemonSet(ctx context.Context, ds *apps.DaemonSet, nodeList []*v1.Node, hash, key string, old []*apps.ControllerRevision) error {
          err := dsc.manage(ctx, ds, nodeList, hash)
          if err != nil {
              return err
          }

          // Process rolling updates if we're ready.
          if dsc.expectations.SatisfiedExpectations(key) {
              switch ds.Spec.UpdateStrategy.Type {
              case apps.OnDeleteDaemonSetStrategyType:
              case apps.RollingUpdateDaemonSetStrategyType:
                  err = dsc.rollingUpdate(ctx, ds, nodeList, hash)
              }
              if err != nil {
                  return err
              }
          }

          err = dsc.cleanupHistory(ctx, ds, old)
          if err != nil {
              return fmt.Errorf("failed to clean up revisions of DaemonSet: %w", err)
          }

          return nil
      }
  7. updateDaemonSetStatus

    Update the status of the DaemonSet object

    • The DS Controller computes the number of active Pods that are Ready, the number of active Pods that are Available, and so on, and calls kube-apiserver to update the status
    • In addition, consistent with the ReplicaSet Controller, if the number of Ready Pods differs from the number of Available Pods and MinReadySeconds is not 0, the DS Controller re-enqueues the DaemonSet into the work queue after a delay of MinReadySeconds
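
Putting the steps above together, the per-key flow can be summarized with the simplified skeleton below. It is a paraphrase for illustration, not the controller's syncDaemonSet itself; the listers and the expectations check are injected as parameters to keep it self-contained, and steps 5 to 7 are only indicated by comments:

    package example

    import (
        "context"

        apierrors "k8s.io/apimachinery/pkg/api/errors"
        "k8s.io/apimachinery/pkg/labels"
        appslisters "k8s.io/client-go/listers/apps/v1"
        corelisters "k8s.io/client-go/listers/core/v1"
        "k8s.io/client-go/tools/cache"
    )

    // syncSketch walks through steps 1-4 described above and notes where 5-7 happen.
    func syncSketch(ctx context.Context, key string, dsLister appslisters.DaemonSetLister, nodeLister corelisters.NodeLister, satisfiedExpectations func(key string) bool) error {
        namespace, name, err := cache.SplitMetaNamespaceKey(key)
        if err != nil {
            return err
        }

        // 1. Fetch the DaemonSet through the informer's lister.
        ds, err := dsLister.DaemonSets(namespace).Get(name)
        if apierrors.IsNotFound(err) {
            return nil // the object is gone, nothing to reconcile
        } else if err != nil {
            return err
        }

        // 2. List all Node objects in the cluster.
        if _, err := nodeLister.List(labels.Everything()); err != nil {
            return err
        }

        // 3. Skip reconciliation while the object is being deleted.
        if ds.DeletionTimestamp != nil {
            return nil
        }

        // 4. Only reconcile once the previous round's creates/deletes have been observed.
        if !satisfiedExpectations(key) {
            return nil
        }

        // 5-6. manage reconciles the Pod counts, then the update strategy is applied
        //      (see updateDaemonSet quoted above).
        // 7. Finally the controller recomputes and writes back the DaemonSet status.
        return nil
    }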

Reconciling the number of Pods

The DS Controller synchronizes the Pods on each node through the manage func: it creates the missing Pods on nodes that should run one and deletes failed and redundant Pods.

The main flow of manage is as follows:

  1. Count the Pods running on each node

    The DS Controller calls getNodesToDaemonPods to build a mapping from each Node to the set of daemon Pods running on it, recording which Pods each node is running

  2. Determine which Pods to create and delete on each node

    The DS Controller iterates over every node and calls podsShouldBeOnNode to work out whether the node needs a new Pod and which Pods on it should be deleted, storing the results in the nodesNeedingDaemonPods and podsToDelete arrays

    • podsShouldBeOnNode:
      • First decide whether the node should run, or continue to run, a Pod
        • The node should run a Pod: if the node does not yet have a Pod of this DaemonSet, one should be created. The node is considered unable to run the Pod when it cannot satisfy the DaemonSet's NodeName requirement, cannot satisfy the node Affinity, or has a Taint the Pod cannot tolerate
        • The node should not continue running the Pod: if the node already has a Pod of this DaemonSet, the Pod stops running there only when the node has an untolerated Taint whose Effect is NoExecute. In all other cases the existing Pod keeps running, and only new Pods are prevented from being scheduled onto the node

    The DS Controller then checks the Pods according to the following rules:

    • If a node should run a Pod but no Pod of the DaemonSet is running on it, the node is added to the nodesNeedingDaemonPods array
    • If a node should not continue running the Pod but Pods of the DaemonSet are running on it, those Pods are added to the podsToDelete array
    • If a node should continue running the Pod, iterate over all of the DaemonSet's Pods on the node
      • Pods in the Failed state are added to the podsToDelete array
      • If the DaemonSet's MaxSurge is 0, only the oldest Pod is kept and the remaining Pods are added to the podsToDelete array
  3. Create and delete the Pods

    After the Pods to create and delete have been collected, the DS Controller calls the syncNodes func, which actually calls kube-apiserver to create and delete the Pods, starting them in slow-start batches (see the sketch after the code listing below)

  • Ref:https://github.com/kubernetes/kubernetes/blob/88e994f6bf8fc88114c5b733e09afea339bea66d/pkg/controller/daemon/daemon_controller.go#L946

    // manage manages the scheduling and running of Pods of ds on nodes.
    // After figuring out which nodes should run a Pod of ds but not yet running one and
    // which nodes should not run a Pod of ds but currently running one, it calls function
    // syncNodes with a list of pods to remove and a list of nodes to run a Pod of ds.
    func (dsc *DaemonSetsController) manage(ctx context.Context, ds *apps.DaemonSet, nodeList []*v1.Node, hash string) error {
        // Find out the pods which are created for the nodes by DaemonSet.
        nodeToDaemonPods, err := dsc.getNodesToDaemonPods(ctx, ds)
        if err != nil {
            return fmt.Errorf("couldn't get node to daemon pod mapping for daemon set %q: %v", ds.Name, err)
        }

        // For each node, if the node is running the daemon pod but isn't supposed to, kill the daemon
        // pod. If the node is supposed to run the daemon pod, but isn't, create the daemon pod on the node.
        var nodesNeedingDaemonPods, podsToDelete []string
        for _, node := range nodeList {
            nodesNeedingDaemonPodsOnNode, podsToDeleteOnNode := dsc.podsShouldBeOnNode(
                node, nodeToDaemonPods, ds, hash)

            nodesNeedingDaemonPods = append(nodesNeedingDaemonPods, nodesNeedingDaemonPodsOnNode...)
            podsToDelete = append(podsToDelete, podsToDeleteOnNode...)
        }

        // Remove unscheduled pods assigned to not existing nodes when daemonset pods are scheduled by scheduler.
        // If node doesn't exist then pods are never scheduled and can't be deleted by PodGCController.
        podsToDelete = append(podsToDelete, getUnscheduledPodsWithoutNode(nodeList, nodeToDaemonPods)...)

        // Label new pods using the hash label value of the current history when creating them
        if err = dsc.syncNodes(ctx, ds, podsToDelete, nodesNeedingDaemonPods, hash); err != nil {
            return err
        }

        return nil
    }
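
The slow-start creation mentioned in step 3 can be sketched roughly as follows. This is an illustrative re-implementation of the batching idea used by syncNodes, not the controller's own slowStartBatch helper: batches start small and double as long as creations keep succeeding, so a broken Pod template does not flood kube-apiserver with requests that are doomed to fail.

    package example

    // slowStartSketch runs createFn for `total` Pods in batches that start at
    // initialBatchSize and double each round, stopping early if any creation in a
    // batch fails. It is a simplified sketch of the pattern, not the controller's code.
    func slowStartSketch(total, initialBatchSize int, createFn func() error) (int, error) {
        remaining := total
        successes := 0
        batchSize := initialBatchSize
        if batchSize > remaining {
            batchSize = remaining
        }
        for batchSize > 0 {
            var firstErr error
            for i := 0; i < batchSize; i++ {
                // The real controller issues these creations concurrently; a simple
                // sequential loop keeps the sketch short.
                if err := createFn(); err != nil {
                    firstErr = err
                    break
                }
                successes++
            }
            if firstErr != nil {
                // Stop doubling: the remaining creations would most likely fail too.
                return successes, firstErr
            }
            remaining -= batchSize
            batchSize *= 2
            if batchSize > remaining {
                batchSize = remaining
            }
        }
        return successes, nil
    }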

Update strategies

A DaemonSet supports two Pod update strategies: OnDelete and RollingUpdate.

OnDelete

The DS Controller does not touch the Pods: to update the Pod running on a node, the user must delete it manually; once the Pod is deleted, the next DaemonSet sync recreates a new-version Pod during the manage phase.

RollingUpdate

The DS Controller gradually replaces all of the DaemonSet's Pods, mainly through the rollingUpdate func.

  • Ref:https://github.com/kubernetes/kubernetes/blob/88e994f6bf8fc88114c5b733e09afea339bea66d/pkg/controller/daemon/update.go#L44

    // rollingUpdate identifies the set of old pods to delete, or additional pods to create on nodes,
    // remaining within the constraints imposed by the update strategy.
    func (dsc *DaemonSetsController) rollingUpdate(ctx context.Context, ds *apps.DaemonSet, nodeList []*v1.Node, hash string) error {
        nodeToDaemonPods, err := dsc.getNodesToDaemonPods(ctx, ds)
        if err != nil {
            return fmt.Errorf("couldn't get node to daemon pod mapping for daemon set %q: %v", ds.Name, err)
        }
        maxSurge, maxUnavailable, desiredNumberScheduled, err := dsc.updatedDesiredNodeCounts(ctx, ds, nodeList, nodeToDaemonPods)
        if err != nil {
            return fmt.Errorf("couldn't get unavailable numbers: %v", err)
        }

        now := dsc.failedPodsBackoff.Clock.Now()

        // When not surging, we delete just enough pods to stay under the maxUnavailable limit, if any
        // are necessary, and let the core loop create new instances on those nodes.
        //
        // Assumptions:
        // * Expect manage loop to allow no more than one pod per node
        // * Expect manage loop will create new pods
        // * Expect manage loop will handle failed pods
        // * Deleted pods do not count as unavailable so that updates make progress when nodes are down
        // Invariants:
        // * The number of new pods that are unavailable must be less than maxUnavailable
        // * A node with an available old pod is a candidate for deletion if it does not violate other invariants
        //
        if maxSurge == 0 {
            var numUnavailable int
            var allowedReplacementPods []string
            var candidatePodsToDelete []string
            for nodeName, pods := range nodeToDaemonPods {
                newPod, oldPod, ok := findUpdatedPodsOnNode(ds, pods, hash)
                if !ok {
                    // let the manage loop clean up this node, and treat it as an unavailable node
                    klog.V(3).Infof("DaemonSet %s/%s has excess pods on node %s, skipping to allow the core loop to process", ds.Namespace, ds.Name, nodeName)
                    numUnavailable++
                    continue
                }
                switch {
                case oldPod == nil && newPod == nil, oldPod != nil && newPod != nil:
                    // the manage loop will handle creating or deleting the appropriate pod, consider this unavailable
                    numUnavailable++
                case newPod != nil:
                    // this pod is up to date, check its availability
                    if !podutil.IsPodAvailable(newPod, ds.Spec.MinReadySeconds, metav1.Time{Time: now}) {
                        // an unavailable new pod is counted against maxUnavailable
                        numUnavailable++
                    }
                default:
                    // this pod is old, it is an update candidate
                    switch {
                    case !podutil.IsPodAvailable(oldPod, ds.Spec.MinReadySeconds, metav1.Time{Time: now}):
                        // the old pod isn't available, so it needs to be replaced
                        klog.V(5).Infof("DaemonSet %s/%s pod %s on node %s is out of date and not available, allowing replacement", ds.Namespace, ds.Name, oldPod.Name, nodeName)
                        // record the replacement
                        if allowedReplacementPods == nil {
                            allowedReplacementPods = make([]string, 0, len(nodeToDaemonPods))
                        }
                        allowedReplacementPods = append(allowedReplacementPods, oldPod.Name)
                    case numUnavailable >= maxUnavailable:
                        // no point considering any other candidates
                        continue
                    default:
                        klog.V(5).Infof("DaemonSet %s/%s pod %s on node %s is out of date, this is a candidate to replace", ds.Namespace, ds.Name, oldPod.Name, nodeName)
                        // record the candidate
                        if candidatePodsToDelete == nil {
                            candidatePodsToDelete = make([]string, 0, maxUnavailable)
                        }
                        candidatePodsToDelete = append(candidatePodsToDelete, oldPod.Name)
                    }
                }
            }

            // use any of the candidates we can, including the allowedReplacemnntPods
            klog.V(5).Infof("DaemonSet %s/%s allowing %d replacements, up to %d unavailable, %d new are unavailable, %d candidates", ds.Namespace, ds.Name, len(allowedReplacementPods), maxUnavailable, numUnavailable, len(candidatePodsToDelete))
            remainingUnavailable := maxUnavailable - numUnavailable
            if remainingUnavailable < 0 {
                remainingUnavailable = 0
            }
            if max := len(candidatePodsToDelete); remainingUnavailable > max {
                remainingUnavailable = max
            }
            oldPodsToDelete := append(allowedReplacementPods, candidatePodsToDelete[:remainingUnavailable]...)

            return dsc.syncNodes(ctx, ds, oldPodsToDelete, nil, hash)
        }

        // When surging, we create new pods whenever an old pod is unavailable, and we can create up
        // to maxSurge extra pods
        //
        // Assumptions:
        // * Expect manage loop to allow no more than two pods per node, one old, one new
        // * Expect manage loop will create new pods if there are no pods on node
        // * Expect manage loop will handle failed pods
        // * Deleted pods do not count as unavailable so that updates make progress when nodes are down
        // Invariants:
        // * A node with an unavailable old pod is a candidate for immediate new pod creation
        // * An old available pod is deleted if a new pod is available
        // * No more than maxSurge new pods are created for old available pods at any one time
        //
        var oldPodsToDelete []string          // these pods are already updated or unavailable on sunsetted node
        var shouldNotRunPodsToDelete []string // candidate pods to be deleted on sunsetted nodes
        var candidateNewNodes []string
        var allowedNewNodes []string
        var numSurge int
        var numAvailable int

        for nodeName, pods := range nodeToDaemonPods {
            newPod, oldPod, ok := findUpdatedPodsOnNode(ds, pods, hash)
            if !ok {
                // let the manage loop clean up this node, and treat it as a surge node
                klog.V(3).Infof("DaemonSet %s/%s has excess pods on node %s, skipping to allow the core loop to process", ds.Namespace, ds.Name, nodeName)
                numSurge++
                continue
            }

            // first count availability for all the nodes (even the ones that we are sunsetting due to scheduling constraints)
            if oldPod != nil {
                if podutil.IsPodAvailable(oldPod, ds.Spec.MinReadySeconds, metav1.Time{Time: now}) {
                    numAvailable++
                }
            } else if newPod != nil {
                if podutil.IsPodAvailable(newPod, ds.Spec.MinReadySeconds, metav1.Time{Time: now}) {
                    numAvailable++
                }
            }

            switch {
            case oldPod == nil:
                // we don't need to do anything to this node, the manage loop will handle it
            case newPod == nil:
                // this is a surge candidate
                switch {
                case !podutil.IsPodAvailable(oldPod, ds.Spec.MinReadySeconds, metav1.Time{Time: now}):
                    node, err := dsc.nodeLister.Get(nodeName)
                    if err != nil {
                        return fmt.Errorf("couldn't get node for nodeName %q: %v", nodeName, err)
                    }
                    if shouldRun, _ := NodeShouldRunDaemonPod(node, ds); !shouldRun {
                        klog.FromContext(ctx).V(5).Info("DaemonSet pod on node is not available and does not match scheduling constraints, remove old pod", "daemonset", klog.KObj(ds), "node", nodeName, "oldPod", klog.KObj(oldPod))
                        oldPodsToDelete = append(oldPodsToDelete, oldPod.Name)
                        continue
                    }
                    // the old pod isn't available, allow it to become a replacement
                    klog.V(5).Infof("Pod %s on node %s is out of date and not available, allowing replacement", ds.Namespace, ds.Name, oldPod.Name, nodeName)
                    // record the replacement
                    if allowedNewNodes == nil {
                        allowedNewNodes = make([]string, 0, len(nodeToDaemonPods))
                    }
                    allowedNewNodes = append(allowedNewNodes, nodeName)
                default:
                    node, err := dsc.nodeLister.Get(nodeName)
                    if err != nil {
                        return fmt.Errorf("couldn't get node for nodeName %q: %v", nodeName, err)
                    }
                    if shouldRun, _ := NodeShouldRunDaemonPod(node, ds); !shouldRun {
                        shouldNotRunPodsToDelete = append(shouldNotRunPodsToDelete, oldPod.Name)
                        continue
                    }
                    if numSurge >= maxSurge {
                        // no point considering any other candidates
                        continue
                    }
                    klog.FromContext(ctx).V(5).Info("DaemonSet pod on node is out of date, this is a surge candidate", "daemonset", klog.KObj(ds), "pod", klog.KObj(oldPod), "node", klog.KRef("", nodeName))
                    // record the candidate
                    if candidateNewNodes == nil {
                        candidateNewNodes = make([]string, 0, maxSurge)
                    }
                    candidateNewNodes = append(candidateNewNodes, nodeName)
                }
            default:
                // we have already surged onto this node, determine our state
                if !podutil.IsPodAvailable(newPod, ds.Spec.MinReadySeconds, metav1.Time{Time: now}) {
                    // we're waiting to go available here
                    numSurge++
                    continue
                }
                // we're available, delete the old pod
                klog.V(5).Infof("DaemonSet %s/%s pod %s on node %s is available, remove %s", ds.Namespace, ds.Name, newPod.Name, nodeName, oldPod.Name)
                oldPodsToDelete = append(oldPodsToDelete, oldPod.Name)
            }
        }

        // use any of the candidates we can, including the allowedNewNodes
        klog.V(5).Infof("DaemonSet %s/%s allowing %d replacements, surge up to %d, %d are in progress, %d candidates", ds.Namespace, ds.Name, len(allowedNewNodes), maxSurge, numSurge, len(candidateNewNodes))
        remainingSurge := maxSurge - numSurge

        // With maxSurge, the application owner expects 100% availability.
        // When the scheduling constraint change from node A to node B, we do not want the application to stay
        // without any available pods. Only delete a pod on node A when a pod on node B becomes available.
        if deletablePodsNumber := numAvailable - desiredNumberScheduled; deletablePodsNumber > 0 {
            if shouldNotRunPodsToDeleteNumber := len(shouldNotRunPodsToDelete); deletablePodsNumber > shouldNotRunPodsToDeleteNumber {
                deletablePodsNumber = shouldNotRunPodsToDeleteNumber
            }
            for _, podToDeleteName := range shouldNotRunPodsToDelete[:deletablePodsNumber] {
                podToDelete, err := dsc.podLister.Pods(ds.Namespace).Get(podToDeleteName)
                if err != nil {
                    if errors.IsNotFound(err) {
                        continue
                    }
                    return fmt.Errorf("couldn't get pod which should be deleted due to scheduling constraints %q: %v", podToDeleteName, err)
                }
                klog.FromContext(ctx).V(5).Info("DaemonSet pod on node should be deleted due to scheduling constraints", "daemonset", klog.KObj(ds), "pod", klog.KObj(podToDelete), "node", podToDelete.Spec.NodeName)
                oldPodsToDelete = append(oldPodsToDelete, podToDeleteName)
            }
        }

        if remainingSurge < 0 {
            remainingSurge = 0
        }
        if max := len(candidateNewNodes); remainingSurge > max {
            remainingSurge = max
        }
        newNodesToCreate := append(allowedNewNodes, candidateNewNodes[:remainingSurge]...)

        return dsc.syncNodes(ctx, ds, oldPodsToDelete, newNodesToCreate, hash)
    }
  1. dsc.getNodesToDaemonPods

    The DS Controller counts the Pods running on each node

  2. dsc.updatedDesiredNodeCounts

    Compute maxSurge and maxUnavailable

  3. if maxSurge == 0

    Perform the Pod update; the logic differs depending on whether maxSurge is 0

    • maxSurge == 0: no Pods beyond the desired count may run, and a new Pod is not created until the old one has been deleted. The DS Controller only deletes old Pods here; the new Pods are created by the manage phase
      • The node has both an old and a new Pod, or neither: do nothing; creating the new Pod and deleting the old one are both handled in manage
      • The node has only a new Pod: do nothing
      • The node has only an old Pod: the Pod is added to the array of Pods to delete; old Pods that are not Available are deleted first, and the rest are limited by maxUnavailable. Once the bookkeeping is done, the DS Controller deletes them through the syncNodes func
    • maxSurge != 0: the DS Controller is allowed to start new Pods on some nodes while the old Pods are still present
      • The node has no old Pod: do nothing; the manage phase handles it
      • The node has only an old Pod and no new Pod: the node is added to the array of nodes that need a new Pod. Nodes whose old Pod is not Available get a new Pod first, and the number of surge Pods created at any one time does not exceed maxSurge
      • The node has both an old and a new Pod: if the new Pod is already Available, the old Pod is added to the array of Pods to delete
  4. dsc.syncNodes

    After the bookkeeping is done, dsc.syncNodes is called to delete and create the Pods through kube-apiserver
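
For reference, maxSurge and maxUnavailable in the RollingUpdate configuration may be given as absolute numbers or as percentages of the number of nodes that should run the daemon Pod. The sketch below shows how such values can be resolved into concrete counts; the real computation lives in updatedDesiredNodeCounts and the daemonset util helpers, which add further rules such as falling back to maxUnavailable=1 when both values resolve to 0:

    package example

    import (
        appsv1 "k8s.io/api/apps/v1"
        "k8s.io/apimachinery/pkg/util/intstr"
    )

    // resolveRollingUpdateBounds turns the integer-or-percentage fields of a
    // RollingUpdate strategy into concrete Pod counts for a given number of nodes.
    // Simplified sketch: defaulting normally guarantees both fields are non-nil.
    func resolveRollingUpdateBounds(ru *appsv1.RollingUpdateDaemonSet, desiredNumberScheduled int) (maxSurge, maxUnavailable int, err error) {
        if ru == nil {
            // Defaults applied by the API server: maxUnavailable=1, maxSurge=0.
            return 0, 1, nil
        }
        // Resolve a percentage or absolute value against the node count (rounding up here).
        maxSurge, err = intstr.GetScaledValueFromIntOrPercent(ru.MaxSurge, desiredNumberScheduled, true)
        if err != nil {
            return 0, 0, err
        }
        maxUnavailable, err = intstr.GetScaledValueFromIntOrPercent(ru.MaxUnavailable, desiredNumberScheduled, true)
        if err != nil {
            return 0, 0, err
        }
        return maxSurge, maxUnavailable, nil
    }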