func (rsc *ReplicaSetController) manageReplicas(ctx context.Context, filteredPods []*v1.Pod, rs *apps.ReplicaSet) error { diff := len(filteredPods) - int(*(rs.Spec.Replicas)) rsKey, err := controller.KeyFunc(rs) if err != nil { utilruntime.HandleError(fmt.Errorf("couldn't get key for %v %#v: %v", rsc.Kind, rs, err)) return nil } if diff < 0 { diff *= -1 if diff > rsc.burstReplicas { diff = rsc.burstReplicas } rsc.expectations.ExpectCreations(rsKey, diff) klog.V(2).InfoS("Too few replicas", "replicaSet", klog.KObj(rs), "need", *(rs.Spec.Replicas), "creating", diff) successfulCreations, err := slowStartBatch(diff, controller.SlowStartInitialBatchSize, func() error { err := rsc.podControl.CreatePods(ctx, rs.Namespace, &rs.Spec.Template, rs, metav1.NewControllerRef(rs, rsc.GroupVersionKind)) if err != nil { if apierrors.HasStatusCause(err, v1.NamespaceTerminatingCause) { return nil } } return err })
if skippedPods := diff - successfulCreations; skippedPods > 0 { klog.V(2).Infof("Slow-start failure. Skipping creation of %d pods, decrementing expectations for %v %v/%v", skippedPods, rsc.Kind, rs.Namespace, rs.Name) for i := 0; i < skippedPods; i++ { rsc.expectations.CreationObserved(rsKey) } } return err } else if diff > 0 { if diff > rsc.burstReplicas { diff = rsc.burstReplicas } klog.V(2).InfoS("Too many replicas", "replicaSet", klog.KObj(rs), "need", *(rs.Spec.Replicas), "deleting", diff)
relatedPods, err := rsc.getIndirectlyRelatedPods(rs) utilruntime.HandleError(err)
podsToDelete := getPodsToDelete(filteredPods, relatedPods, diff)
rsc.expectations.ExpectDeletions(rsKey, getPodKeys(podsToDelete))
errCh := make(chan error, diff) var wg sync.WaitGroup wg.Add(diff) for _, pod := range podsToDelete { go func(targetPod *v1.Pod) { defer wg.Done() if err := rsc.podControl.DeletePod(ctx, rs.Namespace, targetPod.Name, rs); err != nil { podKey := controller.PodKey(targetPod) rsc.expectations.DeletionObserved(rsKey, podKey) if !apierrors.IsNotFound(err) { klog.V(2).Infof("Failed to delete %v, decremented expectations for %v %s/%s", podKey, rsc.Kind, rs.Namespace, rs.Name) errCh <- err } } }(pod) } wg.Wait()
select { case err := <-errCh: if err != nil { return err } default: } }
return nil }
|