K8s-kube-apiserver(信号处理机制)
K8s-kube-apiserver(信号处理机制)
基于1.25
K8s基于UNIX信号实现常驻进程以及进程的优雅退出
- 例如,当kube-apiserver进程接收到SIGTERM或SIGINT信号时,先通知kube-apiserver内部的Gouroutine协程优先退出,再退出主线程退出
- 例如,Prometheus基于监听SIGHUP信号,实现热加载配置文件
常驻进程实现
Kube-apiserver的实现如下,其他类似:
-
// NewAPIServerCommand creates a *cobra.Command object with default parameters
func NewAPIServerCommand() *cobra.Command {
s := options.NewServerRunOptions()
cmd := &cobra.Command{
Use: "kube-apiserver",
Long: `The Kubernetes API server validates and configures data
for the api objects which include pods, services, replicationcontrollers, and
others. The API Server services REST operations and provides the frontend to the
cluster's shared state through which all other components interact.`,
// stop printing usage when the command errors
SilenceUsage: true,
PersistentPreRunE: func(*cobra.Command, []string) error {
// silence client-go warnings.
// kube-apiserver loopback clients should not log self-issued warnings.
rest.SetDefaultWarningHandler(rest.NoWarnings{})
return nil
},
RunE: func(cmd *cobra.Command, args []string) error {
verflag.PrintAndExitIfRequested()
fs := cmd.Flags()
// Activate logging as soon as possible, after that
// show flags with the final logging configuration.
if err := logsapi.ValidateAndApply(s.Logs, utilfeature.DefaultFeatureGate); err != nil {
return err
}
cliflag.PrintFlags(fs)
// set default options
completedOptions, err := Complete(s)
if err != nil {
return err
}
// validate options
if errs := completedOptions.Validate(); len(errs) != 0 {
return utilerrors.NewAggregate(errs)
}
// 常驻
return Run(completedOptions, genericapiserver.SetupSignalHandler())
}, -
// SetupSignalHandler registered for SIGTERM and SIGINT. A stop channel is returned
// which is closed on one of these signals. If a second signal is caught, the program
// is terminated with exit code 1.
// Only one of SetupSignalContext and SetupSignalHandler should be called, and only can
// be called once.
func SetupSignalHandler() <-chan struct{} {
return SetupSignalContext().Done()
}
// SetupSignalContext is same as SetupSignalHandler, but a context.Context is returned.
// Only one of SetupSignalContext and SetupSignalHandler should be called, and only can
// be called once.
func SetupSignalContext() context.Context {
close(onlyOneSignalHandler) // panics when called twice
shutdownHandler = make(chan os.Signal, 2)
ctx, cancel := context.WithCancel(context.Background())
// 监听ShutdownSignals信号(非Windows为os.Interrupt和syscall.SIGTERM信号,在windwos监听os.Interrupt)
// 如果触发的时候stop chan阻塞
// 使用Ctrl 或者kill-15 stop退出
signal.Notify(shutdownHandler, shutdownSignals...)
go func() {
<-shutdownHandler
cancel()
<-shutdownHandler
os.Exit(1) // second signal. Exit directly.
}()
return ctx
}
进程的优雅关闭
当进程关闭的时候,kube-apiserver可能还有很多连接正在处理,直接退出关闭,可能影响体验
- 当收到stopCh关闭信号后,立即发出ShutdowInititaed信号,迅速吧当前kube-apiserver实例的就绪检查点/readyz设置为false,防止新的请求继续进入实例,并且启动ShutdownDelay Duration延迟关闭等待计时器
- 同时,触发Pre ShutdownHooks钩子func,完成关闭的预处理工作(如帮当前实例从此K8s的Service中移除)
- 在计时器结束之前发出PreShutdownDelayDuration信号,类似还会发出Pre ShutdownHookStopped信号,上述俩个信号发出后,NotAccptingNewRequest信号
- 假如开启ShutdownSendRetry特性(默认关闭),kube-apiserver会真正关闭HTTP Server服务之前,请求Drain操作。等待所有的请求被Drain处理完成,发出InFlight RequestDrained信号
- 不启用,直接关闭
- HTTP Server启动时,仅依赖自身优雅关闭。如果传入shutdownTimeout参数,如果超过等待时间依旧没有处理完,强制关闭
-
func (s preparedGenericAPIServer) Run(stopCh <-chan struct{}) error {
delayedStopCh := s.lifecycleSignals.AfterShutdownDelayDuration
shutdownInitiatedCh := s.lifecycleSignals.ShutdownInitiated
// Clean up resources on shutdown.
defer s.Destroy()
// spawn a new goroutine for closing the MuxAndDiscoveryComplete signal
// registration happens during construction of the generic api server
// the last server in the chain aggregates signals from the previous instances
go func() {
for _, muxAndDiscoveryCompletedSignal := range s.GenericAPIServer.MuxAndDiscoveryCompleteSignals() {
select {
case <-muxAndDiscoveryCompletedSignal:
continue
case <-stopCh:
klog.V(1).Infof("haven't completed %s, stop requested", s.lifecycleSignals.MuxAndDiscoveryComplete.Name())
return
}
}
s.lifecycleSignals.MuxAndDiscoveryComplete.Signal()
klog.V(1).Infof("%s has all endpoints registered and discovery information is complete", s.lifecycleSignals.MuxAndDiscoveryComplete.Name())
}()
go func() {
defer delayedStopCh.Signal()
defer klog.V(1).InfoS("[graceful-termination] shutdown event", "name", delayedStopCh.Name())
<-stopCh
// As soon as shutdown is initiated, /readyz should start returning failure.
// This gives the load balancer a window defined by ShutdownDelayDuration to detect that /readyz is red
// and stop sending traffic to this server.
shutdownInitiatedCh.Signal()
klog.V(1).InfoS("[graceful-termination] shutdown event", "name", shutdownInitiatedCh.Name())
time.Sleep(s.ShutdownDelayDuration)
}()
// close socket after delayed stopCh
shutdownTimeout := s.ShutdownTimeout
if s.ShutdownSendRetryAfter {
// when this mode is enabled, we do the following:
// - the server will continue to listen until all existing requests in flight
// (not including active long running requests) have been drained.
// - once drained, http Server Shutdown is invoked with a timeout of 2s,
// net/http waits for 1s for the peer to respond to a GO_AWAY frame, so
// we should wait for a minimum of 2s
shutdownTimeout = 2 * time.Second
klog.V(1).InfoS("[graceful-termination] using HTTP Server shutdown timeout", "ShutdownTimeout", shutdownTimeout)
}
notAcceptingNewRequestCh := s.lifecycleSignals.NotAcceptingNewRequest
drainedCh := s.lifecycleSignals.InFlightRequestsDrained
stopHttpServerCh := make(chan struct{})
go func() {
defer close(stopHttpServerCh)
timeToStopHttpServerCh := notAcceptingNewRequestCh.Signaled()
if s.ShutdownSendRetryAfter {
timeToStopHttpServerCh = drainedCh.Signaled()
}
<-timeToStopHttpServerCh
}()
// Start the audit backend before any request comes in. This means we must call Backend.Run
// before http server start serving. Otherwise the Backend.ProcessEvents call might block.
// AuditBackend.Run will stop as soon as all in-flight requests are drained.
if s.AuditBackend != nil {
if err := s.AuditBackend.Run(drainedCh.Signaled()); err != nil {
return fmt.Errorf("failed to run the audit backend: %v", err)
}
}
stoppedCh, listenerStoppedCh, err := s.NonBlockingRun(stopHttpServerCh, shutdownTimeout)
if err != nil {
return err
}
httpServerStoppedListeningCh := s.lifecycleSignals.HTTPServerStoppedListening
go func() {
<-listenerStoppedCh
httpServerStoppedListeningCh.Signal()
klog.V(1).InfoS("[graceful-termination] shutdown event", "name", httpServerStoppedListeningCh.Name())
}()
// we don't accept new request as soon as both ShutdownDelayDuration has
// elapsed and preshutdown hooks have completed.
preShutdownHooksHasStoppedCh := s.lifecycleSignals.PreShutdownHooksStopped
go func() {
defer klog.V(1).InfoS("[graceful-termination] shutdown event", "name", notAcceptingNewRequestCh.Name())
defer notAcceptingNewRequestCh.Signal()
// wait for the delayed stopCh before closing the handler chain
<-delayedStopCh.Signaled()
// Additionally wait for preshutdown hooks to also be finished, as some of them need
// to send API calls to clean up after themselves (e.g. lease reconcilers removing
// itself from the active servers).
<-preShutdownHooksHasStoppedCh.Signaled()
}()
go func() {
defer klog.V(1).InfoS("[graceful-termination] shutdown event", "name", drainedCh.Name())
defer drainedCh.Signal()
// wait for the delayed stopCh before closing the handler chain (it rejects everything after Wait has been called).
<-notAcceptingNewRequestCh.Signaled()
// Wait for all requests to finish, which are bounded by the RequestTimeout variable.
// once HandlerChainWaitGroup.Wait is invoked, the apiserver is
// expected to reject any incoming request with a {503, Retry-After}
// response via the WithWaitGroup filter. On the contrary, we observe
// that incoming request(s) get a 'connection refused' error, this is
// because, at this point, we have called 'Server.Shutdown' and
// net/http server has stopped listening. This causes incoming
// request to get a 'connection refused' error.
// On the other hand, if 'ShutdownSendRetryAfter' is enabled incoming
// requests will be rejected with a {429, Retry-After} since
// 'Server.Shutdown' will be invoked only after in-flight requests
// have been drained.
// TODO: can we consolidate these two modes of graceful termination?
s.HandlerChainWaitGroup.Wait()
}()
klog.V(1).Info("[graceful-termination] waiting for shutdown to be initiated")
<-stopCh
// run shutdown hooks directly. This includes deregistering from
// the kubernetes endpoint in case of kube-apiserver.
func() {
defer func() {
preShutdownHooksHasStoppedCh.Signal()
klog.V(1).InfoS("[graceful-termination] pre-shutdown hooks completed", "name", preShutdownHooksHasStoppedCh.Name())
}()
err = s.RunPreShutdownHooks()
}()
if err != nil {
return err
}
// Wait for all requests in flight to drain, bounded by the RequestTimeout variable.
<-drainedCh.Signaled()
if s.AuditBackend != nil {
s.AuditBackend.Shutdown()
klog.V(1).InfoS("[graceful-termination] audit backend shutdown completed")
}
// wait for stoppedCh that is closed when the graceful termination (server.Shutdown) is finished.
<-listenerStoppedCh
<-stoppedCh
klog.V(1).Info("[graceful-termination] apiserver is exiting")
return nil
}
向systemd报告进程状态
早期Linux使用initd进程管理Linux进程,后面使用systemd取缔了initd,成为了PID=1的进程
-
// NonBlockingRun spawns the secure http server. An error is
// returned if the secure port cannot be listened on.
// The returned channel is closed when the (asynchronous) termination is finished.
func (s preparedGenericAPIServer) NonBlockingRun(stopCh <-chan struct{}, shutdownTimeout time.Duration) (<-chan struct{}, <-chan struct{}, error) {
// Use an internal stop channel to allow cleanup of the listeners on error.
internalStopCh := make(chan struct{})
var stoppedCh <-chan struct{}
var listenerStoppedCh <-chan struct{}
if s.SecureServingInfo != nil && s.Handler != nil {
var err error
stoppedCh, listenerStoppedCh, err = s.SecureServingInfo.Serve(s.Handler, shutdownTimeout, internalStopCh)
if err != nil {
close(internalStopCh)
return nil, nil, err
}
}
// Now that listener have bound successfully, it is the
// responsibility of the caller to close the provided channel to
// ensure cleanup.
go func() {
<-stopCh
close(internalStopCh)
}()
s.RunPostStartHooks(stopCh)
// 报告状态
if _, err := systemd.SdNotify(true, "READY=1\n"); err != nil {
klog.Errorf("Unable to send systemd daemon successful start message: %v\n", err)
}
return stoppedCh, listenerStoppedCh, nil
}
本博客所有文章除特别声明外,均采用 CC BY-NC-SA 4.0 许可协议。转载请注明来自 Joohwan!
评论