Kube-controller-manager(Job)

Based on Kubernetes 1.25

The Job Controller is the controller responsible for Job resources.

The Job Controller creates one or more Pods based on the Job's parallelism (Spec.Parallelism) and completions (Spec.Completions) fields. Once the specified number of Pods (Spec.Completions) have completed successfully, the Job is declared successful. By default a Job uses the Orphan deletion policy, so deleting the Job does not automatically delete its associated Pods, as illustrated in the sketch below.
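
As an illustration of these fields and the default deletion behavior, here is a minimal client-go sketch (the namespace, Job name, image, and kubeconfig loading are assumptions, not taken from the source):

  // Sketch only: namespace, Job name, image, and kubeconfig path are placeholders.
  package main

  import (
      "context"

      batchv1 "k8s.io/api/batch/v1"
      corev1 "k8s.io/api/core/v1"
      metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
      "k8s.io/client-go/kubernetes"
      "k8s.io/client-go/tools/clientcmd"
  )

  func int32Ptr(i int32) *int32 { return &i }

  func main() {
      cfg, err := clientcmd.BuildConfigFromFlags("", clientcmd.RecommendedHomeFile)
      if err != nil {
          panic(err)
      }
      clientset := kubernetes.NewForConfigOrDie(cfg)

      job := &batchv1.Job{
          ObjectMeta: metav1.ObjectMeta{Name: "demo", Namespace: "default"},
          Spec: batchv1.JobSpec{
              Parallelism: int32Ptr(2), // at most 2 Pods run at the same time
              Completions: int32Ptr(5), // the Job succeeds once 5 Pods finish successfully
              Template: corev1.PodTemplateSpec{
                  Spec: corev1.PodSpec{
                      RestartPolicy: corev1.RestartPolicyNever,
                      Containers:    []corev1.Container{{Name: "work", Image: "busybox", Command: []string{"true"}}},
                  },
              },
          },
      }
      if _, err := clientset.BatchV1().Jobs("default").Create(context.TODO(), job, metav1.CreateOptions{}); err != nil {
          panic(err)
      }

      // Deleting with the Orphan propagation policy leaves the Job's Pods in place.
      orphan := metav1.DeletePropagationOrphan
      _ = clientset.BatchV1().Jobs("default").Delete(context.TODO(), "demo",
          metav1.DeleteOptions{PropagationPolicy: &orphan})
  }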

Controller initialization

The Job Controller watches the following resources and registers informer event handlers for them (a registration sketch follows the list):

  • Job resources
  • Pod resources
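
A minimal sketch of wiring event handlers for these two resources with a shared informer factory; this is an illustrative simplification (the function name and the enqueue behavior are placeholders), not the controller's actual constructor:

  // Sketch only: registerJobHandlers and the enqueue closure are placeholders.
  import (
      "fmt"

      "k8s.io/client-go/informers"
      "k8s.io/client-go/kubernetes"
      "k8s.io/client-go/tools/cache"
  )

  func registerJobHandlers(clientset kubernetes.Interface, stopCh <-chan struct{}) {
      factory := informers.NewSharedInformerFactory(clientset, 0)

      // In the real controller, events are turned into a namespace/name key and
      // added to a rate-limited work queue; printing the key stands in for that here.
      enqueue := func(obj interface{}) {
          if key, err := cache.MetaNamespaceKeyFunc(obj); err == nil {
              fmt.Println("enqueue:", key)
          }
      }

      // Job events trigger reconciliation of the Job itself.
      factory.Batch().V1().Jobs().Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
          AddFunc:    enqueue,
          UpdateFunc: func(oldObj, newObj interface{}) { enqueue(newObj) },
          DeleteFunc: enqueue,
      })

      // Pod events are resolved back to the owning Job (simplified here).
      factory.Core().V1().Pods().Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
          AddFunc:    enqueue,
          UpdateFunc: func(oldObj, newObj interface{}) { enqueue(newObj) },
          DeleteFunc: enqueue,
      })

      factory.Start(stopCh)
  }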

Main execution flow

  1. jm.jobLister.Jobs

    Fetches the Job object from the informer cache; a lookup sketch follows.
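
    A minimal sketch of the lookup, assuming the work-queue key has the standard namespace/name form; getJob is a hypothetical helper, not a function in the controller:

      // Sketch: resolve a work-queue key ("namespace/name") to a Job from the lister cache.
      // batchv1listers is k8s.io/client-go/listers/batch/v1; cache is k8s.io/client-go/tools/cache;
      // batch is k8s.io/api/batch/v1.
      func getJob(jobLister batchv1listers.JobLister, key string) (*batch.Job, error) {
          ns, name, err := cache.SplitMetaNamespaceKey(key)
          if err != nil {
              return nil, err
          }
          // A NotFound error here means the Job was deleted and the sync can stop.
          return jobLister.Jobs(ns).Get(name)
      }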

  2. IsJobFinished

    Determines whether the Job has already finished.

    The Job Controller inspects Job.Status.Conditions: if a condition of type Complete or Failed exists with status True, the Job is considered finished.

    • Ref:https://github.com/kubernetes/kubernetes/blob/1504f10e7946f95a8b1da35e28e4c7453ff62775/pkg/controller/job/util/utils.go#L36

      // FinishedCondition returns true if a job is finished as well as the condition type indicating that.
      // Returns false and no condition type otherwise
      func FinishedCondition(j *batch.Job) (bool, batch.JobConditionType) {
          for _, c := range j.Status.Conditions {
              if (c.Type == batch.JobComplete || c.Type == batch.JobFailed) && c.Status == v1.ConditionTrue {
                  return true, c.Type
              }
          }
          return false, ""
      }

      // IsJobFinished checks whether the given Job has finished execution.
      // It does not discriminate between successful and failed terminations.
      func IsJobFinished(j *batch.Job) bool {
          isFinished, _ := FinishedCondition(j)
          return isFinished
      }
  3. getPodsForJob

    Lists the Pods owned by the Job and selects the active ones.

    An active Pod is one whose Status.Phase is neither Succeeded nor Failed and which is not being deleted; a sketch of this predicate follows.
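
    A minimal sketch of that predicate (kube-controller-manager's controller package has a similar helper); v1 here is k8s.io/api/core/v1:

      // An active Pod: not terminated (Succeeded/Failed) and not marked for deletion.
      func isPodActive(p *v1.Pod) bool {
          return p.Status.Phase != v1.PodSucceeded &&
              p.Status.Phase != v1.PodFailed &&
              p.DeletionTimestamp == nil
      }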

  4. getStatus

    Tallies the succeeded and failed Pods.

    The Job Controller examines each Pod's Status.Phase and, based on its value (Succeeded or Failed), counts how many Pods have succeeded and how many have failed; a simplified counting sketch follows.
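
    A simplified counting sketch; the real getStatus is more involved (for example, it also tracks Pods through the job-tracking finalizer), which this ignores:

      // Count terminal Pods by phase; v1 is k8s.io/api/core/v1.
      func countTerminalPods(pods []*v1.Pod) (succeeded, failed int32) {
          for _, p := range pods {
              switch p.Status.Phase {
              case v1.PodSucceeded:
                  succeeded++
              case v1.PodFailed:
                  failed++
              }
          }
          return succeeded, failed
      }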

  5. exceedsBackoffLimit || pastBackoffLimitOnFailure(&job, pods)

    Handles Job failure and deadline expiry.

      1. Job failure: exceedsBackoffLimit and pastBackoffLimitOnFailure

        The Job's RestartPolicy determines how failure is judged. If RestartPolicy is Never, the Job is considered failed once the number of failed Pods exceeds Spec.BackoffLimit; if RestartPolicy is OnFailure, the Job Controller sums the container restart counts across the Job's Pods and considers the Job failed once that sum reaches Spec.BackoffLimit (see the sketch after the reference code below).

        When the Job is judged failed, the finishedCondition variable is set. Once it is set here, the Job Controller deletes all of the Job's running Pods and performs no further reconciliation.

      2. Deadline exceeded: pastActiveDeadline

        If the Job has not failed but has been running for longer than Spec.ActiveDeadlineSeconds since it started without fully succeeding, the Job is considered to have exceeded its deadline.

        When the Job is judged to have exceeded its deadline, finishedCondition is likewise set; the Job Controller then deletes all of the Job's running Pods and performs no further reconciliation.

      3. Spec.ActiveDeadlineSeconds set but not yet reached

        If the Job has neither failed nor exceeded its deadline but Spec.ActiveDeadlineSeconds is set, the controller computes the time remaining until the deadline and re-enqueues the Job into the work queue after that delay.

    • Ref:https://github.com/kubernetes/kubernetes/blob/1504f10e7946f95a8b1da35e28e4c7453ff62775/pkg/controller/job/job_controller.go#L930

      // Evaluate failure scenarios for BackoffLimit and ActiveDeadlineSeconds.
      if jobCtx.finishedCondition == nil {
          if exceedsBackoffLimit || pastBackoffLimitOnFailure(&job, pods) {
              // check if the number of pod restart exceeds backoff (for restart OnFailure only)
              // OR if the number of failed jobs increased since the last syncJob
              jobCtx.finishedCondition = jm.newFailureCondition(batch.JobReasonBackoffLimitExceeded, "Job has reached the specified backoff limit")
          } else if jm.pastActiveDeadline(&job) {
              jobCtx.finishedCondition = jm.newFailureCondition(batch.JobReasonDeadlineExceeded, "Job was active longer than specified deadline")
          } else if job.Spec.ActiveDeadlineSeconds != nil && !jobSuspended(&job) {
              syncDuration := time.Duration(*job.Spec.ActiveDeadlineSeconds)*time.Second - jm.clock.Since(job.Status.StartTime.Time)
              logger.V(2).Info("Job has activeDeadlineSeconds configuration. Will sync this job again", "key", key, "nextSyncIn", syncDuration)
              jm.queue.AddAfter(key, syncDuration)
          }
      }
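
    As referenced in sub-item 1 above, a simplified sketch of the two checks written against the public API types (batchv1 is k8s.io/api/batch/v1, corev1 is k8s.io/api/core/v1); it mirrors the upstream helpers but is not a verbatim copy:

      // pastBackoffLimitOnFailure: only relevant when RestartPolicy is OnFailure; sum the
      // container restart counts of running/pending Pods and compare with BackoffLimit.
      func pastBackoffLimitOnFailure(job *batchv1.Job, pods []*corev1.Pod) bool {
          if job.Spec.Template.Spec.RestartPolicy != corev1.RestartPolicyOnFailure {
              return false
          }
          var restarts int32
          for _, p := range pods {
              if p.Status.Phase != corev1.PodRunning && p.Status.Phase != corev1.PodPending {
                  continue
              }
              for _, cs := range p.Status.InitContainerStatuses {
                  restarts += cs.RestartCount
              }
              for _, cs := range p.Status.ContainerStatuses {
                  restarts += cs.RestartCount
              }
          }
          if *job.Spec.BackoffLimit == 0 {
              return restarts > 0
          }
          return restarts >= *job.Spec.BackoffLimit
      }

      // pastActiveDeadline: the Job has been active for at least ActiveDeadlineSeconds.
      func pastActiveDeadline(job *batchv1.Job, now time.Time) bool {
          if job.Spec.ActiveDeadlineSeconds == nil || job.Status.StartTime == nil {
              return false
          }
          allowed := time.Duration(*job.Spec.ActiveDeadlineSeconds) * time.Second
          return now.Sub(job.Status.StartTime.Time) >= allowed
      }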
  6. manageJob

    Reconciles the number of Pods; see the "Reconciling the number of Pods" section below.

  7. complete

    Determines whether the Job has completed; a simplified sketch of the completion check follows.
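
    A simplified sketch of what "complete" means here; it ignores Indexed Jobs and other newer completion rules (batchv1 is k8s.io/api/batch/v1):

      // A Job completes when enough Pods have succeeded (and, without a completion
      // count, when nothing is still running).
      func jobComplete(job *batchv1.Job, succeeded, active int32) bool {
          if job.Spec.Completions == nil {
              return succeeded > 0 && active == 0
          }
          return succeeded >= *job.Spec.Completions
      }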

  8. updateStatusHandler

    Updates the Job's status; a sketch of the status update call follows.
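
    updateStatusHandler ultimately issues an UpdateStatus call against the Jobs API via the status subresource; a minimal client-go sketch (updateJobStatus is a hypothetical helper):

      // Persist the recomputed status; on error syncJob re-enqueues the Job with backoff.
      func updateJobStatus(ctx context.Context, clientset kubernetes.Interface, job *batchv1.Job) (*batchv1.Job, error) {
          return clientset.BatchV1().Jobs(job.Namespace).UpdateStatus(ctx, job, metav1.UpdateOptions{})
      }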

Reconciling the number of Pods

In the code below, active is the number of the Job's currently active Pods in the cluster, and wantActive is the desired number of active Pods computed from Spec.Parallelism and Spec.Completions.

  1. Spec.Completions is not set

    • If at least one Pod has already succeeded, wantActive is simply the current active count
    • Otherwise, up to Spec.Parallelism Pods are started to run concurrently

    If Spec.Parallelism is not set, it defaults to 1

  2. Spec.Completions is set

    • wantActive is the smaller of the number of completions still outstanding and Spec.Parallelism (for example, with Completions=5, Parallelism=3 and 3 Pods already succeeded, wantActive is 2). Pods are likewise created via slow start
  • Ref:https://github.com/kubernetes/kubernetes/blob/1504f10e7946f95a8b1da35e28e4c7453ff62775/pkg/controller/job/job_controller.go#L1616

    // manageJob is the core method responsible for managing the number of running
    // pods according to what is specified in the job.Spec.
    // Respects back-off; does not create new pods if the back-off time has not passed
    // Does NOT modify <activePods>.
    func (jm *Controller) manageJob(ctx context.Context, job *batch.Job, jobCtx *syncJobCtx) (int32, string, error) {
        logger := klog.FromContext(ctx)
        active := int32(len(jobCtx.activePods))
        parallelism := *job.Spec.Parallelism
        jobKey, err := controller.KeyFunc(job)
        if err != nil {
            utilruntime.HandleError(fmt.Errorf("Couldn't get key for job %#v: %v", job, err))
            return 0, metrics.JobSyncActionTracking, nil
        }

        if jobSuspended(job) {
            logger.V(4).Info("Deleting all active pods in suspended job", "job", klog.KObj(job), "active", active)
            podsToDelete := activePodsForRemoval(job, jobCtx.activePods, int(active))
            jm.expectations.ExpectDeletions(logger, jobKey, len(podsToDelete))
            removedReady, removed, err := jm.deleteJobPods(ctx, job, jobKey, podsToDelete)
            active -= removed
            if trackTerminatingPods(job) {
                *jobCtx.terminating += removed
            }
            jobCtx.ready -= removedReady
            return active, metrics.JobSyncActionPodsDeleted, err
        }

        wantActive := int32(0)
        if job.Spec.Completions == nil {
            // Job does not specify a number of completions. Therefore, number active
            // should be equal to parallelism, unless the job has seen at least
            // once success, in which leave whatever is running, running.
            if jobCtx.succeeded > 0 {
                wantActive = active
            } else {
                wantActive = parallelism
            }
        } else {
            // Job specifies a specific number of completions. Therefore, number
            // active should not ever exceed number of remaining completions.
            wantActive = *job.Spec.Completions - jobCtx.succeeded
            if wantActive > parallelism {
                wantActive = parallelism
            }
            if wantActive < 0 {
                wantActive = 0
            }
        }

        rmAtLeast := active - wantActive
        if rmAtLeast < 0 {
            rmAtLeast = 0
        }
        podsToDelete := activePodsForRemoval(job, jobCtx.activePods, int(rmAtLeast))
        if len(podsToDelete) > MaxPodCreateDeletePerSync {
            podsToDelete = podsToDelete[:MaxPodCreateDeletePerSync]
        }
        if len(podsToDelete) > 0 {
            jm.expectations.ExpectDeletions(logger, jobKey, len(podsToDelete))
            logger.V(4).Info("Too many pods running for job", "job", klog.KObj(job), "deleted", len(podsToDelete), "target", wantActive)
            removedReady, removed, err := jm.deleteJobPods(ctx, job, jobKey, podsToDelete)
            active -= removed
            if trackTerminatingPods(job) {
                *jobCtx.terminating += removed
            }
            jobCtx.ready -= removedReady
            // While it is possible for a Job to require both pod creations and
            // deletions at the same time (e.g. indexed Jobs with repeated indexes), we
            // restrict ourselves to either just pod deletion or pod creation in any
            // given sync cycle. Of these two, pod deletion takes precedence.
            return active, metrics.JobSyncActionPodsDeleted, err
        }

        var terminating int32 = 0
        if onlyReplaceFailedPods(jobCtx.job) {
            // When onlyReplaceFailedPods=true, then also trackTerminatingPods=true,
            // and so we can use the value.
            terminating = *jobCtx.terminating
        }
        if diff := wantActive - terminating - active; diff > 0 {
            var remainingTime time.Duration
            if !hasBackoffLimitPerIndex(job) {
                // we compute the global remaining time for pod creation when backoffLimitPerIndex is not used
                remainingTime = jobCtx.newBackoffRecord.getRemainingTime(jm.clock, DefaultJobPodFailureBackOff, MaxJobPodFailureBackOff)
            }
            if remainingTime > 0 {
                jm.enqueueSyncJobWithDelay(logger, job, remainingTime)
                return 0, metrics.JobSyncActionPodsCreated, nil
            }
            if diff > int32(MaxPodCreateDeletePerSync) {
                diff = int32(MaxPodCreateDeletePerSync)
            }

            var indexesToAdd []int
            if isIndexedJob(job) {
                indexesToAdd = firstPendingIndexes(jobCtx, int(diff), int(*job.Spec.Completions))
                if hasBackoffLimitPerIndex(job) {
                    indexesToAdd, remainingTime = jm.getPodCreationInfoForIndependentIndexes(logger, indexesToAdd, jobCtx.podsWithDelayedDeletionPerIndex)
                    if remainingTime > 0 {
                        jm.enqueueSyncJobWithDelay(logger, job, remainingTime)
                        return 0, metrics.JobSyncActionPodsCreated, nil
                    }
                }
                diff = int32(len(indexesToAdd))
            }

            jm.expectations.ExpectCreations(logger, jobKey, int(diff))
            errCh := make(chan error, diff)
            logger.V(4).Info("Too few pods running", "key", jobKey, "need", wantActive, "creating", diff)

            wait := sync.WaitGroup{}

            active += diff

            podTemplate := job.Spec.Template.DeepCopy()
            if isIndexedJob(job) {
                addCompletionIndexEnvVariables(podTemplate)
            }
            podTemplate.Finalizers = appendJobCompletionFinalizerIfNotFound(podTemplate.Finalizers)

            // Counters for pod creation status (used by the job_pods_creation_total metric)
            var creationsSucceeded, creationsFailed int32 = 0, 0

            // Batch the pod creates. Batch sizes start at SlowStartInitialBatchSize
            // and double with each successful iteration in a kind of "slow start".
            // This handles attempts to start large numbers of pods that would
            // likely all fail with the same error. For example a project with a
            // low quota that attempts to create a large number of pods will be
            // prevented from spamming the API service with the pod create requests
            // after one of its pods fails. Conveniently, this also prevents the
            // event spam that those failures would generate.
            for batchSize := min(diff, int32(controller.SlowStartInitialBatchSize)); diff > 0; batchSize = min(2*batchSize, diff) {
                errorCount := len(errCh)
                wait.Add(int(batchSize))
                for i := int32(0); i < batchSize; i++ {
                    completionIndex := unknownCompletionIndex
                    if len(indexesToAdd) > 0 {
                        completionIndex = indexesToAdd[0]
                        indexesToAdd = indexesToAdd[1:]
                    }
                    go func() {
                        template := podTemplate
                        generateName := ""
                        if completionIndex != unknownCompletionIndex {
                            template = podTemplate.DeepCopy()
                            addCompletionIndexAnnotation(template, completionIndex)

                            if feature.DefaultFeatureGate.Enabled(features.PodIndexLabel) {
                                addCompletionIndexLabel(template, completionIndex)
                            }
                            template.Spec.Hostname = fmt.Sprintf("%s-%d", job.Name, completionIndex)
                            generateName = podGenerateNameWithIndex(job.Name, completionIndex)
                            if hasBackoffLimitPerIndex(job) {
                                addIndexFailureCountAnnotation(logger, template, job, jobCtx.podsWithDelayedDeletionPerIndex[completionIndex])
                            }
                        }
                        defer wait.Done()
                        err := jm.podControl.CreatePodsWithGenerateName(ctx, job.Namespace, template, job, metav1.NewControllerRef(job, controllerKind), generateName)
                        if err != nil {
                            if apierrors.HasStatusCause(err, v1.NamespaceTerminatingCause) {
                                // If the namespace is being torn down, we can safely ignore
                                // this error since all subsequent creations will fail.
                                return
                            }
                        }
                        if err != nil {
                            defer utilruntime.HandleError(err)
                            // Decrement the expected number of creates because the informer won't observe this pod
                            logger.V(2).Info("Failed creation, decrementing expectations", "job", klog.KObj(job))
                            jm.expectations.CreationObserved(logger, jobKey)
                            atomic.AddInt32(&active, -1)
                            errCh <- err
                            atomic.AddInt32(&creationsFailed, 1)
                        }
                        atomic.AddInt32(&creationsSucceeded, 1)
                    }()
                }
                wait.Wait()
                // any skipped pods that we never attempted to start shouldn't be expected.
                skippedPods := diff - batchSize
                if errorCount < len(errCh) && skippedPods > 0 {
                    logger.V(2).Info("Slow-start failure. Skipping creating pods, decrementing expectations", "skippedCount", skippedPods, "job", klog.KObj(job))
                    active -= skippedPods
                    for i := int32(0); i < skippedPods; i++ {
                        // Decrement the expected number of creates because the informer won't observe this pod
                        jm.expectations.CreationObserved(logger, jobKey)
                    }
                    // The skipped pods will be retried later. The next controller resync will
                    // retry the slow start process.
                    break
                }
                diff -= batchSize
            }
            recordJobPodsCreationTotal(job, jobCtx, creationsSucceeded, creationsFailed)
            return active, metrics.JobSyncActionPodsCreated, errorFromChannel(errCh)
        }

        return active, metrics.JobSyncActionTracking, nil
    }