K8s-client-go

什么是Client-go

Client-go是K8s提供的用于调用K8s资源对象的客户端库,它实现了对kube-apiserver的访问,用于管理K8s集群中的资源对象

客户端

client-go支持Client Set、DynamicClient、Discovery Client、RESTClient四种

  • RESTClient是最基础的客户端,对HTTP请求进行了封装
  • ClientSet在RESTClient基础上,封装了Resource和Version的管理方法
    • ClientSet只能处理K8s内置资源
    • 通过client-gen代码生成器生成
  • DynamicClient:DynamicClient可以处理所有的资源,包括CRD
  • DiscoveryClient:用于发现kube-apiserver所支持的Group、Versions、Resources

kubeconfig配置管理

kubeconfig就是管理访问kube-apiserver的配置信息,支持管理访问多个kube-apiserver的配置信息

  • K8s其他组件默认加载kubeconfig
  • 默认放在$HOME/.kube/config
  • 也可以使用KUBECONFIG环境变量指定多个路径
[root@k8s-master ~]# cat .kube/config                                                                                                                                                                 
apiVersion: v1
clusters: # K8s集群信息,如kube-apiserver的地址以及集群证书
- cluster:
    certificate-authority-data: xxx
    server: https://masterip:6443
  name: kubernetes
contexts:
- context:
    cluster: kubernetes
    user: kubernetes-admin
  name: kubernetes-admin@kubernetes
current-context: kubernetes-admin@kubernetes
kind: Config
preferences: {}
users: # 定义K8s集群用户验证凭证
- name: kubernetes-admin
  user:
    client-certificate-data: xxx
    client-key-data: xxx
// Build the K8s client config from a kubeconfig file.
// The -kubeconfig flag defaults to <home>/.kube/config when homedir.HomeDir()
// returns a non-empty path; otherwise its default is the empty string.
var kubeconfig *string
if home := homedir.HomeDir(); home != "" {
kubeconfig = flag.String("kubeconfig", filepath.Join(home, ".kube", "config"), "(optional) absolute path to the kubeconfig file")
} else {
kubeconfig = flag.String("kubeconfig", "", "absolute path to the kubeconfig file")
}
flag.Parse()

// The first argument is passed as "", so only the kubeconfig path is supplied here.
config, err := clientcmd.BuildConfigFromFlags("", *kubeconfig)
if err != nil {
panic(err)
}
// Load reads the kubeconfig file(s) selected by the rules and merges them into
// a single clientcmdapi.Config.
//
// When rules.ExplicitPath is set, only that file is used and a missing file is
// returned as an error right away; otherwise every entry of rules.Precedence is
// tried. Files that do not exist are skipped, other load failures are
// collected, and the collected errors are returned as an aggregate next to the
// merged config.
func (rules *ClientConfigLoadingRules) Load() (*clientcmdapi.Config, error) {
if err := rules.Migrate(); err != nil {
return nil, err
}

errlist := []error{}
missingList := []string{}

kubeConfigFiles := []string{}

// Make sure a file we were explicitly told to use exists
if len(rules.ExplicitPath) > 0 {
if _, err := os.Stat(rules.ExplicitPath); os.IsNotExist(err) {
return nil, err
}
kubeConfigFiles = append(kubeConfigFiles, rules.ExplicitPath)

} else {
kubeConfigFiles = append(kubeConfigFiles, rules.Precedence...)
}

kubeconfigs := []*clientcmdapi.Config{}
// read and cache the config files so that we only look at them once
for _, filename := range kubeConfigFiles {
if len(filename) == 0 {
// no work to do
continue
}

config, err := LoadFromFile(filename)

if os.IsNotExist(err) {
// skip missing files
// Add to the missing list to produce a warning
missingList = append(missingList, filename)
continue
}

if err != nil {
errlist = append(errlist, fmt.Errorf("error loading config file \"%s\": %v", filename, err))
continue
}

kubeconfigs = append(kubeconfigs, config)
}

// Warn only when the option is enabled, at least one file was missing and
// no file at all could be loaded.
if rules.WarnIfAllMissing && len(missingList) > 0 && len(kubeconfigs) == 0 {
klog.Warningf("Config not found: %s", strings.Join(missingList, ", "))
}

// first merge all of our maps
mapConfig := clientcmdapi.NewConfig()

// NOTE(review): the result of mergo.Merge is not checked in any of the calls below.
for _, kubeconfig := range kubeconfigs {
mergo.Merge(mapConfig, kubeconfig, mergo.WithOverride)
}

// merge all of the struct values in the reverse order so that priority is given correctly
// errors are not added to the list the second time
nonMapConfig := clientcmdapi.NewConfig()
for i := len(kubeconfigs) - 1; i >= 0; i-- {
kubeconfig := kubeconfigs[i]
mergo.Merge(nonMapConfig, kubeconfig, mergo.WithOverride)
}

// since values are overwritten, but maps values are not, we can merge the non-map config on top of the map config and
// get the values we expect.
config := clientcmdapi.NewConfig()
mergo.Merge(config, mapConfig, mergo.WithOverride)
mergo.Merge(config, nonMapConfig, mergo.WithOverride)

// A path-resolution failure is appended to errlist; the merged config is still returned.
if rules.ResolvePaths() {
if err := ResolveLocalPaths(config); err != nil {
errlist = append(errlist, err)
}
}
return config, utilerrors.NewAggregate(errlist)
}

