Kube-controller-manager (ReplicaSet)

Based on Kubernetes 1.25

The ReplicaSet controller is the controller for ReplicaSet resource objects. It watches ReplicaSet and Pod objects through Informers; when an Add, Update, or Delete event is observed, the ReplicaSet controller reconciles the affected ReplicaSet object.

A ReplicaSet object mainly consists of a Pod template (Spec.Template) and a replica count (Spec.Replicas).

Controller initialization

When the ReplicaSet controller is initialized, it creates a work queue that stores the keys of ReplicaSet objects.

  • ReplicaSet objects: Add, Update, and Delete events are watched; the key of the affected ReplicaSet is added to the work queue, waiting for worker goroutines to consume it
  • Pod objects: Add, Update, and Delete events are watched; the key of the ReplicaSet that owns the Pod is added to the work queue, waiting for worker goroutines to consume it
    • A Pod's OwnerReference records a reference to its parent resource
    • When a Pod Add or Delete event fires, the associated ReplicaSet is found through the OwnerReference and its key is added to the work queue, waiting for a worker to consume it
    • When a Pod Update event fires, the controller checks whether the OwnerReference changed; if it did, the keys of both the old and the new ReplicaSet referenced by the OwnerReference are added to the work queue, waiting for workers to consume them
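
A minimal sketch of how these handlers are wired up during initialization (modeled on NewReplicaSetController; the handler names mirror the source, but treat the exact wiring as illustrative):

    rsInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
        AddFunc:    rsc.addRS,
        UpdateFunc: rsc.updateRS,
        DeleteFunc: rsc.deleteRS,
    })
    podInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
        AddFunc:    rsc.addPod,    // resolves the owning RS and enqueues its key
        UpdateFunc: rsc.updatePod, // enqueues both old and new owner RS if ownership changed
        DeleteFunc: rsc.deletePod, // records the observed deletion, then enqueues the owner RS
    })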

Main execution logic

The goal of the ReplicaSet controller is to keep the actual number of Pods equal to the desired number.

rsc.rsLister.ReplicaSets

Get the RS object. The RS controller fetches the full ReplicaSet object via the Get method of the RS Informer's Lister.
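
The corresponding step at the top of syncReplicaSet looks roughly like this (a sketch; see the source for the full error handling and logging):

    // Split the queue key into namespace/name and read the RS from the informer cache.
    namespace, name, err := cache.SplitMetaNamespaceKey(key)
    if err != nil {
        return err
    }
    rs, err := rsc.rsLister.ReplicaSets(namespace).Get(name)
    if apierrors.IsNotFound(err) {
        // The RS is gone: drop its Expectation cache entry and stop.
        rsc.expectations.DeleteExpectations(key)
        return nil
    }
    if err != nil {
        return err
    }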

rsc.expectations.SatisfiedExpectations

Check whether the previous reconcile has completed

  • The RS controller uses the Expectation mechanism to determine whether the Pod creations and deletions required by the previous reconcile have already been processed by kube-apiserver
  • If SatisfiedExpectations returns false, the create or delete Pod requests issued during the previous reconcile have not yet been fully processed by kube-apiserver

controller.FilterActivePods

Get the active Pods
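
"Active" means Pods that have not finished and are not being deleted. A minimal sketch of the filtering criterion (modeled on IsPodActive, which controller.FilterActivePods applies to each Pod):

    // Keep only Pods that are still starting or running and are not marked for deletion.
    func IsPodActive(p *v1.Pod) bool {
        return v1.PodSucceeded != p.Status.Phase &&
            v1.PodFailed != p.Status.Phase &&
            p.DeletionTimestamp == nil
    }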

rsNeedsSync && rs.DeletionTimestamp == nil

Decide whether to run the reconcile

  • If the previous reconcile of the RS has completed and the RS object is not being deleted, the controller proceeds with this reconcile
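
In syncReplicaSet this gate looks roughly like the following (sketch; manageReplicasErr is reused later when the status is computed):

    var manageReplicasErr error
    if rsNeedsSync && rs.DeletionTimestamp == nil {
        // Only create/delete Pods when the previous round has settled and the RS is not being deleted.
        manageReplicasErr = rsc.manageReplicas(ctx, filteredPods, rs)
    }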

rsc.manageReplicas

Run the reconcile

  • The RS controller compares the number of active Pods with the desired number of Pods
  • If there are fewer active Pods than desired, it creates the missing Pods using a slow-start approach
  • If there are more active Pods than desired, it sorts the Pods and deletes the ones that exceed the desired count
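
In code terms the decision hinges on the sign of diff; a compact summary of the branching (the full manageReplicas listing appears in the Expectation section below):

    diff := len(filteredPods) - int(*(rs.Spec.Replicas))
    if diff < 0 {
        // Too few Pods: create -diff Pods (capped at burstReplicas) via slowStartBatch.
    } else if diff > 0 {
        // Too many Pods: rank the active Pods, pick diff of them, and delete them concurrently.
    }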

calculateStatus

Compute and update the status of the RS

rsc.queue.AddAfter

Re-add to the work queue

  • If Status.ReadyReplicas and Status.AvailableReplicas differ and MinReadySeconds is greater than 0, some Pods may still have to wait MinReadySeconds before going from Ready to Available, so the key is re-queued after that delay
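
The tail of syncReplicaSet looks roughly like this (a sketch; the names follow the source, but treat the exact conditions as illustrative):

    newStatus := calculateStatus(rs, filteredPods, manageReplicasErr)

    // Always try to persist the newest status, even if manageReplicas failed.
    updatedRS, err := updateReplicaSetStatus(rsc.kubeClient.AppsV1().ReplicaSets(rs.Namespace), rs, newStatus)
    if err != nil {
        return err
    }

    // Re-queue after MinReadySeconds: Ready Pods only count as Available once they
    // have stayed Ready for MinReadySeconds.
    if manageReplicasErr == nil && updatedRS.Spec.MinReadySeconds > 0 &&
        updatedRS.Status.ReadyReplicas == *(updatedRS.Spec.Replicas) &&
        updatedRS.Status.AvailableReplicas != *(updatedRS.Spec.Replicas) {
        rsc.queue.AddAfter(key, time.Duration(updatedRS.Spec.MinReadySeconds)*time.Second)
    }
    return manageReplicasErr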

Creating Pods with slow start

When there are fewer active Pods than desired, the RS controller needs to create Pods; performance or quota problems can then cause a long run of consecutive creation failures.

  • The RS controller therefore creates Pods in multiple batches, starting small and gradually enlarging the batch size to avoid a flood of errors; this is called slow start

  • During slow start, the RS controller starts batchSize goroutines per batch to create Pods concurrently. When every Pod in a batch has been created successfully, the controller recomputes batchSize as twice the size of the previous batch, and repeats until all Pods are created. If any error occurs in a batch, the controller returns the error after that batch completes, ends the reconcile, and leaves the rest to a later retry.

  • Ref:https://github.com/kubernetes/kubernetes/blob/88e994f6bf8fc88114c5b733e09afea339bea66d/pkg/controller/replicaset/replica_set.go#L759

    // slowStartBatch tries to call the provided function a total of 'count' times,
    // starting slow to check for errors, then speeding up if calls succeed.
    //
    // It groups the calls into batches, starting with a group of initialBatchSize.
    // Within each batch, it may call the function multiple times concurrently.
    //
    // If a whole batch succeeds, the next batch may get exponentially larger.
    // If there are any failures in a batch, all remaining batches are skipped
    // after waiting for the current batch to complete.
    //
    // It returns the number of successful calls to the function.
    func slowStartBatch(count int, initialBatchSize int, fn func() error) (int, error) {
        remaining := count
        successes := 0
        for batchSize := integer.IntMin(remaining, initialBatchSize); batchSize > 0; batchSize = integer.IntMin(2*batchSize, remaining) {
            errCh := make(chan error, batchSize)
            var wg sync.WaitGroup
            wg.Add(batchSize)
            for i := 0; i < batchSize; i++ {
                go func() {
                    defer wg.Done()
                    if err := fn(); err != nil {
                        errCh <- err
                    }
                }()
            }
            wg.Wait()
            curSuccesses := batchSize - len(errCh)
            successes += curSuccesses
            if len(errCh) > 0 {
                return successes, <-errCh
            }
            remaining -= batchSize
        }
        return successes, nil
    }
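
As an illustration (hypothetical call, not from the source; createOnePod stands in for rsc.podControl.CreatePods): creating 20 Pods with an initial batch size of 1 proceeds in batches of 1, 2, 4, 8 and finally the remaining 5, provided every call succeeds.

    created, err := slowStartBatch(20, 1, createOnePod)
    // With no failures: created == 20, err == nil, and the batches were 1, 2, 4, 8, 5.
    // If a batch fails, the remaining batches are skipped and err holds the first error.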

Sorting and deleting excess Pods

When the actual number of active Pods exceeds the desired number, the RS controller deletes some Pods.

  • The RS controller sorts the existing active Pods and deletes the Pods beyond the desired count in that order

  • Pods are ordered by applying the following rules one by one; the implementation is ActivePodsWithRanks.Less (a simplified sketch follows the constants below)

    1. Whether the Pod is scheduled: if only one of the two Pods is scheduled, the unscheduled Pod sorts before the scheduled one
    2. Status.Phase: Pending Pods sort before Unknown Pods, and Unknown Pods sort before Running Pods
    3. Ready condition: Pods that are not yet Ready sort before Ready Pods
    4. Number of sibling Pods on the node: a Pod whose node hosts several Pods of this kind sorts before a Pod that is the only one on its node
    5. Time since becoming Ready: the Pod that has been Ready for a shorter time sorts first
    6. Restart count: the Pod with more container restarts sorts first
    7. Creation time: the Pod created more recently sorts first
    • In short, unhealthy Pods are deleted first, while Pods that have been running normally and stably for a long time are kept
  • Deleting a large number of Pods at once would put heavy pressure on the control plane, so there is an upper bound on how many Pods the RS controller creates or deletes per reconcile

  • Ref:https://github.com/kubernetes/kubernetes/blob/88e994f6bf8fc88114c5b733e09afea339bea66d/pkg/controller/replicaset/replica_set.go#L70

    const (
        // Realistic value of the burstReplica field for the replica set manager based off
        // performance requirements for kubernetes 1.0.
        BurstReplicas = 500

        // The number of times we retry updating a ReplicaSet's status.
        statusUpdateRetries = 1

        // controllerUIDIndex is the name for the ReplicaSet store's index function,
        // which is to index by ReplicaSet's controllerUID.
        controllerUIDIndex = "controllerUID"
    )
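
A simplified sketch of the comparison chain described above (the real implementation is ActivePodsWithRanks.Less in controller_utils.go). It only spells out rules 1-4; rules 5-7 are tie-breakers left as comments. podutil is assumed to be k8s.io/kubernetes/pkg/api/v1/pod, and rankI/rankJ are the counts of sibling Pods on the same node:

    // less returns true if Pod i should be deleted before Pod j.
    func less(i, j *v1.Pod, rankI, rankJ int) bool {
        // 1. Unscheduled Pods are deleted before scheduled ones.
        if (len(i.Spec.NodeName) == 0) != (len(j.Spec.NodeName) == 0) {
            return len(i.Spec.NodeName) == 0
        }
        // 2. PodPending < PodUnknown < PodRunning.
        phaseOrder := map[v1.PodPhase]int{v1.PodPending: 0, v1.PodUnknown: 1, v1.PodRunning: 2}
        if phaseOrder[i.Status.Phase] != phaseOrder[j.Status.Phase] {
            return phaseOrder[i.Status.Phase] < phaseOrder[j.Status.Phase]
        }
        // 3. Not-ready Pods are deleted before Ready Pods.
        if podutil.IsPodReady(i) != podutil.IsPodReady(j) {
            return !podutil.IsPodReady(i)
        }
        // 4. Pods that share their node with more siblings are deleted first.
        if rankI != rankJ {
            return rankI > rankJ
        }
        // 5-7. Remaining tie-breakers: shorter time since Ready first, more container
        // restarts first, more recently created first.
        return false
    }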

The Expectation mechanism

The RS controller uses the Expectation mechanism to record how many Pods need to be created or deleted during the current reconcile; each ReplicaSet gets a corresponding Expectation object.

The RS controller mainly uses the following methods of the Expectation interface

  • Ref:https://github.com/kubernetes/kubernetes/blob/88e994f6bf8fc88114c5b733e09afea339bea66d/pkg/controller/controller_utils.go#L147

    // ControllerExpectationsInterface is an interface that allows users to set and wait on expectations.
    // Only abstracted out for testing.
    // Warning: if using KeyFunc it is not safe to use a single ControllerExpectationsInterface with different
    // types of controllers, because the keys might conflict across types.
    type ControllerExpectationsInterface interface {
        GetExpectations(controllerKey string) (*ControlleeExpectations, bool, error)
        // Reports whether the Expectation cache is satisfied; returns true when any of the following holds:
        // 1. Both the add and del fields are 0 (all create/delete requests have been executed).
        // 2. More than 5 minutes have passed since timestamp (a timeout that prevents getting stuck).
        // 3. No Expectation cache entry exists for the object's key.
        SatisfiedExpectations(controllerKey string) bool
        // Deletes the Expectation cache entry for the object's key.
        DeleteExpectations(controllerKey string)
        SetExpectations(controllerKey string, add, del int) error
        // Sets the expected number of Pod creations (the add field) and sets timestamp.
        ExpectCreations(controllerKey string, adds int) error
        // Sets the expected number of Pod deletions (the del field) and sets timestamp.
        ExpectDeletions(controllerKey string, dels int) error
        // Decrements the expected number of Pod creations (the add field) by 1.
        CreationObserved(controllerKey string)
        // Decrements the expected number of Pod deletions (the del field) by 1.
        DeletionObserved(controllerKey string)
        RaiseExpectations(controllerKey string, add, del int)
        LowerExpectations(controllerKey string, add, del int)
    }
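
For reference, the cached value behind this interface looks roughly like the following (a sketch based on ControlleeExpectations in controller_utils.go; only the parts relevant to the three conditions above are shown):

    // ControlleeExpectations tracks the outstanding creations/deletions for one controller key.
    type ControlleeExpectations struct {
        add       int64     // Pods still expected to be created
        del       int64     // Pods still expected to be deleted
        key       string
        timestamp time.Time // when the expectations were set; drives the 5-minute timeout
    }

    // Fulfilled returns true once all expected creations and deletions have been observed.
    func (e *ControlleeExpectations) Fulfilled() bool {
        return atomic.LoadInt64(&e.add) <= 0 && atomic.LoadInt64(&e.del) <= 0
    }

    // isExpired reports whether the expectations are older than the timeout
    // (ExpectationsTimeout, 5 minutes, in the source).
    func (e *ControlleeExpectations) isExpired() bool {
        return time.Since(e.timestamp) > 5*time.Minute
    }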

Setting and deleting Expectations

The manageReplicas func is responsible for reconciling the number of Pods controlled by the RS

  • Ref:https://github.com/kubernetes/kubernetes/blob/88e994f6bf8fc88114c5b733e09afea339bea66d/pkg/controller/replicaset/replica_set.go#L559

    // manageReplicas checks and updates replicas for the given ReplicaSet.
    // Does NOT modify <filteredPods>.
    // It will requeue the replica set in case of an error while creating/deleting pods.
    func (rsc *ReplicaSetController) manageReplicas(ctx context.Context, filteredPods []*v1.Pod, rs *apps.ReplicaSet) error {
        diff := len(filteredPods) - int(*(rs.Spec.Replicas))
        rsKey, err := controller.KeyFunc(rs)
        if err != nil {
            utilruntime.HandleError(fmt.Errorf("couldn't get key for %v %#v: %v", rsc.Kind, rs, err))
            return nil
        }
        if diff < 0 {
            diff *= -1
            if diff > rsc.burstReplicas {
                diff = rsc.burstReplicas
            }
            // TODO: Track UIDs of creates just like deletes. The problem currently
            // is we'd need to wait on the result of a create to record the pod's
            // UID, which would require locking *across* the create, which will turn
            // into a performance bottleneck. We should generate a UID for the pod
            // beforehand and store it via ExpectCreations.
            rsc.expectations.ExpectCreations(rsKey, diff)
            klog.V(2).InfoS("Too few replicas", "replicaSet", klog.KObj(rs), "need", *(rs.Spec.Replicas), "creating", diff)
            // Batch the pod creates. Batch sizes start at SlowStartInitialBatchSize
            // and double with each successful iteration in a kind of "slow start".
            // This handles attempts to start large numbers of pods that would
            // likely all fail with the same error. For example a project with a
            // low quota that attempts to create a large number of pods will be
            // prevented from spamming the API service with the pod create requests
            // after one of its pods fails. Conveniently, this also prevents the
            // event spam that those failures would generate.
            successfulCreations, err := slowStartBatch(diff, controller.SlowStartInitialBatchSize, func() error {
                err := rsc.podControl.CreatePods(ctx, rs.Namespace, &rs.Spec.Template, rs, metav1.NewControllerRef(rs, rsc.GroupVersionKind))
                if err != nil {
                    if apierrors.HasStatusCause(err, v1.NamespaceTerminatingCause) {
                        // if the namespace is being terminated, we don't have to do
                        // anything because any creation will fail
                        return nil
                    }
                }
                return err
            })

            // Any skipped pods that we never attempted to start shouldn't be expected.
            // The skipped pods will be retried later. The next controller resync will
            // retry the slow start process.
            if skippedPods := diff - successfulCreations; skippedPods > 0 {
                klog.V(2).Infof("Slow-start failure. Skipping creation of %d pods, decrementing expectations for %v %v/%v", skippedPods, rsc.Kind, rs.Namespace, rs.Name)
                for i := 0; i < skippedPods; i++ {
                    // Decrement the expected number of creates because the informer won't observe this pod
                    rsc.expectations.CreationObserved(rsKey)
                }
            }
            return err
        } else if diff > 0 {
            if diff > rsc.burstReplicas {
                diff = rsc.burstReplicas
            }
            klog.V(2).InfoS("Too many replicas", "replicaSet", klog.KObj(rs), "need", *(rs.Spec.Replicas), "deleting", diff)

            relatedPods, err := rsc.getIndirectlyRelatedPods(rs)
            utilruntime.HandleError(err)

            // Choose which Pods to delete, preferring those in earlier phases of startup.
            podsToDelete := getPodsToDelete(filteredPods, relatedPods, diff)

            // Snapshot the UIDs (ns/name) of the pods we're expecting to see
            // deleted, so we know to record their expectations exactly once either
            // when we see it as an update of the deletion timestamp, or as a delete.
            // Note that if the labels on a pod/rs change in a way that the pod gets
            // orphaned, the rs will only wake up after the expectations have
            // expired even if other pods are deleted.
            rsc.expectations.ExpectDeletions(rsKey, getPodKeys(podsToDelete))

            errCh := make(chan error, diff)
            var wg sync.WaitGroup
            wg.Add(diff)
            for _, pod := range podsToDelete {
                go func(targetPod *v1.Pod) {
                    defer wg.Done()
                    if err := rsc.podControl.DeletePod(ctx, rs.Namespace, targetPod.Name, rs); err != nil {
                        // Decrement the expected number of deletes because the informer won't observe this deletion
                        podKey := controller.PodKey(targetPod)
                        rsc.expectations.DeletionObserved(rsKey, podKey)
                        if !apierrors.IsNotFound(err) {
                            klog.V(2).Infof("Failed to delete %v, decremented expectations for %v %s/%s", podKey, rsc.Kind, rs.Namespace, rs.Name)
                            errCh <- err
                        }
                    }
                }(pod)
            }
            wg.Wait()

            select {
            case err := <-errCh:
                // all errors have been reported before and they're likely to be the same, so we'll only return the first one we hit.
                if err != nil {
                    return err
                }
            default:
            }
        }

        return nil
    }

Adjusting Expectation counts

The RS controller calls kube-apiserver to create or delete Pods; once the result is observed (via the Pod informer's events, or directly when a request fails), CreationObserved or DeletionObserved is called to adjust the add or del field in the Expectation.
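
Both calls simply lower the cached counters; roughly (a sketch modeled on ControllerExpectations in controller_utils.go):

    // CreationObserved atomically decrements the add count for the given controller key.
    func (r *ControllerExpectations) CreationObserved(controllerKey string) {
        r.LowerExpectations(controllerKey, 1, 0)
    }

    // LowerExpectations decrements the expectation counts of the given controller.
    func (r *ControllerExpectations) LowerExpectations(controllerKey string, add, del int) {
        if exp, exists, err := r.GetExpectations(controllerKey); err == nil && exists {
            exp.Add(int64(-add), int64(-del))
        }
    }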

Callback when a Pod is created (the Pod Add event handler)

  • Ref:https://github.com/kubernetes/kubernetes/blob/88e994f6bf8fc88114c5b733e09afea339bea66d/pkg/controller/replicaset/replica_set.go#L374

    // When a pod is created, enqueue the replica set that manages it and update its expectations.
    func (rsc *ReplicaSetController) addPod(obj interface{}) {
        pod := obj.(*v1.Pod)

        if pod.DeletionTimestamp != nil {
            // on a restart of the controller manager, it's possible a new pod shows up in a state that
            // is already pending deletion. Prevent the pod from being a creation observation.
            rsc.deletePod(pod)
            return
        }

        // If it has a ControllerRef, that's all that matters.
        if controllerRef := metav1.GetControllerOf(pod); controllerRef != nil {
            rs := rsc.resolveControllerRef(pod.Namespace, controllerRef)
            if rs == nil {
                return
            }
            rsKey, err := controller.KeyFunc(rs)
            if err != nil {
                return
            }
            klog.V(4).Infof("Pod %s created: %#v.", pod.Name, pod)
            rsc.expectations.CreationObserved(rsKey)
            rsc.queue.Add(rsKey)
            return
        }

        // Otherwise, it's an orphan. Get a list of all matching ReplicaSets and sync
        // them to see if anyone wants to adopt it.
        // DO NOT observe creation because no controller should be waiting for an
        // orphan.
        rss := rsc.getPodReplicaSets(pod)
        if len(rss) == 0 {
            return
        }
        klog.V(4).Infof("Orphan Pod %s created: %#v.", pod.Name, pod)
        for _, rs := range rss {
            rsc.enqueueRS(rs)
        }
    }
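
The deletion path is symmetric: once the deletePod event handler has resolved the owning RS and its key, it records the observed deletion (keyed by the Pod) and re-queues the RS. A sketch of the relevant lines, assuming the same structure as addPod:

    // Inside rsc.deletePod, after the owning RS and rsKey have been resolved:
    rsc.expectations.DeletionObserved(rsKey, controller.PodKey(pod))
    rsc.queue.Add(rsKey)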

When a Pod create request fails, kube-apiserver has explicitly returned an error and the Pod was never created, so the Pod Informer will never observe an Add event for it; manageReplicas therefore decrements the expectation itself for every skipped creation (the relevant excerpt of the manageReplicas listing shown above is repeated below).

  • Ref:https://github.com/kubernetes/kubernetes/blob/88e994f6bf8fc88114c5b733e09afea339bea66d/pkg/controller/replicaset/replica_set.go#L559

    // Excerpt from manageReplicas (full listing above): creations skipped after a
    // slow-start failure are removed from the expectations, because the informer
    // will never observe Add events for Pods that were never created.
    if skippedPods := diff - successfulCreations; skippedPods > 0 {
        klog.V(2).Infof("Slow-start failure. Skipping creation of %d pods, decrementing expectations for %v %v/%v", skippedPods, rsc.Kind, rs.Namespace, rs.Name)
        for i := 0; i < skippedPods; i++ {
            // Decrement the expected number of creates because the informer won't observe this pod
            rsc.expectations.CreationObserved(rsKey)
        }
    }

Determining whether Expectations are satisfied

The RS controller's reconcile is done mainly in the syncReplicaSet func

  • If Pod create or delete requests issued during the previous reconcile have not yet been fully processed, i.e. the Pod Informer has not yet observed the corresponding Add or Delete events, the add and del fields in the Expectation are not 0, SatisfiedExpectations does not return true, and syncReplicaSet skips manageReplicas, preventing a new round of Pod creations or deletions

  • Ref:https://github.com/kubernetes/kubernetes/blob/88e994f6bf8fc88114c5b733e09afea339bea66d/pkg/controller/replicaset/replica_set.go#L684

    rsNeedsSync := rsc.expectations.SatisfiedExpectations(key)
    selector, err := metav1.LabelSelectorAsSelector(rs.Spec.Selector)
    if err != nil {
        utilruntime.HandleError(fmt.Errorf("error converting pod selector to selector for rs %v/%v: %v", namespace, name, err))
        return nil
    }