Kube-scheduler (Scheduling Framework & Scheduler Execution Flow)
Based on Kubernetes 1.25.
The Scheduling Framework is a core concept of the scheduler: it defines the Pod scheduling process as a series of extension points.
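Scheduling Framework background
As Kubernetes grew more capable, scheduling became more complex. Earlier versions let users extend the native scheduler through Webhook Extenders, but with the following limitations: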
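- The number of extension points and where they are called are limited
  - A Filter Extender can only be called after the built-in filtering (predicate) algorithms finish
  - A Preempt Extender can only be called after the built-in preemption algorithm finishes
  - A Bind Extender can only be used to bind a Pod to a node, and only one Bind Extender can be active at a time; once a Webhook Bind Extender is enabled, the built-in Bind is no longer used
- Network calls reduce scheduler performance
  - Every Webhook call is an HTTP request/response round trip, with several rounds of JSON serialization and deserialization
- It is hard to notify an external Extender that scheduling has been aborted inside the scheduler
- It is hard for an external Extender to share the cache of the built-in scheduler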
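To address these problems, the Scheduling Framework was designed with a plugin-based architecture that exposes a set of plugin APIs.
- The Scheduling Framework was introduced in 1.16 and became stable in 1.19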
Scheduling Framework core architecture
QueueSort
QueueSort plugins determine the order of Pods in the scheduling queue.
// QueueSortPlugin is an interface that must be implemented by "QueueSort" plugins.
// These plugins are used to sort pods in the scheduling queue. Only one queue sort
// plugin may be enabled at a time.
type QueueSortPlugin interface {
Plugin
// Less are used to sort pods in the scheduling queue.
Less(*QueuedPodInfo, *QueuedPodInfo) bool
}
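A QueueSort plugin implements the Less function, which compares two Pods.
- The default scheduler uses the PrioritySort plugin, which orders Pods by priority
- Only one QueueSort plugin can be enabled per scheduler; enabling a custom one disables the built-in sort plugin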
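The ordering described above can be sketched as a comparator. This is a minimal illustration, not the PrioritySort source: the `queuedPod` type and its fields are hypothetical stand-ins for `QueuedPodInfo`, and breaking ties by enqueue time is an assumption made for the example.

```go
package main

import (
	"fmt"
	"sort"
)

// queuedPod is a hypothetical, simplified stand-in for framework.QueuedPodInfo.
type queuedPod struct {
	name     string
	priority int32 // assumed to mirror the Pod's priority
	enqueued int64 // hypothetical: enqueue time as Unix seconds
}

// less orders higher-priority Pods first and breaks ties by enqueue time.
func less(a, b queuedPod) bool {
	if a.priority != b.priority {
		return a.priority > b.priority
	}
	return a.enqueued < b.enqueued
}

// sortedNames returns Pod names in the order the queue would pop them.
func sortedNames(pods []queuedPod) []string {
	sorted := append([]queuedPod(nil), pods...)
	sort.SliceStable(sorted, func(i, j int) bool { return less(sorted[i], sorted[j]) })
	names := make([]string, 0, len(sorted))
	for _, p := range sorted {
		names = append(names, p.name)
	}
	return names
}

func main() {
	pods := []queuedPod{
		{"batch", 0, 1},
		{"web-late", 100, 3},
		{"web-early", 100, 2},
	}
	fmt.Println(sortedNames(pods)) // [web-early web-late batch]
}
```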
PreFilter
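PreFilter does preprocessing before Filter, for example computing the scheduling information that the Filter phase needs for the Pod, or checking conditions that the Pod or the cluster state must satisfy.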
// PreFilterPlugin is an interface that must be implemented by "PreFilter" plugins.
// These plugins are called at the beginning of the scheduling cycle.
type PreFilterPlugin interface {
Plugin
// PreFilter is called at the beginning of the scheduling cycle. All PreFilter
// plugins must return success or the pod will be rejected. PreFilter could optionally
// return a PreFilterResult to influence which nodes to evaluate downstream. This is useful
// for cases where it is possible to determine the subset of nodes to process in O(1) time.
PreFilter(ctx context.Context, state *CycleState, p *v1.Pod) (*PreFilterResult, *Status)
// PreFilterExtensions returns a PreFilterExtensions interface if the plugin implements one,
// or nil if it does not. A Pre-filter plugin can provide extensions to incrementally
// modify its pre-processed info. The framework guarantees that the extensions
// AddPod/RemovePod will only be called after PreFilter, possibly on a cloned
// CycleState, and may call those functions more than once before calling
// Filter again on a specific node.
PreFilterExtensions() PreFilterExtensions
}
Filter
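Filter plugins run the main filtering (predicate) logic and evaluate every node. If any Filter plugin marks a node as unschedulable, that node is considered unschedulable.
By default, 16 goroutines process the nodes in chunks.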
// FilterPlugin is an interface for Filter plugins. These plugins are called at the
// filter extension point for filtering out hosts that cannot run a pod.
// This concept used to be called 'predicate' in the original scheduler.
// These plugins should return "Success", "Unschedulable" or "Error" in Status.code.
// However, the scheduler accepts other valid codes as well.
// Anything other than "Success" will lead to exclusion of the given host from
// running the pod.
type FilterPlugin interface {
Plugin
// Filter is called by the scheduling framework.
// All FilterPlugins should return "Success" to declare that
// the given node fits the pod. If Filter doesn't return "Success",
// it will return "Unschedulable", "UnschedulableAndUnresolvable" or "Error".
// For the node being evaluated, Filter plugins should look at the passed
// nodeInfo reference for this particular node's information (e.g., pods
// considered to be running on the node) instead of looking it up in the
// NodeInfoSnapshot because we don't guarantee that they will be the same.
// For example, during preemption, we may pass a copy of the original
// nodeInfo object that has some pods removed from it to evaluate the
// possibility of preempting them to schedule the target pod.
Filter(ctx context.Context, state *CycleState, pod *v1.Pod, nodeInfo *NodeInfo) *Status
}
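As a rough sketch of the idea that filters run per node and in parallel, the following uses a fixed pool of workers. It is a simplification under stated assumptions: `filterFunc` and `feasibleNodes` are hypothetical names, and the real scheduler uses its own parallelization helper and richer status codes rather than a boolean.

```go
package main

import (
	"fmt"
	"sort"
	"sync"
)

// filterFunc is a hypothetical stand-in for a FilterPlugin's Filter method:
// it reports whether the node can run the pod.
type filterFunc func(node string) bool

// feasibleNodes runs every filter against every node using a fixed number of
// workers (workers must be > 0). A node is rejected as soon as one filter
// returns false.
func feasibleNodes(nodes []string, filters []filterFunc, workers int) []string {
	jobs := make(chan string)
	var (
		mu       sync.Mutex
		feasible []string
		wg       sync.WaitGroup
	)
	for w := 0; w < workers; w++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for node := range jobs {
				ok := true
				for _, f := range filters {
					if !f(node) {
						ok = false
						break
					}
				}
				if ok {
					mu.Lock()
					feasible = append(feasible, node)
					mu.Unlock()
				}
			}
		}()
	}
	for _, n := range nodes {
		jobs <- n
	}
	close(jobs)
	wg.Wait()
	sort.Strings(feasible) // worker completion order is nondeterministic
	return feasible
}

func main() {
	notTainted := func(node string) bool { return node != "node-b" }
	hasCapacity := func(node string) bool { return node != "node-d" }
	nodes := []string{"node-a", "node-b", "node-c", "node-d"}
	fmt.Println(feasibleNodes(nodes, []filterFunc{notTainted, hasCapacity}, 16)) // [node-a node-c]
}
```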
PostFilter
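PostFilter is called when Filter finds no suitable node.
As soon as one PostFilter plugin marks the node as schedulable, the remaining PostFilter plugins are skipped.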
// PostFilterPlugin is an interface for "PostFilter" plugins. These plugins are called
// after a pod cannot be scheduled.
type PostFilterPlugin interface {
Plugin
// PostFilter is called by the scheduling framework.
// A PostFilter plugin should return one of the following statuses:
// - Unschedulable: the plugin gets executed successfully but the pod cannot be made schedulable.
// - Success: the plugin gets executed successfully and the pod can be made schedulable.
// - Error: the plugin aborts due to some internal error.
//
// Informational plugins should be configured ahead of other ones, and always return Unschedulable status.
// Optionally, a non-nil PostFilterResult may be returned along with a Success status. For example,
// a preemption plugin may choose to return nominatedNodeName, so that framework can reuse that to update the
// preemptor pod's .spec.status.nominatedNodeName field.
PostFilter(ctx context.Context, state *CycleState, pod *v1.Pod, filteredNodeStatusMap NodeToStatusMap) (*PostFilterResult, *Status)
}
PreScore
PreScore does the preparatory work for Score, such as precomputing information that the scoring phase needs.
// PreScorePlugin is an interface for "PreScore" plugin. PreScore is an
// informational extension point. Plugins will be called with a list of nodes
// that passed the filtering phase. A plugin may use this data to update internal
// state or to generate logs/metrics.
type PreScorePlugin interface {
Plugin
// PreScore is called by the scheduling framework after a list of nodes
// passed the filtering phase. All prescore plugins must return success or
// the pod will be rejected
PreScore(ctx context.Context, state *CycleState, pod *v1.Pod, nodes []*v1.Node) *Status
}
Score
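Score ranks the nodes that passed filtering. Each plugin's scores are normalized to [0, 100] by NormalizeScore and then combined according to the weight of each Score plugin.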
// ScorePlugin is an interface that must be implemented by "Score" plugins to rank
// nodes that passed the filtering phase.
type ScorePlugin interface {
Plugin
// Score is called on each filtered node. It must return success and an integer
// indicating the rank of the node. All scoring plugins must return success or
// the pod will be rejected.
Score(ctx context.Context, state *CycleState, p *v1.Pod, nodeName string) (int64, *Status)
// ScoreExtensions returns a ScoreExtensions interface if it implements one, or nil if does not.
ScoreExtensions() ScoreExtensions
}
NormalizeScore
NormalizeScore normalizes a plugin's scores before the final node ranking.
// ScoreExtensions is an interface for Score extended functionality.
type ScoreExtensions interface {
// NormalizeScore is called for all node scores produced by the same plugin's "Score"
// method. A successful run of NormalizeScore will update the scores list and return
// a success status.
NormalizeScore(ctx context.Context, state *CycleState, p *v1.Pod, scores NodeScoreList) *Status
}
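To make normalization and weighting concrete, here is a small sketch. It assumes non-negative raw scores and uses a simple linear rescale so the best node gets 100; that rescale is one possible choice for illustration, not what any particular built-in plugin does. The names `normalize` and `weightedTotal` are hypothetical.

```go
package main

import "fmt"

const maxNodeScore = 100 // upper bound of the normalized range [0, 100]

// normalize rescales non-negative raw scores linearly so that the highest
// raw score maps to maxNodeScore.
func normalize(raw []int64) []int64 {
	var highest int64
	for _, s := range raw {
		if s > highest {
			highest = s
		}
	}
	out := make([]int64, len(raw))
	if highest == 0 {
		return out
	}
	for i, s := range raw {
		out[i] = s * maxNodeScore / highest
	}
	return out
}

// weightedTotal sums, per node, each plugin's normalized score multiplied by
// that plugin's weight. perPlugin[p][i] is plugin p's raw score for node i.
func weightedTotal(perPlugin [][]int64, weights []int64) []int64 {
	if len(perPlugin) == 0 {
		return nil
	}
	total := make([]int64, len(perPlugin[0]))
	for p, scores := range perPlugin {
		for i, s := range normalize(scores) {
			total[i] += s * weights[p]
		}
	}
	return total
}

func main() {
	// Two hypothetical plugins scoring three nodes; the second has weight 2.
	raw := [][]int64{{10, 40, 20}, {3, 1, 2}}
	fmt.Println(weightedTotal(raw, []int64{1, 2})) // [225 166 182]
}
```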
Reserve
A Reserve plugin implements two methods, Reserve and Unreserve.
// ReservePlugin is an interface for plugins with Reserve and Unreserve
// methods. These are meant to update the state of the plugin. This concept
// used to be called 'assume' in the original scheduler. These plugins should
// return only Success or Error in Status.code. However, the scheduler accepts
// other valid codes as well. Anything other than Success will lead to
// rejection of the pod.
type ReservePlugin interface {
Plugin
// Reserve is called by the scheduling framework when the scheduler cache is
// updated. If this method returns a failed Status, the scheduler will call
// the Unreserve method for all enabled ReservePlugins.
Reserve(ctx context.Context, state *CycleState, p *v1.Pod, nodeName string) *Status
// Unreserve is called by the scheduling framework when a reserved pod was
// rejected, an error occurred during reservation of subsequent plugins, or
// in a later phase. The Unreserve method implementation must be idempotent
// and may be called by the scheduler even if the corresponding Reserve
// method for the same plugin was not called.
Unreserve(ctx context.Context, state *CycleState, p *v1.Pod, nodeName string)
}
Permit
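Permit plugins run after the scheduling decision is made but before binding starts; they can block or delay binding.
A Permit plugin can do one of the following:
- approve: once every Permit plugin has approved the Pod, binding begins
- deny: if any Permit plugin denies the Pod, scheduling is aborted, Unreserve is triggered, and the Pod goes back to the scheduling queue to wait for the next attempt
- wait: if any Permit plugin returns wait, the Pod is placed in an internal list of waiting Pods. The binding cycle still starts, but it blocks immediately until the Pod is approved or denied
  - The wait is capped at 15 minutes
  - If neither approve nor deny arrives before the timeout, the Pod is treated as denied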
// PermitPlugin is an interface that must be implemented by "Permit" plugins.
// These plugins are called before a pod is bound to a node.
type PermitPlugin interface {
Plugin
// Permit is called before binding a pod (and before prebind plugins). Permit
// plugins are used to prevent or delay the binding of a Pod. A permit plugin
// must return success or wait with timeout duration, or the pod will be rejected.
// The pod will also be rejected if the wait timeout or the pod is rejected while
// waiting. Note that if the plugin returns "wait", the framework will wait only
// after running the remaining plugins given that no other plugin rejects the pod.
Permit(ctx context.Context, state *CycleState, p *v1.Pod, nodeName string) (*Status, time.Duration)
}
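The wait behaviour can be sketched with a channel and a timer. This is an illustration only: `waitOnPermit` and its boolean decision channel are hypothetical simplifications, while the 15-minute cap mirrors the framework's maximum permit timeout.

```go
package main

import (
	"fmt"
	"time"
)

// maxTimeout caps how long a Pod may wait in the permit phase.
const maxTimeout = 15 * time.Minute

// waitOnPermit blocks until a decision arrives on the channel (true = approve,
// false = deny) or the timeout elapses. A timeout counts as a deny.
func waitOnPermit(decision <-chan bool, timeout time.Duration) bool {
	if timeout > maxTimeout {
		timeout = maxTimeout
	}
	timer := time.NewTimer(timeout)
	defer timer.Stop()
	select {
	case approved := <-decision:
		return approved
	case <-timer.C:
		return false
	}
}

func main() {
	approve := make(chan bool, 1)
	approve <- true
	fmt.Println(waitOnPermit(approve, time.Second)) // true

	silent := make(chan bool) // nobody ever answers
	fmt.Println(waitOnPermit(silent, 10*time.Millisecond)) // false
}
```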
PreBind
PreBind does the preparation needed before binding, such as provisioning a network volume and mounting it on the target node.
// PreBindPlugin is an interface that must be implemented by "PreBind" plugins.
// These plugins are called before a pod being scheduled.
type PreBindPlugin interface {
Plugin
// PreBind is called before binding a pod. All prebind plugins must return
// success or the pod will be rejected and won't be sent for binding.
PreBind(ctx context.Context, state *CycleState, p *v1.Pod, nodeName string) *Status
}
Bind
Bind plugins send the Bind request to kube-apiserver, which is the step that actually binds the Pod to the node.
// BindPlugin is an interface that must be implemented by "Bind" plugins. Bind
// plugins are used to bind a pod to a Node.
type BindPlugin interface {
Plugin
// Bind plugins will not be called until all pre-bind plugins have completed. Each
// bind plugin is called in the configured order. A bind plugin may choose whether
// or not to handle the given Pod. If a bind plugin chooses to handle a Pod, the
// remaining bind plugins are skipped. When a bind plugin does not handle a pod,
// it must return Skip in its Status code. If a bind plugin returns an Error, the
// pod is rejected and will not be bound.
Bind(ctx context.Context, state *CycleState, p *v1.Pod, nodeName string) *Status
}
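The Skip semantics described in the interface comment can be illustrated with a small chain. The types below (`code`, `binder`, `runBinders`) are hypothetical reductions of the framework's status codes and BindPlugin, written only to show the control flow.

```go
package main

import (
	"errors"
	"fmt"
)

// code is a hypothetical, reduced version of the framework's status codes.
type code int

const (
	success code = iota
	skip
	failure
)

// binder is a hypothetical stand-in for a BindPlugin.
type binder struct {
	name string
	bind func(pod, node string) code
}

// runBinders calls binders in configured order. The first binder that does not
// return skip decides the outcome; the remaining binders are not called.
func runBinders(binders []binder, pod, node string) (string, error) {
	for _, b := range binders {
		switch b.bind(pod, node) {
		case skip:
			continue // this binder does not handle the pod; try the next one
		case success:
			return b.name, nil
		default:
			return b.name, fmt.Errorf("binder %q failed to bind %s to %s", b.name, pod, node)
		}
	}
	return "", errors.New("no binder handled the pod")
}

func main() {
	gpuOnly := binder{"gpu-binder", func(pod, node string) code {
		if node != "gpu-node" {
			return skip
		}
		return success
	}}
	fallback := binder{"default-binder", func(pod, node string) code { return success }}

	handledBy, err := runBinders([]binder{gpuOnly, fallback}, "web-1", "cpu-node")
	fmt.Println(handledBy, err) // default-binder <nil>
}
```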
PostBind
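PostBind plugins are called after the Pod is bound and are mainly used for notification or cleanup. Resources or in-memory scheduler state that can only be released once the Pod is confirmed bound can be released in PostBind.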
// PostBindPlugin is an interface that must be implemented by "PostBind" plugins.
// These plugins are called after a pod is successfully bound to a node.
type PostBindPlugin interface {
Plugin
// PostBind is called after a pod is successfully bound. These plugins are
// informational. A common application of this extension point is for cleaning
// up. If a plugin needs to clean-up its state after a pod is scheduled and
// bound, PostBind is the extension point that it should register.
PostBind(ctx context.Context, state *CycleState, p *v1.Pod, nodeName string)
}
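Later releases also add a PreEnqueue extension point (alpha in 1.26, enabled by default from 1.27): it runs before a Pod enters the internal active queue, and only Pods that pass it are admitted to the active queue.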
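Scheduler execution flow
Overall flow
The scheduler uses Informers to watch cluster events (ClusterEvent) continuously and, through registered callbacks, updates the internal Scheduling Queue and the local Cache.
The Scheduling Queue holds the Pods waiting to be scheduled; the default implementation is a priority queue, PriorityQueue.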
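scheduleOne is the core loop of kube-scheduler. It runs continuously and schedules one Pod per iteration, in these steps:
- sched.NextPod pops the highest-priority pending Pod from the scheduling queue and blocks when the queue is empty
- sched.frameworkForPod selects the Scheduling Framework instance that matches the Pod's spec.schedulerName
  - Multiple Scheduling Framework instances are supported
  - schedulerName uniquely identifies the instance to use when scheduling a Pod
  - By default only one Scheduling Framework instance is initialized
- The scheduling cycle runs serially. Its core is sched.schedulePod, which runs the filtering and scoring algorithms
- The binding cycle runs asynchronously. It first waits until the Pod chosen by the scheduling cycle satisfies Permit, then sched.bind performs the binding, and finally fwk.RunPostBindPlugins completes the process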
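The shape of this loop (profile lookup by scheduler name, a serial scheduling cycle, an asynchronous binding cycle) can be sketched as follows. Everything here is hypothetical and heavily reduced: `pod`, `profile`, and `scheduleAll` are illustration-only names, and a real binding cycle talks to kube-apiserver instead of writing to a map.

```go
package main

import (
	"fmt"
	"sync"
)

// pod is a hypothetical, simplified Pod.
type pod struct {
	name          string
	schedulerName string
}

// profile picks a node for a pod; it stands in for one framework instance.
type profile func(p pod) string

// scheduleAll drains the queue one pod at a time. Node selection is serial,
// while each binding runs in its own goroutine. It returns the pod-to-node
// bindings and the names of pods whose scheduler name matched no profile.
func scheduleAll(profiles map[string]profile, queue []pod) (map[string]string, []string) {
	var (
		mu       sync.Mutex
		wg       sync.WaitGroup
		bound    = map[string]string{}
		rejected []string
	)
	for _, p := range queue {
		fwk, ok := profiles[p.schedulerName]
		if !ok {
			rejected = append(rejected, p.name)
			continue
		}
		node := fwk(p) // scheduling cycle: serial
		wg.Add(1)
		go func(p pod, node string) { // binding cycle: asynchronous
			defer wg.Done()
			mu.Lock()
			defer mu.Unlock()
			bound[p.name] = node
		}(p, node)
	}
	wg.Wait()
	return bound, rejected
}

func main() {
	profiles := map[string]profile{
		"default-scheduler": func(pod) string { return "node-a" },
		"batch-scheduler":   func(pod) string { return "node-b" },
	}
	queue := []pod{
		{"web-1", "default-scheduler"},
		{"batch-1", "batch-scheduler"},
		{"orphan", "missing-scheduler"},
	}
	bound, rejected := scheduleAll(profiles, queue)
	fmt.Println(bound, rejected) // map[batch-1:node-b web-1:node-a] [orphan]
}
```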
Scheduling Cycle
Running the Filter and Score algorithms to select the target node
sched.schedulePod mainly runs the Filter and Score algorithms.
// schedulePod tries to schedule the given pod to one of the nodes in the node list.
// If it succeeds, it will return the name of the node.
// If it fails, it will return a FitError with reasons.
func (sched *Scheduler) schedulePod(ctx context.Context, fwk framework.Framework, state *framework.CycleState, pod *v1.Pod) (result ScheduleResult, err error) {
trace := utiltrace.New("Scheduling", utiltrace.Field{Key: "namespace", Value: pod.Namespace}, utiltrace.Field{Key: "name", Value: pod.Name})
defer trace.LogIfLong(100 * time.Millisecond)
if err := sched.Cache.UpdateSnapshot(sched.nodeInfoSnapshot); err != nil {
return result, err
}
trace.Step("Snapshotting scheduler cache and node infos done")
if sched.nodeInfoSnapshot.NumNodes() == 0 {
return result, ErrNoNodesAvailable
}
feasibleNodes, diagnosis, err := sched.findNodesThatFitPod(ctx, fwk, state, pod)
if err != nil {
return result, err
}
trace.Step("Computing predicates done")
if len(feasibleNodes) == 0 {
return result, &framework.FitError{
Pod: pod,
NumAllNodes: sched.nodeInfoSnapshot.NumNodes(),
Diagnosis: diagnosis,
}
}
// When only one node after predicate, just use it.
if len(feasibleNodes) == 1 {
return ScheduleResult{
SuggestedHost: feasibleNodes[0].Name,
EvaluatedNodes: 1 + len(diagnosis.NodeToStatusMap),
FeasibleNodes: 1,
}, nil
}
priorityList, err := prioritizeNodes(ctx, sched.Extenders, fwk, state, pod, feasibleNodes)
if err != nil {
return result, err
}
host, err := selectHost(priorityList)
trace.Step("Prioritizing done")
return ScheduleResult{
SuggestedHost: host,
EvaluatedNodes: len(feasibleNodes) + len(diagnosis.NodeToStatusMap),
FeasibleNodes: len(feasibleNodes),
}, err
}
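Filter runs concurrently by default. The parallelism field in the configuration file sets the number of goroutines; the default is 16.
After the Filter plugins run, the remaining nodes are passed through any configured Extenders, one Extender at a time: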
func findNodesThatPassExtenders(extenders []framework.Extender, pod *v1.Pod, feasibleNodes []*v1.Node, statuses framework.NodeToStatusMap) ([]*v1.Node, error) {
// Extenders are called sequentially.
// Nodes in original feasibleNodes can be excluded in one extender, and pass on to the next
// extender in a decreasing manner.
for _, extender := range extenders {
if len(feasibleNodes) == 0 {
break
}
if !extender.IsInterested(pod) {
continue
}
// Status of failed nodes in failedAndUnresolvableMap will be added or overwritten in <statuses>,
// so that the scheduler framework can respect the UnschedulableAndUnresolvable status for
// particular nodes, and this may eventually improve preemption efficiency.
// Note: users are recommended to configure the extenders that may return UnschedulableAndUnresolvable
// status ahead of others.
feasibleList, failedMap, failedAndUnresolvableMap, err := extender.Filter(pod, feasibleNodes)
if err != nil {
if extender.IsIgnorable() {
klog.InfoS("Skipping extender as it returned error and has ignorable flag set", "extender", extender, "err", err)
continue
}
return nil, err
}
for failedNodeName, failedMsg := range failedAndUnresolvableMap {
var aggregatedReasons []string
if _, found := statuses[failedNodeName]; found {
aggregatedReasons = statuses[failedNodeName].Reasons()
}
aggregatedReasons = append(aggregatedReasons, failedMsg)
statuses[failedNodeName] = framework.NewStatus(framework.UnschedulableAndUnresolvable, aggregatedReasons...)
}
for failedNodeName, failedMsg := range failedMap {
if _, found := failedAndUnresolvableMap[failedNodeName]; found {
// failedAndUnresolvableMap takes precedence over failedMap
// note that this only happens if the extender returns the node in both maps
continue
}
if _, found := statuses[failedNodeName]; !found {
statuses[failedNodeName] = framework.NewStatus(framework.Unschedulable, failedMsg)
} else {
statuses[failedNodeName].AppendReason(failedMsg)
}
}
feasibleNodes = feasibleList
}
return feasibleNodes, nil
}
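numFeasibleNodesToFind addresses scheduling performance in large clusters. The percentageOfNodesToScore field in the configuration file sets the maximum share of the cluster's nodes that the scheduler collects as feasible before it stops filtering. The default, 0, means the percentage adapts automatically to the cluster size.
By default:
- Fewer than 100 nodes: all feasible nodes go on to scoring
- At least 100 and fewer than 5625 nodes: the limit is (50 - numAllNodes/125) × numAllNodes / 100 using integer division, raised to 100 if the result is smaller
- 5625 nodes or more: 5% of the nodes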
// numFeasibleNodesToFind returns the number of feasible nodes that once found, the scheduler stops
// its search for more feasible nodes.
func (sched *Scheduler) numFeasibleNodesToFind(numAllNodes int32) (numNodes int32) {
	if numAllNodes < minFeasibleNodesToFind || sched.percentageOfNodesToScore >= 100 {
		return numAllNodes
	}
	adaptivePercentage := sched.percentageOfNodesToScore
	if adaptivePercentage <= 0 {
		basePercentageOfNodesToScore := int32(50)
		adaptivePercentage = basePercentageOfNodesToScore - numAllNodes/125
		if adaptivePercentage < minFeasibleNodesPercentageToFind {
			adaptivePercentage = minFeasibleNodesPercentageToFind
		}
	}
	numNodes = numAllNodes * adaptivePercentage / 100
	if numNodes < minFeasibleNodesToFind {
		return minFeasibleNodesToFind
	}
	return numNodes
}
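A worked example helps check the thresholds. The sketch below restates the same arithmetic as a standalone function (the percentage is passed as a parameter instead of being read from the scheduler struct, and the two constants are set to 100 and 5 as described in the text) and evaluates it for a few cluster sizes.

```go
package main

import "fmt"

const (
	minFeasibleNodesToFind           = 100 // never look for fewer nodes than this
	minFeasibleNodesPercentageToFind = 5   // floor for the adaptive percentage
)

// numFeasibleNodesToFind is a standalone restatement of the scheduler method
// for experimentation; percentageOfNodesToScore <= 0 means "adaptive".
func numFeasibleNodesToFind(numAllNodes, percentageOfNodesToScore int32) int32 {
	if numAllNodes < minFeasibleNodesToFind || percentageOfNodesToScore >= 100 {
		return numAllNodes
	}
	adaptive := percentageOfNodesToScore
	if adaptive <= 0 {
		adaptive = 50 - numAllNodes/125
		if adaptive < minFeasibleNodesPercentageToFind {
			adaptive = minFeasibleNodesPercentageToFind
		}
	}
	numNodes := numAllNodes * adaptive / 100
	if numNodes < minFeasibleNodesToFind {
		return minFeasibleNodesToFind
	}
	return numNodes
}

func main() {
	// 50   -> 50  (below 100 nodes: use all of them)
	// 150  -> 100 (49% of 150 is 73, raised to the minimum of 100)
	// 1000 -> 420 (50 - 1000/125 = 42%)
	// 5000 -> 500 (50 - 5000/125 = 10%)
	// 6000 -> 300 (50 - 6000/125 = 2%, clamped to 5%)
	for _, n := range []int32{50, 150, 1000, 5000, 6000} {
		fmt.Printf("%d nodes -> %d\n", n, numFeasibleNodesToFind(n, 0))
	}
}
```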
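To give every node a fair chance of being considered, the scheduler keeps a cursor, nextStartNodeIndex. After each Filter pass the cursor advances by the number of nodes processed, so the next Pod starts where the previous one stopped.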
func (sched *Scheduler) findNodesThatFitPod(ctx context.Context, fwk framework.Framework, state *framework.CycleState, pod *v1.Pod) ([]*v1.Node, framework.Diagnosis, error) {
...
processedNodes := len(feasibleNodes) + len(diagnosis.NodeToStatusMap)
sched.nextStartNodeIndex = (sched.nextStartNodeIndex + processedNodes) % len(nodes)
...
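The effect of the cursor can be shown with a sequential sketch. This is an assumption-laden simplification: `findFeasible` is a hypothetical helper that scans one node at a time, whereas the real scheduler checks nodes in parallel; only the modular cursor update mirrors the fragment above.

```go
package main

import "fmt"

// findFeasible scans nodes round-robin starting at index start and stops once
// it has found want feasible nodes or has looked at every node. It returns the
// feasible nodes and the start index for the next scan.
func findFeasible(nodes []string, feasible func(string) bool, start, want int) ([]string, int) {
	if len(nodes) == 0 {
		return nil, 0
	}
	var found []string
	processed := 0
	for processed < len(nodes) && len(found) < want {
		n := nodes[(start+processed)%len(nodes)]
		if feasible(n) {
			found = append(found, n)
		}
		processed++
	}
	return found, (start + processed) % len(nodes)
}

func main() {
	nodes := []string{"n0", "n1", "n2", "n3", "n4", "n5"}
	all := func(string) bool { return true }
	next := 0
	for i := 0; i < 3; i++ {
		var found []string
		found, next = findFeasible(nodes, all, next, 2)
		fmt.Println(found, next)
	}
	// Prints:
	// [n0 n1] 2
	// [n2 n3] 4
	// [n4 n5] 0
}
```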
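Once the scored node list (NodeScoreList) is available, selectHost picks the node with the highest score as the target. If several nodes tie for the highest score, one of them is picked at random.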
// selectHost takes a prioritized list of nodes and then picks one
// in a reservoir sampling manner from the nodes that had the highest score.
func selectHost(nodeScoreList framework.NodeScoreList) (string, error) {
	if len(nodeScoreList) == 0 {
		return "", fmt.Errorf("empty priorityList")
	}
	maxScore := nodeScoreList[0].Score
	selected := nodeScoreList[0].Name
	cntOfMaxScore := 1
	for _, ns := range nodeScoreList[1:] {
		if ns.Score > maxScore {
			maxScore = ns.Score
			selected = ns.Name
			cntOfMaxScore = 1
		} else if ns.Score == maxScore {
			cntOfMaxScore++
			if rand.Intn(cntOfMaxScore) == 0 {
				// Replace the candidate with probability of 1/cntOfMaxScore
				selected = ns.Name
			}
		}
	}
	return selected, nil
}
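The tie-breaking in selectHost is reservoir sampling: the k-th node seen with the top score replaces the current candidate with probability 1/k, which leaves each tied node equally likely to win. The sketch below reproduces that logic with hypothetical names (`nodeScore`, `pickHost`) and takes the random source as a parameter so the behaviour is easy to experiment with.

```go
package main

import (
	"fmt"
	"math/rand"
)

// nodeScore is a hypothetical, simplified version of framework.NodeScore.
type nodeScore struct {
	name  string
	score int64
}

// pickHost returns the name of a node with the highest score. Among ties, the
// k-th tied node replaces the current pick when intn(k) == 0, i.e. with
// probability 1/k for a uniform intn. It returns "" for an empty list.
func pickHost(list []nodeScore, intn func(n int) int) string {
	if len(list) == 0 {
		return ""
	}
	best, selected, ties := list[0].score, list[0].name, 1
	for _, ns := range list[1:] {
		switch {
		case ns.score > best:
			best, selected, ties = ns.score, ns.name, 1
		case ns.score == best:
			ties++
			if intn(ties) == 0 {
				selected = ns.name
			}
		}
	}
	return selected
}

func main() {
	list := []nodeScore{{"a", 90}, {"b", 70}, {"c", 90}, {"d", 90}}
	counts := map[string]int{}
	for i := 0; i < 30000; i++ {
		counts[pickHost(list, rand.Intn)]++
	}
	// a, c and d should each be picked roughly a third of the time; b never.
	fmt.Println(counts)
}
```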
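Running assume to update the local Cache
After the scheduling decision is made, the scheduler calls assume to update the local Cache. The Pod is treated as if it were already running on the target node, so its resource requests are deducted from that node.
First, the Pod's Spec.NodeName is set to the target node.
Then sched.Cache.AssumePod is called to update the cache.
Finally, because the Pod now has a node, its nominated-node information is removed from the Scheduling Queue.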
// assume signals to the cache that a pod is already in the cache, so that binding can be asynchronous.
// assume modifies `assumed`.
func (sched *Scheduler) assume(assumed *v1.Pod, host string) error {
	// Optimistically assume that the binding will succeed and send it to apiserver
	// in the background.
	// If the binding fails, scheduler will release resources allocated to assumed pod
	// immediately.
	assumed.Spec.NodeName = host
	if err := sched.Cache.AssumePod(assumed); err != nil {
		klog.ErrorS(err, "Scheduler cache AssumePod failed")
		return err
	}
	// if "assumed" is a nominated pod, we should remove it from internal cache
	if sched.SchedulingQueue != nil {
		sched.SchedulingQueue.DeleteNominatedPodIfExists(assumed)
	}
	return nil
}
Running Reserve to update the state of stateful plugins
The Reserve extension point mainly lets stateful plugins update their state.
// RunReservePluginsReserve runs the Reserve method in the set of configured
// reserve plugins. If any of these plugins returns an error, it does not
// continue running the remaining ones and returns the error. In such a case,
// the pod will not be scheduled and the caller will be expected to call
// RunReservePluginsUnreserve.
func (f *frameworkImpl) RunReservePluginsReserve(ctx context.Context, state *framework.CycleState, pod *v1.Pod, nodeName string) (status *framework.Status) {
startTime := time.Now()
defer func() {
metrics.FrameworkExtensionPointDuration.WithLabelValues(reserve, status.Code().String(), f.profileName).Observe(metrics.SinceInSeconds(startTime))
}()
for _, pl := range f.reservePlugins {
status = f.runReservePluginReserve(ctx, pl, state, pod, nodeName)
if !status.IsSuccess() {
err := status.AsError()
klog.ErrorS(err, "Failed running Reserve plugin", "plugin", pl.Name(), "pod", klog.KObj(pod))
return framework.AsStatus(fmt.Errorf("running Reserve plugin %q: %w", pl.Name(), err))
}
}
return nil
}
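When Reserve fails, Unreserve is called on the plugins in reverse order to roll back, so Unreserve implementations must be idempotent.
Along with Unreserve, the scheduler calls sched.Cache.ForgetPod to remove the assumed Pod from the Cache.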
// Run the Reserve method of reserve plugins.
if sts := fwk.RunReservePluginsReserve(schedulingCycleCtx, state, assumedPod, scheduleResult.SuggestedHost); !sts.IsSuccess() {
metrics.PodScheduleError(fwk.ProfileName(), metrics.SinceInSeconds(start))
// trigger un-reserve to clean up state associated with the reserved Pod
fwk.RunReservePluginsUnreserve(schedulingCycleCtx, state, assumedPod, scheduleResult.SuggestedHost)
if forgetErr := sched.Cache.ForgetPod(assumedPod); forgetErr != nil {
klog.ErrorS(forgetErr, "Scheduler cache ForgetPod failed")
}
sched.FailureHandler(ctx, fwk, assumedPodInfo, sts.AsError(), SchedulerError, clearNominatedNode)
return
}
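The rollback order can be seen in a small sketch. The types and helpers (`reservePlugin`, `traced`, `reserveAll`) are hypothetical; the point is only that a failed Reserve stops the forward pass and Unreserve then runs on every plugin in reverse order, including plugins whose Reserve never ran, which is why Unreserve has to be idempotent.

```go
package main

import (
	"errors"
	"fmt"
)

// reservePlugin is a hypothetical stand-in for a ReservePlugin.
type reservePlugin struct {
	name      string
	reserve   func() error
	unreserve func()
}

// traced builds a plugin that records each call in calls and optionally fails.
func traced(name string, fail bool, calls *[]string) reservePlugin {
	return reservePlugin{
		name: name,
		reserve: func() error {
			*calls = append(*calls, "reserve:"+name)
			if fail {
				return errors.New("out of capacity")
			}
			return nil
		},
		unreserve: func() { *calls = append(*calls, "unreserve:"+name) },
	}
}

// reserveAll runs Reserve in configured order. On the first failure it calls
// Unreserve on all plugins in reverse order and returns the error.
func reserveAll(plugins []reservePlugin) error {
	for _, pl := range plugins {
		if err := pl.reserve(); err != nil {
			for i := len(plugins) - 1; i >= 0; i-- {
				plugins[i].unreserve()
			}
			return fmt.Errorf("running Reserve plugin %q: %w", pl.name, err)
		}
	}
	return nil
}

func main() {
	var calls []string
	err := reserveAll([]reservePlugin{
		traced("volume", false, &calls),
		traced("gpu", true, &calls),
		traced("network", false, &calls),
	})
	fmt.Println(err)
	fmt.Println(calls)
	// [reserve:volume reserve:gpu unreserve:network unreserve:gpu unreserve:volume]
}
```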
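Running Permit to delay binding
Permit plugins can return a Wait status, so the Pod does not enter the binding flow immediately and instead waits for an Approve or Deny signal.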
// WaitingPod represents a pod currently waiting in the permit phase.
type WaitingPod interface {
// GetPod returns a reference to the waiting pod.
GetPod() *v1.Pod
// GetPendingPlugins returns a list of pending Permit plugin's name.
GetPendingPlugins() []string
// Allow declares the waiting pod is allowed to be scheduled by the plugin named as "pluginName".
// If this is the last remaining plugin to allow, then a success signal is delivered
// to unblock the pod.
Allow(pluginName string)
// Reject declares the waiting pod unschedulable.
Reject(pluginName, msg string)
}
Binding Cycle
The Binding Cycle binds the Pod to the chosen node.
Waiting for Permit to approve or reject (delayed binding)
// WaitOnPermit will block, if the pod is a waiting pod, until the waiting pod is rejected or allowed.
func (f *frameworkImpl) WaitOnPermit(ctx context.Context, pod *v1.Pod) *framework.Status {
waitingPod := f.waitingPods.get(pod.UID)
if waitingPod == nil {
return nil
}
defer f.waitingPods.remove(pod.UID)
klog.V(4).InfoS("Pod waiting on permit", "pod", klog.KObj(pod))
startTime := time.Now()
s := <-waitingPod.s
metrics.PermitWaitDuration.WithLabelValues(s.Code().String()).Observe(metrics.SinceInSeconds(startTime))
if !s.IsSuccess() {
if s.IsUnschedulable() {
klog.V(4).InfoS("Pod rejected while waiting on permit", "pod", klog.KObj(pod), "status", s.Message())
s.SetFailedPlugin(s.FailedPlugin())
return s
}
err := s.AsError()
klog.ErrorS(err, "Failed waiting on permit for pod", "pod", klog.KObj(pod))
return framework.AsStatus(fmt.Errorf("waiting on permit for pod: %w", err)).WithFailedPlugin(s.FailedPlugin())
}
return nil
}
Running Bind to bind the Pod to the Node
After Permit approves, the scheduler runs the PreBind, Bind, and PostBind plugins in that order.
// bind binds a pod to a given node defined in a binding object.
// The precedence for binding is: (1) extenders and (2) framework plugins.
// We expect this to run asynchronously, so we handle binding metrics internally.
func (sched *Scheduler) bind(ctx context.Context, fwk framework.Framework, assumed *v1.Pod, targetNode string, state *framework.CycleState) (err error) {
	defer func() {
		sched.finishBinding(fwk, assumed, targetNode, err)
	}()
	bound, err := sched.extendersBinding(assumed, targetNode)
	if bound {
		return err
	}
	bindStatus := fwk.RunBindPlugins(ctx, state, assumed, targetNode)
	if bindStatus.IsSuccess() {
		return nil
	}
	if bindStatus.Code() == framework.Error {
		return bindStatus.AsError()
	}
	return fmt.Errorf("bind status: %s, %v", bindStatus.Code().String(), bindStatus.Message())
}