Kube-controller-manager(ReplicaSet)
Based on Kubernetes 1.25
The ReplicaSet controller is the controller for ReplicaSet resource objects. It watches ReplicaSet and Pod objects through Informers; whenever an Add, Update, or Delete event is observed, the ReplicaSet controller reconciles the affected ReplicaSet object.
A ReplicaSet object mainly consists of a Pod template (Spec.Template) and a replica count (Spec.Replicas).
Controller Initialization
When the ReplicaSet controller is initialized, it creates a work queue to store the keys of ReplicaSet objects and registers event handlers for the following resources (a minimal wiring sketch follows this list):
- ReplicaSet objects: on Add, Update, and Delete events, the key of the observed ReplicaSet object is added to the work queue, to be consumed by worker goroutines
- Pod objects: on Add, Update, and Delete events, the key of the ReplicaSet that owns the Pod is added to the work queue, to be consumed by worker goroutines
- A Pod object's OwnerReference records the reference to its parent resource
- On a Pod Add or Delete event, the controller finds the owning ReplicaSet through the OwnerReference and adds that ReplicaSet's key to the work queue, to be consumed by workers
- On a Pod Update event, the controller checks whether the OwnerReference changed; if so, the keys of both the old and the new ReplicaSet referenced by the OwnerReference are added to the work queue, to be consumed by workers
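A minimal sketch of this wiring with client-go is shown below; setupHandlers and the handler bodies are illustrative, while the real controller registers its own methods (rsc.addPod, rsc.updatePod, rsc.deletePod, and the RS handlers) during construction:
-
package rsdemo

import (
    v1 "k8s.io/api/core/v1"
    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    appsinformers "k8s.io/client-go/informers/apps/v1"
    coreinformers "k8s.io/client-go/informers/core/v1"
    "k8s.io/client-go/tools/cache"
    "k8s.io/client-go/util/workqueue"
)

// setupHandlers wires the ReplicaSet and Pod informers to the work queue.
func setupHandlers(rsInformer appsinformers.ReplicaSetInformer, podInformer coreinformers.PodInformer, queue workqueue.RateLimitingInterface) {
    enqueueRS := func(obj interface{}) {
        // The key is "namespace/name"; the tombstone-aware key func also handles Delete events.
        if key, err := cache.DeletionHandlingMetaNamespaceKeyFunc(obj); err == nil {
            queue.Add(key)
        }
    }
    rsInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
        AddFunc:    enqueueRS,
        UpdateFunc: func(old, cur interface{}) { enqueueRS(cur) },
        DeleteFunc: enqueueRS,
    })
    podInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
        AddFunc: func(obj interface{}) {
            // The real addPod also updates expectations; here we only map the Pod
            // back to its owning ReplicaSet via the controller OwnerReference.
            if pod, ok := obj.(*v1.Pod); ok {
                if ref := metav1.GetControllerOf(pod); ref != nil && ref.Kind == "ReplicaSet" {
                    queue.Add(pod.Namespace + "/" + ref.Name)
                }
            }
        },
        // UpdateFunc and DeleteFunc similarly resolve the owning RS; on Update the keys of
        // both the old and the new owner are enqueued if the OwnerReference changed.
    })
}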
Main Execution Logic
The goal of the ReplicaSet controller is to keep the actual number of Pods consistent with the desired replica count; each key taken from the work queue is reconciled through the steps below.
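The keys are consumed by worker goroutines that pop an item and call the sync handler (syncReplicaSet). The sketch below mirrors that worker/processNextWorkItem pattern; runWorker is an illustrative name and keys are assumed to be "namespace/name" strings:
-
package rsdemo

import (
    "context"
    "fmt"

    utilruntime "k8s.io/apimachinery/pkg/util/runtime"
    "k8s.io/client-go/util/workqueue"
)

// runWorker pops keys off the queue and runs the sync function until the queue shuts down.
func runWorker(ctx context.Context, queue workqueue.RateLimitingInterface, sync func(ctx context.Context, key string) error) {
    for {
        key, quit := queue.Get()
        if quit {
            return
        }
        func() {
            defer queue.Done(key)
            if err := sync(ctx, key.(string)); err != nil {
                // On failure, re-enqueue with rate limiting so the RS is retried later.
                utilruntime.HandleError(fmt.Errorf("sync %q failed with %v", key, err))
                queue.AddRateLimited(key)
                return
            }
            // On success, reset the rate limiter's failure count for this key.
            queue.Forget(key)
        }()
    }
}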
rsc.rsLister.ReplicaSets
Get the RS object: the RS controller fetches the complete RS object through the Get method of the RS Informer's Lister interface.
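Roughly, the start of syncReplicaSet does the following (paraphrased from the 1.25 source; timing and logging omitted):
-
// Split the work-queue key into namespace/name and fetch the RS from the informer cache.
namespace, name, err := cache.SplitMetaNamespaceKey(key)
if err != nil {
    return err
}
rs, err := rsc.rsLister.ReplicaSets(namespace).Get(name)
if apierrors.IsNotFound(err) {
    // The RS has been deleted: drop its Expectation record and stop.
    rsc.expectations.DeleteExpectations(key)
    return nil
}
if err != nil {
    return err
}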
rsc.expectations.SatisfiedExpectations
Check whether the previous reconcile has completed
- The RS controller uses the Expectation mechanism to determine whether the Pods that the previous reconcile needed to create or delete have already been processed by kube-apiserver (i.e., the corresponding Pod events have been observed)
- If SatisfiedExpectations returns false, the create or delete Pod requests issued by the previous reconcile have not all been processed by kube-apiserver yet
controller.FilterActivePods
Get active Pods
The RS controller lists all Pods in the current namespace through the List method of the Pod Informer's Lister interface, then filters out the active ones.
Active Pods: Pods whose Status.Phase is not Succeeded or Failed and which are not being deleted.
The function that filters active Pods is IsPodActive:
-
func IsPodActive(p *v1.Pod) bool {
    return v1.PodSucceeded != p.Status.Phase &&
        v1.PodFailed != p.Status.Phase &&
        p.DeletionTimestamp == nil
}
-
Finally, the active Pods owned by the current RS object are filtered out.
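A simplified sketch of that ownership filtering is shown below; ownedActivePods is an illustrative name, while the real controller goes through PodControllerRefManager.ClaimPods, which can also adopt matching orphans and release Pods whose labels no longer match the selector:
-
package rsdemo

import (
    apps "k8s.io/api/apps/v1"
    v1 "k8s.io/api/core/v1"
    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

// ownedActivePods keeps only the active Pods whose controller OwnerReference points at this RS.
func ownedActivePods(rs *apps.ReplicaSet, activePods []*v1.Pod) []*v1.Pod {
    owned := make([]*v1.Pod, 0, len(activePods))
    for _, p := range activePods {
        if ref := metav1.GetControllerOf(p); ref != nil && ref.UID == rs.UID {
            owned = append(owned, p)
        }
    }
    return owned
}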
rsNeedsSync && rs.DeletionTimestamp == nil
Decide whether to run the reconcile
- If the previous reconcile of this RS has completed and the object is not being deleted, the controller proceeds to the next step
rsc.manageReplicas
Run the reconcile
- The RS controller compares the number of active Pods with the desired number of Pods
- If there are fewer active Pods than desired, it creates Pods using a slow-start approach
- If there are more active Pods than desired, it sorts the Pods and deletes the ones beyond the desired count
calculateStatus
Compute and update the RS status
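The real calculateStatus also counts fully-labeled replicas and maintains conditions; the sketch below (countReadyAndAvailable is an illustrative name) only shows how ready and available replicas are counted among the filtered Pods:
-
package rsdemo

import (
    v1 "k8s.io/api/core/v1"
    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    podutil "k8s.io/kubernetes/pkg/api/v1/pod"
)

// countReadyAndAvailable counts Pods whose Ready condition is true, and among those,
// the ones that have stayed Ready for at least minReadySeconds (i.e. Available).
func countReadyAndAvailable(pods []*v1.Pod, minReadySeconds int32, now metav1.Time) (ready, available int32) {
    for _, p := range pods {
        if podutil.IsPodReady(p) {
            ready++
            if podutil.IsPodAvailable(p, minReadySeconds, now) {
                available++
            }
        }
    }
    return ready, available
}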
rsc.queue.AddAfter
Re-enqueue into the work queue
- If Status.ReadyReplicas and Status.AvailableReplicas differ and MinReadySeconds is greater than 0, some Pods may still have to wait MinReadySeconds before moving from Ready to Available, so the RS key is re-enqueued after a delay (see the snippet below)
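Roughly, the tail of syncReplicaSet re-enqueues the key like this (paraphrased; updatedRS is the RS returned by the status update):
-
if manageReplicasErr == nil && updatedRS.Spec.MinReadySeconds > 0 &&
    updatedRS.Status.ReadyReplicas == *(updatedRS.Spec.Replicas) &&
    updatedRS.Status.AvailableReplicas != *(updatedRS.Spec.Replicas) {
    // Resync after MinReadySeconds so the AvailableReplicas count can catch up.
    rsc.queue.AddAfter(key, time.Duration(updatedRS.Spec.MinReadySeconds)*time.Second)
}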
Slow-Start Pod Creation
When there are fewer active Pods than desired, the RS controller needs to create Pods; to avoid hammering kube-apiserver when Pod creations keep failing, it does not issue all the requests at once.
The RS controller creates Pods in multiple batches, starting small and growing the batch size, which avoids producing a flood of identical errors; this is called slow start.
During slow start, the RS controller launches batchSize goroutines per batch to create Pods concurrently. When an entire batch succeeds, it recomputes batchSize as twice the previous batch size (capped by the number of Pods still remaining) until all Pods have been created; for example, with an initial batch size of 1 and 7 Pods to create, the batches are 1, 2, and 4. If any error occurs in a batch, the RS controller returns the error after that batch completes and skips the remaining batches, ending the reconcile and leaving the rest to be retried later.
-
// slowStartBatch tries to call the provided function a total of 'count' times,
// starting slow to check for errors, then speeding up if calls succeed.
//
// It groups the calls into batches, starting with a group of initialBatchSize.
// Within each batch, it may call the function multiple times concurrently.
//
// If a whole batch succeeds, the next batch may get exponentially larger.
// If there are any failures in a batch, all remaining batches are skipped
// after waiting for the current batch to complete.
//
// It returns the number of successful calls to the function.
func slowStartBatch(count int, initialBatchSize int, fn func() error) (int, error) {
    remaining := count
    successes := 0
    for batchSize := integer.IntMin(remaining, initialBatchSize); batchSize > 0; batchSize = integer.IntMin(2*batchSize, remaining) {
        errCh := make(chan error, batchSize)
        var wg sync.WaitGroup
        wg.Add(batchSize)
        for i := 0; i < batchSize; i++ {
            go func() {
                defer wg.Done()
                if err := fn(); err != nil {
                    errCh <- err
                }
            }()
        }
        wg.Wait()
        curSuccesses := batchSize - len(errCh)
        successes += curSuccesses
        if len(errCh) > 0 {
            return successes, <-errCh
        }
        remaining -= batchSize
    }
    return successes, nil
}
Sorting and Deleting Surplus Pods
When there are more active Pods than desired, the RS controller deletes some of them.
The RS controller sorts the existing active Pods and deletes the Pods beyond the desired count in that order.
The Pods are ordered by applying the following rules one after another; the comparison is implemented in ActivePodsWithRanks.Less (a simplified sketch follows this list):
- Whether the Pod has been scheduled: if only one of the two Pods is scheduled, the unscheduled one sorts before the scheduled one
- Status.Phase: Pending Pods sort before Unknown Pods, and Unknown Pods sort before Running Pods
- Ready condition: Pods that are not yet ready sort before ready Pods
- Number of related Pods on the same node: a Pod whose node hosts several Pods of the same ReplicaSet sorts before a Pod that is the only one on its node
- Ready time: the Pod that has been ready for a shorter time sorts first
- Restart count: the Pod with more container restarts sorts first
- Run time: the more recently created Pod sorts first
- In short, abnormal Pods are deleted first, and Pods that have been running normally and stably the longest are kept
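A simplified sketch of this ordering is shown below; lessForDeletion and maxContainerRestarts are illustrative names, only a subset of the criteria is covered, and the real ActivePodsWithRanks.Less additionally uses per-node ranks and compares ready/creation times with a tolerance:
-
package rsdemo

import (
    v1 "k8s.io/api/core/v1"
    podutil "k8s.io/kubernetes/pkg/api/v1/pod"
)

// lessForDeletion returns true if Pod a should be deleted before Pod b.
func lessForDeletion(a, b *v1.Pod) bool {
    // 1. Unscheduled Pods are deleted before scheduled ones.
    if (a.Spec.NodeName == "") != (b.Spec.NodeName == "") {
        return a.Spec.NodeName == ""
    }
    // 2. Pending < Unknown < Running.
    phaseRank := map[v1.PodPhase]int{v1.PodPending: 0, v1.PodUnknown: 1, v1.PodRunning: 2}
    if phaseRank[a.Status.Phase] != phaseRank[b.Status.Phase] {
        return phaseRank[a.Status.Phase] < phaseRank[b.Status.Phase]
    }
    // 3. Not-ready Pods are deleted before ready ones.
    if podutil.IsPodReady(a) != podutil.IsPodReady(b) {
        return !podutil.IsPodReady(a)
    }
    // 4. Pods with more container restarts are deleted first.
    if ra, rb := maxContainerRestarts(a), maxContainerRestarts(b); ra != rb {
        return ra > rb
    }
    // 5. More recently created Pods are deleted first.
    return b.CreationTimestamp.Before(&a.CreationTimestamp)
}

func maxContainerRestarts(p *v1.Pod) int32 {
    var m int32
    for _, cs := range p.Status.ContainerStatuses {
        if cs.RestartCount > m {
            m = cs.RestartCount
        }
    }
    return m
}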
Deleting (or creating) a large number of Pods at once would put heavy pressure on the control plane, so there is an upper limit on how many Pods the RS controller will handle in a single reconcile:
-
const (
    // Realistic value of the burstReplica field for the replica set manager based off
    // performance requirements for kubernetes 1.0.
    BurstReplicas = 500
    // The number of times we retry updating a ReplicaSet's status.
    statusUpdateRetries = 1
    // controllerUIDIndex is the name for the ReplicaSet store's index function,
    // which is to index by ReplicaSet's controllerUID.
    controllerUIDIndex = "controllerUID"
)
Expectation Mechanism
The RS controller uses the Expectation mechanism to record how many Pods the current reconcile needs to create or delete; an Expectation record is generated for each ReplicaSet being reconciled:
-
// ControlleeExpectations track controllee creates/deletes.
type ControlleeExpectations struct {
    // Important: Since these two int64 fields are using sync/atomic, they have to be at the top of the struct due to a bug on 32-bit platforms
    // See: https://golang.org/pkg/sync/atomic/ for more information
    // number of Pod creations still expected to be observed
    add int64
    // number of Pod deletions still expected to be observed
    del int64
    // key of the resource object being reconciled that this Expectation belongs to
    key string
    // time at which this Expectation was set
    timestamp time.Time
}
The RS controller mainly uses the following methods of the Expectation interface:
-
// ControllerExpectationsInterface is an interface that allows users to set and wait on expectations.
// Only abstracted out for testing.
// Warning: if using KeyFunc it is not safe to use a single ControllerExpectationsInterface with different
// types of controllers, because the keys might conflict across types.
type ControllerExpectationsInterface interface {
    GetExpectations(controllerKey string) (*ControlleeExpectations, bool, error)
    // SatisfiedExpectations reports whether the Expectation has been satisfied; it returns true when any of the following holds:
    // 1. the add and del fields are both 0 (all create/delete requests have been observed)
    // 2. more than 5 minutes have passed since timestamp (a timeout so the controller never gets stuck)
    // 3. no Expectation record exists for the given resource key
    SatisfiedExpectations(controllerKey string) bool
    // DeleteExpectations removes the Expectation record cached for the given resource key
    DeleteExpectations(controllerKey string)
    SetExpectations(controllerKey string, add, del int) error
    // ExpectCreations sets the number of Pod creations to expect (the add field) and records timestamp
    ExpectCreations(controllerKey string, adds int) error
    // ExpectDeletions sets the number of Pod deletions to expect (the del field) and records timestamp
    ExpectDeletions(controllerKey string, dels int) error
    // CreationObserved decrements the expected number of Pod creations (the add field) by 1
    CreationObserved(controllerKey string)
    // DeletionObserved decrements the expected number of Pod deletions (the del field) by 1
    DeletionObserved(controllerKey string)
    RaiseExpectations(controllerKey string, add, del int)
    LowerExpectations(controllerKey string, add, del int)
}
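A small, hypothetical usage example of this interface (it assumes the pkg/controller package can be imported and uses a made-up key) to show the lifecycle across one reconcile:
-
package main

import (
    "fmt"

    "k8s.io/kubernetes/pkg/controller"
)

func main() {
    exp := controller.NewControllerExpectations()
    rsKey := "default/nginx-rs" // hypothetical "namespace/name" key

    // Before issuing 3 create requests, record the expectation (error ignored for brevity).
    exp.ExpectCreations(rsKey, 3)
    fmt.Println(exp.SatisfiedExpectations(rsKey)) // false: 3 creations still outstanding

    // Each observed Pod Add event (or explicitly failed create) decrements add by 1.
    for i := 0; i < 3; i++ {
        exp.CreationObserved(rsKey)
    }
    fmt.Println(exp.SatisfiedExpectations(rsKey)) // true: add == 0 && del == 0
}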
Setting and Deleting Expectations
The manageReplicas func is responsible for reconciling the Pod count of an RS; before issuing create or delete requests it records the expectations via ExpectCreations or ExpectDeletions.
-
// manageReplicas checks and updates replicas for the given ReplicaSet.
// Does NOT modify <filteredPods>.
// It will requeue the replica set in case of an error while creating/deleting pods.
func (rsc *ReplicaSetController) manageReplicas(ctx context.Context, filteredPods []*v1.Pod, rs *apps.ReplicaSet) error {
    diff := len(filteredPods) - int(*(rs.Spec.Replicas))
    rsKey, err := controller.KeyFunc(rs)
    if err != nil {
        utilruntime.HandleError(fmt.Errorf("couldn't get key for %v %#v: %v", rsc.Kind, rs, err))
        return nil
    }
    if diff < 0 {
        diff *= -1
        if diff > rsc.burstReplicas {
            diff = rsc.burstReplicas
        }
        // TODO: Track UIDs of creates just like deletes. The problem currently
        // is we'd need to wait on the result of a create to record the pod's
        // UID, which would require locking *across* the create, which will turn
        // into a performance bottleneck. We should generate a UID for the pod
        // beforehand and store it via ExpectCreations.
        rsc.expectations.ExpectCreations(rsKey, diff)
        klog.V(2).InfoS("Too few replicas", "replicaSet", klog.KObj(rs), "need", *(rs.Spec.Replicas), "creating", diff)
        // Batch the pod creates. Batch sizes start at SlowStartInitialBatchSize
        // and double with each successful iteration in a kind of "slow start".
        // This handles attempts to start large numbers of pods that would
        // likely all fail with the same error. For example a project with a
        // low quota that attempts to create a large number of pods will be
        // prevented from spamming the API service with the pod create requests
        // after one of its pods fails. Conveniently, this also prevents the
        // event spam that those failures would generate.
        successfulCreations, err := slowStartBatch(diff, controller.SlowStartInitialBatchSize, func() error {
            err := rsc.podControl.CreatePods(ctx, rs.Namespace, &rs.Spec.Template, rs, metav1.NewControllerRef(rs, rsc.GroupVersionKind))
            if err != nil {
                if apierrors.HasStatusCause(err, v1.NamespaceTerminatingCause) {
                    // if the namespace is being terminated, we don't have to do
                    // anything because any creation will fail
                    return nil
                }
            }
            return err
        })
        // Any skipped pods that we never attempted to start shouldn't be expected.
        // The skipped pods will be retried later. The next controller resync will
        // retry the slow start process.
        if skippedPods := diff - successfulCreations; skippedPods > 0 {
            klog.V(2).Infof("Slow-start failure. Skipping creation of %d pods, decrementing expectations for %v %v/%v", skippedPods, rsc.Kind, rs.Namespace, rs.Name)
            for i := 0; i < skippedPods; i++ {
                // Decrement the expected number of creates because the informer won't observe this pod
                rsc.expectations.CreationObserved(rsKey)
            }
        }
        return err
    } else if diff > 0 {
        if diff > rsc.burstReplicas {
            diff = rsc.burstReplicas
        }
        klog.V(2).InfoS("Too many replicas", "replicaSet", klog.KObj(rs), "need", *(rs.Spec.Replicas), "deleting", diff)
        relatedPods, err := rsc.getIndirectlyRelatedPods(rs)
        utilruntime.HandleError(err)
        // Choose which Pods to delete, preferring those in earlier phases of startup.
        podsToDelete := getPodsToDelete(filteredPods, relatedPods, diff)
        // Snapshot the UIDs (ns/name) of the pods we're expecting to see
        // deleted, so we know to record their expectations exactly once either
        // when we see it as an update of the deletion timestamp, or as a delete.
        // Note that if the labels on a pod/rs change in a way that the pod gets
        // orphaned, the rs will only wake up after the expectations have
        // expired even if other pods are deleted.
        rsc.expectations.ExpectDeletions(rsKey, getPodKeys(podsToDelete))
        errCh := make(chan error, diff)
        var wg sync.WaitGroup
        wg.Add(diff)
        for _, pod := range podsToDelete {
            go func(targetPod *v1.Pod) {
                defer wg.Done()
                if err := rsc.podControl.DeletePod(ctx, rs.Namespace, targetPod.Name, rs); err != nil {
                    // Decrement the expected number of deletes because the informer won't observe this deletion
                    podKey := controller.PodKey(targetPod)
                    rsc.expectations.DeletionObserved(rsKey, podKey)
                    if !apierrors.IsNotFound(err) {
                        klog.V(2).Infof("Failed to delete %v, decremented expectations for %v %s/%s", podKey, rsc.Kind, rs.Namespace, rs.Name)
                        errCh <- err
                    }
                }
            }(pod)
        }
        wg.Wait()
        select {
        case err := <-errCh:
            // all errors have been reported before and they're likely to be the same, so we'll only return the first one we hit.
            if err != nil {
                return err
            }
        default:
        }
    }
    return nil
}
Adjusting Expectation Values
The RS controller asks kube-apiserver to create or delete Pods; once kube-apiserver has acted on those requests (the resulting Pod events are observed, or a request fails), CreationObserved or DeletionObserved is called to adjust the add or del field of the Expectation.
The callback that runs when a Pod created by the RS is observed (the Pod Add event handler):
-
// When a pod is created, enqueue the replica set that manages it and update its expectations.
func (rsc *ReplicaSetController) addPod(obj interface{}) {
    pod := obj.(*v1.Pod)
    if pod.DeletionTimestamp != nil {
        // on a restart of the controller manager, it's possible a new pod shows up in a state that
        // is already pending deletion. Prevent the pod from being a creation observation.
        rsc.deletePod(pod)
        return
    }
    // If it has a ControllerRef, that's all that matters.
    if controllerRef := metav1.GetControllerOf(pod); controllerRef != nil {
        rs := rsc.resolveControllerRef(pod.Namespace, controllerRef)
        if rs == nil {
            return
        }
        rsKey, err := controller.KeyFunc(rs)
        if err != nil {
            return
        }
        klog.V(4).Infof("Pod %s created: %#v.", pod.Name, pod)
        rsc.expectations.CreationObserved(rsKey)
        rsc.queue.Add(rsKey)
        return
    }
    // Otherwise, it's an orphan. Get a list of all matching ReplicaSets and sync
    // them to see if anyone wants to adopt it.
    // DO NOT observe creation because no controller should be waiting for an
    // orphan.
    rss := rsc.getPodReplicaSets(pod)
    if len(rss) == 0 {
        return
    }
    klog.V(4).Infof("Orphan Pod %s created: %#v.", pod.Name, pod)
    for _, rs := range rss {
        rsc.enqueueRS(rs)
    }
}
If the create-Pod request returns an error, kube-apiserver has explicitly rejected it and the Pod was never created; since the Pod Informer will therefore never observe a corresponding Add event, manageReplicas decrements the expectation itself by calling CreationObserved, as in the excerpt below.
-
// Excerpt from manageReplicas (shown in full above). Pods that were skipped because a
// create request failed will never be observed by the informer, so the add count is
// decremented here; the delete path does the analogous DeletionObserved(rsKey, podKey)
// when a DeletePod request fails.
successfulCreations, err := slowStartBatch(diff, controller.SlowStartInitialBatchSize, func() error {
    err := rsc.podControl.CreatePods(ctx, rs.Namespace, &rs.Spec.Template, rs, metav1.NewControllerRef(rs, rsc.GroupVersionKind))
    if err != nil {
        if apierrors.HasStatusCause(err, v1.NamespaceTerminatingCause) {
            // if the namespace is being terminated, we don't have to do
            // anything because any creation will fail
            return nil
        }
    }
    return err
})
// Any skipped pods that we never attempted to start shouldn't be expected.
// The skipped pods will be retried later. The next controller resync will
// retry the slow start process.
if skippedPods := diff - successfulCreations; skippedPods > 0 {
    klog.V(2).Infof("Slow-start failure. Skipping creation of %d pods, decrementing expectations for %v %v/%v", skippedPods, rsc.Kind, rs.Namespace, rs.Name)
    for i := 0; i < skippedPods; i++ {
        // Decrement the expected number of creates because the informer won't observe this pod
        rsc.expectations.CreationObserved(rsKey)
    }
}
Expectation Satisfaction Check
The RS controller's reconcile is carried out mainly in the syncReplicaSet func.
If some of the Pod create or delete requests from the previous reconcile have not yet been fully processed by kube-apiserver, i.e. the Pod Informer has not yet observed the corresponding Pod Add or Delete events, the add or del field of the Expectation is still non-zero. SatisfiedExpectations then returns false, rsNeedsSync is false, and no new round of Pod creation or deletion is started (a sketch of the satisfaction check follows the snippet below).
-
rsNeedsSync := rsc.expectations.SatisfiedExpectations(key)
selector, err := metav1.LabelSelectorAsSelector(rs.Spec.Selector)
if err != nil {
    utilruntime.HandleError(fmt.Errorf("error converting pod selector to selector for rs %v/%v: %v", namespace, name, err))
    return nil
}
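For reference, the decision logic behind SatisfiedExpectations can be paraphrased as follows; the expectations struct here is a local stand-in for ControlleeExpectations, and the real implementation (with ExpectationsTimeout = 5 minutes) lives in pkg/controller/controller_utils.go:
-
package rsdemo

import (
    "sync/atomic"
    "time"
)

// expectations mirrors the add/del/timestamp fields of ControlleeExpectations.
type expectations struct {
    add, del  int64
    timestamp time.Time
}

// satisfied returns true when the record is fulfilled, has expired, or does not exist.
func satisfied(exp *expectations, exists bool) bool {
    if !exists {
        return true // no record for this key: nothing is outstanding
    }
    if atomic.LoadInt64(&exp.add) <= 0 && atomic.LoadInt64(&exp.del) <= 0 {
        return true // every expected create/delete has been observed
    }
    if time.Since(exp.timestamp) > 5*time.Minute { // ExpectationsTimeout
        return true // stale expectation: do not block reconciliation forever
    }
    return false
}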