K8s-kube-apiserver(启动流程)
K8s-kube-apiserver(启动流程)
基于1.25
APIServer启动流程:
- Scheme资源注册
- Cobra命令行参数解析
- 创建API Server通用配置
- 创建APIExtensionsServer
- 创建KubeAPIServer
- 创建AggregatorServer
- GenericAPIServer初始化
- 准备和启动HTTPS服务
Scheme资源注册
使用Scheme把所有资源注册到APIServer中
Scheme资源注册不是通过显式的函数调用触发的,而是通过各个包的init函数完成注册。
初始化Scheme资源注册表
-
var (
// Scheme is the default instance of runtime.Scheme to which types in the Kubernetes API are already registered.
// NOTE: If you are copying this file to start a new api group, STOP! Copy the
// extensions group instead. This Scheme is special and should appear ONLY in
// the api group, unless you really know what you're doing.
// TODO(lavalamp): make the above error impossible.
// Scheme serves as the resource registry.
Scheme = runtime.NewScheme()
// Codecs provides access to encoding and decoding for the scheme
// (the codec factory built from Scheme).
Codecs = serializer.NewCodecFactory(Scheme)
// ParameterCodec handles versioning of objects that are converted to query parameters.
// It is the parameter codec built from Scheme.
ParameterCodec = runtime.NewParameterCodec(Scheme)
)
把K8s核心资源注册到Scheme
kube-apiserver通过导入controlplane包,触发了各install包的资源注册
-
import (
// These imports are the API groups the API server will support.
_ "k8s.io/kubernetes/pkg/apis/admission/install"
_ "k8s.io/kubernetes/pkg/apis/admissionregistration/install"
_ "k8s.io/kubernetes/pkg/apis/apiserverinternal/install"
_ "k8s.io/kubernetes/pkg/apis/apps/install"
_ "k8s.io/kubernetes/pkg/apis/authentication/install"
_ "k8s.io/kubernetes/pkg/apis/authorization/install"
_ "k8s.io/kubernetes/pkg/apis/autoscaling/install"
_ "k8s.io/kubernetes/pkg/apis/batch/install"
_ "k8s.io/kubernetes/pkg/apis/certificates/install"
_ "k8s.io/kubernetes/pkg/apis/coordination/install"
_ "k8s.io/kubernetes/pkg/apis/core/install"
_ "k8s.io/kubernetes/pkg/apis/discovery/install"
_ "k8s.io/kubernetes/pkg/apis/events/install"
_ "k8s.io/kubernetes/pkg/apis/extensions/install"
_ "k8s.io/kubernetes/pkg/apis/flowcontrol/install"
_ "k8s.io/kubernetes/pkg/apis/imagepolicy/install"
_ "k8s.io/kubernetes/pkg/apis/networking/install"
_ "k8s.io/kubernetes/pkg/apis/node/install"
_ "k8s.io/kubernetes/pkg/apis/policy/install"
_ "k8s.io/kubernetes/pkg/apis/rbac/install"
_ "k8s.io/kubernetes/pkg/apis/scheduling/install"
_ "k8s.io/kubernetes/pkg/apis/storage/install"
)
各install包的init函数调用Install实现注册
Cobra命令行参数解析
-
// NewAPIServerCommand creates a *cobra.Command object with default parameters
func NewAPIServerCommand() *cobra.Command {
s := options.NewServerRunOptions()
cmd := &cobra.Command{
Use: "kube-apiserver",
Long: `The Kubernetes API server validates and configures data
for the api objects which include pods, services, replicationcontrollers, and
others. The API Server services REST operations and provides the frontend to the
cluster's shared state through which all other components interact.`
// stop printing usage when the command errors
SilenceUsage: true,
PersistentPreRunE: func(*cobra.Command, []string) error {
// silence client-go warnings.
// kube-apiserver loopback clients should not log self-issued warnings.
rest.SetDefaultWarningHandler(rest.NoWarnings{})
return nil
},
RunE: func(cmd *cobra.Command, args []string) error {
verflag.PrintAndExitIfRequested()
fs := cmd.Flags()
// Activate logging as soon as possible, after that
// show flags with the final logging configuration.
if err := logsapi.ValidateAndApply(s.Logs, utilfeature.DefaultFeatureGate); err != nil {
return err
}
cliflag.PrintFlags(fs)
// set default options
completedOptions, err := Complete(s)
if err != nil {
return err
}
// validate options
if errs := completedOptions.Validate(); len(errs) != 0 {
return utilerrors.NewAggregate(errs)
}
return Run(completedOptions, genericapiserver.SetupSignalHandler())
},
Args: func(cmd *cobra.Command, args []string) error {
for _, arg := range args {
if len(arg) > 0 {
return fmt.Errorf("%q does not take any arguments, got %q", cmd.CommandPath(), args)
}
}
return nil
},
}
fs := cmd.Flags()
namedFlagSets := s.Flags()
verflag.AddFlags(namedFlagSets.FlagSet("global"))
globalflag.AddGlobalFlags(namedFlagSets.FlagSet("global"), cmd.Name(), logs.SkipLoggingConfigurationFlags())
options.AddCustomGlobalFlags(namedFlagSets.FlagSet("generic"))
for _, f := range namedFlagSets.FlagSets {
fs.AddFlagSet(f)
}
cols, _, _ := term.TerminalSize(cmd.OutOrStdout())
cliflag.SetUsageAndHelpFunc(cmd, namedFlagSets, cols)
return cmd
}
创建API Server通用配置
API Server通用配置设置了kube-apiserver不同模块实例化需要的配置
genericConfig实例化
-
...
genericConfig = genericapiserver.NewConfig(legacyscheme.Codecs)
genericConfig.MergedResourceConfig = controlplane.DefaultAPIResourceConfigSource()
...
if lastErr = s.APIEnablement.ApplyTo(genericConfig, controlplane.DefaultAPIResourceConfigSource(), legacyscheme.Scheme); lastErr != nil {
return
}
genericConfig.MergedResourceConfig指定启用/禁用GroupVersion或GroupVersionResource
如果没有使用
--runtime-config
指定启用或者禁用的内置API资源,则会使用内置的DefaultAPIResourceConfigSource func作为默认配置;如果使用该参数指定,kube-apiserver会用指定的值覆盖该默认配置
默认情况,kube-apiserver仅启用stable稳定版本的API资源(为了兼容,某些之前已经被启用的legacy API也默认开启,但是新引入的beta API默认被禁用)
-
// DefaultAPIResourceConfigSource returns default configuration for an APIResource.
func DefaultAPIResourceConfigSource() *serverstorage.ResourceConfig {
ret := serverstorage.NewResourceConfig()
// NOTE: GroupVersions listed here will be enabled by default. Don't put alpha or beta versions in the list.
ret.EnableVersions(stableAPIGroupVersionsEnabledByDefault...)
// disable alpha and beta versions explicitly so we have a full list of what's possible to serve
ret.DisableVersions(betaAPIGroupVersionsDisabledByDefault...)
ret.DisableVersions(alphaAPIGroupVersionsDisabledByDefault...)
// enable the legacy beta resources that were present before stopped serving new beta APIs by default.
ret.EnableResources(legacyBetaEnabledByDefaultResources...)
return ret
}
以上仅为KubeAPIServer的配置,APIExtensionsServer和AggregatorServer有各自独立的配置
HTTP Server运行参数配置
kube-apiserver通过一系列的ApplyTo对genericConfig进行赋值
-
if lastErr = s.GenericServerRunOptions.ApplyTo(genericConfig); lastErr != nil {
return
}
if lastErr = s.SecureServing.ApplyTo(&genericConfig.SecureServing, &genericConfig.LoopbackClientConfig); lastErr != nil {
return
}
if lastErr = s.Features.ApplyTo(genericConfig); lastErr != nil {
return
}
if lastErr = s.APIEnablement.ApplyTo(genericConfig, controlplane.DefaultAPIResourceConfigSource(), legacyscheme.Scheme); lastErr != nil {
return
}
if lastErr = s.EgressSelector.ApplyTo(genericConfig); lastErr != nil {
return
}
if utilfeature.DefaultFeatureGate.Enabled(genericfeatures.APIServerTracing) {
if lastErr = s.Traces.ApplyTo(genericConfig.EgressSelector, genericConfig); lastErr != nil {
return
}
}
// wrap
Kube-apiserver通过请求代理获得与边缘节点通信的能力:
- Traces.ApplyTo的链路追踪使用OpenTelemetry-Go链路追踪,基于OTLP Span Exporter实现上报
OpenAPI/Swagger配置
kube-apiserver支持两个版本的OpenAPI:openapiv2和openapiv3
默认使用v2
需要启用OpenAPIV3 Feature Gate才能使用V3
-
// wrap the definitions to revert any changes from disabled features
getOpenAPIDefinitions := openapi.GetOpenAPIDefinitionsWithoutDisabledFeatures(generatedopenapi.GetOpenAPIDefinitions)
genericConfig.OpenAPIConfig = genericapiserver.DefaultOpenAPIConfig(getOpenAPIDefinitions, openapinamer.NewDefinitionNamer(legacyscheme.Scheme, extensionsapiserver.Scheme, aggregatorscheme.Scheme))
genericConfig.OpenAPIConfig.Info.Title = "Kubernetes"
if utilfeature.DefaultFeatureGate.Enabled(genericfeatures.OpenAPIV3) {
genericConfig.OpenAPIV3Config = genericapiserver.DefaultOpenAPIV3Config(getOpenAPIDefinitions, openapinamer.NewDefinitionNamer(legacyscheme.Scheme, extensionsapiserver.Scheme, aggregatorscheme.Scheme))
genericConfig.OpenAPIV3Config.Info.Title = "Kubernetes"
}
StorageFactory etcd存储配置
kube-apiserver使用etcd分布式KV数据结构实现持久化存储,下面是配置过程
-
storageFactoryConfig := kubeapiserver.NewStorageFactoryConfig()
storageFactoryConfig.APIResourceConfig = genericConfig.MergedResourceConfig
completedStorageFactoryConfig, err := storageFactoryConfig.Complete(s.Etcd)
if err != nil {
lastErr = err
return
}
K8s支持通过
--etcd-servers-overrides=/events#http://etcd-2:2379
把Events资源单独存储到其他etcd集群,减小压力
- 也可以通过StorageFactoryConfig配置
Authentication认证配置
kube-apiserver提供了多种方法的认证:
- RequestHeader
- ClientCA
- TokenAuth
- ServiceAccountAuth
- BootstrapToken
- OIDC
- WebhookTokenAuth
- Anonymous
Authorization配置
有很多授权策略,只要有一个授权成功就认为成功
授权需要实现Authorizer接口
-
// Authorizer makes an authorization decision based on information gained by making
// zero or more calls to methods of the Attributes interface.
type Authorizer interface {
// Authorize evaluates the request described by a and reports three values:
// the decision (authorized), a reason string, and an error (err).
Authorize(ctx context.Context, a Attributes) (authorized Decision, reason string, err error)
}
Audit审计配置
kube-apiserver支持开启审计,记录集群中发生的活动记录
支持日志方法持久化审计事件
支持通过配置Webhook的方法持久化其他系统
可以通过
--audit-policy-file
参数传入一个配置文件制定Audit Policy审计策略-
// ApplyTo configures auditing on the server config c from the audit options.
//
// A nil receiver is a no-op; a nil c is an error. Otherwise the policy rule
// evaluator, the log backend and the webhook backend are built in that order,
// and the evaluator and the combined backend are stored on c. A backend is only
// created when a policy evaluator exists; without one an informational message
// is logged instead. The first error encountered is returned.
func (o *AuditOptions) ApplyTo(
	c *server.Config,
) error {
	switch {
	case o == nil:
		return nil
	case c == nil:
		return fmt.Errorf("server config must be non-nil")
	}

	// Step 1: the policy rule evaluator (nil when no policy file is set).
	evaluator, err := o.newPolicyRuleEvaluator()
	if err != nil {
		return err
	}
	policyMissing := evaluator == nil

	// Step 2: the log backend, which needs both a writer and a policy.
	w, err := o.LogOptions.getWriter()
	if err != nil {
		return err
	}
	var logBackend audit.Backend
	switch {
	case w == nil:
		// No writer, so there is no log backend to build.
	case policyMissing:
		klog.V(2).Info("No audit policy file provided, no events will be recorded for log backend")
	default:
		logBackend = o.LogOptions.newBackend(w)
	}

	// Step 3: the webhook backend, dialing through the egress selector when one is configured.
	var webhookBackend audit.Backend
	if o.WebhookOptions.enabled() {
		if policyMissing {
			klog.V(2).Info("No audit policy file provided, no events will be recorded for webhook backend")
		} else {
			if c.EgressSelector == nil {
				webhookBackend, err = o.WebhookOptions.newUntruncatedBackend(nil)
			} else {
				var egressDialer utilnet.DialFunc
				egressDialer, err = c.EgressSelector.Lookup(egressselector.ControlPlane.AsNetworkContext())
				if err != nil {
					return err
				}
				webhookBackend, err = o.WebhookOptions.newUntruncatedBackend(egressDialer)
			}
			if err != nil {
				return err
			}
		}
	}

	// The webhook group version is parsed even when no webhook backend was built.
	groupVersion, err := schema.ParseGroupVersion(o.WebhookOptions.GroupVersionString)
	if err != nil {
		return err
	}

	// Step 4: wrap the webhook backend, if any, with the truncate options.
	var dynamicBackend audit.Backend
	if webhookBackend != nil {
		dynamicBackend = o.WebhookOptions.TruncateOptions.wrapBackend(webhookBackend, groupVersion)
	}

	// Step 5: publish the evaluator on the server config.
	c.AuditPolicyRuleEvaluator = evaluator

	// Step 6: combine the log backend with the webhook backend.
	c.AuditBackend = appendBackend(logBackend, dynamicBackend)
	if c.AuditBackend != nil {
		klog.V(2).Infof("Using audit backend: %s", c.AuditBackend)
	}
	return nil
}
// newPolicyRuleEvaluator loads the audit policy named by o.PolicyFile and
// wraps it in a policy rule evaluator.
//
// It returns (nil, nil) when no policy file is configured, and a wrapped
// error when the file cannot be loaded.
func (o *AuditOptions) newPolicyRuleEvaluator() (audit.PolicyRuleEvaluator, error) {
	if len(o.PolicyFile) == 0 {
		return nil, nil
	}
	loaded, loadErr := policy.LoadPolicyFromFile(o.PolicyFile)
	if loadErr == nil {
		return policy.NewPolicyRuleEvaluator(loaded), nil
	}
	return nil, fmt.Errorf("loading audit policy file: %v", loadErr)
}
Admission准入控制器配置
通过认证和鉴权之后,请求还要经过准入控制器,这是资源对象被持久化之前的最后一道拦截,包含校验、修改或拒绝等操作
kube-apiserver启动注册所有准入控制器插件,通过
admission.Plugins
进行注册-
// Factory is a function that returns an Interface for admission decisions.
// The config parameter provides an io.Reader handler to the factory in
// order to load specific configurations. If no configuration is provided
// the parameter is nil.
type Factory func(config io.Reader) (Interface, error)
// Plugins holds the registered admission plugin factories.
type Plugins struct {
// lock guards concurrent access to registry so its contents stay consistent.
lock sync.Mutex
// registry maps a plugin name to the Factory for that plugin.
registry map[string]Factory
}
-
准入控制器实现的接口
-
// Interface is an abstract, pluggable interface for Admission Control decisions.
type Interface interface {
// Handles returns true if this admission controller can handle the given operation
// where operation can be one of CREATE, UPDATE, DELETE, or CONNECT
Handles(operation Operation) bool
}
-
如果需要修改默认的准入控制器,需要通过
--enable-admission-plugins
或者--disable-admission-plugins
参数进行配置
AddPostStartHook添加后置钩子
在kube-apiserver启动之后执行,这个AddPostStartHook为start-kube-apiserver-admission-initializer
,主要用于一些控制器需要调谐的时候
创建APIExtensionServer
创建GenericAPIServer实例
创建GenericAPIServer实例大部分复用APIServer的通用配置
实例化CustomResourceDefinitions
APIExtensionsServer通过CustomResourceDefinitions对象进行管理
-
s := &CustomResourceDefinitions{
GenericAPIServer: genericServer,
}
apiResourceConfig := c.GenericConfig.MergedResourceConfig
apiGroupInfo := genericapiserver.NewDefaultAPIGroupInfo(apiextensions.GroupName, Scheme, metav1.ParameterCodec, Codecs)
storage := map[string]rest.Storage{}
// customresourcedefinitions
if resource := "customresourcedefinitions"; apiResourceConfig.ResourceEnabled(v1.SchemeGroupVersion.WithResource(resource)) {
customResourceDefinitionStorage, err := customresourcedefinition.NewREST(Scheme, c.GenericConfig.RESTOptionsGetter)
if err != nil {
return nil, err
}
storage[resource] = customResourceDefinitionStorage
storage[resource+"/status"] = customresourcedefinition.NewStatusREST(Scheme, customResourceDefinitionStorage)
}
实例化APIGroupInfo
APIGroupInfo描述K8sAPI资源组,包括资源组名称、版本优先级顺序、资源对象支持的编解码方法、不同版本的资源对象存储实现
-
// Info about an API group.
type APIGroupInfo struct {
PrioritizedVersions []schema.GroupVersion
// Info about the resources in this group. It's a map from version to resource to the storage.
VersionedResourcesStorageMap map[string]map[string]rest.Storage
// OptionsExternalVersion controls the APIVersion used for common objects in the
// schema like api.Status, api.DeleteOptions, and metav1.ListOptions. Other implementors may
// define a version "v1beta1" but want to use the Kubernetes "v1" internal objects.
// If nil, defaults to groupMeta.GroupVersion.
// TODO: Remove this when https://github.com/kubernetes/kubernetes/issues/19018 is fixed.
OptionsExternalVersion *schema.GroupVersion
// MetaGroupVersion defaults to "meta.k8s.io/v1" and is the scheme group version used to decode
// common API implementations like ListOptions. Future changes will allow this to vary by group
// version (for when the inevitable meta/v2 group emerges).
MetaGroupVersion *schema.GroupVersion
// Scheme includes all of the types used by this group and how to convert between them (or
// to convert objects from outside of this group that are accepted in this API).
// TODO: replace with interfaces
Scheme *runtime.Scheme
// NegotiatedSerializer controls how this group encodes and decodes data
NegotiatedSerializer runtime.NegotiatedSerializer
// ParameterCodec performs conversions for query parameters passed to API calls
ParameterCodec runtime.ParameterCodec
// StaticOpenAPISpec is the spec derived from the definitions of all resources installed together.
// It is set during InstallAPIGroups, InstallAPIGroup, and InstallLegacyAPIGroup.
StaticOpenAPISpec map[string]*spec.Schema
}
InstallAPIGroup注册APIGroup
InstallAPIGroup实现了APIExtensionsServer的路由注册功能
InstallAPIGroup把APIGroupInfo中的<资源组>/<资源版本>/<资源/子资源>/<资源存储对象>注册到APIExtensionsServer HTTP Handler,主要过程是遍历APIGroupInfo的PrioritizedVersions,把各资源版本映射到HTTP路由
通过InstallREST func实现封装请求处理的Handler
-
// InstallREST registers the REST handlers (storage, watch, proxy and redirect) into a restful Container.
// It is expected that the provided path root prefix will serve all operations. Root MUST NOT end
// in a slash.
//
// The web service produced by the installer also receives the version discovery
// handler before it is added to the container. Registration errors, plus any
// error from converting the resources for aggregated discovery, are returned
// as a single aggregate.
func (g *APIGroupVersion) InstallREST(container *restful.Container) ([]apidiscoveryv2.APIResourceDiscovery, []*storageversion.ResourceInfo, error) {
	installer := &APIInstaller{
		group:             g,
		prefix:            path.Join(g.Root, g.GroupVersion.Group, g.GroupVersion.Version),
		minRequestTimeout: g.MinRequestTimeout,
	}
	resources, infos, webService, errs := installer.Install()

	// Serve the resource list for this group version from the same web service.
	discovery.NewAPIVersionHandler(g.Serializer, g.GroupVersion, staticLister{resources}).AddToWebService(webService)
	container.Add(webService)

	discoveryResources, convertErr := ConvertGroupVersionIntoToDiscovery(resources)
	if convertErr != nil {
		errs = append(errs, convertErr)
	}
	return discoveryResources, removeNonPersistedResources(infos), utilerrors.NewAggregate(errs)
}
-
InstallREST func接收restful.Container指针对象,路由安装分以下几步
- 通过prefix定义HTTP Path请求路径,其路径为
<APIGroupPrefix>/<Group>/<Version>
,这里即为/apis/apiextensions.k8s.io/v1
- 实例化APIInstaller对象
- installer.Install会实例一个restful.WebService对象,遍历APIGroupVersion定义的资源列表,把资源和与之对应的Handler处理函数注册到WebService Route路由
- 通过container.Add func把WebService添加到go-restful Container
- 通过prefix定义HTTP Path请求路径,其路径为
使用了路由判断资源类型的RESTful接口
-
// what verbs are supported by the storage, used to know what verbs we support per path
creater, isCreater := storage.(rest.Creater)
namedCreater, isNamedCreater := storage.(rest.NamedCreater)
lister, isLister := storage.(rest.Lister)
getter, isGetter := storage.(rest.Getter)
getterWithOptions, isGetterWithOptions := storage.(rest.GetterWithOptions)
gracefulDeleter, isGracefulDeleter := storage.(rest.GracefulDeleter)
collectionDeleter, isCollectionDeleter := storage.(rest.CollectionDeleter)
updater, isUpdater := storage.(rest.Updater)
patcher, isPatcher := storage.(rest.Patcher)
watcher, isWatcher := storage.(rest.Watcher)
connecter, isConnecter := storage.(rest.Connecter)
storageMeta, isMetadata := storage.(rest.StorageMetadata)
storageVersionProvider, isStorageVersionProvider := storage.(rest.StorageVersionProvider)
gvAcceptor, _ := storage.(rest.GroupVersionAcceptor)
注册CR的Handler处理函数
创建crdHandler搭配/apis,实现拓展REST接口
crdHandler的ServeHTTP实现:
func (r *versionDiscoveryHandler) ServeHTTP(w http.ResponseWriter, req *http.Request) { |
配置PostStartHook配置钩子
APIExtensionsServer通过GenericAPIServer配置的PostStartHook钩子函数来实现
-
// start-apiextensions-informers:启动客户端Informer,同步集群资源列表,开始Watch变化事件
s.GenericAPIServer.AddPostStartHookOrDie("start-apiextensions-informers", func(hookContext genericapiserver.PostStartHookContext) error {
s.Informers.Start(hookContext.Done())
return nil
})
// start-apiextensions-controllers:启动CRD处理相关控制器
s.GenericAPIServer.AddPostStartHookOrDie("start-apiextensions-controllers", func(hookContext genericapiserver.PostStartHookContext) error {
// OpenAPIVersionedService and StaticOpenAPISpec are populated in generic apiserver PrepareRun().
// Together they serve the /openapi/v2 endpoint on a generic apiserver. A generic apiserver may
// choose to not enable OpenAPI by having null openAPIConfig, and thus OpenAPIVersionedService
// and StaticOpenAPISpec are both null. In that case we don't run the CRD OpenAPI controller.
if s.GenericAPIServer.StaticOpenAPISpec != nil {
if s.GenericAPIServer.OpenAPIVersionedService != nil {
openapiController := openapicontroller.NewController(s.Informers.Apiextensions().V1().CustomResourceDefinitions())
go openapiController.Run(s.GenericAPIServer.StaticOpenAPISpec, s.GenericAPIServer.OpenAPIVersionedService, hookContext.Done())
}
if s.GenericAPIServer.OpenAPIV3VersionedService != nil {
openapiv3Controller := openapiv3controller.NewController(s.Informers.Apiextensions().V1().CustomResourceDefinitions())
go openapiv3Controller.Run(s.GenericAPIServer.OpenAPIV3VersionedService, hookContext.Done())
}
}
go namingController.Run(hookContext.Done())
go establishingController.Run(hookContext.Done())
go nonStructuralSchemaController.Run(5, hookContext.Done())
go apiApprovalController.Run(5, hookContext.Done())
go finalizingController.Run(5, hookContext.Done())
discoverySyncedCh := make(chan struct{})
go discoveryController.Run(hookContext.Done(), discoverySyncedCh)
select {
case <-hookContext.Done():
case <-discoverySyncedCh:
}
return nil
})
// we don't want to report healthy until we can handle all CRDs that have already been registered. Waiting for the informer
// to sync makes sure that the lister will be valid before we begin. There may still be races for CRDs added after startup,
// but we won't go healthy until we can handle the ones already present.
// crd-informer-synced 等到Informer完成资源同步后,向GenericAPIServer发出信号,服务就绪,开始接收和处理客户端请求
s.GenericAPIServer.AddPostStartHookOrDie("crd-informer-synced", func(context genericapiserver.PostStartHookContext) error {
return wait.PollImmediateUntil(100*time.Millisecond, func() (bool, error) {
if s.Informers.Apiextensions().V1().CustomResourceDefinitions().Informer().HasSynced() {
close(hasCRDInformerSyncedSignal)
return true, nil
}
return false, nil
}, context.Done())
})
return s, nil
核心Controller等及其功能
Controller名称 | 功能描述 |
---|---|
discoveryController | 自动发现集群中已经安装好的CRD资源列表,注册对应的HTTPHandler处理函数<apis/ |
establishingController | 检查CRD是否处于活跃可用状态,自动设置其status.conditions字段 |
namingController | 检查CRD是否存在命名冲突,自动设置其status.conditions字段 |
nonStructuralSchemaController | 检查CRD结构是否正确,自动设置其status.conditions字段 |
apiApprovalController | 检查CRD是否遵循K8s API声明策略,自动设置其status.conditions字段 |
finalizingController | 负责处理与CRD删除相关的清理工作,包含设置和移除CRD的Finalizer,在删除CRD前确保相关CR对象已经被清理干净 |
openapiController | 自动同步CRD变化到OpenAPI相关文档,可以通过/openapi/v2访问 |
openapiv3Controller | 自动同步CRD变化到OpenAPI文档,可以通过/openapi/v3访问 |
创建KubeAPIServer
- 把<资源组>/<资源版本>/<资源>与资源存储对象RESTStorage进行映射,存储到APIGroupInfo对象的VersionedResourcesStorageMap字段中
- 通过installer.Install 为资源注册相关的HTTP Handler处理函数,完成参数绑定,为go-restful中的WebService注册对应的路由
- 把WebService对象添加到go-restful Container中
创建GenericAPIServer实例
-
// New returns a new instance of Master from the given config.
// Certain config fields will be set to a default value if unset.
// Certain config fields must be specified, including:
//
// KubeletClientConfig
func (c completedConfig) New(delegationTarget genericapiserver.DelegationTarget) (*Instance, error) {
if reflect.DeepEqual(c.ExtraConfig.KubeletClientConfig, kubeletclient.KubeletClientConfig{}) {
return nil, fmt.Errorf("Master.New() called with empty config.KubeletClientConfig")
}
s, err := c.GenericConfig.New("kube-apiserver", delegationTarget)
if err != nil {
return nil, err
}
...
注册Log和OpenID路由
通过启动参数
--enable-logs-handler
可以设置是否安装支持路由支持读取/logs/{logpath}
读取kube-apiserver所在机器或者容器的/var/log
目录下的日志文件通过routes.NewOpenIDMetadataServer(…)Install注册OpenID相关路由,提供OIDC服务
- 路由/.well-known/openid-configuration:用于OIDC服务发现,又名OIDC Discovery Doc
- 路由/openid/v1/jwks:读取基于ServiceAccount的OpenID JSON Web Key Set密钥集,用于Token验证
-
if c.ExtraConfig.EnableLogsSupport {
routes.Logs{}.Install(s.Handler.GoRestfulContainer)
}
// Metadata and keys are expected to only change across restarts at present,
// so we just marshal immediately and serve the cached JSON bytes.
md, err := serviceaccount.NewOpenIDMetadata(
c.ExtraConfig.ServiceAccountIssuerURL,
c.ExtraConfig.ServiceAccountJWKSURI,
c.GenericConfig.ExternalAddress,
c.ExtraConfig.ServiceAccountPublicKeys,
)
实例化Instance
KubeAPIServer通过Instance对象进行管理,Instance对象是对GenericAPIServer的拓展
m := &Instance{ |
InstallLegacyAPI注册/api资源路由
-
if err := m.InstallLegacyAPI(&c, c.GenericConfig.RESTOptionsGetter); err != nil {
return nil, err
}
通过kubectl get --raw /api/v1
可以获取/api/v1
下所有的资源和子资源信息
InstallLegacyAPI func执行流程如下 :
通过legacyRESTStorageProvider.NewLegacyRESTStorage func实例化APIGroupInfo
通过m.GenericAPIServer.InstallLegacyAPIGroup func把APIGroupInfo对象注册到KubeAPIServer HTTP Handler
通过m.GenericAPIServer.AddPostStartHookOrDie func向kube-apiserver注册启动之后的后置钩子bootstrap-controller,用于启动controller
-
controllerName := "bootstrap-controller"
client := kubernetes.NewForConfigOrDie(c.GenericConfig.LoopbackClientConfig)
bootstrapController, err := c.NewBootstrapController(legacyRESTStorage, client)
if err != nil {
return fmt.Errorf("error creating bootstrap controller: %v", err)
}
m.GenericAPIServer.AddPostStartHookOrDie(controllerName, bootstrapController.PostStartHook)
m.GenericAPIS
-
InstallAPIs注册/apis资源路由
-
// The order here is preserved in discovery.
// If resources with identical names exist in more than one of these groups (e.g. "deployments.apps"" and "deployments.extensions"),
// the order of this list determines which group an unqualified resource name (e.g. "deployments") should prefer.
// This priority order is used for local discovery, but it ends up aggregated in `k8s.io/kubernetes/cmd/kube-apiserver/app/aggregator.go
// with specific priorities.
// TODO: describe the priority all the way down in the RESTStorageProviders and plumb it back through the various discovery
// handlers that we have.
restStorageProviders := []RESTStorageProvider{
apiserverinternalrest.StorageProvider{},
authenticationrest.RESTStorageProvider{Authenticator: c.GenericConfig.Authentication.Authenticator, APIAudiences: c.GenericConfig.Authentication.APIAudiences},
authorizationrest.RESTStorageProvider{Authorizer: c.GenericConfig.Authorization.Authorizer, RuleResolver: c.GenericConfig.RuleResolver},
autoscalingrest.RESTStorageProvider{},
batchrest.RESTStorageProvider{},
certificatesrest.RESTStorageProvider{},
coordinationrest.RESTStorageProvider{},
discoveryrest.StorageProvider{},
networkingrest.RESTStorageProvider{},
noderest.RESTStorageProvider{},
policyrest.RESTStorageProvider{},
rbacrest.RESTStorageProvider{Authorizer: c.GenericConfig.Authorization.Authorizer},
schedulingrest.RESTStorageProvider{},
storagerest.RESTStorageProvider{},
flowcontrolrest.RESTStorageProvider{InformerFactory: c.GenericConfig.SharedInformerFactory},
// keep apps after extensions so legacy clients resolve the extensions versions of shared resource names.
// See https://github.com/kubernetes/kubernetes/issues/42392
appsrest.StorageProvider{},
admissionregistrationrest.RESTStorageProvider{},
eventsrest.RESTStorageProvider{TTL: c.ExtraConfig.EventTTL},
}
通过
kubectl get --raw /apis | jq
可以获取/apis
下的资源组列表信息
➜ ~ k get --raw /apis| jq
{
"kind": "APIGroupList",
"apiVersion": "v1",
"groups": [
{
"name": "apiregistration.k8s.io",
"versions": [
{
"groupVersion": "apiregistration.k8s.io/v1",
"version": "v1"
},
{
"groupVersion": "apiregistration.k8s.io/v1beta1",
"version": "v1beta1"
}
],
"preferredVersion": {
"groupVersion": "apiregistration.k8s.io/v1",
"version": "v1"
}
},
{
"name": "apps",
"versions": [
{
"groupVersion": "apps/v1",
"version": "v1"
}
],
"preferredVersion": {
"groupVersion": "apps/v1",
"version": "v1"
}
},
{
"name": "events.k8s.io",
"versions": [
{
"groupVersion": "events.k8s.io/v1",
"version": "v1"
},
{
"groupVersion": "events.k8s.io/v1beta1",
"version": "v1beta1"
}
],
"preferredVersion": {
"groupVersion": "events.k8s.io/v1",
"version": "v1"
}
},
{
"name": "authentication.k8s.io",
"versions": [
{
"groupVersion": "authentication.k8s.io/v1",
"version": "v1"
},
{
"groupVersion": "authentication.k8s.io/v1beta1",
"version": "v1beta1"
}
],
"preferredVersion": {
"groupVersion": "authentication.k8s.io/v1",
"version": "v1"
}
},
{
"name": "authorization.k8s.io",
"versions": [
{
"groupVersion": "authorization.k8s.io/v1",
"version": "v1"
},
{
"groupVersion": "authorization.k8s.io/v1beta1",
"version": "v1beta1"
}
],
"preferredVersion": {
"groupVersion": "authorization.k8s.io/v1",
"version": "v1"
}
},
{
"name": "autoscaling",
"versions": [
{
"groupVersion": "autoscaling/v1",
"version": "v1"
},
{
"groupVersion": "autoscaling/v2beta1",
"version": "v2beta1"
},
{
"groupVersion": "autoscaling/v2beta2",
"version": "v2beta2"
}
],
"preferredVersion": {
"groupVersion": "autoscaling/v1",
"version": "v1"
}
},
{
"name": "batch",
"versions": [
{
"groupVersion": "batch/v1",
"version": "v1"
},
{
"groupVersion": "batch/v1beta1",
"version": "v1beta1"
}
],
"preferredVersion": {
"groupVersion": "batch/v1",
"version": "v1"
}
},
{
"name": "certificates.k8s.io",
"versions": [
{
"groupVersion": "certificates.k8s.io/v1",
"version": "v1"
},
{
"groupVersion": "certificates.k8s.io/v1beta1",
"version": "v1beta1"
}
],
"preferredVersion": {
"groupVersion": "certificates.k8s.io/v1",
"version": "v1"
}
},
{
"name": "networking.k8s.io",
"versions": [
{
"groupVersion": "networking.k8s.io/v1",
"version": "v1"
},
{
"groupVersion": "networking.k8s.io/v1beta1",
"version": "v1beta1"
}
],
"preferredVersion": {
"groupVersion": "networking.k8s.io/v1",
"version": "v1"
}
},
{
"name": "extensions",
"versions": [
{
"groupVersion": "extensions/v1beta1",
"version": "v1beta1"
}
],
"preferredVersion": {
"groupVersion": "extensions/v1beta1",
"version": "v1beta1"
}
},
{
"name": "policy",
"versions": [
{
"groupVersion": "policy/v1",
"version": "v1"
},
{
"groupVersion": "policy/v1beta1",
"version": "v1beta1"
}
],
"preferredVersion": {
"groupVersion": "policy/v1",
"version": "v1"
}
},
{
"name": "rbac.authorization.k8s.io",
"versions": [
{
"groupVersion": "rbac.authorization.k8s.io/v1",
"version": "v1"
},
{
"groupVersion": "rbac.authorization.k8s.io/v1beta1",
"version": "v1beta1"
}
],
"preferredVersion": {
"groupVersion": "rbac.authorization.k8s.io/v1",
"version": "v1"
}
},
{
"name": "storage.k8s.io",
"versions": [
{
"groupVersion": "storage.k8s.io/v1",
"version": "v1"
},
{
"groupVersion": "storage.k8s.io/v1beta1",
"version": "v1beta1"
}
],
"preferredVersion": {
"groupVersion": "storage.k8s.io/v1",
"version": "v1"
}
},
{
"name": "admissionregistration.k8s.io",
"versions": [
{
"groupVersion": "admissionregistration.k8s.io/v1",
"version": "v1"
},
{
"groupVersion": "admissionregistration.k8s.io/v1beta1",
"version": "v1beta1"
}
],
"preferredVersion": {
"groupVersion": "admissionregistration.k8s.io/v1",
"version": "v1"
}
},
{
"name": "apiextensions.k8s.io",
"versions": [
{
"groupVersion": "apiextensions.k8s.io/v1",
"version": "v1"
},
{
"groupVersion": "apiextensions.k8s.io/v1beta1",
"version": "v1beta1"
}
],
"preferredVersion": {
"groupVersion": "apiextensions.k8s.io/v1",
"version": "v1"
}
},
{
"name": "scheduling.k8s.io",
"versions": [
{
"groupVersion": "scheduling.k8s.io/v1",
"version": "v1"
},
{
"groupVersion": "scheduling.k8s.io/v1beta1",
"version": "v1beta1"
}
],
"preferredVersion": {
"groupVersion": "scheduling.k8s.io/v1",
"version": "v1"
}
},
{
"name": "coordination.k8s.io",
"versions": [
{
"groupVersion": "coordination.k8s.io/v1",
"version": "v1"
},
{
"groupVersion": "coordination.k8s.io/v1beta1",
"version": "v1beta1"
}
],
"preferredVersion": {
"groupVersion": "coordination.k8s.io/v1",
"version": "v1"
}
},
{
"name": "node.k8s.io",
"versions": [
{
"groupVersion": "node.k8s.io/v1",
"version": "v1"
},
{
"groupVersion": "node.k8s.io/v1beta1",
"version": "v1beta1"
}
],
"preferredVersion": {
"groupVersion": "node.k8s.io/v1",
"version": "v1"
}
},
{
"name": "discovery.k8s.io",
"versions": [
{
"groupVersion": "discovery.k8s.io/v1",
"version": "v1"
},
{
"groupVersion": "discovery.k8s.io/v1beta1",
"version": "v1beta1"
}
],
"preferredVersion": {
"groupVersion": "discovery.k8s.io/v1",
"version": "v1"
}
},
{
"name": "flowcontrol.apiserver.k8s.io",
"versions": [
{
"groupVersion": "flowcontrol.apiserver.k8s.io/v1beta1",
"version": "v1beta1"
}
],
"preferredVersion": {
"groupVersion": "flowcontrol.apiserver.k8s.io/v1beta1",
"version": "v1beta1"
}
}
]
}
➜ ~
InstallAPIs func执行流程如下:
- 调用对应资源组的RESTStorageProvider的NewRESTStorage func,实例化所有已启动的资源组的APIGroupInfo
- 检查资源类型对象的RESTStorageProvider是否实现了genericapiserver.PostStartHookProvider接口
几个常见的PostStartHook
资源组 | PostStartHook名称 | 功能描述 |
---|---|---|
rbac.authorization.k8s.io | rbac/bootstrap-roles | 确保系统核心RBAC资源正确存在 |
scheduling.k8s.io | scheduling/bootstrap-system-priority-classes | 确保系统PriorityClass资源正确存在,包含system-node-critical和system-cluster-critical |
flowcontrol.apiserver.k8s.io | priority-and-fairness-config-producer | 确保系统核心PriorityLevelConfiguration和FlowSchema资源正确存在 |
- 通过m.GenericAPIServer.InstallAPIGroups func把APIGroupInfo对象中的<资源组>/<子资源>/<资源存储对象>注册到KubeAPIServer HTTP Handler
配置PostStartHook 后置钩子
-
m.GenericAPIServer.AddPostStartHookOrDie("start-cluster-authentication-info-controller", func(hookContext genericapiserver.PostStartHookContext) error {
kubeClient, err := kubernetes.NewForConfig(hookContext.LoopbackClientConfig)
if err != nil {
return err
}
controller := clusterauthenticationtrust.NewClusterAuthenticationTrustController(m.ClusterAuthenticationInfo, kubeClient)
// generate a context from stopCh. This is to avoid modifying files which are relying on apiserver
// TODO: See if we can pass ctx to the current method
ctx, cancel := context.WithCancel(context.Background())
go func() {
select {
case <-hookContext.StopCh:
cancel() // stopCh closed, so cancel our context
case <-ctx.Done():
}
}()
// prime values and start listeners
if m.ClusterAuthenticationInfo.ClientCA != nil {
m.ClusterAuthenticationInfo.ClientCA.AddListener(controller)
if controller, ok := m.ClusterAuthenticationInfo.ClientCA.(dynamiccertificates.ControllerRunner); ok {
// runonce to be sure that we have a value.
if err := controller.RunOnce(ctx); err != nil {
runtime.HandleError(err)
}
go controller.Run(ctx, 1)
}
}
if m.ClusterAuthenticationInfo.RequestHeaderCA != nil {
m.ClusterAuthenticationInfo.RequestHeaderCA.AddListener(controller)
if controller, ok := m.ClusterAuthenticationInfo.RequestHeaderCA.(dynamiccertificates.ControllerRunner); ok {
// runonce to be sure that we have a value.
if err := controller.RunOnce(ctx); err != nil {
runtime.HandleError(err)
}
go controller.Run(ctx, 1)
}
}
创建AggregatorServer
创建GenericAPIServer实例
通过c.GenericConfig.New创建
实例化APIAggregator
-
// Assemble the APIAggregator around the already-built generic server. Proxy and
// service-resolution settings are read from c.ExtraConfig; OpenAPI and egress
// settings are read from c.GenericConfig.
s := &APIAggregator{
GenericAPIServer: genericServer,
// Handler obtained from the delegation target via UnprotectedHandler().
delegateHandler: delegationTarget.UnprotectedHandler(),
proxyTransport: c.ExtraConfig.ProxyTransport,
// Both bookkeeping collections start out empty.
proxyHandlers: map[string]*proxyHandler{},
handledGroups: sets.String{},
// Lister over APIService objects, taken from the apiregistration informer factory.
lister: informerFactory.Apiregistration().V1().APIServices().Lister(),
APIRegistrationInformers: informerFactory,
serviceResolver: c.ExtraConfig.ServiceResolver,
openAPIConfig: c.GenericConfig.OpenAPIConfig,
openAPIV3Config: c.GenericConfig.OpenAPIV3Config,
egressSelector: c.GenericConfig.EgressSelector,
// Initial callback that yields no cert/key content (nil, nil).
proxyCurrentCertKeyContent: func() (bytes []byte, bytes2 []byte) { return nil, nil },
rejectForwardingRedirects: c.ExtraConfig.RejectForwardingRedirects,
}
实例化APIGroupInfo
-
// NewRESTStorage assembles the APIGroupInfo for the apiregistration group.
//
// The "apiservices" resource and its "apiservices/status" subresource are
// registered under version "v1" only when apiResourceConfigSource reports the
// resource as enabled; otherwise the returned APIGroupInfo carries no "v1"
// storage entry. shouldServeBeta is not consulted by this function.
func NewRESTStorage(apiResourceConfigSource serverstorage.APIResourceConfigSource, restOptionsGetter generic.RESTOptionsGetter, shouldServeBeta bool) genericapiserver.APIGroupInfo {
	const apiServicesResource = "apiservices"

	groupInfo := genericapiserver.NewDefaultAPIGroupInfo(apiregistration.GroupName, aggregatorscheme.Scheme, metav1.ParameterCodec, aggregatorscheme.Codecs)

	gvr := v1.SchemeGroupVersion.WithResource(apiServicesResource)
	if !apiResourceConfigSource.ResourceEnabled(gvr) {
		// Nothing to serve: leave the versioned storage map untouched.
		return groupInfo
	}

	// The status store is layered on top of the main APIService store.
	mainStore := apiservicestorage.NewREST(aggregatorscheme.Scheme, restOptionsGetter)
	statusStore := apiservicestorage.NewStatusREST(aggregatorscheme.Scheme, mainStore)

	groupInfo.VersionedResourcesStorageMap["v1"] = map[string]rest.Storage{
		apiServicesResource:             mainStore,
		apiServicesResource + "/status": statusStore,
	}
	return groupInfo
}
InstallAPIGroup注册APIGroup
负责管理apiregistration.k8s.io的资源,通过
kubectl get --raw /apis/apiregistration.k8s.io/v1|jq
获取数据-
// Build the apiregistration APIGroupInfo from the merged resource config and
// the REST options getter, then install it on the generic server. The third
// argument is the value passed for NewRESTStorage's shouldServeBeta parameter.
apiGroupInfo := apiservicerest.NewRESTStorage(c.GenericConfig.MergedResourceConfig, c.GenericConfig.RESTOptionsGetter, resourceExpirationEvaluator.ShouldServeForVersion(1, 22))
if err := s.GenericAPIServer.InstallAPIGroup(&apiGroupInfo); err != nil {
// Propagate the installation error to the caller.
return nil, err
}
注册拓展APIServer Handler
-
// Build the handler for the /apis paths from the aggregator scheme's codecs,
// the aggregator's APIService lister, and the discovery group computed from
// the enabled versions.
apisHandler := &apisHandler{
codecs: aggregatorscheme.Codecs,
lister: s.lister,
discoveryGroup: discoveryGroup(enabledVersions),
}
// Register the same handler twice on the non-go-restful mux: "/apis" through
// Handle, and the "/apis/" prefix through UnlistedHandle.
s.GenericAPIServer.Handler.NonGoRestfulMux.Handle("/apis", apisHandler)
s.GenericAPIServer.Handler.NonGoRestfulMux.UnlistedHandle("/apis/", apisHandler)
配置PostStartHook后置钩子
-
// aggregator-reload-proxy-client-cert: starts aggregatorProxyCerts in the
// background with a context that is cancelled once the hook's StopCh closes.
// Presumably aggregatorProxyCerts is the controller that reloads the proxy
// client cert/key given by --proxy-client-cert-file / --proxy-client-key-file
// when the files change -- confirm where aggregatorProxyCerts is constructed.
s.GenericAPIServer.AddPostStartHookOrDie("aggregator-reload-proxy-client-cert", func(postStartHookContext genericapiserver.PostStartHookContext) error {
// generate a context from stopCh. This is to avoid modifying files which are relying on apiserver
// TODO: See if we can pass ctx to the current method
ctx, cancel := context.WithCancel(context.Background())
go func() {
select {
case <-postStartHookContext.StopCh:
cancel() // stopCh closed, so cancel our context
case <-ctx.Done():
}
}()
go aggregatorProxyCerts.Run(ctx, 1)
return nil
})
}
// Construct the available-condition controller from the APIService informer,
// the shared Service and Endpoints informers, the apiregistration v1 client,
// and the aggregator's proxy transport, cert/key callback, service resolver
// and egress selector.
availableController, err := statuscontrollers.NewAvailableConditionController(
informerFactory.Apiregistration().V1().APIServices(),
c.GenericConfig.SharedInformerFactory.Core().V1().Services(),
c.GenericConfig.SharedInformerFactory.Core().V1().Endpoints(),
apiregistrationClient.ApiregistrationV1(),
c.ExtraConfig.ProxyTransport,
// Explicit conversion of the stored callback to a plain func type.
(func() ([]byte, []byte))(s.proxyCurrentCertKeyContent),
s.serviceResolver,
c.GenericConfig.EgressSelector,
)
if err != nil {
// Propagate the construction error to the caller.
return nil, err
}
// start-kube-aggregator-informers: starts the aggregator's own informer
// factory and the shared informer factory, passing each the hook's StopCh.
// Inside the closure, `context` names the hook context parameter, not the
// context package.
s.GenericAPIServer.AddPostStartHookOrDie("start-kube-aggregator-informers", func(context genericapiserver.PostStartHookContext) error {
informerFactory.Start(context.StopCh)
c.GenericConfig.SharedInformerFactory.Start(context.StopCh)
return nil
})
// apiservice-registration-controller: launches apiserviceRegistrationController
// in a goroutine, then blocks the hook until either StopCh closes or
// apiServiceRegistrationControllerInitiated becomes readable.
s.GenericAPIServer.AddPostStartHookOrDie("apiservice-registration-controller", func(context genericapiserver.PostStartHookContext) error {
go apiserviceRegistrationController.Run(context.StopCh, apiServiceRegistrationControllerInitiated)
select {
case <-context.StopCh:
case <-apiServiceRegistrationControllerInitiated:
}
return nil
})
// apiservice-status-available-controller: runs availableController (built by
// NewAvailableConditionController) in the background with the hook's StopCh.
// Presumably it keeps the availability status of APIService objects up to
// date -- confirm against the controller's source.
s.GenericAPIServer.AddPostStartHookOrDie("apiservice-status-available-controller", func(context genericapiserver.PostStartHookContext) error {
// if we end up blocking for long periods of time, we may need to increase workers.
go availableController.Run(5, context.StopCh)
return nil
})
// StorageVersion post-start hook, registered only when both the
// StorageVersionAPI and APIServerIdentity feature gates are enabled. The hook
// (1) waits for this server's identity Lease to exist, (2) starts a background
// loop that updates StorageVersions every 10 minutes, and (3) returns once the
// StorageVersionManager reports its first round as completed.
if utilfeature.DefaultFeatureGate.Enabled(genericfeatures.StorageVersionAPI) &&
utilfeature.DefaultFeatureGate.Enabled(genericfeatures.APIServerIdentity) {
// Spawn a goroutine in aggregator apiserver to update storage version for
// all built-in resources
s.GenericAPIServer.AddPostStartHookOrDie(StorageVersionPostStartHookName, func(hookContext genericapiserver.PostStartHookContext) error {
// Wait for apiserver-identity to exist first before updating storage
// versions, to avoid storage version GC accidentally garbage-collecting
// storage versions.
kubeClient, err := kubernetes.NewForConfig(hookContext.LoopbackClientConfig)
if err != nil {
return err
}
if err := wait.PollImmediateUntil(100*time.Millisecond, func() (bool, error) {
_, err := kubeClient.CoordinationV1().Leases(metav1.NamespaceSystem).Get(
context.TODO(), s.GenericAPIServer.APIServerID, metav1.GetOptions{})
// A missing Lease means "not yet": keep polling rather than failing.
if apierrors.IsNotFound(err) {
return false, nil
}
// Any other error ends the poll and fails the hook.
if err != nil {
return false, err
}
return true, nil
}, hookContext.StopCh); err != nil {
return fmt.Errorf("failed to wait for apiserver-identity lease %s to be created: %v",
s.GenericAPIServer.APIServerID, err)
}
// Technically an apiserver only needs to update storage version once during bootstrap.
// Reconcile StorageVersion objects every 10 minutes will help in the case that the
// StorageVersion objects get accidentally modified/deleted by a different agent. In that
// case, the reconciliation ensures future storage migration still works. If nothing gets
// changed, the reconciliation update is a noop and gets short-circuited by the apiserver,
// therefore won't change the resource version and trigger storage migration.
go wait.PollImmediateUntil(10*time.Minute, func() (bool, error) {
// All apiservers (aggregator-apiserver, kube-apiserver, apiextensions-apiserver)
// share the same generic apiserver config. The same StorageVersion manager is used
// to register all built-in resources when the generic apiservers install APIs.
s.GenericAPIServer.StorageVersionManager.UpdateStorageVersions(hookContext.LoopbackClientConfig, s.GenericAPIServer.APIServerID)
// Always report "not done" so the loop keeps running until StopCh closes.
return false, nil
}, hookContext.StopCh)
// Once the storage version updater finishes the first round of update,
// the PostStartHook will return to unblock /healthz. The handler chain
// won't block write requests anymore. Check every second since it's not
// expensive.
wait.PollImmediateUntil(1*time.Second, func() (bool, error) {
return s.GenericAPIServer.StorageVersionManager.Completed(), nil
}, hookContext.StopCh)
return nil
})
}
return s, nil
}
// PrepareRun prepares the aggregator to run, by setting up the OpenAPI spec and calling
// the generic PrepareRun.
func (s *APIAggregator) PrepareRun() (preparedAPIAggregator, error) {
// add post start hook before generic PrepareRun in order to be before /healthz installation
if s.openAPIConfig != nil {
// apiservice-openapi-controller: runs openAPIAggregationController in the
// background with the hook's StopCh; registered only when openAPIConfig is set.
s.GenericAPIServer.AddPostStartHookOrDie("apiservice-openapi-controller", func(context genericapiserver.PostStartHookContext) error {
go s.openAPIAggregationController.Run(context.StopCh)
return nil
})
}
// apiservice-openapiv3-controller: runs openAPIV3AggregationController in the
// background; registered only when openAPIV3Config is set and the OpenAPIV3
// feature gate is enabled.
if s.openAPIV3Config != nil && utilfeature.DefaultFeatureGate.Enabled(genericfeatures.OpenAPIV3) {
s.GenericAPIServer.AddPostStartHookOrDie("apiservice-openapiv3-controller", func(context genericapiserver.PostStartHookContext) error {
go s.openAPIV3AggregationController.Run(context.StopCh)
return nil
})
}
GenericAPIServer初始化
无论是APIExtensionServer,KubeAPIServer,还是AggregatorServer 底层都依赖于GenericAPIServer,GenericAPIServer启动HTTP Server承载服务
通过如下命令可以在运行时动态调整kube-apiserver的日志级别(/debug/flags/v接口):
curl -X PUT -k \
--cert /etc/kubernetes/pki/apiserver-kubelet-client.crt \
--key /etc/kubernetes/pki/apiserver-kubelet-client.key \
https://127.0.0.1:6443/debug/flags/v -d "4"
准备和启动HTTPS服务
server.PrepareRun准备阶段
server.PrepareRun准备阶段主要包含两个关键步骤
配置PostStartHook后置钩子
由于kube-apiserver支持CRD和AggregatedAPIServer类型拓展,集群资源类型存在动态变化,因此需要通过后置钩子启动控制器,持续聚合OpenAPI文档
-
// add post start hook before generic PrepareRun in order to be before /healthz installation
if s.openAPIConfig != nil {
// Aggregates the /openapi/v2 endpoint: the hook runs
// openAPIAggregationController in the background with the hook's StopCh.
s.GenericAPIServer.AddPostStartHookOrDie("apiservice-openapi-controller", func(context genericapiserver.PostStartHookContext) error {
go s.openAPIAggregationController.Run(context.StopCh)
return nil
})
}
if s.openAPIV3Config != nil && utilfeature.DefaultFeatureGate.Enabled(genericfeatures.OpenAPIV3) {
// Aggregates the /openapi/v3 endpoint: the hook runs
// openAPIV3AggregationController in the background with the hook's StopCh.
s.GenericAPIServer.AddPostStartHookOrDie("apiservice-openapiv3-controller", func(context genericapiserver.PostStartHookContext) error {
go s.openAPIV3AggregationController.Run(context.StopCh)
return nil
})
}
-
执行GenericAPIServer.PrepareRun预处理
主要包含辅助类型的HTTP Handler,包含以下:
routes.OpenAPI
- v2和v3版本的OpenAPI
installHealthz:安装健康检查的Handler
[root@hcss-ecs-5425 ~]# kubectl get --raw / |grep healthz
"/healthz",
"/healthz/autoregister-completion",
"/healthz/etcd",
"/healthz/log",
"/healthz/ping",
"/healthz/poststarthook/aggregator-reload-proxy-client-cert",
"/healthz/poststarthook/apiservice-openapi-controller",
"/healthz/poststarthook/apiservice-registration-controller",
"/healthz/poststarthook/apiservice-status-available-controller",
"/healthz/poststarthook/bootstrap-controller",
"/healthz/poststarthook/crd-informer-synced",
"/healthz/poststarthook/generic-apiserver-start-informers",
"/healthz/poststarthook/kube-apiserver-autoregistration",
"/healthz/poststarthook/priority-and-fairness-config-consumer",
"/healthz/poststarthook/priority-and-fairness-config-producer",
"/healthz/poststarthook/priority-and-fairness-filter",
"/healthz/poststarthook/rbac/bootstrap-roles",
"/healthz/poststarthook/scheduling/bootstrap-system-priority-classes",
"/healthz/poststarthook/start-apiextensions-controllers",
"/healthz/poststarthook/start-apiextensions-informers",
"/healthz/poststarthook/start-cluster-authentication-info-controller",
"/healthz/poststarthook/start-kube-aggregator-informers",
"/healthz/poststarthook/start-kube-apiserver-admission-initializer",
[root@hcss-ecs-5425 ~]#
installLivez:安装存活探测Handler,用于kube-apiserver的存活检测,包含/livez和以/livez/为前缀的检查项,默认情况下所有的HealthCheck都会安装Livez探针
[root@hcss-ecs-5425 ~]# kubectl get --raw / |grep livez
"/livez",
"/livez/autoregister-completion",
"/livez/etcd",
"/livez/log",
"/livez/ping",
"/livez/poststarthook/aggregator-reload-proxy-client-cert",
"/livez/poststarthook/apiservice-openapi-controller",
"/livez/poststarthook/apiservice-registration-controller",
"/livez/poststarthook/apiservice-status-available-controller",
"/livez/poststarthook/bootstrap-controller",
"/livez/poststarthook/crd-informer-synced",
"/livez/poststarthook/generic-apiserver-start-informers",
"/livez/poststarthook/kube-apiserver-autoregistration",
"/livez/poststarthook/priority-and-fairness-config-consumer",
"/livez/poststarthook/priority-and-fairness-config-producer",
"/livez/poststarthook/priority-and-fairness-filter",
"/livez/poststarthook/rbac/bootstrap-roles",
"/livez/poststarthook/scheduling/bootstrap-system-priority-classes",
"/livez/poststarthook/start-apiextensions-controllers",
"/livez/poststarthook/start-apiextensions-informers",
"/livez/poststarthook/start-cluster-authentication-info-controller",
"/livez/poststarthook/start-kube-aggregator-informers",
"/livez/poststarthook/start-kube-apiserver-admission-initializer",
[root@hcss-ecs-5425 ~]#
installReadyz:安装就绪探测Handler,用于kube-apiserver就绪探测,包括/readyz以及以/readyz/开头的检查项,默认情况下,所有的HealthCheck都会同步安装Readyz探针,为了支持优雅停机,还会添加ShutdownCheck健康检查项
[root@hcss-ecs-5425 ~]# kubectl get --raw / |grep readyz
"/readyz",
"/readyz/autoregister-completion",
"/readyz/etcd",
"/readyz/informer-sync",
"/readyz/log",
"/readyz/ping",
"/readyz/poststarthook/aggregator-reload-proxy-client-cert",
"/readyz/poststarthook/apiservice-openapi-controller",
"/readyz/poststarthook/apiservice-registration-controller",
"/readyz/poststarthook/apiservice-status-available-controller",
"/readyz/poststarthook/bootstrap-controller",
"/readyz/poststarthook/crd-informer-synced",
"/readyz/poststarthook/generic-apiserver-start-informers",
"/readyz/poststarthook/kube-apiserver-autoregistration",
"/readyz/poststarthook/priority-and-fairness-config-consumer",
"/readyz/poststarthook/priority-and-fairness-config-producer",
"/readyz/poststarthook/priority-and-fairness-filter",
"/readyz/poststarthook/rbac/bootstrap-roles",
"/readyz/poststarthook/scheduling/bootstrap-system-priority-classes",
"/readyz/poststarthook/start-apiextensions-controllers",
"/readyz/poststarthook/start-apiextensions-informers",
"/readyz/poststarthook/start-cluster-authentication-info-controller",
"/readyz/poststarthook/start-kube-aggregator-informers",
"/readyz/poststarthook/start-kube-apiserver-admission-initializer",
"/readyz/shutdown",
[root@hcss-ecs-5425 ~]#
-
prepared := s.GenericAPIServer.PrepareRun()
prepared.Run执行阶段
注册优雅关闭信号处理
真正启动HTTPS服务前,kube-apiserver会利用协程和defer机制注册优雅关闭处理流程,在收到关闭信号之后,不会立即退出
s.AuditBackend.Run启动审计后端服务
-
// Start the audit backend before any request comes in. This means we must call Backend.Run
// before http server start serving. Otherwise the Backend.ProcessEvents call might block.
// AuditBackend.Run will stop as soon as all in-flight requests are drained.
if s.AuditBackend != nil {
if err := s.AuditBackend.Run(drainedCh.Signaled()); err != nil {
return fmt.Errorf("failed to run the audit backend: %v", err)
}
}
-
s.NonBlockingRunWithContext启动HTTPS服务
-
stoppedCh, listenerStoppedCh, err := s.NonBlockingRunWithContext(stopHTTPServerCtx, shutdownTimeout)
if err != nil {
return err
}
-