- 抢占:把优先级低的Pod驱逐,高优先级的Pod先运行
- 通过PriorityClass可以为某个优先级定一个名称
- 通过spec.priorityClassName就可以自动载入优先级
- system-cluster-critical:优先级2000000000
- system-node-critical:优先级2000001000
- 一个集群只有一个globalDefault可以设置为true
- preemptionPolicy允许被设置为PreemptLowerPriority或者Never;设置为Never时,该PriorityClass的Pod不会抢占(驱逐)低优先级的Pod,而是留在调度队列中等待,直到有足够的空闲资源才会被调度
驱逐抢占基于Scheduling Framework的PostFilter拓展点实现,该拓展点仅在Pod没有找到合适的节点才会被调用
// Interface is expected to be implemented by different preemption plugins as all those member
// methods might have different behavior compared with the default preemption.
type Interface interface {
// GetOffsetAndNumCandidates chooses a random offset and calculates the number of candidates that should be
// shortlisted for dry running preemption.
// The first result is the offset, the second the number of candidates.
GetOffsetAndNumCandidates(nodes int32) (int32, int32)
// CandidatesToVictimsMap builds a map from the target node to a list of to-be-preempted Pods and the number of PDB violations.
CandidatesToVictimsMap(candidates []Candidate) map[string]*extenderv1.Victims
// PodEligibleToPreemptOthers returns one bool and one string. The bool indicates whether this pod should be considered for
// preempting other pods or not. The string includes the reason if this pod isn't eligible.
PodEligibleToPreemptOthers(pod *v1.Pod, nominatedNodeStatus *framework.Status) (bool, string)
// SelectVictimsOnNode finds the minimum set of pods on the given node that should be preempted in order to make enough room
// for "pod" to be scheduled.
// It returns the victim pods, the number of PDB violations and a status.
// Note that both `state` and `nodeInfo` are deep copied.
SelectVictimsOnNode(ctx context.Context, state *framework.CycleState,
pod *v1.Pod, nodeInfo *framework.NodeInfo, pdbs []*policy.PodDisruptionBudget) ([]*v1.Pod, int, *framework.Status)
驱逐的串联操作由调度器preemption Evaluator框架提供
// Preempt runs one preemption attempt for pod. It re-fetches the pod, checks that it is
// eligible to preempt, finds candidates, lets extenders filter them, selects the best
// candidate and prepares it.
//
// m is looked up by the pod's nominated node name to obtain that node's status.
//
// On success it returns a PostFilterResult nominating the selected candidate's name
// together with a Success status. When no candidate fits, it returns a result with an
// empty nominated node name and an Unschedulable status. Other failures return a nil
// result and the corresponding status.
func (ev *Evaluator) Preempt(ctx context.Context, pod *v1.Pod, m framework.NodeToStatusMap) (*framework.PostFilterResult, *framework.Status) {
// 0) Fetch the latest version of <pod>.
// It's safe to directly fetch pod here. Because the informer cache has already been
// initialized when creating the Scheduler obj.
// However, tests may need to manually initialize the shared pod informer.
// Keep namespace and name aside: `pod` is overwritten by the lister result below,
// and the error log still needs to identify the pod.
podNamespace, podName := pod.Namespace, pod.Name
pod, err := ev.PodLister.Pods(pod.Namespace).Get(pod.Name)
if err != nil {
klog.ErrorS(err, "Getting the updated preemptor pod object", "pod", klog.KRef(podNamespace, podName))
return nil, framework.AsStatus(err)
// 1) Ensure the preemptor is eligible to preempt other pods.
if ok, msg := ev.PodEligibleToPreemptOthers(pod, m[pod.Status.NominatedNodeName]); !ok {
klog.V(5).InfoS("Pod is not eligible for preemption", "pod", klog.KObj(pod), "reason", msg)
return nil, framework.NewStatus(framework.Unschedulable, msg)
// 2) Find all preemption candidates.
candidates, nodeToStatusMap, err := ev.findCandidates(ctx, pod, m)
// An error is only fatal when it left us with no candidates at all; otherwise
// continue with the candidates that were found.
if err != nil && len(candidates) == 0 {
return nil, framework.AsStatus(err)
// Return a FitError only when there are no candidates that fit the pod.
if len(candidates) == 0 {
fitError := &framework.FitError{
Pod: pod,
NumAllNodes: len(nodeToStatusMap),
Diagnosis: framework.Diagnosis{
NodeToStatusMap: nodeToStatusMap,
// Leave FailedPlugins as nil as it won't be used on moving Pods.
// Specify nominatedNodeName to clear the pod's nominatedNodeName status, if applicable.
return framework.NewPostFilterResultWithNominatedNode(""), framework.NewStatus(framework.Unschedulable, fitError.Error())
// 3) Interact with registered Extenders to filter out some candidates if needed.
candidates, status := ev.callExtenders(pod, candidates)
if !status.IsSuccess() {
return nil, status
// 4) Find the best candidate.
bestCandidate := ev.SelectCandidate(candidates)
if bestCandidate == nil || len(bestCandidate.Name()) == 0 {
return nil, framework.NewStatus(framework.Unschedulable, "no candidate node for preemption")
// 5) Perform preparation work before nominating the selected candidate.
if status := ev.prepareCandidate(ctx, bestCandidate, pod, ev.PluginName); !status.IsSuccess() {
return nil, status
return framework.NewPostFilterResultWithNominatedNode(bestCandidate.Name()), framework.NewStatus(framework.Success)
调用Preempt Extender外部拓展,内置插件收敛候选节点和对应的Pod驱逐列表
// prepareCandidate does some preparation work before nominating the selected candidate:
// - Evict the victim pods
// - Reject the victim pods if they are in waitingPod map
// - Clear the low-priority pods' nominatedNodeName status if needed
func (ev *Evaluator) prepareCandidate(ctx context.Context, c Candidate, pod *v1.Pod, pluginName string) *framework.Status {
fh := ev.Handler
cs := ev.Handler.ClientSet()
for _, victim := range c.Victims().Pods {
// If the victim is a WaitingPod, send a reject message to the PermitPlugin.
// Otherwise we should delete the victim.
if waitingPod := fh.GetWaitingPod(victim.UID); waitingPod != nil {
waitingPod.Reject(pluginName, "preempted")
klog.V(2).InfoS("Preemptor pod rejected a waiting pod", "preemptor", klog.KObj(pod), "waitingPod", klog.KObj(victim), "node", c.Name())
} else {
if feature.DefaultFeatureGate.Enabled(features.PodDisruptionConditions) {
condition := &v1.PodCondition{
Type: v1.AlphaNoCompatGuaranteeDisruptionTarget,
Status: v1.ConditionTrue,
Reason: "PreemptionByKubeScheduler",
Message: "Kube-scheduler: preempting",
newStatus := pod.Status.DeepCopy()
if apipod.UpdatePodCondition(newStatus, condition) {
if err := util.PatchPodStatus(ctx, cs, victim, newStatus); err != nil {
klog.ErrorS(err, "Preparing pod preemption", "pod", klog.KObj(victim), "preemptor", klog.KObj(pod))
return framework.AsStatus(err)
if err := util.DeletePod(ctx, cs, victim); err != nil {
klog.ErrorS(err, "Preempting pod", "pod", klog.KObj(victim), "preemptor", klog.KObj(pod))
return framework.AsStatus(err)
klog.V(2).InfoS("Preemptor Pod preempted victim Pod", "preemptor", klog.KObj(pod), "victim", klog.KObj(victim), "node", c.Name())
fh.EventRecorder().Eventf(victim, pod, v1.EventTypeNormal, "Preempted", "Preempting", "Preempted by a pod on node %v", c.Name())
// Lower priority pods nominated to run on this node, may no longer fit on
// this node. So, we should remove their nomination. Removing their
// nomination updates these pods and moves them to the active queue. It
// lets scheduler find another place for them.
nominatedPods := getLowerPriorityNominatedPods(fh, pod, c.Name())
if err := util.ClearNominatedNodeName(ctx, cs, nominatedPods...); err != nil {
klog.ErrorS(err, "Cannot clear 'NominatedNodeName' field")
// We do not return as this error is not critical.
return nil
按照Filter结果,把Unschedulable节点列为潜在驱逐节点potentialNodes,把UnschedulableAndUnresolvable节点设置为不可驱逐、不可调度节点
// calculateNumCandidates returns the number of candidates the FindCandidates
// method must produce from dry running based on the constraints given by
// <minCandidateNodesPercentage> and <minCandidateNodesAbsolute>. The number of
// candidates returned will never be greater than <numNodes>.
func (pl *DefaultPreemption) calculateNumCandidates(numNodes int32) int32 {
// Start from the configured percentage of numNodes (integer division).
n := (numNodes * pl.args.MinCandidateNodesPercentage) / 100
// Raise the result to the configured absolute minimum if the percentage yields less.
if n < pl.args.MinCandidateNodesAbsolute {
n = pl.args.MinCandidateNodesAbsolute
// Never ask for more candidates than there are nodes.
if n > numNodes {
n = numNodes
return n
// GetOffsetAndNumCandidates chooses a random offset and calculates the number
// of candidates that should be shortlisted for dry running preemption.
// The offset is drawn from [0, numNodes); the count comes from calculateNumCandidates.
// NOTE(review): if rand is math/rand, Int31n panics when numNodes <= 0 — callers
// presumably guarantee at least one node; confirm.
func (pl *DefaultPreemption) GetOffsetAndNumCandidates(numNodes int32) (int32, int32) {
return rand.Int31n(numNodes), pl.calculateNumCandidates(numNodes)
- MinCandidateNodesPercentage和MinCandidateNodesAbsolute默认值为10和100
// DryRunPreemption simulates Preemption logic on <potentialNodes> in parallel,
// returns preemption candidates and a map indicating filtered nodes statuses.
// The number of candidates depends on the constraints defined in the plugin's args. In the returned list of
// candidates, ones that do not violate PDB are preferred over ones that do.
// The returned error aggregates every Error-coded status collected from the nodes.
// NOTE: This method is exported for easier testing in default preemption.
func (ev *Evaluator) DryRunPreemption(ctx context.Context, pod *v1.Pod, potentialNodes []*framework.NodeInfo,
pdbs []*policy.PodDisruptionBudget, offset int32, numCandidates int32) ([]Candidate, framework.NodeToStatusMap, error) {
fh := ev.Handler
nonViolatingCandidates := newCandidateList(numCandidates)
violatingCandidates := newCandidateList(numCandidates)
parallelCtx, cancel := context.WithCancel(ctx)
defer cancel()
nodeStatuses := make(framework.NodeToStatusMap)
// NOTE(review): statusesLock is declared here but no Lock/Unlock call is visible in
// this excerpt; the writes to errs and nodeStatuses below presumably need to be
// guarded by it — confirm against upstream.
var statusesLock sync.Mutex
var errs []error
checkNode := func(i int) {
// Walk the nodes starting at offset and wrapping around; work on copies of the
// node info and cycle state.
nodeInfoCopy := potentialNodes[(int(offset)+i)%len(potentialNodes)].Clone()
stateCopy := ev.State.Clone()
pods, numPDBViolations, status := ev.SelectVictimsOnNode(ctx, stateCopy, pod, nodeInfoCopy, pdbs)
if status.IsSuccess() && len(pods) != 0 {
victims := extenderv1.Victims{
Pods: pods,
NumPDBViolations: int64(numPDBViolations),
c := &candidate{
victims: &victims,
name: nodeInfoCopy.Node().Name,
// NOTE(review): both branch bodies are missing from this excerpt; presumably c is
// added to nonViolatingCandidates or violatingCandidates respectively — confirm.
if numPDBViolations == 0 {
} else {
nvcSize, vcSize := nonViolatingCandidates.size(), violatingCandidates.size()
// NOTE(review): the body of this check is missing from this excerpt; presumably the
// parallel run is cancelled once enough candidates were collected — confirm.
if nvcSize > 0 && nvcSize+vcSize >= numCandidates {
// A successful status without any victim is treated as an error.
if status.IsSuccess() && len(pods) == 0 {
status = framework.AsStatus(fmt.Errorf("expected at least one victim pod on node %q", nodeInfoCopy.Node().Name))
if status.Code() == framework.Error {
errs = append(errs, status.AsError())
nodeStatuses[nodeInfoCopy.Node().Name] = status
fh.Parallelizer().Until(parallelCtx, len(potentialNodes), checkNode)
// Non-violating candidates are placed ahead of violating ones in the result.
return append(nonViolatingCandidates.get(), violatingCandidates.get()...), nodeStatuses, utilerrors.NewAggregate(errs)
- Ref:K8s 内置调度插件 列表