合并多个配置信息

// Combine the two intermediate results into the final config: mapConfig is
// merged first and nonMapConfig second, both with mergo.WithOverride.
config := clientcmdapi.NewConfig()
mergo.Merge(config, mapConfig, mergo.WithOverride)
mergo.Merge(config, nonMapConfig, mergo.WithOverride)

RESTClient

RESTClient是最基础Client,封装了HTTP接口

使用示例

package main

import (
"context"
"flag"
"fmt"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/apis/meta/internalversion/scheme"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/clientcmd"
"k8s.io/client-go/util/homedir"
"path/filepath"
)

// main queries pods in the kube-system namespace through a bare RESTClient
// (with a list Limit of 100) and prints namespace, phase and name of each pod.
func main() {
	// The -kubeconfig flag defaults to <home>/.kube/config when a home directory is known.
	defaultPath := ""
	usage := "absolute path to the kubeconfig file"
	if home := homedir.HomeDir(); home != "" {
		defaultPath = filepath.Join(home, ".kube", "config")
		usage = "(optional) absolute path to the kubeconfig file"
	}
	kubeconfig := flag.String("kubeconfig", defaultPath, usage)
	flag.Parse()

	config, err := clientcmd.BuildConfigFromFlags("", *kubeconfig)
	if err != nil {
		panic(err)
	}

	// Target path: /api/v1/namespaces/{namespace}/pods
	config.APIPath = "api"
	// Pods live in the group whose name is the empty string.
	config.GroupVersion = &corev1.SchemeGroupVersion
	// Serializer used for requests and responses.
	config.NegotiatedSerializer = scheme.Codecs

	restClient, err := rest.RESTClientFor(config)
	if err != nil {
		panic(err)
	}

	// Build the request step by step, then execute it and decode into pods.
	pods := &corev1.PodList{}
	listOpts := &metav1.ListOptions{Limit: 100}
	request := restClient.Get().
		Namespace("kube-system").
		Resource("pods").
		VersionedParams(listOpts, scheme.ParameterCodec)
	if err := request.Do(context.TODO()).Into(pods); err != nil {
		panic(err)
	}

	for i := range pods.Items {
		pod := &pods.Items[i]
		fmt.Printf("%v\t %v\t %v\n", pod.Namespace, pod.Status.Phase, pod.Name)
	}
}

执行流程

