kube-controller-manager 启动流程

基于 Kubernetes v1.25

kube-controller-manager的启动流程主要分为以下几步:

  1. Cobra命令行参数解析
  2. 运行EventBroadcaster事件处理器
  3. 运行HTTPS服务
  4. 执行Leader选举
  5. 启动控制器主循环

Cobra命令行参数解析

  • Ref:https://github.com/kubernetes/kubernetes/blob/88e994f6bf8fc88114c5b733e09afea339bea66d/cmd/kube-controller-manager/app/controllermanager.go#L100

    // NewControllerManagerCommand creates a *cobra.Command object with default parameters.
    //
    // The returned command rejects non-empty positional arguments and silences client-go
    // API warnings before running. When executed, it first applies the logging
    // configuration and prints all flags. It then builds the controller-manager config
    // and passes the completed config to Run with wait.NeverStop.
    // All named flag sets, including the global, version and legacy flags, are registered
    // on the command. The usage/help output is sized to the terminal width.
    func NewControllerManagerCommand() *cobra.Command {
    	s, err := options.NewKubeControllerManagerOptions()
    	if err != nil {
    		klog.Fatalf("unable to initialize command options: %v", err)
    	}

    	// silenceWarnings silences client-go warnings: kube-controller-manager generically
    	// watches APIs (including deprecated ones), and CI ensures it works properly
    	// against matching kube-apiserver versions.
    	silenceWarnings := func(*cobra.Command, []string) error {
    		restclient.SetDefaultWarningHandler(restclient.NoWarnings{})
    		return nil
    	}

    	// rejectPositionalArgs fails as soon as any non-empty positional argument is seen.
    	rejectPositionalArgs := func(cmd *cobra.Command, args []string) error {
    		for _, arg := range args {
    			if arg != "" {
    				return fmt.Errorf("%q does not take any arguments, got %q", cmd.CommandPath(), args)
    			}
    		}
    		return nil
    	}

    	// runControllerManager is the command body executed by cobra.
    	runControllerManager := func(cmd *cobra.Command, _ []string) error {
    		verflag.PrintAndExitIfRequested()

    		// Logging is applied before anything else so that the flag dump below
    		// already uses the final logging configuration.
    		if err := logsapi.ValidateAndApply(s.Logs, utilfeature.DefaultFeatureGate); err != nil {
    			return err
    		}
    		cliflag.PrintFlags(cmd.Flags())

    		cfg, err := s.Config(KnownControllers(), ControllersDisabledByDefault.List())
    		if err != nil {
    			return err
    		}
    		return Run(cfg.Complete(), wait.NeverStop)
    	}

    	cmd := &cobra.Command{
    		Use: "kube-controller-manager",
    		Long: `The Kubernetes controller manager is a daemon that embeds
    the core control loops shipped with Kubernetes. In applications of robotics and
    automation, a control loop is a non-terminating loop that regulates the state of
    the system. In Kubernetes, a controller is a control loop that watches the shared
    state of the cluster through the apiserver and makes changes attempting to move the
    current state towards the desired state. Examples of controllers that ship with
    Kubernetes today are the replication controller, endpoints controller, namespace
    controller, and serviceaccounts controller.`,
    		PersistentPreRunE: silenceWarnings,
    		RunE:              runControllerManager,
    		Args:              rejectPositionalArgs,
    	}

    	namedFlagSets := s.Flags(KnownControllers(), ControllersDisabledByDefault.List())
    	globalFlags := namedFlagSets.FlagSet("global")
    	verflag.AddFlags(globalFlags)
    	globalflag.AddGlobalFlags(globalFlags, cmd.Name(), logs.SkipLoggingConfigurationFlags())
    	registerLegacyGlobalFlags(namedFlagSets)

    	fs := cmd.Flags()
    	for _, set := range namedFlagSets.FlagSets {
    		fs.AddFlagSet(set)
    	}

    	cols, _, _ := term.TerminalSize(cmd.OutOrStdout())
    	cliflag.SetUsageAndHelpFunc(cmd, namedFlagSets, cols)
    	return cmd
    }

运行EventBroadcaster事件处理器

在Run函数中启动事件处理流水线,使控制器以及后文Leader选举所用的c.EventRecorder产生的K8s事件既被记录到日志,也被上报到kube-apiserver。

