Kube-controller-manager (Job)

Based on Kubernetes 1.25

Job Controller is the controller responsible for Job resources.

Based on the Job's parallelism (Spec.Parallelism) and completions (Spec.Completions) fields, Job Controller creates one or more Pods. Once the specified number (Spec.Completions) of Pods have run to completion successfully, the Job is declared successful. A Job uses the Orphan deletion policy by default, so deleting a Job does not automatically delete its associated Pods.
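
To illustrate that default, the minimal client-go sketch below deletes a Job while explicitly requesting background cascading deletion so the garbage collector also removes the Pods; the namespace ("default"), Job name ("example-job"), and kubeconfig location are placeholder assumptions, not values from this document. (kubectl delete job sets a cascade policy of its own, which is why Pods are removed when deleting via kubectl.)

    package main

    import (
        "context"

        metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
        "k8s.io/client-go/kubernetes"
        "k8s.io/client-go/tools/clientcmd"
    )

    func main() {
        // Load kubeconfig from the default location (~/.kube/config).
        config, err := clientcmd.BuildConfigFromFlags("", clientcmd.RecommendedHomeFile)
        if err != nil {
            panic(err)
        }
        clientset, err := kubernetes.NewForConfig(config)
        if err != nil {
            panic(err)
        }

        // Request background cascading deletion so the garbage collector also
        // removes the Job's Pods; without PropagationPolicy they are orphaned.
        policy := metav1.DeletePropagationBackground
        err = clientset.BatchV1().Jobs("default").Delete(context.TODO(), "example-job",
            metav1.DeleteOptions{PropagationPolicy: &policy})
        if err != nil {
            panic(err)
        }
    }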

Controller initialization

The controller registers event handlers on informers for the following resources:

  • Job resources
  • Pod resources

Main execution logic

  1. jm.jobLister.Jobs

    Retrieve the Job resource object from the lister (informer cache).

  2. IsJobFinished

    Determine whether the Job has already finished.

    Job Controller checks Job.Status.Conditions: if a condition of type Complete or Failed exists with status True, the Job is considered finished.

    • Ref:https://github.com/kubernetes/kubernetes/blob/1504f10e7946f95a8b1da35e28e4c7453ff62775/pkg/controller/job/util/utils.go#L36

      // FinishedCondition returns true if a job is finished as well as the condition type indicating that.
      // Returns false and no condition type otherwise
      func FinishedCondition(j *batch.Job) (bool, batch.JobConditionType) {
          for _, c := range j.Status.Conditions {
              if (c.Type == batch.JobComplete || c.Type == batch.JobFailed) && c.Status == v1.ConditionTrue {
                  return true, c.Type
              }
          }
          return false, ""
      }

      // IsJobFinished checks whether the given Job has finished execution.
      // It does not discriminate between successful and failed terminations.
      func IsJobFinished(j *batch.Job) bool {
          isFinished, _ := FinishedCondition(j)
          return isFinished
      }
  3. getPodsForJob

    Get the active Pods associated with the Job.

    An active Pod is one whose Status.Phase is neither Succeeded nor Failed and that is not in the process of being deleted (see the counting sketch after this list).

  4. getStatus

    Count the succeeded and failed Pods.

    Job Controller inspects each Pod's Status.Phase and, based on its value (Succeeded or Failed), tallies how many Pods have succeeded and how many have failed.

  5. exceedsBackoffLimit || pastBackoffLimitOnFailure(&job, pods)

    Handle Job failure or timeout.

      1. Job failure: exceedsBackoffLimit and pastBackoffLimitOnFailure

        The restart policy of the Job's Pod template determines how failure is judged. If RestartPolicy is Never, the Job is considered failed once the number of failed Pods exceeds Spec.BackoffLimit. If RestartPolicy is OnFailure, Job Controller sums the container restart counts across the Job's Pods, and the Job is considered failed once that total reaches Spec.BackoffLimit (see the restart-count sketch after this list).

        Once the Job is judged to have failed, the finishedCondition variable is set. After it is set here, Job Controller deletes all of the Job's running Pods and performs no further reconciliation.

      2. Job timeout: pastActiveDeadline

        If the Job has not failed but has been running longer than the duration specified by Spec.ActiveDeadlineSeconds, measured from its start time, without all Pods succeeding, the Job is considered to have timed out.

        Once the Job is judged to have timed out, the finishedCondition variable is likewise set. After it is set here, Job Controller deletes all of the Job's running Pods and performs no further reconciliation.

      3. Spec.ActiveDeadlineSeconds is set

        If the Job has neither failed nor timed out but Spec.ActiveDeadlineSeconds is set, the controller computes the interval between the current time and the deadline and re-enqueues the Job into the work queue after that delay.

    • Ref:https://github.com/kubernetes/kubernetes/blob/1504f10e7946f95a8b1da35e28e4c7453ff62775/pkg/controller/job/job_controller.go#L930

      // Evaluate failure scenarios for BackoffLimit and ActiveDeadlineSeconds.
      if jobCtx.finishedCondition == nil {
          if exceedsBackoffLimit || pastBackoffLimitOnFailure(&job, pods) {
              // check if the number of pod restart exceeds backoff (for restart OnFailure only)
              // OR if the number of failed jobs increased since the last syncJob
              jobCtx.finishedCondition = jm.newFailureCondition(batch.JobReasonBackoffLimitExceeded, "Job has reached the specified backoff limit")
          } else if jm.pastActiveDeadline(&job) {
              jobCtx.finishedCondition = jm.newFailureCondition(batch.JobReasonDeadlineExceeded, "Job was active longer than specified deadline")
          } else if job.Spec.ActiveDeadlineSeconds != nil && !jobSuspended(&job) {
              syncDuration := time.Duration(*job.Spec.ActiveDeadlineSeconds)*time.Second - jm.clock.Since(job.Status.StartTime.Time)
              logger.V(2).Info("Job has activeDeadlineSeconds configuration. Will sync this job again", "key", key, "nextSyncIn", syncDuration)
              jm.queue.AddAfter(key, syncDuration)
          }
      }
  6. manageJob

    Reconcile the number of Pods (detailed in the next section).

  7. complete

    Determine whether the Job is complete (see the counting sketch after this list).

  8. updateStatusHandler

    Update the Job's status.
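
A simplified sketch of the counting and completion checks described in steps 3, 4, and 7. It only looks at Pod phases and deletion timestamps; the real controller additionally tracks Pod finalizers and uncounted terminated Pods, so the helper names (isPodActive, countPodsByPhase, isJobComplete) and the package name are illustrative assumptions, not the upstream implementation:

    package jobsketch

    import (
        batch "k8s.io/api/batch/v1"
        v1 "k8s.io/api/core/v1"
    )

    // isPodActive reports whether a Pod counts as "active" for the Job controller:
    // not Succeeded, not Failed, and not already being deleted.
    func isPodActive(p *v1.Pod) bool {
        return p.Status.Phase != v1.PodSucceeded &&
            p.Status.Phase != v1.PodFailed &&
            p.DeletionTimestamp == nil
    }

    // countPodsByPhase tallies a Job's Pods into active, succeeded and failed buckets.
    func countPodsByPhase(pods []*v1.Pod) (active, succeeded, failed int32) {
        for _, p := range pods {
            switch {
            case p.Status.Phase == v1.PodSucceeded:
                succeeded++
            case p.Status.Phase == v1.PodFailed:
                failed++
            case isPodActive(p):
                active++
            }
        }
        return active, succeeded, failed
    }

    // isJobComplete mirrors the completion check: with Spec.Completions set, the Job
    // is complete once enough Pods have succeeded; otherwise a single success
    // completes it as soon as no Pods remain active.
    func isJobComplete(job *batch.Job, active, succeeded int32) bool {
        if job.Spec.Completions != nil {
            return succeeded >= *job.Spec.Completions
        }
        return succeeded > 0 && active == 0
    }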
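
And a sketch of the restart-count check from step 5.1, used when the Pod template's RestartPolicy is OnFailure: the restart counts of all containers (including init containers) in the Job's still-running Pods are summed and compared with Spec.BackoffLimit. This is a simplification of the upstream pastBackoffLimitOnFailure; the function and package names are assumptions:

    package jobsketch

    import (
        batch "k8s.io/api/batch/v1"
        v1 "k8s.io/api/core/v1"
    )

    // pastBackoffLimitOnFailureSketch sums container restarts across the Job's
    // running and pending Pods and compares the total with Spec.BackoffLimit.
    // Only meaningful when the Pod template's RestartPolicy is OnFailure.
    func pastBackoffLimitOnFailureSketch(job *batch.Job, pods []*v1.Pod) bool {
        if job.Spec.Template.Spec.RestartPolicy != v1.RestartPolicyOnFailure {
            return false
        }
        var restarts int32
        for _, p := range pods {
            if p.Status.Phase != v1.PodRunning && p.Status.Phase != v1.PodPending {
                continue
            }
            for _, cs := range p.Status.InitContainerStatuses {
                restarts += cs.RestartCount
            }
            for _, cs := range p.Status.ContainerStatuses {
                restarts += cs.RestartCount
            }
        }
        // BackoffLimit is defaulted by the API server, so the pointer is assumed
        // non-nil in this sketch.
        if *job.Spec.BackoffLimit == 0 {
            return restarts > 0
        }
        return restarts >= *job.Spec.BackoffLimit
    }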

Reconciling the number of Pods

Here, active is the number of the Job's currently active Pods in the cluster, and wantActive is the desired number of active Pods, computed from Spec.Parallelism and Spec.Completions.

  1. Spec.Completions is not set

    • If at least one Pod has already succeeded, wantActive is simply the current active count (whatever is running is left running).
    • Otherwise, up to Spec.Parallelism Pods are started to run concurrently.

    If Spec.Parallelism is not set, it defaults to 1.

  2. Spec.Completions is set

    • wantActive is the smaller of the number of remaining completions (Spec.Completions minus the succeeded count) and Spec.Parallelism; for example, with Completions=5, Parallelism=3 and 3 Pods already succeeded, wantActive = min(5-3, 3) = 2. Pods are likewise created via slow start.
  • Ref:https://github.com/kubernetes/kubernetes/blob/1504f10e7946f95a8b1da35e28e4c7453ff62775/pkg/controller/job/job_controller.go#L1616

    // manageJob is the core method responsible for managing the number of running
    // pods according to what is specified in the job.Spec.
    // Respects back-off; does not create new pods if the back-off time has not passed
    // Does NOT modify <activePods>.
    func (jm *Controller) manageJob(ctx context.Context, job *batch.Job, jobCtx *syncJobCtx) (int32, string, error) {
        logger := klog.FromContext(ctx)
        active := int32(len(jobCtx.activePods))
        parallelism := *job.Spec.Parallelism
        jobKey, err := controller.KeyFunc(job)
        if err != nil {
            utilruntime.HandleError(fmt.Errorf("Couldn't get key for job %#v: %v", job, err))
            return 0, metrics.JobSyncActionTracking, nil
        }

        if jobSuspended(job) {
            logger.V(4).Info("Deleting all active pods in suspended job", "job", klog.KObj(job), "active", active)
            podsToDelete := activePodsForRemoval(job, jobCtx.activePods, int(active))
            jm.expectations.ExpectDeletions(logger, jobKey, len(podsToDelete))
            removedReady, removed, err := jm.deleteJobPods(ctx, job, jobKey, podsToDelete)
            active -= removed
            if trackTerminatingPods(job) {
                *jobCtx.terminating += removed
            }
            jobCtx.ready -= removedReady
            return active, metrics.JobSyncActionPodsDeleted, err
        }

        wantActive := int32(0)
        if job.Spec.Completions == nil {
            // Job does not specify a number of completions. Therefore, number active
            // should be equal to parallelism, unless the job has seen at least
            // once success, in which leave whatever is running, running.
            if jobCtx.succeeded > 0 {
                wantActive = active
            } else {
                wantActive = parallelism
            }
        } else {
            // Job specifies a specific number of completions. Therefore, number
            // active should not ever exceed number of remaining completions.
            wantActive = *job.Spec.Completions - jobCtx.succeeded
            if wantActive > parallelism {
                wantActive = parallelism
            }
            if wantActive < 0 {
                wantActive = 0
            }
        }

        rmAtLeast := active - wantActive
        if rmAtLeast < 0 {
            rmAtLeast = 0
        }
        podsToDelete := activePodsForRemoval(job, jobCtx.activePods, int(rmAtLeast))
        if len(podsToDelete) > MaxPodCreateDeletePerSync {
            podsToDelete = podsToDelete[:MaxPodCreateDeletePerSync]
        }
        if len(podsToDelete) > 0 {
            jm.expectations.ExpectDeletions(logger, jobKey, len(podsToDelete))
            logger.V(4).Info("Too many pods running for job", "job", klog.KObj(job), "deleted", len(podsToDelete), "target", wantActive)
            removedReady, removed, err := jm.deleteJobPods(ctx, job, jobKey, podsToDelete)
            active -= removed
            if trackTerminatingPods(job) {
                *jobCtx.terminating += removed
            }
            jobCtx.ready -= removedReady
            // While it is possible for a Job to require both pod creations and
            // deletions at the same time (e.g. indexed Jobs with repeated indexes), we
            // restrict ourselves to either just pod deletion or pod creation in any
            // given sync cycle. Of these two, pod deletion takes precedence.
            return active, metrics.JobSyncActionPodsDeleted, err
        }

        var terminating int32 = 0
        if onlyReplaceFailedPods(jobCtx.job) {
            // When onlyReplaceFailedPods=true, then also trackTerminatingPods=true,
            // and so we can use the value.
            terminating = *jobCtx.terminating
        }
        if diff := wantActive - terminating - active; diff > 0 {
            var remainingTime time.Duration
            if !hasBackoffLimitPerIndex(job) {
                // we compute the global remaining time for pod creation when backoffLimitPerIndex is not used
                remainingTime = jobCtx.newBackoffRecord.getRemainingTime(jm.clock, DefaultJobPodFailureBackOff, MaxJobPodFailureBackOff)
            }
            if remainingTime > 0 {
                jm.enqueueSyncJobWithDelay(logger, job, remainingTime)
                return 0, metrics.JobSyncActionPodsCreated, nil
            }
            if diff > int32(MaxPodCreateDeletePerSync) {
                diff = int32(MaxPodCreateDeletePerSync)
            }

            var indexesToAdd []int
            if isIndexedJob(job) {
                indexesToAdd = firstPendingIndexes(jobCtx, int(diff), int(*job.Spec.Completions))
                if hasBackoffLimitPerIndex(job) {
                    indexesToAdd, remainingTime = jm.getPodCreationInfoForIndependentIndexes(logger, indexesToAdd, jobCtx.podsWithDelayedDeletionPerIndex)
                    if remainingTime > 0 {
                        jm.enqueueSyncJobWithDelay(logger, job, remainingTime)
                        return 0, metrics.JobSyncActionPodsCreated, nil
                    }
                }
                diff = int32(len(indexesToAdd))
            }

            jm.expectations.ExpectCreations(logger, jobKey, int(diff))
            errCh := make(chan error, diff)
            logger.V(4).Info("Too few pods running", "key", jobKey, "need", wantActive, "creating", diff)

            wait := sync.WaitGroup{}

            active += diff

            podTemplate := job.Spec.Template.DeepCopy()
            if isIndexedJob(job) {
                addCompletionIndexEnvVariables(podTemplate)
            }
            podTemplate.Finalizers = appendJobCompletionFinalizerIfNotFound(podTemplate.Finalizers)

            // Counters for pod creation status (used by the job_pods_creation_total metric)
            var creationsSucceeded, creationsFailed int32 = 0, 0

            // Batch the pod creates. Batch sizes start at SlowStartInitialBatchSize
            // and double with each successful iteration in a kind of "slow start".
            // This handles attempts to start large numbers of pods that would
            // likely all fail with the same error. For example a project with a
            // low quota that attempts to create a large number of pods will be
            // prevented from spamming the API service with the pod create requests
            // after one of its pods fails. Conveniently, this also prevents the
            // event spam that those failures would generate.
            for batchSize := min(diff, int32(controller.SlowStartInitialBatchSize)); diff > 0; batchSize = min(2*batchSize, diff) {
                errorCount := len(errCh)
                wait.Add(int(batchSize))
                for i := int32(0); i < batchSize; i++ {
                    completionIndex := unknownCompletionIndex
                    if len(indexesToAdd) > 0 {
                        completionIndex = indexesToAdd[0]
                        indexesToAdd = indexesToAdd[1:]
                    }
                    go func() {
                        template := podTemplate
                        generateName := ""
                        if completionIndex != unknownCompletionIndex {
                            template = podTemplate.DeepCopy()
                            addCompletionIndexAnnotation(template, completionIndex)

                            if feature.DefaultFeatureGate.Enabled(features.PodIndexLabel) {
                                addCompletionIndexLabel(template, completionIndex)
                            }
                            template.Spec.Hostname = fmt.Sprintf("%s-%d", job.Name, completionIndex)
                            generateName = podGenerateNameWithIndex(job.Name, completionIndex)
                            if hasBackoffLimitPerIndex(job) {
                                addIndexFailureCountAnnotation(logger, template, job, jobCtx.podsWithDelayedDeletionPerIndex[completionIndex])
                            }
                        }
                        defer wait.Done()
                        err := jm.podControl.CreatePodsWithGenerateName(ctx, job.Namespace, template, job, metav1.NewControllerRef(job, controllerKind), generateName)
                        if err != nil {
                            if apierrors.HasStatusCause(err, v1.NamespaceTerminatingCause) {
                                // If the namespace is being torn down, we can safely ignore
                                // this error since all subsequent creations will fail.
                                return
                            }
                        }
                        if err != nil {
                            defer utilruntime.HandleError(err)
                            // Decrement the expected number of creates because the informer won't observe this pod
                            logger.V(2).Info("Failed creation, decrementing expectations", "job", klog.KObj(job))
                            jm.expectations.CreationObserved(logger, jobKey)
                            atomic.AddInt32(&active, -1)
                            errCh <- err
                            atomic.AddInt32(&creationsFailed, 1)
                        }
                        atomic.AddInt32(&creationsSucceeded, 1)
                    }()
                }
                wait.Wait()
                // any skipped pods that we never attempted to start shouldn't be expected.
                skippedPods := diff - batchSize
                if errorCount < len(errCh) && skippedPods > 0 {
                    logger.V(2).Info("Slow-start failure. Skipping creating pods, decrementing expectations", "skippedCount", skippedPods, "job", klog.KObj(job))
                    active -= skippedPods
                    for i := int32(0); i < skippedPods; i++ {
                        // Decrement the expected number of creates because the informer won't observe this pod
                        jm.expectations.CreationObserved(logger, jobKey)
                    }
                    // The skipped pods will be retried later. The next controller resync will
                    // retry the slow start process.
                    break
                }
                diff -= batchSize
            }
            recordJobPodsCreationTotal(job, jobCtx, creationsSucceeded, creationsFailed)
            return active, metrics.JobSyncActionPodsCreated, errorFromChannel(errCh)
        }

        return active, metrics.JobSyncActionTracking, nil
    }