rest.RESTClientFor func通过kubeconfig中的配置信息实例化RESTClient对象,RESTClient对象通过链式调用来设置HTTP请求的一系列参数

  • Get函数将HTTP请求方法设置为GET,此外还支持Post、Put、Delete、Patch等请求方法

  • Namespace func设置请求的命名空间

  • Resource func设置请求的资源名称

  • VersionedParams func设置一些查询选项(Limit、LabelSelector、TimeoutSeconds等)

    // request connects to the server and invokes the provided function when a server response is
    // received. It handles retry behavior and up front validation of requests. It will invoke
    // fn at most once. It will return an error if a problem occurred prior to connecting to the
    // server - the provided function is responsible for handling server errors.
    func (r *Request) request(ctx context.Context, fn func(*http.Request, *http.Response)) error {
    // Metrics for total request latency
    start := time.Now()
    defer func() {
    metrics.RequestLatency.Observe(ctx, r.verb, r.finalURLTemplate(), time.Since(start))
    }()

    // An error recorded on the Request earlier is returned before anything is sent.
    if r.err != nil {
    klog.V(4).Infof("Error in request: %v", r.err)
    return r.err
    }

    if err := r.requestPreflightCheck(); err != nil {
    return err
    }

    // Fall back to http.DefaultClient when no client is configured.
    client := r.c.Client
    if client == nil {
    client = http.DefaultClient
    }

    // Throttle the first try before setting up the timeout configured on the
    // client. We don't want a throttled client to return timeouts to callers
    // before it makes a single request.
    if err := r.tryThrottle(ctx); err != nil {
    return err
    }

    // The timeout context is created once, outside the retry loop below.
    if r.timeout > 0 {
    var cancel context.CancelFunc
    ctx, cancel = context.WithTimeout(ctx, r.timeout)
    defer cancel()
    }

    isErrRetryableFunc := func(req *http.Request, err error) bool {
    // "Connection reset by peer" or "apiserver is shutting down" are usually a transient errors.
    // Thus in case of "GET" operations, we simply retry it.
    // We are not automatically retrying "write" operations, as they are not idempotent.
    if req.Method != "GET" {
    return false
    }
    // For connection errors and apiserver shutdown errors retry.
    if net.IsConnectionReset(err) || net.IsProbableEOF(err) {
    return true
    }
    return false
    }

    // Right now we make about ten retry attempts if we get a Retry-After response.
    retry := r.retryFn(r.maxRetries)
    for {
    if err := retry.Before(ctx, r); err != nil {
    return retry.WrapPreviousError(err)
    }
    req, err := r.newHTTPRequest(ctx)
    if err != nil {
    return err
    }
    resp, err := client.Do(req)
    updateURLMetrics(ctx, r, resp, err)
    // The value -1 or a value of 0 with a non-nil Body indicates that the length is unknown.
    // https://pkg.go.dev/net/http#Request
    if req.ContentLength >= 0 && !(req.Body != nil && req.ContentLength == 0) {
    metrics.RequestSize.Observe(ctx, r.verb, r.URL().Host, float64(req.ContentLength))
    }
    retry.After(ctx, r, resp, err)

    // The attempt is finished inside a closure so that the deferred
    // readAndCloseResponseBody runs at the end of every attempt instead of
    // when request itself returns.
    done := func() bool {
    defer readAndCloseResponseBody(resp)

    // if the server returns an error in err, the response will be nil.
    f := func(req *http.Request, resp *http.Response) {
    if resp == nil {
    return
    }
    fn(req, resp)
    }

    // When another retry is due, fn is not invoked for this attempt.
    if retry.IsNextRetry(ctx, r, req, resp, err, isErrRetryableFunc) {
    return false
    }

    f(req, resp)
    return true
    }()
    if done {
    return retry.WrapPreviousError(err)
    }
    }
    }

    // Do formats and executes the request and hands back a Result object for easy
    // response processing.
    //
    // Error type:
    // - If the server responds with a status: *errors.StatusError or *errors.UnexpectedObjectError
    // - http.Client.Do errors are returned directly.
    func (r *Request) Do(ctx context.Context) Result {
    	var out Result
    	// capture converts the server response into a Result once one is received.
    	capture := func(req *http.Request, resp *http.Response) {
    		out = r.transformResponse(resp, req)
    	}
    	if err := r.request(ctx, capture); err != nil {
    		return Result{err: err}
    	}
    	// The response size is recorded unless the result is an error with an empty body.
    	if out.err == nil || len(out.body) > 0 {
    		metrics.ResponseSize.Observe(ctx, r.verb, r.URL().Host, float64(len(out.body)))
    	}
    	return out
    }

重试机制

为了减少由于临时或者突发的错误请求导致的请求失败,RESTClient设置了WithRetry接口实现了重试


