K8s-kube-apiserver(启动流程)

基于1.25

APIServer启动流程:

  1. Schema资源注册
  2. Cobra命令桉树解析
  3. 创建API Server通用配置
  4. 创建API EXtensionsServer
  5. 创建KubeAPIServer
  6. 创建AggregatorServer
  7. GenericAPIServer初始化
  8. 准备和启动HTTPS服务

Scheme资源注册

使用Scheme所有资源注册到APIServer中

Scheme资源注册不算使用显式代码触发,通过init注册。

初始化Scheme资源注册表

  • Ref:https://github.com/kubernetes/kubernetes/blob/88e994f6bf8fc88114c5b733e09afea339bea66d/pkg/api/legacyscheme/scheme.go#L24

    var (
    // Scheme is the default instance of runtime.Scheme to which types in the Kubernetes API are already registered.
    // NOTE: If you are copying this file to start a new api group, STOP! Copy the
    // extensions group instead. This Scheme is special and should appear ONLY in
    // the api group, unless you really know what you're doing.
    // TODO(lavalamp): make the above error impossible.
    // 资源注册表
    Scheme = runtime.NewScheme()

    // Codecs provides access to encoding and decoding for the scheme
    // Codecs编解码器
    Codecs = serializer.NewCodecFactory(Scheme)

    // ParameterCodec handles versioning of objects that are converted to query parameters.
    // 参数编解码器
    ParameterCodec = runtime.NewParameterCodec(Scheme)
    )

把K8s核心资源注册到Scheme

  • kube-apisevrer注册倒入controlplane包,触发了install包的资源注册

  • Ref:https://github.com/kubernetes/kubernetes/blob/88e994f6bf8fc88114c5b733e09afea339bea66d/pkg/controlplane/import_known_versions.go#L19

    import (
    // These imports are the API groups the API server will support.
    _ "k8s.io/kubernetes/pkg/apis/admission/install"
    _ "k8s.io/kubernetes/pkg/apis/admissionregistration/install"
    _ "k8s.io/kubernetes/pkg/apis/apiserverinternal/install"
    _ "k8s.io/kubernetes/pkg/apis/apps/install"
    _ "k8s.io/kubernetes/pkg/apis/authentication/install"
    _ "k8s.io/kubernetes/pkg/apis/authorization/install"
    _ "k8s.io/kubernetes/pkg/apis/autoscaling/install"
    _ "k8s.io/kubernetes/pkg/apis/batch/install"
    _ "k8s.io/kubernetes/pkg/apis/certificates/install"
    _ "k8s.io/kubernetes/pkg/apis/coordination/install"
    _ "k8s.io/kubernetes/pkg/apis/core/install"
    _ "k8s.io/kubernetes/pkg/apis/discovery/install"
    _ "k8s.io/kubernetes/pkg/apis/events/install"
    _ "k8s.io/kubernetes/pkg/apis/extensions/install"
    _ "k8s.io/kubernetes/pkg/apis/flowcontrol/install"
    _ "k8s.io/kubernetes/pkg/apis/imagepolicy/install"
    _ "k8s.io/kubernetes/pkg/apis/networking/install"
    _ "k8s.io/kubernetes/pkg/apis/node/install"
    _ "k8s.io/kubernetes/pkg/apis/policy/install"
    _ "k8s.io/kubernetes/pkg/apis/rbac/install"
    _ "k8s.io/kubernetes/pkg/apis/scheduling/install"
    _ "k8s.io/kubernetes/pkg/apis/storage/install"
    )

    init包调用Install实现注册

Cobra命令行参数解析

  • Ref:https://github.com/kubernetes/kubernetes/blob/88e994f6bf8fc88114c5b733e09afea339bea66d/cmd/kube-apiserver/app/server.go#L93

    // NewAPIServerCommand creates a *cobra.Command object with default parameters
    func NewAPIServerCommand() *cobra.Command {
    s := options.NewServerRunOptions()
    cmd := &cobra.Command{
    Use: "kube-apiserver",
    Long: `The Kubernetes API server validates and configures data
    for the api objects which include pods, services, replicationcontrollers, and
    others. The API Server services REST operations and provides the frontend to the
    cluster's shared state through which all other components interact.`
    // stop printing usage when the command errors
    SilenceUsage: true,
    PersistentPreRunE: func(*cobra.Command, []string) error {
    // silence client-go warnings.
    // kube-apiserver loopback clients should not log self-issued warnings.
    rest.SetDefaultWarningHandler(rest.NoWarnings{})
    return nil
    },
    RunE: func(cmd *cobra.Command, args []string) error {
    verflag.PrintAndExitIfRequested()
    fs := cmd.Flags()

    // Activate logging as soon as possible, after that
    // show flags with the final logging configuration.
    if err := logsapi.ValidateAndApply(s.Logs, utilfeature.DefaultFeatureGate); err != nil {
    return err
    }
    cliflag.PrintFlags(fs)

    // set default options
    completedOptions, err := Complete(s)
    if err != nil {
    return err
    }

    // validate options
    if errs := completedOptions.Validate(); len(errs) != 0 {
    return utilerrors.NewAggregate(errs)
    }

    return Run(completedOptions, genericapiserver.SetupSignalHandler())
    },
    Args: func(cmd *cobra.Command, args []string) error {
    for _, arg := range args {
    if len(arg) > 0 {
    return fmt.Errorf("%q does not take any arguments, got %q", cmd.CommandPath(), args)
    }
    }
    return nil
    },
    }

    fs := cmd.Flags()
    namedFlagSets := s.Flags()
    verflag.AddFlags(namedFlagSets.FlagSet("global"))
    globalflag.AddGlobalFlags(namedFlagSets.FlagSet("global"), cmd.Name(), logs.SkipLoggingConfigurationFlags())
    options.AddCustomGlobalFlags(namedFlagSets.FlagSet("generic"))
    for _, f := range namedFlagSets.FlagSets {
    fs.AddFlagSet(f)
    }

    cols, _, _ := term.TerminalSize(cmd.OutOrStdout())
    cliflag.SetUsageAndHelpFunc(cmd, namedFlagSets, cols)

    return cmd
    }

创建API Server通用配置

API Server通用配置设置了kube- APIserver不同模块实例化需要的配置

genericConfig实例化

genericConfig.MergedReourceConfig指定启用/禁用GroupVersion或GroupVersionResource

  • 如果没有使用--runrime-config指定启用或者禁用的内置API资源,则会使用内置的DefauleAPI ResourceConfigSourcefunc作为默认配置

  • 如果使用参数指定,kube-apiserver会默认该默认配置

  • 默认情况,kube-apiserver仅启用stable稳定版本的API资源(为了兼容某些之前已经被启用的legacy APi也默认开启,但是新引入的beta api默认被禁用)

  • Ref:https://github.com/kubernetes/kubernetes/blob/88e994f6bf8fc88114c5b733e09afea339bea66d/pkg/controlplane/instance.go#L700

    // DefaultAPIResourceConfigSource returns default configuration for an APIResource.
    func DefaultAPIResourceConfigSource() *serverstorage.ResourceConfig {
    ret := serverstorage.NewResourceConfig()
    // NOTE: GroupVersions listed here will be enabled by default. Don't put alpha or beta versions in the list.
    ret.EnableVersions(stableAPIGroupVersionsEnabledByDefault...)

    // disable alpha and beta versions explicitly so we have a full list of what's possible to serve
    ret.DisableVersions(betaAPIGroupVersionsDisabledByDefault...)
    ret.DisableVersions(alphaAPIGroupVersionsDisabledByDefault...)

    // enable the legacy beta resources that were present before stopped serving new beta APIs by default.
    ret.EnableResources(legacyBetaEnabledByDefaultResources...)

    return ret
    }

    仅为KubeAPIServer配置,APIExtensionServer和AggregatorServer有各自独立的配置

