Kube-controller-manager(Job)
Based on Kubernetes 1.25
The Job Controller is the controller responsible for Job resources.
The Job Controller creates one or more Pods according to the Job's parallelism (Spec.Parallelism) and completions (Spec.Completions) fields. Once the specified number of Pods (Spec.Completions) have completed successfully, the Job is declared successful. A Job uses the Orphan deletion policy by default: deleting the Job does not automatically delete its associated Pods.
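A minimal sketch of a Job highlighting the two fields (the object name, image, and command are arbitrary illustrative values):

package main

import (
    "fmt"

    batchv1 "k8s.io/api/batch/v1"
    corev1 "k8s.io/api/core/v1"
    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

func int32Ptr(i int32) *int32 { return &i }

func main() {
    // This Job succeeds after 5 Pods complete successfully,
    // running at most 2 Pods concurrently.
    job := batchv1.Job{
        ObjectMeta: metav1.ObjectMeta{Name: "example"},
        Spec: batchv1.JobSpec{
            Parallelism: int32Ptr(2), // at most 2 active Pods at a time
            Completions: int32Ptr(5), // declared successful after 5 successes
            Template: corev1.PodTemplateSpec{
                Spec: corev1.PodSpec{
                    RestartPolicy: corev1.RestartPolicyNever,
                    Containers: []corev1.Container{
                        {Name: "worker", Image: "busybox", Command: []string{"true"}},
                    },
                },
            },
        },
    }
    fmt.Println(job.Name)
}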
Controller initialization
- Job resources
- Pod resources
Main execution logic
jm.jobLister.Jobs
Fetches the Job object.
IsJobFinished
Determines whether the Job has already finished.
The Job Controller checks Job.Status.Conditions: if a condition of type Complete or Failed exists with status True, the Job has finished.
// FinishedCondition returns true if a job is finished as well as the condition type indicating that.
// Returns false and no condition type otherwise
func FinishedCondition(j *batch.Job) (bool, batch.JobConditionType) {
    for _, c := range j.Status.Conditions {
        if (c.Type == batch.JobComplete || c.Type == batch.JobFailed) && c.Status == v1.ConditionTrue {
            return true, c.Type
        }
    }
    return false, ""
}

// IsJobFinished checks whether the given Job has finished execution.
// It does not discriminate between successful and failed terminations.
func IsJobFinished(j *batch.Job) bool {
    isFinished, _ := FinishedCondition(j)
    return isFinished
}
getPodsForJob
Fetches the Job's associated active Pods.
An active Pod is one whose Status.Phase is neither Succeeded nor Failed and that is not in the process of being deleted.
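The check reduces to a predicate like the following (a sketch mirroring the IsPodActive helper in pkg/controller; the package name here is arbitrary):

package jobutil

import v1 "k8s.io/api/core/v1"

// isPodActive: a Pod is active when it has neither terminated
// (Succeeded/Failed) nor been marked for deletion.
func isPodActive(p *v1.Pod) bool {
    return p.Status.Phase != v1.PodSucceeded &&
        p.Status.Phase != v1.PodFailed &&
        p.DeletionTimestamp == nil
}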
getStatus
Counts the succeeded and failed Pods.
The Job Controller inspects each Pod's Status.Phase and, based on its value (Succeeded or Failed), tallies the Pods that succeeded and the Pods that failed.
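A simplified sketch of the tally (illustrative only; the real getStatus, with finalizer-based Job tracking, also counts terminated Pods that still carry the tracking finalizer):

package jobutil

import v1 "k8s.io/api/core/v1"

// countByPhase derives the succeeded/failed counts from each Pod's
// Status.Phase, the way getStatus tallies them.
func countByPhase(pods []*v1.Pod) (succeeded, failed int32) {
    for _, p := range pods {
        switch p.Status.Phase {
        case v1.PodSucceeded:
            succeeded++
        case v1.PodFailed:
            failed++
        }
    }
    return succeeded, failed
}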
exceedsBackoffLimit || pastBackoffLimitOnFailure(&job, pods)
Handles Job failure and timeout.
Job failure: exceedsBackoffLimit and pastBackoffLimitOnFailure.
The Job's RestartPolicy determines how failure is judged. If RestartPolicy is Never, the Job is considered failed once the number of failed Pods exceeds Spec.BackoffLimit; if RestartPolicy is OnFailure, the Job Controller sums the container restart counts across the Pods, and the Job is considered failed once that sum reaches Spec.BackoffLimit.
Once the Job is judged failed, the finishedCondition variable is set. After it is set here, the Job Controller deletes all of the Job's running Pods and performs no further reconciliation.
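A condensed sketch merging the two judgments (the real controller computes them separately: exceedsBackoffLimit inside syncJob, and the pastBackoffLimitOnFailure helper, which only inspects running/pending Pods):

package jobutil

import (
    batch "k8s.io/api/batch/v1"
    v1 "k8s.io/api/core/v1"
)

// jobFailureByBackoff illustrates the two failure-judgment modes.
func jobFailureByBackoff(job *batch.Job, pods []*v1.Pod, failed int32) bool {
    if job.Spec.Template.Spec.RestartPolicy == v1.RestartPolicyNever {
        // RestartPolicy=Never: compare the failed-Pod count against the limit.
        return failed > *job.Spec.BackoffLimit
    }
    // RestartPolicy=OnFailure: sum container (and init container) restart
    // counts across the Pods instead.
    var restarts int32
    for _, p := range pods {
        for _, cs := range p.Status.InitContainerStatuses {
            restarts += cs.RestartCount
        }
        for _, cs := range p.Status.ContainerStatuses {
            restarts += cs.RestartCount
        }
    }
    return restarts >= *job.Spec.BackoffLimit
}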
Job timeout: pastActiveDeadline
If the Job has not failed but has timed out, i.e. the time elapsed since the Job started exceeds the duration specified by Spec.ActiveDeadlineSeconds without all Pods having succeeded, the Job is considered to have timed out.
Once the Job is judged timed out, the finishedCondition variable is set. After it is set here, the Job Controller deletes all of the Job's running Pods and performs no further reconciliation.
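The deadline check itself boils down to the following (a sketch of the controller's pastActiveDeadline helper, with the clock passed in explicitly for testability):

package jobutil

import (
    "time"

    batch "k8s.io/api/batch/v1"
)

// pastActiveDeadline reports whether the Job has been active longer than
// Spec.ActiveDeadlineSeconds, measured from Status.StartTime.
func pastActiveDeadline(job *batch.Job, now time.Time) bool {
    if job.Spec.ActiveDeadlineSeconds == nil || job.Status.StartTime == nil {
        return false
    }
    allowed := time.Duration(*job.Spec.ActiveDeadlineSeconds) * time.Second
    return now.Sub(job.Status.StartTime.Time) >= allowed
}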
The Job sets Spec.ActiveDeadlineSeconds.
If the Job has not failed but Spec.ActiveDeadlineSeconds is set, the controller computes the interval between the current time and the deadline, and re-enqueues the Job into the work queue after that delay.
// Evaluate failure scenarios for BackoffLimit and ActiveDeadlineSeconds.
if jobCtx.finishedCondition == nil {
    if exceedsBackoffLimit || pastBackoffLimitOnFailure(&job, pods) {
        // check if the number of pod restart exceeds backoff (for restart OnFailure only)
        // OR if the number of failed jobs increased since the last syncJob
        jobCtx.finishedCondition = jm.newFailureCondition(batch.JobReasonBackoffLimitExceeded, "Job has reached the specified backoff limit")
    } else if jm.pastActiveDeadline(&job) {
        jobCtx.finishedCondition = jm.newFailureCondition(batch.JobReasonDeadlineExceeded, "Job was active longer than specified deadline")
    } else if job.Spec.ActiveDeadlineSeconds != nil && !jobSuspended(&job) {
        syncDuration := time.Duration(*job.Spec.ActiveDeadlineSeconds)*time.Second - jm.clock.Since(job.Status.StartTime.Time)
        logger.V(2).Info("Job has activeDeadlineSeconds configuration. Will sync this job again", "key", key, "nextSyncIn", syncDuration)
        jm.queue.AddAfter(key, syncDuration)
    }
}
manageJob
Reconciles the number of Pods.
If the Job is suspended (Job.Spec.Suspend is true), the Job Controller deletes all existing active Pods.
If the Job is not suspended, the Job Controller computes the Pods to create or delete from the Spec.Completions and Spec.Parallelism fields, and issues the creations and deletions against kube-apiserver using slow start.
if satisfiedExpectations && job.DeletionTimestamp == nil {
    active, action, manageJobErr = jm.manageJob(ctx, &job, jobCtx)
    manageJobCalled = true
}
complete
Determines whether the Job is complete.
If Spec.Completions is unset, the Job is considered successful once at least one Pod has succeeded and no active Pods remain.
If Spec.Completions is set, the Job is considered successful once the number of succeeded Pods reaches Spec.Completions. Both rules are restated together after the quoted code below.
// Job specifies a number of completions. This type of job signals
// success by having that number of successes. Since we do not
// start more pods than there are remaining completions, there should
// not be any remaining active pods once this count is reached.
complete = jobCtx.succeeded >= *job.Spec.Completions && active == 0
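Both rules restated together (a hedged restatement; the unset-Completions branch sits just above the quoted line in syncJob):

package jobutil

// jobComplete restates the two completion rules described above.
func jobComplete(completions *int32, succeeded, active int32) bool {
    if completions == nil {
        // No completion count: one success and no remaining active Pods.
        return succeeded > 0 && active == 0
    }
    // Fixed completion count: enough successes and no remaining active Pods.
    return succeeded >= *completions && active == 0
}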
updateStatusHandler
Updates the Job's status.
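updateStatusHandler is wired to a method that writes through the Job's status subresource, roughly as follows (a sketch assuming a client-go clientset; names are illustrative):

package jobutil

import (
    "context"

    batch "k8s.io/api/batch/v1"
    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    "k8s.io/client-go/kubernetes"
)

// updateJobStatus persists the recomputed status via the Job status subresource.
func updateJobStatus(ctx context.Context, client kubernetes.Interface, job *batch.Job) (*batch.Job, error) {
    return client.BatchV1().Jobs(job.Namespace).UpdateStatus(ctx, job, metav1.UpdateOptions{})
}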
Reconciling the number of Pods
Here, active is the number of the Job's active Pods currently in the cluster, and wantActive is the desired number of active Pods computed from Spec.Parallelism and Spec.Completions.
When Spec.Completions is unset:
- If at least one Pod has already succeeded, wantActive is simply the current active count.
- Otherwise, up to Spec.Parallelism Pods are started to run concurrently.
If Spec.Parallelism is unset, it defaults to 1.
When Spec.Completions is set:
- wantActive is the smaller of the number of remaining (not yet succeeded) completions and parallelism. Pods are likewise created via slow start, sketched below.
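Slow start batches the pod-create calls so that a systemic failure (for example, quota exhaustion) wastes only a few API requests: batch sizes start at SlowStartInitialBatchSize (1 upstream) and double after each fully successful batch, and the first error stops further batches. A standalone sketch of the pattern, with fn standing in for the hypothetical pod-create call (requires Go 1.21+ for the built-in min):

package jobutil

import "sync"

// slowStartBatch invokes fn up to count times, in batches that start at
// initialBatchSize and double after every fully successful batch. It stops
// scheduling new batches as soon as any batch reports an error; the caller
// retries the remainder on the next sync.
func slowStartBatch(count, initialBatchSize int, fn func() error) (successes int) {
    remaining := count
    for batchSize := min(initialBatchSize, remaining); batchSize > 0; batchSize = min(2*batchSize, remaining) {
        errCh := make(chan error, batchSize)
        var wg sync.WaitGroup
        wg.Add(batchSize)
        for i := 0; i < batchSize; i++ {
            go func() {
                defer wg.Done()
                if err := fn(); err != nil {
                    errCh <- err
                }
            }()
        }
        wg.Wait()
        successes += batchSize - len(errCh)
        if len(errCh) > 0 {
            break // the skipped creations are retried on the next resync
        }
        remaining -= batchSize
    }
    return successes
}

The full manageJob implementation: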
// manageJob is the core method responsible for managing the number of running
// pods according to what is specified in the job.Spec.
// Respects back-off; does not create new pods if the back-off time has not passed
// Does NOT modify <activePods>.
func (jm *Controller) manageJob(ctx context.Context, job *batch.Job, jobCtx *syncJobCtx) (int32, string, error) {
    logger := klog.FromContext(ctx)
    active := int32(len(jobCtx.activePods))
    parallelism := *job.Spec.Parallelism
    jobKey, err := controller.KeyFunc(job)
    if err != nil {
        utilruntime.HandleError(fmt.Errorf("Couldn't get key for job %#v: %v", job, err))
        return 0, metrics.JobSyncActionTracking, nil
    }

    if jobSuspended(job) {
        logger.V(4).Info("Deleting all active pods in suspended job", "job", klog.KObj(job), "active", active)
        podsToDelete := activePodsForRemoval(job, jobCtx.activePods, int(active))
        jm.expectations.ExpectDeletions(logger, jobKey, len(podsToDelete))
        removedReady, removed, err := jm.deleteJobPods(ctx, job, jobKey, podsToDelete)
        active -= removed
        if trackTerminatingPods(job) {
            *jobCtx.terminating += removed
        }
        jobCtx.ready -= removedReady
        return active, metrics.JobSyncActionPodsDeleted, err
    }

    wantActive := int32(0)
    if job.Spec.Completions == nil {
        // Job does not specify a number of completions. Therefore, number active
        // should be equal to parallelism, unless the job has seen at least
        // once success, in which leave whatever is running, running.
        if jobCtx.succeeded > 0 {
            wantActive = active
        } else {
            wantActive = parallelism
        }
    } else {
        // Job specifies a specific number of completions. Therefore, number
        // active should not ever exceed number of remaining completions.
        wantActive = *job.Spec.Completions - jobCtx.succeeded
        if wantActive > parallelism {
            wantActive = parallelism
        }
        if wantActive < 0 {
            wantActive = 0
        }
    }

    rmAtLeast := active - wantActive
    if rmAtLeast < 0 {
        rmAtLeast = 0
    }
    podsToDelete := activePodsForRemoval(job, jobCtx.activePods, int(rmAtLeast))
    if len(podsToDelete) > MaxPodCreateDeletePerSync {
        podsToDelete = podsToDelete[:MaxPodCreateDeletePerSync]
    }
    if len(podsToDelete) > 0 {
        jm.expectations.ExpectDeletions(logger, jobKey, len(podsToDelete))
        logger.V(4).Info("Too many pods running for job", "job", klog.KObj(job), "deleted", len(podsToDelete), "target", wantActive)
        removedReady, removed, err := jm.deleteJobPods(ctx, job, jobKey, podsToDelete)
        active -= removed
        if trackTerminatingPods(job) {
            *jobCtx.terminating += removed
        }
        jobCtx.ready -= removedReady
        // While it is possible for a Job to require both pod creations and
        // deletions at the same time (e.g. indexed Jobs with repeated indexes), we
        // restrict ourselves to either just pod deletion or pod creation in any
        // given sync cycle. Of these two, pod deletion takes precedence.
        return active, metrics.JobSyncActionPodsDeleted, err
    }

    var terminating int32 = 0
    if onlyReplaceFailedPods(jobCtx.job) {
        // When onlyReplaceFailedPods=true, then also trackTerminatingPods=true,
        // and so we can use the value.
        terminating = *jobCtx.terminating
    }
    if diff := wantActive - terminating - active; diff > 0 {
        var remainingTime time.Duration
        if !hasBackoffLimitPerIndex(job) {
            // we compute the global remaining time for pod creation when backoffLimitPerIndex is not used
            remainingTime = jobCtx.newBackoffRecord.getRemainingTime(jm.clock, DefaultJobPodFailureBackOff, MaxJobPodFailureBackOff)
        }
        if remainingTime > 0 {
            jm.enqueueSyncJobWithDelay(logger, job, remainingTime)
            return 0, metrics.JobSyncActionPodsCreated, nil
        }
        if diff > int32(MaxPodCreateDeletePerSync) {
            diff = int32(MaxPodCreateDeletePerSync)
        }

        var indexesToAdd []int
        if isIndexedJob(job) {
            indexesToAdd = firstPendingIndexes(jobCtx, int(diff), int(*job.Spec.Completions))
            if hasBackoffLimitPerIndex(job) {
                indexesToAdd, remainingTime = jm.getPodCreationInfoForIndependentIndexes(logger, indexesToAdd, jobCtx.podsWithDelayedDeletionPerIndex)
                if remainingTime > 0 {
                    jm.enqueueSyncJobWithDelay(logger, job, remainingTime)
                    return 0, metrics.JobSyncActionPodsCreated, nil
                }
            }
            diff = int32(len(indexesToAdd))
        }

        jm.expectations.ExpectCreations(logger, jobKey, int(diff))
        errCh := make(chan error, diff)
        logger.V(4).Info("Too few pods running", "key", jobKey, "need", wantActive, "creating", diff)

        wait := sync.WaitGroup{}

        active += diff

        podTemplate := job.Spec.Template.DeepCopy()
        if isIndexedJob(job) {
            addCompletionIndexEnvVariables(podTemplate)
        }
        podTemplate.Finalizers = appendJobCompletionFinalizerIfNotFound(podTemplate.Finalizers)

        // Counters for pod creation status (used by the job_pods_creation_total metric)
        var creationsSucceeded, creationsFailed int32 = 0, 0

        // Batch the pod creates. Batch sizes start at SlowStartInitialBatchSize
        // and double with each successful iteration in a kind of "slow start".
        // This handles attempts to start large numbers of pods that would
        // likely all fail with the same error. For example a project with a
        // low quota that attempts to create a large number of pods will be
        // prevented from spamming the API service with the pod create requests
        // after one of its pods fails. Conveniently, this also prevents the
        // event spam that those failures would generate.
        for batchSize := min(diff, int32(controller.SlowStartInitialBatchSize)); diff > 0; batchSize = min(2*batchSize, diff) {
            errorCount := len(errCh)
            wait.Add(int(batchSize))
            for i := int32(0); i < batchSize; i++ {
                completionIndex := unknownCompletionIndex
                if len(indexesToAdd) > 0 {
                    completionIndex = indexesToAdd[0]
                    indexesToAdd = indexesToAdd[1:]
                }
                go func() {
                    template := podTemplate
                    generateName := ""
                    if completionIndex != unknownCompletionIndex {
                        template = podTemplate.DeepCopy()
                        addCompletionIndexAnnotation(template, completionIndex)
                        if feature.DefaultFeatureGate.Enabled(features.PodIndexLabel) {
                            addCompletionIndexLabel(template, completionIndex)
                        }
                        template.Spec.Hostname = fmt.Sprintf("%s-%d", job.Name, completionIndex)
                        generateName = podGenerateNameWithIndex(job.Name, completionIndex)
                        if hasBackoffLimitPerIndex(job) {
                            addIndexFailureCountAnnotation(logger, template, job, jobCtx.podsWithDelayedDeletionPerIndex[completionIndex])
                        }
                    }
                    defer wait.Done()
                    err := jm.podControl.CreatePodsWithGenerateName(ctx, job.Namespace, template, job, metav1.NewControllerRef(job, controllerKind), generateName)
                    if err != nil {
                        if apierrors.HasStatusCause(err, v1.NamespaceTerminatingCause) {
                            // If the namespace is being torn down, we can safely ignore
                            // this error since all subsequent creations will fail.
                            return
                        }
                    }
                    if err != nil {
                        defer utilruntime.HandleError(err)
                        // Decrement the expected number of creates because the informer won't observe this pod
                        logger.V(2).Info("Failed creation, decrementing expectations", "job", klog.KObj(job))
                        jm.expectations.CreationObserved(logger, jobKey)
                        atomic.AddInt32(&active, -1)
                        errCh <- err
                        atomic.AddInt32(&creationsFailed, 1)
                    }
                    atomic.AddInt32(&creationsSucceeded, 1)
                }()
            }
            wait.Wait()
            // any skipped pods that we never attempted to start shouldn't be expected.
            skippedPods := diff - batchSize
            if errorCount < len(errCh) && skippedPods > 0 {
                logger.V(2).Info("Slow-start failure. Skipping creating pods, decrementing expectations", "skippedCount", skippedPods, "job", klog.KObj(job))
                active -= skippedPods
                for i := int32(0); i < skippedPods; i++ {
                    // Decrement the expected number of creates because the informer won't observe this pod
                    jm.expectations.CreationObserved(logger, jobKey)
                }
                // The skipped pods will be retried later. The next controller resync will
                // retry the slow start process.
                break
            }
            diff -= batchSize
        }
        recordJobPodsCreationTotal(job, jobCtx, creationsSucceeded, creationsFailed)
        return active, metrics.JobSyncActionPodsCreated, errorFromChannel(errCh)
    }

    return active, metrics.JobSyncActionTracking, nil
}