K8s-kubelet(Overview)
Based on Kubernetes 1.25
The kubelet startup flow consists of five main steps:
- Cobra command-line flag parsing
- Runtime environment detection and setup
- Kubelet object instantiation
- Starting the main kubelet service
- Starting the HTTP server and gRPC server
Cobra Command-Line Flag Parsing
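Before diving into NewKubeletCommand, it helps to see how it is invoked: the kubelet binary's entrypoint simply builds the Cobra command and hands it to the shared component-base CLI runner. A lightly simplified sketch (based on cmd/kubelet/kubelet.go; registrations and extra imports are omitted):
-
package main

import (
	"os"

	"k8s.io/component-base/cli"
	"k8s.io/kubernetes/cmd/kubelet/app"
)

func main() {
	// Build the *cobra.Command shown below and run it; cli.Run sets up
	// logging and flag normalization, then invokes the command's RunE.
	command := app.NewKubeletCommand()
	code := cli.Run(command)
	os.Exit(code)
}

NewKubeletCommand itself constructs that command: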
-
// NewKubeletCommand creates a *cobra.Command object with default parameters
func NewKubeletCommand() *cobra.Command {
cleanFlagSet := pflag.NewFlagSet(componentKubelet, pflag.ContinueOnError)
cleanFlagSet.SetNormalizeFunc(cliflag.WordSepNormalizeFunc)
kubeletFlags := options.NewKubeletFlags()
kubeletConfig, err := options.NewKubeletConfiguration()
// programmer error
if err != nil {
klog.ErrorS(err, "Failed to create a new kubelet configuration")
os.Exit(1)
}
cmd := &cobra.Command{
Use: componentKubelet,
Long: `The kubelet is the primary "node agent" that runs on each
node. It can register the node with the apiserver using one of: the hostname; a flag to
override the hostname; or specific logic for a cloud provider.
The kubelet works in terms of a PodSpec. A PodSpec is a YAML or JSON object
that describes a pod. The kubelet takes a set of PodSpecs that are provided through
various mechanisms (primarily through the apiserver) and ensures that the containers
described in those PodSpecs are running and healthy. The kubelet doesn't manage
containers which were not created by Kubernetes.
Other than from an PodSpec from the apiserver, there are three ways that a container
manifest can be provided to the Kubelet.
File: Path passed as a flag on the command line. Files under this path will be monitored
periodically for updates. The monitoring period is 20s by default and is configurable
via a flag.
HTTP endpoint: HTTP endpoint passed as a parameter on the command line. This endpoint
is checked every 20 seconds (also configurable with a flag).
HTTP server: The kubelet can also listen for HTTP and respond to a simple API
(underspec'd currently) to submit a new manifest.`,
// The Kubelet has special flag parsing requirements to enforce flag precedence rules,
// so we do all our parsing manually in Run, below.
// DisableFlagParsing=true provides the full set of flags passed to the kubelet in the
// `args` arg to Run, without Cobra's interference.
DisableFlagParsing: true,
SilenceUsage: true,
RunE: func(cmd *cobra.Command, args []string) error {
// initial flag parse, since we disable cobra's flag parsing
if err := cleanFlagSet.Parse(args); err != nil {
return fmt.Errorf("failed to parse kubelet flag: %w", err)
}
// check if there are non-flag arguments in the command line
cmds := cleanFlagSet.Args()
if len(cmds) > 0 {
return fmt.Errorf("unknown command %+s", cmds[0])
}
// short-circuit on help
help, err := cleanFlagSet.GetBool("help")
if err != nil {
return errors.New(`"help" flag is non-bool, programmer error, please correct`)
}
if help {
return cmd.Help()
}
// short-circuit on verflag
verflag.PrintAndExitIfRequested()
// set feature gates from initial flags-based config
if err := utilfeature.DefaultMutableFeatureGate.SetFromMap(kubeletConfig.FeatureGates); err != nil {
return fmt.Errorf("failed to set feature gates from initial flags-based config: %w", err)
}
// validate the initial KubeletFlags
if err := options.ValidateKubeletFlags(kubeletFlags); err != nil {
return fmt.Errorf("failed to validate kubelet flags: %w", err)
}
if cleanFlagSet.Changed("pod-infra-container-image") {
klog.InfoS("--pod-infra-container-image will not be pruned by the image garbage collector in kubelet and should also be set in the remote runtime")
}
// load kubelet config file, if provided
if configFile := kubeletFlags.KubeletConfigFile; len(configFile) > 0 {
kubeletConfig, err = loadConfigFile(configFile)
if err != nil {
return fmt.Errorf("failed to load kubelet config file, error: %w, path: %s", err, configFile)
}
// We must enforce flag precedence by re-parsing the command line into the new object.
// This is necessary to preserve backwards-compatibility across binary upgrades.
// See issue #56171 for more details.
if err := kubeletConfigFlagPrecedence(kubeletConfig, args); err != nil {
return fmt.Errorf("failed to precedence kubeletConfigFlag: %w", err)
}
// update feature gates based on new config
if err := utilfeature.DefaultMutableFeatureGate.SetFromMap(kubeletConfig.FeatureGates); err != nil {
return fmt.Errorf("failed to set feature gates from initial flags-based config: %w", err)
}
}
// Config and flags parsed, now we can initialize logging.
logs.InitLogs()
if err := logsapi.ValidateAndApplyAsField(&kubeletConfig.Logging, utilfeature.DefaultFeatureGate, field.NewPath("logging")); err != nil {
return fmt.Errorf("initialize logging: %v", err)
}
cliflag.PrintFlags(cleanFlagSet)
// We always validate the local configuration (command line + config file).
// This is the default "last-known-good" config for dynamic config, and must always remain valid.
if err := kubeletconfigvalidation.ValidateKubeletConfiguration(kubeletConfig, utilfeature.DefaultFeatureGate); err != nil {
return fmt.Errorf("failed to validate kubelet configuration, error: %w, path: %s", err, kubeletConfig)
}
if (kubeletConfig.KubeletCgroups != "" && kubeletConfig.KubeReservedCgroup != "") && (strings.Index(kubeletConfig.KubeletCgroups, kubeletConfig.KubeReservedCgroup) != 0) {
klog.InfoS("unsupported configuration:KubeletCgroups is not within KubeReservedCgroup")
}
// The features.DynamicKubeletConfig is locked to false,
// feature gate is not locked using the LockedToDefault flag
// to make sure node authorizer can keep working with the older nodes
if utilfeature.DefaultFeatureGate.Enabled(features.DynamicKubeletConfig) {
return fmt.Errorf("cannot set feature gate %v to %v, feature is locked to %v", features.DynamicKubeletConfig, true, false)
}
// construct a KubeletServer from kubeletFlags and kubeletConfig
kubeletServer := &options.KubeletServer{
KubeletFlags: *kubeletFlags,
KubeletConfiguration: *kubeletConfig,
}
// use kubeletServer to construct the default KubeletDeps
kubeletDeps, err := UnsecuredDependencies(kubeletServer, utilfeature.DefaultFeatureGate)
if err != nil {
return fmt.Errorf("failed to construct kubelet dependencies: %w", err)
}
if err := checkPermissions(); err != nil {
klog.ErrorS(err, "kubelet running with insufficient permissions")
}
// make the kubelet's config safe for logging
config := kubeletServer.KubeletConfiguration.DeepCopy()
for k := range config.StaticPodURLHeader {
config.StaticPodURLHeader[k] = []string{"<masked>"}
}
// log the kubelet's config for inspection
klog.V(5).InfoS("KubeletConfiguration", "configuration", config)
// set up signal context for kubelet shutdown
ctx := genericapiserver.SetupSignalContext()
// run the kubelet
return Run(ctx, kubeletServer, kubeletDeps, utilfeature.DefaultFeatureGate)
},
}
// keep cleanFlagSet separate, so Cobra doesn't pollute it with the global flags
kubeletFlags.AddFlags(cleanFlagSet)
options.AddKubeletConfigFlags(cleanFlagSet, kubeletConfig)
options.AddGlobalFlags(cleanFlagSet)
cleanFlagSet.BoolP("help", "h", false, fmt.Sprintf("help for %s", cmd.Name()))
// ugly, but necessary, because Cobra's default UsageFunc and HelpFunc pollute the flagset with global flags
const usageFmt = "Usage:\n %s\n\nFlags:\n%s"
cmd.SetUsageFunc(func(cmd *cobra.Command) error {
fmt.Fprintf(cmd.OutOrStderr(), usageFmt, cmd.UseLine(), cleanFlagSet.FlagUsagesWrapped(2))
return nil
})
cmd.SetHelpFunc(func(cmd *cobra.Command, args []string) {
fmt.Fprintf(cmd.OutOrStdout(), "%s\n\n"+usageFmt, cmd.Long, cmd.UseLine(), cleanFlagSet.FlagUsagesWrapped(2))
})
return cmd
}
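The kubeletConfigFlagPrecedence step above re-parses the original command line on top of the configuration loaded from --config, so that flags explicitly set on the command line take precedence over the config file (see issue #56171). A minimal, self-contained sketch of that idea using pflag, with a hypothetical config struct rather than the kubelet's real types:
-
package main

import (
	"fmt"

	"github.com/spf13/pflag"
)

// Hypothetical, trimmed-down config used only to illustrate flag precedence.
type kubeletConfig struct {
	MaxPods int32
}

func main() {
	// 1. Built-in default.
	cfg := kubeletConfig{MaxPods: 110}

	// 2. Pretend a config file set MaxPods to 200.
	cfg.MaxPods = 200

	// 3. Re-parse the original command line on top of the loaded config,
	//    so explicitly-set flags win over the config file.
	args := []string{"--max-pods=250"}
	fs := pflag.NewFlagSet("kubelet-demo", pflag.ContinueOnError)
	fs.Int32Var(&cfg.MaxPods, "max-pods", cfg.MaxPods, "maximum number of pods")
	if err := fs.Parse(args); err != nil {
		panic(err)
	}

	fmt.Println(cfg.MaxPods) // 250: the command-line value wins
}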
Runtime Environment Detection and Setup
- The exported Run first calls initForOS, which sets the kubelet up as a Windows service daemon (Windows only)
- It then calls run, which performs dependency initialization, environment checks, and related setup
- RunKubelet starts the main kubelet service, and a healthz HTTP server is started alongside it
The main flow is as follows:
The kubelet decides its run mode based on whether a kubeconfig is supplied:
- On first startup the kubelet has not yet obtained cluster credentials, so it runs in standalone mode, in which only static pods can be started
- Once it is running normally, it switches from standalone mode to bootstrap mode
It then initializes the Auth, cAdvisor, and ContainerManager objects in turn; these are the kubelet's foundational components.
ApplyOOMScoreAdj adjusts the kubelet's OOM score by setting its oom_score_adj value. When the node runs low on memory, the operating system scores processes to pick a victim: the higher the oom_score_adj, the more likely the process is to be killed, with a valid range of [-1000, 1000]. By default the kubelet sets its own oom_score_adj to -999 to keep itself alive.
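As a rough illustration of what ApplyOOMScoreAdj ultimately does (a sketch, not the kubelet's actual oom package), the score is simply written into procfs:
-
package main

import (
	"fmt"
	"os"
	"strconv"
)

// applyOOMScoreAdj writes the given score into /proc/<pid>/oom_score_adj.
// The valid range is [-1000, 1000]; pid 0 is treated as "self" here.
func applyOOMScoreAdj(pid, score int) error {
	if score < -1000 || score > 1000 {
		return fmt.Errorf("score %d out of range [-1000, 1000]", score)
	}
	path := fmt.Sprintf("/proc/%d/oom_score_adj", pid)
	if pid == 0 {
		path = "/proc/self/oom_score_adj"
	}
	return os.WriteFile(path, []byte(strconv.Itoa(score)), 0644)
}

func main() {
	// The kubelet defaults its own score to -999 (see --oom-score-adj).
	if err := applyOOMScoreAdj(0, -999); err != nil {
		fmt.Println("failed to apply oom_score_adj:", err)
	}
}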
Check the kubelet's oom_score_adj (shell):
[root@hcss-ecs-5425 ~]# cat /proc/$(pidof kubelet)/oom_score_adj
-999
The run function below performs the environment checks described above and finally calls RunKubelet to formally start the kubelet service.
-
func run(ctx context.Context, s *options.KubeletServer, kubeDeps *kubelet.Dependencies, featureGate featuregate.FeatureGate) (err error) {
// Set global feature gates based on the value on the initial KubeletServer
err = utilfeature.DefaultMutableFeatureGate.SetFromMap(s.KubeletConfiguration.FeatureGates)
if err != nil {
return err
}
// validate the initial KubeletServer (we set feature gates first, because this validation depends on feature gates)
if err := options.ValidateKubeletServer(s); err != nil {
return err
}
// Warn if MemoryQoS enabled with cgroups v1
if utilfeature.DefaultFeatureGate.Enabled(features.MemoryQoS) &&
!isCgroup2UnifiedMode() {
klog.InfoS("Warning: MemoryQoS feature only works with cgroups v2 on Linux, but enabled with cgroups v1")
}
// Obtain Kubelet Lock File
if s.ExitOnLockContention && s.LockFilePath == "" {
return errors.New("cannot exit on lock file contention: no lock file specified")
}
done := make(chan struct{})
if s.LockFilePath != "" {
klog.InfoS("Acquiring file lock", "path", s.LockFilePath)
if err := flock.Acquire(s.LockFilePath); err != nil {
return fmt.Errorf("unable to acquire file lock on %q: %w", s.LockFilePath, err)
}
if s.ExitOnLockContention {
klog.InfoS("Watching for inotify events", "path", s.LockFilePath)
if err := watchForLockfileContention(s.LockFilePath, done); err != nil {
return err
}
}
}
// Register current configuration with /configz endpoint
err = initConfigz(&s.KubeletConfiguration)
if err != nil {
klog.ErrorS(err, "Failed to register kubelet configuration with configz")
}
if len(s.ShowHiddenMetricsForVersion) > 0 {
metrics.SetShowHidden()
}
// About to get clients and such, detect standaloneMode
standaloneMode := true
if len(s.KubeConfig) > 0 {
standaloneMode = false
}
if kubeDeps == nil {
kubeDeps, err = UnsecuredDependencies(s, featureGate)
if err != nil {
return err
}
}
if kubeDeps.Cloud == nil {
if !cloudprovider.IsExternal(s.CloudProvider) {
cloudprovider.DeprecationWarningForProvider(s.CloudProvider)
cloud, err := cloudprovider.InitCloudProvider(s.CloudProvider, s.CloudConfigFile)
if err != nil {
return err
}
if cloud != nil {
klog.V(2).InfoS("Successfully initialized cloud provider", "cloudProvider", s.CloudProvider, "cloudConfigFile", s.CloudConfigFile)
}
kubeDeps.Cloud = cloud
}
}
hostName, err := nodeutil.GetHostname(s.HostnameOverride)
if err != nil {
return err
}
nodeName, err := getNodeName(kubeDeps.Cloud, hostName)
if err != nil {
return err
}
// if in standalone mode, indicate as much by setting all clients to nil
switch {
case standaloneMode:
kubeDeps.KubeClient = nil
kubeDeps.EventClient = nil
kubeDeps.HeartbeatClient = nil
klog.InfoS("Standalone mode, no API client")
case kubeDeps.KubeClient == nil, kubeDeps.EventClient == nil, kubeDeps.HeartbeatClient == nil:
clientConfig, onHeartbeatFailure, err := buildKubeletClientConfig(ctx, s, kubeDeps.TracerProvider, nodeName)
if err != nil {
return err
}
if onHeartbeatFailure == nil {
return errors.New("onHeartbeatFailure must be a valid function other than nil")
}
kubeDeps.OnHeartbeatFailure = onHeartbeatFailure
kubeDeps.KubeClient, err = clientset.NewForConfig(clientConfig)
if err != nil {
return fmt.Errorf("failed to initialize kubelet client: %w", err)
}
// make a separate client for events
eventClientConfig := *clientConfig
eventClientConfig.QPS = float32(s.EventRecordQPS)
eventClientConfig.Burst = int(s.EventBurst)
kubeDeps.EventClient, err = v1core.NewForConfig(&eventClientConfig)
if err != nil {
return fmt.Errorf("failed to initialize kubelet event client: %w", err)
}
// make a separate client for heartbeat with throttling disabled and a timeout attached
heartbeatClientConfig := *clientConfig
heartbeatClientConfig.Timeout = s.KubeletConfiguration.NodeStatusUpdateFrequency.Duration
// The timeout is the minimum of the lease duration and status update frequency
leaseTimeout := time.Duration(s.KubeletConfiguration.NodeLeaseDurationSeconds) * time.Second
if heartbeatClientConfig.Timeout > leaseTimeout {
heartbeatClientConfig.Timeout = leaseTimeout
}
heartbeatClientConfig.QPS = float32(-1)
kubeDeps.HeartbeatClient, err = clientset.NewForConfig(&heartbeatClientConfig)
if err != nil {
return fmt.Errorf("failed to initialize kubelet heartbeat client: %w", err)
}
}
if kubeDeps.Auth == nil {
auth, runAuthenticatorCAReload, err := BuildAuth(nodeName, kubeDeps.KubeClient, s.KubeletConfiguration)
if err != nil {
return err
}
kubeDeps.Auth = auth
runAuthenticatorCAReload(ctx.Done())
}
var cgroupRoots []string
nodeAllocatableRoot := cm.NodeAllocatableRoot(s.CgroupRoot, s.CgroupsPerQOS, s.CgroupDriver)
cgroupRoots = append(cgroupRoots, nodeAllocatableRoot)
kubeletCgroup, err := cm.GetKubeletContainer(s.KubeletCgroups)
if err != nil {
klog.InfoS("Failed to get the kubelet's cgroup. Kubelet system container metrics may be missing.", "err", err)
} else if kubeletCgroup != "" {
cgroupRoots = append(cgroupRoots, kubeletCgroup)
}
if s.RuntimeCgroups != "" {
// RuntimeCgroups is optional, so ignore if it isn't specified
cgroupRoots = append(cgroupRoots, s.RuntimeCgroups)
}
if s.SystemCgroups != "" {
// SystemCgroups is optional, so ignore if it isn't specified
cgroupRoots = append(cgroupRoots, s.SystemCgroups)
}
if kubeDeps.CAdvisorInterface == nil {
imageFsInfoProvider := cadvisor.NewImageFsInfoProvider(s.RemoteRuntimeEndpoint)
kubeDeps.CAdvisorInterface, err = cadvisor.New(imageFsInfoProvider, s.RootDirectory, cgroupRoots, cadvisor.UsingLegacyCadvisorStats(s.RemoteRuntimeEndpoint), s.LocalStorageCapacityIsolation)
if err != nil {
return err
}
}
// Setup event recorder if required.
makeEventRecorder(kubeDeps, nodeName)
if kubeDeps.ContainerManager == nil {
if s.CgroupsPerQOS && s.CgroupRoot == "" {
klog.InfoS("--cgroups-per-qos enabled, but --cgroup-root was not specified. defaulting to /")
s.CgroupRoot = "/"
}
machineInfo, err := kubeDeps.CAdvisorInterface.MachineInfo()
if err != nil {
return err
}
reservedSystemCPUs, err := getReservedCPUs(machineInfo, s.ReservedSystemCPUs)
if err != nil {
return err
}
if reservedSystemCPUs.Size() > 0 {
// at cmd option validation phase it is tested either --system-reserved-cgroup or --kube-reserved-cgroup is specified, so overwrite should be ok
klog.InfoS("Option --reserved-cpus is specified, it will overwrite the cpu setting in KubeReserved and SystemReserved", "kubeReservedCPUs", s.KubeReserved, "systemReservedCPUs", s.SystemReserved)
if s.KubeReserved != nil {
delete(s.KubeReserved, "cpu")
}
if s.SystemReserved == nil {
s.SystemReserved = make(map[string]string)
}
s.SystemReserved["cpu"] = strconv.Itoa(reservedSystemCPUs.Size())
klog.InfoS("After cpu setting is overwritten", "kubeReservedCPUs", s.KubeReserved, "systemReservedCPUs", s.SystemReserved)
}
kubeReserved, err := parseResourceList(s.KubeReserved)
if err != nil {
return err
}
systemReserved, err := parseResourceList(s.SystemReserved)
if err != nil {
return err
}
var hardEvictionThresholds []evictionapi.Threshold
// If the user requested to ignore eviction thresholds, then do not set valid values for hardEvictionThresholds here.
if !s.ExperimentalNodeAllocatableIgnoreEvictionThreshold {
hardEvictionThresholds, err = eviction.ParseThresholdConfig([]string{}, s.EvictionHard, nil, nil, nil)
if err != nil {
return err
}
}
experimentalQOSReserved, err := cm.ParseQOSReserved(s.QOSReserved)
if err != nil {
return err
}
devicePluginEnabled := utilfeature.DefaultFeatureGate.Enabled(features.DevicePlugins)
var cpuManagerPolicyOptions map[string]string
if utilfeature.DefaultFeatureGate.Enabled(features.CPUManager) {
if utilfeature.DefaultFeatureGate.Enabled(features.CPUManagerPolicyOptions) {
cpuManagerPolicyOptions = s.CPUManagerPolicyOptions
} else if s.CPUManagerPolicyOptions != nil {
return fmt.Errorf("CPU Manager policy options %v require feature gates %q, %q enabled",
s.CPUManagerPolicyOptions, features.CPUManager, features.CPUManagerPolicyOptions)
}
}
kubeDeps.ContainerManager, err = cm.NewContainerManager(
kubeDeps.Mounter,
kubeDeps.CAdvisorInterface,
cm.NodeConfig{
RuntimeCgroupsName: s.RuntimeCgroups,
SystemCgroupsName: s.SystemCgroups,
KubeletCgroupsName: s.KubeletCgroups,
KubeletOOMScoreAdj: s.OOMScoreAdj,
CgroupsPerQOS: s.CgroupsPerQOS,
CgroupRoot: s.CgroupRoot,
CgroupDriver: s.CgroupDriver,
KubeletRootDir: s.RootDirectory,
ProtectKernelDefaults: s.ProtectKernelDefaults,
NodeAllocatableConfig: cm.NodeAllocatableConfig{
KubeReservedCgroupName: s.KubeReservedCgroup,
SystemReservedCgroupName: s.SystemReservedCgroup,
EnforceNodeAllocatable: sets.NewString(s.EnforceNodeAllocatable...),
KubeReserved: kubeReserved,
SystemReserved: systemReserved,
ReservedSystemCPUs: reservedSystemCPUs,
HardEvictionThresholds: hardEvictionThresholds,
},
QOSReserved: *experimentalQOSReserved,
ExperimentalCPUManagerPolicy: s.CPUManagerPolicy,
ExperimentalCPUManagerPolicyOptions: cpuManagerPolicyOptions,
ExperimentalCPUManagerReconcilePeriod: s.CPUManagerReconcilePeriod.Duration,
ExperimentalMemoryManagerPolicy: s.MemoryManagerPolicy,
ExperimentalMemoryManagerReservedMemory: s.ReservedMemory,
ExperimentalPodPidsLimit: s.PodPidsLimit,
EnforceCPULimits: s.CPUCFSQuota,
CPUCFSQuotaPeriod: s.CPUCFSQuotaPeriod.Duration,
ExperimentalTopologyManagerPolicy: s.TopologyManagerPolicy,
ExperimentalTopologyManagerScope: s.TopologyManagerScope,
},
s.FailSwapOn,
devicePluginEnabled,
kubeDeps.Recorder)
if err != nil {
return err
}
}
// TODO(vmarmol): Do this through container config.
oomAdjuster := kubeDeps.OOMAdjuster
if err := oomAdjuster.ApplyOOMScoreAdj(0, int(s.OOMScoreAdj)); err != nil {
klog.InfoS("Failed to ApplyOOMScoreAdj", "err", err)
}
err = kubelet.PreInitRuntimeService(&s.KubeletConfiguration, kubeDeps, s.RemoteRuntimeEndpoint, s.RemoteImageEndpoint)
if err != nil {
return err
}
if err := RunKubelet(s, kubeDeps, s.RunOnce); err != nil {
return err
}
if s.HealthzPort > 0 {
mux := http.NewServeMux()
healthz.InstallHandler(mux)
go wait.Until(func() {
err := http.ListenAndServe(net.JoinHostPort(s.HealthzBindAddress, strconv.Itoa(int(s.HealthzPort))), mux)
if err != nil {
klog.ErrorS(err, "Failed to start healthz server")
}
}, 5*time.Second, wait.NeverStop)
}
if s.RunOnce {
return nil
}
// If systemd is used, notify it that we have started
go daemon.SdNotify(false, "READY=1")
select {
case <-done:
break
case <-ctx.Done():
break
}
return nil
}
Kubelet Object Instantiation
RunKubelet does two main things: it calls createAndInitKubelet to create and initialize the Kubelet object, then calls startKubelet to start the kubelet's main loop.
-
func RunKubelet(kubeServer *options.KubeletServer, kubeDeps *kubelet.Dependencies, runOnce bool) error {
hostname, err := nodeutil.GetHostname(kubeServer.HostnameOverride)
if err != nil {
return err
}
// Query the cloud provider for our node name, default to hostname if kubeDeps.Cloud == nil
nodeName, err := getNodeName(kubeDeps.Cloud, hostname)
if err != nil {
return err
}
hostnameOverridden := len(kubeServer.HostnameOverride) > 0
// Setup event recorder if required.
makeEventRecorder(kubeDeps, nodeName)
var nodeIPs []net.IP
if kubeServer.NodeIP != "" {
for _, ip := range strings.Split(kubeServer.NodeIP, ",") {
parsedNodeIP := netutils.ParseIPSloppy(strings.TrimSpace(ip))
if parsedNodeIP == nil {
klog.InfoS("Could not parse --node-ip ignoring", "IP", ip)
} else {
nodeIPs = append(nodeIPs, parsedNodeIP)
}
}
}
if len(nodeIPs) > 2 || (len(nodeIPs) == 2 && netutils.IsIPv6(nodeIPs[0]) == netutils.IsIPv6(nodeIPs[1])) {
return fmt.Errorf("bad --node-ip %q; must contain either a single IP or a dual-stack pair of IPs", kubeServer.NodeIP)
} else if len(nodeIPs) == 2 && kubeServer.CloudProvider != "" {
return fmt.Errorf("dual-stack --node-ip %q not supported when using a cloud provider", kubeServer.NodeIP)
} else if len(nodeIPs) == 2 && (nodeIPs[0].IsUnspecified() || nodeIPs[1].IsUnspecified()) {
return fmt.Errorf("dual-stack --node-ip %q cannot include '0.0.0.0' or '::'", kubeServer.NodeIP)
}
capabilities.Initialize(capabilities.Capabilities{
AllowPrivileged: true,
})
credentialprovider.SetPreferredDockercfgPath(kubeServer.RootDirectory)
klog.V(2).InfoS("Using root directory", "path", kubeServer.RootDirectory)
if kubeDeps.OSInterface == nil {
kubeDeps.OSInterface = kubecontainer.RealOS{}
}
if kubeServer.KubeletConfiguration.SeccompDefault && !utilfeature.DefaultFeatureGate.Enabled(features.SeccompDefault) {
return fmt.Errorf("the SeccompDefault feature gate must be enabled in order to use the SeccompDefault configuration")
}
k, err := createAndInitKubelet(kubeServer,
kubeDeps,
hostname,
hostnameOverridden,
nodeName,
nodeIPs)
if err != nil {
return fmt.Errorf("failed to create kubelet: %w", err)
}
// NewMainKubelet should have set up a pod source config if one didn't exist
// when the builder was run. This is just a precaution.
if kubeDeps.PodConfig == nil {
return fmt.Errorf("failed to create kubelet, pod source config was nil")
}
podCfg := kubeDeps.PodConfig
if err := rlimit.SetNumFiles(uint64(kubeServer.MaxOpenFiles)); err != nil {
klog.ErrorS(err, "Failed to set rlimit on max file handles")
}
// process pods and exit.
if runOnce {
if _, err := k.RunOnce(podCfg.Updates()); err != nil {
return fmt.Errorf("runonce failed: %w", err)
}
klog.InfoS("Started kubelet as runonce")
} else {
startKubelet(k, podCfg, &kubeServer.KubeletConfiguration, kubeDeps, kubeServer.EnableServer)
klog.InfoS("Started kubelet")
}
return nil
}
NewMainKubelet creates the Kubelet object and instantiates its internal modules, such as the informers and the various internal managers.
-
// NewMainKubelet instantiates a new Kubelet object along with all the required internal modules.
// No initialization of Kubelet and its modules should happen here.
func NewMainKubelet(kubeCfg *kubeletconfiginternal.KubeletConfiguration,
kubeDeps *Dependencies,
crOptions *config.ContainerRuntimeOptions,
hostname string,
hostnameOverridden bool,
nodeName types.NodeName,
nodeIPs []net.IP,
providerID string,
cloudProvider string,
certDirectory string,
rootDirectory string,
imageCredentialProviderConfigFile string,
imageCredentialProviderBinDir string,
registerNode bool,
registerWithTaints []v1.Taint,
allowedUnsafeSysctls []string,
experimentalMounterPath string,
kernelMemcgNotification bool,
experimentalNodeAllocatableIgnoreEvictionThreshold bool,
minimumGCAge metav1.Duration,
maxPerPodContainerCount int32,
maxContainerCount int32,
masterServiceNamespace string,
registerSchedulable bool,
keepTerminatedPodVolumes bool,
nodeLabels map[string]string,
nodeStatusMaxImages int32,
seccompDefault bool,
) (*Kubelet, error) {
logger := klog.TODO()
if rootDirectory == "" {
return nil, fmt.Errorf("invalid root directory %q", rootDirectory)
}
if kubeCfg.SyncFrequency.Duration <= 0 {
return nil, fmt.Errorf("invalid sync frequency %d", kubeCfg.SyncFrequency.Duration)
}
if kubeCfg.MakeIPTablesUtilChains {
if kubeCfg.IPTablesMasqueradeBit > 31 || kubeCfg.IPTablesMasqueradeBit < 0 {
return nil, fmt.Errorf("iptables-masquerade-bit is not valid. Must be within [0, 31]")
}
if kubeCfg.IPTablesDropBit > 31 || kubeCfg.IPTablesDropBit < 0 {
return nil, fmt.Errorf("iptables-drop-bit is not valid. Must be within [0, 31]")
}
if kubeCfg.IPTablesDropBit == kubeCfg.IPTablesMasqueradeBit {
return nil, fmt.Errorf("iptables-masquerade-bit and iptables-drop-bit must be different")
}
}
if utilfeature.DefaultFeatureGate.Enabled(features.DisableCloudProviders) && cloudprovider.IsDeprecatedInternal(cloudProvider) {
cloudprovider.DisableWarningForProvider(cloudProvider)
return nil, fmt.Errorf("cloud provider %q was specified, but built-in cloud providers are disabled. Please set --cloud-provider=external and migrate to an external cloud provider", cloudProvider)
}
var nodeHasSynced cache.InformerSynced
var nodeLister corelisters.NodeLister
// If kubeClient == nil, we are running in standalone mode (i.e. no API servers)
// If not nil, we are running as part of a cluster and should sync w/API
if kubeDeps.KubeClient != nil {
kubeInformers := informers.NewSharedInformerFactoryWithOptions(kubeDeps.KubeClient, 0, informers.WithTweakListOptions(func(options *metav1.ListOptions) {
options.FieldSelector = fields.Set{metav1.ObjectNameField: string(nodeName)}.String()
}))
nodeLister = kubeInformers.Core().V1().Nodes().Lister()
nodeHasSynced = func() bool {
return kubeInformers.Core().V1().Nodes().Informer().HasSynced()
}
kubeInformers.Start(wait.NeverStop)
klog.InfoS("Attempting to sync node with API server")
} else {
// we don't have a client to sync!
nodeIndexer := cache.NewIndexer(cache.MetaNamespaceKeyFunc, cache.Indexers{})
nodeLister = corelisters.NewNodeLister(nodeIndexer)
nodeHasSynced = func() bool { return true }
klog.InfoS("Kubelet is running in standalone mode, will skip API server sync")
}
if kubeDeps.PodConfig == nil {
var err error
kubeDeps.PodConfig, err = makePodSourceConfig(kubeCfg, kubeDeps, nodeName, nodeHasSynced)
if err != nil {
return nil, err
}
}
containerGCPolicy := kubecontainer.GCPolicy{
MinAge: minimumGCAge.Duration,
MaxPerPodContainer: int(maxPerPodContainerCount),
MaxContainers: int(maxContainerCount),
}
daemonEndpoints := &v1.NodeDaemonEndpoints{
KubeletEndpoint: v1.DaemonEndpoint{Port: kubeCfg.Port},
}
imageGCPolicy := images.ImageGCPolicy{
MinAge: kubeCfg.ImageMinimumGCAge.Duration,
HighThresholdPercent: int(kubeCfg.ImageGCHighThresholdPercent),
LowThresholdPercent: int(kubeCfg.ImageGCLowThresholdPercent),
}
enforceNodeAllocatable := kubeCfg.EnforceNodeAllocatable
if experimentalNodeAllocatableIgnoreEvictionThreshold {
// Do not provide kubeCfg.EnforceNodeAllocatable to eviction threshold parsing if we are not enforcing Evictions
enforceNodeAllocatable = []string{}
}
thresholds, err := eviction.ParseThresholdConfig(enforceNodeAllocatable, kubeCfg.EvictionHard, kubeCfg.EvictionSoft, kubeCfg.EvictionSoftGracePeriod, kubeCfg.EvictionMinimumReclaim)
if err != nil {
return nil, err
}
evictionConfig := eviction.Config{
PressureTransitionPeriod: kubeCfg.EvictionPressureTransitionPeriod.Duration,
MaxPodGracePeriodSeconds: int64(kubeCfg.EvictionMaxPodGracePeriod),
Thresholds: thresholds,
KernelMemcgNotification: kernelMemcgNotification,
PodCgroupRoot: kubeDeps.ContainerManager.GetPodCgroupRoot(),
}
var serviceLister corelisters.ServiceLister
var serviceHasSynced cache.InformerSynced
if kubeDeps.KubeClient != nil {
kubeInformers := informers.NewSharedInformerFactory(kubeDeps.KubeClient, 0)
serviceLister = kubeInformers.Core().V1().Services().Lister()
serviceHasSynced = kubeInformers.Core().V1().Services().Informer().HasSynced
kubeInformers.Start(wait.NeverStop)
} else {
serviceIndexer := cache.NewIndexer(cache.MetaNamespaceKeyFunc, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc})
serviceLister = corelisters.NewServiceLister(serviceIndexer)
serviceHasSynced = func() bool { return true }
}
// construct a node reference used for events
nodeRef := &v1.ObjectReference{
Kind: "Node",
Name: string(nodeName),
UID: types.UID(nodeName),
Namespace: "",
}
oomWatcher, err := oomwatcher.NewWatcher(kubeDeps.Recorder)
if err != nil {
if libcontaineruserns.RunningInUserNS() {
if utilfeature.DefaultFeatureGate.Enabled(features.KubeletInUserNamespace) {
// oomwatcher.NewWatcher returns "open /dev/kmsg: operation not permitted" error,
// when running in a user namespace with sysctl value `kernel.dmesg_restrict=1`.
klog.V(2).InfoS("Failed to create an oomWatcher (running in UserNS, ignoring)", "err", err)
oomWatcher = nil
} else {
klog.ErrorS(err, "Failed to create an oomWatcher (running in UserNS, Hint: enable KubeletInUserNamespace feature flag to ignore the error)")
return nil, err
}
} else {
return nil, err
}
}
clusterDNS := make([]net.IP, 0, len(kubeCfg.ClusterDNS))
for _, ipEntry := range kubeCfg.ClusterDNS {
ip := netutils.ParseIPSloppy(ipEntry)
if ip == nil {
klog.InfoS("Invalid clusterDNS IP", "IP", ipEntry)
} else {
clusterDNS = append(clusterDNS, ip)
}
}
httpClient := &http.Client{}
klet := &Kubelet{
hostname: hostname,
hostnameOverridden: hostnameOverridden,
nodeName: nodeName,
kubeClient: kubeDeps.KubeClient,
heartbeatClient: kubeDeps.HeartbeatClient,
onRepeatedHeartbeatFailure: kubeDeps.OnHeartbeatFailure,
rootDirectory: rootDirectory,
resyncInterval: kubeCfg.SyncFrequency.Duration,
sourcesReady: config.NewSourcesReady(kubeDeps.PodConfig.SeenAllSources),
registerNode: registerNode,
registerWithTaints: registerWithTaints,
registerSchedulable: registerSchedulable,
dnsConfigurer: dns.NewConfigurer(kubeDeps.Recorder, nodeRef, nodeIPs, clusterDNS, kubeCfg.ClusterDomain, kubeCfg.ResolverConfig),
serviceLister: serviceLister,
serviceHasSynced: serviceHasSynced,
nodeLister: nodeLister,
nodeHasSynced: nodeHasSynced,
masterServiceNamespace: masterServiceNamespace,
streamingConnectionIdleTimeout: kubeCfg.StreamingConnectionIdleTimeout.Duration,
recorder: kubeDeps.Recorder,
cadvisor: kubeDeps.CAdvisorInterface,
cloud: kubeDeps.Cloud,
externalCloudProvider: cloudprovider.IsExternal(cloudProvider),
providerID: providerID,
nodeRef: nodeRef,
nodeLabels: nodeLabels,
nodeStatusUpdateFrequency: kubeCfg.NodeStatusUpdateFrequency.Duration,
nodeStatusReportFrequency: kubeCfg.NodeStatusReportFrequency.Duration,
os: kubeDeps.OSInterface,
oomWatcher: oomWatcher,
cgroupsPerQOS: kubeCfg.CgroupsPerQOS,
cgroupRoot: kubeCfg.CgroupRoot,
mounter: kubeDeps.Mounter,
hostutil: kubeDeps.HostUtil,
subpather: kubeDeps.Subpather,
maxPods: int(kubeCfg.MaxPods),
podsPerCore: int(kubeCfg.PodsPerCore),
syncLoopMonitor: atomic.Value{},
daemonEndpoints: daemonEndpoints,
containerManager: kubeDeps.ContainerManager,
nodeIPs: nodeIPs,
nodeIPValidator: validateNodeIP,
clock: clock.RealClock{},
enableControllerAttachDetach: kubeCfg.EnableControllerAttachDetach,
makeIPTablesUtilChains: kubeCfg.MakeIPTablesUtilChains,
iptablesMasqueradeBit: int(kubeCfg.IPTablesMasqueradeBit),
iptablesDropBit: int(kubeCfg.IPTablesDropBit),
experimentalHostUserNamespaceDefaulting: utilfeature.DefaultFeatureGate.Enabled(features.ExperimentalHostUserNamespaceDefaultingGate),
keepTerminatedPodVolumes: keepTerminatedPodVolumes,
nodeStatusMaxImages: nodeStatusMaxImages,
lastContainerStartedTime: newTimeCache(),
}
if klet.cloud != nil {
klet.cloudResourceSyncManager = cloudresource.NewSyncManager(klet.cloud, nodeName, klet.nodeStatusUpdateFrequency)
}
var secretManager secret.Manager
var configMapManager configmap.Manager
switch kubeCfg.ConfigMapAndSecretChangeDetectionStrategy {
case kubeletconfiginternal.WatchChangeDetectionStrategy:
secretManager = secret.NewWatchingSecretManager(kubeDeps.KubeClient, klet.resyncInterval)
configMapManager = configmap.NewWatchingConfigMapManager(kubeDeps.KubeClient, klet.resyncInterval)
case kubeletconfiginternal.TTLCacheChangeDetectionStrategy:
secretManager = secret.NewCachingSecretManager(
kubeDeps.KubeClient, manager.GetObjectTTLFromNodeFunc(klet.GetNode))
configMapManager = configmap.NewCachingConfigMapManager(
kubeDeps.KubeClient, manager.GetObjectTTLFromNodeFunc(klet.GetNode))
case kubeletconfiginternal.GetChangeDetectionStrategy:
secretManager = secret.NewSimpleSecretManager(kubeDeps.KubeClient)
configMapManager = configmap.NewSimpleConfigMapManager(kubeDeps.KubeClient)
default:
return nil, fmt.Errorf("unknown configmap and secret manager mode: %v", kubeCfg.ConfigMapAndSecretChangeDetectionStrategy)
}
klet.secretManager = secretManager
klet.configMapManager = configMapManager
if klet.experimentalHostUserNamespaceDefaulting {
klog.InfoS("Experimental host user namespace defaulting is enabled")
}
machineInfo, err := klet.cadvisor.MachineInfo()
if err != nil {
return nil, err
}
// Avoid collector collects it as a timestamped metric
// See PR #95210 and #97006 for more details.
machineInfo.Timestamp = time.Time{}
klet.setCachedMachineInfo(machineInfo)
imageBackOff := flowcontrol.NewBackOff(backOffPeriod, MaxContainerBackOff)
klet.livenessManager = proberesults.NewManager()
klet.readinessManager = proberesults.NewManager()
klet.startupManager = proberesults.NewManager()
klet.podCache = kubecontainer.NewCache()
// podManager is also responsible for keeping secretManager and configMapManager contents up-to-date.
mirrorPodClient := kubepod.NewBasicMirrorClient(klet.kubeClient, string(nodeName), nodeLister)
klet.podManager = kubepod.NewBasicPodManager(mirrorPodClient, secretManager, configMapManager)
klet.statusManager = status.NewManager(klet.kubeClient, klet.podManager, klet)
klet.resourceAnalyzer = serverstats.NewResourceAnalyzer(klet, kubeCfg.VolumeStatsAggPeriod.Duration, kubeDeps.Recorder)
klet.runtimeService = kubeDeps.RemoteRuntimeService
if kubeDeps.KubeClient != nil {
klet.runtimeClassManager = runtimeclass.NewManager(kubeDeps.KubeClient)
}
// setup containerLogManager for CRI container runtime
containerLogManager, err := logs.NewContainerLogManager(
klet.runtimeService,
kubeDeps.OSInterface,
kubeCfg.ContainerLogMaxSize,
int(kubeCfg.ContainerLogMaxFiles),
)
if err != nil {
return nil, fmt.Errorf("failed to initialize container log manager: %v", err)
}
klet.containerLogManager = containerLogManager
klet.reasonCache = NewReasonCache()
klet.workQueue = queue.NewBasicWorkQueue(klet.clock)
klet.podWorkers = newPodWorkers(
klet.syncPod,
klet.syncTerminatingPod,
klet.syncTerminatedPod,
kubeDeps.Recorder,
klet.workQueue,
klet.resyncInterval,
backOffPeriod,
klet.podCache,
)
runtime, err := kuberuntime.NewKubeGenericRuntimeManager(
kubecontainer.FilterEventRecorder(kubeDeps.Recorder),
klet.livenessManager,
klet.readinessManager,
klet.startupManager,
rootDirectory,
machineInfo,
klet.podWorkers,
kubeDeps.OSInterface,
klet,
httpClient,
imageBackOff,
kubeCfg.SerializeImagePulls,
float32(kubeCfg.RegistryPullQPS),
int(kubeCfg.RegistryBurst),
imageCredentialProviderConfigFile,
imageCredentialProviderBinDir,
kubeCfg.CPUCFSQuota,
kubeCfg.CPUCFSQuotaPeriod,
kubeDeps.RemoteRuntimeService,
kubeDeps.RemoteImageService,
kubeDeps.ContainerManager.InternalContainerLifecycle(),
klet.containerLogManager,
klet.runtimeClassManager,
seccompDefault,
kubeCfg.MemorySwap.SwapBehavior,
kubeDeps.ContainerManager.GetNodeAllocatableAbsolute,
*kubeCfg.MemoryThrottlingFactor,
)
if err != nil {
return nil, err
}
klet.containerRuntime = runtime
klet.streamingRuntime = runtime
klet.runner = runtime
runtimeCache, err := kubecontainer.NewRuntimeCache(klet.containerRuntime)
if err != nil {
return nil, err
}
klet.runtimeCache = runtimeCache
// common provider to get host file system usage associated with a pod managed by kubelet
hostStatsProvider := stats.NewHostStatsProvider(kubecontainer.RealOS{}, func(podUID types.UID) string {
return getEtcHostsPath(klet.getPodDir(podUID))
})
if kubeDeps.useLegacyCadvisorStats {
klet.StatsProvider = stats.NewCadvisorStatsProvider(
klet.cadvisor,
klet.resourceAnalyzer,
klet.podManager,
klet.runtimeCache,
klet.containerRuntime,
klet.statusManager,
hostStatsProvider)
} else {
klet.StatsProvider = stats.NewCRIStatsProvider(
klet.cadvisor,
klet.resourceAnalyzer,
klet.podManager,
klet.runtimeCache,
kubeDeps.RemoteRuntimeService,
kubeDeps.RemoteImageService,
hostStatsProvider,
utilfeature.DefaultFeatureGate.Enabled(features.PodAndContainerStatsFromCRI))
}
klet.pleg = pleg.NewGenericPLEG(klet.containerRuntime, plegChannelCapacity, plegRelistPeriod, klet.podCache, clock.RealClock{})
klet.runtimeState = newRuntimeState(maxWaitForContainerRuntime)
klet.runtimeState.addHealthCheck("PLEG", klet.pleg.Healthy)
if _, err := klet.updatePodCIDR(kubeCfg.PodCIDR); err != nil {
klog.ErrorS(err, "Pod CIDR update failed")
}
// setup containerGC
containerGC, err := kubecontainer.NewContainerGC(klet.containerRuntime, containerGCPolicy, klet.sourcesReady)
if err != nil {
return nil, err
}
klet.containerGC = containerGC
klet.containerDeletor = newPodContainerDeletor(klet.containerRuntime, integer.IntMax(containerGCPolicy.MaxPerPodContainer, minDeadContainerInPod))
// setup imageManager
imageManager, err := images.NewImageGCManager(klet.containerRuntime, klet.StatsProvider, kubeDeps.Recorder, nodeRef, imageGCPolicy, crOptions.PodSandboxImage)
if err != nil {
return nil, fmt.Errorf("failed to initialize image manager: %v", err)
}
klet.imageManager = imageManager
if kubeCfg.ServerTLSBootstrap && kubeDeps.TLSOptions != nil && utilfeature.DefaultFeatureGate.Enabled(features.RotateKubeletServerCertificate) {
klet.serverCertificateManager, err = kubeletcertificate.NewKubeletServerCertificateManager(klet.kubeClient, kubeCfg, klet.nodeName, klet.getLastObservedNodeAddresses, certDirectory)
if err != nil {
return nil, fmt.Errorf("failed to initialize certificate manager: %v", err)
}
kubeDeps.TLSOptions.Config.GetCertificate = func(*tls.ClientHelloInfo) (*tls.Certificate, error) {
cert := klet.serverCertificateManager.Current()
if cert == nil {
return nil, fmt.Errorf("no serving certificate available for the kubelet")
}
return cert, nil
}
}
if kubeDeps.ProbeManager != nil {
klet.probeManager = kubeDeps.ProbeManager
} else {
klet.probeManager = prober.NewManager(
klet.statusManager,
klet.livenessManager,
klet.readinessManager,
klet.startupManager,
klet.runner,
kubeDeps.Recorder)
}
tokenManager := token.NewManager(kubeDeps.KubeClient)
// NewInitializedVolumePluginMgr initializes some storageErrors on the Kubelet runtimeState (in csi_plugin.go init)
// which affects node ready status. This function must be called before Kubelet is initialized so that the Node
// ReadyState is accurate with the storage state.
klet.volumePluginMgr, err =
NewInitializedVolumePluginMgr(klet, secretManager, configMapManager, tokenManager, kubeDeps.VolumePlugins, kubeDeps.DynamicPluginProber)
if err != nil {
return nil, err
}
klet.pluginManager = pluginmanager.NewPluginManager(
klet.getPluginsRegistrationDir(), /* sockDir */
kubeDeps.Recorder,
)
// If the experimentalMounterPathFlag is set, we do not want to
// check node capabilities since the mount path is not the default
if len(experimentalMounterPath) != 0 {
// Replace the nameserver in containerized-mounter's rootfs/etc/resolv.conf with kubelet.ClusterDNS
// so that service name could be resolved
klet.dnsConfigurer.SetupDNSinContainerizedMounter(experimentalMounterPath)
}
// setup volumeManager
klet.volumeManager = volumemanager.NewVolumeManager(
kubeCfg.EnableControllerAttachDetach,
nodeName,
klet.podManager,
klet.podWorkers,
klet.kubeClient,
klet.volumePluginMgr,
klet.containerRuntime,
kubeDeps.Mounter,
kubeDeps.HostUtil,
klet.getPodsDir(),
kubeDeps.Recorder,
keepTerminatedPodVolumes,
volumepathhandler.NewBlockVolumePathHandler())
klet.backOff = flowcontrol.NewBackOff(backOffPeriod, MaxContainerBackOff)
// setup eviction manager
evictionManager, evictionAdmitHandler := eviction.NewManager(klet.resourceAnalyzer, evictionConfig,
killPodNow(klet.podWorkers, kubeDeps.Recorder), klet.podManager.GetMirrorPodByPod, klet.imageManager, klet.containerGC, kubeDeps.Recorder, nodeRef, klet.clock, kubeCfg.LocalStorageCapacityIsolation)
klet.evictionManager = evictionManager
klet.admitHandlers.AddPodAdmitHandler(evictionAdmitHandler)
// Safe, allowed sysctls can always be used as unsafe sysctls in the spec.
// Hence, we concatenate those two lists.
safeAndUnsafeSysctls := append(sysctl.SafeSysctlAllowlist(), allowedUnsafeSysctls...)
sysctlsAllowlist, err := sysctl.NewAllowlist(safeAndUnsafeSysctls)
if err != nil {
return nil, err
}
klet.admitHandlers.AddPodAdmitHandler(sysctlsAllowlist)
// enable active deadline handler
activeDeadlineHandler, err := newActiveDeadlineHandler(klet.statusManager, kubeDeps.Recorder, klet.clock)
if err != nil {
return nil, err
}
klet.AddPodSyncLoopHandler(activeDeadlineHandler)
klet.AddPodSyncHandler(activeDeadlineHandler)
klet.admitHandlers.AddPodAdmitHandler(klet.containerManager.GetAllocateResourcesPodAdmitHandler())
criticalPodAdmissionHandler := preemption.NewCriticalPodAdmissionHandler(klet.GetActivePods, killPodNow(klet.podWorkers, kubeDeps.Recorder), kubeDeps.Recorder)
klet.admitHandlers.AddPodAdmitHandler(lifecycle.NewPredicateAdmitHandler(klet.getNodeAnyWay, criticalPodAdmissionHandler, klet.containerManager.UpdatePluginResources))
// apply functional Option's
for _, opt := range kubeDeps.Options {
opt(klet)
}
if sysruntime.GOOS == "linux" {
// AppArmor is a Linux kernel security module and it does not support other operating systems.
klet.appArmorValidator = apparmor.NewValidator()
klet.softAdmitHandlers.AddPodAdmitHandler(lifecycle.NewAppArmorAdmitHandler(klet.appArmorValidator))
}
leaseDuration := time.Duration(kubeCfg.NodeLeaseDurationSeconds) * time.Second
renewInterval := time.Duration(float64(leaseDuration) * nodeLeaseRenewIntervalFraction)
klet.nodeLeaseController = lease.NewController(
klet.clock,
klet.heartbeatClient,
string(klet.nodeName),
kubeCfg.NodeLeaseDurationSeconds,
klet.onRepeatedHeartbeatFailure,
renewInterval,
v1.NamespaceNodeLease,
util.SetNodeOwnerFunc(klet.heartbeatClient, string(klet.nodeName)))
// setup node shutdown manager
shutdownManager, shutdownAdmitHandler := nodeshutdown.NewManager(&nodeshutdown.Config{
Logger: logger,
ProbeManager: klet.probeManager,
Recorder: kubeDeps.Recorder,
NodeRef: nodeRef,
GetPodsFunc: klet.GetActivePods,
KillPodFunc: killPodNow(klet.podWorkers, kubeDeps.Recorder),
SyncNodeStatusFunc: klet.syncNodeStatus,
ShutdownGracePeriodRequested: kubeCfg.ShutdownGracePeriod.Duration,
ShutdownGracePeriodCriticalPods: kubeCfg.ShutdownGracePeriodCriticalPods.Duration,
ShutdownGracePeriodByPodPriority: kubeCfg.ShutdownGracePeriodByPodPriority,
StateDirectory: rootDirectory,
})
klet.shutdownManager = shutdownManager
klet.usernsManager, err = MakeUserNsManager(klet)
if err != nil {
return nil, err
}
klet.admitHandlers.AddPodAdmitHandler(shutdownAdmitHandler)
// Finally, put the most recent version of the config on the Kubelet, so
// people can see how it was configured.
klet.kubeletConfiguration = *kubeCfg
// Generating the status funcs should be the last thing we do,
// since this relies on the rest of the Kubelet having been constructed.
klet.setNodeStatusFuncs = klet.defaultNodeStatusFuncs()
return klet, nil
}
Finally, the kubelet emits an event to mark that it has started:
-
// BirthCry sends an event that the kubelet has started up.
func (kl *Kubelet) BirthCry() {
// Make an event that kubelet restarted.
kl.recorder.Eventf(kl.nodeRef, v1.EventTypeNormal, events.StartingKubelet, "Starting kubelet.")
}
Note that in standalone mode the EventBroadcaster is never started, so in that mode this event is not actually sent.
Finally, StartGarbageCollection starts the garbage-collection services, which consist of a ContainerGC goroutine and an Image GC goroutine (see the sketch below).
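A schematic sketch of that garbage-collection pattern, with hypothetical interfaces and illustrative periods rather than the kubelet's real signatures:
-
package main

import (
	"log"
	"time"
)

// Hypothetical GC interface, used only to illustrate the pattern.
type garbageCollector interface{ GarbageCollect() error }

type noopGC struct{ name string }

func (g noopGC) GarbageCollect() error {
	log.Printf("%s GC pass", g.name)
	return nil
}

// startGarbageCollection runs each collector in its own goroutine on a fixed
// period, mirroring the idea behind the kubelet's StartGarbageCollection.
func startGarbageCollection(containerGC, imageGC garbageCollector, stop <-chan struct{}) {
	go loopEvery(time.Minute, stop, func() {
		if err := containerGC.GarbageCollect(); err != nil {
			log.Println("container GC failed:", err)
		}
	})
	go loopEvery(5*time.Minute, stop, func() {
		if err := imageGC.GarbageCollect(); err != nil {
			log.Println("image GC failed:", err)
		}
	})
}

func loopEvery(period time.Duration, stop <-chan struct{}, f func()) {
	t := time.NewTicker(period)
	defer t.Stop()
	for {
		f()
		select {
		case <-t.C:
		case <-stop:
			return
		}
	}
}

func main() {
	stop := make(chan struct{})
	startGarbageCollection(noopGC{"container"}, noopGC{"image"}, stop)
	time.Sleep(2 * time.Second) // let the first passes run, then exit
}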
Starting the Main Kubelet Service
After the Kubelet object has been created, it is started via startKubelet:
-
func startKubelet(k kubelet.Bootstrap, podCfg *config.PodConfig, kubeCfg *kubeletconfiginternal.KubeletConfiguration, kubeDeps *kubelet.Dependencies, enableServer bool) {
// start the kubelet
go k.Run(podCfg.Updates())
// start the kubelet server
if enableServer {
go k.ListenAndServe(kubeCfg, kubeDeps.TLSOptions, kubeDeps.Auth, kubeDeps.TracerProvider)
}
if kubeCfg.ReadOnlyPort > 0 {
go k.ListenAndServeReadOnly(netutils.ParseIPSloppy(kubeCfg.Address), uint(kubeCfg.ReadOnlyPort))
}
if utilfeature.DefaultFeatureGate.Enabled(features.KubeletPodResources) {
go k.ListenAndServePodResources()
}
}
startKubelet starts everything in a non-blocking fashion; k.Run is responsible for starting the kubelet's internal dependent modules.
At startup, the kubelet can configure iptables rules for the current node via kl.initNetworkUtil. This mainly initializes the iptables chains related to MASQUERADE and DROP, applies only to Linux, and is controlled by the kubelet's --make-iptables-util-chains flag, which is enabled by default.
kl.initNetworkUtil also starts a goroutine that watches the KUBE-KUBELET-CANARY chain in the mangle, nat, and filter tables. When the chain changes, the kubelet automatically re-syncs its rules so that the operating system's iptables always contain the rules the kubelet requires.
-
func (kl *Kubelet) initNetworkUtil() {
exec := utilexec.New()
iptClients := []utiliptables.Interface{
utiliptables.New(exec, utiliptables.ProtocolIPv4),
utiliptables.New(exec, utiliptables.ProtocolIPv6),
}
for i := range iptClients {
iptClient := iptClients[i]
if kl.syncIPTablesRules(iptClient) {
klog.InfoS("Initialized iptables rules.", "protocol", iptClient.Protocol())
go iptClient.Monitor(
utiliptables.Chain("KUBE-KUBELET-CANARY"),
[]utiliptables.Table{utiliptables.TableMangle, utiliptables.TableNAT, utiliptables.TableFilter},
func() { kl.syncIPTablesRules(iptClient) },
1*time.Minute, wait.NeverStop,
)
} else {
klog.InfoS("Failed to initialize iptables rules; some functionality may be missing.", "protocol", iptClient.Protocol())
}
}
}
The iptables rules the kubelet writes to the operating system, with notes (IPv4):
- mangle table
There are two iptables backends, iptables-legacy and iptables-nft
- nat table
The KUBE-KUBELET-CANARY chain is used by the kubelet to watch for iptables changes
When the chain changes, it usually means the iptables rules have been flushed, which immediately triggers a rule re-sync
-N KUBE-KUBELET-CANARY
KUBE-MARK-MASQ tags packets with the mark 0x4000/0x4000 (controlled by --iptables-masquerade-bit)
In the nat table's POSTROUTING stage, packets carrying the 0x4000/0x4000 mark are automatically MASQUERADEd (SNAT)
-N KUBE-MARK-MASQ
-A KUBE-MARK-MASQ -j MARK --set-xmark 0x4000/0x4000
KUBE-MARK-DROP tags packets with the mark 0x8000/0x8000 (controlled by --iptables-drop-bit)
In the filter table, packets marked 0x8000/0x8000 are dropped on INPUT/OUTPUT
-N KUBE-MARK-DROP
-A KUBE-MARK-DROP -j MARK --set-xmark 0x8000/0x8000
The KUBE-POSTROUTING chain is created and hooked into the POSTROUTING chain
KUBE-POSTROUTING checks whether a packet carries the 0x4000/0x4000 mark to decide whether to MASQUERADE it
Packets without the 0x4000/0x4000 mark simply hit -j RETURN
... filter table
⚠️ With the removal of dockershim, the kubelet itself no longer depends on these iptables rules, and the community is considering handing the kubelet-managed iptables rules over to kube-proxy
Proposal: KEP-3178
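The canary mechanism can also be observed from outside the kubelet. The sketch below is not the kubelet's implementation (which uses the utiliptables Monitor call shown above); it simply shells out to iptables to check whether the KUBE-KUBELET-CANARY chain still exists in the nat table, the signal that the rules have not been flushed:
-
package main

import (
	"fmt"
	"os/exec"
	"strings"
)

// canaryPresent reports whether the KUBE-KUBELET-CANARY chain exists in the
// nat table. If it has disappeared, the rules were most likely flushed and a
// re-sync is needed.
func canaryPresent() (bool, error) {
	out, err := exec.Command("iptables", "-t", "nat", "-S").CombinedOutput()
	if err != nil {
		return false, err
	}
	return strings.Contains(string(out), "-N KUBE-KUBELET-CANARY"), nil
}

func main() {
	ok, err := canaryPresent()
	if err != nil {
		fmt.Println("iptables check failed:", err)
		return
	}
	fmt.Println("KUBE-KUBELET-CANARY present:", ok)
}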
kl.syncLoop starts the kubelet's main loop goroutine, which continuously reads events from its channels:
-
// syncLoop is the main loop for processing changes. It watches for changes from
// three channels (file, apiserver, and http) and creates a union of them. For
// any new change seen, will run a sync against desired state and running state. If
// no changes are seen to the configuration, will synchronize the last known desired
// state every sync-frequency seconds. Never returns.
func (kl *Kubelet) syncLoop(updates <-chan kubetypes.PodUpdate, handler SyncHandler) {
klog.InfoS("Starting kubelet main sync loop")
// The syncTicker wakes up kubelet to checks if there are any pod workers
// that need to be sync'd. A one-second period is sufficient because the
// sync interval is defaulted to 10s.
syncTicker := time.NewTicker(time.Second)
defer syncTicker.Stop()
housekeepingTicker := time.NewTicker(housekeepingPeriod)
defer housekeepingTicker.Stop()
plegCh := kl.pleg.Watch()
const (
base = 100 * time.Millisecond
max = 5 * time.Second
factor = 2
)
duration := base
// Responsible for checking limits in resolv.conf
// The limits do not have anything to do with individual pods
// Since this is called in syncLoop, we don't need to call it anywhere else
if kl.dnsConfigurer != nil && kl.dnsConfigurer.ResolverConfig != "" {
kl.dnsConfigurer.CheckLimitsForResolvConf()
}
for {
if err := kl.runtimeState.runtimeErrors(); err != nil {
klog.ErrorS(err, "Skipping pod synchronization")
// exponential backoff
time.Sleep(duration)
duration = time.Duration(math.Min(float64(max), factor*float64(duration)))
continue
}
// reset backoff if we have a success
duration = base
kl.syncLoopMonitor.Store(kl.clock.Now())
if !kl.syncLoopIteration(updates, handler, syncTicker.C, housekeepingTicker.C, plegCh) {
break
}
kl.syncLoopMonitor.Store(kl.clock.Now())
}
}
Each channel is read continuously and the events are dispatched to the corresponding handler:
-
// syncLoopIteration reads from various channels and dispatches pods to the
// given handler.
//
// Arguments:
// 1. configCh: a channel to read config events from
// 2. handler: the SyncHandler to dispatch pods to
// 3. syncCh: a channel to read periodic sync events from
// 4. housekeepingCh: a channel to read housekeeping events from
// 5. plegCh: a channel to read PLEG updates from
//
// Events are also read from the kubelet liveness manager's update channel.
//
// The workflow is to read from one of the channels, handle that event, and
// update the timestamp in the sync loop monitor.
//
// Here is an appropriate place to note that despite the syntactical
// similarity to the switch statement, the case statements in a select are
// evaluated in a pseudorandom order if there are multiple channels ready to
// read from when the select is evaluated. In other words, case statements
// are evaluated in random order, and you can not assume that the case
// statements evaluate in order if multiple channels have events.
//
// With that in mind, in truly no particular order, the different channels
// are handled as follows:
//
// - configCh: dispatch the pods for the config change to the appropriate
// handler callback for the event type
// - plegCh: update the runtime cache; sync pod
// - syncCh: sync all pods waiting for sync
// - housekeepingCh: trigger cleanup of pods
// - health manager: sync pods that have failed or in which one or more
// containers have failed health checks
func (kl *Kubelet) syncLoopIteration(configCh <-chan kubetypes.PodUpdate, handler SyncHandler,
syncCh <-chan time.Time, housekeepingCh <-chan time.Time, plegCh <-chan *pleg.PodLifecycleEvent) bool {
select {
case u, open := <-configCh:
// Update from a config source; dispatch it to the right handler
// callback.
if !open {
klog.ErrorS(nil, "Update channel is closed, exiting the sync loop")
return false
}
switch u.Op {
case kubetypes.ADD:
klog.V(2).InfoS("SyncLoop ADD", "source", u.Source, "pods", klog.KObjs(u.Pods))
// After restarting, kubelet will get all existing pods through
// ADD as if they are new pods. These pods will then go through the
// admission process and *may* be rejected. This can be resolved
// once we have checkpointing.
handler.HandlePodAdditions(u.Pods)
case kubetypes.UPDATE:
klog.V(2).InfoS("SyncLoop UPDATE", "source", u.Source, "pods", klog.KObjs(u.Pods))
handler.HandlePodUpdates(u.Pods)
case kubetypes.REMOVE:
klog.V(2).InfoS("SyncLoop REMOVE", "source", u.Source, "pods", klog.KObjs(u.Pods))
handler.HandlePodRemoves(u.Pods)
case kubetypes.RECONCILE:
klog.V(4).InfoS("SyncLoop RECONCILE", "source", u.Source, "pods", klog.KObjs(u.Pods))
handler.HandlePodReconcile(u.Pods)
case kubetypes.DELETE:
klog.V(2).InfoS("SyncLoop DELETE", "source", u.Source, "pods", klog.KObjs(u.Pods))
// DELETE is treated as a UPDATE because of graceful deletion.
handler.HandlePodUpdates(u.Pods)
case kubetypes.SET:
// TODO: Do we want to support this?
klog.ErrorS(nil, "Kubelet does not support snapshot update")
default:
klog.ErrorS(nil, "Invalid operation type received", "operation", u.Op)
}
kl.sourcesReady.AddSource(u.Source)
case e := <-plegCh:
if e.Type == pleg.ContainerStarted {
// record the most recent time we observed a container start for this pod.
// this lets us selectively invalidate the runtimeCache when processing a delete for this pod
// to make sure we don't miss handling graceful termination for containers we reported as having started.
kl.lastContainerStartedTime.Add(e.ID, time.Now())
}
if isSyncPodWorthy(e) {
// PLEG event for a pod; sync it.
if pod, ok := kl.podManager.GetPodByUID(e.ID); ok {
klog.V(2).InfoS("SyncLoop (PLEG): event for pod", "pod", klog.KObj(pod), "event", e)
handler.HandlePodSyncs([]*v1.Pod{pod})
} else {
// If the pod no longer exists, ignore the event.
klog.V(4).InfoS("SyncLoop (PLEG): pod does not exist, ignore irrelevant event", "event", e)
}
}
if e.Type == pleg.ContainerDied {
if containerID, ok := e.Data.(string); ok {
kl.cleanUpContainersInPod(e.ID, containerID)
}
}
case <-syncCh:
// Sync pods waiting for sync
podsToSync := kl.getPodsToSync()
if len(podsToSync) == 0 {
break
}
klog.V(4).InfoS("SyncLoop (SYNC) pods", "total", len(podsToSync), "pods", klog.KObjs(podsToSync))
handler.HandlePodSyncs(podsToSync)
case update := <-kl.livenessManager.Updates():
if update.Result == proberesults.Failure {
handleProbeSync(kl, update, handler, "liveness", "unhealthy")
}
case update := <-kl.readinessManager.Updates():
ready := update.Result == proberesults.Success
kl.statusManager.SetContainerReadiness(update.PodUID, update.ContainerID, ready)
status := ""
if ready {
status = "ready"
}
handleProbeSync(kl, update, handler, "readiness", status)
case update := <-kl.startupManager.Updates():
started := update.Result == proberesults.Success
kl.statusManager.SetContainerStartup(update.PodUID, update.ContainerID, started)
status := "unhealthy"
if started {
status = "started"
}
handleProbeSync(kl, update, handler, "startup", status)
case <-housekeepingCh:
if !kl.sourcesReady.AllReady() {
// If the sources aren't ready or volume manager has not yet synced the states,
// skip housekeeping, as we may accidentally delete pods from unready sources.
klog.V(4).InfoS("SyncLoop (housekeeping, skipped): sources aren't ready yet")
} else {
start := time.Now()
klog.V(4).InfoS("SyncLoop (housekeeping)")
if err := handler.HandlePodCleanups(); err != nil {
klog.ErrorS(err, "Failed cleaning pods")
}
duration := time.Since(start)
if duration > housekeepingWarningDuration {
klog.ErrorS(fmt.Errorf("housekeeping took too long"), "Housekeeping took longer than 15s", "seconds", duration.Seconds())
}
klog.V(4).InfoS("SyncLoop (housekeeping) end")
}
}
return true
}
Starting the HTTP Server and gRPC Server
The kubelet currently exposes mainly the following three HTTP servers:
- readonly server: listens on port 10255 by default and serves read-only endpoints only
- kubelet core server: listens on port 10250 by default and serves health checks, stats collection, and related endpoints
- healthz server: listens on port 10248 by default and serves health checks only
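A quick way to verify these servers from the node itself is to probe them directly. A minimal sketch, assuming the default healthz port 10248 on localhost:
-
package main

import (
	"fmt"
	"io"
	"net/http"
)

func main() {
	// Assumes the kubelet's healthz server is listening on the default
	// 127.0.0.1:10248; adjust if --healthz-port/--healthz-bind-address differ.
	resp, err := http.Get("http://127.0.0.1:10248/healthz")
	if err != nil {
		fmt.Println("healthz probe failed:", err)
		return
	}
	defer resp.Body.Close()
	body, _ := io.ReadAll(resp.Body)
	fmt.Println(resp.Status, string(body)) // expect "200 OK" and "ok" on a healthy kubelet
}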