HTTP Server运行参数配置

kube-apiserver通过一些了的APPlyTo对genericConfig进行赋值

  • Ref:https://github.com/kubernetes/kubernetes/blob/88e994f6bf8fc88114c5b733e09afea339bea66d/cmd/kube-apiserver/app/server.go#L357

      
    if lastErr = s.GenericServerRunOptions.ApplyTo(genericConfig); lastErr != nil {
    return
    }

    if lastErr = s.SecureServing.ApplyTo(&genericConfig.SecureServing, &genericConfig.LoopbackClientConfig); lastErr != nil {
    return
    }
    if lastErr = s.Features.ApplyTo(genericConfig); lastErr != nil {
    return
    }
    if lastErr = s.APIEnablement.ApplyTo(genericConfig, controlplane.DefaultAPIResourceConfigSource(), legacyscheme.Scheme); lastErr != nil {
    return
    }
    if lastErr = s.EgressSelector.ApplyTo(genericConfig); lastErr != nil {
    return
    }
    if utilfeature.DefaultFeatureGate.Enabled(genericfeatures.APIServerTracing) {
    if lastErr = s.Traces.ApplyTo(genericConfig.EgressSelector, genericConfig); lastErr != nil {
    return
    }
    }
    // wrap

Kube-apiserver通过请求代理获得与边缘节点通信的能力:

  • Traces.ApplyTo的链路追踪使用OpenTelemetry-Go链路追踪,基于OTLP Span Exporter实现上报

OpenAPI/Swagger配置

kube-apiserver支持俩个版本的OpenAPI:oepnapiv2和oepnapiv3

  • 默认使用v2

  • 需要启用OpenAPIV3 Feature Gate才能使用V3

  • Ref:https://github.com/kubernetes/kubernetes/blob/88e994f6bf8fc88114c5b733e09afea339bea66d/cmd/kube-apiserver/app/server.go#L394

    // wrap the definitions to revert any changes from disabled features
    getOpenAPIDefinitions := openapi.GetOpenAPIDefinitionsWithoutDisabledFeatures(generatedopenapi.GetOpenAPIDefinitions)
    genericConfig.OpenAPIConfig = genericapiserver.DefaultOpenAPIConfig(getOpenAPIDefinitions, openapinamer.NewDefinitionNamer(legacyscheme.Scheme, extensionsapiserver.Scheme, aggregatorscheme.Scheme))
    genericConfig.OpenAPIConfig.Info.Title = "Kubernetes"
    if utilfeature.DefaultFeatureGate.Enabled(genericfeatures.OpenAPIV3) {
    genericConfig.OpenAPIV3Config = genericapiserver.DefaultOpenAPIV3Config(getOpenAPIDefinitions, openapinamer.NewDefinitionNamer(legacyscheme.Scheme, extensionsapiserver.Scheme, aggregatorscheme.Scheme))
    genericConfig.OpenAPIV3Config.Info.Title = "Kubernetes"
    }

StorageFactory etcd存储配置

kube-apiserver使用etcd分布式KV数据结构实现持久化存储,下面是配置过程

Authentication认证配置

kube-apiserver提供了多种方法的认证:

  • RequestHeader
  • ClientCA
  • TokenAuth
  • ServiceAccountAuth
  • BootstrapToken
  • OIDC
  • WebhookTokenAuth
  • Anonymous

Authorization配置

有很多授权策略,只要有一个授权成功就认为成功

Audit审计配置

kube-apiserver支持开启审计,记录集群中发生的活动记录

  • 支持日志方法持久化审计事件

  • 支持通过配置Webhook的方法持久化其他系统

  • 可以通过--audit-policy-file参数传入一个配置文件制定Audit Policy审计策略

  • Ref:https://github.com/kubernetes/apiserver/blob/ba592e4ccd41a320ceb91bab90eebee3bb4a4f33/pkg/server/options/audit.go#L279


    func (o *AuditOptions) ApplyTo(
    c *server.Config,
    ) error {
    if o == nil {
    return nil
    }
    if c == nil {
    return fmt.Errorf("server config must be non-nil")
    }

    // 1. Build policy evaluator
    evaluator, err := o.newPolicyRuleEvaluator()
    if err != nil {
    return err
    }

    // 2. Build log backend
    var logBackend audit.Backend
    w, err := o.LogOptions.getWriter()
    if err != nil {
    return err
    }
    if w != nil {
    if evaluator == nil {
    klog.V(2).Info("No audit policy file provided, no events will be recorded for log backend")
    } else {
    logBackend = o.LogOptions.newBackend(w)
    }
    }

    // 3. Build webhook backend
    var webhookBackend audit.Backend
    if o.WebhookOptions.enabled() {
    if evaluator == nil {
    klog.V(2).Info("No audit policy file provided, no events will be recorded for webhook backend")
    } else {
    if c.EgressSelector != nil {
    var egressDialer utilnet.DialFunc
    egressDialer, err = c.EgressSelector.Lookup(egressselector.ControlPlane.AsNetworkContext())
    if err != nil {
    return err
    }
    webhookBackend, err = o.WebhookOptions.newUntruncatedBackend(egressDialer)
    } else {
    webhookBackend, err = o.WebhookOptions.newUntruncatedBackend(nil)
    }
    if err != nil {
    return err
    }
    }
    }

    groupVersion, err := schema.ParseGroupVersion(o.WebhookOptions.GroupVersionString)
    if err != nil {
    return err
    }

    // 4. Apply dynamic options.
    var dynamicBackend audit.Backend
    if webhookBackend != nil {
    // if only webhook is enabled wrap it in the truncate options
    dynamicBackend = o.WebhookOptions.TruncateOptions.wrapBackend(webhookBackend, groupVersion)
    }

    // 5. Set the policy rule evaluator
    c.AuditPolicyRuleEvaluator = evaluator

    // 6. Join the log backend with the webhooks
    c.AuditBackend = appendBackend(logBackend, dynamicBackend)

    if c.AuditBackend != nil {
    klog.V(2).Infof("Using audit backend: %s", c.AuditBackend)
    }
    return nil
    }

    func (o *AuditOptions) newPolicyRuleEvaluator() (audit.PolicyRuleEvaluator, error) {
    if o.PolicyFile == "" {
    return nil, nil
    }

    p, err := policy.LoadPolicyFromFile(o.PolicyFile)
    if err != nil {
    return nil, fmt.Errorf("loading audit policy file: %v", err)
    }
    return policy.NewPolicyRuleEvaluator(p), nil
    }

Admission准入控制器配置

通过认证和鉴权之后,经过准入控制器,资源对象才会被进入到持久化之前最后的拦截,包含校验、修改或拒绝等

  • kube-apiserver启动注册所有准入控制器插件,通过admission.Plugins进行注册

    • https://github.com/kubernetes/apiserver/blob/ba592e4ccd41a320ceb91bab90eebee3bb4a4f33/pkg/admission/plugins.go#L38

      // Factory is a function that returns an Interface for admission decisions.
      // The config parameter provides an io.Reader handler to the factory in
      // order to load specific configurations. If no configuration is provided
      // the parameter is nil.
      type Factory func(config io.Reader) (Interface, error)

      type Plugins struct {
      // 保护并发的安全 以键值对的形式保护registry并发安全,确保数据一致性
      lock sync.Mutex
      // key插件名称
      registry map[string]Factory
      }
  • 准入控制器实现的接口

  • 如果需要修改默认的准入控制器,需要通过--enable-admission-plugins或者--disable-admission-plugins参数进行配置