// WithRetry allows the client to retry a request up to a certain number of times
// Note that WithRetry is not safe for concurrent use by multiple
// goroutines without additional locking or coordination.
type WithRetry interface {
// IsNextRetry advances the retry counter appropriately
// and returns true if the request should be retried,
// otherwise it returns false, if:
// - we have already reached the maximum retry threshold.
// - the error does not fall into the retryable category.
// - the server has not sent us a 429, or 5xx status code and the
// 'Retry-After' response header is not set with a value.
// - we need to seek to the beginning of the request body before we
// initiate the next retry, the function should log an error and
// return false if it fails to do so.
//
// restReq: the associated rest.Request
// httpReq: the HTTP Request sent to the server
// resp: the response sent from the server, it is set if err is nil
// err: the server sent this error to us, if err is set then resp is nil.
// f: a IsRetryableErrorFunc function provided by the client that determines
// if the err sent by the server is retryable.
//
// Summary (study note): updates the retry counter and decides whether to retry again.
// No further retry is made when:
// 1. the maximum number of retries has been reached (10 by default — TODO confirm against the client defaults)
// 2. the returned error is not of a retryable kind
// 3. the status code from kube-apiserver is neither 429 (too many requests within a period) nor 5xx,
// and the response carries no Retry-After header
// 4. the request body cannot be reset after a failed attempt
IsNextRetry(ctx context.Context, restReq *Request, httpReq *http.Request, resp *http.Response, err error, f IsRetryableErrorFunc) bool

// Before should be invoked prior to each attempt, including
// the first one. If an error is returned, the request should
// be aborted immediately.
//
// Before may also be additionally responsible for preparing
// the request for the next retry, namely in terms of resetting
// the request body in case it has been read.
//
// Summary (study note): called before every attempt; a failure aborts the request
// immediately. It is also used to reset the request body before each retry.
Before(ctx context.Context, r *Request) error

// After should be invoked immediately after an attempt is made.
//
// Summary (study note): called after every attempt; typically used to log the failed
// response status and error details, or to set the time of the next retry.
After(ctx context.Context, r *Request, resp *http.Response, err error)

// WrapPreviousError wraps the error from any previous attempt into
// the final error specified in 'finalErr', so the user has more
// context why the request failed.
// For example, if a request times out after multiple retries then
// we see a generic context.Canceled or context.DeadlineExceeded
// error which is not very useful in debugging. This function can
// wrap any error from previous attempt(s) to provide more context to
// the user. The error returned in 'err' must satisfy the
// following conditions:
// a: errors.Unwrap(err) = errors.Unwrap(finalErr) if finalErr
// implements Unwrap
// b: errors.Unwrap(err) = finalErr if finalErr does not
// implements Unwrap
// c: errors.Is(err, otherErr) = errors.Is(finalErr, otherErr)
//
// Summary (study note): wraps the errors of earlier attempts into the final error object.
WrapPreviousError(finalErr error) (err error)
}

ClientSet

基于RESTClient的Client,在其基础上封装了Resource和Version的管理方法,使用时通过对应的方法(如CoreV1().Pods())来指定Resource和Version

使用示例

package main

import (
"context"
"flag"
"fmt"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/tools/clientcmd"
"k8s.io/client-go/util/homedir"
"path/filepath"
)

// main builds a ClientSet from the kubeconfig file and prints namespace, phase
// and name of every pod in the "default" namespace.
func main() {
	// The -kubeconfig flag defaults to <home>/.kube/config when a home directory is known.
	var kubeconfig *string
	if home := homedir.HomeDir(); home != "" {
		kubeconfig = flag.String("kubeconfig", filepath.Join(home, ".kube", "config"), "(optional) absolute path to the kubeconfig file")
	} else {
		kubeconfig = flag.String("kubeconfig", "", "absolute path to the kubeconfig file")
	}
	flag.Parse()

	config, err := clientcmd.BuildConfigFromFlags("", *kubeconfig)
	if err != nil {
		panic(err)
	}
	clientset, err := kubernetes.NewForConfig(config)
	if err != nil {
		panic(err)
	}

	// Pods(...).Get with an empty name can never succeed: the request builder
	// rejects an empty resource name before anything is sent. List the pods
	// of the namespace instead.
	pods, err := clientset.CoreV1().Pods("default").List(context.TODO(), metav1.ListOptions{})
	if err != nil {
		panic(err)
	}
	for i := range pods.Items {
		pod := &pods.Items[i]
		fmt.Printf("%v\t %v\t %v\n", pod.Namespace, pod.Status.Phase, pod.Name)
	}
}

