Kube-scheduler (Priority and Preemptive Scheduling)
Based on Kubernetes 1.25
K8s supports assigning priorities to Pods.
- Preemption: evict lower-priority Pods so that a higher-priority Pod can run first.
Pod Priority
Pod priority is defined by the spec.priority field, an int32 pointer; the larger the number, the higher the Pod's priority.
- A PriorityClass assigns a name to a particular priority value.
- Setting spec.priorityClassName on a Pod automatically fills in the corresponding priority.
K8s provides two built-in PriorityClasses:
- system-cluster-critical: priority 2000000000
- system-node-critical: priority 2000001000
Note that if a PriorityClass has globalDefault set to true, every Pod that does not specify a priorityClassName will use it; a creation sketch follows this list.
- Only one PriorityClass per cluster may have globalDefault set to true.
- preemptionPolicy may be set to PreemptLowerPriority or Never; with Never, Pods of that PriorityClass never trigger eviction of lower-priority Pods (they are still queued ahead of lower-priority Pods, but wait for free capacity instead of preempting).
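As a minimal sketch, a PriorityClass and a Pod that opts into it can be created with client-go. The names high-priority and critical-app are made up for illustration, and PreemptLowerPriority is the default policy:
-
package main

import (
    "context"

    corev1 "k8s.io/api/core/v1"
    schedulingv1 "k8s.io/api/scheduling/v1"
    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    "k8s.io/client-go/kubernetes"
    "k8s.io/client-go/tools/clientcmd"
)

func main() {
    cfg, err := clientcmd.BuildConfigFromFlags("", clientcmd.RecommendedHomeFile)
    if err != nil {
        panic(err)
    }
    client := kubernetes.NewForConfigOrDie(cfg)

    // PreemptLowerPriority is the default; Never would keep this class's
    // Pods from ever evicting anything.
    policy := corev1.PreemptLowerPriority
    pc := &schedulingv1.PriorityClass{
        ObjectMeta:       metav1.ObjectMeta{Name: "high-priority"},
        Value:            1000000, // higher value = higher priority
        GlobalDefault:    false,   // at most one class per cluster may be the global default
        PreemptionPolicy: &policy,
        Description:      "for latency-critical workloads",
    }
    if _, err := client.SchedulingV1().PriorityClasses().Create(context.TODO(), pc, metav1.CreateOptions{}); err != nil {
        panic(err)
    }

    // A Pod referencing the class: the Priority admission plugin resolves
    // priorityClassName and fills in spec.priority automatically.
    pod := &corev1.Pod{
        ObjectMeta: metav1.ObjectMeta{Name: "critical-app", Namespace: "default"},
        Spec: corev1.PodSpec{
            PriorityClassName: "high-priority",
            Containers: []corev1.Container{
                {Name: "app", Image: "nginx"},
            },
        },
    }
    if _, err := client.CoreV1().Pods("default").Create(context.TODO(), pod, metav1.CreateOptions{}); err != nil {
        panic(err)
    }
}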
Pod Preemption Mechanism
When the scheduler cannot find a suitable node for a high-priority Pod, the preemption mechanism is triggered.
Preemption is built on the Scheduling Framework's PostFilter extension point, which is invoked only when no suitable node was found for the Pod.
By default, kube-scheduler ships exactly one plugin implementing the PostFilter extension point: the DefaultPreemption plugin, which handles preemption.
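For reference, the extension point itself is declared in pkg/scheduler/framework; abridged from the 1.25 interface definition:
-
// PostFilterPlugin is an interface for "PostFilter" plugins. These plugins are called
// after a pod cannot be scheduled.
type PostFilterPlugin interface {
    Plugin
    // PostFilter is called by the scheduling framework.
    // A PostFilter plugin should return one of the following statuses:
    // - Unschedulable: the plugin gets executed successfully but the pod cannot be made schedulable.
    // - Success: the plugin gets executed successfully and the pod can be made schedulable.
    // - Error: the plugin aborts due to some internal error.
    PostFilter(ctx context.Context, state *CycleState, pod *v1.Pod, filteredNodeStatusMap NodeToStatusMap) (*PostFilterResult, *Status)
}
The plugin-specific parts of preemption are abstracted behind a dedicated Interface that DefaultPreemption implements: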
-
// Interface is expected to be implemented by different preemption plugins as all those member
// methods might have different behavior compared with the default preemption.
type Interface interface {
    // GetOffsetAndNumCandidates chooses a random offset and calculates the number of candidates that should be
    // shortlisted for dry running preemption.
    GetOffsetAndNumCandidates(nodes int32) (int32, int32)
    // CandidatesToVictimsMap builds a map from the target node to a list of to-be-preempted Pods and the number of PDB violation.
    CandidatesToVictimsMap(candidates []Candidate) map[string]*extenderv1.Victims
    // PodEligibleToPreemptOthers returns one bool and one string. The bool indicates whether this pod should be considered for
    // preempting other pods or not. The string includes the reason if this pod isn't eligible.
    PodEligibleToPreemptOthers(pod *v1.Pod, nominatedNodeStatus *framework.Status) (bool, string)
    // SelectVictimsOnNode finds minimum set of pods on the given node that should be preempted in order to make enough room
    // for "pod" to be scheduled.
    // Note that both `state` and `nodeInfo` are deep copied.
    SelectVictimsOnNode(ctx context.Context, state *framework.CycleState,
        pod *v1.Pod, nodeInfo *framework.NodeInfo, pdbs []*policy.PodDisruptionBudget) ([]*v1.Pod, int, *framework.Status)
}
The end-to-end preemption flow is orchestrated by the scheduler's preemption Evaluator.
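In 1.25 the Evaluator bundles that plugin-specific Interface together with shared dependencies; roughly (corelisters and policylisters are the core/v1 and policy/v1 informer listers):
-
type Evaluator struct {
    PluginName string
    Handler    framework.Handle
    PodLister  corelisters.PodLister
    PdbLister  policylisters.PodDisruptionBudgetLister
    State      *framework.CycleState
    // Interface is embedded, so plugin-specific hooks (e.g. SelectVictimsOnNode)
    // can be swapped while the orchestration stays generic.
    Interface
}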
Its core steps are:
Fetch the latest Pod state from the informer via ev.PodLister.Pods(pod.Namespace).Get(pod.Name).
Verify that the current Pod is allowed to preempt lower-priority Pods; preemption is skipped in the following cases (a sketch of this check follows the step list):
The Pod's spec.preemptionPolicy is set to Never.
The Pod already has a node nominated by a previous preemption, i.e. a node that can satisfy the Pod once its victims are evicted, and lower-priority Pods on that nominated node are still terminating; the scheduler waits for them instead of preempting again.
Search for viable preemption candidate nodes (preemption pre-selection).
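The eligibility check from the second step is implemented by DefaultPreemption; abridged, it looks roughly like this (corev1helpers is k8s.io/component-helpers/scheduling/corev1):
-
func (pl *DefaultPreemption) PodEligibleToPreemptOthers(pod *v1.Pod, nominatedNodeStatus *framework.Status) (bool, string) {
    // Case 1: preemptionPolicy=Never disables preemption for this pod entirely.
    if pod.Spec.PreemptionPolicy != nil && *pod.Spec.PreemptionPolicy == v1.PreemptNever {
        return false, "not eligible due to preemptionPolicy=Never."
    }
    nodeInfos := pl.fh.SnapshotSharedLister().NodeInfos()
    if nomNodeName := pod.Status.NominatedNodeName; len(nomNodeName) > 0 {
        // If the nominated node is now UnschedulableAndUnresolvable,
        // the pod should be considered for preempting again.
        if nominatedNodeStatus.Code() == framework.UnschedulableAndUnresolvable {
            return true, ""
        }
        // Case 2: a lower-priority victim on the nominated node is still
        // terminating; wait for it to finish instead of preempting more pods.
        if nodeInfo, _ := nodeInfos.Get(nomNodeName); nodeInfo != nil {
            podPriority := corev1helpers.PodPriority(pod)
            for _, p := range nodeInfo.Pods {
                if p.Pod.DeletionTimestamp != nil && corev1helpers.PodPriority(p.Pod) < podPriority {
                    return false, "not eligible due to a terminating pod on the nominated node."
                }
            }
        }
    }
    return true, ""
}
The full orchestration entry point then reads: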
-
func (ev *Evaluator) Preempt(ctx context.Context, pod *v1.Pod, m framework.NodeToStatusMap) (*framework.PostFilterResult, *framework.Status) {
    // 0) Fetch the latest version of <pod>.
    // It's safe to directly fetch pod here. Because the informer cache has already been
    // initialized when creating the Scheduler obj.
    // However, tests may need to manually initialize the shared pod informer.
    podNamespace, podName := pod.Namespace, pod.Name
    pod, err := ev.PodLister.Pods(pod.Namespace).Get(pod.Name)
    if err != nil {
        klog.ErrorS(err, "Getting the updated preemptor pod object", "pod", klog.KRef(podNamespace, podName))
        return nil, framework.AsStatus(err)
    }

    // 1) Ensure the preemptor is eligible to preempt other pods.
    if ok, msg := ev.PodEligibleToPreemptOthers(pod, m[pod.Status.NominatedNodeName]); !ok {
        klog.V(5).InfoS("Pod is not eligible for preemption", "pod", klog.KObj(pod), "reason", msg)
        return nil, framework.NewStatus(framework.Unschedulable, msg)
    }

    // 2) Find all preemption candidates.
    candidates, nodeToStatusMap, err := ev.findCandidates(ctx, pod, m)
    if err != nil && len(candidates) == 0 {
        return nil, framework.AsStatus(err)
    }

    // Return a FitError only when there are no candidates that fit the pod.
    if len(candidates) == 0 {
        fitError := &framework.FitError{
            Pod:         pod,
            NumAllNodes: len(nodeToStatusMap),
            Diagnosis: framework.Diagnosis{
                NodeToStatusMap: nodeToStatusMap,
                // Leave FailedPlugins as nil as it won't be used on moving Pods.
            },
        }
        // Specify nominatedNodeName to clear the pod's nominatedNodeName status, if applicable.
        return framework.NewPostFilterResultWithNominatedNode(""), framework.NewStatus(framework.Unschedulable, fitError.Error())
    }

    // 3) Interact with registered Extenders to filter out some candidates if needed.
    candidates, status := ev.callExtenders(pod, candidates)
    if !status.IsSuccess() {
        return nil, status
    }

    // 4) Find the best candidate.
    bestCandidate := ev.SelectCandidate(candidates)
    if bestCandidate == nil || len(bestCandidate.Name()) == 0 {
        return nil, framework.NewStatus(framework.Unschedulable, "no candidate node for preemption")
    }

    // 5) Perform preparation work before nominating the selected candidate.
    if status := ev.prepareCandidate(ctx, bestCandidate, pod, ev.PluginName); !status.IsSuccess() {
        return nil, status
    }

    return framework.NewPostFilterResultWithNominatedNode(bestCandidate.Name()), framework.NewStatus(framework.Success)
}
Call registered Preempt Extenders, letting external extenders further narrow the candidate nodes and victim Pod lists produced by the built-in plugin.
Select the single best node for preemption (preferring, in order, fewest PDB violations, the lowest highest-victim priority, the smallest sum of victim priorities, and so on).
Complete the final preparation work before eviction.
-
// prepareCandidate does some preparation work before nominating the selected candidate:
// - Evict the victim pods
// - Reject the victim pods if they are in waitingPod map
// - Clear the low-priority pods' nominatedNodeName status if needed
func (ev *Evaluator) prepareCandidate(ctx context.Context, c Candidate, pod *v1.Pod, pluginName string) *framework.Status {
    fh := ev.Handler
    cs := ev.Handler.ClientSet()
    for _, victim := range c.Victims().Pods {
        // If the victim is a WaitingPod, send a reject message to the PermitPlugin.
        // Otherwise we should delete the victim.
        if waitingPod := fh.GetWaitingPod(victim.UID); waitingPod != nil {
            waitingPod.Reject(pluginName, "preempted")
            klog.V(2).InfoS("Preemptor pod rejected a waiting pod", "preemptor", klog.KObj(pod), "waitingPod", klog.KObj(victim), "node", c.Name())
        } else {
            if feature.DefaultFeatureGate.Enabled(features.PodDisruptionConditions) {
                condition := &v1.PodCondition{
                    Type:    v1.AlphaNoCompatGuaranteeDisruptionTarget,
                    Status:  v1.ConditionTrue,
                    Reason:  "PreemptionByKubeScheduler",
                    Message: "Kube-scheduler: preempting",
                }
                newStatus := pod.Status.DeepCopy()
                if apipod.UpdatePodCondition(newStatus, condition) {
                    if err := util.PatchPodStatus(ctx, cs, victim, newStatus); err != nil {
                        klog.ErrorS(err, "Preparing pod preemption", "pod", klog.KObj(victim), "preemptor", klog.KObj(pod))
                        return framework.AsStatus(err)
                    }
                }
            }
            if err := util.DeletePod(ctx, cs, victim); err != nil {
                klog.ErrorS(err, "Preempting pod", "pod", klog.KObj(victim), "preemptor", klog.KObj(pod))
                return framework.AsStatus(err)
            }
            klog.V(2).InfoS("Preemptor Pod preempted victim Pod", "preemptor", klog.KObj(pod), "victim", klog.KObj(victim), "node", c.Name())
        }
        fh.EventRecorder().Eventf(victim, pod, v1.EventTypeNormal, "Preempted", "Preempting", "Preempted by a pod on node %v", c.Name())
    }
    metrics.PreemptionVictims.Observe(float64(len(c.Victims().Pods)))

    // Lower priority pods nominated to run on this node, may no longer fit on
    // this node. So, we should remove their nomination. Removing their
    // nomination updates these pods and moves them to the active queue. It
    // lets scheduler find another place for them.
    nominatedPods := getLowerPriorityNominatedPods(fh, pod, c.Name())
    if err := util.ClearNominatedNodeName(ctx, cs, nominatedPods...); err != nil {
        klog.ErrorS(err, "Cannot clear 'NominatedNodeName' field")
        // We do not return as this error is not critical.
    }

    return nil
}
Following the Filter results, nodes reported as Unschedulable are collected as potential preemption targets (potentialNodes), while nodes reported as UnschedulableAndUnresolvable are recorded as nodes where preemption cannot help.
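The helper doing this split in findCandidates reads roughly as follows in 1.25:
-
// nodesWherePreemptionMightHelp returns a list of nodes with failed predicates
// that may be satisfied by removing pods from the node.
func nodesWherePreemptionMightHelp(nodes []*framework.NodeInfo, m framework.NodeToStatusMap) ([]*framework.NodeInfo, framework.NodeToStatusMap) {
    var potentialNodes []*framework.NodeInfo
    nodeStatuses := make(framework.NodeToStatusMap)
    for _, node := range nodes {
        name := node.Node().Name
        // The per-plugin status decides whether preemption could help:
        // UnschedulableAndUnresolvable means no amount of eviction will fix it
        // (e.g. a nodeSelector mismatch), so the node is skipped.
        if m[name].Code() == framework.UnschedulableAndUnresolvable {
            nodeStatuses[name] = framework.NewStatus(framework.UnschedulableAndUnresolvable, "Preemption is not helpful for scheduling")
            continue
        }
        potentialNodes = append(potentialNodes, node)
    }
    return potentialNodes, nodeStatuses
}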
To keep the search bounded in large clusters, the candidate-finding phase settles for a locally optimal solution rather than evaluating every node.
-
// calculateNumCandidates returns the number of candidates the FindCandidates
// method must produce from dry running based on the constraints given by
// <minCandidateNodesPercentage> and <minCandidateNodesAbsolute>. The number of
// candidates returned will never be greater than <numNodes>.
func (pl *DefaultPreemption) calculateNumCandidates(numNodes int32) int32 {
    n := (numNodes * pl.args.MinCandidateNodesPercentage) / 100
    if n < pl.args.MinCandidateNodesAbsolute {
        n = pl.args.MinCandidateNodesAbsolute
    }
    if n > numNodes {
        n = numNodes
    }
    return n
}

// GetOffsetAndNumCandidates chooses a random offset and calculates the number
// of candidates that should be shortlisted for dry running preemption.
func (pl *DefaultPreemption) GetOffsetAndNumCandidates(numNodes int32) (int32, int32) {
    return rand.Int31n(numNodes), pl.calculateNumCandidates(numNodes)
}
The default preemption plugin picks a random starting offset in [0, numNodes), and computes the maximum number of candidate nodes to examine as a percentage of the cluster size; a worked example follows the bullet below.
- MinCandidateNodesPercentage and MinCandidateNodesAbsolute default to 10 and 100 respectively.
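A standalone sketch of the arithmetic with those defaults (numCandidates is a hypothetical re-implementation for illustration, not the scheduler's own function):
-
package main

import "fmt"

// numCandidates mirrors calculateNumCandidates with the default
// MinCandidateNodesPercentage=10 and MinCandidateNodesAbsolute=100.
func numCandidates(numNodes int32) int32 {
    n := numNodes * 10 / 100
    if n < 100 {
        n = 100 // never shortlist fewer than the absolute floor
    }
    if n > numNodes {
        n = numNodes // but never more nodes than the cluster has
    }
    return n
}

func main() {
    // 50 nodes -> 50 (capped by cluster size), 500 -> 100 (absolute floor),
    // 5000 -> 500 (the 10% term dominates).
    for _, nodes := range []int32{50, 500, 5000} {
        fmt.Printf("%d nodes -> %d candidates\n", nodes, numCandidates(nodes))
    }
}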
Dry-run (simulated) scheduling then determines, for each candidate node, which lower-priority Pods would have to be evicted before the preemptor Pod fits.
-
// DryRunPreemption simulates Preemption logic on <potentialNodes> in parallel,
// returns preemption candidates and a map indicating filtered nodes statuses.
// The number of candidates depends on the constraints defined in the plugin's args. In the returned list of
// candidates, ones that do not violate PDB are preferred over ones that do.
// NOTE: This method is exported for easier testing in default preemption.
func (ev *Evaluator) DryRunPreemption(ctx context.Context, pod *v1.Pod, potentialNodes []*framework.NodeInfo,
    pdbs []*policy.PodDisruptionBudget, offset int32, numCandidates int32) ([]Candidate, framework.NodeToStatusMap, error) {
    fh := ev.Handler
    nonViolatingCandidates := newCandidateList(numCandidates)
    violatingCandidates := newCandidateList(numCandidates)
    parallelCtx, cancel := context.WithCancel(ctx)
    defer cancel()
    nodeStatuses := make(framework.NodeToStatusMap)
    var statusesLock sync.Mutex
    var errs []error
    checkNode := func(i int) {
        nodeInfoCopy := potentialNodes[(int(offset)+i)%len(potentialNodes)].Clone()
        stateCopy := ev.State.Clone()
        pods, numPDBViolations, status := ev.SelectVictimsOnNode(ctx, stateCopy, pod, nodeInfoCopy, pdbs)
        if status.IsSuccess() && len(pods) != 0 {
            victims := extenderv1.Victims{
                Pods:             pods,
                NumPDBViolations: int64(numPDBViolations),
            }
            c := &candidate{
                victims: &victims,
                name:    nodeInfoCopy.Node().Name,
            }
            if numPDBViolations == 0 {
                nonViolatingCandidates.add(c)
            } else {
                violatingCandidates.add(c)
            }
            nvcSize, vcSize := nonViolatingCandidates.size(), violatingCandidates.size()
            if nvcSize > 0 && nvcSize+vcSize >= numCandidates {
                cancel()
            }
            return
        }
        if status.IsSuccess() && len(pods) == 0 {
            status = framework.AsStatus(fmt.Errorf("expected at least one victim pod on node %q", nodeInfoCopy.Node().Name))
        }
        statusesLock.Lock()
        if status.Code() == framework.Error {
            errs = append(errs, status.AsError())
        }
        nodeStatuses[nodeInfoCopy.Node().Name] = status
        statusesLock.Unlock()
    }
    fh.Parallelizer().Until(parallelCtx, len(potentialNodes), checkNode)
    return append(nonViolatingCandidates.get(), violatingCandidates.get()...), nodeStatuses, utilerrors.NewAggregate(errs)
}
Built-in Scheduling Plugins
- Ref: list of K8s built-in scheduling plugins