AddPostStartHook添加后置钩子

在kube-apiserver启动之后执行,这个AddPostStartHook为start-kube-apiserver-admission-initializer,主要用于一些控制器需要调谐的时候

创建APIExtensionServer

创建GenericAPIServer实例

创建GenericAPIServer实例大部分复用APIServer的通用配置

实例化CustomResourceDefinitions

APIExtensionServer通过Custom ResourceDefinitions对象进行管理

  • Ref:https://github.com/kubernetes/apiextensions-apiserver/blob/1ba7acf883e7790aea078187fed8e0b45e8c68c9/pkg/apiserver/apiserver.go#L138

    s := &CustomResourceDefinitions{
    GenericAPIServer: genericServer,
    }

    apiResourceConfig := c.GenericConfig.MergedResourceConfig
    apiGroupInfo := genericapiserver.NewDefaultAPIGroupInfo(apiextensions.GroupName, Scheme, metav1.ParameterCodec, Codecs)
    storage := map[string]rest.Storage{}
    // customresourcedefinitions
    if resource := "customresourcedefinitions"; apiResourceConfig.ResourceEnabled(v1.SchemeGroupVersion.WithResource(resource)) {
    customResourceDefinitionStorage, err := customresourcedefinition.NewREST(Scheme, c.GenericConfig.RESTOptionsGetter)
    if err != nil {
    return nil, err
    }
    storage[resource] = customResourceDefinitionStorage
    storage[resource+"/status"] = customresourcedefinition.NewStatusREST(Scheme, customResourceDefinitionStorage)
    }

实例化APIGroupInfo

APIGroupInfo描述K8sAPI资源组,包括资源组名称、版本优先级顺序、资源对象支持的编解码方法、不同版本的资源对象存储实现

  • Ref:https://github.com/kubernetes/apiserver/blob/cccad306d649184bf2a0e319ba830c53f65c445c/pkg/server/genericapiserver.go#L69

    // Info about an API group.
    type APIGroupInfo struct {
    PrioritizedVersions []schema.GroupVersion
    // Info about the resources in this group. It's a map from version to resource to the storage.
    VersionedResourcesStorageMap map[string]map[string]rest.Storage
    // OptionsExternalVersion controls the APIVersion used for common objects in the
    // schema like api.Status, api.DeleteOptions, and metav1.ListOptions. Other implementors may
    // define a version "v1beta1" but want to use the Kubernetes "v1" internal objects.
    // If nil, defaults to groupMeta.GroupVersion.
    // TODO: Remove this when https://github.com/kubernetes/kubernetes/issues/19018 is fixed.
    OptionsExternalVersion *schema.GroupVersion
    // MetaGroupVersion defaults to "meta.k8s.io/v1" and is the scheme group version used to decode
    // common API implementations like ListOptions. Future changes will allow this to vary by group
    // version (for when the inevitable meta/v2 group emerges).
    MetaGroupVersion *schema.GroupVersion

    // Scheme includes all of the types used by this group and how to convert between them (or
    // to convert objects from outside of this group that are accepted in this API).
    // TODO: replace with interfaces
    Scheme *runtime.Scheme
    // NegotiatedSerializer controls how this group encodes and decodes data
    NegotiatedSerializer runtime.NegotiatedSerializer
    // ParameterCodec performs conversions for query parameters passed to API calls
    ParameterCodec runtime.ParameterCodec

    // StaticOpenAPISpec is the spec derived from the definitions of all resources installed together.
    // It is set during InstallAPIGroups, InstallAPIGroup, and InstallLegacyAPIGroup.
    StaticOpenAPISpec map[string]*spec.Schema
    }

InstallAPIGroup注册APIGroup

InstallAPIGroup实现了对APIExtensionsServer对路由注册功能

  • InstallAPIGroup把APIGroupInfo中的<资源组></资源版本><资源/子资源></资源存储对象>注册到APIExentsionsServer HTTP Handler,主要过程遍历APIGroupInfo的PriorizizedVersion资源版本映射到HTTP路由

  • 通过InstallREST func实现封装请求处理的Handler

    • Ref:https://github.com/kubernetes/apiserver/blob/cccad306d649184bf2a0e319ba830c53f65c445c/pkg/endpoints/groupversion.go#L106

      // InstallREST registers the REST handlers (storage, watch, proxy and redirect) into a restful Container.
      // It is expected that the provided path root prefix will serve all operations. Root MUST NOT end
      // in a slash.
      func (g *APIGroupVersion) InstallREST(container *restful.Container) ([]apidiscoveryv2.APIResourceDiscovery, []*storageversion.ResourceInfo, error) {
      prefix := path.Join(g.Root, g.GroupVersion.Group, g.GroupVersion.Version)
      installer := &APIInstaller{
      group: g,
      prefix: prefix,
      minRequestTimeout: g.MinRequestTimeout,
      }

      apiResources, resourceInfos, ws, registrationErrors := installer.Install()
      versionDiscoveryHandler := discovery.NewAPIVersionHandler(g.Serializer, g.GroupVersion, staticLister{apiResources})
      versionDiscoveryHandler.AddToWebService(ws)
      container.Add(ws)
      aggregatedDiscoveryResources, err := ConvertGroupVersionIntoToDiscovery(apiResources)
      if err != nil {
      registrationErrors = append(registrationErrors, err)
      }
      return aggregatedDiscoveryResources, removeNonPersistedResources(resourceInfos), utilerrors.NewAggregate(registrationErrors)
      }
  • InstallREST func接收restful.Container指针对象,路由安装分以下几步

      1. 通过prefix定义HTTP Path请求路径,其路径为<APIGroupPrefix>/<Group>/><Version>,这里即位/api/apiextensions.k8s.io/v1
      2. 实例化APIInstaller对象
      3. installer.Install会实例一个restful.WebService对象,遍历APIGroupVersion定义的资源列表,把资源和与之对应的Handler处理函数注册到WebService Route路由
      4. 通过container.Add func把WebService添加到go-restful Container
    • 使用了路由判断资源类型的RESTful接口

    • Ref:https://github.com/kubernetes/apiserver/blob/cccad306d649184bf2a0e319ba830c53f65c445c/pkg/endpoints/installer.go#L333

      // what verbs are supported by the storage, used to know what verbs we support per path
      creater, isCreater := storage.(rest.Creater)
      namedCreater, isNamedCreater := storage.(rest.NamedCreater)
      lister, isLister := storage.(rest.Lister)
      getter, isGetter := storage.(rest.Getter)
      getterWithOptions, isGetterWithOptions := storage.(rest.GetterWithOptions)
      gracefulDeleter, isGracefulDeleter := storage.(rest.GracefulDeleter)
      collectionDeleter, isCollectionDeleter := storage.(rest.CollectionDeleter)
      updater, isUpdater := storage.(rest.Updater)
      patcher, isPatcher := storage.(rest.Patcher)
      watcher, isWatcher := storage.(rest.Watcher)
      connecter, isConnecter := storage.(rest.Connecter)
      storageMeta, isMetadata := storage.(rest.StorageMetadata)
      storageVersionProvider, isStorageVersionProvider := storage.(rest.StorageVersionProvider)
      gvAcceptor, _ := storage.(rest.GroupVersionAcceptor)

注册CR的Handler处理函数

创建crdHandler搭配/apis,实现拓展REST接口

crdHandler的ServerHTTP实现:

