Kube-controller-manager(CronJob)

基于1.25

CronJob Controller 操作控制CronJob

CronJob资源主要包含Job模版(Spec.JobTemplate)和计划表(Spec.Schedule)

  • CronJob 控制器通过资源对象的计划配置,到达预定时间后,创建新的Job对象,并且清理过去成功或者失败的Job资源对象
  • Spec.Schedule字段定义和Linux终端crontab一致

控制器初始化

  • Job资源对象
  • CronJob资源对象

主要执行逻辑

  1. jm.cronJobLister.CronJobs

获取CronJob对象

  1. getJobsToBeReconciled

获取关联的Job对象

  1. syncCronJob

计算Job的启动时间并启动新的Job

  • 获取当前的CronJob对象关联的Job资源对象,更新Status.Active字段,从中去掉已经执行结束的Job,加入正在执行的Job,确保Status.Active都是正在执行的Job
  • 判断CronJob资源对象的状态。如果是删除中,或者其Spec.Suspend属性为true,则不再执行后续步骤
  • 根据CronJob创建时间,上一次的调度Job的时间、Schedule时间间隔和StartingDeadlineSeconds计算当前时间最近一次应该启动Job的时间,该时间作为后续Job的启动时间
  • 根据CronJob资源的Spec.ConcurrencyPolicy 决定是否删除旧的正在运行的Job并创建和运行新的Job
  • 清理超出限制数量成功或者失败的Job,并且计算下一次启动新Job的时间,以及下一次启动新Job的时间与当前时间之间的时间间隔。该时间间隔用于重新把CronJob资源延迟加入工作队列
  1. cleanupFinishedJobs

    清理多余的Job。

    • CronJob设置了Spec.FailedJobsHistoryLimit或Spec.SuccessfulJobsHistoryLimit字段,CronJob Controller仅仅保留该属性设置的成功或者失败的Job数量,而创建时间更早的成功或者失败的Job被删除
  2. jm.cronJobControl.UpdateStatus

    更新CronJob的状态

  3. requeueAfter

    根据延迟重新把CronJob的Key加入工作队列

计算Job的启动时间