实现原理

当使用kubernetes.NewForConfig func实例化ClientSet客户端时,会级联构造出各资源组/版本专属的客户端对象(如CoreV1Client)

CoreV1Client实现了CoreV1Interface接口,该接口封装了Core资源组、V1资源版本

// CoreV1Interface exposes the underlying REST client and embeds one getter
// interface per resource listed below (ConfigMaps, Pods, Services, ...).
type CoreV1Interface interface {
// RESTClient returns a rest.Interface.
RESTClient() rest.Interface
ComponentStatusesGetter
ConfigMapsGetter
EndpointsGetter
EventsGetter
LimitRangesGetter
NamespacesGetter
NodesGetter
PersistentVolumesGetter
PersistentVolumeClaimsGetter
PodsGetter
PodTemplatesGetter
ReplicationControllersGetter
ResourceQuotasGetter
SecretsGetter
ServicesGetter
ServiceAccountsGetter
}

初始化Client Func:

// NewForConfigAndClient builds a CoreV1Client from the given config and http client.
// Note the http client provided takes precedence over the configured transport values.
func NewForConfigAndClient(c *rest.Config, h *http.Client) (*CoreV1Client, error) {
	// setConfigDefaults is applied to a copy of *c, not to the value c points at.
	cfg := *c
	err := setConfigDefaults(&cfg)
	if err != nil {
		return nil, err
	}

	restClient, err := rest.RESTClientForConfigAndClient(&cfg, h)
	if err != nil {
		return nil, err
	}
	return &CoreV1Client{restClient}, nil
}

DynamicClient

基于RESTClient的动态客户端

  • 可以对任意K8s资源操作,包含CRD

  • 不是类型安全的,可能访问CRD出现程序崩溃

使用示例

package main

import (
"context"
"errors"
"flag"
"fmt"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/client-go/dynamic"
"k8s.io/client-go/tools/clientcmd"
"k8s.io/client-go/util/homedir"
"path/filepath"
)

// GetCRD lists the "nodedeploys" custom resources (group "node.nodedeploy",
// version "v1") through the dynamic client.
//
// An error from the List call is returned to the caller, wrapped with the
// resource name, instead of being replaced by a generic message. A nil list
// without an error is still reported as "Not find".
func GetCRD(client *dynamic.DynamicClient) (*unstructured.UnstructuredList, error) {
	// Keyed fields keep the literal correct even if the struct's field order changes.
	gvr := schema.GroupVersionResource{Group: "node.nodedeploy", Version: "v1", Resource: "nodedeploys"}

	list, err := client.Resource(gvr).List(context.TODO(), metav1.ListOptions{})
	if err != nil {
		return nil, fmt.Errorf("listing %s: %w", gvr.Resource, err)
	}
	if list == nil {
		return nil, errors.New("Not find")
	}
	return list, nil
}
// CreateCRD creates a Deployment named "ss" (one nginx replica, container port
// 80) in the "default" namespace through the dynamic client and prints the
// created object. Despite its name it does not create a custom resource.
//
// It panics when the Create call fails.
func CreateCRD(dynamicClient *dynamic.DynamicClient) {
	// GroupVersionResource of the object to create.
	deploymentRes := schema.GroupVersionResource{Group: "apps", Version: "v1", Resource: "deployments"}
	deploymentName := "ss"
	image := "nginx"
	// Unstructured content must be built from JSON-compatible types only
	// (map[string]interface{}, []interface{}, string, bool, int64, float64).
	// A plain int or a []map[string]interface{} makes DeepCopy and the
	// unstructured.Nested* helpers panic or fail.
	var replicas int64 = 1

	deployment := &unstructured.Unstructured{
		Object: map[string]interface{}{
			"apiVersion": "apps/v1",
			"kind":       "Deployment",
			"metadata": map[string]interface{}{
				"name": deploymentName,
			},
			"spec": map[string]interface{}{
				"replicas": replicas,
				"selector": map[string]interface{}{
					"matchLabels": map[string]interface{}{
						"app": "demo",
					},
				},
				"template": map[string]interface{}{
					"metadata": map[string]interface{}{
						"labels": map[string]interface{}{
							"app": "demo",
						},
					},
					"spec": map[string]interface{}{
						"containers": []interface{}{
							map[string]interface{}{
								"name":  "web",
								"image": image,
								"ports": []interface{}{
									map[string]interface{}{
										"name":          "http",
										"protocol":      "TCP",
										"containerPort": int64(80),
									},
								},
							},
						},
					},
				},
			},
		},
	}

	created, err := dynamicClient.Resource(deploymentRes).Namespace("default").Create(context.TODO(), deployment, metav1.CreateOptions{})
	if err != nil {
		panic(err)
	}
	fmt.Println(created)
}
// main builds a dynamic client from the kubeconfig file, lists the custom
// resources via GetCRD and prints the result.
func main() {
	// The -kubeconfig flag defaults to <home>/.kube/config when a home directory
	// is known; pass -kubeconfig=<path> to use a different file.
	var kubeconfig *string
	if home := homedir.HomeDir(); home != "" {
		kubeconfig = flag.String("kubeconfig", filepath.Join(home, ".kube", "config"), "(optional) absolute path to the kubeconfig file")
	} else {
		kubeconfig = flag.String("kubeconfig", "", "absolute path to the kubeconfig file")
	}
	flag.Parse()

	config, err := clientcmd.BuildConfigFromFlags("", *kubeconfig)
	if err != nil {
		panic(err)
	}
	// Instantiate the dynamic client.
	dynamicClient, err := dynamic.NewForConfig(config)
	if err != nil {
		panic(err)
	}
	// Surface a failed lookup instead of silently printing a nil list.
	crd, err := GetCRD(dynamicClient)
	if err != nil {
		panic(err)
	}
	fmt.Println(crd)
}

