K8s-client-go(常用工具类:事件管理机制+Leader选举机制)

基于1.25

事件管理机制

Event 是一种资源对象,用于展示集群中发生的事件,例如调度器的调度决策、Pod 被驱逐的原因等。

  • 事件通过 kube-apiserver 存储在 etcd 中,并有强制保留策略:默认只保留最近一小时内的事件(由 kube-apiserver 的 --event-ttl 参数控制,默认 1h),超期后会被自动删除

  • Ref:https://github.com/kubernetes/api/blob/f7b7ea4f0fcc6cb8c8dd42eb46a94c7e163d1b9d/core/v1/types.go#L5776

    type Event struct {
    // metav1.TypeMeta 嵌入类型,包含 APIVersion 和 Kind 字段,用于标识对象的 API 版本和类型
    metav1.TypeMeta `json:",inline"`
    // 标准对象元数据,包含对象的名称、命名空间、标签等信息
    metav1.ObjectMeta `json:"metadata" protobuf:"bytes,1,opt,name=metadata"`

    // 该事件涉及的对象的引用
    InvolvedObject ObjectReference `json:"involvedObject" protobuf:"bytes,2,opt,name=involvedObject"`

    // 事件发生的原因,应该是一个简短的、机器可读的字符串,说明对象状态转换的原因
    // TODO: 提供格式的确切规范
    // +optional
    Reason string `json:"reason,omitempty" protobuf:"bytes,3,opt,name=reason"`

    // 对该操作状态的人类可读描述
    // TODO: 确定最大长度
    // +optional
    Message string `json:"message,omitempty" protobuf:"bytes,4,opt,name=message"`

    // 报告此事件的组件。应该是一个简短的机器可读字符串。
    // +optional
    Source EventSource `json:"source,omitempty" protobuf:"bytes,5,opt,name=source"`

    // 事件首次记录的时间。(服务器接收时间在 TypeMeta 中。)
    // +optional
    FirstTimestamp metav1.Time `json:"firstTimestamp,omitempty" protobuf:"bytes,6,opt,name=firstTimestamp"`

    // 此事件最近一次发生的时间记录。
    // +optional
    LastTimestamp metav1.Time `json:"lastTimestamp,omitempty" protobuf:"bytes,7,opt,name=lastTimestamp"`

    // 此事件发生的次数。
    // +optional
    Count int32 `json:"count,omitempty" protobuf:"varint,8,opt,name=count"`

    // 此事件的类型(正常,警告),未来可能会添加新的类型
    // +optional
    Type string `json:"type,omitempty" protobuf:"bytes,9,opt,name=type"`

    // 此事件首次观察到的时间。
    // +optional
    EventTime metav1.MicroTime `json:"eventTime,omitempty" protobuf:"bytes,10,opt,name=eventTime"`

    // 有关此事件表示的事件系列的数据,或者如果它是单例事件,则为 nil。
    // +optional
    Series *EventSeries `json:"series,omitempty" protobuf:"bytes,11,opt,name=series"`

    // 针对相关对象采取/失败的操作。
    // +optional
    Action string `json:"action,omitempty" protobuf:"bytes,12,opt,name=action"`

    // 用于更复杂操作的可选辅助对象。
    // +optional
    Related *ObjectReference `json:"related,omitempty" protobuf:"bytes,13,opt,name=related"`

    // 发出此事件的控制器的名称,例如 `kubernetes.io/kubelet`。
    // +optional
    ReportingController string `json:"reportingComponent" protobuf:"bytes,14,opt,name=reportingComponent"`

    // 控制器实例的 ID,例如 `kubelet-xyzf`。
    // +optional
    ReportingInstance string `json:"reportingInstance" protobuf:"bytes,15,opt,name=reportingInstance"`
    }
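
结合上面的 Event 结构体,下面给出一个用 client-go 列出 Event 并打印关键字段的示例(仅作演示,kubeconfig 使用默认路径,命名空间等参数为假设,需按实际环境调整):

    package main

    import (
        "context"
        "fmt"

        metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
        "k8s.io/client-go/kubernetes"
        "k8s.io/client-go/tools/clientcmd"
    )

    func main() {
        // 使用默认的 kubeconfig(~/.kube/config),按实际环境调整
        config, err := clientcmd.BuildConfigFromFlags("", clientcmd.RecommendedHomeFile)
        if err != nil {
            panic(err)
        }
        client, err := kubernetes.NewForConfig(config)
        if err != nil {
            panic(err)
        }

        // 列出 default 命名空间下的 Event,打印涉及对象、Reason、Count 和 Message
        events, err := client.CoreV1().Events("default").List(context.TODO(), metav1.ListOptions{})
        if err != nil {
            panic(err)
        }
        for _, e := range events.Items {
            fmt.Printf("%s/%s\t%s\tcount=%d\t%s\n",
                e.InvolvedObject.Kind, e.InvolvedObject.Name, e.Reason, e.Count, e.Message)
        }
    }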

使用

client-go 在 tools/record 包中实现了统一的事件管理器。

以 DeploymentController 为例:

  1. 初始化事件广播器(EventBroadcaster)和事件记录器(EventRecorder)

  2. 启动EventBroadcaster的日志记录和事件上报,二者分别在独立协程中运行(初始化与启动的示例见下方代码)

  3. 使用EventRecorder在关键阶段上报事件,如扩容

    if !alreadyExists && newReplicasCount > 0 {
    dc.eventRecorder.Eventf(d, v1.EventTypeNormal, "ScalingReplicaSet", "Scaled up replica set %s from 0 to %d", createdRS.Name, newReplicasCount)
    }
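
下面给出第 1、2 步初始化与启动的一个简化示意(参考 kube-controller-manager 中的常见写法,函数名 newRecorder、组件名 deployment-controller 均为本文假设):

    import (
        v1 "k8s.io/api/core/v1"
        "k8s.io/client-go/kubernetes"
        "k8s.io/client-go/kubernetes/scheme"
        typedcorev1 "k8s.io/client-go/kubernetes/typed/core/v1"
        "k8s.io/client-go/tools/record"
    )

    // newRecorder 为示意函数:基于已创建的 Clientset 返回可用的 EventRecorder
    func newRecorder(client kubernetes.Interface) record.EventRecorder {
        // 1. 初始化事件广播器
        eventBroadcaster := record.NewBroadcaster()
        // 2. 启动日志记录和事件上报,二者各自运行在独立协程中
        eventBroadcaster.StartStructuredLogging(0)
        eventBroadcaster.StartRecordingToSink(&typedcorev1.EventSinkImpl{
            Interface: client.CoreV1().Events(""),
        })
        // 初始化事件记录器,声明事件的来源组件
        return eventBroadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "deployment-controller"})
    }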

运行原理

  • EventRecorder:事件生产者,记录关键事件

  • EventBroadcaster:事件消费者,即事件广播器

    • 分为阻塞和非阻塞,默认使用非阻塞
  • broadcasterWatcher:由EventBroadcaster统一管理,作为观测者定义事件的处理方式

    • 通常使用一个broadcasterWatcher记录日志,使用另外一个上报事件
  • EventCorrelator:预处理上报事件的事件相关器,使用聚合、过滤、去重等机制对上报事件进行分析处理,防止产生过多事件

  • EventSink:事件沉淀器,使用ClientSet中的corev1.EventInterface把事件上报到kube-apiserver(最终持久化到etcd中)

EventRecorder

EventRecorder提供三种事件记录方法,接口定义与使用示例见下文

  • Ref:https://github.com/kubernetes/client-go/blob/4f1c2e7b9711846cf31255fd709616c66cb3cfea/tools/record/event.go#L91

    // EventRecorder knows how to record events on behalf of an EventSource.
    type EventRecorder interface {
    // Event constructs an event from the given information and puts it in the queue for sending.
    // 'object' is the object this event is about. Event will make a reference-- or you may also
    // pass a reference to the object directly.
    // 'eventtype' of this event, and can be one of Normal, Warning. New types could be added in future
    // 'reason' is the reason this event is generated. 'reason' should be short and unique; it
    // should be in UpperCamelCase format (starting with a capital letter). "reason" will be used
    // to automate handling of events, so imagine people writing switch statements to handle them.
    // You want to make that easy.
    // 'message' is intended to be human readable.
    //
    // The resulting event will be created in the same namespace as the reference object.
    // 基于输入信息构造事件并且记录到发送队列中
    // 最终创建的事件与关联对象位于同一个命名空间
    Event(object runtime.Object, eventtype, reason, message string)

    // Eventf is just like Event, but with Sprintf for the message field.
    // 格式化描述
    Eventf(object runtime.Object, eventtype, reason, messageFmt string, args ...interface{})

    // AnnotatedEventf is just like eventf, but with annotations attached
    // 跟Eventf一样,但是多了注解
    AnnotatedEventf(object runtime.Object, annotations map[string]string, eventtype, reason, messageFmt string, args ...interface{})
    }

  • ActionOrDrop:非阻塞模式下把事件写入容量为1000的通道中,如果通道已满,新事件会被直接丢弃
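
这三种记录方法的简单使用示意如下(假设 recorder 已按前文方式初始化,pod 为已获取到的 *v1.Pod,reason、message 均为虚构):

    import (
        "time"

        v1 "k8s.io/api/core/v1"
        "k8s.io/client-go/tools/record"
    )

    // recordExamples 为示意函数:演示三种事件记录方法
    func recordExamples(recorder record.EventRecorder, pod *v1.Pod) {
        // Event:直接给出完整的 message
        recorder.Event(pod, v1.EventTypeNormal, "Pulled", "Successfully pulled image")

        // Eventf:message 支持 Sprintf 风格的格式化
        recorder.Eventf(pod, v1.EventTypeWarning, "FailedScheduling", "0/%d nodes are available", 3)

        // AnnotatedEventf:在 Eventf 的基础上附加注解
        recorder.AnnotatedEventf(pod, map[string]string{"example.com/trace-id": "abc123"},
            v1.EventTypeNormal, "Synced", "synced in %v", time.Second)
    }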

EventBroadcaster

EventBroadcaster消费EventRecorder记录的事件,并把它们分发给当前所有已连接的broadcasterWatcher

  • DropIfChannelFull标识位于select多路复用中,使用default分支进行非阻塞分发;当进入broadcasterWatcher的result通道中的事件数量超过1000时,新事件会被直接丢弃(见下方示意代码)
  • WaitIfChannelFull同样位于select多路复用中,但没有default分支,通道满时会阻塞等待
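
两种分发方式可以用下面的通用 Go 代码来理解(仅为概念示意,并非 client-go 源码):

    // distribute 仅为概念示意:向某个 broadcasterWatcher 的 result 通道分发事件
    func distribute(result chan<- interface{}, ev interface{}, dropIfFull bool) {
        if dropIfFull {
            // DropIfChannelFull:带 default 分支的非阻塞发送,通道已满则直接丢弃
            select {
            case result <- ev:
            default:
            }
            return
        }
        // WaitIfChannelFull:没有 default 分支,通道满时阻塞等待
        result <- ev
    }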

在K8s的设计中Event是允许丢失的:随着集群规模增大,etcd的压力也会变大,丢弃部分Event并不会影响集群的正常工作

broadcasterWatcher

broadcasterWatcher是各个K8s组件自定义事件处理方式的载体。当需要把事件记录到本地日志或上报到kube-apiserver时,EventBroadcaster会调用StartEventWatcher(StartLogging、StartRecordingToSink内部也是通过它实现)来创建对应的broadcasterWatcher
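
例如,可以通过 StartEventWatcher 注册一个自定义的 broadcasterWatcher,把事件打印到标准输出(示意代码,函数名 startCustomWatcher 为本文假设):

    import (
        "fmt"

        v1 "k8s.io/api/core/v1"
        "k8s.io/apimachinery/pkg/watch"
        "k8s.io/client-go/tools/record"
    )

    // startCustomWatcher 注册一个只打印事件的 broadcasterWatcher,
    // 返回的 watch.Interface 可在不需要时调用 Stop() 停止监听
    func startCustomWatcher(broadcaster record.EventBroadcaster) watch.Interface {
        return broadcaster.StartEventWatcher(func(e *v1.Event) {
            fmt.Printf("[%s] %s/%s: %s %s\n",
                e.Type, e.InvolvedObject.Namespace, e.InvolvedObject.Name, e.Reason, e.Message)
        })
    }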

EventCorrelator

EventCorrelator负责预处理所有上报的事件,对并发产生的相似事件进行聚合、过滤与去重,防止事件泛滥

  • Ref:https://github.com/kubernetes/client-go/blob/4f1c2e7b9711846cf31255fd709616c66cb3cfea/tools/record/events_cache.go#L405

    // EventCorrelator processes all incoming events and performs analysis to avoid overwhelming the system.  It can filter all
    // incoming events to see if the event should be filtered from further processing. It can aggregate similar events that occur
    // frequently to protect the system from spamming events that are difficult for users to distinguish. It performs de-duplication
    // to ensure events that are observed multiple times are compacted into a single event with increasing counts.
    type EventCorrelator struct {
    // the function to filter the event
    // 过滤器,主要用于限速
    // 实现类EventSourceObjectSpamFilter采用令牌桶限速,构造出一个初始容量(burst)为25、每5min生成一个令牌的限速器
    filterFunc EventFilterFunc
    // the object that performs event aggregation
    // 聚合器,用于聚合一段时间内的事件
    aggregator *EventAggregator
    // the object that observes events as they come through
    // 观察器,用于对比待上报事件与缓存中已有的事件
    // 对已存在的相似事件使用双路合并机制生成新的事件(累加计数等)
    logger *eventLogger
    }
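
filterFunc 注释中提到的限速策略,大致等价于下面这个令牌桶限速器的行为(仅为概念示意;真实实现见 EventSourceObjectSpamFilter,并且是按事件来源和对象为粒度分别维护限速器的):

    package main

    import (
        "fmt"

        "k8s.io/client-go/util/flowcontrol"
    )

    func main() {
        // burst 为 25,速率为每 300s(5min)补充一个令牌
        limiter := flowcontrol.NewTokenBucketRateLimiter(1.0/300, 25)

        for i := 0; i < 30; i++ {
            // TryAccept 非阻塞地尝试取令牌:前 25 次为 true,之后在令牌补充前均为 false
            fmt.Println(i, limiter.TryAccept())
        }
    }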

EventSink

EventSink作为事件沉淀器,把经过预处理的事件上报到kube-apiserver。

上报时使用的方法有Create、Update、Patch三种,对应EventSink接口的三个方法。
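
EventSink 接口定义在 client-go 的 tools/record 包中,大致如下(中文注释为本文补充,以实际源码为准):

    // EventSink 定义事件的存储方式,ClientSet 中的 EventSinkImpl 实现了该接口
    type EventSink interface {
        // 创建新事件
        Create(event *v1.Event) (*v1.Event, error)
        // 更新已存在的事件
        Update(event *v1.Event) (*v1.Event, error)
        // 以增量 patch 的方式更新已有事件(例如累加 Count)
        Patch(oldEvent *v1.Event, data []byte) (*v1.Event, error)
    }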

Leader选举机制

K8s组件为了实现高可用,使用了主备模式,采用Leader选举实现故障转移。

client-go在tools/leaderelection包中提供了相应实现

示例

package main

import (
"context"
"flag"
"fmt"
"github.com/google/uuid"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/tools/clientcmd"
"k8s.io/client-go/tools/leaderelection"
"k8s.io/client-go/tools/leaderelection/resourcelock"
"os"
"os/signal"
"syscall"
"time"
)

func exitWhenTerminate(cancel context.CancelFunc) {
ch := make(chan os.Signal, 1)
signal.Notify(ch, os.Interrupt, syscall.SIGTERM)
go func() {
<-ch
fmt.Println("Received terminate, start shutting down...")
cancel()
}()
}

func main() {
var id string
flag.StringVar(&id, "id", uuid.New().String(), "id of the leader (required)")
flag.Parse()

ctx, cancel := context.WithCancel(context.Background())
defer cancel()
exitWhenTerminate(cancel)

config, err := clientcmd.BuildConfigFromFlags("", "/Users/joohwan/.kube/config")
if err != nil {
panic(err)
}
kubeClient, err := kubernetes.NewForConfig(config)
if err != nil {
panic(err)
}

lock := &resourcelock.LeaseLock{
LeaseMeta: metav1.ObjectMeta{
Name: "example",
Namespace: "default",
},
Client: kubeClient.CoordinationV1(),
LockConfig: resourcelock.ResourceLockConfig{
Identity: id,
},
}
leaderelection.RunOrDie(ctx, leaderelection.LeaderElectionConfig{
Lock: lock,
ReleaseOnCancel: true,
LeaseDuration: 15 * time.Second,
RenewDeadline: 10 * time.Second,
RetryPeriod: 2 * time.Second,
Callbacks: leaderelection.LeaderCallbacks{
OnStartedLeading: func(ctx context.Context) {
run(ctx)
},
OnStoppedLeading: func() {
fmt.Printf("leader lost: %s\n", id)
os.Exit(1)
},
OnNewLeader: func(identity string) {
if identity == id {
return
}
fmt.Printf("new leader elected: %s\n", identity)
},
},
})

}
func run(ctx context.Context) {
fmt.Println("Controller loop...")
// 阻塞直到 ctx 被取消(丢失 leader 或收到退出信号)
<-ctx.Done()

}

实现原理

资源锁

资源锁提供了支持Leader选举的锁对象,并在锁对象中存储当前持有锁的leader信息(即下文的LeaderElectionRecord)

  • Ref:https://github.com/kubernetes/client-go/blob/4f1c2e7b9711846cf31255fd709616c66cb3cfea/tools/leaderelection/resourcelock/interface.go#L144

    // Interface offers a common interface for locking on arbitrary
    // resources used in leader election. The Interface is used
    // to hide the details on specific implementations in order to allow
    // them to change over time. This interface is strictly for use
    // by the leaderelection code.
    type Interface interface {
    // Get returns the LeaderElectionRecord
    Get(ctx context.Context) (*LeaderElectionRecord, []byte, error)

    // Create attempts to create a LeaderElectionRecord
    Create(ctx context.Context, ler LeaderElectionRecord) error

    // Update will update and existing LeaderElectionRecord
    Update(ctx context.Context, ler LeaderElectionRecord) error

    // RecordEvent is used to record events
    // 当持有锁的领导者发生变化时,记录对应的事件,并使用事件上报机制上报
    RecordEvent(string)

    // Identity will return the locks Identity
    // 返回锁的身份标识(即当前候选者的 Identity)
    Identity() string

    // Describe is used to convert details on current resource lock
    // into a string
    // 返回当前锁对象的描述信息,用于调试日志输出
    Describe() string
    }
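
Interface 中 Get/Create/Update 操作的锁记录 LeaderElectionRecord 同样定义在 resourcelock 包中,大致如下(中文注释为本文补充,以实际源码为准):

    // LeaderElectionRecord 是写入锁资源对象(如 Lease 的 spec 或注解)中的锁记录
    type LeaderElectionRecord struct {
        // 当前持有锁的 leader 标识
        HolderIdentity string `json:"holderIdentity"`
        // 租约时长(秒),其他候选人需要等待该时长后才能强制抢占
        LeaseDurationSeconds int `json:"leaseDurationSeconds"`
        // 当前 leader 获取锁的时间
        AcquireTime metav1.Time `json:"acquireTime"`
        // 当前 leader 最近一次续期的时间
        RenewTime metav1.Time `json:"renewTime"`
        // leader 发生切换的次数
        LeaderTransitions int `json:"leaderTransitions"`
    }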

Lease对象是专用的租约资源对象:

  • Ref:https://github.com/kubernetes/api/blob/f7b7ea4f0fcc6cb8c8dd42eb46a94c7e163d1b9d/coordination/v1/types.go#L40

    // Lease defines a lease concept.
    type Lease struct {
    metav1.TypeMeta `json:",inline"`
    // More info: https://git.k8s.io/community/contributors/devel/sig-architecture/api-conventions.md#metadata
    // +optional
    metav1.ObjectMeta `json:"metadata,omitempty" protobuf:"bytes,1,opt,name=metadata"`

    // Specification of the Lease.
    // More info: https://git.k8s.io/community/contributors/devel/sig-architecture/api-conventions.md#spec-and-status
    // +optional
    Spec LeaseSpec `json:"spec,omitempty" protobuf:"bytes,2,opt,name=spec"`
    }

    // LeaseSpec is a specification of a Lease.
    type LeaseSpec struct {
    // holderIdentity contains the identity of the holder of a current lease.
    // +optional
    // 当前租约持有人的身份标识
    HolderIdentity *string `json:"holderIdentity,omitempty" protobuf:"bytes,1,opt,name=holderIdentity"`
    // leaseDurationSeconds is a duration that candidates for a lease need
    // to wait to force acquire it. This is measure against time of last
    // observed RenewTime.
    // +optional
    // 候选人在强制抢占租约之前需要等待的时长(以最近一次观察到的 RenewTime 为基准)
    LeaseDurationSeconds *int32 `json:"leaseDurationSeconds,omitempty" protobuf:"varint,2,opt,name=leaseDurationSeconds"`
    // acquireTime is a time when the current lease was acquired.
    // +optional
    // 当前租约被获取时间
    AcquireTime *metav1.MicroTime `json:"acquireTime,omitempty" protobuf:"bytes,3,opt,name=acquireTime"`
    // renewTime is a time when the current holder of a lease has last
    // updated the lease.
    // +optional
    // 当前租约持有人上次更新租约时间
    RenewTime *metav1.MicroTime `json:"renewTime,omitempty" protobuf:"bytes,4,opt,name=renewTime"`
    // leaseTransitions is the number of transitions of a lease between
    // holders.
    // +optional
    // 租约持有人之间的转换次数
    LeaseTransitions *int32 `json:"leaseTransitions,omitempty" protobuf:"varint,5,opt,name=leaseTransitions"`
    }

第四种锁-多锁(MultiLock)

多锁(MultiLock)包含主锁(Primary)和从锁(Secondary)两个锁对象,创建和更新锁记录时会先操作主锁、再操作从锁,主要用于在不同锁资源类型之间平滑迁移。
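
构造多锁的一个示意如下(基于 1.25 的 resourcelock 包直接组装 MultiLock,函数名 newMultiLock 为本文假设,字段与可用的锁类型以实际版本为准;kubeClient、id 沿用前文示例):

    import (
        metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
        "k8s.io/client-go/kubernetes"
        "k8s.io/client-go/tools/leaderelection/resourcelock"
    )

    // newMultiLock 为示意函数:主锁为 ConfigMap 锁,从锁为 Lease 锁
    func newMultiLock(kubeClient kubernetes.Interface, id string) resourcelock.Interface {
        meta := metav1.ObjectMeta{Name: "example", Namespace: "default"}
        cfg := resourcelock.ResourceLockConfig{Identity: id}
        return &resourcelock.MultiLock{
            Primary: &resourcelock.ConfigMapLock{
                ConfigMapMeta: meta, Client: kubeClient.CoreV1(), LockConfig: cfg,
            },
            Secondary: &resourcelock.LeaseLock{
                LeaseMeta: meta, Client: kubeClient.CoordinationV1(), LockConfig: cfg,
            },
        }
    }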

启动流程

调用leaderelection.RunOrDie func会构造LeaderElector并触发Leader候选人进入选举流程(内部调用Run)

  • Ref:https://github.com/kubernetes/client-go/blob/4f1c2e7b9711846cf31255fd709616c66cb3cfea/tools/leaderelection/leaderelection.go#L200

    // Run starts the leader election loop. Run will not return
    // before leader election loop is stopped by ctx or it has
    // stopped holding the leader lease
    func (le *LeaderElector) Run(ctx context.Context) {
    defer runtime.HandleCrash()
    defer func() {
    le.config.Callbacks.OnStoppedLeading()
    }()
    // 先尝试获取锁,获取失败(通常是 ctx 被取消)则直接返回
    // 获取成功后在独立协程中执行 OnStartedLeading,并通过 renew 持续续期
    if !le.acquire(ctx) {
    return // ctx signalled done
    }
    ctx, cancel := context.WithCancel(ctx)
    defer cancel()
    go le.config.Callbacks.OnStartedLeading(ctx)
    le.renew(ctx)
    }

竞争锁

Leader选举机制使用acquire func实现竞争锁

  • acquire func以LeaderElectionConfig中提供的RetryPeriod为周期(默认2s),定期轮询执行tryAcquireOrRenew获取锁资源

tryAcquireOrRenew的实现流程如下(列表之后附一个简化的代码示意):

  1. 构造锁记录LeaderElectionRecord,用于后续创建、对比、更新锁资源对象
  2. 使用资源锁查询K8s是否存在对应的锁资源对象
  3. 如果锁资源对象不存在,尝试创建锁资源对象,并且设置leader为自己
  4. 如果存在,检查锁资源对象中记录的leader信息
    • 是否已经存在leader
    • leader的租期是否已经过期
  5. 如果锁资源对象中存在有效的leader,把自己设置为follower,退出并等待下一个周期
  6. 如果锁资源对象中不存在有效的leader,则尝试把自己设置为leader;如果失败,则回退为follower
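
下面是上述流程的一个高度简化的代码示意(并非 client-go 源码,省略了观测记录缓存、错误类型判断、时钟抽象等细节,函数名为本文假设):

    import (
        "context"
        "time"

        metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
        "k8s.io/client-go/tools/leaderelection/resourcelock"
    )

    // tryAcquireOrRenewSketch 为流程示意,返回是否持有锁
    func tryAcquireOrRenewSketch(ctx context.Context, lock resourcelock.Interface, id string, leaseDuration time.Duration) bool {
        now := metav1.NewTime(time.Now())
        // 1. 构造期望写入的锁记录
        desired := resourcelock.LeaderElectionRecord{
            HolderIdentity:       id,
            LeaseDurationSeconds: int(leaseDuration / time.Second),
            AcquireTime:          now,
            RenewTime:            now,
        }

        // 2. 查询是否已存在锁资源对象
        old, _, err := lock.Get(ctx)
        if err != nil {
            // 3. 不存在(或查询失败)则尝试创建,并把自己设置为 leader
            return lock.Create(ctx, desired) == nil
        }

        // 4/5. 已有其他 leader 且租期未过期,保持 follower
        expired := old.RenewTime.Add(leaseDuration).Before(now.Time)
        if old.HolderIdentity != id && !expired {
            return false
        }

        // 6. 尝试把自己更新为 leader,失败则回退为 follower
        desired.LeaderTransitions = old.LeaderTransitions
        if old.HolderIdentity != id {
            desired.LeaderTransitions++
        }
        return lock.Update(ctx, desired) == nil
    }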

续锁期

候选人成为leader之后,需要定期对锁资源对象续期,只有续期成功才能继续担任leader,否则回退为follower

  • Ref:https://github.com/kubernetes/client-go/blob/4f1c2e7b9711846cf31255fd709616c66cb3cfea/tools/leaderelection/leaderelection.go#L265

    // renew loops calling tryAcquireOrRenew and returns immediately when tryAcquireOrRenew fails or ctx signals done.
    func (le *LeaderElector) renew(ctx context.Context) {
    ctx, cancel := context.WithCancel(ctx)
    defer cancel()
    wait.Until(func() {
    timeoutCtx, timeoutCancel := context.WithTimeout(ctx, le.config.RenewDeadline)
    defer timeoutCancel()
    err := wait.PollImmediateUntil(le.config.RetryPeriod, func() (bool, error) {
    return le.tryAcquireOrRenew(timeoutCtx), nil
    }, timeoutCtx.Done())

    le.maybeReportTransition()
    desc := le.config.Lock.Describe()
    if err == nil {
    klog.V(5).Infof("successfully renewed lease %v", desc)
    return
    }
    le.config.Lock.RecordEvent("stopped leading")
    le.metrics.leaderOff(le.config.Name)
    klog.Infof("failed to renew lease %v: %v", desc, err)
    cancel()
    }, le.config.RetryPeriod, ctx.Done())

    // if we hold the lease, give it up
    if le.config.ReleaseOnCancel {
    le.release()
    }
    }

释放锁

为了在程序退出时释放所占用的锁,Leader选举通过release func实现锁资源的释放

  • 当LeaderElectionConfig中的ReleaseOnCancel配置项为true时(该选项默认为false,示例中显式设置为true),程序退出之前会调用release

  • 只有leader才能发起锁资源对象的释放,释放时会构造出一个不指定leader的锁记录并更新到锁资源对象中

  • Ref:https://github.com/kubernetes/client-go/blob/4f1c2e7b9711846cf31255fd709616c66cb3cfea/tools/leaderelection/leaderelection.go#L294

    // release attempts to release the leader lease if we have acquired it.
    func (le *LeaderElector) release() bool {
    if !le.IsLeader() {
    return true
    }
    now := metav1.Now()
    leaderElectionRecord := rl.LeaderElectionRecord{
    LeaderTransitions: le.observedRecord.LeaderTransitions,
    LeaseDurationSeconds: 1,
    RenewTime: now,
    AcquireTime: now,
    }
    if err := le.config.Lock.Update(context.TODO(), leaderElectionRecord); err != nil {
    klog.Errorf("Failed to release lock: %v", err)
    return false
    }

    le.setObservedRecord(&leaderElectionRecord)
    return true
    }