运行HTTPS服务

kube-controller-manager提供HTTPS服务,默认监听端口10257,主要用于展示控制器运行的状态信息,包括健康状态、监控指标、当前使用的配置等:

  • /configz:打印kube-controller-manager当前使用的实时配置
  • /healthz:用于探测kube-controller-manager是否健康
  • /metrics:监控指标,用于展示控制器的吞吐等运行情况,一般由Prometheus采集
  • /debug/pprof:用于golang pprof性能分析
  • /debug/flags/v:在运行时动态调整日志级别(即-v参数),方便调试
  • /debug/controllers/<控制器名>:查询具体控制器的调试信息(仅适用于提供了调试处理器的控制器)

执行Leader选举

启用--leader-elect(默认启用)时,kube-controller-manager在执行真正的主逻辑之前,必须先通过选举成为Leader

  • K8s中的Leader选举可以借助client-go提供的leaderelection工具库完成

  • Ref:https://github.com/kubernetes/kubernetes/blob/88e994f6bf8fc88114c5b733e09afea339bea66d/cmd/kube-controller-manager/app/controllermanager.go#L703

    // leaderElectAndRun runs the leader election, and runs the callbacks once the leader lease is acquired.
    //
    // It builds a resource lock of type resourceLock named leaseName in the configured
    // leader-election namespace, identified by lockIdentity. Events are recorded through
    // c.EventRecorder. It then blocks in leaderelection.RunOrDie using the configured
    // lease/renew/retry durations, with electionChecker as the watchdog.
    // A failure to create the lock is fatal. RunOrDie is treated as never returning
    // here, hence the trailing panic.
    // TODO: extract this function into staging/controller-manager
    func leaderElectAndRun(c *config.CompletedConfig, lockIdentity string, electionChecker *leaderelection.HealthzAdaptor, resourceLock string, leaseName string, callbacks leaderelection.LeaderCallbacks) {
    	election := &c.ComponentConfig.Generic.LeaderElection

    	lockConfig := resourcelock.ResourceLockConfig{
    		Identity:      lockIdentity,
    		EventRecorder: c.EventRecorder,
    	}
    	lock, err := resourcelock.NewFromKubeconfig(
    		resourceLock,
    		election.ResourceNamespace,
    		leaseName,
    		lockConfig,
    		c.Kubeconfig,
    		election.RenewDeadline.Duration,
    	)
    	if err != nil {
    		klog.Fatalf("error creating lock: %v", err)
    	}

    	electionConfig := leaderelection.LeaderElectionConfig{
    		Lock:          lock,
    		LeaseDuration: election.LeaseDuration.Duration,
    		RenewDeadline: election.RenewDeadline.Duration,
    		RetryPeriod:   election.RetryPeriod.Duration,
    		Callbacks:     callbacks,
    		WatchDog:      electionChecker,
    		Name:          leaseName,
    	}
    	leaderelection.RunOrDie(context.TODO(), electionConfig)

    	panic("unreachable")
    }

启动控制器主循环

