Kube-controller-manager (DaemonSet)
Based on Kubernetes 1.25
The DaemonSet Controller manages DaemonSet objects
- On every node selected by a DaemonSet, exactly one Pod of that DaemonSet should eventually be available
A DaemonSet object mainly consists of two parts: the Pod template and the update strategy
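For reference, a minimal sketch of such an object built with the Go API types; the package name, object name, labels, and image below are illustrative, not taken from the controller source.

package dsexample

import (
    apps "k8s.io/api/apps/v1"
    v1 "k8s.io/api/core/v1"
    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

// minimalDaemonSet shows the two parts the controller reconciles:
// the Pod template and the update strategy.
func minimalDaemonSet() *apps.DaemonSet {
    labels := map[string]string{"app": "node-agent"}
    return &apps.DaemonSet{
        ObjectMeta: metav1.ObjectMeta{Name: "node-agent", Namespace: "kube-system"},
        Spec: apps.DaemonSetSpec{
            Selector: &metav1.LabelSelector{MatchLabels: labels},
            Template: v1.PodTemplateSpec{
                ObjectMeta: metav1.ObjectMeta{Labels: labels},
                Spec: v1.PodSpec{
                    Containers: []v1.Container{
                        {Name: "agent", Image: "example.com/node-agent:v1"},
                    },
                    // Tolerate all taints so the Pod can land on every selected node.
                    Tolerations: []v1.Toleration{{Operator: v1.TolerationOpExists}},
                },
            },
            // OnDelete or RollingUpdate; see the "Update strategies" section below.
            UpdateStrategy: apps.DaemonSetUpdateStrategy{
                Type: apps.RollingUpdateDaemonSetStrategyType,
            },
        },
    }
}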
Controller initialization
- DaemonSet objects: watches Add, Update, and Delete events
- Pod objects
- Node objects
- ControllerRevision objects: handles ControllerRevision synchronization and records the historical versions of the DS Pod template
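During initialization the controller registers event handlers on the shared informers for these resources and keeps their Listers for cache reads. A minimal sketch of that wiring follows; the setupInformers/enqueue helpers and the plain key-enqueue handlers are illustrative, while the real controller maps Pod, Node, and ControllerRevision events back to the owning DaemonSet before enqueueing.

package dsexample

import (
    "k8s.io/client-go/informers"
    "k8s.io/client-go/kubernetes"
    "k8s.io/client-go/tools/cache"
    "k8s.io/client-go/util/workqueue"
)

// setupInformers registers handlers on the DaemonSet informer and keeps the
// Listers the controller reads from during reconciliation.
func setupInformers(client kubernetes.Interface, queue workqueue.RateLimitingInterface) {
    factory := informers.NewSharedInformerFactory(client, 0)

    // DaemonSet events: enqueue the DaemonSet key on Add/Update/Delete.
    factory.Apps().V1().DaemonSets().Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
        AddFunc:    func(obj interface{}) { enqueue(queue, obj) },
        UpdateFunc: func(old, cur interface{}) { enqueue(queue, cur) },
        DeleteFunc: func(obj interface{}) { enqueue(queue, obj) },
    })

    // Pod, Node and ControllerRevision informers are registered the same way;
    // their handlers resolve the owning DaemonSet before enqueueing its key.
    _ = factory.Core().V1().Pods().Lister()
    _ = factory.Core().V1().Nodes().Lister()
    _ = factory.Apps().V1().ControllerRevisions().Lister()
}

// enqueue adds the namespace/name key of obj to the work queue.
func enqueue(queue workqueue.RateLimitingInterface, obj interface{}) {
    if key, err := cache.MetaNamespaceKeyFunc(obj); err == nil {
        queue.Add(key)
    }
}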
Main execution flow
dsc.dsLister.DaemonSets
Get the DS object: the DS Controller fetches the complete DS object from the DS Informer's Lister.
dsc.nodeLister.List
List all Node objects in the cluster.
ds.DeletionTimestamp != nil
Check whether the DS object is being deleted; if it is, no reconciliation is needed.
dsc.expectations.SatisfiedExpectations
Decide whether to run the reconciliation.
The DS Controller uses the Expectation mechanism to check whether all Pod creations and deletions requested by the previous reconciliation have been processed by kube-apiserver.
dsc.manage
Reconcile the number of Pods.
The DS Controller counts the Pods running on each node and, based on the Pods' Tolerations and Affinity and the nodes' Taints and Labels, works out which Pods need to be created and which need to be deleted.
switch ds.Spec.UpdateStrategy.Type
Update the Pods according to the update strategy (a simplified sketch of the surrounding flow follows the code below).
If the strategy is OnDelete: the DS Controller takes no action; the user has to delete Pods manually, and the controller creates the new Pods in the next reconciliation.
If the strategy is RollingUpdate: the DS Controller handles both deleting and creating Pods.
func (dsc *DaemonSetsController) updateDaemonSet(ctx context.Context, ds *apps.DaemonSet, nodeList []*v1.Node, hash, key string, old []*apps.ControllerRevision) error {
    err := dsc.manage(ctx, ds, nodeList, hash)
    if err != nil {
        return err
    }

    // Process rolling updates if we're ready.
    if dsc.expectations.SatisfiedExpectations(key) {
        switch ds.Spec.UpdateStrategy.Type {
        case apps.OnDeleteDaemonSetStrategyType:
        case apps.RollingUpdateDaemonSetStrategyType:
            err = dsc.rollingUpdate(ctx, ds, nodeList, hash)
        }
        if err != nil {
            return err
        }
    }

    err = dsc.cleanupHistory(ctx, ds, old)
    if err != nil {
        return fmt.Errorf("failed to clean up revisions of DaemonSet: %w", err)
    }

    return nil
}
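Putting the steps above together, the reconcile entry point looks roughly like the following sketch. The reconcileSketch type and its function fields are illustrative stand-ins for the controller's methods; the real syncDaemonSet also builds the ControllerRevision history and derives the template hash before calling updateDaemonSet.

package dsexample

import (
    "context"

    apps "k8s.io/api/apps/v1"
    v1 "k8s.io/api/core/v1"
)

// reconcileSketch wires together the steps from the outline above with
// simplified stubs instead of the controller's real fields.
type reconcileSketch struct {
    getDaemonSet          func(key string) (*apps.DaemonSet, error)
    listNodes             func() ([]*v1.Node, error)
    satisfiedExpectations func(key string) bool
    updateDaemonSet       func(ctx context.Context, ds *apps.DaemonSet, nodes []*v1.Node, key string) error
    updateStatus          func(ctx context.Context, ds *apps.DaemonSet, nodes []*v1.Node) error
}

func (r *reconcileSketch) sync(ctx context.Context, key string) error {
    // 1. Fetch the DaemonSet and the node list from the informer caches.
    ds, err := r.getDaemonSet(key)
    if err != nil {
        return err
    }
    nodes, err := r.listNodes()
    if err != nil {
        return err
    }

    // 2. A DaemonSet that is being deleted is not reconciled.
    if ds.DeletionTimestamp != nil {
        return nil
    }

    // 3. If creates/deletes from the previous round have not all been observed
    //    by kube-apiserver yet, only refresh the status and return.
    if !r.satisfiedExpectations(key) {
        return r.updateStatus(ctx, ds, nodes)
    }

    // 4. Reconcile Pods (manage), apply the update strategy, then publish status.
    if err := r.updateDaemonSet(ctx, ds, nodes, key); err != nil {
        return err
    }
    return r.updateStatus(ctx, ds, nodes)
}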
updateDaemonSetStatus
Update the status of the DS object.
- The DS Controller computes the number of active Pods that are Ready, the number that are Available, and so on, then calls kube-apiserver to update the status.
- In addition, consistent with the ReplicaSet Controller: if the number of Ready Pods differs from the number of Available Pods and MinReadySeconds is not 0, the DS Controller re-enqueues the DS into the work queue after MinReadySeconds.
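A minimal sketch of that delayed requeue, assuming a rate-limited work queue; the helper name requeueAfterMinReadySeconds is illustrative, while the real controller performs this re-enqueue as part of its status update.

package dsexample

import (
    "time"

    apps "k8s.io/api/apps/v1"
    "k8s.io/client-go/tools/cache"
    "k8s.io/client-go/util/workqueue"
)

// requeueAfterMinReadySeconds re-adds the DaemonSet key to the queue after
// MinReadySeconds, so availability is re-evaluated once the Pods have had
// time to become Available.
func requeueAfterMinReadySeconds(queue workqueue.RateLimitingInterface, ds *apps.DaemonSet, numberReady, numberAvailable int32) {
    if ds.Spec.MinReadySeconds > 0 && numberReady != numberAvailable {
        if key, err := cache.MetaNamespaceKeyFunc(ds); err == nil {
            queue.AddAfter(key, time.Duration(ds.Spec.MinReadySeconds)*time.Second)
        }
    }
}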
Reconciling the number of Pods
The DS Controller synchronizes the Pods on every node through the manage func: it creates the missing Pods on nodes that should run one and deletes failed and redundant Pods.
The main flow of manage is as follows:
Count the Pods running on each node
The DS Controller uses getNodesToDaemonPods to build a mapping from each node to the set of daemon Pods running on it.
Work out which Pods each node needs created or deleted
The DS Controller iterates over every node and calls podsShouldBeOnNode to decide whether the node needs a new Pod and which of its Pods should be deleted, storing the results in the nodesNeedingDaemonPods and podsToDelete lists.
- podsShouldBeOnNode (a simplified sketch of this decision follows the manage code below):
- First decide whether the node should run, or keep running, the Pod
- The node should run the Pod: if the node does not yet have a Pod of this DS, one should be created. A node is considered unable to run the Pod when it does not satisfy the DS's NodeName requirement, does not satisfy the Pod's node Affinity, or carries a Taint the Pod cannot tolerate.
- The node should not keep running the Pod: when the node already has a Pod of this DS, the Pod is only evicted if the node carries an untolerated Taint whose Effect is NoExecute; in all other cases the existing Pod keeps running (and no additional Pod is scheduled onto the node).
The DS Controller then checks the Pods as follows:
- If a node should run the Pod but has no running daemon Pod, the node is added to the nodesNeedingDaemonPods list.
- If a node should not keep running the Pod but still has running daemon Pods, those Pods are added to the podsToDelete list.
- If a node should keep running the Pod, iterate over all daemon Pods on the node:
- Pods in the Failed phase are added to the podsToDelete list.
- If the DS's MaxSurge is 0, only the oldest Pod is kept; the remaining Pods are added to the podsToDelete list.
Create and delete the Pods
Once the Pods to create and delete have been collected, the DS Controller calls the syncNodes func, which issues the actual create and delete requests to kube-apiserver and starts the new Pods in a slow-start fashion (see the sketch at the end of this note).
// manage manages the scheduling and running of Pods of ds on nodes.
// After figuring out which nodes should run a Pod of ds but not yet running one and
// which nodes should not run a Pod of ds but currently running one, it calls function
// syncNodes with a list of pods to remove and a list of nodes to run a Pod of ds.
func (dsc *DaemonSetsController) manage(ctx context.Context, ds *apps.DaemonSet, nodeList []*v1.Node, hash string) error {
    // Find out the pods which are created for the nodes by DaemonSet.
    nodeToDaemonPods, err := dsc.getNodesToDaemonPods(ctx, ds)
    if err != nil {
        return fmt.Errorf("couldn't get node to daemon pod mapping for daemon set %q: %v", ds.Name, err)
    }

    // For each node, if the node is running the daemon pod but isn't supposed to, kill the daemon
    // pod. If the node is supposed to run the daemon pod, but isn't, create the daemon pod on the node.
    var nodesNeedingDaemonPods, podsToDelete []string
    for _, node := range nodeList {
        nodesNeedingDaemonPodsOnNode, podsToDeleteOnNode := dsc.podsShouldBeOnNode(
            node, nodeToDaemonPods, ds, hash)

        nodesNeedingDaemonPods = append(nodesNeedingDaemonPods, nodesNeedingDaemonPodsOnNode...)
        podsToDelete = append(podsToDelete, podsToDeleteOnNode...)
    }

    // Remove unscheduled pods assigned to not existing nodes when daemonset pods are scheduled by scheduler.
    // If node doesn't exist then pods are never scheduled and can't be deleted by PodGCController.
    podsToDelete = append(podsToDelete, getUnscheduledPodsWithoutNode(nodeList, nodeToDaemonPods)...)

    // Label new pods using the hash label value of the current history when creating them
    if err = dsc.syncNodes(ctx, ds, podsToDelete, nodesNeedingDaemonPods, hash); err != nil {
        return err
    }

    return nil
}
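The per-node decision described in podsShouldBeOnNode can be summarized with the sketch below. classifyNode and the shouldRunDaemonPod function type are illustrative simplifications: the real controller calls NodeShouldRunDaemonPod for the node-level checks, sorts the remaining Pods by age when MaxSurge is 0, and emits events for failed Pods.

package dsexample

import (
    apps "k8s.io/api/apps/v1"
    v1 "k8s.io/api/core/v1"
)

// shouldRunDaemonPod is a stand-in for the node-level checks (NodeName, node
// affinity, untolerated taints); the real NodeShouldRunDaemonPod returns both
// shouldRun and shouldContinueRunning.
type shouldRunDaemonPod func(node *v1.Node, ds *apps.DaemonSet) (shouldRun, shouldContinueRunning bool)

// classifyNode returns whether a new daemon Pod is needed on the node and
// which of the existing daemon Pods should be deleted.
func classifyNode(node *v1.Node, daemonPods []*v1.Pod, ds *apps.DaemonSet, check shouldRunDaemonPod) (needsPod bool, podsToDelete []string) {
    shouldRun, shouldContinueRunning := check(node, ds)

    switch {
    case shouldRun && len(daemonPods) == 0:
        // The node should run a daemon Pod but has none: create one.
        needsPod = true
    case shouldContinueRunning:
        // Keep the node's Pods, but drop failed ones; with MaxSurge == 0 the
        // real controller additionally keeps only the oldest healthy Pod.
        for _, pod := range daemonPods {
            if pod.Status.Phase == v1.PodFailed {
                podsToDelete = append(podsToDelete, pod.Name)
            }
        }
    default:
        // The node should not run the Pod at all: delete everything.
        for _, pod := range daemonPods {
            podsToDelete = append(podsToDelete, pod.Name)
        }
    }
    return needsPod, podsToDelete
}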
Update strategies
A DS supports two Pod update strategies: OnDelete and RollingUpdate.
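For reference, the two strategies can be configured with the apps/v1 types as follows; the concrete values are illustrative (by default maxUnavailable is 1 and maxSurge is 0).

package dsexample

import (
    apps "k8s.io/api/apps/v1"
    "k8s.io/apimachinery/pkg/util/intstr"
)

// rollingUpdateStrategy builds a RollingUpdate strategy. A percentage
// maxUnavailable is resolved against the number of desired daemon Pods;
// a non-zero maxSurge switches rollingUpdate into surge mode.
func rollingUpdateStrategy() apps.DaemonSetUpdateStrategy {
    maxUnavailable := intstr.FromString("10%")
    maxSurge := intstr.FromInt(0)
    return apps.DaemonSetUpdateStrategy{
        Type: apps.RollingUpdateDaemonSetStrategyType,
        RollingUpdate: &apps.RollingUpdateDaemonSet{
            MaxUnavailable: &maxUnavailable,
            MaxSurge:       &maxSurge,
        },
    }
}

// onDeleteStrategy: the controller only recreates Pods after the user deletes them.
func onDeleteStrategy() apps.DaemonSetUpdateStrategy {
    return apps.DaemonSetUpdateStrategy{Type: apps.OnDeleteDaemonSetStrategyType}
}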
OnDelete
The DS Controller does not touch the running Pods: to update the Pods on a node, the user has to delete them manually; after a Pod is deleted, the controller recreates it from the new template during the manage step of the next DS sync.
RollingUpdate
The DS Controller gradually replaces all of the DS's Pods, mainly through the rollingUpdate func.
// rollingUpdate identifies the set of old pods to delete, or additional pods to create on nodes,
// remaining within the constraints imposed by the update strategy.
func (dsc *DaemonSetsController) rollingUpdate(ctx context.Context, ds *apps.DaemonSet, nodeList []*v1.Node, hash string) error {
    nodeToDaemonPods, err := dsc.getNodesToDaemonPods(ctx, ds)
    if err != nil {
        return fmt.Errorf("couldn't get node to daemon pod mapping for daemon set %q: %v", ds.Name, err)
    }

    maxSurge, maxUnavailable, desiredNumberScheduled, err := dsc.updatedDesiredNodeCounts(ctx, ds, nodeList, nodeToDaemonPods)
    if err != nil {
        return fmt.Errorf("couldn't get unavailable numbers: %v", err)
    }

    now := dsc.failedPodsBackoff.Clock.Now()

    // When not surging, we delete just enough pods to stay under the maxUnavailable limit, if any
    // are necessary, and let the core loop create new instances on those nodes.
    //
    // Assumptions:
    // * Expect manage loop to allow no more than one pod per node
    // * Expect manage loop will create new pods
    // * Expect manage loop will handle failed pods
    // * Deleted pods do not count as unavailable so that updates make progress when nodes are down
    // Invariants:
    // * The number of new pods that are unavailable must be less than maxUnavailable
    // * A node with an available old pod is a candidate for deletion if it does not violate other invariants
    //
    if maxSurge == 0 {
        var numUnavailable int
        var allowedReplacementPods []string
        var candidatePodsToDelete []string
        for nodeName, pods := range nodeToDaemonPods {
            newPod, oldPod, ok := findUpdatedPodsOnNode(ds, pods, hash)
            if !ok {
                // let the manage loop clean up this node, and treat it as an unavailable node
                klog.V(3).Infof("DaemonSet %s/%s has excess pods on node %s, skipping to allow the core loop to process", ds.Namespace, ds.Name, nodeName)
                numUnavailable++
                continue
            }
            switch {
            case oldPod == nil && newPod == nil, oldPod != nil && newPod != nil:
                // the manage loop will handle creating or deleting the appropriate pod, consider this unavailable
                numUnavailable++
            case newPod != nil:
                // this pod is up to date, check its availability
                if !podutil.IsPodAvailable(newPod, ds.Spec.MinReadySeconds, metav1.Time{Time: now}) {
                    // an unavailable new pod is counted against maxUnavailable
                    numUnavailable++
                }
            default:
                // this pod is old, it is an update candidate
                switch {
                case !podutil.IsPodAvailable(oldPod, ds.Spec.MinReadySeconds, metav1.Time{Time: now}):
                    // the old pod isn't available, so it needs to be replaced
                    klog.V(5).Infof("DaemonSet %s/%s pod %s on node %s is out of date and not available, allowing replacement", ds.Namespace, ds.Name, oldPod.Name, nodeName)
                    // record the replacement
                    if allowedReplacementPods == nil {
                        allowedReplacementPods = make([]string, 0, len(nodeToDaemonPods))
                    }
                    allowedReplacementPods = append(allowedReplacementPods, oldPod.Name)
                case numUnavailable >= maxUnavailable:
                    // no point considering any other candidates
                    continue
                default:
                    klog.V(5).Infof("DaemonSet %s/%s pod %s on node %s is out of date, this is a candidate to replace", ds.Namespace, ds.Name, oldPod.Name, nodeName)
                    // record the candidate
                    if candidatePodsToDelete == nil {
                        candidatePodsToDelete = make([]string, 0, maxUnavailable)
                    }
                    candidatePodsToDelete = append(candidatePodsToDelete, oldPod.Name)
                }
            }
        }

        // use any of the candidates we can, including the allowedReplacementPods
        klog.V(5).Infof("DaemonSet %s/%s allowing %d replacements, up to %d unavailable, %d new are unavailable, %d candidates", ds.Namespace, ds.Name, len(allowedReplacementPods), maxUnavailable, numUnavailable, len(candidatePodsToDelete))
        remainingUnavailable := maxUnavailable - numUnavailable
        if remainingUnavailable < 0 {
            remainingUnavailable = 0
        }
        if max := len(candidatePodsToDelete); remainingUnavailable > max {
            remainingUnavailable = max
        }
        oldPodsToDelete := append(allowedReplacementPods, candidatePodsToDelete[:remainingUnavailable]...)

        return dsc.syncNodes(ctx, ds, oldPodsToDelete, nil, hash)
    }

    // When surging, we create new pods whenever an old pod is unavailable, and we can create up
    // to maxSurge extra pods
    //
    // Assumptions:
    // * Expect manage loop to allow no more than two pods per node, one old, one new
    // * Expect manage loop will create new pods if there are no pods on node
    // * Expect manage loop will handle failed pods
    // * Deleted pods do not count as unavailable so that updates make progress when nodes are down
    // Invariants:
    // * A node with an unavailable old pod is a candidate for immediate new pod creation
    // * An old available pod is deleted if a new pod is available
    // * No more than maxSurge new pods are created for old available pods at any one time
    //
    var oldPodsToDelete []string          // these pods are already updated or unavailable on sunsetted node
    var shouldNotRunPodsToDelete []string // candidate pods to be deleted on sunsetted nodes
    var candidateNewNodes []string
    var allowedNewNodes []string
    var numSurge int
    var numAvailable int

    for nodeName, pods := range nodeToDaemonPods {
        newPod, oldPod, ok := findUpdatedPodsOnNode(ds, pods, hash)
        if !ok {
            // let the manage loop clean up this node, and treat it as a surge node
            klog.V(3).Infof("DaemonSet %s/%s has excess pods on node %s, skipping to allow the core loop to process", ds.Namespace, ds.Name, nodeName)
            numSurge++
            continue
        }

        // first count availability for all the nodes (even the ones that we are sunsetting due to scheduling constraints)
        if oldPod != nil {
            if podutil.IsPodAvailable(oldPod, ds.Spec.MinReadySeconds, metav1.Time{Time: now}) {
                numAvailable++
            }
        } else if newPod != nil {
            if podutil.IsPodAvailable(newPod, ds.Spec.MinReadySeconds, metav1.Time{Time: now}) {
                numAvailable++
            }
        }

        switch {
        case oldPod == nil:
            // we don't need to do anything to this node, the manage loop will handle it
        case newPod == nil:
            // this is a surge candidate
            switch {
            case !podutil.IsPodAvailable(oldPod, ds.Spec.MinReadySeconds, metav1.Time{Time: now}):
                node, err := dsc.nodeLister.Get(nodeName)
                if err != nil {
                    return fmt.Errorf("couldn't get node for nodeName %q: %v", nodeName, err)
                }
                if shouldRun, _ := NodeShouldRunDaemonPod(node, ds); !shouldRun {
                    klog.FromContext(ctx).V(5).Info("DaemonSet pod on node is not available and does not match scheduling constraints, remove old pod", "daemonset", klog.KObj(ds), "node", nodeName, "oldPod", klog.KObj(oldPod))
                    oldPodsToDelete = append(oldPodsToDelete, oldPod.Name)
                    continue
                }
                // the old pod isn't available, allow it to become a replacement
                klog.V(5).Infof("DaemonSet %s/%s pod %s on node %s is out of date and not available, allowing replacement", ds.Namespace, ds.Name, oldPod.Name, nodeName)
                // record the replacement
                if allowedNewNodes == nil {
                    allowedNewNodes = make([]string, 0, len(nodeToDaemonPods))
                }
                allowedNewNodes = append(allowedNewNodes, nodeName)
            default:
                node, err := dsc.nodeLister.Get(nodeName)
                if err != nil {
                    return fmt.Errorf("couldn't get node for nodeName %q: %v", nodeName, err)
                }
                if shouldRun, _ := NodeShouldRunDaemonPod(node, ds); !shouldRun {
                    shouldNotRunPodsToDelete = append(shouldNotRunPodsToDelete, oldPod.Name)
                    continue
                }
                if numSurge >= maxSurge {
                    // no point considering any other candidates
                    continue
                }
                klog.FromContext(ctx).V(5).Info("DaemonSet pod on node is out of date, this is a surge candidate", "daemonset", klog.KObj(ds), "pod", klog.KObj(oldPod), "node", klog.KRef("", nodeName))
                // record the candidate
                if candidateNewNodes == nil {
                    candidateNewNodes = make([]string, 0, maxSurge)
                }
                candidateNewNodes = append(candidateNewNodes, nodeName)
            }
        default:
            // we have already surged onto this node, determine our state
            if !podutil.IsPodAvailable(newPod, ds.Spec.MinReadySeconds, metav1.Time{Time: now}) {
                // we're waiting to go available here
                numSurge++
                continue
            }
            // we're available, delete the old pod
            klog.V(5).Infof("DaemonSet %s/%s pod %s on node %s is available, remove %s", ds.Namespace, ds.Name, newPod.Name, nodeName, oldPod.Name)
            oldPodsToDelete = append(oldPodsToDelete, oldPod.Name)
        }
    }

    // use any of the candidates we can, including the allowedNewNodes
    klog.V(5).Infof("DaemonSet %s/%s allowing %d replacements, surge up to %d, %d are in progress, %d candidates", ds.Namespace, ds.Name, len(allowedNewNodes), maxSurge, numSurge, len(candidateNewNodes))
    remainingSurge := maxSurge - numSurge

    // With maxSurge, the application owner expects 100% availability.
    // When the scheduling constraint change from node A to node B, we do not want the application to stay
    // without any available pods. Only delete a pod on node A when a pod on node B becomes available.
    if deletablePodsNumber := numAvailable - desiredNumberScheduled; deletablePodsNumber > 0 {
        if shouldNotRunPodsToDeleteNumber := len(shouldNotRunPodsToDelete); deletablePodsNumber > shouldNotRunPodsToDeleteNumber {
            deletablePodsNumber = shouldNotRunPodsToDeleteNumber
        }
        for _, podToDeleteName := range shouldNotRunPodsToDelete[:deletablePodsNumber] {
            podToDelete, err := dsc.podLister.Pods(ds.Namespace).Get(podToDeleteName)
            if err != nil {
                if errors.IsNotFound(err) {
                    continue
                }
                return fmt.Errorf("couldn't get pod which should be deleted due to scheduling constraints %q: %v", podToDeleteName, err)
            }
            klog.FromContext(ctx).V(5).Info("DaemonSet pod on node should be deleted due to scheduling constraints", "daemonset", klog.KObj(ds), "pod", klog.KObj(podToDelete), "node", podToDelete.Spec.NodeName)
            oldPodsToDelete = append(oldPodsToDelete, podToDeleteName)
        }
    }

    if remainingSurge < 0 {
        remainingSurge = 0
    }
    if max := len(candidateNewNodes); remainingSurge > max {
        remainingSurge = max
    }

    newNodesToCreate := append(allowedNewNodes, candidateNewNodes[:remainingSurge]...)

    return dsc.syncNodes(ctx, ds, oldPodsToDelete, newNodesToCreate, hash)
}
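rollingUpdate tells new Pods from old ones by the controller-revision hash label on each Pod. A simplified sketch of that split follows; the real helper is findUpdatedPodsOnNode, and splitOldNewPods below is an illustrative reduction of it.

package dsexample

import (
    apps "k8s.io/api/apps/v1"
    v1 "k8s.io/api/core/v1"
)

// splitOldNewPods treats a Pod whose controller-revision-hash label matches
// the current hash as "new" and everything else as "old". Like the real
// helper, it reports ok=false when a node has more than one new or more than
// one old Pod, so the manage loop can clean the node up first.
func splitOldNewPods(pods []*v1.Pod, hash string) (newPod, oldPod *v1.Pod, ok bool) {
    ok = true
    for _, pod := range pods {
        if pod.Labels[apps.DefaultDaemonSetUniqueLabelKey] == hash {
            if newPod != nil {
                return nil, nil, false // duplicate new Pod, let manage handle it
            }
            newPod = pod
        } else {
            if oldPod != nil {
                return nil, nil, false // duplicate old Pod, let manage handle it
            }
            oldPod = pod
        }
    }
    return newPod, oldPod, ok
}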
dsc.getNodesToDaemonPods
The DS Controller collects the daemon Pods running on each node.
dsc.updatedDesiredNodeCounts
Compute maxSurge, maxUnavailable, and the number of desired daemon Pods.
if maxSurge == 0
Perform the Pod update; the flow splits into the maxSurge == 0 and maxSurge != 0 cases.
- maxSurge == 0: no Pods beyond the desired number may run, so a new Pod is not created until the old one has been deleted. In this branch the DS Controller only deletes old Pods; the new Pods are created later by the manage step.
- The node has both an old and a new Pod, or neither: do nothing; creating the new Pod and deleting the old one is handled by manage.
- The node only has a new Pod: do nothing.
- The node only has an old Pod: add it to the list of Pods to delete; Pods that are not Available are deleted first. Once the list is complete, the DS Controller deletes them via the syncNodes func.
- maxSurge != 0: the DS Controller tolerates old Pods that are not Available and starts new Pods on some of the nodes (surging).
- The node has no old Pod: do nothing; manage takes care of it.
- The node only has an old Pod and no new Pod: add the node to the list of nodes that need a new Pod. Nodes whose old Pod is not Available get their new Pod first, and the number of surge Pods created at any time does not exceed maxSurge.
- The node has both an old and a new Pod: once the new Pod is Available, the old Pod is added to the list of Pods to delete.
dsc.syncNodes
After the counting is done, dsc.syncNodes is called to perform the Pod deletions and creations against kube-apiserver.
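Pod creation in syncNodes uses a slow-start pattern: it begins with a small batch and doubles the batch size while creations keep succeeding, so a Pod spec that is rejected by kube-apiserver does not trigger a flood of failing requests. Below is a sketch of that batching logic, modeled on the controller's slow-start helper; createFn stands in for the actual Pod-creation call.

package dsexample

import "sync"

// slowStartBatch calls createFn count times, starting with a batch of
// initialBatchSize and doubling the batch after each fully successful round;
// it stops growing as soon as a batch reports errors. It returns the number
// of successful calls and the first error seen.
func slowStartBatch(count, initialBatchSize int, createFn func() error) (int, error) {
    remaining := count
    successes := 0
    for batchSize := min(remaining, initialBatchSize); batchSize > 0; batchSize = min(2*batchSize, remaining) {
        errCh := make(chan error, batchSize)
        var wg sync.WaitGroup
        wg.Add(batchSize)
        for i := 0; i < batchSize; i++ {
            go func() {
                defer wg.Done()
                if err := createFn(); err != nil {
                    errCh <- err
                }
            }()
        }
        wg.Wait()
        successes += batchSize - len(errCh)
        if len(errCh) > 0 {
            return successes, <-errCh
        }
        remaining -= batchSize
    }
    return successes, nil
}

func min(a, b int) int {
    if a < b {
        return a
    }
    return b
}

In the real controller, creations that were skipped because an earlier batch failed are also subtracted from the expectations, so they are retried in a later reconciliation instead of leaving the controller waiting.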