K8s-kubelet(HTTP服务接口)
K8s-kubelet(HTTP服务接口)
基于1.25(部分代码片段取自更新版本的源码,例如下文GetKubeletContainerLogs中出现了“Since v1.32”的注释)
kubelet通过HTTP Server对外暴露API,为了确保接口安全,kubelet按照安全等级从低到高顺序支持3种HTTP Server,分别是healthz server、readonly server和kubelet core server
一级类目 | 二级类目 | Path路径 | 描述 |
---|---|---|---|
Default Handlers | healthz | /healthz | 检查kubelet是否健康,重点检查syncLoop是否持续在规定时间内完成。检查syncLoop是因为其他组件故障会间接导致syncLoop不能执行成功 |
Default Handlers | pods | /pods | 读取当前节点运行的Pod列表(通过PodManager获取) |
Default Handlers | stats | /stats/summary | 读取资源使用状态 |
Default Handlers | metrics | /metrics | 读取kubelet监控指标数据 |
Default Handlers | metrics | /metrics/cadvisor | 读取cadvisor监控指标数据 |
Default Handlers | metrics | /metrics/probes | 读取probes监控指标数据 |
Default Handlers | metrics | /metrics/resource | 读取resources监控指标数据 |
Default Handlers | checkpoint | /checkpoint/{podNamespace}/{podID}/{containerName} | 为容器构建快照,依赖ContainerCheckpoint featuregate及容器运行时的支持 |
Debugging Handlers | run | /run/{podNamespace}/{podID}/{containerName} 或 /run/{podNamespace}/{podID}/{uid}/{containerName} | 在容器内执行命令。这里的podID=PodName,uid=Pod UID,uid仅在无法通过Namespace/Name确定唯一的Pod时,用于查找目标Pod |
Debugging Handlers | exec | /exec/{podNamespace}/{podID}/{containerName} 或 /exec/{podNamespace}/{podID}/{uid}/{containerName} | 在容器内交互执行命令。这里的podID=PodName,uid=Pod UID,uid仅在无法通过Namespace/Name确定唯一的Pod时,用于查找目标Pod |
Debugging Handlers | attach | /attach/{podNamespace}/{podID}/{containerName} 或 /attach/{podNamespace}/{podID}/{uid}/{containerName} | 连接到容器执行进程,CRI调用会返回一个重定向地址,处理stream流 |
Debugging Handlers | portForward | /portForward/{podNamespace}/{podID} 或 /portForward/{podNamespace}/{podID}/{uid} | 执行端口转发,CRI调用返回一个重定向地址,处理stream流 |
Debugging Handlers | containerLogs | /containerLogs/{podNamespace}/{podID}/{containerName} | 读取容器日志,kubelet首先通过CRI读取容器日志在宿主机上的路径,然后从本地文件系统加载日志 |
Debugging Handlers | configz | /configz | 读取kubelet使用的配置 |
Debugging Handlers | runningpods | /runningpods | 读取当前节点运行的Pod列表(通过RuntimeCache获取),与/pods返回期望的Pod列表不同,/runningpods返回的是容器运行时的实际Pod列表 |
SystemLogHandler | logs | /logs 或 /logs/{logpath.*} | 读取节点/var/log下的日志文件 |
ProfilingHandler | debug | /debug/pprof/{subpath.*} | 读取pprof监控信息 |
DebugFlagsHandler | debug | /debug/flags/v | 动态调整kubelet的日志等级 |
日志查询接口
kubectl logs 常用于读取容器的运行日志。实际流程是:kubectl发送请求给kube-apiserver,通过认证之后,kube-apiserver主动向kubelet的/containerLogs接口发起请求,读取容器日志
kubectl向kube-apiserver发起日志查询请求
kubectl logs命令,底层调用了client-go的GetLogs func获取日志
默认采用按行打印,持续从Stream数据流中读取内容,输出到标准输出,直到遇到错误或者EOF停止
-
ret := make(map[corev1.ObjectReference]rest.ResponseWrapper, 1)
ret[*ref] = clientset.Pods(t.Namespace).GetLogs(t.Name, currOpts)
return ret, nil
}
kube-apiserver向kubelet发起日志查询请求
对于kube-apiserver,Log也是一种资源对象,只不过是由LogREST负责处理
API注册阶段,Pod已经注册了其资源对象和RESTStorage的对应关系
-
storage := map[string]rest.Storage{}
if resource := "pods"; apiResourceConfigSource.ResourceEnabled(corev1.SchemeGroupVersion.WithResource(resource)) {
storage[resource] = podStorage.Pod
storage[resource+"/attach"] = podStorage.Attach
storage[resource+"/status"] = podStorage.Status
storage[resource+"/log"] = podStorage.Log
storage[resource+"/exec"] = podStorage.Exec
storage[resource+"/portforward"] = podStorage.PortForward
storage[resource+"/proxy"] = podStorage.Proxy
storage[resource+"/binding"] = podStorage.Binding
if podStorage.Eviction != nil {
storage[resource+"/eviction"] = podStorage.Eviction
}
storage[resource+"/ephemeralcontainers"] = podStorage.EphemeralContainers
当对Log子资源发起GET请求时,由存储对象podStorage.Log的Get func处理
-
// Get builds the runtime.Object used to stream the log of the pod called name.
// opts must be an *api.PodLogOptions; any other type, or options that fail
// validation, produce an error and no streamer.
func (r *LogREST) Get(ctx context.Context, name string, opts runtime.Object) (runtime.Object, error) {
	// Called on every request; this relies on registerMetrics being cheap
	// (presumably guarded by sync.Once). It could be moved to an init block otherwise.
	registerMetrics()

	podLogOpts, isLogOpts := opts.(*api.PodLogOptions)
	if !isLogOpts {
		return nil, fmt.Errorf("invalid options object: %#v", opts)
	}
	countSkipTLSMetric(podLogOpts.InsecureSkipTLSVerifyBackend)

	validationErrs := validation.ValidatePodLogOptions(podLogOpts)
	if len(validationErrs) != 0 {
		return nil, errors.NewInvalid(api.Kind("PodLogOptions"), name, validationErrs)
	}

	// Resolve where the log lives and which transport reaches it.
	logURL, roundTripper, err := pod.LogLocation(ctx, r.Store, r.KubeletConn, name, podLogOpts)
	if err != nil {
		return nil, err
	}

	streamer := &genericrest.LocationStreamer{
		Location:                    logURL,
		Transport:                   roundTripper,
		ContentType:                 "text/plain",
		Flush:                       podLogOpts.Follow,
		ResponseChecker:             genericrest.NewGenericHttpResponseChecker(api.Resource("pods/log"), name),
		RedirectChecker:             genericrest.PreventRedirects,
		TLSVerificationErrorCounter: podLogsTLSFailure,
	}
	return streamer, nil
}
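LogREST通过ValidatePodLogOptions校验之后,调用pod.LogLocation构造出指向kubelet的日志地址(形如 https://<节点地址>:<kubelet端口>/containerLogs/<podNamespace>/<podName>/<containerName>),并返回LocationStreamer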
kubelet处理并响应日志查询请求
kubelet的HTTP Server最终收到来自kube-apiserver的日志请求,经过认证、鉴权之后,最终被getContainerLogs处理
-
ws.Route(ws.GET("/{podNamespace}/{podID}/{containerName}").
To(s.getContainerLogs).
Operation("getContainerLogs"))
s.restfulCont.Add(ws)
getContainerLogs 解析日志查询请求,构造PodLogOptions参数对象,通过调用GetKubeletContainerLogs读取容器日志
-
// GetKubeletContainerLogs returns logs from the container
// TODO: this method is returning logs of random container attempts, when it should be returning the most recent attempt
// or all of them.
func (kl *Kubelet) GetKubeletContainerLogs(ctx context.Context, podFullName, containerName string, logOptions *v1.PodLogOptions, stdout, stderr io.Writer) error {
// Pod workers periodically write status to statusManager. If status is not
// cached there, something is wrong (or kubelet just restarted and hasn't
// caught up yet). Just assume the pod is not ready yet.
name, namespace, err := kubecontainer.ParsePodFullName(podFullName)
if err != nil {
return fmt.Errorf("unable to parse pod full name %q: %v", podFullName, err)
}
pod, ok := kl.GetPodByName(namespace, name)
if !ok {
return fmt.Errorf("pod %q cannot be found - no logs available", name)
}
// TODO: this should be using the podWorker's pod store as authoritative, since
// the mirrorPod might still exist, the pod may have been force deleted but
// is still terminating (users should be able to view logs of force deleted static pods
// based on full name).
var podUID types.UID
pod, mirrorPod, wasMirror := kl.podManager.GetPodAndMirrorPod(pod)
if wasMirror {
if pod == nil {
return fmt.Errorf("mirror pod %q does not have a corresponding pod", name)
}
podUID = mirrorPod.UID
} else {
podUID = pod.UID
}
podStatus, found := kl.statusManager.GetPodStatus(podUID)
if !found {
// If there is no cached status, use the status from the
// config source (apiserver). This is useful if kubelet
// has recently been restarted.
podStatus = pod.Status
}
// TODO: Consolidate the logic here with kuberuntime.GetContainerLogs, here we convert container name to containerID,
// but inside kuberuntime we convert container id back to container name and restart count.
// TODO: After separate container log lifecycle management, we should get log based on the existing log files
// instead of container status.
containerID, err := kl.validateContainerLogStatus(pod.Name, &podStatus, containerName, logOptions.Previous)
if err != nil {
return err
}
// Since v1.32, stdout may be nil if the stream is not requested.
if stdout != nil {
// Do a zero-byte write to stdout before handing off to the container runtime.
// This ensures at least one Write call is made to the writer when copying starts,
// even if we then block waiting for log output from the container.
if _, err := stdout.Write([]byte{}); err != nil {
return err
}
}
return kl.containerRuntime.GetContainerLogs(ctx, pod, containerID, logOptions, stdout, stderr)
}
kubelet首先通过runtimeService.ContainerStatus调用CRI,从容器运行时获取容器状态,然后通过status.GetLogPath获取容器的日志路径,最后使用ReadLogs读取日志文件的内容
kubelet实际上是直接从宿主机读取日志内容,CRI调用只是用来获取容器日志在宿主机上的路径
通过以下命令可以获取容器的日志路径,在宿主机上访问该路径即可查看对应的日志文件
crictl inspect -o go-template --template '{{.status.logPath}}' <containerid>
ReadLogs从宿主机不断读取日志文件,通过Response返回给客户端
ReadLogs使用os.Open打开日志文件,通过Reader不断读取日志文件,并且将其解析为MSG格式,写入Response Stream。
- 特别的,当开启follow模式,ReadLogs会使用fsnotify监听日志文件变化,当日志文件发生rotate,自动等待和发现新的日志文件
-
// ReadLogs reads the container log at path and redirects it into stdout and stderr.
// Note that containerID is only needed when following the log, or else
// just pass in empty string "".
//
// The file is opened after resolving symlinks and positioned at the start
// index computed from opts.tail. It is then read one line at a time; each
// line is parsed into a logMessage and written through a log writer built
// over stdout and stderr.
//
// When opts.follow is false, reading stops at EOF (after writing any trailing
// partial line) or, if opts.tail >= 0, once that many messages were written.
// When opts.follow is true, EOF does not end the read: an fsnotify watcher is
// created lazily on the file and waitLogs is used to wait for further
// changes; the file at path is reopened when waitLogs reports it was recreated.
//
// It returns nil on normal completion (including when the writer reports
// errMaximumWrite), and an error if the file cannot be resolved, opened,
// sought, read, or watched, or if waitLogs or the writer fails.
func ReadLogs(ctx context.Context, path, containerID string, opts *LogOptions, runtimeService internalapi.RuntimeService, stdout, stderr io.Writer) error {
// fsnotify has different behavior for symlinks in different platform,
// for example it follows symlink on Linux, but not on Windows,
// so we explicitly resolve symlinks before reading the logs.
// There shouldn't be security issue because the container log
// path is owned by kubelet and the container runtime.
evaluated, err := filepath.EvalSymlinks(path)
if err != nil {
return fmt.Errorf("failed to try resolving symlinks in path %q: %v", path, err)
}
path = evaluated
f, err := os.Open(path)
if err != nil {
return fmt.Errorf("failed to open log file %q: %v", path, err)
}
defer f.Close()
// Search start point based on tail line.
start, err := tail.FindTailLineStartIndex(f, opts.tail)
if err != nil {
return fmt.Errorf("failed to tail %d lines of log file %q: %v", opts.tail, path, err)
}
if _, err := f.Seek(start, io.SeekStart); err != nil {
return fmt.Errorf("failed to seek %d in log file %q: %v", start, path, err)
}
// Line-limited mode applies only when a non-negative tail was requested and
// the log is not being followed; limitedNum counts the messages still to emit.
limitedMode := (opts.tail >= 0) && (!opts.follow)
limitedNum := opts.tail
// Start parsing the logs.
r := bufio.NewReader(f)
// Do not create watcher here because it is not needed if `Follow` is false.
var watcher *fsnotify.Watcher
// parse is chosen lazily from the first line read (see getParseFunc below).
var parse parseFunc
// stop is set at EOF in non-follow mode so the loop exits on its next pass.
var stop bool
// isNewLine records whether the previously written message ended with eol[0].
isNewLine := true
// found holds the first result of the latest waitLogs call; it starts true
// so the first EOF in follow mode proceeds to waiting.
found := true
writer := newLogWriter(stdout, stderr, opts)
msg := &logMessage{}
for {
if stop || (limitedMode && limitedNum == 0) {
klog.V(2).InfoS("Finished parsing log file", "path", path)
return nil
}
l, err := r.ReadBytes(eol[0])
if err != nil {
if err != io.EOF { // This is an real error
return fmt.Errorf("failed to read log file %q: %v", path, err)
}
if opts.follow {
// The container is not running, we got to the end of the log.
if !found {
return nil
}
// Reset seek so that if this is an incomplete line,
// it will be read again.
if _, err := f.Seek(-int64(len(l)), io.SeekCurrent); err != nil {
return fmt.Errorf("failed to reset seek in log file %q: %v", path, err)
}
if watcher == nil {
// Initialize the watcher if it has not been initialized yet.
if watcher, err = fsnotify.NewWatcher(); err != nil {
return fmt.Errorf("failed to create fsnotify watcher: %v", err)
}
defer watcher.Close()
if err := watcher.Add(f.Name()); err != nil {
return fmt.Errorf("failed to watch file %q: %v", f.Name(), err)
}
// If we just created the watcher, try again to read as we might have missed
// the event.
continue
}
var recreated bool
// Wait until the next log change.
found, recreated, err = waitLogs(ctx, containerID, watcher, runtimeService)
if err != nil {
return err
}
if recreated {
// Reopen by path to pick up the new file; if it does not exist yet,
// go around the loop again with the old handle.
newF, err := os.Open(path)
if err != nil {
if os.IsNotExist(err) {
continue
}
return fmt.Errorf("failed to open log file %q: %v", path, err)
}
// NOTE(review): this defer sits inside the loop, so every reopen adds
// another deferred Close that only runs when ReadLogs returns.
defer newF.Close()
f.Close()
// Move the watch from the old handle's name to the new one; a failure
// to remove the old watch is only logged.
if err := watcher.Remove(f.Name()); err != nil && !os.IsNotExist(err) {
klog.ErrorS(err, "Failed to remove file watch", "path", f.Name())
}
f = newF
if err := watcher.Add(f.Name()); err != nil {
return fmt.Errorf("failed to watch file %q: %v", f.Name(), err)
}
r = bufio.NewReader(f)
}
// If the container exited consume data until the next EOF
continue
}
// Should stop after writing the remaining content.
stop = true
if len(l) == 0 {
continue
}
klog.InfoS("Incomplete line in log file", "path", path, "line", l)
}
if parse == nil {
// Initialize the log parsing function.
parse, err = getParseFunc(l)
if err != nil {
return fmt.Errorf("failed to get parse function: %v", err)
}
}
// Parse the log line.
msg.reset()
if err := parse(l, msg); err != nil {
// A line that fails to parse is logged and skipped, not treated as fatal.
klog.ErrorS(err, "Failed when parsing line in log file", "path", path, "line", l)
continue
}
// Write the log line into the stream.
if err := writer.write(msg, isNewLine); err != nil {
// errMaximumWrite is treated as a normal end of output, not a failure.
if err == errMaximumWrite {
klog.V(2).InfoS("Finished parsing log file, hit bytes limit", "path", path, "limit", opts.bytes)
return nil
}
klog.ErrorS(err, "Failed when writing line to log file", "path", path, "line", msg)
return err
}
if limitedMode {
limitedNum--
}
// Track whether this message ended the line, for the next write call.
if len(msg.log) > 0 {
isNewLine = msg.log[len(msg.log)-1] == eol[0]
} else {
isNewLine = true
}
}
}
命令执行接口
kubectl exec 是常见的调试工具,能创建一个交互式的终端环境
kubectl向kube-apiserver发起命令交互请求
在终端执行kubectl exec ,在底层链路上,实际上是通过client-go构造对Pod子资源exec的POST请求完成
-
// TODO: consider abstracting into a client invocation or client helper
req := restClient.Post().
Resource("pods").
Name(pod.Name).
Namespace(pod.Namespace).
SubResource("exec")
req.VersionedParams(&corev1.PodExecOptions{
Container: containerName,
Command: p.Command,
Stdin: p.Stdin,
Stdout: p.Out != nil,
Stderr: p.ErrOut != nil,
TTY: t.Raw,
}, scheme.ParameterCodec)
除了支持POST方法,还支持GET方法
- POST适用于kubectl等客户端调用的场景,基于SPDY协议完成,支持双向通信
- GET适用于WEB调用,可以基于WebSocket完成协议升级,以支持双向通信
SPDY协议是谷歌研发的开放网络传输协议,目的是缩小网页加载时间。通过优先级和多路复用,SPDY只需要创建一个TCP连接就可以传输网页内容和图片资源
-
// Execute creates a SPDY executor for the given method and url using config,
// then streams stdin, stdout and stderr over it. tty and terminalSizeQueue are
// passed through unchanged in the stream options. It returns the error from
// creating the executor or from the stream itself.
func (*DefaultRemoteExecutor) Execute(method string, url *url.URL, config *restclient.Config, stdin io.Reader, stdout, stderr io.Writer, tty bool, terminalSizeQueue remotecommand.TerminalSizeQueue) error {
	executor, err := remotecommand.NewSPDYExecutor(config, method, url)
	if err != nil {
		return err
	}

	streamOpts := remotecommand.StreamOptions{
		Stdin:             stdin,
		Stdout:            stdout,
		Stderr:            stderr,
		Tty:               tty,
		TerminalSizeQueue: terminalSizeQueue,
	}
	return executor.Stream(streamOpts)
}
kube-apiserver向kubelet发起命令执行请求
对于kube-apiserver来说,exec也是一种资源
API注册阶段,Pod已经注册了其资源对象和RESTStorage的对应关系
-
storage := map[string]rest.Storage{}
if resource := "pods"; apiResourceConfigSource.ResourceEnabled(corev1.SchemeGroupVersion.WithResource(resource)) {
storage[resource] = podStorage.Pod
storage[resource+"/attach"] = podStorage.Attach
storage[resource+"/status"] = podStorage.Status
storage[resource+"/log"] = podStorage.Log
storage[resource+"/exec"] = podStorage.Exec
storage[resource+"/portforward"] = podStorage.PortForward
storage[resource+"/proxy"] = podStorage.Proxy
storage[resource+"/binding"] = podStorage.Binding
if podStorage.Eviction != nil {
storage[resource+"/eviction"] = podStorage.Eviction
}
storage[resource+"/ephemeralcontainers"] = podStorage.EphemeralContainers
kubelet处理并响应命令执行请求
kubelet的HTTP Server最终收到来自kube-apiserver的命令执行请求,经过认证、鉴权之后,最终被getExec处理
-
s.addMetricsBucketMatcher("exec")
ws = new(restful.WebService)
ws.
Path("/exec")
ws.Route(ws.GET("/{podNamespace}/{podID}/{containerName}").
To(s.getExec).
Operation("getExec"))
ws.Route(ws.POST("/{podNamespace}/{podID}/{containerName}").
To(s.getExec).
Operation("getExec"))
ws.Route(ws.GET("/{podNamespace}/{podID}/{uid}/{containerName}").
To(s.getExec).
Operation("getExec"))
ws.Route(ws.POST("/{podNamespace}/{podID}/{uid}/{containerName}").
To(s.getExec).
Operation("getExec"))
s.restfulCont.Add(ws)
getExec func解析命令执行请求,通过CRI调用runtimeService.Exec从容器运行时获取一个执行Exec的连接地址,然后通过调用proxyStream代理kube-apiserver和容器运行时之间的流式处理
-
// getExec serves requests to run a command inside a container.
//
// It parses the exec parameters and stream options from the request, looks up
// the pod by namespace and name, asks the host for an exec URL, and proxies
// the request to that URL. Bad stream options answer 400, an unknown pod
// answers 404, and a GetExec failure is reported via streaming.WriteError.
func (s *Server) getExec(request *restful.Request, response *restful.Response) {
	execParams := getExecRequestParams(request)

	opts, err := remotecommandserver.NewOptions(request.Request)
	if err != nil {
		utilruntime.HandleError(err)
		response.WriteError(http.StatusBadRequest, err)
		return
	}

	targetPod, exists := s.host.GetPodByName(execParams.podNamespace, execParams.podName)
	if !exists {
		response.WriteError(http.StatusNotFound, fmt.Errorf("pod does not exist"))
		return
	}

	location, err := s.host.GetExec(kubecontainer.GetPodFullName(targetPod), execParams.podUID, execParams.containerName, execParams.cmd, *opts)
	if err != nil {
		streaming.WriteError(err, response.ResponseWriter)
		return
	}

	proxyStream(response.ResponseWriter, request.Request, location)
}
端口转发接口
kubectl port-forward能够在本地监听服务端口,使其与远端Pod端口相映射,实现端口转发。
kubectl向kube-apiserver发起端口转发请求
终端执行kubectl port-forward,实际上是调用Pod的子资源portforward的POST完成
-
// RunPortForward carries out the port-forward command.
//
// It fetches the target pod and refuses to continue unless the pod is in the
// Running phase. It then closes o.StopChannel (when set) on the first
// os.Interrupt, and hands a POST request for the pod's "portforward"
// subresource to o.PortForwarder.
func (o PortForwardOptions) RunPortForward() error {
	target, err := o.PodClient.Pods(o.Namespace).Get(context.TODO(), o.PodName, metav1.GetOptions{})
	if err != nil {
		return err
	}
	if phase := target.Status.Phase; phase != corev1.PodRunning {
		return fmt.Errorf("unable to forward port because pod is not running. Current status=%v", phase)
	}

	interrupts := make(chan os.Signal, 1)
	signal.Notify(interrupts, os.Interrupt)
	defer signal.Stop(interrupts)

	// Translate the first interrupt into a close of the stop channel.
	go func() {
		<-interrupts
		if o.StopChannel == nil {
			return
		}
		close(o.StopChannel)
	}()

	forwardReq := o.RESTClient.Post().
		Resource("pods").
		Namespace(o.Namespace).
		Name(target.Name).
		SubResource("portforward")

	return o.PortForwarder.ForwardPorts("POST", forwardReq.URL(), o)
}
kubectl 通过PortForward封装对kube-apiserver的POST请求,实现与目标服务器的连接,并且将该连接升级为基于SPDY的多路双向流
-
// ForwardPorts builds a SPDY dialer for method and url from opts.Config,
// creates a port forwarder over it for opts.Address and opts.Ports, and runs
// that forwarder. The forwarder is given opts.StopChannel and
// opts.ReadyChannel and writes to f.Out and f.ErrOut. Any error from setting
// up the round tripper or the forwarder is returned as is.
func (f *defaultPortForwarder) ForwardPorts(method string, url *url.URL, opts PortForwardOptions) error {
	roundTripper, upgrader, err := spdy.RoundTripperFor(opts.Config)
	if err != nil {
		return err
	}

	httpClient := &http.Client{Transport: roundTripper}
	dialer := spdy.NewDialer(upgrader, httpClient, method, url)

	forwarder, err := portforward.NewOnAddresses(dialer, opts.Address, opts.Ports, opts.StopChannel, opts.ReadyChannel, f.Out, f.ErrOut)
	if err != nil {
		return err
	}
	return forwarder.ForwardPorts()
}
kube-apiserver向kubelet发起端口转发请求
对于kube-apiserver来说,portForward也是一种资源
API注册阶段,Pod已经注册了其资源对象和RESTStorage的对应关系
-
storage := map[string]rest.Storage{}
if resource := "pods"; apiResourceConfigSource.ResourceEnabled(corev1.SchemeGroupVersion.WithResource(resource)) {
storage[resource] = podStorage.Pod
storage[resource+"/attach"] = podStorage.Attach
storage[resource+"/status"] = podStorage.Status
storage[resource+"/log"] = podStorage.Log
storage[resource+"/exec"] = podStorage.Exec
storage[resource+"/portforward"] = podStorage.PortForward
storage[resource+"/proxy"] = podStorage.Proxy
storage[resource+"/binding"] = podStorage.Binding
if podStorage.Eviction != nil {
storage[resource+"/eviction"] = podStorage.Eviction
}
storage[resource+"/ephemeralcontainers"] = podStorage.EphemeralContainers
kubelet处理并响应端口转发请求
kubelet的HTTP Server最终收到来自kube-apiserver的端口转发请求,经过认证、鉴权之后,最终被getPortForward处理
-
s.addMetricsBucketMatcher("portForward")
ws = new(restful.WebService)
ws.
Path("/portForward")
ws.Route(ws.GET("/{podNamespace}/{podID}").
To(s.getPortForward).
Operation("getPortForward"))
ws.Route(ws.POST("/{podNamespace}/{podID}").
To(s.getPortForward).
Operation("getPortForward"))
ws.Route(ws.GET("/{podNamespace}/{podID}/{uid}").
To(s.getPortForward).
Operation("getPortForward"))
ws.Route(ws.POST("/{podNamespace}/{podID}/{uid}").
To(s.getPortForward).
Operation("getPortForward"))
s.restfulCont.Add(ws)
kubectl attach与之类似,同样是基于SPDY协议和双层代理实现的