根据代码,可能产生三种情况:

  1. 启动新Job时间还没到

    当 getNextScheduleTime 返回 nil 时

    CronJob Controller通过 nextScheduledTimeDuration 计算当前时间与下一次应当启动Job的时间之间的时间间隔并返回。CronJob Controller通过该时间间隔重新把Key延迟放进工作队列

  2. 启动新的Job时间到了,但是超过了启动新的Job的截止时间

    错过了创建新的Job的截止时间

  3. 启动新Job时间到了,且没有超过启动新Job的截止时间

    CronJob Controller开始按照Spec.ConcurrencyPolicy启动新的Job

  • Ref:https://github.com/kubernetes/kubernetes/blob/88e994f6bf8fc88114c5b733e09afea339bea66d/pkg/controller/cronjob/cronjob_controllerv2.go#L412

    // syncCronJob reconciles a CronJob with a list of any Jobs that it created.
    // All known jobs created by "cronJob" should be included in "jobs".
    // The current time is read once from jm.now() and reused for every comparison below.
    //
    // Returns:
    //   - *time.Duration: non-nil only on the paths that call nextScheduledTimeDuration
    //     (no unmet start time, missed deadline, already processed, Forbid skip, and the
    //     final success path); nil on every other return. Presumably the caller uses it
    //     as the requeue delay -- confirm against the caller.
    //   - bool: whether cronJob.Status was modified and an update to api-server is needed.
    //   - error: any error from the CronJob/Job control clients or from building the Job.
    func (jm *ControllerV2) syncCronJob(
    ctx context.Context,
    cronJob *batchv1.CronJob,
    jobs []*batchv1.Job) (*time.Duration, bool, error) {

    now := jm.now()
    timeZoneEnabled := utilfeature.DefaultFeatureGate.Enabled(features.CronJobTimeZone)
    updateStatus := false

    // Step 1: walk the supplied jobs, remembering their UIDs and reconciling each one
    // against cronJob.Status.Active.
    childrenJobs := make(map[types.UID]bool)
    for _, j := range jobs {
    childrenJobs[j.ObjectMeta.UID] = true
    found := inActiveList(*cronJob, j.ObjectMeta.UID)
    if !found && !IsJobFinished(j) {
    // Unfinished job that is not in our Active list: re-read the CronJob through
    // cronJobControl before complaining, and switch to the fresh copy if it already
    // lists the job as active.
    cjCopy, err := jm.cronJobControl.GetCronJob(ctx, cronJob.Namespace, cronJob.Name)
    if err != nil {
    return nil, updateStatus, err
    }
    if inActiveList(*cjCopy, j.ObjectMeta.UID) {
    cronJob = cjCopy
    continue
    }
    jm.recorder.Eventf(cronJob, corev1.EventTypeWarning, "UnexpectedJob", "Saw a job that the controller did not create or forgot: %s", j.Name)
    // We found an unfinished job that has us as the parent, but it is not in our Active list.
    // This could happen if we crashed right after creating the Job and before updating the status,
    // or if our jobs list is newer than our cj status after a relist, or if someone intentionally created
    // a job that they wanted us to adopt.
    } else if found && IsJobFinished(j) {
    // Finished job still listed as active: drop it from Status.Active and record an event.
    _, status := getFinishedStatus(j)
    deleteFromActiveList(cronJob, j.ObjectMeta.UID)
    jm.recorder.Eventf(cronJob, corev1.EventTypeNormal, "SawCompletedJob", "Saw completed job: %s, status: %v", j.Name, status)
    updateStatus = true
    } else if IsJobFinished(j) {
    // a job does not have to be in active list, as long as it is finished, we will process the timestamp
    // NOTE(review): this branch is taken for any finished job, and CompletionTime is
    // assigned without a nil check, so LastSuccessfulTime can be "set" to nil while
    // updateStatus still becomes true. Confirm whether IsJobFinished also covers
    // failed jobs before relying on LastSuccessfulTime.
    if cronJob.Status.LastSuccessfulTime == nil {
    cronJob.Status.LastSuccessfulTime = j.Status.CompletionTime
    updateStatus = true
    }
    // Keep the most recent completion time seen across all finished jobs.
    if j.Status.CompletionTime != nil && j.Status.CompletionTime.After(cronJob.Status.LastSuccessfulTime.Time) {
    cronJob.Status.LastSuccessfulTime = j.Status.CompletionTime
    updateStatus = true
    }
    }
    }

    // Step 2:
    // Remove any job reference from the active list if the corresponding job does not exist any more.
    // Otherwise, the cronjob may be stuck in active mode forever even though there is no matching
    // job running.
    for _, j := range cronJob.Status.Active {
    _, found := childrenJobs[j.UID]
    if found {
    continue
    }
    // Explicitly try to get the job from api-server to avoid a slow watch not able to update
    // the job lister on time, giving an unwanted miss
    _, err := jm.jobControl.GetJob(j.Namespace, j.Name)
    switch {
    case errors.IsNotFound(err):
    // The job is actually missing, delete from active list and schedule a new one if within
    // deadline
    jm.recorder.Eventf(cronJob, corev1.EventTypeNormal, "MissingJob", "Active job went missing: %v", j.Name)
    deleteFromActiveList(cronJob, j.UID)
    updateStatus = true
    case err != nil:
    return nil, updateStatus, err
    }
    // the job is missing in the lister but found in api-server
    }

    // Step 3: guards that stop the sync before any schedule is evaluated. Each returns a
    // nil duration and a nil error, so only the status changes made above are reported.
    if cronJob.DeletionTimestamp != nil {
    // The CronJob is being deleted.
    // Don't do anything other than updating status.
    return nil, updateStatus, nil
    }

    // The time zone is only validated when the CronJobTimeZone feature gate is enabled.
    if timeZoneEnabled && cronJob.Spec.TimeZone != nil {
    if _, err := time.LoadLocation(*cronJob.Spec.TimeZone); err != nil {
    timeZone := pointer.StringDeref(cronJob.Spec.TimeZone, "")
    klog.V(4).InfoS("Not starting job because timeZone is invalid", "cronjob", klog.KRef(cronJob.GetNamespace(), cronJob.GetName()), "timeZone", timeZone, "err", err)
    jm.recorder.Eventf(cronJob, corev1.EventTypeWarning, "UnknownTimeZone", "invalid timeZone: %q: %s", timeZone, err)
    return nil, updateStatus, nil
    }
    }

    if cronJob.Spec.Suspend != nil && *cronJob.Spec.Suspend {
    klog.V(4).InfoS("Not starting job because the cron is suspended", "cronjob", klog.KRef(cronJob.GetNamespace(), cronJob.GetName()))
    return nil, updateStatus, nil
    }

    // Step 4: parse the schedule and work out which scheduled time (if any) is due.
    sched, err := cron.ParseStandard(formatSchedule(timeZoneEnabled, cronJob, jm.recorder))
    if err != nil {
    // this is likely a user error in defining the spec value
    // we should log the error and not reconcile this cronjob until an update to spec
    klog.V(2).InfoS("Unparseable schedule", "cronjob", klog.KRef(cronJob.GetNamespace(), cronJob.GetName()), "schedule", cronJob.Spec.Schedule, "err", err)
    jm.recorder.Eventf(cronJob, corev1.EventTypeWarning, "UnparseableSchedule", "unparseable schedule: %q : %s", cronJob.Spec.Schedule, err)
    return nil, updateStatus, nil
    }

    scheduledTime, err := getNextScheduleTime(*cronJob, now, sched, jm.recorder)
    if err != nil {
    // this is likely a user error in defining the spec value
    // we should log the error and not reconcile this cronjob until an update to spec
    klog.V(2).InfoS("invalid schedule", "cronjob", klog.KRef(cronJob.GetNamespace(), cronJob.GetName()), "schedule", cronJob.Spec.Schedule, "err", err)
    jm.recorder.Eventf(cronJob, corev1.EventTypeWarning, "InvalidSchedule", "invalid schedule: %s : %s", cronJob.Spec.Schedule, err)
    return nil, updateStatus, nil
    }
    if scheduledTime == nil {
    // no unmet start time, return cj,.
    // The only time this should happen is if queue is filled after restart.
    // Otherwise, the queue is always suppose to trigger sync function at the time of
    // the scheduled time, that will give atleast 1 unmet time schedule
    klog.V(4).InfoS("No unmet start times", "cronjob", klog.KRef(cronJob.GetNamespace(), cronJob.GetName()))
    t := nextScheduledTimeDuration(*cronJob, sched, now)
    return t, updateStatus, nil
    }

    // A start is "too late" only when StartingDeadlineSeconds is set and
    // scheduledTime + deadline is already before now.
    tooLate := false
    if cronJob.Spec.StartingDeadlineSeconds != nil {
    tooLate = scheduledTime.Add(time.Second * time.Duration(*cronJob.Spec.StartingDeadlineSeconds)).Before(now)
    }
    if tooLate {
    klog.V(4).InfoS("Missed starting window", "cronjob", klog.KRef(cronJob.GetNamespace(), cronJob.GetName()))
    jm.recorder.Eventf(cronJob, corev1.EventTypeWarning, "MissSchedule", "Missed scheduled time to start a job: %s", scheduledTime.UTC().Format(time.RFC1123Z))

    // TODO: Since we don't set LastScheduleTime when not scheduling, we are going to keep noticing
    // the miss every cycle. In order to avoid sending multiple events, and to avoid processing
    // the cj again and again, we could set a Status.LastMissedTime when we notice a miss.
    // Then, when we call getRecentUnmetScheduleTimes, we can take max(creationTimestamp,
    // Status.LastScheduleTime, Status.LastMissedTime), and then so we won't generate
    // and event the next time we process it, and also so the user looking at the status
    // can see easily that there was a missed execution.
    t := nextScheduledTimeDuration(*cronJob, sched, now)
    return t, updateStatus, nil
    }
    // Skip if this scheduled time was already handled: either a job with the name derived
    // from scheduledTime is in the active list (checked with a throwaway Job that carries
    // only name and namespace), or LastScheduleTime equals scheduledTime.
    // NOTE(review): LastScheduleTime may be nil here; this assumes (*metav1.Time).Equal
    // tolerates a nil receiver -- confirm.
    if isJobInActiveList(&batchv1.Job{
    ObjectMeta: metav1.ObjectMeta{
    Name: getJobName(cronJob, *scheduledTime),
    Namespace: cronJob.Namespace,
    }}, cronJob.Status.Active) || cronJob.Status.LastScheduleTime.Equal(&metav1.Time{Time: *scheduledTime}) {
    klog.V(4).InfoS("Not starting job because the scheduled time is already processed", "cronjob", klog.KRef(cronJob.GetNamespace(), cronJob.GetName()), "schedule", scheduledTime)
    t := nextScheduledTimeDuration(*cronJob, sched, now)
    return t, updateStatus, nil
    }
    // Step 5: apply the concurrency policy. Forbid skips this run, Replace deletes the
    // active jobs first, and any other policy falls through to job creation.
    if cronJob.Spec.ConcurrencyPolicy == batchv1.ForbidConcurrent && len(cronJob.Status.Active) > 0 {
    // Regardless which source of information we use for the set of active jobs,
    // there is some risk that we won't see an active job when there is one.
    // (because we haven't seen the status update to the SJ or the created pod).
    // So it is theoretically possible to have concurrency with Forbid.
    // As long the as the invocations are "far enough apart in time", this usually won't happen.
    //
    // TODO: for Forbid, we could use the same name for every execution, as a lock.
    // With replace, we could use a name that is deterministic per execution time.
    // But that would mean that you could not inspect prior successes or failures of Forbid jobs.
    klog.V(4).InfoS("Not starting job because prior execution is still running and concurrency policy is Forbid", "cronjob", klog.KRef(cronJob.GetNamespace(), cronJob.GetName()))
    jm.recorder.Eventf(cronJob, corev1.EventTypeNormal, "JobAlreadyActive", "Not starting job because prior execution is running and concurrency policy is Forbid")
    t := nextScheduledTimeDuration(*cronJob, sched, now)
    return t, updateStatus, nil
    }
    if cronJob.Spec.ConcurrencyPolicy == batchv1.ReplaceConcurrent {
    // Every active job must be fetched and deleted; the first failure aborts the sync
    // with an error and no new job is created.
    for _, j := range cronJob.Status.Active {
    klog.V(4).InfoS("Deleting job that was still running at next scheduled start time", "job", klog.KRef(j.Namespace, j.Name))

    job, err := jm.jobControl.GetJob(j.Namespace, j.Name)
    if err != nil {
    jm.recorder.Eventf(cronJob, corev1.EventTypeWarning, "FailedGet", "Get job: %v", err)
    return nil, updateStatus, err
    }
    if !deleteJob(cronJob, job, jm.jobControl, jm.recorder) {
    return nil, updateStatus, fmt.Errorf("could not replace job %s/%s", job.Namespace, job.Name)
    }
    updateStatus = true
    }
    }

    // Step 6: build the Job from the template and create it.
    // jobAlreadyExists records that CreateJob reported AlreadyExists and the existing
    // job was adopted as jobResp instead.
    jobAlreadyExists := false
    jobReq, err := getJobFromTemplate2(cronJob, *scheduledTime)
    if err != nil {
    klog.ErrorS(err, "Unable to make Job from template", "cronjob", klog.KRef(cronJob.GetNamespace(), cronJob.GetName()))
    return nil, updateStatus, err
    }
    jobResp, err := jm.jobControl.CreateJob(cronJob.Namespace, jobReq)
    switch {
    case errors.HasStatusCause(err, corev1.NamespaceTerminatingCause):
    // if the namespace is being terminated, we don't have to do
    // anything because any creation will fail
    return nil, updateStatus, err
    case errors.IsAlreadyExists(err):
    // If the job is created by other actor, assume it has updated the cronjob status accordingly.
    // However, if the job was created by cronjob controller, this means we've previously created the job
    // but failed to update the active list in the status, in which case we should reattempt to add the job
    // into the active list and update the status.
    jobAlreadyExists = true
    job, err := jm.jobControl.GetJob(jobReq.GetNamespace(), jobReq.GetName())
    if err != nil {
    return nil, updateStatus, err
    }
    jobResp = job

    // check that this job is owned by cronjob controller, otherwise do nothing and assume external controller
    // is updating the status.
    if !metav1.IsControlledBy(job, cronJob) {
    return nil, updateStatus, nil
    }

    // Recheck if the job is missing from the active list before attempting to update the status again.
    found := inActiveList(*cronJob, job.ObjectMeta.UID)
    if found {
    return nil, updateStatus, nil
    }
    case err != nil:
    // default error handling
    jm.recorder.Eventf(cronJob, corev1.EventTypeWarning, "FailedCreate", "Error creating job: %v", err)
    return nil, updateStatus, err
    }

    // The creation-skew metric and the SuccessfulCreate event are emitted only for jobs
    // created by this call, not for adopted pre-existing ones.
    if jobAlreadyExists {
    klog.InfoS("Job already exists", "cronjob", klog.KRef(cronJob.GetNamespace(), cronJob.GetName()), "job", klog.KRef(jobReq.GetNamespace(), jobReq.GetName()))
    } else {
    metrics.CronJobCreationSkew.Observe(jobResp.ObjectMeta.GetCreationTimestamp().Sub(*scheduledTime).Seconds())
    klog.V(4).InfoS("Created Job", "job", klog.KRef(jobResp.GetNamespace(), jobResp.GetName()), "cronjob", klog.KRef(cronJob.GetNamespace(), cronJob.GetName()))
    jm.recorder.Eventf(cronJob, corev1.EventTypeNormal, "SuccessfulCreate", "Created job %v", jobResp.Name)
    }

    // ------------------------------------------------------------------ //

    // If this process restarts at this point (after posting a job, but
    // before updating the status), then we might try to start the job on
    // the next time. Actually, if we re-list the SJs and Jobs on the next
    // iteration of syncAll, we might not see our own status update, and
    // then post one again. So, we need to use the job name as a lock to
    // prevent us from making the job twice (name the job with hash of its
    // scheduled time).

    // Add the just-started job to the status list.
    jobRef, err := getRef(jobResp)
    if err != nil {
    klog.V(2).InfoS("Unable to make object reference", "cronjob", klog.KRef(cronJob.GetNamespace(), cronJob.GetName()), "err", err)
    return nil, updateStatus, fmt.Errorf("unable to make object reference for job for %s", klog.KRef(cronJob.GetNamespace(), cronJob.GetName()))
    }
    cronJob.Status.Active = append(cronJob.Status.Active, *jobRef)
    cronJob.Status.LastScheduleTime = &metav1.Time{Time: *scheduledTime}
    updateStatus = true

    // Report the delay until the next scheduled run together with the status change.
    t := nextScheduledTimeDuration(*cronJob, sched, now)
    return t, updateStatus, nil
    }

