Kube-scheduler (Priority and Preemptive Scheduling)

Based on Kubernetes 1.25

K8s supports setting a priority on Pods

  • Preemption: lower-priority Pods are evicted so that the higher-priority Pod can run first

Pod Priority

Pod priority is defined by the spec.priority field, an int32 pointer; the larger the number, the higher the Pod's priority.

  • A PriorityClass gives a name to a specific priority value
  • Setting spec.priorityClassName on a Pod automatically fills in its priority from that class (see the sketch below)
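
A minimal sketch, assuming the standard client-go API types (k8s.io/api/scheduling/v1 and k8s.io/api/core/v1); the class name "high-priority" and its value are made up for illustration. It shows a custom PriorityClass and a Pod that opts into it via spec.priorityClassName:

    // Illustrative only: admission resolves spec.priorityClassName and fills in spec.priority.
    package main

    import (
        "fmt"

        corev1 "k8s.io/api/core/v1"
        schedulingv1 "k8s.io/api/scheduling/v1"
        metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    )

    // highPriority maps the name "high-priority" to a numeric priority value.
    var highPriority = &schedulingv1.PriorityClass{
        ObjectMeta:  metav1.ObjectMeta{Name: "high-priority"},
        Value:       1000000, // larger value = higher priority
        Description: "for latency-critical workloads",
    }

    // demoPod only references the class by name; spec.priority is filled in at admission time.
    var demoPod = &corev1.Pod{
        ObjectMeta: metav1.ObjectMeta{Name: "demo", Namespace: "default"},
        Spec: corev1.PodSpec{
            PriorityClassName: "high-priority",
            Containers:        []corev1.Container{{Name: "app", Image: "nginx"}},
        },
    }

    func main() {
        fmt.Println("priorityClass:", highPriority.Name, "value:", highPriority.Value)
        fmt.Println("pod:", demoPod.Name, "uses:", demoPod.Spec.PriorityClassName)
    }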

K8s provides two PriorityClasses by default:

  • system-cluster-critical: priority 2000000000
  • system-node-critical: priority 2000001000

Note that if a PriorityClass sets globalDefault to true, every Pod that does not specify a priorityClassName will use it

  • Only one PriorityClass in a cluster may have globalDefault set to true
  • The preemptionPolicy field may be set to PreemptLowerPriority or Never; with Never, Pods of that PriorityClass never trigger eviction of lower-priority Pods and instead stay in the scheduling queue (still ahead of lower-priority Pods) until resources free up (see the sketch below)
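
A hedged follow-up sketch (reusing the imports from the previous example; the class name "batch-default" is made up) of a PriorityClass that is the cluster-wide default and never preempts:

    // Illustrative only: Pods of this class queue ahead of lower priorities but never evict them.
    var neverPreempt = corev1.PreemptNever // the "Never" preemptionPolicy

    var batchDefault = &schedulingv1.PriorityClass{
        ObjectMeta:       metav1.ObjectMeta{Name: "batch-default"},
        Value:            1000,
        GlobalDefault:    true,          // Pods without a priorityClassName fall back to this class
        PreemptionPolicy: &neverPreempt, // wait for free resources instead of triggering preemption
        Description:      "default class for batch jobs",
    }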

The Pod Preemption (Eviction) Mechanism

When the scheduler cannot find a suitable node for a high-priority Pod, the preemption (eviction) mechanism is triggered

Preemption is implemented on the PostFilter extension point of the Scheduling Framework; this extension point is invoked only when no suitable node could be found for the Pod

  • By default kube-scheduler has only one plugin implementing the PostFilter extension point: the DefaultPreemption plugin, which handles preemption (a minimal PostFilter skeleton is sketched after the interface below)

  • Ref:https://github.com/kubernetes/kubernetes/blob/88e994f6bf8fc88114c5b733e09afea339bea66d/pkg/scheduler/framework/preemption/preemption.go#L104

    // Interface is expected to be implemented by different preemption plugins as all those member
    // methods might have different behavior compared with the default preemption.
    type Interface interface {
        // GetOffsetAndNumCandidates chooses a random offset and calculates the number of candidates that should be
        // shortlisted for dry running preemption.
        GetOffsetAndNumCandidates(nodes int32) (int32, int32)
        // CandidatesToVictimsMap builds a map from the target node to a list of to-be-preempted Pods and the number of PDB violation.
        CandidatesToVictimsMap(candidates []Candidate) map[string]*extenderv1.Victims
        // PodEligibleToPreemptOthers returns one bool and one string. The bool indicates whether this pod should be considered for
        // preempting other pods or not. The string includes the reason if this pod isn't eligible.
        PodEligibleToPreemptOthers(pod *v1.Pod, nominatedNodeStatus *framework.Status) (bool, string)
        // SelectVictimsOnNode finds minimum set of pods on the given node that should be preempted in order to make enough room
        // for "pod" to be scheduled.
        // Note that both `state` and `nodeInfo` are deep copied.
        SelectVictimsOnNode(ctx context.Context, state *framework.CycleState,
            pod *v1.Pod, nodeInfo *framework.NodeInfo, pdbs []*policy.PodDisruptionBudget) ([]*v1.Pod, int, *framework.Status)
    }
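
For orientation, below is a minimal, hedged skeleton of a PostFilter plugin against the v1.25 framework package; the name SamplePostFilter is made up. The real DefaultPreemption plugin implements both framework.PostFilterPlugin and the preemption Interface above, delegating the heavy lifting to the Evaluator described next:

    // Hypothetical skeleton: only the interface wiring is shown, no preemption logic.
    package samplepostfilter

    import (
        "context"

        v1 "k8s.io/api/core/v1"
        "k8s.io/kubernetes/pkg/scheduler/framework"
    )

    const Name = "SamplePostFilter"

    type SamplePostFilter struct{}

    // Compile-time check that the skeleton satisfies the PostFilter extension point.
    var _ framework.PostFilterPlugin = &SamplePostFilter{}

    // Name returns the plugin name used in the scheduler configuration.
    func (pl *SamplePostFilter) Name() string { return Name }

    // PostFilter runs only for Pods that failed Filter on every node. A real
    // implementation (e.g. DefaultPreemption) would invoke the preemption Evaluator
    // here and return the nominated node; this sketch simply gives up.
    func (pl *SamplePostFilter) PostFilter(ctx context.Context, state *framework.CycleState,
        pod *v1.Pod, filteredNodeStatusMap framework.NodeToStatusMap) (*framework.PostFilterResult, *framework.Status) {
        return nil, framework.NewStatus(framework.Unschedulable, "sample plugin does not preempt")
    }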

The end-to-end orchestration of an eviction is provided by the scheduler's preemption Evaluator

The core steps are:

  1. Fetch the latest state of the Pod from the informer cache via ev.PodLister.Pods(pod.Namespace).Get(pod.Name)

  2. Verify that the Pod is eligible to preempt lower-priority Pods; if it is not, no lower-priority Pods are evicted

  3. Find the candidate nodes on which preemption could help (candidate pre-selection)

  • Ref:https://github.com/kubernetes/kubernetes/blob/88e994f6bf8fc88114c5b733e09afea339bea66d/pkg/scheduler/framework/preemption/preemption.go#L145

    func (ev *Evaluator) Preempt(ctx context.Context, pod *v1.Pod, m framework.NodeToStatusMap) (*framework.PostFilterResult, *framework.Status) {
        // 0) Fetch the latest version of <pod>.
        // It's safe to directly fetch pod here. Because the informer cache has already been
        // initialized when creating the Scheduler obj.
        // However, tests may need to manually initialize the shared pod informer.
        podNamespace, podName := pod.Namespace, pod.Name
        pod, err := ev.PodLister.Pods(pod.Namespace).Get(pod.Name)
        if err != nil {
            klog.ErrorS(err, "Getting the updated preemptor pod object", "pod", klog.KRef(podNamespace, podName))
            return nil, framework.AsStatus(err)
        }

        // 1) Ensure the preemptor is eligible to preempt other pods.
        if ok, msg := ev.PodEligibleToPreemptOthers(pod, m[pod.Status.NominatedNodeName]); !ok {
            klog.V(5).InfoS("Pod is not eligible for preemption", "pod", klog.KObj(pod), "reason", msg)
            return nil, framework.NewStatus(framework.Unschedulable, msg)
        }

        // 2) Find all preemption candidates.
        candidates, nodeToStatusMap, err := ev.findCandidates(ctx, pod, m)
        if err != nil && len(candidates) == 0 {
            return nil, framework.AsStatus(err)
        }

        // Return a FitError only when there are no candidates that fit the pod.
        if len(candidates) == 0 {
            fitError := &framework.FitError{
                Pod:         pod,
                NumAllNodes: len(nodeToStatusMap),
                Diagnosis: framework.Diagnosis{
                    NodeToStatusMap: nodeToStatusMap,
                    // Leave FailedPlugins as nil as it won't be used on moving Pods.
                },
            }
            // Specify nominatedNodeName to clear the pod's nominatedNodeName status, if applicable.
            return framework.NewPostFilterResultWithNominatedNode(""), framework.NewStatus(framework.Unschedulable, fitError.Error())
        }

        // 3) Interact with registered Extenders to filter out some candidates if needed.
        candidates, status := ev.callExtenders(pod, candidates)
        if !status.IsSuccess() {
            return nil, status
        }

        // 4) Find the best candidate.
        bestCandidate := ev.SelectCandidate(candidates)
        if bestCandidate == nil || len(bestCandidate.Name()) == 0 {
            return nil, framework.NewStatus(framework.Unschedulable, "no candidate node for preemption")
        }

        // 5) Perform preparation work before nominating the selected candidate.
        if status := ev.prepareCandidate(ctx, bestCandidate, pod, ev.PluginName); !status.IsSuccess() {
            return nil, status
        }

        return framework.NewPostFilterResultWithNominatedNode(bestCandidate.Name()), framework.NewStatus(framework.Success)
    }
  4. Call the registered Preempt Extenders (external extensions) to further trim the candidate nodes and their corresponding victim Pod lists produced by the built-in plugin

  5. Select the single best candidate node for preemption

  6. Complete the final preparation work before eviction

    • Ref:https://github.com/kubernetes/kubernetes/blob/88e994f6bf8fc88114c5b733e09afea339bea66d/pkg/scheduler/framework/preemption/preemption.go#L338

      // prepareCandidate does some preparation work before nominating the selected candidate:
      // - Evict the victim pods
      // - Reject the victim pods if they are in waitingPod map
      // - Clear the low-priority pods' nominatedNodeName status if needed
      func (ev *Evaluator) prepareCandidate(ctx context.Context, c Candidate, pod *v1.Pod, pluginName string) *framework.Status {
          fh := ev.Handler
          cs := ev.Handler.ClientSet()
          for _, victim := range c.Victims().Pods {
              // If the victim is a WaitingPod, send a reject message to the PermitPlugin.
              // Otherwise we should delete the victim.
              if waitingPod := fh.GetWaitingPod(victim.UID); waitingPod != nil {
                  waitingPod.Reject(pluginName, "preempted")
                  klog.V(2).InfoS("Preemptor pod rejected a waiting pod", "preemptor", klog.KObj(pod), "waitingPod", klog.KObj(victim), "node", c.Name())
              } else {
                  if feature.DefaultFeatureGate.Enabled(features.PodDisruptionConditions) {
                      condition := &v1.PodCondition{
                          Type:    v1.AlphaNoCompatGuaranteeDisruptionTarget,
                          Status:  v1.ConditionTrue,
                          Reason:  "PreemptionByKubeScheduler",
                          Message: "Kube-scheduler: preempting",
                      }
                      newStatus := pod.Status.DeepCopy()
                      if apipod.UpdatePodCondition(newStatus, condition) {
                          if err := util.PatchPodStatus(ctx, cs, victim, newStatus); err != nil {
                              klog.ErrorS(err, "Preparing pod preemption", "pod", klog.KObj(victim), "preemptor", klog.KObj(pod))
                              return framework.AsStatus(err)
                          }
                      }
                  }
                  if err := util.DeletePod(ctx, cs, victim); err != nil {
                      klog.ErrorS(err, "Preempting pod", "pod", klog.KObj(victim), "preemptor", klog.KObj(pod))
                      return framework.AsStatus(err)
                  }
                  klog.V(2).InfoS("Preemptor Pod preempted victim Pod", "preemptor", klog.KObj(pod), "victim", klog.KObj(victim), "node", c.Name())
              }

              fh.EventRecorder().Eventf(victim, pod, v1.EventTypeNormal, "Preempted", "Preempting", "Preempted by a pod on node %v", c.Name())
          }
          metrics.PreemptionVictims.Observe(float64(len(c.Victims().Pods)))

          // Lower priority pods nominated to run on this node, may no longer fit on
          // this node. So, we should remove their nomination. Removing their
          // nomination updates these pods and moves them to the active queue. It
          // lets scheduler find another place for them.
          nominatedPods := getLowerPriorityNominatedPods(fh, pod, c.Name())
          if err := util.ClearNominatedNodeName(ctx, cs, nominatedPods...); err != nil {
              klog.ErrorS(err, "Cannot clear 'NominatedNodeName' field")
              // We do not return as this error is not critical.
          }

          return nil
      }

Based on the Filter results, nodes marked Unschedulable are collected as potential preemption targets (potentialNodes), while nodes marked UnschedulableAndUnresolvable are excluded, since no eviction could make the Pod schedulable there. A sketch of this pre-selection follows.
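
A hedged sketch of this pre-selection under the v1.25 framework types; the helper name potentialNodesForPreemption is illustrative (in the Kubernetes source, the equivalent filtering happens in the Evaluator's findCandidates path via nodesWherePreemptionMightHelp):

    package preemptionsketch

    import (
        "k8s.io/kubernetes/pkg/scheduler/framework"
    )

    // potentialNodesForPreemption keeps only the nodes where evicting Pods could
    // possibly make the preemptor schedulable. Nodes whose Filter status is
    // UnschedulableAndUnresolvable (e.g. a node selector mismatch) are skipped,
    // because no amount of eviction can fix that kind of failure.
    func potentialNodesForPreemption(nodes []*framework.NodeInfo, m framework.NodeToStatusMap) []*framework.NodeInfo {
        var potential []*framework.NodeInfo
        for _, nodeInfo := range nodes {
            name := nodeInfo.Node().Name
            if status, ok := m[name]; ok && status.Code() == framework.UnschedulableAndUnresolvable {
                continue // preemption on this node cannot help
            }
            potential = append(potential, nodeInfo)
        }
        return potential
    }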

To keep the cost manageable in large clusters, the preemption phase settles for a locally optimal solution rather than a globally optimal one

  • Ref:https://github.com/kubernetes/kubernetes/blob/88e994f6bf8fc88114c5b733e09afea339bea66d/pkg/scheduler/framework/plugins/defaultpreemption/default_preemption.go#L121

    // calculateNumCandidates returns the number of candidates the FindCandidates
    // method must produce from dry running based on the constraints given by
    // <minCandidateNodesPercentage> and <minCandidateNodesAbsolute>. The number of
    // candidates returned will never be greater than <numNodes>.
    func (pl *DefaultPreemption) calculateNumCandidates(numNodes int32) int32 {
        n := (numNodes * pl.args.MinCandidateNodesPercentage) / 100
        if n < pl.args.MinCandidateNodesAbsolute {
            n = pl.args.MinCandidateNodesAbsolute
        }
        if n > numNodes {
            n = numNodes
        }
        return n
    }

    // GetOffsetAndNumCandidates chooses a random offset and calculates the number
    // of candidates that should be shortlisted for dry running preemption.
    func (pl *DefaultPreemption) GetOffsetAndNumCandidates(numNodes int32) (int32, int32) {
        return rand.Int31n(numNodes), pl.calculateNumCandidates(numNodes)
    }

The default preemption plugin picks a random starting offset in the range [0, numNodes), and derives the maximum number of candidate nodes to examine from a percentage of the cluster size

  • MinCandidateNodesPercentage and MinCandidateNodesAbsolute default to 10 and 100 respectively. For example, a 2000-node cluster yields 2000 * 10% = 200 candidates, whereas a 50-node cluster first hits the absolute floor of 100 and is then clamped back down to 50

The scheduler then runs a simulated (dry-run) scheduling pass to determine, for each candidate node, which lower-priority Pods would have to be evicted before the preemptor Pod could fit

  • Ref:https://github.com/kubernetes/kubernetes/blob/88e994f6bf8fc88114c5b733e09afea339bea66d/pkg/scheduler/framework/preemption/preemption.go#L572


    // DryRunPreemption simulates Preemption logic on <potentialNodes> in parallel,
    // returns preemption candidates and a map indicating filtered nodes statuses.
    // The number of candidates depends on the constraints defined in the plugin's args. In the returned list of
    // candidates, ones that do not violate PDB are preferred over ones that do.
    // NOTE: This method is exported for easier testing in default preemption.
    func (ev *Evaluator) DryRunPreemption(ctx context.Context, pod *v1.Pod, potentialNodes []*framework.NodeInfo,
        pdbs []*policy.PodDisruptionBudget, offset int32, numCandidates int32) ([]Candidate, framework.NodeToStatusMap, error) {
        fh := ev.Handler
        nonViolatingCandidates := newCandidateList(numCandidates)
        violatingCandidates := newCandidateList(numCandidates)
        parallelCtx, cancel := context.WithCancel(ctx)
        defer cancel()
        nodeStatuses := make(framework.NodeToStatusMap)
        var statusesLock sync.Mutex
        var errs []error
        checkNode := func(i int) {
            nodeInfoCopy := potentialNodes[(int(offset)+i)%len(potentialNodes)].Clone()
            stateCopy := ev.State.Clone()
            pods, numPDBViolations, status := ev.SelectVictimsOnNode(ctx, stateCopy, pod, nodeInfoCopy, pdbs)
            if status.IsSuccess() && len(pods) != 0 {
                victims := extenderv1.Victims{
                    Pods:             pods,
                    NumPDBViolations: int64(numPDBViolations),
                }
                c := &candidate{
                    victims: &victims,
                    name:    nodeInfoCopy.Node().Name,
                }
                if numPDBViolations == 0 {
                    nonViolatingCandidates.add(c)
                } else {
                    violatingCandidates.add(c)
                }
                nvcSize, vcSize := nonViolatingCandidates.size(), violatingCandidates.size()
                if nvcSize > 0 && nvcSize+vcSize >= numCandidates {
                    cancel()
                }
                return
            }
            if status.IsSuccess() && len(pods) == 0 {
                status = framework.AsStatus(fmt.Errorf("expected at least one victim pod on node %q", nodeInfoCopy.Node().Name))
            }
            statusesLock.Lock()
            if status.Code() == framework.Error {
                errs = append(errs, status.AsError())
            }
            nodeStatuses[nodeInfoCopy.Node().Name] = status
            statusesLock.Unlock()
        }
        fh.Parallelizer().Until(parallelCtx, len(potentialNodes), checkNode)
        return append(nonViolatingCandidates.get(), violatingCandidates.get()...), nodeStatuses, utilerrors.NewAggregate(errs)
    }

Built-in Scheduling Plugins