Kube-controller-manager(Job)
基于1.25
Job Controller是Job资源的控制器
Job Controller会根据Job的并发(Spec.Parallelism)和完成(Spec.Completions)字段,创建一个或者多个Pod。在指定数量(Spec.Completions)的Pod执行成功之后,宣告Job成功。Job默认采用Orphan删除策略,删除Job不会主动删除其关联的Pod
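下面是一个使用client-go创建Job的最小示意,用于说明Spec.Parallelism与Spec.Completions两个字段的含义(假设性示例,名称、镜像与kubeconfig路径均为演示用,并非源码):
-
package main

import (
	"context"

	batchv1 "k8s.io/api/batch/v1"
	corev1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/client-go/kubernetes"
	"k8s.io/client-go/tools/clientcmd"
	"k8s.io/utils/pointer"
)

func main() {
	// 假设使用默认的 ~/.kube/config
	config, err := clientcmd.BuildConfigFromFlags("", clientcmd.RecommendedHomeFile)
	if err != nil {
		panic(err)
	}
	clientset := kubernetes.NewForConfigOrDie(config)

	job := &batchv1.Job{
		ObjectMeta: metav1.ObjectMeta{Name: "demo-job"},
		Spec: batchv1.JobSpec{
			Parallelism: pointer.Int32(2), // 最多同时运行2个Pod
			Completions: pointer.Int32(5), // 累计5个Pod成功后Job被标记为Complete
			Template: corev1.PodTemplateSpec{
				Spec: corev1.PodSpec{
					RestartPolicy: corev1.RestartPolicyNever,
					Containers: []corev1.Container{
						{Name: "work", Image: "busybox", Command: []string{"sh", "-c", "echo done"}},
					},
				},
			},
		},
	}
	if _, err := clientset.BatchV1().Jobs("default").Create(context.TODO(), job, metav1.CreateOptions{}); err != nil {
		panic(err)
	}
}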
控制器初始化
- Job资源
- Pod资源
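控制器初始化时会为上述两类资源注册Informer事件回调,下面是一个基于client-go的注册示意(假设性片段,jm及回调函数名仅作演示,并非源码):
-
// 为Job与Pod的SharedInformer注册事件回调(示意)
jobInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
	AddFunc:    func(obj interface{}) { jm.enqueueJob(obj) },               // Job新增时入队
	UpdateFunc: func(oldObj, newObj interface{}) { jm.enqueueJob(newObj) }, // Job更新时入队
	DeleteFunc: func(obj interface{}) { jm.enqueueJob(obj) },               // Job删除时入队
})
podInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
	AddFunc:    func(obj interface{}) { jm.addPod(obj) },                         // 通过OwnerReference找到所属Job并入队
	UpdateFunc: func(oldObj, newObj interface{}) { jm.updatePod(oldObj, newObj) }, // Pod状态变化时触发所属Job重新调谐
	DeleteFunc: func(obj interface{}) { jm.deletePod(obj) },
})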
主要执行逻辑
jm.jobLister.Jobs
获取Job资源对象
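syncJob会先从工作队列的key中解析出namespace和name,再通过Lister从Informer本地缓存中读取Job对象,大致如下(示意片段,与源码逻辑相近但有简化):
-
// 从key解析namespace/name,再从Lister(Informer本地缓存)中读取Job(示意)
ns, name, err := cache.SplitMetaNamespaceKey(key)
if err != nil {
	return err
}
sharedJob, err := jm.jobLister.Jobs(ns).Get(name)
if apierrors.IsNotFound(err) {
	// Job已被删除,无需继续调谐
	return nil
}
if err != nil {
	return err
}
// 后续调谐使用深拷贝,避免直接修改缓存中的对象
job := *sharedJob.DeepCopy()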
IsJobFinished
判断Job是否已经执行完成。
Job Controller检查Job.Status.Conditions状态,如果存在Complete或Failed类型的状态且值为true,则Job已经执行完成
-
// FinishedCondition returns true if a job is finished as well as the condition type indicating that.
// Returns false and no condition type otherwise
func FinishedCondition(j *batch.Job) (bool, batch.JobConditionType) {
for _, c := range j.Status.Conditions {
if (c.Type == batch.JobComplete || c.Type == batch.JobFailed) && c.Status == v1.ConditionTrue {
return true, c.Type
}
}
return false, ""
}
// IsJobFinished checks whether the given Job has finished execution.
// It does not discriminate between successful and failed terminations.
func IsJobFinished(j *batch.Job) bool {
isFinished, _ := FinishedCondition(j)
return isFinished
}
-
getPodsForJob
获取关联的活跃的Pod资源对象
活跃的Pod定义:Status.Phase 不是Succeeded也不是Failed,并且未处于删除中状态
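按照上述定义,活跃Pod的判断大致等价于下面的函数(示意,简化自controller包中的活跃Pod过滤逻辑):
-
// 活跃Pod判定(示意):Phase既不是Succeeded也不是Failed,且不处于删除中
func isPodActive(p *v1.Pod) bool {
	return p.Status.Phase != v1.PodSucceeded &&
		p.Status.Phase != v1.PodFailed &&
		p.DeletionTimestamp == nil
}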
getStatus
统计成功和失败的Pod。
Job Controller检查每个Pod的Status.Phase。根据其值(Succeeded或者Failed),计算执行成功和执行失败的Pod
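统计逻辑可以概括为如下示意(简化版本,真实实现还会结合Pod上的finalizer跟踪终止状态):
-
// 按Pod.Status.Phase统计成功与失败的Pod数量(示意)
func countSucceededFailed(pods []*v1.Pod) (succeeded, failed int32) {
	for _, p := range pods {
		switch p.Status.Phase {
		case v1.PodSucceeded:
			succeeded++
		case v1.PodFailed:
			failed++
		}
	}
	return succeeded, failed
}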
exceedsBackoffLimit || pastBackoffLimitOnFailure(&job,pods)
处理Job执行失败或者超时
判断Job是否执行失败时,依据exceedsBackoffLimit和pastBackoffLimitOnFailure两个条件
Job的Pod模板中的RestartPolicy字段决定了失败判定方式的不同。如果RestartPolicy为Never,则当失败的Pod数量超过Spec.BackoffLimit时,认为整个Job失败;如果RestartPolicy为OnFailure,则Job Controller计算各个Pod中容器重启次数的总和,当总和大于Spec.BackoffLimit时,认为整个Job失败
当Job被判定为失败时,finishedCondition变量会被赋值。该变量在此处被赋值后,Job Controller会删除Job关联的所有正在执行的Pod,不再执行其他Reconcile逻辑
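RestartPolicy为OnFailure时的判定思路大致如下(示意,简化自pastBackoffLimitOnFailure,累加各Pod中容器及init容器的重启次数):
-
// RestartPolicy=OnFailure时:累加Running/Pending Pod中所有容器的重启次数,
// 与Spec.BackoffLimit比较(示意)
func pastBackoffLimitOnFailure(job *batch.Job, pods []*v1.Pod) bool {
	if job.Spec.Template.Spec.RestartPolicy != v1.RestartPolicyOnFailure {
		return false
	}
	restarts := int32(0)
	for _, p := range pods {
		if p.Status.Phase != v1.PodRunning && p.Status.Phase != v1.PodPending {
			continue
		}
		for _, s := range p.Status.InitContainerStatuses {
			restarts += s.RestartCount
		}
		for _, s := range p.Status.ContainerStatuses {
			restarts += s.RestartCount
		}
	}
	if *job.Spec.BackoffLimit == 0 {
		return restarts > 0
	}
	return restarts >= *job.Spec.BackoffLimit
}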
Job执行超时pastActiveDeadline
如果Job没有执行失败,但是执行超时了,即Job从启动(Status.StartTime)开始,运行时长已经超过Spec.ActiveDeadlineSeconds规定的时间,但仍然没有全部执行成功,则认为Job执行超时
当Job被判定为超时后,finishedCondition变量同样会被赋值。此处被赋值后,Job Controller会删除Job关联的所有正在执行的Pod,不再执行其他的Reconcile逻辑
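超时判定的核心是比较Job自StartTime起已运行的时长与Spec.ActiveDeadlineSeconds,大致如下(示意,简化自pastActiveDeadline):
-
// 判断Job是否超过ActiveDeadlineSeconds(示意)
func pastActiveDeadline(job *batch.Job, now time.Time) bool {
	if job.Spec.ActiveDeadlineSeconds == nil || job.Status.StartTime == nil {
		return false
	}
	duration := now.Sub(job.Status.StartTime.Time)
	allowed := time.Duration(*job.Spec.ActiveDeadlineSeconds) * time.Second
	return duration >= allowed
}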
Job设置了Spec.ActiveDeadlineSeconds
如果Job既没有执行失败也没有超时,但设置了Spec.ActiveDeadlineSeconds,则需要计算当前时刻和截止时间之间的时间间隔,并在该延迟时间之后重新将Job加入到工作队列
-
// Evaluate failure scenarios for BackoffLimit and ActiveDeadlineSeconds.
if jobCtx.finishedCondition == nil {
if exceedsBackoffLimit || pastBackoffLimitOnFailure(&job, pods) {
// check if the number of pod restart exceeds backoff (for restart OnFailure only)
// OR if the number of failed jobs increased since the last syncJob
jobCtx.finishedCondition = jm.newFailureCondition(batch.JobReasonBackoffLimitExceeded, "Job has reached the specified backoff limit")
} else if jm.pastActiveDeadline(&job) {
jobCtx.finishedCondition = jm.newFailureCondition(batch.JobReasonDeadlineExceeded, "Job was active longer than specified deadline")
} else if job.Spec.ActiveDeadlineSeconds != nil && !jobSuspended(&job) {
syncDuration := time.Duration(*job.Spec.ActiveDeadlineSeconds)*time.Second - jm.clock.Since(job.Status.StartTime.Time)
logger.V(2).Info("Job has activeDeadlineSeconds configuration. Will sync this job again", "key", key, "nextSyncIn", syncDuration)
jm.queue.AddAfter(key, syncDuration)
}
}
manageJob
调谐Pod的数量
如果Job被设置了Suspend状态(Spec.Suspend字段为true),则Job Controller删除所有现有的活跃Pod
如果Job未被设置为Suspend,则Job Controller根据Spec.Completions和Spec.Parallelism字段计算需要创建或删除的Pod数量,并通过慢启动(slow start)的方式调用kube-apiserver创建或删除Pod
-
if satisfiedExpectations && job.DeletionTimestamp == nil {
active, action, manageJobErr = jm.manageJob(ctx, &job, jobCtx)
manageJobCalled = true
}
complete
判断Job是否完成。
如果未设置Spec.Completions,则只要有Pod执行成功且不再有活跃的Pod,就认为Job成功
如果设置了Spec.Completions,则当执行成功的Pod数量达到Spec.Completions且不再有活跃的Pod时,认为Job成功
-
// Job specifies a number of completions. This type of job signals
// success by having that number of successes. Since we do not
// start more pods than there are remaining completions, there should
// not be any remaining active pods once this count is reached.
complete = jobCtx.succeeded >= *job.Spec.Completions && active == 0
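未设置Spec.Completions时对应另一个分支,判定逻辑大致如下(示意,与源码思路一致):
-
// Job未指定Completions:任意一个Pod成功,且不再有活跃Pod时,即视为完成(示意)
complete = jobCtx.succeeded > 0 && active == 0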
updateStatusHandler
更新Job的状态
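状态最终通过Job的status子资源写回kube-apiserver,等价于如下调用(示意):
-
// 通过status子资源更新Job状态(示意)
newJob, err := jm.kubeClient.BatchV1().Jobs(job.Namespace).UpdateStatus(ctx, job, metav1.UpdateOptions{})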
调谐Pod的数量
其中active是集群中该Job当前的活跃Pod数量,wantActive是根据Spec.Parallelism和Spec.Completions计算出的期望活跃Pod数量
未设置Spec.Completions
- 如果存在已经执行成功的Pod,则wantActive就是active的值
- 如果不存在,则最多启动Spec.Parallelism个Pod同时执行
如果没有设置Spec.Parallelism,默认为1
设置了Spec.Completions
- wantActive为剩余未成功的Pod数量(Spec.Completions减去已成功数)和Spec.Parallelism中的较小者。同样通过慢启动创建Pod
-
// manageJob is the core method responsible for managing the number of running
// pods according to what is specified in the job.Spec.
// Respects back-off; does not create new pods if the back-off time has not passed
// Does NOT modify <activePods>.
func (jm *Controller) manageJob(ctx context.Context, job *batch.Job, jobCtx *syncJobCtx) (int32, string, error) {
logger := klog.FromContext(ctx)
active := int32(len(jobCtx.activePods))
parallelism := *job.Spec.Parallelism
jobKey, err := controller.KeyFunc(job)
if err != nil {
utilruntime.HandleError(fmt.Errorf("Couldn't get key for job %#v: %v", job, err))
return 0, metrics.JobSyncActionTracking, nil
}
if jobSuspended(job) {
logger.V(4).Info("Deleting all active pods in suspended job", "job", klog.KObj(job), "active", active)
podsToDelete := activePodsForRemoval(job, jobCtx.activePods, int(active))
jm.expectations.ExpectDeletions(logger, jobKey, len(podsToDelete))
removedReady, removed, err := jm.deleteJobPods(ctx, job, jobKey, podsToDelete)
active -= removed
if trackTerminatingPods(job) {
*jobCtx.terminating += removed
}
jobCtx.ready -= removedReady
return active, metrics.JobSyncActionPodsDeleted, err
}
wantActive := int32(0)
if job.Spec.Completions == nil {
// Job does not specify a number of completions. Therefore, number active
// should be equal to parallelism, unless the job has seen at least
// once success, in which leave whatever is running, running.
if jobCtx.succeeded > 0 {
wantActive = active
} else {
wantActive = parallelism
}
} else {
// Job specifies a specific number of completions. Therefore, number
// active should not ever exceed number of remaining completions.
wantActive = *job.Spec.Completions - jobCtx.succeeded
if wantActive > parallelism {
wantActive = parallelism
}
if wantActive < 0 {
wantActive = 0
}
}
rmAtLeast := active - wantActive
if rmAtLeast < 0 {
rmAtLeast = 0
}
podsToDelete := activePodsForRemoval(job, jobCtx.activePods, int(rmAtLeast))
if len(podsToDelete) > MaxPodCreateDeletePerSync {
podsToDelete = podsToDelete[:MaxPodCreateDeletePerSync]
}
if len(podsToDelete) > 0 {
jm.expectations.ExpectDeletions(logger, jobKey, len(podsToDelete))
logger.V(4).Info("Too many pods running for job", "job", klog.KObj(job), "deleted", len(podsToDelete), "target", wantActive)
removedReady, removed, err := jm.deleteJobPods(ctx, job, jobKey, podsToDelete)
active -= removed
if trackTerminatingPods(job) {
*jobCtx.terminating += removed
}
jobCtx.ready -= removedReady
// While it is possible for a Job to require both pod creations and
// deletions at the same time (e.g. indexed Jobs with repeated indexes), we
// restrict ourselves to either just pod deletion or pod creation in any
// given sync cycle. Of these two, pod deletion takes precedence.
return active, metrics.JobSyncActionPodsDeleted, err
}
var terminating int32 = 0
if onlyReplaceFailedPods(jobCtx.job) {
// When onlyReplaceFailedPods=true, then also trackTerminatingPods=true,
// and so we can use the value.
terminating = *jobCtx.terminating
}
if diff := wantActive - terminating - active; diff > 0 {
var remainingTime time.Duration
if !hasBackoffLimitPerIndex(job) {
// we compute the global remaining time for pod creation when backoffLimitPerIndex is not used
remainingTime = jobCtx.newBackoffRecord.getRemainingTime(jm.clock, DefaultJobPodFailureBackOff, MaxJobPodFailureBackOff)
}
if remainingTime > 0 {
jm.enqueueSyncJobWithDelay(logger, job, remainingTime)
return 0, metrics.JobSyncActionPodsCreated, nil
}
if diff > int32(MaxPodCreateDeletePerSync) {
diff = int32(MaxPodCreateDeletePerSync)
}
var indexesToAdd []int
if isIndexedJob(job) {
indexesToAdd = firstPendingIndexes(jobCtx, int(diff), int(*job.Spec.Completions))
if hasBackoffLimitPerIndex(job) {
indexesToAdd, remainingTime = jm.getPodCreationInfoForIndependentIndexes(logger, indexesToAdd, jobCtx.podsWithDelayedDeletionPerIndex)
if remainingTime > 0 {
jm.enqueueSyncJobWithDelay(logger, job, remainingTime)
return 0, metrics.JobSyncActionPodsCreated, nil
}
}
diff = int32(len(indexesToAdd))
}
jm.expectations.ExpectCreations(logger, jobKey, int(diff))
errCh := make(chan error, diff)
logger.V(4).Info("Too few pods running", "key", jobKey, "need", wantActive, "creating", diff)
wait := sync.WaitGroup{}
active += diff
podTemplate := job.Spec.Template.DeepCopy()
if isIndexedJob(job) {
addCompletionIndexEnvVariables(podTemplate)
}
podTemplate.Finalizers = appendJobCompletionFinalizerIfNotFound(podTemplate.Finalizers)
// Counters for pod creation status (used by the job_pods_creation_total metric)
var creationsSucceeded, creationsFailed int32 = 0, 0
// Batch the pod creates. Batch sizes start at SlowStartInitialBatchSize
// and double with each successful iteration in a kind of "slow start".
// This handles attempts to start large numbers of pods that would
// likely all fail with the same error. For example a project with a
// low quota that attempts to create a large number of pods will be
// prevented from spamming the API service with the pod create requests
// after one of its pods fails. Conveniently, this also prevents the
// event spam that those failures would generate.
for batchSize := min(diff, int32(controller.SlowStartInitialBatchSize)); diff > 0; batchSize = min(2*batchSize, diff) {
errorCount := len(errCh)
wait.Add(int(batchSize))
for i := int32(0); i < batchSize; i++ {
completionIndex := unknownCompletionIndex
if len(indexesToAdd) > 0 {
completionIndex = indexesToAdd[0]
indexesToAdd = indexesToAdd[1:]
}
go func() {
template := podTemplate
generateName := ""
if completionIndex != unknownCompletionIndex {
template = podTemplate.DeepCopy()
addCompletionIndexAnnotation(template, completionIndex)
if feature.DefaultFeatureGate.Enabled(features.PodIndexLabel) {
addCompletionIndexLabel(template, completionIndex)
}
template.Spec.Hostname = fmt.Sprintf("%s-%d", job.Name, completionIndex)
generateName = podGenerateNameWithIndex(job.Name, completionIndex)
if hasBackoffLimitPerIndex(job) {
addIndexFailureCountAnnotation(logger, template, job, jobCtx.podsWithDelayedDeletionPerIndex[completionIndex])
}
}
defer wait.Done()
err := jm.podControl.CreatePodsWithGenerateName(ctx, job.Namespace, template, job, metav1.NewControllerRef(job, controllerKind), generateName)
if err != nil {
if apierrors.HasStatusCause(err, v1.NamespaceTerminatingCause) {
// If the namespace is being torn down, we can safely ignore
// this error since all subsequent creations will fail.
return
}
}
if err != nil {
defer utilruntime.HandleError(err)
// Decrement the expected number of creates because the informer won't observe this pod
logger.V(2).Info("Failed creation, decrementing expectations", "job", klog.KObj(job))
jm.expectations.CreationObserved(logger, jobKey)
atomic.AddInt32(&active, -1)
errCh <- err
atomic.AddInt32(&creationsFailed, 1)
}
atomic.AddInt32(&creationsSucceeded, 1)
}()
}
wait.Wait()
// any skipped pods that we never attempted to start shouldn't be expected.
skippedPods := diff - batchSize
if errorCount < len(errCh) && skippedPods > 0 {
logger.V(2).Info("Slow-start failure. Skipping creating pods, decrementing expectations", "skippedCount", skippedPods, "job", klog.KObj(job))
active -= skippedPods
for i := int32(0); i < skippedPods; i++ {
// Decrement the expected number of creates because the informer won't observe this pod
jm.expectations.CreationObserved(logger, jobKey)
}
// The skipped pods will be retried later. The next controller resync will
// retry the slow start process.
break
}
diff -= batchSize
}
recordJobPodsCreationTotal(job, jobCtx, creationsSucceeded, creationsFailed)
return active, metrics.JobSyncActionPodsCreated, errorFromChannel(errCh)
}
return active, metrics.JobSyncActionTracking, nil
}