在成功获取Leader之后,kube-controller-manager执行run函数,启动各控制器的主循环

  • Ref:https://github.com/kubernetes/kubernetes/blob/88e994f6bf8fc88114c5b733e09afea339bea66d/cmd/kube-controller-manager/app/controllermanager.go#L218

    	// run is the controller-manager main loop, defined inside Run. It captures c,
    	// rootClientBuilder, clientBuilder, unsecuredMux, healthzHandler and stopCh from the
    	// enclosing Run (not shown in this excerpt). It blocks until ctx is cancelled.
    	run := func(ctx context.Context, startSATokenController InitFunc, initializersFunc ControllerInitializersFunc) {
    // Build the shared controller context; any failure here is fatal.
    controllerContext, err := CreateControllerContext(c, rootClientBuilder, clientBuilder, ctx.Done())
    if err != nil {
    klog.Fatalf("error building controller context: %v", err)
    }
    // The controller set depends on the loop mode (e.g. whether cloud loops are included).
    controllerInitializers := initializersFunc(controllerContext.LoopMode)
    if err := StartControllers(ctx, controllerContext, startSATokenController, controllerInitializers, unsecuredMux, healthzHandler); err != nil {
    klog.Fatalf("error starting controllers: %v", err)
    }

    // Informer factories are started only after every controller's init func has run,
    // presumably so that informers requested during init are included — TODO confirm.
    // NOTE(review): informers are stopped via stopCh from the enclosing Run, not via ctx.Done().
    controllerContext.InformerFactory.Start(stopCh)
    controllerContext.ObjectOrMetadataInformerFactory.Start(stopCh)
    // Presumably signals to waiting controllers that informers have been started — verify.
    close(controllerContext.InformersStarted)

    // Block until the context is cancelled.
    <-ctx.Done()
    }
    ...
  • 首先通过CreateControllerContext初始化控制器需要的资源对象(如K8s Client和Informer);随后调用StartControllers启动各控制器;最后启动Informer工厂,并阻塞等待ctx结束

  • Ref:https://github.com/kubernetes/kubernetes/blob/88e994f6bf8fc88114c5b733e09afea339bea66d/cmd/kube-controller-manager/app/controllermanager.go#L548

    // StartControllers starts a set of controllers with a specified ControllerContext.
    //
    // Startup order:
    //  1. startSATokenController (when non-nil) runs first; its error aborts startup.
    //  2. The cloud provider (when controllerCtx.Cloud is non-nil) is initialized.
    //  3. Every controller in controllers that controllerCtx reports as enabled is started
    //     after a jittered ControllerStartInterval sleep. Iteration follows Go map order,
    //     so the start order is not fixed.
    //
    // Health checks: each controller whose init func reports started == true gets one.
    // A controller that implements controller.HealthCheckable and returns a non-nil checker
    // gets that checker; every other started controller gets a named ping check.
    //
    // Debug handlers: if the controller implements controller.Debuggable, returns a non-nil
    // handler, and unsecuredMux is non-nil, the handler is mounted under
    // /debug/controllers/<name>.
    //
    // The collected checks are added to healthzHandler only after every controller has
    // started. The first init error is returned wrapped with the controller's name, and
    // in that case no checks are registered.
    func StartControllers(ctx context.Context, controllerCtx ControllerContext, startSATokenController InitFunc, controllers map[string]InitFunc,
    	unsecuredMux *mux.PathRecorderMux, healthzHandler *controllerhealthz.MutableHealthzHandler) error {
    	// Always start the SA token controller first using a full-power client, since it needs to mint tokens for the rest.
    	// If this fails, just return here and fail since other controllers won't be able to get credentials.
    	if startSATokenController != nil {
    		if _, _, err := startSATokenController(ctx, controllerCtx); err != nil {
    			return fmt.Errorf("error starting service account token controller: %w", err)
    		}
    	}

    	// Initialize the cloud provider with a reference to the clientBuilder only after token controller
    	// has started in case the cloud provider uses the client builder.
    	if controllerCtx.Cloud != nil {
    		controllerCtx.Cloud.Initialize(controllerCtx.ClientBuilder, ctx.Done())
    	}

    	var controllerChecks []healthz.HealthChecker

    	for controllerName, initFn := range controllers {
    		if !controllerCtx.IsControllerEnabled(controllerName) {
    			klog.Warningf("%q is disabled", controllerName)
    			continue
    		}

    		// Sleep a jittered start interval before each controller start.
    		time.Sleep(wait.Jitter(controllerCtx.ComponentConfig.Generic.ControllerStartInterval.Duration, ControllerStartJitter))

    		klog.V(1).Infof("Starting %q", controllerName)
    		ctrl, started, err := initFn(ctx, controllerCtx)
    		if err != nil {
    			// Attach the controller name to the error instead of logging it here
    			// and returning it separately; reporting is left to the caller.
    			return fmt.Errorf("error starting %q: %w", controllerName, err)
    		}
    		if !started {
    			klog.Warningf("Skipping %q", controllerName)
    			continue
    		}

    		check := controllerhealthz.NamedPingChecker(controllerName)
    		if ctrl != nil {
    			// Check if the controller supports and requests a debugHandler
    			// and it needs the unsecuredMux to mount the handler onto.
    			if debuggable, ok := ctrl.(controller.Debuggable); ok && unsecuredMux != nil {
    				if debugHandler := debuggable.DebuggingHandler(); debugHandler != nil {
    					basePath := "/debug/controllers/" + controllerName
    					unsecuredMux.UnlistedHandle(basePath, http.StripPrefix(basePath, debugHandler))
    					unsecuredMux.UnlistedHandlePrefix(basePath+"/", http.StripPrefix(basePath, debugHandler))
    				}
    			}
    			// Prefer the controller's own health checker over the default ping check.
    			if healthCheckable, ok := ctrl.(controller.HealthCheckable); ok {
    				if realCheck := healthCheckable.HealthChecker(); realCheck != nil {
    					check = controllerhealthz.NamedHealthChecker(controllerName, realCheck)
    				}
    			}
    		}
    		controllerChecks = append(controllerChecks, check)

    		klog.Infof("Started %q", controllerName)
    	}

    	healthzHandler.AddHealthChecker(controllerChecks...)

    	return nil
    }