实现原理

  1. 在dynamic.NewForConfig func初始化DynamicClient 过程中,先使用kubeconfig对象
  2. 使用kubeconfig构建RESTClient(NewForConfigAndClient)
// NewForConfigAndClient builds a dynamic client from the given config and http client.
// Note the http client provided takes precedence over the configured transport values.
func NewForConfigAndClient(inConfig *rest.Config, h *http.Client) (Interface, error) {
	cfg := ConfigFor(inConfig)
	// An empty GroupVersion is set here; it is there for serializing the options.
	cfg.GroupVersion = &schema.GroupVersion{}
	// Placeholder API path.
	cfg.APIPath = "/if-you-see-this-search-for-the-break"

	rc, err := rest.RESTClientForConfigAndClient(cfg, h)
	if err != nil {
		return nil, err
	}
	return &dynamicClient{client: rc}, nil
}
  • 和ClientSet不同,初始化的时候不会传入Group、Version、Resource,只有在实际操作的时候传入

  • Ref:https://github.com/kubernetes/client-go/blob/4f1c2e7b9711846cf31255fd709616c66cb3cfea/dynamic/simple.go#L272

    // List issues a GET against the URL built by makeURLSegments("") and decodes
    // the raw response into an UnstructuredList.
    func (c *dynamicResourceClient) List(ctx context.Context, opts metav1.ListOptions) (*unstructured.UnstructuredList, error) {
    	req := c.client.client.Get().
    		AbsPath(c.makeURLSegments("")...).
    		SpecificallyVersionedParams(&opts, dynamicParameterCodec, versionV1)

    	result := req.Do(ctx)
    	if err := result.Error(); err != nil {
    		return nil, err
    	}
    	raw, err := result.Raw()
    	if err != nil {
    		return nil, err
    	}
    	decoded, err := runtime.Decode(unstructured.UnstructuredJSONScheme, raw)
    	if err != nil {
    		return nil, err
    	}

    	// Type assertion: anything that is not already a list is asserted to be a
    	// single Unstructured object and converted with ToList.
    	asList, isList := decoded.(*unstructured.UnstructuredList)
    	if !isList {
    		converted, err := decoded.(*unstructured.Unstructured).ToList()
    		if err != nil {
    			return nil, err
    		}
    		return converted, nil
    	}
    	return asList, nil
    }

DiscoveryClient