func (r *versionDiscoveryHandler) ServeHTTP(w http.ResponseWriter, req *http.Request) {
pathParts := splitPath(req.URL.Path)
// only match /apis/<group>/<version>
if len(pathParts) != 3 || pathParts[0] != "apis" {
r.delegate.ServeHTTP(w, req)
return
}
discovery, ok := r.getDiscovery(schema.GroupVersion{Group: pathParts[1], Version: pathParts[2]})
if !ok {
r.delegate.ServeHTTP(w, req)
return
}

discovery.ServeHTTP(w, req)
}

配置PostStartHook配置钩子

APIExtensionsServer通过GenericAPIServer配置的PostStartHook钩子函数来实现

  • Ref:https://github.com/kubernetes/apiextensions-apiserver/blob/1ba7acf883e7790aea078187fed8e0b45e8c68c9/pkg/apiserver/apiserver.go#L222

    // start-apiextensions-controllers:启动客户端Informer,同步集群资源列表,开始Watch变化事件
    s.GenericAPIServer.AddPostStartHookOrDie("start-apiextensions-informers", func(hookContext genericapiserver.PostStartHookContext) error {
    s.Informers.Start(hookContext.Done())
    return nil
    })
    // start-apiextensions-controllers:启动CRD处理相关控制器
    s.GenericAPIServer.AddPostStartHookOrDie("start-apiextensions-controllers", func(hookContext genericapiserver.PostStartHookContext) error {
    // OpenAPIVersionedService and StaticOpenAPISpec are populated in generic apiserver PrepareRun().
    // Together they serve the /openapi/v2 endpoint on a generic apiserver. A generic apiserver may
    // choose to not enable OpenAPI by having null openAPIConfig, and thus OpenAPIVersionedService
    // and StaticOpenAPISpec are both null. In that case we don't run the CRD OpenAPI controller.
    if s.GenericAPIServer.StaticOpenAPISpec != nil {
    if s.GenericAPIServer.OpenAPIVersionedService != nil {
    openapiController := openapicontroller.NewController(s.Informers.Apiextensions().V1().CustomResourceDefinitions())
    go openapiController.Run(s.GenericAPIServer.StaticOpenAPISpec, s.GenericAPIServer.OpenAPIVersionedService, hookContext.Done())
    }

    if s.GenericAPIServer.OpenAPIV3VersionedService != nil {
    openapiv3Controller := openapiv3controller.NewController(s.Informers.Apiextensions().V1().CustomResourceDefinitions())
    go openapiv3Controller.Run(s.GenericAPIServer.OpenAPIV3VersionedService, hookContext.Done())
    }
    }

    go namingController.Run(hookContext.Done())
    go establishingController.Run(hookContext.Done())
    go nonStructuralSchemaController.Run(5, hookContext.Done())
    go apiApprovalController.Run(5, hookContext.Done())
    go finalizingController.Run(5, hookContext.Done())

    discoverySyncedCh := make(chan struct{})
    go discoveryController.Run(hookContext.Done(), discoverySyncedCh)
    select {
    case <-hookContext.Done():
    case <-discoverySyncedCh:
    }

    return nil
    })
    // we don't want to report healthy until we can handle all CRDs that have already been registered. Waiting for the informer
    // to sync makes sure that the lister will be valid before we begin. There may still be races for CRDs added after startup,
    // but we won't go healthy until we can handle the ones already present.
    // crd-informer-synced 等到Informer完成资源同步后,向GenericAPIServer发出信号,服务就绪,开始接收和处理客户端请求
    s.GenericAPIServer.AddPostStartHookOrDie("crd-informer-synced", func(context genericapiserver.PostStartHookContext) error {
    return wait.PollImmediateUntil(100*time.Millisecond, func() (bool, error) {
    if s.Informers.Apiextensions().V1().CustomResourceDefinitions().Informer().HasSynced() {
    close(hasCRDInformerSyncedSignal)
    return true, nil
    }
    return false, nil
    }, context.Done())
    })

    return s, nil

核心Controller等及其功能

Controller名称 功能描述
discoveryController 自动发现集群中已经安装好的CRD资源列表,注册对应的HTTPHandler处理函数<apis/
establishingController 检查CRD是否处于活跃可用状态,自动设置其status.conditions字段
namingController 检查CRD是否存在命名冲突,自动设置其status.conditions字段
nonStructuralSchemaController 检查CRD结构是否正确,自动设置其status.conditions字段
apiAppovalController 检查CRD是是否遵循K8sAPI声明策略,自动设置status.conditions字段
finaliziongController 负责处理与CRD删除相关的清理工作,包含设置和移除CRD的Finializer,在删除CRD前确保相关CR对象已经被清理干净
openapiController 自动同步CRD变化到OpenAPI相关文档,可以通过/openapiv2访问
openapiv3Controller 自动同步CRD变化到OpenAPI文档,可以通过/openapiv3访问

创建KubeAPIServer

  1. <资源组>/<资源版本>/<资源>与资源存储对象的RESTStorage机械映射,存储到APIGroupInfo对象的VersionResourcesStorageMap字段中
  2. 通过installer.Install 为资源注册相关的HTTP Handler处理函数,完成参数绑定,为go-restful中的WebService注册对应的路由
  3. 把WebService对象添加到go-restful对象添加到go-restful Container中

