K8s-client-go
K8s-client-go
什么是Client-go
Client-go是K8s提供的调用K8s对西那个的客户端,实现了访问kube-apiserver,对K8s集群中的资源对象管理
客户端
client-go支持Client Set、DynamicClient、Discovery Client、RESTClient四种
- RESTClient是最基础的客户端,对HTTP请求进行了封装
- ClientSet对在RESTClient基础上,封装了Resource和Version的管理方法
- ClientSet只能处理K8s内置资源
- 通过client-gen代码生成器生成
- DynamicClient:DynamicClient可以处理所有的资源,包括CRD
- DiscoveryClient:用于发现kube-apisever所支持的Group、Versions、Resources
kuebconfig配置管理
kubeconfig就是管理访问kube-apiserver的配置信息,支持管理访问多个kube-apiserver的配置信息
- K8s其他组件默认加载kubeconfig
- 默认放在$HOME/.kube/config
- 也可以使用KUBECONFIG环境变量指定多个路径
[root@k8s-master ~]# cat .kube/config |
// 生成K8 config代码 |
func (rules *ClientConfigLoadingRules) Load() (*clientcmdapi.Config, error) { |
合并多个配置信息
config := clientcmdapi.NewConfig() |
RESTClient
RESTClient是最基础Client,封装了HTTP接口
使用示例
package main |
执行流程
rest.RESTClientFor func通过kubeconfig中的配置信息实例化RESTClient对象,RESTClient对象使用流式调用HTTP请求的一系列参数
Get函数支持HTTP设置为Get,还支持Post、Put、Delete、Patch等请求方法
Namespace func设置请求的命名空间
Resource func设置请求的资源名称
VersionedParams func设置一些查询选项(Limit、LabelSelector、TimeoutSeconds等)
// request connects to the server and invokes the provided function when a server response is
// received. It handles retry behavior and up front validation of requests. It will invoke
// fn at most once. It will return an error if a problem occurred prior to connecting to the
// server - the provided function is responsible for handling server errors.
func (r *Request) request(ctx context.Context, fn func(*http.Request, *http.Response)) error {
// Metrics for total request latency
start := time.Now()
defer func() {
metrics.RequestLatency.Observe(ctx, r.verb, r.finalURLTemplate(), time.Since(start))
}()
if r.err != nil {
klog.V(4).Infof("Error in request: %v", r.err)
return r.err
}
if err := r.requestPreflightCheck(); err != nil {
return err
}
client := r.c.Client
if client == nil {
client = http.DefaultClient
}
// Throttle the first try before setting up the timeout configured on the
// client. We don't want a throttled client to return timeouts to callers
// before it makes a single request.
if err := r.tryThrottle(ctx); err != nil {
return err
}
if r.timeout > 0 {
var cancel context.CancelFunc
ctx, cancel = context.WithTimeout(ctx, r.timeout)
defer cancel()
}
isErrRetryableFunc := func(req *http.Request, err error) bool {
// "Connection reset by peer" or "apiserver is shutting down" are usually a transient errors.
// Thus in case of "GET" operations, we simply retry it.
// We are not automatically retrying "write" operations, as they are not idempotent.
if req.Method != "GET" {
return false
}
// For connection errors and apiserver shutdown errors retry.
if net.IsConnectionReset(err) || net.IsProbableEOF(err) {
return true
}
return false
}
// Right now we make about ten retry attempts if we get a Retry-After response.
retry := r.retryFn(r.maxRetries)
for {
if err := retry.Before(ctx, r); err != nil {
return retry.WrapPreviousError(err)
}
req, err := r.newHTTPRequest(ctx)
if err != nil {
return err
}
resp, err := client.Do(req)
updateURLMetrics(ctx, r, resp, err)
// The value -1 or a value of 0 with a non-nil Body indicates that the length is unknown.
// https://pkg.go.dev/net/http#Request
if req.ContentLength >= 0 && !(req.Body != nil && req.ContentLength == 0) {
metrics.RequestSize.Observe(ctx, r.verb, r.URL().Host, float64(req.ContentLength))
}
retry.After(ctx, r, resp, err)
done := func() bool {
defer readAndCloseResponseBody(resp)
// if the server returns an error in err, the response will be nil.
f := func(req *http.Request, resp *http.Response) {
if resp == nil {
return
}
fn(req, resp)
}
if retry.IsNextRetry(ctx, r, req, resp, err, isErrRetryableFunc) {
return false
}
f(req, resp)
return true
}()
if done {
return retry.WrapPreviousError(err)
}
}
}
// Do formats and executes the request. Returns a Result object for easy response
// processing.
//
// Error type:
// - If the server responds with a status: *errors.StatusError or *errors.UnexpectedObjectError
// - http.Client.Do errors are returned directly.
func (r *Request) Do(ctx context.Context) Result {
var result Result
err := r.request(ctx, func(req *http.Request, resp *http.Response) {
result = r.transformResponse(resp, req)
})
if err != nil {
return Result{err: err}
}
if result.err == nil || len(result.body) > 0 {
metrics.ResponseSize.Observe(ctx, r.verb, r.URL().Host, float64(len(result.body)))
}
return result
}
重试机制
为了减少由于临时或者突发的错误请求导致的请求失败,RESTClient设置了WithRetry接口实现了重试
|
ClientSet
基于RESTClient的Client,实现了需要指定Resource和Version
使用示例
package main |
实现原理
当使用kubernetes.NewForConfig func实现ClientSet客户端,会级联构造出专属对象
CoreV1Client实现了CoreV1Interface接口,该接口封装了Core资源组、V1资源版本
type CoreV1Interface interface { |
初始化Client Func:
// NewForConfigAndClient creates a new CoreV1Client for the given config and http client. |
DynamicClient
基于RESTClient的动态客户端
可以对任意K8s资源操作,包含CRD
不是类型安全的,可能访问CRD出现程序崩溃
使用示例
package main |
实现原理
- 在dynamic.NewForConfig func初始化DynamicClient 过程中,先使用kubeconfig对象
- 使用kubeconfig构建RESTClient(NewForConfigAndClient)
// NewForConfigAndClient creates a new dynamic client for the given config and http client. |
和ClientSet不同,初始化的时候不会传入Group、Version、Resource,只有在实际操作的时候传入
-
func (c *dynamicResourceClient) List(ctx context.Context, opts metav1.ListOptions) (*unstructured.UnstructuredList, error) {
result := c.client.client.Get().AbsPath(c.makeURLSegments("")...).SpecificallyVersionedParams(&opts, dynamicParameterCodec, versionV1).Do(ctx)
if err := result.Error(); err != nil {
return nil, err
}
retBytes, err := result.Raw()
if err != nil {
return nil, err
}
uncastObj, err := runtime.Decode(unstructured.UnstructuredJSONScheme, retBytes)
if err != nil {
return nil, err
}
// 使用了断言
if list, ok := uncastObj.(*unstructured.UnstructuredList); ok {
return list, nil
}
list, err := uncastObj.(*unstructured.Unstructured).ToList()
if err != nil {
return nil, err
}
return list, nil
}
DiscoveryClient
DiscoveryClient是基于RESTClient,主要用于发现kube-apiserver支持的资源组、资源版本、资源信息
- kubectl 的
api-version
和api-resource
输出也是基于DiscoveryClient实现 - 还可以实现这些信息存储到本地,进行本地cache,减少催kube-apiserver的访问压力
- kubectl 的缓存实现使用了ChachedDiscoveryClient,在缓存周期获取
- 默认缓存位置:
~/.kube/cache/discovery
和~/.kube/cache/http
目录 - 默认存储周期6h
- 默认缓存位置:
使用示例
package main |
实现原理
kube-apiserver暴露了/api
和/apis
接口
// ServerGroups returns the supported groups, with information like supported versions and the |