StartControllers函数的controllers参数(控制器名称到InitFunc的映射)由NewControllerInitializers生成:

  • Ref:https://github.com/kubernetes/kubernetes/blob/88e994f6bf8fc88114c5b733e09afea339bea66d/cmd/kube-controller-manager/app/controllermanager.go#L416

    // NewControllerInitializers returns a fresh map from controller name to the InitFunc
    // that starts it (a single init func may start more than one controller). This allows
    // structured downstream composition and subdivision.
    //
    // The cloud-dependent loops (service, route, cloud-node-lifecycle) are included only
    // when loopMode == IncludeCloudLoops. The storage-version-gc controller is included
    // only when both the APIServerIdentity and StorageVersionAPI feature gates are enabled.
    func NewControllerInitializers(loopMode ControllerLoopMode) map[string]InitFunc {
    	controllers := map[string]InitFunc{
    		"endpoint":                  startEndpointController,
    		"endpointslice":             startEndpointSliceController,
    		"endpointslicemirroring":    startEndpointSliceMirroringController,
    		"replicationcontroller":     startReplicationController,
    		"podgc":                     startPodGCController,
    		"resourcequota":             startResourceQuotaController,
    		"namespace":                 startNamespaceController,
    		"serviceaccount":            startServiceAccountController,
    		"garbagecollector":          startGarbageCollectorController,
    		"daemonset":                 startDaemonSetController,
    		"job":                       startJobController,
    		"deployment":                startDeploymentController,
    		"replicaset":                startReplicaSetController,
    		"horizontalpodautoscaling":  startHPAController,
    		"disruption":                startDisruptionController,
    		"statefulset":               startStatefulSetController,
    		"cronjob":                   startCronJobController,
    		"csrsigning":                startCSRSigningController,
    		"csrapproving":              startCSRApprovingController,
    		"csrcleaner":                startCSRCleanerController,
    		"ttl":                       startTTLController,
    		"bootstrapsigner":           startBootstrapSignerController,
    		"tokencleaner":              startTokenCleanerController,
    		"nodeipam":                  startNodeIpamController,
    		"nodelifecycle":             startNodeLifecycleController,
    		"persistentvolume-binder":   startPersistentVolumeBinderController,
    		"attachdetach":              startAttachDetachController,
    		"persistentvolume-expander": startVolumeExpandController,
    		"clusterrole-aggregation":   startClusterRoleAggregrationController,
    		"pvc-protection":            startPVCProtectionController,
    		"pv-protection":             startPVProtectionController,
    		"ttl-after-finished":        startTTLAfterFinishedController,
    		"root-ca-cert-publisher":    startRootCACertPublisher,
    		"ephemeral-volume":          startEphemeralVolumeController,
    	}

    	if loopMode == IncludeCloudLoops {
    		// TODO: volume controller into the IncludeCloudLoops only set.
    		controllers["service"] = startServiceController
    		controllers["route"] = startRouteController
    		controllers["cloud-node-lifecycle"] = startCloudNodeLifecycleController
    	}

    	gates := utilfeature.DefaultFeatureGate
    	if gates.Enabled(genericfeatures.APIServerIdentity) && gates.Enabled(genericfeatures.StorageVersionAPI) {
    		controllers["storage-version-gc"] = startStorageVersionGCController
    	}

    	return controllers
    }