K8s kubelet (HTTP server endpoints)
Based on Kubernetes 1.25.
kubelet exposes its API through HTTP servers. To keep these endpoints secure, kubelet runs three HTTP servers, ordered from lowest to highest security level: the healthz server, the read-only server, and the kubelet core server.
Category | Sub-category | Path | Description |
---|---|---|---|
Default Handlers | healthz | /healthz | Checks whether kubelet is healthy, focusing on whether syncLoop keeps completing within the expected interval. syncLoop is checked because failures in other components indirectly prevent syncLoop from completing successfully |
Default Handlers | pods | /pods | Lists the Pods running on the current node (obtained from the PodManager) |
Default Handlers | stats | /stats/summary | Reads resource usage statistics |
Default Handlers | metrics | /metrics | Reads kubelet metrics |
Default Handlers | metrics | /metrics/cadvisor | Reads cAdvisor metrics |
Default Handlers | metrics | /metrics/probes | Reads probe metrics |
Default Handlers | metrics | /metrics/resource | Reads resource metrics |
Default Handlers | checkpoint | /checkpoint/{podNamespace}/{podID}/{containerName} | Creates a checkpoint (snapshot) of a container; requires the ContainerCheckpoint feature gate and container runtime support |
Debugging Handlers | run | /run/{podNamespace}/{podID}/{containerName}, /run/{podNamespace}/{podID}/{uid}/{containerName} | Runs a command inside a container. Here podID is the Pod name and uid is the Pod UID; uid is only used to locate the target Pod when Namespace/Name cannot identify it uniquely |
Debugging Handlers | exec | /exec/{podNamespace}/{podID}/{containerName}, /exec/{podNamespace}/{podID}/{uid}/{containerName} | Runs a command interactively inside a container. Here podID is the Pod name and uid is the Pod UID; uid is only used to locate the target Pod when Namespace/Name cannot identify it uniquely |
Debugging Handlers | attach | /attach/{podNamespace}/{podID}/{containerName}, /attach/{podNamespace}/{podID}/{uid}/{containerName} | Attaches to a container's running process; the CRI call returns a redirect URL that is used to handle the stream |
Debugging Handlers | portForward | /portForward/{podNamespace}/{podID}, /portForward/{podNamespace}/{podID}/{uid} | Performs port forwarding; the CRI call returns a redirect URL that is used to handle the stream |
Debugging Handlers | containerLogs | /containerLogs/{podNamespace}/{podID}/{containerName} | Reads container logs; kubelet first resolves the log file's host path via CRI, then loads the log from the local filesystem |
Debugging Handlers | configz | /configz | Reads the configuration kubelet is running with |
Debugging Handlers | runningpods | /runningpods | Lists the Pods running on the current node (obtained from the RuntimeCache); unlike /pods, which returns the desired Pod list, /runningpods returns the Pods actually reported by the container runtime |
SystemLogHandler | logs | /logs, /logs/{logpath.*} | Reads logs under /var/log on the node |
ProfilingHandler | debug | /debug/pprof/{subpath.*} | Reads pprof profiling data |
DebugFlagsHandler | debug | /debug/flags/v | Dynamically adjusts kubelet's log verbosity |
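The healthz server is the least privileged of the three. As a minimal sketch, assuming kubelet's defaults (--healthz-port 10248 bound to localhost, so it must run on the node itself), the endpoint can be probed like this:

```go
package main

import (
	"fmt"
	"io"
	"net/http"
)

func main() {
	// Assumption: kubelet runs with its defaults, so the healthz server
	// listens on 127.0.0.1:10248 and requires no authentication.
	resp, err := http.Get("http://127.0.0.1:10248/healthz")
	if err != nil {
		panic(err)
	}
	defer resp.Body.Close()

	body, _ := io.ReadAll(resp.Body)
	fmt.Printf("%s: %s\n", resp.Status, body) // a healthy kubelet answers "200 OK: ok"
}
```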
Log query endpoint
kubectl logs is commonly used to read a container's runtime logs. The actual flow is: kubectl sends a request to kube-apiserver; after authentication, kube-apiserver in turn sends a request to kubelet's /containerLogs endpoint to read the container logs.
kubectl sends the log query request to kube-apiserver
The kubectl logs command ultimately calls client-go's GetLogs func to fetch the logs.
By default the output is printed line by line: content is read continuously from the stream and written to standard output until an error or EOF is encountered.
```go
ret := make(map[corev1.ObjectReference]rest.ResponseWrapper, 1)
ret[*ref] = clientset.Pods(t.Namespace).GetLogs(t.Name, currOpts)
return ret, nil
}
```
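To make the consumption side concrete, here is a minimal sketch (not the kubectl source) of streaming logs with client-go; the kubeconfig location and the namespace/pod/container names are assumptions for illustration:

```go
package main

import (
	"context"
	"io"
	"os"

	corev1 "k8s.io/api/core/v1"
	"k8s.io/client-go/kubernetes"
	"k8s.io/client-go/tools/clientcmd"
)

func main() {
	// Assumption: a kubeconfig in the default location grants access to the Pod.
	config, err := clientcmd.BuildConfigFromFlags("", clientcmd.RecommendedHomeFile)
	if err != nil {
		panic(err)
	}
	clientset := kubernetes.NewForConfigOrDie(config)

	// Hypothetical namespace/pod/container names.
	req := clientset.CoreV1().Pods("default").GetLogs("my-pod", &corev1.PodLogOptions{
		Container: "my-container",
		Follow:    true, // keep the stream open, like `kubectl logs -f`
	})
	stream, err := req.Stream(context.TODO())
	if err != nil {
		panic(err)
	}
	defer stream.Close()

	// Copy the log stream to stdout until EOF or an error, mirroring kubectl's behavior.
	if _, err := io.Copy(os.Stdout, stream); err != nil {
		panic(err)
	}
}
```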
kube-apiserver sends the log query request to kubelet
To kube-apiserver, log is just another resource object, one that happens to be handled by LogREST.
During API registration, Pod has already registered the mapping between its (sub)resources and their RESTStorage implementations.
```go
storage := map[string]rest.Storage{}
if resource := "pods"; apiResourceConfigSource.ResourceEnabled(corev1.SchemeGroupVersion.WithResource(resource)) {
storage[resource] = podStorage.Pod
storage[resource+"/attach"] = podStorage.Attach
storage[resource+"/status"] = podStorage.Status
storage[resource+"/log"] = podStorage.Log
storage[resource+"/exec"] = podStorage.Exec
storage[resource+"/portforward"] = podStorage.PortForward
storage[resource+"/proxy"] = podStorage.Proxy
storage[resource+"/binding"] = podStorage.Binding
if podStorage.Eviction != nil {
storage[resource+"/eviction"] = podStorage.Eviction
}
storage[resource+"/ephemeralcontainers"] = podStorage.EphemeralContainers
When a GET request arrives for the log subresource, it is handled by the Get func of the storage object podStorage.Log.
```go
// Get retrieves a runtime.Object that will stream the contents of the pod log
func (r *LogREST) Get(ctx context.Context, name string, opts runtime.Object) (runtime.Object, error) {
// register the metrics if the context is used. This assumes sync.Once is fast. If it's not, it could be an init block.
registerMetrics()
logOpts, ok := opts.(*api.PodLogOptions)
if !ok {
return nil, fmt.Errorf("invalid options object: %#v", opts)
}
countSkipTLSMetric(logOpts.InsecureSkipTLSVerifyBackend)
if errs := validation.ValidatePodLogOptions(logOpts); len(errs) > 0 {
return nil, errors.NewInvalid(api.Kind("PodLogOptions"), name, errs)
}
location, transport, err := pod.LogLocation(ctx, r.Store, r.KubeletConn, name, logOpts)
if err != nil {
return nil, err
}
return &genericrest.LocationStreamer{
Location: location,
Transport: transport,
ContentType: "text/plain",
Flush: logOpts.Follow,
ResponseChecker: genericrest.NewGenericHttpResponseChecker(api.Resource("pods/log"), name),
RedirectChecker: genericrest.PreventRedirects,
TLSVerificationErrorCounter: podLogsTLSFailure,
}, nil
}
```
After the options pass ValidatePodLogOptions, LogREST returns a LocationStreamer whose Location is the https:// URL built by pod.LogLocation, pointing at the kubelet log endpoint on the Pod's node (typically of the form https://<node-address>:10250/containerLogs/<namespace>/<pod>/<container>).
kubelet handles and responds to the log query request
kubelet's HTTP server eventually receives the log request from kube-apiserver; after authentication and authorization it is handled by getContainerLogs.
```go
ws.Route(ws.GET("/{podNamespace}/{podID}/{containerName}").
To(s.getContainerLogs).
Operation("getContainerLogs"))
s.restfulCont.Add(ws)
```
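For illustration, the /containerLogs route registered above can also be called directly, bypassing kube-apiserver. The following is a hedged sketch, not an officially supported client: the node address, token, namespace, pod and container names are assumptions, the token must be authorized for nodes/proxy, and TLS verification is skipped only to keep the sketch short.

```go
package main

import (
	"crypto/tls"
	"fmt"
	"io"
	"net/http"
	"os"
)

func main() {
	// Assumptions: NODE_ADDRESS points at a node, kubelet's main server listens on the
	// default port 10250, and TOKEN holds a ServiceAccount token authorized for nodes/proxy.
	// Namespace, pod and container names ("default", "my-pod", "my-container") are placeholders.
	url := fmt.Sprintf("https://%s:10250/containerLogs/default/my-pod/my-container?tailLines=10",
		os.Getenv("NODE_ADDRESS"))

	req, err := http.NewRequest(http.MethodGet, url, nil)
	if err != nil {
		panic(err)
	}
	req.Header.Set("Authorization", "Bearer "+os.Getenv("TOKEN"))

	client := &http.Client{Transport: &http.Transport{
		// TLS verification is skipped only to keep the sketch short; do not do this in production.
		TLSClientConfig: &tls.Config{InsecureSkipVerify: true},
	}}
	resp, err := client.Do(req)
	if err != nil {
		panic(err)
	}
	defer resp.Body.Close()

	body, _ := io.ReadAll(resp.Body)
	fmt.Println(resp.Status)
	fmt.Print(string(body))
}
```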
getContainerLogs parses the log query request, builds a PodLogOptions parameter object, and then reads the container logs by calling GetKubeletContainerLogs.
```go
// GetKubeletContainerLogs returns logs from the container
// TODO: this method is returning logs of random container attempts, when it should be returning the most recent attempt
// or all of them.
func (kl *Kubelet) GetKubeletContainerLogs(ctx context.Context, podFullName, containerName string, logOptions *v1.PodLogOptions, stdout, stderr io.Writer) error {
// Pod workers periodically write status to statusManager. If status is not
// cached there, something is wrong (or kubelet just restarted and hasn't
// caught up yet). Just assume the pod is not ready yet.
name, namespace, err := kubecontainer.ParsePodFullName(podFullName)
if err != nil {
return fmt.Errorf("unable to parse pod full name %q: %v", podFullName, err)
}
pod, ok := kl.GetPodByName(namespace, name)
if !ok {
return fmt.Errorf("pod %q cannot be found - no logs available", name)
}
// TODO: this should be using the podWorker's pod store as authoritative, since
// the mirrorPod might still exist, the pod may have been force deleted but
// is still terminating (users should be able to view logs of force deleted static pods
// based on full name).
var podUID types.UID
pod, mirrorPod, wasMirror := kl.podManager.GetPodAndMirrorPod(pod)
if wasMirror {
if pod == nil {
return fmt.Errorf("mirror pod %q does not have a corresponding pod", name)
}
podUID = mirrorPod.UID
} else {
podUID = pod.UID
}
podStatus, found := kl.statusManager.GetPodStatus(podUID)
if !found {
// If there is no cached status, use the status from the
// config source (apiserver). This is useful if kubelet
// has recently been restarted.
podStatus = pod.Status
}
// TODO: Consolidate the logic here with kuberuntime.GetContainerLogs, here we convert container name to containerID,
// but inside kuberuntime we convert container id back to container name and restart count.
// TODO: After separate container log lifecycle management, we should get log based on the existing log files
// instead of container status.
containerID, err := kl.validateContainerLogStatus(pod.Name, &podStatus, containerName, logOptions.Previous)
if err != nil {
return err
}
// Since v1.32, stdout may be nil if the stream is not requested.
if stdout != nil {
// Do a zero-byte write to stdout before handing off to the container runtime.
// This ensures at least one Write call is made to the writer when copying starts,
// even if we then block waiting for log output from the container.
if _, err := stdout.Write([]byte{}); err != nil {
return err
}
}
return kl.containerRuntime.GetContainerLogs(ctx, pod, containerID, logOptions, stdout, stderr)
}
```
kubelet first calls the CRI via runtimeService.ContainerStatus to fetch the container status from the container runtime, then obtains the container's log path via status.GetLogPath, and finally reads the log file contents with ReadLogs.
In other words, kubelet reads the log content directly from the host; the CRI call is only used to resolve the host path of the container's log file.
The container's log path can be obtained with the following command, and the log file can then be inspected at that path on the host (on most runtimes it looks like /var/log/pods/<namespace>_<pod-name>_<pod-uid>/<container-name>/<restart-count>.log):
`crictl inspect -o go-template --template '{{.status.logPath}}' <containerid>`
ReadLogs keeps reading the log file from the host and returns the content to the client through the response.
ReadLogs opens the log file with os.Open, reads it continuously through a buffered Reader, parses each line into a log message, and writes it into the response stream (a sketch of the CRI line format follows the listing below).
- In particular, when follow mode is enabled, ReadLogs uses fsnotify to watch the log file for changes; when the file is rotated, it automatically waits for and picks up the new log file.
```go
// ReadLogs read the container log and redirect into stdout and stderr.
// Note that containerID is only needed when following the log, or else
// just pass in empty string "".
func ReadLogs(ctx context.Context, path, containerID string, opts *LogOptions, runtimeService internalapi.RuntimeService, stdout, stderr io.Writer) error {
// fsnotify has different behavior for symlinks in different platform,
// for example it follows symlink on Linux, but not on Windows,
// so we explicitly resolve symlinks before reading the logs.
// There shouldn't be security issue because the container log
// path is owned by kubelet and the container runtime.
evaluated, err := filepath.EvalSymlinks(path)
if err != nil {
return fmt.Errorf("failed to try resolving symlinks in path %q: %v", path, err)
}
path = evaluated
f, err := os.Open(path)
if err != nil {
return fmt.Errorf("failed to open log file %q: %v", path, err)
}
defer f.Close()
// Search start point based on tail line.
start, err := tail.FindTailLineStartIndex(f, opts.tail)
if err != nil {
return fmt.Errorf("failed to tail %d lines of log file %q: %v", opts.tail, path, err)
}
if _, err := f.Seek(start, io.SeekStart); err != nil {
return fmt.Errorf("failed to seek %d in log file %q: %v", start, path, err)
}
limitedMode := (opts.tail >= 0) && (!opts.follow)
limitedNum := opts.tail
// Start parsing the logs.
r := bufio.NewReader(f)
// Do not create watcher here because it is not needed if `Follow` is false.
var watcher *fsnotify.Watcher
var parse parseFunc
var stop bool
isNewLine := true
found := true
writer := newLogWriter(stdout, stderr, opts)
msg := &logMessage{}
for {
if stop || (limitedMode && limitedNum == 0) {
klog.V(2).InfoS("Finished parsing log file", "path", path)
return nil
}
l, err := r.ReadBytes(eol[0])
if err != nil {
if err != io.EOF { // This is an real error
return fmt.Errorf("failed to read log file %q: %v", path, err)
}
if opts.follow {
// The container is not running, we got to the end of the log.
if !found {
return nil
}
// Reset seek so that if this is an incomplete line,
// it will be read again.
if _, err := f.Seek(-int64(len(l)), io.SeekCurrent); err != nil {
return fmt.Errorf("failed to reset seek in log file %q: %v", path, err)
}
if watcher == nil {
// Initialize the watcher if it has not been initialized yet.
if watcher, err = fsnotify.NewWatcher(); err != nil {
return fmt.Errorf("failed to create fsnotify watcher: %v", err)
}
defer watcher.Close()
if err := watcher.Add(f.Name()); err != nil {
return fmt.Errorf("failed to watch file %q: %v", f.Name(), err)
}
// If we just created the watcher, try again to read as we might have missed
// the event.
continue
}
var recreated bool
// Wait until the next log change.
found, recreated, err = waitLogs(ctx, containerID, watcher, runtimeService)
if err != nil {
return err
}
if recreated {
newF, err := os.Open(path)
if err != nil {
if os.IsNotExist(err) {
continue
}
return fmt.Errorf("failed to open log file %q: %v", path, err)
}
defer newF.Close()
f.Close()
if err := watcher.Remove(f.Name()); err != nil && !os.IsNotExist(err) {
klog.ErrorS(err, "Failed to remove file watch", "path", f.Name())
}
f = newF
if err := watcher.Add(f.Name()); err != nil {
return fmt.Errorf("failed to watch file %q: %v", f.Name(), err)
}
r = bufio.NewReader(f)
}
// If the container exited consume data until the next EOF
continue
}
// Should stop after writing the remaining content.
stop = true
if len(l) == 0 {
continue
}
klog.InfoS("Incomplete line in log file", "path", path, "line", l)
}
if parse == nil {
// Initialize the log parsing function.
parse, err = getParseFunc(l)
if err != nil {
return fmt.Errorf("failed to get parse function: %v", err)
}
}
// Parse the log line.
msg.reset()
if err := parse(l, msg); err != nil {
klog.ErrorS(err, "Failed when parsing line in log file", "path", path, "line", l)
continue
}
// Write the log line into the stream.
if err := writer.write(msg, isNewLine); err != nil {
if err == errMaximumWrite {
klog.V(2).InfoS("Finished parsing log file, hit bytes limit", "path", path, "limit", opts.bytes)
return nil
}
klog.ErrorS(err, "Failed when writing line to log file", "path", path, "line", msg)
return err
}
if limitedMode {
limitedNum--
}
if len(msg.log) > 0 {
isNewLine = msg.log[len(msg.log)-1] == eol[0]
} else {
isNewLine = true
}
}
}
```
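To make the "parse each line into a log message" step concrete, here is a minimal sketch of parsing one CRI-format log line (timestamp, stream, partial/full tag, payload). It is only an illustration, not kubelet's actual parseCRILog, which handles more edge cases:

```go
package main

import (
	"bytes"
	"fmt"
	"time"
)

// logMessage mirrors the fields kubelet extracts from each CRI-format log line.
type logMessage struct {
	timestamp time.Time
	stream    string // "stdout" or "stderr"
	partial   bool   // tag "P" means the runtime split a long line; "F" means a full line
	log       []byte
}

// parseCRILine is a simplified stand-in for kubelet's CRI log parser. Each line has
// the form: "<RFC3339Nano timestamp> <stream> <tag> <payload>".
func parseCRILine(line []byte) (*logMessage, error) {
	parts := bytes.SplitN(line, []byte{' '}, 4)
	if len(parts) < 4 {
		return nil, fmt.Errorf("malformed CRI log line: %q", line)
	}
	ts, err := time.Parse(time.RFC3339Nano, string(parts[0]))
	if err != nil {
		return nil, fmt.Errorf("bad timestamp in CRI log line: %v", err)
	}
	return &logMessage{
		timestamp: ts,
		stream:    string(parts[1]),
		partial:   string(parts[2]) == "P",
		log:       parts[3],
	}, nil
}

func main() {
	msg, err := parseCRILine([]byte("2016-10-06T00:17:09.669794202Z stdout F hello from the container\n"))
	if err != nil {
		panic(err)
	}
	fmt.Printf("%s [%s] partial=%v: %s", msg.timestamp.Format(time.RFC3339Nano), msg.stream, msg.partial, msg.log)
}
```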
Command execution endpoint
kubectl exec is a common debugging tool that creates an interactive terminal session, e.g. kubectl exec -it my-pod -- /bin/sh.
kubectl sends the exec request to kube-apiserver
Running kubectl exec in the terminal is, at the bottom of the call chain, implemented by using client-go to build a POST request against the Pod subresource exec.
```go
// TODO: consider abstracting into a client invocation or client helper
req := restClient.Post().
Resource("pods").
Name(pod.Name).
Namespace(pod.Namespace).
SubResource("exec")
req.VersionedParams(&corev1.PodExecOptions{
Container: containerName,
Command: p.Command,
Stdin: p.Stdin,
Stdout: p.Out != nil,
Stderr: p.ErrOut != nil,
TTY: t.Raw,
}, scheme.ParameterCodec)
```
Besides the POST method, the GET method is also supported:
- POST suits clients such as kubectl; it is carried over the SPDY protocol and supports bidirectional communication
- GET suits web callers; the connection can be upgraded to WebSocket to support bidirectional communication
SPDY is an open network transport protocol developed by Google with the goal of reducing page load time. Through prioritization and multiplexing, SPDY can transfer page content and image resources over a single TCP connection.
```go
func (*DefaultRemoteExecutor) Execute(method string, url *url.URL, config *restclient.Config, stdin io.Reader, stdout, stderr io.Writer, tty bool, terminalSizeQueue remotecommand.TerminalSizeQueue) error {
exec, err := remotecommand.NewSPDYExecutor(config, method, url)
if err != nil {
return err
}
return exec.Stream(remotecommand.StreamOptions{
Stdin: stdin,
Stdout: stdout,
Stderr: stderr,
Tty: tty,
TerminalSizeQueue: terminalSizeQueue,
})
}
```
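Putting the two snippets above together, here is a minimal sketch (not the kubectl source; the kubeconfig location, namespace, pod, container and command are hypothetical) of executing a command in a Pod:

```go
package main

import (
	"os"

	corev1 "k8s.io/api/core/v1"
	"k8s.io/client-go/kubernetes"
	"k8s.io/client-go/kubernetes/scheme"
	"k8s.io/client-go/tools/clientcmd"
	"k8s.io/client-go/tools/remotecommand"
)

func main() {
	// Assumption: a kubeconfig in the default location grants exec permission on the Pod.
	config, err := clientcmd.BuildConfigFromFlags("", clientcmd.RecommendedHomeFile)
	if err != nil {
		panic(err)
	}
	clientset := kubernetes.NewForConfigOrDie(config)

	// Build the POST request against the pod's exec subresource, as kubectl does.
	req := clientset.CoreV1().RESTClient().Post().
		Resource("pods").
		Namespace("default").
		Name("my-pod").
		SubResource("exec").
		VersionedParams(&corev1.PodExecOptions{
			Container: "my-container",
			Command:   []string{"ls", "/"},
			Stdout:    true,
			Stderr:    true,
		}, scheme.ParameterCodec)

	// Upgrade the connection to SPDY and stream stdout/stderr back.
	exec, err := remotecommand.NewSPDYExecutor(config, "POST", req.URL())
	if err != nil {
		panic(err)
	}
	if err := exec.Stream(remotecommand.StreamOptions{
		Stdout: os.Stdout,
		Stderr: os.Stderr,
	}); err != nil {
		panic(err)
	}
}
```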
kube-apiserver sends the command execution request to kubelet
To kube-apiserver, exec is also a resource.
During API registration, Pod has already registered the mapping between its (sub)resources and their RESTStorage implementations.
```go
storage := map[string]rest.Storage{}
if resource := "pods"; apiResourceConfigSource.ResourceEnabled(corev1.SchemeGroupVersion.WithResource(resource)) {
storage[resource] = podStorage.Pod
storage[resource+"/attach"] = podStorage.Attach
storage[resource+"/status"] = podStorage.Status
storage[resource+"/log"] = podStorage.Log
storage[resource+"/exec"] = podStorage.Exec
storage[resource+"/portforward"] = podStorage.PortForward
storage[resource+"/proxy"] = podStorage.Proxy
storage[resource+"/binding"] = podStorage.Binding
if podStorage.Eviction != nil {
storage[resource+"/eviction"] = podStorage.Eviction
}
storage[resource+"/ephemeralcontainers"] = podStorage.EphemeralContainers
kubelet handles and responds to the command execution request
kubelet's HTTP server eventually receives the exec request from kube-apiserver; after authentication and authorization it is handled by getExec.
```go
s.addMetricsBucketMatcher("exec")
ws = new(restful.WebService)
ws.
Path("/exec")
ws.Route(ws.GET("/{podNamespace}/{podID}/{containerName}").
To(s.getExec).
Operation("getExec"))
ws.Route(ws.POST("/{podNamespace}/{podID}/{containerName}").
To(s.getExec).
Operation("getExec"))
ws.Route(ws.GET("/{podNamespace}/{podID}/{uid}/{containerName}").
To(s.getExec).
Operation("getExec"))
ws.Route(ws.POST("/{podNamespace}/{podID}/{uid}/{containerName}").
To(s.getExec).
Operation("getExec"))
s.restfulCont.Add(ws)
```
The getExec func parses the exec request, calls runtimeService.Exec over CRI to obtain a streaming URL for the exec session from the container runtime, and then calls proxyStream to proxy the stream between kube-apiserver and the container runtime (a sketch of proxyStream follows the listing below).
```go
// getExec handles requests to run a command inside a container.
func (s *Server) getExec(request *restful.Request, response *restful.Response) {
params := getExecRequestParams(request)
streamOpts, err := remotecommandserver.NewOptions(request.Request)
if err != nil {
utilruntime.HandleError(err)
response.WriteError(http.StatusBadRequest, err)
return
}
pod, ok := s.host.GetPodByName(params.podNamespace, params.podName)
if !ok {
response.WriteError(http.StatusNotFound, fmt.Errorf("pod does not exist"))
return
}
podFullName := kubecontainer.GetPodFullName(pod)
url, err := s.host.GetExec(podFullName, params.podUID, params.containerName, params.cmd, *streamOpts)
if err != nil {
streaming.WriteError(err, response.ResponseWriter)
return
}
proxyStream(response.ResponseWriter, request.Request, url)
}
```
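proxyStream is essentially a thin wrapper around an upgrade-aware reverse proxy. The sketch below is simplified from the kubelet source (the real responder also logs and translates errors); it shows the core idea of forwarding the client connection to the URL returned by the runtime and upgrading it when needed:

```go
// Package proxysketch illustrates how kubelet's proxyStream-style forwarding works.
package proxysketch

import (
	"net/http"
	"net/url"

	"k8s.io/apimachinery/pkg/util/proxy"
)

// errorResponder satisfies proxy.ErrorResponder; it reports proxy errors back to the client.
// The real kubelet responder also logs the error.
type errorResponder struct{}

func (errorResponder) Error(w http.ResponseWriter, r *http.Request, err error) {
	http.Error(w, err.Error(), http.StatusInternalServerError)
}

// proxyStream forwards a streaming request (exec/attach/port-forward) to the URL returned
// by the container runtime, upgrading the connection (e.g. to SPDY) when required.
func proxyStream(w http.ResponseWriter, r *http.Request, target *url.URL) {
	h := proxy.NewUpgradeAwareHandler(target, nil /*transport*/, false /*wrapTransport*/, true /*upgradeRequired*/, errorResponder{})
	h.ServeHTTP(w, r)
}
```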
Port forwarding endpoint
kubectl port-forward listens on a local port and maps it to a port of a remote Pod, implementing port forwarding (e.g. kubectl port-forward pod/my-pod 8080:80).
kubectl sends the port forwarding request to kube-apiserver
Running kubectl port-forward in the terminal is actually implemented by a POST to the Pod subresource portforward.
```go
// RunPortForward implements all the necessary functionality for port-forward cmd.
func (o PortForwardOptions) RunPortForward() error {
pod, err := o.PodClient.Pods(o.Namespace).Get(context.TODO(), o.PodName, metav1.GetOptions{})
if err != nil {
return err
}
if pod.Status.Phase != corev1.PodRunning {
return fmt.Errorf("unable to forward port because pod is not running. Current status=%v", pod.Status.Phase)
}
signals := make(chan os.Signal, 1)
signal.Notify(signals, os.Interrupt)
defer signal.Stop(signals)
go func() {
<-signals
if o.StopChannel != nil {
close(o.StopChannel)
}
}()
req := o.RESTClient.Post().
Resource("pods").
Namespace(o.Namespace).
Name(pod.Name).
SubResource("portforward")
return o.PortForwarder.ForwardPorts("POST", req.URL(), o)
}
```
kubectl wraps the POST request to kube-apiserver in the PortForwarder, establishes the connection to the target server, and upgrades the connection to a multiplexed bidirectional stream based on SPDY.
```go
func (f *defaultPortForwarder) ForwardPorts(method string, url *url.URL, opts PortForwardOptions) error {
transport, upgrader, err := spdy.RoundTripperFor(opts.Config)
if err != nil {
return err
}
dialer := spdy.NewDialer(upgrader, &http.Client{Transport: transport}, method, url)
fw, err := portforward.NewOnAddresses(dialer, opts.Address, opts.Ports, opts.StopChannel, opts.ReadyChannel, f.Out, f.ErrOut)
if err != nil {
return err
}
return fw.ForwardPorts()
}
```
kube-apiserver sends the port forwarding request to kubelet
To kube-apiserver, portforward is also a resource.
During API registration, Pod has already registered the mapping between its (sub)resources and their RESTStorage implementations.
```go
storage := map[string]rest.Storage{}
if resource := "pods"; apiResourceConfigSource.ResourceEnabled(corev1.SchemeGroupVersion.WithResource(resource)) {
storage[resource] = podStorage.Pod
storage[resource+"/attach"] = podStorage.Attach
storage[resource+"/status"] = podStorage.Status
storage[resource+"/log"] = podStorage.Log
storage[resource+"/exec"] = podStorage.Exec
storage[resource+"/portforward"] = podStorage.PortForward
storage[resource+"/proxy"] = podStorage.Proxy
storage[resource+"/binding"] = podStorage.Binding
if podStorage.Eviction != nil {
storage[resource+"/eviction"] = podStorage.Eviction
}
storage[resource+"/ephemeralcontainers"] = podStorage.EphemeralContainers
kubelet handles and responds to the port forwarding request
kubelet's HTTP server eventually receives the port forwarding request from kube-apiserver; after authentication and authorization it is handled by getPortForward.
```go
s.addMetricsBucketMatcher("portForward")
ws = new(restful.WebService)
ws.
Path("/portForward")
ws.Route(ws.GET("/{podNamespace}/{podID}").
To(s.getPortForward).
Operation("getPortForward"))
ws.Route(ws.POST("/{podNamespace}/{podID}").
To(s.getPortForward).
Operation("getPortForward"))
ws.Route(ws.GET("/{podNamespace}/{podID}/{uid}").
To(s.getPortForward).
Operation("getPortForward"))
ws.Route(ws.POST("/{podNamespace}/{podID}/{uid}").
To(s.getPortForward).
Operation("getPortForward"))
s.restfulCont.Add(ws)
```
kubectl attach works in much the same way, implemented with the SPDY protocol and the same two-level proxying (kube-apiserver to kubelet, kubelet to container runtime).