Kube-scheduler (Startup Flow)
Based on Kubernetes 1.25
Cobra Command and Flag Parsing
-
// NewSchedulerCommand creates a *cobra.Command object with default parameters and registryOptions
func NewSchedulerCommand(registryOptions ...Option) *cobra.Command {
// explicitly register (if not already registered) the kube effective version and feature gate in DefaultComponentGlobalsRegistry,
// which will be used in NewOptions.
_, _ = featuregate.DefaultComponentGlobalsRegistry.ComponentGlobalsOrRegister(
featuregate.DefaultKubeComponent, utilversion.DefaultBuildEffectiveVersion(), utilfeature.DefaultMutableFeatureGate)
opts := options.NewOptions()
cmd := &cobra.Command{
Use: "kube-scheduler",
Long: `The Kubernetes scheduler is a control plane process which assigns
Pods to Nodes. The scheduler determines which Nodes are valid placements for
each Pod in the scheduling queue according to constraints and available
resources. The scheduler then ranks each valid Node and binds the Pod to a
suitable Node. Multiple different schedulers may be used within a cluster;
kube-scheduler is the reference implementation.
See [scheduling](https://kubernetes.io/docs/concepts/scheduling-eviction/)
for more information about scheduling and the kube-scheduler component.`,
PersistentPreRunE: func(*cobra.Command, []string) error {
// makes sure feature gates are set before RunE.
return opts.ComponentGlobalsRegistry.Set()
},
RunE: func(cmd *cobra.Command, args []string) error {
return runCommand(cmd, opts, registryOptions...)
},
Args: func(cmd *cobra.Command, args []string) error {
for _, arg := range args {
if len(arg) > 0 {
return fmt.Errorf("%q does not take any arguments, got %q", cmd.CommandPath(), args)
}
}
return nil
},
}
nfs := opts.Flags
verflag.AddFlags(nfs.FlagSet("global"))
globalflag.AddGlobalFlags(nfs.FlagSet("global"), cmd.Name(), logs.SkipLoggingConfigurationFlags())
fs := cmd.Flags()
for _, f := range nfs.FlagSets {
fs.AddFlagSet(f)
}
cols, _, _ := term.TerminalSize(cmd.OutOrStdout())
cliflag.SetUsageAndHelpFunc(cmd, *nfs, cols)
if err := cmd.MarkFlagFilename("config", "yaml", "yml", "json"); err != nil {
klog.Background().Error(err, "Failed to mark flag filename")
}
return cmd
}
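For reference, the upstream entry point (cmd/kube-scheduler/scheduler.go) is essentially just this command handed to the component-base CLI runner; roughly:
-
package main

import (
	"os"

	"k8s.io/component-base/cli"
	_ "k8s.io/component-base/metrics/prometheus/clientgo" // registers client-go metrics
	_ "k8s.io/component-base/metrics/prometheus/version"  // registers the version metric
	"k8s.io/kubernetes/cmd/kube-scheduler/app"
)

func main() {
	command := app.NewSchedulerCommand()
	code := cli.Run(command)
	os.Exit(code)
}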
Instantiating the Scheduler Object
The Scheduler object is the main vehicle through which kube-scheduler runs. Its construction involves five key steps:
- frameworkplugins.NewInTreeRegistry (register in-tree plugins)
- registry.Merge (register out-of-tree plugins)
- buildExtenders (register Extender webhook extension points)
- newScheduler (instantiate the Scheduler object)
- addAllEventHandlers (register Informer event handlers)
In-Tree Plugin Registration
-
// NewInTreeRegistry builds the registry with all the in-tree plugins.
// A scheduler that runs out of tree plugins can register additional plugins
// through the WithFrameworkOutOfTreeRegistry option.
func NewInTreeRegistry() runtime.Registry {
fts := plfeature.Features{
EnableDRAAdminAccess: feature.DefaultFeatureGate.Enabled(features.DRAAdminAccess),
EnableDynamicResourceAllocation: feature.DefaultFeatureGate.Enabled(features.DynamicResourceAllocation),
EnableVolumeCapacityPriority: feature.DefaultFeatureGate.Enabled(features.VolumeCapacityPriority),
EnableNodeInclusionPolicyInPodTopologySpread: feature.DefaultFeatureGate.Enabled(features.NodeInclusionPolicyInPodTopologySpread),
EnableMatchLabelKeysInPodTopologySpread: feature.DefaultFeatureGate.Enabled(features.MatchLabelKeysInPodTopologySpread),
EnableInPlacePodVerticalScaling: feature.DefaultFeatureGate.Enabled(features.InPlacePodVerticalScaling),
EnableSidecarContainers: feature.DefaultFeatureGate.Enabled(features.SidecarContainers),
EnableSchedulingQueueHint: feature.DefaultFeatureGate.Enabled(features.SchedulerQueueingHints),
EnableAsyncPreemption: feature.DefaultFeatureGate.Enabled(features.SchedulerAsyncPreemption),
EnablePodLevelResources: feature.DefaultFeatureGate.Enabled(features.PodLevelResources),
}
registry := runtime.Registry{
dynamicresources.Name: runtime.FactoryAdapter(fts, dynamicresources.New),
imagelocality.Name: imagelocality.New,
tainttoleration.Name: runtime.FactoryAdapter(fts, tainttoleration.New),
nodename.Name: runtime.FactoryAdapter(fts, nodename.New),
nodeports.Name: runtime.FactoryAdapter(fts, nodeports.New),
nodeaffinity.Name: runtime.FactoryAdapter(fts, nodeaffinity.New),
podtopologyspread.Name: runtime.FactoryAdapter(fts, podtopologyspread.New),
nodeunschedulable.Name: runtime.FactoryAdapter(fts, nodeunschedulable.New),
noderesources.Name: runtime.FactoryAdapter(fts, noderesources.NewFit),
noderesources.BalancedAllocationName: runtime.FactoryAdapter(fts, noderesources.NewBalancedAllocation),
volumebinding.Name: runtime.FactoryAdapter(fts, volumebinding.New),
volumerestrictions.Name: runtime.FactoryAdapter(fts, volumerestrictions.New),
volumezone.Name: runtime.FactoryAdapter(fts, volumezone.New),
nodevolumelimits.CSIName: runtime.FactoryAdapter(fts, nodevolumelimits.NewCSI),
interpodaffinity.Name: runtime.FactoryAdapter(fts, interpodaffinity.New),
queuesort.Name: queuesort.New,
defaultbinder.Name: defaultbinder.New,
defaultpreemption.Name: runtime.FactoryAdapter(fts, defaultpreemption.New),
schedulinggates.Name: runtime.FactoryAdapter(fts, schedulinggates.New),
}
return registry
}
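Each registry value is a runtime.PluginFactory that builds a framework.Plugin. As a rough, hypothetical sketch of what such an entry produces (the plugin name, label, and logic below are invented for illustration; note that in newer releases the factory signature also takes a context.Context as its first argument):
-
package exampleplugin

import (
	"context"

	v1 "k8s.io/api/core/v1"
	"k8s.io/apimachinery/pkg/runtime"
	"k8s.io/kubernetes/pkg/scheduler/framework"
)

// Name is the hypothetical plugin name used as the registry key.
const Name = "ExampleFilter"

type ExampleFilter struct{}

var _ framework.FilterPlugin = &ExampleFilter{}

func (pl *ExampleFilter) Name() string { return Name }

// Filter rejects nodes carrying a made-up "example.com/exclude" label.
func (pl *ExampleFilter) Filter(ctx context.Context, state *framework.CycleState, pod *v1.Pod, nodeInfo *framework.NodeInfo) *framework.Status {
	if _, ok := nodeInfo.Node().Labels["example.com/exclude"]; ok {
		return framework.NewStatus(framework.Unschedulable, "node excluded by example label")
	}
	return nil
}

// New matches the 1.25-era runtime.PluginFactory signature.
func New(_ runtime.Object, _ framework.Handle) (framework.Plugin, error) {
	return &ExampleFilter{}, nil
}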
Out-of-Tree Plugin Registration
-
// Merge merges the provided registry to the current one.
func (r Registry) Merge(in Registry) error {
for name, factory := range in {
if err := r.Register(name, factory); err != nil {
return err
}
}
return nil
}
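Merge is how out-of-tree plugins passed through NewSchedulerCommand's registryOptions reach the in-tree registry. A minimal sketch of a custom scheduler binary, assuming a hypothetical exampleplugin package like the one above:
-
package main

import (
	"os"

	"k8s.io/component-base/cli"
	"k8s.io/kubernetes/cmd/kube-scheduler/app"

	exampleplugin "example.com/scheduler-plugins/exampleplugin" // hypothetical module path
)

func main() {
	// WithPlugin appends to the out-of-tree registry, which is merged into
	// the in-tree registry via Registry.Merge during Scheduler construction.
	command := app.NewSchedulerCommand(
		app.WithPlugin(exampleplugin.Name, exampleplugin.New),
	)
	os.Exit(cli.Run(command))
}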
Extender Webhook Registration
Besides the code-intrusive Scheduling Framework extension mechanism, kube-scheduler also supports a webhook-based extension mechanism: the Extender.
An Extender registers hooks at the key extension points of filtering, scoring, binding, and preemption. After the built-in scheduling algorithm runs, the intermediate results are sent over HTTP to an external third-party server for a second round of decision-making. The downsides are the added network latency, which degrades scheduling performance, and the limited set of extension points, which constrains what an Extender can do.
Today the Scheduling Framework is the recommended way to extend the scheduler.
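For a sense of the webhook contract, here is a minimal sketch of an Extender filter endpoint using the wire types from k8s.io/kube-scheduler/extender/v1; the URL path, port, and pass-through logic are invented for illustration:
-
package main

import (
	"encoding/json"
	"net/http"

	extenderv1 "k8s.io/kube-scheduler/extender/v1"
)

// filter receives the candidate nodes that survived built-in filtering and
// returns a second-round decision; this sketch simply accepts every node.
func filter(w http.ResponseWriter, r *http.Request) {
	var args extenderv1.ExtenderArgs
	if err := json.NewDecoder(r.Body).Decode(&args); err != nil {
		http.Error(w, err.Error(), http.StatusBadRequest)
		return
	}
	result := &extenderv1.ExtenderFilterResult{
		Nodes:       args.Nodes,
		FailedNodes: extenderv1.FailedNodesMap{},
	}
	json.NewEncoder(w).Encode(result)
}

func main() {
	http.HandleFunc("/filter", filter) // matched by the extender's filterVerb
	http.ListenAndServe(":8888", nil)
}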
Scheduler Object Instantiation
The Scheduler object is built around two core data structures: the Scheduling Queue and the Cache.
The Scheduling Queue holds the Pods waiting to be scheduled; on each cycle the scheduler pops the Pod at the head of the heap and schedules it.
The Cache keeps Pod and Node state in memory so that the scheduling context can be fetched quickly without querying kube-apiserver, which improves scheduling performance.
-
podQueue := internalqueue.NewSchedulingQueue(
profiles[options.profiles[0].SchedulerName].QueueSortFunc(),
informerFactory,
internalqueue.WithPodInitialBackoffDuration(time.Duration(options.podInitialBackoffSeconds)*time.Second),
internalqueue.WithPodMaxBackoffDuration(time.Duration(options.podMaxBackoffSeconds)*time.Second),
internalqueue.WithPodLister(podLister),
internalqueue.WithClusterEventMap(clusterEventMap),
internalqueue.WithPodMaxInUnschedulablePodsDuration(options.podMaxInUnschedulablePodsDuration),
)
for _, fwk := range profiles {
fwk.SetPodNominator(podQueue)
}
schedulerCache := internalcache.New(durationToExpireAssumedPod, stopEverything)
// Setup cache debugger.
debugger := cachedebugger.New(nodeLister, podLister, schedulerCache, podQueue)
debugger.ListenForSignal(stopEverything)
sched := newScheduler(
schedulerCache,
extenders,
internalqueue.MakeNextPodFunc(podQueue),
stopEverything,
podQueue,
profiles,
client,
snapshot,
options.percentageOfNodesToScore,
)
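The internalqueue.MakeNextPodFunc closure passed to newScheduler is just a blocking Pop on the queue; the scheduling loop calls it once per cycle. From pkg/scheduler/internal/queue, it looks approximately as follows:
-
// MakeNextPodFunc returns a function to retrieve the next pod from a given
// scheduling queue; Pop blocks until a pod is available.
func MakeNextPodFunc(queue SchedulingQueue) func() *framework.QueuedPodInfo {
	return func() *framework.QueuedPodInfo {
		podInfo, err := queue.Pop()
		if err == nil {
			klog.V(4).InfoS("About to try and schedule pod", "pod", klog.KObj(podInfo.Pod))
			return podInfo
		}
		klog.ErrorS(err, "Error while retrieving next pod from scheduling queue")
		return nil
	}
}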
Registering Informer Event Handlers
kube-scheduler is event-driven overall, built on top of Informers. By registering Informer event handlers, kube-scheduler observes scheduling-related events in the cluster, for example for Nodes:
-
informerFactory.Core().V1().Nodes().Informer().AddEventHandler(
cache.ResourceEventHandlerFuncs{
AddFunc: sched.addNodeToCache,
UpdateFunc: sched.updateNodeInCache,
DeleteFunc: sched.deleteNodeFromCache,
},
)
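addAllEventHandlers registers handlers for more than Nodes: Pod events are split by a filtering handler so that already-assigned Pods feed the Cache while unassigned Pods feed the Scheduling Queue. A simplified excerpt of the assigned-Pod branch (the filter logic is condensed here):
-
// scheduled pod cache
informerFactory.Core().V1().Pods().Informer().AddEventHandler(
	cache.FilteringResourceEventHandler{
		// Only Pods with spec.nodeName set (i.e. already assigned) pass.
		FilterFunc: func(obj interface{}) bool {
			pod, ok := obj.(*v1.Pod)
			return ok && len(pod.Spec.NodeName) != 0
		},
		Handler: cache.ResourceEventHandlerFuncs{
			AddFunc:    sched.addPodToCache,
			UpdateFunc: sched.updatePodInCache,
			DeleteFunc: sched.deletePodFromCache,
		},
	},
)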
Running the EventBroadcaster Event Manager
A Kubernetes Event is a resource object that surfaces what is happening inside the cluster; kube-scheduler reports its various runtime events to kube-apiserver.
By default, Kubernetes retains Events for one hour.
-
// Start events processing pipeline.
// Report all kinds of events to kube-apiserver.
cc.EventBroadcaster.StartRecordingToSink(ctx.Done())
defer cc.EventBroadcaster.Shutdown()
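Downstream, each scheduling profile obtains a recorder from this broadcaster and reports events through the events.EventRecorder interface (k8s.io/client-go/tools/events). A hedged sketch of the recording API, with pod and nodeName as placeholders:
-
// cc.EventBroadcaster is an events.EventBroadcasterAdapter; NewRecorder
// returns an events.EventRecorder bound to the given reporting component.
recorder := cc.EventBroadcaster.NewRecorder("kube-scheduler")
// Eventf(regarding, related, eventtype, reason, action, note, args...)
recorder.Eventf(pod, nil, v1.EventTypeNormal, "Scheduled", "Binding",
	"Successfully assigned %s/%s to %s", pod.Namespace, pod.Name, nodeName)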
Running the HTTP Server
After v1.23, kube-scheduler no longer supports the insecure HTTP service; only the TLS-encrypted HTTPS service is available, on port 10259 by default.
The HTTPS service mainly exposes the scheduler's runtime state, including health status, metrics, and the configuration currently in effect.
It mainly provides the following endpoints:
- /healthz: probes the health of kube-scheduler
- /metrics: metrics such as scheduling latency and scheduling-queue depth
- /configz: dumps the live configuration kube-scheduler is currently running with
- /debug/pprof: Go pprof endpoints for performance profiling
By default only /healthz allows anonymous access; the other endpoints require authorization, for example:
curl -k -H "Authorization: Bearer $(kubectl create token my-monitor)" \
  https://localhost:10259/metrics
Running Informers to Sync Resources
Start the Informer objects already instantiated in CompletedConfig and wait for their caches to sync:
-
startInformersAndWaitForSync := func(ctx context.Context) {
// Start all informers.
cc.InformerFactory.Start(ctx.Done())
// DynInformerFactory can be nil in tests.
if cc.DynInformerFactory != nil {
cc.DynInformerFactory.Start(ctx.Done())
}
// Wait for all caches to sync before scheduling.
cc.InformerFactory.WaitForCacheSync(ctx.Done())
// DynInformerFactory can be nil in tests.
if cc.DynInformerFactory != nil {
cc.DynInformerFactory.WaitForCacheSync(ctx.Done())
}
}
Performing Leader Election
To meet high-availability requirements, kube-scheduler supports running multiple replicas. Because concurrent replicas would make conflicting scheduling decisions, a leader-election mechanism is used so that only one instance actively schedules; by default the replicas compete for a Lease lock named kube-scheduler in the kube-system namespace.
-
// If leader election is enabled, runCommand via LeaderElector until done and exit.
if cc.LeaderElection != nil {
cc.LeaderElection.Callbacks = leaderelection.LeaderCallbacks{
// OnStartedLeading is the callback invoked after this instance is elected leader.
OnStartedLeading: func(ctx context.Context) {
close(waitingForLeader)
sched.Run(ctx)
},
// OnStoppedLeading is the callback invoked after leadership is lost.
OnStoppedLeading: func() {
select {
case <-ctx.Done():
// We were asked to terminate. Exit 0.
klog.InfoS("Requested to terminate, exiting")
os.Exit(0)
default:
// We lost the lock.
klog.ErrorS(nil, "Leaderelection lost")
klog.FlushAndExit(klog.ExitFlushTimeout, 1)
}
},
}
leaderElector, err := leaderelection.NewLeaderElector(*cc.LeaderElection)
if err != nil {
return fmt.Errorf("couldn't create leader elector: %v", err)
}
// Run blocks until leadership is lost or the context is cancelled.
leaderElector.Run(ctx)

return fmt.Errorf("lost lease")
}
Running the Scheduler
After the leader election succeeds, the OnStartedLeading callback invokes sched.Run, which starts the scheduler's main logic.
sched.SchedulingQueue.Run(): starts the goroutines that maintain the multi-level queues inside the Scheduling Queue, so that Pods whose scheduling failed are periodically moved back and retried.
-
// Run begins watching and scheduling. It starts scheduling and blocked until the context is done.
func (sched *Scheduler) Run(ctx context.Context) {
sched.SchedulingQueue.Run()
// We need to start scheduleOne loop in a dedicated goroutine,
// because scheduleOne function hangs on getting the next item
// from the SchedulingQueue.
// If there are no new pods to schedule, it will be hanging there
// and if done in this goroutine it will be blocking closing
// SchedulingQueue, in effect causing a deadlock on shutdown.
go wait.UntilWithContext(ctx, sched.scheduleOne, 0)
<-ctx.Done()
sched.SchedulingQueue.Close()
}
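Each scheduleOne invocation pops a single Pod through the NextPod closure (the blocking MakeNextPodFunc shown earlier) and runs one full scheduling cycle for it: filtering and scoring through the selected profile's framework, then handing the chosen node to an asynchronous binding goroutine so the loop can immediately move on to the next Pod.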