Kube-scheduler (Scheduling Framework & Scheduler Execution Flow)

Based on Kubernetes 1.25.

The Scheduling Framework is a core concept of the scheduler: it models the Pod scheduling process as a series of extension points that plugins can hook into.

Scheduling Framework Background

As Kubernetes grew more capable, scheduling became increasingly complex. Earlier versions allowed extending the native scheduler through webhook-based Extenders, but with the following limitations:

  • Limited number of extension points and fixed call sites
    • The Filter Extender can only be invoked after the built-in predicates have finished
    • The Preempt Extender can only be invoked after the built-in preemption logic has completed
    • The Bind Extender only handles binding a Pod to a node, and only one Bind Extender can be in use at a time; once a webhook Bind plugin is enabled, the built-in binding is no longer used
  • Network calls degrade scheduler performance
    • Every webhook call is a full HTTP request/response round trip, with repeated JSON serialization and deserialization of the data
  • It is hard to notify an external Extender that the internal scheduling attempt has been aborted
  • An external Extender cannot easily share the built-in scheduler's cache

To address these problems, the Scheduling Framework was designed: a plugin-based architecture that exposes a well-defined set of plugin APIs.

  • The Scheduling Framework was introduced in 1.16 and reached maturity (stable) in 1.19

Scheduling Framework Core Architecture

QueueSort

QueueSort plugins control the ordering of Pods in the scheduling queue.

A QueueSort plugin must implement a Less func that compares two Pods:

  • The default scheduler uses the PrioritySort plugin, which orders Pods by priority
  • Only one QueueSort plugin can be enabled per scheduler; enabling a custom one disables the built-in sorting plugin (see the sketch below)
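
A minimal sketch of such a custom QueueSort plugin (the name CustomSort and the tie-breaking rule are assumptions for illustration, though they mirror what the built-in PrioritySort does):

    package queuesort

    import (
        "k8s.io/kubernetes/pkg/scheduler/framework"
    )

    // CustomSort is a hypothetical QueueSort plugin: higher-priority Pods are
    // popped first, and ties are broken by queue-entry time.
    type CustomSort struct{}

    var _ framework.QueueSortPlugin = &CustomSort{}

    func (cs *CustomSort) Name() string { return "CustomSort" }

    // Less reports whether pInfo1 should be scheduled before pInfo2.
    func (cs *CustomSort) Less(pInfo1, pInfo2 *framework.QueuedPodInfo) bool {
        p1, p2 := podPriority(pInfo1), podPriority(pInfo2)
        if p1 != p2 {
            return p1 > p2
        }
        // Equal priority: the Pod that entered the queue earlier goes first.
        return pInfo1.Timestamp.Before(pInfo2.Timestamp)
    }

    func podPriority(info *framework.QueuedPodInfo) int32 {
        if info.Pod.Spec.Priority != nil {
            return *info.Pod.Spec.Priority
        }
        return 0
    }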

PreFilter

PreFilter performs pre-processing before Filter runs, such as computing scheduling information the Filter phase will need for the Pod, or checking that the cluster state the Pod depends on meets its scheduling requirements

  • Ref:https://github.com/kubernetes/kubernetes/blob/88e994f6bf8fc88114c5b733e09afea339bea66d/pkg/scheduler/framework/interface.go#L348

    // PreFilterPlugin is an interface that must be implemented by "PreFilter" plugins.
    // These plugins are called at the beginning of the scheduling cycle.
    type PreFilterPlugin interface {
        Plugin
        // PreFilter is called at the beginning of the scheduling cycle. All PreFilter
        // plugins must return success or the pod will be rejected. PreFilter could optionally
        // return a PreFilterResult to influence which nodes to evaluate downstream. This is useful
        // for cases where it is possible to determine the subset of nodes to process in O(1) time.
        PreFilter(ctx context.Context, state *CycleState, p *v1.Pod) (*PreFilterResult, *Status)
        // PreFilterExtensions returns a PreFilterExtensions interface if the plugin implements one,
        // or nil if it does not. A Pre-filter plugin can provide extensions to incrementally
        // modify its pre-processed info. The framework guarantees that the extensions
        // AddPod/RemovePod will only be called after PreFilter, possibly on a cloned
        // CycleState, and may call those functions more than once before calling
        // Filter again on a specific node.
        PreFilterExtensions() PreFilterExtensions
    }

Filter

Filter plugins implement the main predicate logic and are evaluated per node; if any Filter plugin marks a node unschedulable, that node is considered unschedulable (a combined PreFilter/Filter sketch follows the interface below)

  • By default, 16 goroutines process the nodes in parallel

  • Ref:https://github.com/kubernetes/kubernetes/blob/88e994f6bf8fc88114c5b733e09afea339bea66d/pkg/scheduler/framework/interface.go#L371

    // FilterPlugin is an interface for Filter plugins. These plugins are called at the
    // filter extension point for filtering out hosts that cannot run a pod.
    // This concept used to be called 'predicate' in the original scheduler.
    // These plugins should return "Success", "Unschedulable" or "Error" in Status.code.
    // However, the scheduler accepts other valid codes as well.
    // Anything other than "Success" will lead to exclusion of the given host from
    // running the pod.
    type FilterPlugin interface {
        Plugin
        // Filter is called by the scheduling framework.
        // All FilterPlugins should return "Success" to declare that
        // the given node fits the pod. If Filter doesn't return "Success",
        // it will return "Unschedulable", "UnschedulableAndUnresolvable" or "Error".
        // For the node being evaluated, Filter plugins should look at the passed
        // nodeInfo reference for this particular node's information (e.g., pods
        // considered to be running on the node) instead of looking it up in the
        // NodeInfoSnapshot because we don't guarantee that they will be the same.
        // For example, during preemption, we may pass a copy of the original
        // nodeInfo object that has some pods removed from it to evaluate the
        // possibility of preempting them to schedule the target pod.
        Filter(ctx context.Context, state *CycleState, pod *v1.Pod, nodeInfo *NodeInfo) *Status
    }
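
To make the PreFilter/Filter interplay concrete, here is a hedged sketch of a plugin that precomputes data once per cycle in PreFilter, stashes it in CycleState, and reads it per node in Filter. All names (ExamplePlugin, stateKey, the toy crowding check) are hypothetical:

    package plugins

    import (
        "context"
        "fmt"

        v1 "k8s.io/api/core/v1"
        "k8s.io/kubernetes/pkg/scheduler/framework"
    )

    const stateKey = "ExamplePlugin" // key under which the precomputed data is stored

    // exampleState carries data computed once per scheduling cycle.
    type exampleState struct {
        requestedPorts int
    }

    // Clone is required by framework.StateData.
    func (s *exampleState) Clone() framework.StateData { c := *s; return &c }

    type ExamplePlugin struct{}

    func (pl *ExamplePlugin) Name() string { return "ExamplePlugin" }

    // PreFilter computes per-Pod data once and writes it into CycleState.
    func (pl *ExamplePlugin) PreFilter(ctx context.Context, state *framework.CycleState, p *v1.Pod) (*framework.PreFilterResult, *framework.Status) {
        n := 0
        for _, c := range p.Spec.Containers {
            n += len(c.Ports)
        }
        state.Write(stateKey, &exampleState{requestedPorts: n})
        return nil, nil // nil PreFilterResult means "evaluate all nodes"
    }

    func (pl *ExamplePlugin) PreFilterExtensions() framework.PreFilterExtensions { return nil }

    // Filter reads the precomputed data instead of recomputing it for every node.
    func (pl *ExamplePlugin) Filter(ctx context.Context, state *framework.CycleState, pod *v1.Pod, nodeInfo *framework.NodeInfo) *framework.Status {
        data, err := state.Read(stateKey)
        if err != nil {
            return framework.AsStatus(fmt.Errorf("reading %q from CycleState: %w", stateKey, err))
        }
        s := data.(*exampleState)
        // Toy check: refuse very crowded nodes when the Pod declares ports.
        if s.requestedPorts > 0 && len(nodeInfo.Pods) > 100 {
            return framework.NewStatus(framework.Unschedulable, "node too crowded for port allocation")
        }
        return nil
    }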

PostFilter

Called when Filter has not produced any feasible node

  • If one of the plugins marks the Pod as schedulable (returns Success), the remaining PostFilter plugins are skipped and the flow ends

    • Ref:https://github.com/kubernetes/kubernetes/blob/88e994f6bf8fc88114c5b733e09afea339bea66d/pkg/scheduler/framework/interface.go#L389

      // PostFilterPlugin is an interface for "PostFilter" plugins. These plugins are called
      // after a pod cannot be scheduled.
      type PostFilterPlugin interface {
          Plugin
          // PostFilter is called by the scheduling framework.
          // A PostFilter plugin should return one of the following statuses:
          // - Unschedulable: the plugin gets executed successfully but the pod cannot be made schedulable.
          // - Success: the plugin gets executed successfully and the pod can be made schedulable.
          // - Error: the plugin aborts due to some internal error.
          //
          // Informational plugins should be configured ahead of other ones, and always return Unschedulable status.
          // Optionally, a non-nil PostFilterResult may be returned along with a Success status. For example,
          // a preemption plugin may choose to return nominatedNodeName, so that framework can reuse that to update the
          // preemptor pod's .spec.status.nominatedNodeName field.
          PostFilter(ctx context.Context, state *CycleState, pod *v1.Pod, filteredNodeStatusMap NodeToStatusMap) (*PostFilterResult, *Status)
      }

PreScore

Performs preparatory work before Score runs, such as pre-computing information the scoring phase will need

  • Ref:https://github.com/kubernetes/kubernetes/blob/88e994f6bf8fc88114c5b733e09afea339bea66d/pkg/scheduler/framework/interface.go#L408

    // PreScorePlugin is an interface for "PreScore" plugin. PreScore is an
    // informational extension point. Plugins will be called with a list of nodes
    // that passed the filtering phase. A plugin may use this data to update internal
    // state or to generate logs/metrics.
    type PreScorePlugin interface {
        Plugin
        // PreScore is called by the scheduling framework after a list of nodes
        // passed the filtering phase. All prescore plugins must return success or
        // the pod will be rejected
        PreScore(ctx context.Context, state *CycleState, pod *v1.Pod, nodes []*v1.Node) *Status
    }

Score

Scores and ranks the nodes that passed filtering. Raw scores are normalized to [0,100] via NormalizeScore, and each Score plugin's configured weight is then applied

  • Ref:https://github.com/kubernetes/kubernetes/blob/88e994f6bf8fc88114c5b733e09afea339bea66d/pkg/scheduler/framework/interface.go#L426

    // ScorePlugin is an interface that must be implemented by "Score" plugins to rank
    // nodes that passed the filtering phase.
    type ScorePlugin interface {
        Plugin
        // Score is called on each filtered node. It must return success and an integer
        // indicating the rank of the node. All scoring plugins must return success or
        // the pod will be rejected.
        Score(ctx context.Context, state *CycleState, p *v1.Pod, nodeName string) (int64, *Status)

        // ScoreExtensions returns a ScoreExtensions interface if it implements one, or nil if does not.
        ScoreExtensions() ScoreExtensions
    }

NormalizeScore

NormalizeScore normalizes the scores before the final node ranking (a sketch follows below)
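
The NormalizeScore hook lives on the ScoreExtensions interface returned by a Score plugin. A minimal sketch (the MyScore plugin and its raw-score rule are hypothetical) that rescales raw scores into [0, framework.MaxNodeScore], i.e. [0,100]:

    package plugins

    import (
        "context"

        v1 "k8s.io/api/core/v1"
        "k8s.io/kubernetes/pkg/scheduler/framework"
    )

    type MyScore struct{}

    func (pl *MyScore) Name() string { return "MyScore" }

    // Score returns a raw, unbounded score for one node (a placeholder here).
    func (pl *MyScore) Score(ctx context.Context, state *framework.CycleState, p *v1.Pod, nodeName string) (int64, *framework.Status) {
        return int64(len(nodeName)), nil
    }

    // ScoreExtensions exposes NormalizeScore to the framework.
    func (pl *MyScore) ScoreExtensions() framework.ScoreExtensions { return pl }

    // NormalizeScore rescales all raw scores proportionally so the best node gets 100.
    func (pl *MyScore) NormalizeScore(ctx context.Context, state *framework.CycleState, pod *v1.Pod, scores framework.NodeScoreList) *framework.Status {
        var max int64
        for _, s := range scores {
            if s.Score > max {
                max = s.Score
            }
        }
        if max == 0 {
            return nil // all zero: nothing to rescale
        }
        for i := range scores {
            scores[i].Score = scores[i].Score * framework.MaxNodeScore / max
        }
        return nil
    }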

Reserve

The Reserve extension point requires implementing two methods: Reserve and Unreserve

  • Ref:https://github.com/kubernetes/kubernetes/blob/88e994f6bf8fc88114c5b733e09afea339bea66d/pkg/scheduler/framework/interface.go#L443

    // ReservePlugin is an interface for plugins with Reserve and Unreserve
    // methods. These are meant to update the state of the plugin. This concept
    // used to be called 'assume' in the original scheduler. These plugins should
    // return only Success or Error in Status.code. However, the scheduler accepts
    // other valid codes as well. Anything other than Success will lead to
    // rejection of the pod.
    type ReservePlugin interface {
        Plugin
        // Reserve is called by the scheduling framework when the scheduler cache is
        // updated. If this method returns a failed Status, the scheduler will call
        // the Unreserve method for all enabled ReservePlugins.
        Reserve(ctx context.Context, state *CycleState, p *v1.Pod, nodeName string) *Status
        // Unreserve is called by the scheduling framework when a reserved pod was
        // rejected, an error occurred during reservation of subsequent plugins, or
        // in a later phase. The Unreserve method implementation must be idempotent
        // and may be called by the scheduler even if the corresponding Reserve
        // method for the same plugin was not called.
        Unreserve(ctx context.Context, state *CycleState, p *v1.Pod, nodeName string)
    }

Permit

Permit plugins run after the scheduling decision has been made but before the binding flow starts; they can block or delay binding

A Permit plugin can do one of the following:

  1. approve: once all Permit plugins approve a Pod, binding for the Pod begins
  2. deny: if any Permit plugin returns deny, scheduling of the Pod is aborted, Unreserve is triggered, and the Pod is put back into the scheduling queue to await the next attempt
  3. wait: if any Permit plugin returns wait, the Pod is placed in an internal waiting list; the binding goroutine is started but immediately blocks until the Pod is approved or denied (a usage sketch follows the interface below)
    • The wait lasts at most 15 minutes
    • If neither approve nor deny arrives in time, the Pod is treated as denied
  • Ref:https://github.com/kubernetes/kubernetes/blob/88e994f6bf8fc88114c5b733e09afea339bea66d/pkg/scheduler/framework/interface.go#L479

    // PermitPlugin is an interface that must be implemented by "Permit" plugins.
    // These plugins are called before a pod is bound to a node.
    type PermitPlugin interface {
        Plugin
        // Permit is called before binding a pod (and before prebind plugins). Permit
        // plugins are used to prevent or delay the binding of a Pod. A permit plugin
        // must return success or wait with timeout duration, or the pod will be rejected.
        // The pod will also be rejected if the wait timeout or the pod is rejected while
        // waiting. Note that if the plugin returns "wait", the framework will wait only
        // after running the remaining plugins given that no other plugin rejects the pod.
        Permit(ctx context.Context, state *CycleState, p *v1.Pod, nodeName string) (*Status, time.Duration)
    }
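
For example, a gang-scheduling style Permit plugin might return Wait until every member of a group has been reserved. A hedged sketch; the GangPermit plugin, its handle field, and the label-based group signal are all assumptions:

    package plugins

    import (
        "context"
        "time"

        v1 "k8s.io/api/core/v1"
        "k8s.io/kubernetes/pkg/scheduler/framework"
    )

    type GangPermit struct {
        handle framework.Handle // injected when the plugin is constructed
    }

    func (pl *GangPermit) Name() string { return "GangPermit" }

    // Permit approves the Pod immediately if its whole gang is reserved;
    // otherwise it parks the Pod in the waiting list for up to 30 seconds.
    func (pl *GangPermit) Permit(ctx context.Context, state *framework.CycleState, p *v1.Pod, nodeName string) (*framework.Status, time.Duration) {
        if pl.gangComplete(p) {
            return framework.NewStatus(framework.Success, ""), 0
        }
        return framework.NewStatus(framework.Wait, ""), 30 * time.Second
    }

    // gangComplete is a stand-in for real group bookkeeping.
    func (pl *GangPermit) gangComplete(p *v1.Pod) bool {
        return p.Labels["gang-complete"] == "true" // hypothetical signal
    }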

PreBind

PreBind performs preparation before binding, such as provisioning a network volume and mounting it on the target node
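
For symmetry with the other extension points, the shape of the PreBindPlugin interface in the same interface.go is roughly the following (quoted for reference; verify against the Ref links above):

    // PreBindPlugin is an interface that must be implemented by "PreBind" plugins.
    type PreBindPlugin interface {
        Plugin
        // PreBind is called before binding a pod. All prebind plugins must return
        // success or the pod will be rejected and won't be sent for binding.
        PreBind(ctx context.Context, state *CycleState, p *v1.Pod, nodeName string) *Status
    }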

Bind

Bind plugins issue the Bind request to kube-apiserver, performing the actual binding of the Pod to the node

  • Ref:https://github.com/kubernetes/kubernetes/blob/88e994f6bf8fc88114c5b733e09afea339bea66d/pkg/scheduler/framework/interface.go#L492

    // BindPlugin is an interface that must be implemented by "Bind" plugins. Bind
    // plugins are used to bind a pod to a Node.
    type BindPlugin interface {
        Plugin
        // Bind plugins will not be called until all pre-bind plugins have completed. Each
        // bind plugin is called in the configured order. A bind plugin may choose whether
        // or not to handle the given Pod. If a bind plugin chooses to handle a Pod, the
        // remaining bind plugins are skipped. When a bind plugin does not handle a pod,
        // it must return Skip in its Status code. If a bind plugin returns an Error, the
        // pod is rejected and will not be bound.
        Bind(ctx context.Context, state *CycleState, p *v1.Pod, nodeName string) *Status
    }

PostBind

PostBind plugins are called after a Pod is bound, mainly for notifications or cleanup. Resources or in-memory scheduler state that may only be released once the Pod is confirmed bound can be freed in PostBind

  • Ref:https://github.com/kubernetes/kubernetes/blob/88e994f6bf8fc88114c5b733e09afea339bea66d/pkg/scheduler/framework/interface.go#L468

    // PostBindPlugin is an interface that must be implemented by "PostBind" plugins.
    // These plugins are called after a pod is successfully bound to a node.
    type PostBindPlugin interface {
        Plugin
        // PostBind is called after a pod is successfully bound. These plugins are
        // informational. A common application of this extension point is for cleaning
        // up. If a plugin needs to clean-up its state after a pod is scheduled and
        // bound, PostBind is the extension point that it should register.
        PostBind(ctx context.Context, state *CycleState, p *v1.Pod, nodeName string)
    }

A later release (1.26, as part of Pod Scheduling Readiness) also introduced the PreEnqueue extension point, which runs before a Pod enters the internal active queue: only Pods that pass it are placed in the active queue (its rough shape is sketched below)
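
Its interface is minimal; roughly the following (from post-1.25 releases, so not in the source tree this note is otherwise based on — treat as a sketch):

    // PreEnqueuePlugin is an interface that must be implemented by "PreEnqueue" plugins.
    // These plugins are called prior to adding Pods to activeQ.
    type PreEnqueuePlugin interface {
        Plugin
        // PreEnqueue is called prior to adding the Pod to activeQ.
        PreEnqueue(ctx context.Context, p *v1.Pod) *Status
    }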

Scheduler Execution Flow

Overall Flow

  • The scheduler continuously observes ClusterEvent events via Informers; registered callbacks update the internal Scheduling Queue and the local Cache

  • The Scheduling Queue maintains the list of Pods awaiting scheduling; the default implementation is the PriorityQueue

The scheduleOne func is the core loop of kube-scheduler: it never exits and performs one scheduling task per iteration, in these steps:

  1. sched.NextPod pops the highest-priority pending Pod from the scheduling queue; if no Pod is pending, it blocks
  2. sched.frameworkForPod selects the Scheduling Framework instance matching the Pod's spec.schedulerName field
    • Multiple Scheduling Framework instances (profiles) are supported
    • schedulerName uniquely identifies which profile schedules a given Pod
    • By default only one Scheduling Framework instance is initialized
  3. Run the scheduling cycle. This part is serial; the core is sched.SchedulePod, which performs filtering and scoring
  4. Run the binding cycle, which is asynchronous by design: for the Pod selected in the scheduling cycle, the sched.bind path first checks that the Permit requirements are satisfied, and finally fwk.RunPostBindPlugins completes the scheduling process

Scheduling Cycle

Runs the Filter and Score algorithms and picks the target node

The sched.SchedulePod func mainly runs the Filter and Score algorithms

  • Ref:https://github.com/kubernetes/kubernetes/blob/88e994f6bf8fc88114c5b733e09afea339bea66d/pkg/scheduler/schedule_one.go#L314

    // schedulePod tries to schedule the given pod to one of the nodes in the node list.
    // If it succeeds, it will return the name of the node.
    // If it fails, it will return a FitError with reasons.
    func (sched *Scheduler) schedulePod(ctx context.Context, fwk framework.Framework, state *framework.CycleState, pod *v1.Pod) (result ScheduleResult, err error) {
        trace := utiltrace.New("Scheduling", utiltrace.Field{Key: "namespace", Value: pod.Namespace}, utiltrace.Field{Key: "name", Value: pod.Name})
        defer trace.LogIfLong(100 * time.Millisecond)

        if err := sched.Cache.UpdateSnapshot(sched.nodeInfoSnapshot); err != nil {
            return result, err
        }
        trace.Step("Snapshotting scheduler cache and node infos done")

        if sched.nodeInfoSnapshot.NumNodes() == 0 {
            return result, ErrNoNodesAvailable
        }

        feasibleNodes, diagnosis, err := sched.findNodesThatFitPod(ctx, fwk, state, pod)
        if err != nil {
            return result, err
        }
        trace.Step("Computing predicates done")

        if len(feasibleNodes) == 0 {
            return result, &framework.FitError{
                Pod:         pod,
                NumAllNodes: sched.nodeInfoSnapshot.NumNodes(),
                Diagnosis:   diagnosis,
            }
        }

        // When only one node after predicate, just use it.
        if len(feasibleNodes) == 1 {
            return ScheduleResult{
                SuggestedHost:  feasibleNodes[0].Name,
                EvaluatedNodes: 1 + len(diagnosis.NodeToStatusMap),
                FeasibleNodes:  1,
            }, nil
        }

        priorityList, err := prioritizeNodes(ctx, sched.Extenders, fwk, state, pod, feasibleNodes)
        if err != nil {
            return result, err
        }

        host, err := selectHost(priorityList)
        trace.Step("Prioritizing done")

        return ScheduleResult{
            SuggestedHost:  host,
            EvaluatedNodes: len(feasibleNodes) + len(diagnosis.NodeToStatusMap),
            FeasibleNodes:  len(feasibleNodes),
        }, err
    }

Filter plugins run concurrently by default; the parallelism field in the configuration file sets the number of worker goroutines (default 16). After the built-in Filter plugins, any configured Extenders are applied sequentially:

  • Ref:https://github.com/kubernetes/kubernetes/blob/88e994f6bf8fc88114c5b733e09afea339bea66d/pkg/scheduler/schedule_one.go#L550

    func findNodesThatPassExtenders(extenders []framework.Extender, pod *v1.Pod, feasibleNodes []*v1.Node, statuses framework.NodeToStatusMap) ([]*v1.Node, error) {
        // Extenders are called sequentially.
        // Nodes in original feasibleNodes can be excluded in one extender, and pass on to the next
        // extender in a decreasing manner.
        for _, extender := range extenders {
            if len(feasibleNodes) == 0 {
                break
            }
            if !extender.IsInterested(pod) {
                continue
            }

            // Status of failed nodes in failedAndUnresolvableMap will be added or overwritten in <statuses>,
            // so that the scheduler framework can respect the UnschedulableAndUnresolvable status for
            // particular nodes, and this may eventually improve preemption efficiency.
            // Note: users are recommended to configure the extenders that may return UnschedulableAndUnresolvable
            // status ahead of others.
            feasibleList, failedMap, failedAndUnresolvableMap, err := extender.Filter(pod, feasibleNodes)
            if err != nil {
                if extender.IsIgnorable() {
                    klog.InfoS("Skipping extender as it returned error and has ignorable flag set", "extender", extender, "err", err)
                    continue
                }
                return nil, err
            }

            for failedNodeName, failedMsg := range failedAndUnresolvableMap {
                var aggregatedReasons []string
                if _, found := statuses[failedNodeName]; found {
                    aggregatedReasons = statuses[failedNodeName].Reasons()
                }
                aggregatedReasons = append(aggregatedReasons, failedMsg)
                statuses[failedNodeName] = framework.NewStatus(framework.UnschedulableAndUnresolvable, aggregatedReasons...)
            }

            for failedNodeName, failedMsg := range failedMap {
                if _, found := failedAndUnresolvableMap[failedNodeName]; found {
                    // failedAndUnresolvableMap takes precedence over failedMap
                    // note that this only happens if the extender returns the node in both maps
                    continue
                }
                if _, found := statuses[failedNodeName]; !found {
                    statuses[failedNodeName] = framework.NewStatus(framework.Unschedulable, failedMsg)
                } else {
                    statuses[failedNodeName].AppendReason(failedMsg)
                }
            }

            feasibleNodes = feasibleList
        }
        return feasibleNodes, nil
    }

numFeasibleNodesToFind addresses scheduling performance in large clusters. The percentageOfNodesToScore field in the configuration file caps, as a fraction of total cluster size, how many feasible nodes the scheduler looks for; the default 0 means the ratio adapts automatically to the cluster size (a runnable sanity check of the formula follows the source excerpt below)

  • With the default (0):

    • Clusters smaller than 100 nodes: all feasible nodes proceed to scoring
    • From 100 up to 5625 nodes: the cap is (50 - numAllNodes/125)% × numAllNodes, with a floor of 100; e.g. in a 1000-node cluster the search stops after (50 - 1000/125)% = 42%, i.e. 420 feasible nodes
    • Above 5625 nodes: 5% of the nodes are searched
  • Ref:https://github.com/kubernetes/kubernetes/blob/88e994f6bf8fc88114c5b733e09afea339bea66d/pkg/scheduler/schedule_one.go#L528

    // numFeasibleNodesToFind returns the number of feasible nodes that once found, the scheduler stops
    // its search for more feasible nodes.
    func (sched *Scheduler) numFeasibleNodesToFind(numAllNodes int32) (numNodes int32) {
        if numAllNodes < minFeasibleNodesToFind || sched.percentageOfNodesToScore >= 100 {
            return numAllNodes
        }

        adaptivePercentage := sched.percentageOfNodesToScore
        if adaptivePercentage <= 0 {
            basePercentageOfNodesToScore := int32(50)
            adaptivePercentage = basePercentageOfNodesToScore - numAllNodes/125
            if adaptivePercentage < minFeasibleNodesPercentageToFind {
                adaptivePercentage = minFeasibleNodesPercentageToFind
            }
        }

        numNodes = numAllNodes * adaptivePercentage / 100
        if numNodes < minFeasibleNodesToFind {
            return minFeasibleNodesToFind
        }

        return numNodes
    }
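
A quick sanity check of the adaptive formula (a self-contained re-implementation for illustration, not the scheduler's own code):

    package main

    import "fmt"

    // feasibleNodesToFind mirrors the default behavior above
    // (percentageOfNodesToScore = 0, floors of 100 nodes and 5%).
    func feasibleNodesToFind(numAllNodes int32) int32 {
        if numAllNodes < 100 {
            return numAllNodes
        }
        pct := int32(50) - numAllNodes/125
        if pct < 5 {
            pct = 5
        }
        if n := numAllNodes * pct / 100; n >= 100 {
            return n
        }
        return 100
    }

    func main() {
        for _, n := range []int32{80, 1000, 5625, 10000} {
            fmt.Printf("%5d nodes -> stop after %4d feasible nodes\n", n, feasibleNodesToFind(n))
        }
        // Output: 80 -> 80, 1000 -> 420, 5625 -> 281, 10000 -> 500
    }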

To guarantee that every node eventually gets a chance to be considered, a cursor, nextStartIndex, is maintained; after each Filter pass the cursor advances, so consecutive cycles start the search from different nodes

Once the scored node list (NodeScoreList) is available, selectHost picks the highest-scoring node as the target; if several nodes tie for the top score, one of them is chosen at random

  • Ref:https://github.com/kubernetes/kubernetes/blob/88e994f6bf8fc88114c5b733e09afea339bea66d/pkg/scheduler/schedule_one.go#L712

    // selectHost takes a prioritized list of nodes and then picks one
    // in a reservoir sampling manner from the nodes that had the highest score.
    func selectHost(nodeScoreList framework.NodeScoreList) (string, error) {
        if len(nodeScoreList) == 0 {
            return "", fmt.Errorf("empty priorityList")
        }
        maxScore := nodeScoreList[0].Score
        selected := nodeScoreList[0].Name
        cntOfMaxScore := 1
        for _, ns := range nodeScoreList[1:] {
            if ns.Score > maxScore {
                maxScore = ns.Score
                selected = ns.Name
                cntOfMaxScore = 1
            } else if ns.Score == maxScore {
                cntOfMaxScore++
                if rand.Intn(cntOfMaxScore) == 0 {
                    // Replace the candidate with probability of 1/cntOfMaxScore
                    selected = ns.Name
                }
            }
        }
        return selected, nil
    }

Executing assume to update the local Cache

After the scheduling decision, the scheduler calls the assume func to update the local Cache: the Pod is optimistically treated as already running on the target node, and the node's resources are deducted accordingly

  • First, the Pod's Spec.NodeName is set to the chosen node

  • Then sched.Cache.AssumePod updates the cache

  • Once the Pod is scheduled successfully, its nomination info stored in the Scheduling Queue is cleaned up

  • Ref:https://github.com/kubernetes/kubernetes/blob/88e994f6bf8fc88114c5b733e09afea339bea66d/pkg/scheduler/schedule_one.go#L737

    // assume signals to the cache that a pod is already in the cache, so that binding can be asynchronous.
    // assume modifies `assumed`.
    func (sched *Scheduler) assume(assumed *v1.Pod, host string) error {
        // Optimistically assume that the binding will succeed and send it to apiserver
        // in the background.
        // If the binding fails, scheduler will release resources allocated to assumed pod
        // immediately.
        assumed.Spec.NodeName = host

        if err := sched.Cache.AssumePod(assumed); err != nil {
            klog.ErrorS(err, "Scheduler cache AssumePod failed")
            return err
        }
        // if "assumed" is a nominated pod, we should remove it from internal cache
        if sched.SchedulingQueue != nil {
            sched.SchedulingQueue.DeleteNominatedPodIfExists(assumed)
        }

        return nil
    }

Executing Reserve to update stateful plugins' state

The Reserve extension point mainly lets stateful plugins update their internal state

  • Ref:https://github.com/kubernetes/kubernetes/blob/88e994f6bf8fc88114c5b733e09afea339bea66d/pkg/scheduler/framework/runtime/framework.go#L1086

    // RunReservePluginsReserve runs the Reserve method in the set of configured
    // reserve plugins. If any of these plugins returns an error, it does not
    // continue running the remaining ones and returns the error. In such a case,
    // the pod will not be scheduled and the caller will be expected to call
    // RunReservePluginsUnreserve.
    func (f *frameworkImpl) RunReservePluginsReserve(ctx context.Context, state *framework.CycleState, pod *v1.Pod, nodeName string) (status *framework.Status) {
        startTime := time.Now()
        defer func() {
            metrics.FrameworkExtensionPointDuration.WithLabelValues(reserve, status.Code().String(), f.profileName).Observe(metrics.SinceInSeconds(startTime))
        }()
        for _, pl := range f.reservePlugins {
            status = f.runReservePluginReserve(ctx, pl, state, pod, nodeName)
            if !status.IsSuccess() {
                err := status.AsError()
                klog.ErrorS(err, "Failed running Reserve plugin", "plugin", pl.Name(), "pod", klog.KObj(pod))
                return framework.AsStatus(fmt.Errorf("running Reserve plugin %q: %w", pl.Name(), err))
            }
        }
        return nil
    }

If Reserve fails, Unreserve is invoked on the plugins in reverse order to roll back; Unreserve implementations must be idempotent

  • As part of this rollback, sched.Cache.ForgetPod removes the already-assumed Pod from the Cache

  • Ref:https://github.com/kubernetes/kubernetes/blob/88e994f6bf8fc88114c5b733e09afea339bea66d/pkg/scheduler/schedule_one.go#L157

    // Run the Reserve method of reserve plugins.
    if sts := fwk.RunReservePluginsReserve(schedulingCycleCtx, state, assumedPod, scheduleResult.SuggestedHost); !sts.IsSuccess() {
        metrics.PodScheduleError(fwk.ProfileName(), metrics.SinceInSeconds(start))
        // trigger un-reserve to clean up state associated with the reserved Pod
        fwk.RunReservePluginsUnreserve(schedulingCycleCtx, state, assumedPod, scheduleResult.SuggestedHost)
        if forgetErr := sched.Cache.ForgetPod(assumedPod); forgetErr != nil {
            klog.ErrorS(forgetErr, "Scheduler cache ForgetPod failed")
        }
        sched.FailureHandler(ctx, fwk, assumedPodInfo, sts.AsError(), SchedulerError, clearNominatedNode)
        return
    }

Executing Permit for delayed binding

Permit plugins support the Wait status: the Pod does not enter binding immediately, but waits for an Allow or Reject signal before continuing (a usage sketch follows the interface below)

  • Ref:https://github.com/kubernetes/kubernetes/blob/88e994f6bf8fc88114c5b733e09afea339bea66d/pkg/scheduler/framework/interface.go#L291

    // WaitingPod represents a pod currently waiting in the permit phase.
    type WaitingPod interface {
        // GetPod returns a reference to the waiting pod.
        GetPod() *v1.Pod
        // GetPendingPlugins returns a list of pending Permit plugin's name.
        GetPendingPlugins() []string
        // Allow declares the waiting pod is allowed to be scheduled by the plugin named as "pluginName".
        // If this is the last remaining plugin to allow, then a success signal is delivered
        // to unblock the pod.
        Allow(pluginName string)
        // Reject declares the waiting pod unschedulable.
        Reject(pluginName, msg string)
    }
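
A plugin holding a framework.Handle can later release (or reject) waiting pods. A hedged sketch of the allow path, continuing the hypothetical GangPermit plugin from the Permit section (the "gang" label is likewise an assumption):

    // allowGang releases every waiting pod that belongs to the given gang.
    func (pl *GangPermit) allowGang(gangID string) {
        pl.handle.IterateOverWaitingPods(func(wp framework.WaitingPod) {
            if wp.GetPod().Labels["gang"] == gangID {
                wp.Allow(pl.Name())
            }
        })
    }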

Binding Cycle

The Binding Cycle is responsible for binding the Pod to the node

Waiting for Permit approval or rejection (delayed binding)

  • Ref:https://github.com/kubernetes/kubernetes/blob/88e994f6bf8fc88114c5b733e09afea339bea66d/pkg/scheduler/framework/runtime/framework.go#L1192

    // WaitOnPermit will block, if the pod is a waiting pod, until the waiting pod is rejected or allowed.
    func (f *frameworkImpl) WaitOnPermit(ctx context.Context, pod *v1.Pod) *framework.Status {
        waitingPod := f.waitingPods.get(pod.UID)
        if waitingPod == nil {
            return nil
        }
        defer f.waitingPods.remove(pod.UID)
        klog.V(4).InfoS("Pod waiting on permit", "pod", klog.KObj(pod))

        startTime := time.Now()
        s := <-waitingPod.s
        metrics.PermitWaitDuration.WithLabelValues(s.Code().String()).Observe(metrics.SinceInSeconds(startTime))

        if !s.IsSuccess() {
            if s.IsUnschedulable() {
                klog.V(4).InfoS("Pod rejected while waiting on permit", "pod", klog.KObj(pod), "status", s.Message())
                s.SetFailedPlugin(s.FailedPlugin())
                return s
            }
            err := s.AsError()
            klog.ErrorS(err, "Failed waiting on permit for pod", "pod", klog.KObj(pod))
            return framework.AsStatus(fmt.Errorf("waiting on permit for pod: %w", err)).WithFailedPlugin(s.FailedPlugin())
        }
        return nil
    }

Executing Bind to complete the Pod-to-Node binding

After Permit approval, the scheduler runs the PreBind, Bind, and PostBind plugins in order

  • Ref:https://github.com/kubernetes/kubernetes/blob/88e994f6bf8fc88114c5b733e09afea339bea66d/pkg/scheduler/schedule_one.go#L759

    // bind binds a pod to a given node defined in a binding object.
    // The precedence for binding is: (1) extenders and (2) framework plugins.
    // We expect this to run asynchronously, so we handle binding metrics internally.
    func (sched *Scheduler) bind(ctx context.Context, fwk framework.Framework, assumed *v1.Pod, targetNode string, state *framework.CycleState) (err error) {
        defer func() {
            sched.finishBinding(fwk, assumed, targetNode, err)
        }()

        bound, err := sched.extendersBinding(assumed, targetNode)
        if bound {
            return err
        }
        bindStatus := fwk.RunBindPlugins(ctx, state, assumed, targetNode)
        if bindStatus.IsSuccess() {
            return nil
        }
        if bindStatus.Code() == framework.Error {
            return bindStatus.AsError()
        }
        return fmt.Errorf("bind status: %s, %v", bindStatus.Code().String(), bindStatus.Message())
    }