K8s kubelet (HTTP Service Interfaces)

Based on Kubernetes 1.25

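kubelet exposes its API through HTTP servers. To keep these interfaces secure, kubelet supports three kinds of HTTP server, listed here from the lowest to the highest security level: the healthz server, the readonly server, and the kubelet core server.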

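Category | Subcategory | Path | Description
---|---|---|---
Default Handlers | healthz | /healthz | Check whether kubelet is healthy, focusing on whether syncLoop keeps completing within the allotted time. syncLoop is checked because failures in other components indirectly prevent syncLoop from completing
Default Handlers | pods | /pods | Read the list of Pods running on the current node (obtained from PodManager)
Default Handlers | stats | /stats/summary | Read resource usage statistics
Default Handlers | metrics | /metrics | Read kubelet metrics
Default Handlers | metrics | /metrics/cadvisor | Read cadvisor metrics
Default Handlers | metrics | /metrics/probes | Read probe metrics
Default Handlers | metrics | /metrics/resource | Read resource metrics
Default Handlers | checkpoint | /checkpoint/{podNamespace}/{podID}/{containerName} | Build a checkpoint (snapshot) of a container; depends on the ContainerCheckpoint feature gate and on container runtime support
Debugging Handlers | run | /run/{podNamespace}/{podID}/{containerName} or /run/{podNamespace}/{podID}/{uid}/{containerName} | Run a command in a container. Here podID is the Pod name and uid is the Pod UID; uid is used to locate the target Pod only when Namespace/Name cannot identify a unique Pod
Debugging Handlers | exec | /exec/{podNamespace}/{podID}/{containerName} or /exec/{podNamespace}/{podID}/{uid}/{containerName} | Run a command interactively in a container. Here podID is the Pod name and uid is the Pod UID; uid is used to locate the target Pod only when Namespace/Name cannot identify a unique Pod
Debugging Handlers | attach | /attach/{podNamespace}/{podID}/{containerName} or /attach/{podNamespace}/{podID}/{uid}/{containerName} | Attach to the process running in a container; the CRI call returns a redirect address that handles the stream
Debugging Handlers | portForward | /portForward/{podNamespace}/{podID} or /portForward/{podNamespace}/{podID}/{uid} | Perform port forwarding; the CRI call returns a redirect address that handles the stream
Debugging Handlers | containerLogs | /containerLogs/{podNamespace}/{podID}/{containerName} | Read container logs; kubelet first asks the CRI for the path of the container log on the host, then loads the log from the local file system
Debugging Handlers | configz | /configz | Read the configuration kubelet is using
Debugging Handlers | runningpods | /runningpods | Read the list of Pods running on the current node (obtained from RuntimeCache). Unlike /pods, which returns the desired Pod list, /runningpods returns the actual Pod list from the container runtime
SystemLogHandler | logs | /logs or /logs/{logpath.*} | Read log files under /var/log on the node
ProfilingHandler | debug | /debug/pprof/{subpath.*} | Read pprof profiling data
DebugFlagsHandler | debug | /debug/flags/v | Dynamically adjust kubelet's log verbosity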
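
The /healthz row describes a freshness check: the endpoint is healthy only while the sync loop keeps completing within a time limit. The following is a minimal sketch of that idea, not kubelet's actual implementation; the type `loopHealth`, its methods, and the one-minute timeout are all assumptions made for illustration.

```go
package main

import (
	"fmt"
	"net/http"
	"net/http/httptest"
	"sync"
	"time"
)

// loopHealth (hypothetical) records when the sync loop last finished an iteration.
type loopHealth struct {
	mu       sync.Mutex
	lastSync time.Time
	timeout  time.Duration
}

// markSynced is called by the loop after each completed iteration.
func (h *loopHealth) markSynced(t time.Time) {
	h.mu.Lock()
	defer h.mu.Unlock()
	h.lastSync = t
}

// healthy reports whether the last iteration finished within the timeout.
func (h *loopHealth) healthy(now time.Time) bool {
	h.mu.Lock()
	defer h.mu.Unlock()
	return now.Sub(h.lastSync) <= h.timeout
}

// ServeHTTP answers 200 while the loop is fresh and 500 once it is stale.
func (h *loopHealth) ServeHTTP(w http.ResponseWriter, _ *http.Request) {
	if !h.healthy(time.Now()) {
		http.Error(w, "sync loop is stale", http.StatusInternalServerError)
		return
	}
	fmt.Fprint(w, "ok")
}

// probe sends one GET /healthz to the handler and returns the status code.
func probe(h http.Handler) int {
	rec := httptest.NewRecorder()
	h.ServeHTTP(rec, httptest.NewRequest(http.MethodGet, "/healthz", nil))
	return rec.Code
}

// demo probes once right after a sync and once after an hour without one.
func demo() (fresh, stale int) {
	h := &loopHealth{timeout: time.Minute}
	h.markSynced(time.Now())
	fresh = probe(h)
	h.markSynced(time.Now().Add(-time.Hour))
	stale = probe(h)
	return fresh, stale
}

func main() {
	fresh, stale := demo()
	fmt.Println(fresh, stale)
}
```

Because the check looks only at the loop's last completion time, any dependency that stalls the loop shows up as an unhealthy response without being probed directly.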

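Log query interface

kubectl logs is commonly used to read a container's runtime logs. The actual flow is: kubectl sends a request to kube-apiserver; after the request is authenticated, kube-apiserver itself issues a request to kubelet's /containerLogs endpoint to read the container log.

kubectl sends the log query request to kube-apiserver

Under the hood, the kubectl logs command calls client-go's GetLogs func to fetch the logs.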
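
To make that request concrete, here is a small sketch of the URL that a GET on the pods/log subresource targets. It uses only the standard library rather than client-go; the function `podLogURL` and the host `apiserver.example` are hypothetical, and the query names `container`, `follow`, and `tailLines` are assumed to match the corresponding PodLogOptions fields.

```go
package main

import (
	"fmt"
	"net/url"
	"strconv"
)

// podLogURL (hypothetical helper) builds the URL of the pods/log subresource.
// A negative tailLines means "no tail limit" in this sketch.
func podLogURL(apiServer, namespace, pod, container string, follow bool, tailLines int64) (string, error) {
	u, err := url.Parse(apiServer)
	if err != nil {
		return "", err
	}
	u.Path = "/api/v1/namespaces/" + namespace + "/pods/" + pod + "/log"

	q := url.Values{}
	if container != "" {
		q.Set("container", container)
	}
	if follow {
		q.Set("follow", "true")
	}
	if tailLines >= 0 {
		q.Set("tailLines", strconv.FormatInt(tailLines, 10))
	}
	u.RawQuery = q.Encode() // Encode sorts the keys alphabetically
	return u.String(), nil
}

func main() {
	got, err := podLogURL("https://apiserver.example:6443", "kube-system", "coredns-xxx", "coredns", true, 10)
	if err != nil {
		panic(err)
	}
	fmt.Println(got)
}
```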

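kube-apiserver sends the log query request to kubelet

For kube-apiserver, Log is also a resource object; it is simply handled by LogREST.

  • During API registration, Pod registers the mapping between its resource objects and their RESTStorage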

  • Ref:https://github.com/kubernetes/kubernetes/blob/88e994f6bf8fc88114c5b733e09afea339bea66d/pkg/registry/core/rest/storage_core.go#L274

    storage := map[string]rest.Storage{}
    if resource := "pods"; apiResourceConfigSource.ResourceEnabled(corev1.SchemeGroupVersion.WithResource(resource)) {
        storage[resource] = podStorage.Pod
        storage[resource+"/attach"] = podStorage.Attach
        storage[resource+"/status"] = podStorage.Status
        storage[resource+"/log"] = podStorage.Log
        storage[resource+"/exec"] = podStorage.Exec
        storage[resource+"/portforward"] = podStorage.PortForward
        storage[resource+"/proxy"] = podStorage.Proxy
        storage[resource+"/binding"] = podStorage.Binding
        if podStorage.Eviction != nil {
            storage[resource+"/eviction"] = podStorage.Eviction
        }
        storage[resource+"/ephemeralcontainers"] = podStorage.EphemeralContainers
    }

When a GET request is issued for Log, it is handled by the Get func of the storage object podStorage.Log

  • Ref:https://github.com/kubernetes/kubernetes/blob/88e994f6bf8fc88114c5b733e09afea339bea66d/pkg/registry/core/pod/rest/log.go#L78

    // Get retrieves a runtime.Object that will stream the contents of the pod log
    func (r *LogREST) Get(ctx context.Context, name string, opts runtime.Object) (runtime.Object, error) {
        // register the metrics if the context is used. This assumes sync.Once is fast. If it's not, it could be an init block.
        registerMetrics()

        logOpts, ok := opts.(*api.PodLogOptions)
        if !ok {
            return nil, fmt.Errorf("invalid options object: %#v", opts)
        }

        countSkipTLSMetric(logOpts.InsecureSkipTLSVerifyBackend)

        if errs := validation.ValidatePodLogOptions(logOpts); len(errs) > 0 {
            return nil, errors.NewInvalid(api.Kind("PodLogOptions"), name, errs)
        }
        location, transport, err := pod.LogLocation(ctx, r.Store, r.KubeletConn, name, logOpts)
        if err != nil {
            return nil, err
        }
        return &genericrest.LocationStreamer{
            Location: location,
            Transport: transport,
            ContentType: "text/plain",
            Flush: logOpts.Follow,
            ResponseChecker: genericrest.NewGenericHttpResponseChecker(api.Resource("pods/log"), name),
            RedirectChecker: genericrest.PreventRedirects,
            TLSVerificationErrorCounter: podLogsTLSFailure,
        }, nil
    }

After the options pass ValidatePodLogOptions, LogREST returns a LocationStreamer whose location is the kubelet URL https://:10250/containerLogs/{namespace}/{name}/{container} (the node address is omitted here)
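
The streamer's job can be sketched as "fetch from a location over a given transport and copy the body to the client, flushing as data arrives". The code below is a simplified stand-in built on the standard library, not the real genericrest.LocationStreamer; the `streamer` type, the stub transport `fakeKubelet`, and the host `node.example` are assumptions for illustration.

```go
package main

import (
	"fmt"
	"io"
	"net/http"
	"net/http/httptest"
	"strings"
)

// streamer (hypothetical) keeps three of the ideas seen in the excerpt above:
// where to fetch from, which transport to use, and whether to flush.
type streamer struct {
	Location  string
	Transport http.RoundTripper
	Flush     bool
}

// ServeHTTP fetches Location and relays the body chunk by chunk.
func (s streamer) ServeHTTP(w http.ResponseWriter, _ *http.Request) {
	client := &http.Client{Transport: s.Transport}
	resp, err := client.Get(s.Location)
	if err != nil {
		http.Error(w, err.Error(), http.StatusBadGateway)
		return
	}
	defer resp.Body.Close()

	w.Header().Set("Content-Type", "text/plain")
	w.WriteHeader(resp.StatusCode)
	buf := make([]byte, 4096)
	for {
		n, readErr := resp.Body.Read(buf)
		if n > 0 {
			if _, err := w.Write(buf[:n]); err != nil {
				return
			}
			// Flushing pushes each chunk out immediately, which matters in follow mode.
			if f, ok := w.(http.Flusher); ok && s.Flush {
				f.Flush()
			}
		}
		if readErr != nil {
			return // io.EOF or a real read error ends the stream
		}
	}
}

// fakeKubelet is a stub transport that answers every request with two log lines.
type fakeKubelet struct{}

func (fakeKubelet) RoundTrip(req *http.Request) (*http.Response, error) {
	return &http.Response{
		StatusCode: http.StatusOK,
		Header:     http.Header{},
		Body:       io.NopCloser(strings.NewReader("line 1\nline 2\n")),
		Request:    req,
	}, nil
}

// demo runs one request through the streamer and returns what the client sees.
func demo() string {
	s := streamer{
		Location:  "https://node.example:10250/containerLogs/default/demo/app",
		Transport: fakeKubelet{},
		Flush:     true,
	}
	rec := httptest.NewRecorder()
	s.ServeHTTP(rec, httptest.NewRequest(http.MethodGet, "/", nil))
	return rec.Body.String()
}

func main() {
	fmt.Print(demo())
}
```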

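kubelet handles and responds to the log query request

kubelet's HTTP server eventually receives the log request from kube-apiserver; after authentication and authorization, the request is handled by getContainerLogs.

getContainerLogs parses the log query request, constructs a PodLogOptions object, and reads the container log by calling GetKubeletContainerLogs.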

  • Ref:https://github.com/kubernetes/kubernetes/blob/3ec9c7f4d20be445af83d0bd5c4e77a9f22ab6cc/pkg/kubelet/kubelet_pods.go#L1520

    // GetKubeletContainerLogs returns logs from the container
    // TODO: this method is returning logs of random container attempts, when it should be returning the most recent attempt
    // or all of them.
    func (kl *Kubelet) GetKubeletContainerLogs(ctx context.Context, podFullName, containerName string, logOptions *v1.PodLogOptions, stdout, stderr io.Writer) error {
        // Pod workers periodically write status to statusManager. If status is not
        // cached there, something is wrong (or kubelet just restarted and hasn't
        // caught up yet). Just assume the pod is not ready yet.
        name, namespace, err := kubecontainer.ParsePodFullName(podFullName)
        if err != nil {
            return fmt.Errorf("unable to parse pod full name %q: %v", podFullName, err)
        }

        pod, ok := kl.GetPodByName(namespace, name)
        if !ok {
            return fmt.Errorf("pod %q cannot be found - no logs available", name)
        }

        // TODO: this should be using the podWorker's pod store as authoritative, since
        // the mirrorPod might still exist, the pod may have been force deleted but
        // is still terminating (users should be able to view logs of force deleted static pods
        // based on full name).
        var podUID types.UID
        pod, mirrorPod, wasMirror := kl.podManager.GetPodAndMirrorPod(pod)
        if wasMirror {
            if pod == nil {
                return fmt.Errorf("mirror pod %q does not have a corresponding pod", name)
            }
            podUID = mirrorPod.UID
        } else {
            podUID = pod.UID
        }

        podStatus, found := kl.statusManager.GetPodStatus(podUID)
        if !found {
            // If there is no cached status, use the status from the
            // config source (apiserver). This is useful if kubelet
            // has recently been restarted.
            podStatus = pod.Status
        }

        // TODO: Consolidate the logic here with kuberuntime.GetContainerLogs, here we convert container name to containerID,
        // but inside kuberuntime we convert container id back to container name and restart count.
        // TODO: After separate container log lifecycle management, we should get log based on the existing log files
        // instead of container status.
        containerID, err := kl.validateContainerLogStatus(pod.Name, &podStatus, containerName, logOptions.Previous)
        if err != nil {
            return err
        }

        // Since v1.32, stdout may be nil if the stream is not requested.
        if stdout != nil {
            // Do a zero-byte write to stdout before handing off to the container runtime.
            // This ensures at least one Write call is made to the writer when copying starts,
            // even if we then block waiting for log output from the container.
            if _, err := stdout.Write([]byte{}); err != nil {
                return err
            }
        }

        return kl.containerRuntime.GetContainerLogs(ctx, pod, containerID, logOptions, stdout, stderr)
    }

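kubelet first calls the CRI through runtimeService.ContainerStatus to get the container's status from the container runtime, then obtains the container's log path with status.GetLogPath, and finally reads the content of the log file with ReadLogs.

kubelet actually reads the log content directly from the host; the CRI call is used only to obtain the host path of the container log.

The following command prints a container's log path; the log file can then be inspected at that path on the host:

    crictl inspect -o go-template --template '{{.status.logPath}}' <containerid>
    /var/log/pods/kube-system_coredns-xxx/coredns/2.log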
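
The sample path suggests a directory layout keyed by Pod and container. The sketch below assembles such a path under the assumption that the layout is `<root>/<namespace>_<podName>_<podUID>/<containerName>/<restartCount>.log`; the helper `containerLogPath` and all argument values are hypothetical, and the authoritative path is always the logPath reported by the runtime.

```go
package main

import (
	"fmt"
	"path"
)

// containerLogPath (hypothetical helper) joins the pieces of the assumed layout:
// <root>/<namespace>_<podName>_<podUID>/<containerName>/<restartCount>.log
func containerLogPath(root, namespace, podName, podUID, containerName string, restartCount int) string {
	podDir := fmt.Sprintf("%s_%s_%s", namespace, podName, podUID)
	return path.Join(root, podDir, containerName, fmt.Sprintf("%d.log", restartCount))
}

func main() {
	// All values below are made up for illustration.
	fmt.Println(containerLogPath("/var/log/pods", "kube-system", "coredns-abc", "1234", "coredns", 2))
}
```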

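ReadLogs keeps reading the log file from the host and returns it to the client through the Response

  • ReadLogs opens the log file with os.Open, keeps reading it through a Reader, parses each line into a logMessage (msg), and writes it to the response stream.

    • In particular, when follow mode is enabled, ReadLogs uses fsnotify to watch the log file; when the log file is rotated, it automatically waits for and picks up the new log file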
  • Ref:https://github.com/kubernetes/kubernetes/blob/88e994f6bf8fc88114c5b733e09afea339bea66d/pkg/kubelet/kuberuntime/logs/logs.go#L283

    // ReadLogs read the container log and redirect into stdout and stderr.
    // Note that containerID is only needed when following the log, or else
    // just pass in empty string "".
    func ReadLogs(ctx context.Context, path, containerID string, opts *LogOptions, runtimeService internalapi.RuntimeService, stdout, stderr io.Writer) error {
        // fsnotify has different behavior for symlinks in different platform,
        // for example it follows symlink on Linux, but not on Windows,
        // so we explicitly resolve symlinks before reading the logs.
        // There shouldn't be security issue because the container log
        // path is owned by kubelet and the container runtime.
        evaluated, err := filepath.EvalSymlinks(path)
        if err != nil {
            return fmt.Errorf("failed to try resolving symlinks in path %q: %v", path, err)
        }
        path = evaluated
        f, err := os.Open(path)
        if err != nil {
            return fmt.Errorf("failed to open log file %q: %v", path, err)
        }
        defer f.Close()

        // Search start point based on tail line.
        start, err := tail.FindTailLineStartIndex(f, opts.tail)
        if err != nil {
            return fmt.Errorf("failed to tail %d lines of log file %q: %v", opts.tail, path, err)
        }
        if _, err := f.Seek(start, io.SeekStart); err != nil {
            return fmt.Errorf("failed to seek %d in log file %q: %v", start, path, err)
        }

        limitedMode := (opts.tail >= 0) && (!opts.follow)
        limitedNum := opts.tail
        // Start parsing the logs.
        r := bufio.NewReader(f)
        // Do not create watcher here because it is not needed if `Follow` is false.
        var watcher *fsnotify.Watcher
        var parse parseFunc
        var stop bool
        isNewLine := true
        found := true
        writer := newLogWriter(stdout, stderr, opts)
        msg := &logMessage{}
        for {
            if stop || (limitedMode && limitedNum == 0) {
                klog.V(2).InfoS("Finished parsing log file", "path", path)
                return nil
            }
            l, err := r.ReadBytes(eol[0])
            if err != nil {
                if err != io.EOF { // This is an real error
                    return fmt.Errorf("failed to read log file %q: %v", path, err)
                }
                if opts.follow {
                    // The container is not running, we got to the end of the log.
                    if !found {
                        return nil
                    }
                    // Reset seek so that if this is an incomplete line,
                    // it will be read again.
                    if _, err := f.Seek(-int64(len(l)), io.SeekCurrent); err != nil {
                        return fmt.Errorf("failed to reset seek in log file %q: %v", path, err)
                    }
                    if watcher == nil {
                        // Initialize the watcher if it has not been initialized yet.
                        if watcher, err = fsnotify.NewWatcher(); err != nil {
                            return fmt.Errorf("failed to create fsnotify watcher: %v", err)
                        }
                        defer watcher.Close()
                        if err := watcher.Add(f.Name()); err != nil {
                            return fmt.Errorf("failed to watch file %q: %v", f.Name(), err)
                        }
                        // If we just created the watcher, try again to read as we might have missed
                        // the event.
                        continue
                    }
                    var recreated bool
                    // Wait until the next log change.
                    found, recreated, err = waitLogs(ctx, containerID, watcher, runtimeService)
                    if err != nil {
                        return err
                    }
                    if recreated {
                        newF, err := os.Open(path)
                        if err != nil {
                            if os.IsNotExist(err) {
                                continue
                            }
                            return fmt.Errorf("failed to open log file %q: %v", path, err)
                        }
                        defer newF.Close()
                        f.Close()
                        if err := watcher.Remove(f.Name()); err != nil && !os.IsNotExist(err) {
                            klog.ErrorS(err, "Failed to remove file watch", "path", f.Name())
                        }
                        f = newF
                        if err := watcher.Add(f.Name()); err != nil {
                            return fmt.Errorf("failed to watch file %q: %v", f.Name(), err)
                        }
                        r = bufio.NewReader(f)
                    }
                    // If the container exited consume data until the next EOF
                    continue
                }
                // Should stop after writing the remaining content.
                stop = true
                if len(l) == 0 {
                    continue
                }
                klog.InfoS("Incomplete line in log file", "path", path, "line", l)
            }
            if parse == nil {
                // Initialize the log parsing function.
                parse, err = getParseFunc(l)
                if err != nil {
                    return fmt.Errorf("failed to get parse function: %v", err)
                }
            }
            // Parse the log line.
            msg.reset()
            if err := parse(l, msg); err != nil {
                klog.ErrorS(err, "Failed when parsing line in log file", "path", path, "line", l)
                continue
            }
            // Write the log line into the stream.
            if err := writer.write(msg, isNewLine); err != nil {
                if err == errMaximumWrite {
                    klog.V(2).InfoS("Finished parsing log file, hit bytes limit", "path", path, "limit", opts.bytes)
                    return nil
                }
                klog.ErrorS(err, "Failed when writing line to log file", "path", path, "line", msg)
                return err
            }
            if limitedMode {
                limitedNum--
            }
            if len(msg.log) > 0 {
                isNewLine = msg.log[len(msg.log)-1] == eol[0]
            } else {
                isNewLine = true
            }
        }
    }
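
The parsing step in the loop above turns each raw line into a structured message. As an illustration only, the sketch below parses one line of the CRI text log format, assumed here to be `<RFC3339Nano timestamp> <stream> <tag> <content>` with tag `P` for a partial line and `F` for a full one. The `logLine` type and `parseCRILine` function are hypothetical and much simpler than the parser chosen by getParseFunc, which may also select a different format.

```go
package main

import (
	"errors"
	"fmt"
	"strings"
	"time"
)

// logLine (hypothetical) holds the fields of one CRI-format log line.
type logLine struct {
	Timestamp time.Time
	Stream    string // "stdout" or "stderr"
	Partial   bool   // true when the tag is "P": the line continues in the next entry
	Content   string
}

// parseCRILine splits "<timestamp> <stream> <tag> <content>" into its fields.
func parseCRILine(line string) (logLine, error) {
	fields := strings.SplitN(strings.TrimSuffix(line, "\n"), " ", 4)
	if len(fields) != 4 {
		return logLine{}, errors.New("want 4 space-separated fields")
	}
	ts, err := time.Parse(time.RFC3339Nano, fields[0])
	if err != nil {
		return logLine{}, fmt.Errorf("bad timestamp: %w", err)
	}
	if fields[1] != "stdout" && fields[1] != "stderr" {
		return logLine{}, fmt.Errorf("bad stream %q", fields[1])
	}
	// Only the first tag is inspected; anything after ":" is ignored in this sketch.
	tag := strings.SplitN(fields[2], ":", 2)[0]
	if tag != "P" && tag != "F" {
		return logLine{}, fmt.Errorf("bad tag %q", fields[2])
	}
	return logLine{Timestamp: ts, Stream: fields[1], Partial: tag == "P", Content: fields[3]}, nil
}

func main() {
	l, err := parseCRILine("2016-10-06T00:17:09.669794202Z stdout F hello world\n")
	if err != nil {
		panic(err)
	}
	fmt.Println(l.Stream, l.Partial, l.Content)
}
```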

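Command execution interface

kubectl exec is a common debugging tool that can create an interactive terminal session.

kubectl sends the command execution request to kube-apiserver

Running kubectl exec in a terminal is, under the hood, carried out by client-go building a POST request to the Pod's exec subresource.

Besides POST, the GET method is also supported:

  • POST suits clients such as kubectl; it is based on the SPDY protocol and supports bidirectional communication
  • GET suits web callers; the connection can be upgraded via WebSocket to support bidirectional communication

SPDY is an open network transport protocol developed by Google to reduce web page load time. Through prioritization and multiplexing, SPDY needs only a single TCP connection to transfer page content and image resources.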

  • Ref:https://github.com/kubernetes/kubectl/blob/3810c9e1663ed4b1a2eb64eb8d76f1c54de8a5c3/pkg/cmd/exec/exec.go#L120

    func (*DefaultRemoteExecutor) Execute(method string, url *url.URL, config *restclient.Config, stdin io.Reader, stdout, stderr io.Writer, tty bool, terminalSizeQueue remotecommand.TerminalSizeQueue) error {
        exec, err := remotecommand.NewSPDYExecutor(config, method, url)
        if err != nil {
            return err
        }
        return exec.Stream(remotecommand.StreamOptions{
            Stdin: stdin,
            Stdout: stdout,
            Stderr: stderr,
            Tty: tty,
            TerminalSizeQueue: terminalSizeQueue,
        })
    }
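
To show roughly what such a POST looks like on the wire, the sketch below builds the request with the standard library instead of client-go. The helper `newExecRequest` and the host `apiserver.example` are hypothetical. The query names (`command`, `container`, `stdin`, `stdout`, `stderr`, `tty`) and the `SPDY/3.1` upgrade token are assumptions about what the SPDY executor sends, and the stream negotiation that follows the upgrade is not shown.

```go
package main

import (
	"fmt"
	"net/http"
	"net/url"
)

// newExecRequest (hypothetical helper) builds a POST to the pods/exec
// subresource that asks the server to upgrade the connection.
func newExecRequest(apiServer, namespace, pod, container string, command []string, tty bool) (*http.Request, error) {
	q := url.Values{}
	for _, c := range command {
		q.Add("command", c) // one "command" entry per argv element
	}
	q.Set("container", container)
	q.Set("stdin", "true")
	q.Set("stdout", "true")
	if tty {
		q.Set("tty", "true") // with a TTY, stderr is merged into stdout in this sketch
	} else {
		q.Set("stderr", "true")
	}
	target := fmt.Sprintf("%s/api/v1/namespaces/%s/pods/%s/exec?%s", apiServer, namespace, pod, q.Encode())

	req, err := http.NewRequest(http.MethodPost, target, nil)
	if err != nil {
		return nil, err
	}
	// Assumed upgrade headers; the real client also negotiates stream protocol versions.
	req.Header.Set("Connection", "Upgrade")
	req.Header.Set("Upgrade", "SPDY/3.1")
	return req, nil
}

func main() {
	req, err := newExecRequest("https://apiserver.example:6443", "default", "demo", "app", []string{"sh", "-c", "echo hi"}, true)
	if err != nil {
		panic(err)
	}
	fmt.Println(req.Method, req.URL.String())
}
```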

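kube-apiserver sends the command execution request to kubelet

For kube-apiserver, exec is also a resource.

  • During API registration, Pod registers the mapping between its resource objects and their RESTStorage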

  • Ref:https://github.com/kubernetes/kubernetes/blob/88e994f6bf8fc88114c5b733e09afea339bea66d/pkg/registry/core/rest/storage_core.go#L274

    storage := map[string]rest.Storage{}
    if resource := "pods"; apiResourceConfigSource.ResourceEnabled(corev1.SchemeGroupVersion.WithResource(resource)) {
        storage[resource] = podStorage.Pod
        storage[resource+"/attach"] = podStorage.Attach
        storage[resource+"/status"] = podStorage.Status
        storage[resource+"/log"] = podStorage.Log
        storage[resource+"/exec"] = podStorage.Exec
        storage[resource+"/portforward"] = podStorage.PortForward
        storage[resource+"/proxy"] = podStorage.Proxy
        storage[resource+"/binding"] = podStorage.Binding
        if podStorage.Eviction != nil {
            storage[resource+"/eviction"] = podStorage.Eviction
        }
        storage[resource+"/ephemeralcontainers"] = podStorage.EphemeralContainers
    }

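kubelet handles and responds to the command execution request

kubelet's HTTP server eventually receives the exec request from kube-apiserver; after authentication and authorization, the request is handled by getExec.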

  • Ref:https://github.com/kubernetes/kubernetes/blob/88e994f6bf8fc88114c5b733e09afea339bea66d/pkg/kubelet/server/server.go#L452

    s.addMetricsBucketMatcher("exec")
    ws = new(restful.WebService)
    ws.
        Path("/exec")
    ws.Route(ws.GET("/{podNamespace}/{podID}/{containerName}").
        To(s.getExec).
        Operation("getExec"))
    ws.Route(ws.POST("/{podNamespace}/{podID}/{containerName}").
        To(s.getExec).
        Operation("getExec"))
    ws.Route(ws.GET("/{podNamespace}/{podID}/{uid}/{containerName}").
        To(s.getExec).
        Operation("getExec"))
    ws.Route(ws.POST("/{podNamespace}/{podID}/{uid}/{containerName}").
        To(s.getExec).
        Operation("getExec"))
    s.restfulCont.Add(ws)
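
The two route shapes differ only in the optional {uid} segment. The sketch below shows one way to tell them apart by segment count; it is not how go-restful matches routes, and the `execParams` type and `parseExecPath` function are hypothetical.

```go
package main

import (
	"fmt"
	"strings"
)

// execParams (hypothetical) holds the path parameters of an /exec request.
type execParams struct {
	Namespace, Pod, UID, Container string
}

// parseExecPath accepts /exec/{ns}/{pod}/{container} and
// /exec/{ns}/{pod}/{uid}/{container}; UID stays empty in the short form.
func parseExecPath(p string) (execParams, bool) {
	parts := strings.Split(strings.Trim(p, "/"), "/")
	if parts[0] != "exec" {
		return execParams{}, false
	}
	switch len(parts) {
	case 4:
		return execParams{Namespace: parts[1], Pod: parts[2], Container: parts[3]}, true
	case 5:
		return execParams{Namespace: parts[1], Pod: parts[2], UID: parts[3], Container: parts[4]}, true
	}
	return execParams{}, false
}

func main() {
	short, _ := parseExecPath("/exec/default/demo/app")
	long, _ := parseExecPath("/exec/default/demo/1234-abcd/app")
	fmt.Printf("%+v\n%+v\n", short, long)
}
```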

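The getExec func parses the command execution request, calls runtimeService.Exec through the CRI to obtain from the container runtime the address of a connection that runs the exec, and then calls proxyStream to proxy the streaming between kube-apiserver and the container runtime.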

  • Ref:https://github.com/kubernetes/kubernetes/blob/88e994f6bf8fc88114c5b733e09afea339bea66d/pkg/kubelet/server/server.go#L824

    // getExec handles requests to run a command inside a container.
    func (s *Server) getExec(request *restful.Request, response *restful.Response) {
        params := getExecRequestParams(request)
        streamOpts, err := remotecommandserver.NewOptions(request.Request)
        if err != nil {
            utilruntime.HandleError(err)
            response.WriteError(http.StatusBadRequest, err)
            return
        }
        pod, ok := s.host.GetPodByName(params.podNamespace, params.podName)
        if !ok {
            response.WriteError(http.StatusNotFound, fmt.Errorf("pod does not exist"))
            return
        }

        podFullName := kubecontainer.GetPodFullName(pod)
        url, err := s.host.GetExec(podFullName, params.podUID, params.containerName, params.cmd, *streamOpts)
        if err != nil {
            streaming.WriteError(err, response.ResponseWriter)
            return
        }
        proxyStream(response.ResponseWriter, request.Request, url)
    }
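
The "ask the runtime for a URL, then proxy to it" pattern can be sketched with the standard library's httputil.ReverseProxy, which is used here as a stand-in for proxyStream rather than as a copy of it. The function `runtimeExecURL` imitates the URL returned by the CRI call, and its address, the token in the path, and the stub transport `fakeRuntime` are all made up.

```go
package main

import (
	"fmt"
	"io"
	"net/http"
	"net/http/httptest"
	"net/http/httputil"
	"net/url"
	"strings"
)

// runtimeExecURL stands in for the CRI Exec call, which hands back the URL of
// the runtime's streaming endpoint. The address and token are invented.
func runtimeExecURL() *url.URL {
	return &url.URL{Scheme: "http", Host: "127.0.0.1:12345", Path: "/exec/token123"}
}

// execHandler resolves the streaming URL and proxies the incoming request to it.
func execHandler(transport http.RoundTripper) http.HandlerFunc {
	return func(w http.ResponseWriter, r *http.Request) {
		target := runtimeExecURL()
		proxy := &httputil.ReverseProxy{
			// Director rewrites the outgoing request so it points at the runtime.
			Director: func(req *http.Request) {
				req.URL.Scheme = target.Scheme
				req.URL.Host = target.Host
				req.URL.Path = target.Path
				req.Host = target.Host
			},
			Transport: transport,
		}
		proxy.ServeHTTP(w, r)
	}
}

// fakeRuntime is a stub transport that reports which URL it was asked for.
type fakeRuntime struct{}

func (fakeRuntime) RoundTrip(req *http.Request) (*http.Response, error) {
	return &http.Response{
		StatusCode: http.StatusOK,
		Header:     http.Header{},
		Body:       io.NopCloser(strings.NewReader("proxied to " + req.URL.String())),
		Request:    req,
	}, nil
}

// demo sends one request through the handler and returns the proxied body.
func demo() string {
	rec := httptest.NewRecorder()
	req := httptest.NewRequest(http.MethodPost, "/exec/default/demo/app", nil)
	execHandler(fakeRuntime{}).ServeHTTP(rec, req)
	return rec.Body.String()
}

func main() {
	fmt.Println(demo())
}
```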

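Port forwarding interface

kubectl port-forward listens on a local port and maps it to a port of a remote Pod, which implements port forwarding.

kubectl sends the port forwarding request to kube-apiserver

Running kubectl port-forward in a terminal actually issues a POST to the Pod's portforward subresource.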

  • Ref:https://github.com/kubernetes/kubectl/blob/3810c9e1663ed4b1a2eb64eb8d76f1c54de8a5c3/pkg/cmd/portforward/portforward.go#L389

    // RunPortForward implements all the necessary functionality for port-forward cmd.
    func (o PortForwardOptions) RunPortForward() error {
        pod, err := o.PodClient.Pods(o.Namespace).Get(context.TODO(), o.PodName, metav1.GetOptions{})
        if err != nil {
            return err
        }

        if pod.Status.Phase != corev1.PodRunning {
            return fmt.Errorf("unable to forward port because pod is not running. Current status=%v", pod.Status.Phase)
        }

        signals := make(chan os.Signal, 1)
        signal.Notify(signals, os.Interrupt)
        defer signal.Stop(signals)

        go func() {
            <-signals
            if o.StopChannel != nil {
                close(o.StopChannel)
            }
        }()

        req := o.RESTClient.Post().
            Resource("pods").
            Namespace(o.Namespace).
            Name(pod.Name).
            SubResource("portforward")

        return o.PortForwarder.ForwardPorts("POST", req.URL(), o)
    }

kubectl wraps the POST request to kube-apiserver with PortForward, connects to the target server, and upgrades the connection to a SPDY-based multiplexed bidirectional stream.
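
At its core, forwarding a port means copying bytes in both directions between the local connection and the stream that leads to the Pod. The sketch below shows only that copy loop, using in-memory net.Pipe connections as a stand-in for the SPDY streams; the `forward` function and the upper-casing "pod" are hypothetical.

```go
package main

import (
	"fmt"
	"io"
	"net"
	"strings"
	"sync"
)

// forward copies bytes in both directions until one side closes,
// then closes the other side so both copy loops terminate.
func forward(a, b net.Conn) {
	var wg sync.WaitGroup
	wg.Add(2)
	go func() { defer wg.Done(); io.Copy(a, b); a.Close() }()
	go func() { defer wg.Done(); io.Copy(b, a); b.Close() }()
	wg.Wait()
}

// demo wires client <-> forwarder <-> fake pod and returns the client's reply.
func demo() string {
	clientSide, localEnd := net.Pipe() // stands in for the local listening port
	remoteEnd, podSide := net.Pipe()   // stands in for the stream to the Pod

	go forward(localEnd, remoteEnd)

	// Fake pod: read one request, answer in upper case, then hang up.
	go func() {
		defer podSide.Close()
		buf := make([]byte, 64)
		n, err := podSide.Read(buf)
		if err != nil {
			return
		}
		podSide.Write([]byte(strings.ToUpper(string(buf[:n]))))
	}()

	clientSide.Write([]byte("ping"))
	reply, _ := io.ReadAll(clientSide) // returns once the forwarder closes its end
	clientSide.Close()
	return string(reply)
}

func main() {
	fmt.Println(demo())
}
```

Closing the opposite side after each copy finishes is what lets a hang-up on either end propagate through the forwarder.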

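kube-apiserver sends the port forwarding request to kubelet

For kube-apiserver, portforward is also a resource.

  • During API registration, Pod registers the mapping between its resource objects and their RESTStorage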

  • Ref:https://github.com/kubernetes/kubernetes/blob/88e994f6bf8fc88114c5b733e09afea339bea66d/pkg/registry/core/rest/storage_core.go#L274

    storage := map[string]rest.Storage{}
    if resource := "pods"; apiResourceConfigSource.ResourceEnabled(corev1.SchemeGroupVersion.WithResource(resource)) {
        storage[resource] = podStorage.Pod
        storage[resource+"/attach"] = podStorage.Attach
        storage[resource+"/status"] = podStorage.Status
        storage[resource+"/log"] = podStorage.Log
        storage[resource+"/exec"] = podStorage.Exec
        storage[resource+"/portforward"] = podStorage.PortForward
        storage[resource+"/proxy"] = podStorage.Proxy
        storage[resource+"/binding"] = podStorage.Binding
        if podStorage.Eviction != nil {
            storage[resource+"/eviction"] = podStorage.Eviction
        }
        storage[resource+"/ephemeralcontainers"] = podStorage.EphemeralContainers
    }

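kubelet handles and responds to the port forwarding request

kubelet's HTTP server eventually receives the port forwarding request from kube-apiserver; after authentication and authorization, the request is handled by getPortForward.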

  • Ref:https://github.com/kubernetes/kubernetes/blob/88e994f6bf8fc88114c5b733e09afea339bea66d/pkg/kubelet/server/server.go#L488

    s.addMetricsBucketMatcher("portForward")
    ws = new(restful.WebService)
    ws.
        Path("/portForward")
    ws.Route(ws.GET("/{podNamespace}/{podID}").
        To(s.getPortForward).
        Operation("getPortForward"))
    ws.Route(ws.POST("/{podNamespace}/{podID}").
        To(s.getPortForward).
        Operation("getPortForward"))
    ws.Route(ws.GET("/{podNamespace}/{podID}/{uid}").
        To(s.getPortForward).
        Operation("getPortForward"))
    ws.Route(ws.POST("/{podNamespace}/{podID}/{uid}").
        To(s.getPortForward).
        Operation("getPortForward"))
    s.restfulCont.Add(ws)

kubectl attach works in a similar way: it is also implemented with the SPDY protocol and a two-layer proxy.