创建GenericAPIServer实例

  • Ref:https://github.com/kubernetes/kubernetes/blob/88e994f6bf8fc88114c5b733e09afea339bea66d/pkg/controlplane/instance.go#L337

    // New returns a new instance of Master from the given config.
    // Certain config fields will be set to a default value if unset.
    // Certain config fields must be specified, including:
    //
    // KubeletClientConfig
    func (c completedConfig) New(delegationTarget genericapiserver.DelegationTarget) (*Instance, error) {
    if reflect.DeepEqual(c.ExtraConfig.KubeletClientConfig, kubeletclient.KubeletClientConfig{}) {
    return nil, fmt.Errorf("Master.New() called with empty config.KubeletClientConfig")
    }

    s, err := c.GenericConfig.New("kube-apiserver", delegationTarget)
    if err != nil {
    return nil, err
    }
    ...

注册Log和OpenID路由

  • 通过启动参数--enable-logs-handler可以设置是否安装支持路由支持读取/logs/{logpath}读取kube-apiserver所在机器或者容器的/var/log目录下的日志文件

  • 通过routes.NewOpenIDMetadataServer(…)Install注册OpenID相关路由,提供OIDC服务

    • 路由/.well-know/openid-configuration用于OIDC服务发现,有名OIDC Discovery Doc
    • 路由/openid/v1/jwks:读取基于ServiceAccount的OpenIDJSON Web Key Set密钥集,用于Token验证
  • Ref:https://github.com/kubernetes/kubernetes/blob/88e994f6bf8fc88114c5b733e09afea339bea66d/pkg/controlplane/instance.go#L342

    if c.ExtraConfig.EnableLogsSupport {
    routes.Logs{}.Install(s.Handler.GoRestfulContainer)
    }

    // Metadata and keys are expected to only change across restarts at present,
    // so we just marshal immediately and serve the cached JSON bytes.
    md, err := serviceaccount.NewOpenIDMetadata(
    c.ExtraConfig.ServiceAccountIssuerURL,
    c.ExtraConfig.ServiceAccountJWKSURI,
    c.GenericConfig.ExternalAddress,
    c.ExtraConfig.ServiceAccountPublicKeys,
    )

实例化Instance

m := &Instance{
GenericAPIServer: s,
ClusterAuthenticationInfo: c.ExtraConfig.ClusterAuthenticationInfo,
}

InstallLegacyAPI注册/api资源路由

通过kubectl get --raw /api/v1可以获取/apiv1下所有的资源和子资源信息

InstallLegacyAPI func执行流程如下 :

  1. 通过legacyRESTStorgaeProvider.NewLegacyRESTStorage func实例化APIGroupInfo

  2. 通过m.GenericAPIServer.InstallLegcayAPIGroup func把APIGroupInfo对象中注册到KubeAPIServer HTTP Handler

  3. 通过m.GenericAPIServer.AddPostStartHookOrDie func注册到kube-apiserver,启动之后后置钩子bootstarp-controller,启动controller

InstallAPIs注册/apis资源路由

  • Ref:https://github.com/kubernetes/kubernetes/blob/88e994f6bf8fc88114c5b733e09afea339bea66d/pkg/controlplane/instance.go#L395

    // The order here is preserved in discovery.
    // If resources with identical names exist in more than one of these groups (e.g. "deployments.apps"" and "deployments.extensions"),
    // the order of this list determines which group an unqualified resource name (e.g. "deployments") should prefer.
    // This priority order is used for local discovery, but it ends up aggregated in `k8s.io/kubernetes/cmd/kube-apiserver/app/aggregator.go
    // with specific priorities.
    // TODO: describe the priority all the way down in the RESTStorageProviders and plumb it back through the various discovery
    // handlers that we have.
    restStorageProviders := []RESTStorageProvider{
    apiserverinternalrest.StorageProvider{},
    authenticationrest.RESTStorageProvider{Authenticator: c.GenericConfig.Authentication.Authenticator, APIAudiences: c.GenericConfig.Authentication.APIAudiences},
    authorizationrest.RESTStorageProvider{Authorizer: c.GenericConfig.Authorization.Authorizer, RuleResolver: c.GenericConfig.RuleResolver},
    autoscalingrest.RESTStorageProvider{},
    batchrest.RESTStorageProvider{},
    certificatesrest.RESTStorageProvider{},
    coordinationrest.RESTStorageProvider{},
    discoveryrest.StorageProvider{},
    networkingrest.RESTStorageProvider{},
    noderest.RESTStorageProvider{},
    policyrest.RESTStorageProvider{},
    rbacrest.RESTStorageProvider{Authorizer: c.GenericConfig.Authorization.Authorizer},
    schedulingrest.RESTStorageProvider{},
    storagerest.RESTStorageProvider{},
    flowcontrolrest.RESTStorageProvider{InformerFactory: c.GenericConfig.SharedInformerFactory},
    // keep apps after extensions so legacy clients resolve the extensions versions of shared resource names.
    // See https://github.com/kubernetes/kubernetes/issues/42392
    appsrest.StorageProvider{},
    admissionregistrationrest.RESTStorageProvider{},
    eventsrest.RESTStorageProvider{TTL: c.ExtraConfig.EventTTL},
    }
  • 通过kubectl get --raw /apis | jq可以获取/apis下的资源组列表信息

    ➜  ~ k get --raw /apis| jq
    {
    "kind": "APIGroupList",
    "apiVersion": "v1",
    "groups": [
    {
    "name": "apiregistration.k8s.io",
    "versions": [
    {
    "groupVersion": "apiregistration.k8s.io/v1",
    "version": "v1"
    },
    {
    "groupVersion": "apiregistration.k8s.io/v1beta1",
    "version": "v1beta1"
    }
    ],
    "preferredVersion": {
    "groupVersion": "apiregistration.k8s.io/v1",
    "version": "v1"
    }
    },
    {
    "name": "apps",
    "versions": [
    {
    "groupVersion": "apps/v1",
    "version": "v1"
    }
    ],
    "preferredVersion": {
    "groupVersion": "apps/v1",
    "version": "v1"
    }
    },
    {
    "name": "events.k8s.io",
    "versions": [
    {
    "groupVersion": "events.k8s.io/v1",
    "version": "v1"
    },
    {
    "groupVersion": "events.k8s.io/v1beta1",
    "version": "v1beta1"
    }
    ],
    "preferredVersion": {
    "groupVersion": "events.k8s.io/v1",
    "version": "v1"
    }
    },
    {
    "name": "authentication.k8s.io",
    "versions": [
    {
    "groupVersion": "authentication.k8s.io/v1",
    "version": "v1"
    },
    {
    "groupVersion": "authentication.k8s.io/v1beta1",
    "version": "v1beta1"
    }
    ],
    "preferredVersion": {
    "groupVersion": "authentication.k8s.io/v1",
    "version": "v1"
    }
    },
    {
    "name": "authorization.k8s.io",
    "versions": [
    {
    "groupVersion": "authorization.k8s.io/v1",
    "version": "v1"
    },
    {
    "groupVersion": "authorization.k8s.io/v1beta1",
    "version": "v1beta1"
    }
    ],
    "preferredVersion": {
    "groupVersion": "authorization.k8s.io/v1",
    "version": "v1"
    }
    },
    {
    "name": "autoscaling",
    "versions": [
    {
    "groupVersion": "autoscaling/v1",
    "version": "v1"
    },
    {
    "groupVersion": "autoscaling/v2beta1",
    "version": "v2beta1"
    },
    {
    "groupVersion": "autoscaling/v2beta2",
    "version": "v2beta2"
    }
    ],
    "preferredVersion": {
    "groupVersion": "autoscaling/v1",
    "version": "v1"
    }
    },
    {
    "name": "batch",
    "versions": [
    {
    "groupVersion": "batch/v1",
    "version": "v1"
    },
    {
    "groupVersion": "batch/v1beta1",
    "version": "v1beta1"
    }
    ],
    "preferredVersion": {
    "groupVersion": "batch/v1",
    "version": "v1"
    }
    },
    {
    "name": "certificates.k8s.io",
    "versions": [
    {
    "groupVersion": "certificates.k8s.io/v1",
    "version": "v1"
    },
    {
    "groupVersion": "certificates.k8s.io/v1beta1",
    "version": "v1beta1"
    }
    ],
    "preferredVersion": {
    "groupVersion": "certificates.k8s.io/v1",
    "version": "v1"
    }
    },
    {
    "name": "networking.k8s.io",
    "versions": [
    {
    "groupVersion": "networking.k8s.io/v1",
    "version": "v1"
    },
    {
    "groupVersion": "networking.k8s.io/v1beta1",
    "version": "v1beta1"
    }
    ],
    "preferredVersion": {
    "groupVersion": "networking.k8s.io/v1",
    "version": "v1"
    }
    },
    {
    "name": "extensions",
    "versions": [
    {
    "groupVersion": "extensions/v1beta1",
    "version": "v1beta1"
    }
    ],
    "preferredVersion": {
    "groupVersion": "extensions/v1beta1",
    "version": "v1beta1"
    }
    },
    {
    "name": "policy",
    "versions": [
    {
    "groupVersion": "policy/v1",
    "version": "v1"
    },
    {
    "groupVersion": "policy/v1beta1",
    "version": "v1beta1"
    }
    ],
    "preferredVersion": {
    "groupVersion": "policy/v1",
    "version": "v1"
    }
    },
    {
    "name": "rbac.authorization.k8s.io",
    "versions": [
    {
    "groupVersion": "rbac.authorization.k8s.io/v1",
    "version": "v1"
    },
    {
    "groupVersion": "rbac.authorization.k8s.io/v1beta1",
    "version": "v1beta1"
    }
    ],
    "preferredVersion": {
    "groupVersion": "rbac.authorization.k8s.io/v1",
    "version": "v1"
    }
    },
    {
    "name": "storage.k8s.io",
    "versions": [
    {
    "groupVersion": "storage.k8s.io/v1",
    "version": "v1"
    },
    {
    "groupVersion": "storage.k8s.io/v1beta1",
    "version": "v1beta1"
    }
    ],
    "preferredVersion": {
    "groupVersion": "storage.k8s.io/v1",
    "version": "v1"
    }
    },
    {
    "name": "admissionregistration.k8s.io",
    "versions": [
    {
    "groupVersion": "admissionregistration.k8s.io/v1",
    "version": "v1"
    },
    {
    "groupVersion": "admissionregistration.k8s.io/v1beta1",
    "version": "v1beta1"
    }
    ],
    "preferredVersion": {
    "groupVersion": "admissionregistration.k8s.io/v1",
    "version": "v1"
    }
    },
    {
    "name": "apiextensions.k8s.io",
    "versions": [
    {
    "groupVersion": "apiextensions.k8s.io/v1",
    "version": "v1"
    },
    {
    "groupVersion": "apiextensions.k8s.io/v1beta1",
    "version": "v1beta1"
    }
    ],
    "preferredVersion": {
    "groupVersion": "apiextensions.k8s.io/v1",
    "version": "v1"
    }
    },
    {
    "name": "scheduling.k8s.io",
    "versions": [
    {
    "groupVersion": "scheduling.k8s.io/v1",
    "version": "v1"
    },
    {
    "groupVersion": "scheduling.k8s.io/v1beta1",
    "version": "v1beta1"
    }
    ],
    "preferredVersion": {
    "groupVersion": "scheduling.k8s.io/v1",
    "version": "v1"
    }
    },
    {
    "name": "coordination.k8s.io",
    "versions": [
    {
    "groupVersion": "coordination.k8s.io/v1",
    "version": "v1"
    },
    {
    "groupVersion": "coordination.k8s.io/v1beta1",
    "version": "v1beta1"
    }
    ],
    "preferredVersion": {
    "groupVersion": "coordination.k8s.io/v1",
    "version": "v1"
    }
    },
    {
    "name": "node.k8s.io",
    "versions": [
    {
    "groupVersion": "node.k8s.io/v1",
    "version": "v1"
    },
    {
    "groupVersion": "node.k8s.io/v1beta1",
    "version": "v1beta1"
    }
    ],
    "preferredVersion": {
    "groupVersion": "node.k8s.io/v1",
    "version": "v1"
    }
    },
    {
    "name": "discovery.k8s.io",
    "versions": [
    {
    "groupVersion": "discovery.k8s.io/v1",
    "version": "v1"
    },
    {
    "groupVersion": "discovery.k8s.io/v1beta1",
    "version": "v1beta1"
    }
    ],
    "preferredVersion": {
    "groupVersion": "discovery.k8s.io/v1",
    "version": "v1"
    }
    },
    {
    "name": "flowcontrol.apiserver.k8s.io",
    "versions": [
    {
    "groupVersion": "flowcontrol.apiserver.k8s.io/v1beta1",
    "version": "v1beta1"
    }
    ],
    "preferredVersion": {
    "groupVersion": "flowcontrol.apiserver.k8s.io/v1beta1",
    "version": "v1beta1"
    }
    }
    ]
    }
    ➜ ~

InstallAPIs func执行流程如下:

  1. 调用对应资源组的RESTStorageProvider的NewRESTStorage func,实例化所有已启动的资源组的APIGroupInfo
  2. 检查资源类型对象的RESTStorageProvider是否实现了genericapiserver.PostSTartHookProvider接口

几个常见的PostStartHook

资源组 PostStartHook名称 功能描述
rbac.authorization.k8s.io rbac/bootstrap-roles 确保系统核心RBAC资源正确存在
Scheduling.k8s.io Scheduing/bootstrap-system-priority-classes 确保系统PriorityClass资源正确存在,包含system-node-critial和system-cluster- critical
flowcontrol.apiserver.k8s.io Priority-and-fairness-config-producer 确保系统核心PriorityLevel Configuration和FlowSchema资源正确存在
  1. 通过m.GenericAPIServer.InstallAPIGroups func把APIGroupInfo对象中的<资源组>/<子资源>/<资源存储对象>注册到KubeAPIServerHTTP Handler

配置PostStartHook 后置钩子

  • Ref:https://github.com/kubernetes/kubernetes/blob/88e994f6bf8fc88114c5b733e09afea339bea66d/pkg/controlplane/instance.go#L421C1-L460C1

    m.GenericAPIServer.AddPostStartHookOrDie("start-cluster-authentication-info-controller", func(hookContext genericapiserver.PostStartHookContext) error {
    kubeClient, err := kubernetes.NewForConfig(hookContext.LoopbackClientConfig)
    if err != nil {
    return err
    }
    controller := clusterauthenticationtrust.NewClusterAuthenticationTrustController(m.ClusterAuthenticationInfo, kubeClient)

    // generate a context from stopCh. This is to avoid modifying files which are relying on apiserver
    // TODO: See if we can pass ctx to the current method
    ctx, cancel := context.WithCancel(context.Background())
    go func() {
    select {
    case <-hookContext.StopCh:
    cancel() // stopCh closed, so cancel our context
    case <-ctx.Done():
    }
    }()

    // prime values and start listeners
    if m.ClusterAuthenticationInfo.ClientCA != nil {
    m.ClusterAuthenticationInfo.ClientCA.AddListener(controller)
    if controller, ok := m.ClusterAuthenticationInfo.ClientCA.(dynamiccertificates.ControllerRunner); ok {
    // runonce to be sure that we have a value.
    if err := controller.RunOnce(ctx); err != nil {
    runtime.HandleError(err)
    }
    go controller.Run(ctx, 1)
    }
    }
    if m.ClusterAuthenticationInfo.RequestHeaderCA != nil {
    m.ClusterAuthenticationInfo.RequestHeaderCA.AddListener(controller)
    if controller, ok := m.ClusterAuthenticationInfo.RequestHeaderCA.(dynamiccertificates.ControllerRunner); ok {
    // runonce to be sure that we have a value.
    if err := controller.RunOnce(ctx); err != nil {
    runtime.HandleError(err)
    }
    go controller.Run(ctx, 1)
    }
    }

创建AggregatorServer

创建GenericAPIServer实例

通过c.GenericConfig.New创建

实例化APIAggregator

  • Ref:https://github.com/kubernetes/kube-aggregator/blob/95365c33286680c2b01d372492e539bd068f5da8/pkg/apiserver/apiserver.go#L207

    s := &APIAggregator{
    GenericAPIServer: genericServer,
    delegateHandler: delegationTarget.UnprotectedHandler(),
    proxyTransport: c.ExtraConfig.ProxyTransport,
    proxyHandlers: map[string]*proxyHandler{},
    handledGroups: sets.String{},
    lister: informerFactory.Apiregistration().V1().APIServices().Lister(),
    APIRegistrationInformers: informerFactory,
    serviceResolver: c.ExtraConfig.ServiceResolver,
    openAPIConfig: c.GenericConfig.OpenAPIConfig,
    openAPIV3Config: c.GenericConfig.OpenAPIV3Config,
    egressSelector: c.GenericConfig.EgressSelector,
    proxyCurrentCertKeyContent: func() (bytes []byte, bytes2 []byte) { return nil, nil },
    rejectForwardingRedirects: c.ExtraConfig.RejectForwardingRedirects,
    }

实例化APIGroupInfo

  • Ref:https://github.com/kubernetes/kube-aggregator/blob/95365c33286680c2b01d372492e539bd068f5da8/pkg/registry/apiservice/rest/storage_apiservice.go#L33

    // NewRESTStorage returns an APIGroupInfo object that will work against apiservice.
    func NewRESTStorage(apiResourceConfigSource serverstorage.APIResourceConfigSource, restOptionsGetter generic.RESTOptionsGetter, shouldServeBeta bool) genericapiserver.APIGroupInfo {
    apiGroupInfo := genericapiserver.NewDefaultAPIGroupInfo(apiregistration.GroupName, aggregatorscheme.Scheme, metav1.ParameterCodec, aggregatorscheme.Codecs)

    storage := map[string]rest.Storage{}

    if resource := "apiservices"; apiResourceConfigSource.ResourceEnabled(v1.SchemeGroupVersion.WithResource(resource)) {
    apiServiceREST := apiservicestorage.NewREST(aggregatorscheme.Scheme, restOptionsGetter)
    storage[resource] = apiServiceREST
    storage[resource+"/status"] = apiservicestorage.NewStatusREST(aggregatorscheme.Scheme, apiServiceREST)
    }

    if len(storage) > 0 {
    apiGroupInfo.VersionedResourcesStorageMap["v1"] = storage
    }

    return apiGroupInfo
    }

InstallAPIGroup注册APIGroup

注册拓展APIServer Handler

配置PostStartHook后置钩子

  • Ref:https://github.com/kubernetes/kube-aggregator/blob/95365c33286680c2b01d372492e539bd068f5da8/pkg/apiserver/apiserver.go#L265

    // 	aggregator-reload-proxy-client-cert:如果启用了StorageVersionAPI和APIServerIdentity feature gate,则启动StorageVersionManager协程,每10min检查更新StorageVersion资源对象状态
    s.GenericAPIServer.AddPostStartHookOrDie("aggregator-reload-proxy-client-cert", func(postStartHookContext genericapiserver.PostStartHookContext) error {
    // generate a context from stopCh. This is to avoid modifying files which are relying on apiserver
    // TODO: See if we can pass ctx to the current method
    ctx, cancel := context.WithCancel(context.Background())
    go func() {
    select {
    case <-postStartHookContext.StopCh:
    cancel() // stopCh closed, so cancel our context
    case <-ctx.Done():
    }
    }()
    go aggregatorProxyCerts.Run(ctx, 1)
    return nil
    })
    }

    availableController, err := statuscontrollers.NewAvailableConditionController(
    informerFactory.Apiregistration().V1().APIServices(),
    c.GenericConfig.SharedInformerFactory.Core().V1().Services(),
    c.GenericConfig.SharedInformerFactory.Core().V1().Endpoints(),
    apiregistrationClient.ApiregistrationV1(),
    c.ExtraConfig.ProxyTransport,
    (func() ([]byte, []byte))(s.proxyCurrentCertKeyContent),
    s.serviceResolver,
    c.GenericConfig.EgressSelector,
    )
    if err != nil {
    return nil, err
    }
    // start-kube-aggregator-informers 通过--proxy-client-cert-filehe --proxy-client-key-file 启动参数指定了代理客户端证书,则启动该控制器,持续监听Watch证书文件的变化,重新载入证书,实现证书热更新
    s.GenericAPIServer.AddPostStartHookOrDie("start-kube-aggregator-informers", func(context genericapiserver.PostStartHookContext) error {
    informerFactory.Start(context.StopCh)
    c.GenericConfig.SharedInformerFactory.Start(context.StopCh)
    return nil
    })
    // apiservice-registration-controller: 启动客户端Informer,实现事件监听
    s.GenericAPIServer.AddPostStartHookOrDie("apiservice-registration-controller", func(context genericapiserver.PostStartHookContext) error {
    go apiserviceRegistrationController.Run(context.StopCh, apiServiceRegistrationControllerInitiated)
    select {
    case <-context.StopCh:
    case <-apiServiceRegistrationControllerInitiated:
    }

    return nil
    })
    // apiservice-status-available-controller 启动APIService RegistrationController,自动发现注册APIService确保kube- APIserver Aggregator能正确处理请求和转发请求
    s.GenericAPIServer.AddPostStartHookOrDie("apiservice-status-available-controller", func(context genericapiserver.PostStartHookContext) error {
    // if we end up blocking for long periods of time, we may need to increase workers.
    go availableController.Run(5, context.StopCh)
    return nil
    })

    if utilfeature.DefaultFeatureGate.Enabled(genericfeatures.StorageVersionAPI) &&
    utilfeature.DefaultFeatureGate.Enabled(genericfeatures.APIServerIdentity) {
    // Spawn a goroutine in aggregator apiserver to update storage version for
    // all built-in resources
    s.GenericAPIServer.AddPostStartHookOrDie(StorageVersionPostStartHookName, func(hookContext genericapiserver.PostStartHookContext) error {
    // Wait for apiserver-identity to exist first before updating storage
    // versions, to avoid storage version GC accidentally garbage-collecting
    // storage versions.
    kubeClient, err := kubernetes.NewForConfig(hookContext.LoopbackClientConfig)
    if err != nil {
    return err
    }
    if err := wait.PollImmediateUntil(100*time.Millisecond, func() (bool, error) {
    _, err := kubeClient.CoordinationV1().Leases(metav1.NamespaceSystem).Get(
    context.TODO(), s.GenericAPIServer.APIServerID, metav1.GetOptions{})
    if apierrors.IsNotFound(err) {
    return false, nil
    }
    if err != nil {
    return false, err
    }
    return true, nil
    }, hookContext.StopCh); err != nil {
    return fmt.Errorf("failed to wait for apiserver-identity lease %s to be created: %v",
    s.GenericAPIServer.APIServerID, err)
    }
    // Technically an apiserver only needs to update storage version once during bootstrap.
    // Reconcile StorageVersion objects every 10 minutes will help in the case that the
    // StorageVersion objects get accidentally modified/deleted by a different agent. In that
    // case, the reconciliation ensures future storage migration still works. If nothing gets
    // changed, the reconciliation update is a noop and gets short-circuited by the apiserver,
    // therefore won't change the resource version and trigger storage migration.
    go wait.PollImmediateUntil(10*time.Minute, func() (bool, error) {
    // All apiservers (aggregator-apiserver, kube-apiserver, apiextensions-apiserver)
    // share the same generic apiserver config. The same StorageVersion manager is used
    // to register all built-in resources when the generic apiservers install APIs.
    s.GenericAPIServer.StorageVersionManager.UpdateStorageVersions(hookContext.LoopbackClientConfig, s.GenericAPIServer.APIServerID)
    return false, nil
    }, hookContext.StopCh)
    // Once the storage version updater finishes the first round of update,
    // the PostStartHook will return to unblock /healthz. The handler chain
    // won't block write requests anymore. Check every second since it's not
    // expensive.
    wait.PollImmediateUntil(1*time.Second, func() (bool, error) {
    return s.GenericAPIServer.StorageVersionManager.Completed(), nil
    }, hookContext.StopCh)
    return nil
    })
    }

    return s, nil
    }

    // PrepareRun prepares the aggregator to run, by setting up the OpenAPI spec and calling
    // the generic PrepareRun.
    func (s *APIAggregator) PrepareRun() (preparedAPIAggregator, error) {
    // add post start hook before generic PrepareRun in order to be before /healthz installation
    if s.openAPIConfig != nil {
    // apiservice-openapi-controller 启动Available ConditionController,监听集群中APIService、Service等资源的变化,探测Aggregated APIServer可用性,自动更新APIService对象状态
    s.GenericAPIServer.AddPostStartHookOrDie("apiservice-openapi-controller", func(context genericapiserver.PostStartHookContext) error {
    go s.openAPIAggregationController.Run(context.StopCh)
    return nil
    })
    }
    // apiservice-openapiv3-controller 启动CRD注册器,自动把CRD的GroupVersion同步为APIService的version.group资源对象
    if s.openAPIV3Config != nil && utilfeature.DefaultFeatureGate.Enabled(genericfeatures.OpenAPIV3) {
    s.GenericAPIServer.AddPostStartHookOrDie("apiservice-openapiv3-controller", func(context genericapiserver.PostStartHookContext) error {
    go s.openAPIV3AggregationController.Run(context.StopCh)
    return nil
    })
    }

GenericAPIServer初始化

无论是APIExtensionServer,KubeAPIServer,还是AggregatorServer 底层都依赖于GenericAPIServer,GenericAPIServer启动HTTP Server承载服务

  • 通过

    curl -X PUT -k \
    --cert /etc/kubernetes/pki/apiserver-kubelet-client.crt \
    --key /etc/kubernetes/pki/apiserver-kubelet-client.key \
    https://127.0.0.1:6443/debug/flags/v -d "4"

准备和启动HTTPS服务

server.PrepareRun准备阶段

server.PrepareRun准备阶段主要包含俩个关键步骤

  1. 配置PostStartHook后置钩子

    由于kube-apiserver支持CRD和AggredatedAPIServer类型拓展,集群资源类型存在动态变化

    • Ref:https://github.com/kubernetes/kube-aggregator/blob/95365c33286680c2b01d372492e539bd068f5da8/pkg/apiserver/apiserver.go#L373

      // add post start hook before generic PrepareRun in order to be before /healthz installation
      if s.openAPIConfig != nil {
      // 聚合/openapi/v2接口
      s.GenericAPIServer.AddPostStartHookOrDie("apiservice-openapi-controller", func(context genericapiserver.PostStartHookContext) error {
      go s.openAPIAggregationController.Run(context.StopCh)
      return nil
      })
      }

      if s.openAPIV3Config != nil && utilfeature.DefaultFeatureGate.Enabled(genericfeatures.OpenAPIV3) {
      // 聚合/openapi/v3接口
      s.GenericAPIServer.AddPostStartHookOrDie("apiservice-openapiv3-controller", func(context genericapiserver.PostStartHookContext) error {
      go s.openAPIV3AggregationController.Run(context.StopCh)
      return nil
      })
      }
  2. 执行GenericAPIServer.PrepareRun预处理

    • 主要包含辅助类型的HTTP Handler,包含以下:

      • routes.OpenAPI

        • v2和v3版本的OpenAPI
      • installHealthz:安装健康检查的Handler

        [root@hcss-ecs-5425 ~]# kubectl get --raw / |grep healthz
        "/healthz",
        "/healthz/autoregister-completion",
        "/healthz/etcd",
        "/healthz/log",
        "/healthz/ping",
        "/healthz/poststarthook/aggregator-reload-proxy-client-cert",
        "/healthz/poststarthook/apiservice-openapi-controller",
        "/healthz/poststarthook/apiservice-registration-controller",
        "/healthz/poststarthook/apiservice-status-available-controller",
        "/healthz/poststarthook/bootstrap-controller",
        "/healthz/poststarthook/crd-informer-synced",
        "/healthz/poststarthook/generic-apiserver-start-informers",
        "/healthz/poststarthook/kube-apiserver-autoregistration",
        "/healthz/poststarthook/priority-and-fairness-config-consumer",
        "/healthz/poststarthook/priority-and-fairness-config-producer",
        "/healthz/poststarthook/priority-and-fairness-filter",
        "/healthz/poststarthook/rbac/bootstrap-roles",
        "/healthz/poststarthook/scheduling/bootstrap-system-priority-classes",
        "/healthz/poststarthook/start-apiextensions-controllers",
        "/healthz/poststarthook/start-apiextensions-informers",
        "/healthz/poststarthook/start-cluster-authentication-info-controller",
        "/healthz/poststarthook/start-kube-aggregator-informers",
        "/healthz/poststarthook/start-kube-apiserver-admission-initializer",
        [root@hcss-ecs-5425 ~]#
      • installLivez:安装存活探测Handler,用于kube-apiserver的存活检测,包含/live和/livez/前缀的检查项,默认情况所有的HealthCheck都会安装Livez探针

        [root@hcss-ecs-5425 ~]# kubectl get --raw / |grep livez
        "/livez",
        "/livez/autoregister-completion",
        "/livez/etcd",
        "/livez/log",
        "/livez/ping",
        "/livez/poststarthook/aggregator-reload-proxy-client-cert",
        "/livez/poststarthook/apiservice-openapi-controller",
        "/livez/poststarthook/apiservice-registration-controller",
        "/livez/poststarthook/apiservice-status-available-controller",
        "/livez/poststarthook/bootstrap-controller",
        "/livez/poststarthook/crd-informer-synced",
        "/livez/poststarthook/generic-apiserver-start-informers",
        "/livez/poststarthook/kube-apiserver-autoregistration",
        "/livez/poststarthook/priority-and-fairness-config-consumer",
        "/livez/poststarthook/priority-and-fairness-config-producer",
        "/livez/poststarthook/priority-and-fairness-filter",
        "/livez/poststarthook/rbac/bootstrap-roles",
        "/livez/poststarthook/scheduling/bootstrap-system-priority-classes",
        "/livez/poststarthook/start-apiextensions-controllers",
        "/livez/poststarthook/start-apiextensions-informers",
        "/livez/poststarthook/start-cluster-authentication-info-controller",
        "/livez/poststarthook/start-kube-aggregator-informers",
        "/livez/poststarthook/start-kube-apiserver-admission-initializer",
        [root@hcss-ecs-5425 ~]#
      • installReadyz:安装就绪探测Handler,用于kube-apiserver就绪探测,包括/readyz以及/readyz开头的检查项目,默认情况下,所有的HealthCheck都会同步安装Readyz探针,为了支持优雅停机,还有添加ShutdownCheck健康检查项目

        [root@hcss-ecs-5425 ~]# kubectl get --raw / |grep readyz
        "/readyz",
        "/readyz/autoregister-completion",
        "/readyz/etcd",
        "/readyz/informer-sync",
        "/readyz/log",
        "/readyz/ping",
        "/readyz/poststarthook/aggregator-reload-proxy-client-cert",
        "/readyz/poststarthook/apiservice-openapi-controller",
        "/readyz/poststarthook/apiservice-registration-controller",
        "/readyz/poststarthook/apiservice-status-available-controller",
        "/readyz/poststarthook/bootstrap-controller",
        "/readyz/poststarthook/crd-informer-synced",
        "/readyz/poststarthook/generic-apiserver-start-informers",
        "/readyz/poststarthook/kube-apiserver-autoregistration",
        "/readyz/poststarthook/priority-and-fairness-config-consumer",
        "/readyz/poststarthook/priority-and-fairness-config-producer",
        "/readyz/poststarthook/priority-and-fairness-filter",
        "/readyz/poststarthook/rbac/bootstrap-roles",
        "/readyz/poststarthook/scheduling/bootstrap-system-priority-classes",
        "/readyz/poststarthook/start-apiextensions-controllers",
        "/readyz/poststarthook/start-apiextensions-informers",
        "/readyz/poststarthook/start-cluster-authentication-info-controller",
        "/readyz/poststarthook/start-kube-aggregator-informers",
        "/readyz/poststarthook/start-kube-apiserver-admission-initializer",
        "/readyz/shutdown",
        [root@hcss-ecs-5425 ~]#
    • Ref:https://github.com/kubernetes/kube-aggregator/blob/95365c33286680c2b01d372492e539bd068f5da8/pkg/apiserver/apiserver.go#L386

      prepared := s.GenericAPIServer.PrepareRun()

prepared.Run执行阶段

  1. 注册优雅关闭信号处理

    真正启动HTTPS服务前,kube-apiserver会利用协程和defer机制注册和优雅关闭处理流程,在收到关闭信号之后,不会立即退出

  2. s.AuditBackend.Run启动审计后端服务

  3. s.NonBlockingRun启动HTTPS服务