DiscoveryClient是基于RESTClient,主要用于发现kube-apiserver支持的资源组、资源版本、资源信息

  • kubectl 的 api-versions、api-resources 输出也是基于DiscoveryClient实现
  • 还可以把这些信息存储到本地,进行本地cache,减少对kube-apiserver的访问压力
  • kubectl 的缓存实现使用了CachedDiscoveryClient,在缓存周期内直接从本地缓存获取
    • 默认缓存位置:~/.kube/cache/discovery 和 ~/.kube/cache/http 目录
    • 默认存储周期6h

使用示例

package main

import (
"flag"
"fmt"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/client-go/discovery"
"k8s.io/client-go/tools/clientcmd"
"k8s.io/client-go/util/homedir"
"path/filepath"
)

// main builds a DiscoveryClient from the kubeconfig file and prints the API
// groups plus, for every discovered group/version, its parsed group and
// version.
func main() {
	// The -kubeconfig flag defaults to <home>/.kube/config when a home directory is known.
	var kubeconfig *string
	if home := homedir.HomeDir(); home != "" {
		kubeconfig = flag.String("kubeconfig", filepath.Join(home, ".kube", "config"), "(optional) absolute path to the kubeconfig file")
	} else {
		kubeconfig = flag.String("kubeconfig", "", "absolute path to the kubeconfig file")
	}
	flag.Parse()

	config, err := clientcmd.BuildConfigFromFlags("", *kubeconfig)
	if err != nil {
		panic(err)
	}

	// Create the discovery client.
	discoveryClient, err := discovery.NewDiscoveryClientForConfig(config)
	if err != nil {
		panic(err)
	}

	// Fetch all groups and resources. Discovery can fail for individual groups
	// while still returning the rest, so a group-discovery failure is reported
	// as a warning and the partial result is printed; any other error is fatal.
	apiGroups, apiResourceLists, err := discoveryClient.ServerGroupsAndResources()
	if err != nil {
		if !discovery.IsGroupDiscoveryFailedError(err) {
			panic(err)
		}
		fmt.Printf("warning: discovery result is partial: %v\n", err)
	}

	fmt.Printf("APIGroup:\n\n %v\n\n", apiGroups)
	for _, resourceList := range apiResourceLists {
		// Convert the "group/version" string into its structured form.
		groupVersion, err := schema.ParseGroupVersion(resourceList.GroupVersion)
		if err != nil {
			panic(err)
		}
		// One line per entry; the original output had no separators at all.
		fmt.Printf("%s\tgroup=%q version=%q\n", resourceList.GroupVersion, groupVersion.Group, groupVersion.Version)
	}
}

实现原理

kube-apiserver暴露了/api和/apis两个接口

// ServerGroups returns the supported groups, with information like supported versions and the
// preferred version. The group found at the legacy prefix is placed in front of
// the groups found at /apis.
func (d *DiscoveryClient) ServerGroups() (apiGroupList *metav1.APIGroupList, err error) {
	// tolerated reports whether e is a 404 or 403, which are not treated as fatal here.
	tolerated := func(e error) bool {
		return errors.IsNotFound(e) || errors.IsForbidden(e)
	}

	// Group versions exposed at the legacy prefix (/api).
	legacy := &metav1.APIVersions{}
	err = d.restClient.Get().AbsPath(d.LegacyPrefix).Do(context.TODO()).Into(legacy)
	if err != nil && !tolerated(err) {
		return nil, err
	}
	legacyGroup := metav1.APIGroup{}
	if err == nil && len(legacy.Versions) != 0 {
		legacyGroup = apiVersionsToAPIGroup(legacy)
	}

	// Group versions exposed at /apis.
	apiGroupList = &metav1.APIGroupList{}
	err = d.restClient.Get().AbsPath("/apis").Do(context.TODO()).Into(apiGroupList)
	if err != nil {
		if !tolerated(err) {
			return nil, err
		}
		// to be compatible with a v1.0 server, if it's a 403 or 404, ignore and return whatever we got from /api
		apiGroupList = &metav1.APIGroupList{}
	}

	// prepend the group retrieved from /api to the list if not empty
	if len(legacy.Versions) != 0 {
		apiGroupList.Groups = append([]metav1.APIGroup{legacyGroup}, apiGroupList.Groups...)
	}
	return apiGroupList, nil
}