Kube-controller-manager (DaemonSet)
Based on Kubernetes 1.25
The DaemonSet Controller reconciles DaemonSet objects.
- On every node selected by a DaemonSet, exactly one Pod of that DaemonSet eventually ends up available.
A DaemonSet object mainly consists of two parts: the Pod template and the update strategy.
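To make those two parts concrete, here is a rough sketch of a DaemonSet built with the apps/v1 Go types; the name, namespace, labels and image are made up for illustration and are not taken from the controller source.

package dsexample

import (
    appsv1 "k8s.io/api/apps/v1"
    corev1 "k8s.io/api/core/v1"
    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    "k8s.io/apimachinery/pkg/util/intstr"
)

// newExampleDaemonSet shows the two parts of a DaemonSet spec the controller
// cares about: the Pod template and the update strategy.
func newExampleDaemonSet() *appsv1.DaemonSet {
    maxUnavailable := intstr.FromInt(1)
    labels := map[string]string{"app": "node-agent"} // hypothetical label set
    return &appsv1.DaemonSet{
        ObjectMeta: metav1.ObjectMeta{Name: "node-agent", Namespace: "kube-system"},
        Spec: appsv1.DaemonSetSpec{
            Selector: &metav1.LabelSelector{MatchLabels: labels},
            // Pod template: one agent container on every selected node.
            Template: corev1.PodTemplateSpec{
                ObjectMeta: metav1.ObjectMeta{Labels: labels},
                Spec: corev1.PodSpec{
                    Containers: []corev1.Container{{Name: "agent", Image: "example.com/agent:v1"}},
                },
            },
            // Update strategy: rolling update, at most one unavailable Pod at a time.
            UpdateStrategy: appsv1.DaemonSetUpdateStrategy{
                Type: appsv1.RollingUpdateDaemonSetStrategyType,
                RollingUpdate: &appsv1.RollingUpdateDaemonSet{
                    MaxUnavailable: &maxUnavailable,
                },
            },
        },
    }
}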
Controller initialization
- DaemonSet objects: watch Add, Update, and Delete events
- Pod objects
- Node objects
- ControllerRevision objects: handle ControllerRevision synchronization, which records the historical versions of the DaemonSet's Pod template
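A minimal sketch of how this initialization could be wired up with shared informers; it is a simplification of NewDaemonSetsController, and the enqueueDS callback plus the handler wiring for Pod/Node/ControllerRevision events are heavily abbreviated placeholders.

package dsexample

import (
    appsinformers "k8s.io/client-go/informers/apps/v1"
    coreinformers "k8s.io/client-go/informers/core/v1"
    "k8s.io/client-go/tools/cache"
)

// registerHandlers is a simplified sketch of the controller's initialization:
// it watches DaemonSet, ControllerRevision, Pod and Node objects and funnels
// every event into an enqueue callback (a placeholder here).
func registerHandlers(
    dsInformer appsinformers.DaemonSetInformer,
    revInformer appsinformers.ControllerRevisionInformer,
    podInformer coreinformers.PodInformer,
    nodeInformer coreinformers.NodeInformer,
    enqueueDS func(obj interface{}),
) {
    dsInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
        AddFunc:    enqueueDS,
        UpdateFunc: func(old, cur interface{}) { enqueueDS(cur) },
        DeleteFunc: enqueueDS,
    })
    // In the real controller, Pod, Node and ControllerRevision events are first
    // resolved back to the owning DaemonSet(s) before being enqueued; that
    // resolution is omitted in this sketch.
    revInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{AddFunc: enqueueDS})
    podInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{AddFunc: enqueueDS})
    nodeInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{AddFunc: enqueueDS})
}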
Main execution flow
dsc.dsLister.DaemonSets
Fetch the DaemonSet object: the DS Controller retrieves the full DaemonSet object through the DS Informer's Lister.
dsc.nodeLister.List
List all Node objects in the cluster.
ds.DeletionTimestamp != nil
Check whether the object is being deleted; if it is, no reconcile is needed.
dsc.expectations.SatisfiedExpectations
Decide whether to run the reconcile.
The DS Controller uses the Expectation mechanism to check whether every Pod creation and deletion requested by the previous reconcile has already been processed by kube-apiserver.
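The idea behind the Expectation mechanism can be sketched with a small bookkeeping type. This is not the controller's ControllerExpectations implementation (which additionally expires stale expectations after a timeout), just the core counting logic.

package dsexample

import "sync"

// expectations is a minimal sketch of the Expectation mechanism: before
// reconciling a DaemonSet again, the controller checks that every create/delete
// it asked for last time has been observed via informer events.
type expectations struct {
    mu      sync.Mutex
    pending map[string]int // DaemonSet key -> outstanding creates+deletes
}

func newExpectations() *expectations {
    return &expectations{pending: map[string]int{}}
}

// Expect records that n operations were just issued for the given DaemonSet key.
func (e *expectations) Expect(key string, n int) {
    e.mu.Lock()
    defer e.mu.Unlock()
    e.pending[key] += n
}

// Observed is called from the Pod event handlers when a create/delete shows up.
func (e *expectations) Observed(key string) {
    e.mu.Lock()
    defer e.mu.Unlock()
    if e.pending[key] > 0 {
        e.pending[key]--
    }
}

// Satisfied reports whether the previous reconcile's requests have all been
// seen, i.e. whether it is safe to compute a new diff for this DaemonSet.
func (e *expectations) Satisfied(key string) bool {
    e.mu.Lock()
    defer e.mu.Unlock()
    return e.pending[key] == 0
}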
dsc.manage
Reconcile the number of Pods.
The DS Controller counts the Pods running on each node and, combining the Pods' tolerations and affinity with the nodes' taints and labels, works out which Pods need to be created and which need to be deleted.
switch ds.Spec.UpdateStrategy.Type
Update Pods according to the update strategy.
If the strategy is OnDelete: the DS Controller does nothing; the user has to delete Pods manually, and the controller creates new Pods in the next reconcile.
If the strategy is RollingUpdate: the DS Controller takes care of both deleting and creating Pods.
func (dsc *DaemonSetsController) updateDaemonSet(ctx context.Context, ds *apps.DaemonSet, nodeList []*v1.Node, hash, key string, old []*apps.ControllerRevision) error {
    err := dsc.manage(ctx, ds, nodeList, hash)
    if err != nil {
        return err
    }

    // Process rolling updates if we're ready.
    if dsc.expectations.SatisfiedExpectations(key) {
        switch ds.Spec.UpdateStrategy.Type {
        case apps.OnDeleteDaemonSetStrategyType:
        case apps.RollingUpdateDaemonSetStrategyType:
            err = dsc.rollingUpdate(ctx, ds, nodeList, hash)
        }
        if err != nil {
            return err
        }
    }

    err = dsc.cleanupHistory(ctx, ds, old)
    if err != nil {
        return fmt.Errorf("failed to clean up revisions of DaemonSet: %w", err)
    }

    return nil
}
updateDaemonSetStatus
Update the status of the DaemonSet object.
- The DS Controller computes the number of active Pods that are Ready, the number that are Available, and so on, and calls kube-apiserver to update the status.
- In addition, just as in the ReplicaSet Controller, if the Ready Pod count and the Available Pod count differ and MinReadySeconds is non-zero, the DS Controller re-adds the DaemonSet to the work queue after a MinReadySeconds delay (sketched below).
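A sketch of that delayed re-enqueue, assuming a workqueue-backed controller; the function and parameter names here are illustrative, not the controller's own.

package dsexample

import (
    "time"

    "k8s.io/client-go/util/workqueue"
)

// requeueForAvailability re-adds the DaemonSet key after minReadySeconds so the
// controller re-computes NumberAvailable once Pods have been Ready long enough.
func requeueForAvailability(queue workqueue.RateLimitingInterface, key string, minReadySeconds, numberReady, numberAvailable int32) {
    if minReadySeconds > 0 && numberReady != numberAvailable {
        queue.AddAfter(key, time.Duration(minReadySeconds)*time.Second)
    }
}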
Reconciling the number of Pods
The DS Controller synchronizes the Pods on every node through the manage func: it creates the missing Pods on nodes that should have one and deletes failed and redundant Pods.
The main execution flow of manage is as follows:
Count the Pods running on each node
Using getNodesToDaemonPods, the DS Controller builds a node -> Pods mapping that records which of the DaemonSet's Pods are running on each node.
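Conceptually the result of this step is just a map keyed by node name; a simplified version (ignoring the pod adoption/claiming that the real getNodesToDaemonPods performs) might look like this:

package dsexample

import v1 "k8s.io/api/core/v1"

// groupPodsByNode builds the node name -> daemon Pods mapping used by manage.
// The real controller also considers the node affinity set for pods that have
// not been bound yet; this sketch only looks at spec.nodeName.
func groupPodsByNode(pods []*v1.Pod) map[string][]*v1.Pod {
    nodeToDaemonPods := make(map[string][]*v1.Pod)
    for _, pod := range pods {
        nodeName := pod.Spec.NodeName
        if nodeName == "" {
            continue // not scheduled yet
        }
        nodeToDaemonPods[nodeName] = append(nodeToDaemonPods[nodeName], pod)
    }
    return nodeToDaemonPods
}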
Work out which Pods need to be created and deleted on each node
The DS Controller iterates over every node and calls podsShouldBeOnNode to decide whether the node needs a new Pod and which Pods on it have to be deleted, storing the results in the nodesNeedingDaemonPods and podsToDelete slices (a simplified sketch follows this list).
- podsShouldBeOnNode:
  - It first decides whether the node should run, or continue to run, the Pod:
    - The node should run a Pod: if the node does not yet have a Pod of this DaemonSet, one should be created. The node is considered unable to run the Pod when it does not match the DaemonSet's NodeName, does not satisfy the node affinity, or carries a taint the Pod cannot tolerate.
    - The node should not continue running the Pod: if the node already has a Pod of this DaemonSet, the Pod stops running there only when the node carries a taint the Pod does not tolerate and that taint's effect is NoExecute; in other cases (e.g. a NoSchedule taint) new Pods simply cannot be scheduled onto the node, but the existing Pod keeps running.
  - The DS Controller then checks the Pods as follows:
    - If a node should run a Pod but none is running, the node is added to the nodesNeedingDaemonPods slice.
    - If a node should not continue running a Pod but one is running, the Pod is added to the podsToDelete slice.
    - If a node should continue running Pods, all of the DaemonSet's Pods on that node are inspected:
      - Pods in the Failed state are added to the podsToDelete slice.
      - If the DaemonSet's MaxSurge is 0, only the oldest Pod is kept and the others are added to the podsToDelete slice.
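Putting those rules together, the per-node decision can be sketched as follows. shouldRun and shouldContinueRunning stand in for the results of the taint/toleration, affinity and nodeName checks (NodeShouldRunDaemonPod in the real controller), and the real podsShouldBeOnNode handles more cases (e.g. surge bookkeeping) than this simplification.

package dsexample

import v1 "k8s.io/api/core/v1"

// podsShouldBeOnNodeSketch is a simplified version of the decision described above.
func podsShouldBeOnNodeSketch(nodeName string, daemonPods []*v1.Pod, shouldRun, shouldContinueRunning bool, maxSurge int) (nodesNeedingDaemonPods, podsToDelete []string) {
    switch {
    case shouldRun && len(daemonPods) == 0:
        // the node should run a Pod but has none: schedule a create for it
        nodesNeedingDaemonPods = append(nodesNeedingDaemonPods, nodeName)
    case !shouldContinueRunning && len(daemonPods) > 0:
        // the node must not keep running Pods: delete everything that is there
        for _, pod := range daemonPods {
            podsToDelete = append(podsToDelete, pod.Name)
        }
    case shouldContinueRunning:
        // drop failed Pods; with maxSurge == 0 also drop everything but the oldest Pod
        var healthy []*v1.Pod
        for _, pod := range daemonPods {
            if pod.Status.Phase == v1.PodFailed {
                podsToDelete = append(podsToDelete, pod.Name)
                continue
            }
            healthy = append(healthy, pod)
        }
        if maxSurge == 0 && len(healthy) > 1 {
            // the real code sorts by creation timestamp; assume healthy[0] is the oldest here
            for _, pod := range healthy[1:] {
                podsToDelete = append(podsToDelete, pod.Name)
            }
        }
    }
    return nodesNeedingDaemonPods, podsToDelete
}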
Execute Pod creation and deletion
Once the Pods to create and delete have been collected, the DS Controller calls the syncNodes func, which actually calls kube-apiserver to create and delete the Pods, starting new Pods in slow-start batches.
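The slow start mentioned here means creating Pods in exponentially growing batches (1, 2, 4, ...) and stopping the growth as soon as a batch fails, so that a broken Pod template does not flood kube-apiserver. A self-contained sketch of the idea behind the controller's slowStartBatch helper:

package dsexample

import "sync"

// slowStartBatchSketch runs fn count times in batches of 1, 2, 4, 8, ... and stops
// growing as soon as a batch reports an error, mirroring how syncNodes creates Pods.
func slowStartBatchSketch(count, initialBatchSize int, fn func() error) (int, error) {
    successes := 0
    remaining := count
    for batchSize := minInt(remaining, initialBatchSize); batchSize > 0; batchSize = minInt(2*batchSize, remaining) {
        var wg sync.WaitGroup
        errCh := make(chan error, batchSize)
        wg.Add(batchSize)
        for i := 0; i < batchSize; i++ {
            go func() {
                defer wg.Done()
                if err := fn(); err != nil {
                    errCh <- err
                }
            }()
        }
        wg.Wait()
        failed := len(errCh)
        successes += batchSize - failed
        if failed > 0 {
            // stop growing: the remaining creations are retried on the next reconcile
            return successes, <-errCh
        }
        remaining -= batchSize
    }
    return successes, nil
}

func minInt(a, b int) int {
    if a < b {
        return a
    }
    return b
}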
// manage manages the scheduling and running of Pods of ds on nodes.
// After figuring out which nodes should run a Pod of ds but not yet running one and
// which nodes should not run a Pod of ds but currently running one, it calls function
// syncNodes with a list of pods to remove and a list of nodes to run a Pod of ds.
func (dsc *DaemonSetsController) manage(ctx context.Context, ds *apps.DaemonSet, nodeList []*v1.Node, hash string) error {
    // Find out the pods which are created for the nodes by DaemonSet.
    nodeToDaemonPods, err := dsc.getNodesToDaemonPods(ctx, ds)
    if err != nil {
        return fmt.Errorf("couldn't get node to daemon pod mapping for daemon set %q: %v", ds.Name, err)
    }

    // For each node, if the node is running the daemon pod but isn't supposed to, kill the daemon
    // pod. If the node is supposed to run the daemon pod, but isn't, create the daemon pod on the node.
    var nodesNeedingDaemonPods, podsToDelete []string
    for _, node := range nodeList {
        nodesNeedingDaemonPodsOnNode, podsToDeleteOnNode := dsc.podsShouldBeOnNode(
            node, nodeToDaemonPods, ds, hash)

        nodesNeedingDaemonPods = append(nodesNeedingDaemonPods, nodesNeedingDaemonPodsOnNode...)
        podsToDelete = append(podsToDelete, podsToDeleteOnNode...)
    }

    // Remove unscheduled pods assigned to not existing nodes when daemonset pods are scheduled by scheduler.
    // If node doesn't exist then pods are never scheduled and can't be deleted by PodGCController.
    podsToDelete = append(podsToDelete, getUnscheduledPodsWithoutNode(nodeList, nodeToDaemonPods)...)

    // Label new pods using the hash label value of the current history when creating them
    if err = dsc.syncNodes(ctx, ds, podsToDelete, nodesNeedingDaemonPods, hash); err != nil {
        return err
    }

    return nil
}
Update strategies
A DaemonSet supports two Pod update strategies: OnDelete and RollingUpdate.
OnDelete
The DS Controller does not touch the Pods itself: to update the Pods running on a node, the user has to delete them manually; after a Pod is deleted, the next reconcile of the DaemonSet recreates a new-version Pod during the manage step.
RollingUpdate
The DS Controller gradually replaces all of the DaemonSet's Pods, mainly through the rollingUpdate func.
// rollingUpdate identifies the set of old pods to delete, or additional pods to create on nodes,
// remaining within the constraints imposed by the update strategy.
func (dsc *DaemonSetsController) rollingUpdate(ctx context.Context, ds *apps.DaemonSet, nodeList []*v1.Node, hash string) error {
    nodeToDaemonPods, err := dsc.getNodesToDaemonPods(ctx, ds)
    if err != nil {
        return fmt.Errorf("couldn't get node to daemon pod mapping for daemon set %q: %v", ds.Name, err)
    }
    maxSurge, maxUnavailable, desiredNumberScheduled, err := dsc.updatedDesiredNodeCounts(ctx, ds, nodeList, nodeToDaemonPods)
    if err != nil {
        return fmt.Errorf("couldn't get unavailable numbers: %v", err)
    }

    now := dsc.failedPodsBackoff.Clock.Now()

    // When not surging, we delete just enough pods to stay under the maxUnavailable limit, if any
    // are necessary, and let the core loop create new instances on those nodes.
    //
    // Assumptions:
    // * Expect manage loop to allow no more than one pod per node
    // * Expect manage loop will create new pods
    // * Expect manage loop will handle failed pods
    // * Deleted pods do not count as unavailable so that updates make progress when nodes are down
    // Invariants:
    // * The number of new pods that are unavailable must be less than maxUnavailable
    // * A node with an available old pod is a candidate for deletion if it does not violate other invariants
    //
    if maxSurge == 0 {
        var numUnavailable int
        var allowedReplacementPods []string
        var candidatePodsToDelete []string
        for nodeName, pods := range nodeToDaemonPods {
            newPod, oldPod, ok := findUpdatedPodsOnNode(ds, pods, hash)
            if !ok {
                // let the manage loop clean up this node, and treat it as an unavailable node
                klog.V(3).Infof("DaemonSet %s/%s has excess pods on node %s, skipping to allow the core loop to process", ds.Namespace, ds.Name, nodeName)
                numUnavailable++
                continue
            }
            switch {
            case oldPod == nil && newPod == nil, oldPod != nil && newPod != nil:
                // the manage loop will handle creating or deleting the appropriate pod, consider this unavailable
                numUnavailable++
            case newPod != nil:
                // this pod is up to date, check its availability
                if !podutil.IsPodAvailable(newPod, ds.Spec.MinReadySeconds, metav1.Time{Time: now}) {
                    // an unavailable new pod is counted against maxUnavailable
                    numUnavailable++
                }
            default:
                // this pod is old, it is an update candidate
                switch {
                case !podutil.IsPodAvailable(oldPod, ds.Spec.MinReadySeconds, metav1.Time{Time: now}):
                    // the old pod isn't available, so it needs to be replaced
                    klog.V(5).Infof("DaemonSet %s/%s pod %s on node %s is out of date and not available, allowing replacement", ds.Namespace, ds.Name, oldPod.Name, nodeName)
                    // record the replacement
                    if allowedReplacementPods == nil {
                        allowedReplacementPods = make([]string, 0, len(nodeToDaemonPods))
                    }
                    allowedReplacementPods = append(allowedReplacementPods, oldPod.Name)
                case numUnavailable >= maxUnavailable:
                    // no point considering any other candidates
                    continue
                default:
                    klog.V(5).Infof("DaemonSet %s/%s pod %s on node %s is out of date, this is a candidate to replace", ds.Namespace, ds.Name, oldPod.Name, nodeName)
                    // record the candidate
                    if candidatePodsToDelete == nil {
                        candidatePodsToDelete = make([]string, 0, maxUnavailable)
                    }
                    candidatePodsToDelete = append(candidatePodsToDelete, oldPod.Name)
                }
            }
        }

        // use any of the candidates we can, including the allowedReplacemnntPods
        klog.V(5).Infof("DaemonSet %s/%s allowing %d replacements, up to %d unavailable, %d new are unavailable, %d candidates", ds.Namespace, ds.Name, len(allowedReplacementPods), maxUnavailable, numUnavailable, len(candidatePodsToDelete))
        remainingUnavailable := maxUnavailable - numUnavailable
        if remainingUnavailable < 0 {
            remainingUnavailable = 0
        }
        if max := len(candidatePodsToDelete); remainingUnavailable > max {
            remainingUnavailable = max
        }
        oldPodsToDelete := append(allowedReplacementPods, candidatePodsToDelete[:remainingUnavailable]...)

        return dsc.syncNodes(ctx, ds, oldPodsToDelete, nil, hash)
    }

    // When surging, we create new pods whenever an old pod is unavailable, and we can create up
    // to maxSurge extra pods
    //
    // Assumptions:
    // * Expect manage loop to allow no more than two pods per node, one old, one new
    // * Expect manage loop will create new pods if there are no pods on node
    // * Expect manage loop will handle failed pods
    // * Deleted pods do not count as unavailable so that updates make progress when nodes are down
    // Invariants:
    // * A node with an unavailable old pod is a candidate for immediate new pod creation
    // * An old available pod is deleted if a new pod is available
    // * No more than maxSurge new pods are created for old available pods at any one time
    //
    var oldPodsToDelete []string          // these pods are already updated or unavailable on sunsetted node
    var shouldNotRunPodsToDelete []string // candidate pods to be deleted on sunsetted nodes
    var candidateNewNodes []string
    var allowedNewNodes []string
    var numSurge int
    var numAvailable int

    for nodeName, pods := range nodeToDaemonPods {
        newPod, oldPod, ok := findUpdatedPodsOnNode(ds, pods, hash)
        if !ok {
            // let the manage loop clean up this node, and treat it as a surge node
            klog.V(3).Infof("DaemonSet %s/%s has excess pods on node %s, skipping to allow the core loop to process", ds.Namespace, ds.Name, nodeName)
            numSurge++
            continue
        }

        // first count availability for all the nodes (even the ones that we are sunsetting due to scheduling constraints)
        if oldPod != nil {
            if podutil.IsPodAvailable(oldPod, ds.Spec.MinReadySeconds, metav1.Time{Time: now}) {
                numAvailable++
            }
        } else if newPod != nil {
            if podutil.IsPodAvailable(newPod, ds.Spec.MinReadySeconds, metav1.Time{Time: now}) {
                numAvailable++
            }
        }

        switch {
        case oldPod == nil:
            // we don't need to do anything to this node, the manage loop will handle it
        case newPod == nil:
            // this is a surge candidate
            switch {
            case !podutil.IsPodAvailable(oldPod, ds.Spec.MinReadySeconds, metav1.Time{Time: now}):
                node, err := dsc.nodeLister.Get(nodeName)
                if err != nil {
                    return fmt.Errorf("couldn't get node for nodeName %q: %v", nodeName, err)
                }
                if shouldRun, _ := NodeShouldRunDaemonPod(node, ds); !shouldRun {
                    klog.FromContext(ctx).V(5).Info("DaemonSet pod on node is not available and does not match scheduling constraints, remove old pod", "daemonset", klog.KObj(ds), "node", nodeName, "oldPod", klog.KObj(oldPod))
                    oldPodsToDelete = append(oldPodsToDelete, oldPod.Name)
                    continue
                }
                // the old pod isn't available, allow it to become a replacement
                klog.V(5).Infof("Pod %s on node %s is out of date and not available, allowing replacement", ds.Namespace, ds.Name, oldPod.Name, nodeName)
                // record the replacement
                if allowedNewNodes == nil {
                    allowedNewNodes = make([]string, 0, len(nodeToDaemonPods))
                }
                allowedNewNodes = append(allowedNewNodes, nodeName)
            default:
                node, err := dsc.nodeLister.Get(nodeName)
                if err != nil {
                    return fmt.Errorf("couldn't get node for nodeName %q: %v", nodeName, err)
                }
                if shouldRun, _ := NodeShouldRunDaemonPod(node, ds); !shouldRun {
                    shouldNotRunPodsToDelete = append(shouldNotRunPodsToDelete, oldPod.Name)
                    continue
                }
                if numSurge >= maxSurge {
                    // no point considering any other candidates
                    continue
                }
                klog.FromContext(ctx).V(5).Info("DaemonSet pod on node is out of date, this is a surge candidate", "daemonset", klog.KObj(ds), "pod", klog.KObj(oldPod), "node", klog.KRef("", nodeName))
                // record the candidate
                if candidateNewNodes == nil {
                    candidateNewNodes = make([]string, 0, maxSurge)
                }
                candidateNewNodes = append(candidateNewNodes, nodeName)
            }
        default:
            // we have already surged onto this node, determine our state
            if !podutil.IsPodAvailable(newPod, ds.Spec.MinReadySeconds, metav1.Time{Time: now}) {
                // we're waiting to go available here
                numSurge++
                continue
            }
            // we're available, delete the old pod
            klog.V(5).Infof("DaemonSet %s/%s pod %s on node %s is available, remove %s", ds.Namespace, ds.Name, newPod.Name, nodeName, oldPod.Name)
            oldPodsToDelete = append(oldPodsToDelete, oldPod.Name)
        }
    }

    // use any of the candidates we can, including the allowedNewNodes
    klog.V(5).Infof("DaemonSet %s/%s allowing %d replacements, surge up to %d, %d are in progress, %d candidates", ds.Namespace, ds.Name, len(allowedNewNodes), maxSurge, numSurge, len(candidateNewNodes))
    remainingSurge := maxSurge - numSurge

    // With maxSurge, the application owner expects 100% availability.
    // When the scheduling constraint change from node A to node B, we do not want the application to stay
    // without any available pods. Only delete a pod on node A when a pod on node B becomes available.
    if deletablePodsNumber := numAvailable - desiredNumberScheduled; deletablePodsNumber > 0 {
        if shouldNotRunPodsToDeleteNumber := len(shouldNotRunPodsToDelete); deletablePodsNumber > shouldNotRunPodsToDeleteNumber {
            deletablePodsNumber = shouldNotRunPodsToDeleteNumber
        }
        for _, podToDeleteName := range shouldNotRunPodsToDelete[:deletablePodsNumber] {
            podToDelete, err := dsc.podLister.Pods(ds.Namespace).Get(podToDeleteName)
            if err != nil {
                if errors.IsNotFound(err) {
                    continue
                }
                return fmt.Errorf("couldn't get pod which should be deleted due to scheduling constraints %q: %v", podToDeleteName, err)
            }
            klog.FromContext(ctx).V(5).Info("DaemonSet pod on node should be deleted due to scheduling constraints", "daemonset", klog.KObj(ds), "pod", klog.KObj(podToDelete), "node", podToDelete.Spec.NodeName)
            oldPodsToDelete = append(oldPodsToDelete, podToDeleteName)
        }
    }

    if remainingSurge < 0 {
        remainingSurge = 0
    }
    if max := len(candidateNewNodes); remainingSurge > max {
        remainingSurge = max
    }
    newNodesToCreate := append(allowedNewNodes, candidateNewNodes[:remainingSurge]...)

    return dsc.syncNodes(ctx, ds, oldPodsToDelete, newNodesToCreate, hash)
}
dsc.getNodesToDaemonPods
The DS Controller counts the Pods running on each node.
dsc.updatedDesiredNodeCounts
Compute maxSurge and maxUnavailable.
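Both limits come from the RollingUpdate parameters, which may be absolute numbers or percentages and are resolved against desiredNumberScheduled, the number of nodes that should run a daemon Pod. A rough sketch, not the exact updatedDesiredNodeCounts code; it assumes the RollingUpdate fields have already been defaulted to non-nil values and glosses over the exact rounding rules.

package dsexample

import (
    appsv1 "k8s.io/api/apps/v1"
    "k8s.io/apimachinery/pkg/util/intstr"
)

// resolveRollingUpdateLimits scales maxSurge/maxUnavailable against
// desiredNumberScheduled. It assumes ru.MaxSurge and ru.MaxUnavailable are
// already defaulted (non-nil).
func resolveRollingUpdateLimits(ru *appsv1.RollingUpdateDaemonSet, desiredNumberScheduled int) (maxSurge, maxUnavailable int, err error) {
    maxSurge, err = intstr.GetScaledValueFromIntOrPercent(ru.MaxSurge, desiredNumberScheduled, true)
    if err != nil {
        return 0, 0, err
    }
    maxUnavailable, err = intstr.GetScaledValueFromIntOrPercent(ru.MaxUnavailable, desiredNumberScheduled, true)
    if err != nil {
        return 0, 0, err
    }
    // If both resolve to 0 the rollout could never make progress, so fall back
    // to allowing one unavailable Pod, as the controller does.
    if maxSurge == 0 && maxUnavailable == 0 {
        maxUnavailable = 1
    }
    return maxSurge, maxUnavailable, nil
}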
if maxSurge == 0
Perform the Pod update; the logic splits into the maxSurge == 0 and maxSurge != 0 cases.
- maxSurge == 0: no Pods beyond the desired count are allowed to run, so a new Pod is never created before the old one has been deleted. The DS Controller only deletes old Pods here; the new Pods are created later by the manage step.
  - The node has both an old and a new Pod, or neither: do nothing; creating the new Pod and deleting the old one are handled by manage.
  - The node has only a new Pod: do nothing.
  - The node has only an old Pod: add it to the slice of Pods to delete; Pods that are not Available are deleted first. Once the collection is complete, the DS Controller deletes them through the syncNodes func.
- maxSurge != 0: when an old Pod is not Available, the DS Controller is allowed to start a new Pod on some nodes ahead of the deletion.
  - The node has no old Pod: do nothing; manage takes care of it.
  - The node has only an old Pod and no new Pod: add the node to the slice of nodes that need a new Pod. Nodes whose old Pod is not Available get their new Pod first, and the number of extra Pods created at any one time never exceeds maxSurge.
  - The node has both an old and a new Pod: if the new Pod is already Available, the old Pod is added to the slice of Pods to delete.
dsc.syncNodes
After the collection is complete, dsc.syncNodes is called to have kube-apiserver delete and create the Pods.