// TODO:一些时间计算的细节

Job并行策略

CronJob Controller 启动新的Job,需要处理之前启动但还没执行结束的Job

Spec.ConcurrencyPolicy支持3种模式

  • Forbid:继续维持旧的执行中的Job,跳过此轮Job创建

  • Replace:先删除旧的正在执行的Job,再创建新的Job

  • Allow:无视旧的在执行的Job,直接创建新的Job

  • Ref:https://github.com/kubernetes/kubernetes/blob/88e994f6bf8fc88114c5b733e09afea339bea66d/pkg/controller/cronjob/cronjob_controllerv2.go#L555

    // Excerpt from syncCronJob: how Spec.ConcurrencyPolicy is applied before a new Job is
    // created. Only Forbid and Replace have a branch here; with any other policy value
    // execution simply continues past both checks.
    if cronJob.Spec.ConcurrencyPolicy == batchv1.ForbidConcurrent && len(cronJob.Status.Active) > 0 {
    // Regardless which source of information we use for the set of active jobs,
    // there is some risk that we won't see an active job when there is one.
    // (because we haven't seen the status update to the SJ or the created pod).
    // So it is theoretically possible to have concurrency with Forbid.
    // As long the as the invocations are "far enough apart in time", this usually won't happen.
    //
    // TODO: for Forbid, we could use the same name for every execution, as a lock.
    // With replace, we could use a name that is deterministic per execution time.
    // But that would mean that you could not inspect prior successes or failures of Forbid jobs.
    klog.V(4).InfoS("Not starting job because prior execution is still running and concurrency policy is Forbid", "cronjob", klog.KRef(cronJob.GetNamespace(), cronJob.GetName()))
    jm.recorder.Eventf(cronJob, corev1.EventTypeNormal, "JobAlreadyActive", "Not starting job because prior execution is running and concurrency policy is Forbid")
    t := nextScheduledTimeDuration(*cronJob, sched, now)
    return t, updateStatus, nil
    }
    if cronJob.Spec.ConcurrencyPolicy == batchv1.ReplaceConcurrent {
    // Each active job is fetched and deleted in turn; a failed Get or delete returns an
    // error immediately, leaving the remaining active jobs untouched.
    for _, j := range cronJob.Status.Active {
    klog.V(4).InfoS("Deleting job that was still running at next scheduled start time", "job", klog.KRef(j.Namespace, j.Name))

    job, err := jm.jobControl.GetJob(j.Namespace, j.Name)
    if err != nil {
    jm.recorder.Eventf(cronJob, corev1.EventTypeWarning, "FailedGet", "Get job: %v", err)
    return nil, updateStatus, err
    }
    if !deleteJob(cronJob, job, jm.jobControl, jm.recorder) {
    return nil, updateStatus, fmt.Errorf("could not replace job %s/%s", job.Namespace, job.Name)
    }
    updateStatus = true
    }
    }