K8s client-go (Common Utilities: Event Management and Leader Election)
Based on Kubernetes 1.25
Event Management Mechanism
Event is a resource object that records what happens in the cluster, such as scheduler decisions or why a Pod was evicted.
Events are persisted through kube-apiserver into etcd and are subject to a forced retention policy: an event is deleted one hour after its last occurrence (kube-apiserver's --event-ttl flag, default 1h).
-
type Event struct {
    // TypeMeta is embedded and carries the APIVersion and Kind fields that identify the object's API version and type
    metav1.TypeMeta `json:",inline"`
    // Standard object metadata: name, namespace, labels, and so on
    metav1.ObjectMeta `json:"metadata" protobuf:"bytes,1,opt,name=metadata"`
    // Reference to the object this event is about
    InvolvedObject ObjectReference `json:"involvedObject" protobuf:"bytes,2,opt,name=involvedObject"`
    // Reason the event was generated; should be a short, machine-readable string explaining the object's state transition
    // TODO: provide exact specification for format.
    // +optional
    Reason string `json:"reason,omitempty" protobuf:"bytes,3,opt,name=reason"`
    // Human-readable description of the status of this operation
    // TODO: decide on maximum length.
    // +optional
    Message string `json:"message,omitempty" protobuf:"bytes,4,opt,name=message"`
    // Component reporting this event; should be a short machine-readable string
    // +optional
    Source EventSource `json:"source,omitempty" protobuf:"bytes,5,opt,name=source"`
    // Time when the event was first recorded (server receipt time is in TypeMeta)
    // +optional
    FirstTimestamp metav1.Time `json:"firstTimestamp,omitempty" protobuf:"bytes,6,opt,name=firstTimestamp"`
    // Time when the most recent occurrence of this event was recorded
    // +optional
    LastTimestamp metav1.Time `json:"lastTimestamp,omitempty" protobuf:"bytes,7,opt,name=lastTimestamp"`
    // Number of times this event has occurred
    // +optional
    Count int32 `json:"count,omitempty" protobuf:"varint,8,opt,name=count"`
    // Type of this event (Normal, Warning); new types may be added in the future
    // +optional
    Type string `json:"type,omitempty" protobuf:"bytes,9,opt,name=type"`
    // Time when this event was first observed
    // +optional
    EventTime metav1.MicroTime `json:"eventTime,omitempty" protobuf:"bytes,10,opt,name=eventTime"`
    // Data about the series of events this event represents, or nil if it is a singleton event
    // +optional
    Series *EventSeries `json:"series,omitempty" protobuf:"bytes,11,opt,name=series"`
    // Action that was taken/failed regarding the involved object
    // +optional
    Action string `json:"action,omitempty" protobuf:"bytes,12,opt,name=action"`
    // Optional secondary object for more complex actions
    // +optional
    Related *ObjectReference `json:"related,omitempty" protobuf:"bytes,13,opt,name=related"`
    // Name of the controller that emitted this event, e.g. `kubernetes.io/kubelet`
    // +optional
    ReportingController string `json:"reportingComponent" protobuf:"bytes,14,opt,name=reportingComponent"`
    // ID of the controller instance, e.g. `kubelet-xyzf`
    // +optional
    ReportingInstance string `json:"reportingInstance" protobuf:"bytes,15,opt,name=reportingInstance"`
}
Usage
client-go implements a unified event manager under tools/record.
Taking the DeploymentController as an example:
Initialize the event broadcaster (EventBroadcaster) and the event recorder (EventRecorder)
The EventRecorder records events at key points
The EventBroadcaster uploads the complete events
-
eventBroadcaster := record.NewBroadcaster(record.WithContext(ctx))
...
dc := &DeploymentController{
    client:           client,
    eventBroadcaster: eventBroadcaster,
    eventRecorder:    eventBroadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "deployment-controller"}),
}
...
dc.rsControl = controller.RealRSControl{
    KubeClient: client,
    Recorder:   dc.eventRecorder,
}
...
Start the EventBroadcaster's structured logging and recording-to-sink; both run in their own goroutines:
-
// Start events processing pipeline.
dc.eventBroadcaster.StartStructuredLogging(3)
dc.eventBroadcaster.StartRecordingToSink(&v1core.EventSinkImpl{Interface: dc.client.CoreV1().Events("")})
defer dc.eventBroadcaster.Shutdown()
-
Use the EventRecorder to record events at key points, for example when scaling up:
if !alreadyExists && newReplicasCount > 0 {
    dc.eventRecorder.Eventf(d, v1.EventTypeNormal, "ScalingReplicaSet", "Scaled up replica set %s from 0 to %d", createdRS.Name, newReplicasCount)
}
How It Works
EventRecorder: the event producer; records key events
EventBroadcaster: the event consumer; the central broadcaster
- Distribution comes in blocking and non-blocking variants; non-blocking is the default
broadcasterWatcher: managed collectively by the EventBroadcaster; each watcher is an observer that defines its own way of handling events
- For example, one broadcasterWatcher writes logs while another uploads events
EventCorrelator: the event correlator that preprocesses events before upload, using aggregation, filtering, and similar mechanisms to analyze and condense them so that the system is not flooded with events
EventSink: the event sink; uses the ClientSet's corev1.EventInterface to upload events through kube-apiserver into etcd (the full pipeline is sketched below)
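Putting the components together, a minimal runnable sketch of the pipeline; the in-cluster config, component name, and event text are illustrative assumptions, not code from the Kubernetes source:
-
package main

import (
    v1 "k8s.io/api/core/v1"
    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    "k8s.io/client-go/kubernetes"
    "k8s.io/client-go/kubernetes/scheme"
    typedcorev1 "k8s.io/client-go/kubernetes/typed/core/v1"
    "k8s.io/client-go/rest"
    "k8s.io/client-go/tools/record"
)

func main() {
    config, err := rest.InClusterConfig() // assumes the program runs inside a cluster
    if err != nil {
        panic(err)
    }
    client := kubernetes.NewForConfigOrDie(config)

    // EventBroadcaster: consumes recorded events and fans them out to watchers.
    broadcaster := record.NewBroadcaster()
    defer broadcaster.Shutdown()
    // broadcasterWatcher 1: structured logging at verbosity 3.
    broadcaster.StartStructuredLogging(3)
    // broadcasterWatcher 2: upload events to kube-apiserver through the EventSink.
    broadcaster.StartRecordingToSink(&typedcorev1.EventSinkImpl{
        Interface: client.CoreV1().Events(""),
    })

    // EventRecorder: the producer side, bound to a reporting component.
    recorder := broadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "example-controller"})

    // Record an event about some object; it flows recorder -> broadcaster ->
    // EventCorrelator -> EventSink -> kube-apiserver.
    pod := &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "demo", Namespace: "default"}}
    recorder.Eventf(pod, v1.EventTypeNormal, "Demo", "example event recorded by %s", "example-controller")
}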
EventRecorder
The EventRecorder offers three methods for recording events:
-
// EventRecorder knows how to record events on behalf of an EventSource.
type EventRecorder interface {
    // Event constructs an event from the given information and puts it in the queue for sending.
    // 'object' is the object this event is about. Event will make a reference-- or you may also
    // pass a reference to the object directly.
    // 'eventtype' of this event, and can be one of Normal, Warning. New types could be added in future
    // 'reason' is the reason this event is generated. 'reason' should be short and unique; it
    // should be in UpperCamelCase format (starting with a capital letter). "reason" will be used
    // to automate handling of events, so imagine people writing switch statements to handle them.
    // You want to make that easy.
    // 'message' is intended to be human readable.
    //
    // The resulting event will be created in the same namespace as the reference object.
    Event(object runtime.Object, eventtype, reason, message string)
    // Eventf is just like Event, but with Sprintf for the message field.
    Eventf(object runtime.Object, eventtype, reason, messageFmt string, args ...interface{})
    // AnnotatedEventf is just like eventf, but with annotations attached
    AnnotatedEventf(object runtime.Object, annotations map[string]string, eventtype, reason, messageFmt string, args ...interface{})
}
- ActionOrDrop: in the default non-blocking mode, the recorder enqueues each event into a channel with a capacity of 1000; if the number of queued events exceeds the channel capacity, the event is dropped
EventBroadcaster
The EventBroadcaster consumes the events recorded by the EventRecorder and distributes them to every currently connected broadcasterWatcher.
- The DropIfChannelFull flag selects a select statement with a default branch, making distribution non-blocking: once a broadcasterWatcher's result channel (capacity 1000) is full, further events are simply dropped
- WaitIfChannelFull uses the same select without a default branch, so distribution blocks until the channel has room
Events are allowed to be lost in Kubernetes: as the cluster grows, so does the pressure on etcd, and losing an Event does not affect normal operation. The two modes boil down to the select pattern sketched below.
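A small standalone sketch of the two distribution strategies; this is the select pattern the broadcaster relies on, not the actual client-go source:
-
package main

import "fmt"

// distribute sends ev to a watcher's result channel using one of the two
// strategies: non-blocking (DropIfChannelFull) or blocking (WaitIfChannelFull).
func distribute(result chan string, ev string, dropIfFull bool) bool {
    if dropIfFull {
        select {
        case result <- ev:
            return true
        default: // channel full: drop the event instead of blocking
            return false
        }
    }
    result <- ev // block until the watcher drains the channel
    return true
}

func main() {
    ch := make(chan string, 1)
    fmt.Println(distribute(ch, "e1", true)) // true: event buffered
    fmt.Println(distribute(ch, "e2", true)) // false: channel full, event dropped
}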
broadcasterWatcher
A broadcasterWatcher is how each Kubernetes component customizes its event handling. Whenever events need to be logged locally or uploaded to kube-apiserver, the EventBroadcaster calls StartEventWatcher to create the corresponding broadcasterWatcher, as in the sketch below.
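A sketch of registering a custom watcher; the handler body and function name are illustrative assumptions:
-
package sketch

import (
    v1 "k8s.io/api/core/v1"
    "k8s.io/client-go/tools/record"
    "k8s.io/klog/v2"
)

// startLocalWatcher registers one more broadcasterWatcher on an existing
// EventBroadcaster; the handler runs for every event the broadcaster distributes.
func startLocalWatcher(broadcaster record.EventBroadcaster) {
    broadcaster.StartEventWatcher(func(e *v1.Event) {
        klog.Infof("observed event: reason=%s message=%s", e.Reason, e.Message)
    })
}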
EventCorrelator
The EventCorrelator preprocesses all events, aggregating similar events that are generated concurrently:
-
// EventCorrelator processes all incoming events and performs analysis to avoid overwhelming the system. It can filter all
// incoming events to see if the event should be filtered from further processing. It can aggregate similar events that occur
// frequently to protect the system from spamming events that are difficult for users to distinguish. It performs de-duplication
// to ensure events that are observed multiple times are compacted into a single event with increasing counts.
type EventCorrelator struct {
    // the function to filter the event
    // Filter, mainly used for rate limiting: the implementation, EventSourceObjectSpamFilter,
    // uses a token bucket with an initial burst of 25 tokens that refills one token every 5 minutes
    filterFunc EventFilterFunc
    // the object that performs event aggregation
    // Aggregator: groups similar events that occur within a time window
    aggregator *EventAggregator
    // the object that observes events as they come through
    // Observer: reconciles each pending event with the one in the cache,
    // merging the two to produce the event that actually gets recorded
    logger *eventLogger
}
EventSink
The EventSink is where preprocessed events finally land: it uploads them to kube-apiserver.
Events are uploaded via the sink's Create, Update, and Patch methods, shown below.
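The interface itself, from client-go's tools/record; the EventSinkImpl of the typed core/v1 client is the implementation normally handed to StartRecordingToSink:
-
// EventSink knows how to store events (client.Client implements it.)
// EventSink must respect the namespace that will be embedded in 'event'.
type EventSink interface {
    Create(event *v1.Event) (*v1.Event, error)
    Update(event *v1.Event) (*v1.Event, error)
    Patch(oldEvent *v1.Event, data []byte) (*v1.Event, error)
}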
Leader Election Mechanism
For high availability, Kubernetes components run in an active-standby model and use leader election for failover.
client-go implements this under tools/leaderelection.
Implementation
Example
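A minimal sketch modeled on the official client-go leader-election example; the flag handling, lock name example-lock, and namespace default are illustrative assumptions:
-
package main

import (
    "context"
    "flag"
    "os"
    "os/signal"
    "syscall"
    "time"

    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    "k8s.io/client-go/kubernetes"
    "k8s.io/client-go/tools/clientcmd"
    "k8s.io/client-go/tools/leaderelection"
    "k8s.io/client-go/tools/leaderelection/resourcelock"
    "k8s.io/klog/v2"
)

func main() {
    var kubeconfig, id string
    flag.StringVar(&kubeconfig, "kubeconfig", "", "path to a kubeconfig file")
    flag.StringVar(&id, "id", os.Getenv("HOSTNAME"), "identity of this candidate")
    flag.Parse()

    config, err := clientcmd.BuildConfigFromFlags("", kubeconfig)
    if err != nil {
        klog.Fatal(err)
    }
    client := kubernetes.NewForConfigOrDie(config)

    // Cancel the context on SIGTERM/SIGINT so the lease is released on exit.
    ctx, cancel := context.WithCancel(context.Background())
    defer cancel()
    ch := make(chan os.Signal, 1)
    signal.Notify(ch, os.Interrupt, syscall.SIGTERM)
    go func() { <-ch; cancel() }()

    // The resource lock: a Lease object named example-lock in the default namespace.
    lock := &resourcelock.LeaseLock{
        LeaseMeta:  metav1.ObjectMeta{Name: "example-lock", Namespace: "default"},
        Client:     client.CoordinationV1(),
        LockConfig: resourcelock.ResourceLockConfig{Identity: id},
    }

    leaderelection.RunOrDie(ctx, leaderelection.LeaderElectionConfig{
        Lock:            lock,
        ReleaseOnCancel: true,             // release the lease when ctx is cancelled
        LeaseDuration:   15 * time.Second, // how long followers wait before taking over
        RenewDeadline:   10 * time.Second, // leader must renew within this window
        RetryPeriod:     2 * time.Second,  // polling period for acquire/renew
        Callbacks: leaderelection.LeaderCallbacks{
            OnStartedLeading: func(ctx context.Context) {
                klog.Info("started leading; run the controller loop here")
                <-ctx.Done()
            },
            OnStoppedLeading: func() {
                klog.Infof("%s: lost leadership", id)
            },
            OnNewLeader: func(identity string) {
                if identity != id {
                    klog.Infof("new leader elected: %s", identity)
                }
            },
        },
    })
}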
How It Works
Resource Locks
A resource lock is the lock object that backs leader election; it stores the identity of the current leader holding the lock. client-go ships several implementations (the Lease-based lock is the recommended one), all satisfying the Interface below.
-
// Interface offers a common interface for locking on arbitrary
// resources used in leader election. The Interface is used
// to hide the details on specific implementations in order to allow
// them to change over time. This interface is strictly for use
// by the leaderelection code.
type Interface interface {
    // Get returns the LeaderElectionRecord
    Get(ctx context.Context) (*LeaderElectionRecord, []byte, error)
    // Create attempts to create a LeaderElectionRecord
    Create(ctx context.Context, ler LeaderElectionRecord) error
    // Update will update an existing LeaderElectionRecord
    Update(ctx context.Context, ler LeaderElectionRecord) error
    // RecordEvent is used to record events
    // When the leader holding the lock changes, the change is recorded as an
    // event and uploaded via the event reporting mechanism described above
    RecordEvent(string)
    // Identity will return the locks Identity
    // i.e. the identity of this candidate as written into the lock
    Identity() string
    // Describe is used to convert details on current resource lock
    // into a string
    // Used for debug log output
    Describe() string
}
The Lease object is a resource dedicated to leasing:
-
// Lease defines a lease concept.
type Lease struct {
    metav1.TypeMeta `json:",inline"`
    // More info: https://git.k8s.io/community/contributors/devel/sig-architecture/api-conventions.md#metadata
    // +optional
    metav1.ObjectMeta `json:"metadata,omitempty" protobuf:"bytes,1,opt,name=metadata"`
    // Specification of the Lease.
    // More info: https://git.k8s.io/community/contributors/devel/sig-architecture/api-conventions.md#spec-and-status
    // +optional
    Spec LeaseSpec `json:"spec,omitempty" protobuf:"bytes,2,opt,name=spec"`
}
// LeaseSpec is a specification of a Lease.
type LeaseSpec struct {
    // holderIdentity contains the identity of the holder of a current lease.
    // +optional
    HolderIdentity *string `json:"holderIdentity,omitempty" protobuf:"bytes,1,opt,name=holderIdentity"`
    // leaseDurationSeconds is a duration that candidates for a lease need
    // to wait to force acquire it. This is measured against the time of last
    // observed RenewTime.
    // +optional
    LeaseDurationSeconds *int32 `json:"leaseDurationSeconds,omitempty" protobuf:"varint,2,opt,name=leaseDurationSeconds"`
    // acquireTime is a time when the current lease was acquired.
    // +optional
    AcquireTime *metav1.MicroTime `json:"acquireTime,omitempty" protobuf:"bytes,3,opt,name=acquireTime"`
    // renewTime is a time when the current holder of a lease has last
    // updated the lease.
    // +optional
    RenewTime *metav1.MicroTime `json:"renewTime,omitempty" protobuf:"bytes,4,opt,name=renewTime"`
    // leaseTransitions is the number of transitions of a lease between
    // holders.
    // +optional
    LeaseTransitions *int32 `json:"leaseTransitions,omitempty" protobuf:"varint,5,opt,name=leaseTransitions"`
}
The Fourth Lock Type: MultiLock
Beyond the three single-resource locks (EndpointsLock, ConfigMapLock, LeaseLock), there is a fourth type: the MultiLock, which bundles a primary and a secondary lock and exists to support migrating between lock types (a construction sketch follows the struct):
-
// MultiLock is used for lock's migration
type MultiLock struct {
    Primary   Interface
    Secondary Interface
}
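For instance, the resourcelock.New constructor returns a MultiLock for the migration lock types; the namespace, lock name, and helper function here are illustrative assumptions:
-
package sketch

import (
    "k8s.io/client-go/kubernetes"
    "k8s.io/client-go/tools/leaderelection/resourcelock"
)

// newMigrationLock builds an endpointsleases MultiLock (primary: Endpoints,
// secondary: Lease) so a component can switch lock types without two
// instances leading at once.
func newMigrationLock(client kubernetes.Interface, id string) (resourcelock.Interface, error) {
    return resourcelock.New(
        resourcelock.EndpointsLeasesResourceLock,
        "kube-system",
        "example-lock",
        client.CoreV1(),
        client.CoordinationV1(),
        resourcelock.ResourceLockConfig{Identity: id},
    )
}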
Startup Flow
Calling the leaderelection.RunOrDie func brings a leader candidate into the startup flow:
-
// Run starts the leader election loop. Run will not return
// before leader election loop is stopped by ctx or it has
// stopped holding the leader lease
func (le *LeaderElector) Run(ctx context.Context) {
    defer runtime.HandleCrash()
    defer func() {
        le.config.Callbacks.OnStoppedLeading()
    }()
    // Block until the lock is acquired; if ctx is cancelled first, give up.
    if !le.acquire(ctx) {
        return // ctx signalled done
    }
    ctx, cancel := context.WithCancel(ctx)
    defer cancel()
    // After acquiring, run OnStartedLeading in its own goroutine and keep
    // renewing the lease until renewal fails or ctx is done.
    go le.config.Callbacks.OnStartedLeading(ctx)
    le.renew(ctx)
}
Acquiring the Lock
Leader election competes for the lock in the acquire func.
- acquire polls tryAcquireOrRenew on the RetryPeriod set in LeaderElectionConfig (typically 2s) until the lock is obtained
tryAcquireOrRenew proceeds as follows (a simplified sketch follows the list):
- Build a lock record (LeaderElectionRecord) that will be used to create, compare against, and update the lock resource object
- Use the resource lock to query Kubernetes for an existing lock resource object
- If the lock object does not exist, try to create it with ourselves as the leader
- If it exists, check whether it currently has a leader and whether that leader's lease has expired
- If the lock object has a valid leader, mark ourselves as a follower and wait for the next cycle
- If it has no valid leader, try to make ourselves the leader; on failure, fall back to follower
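A simplified sketch of that decision logic; Record and Lock are stand-ins for resourcelock.LeaderElectionRecord and resourcelock.Interface, not the real types:
-
package sketch

import "time"

// Record and Lock are simplified stand-ins for resourcelock.LeaderElectionRecord
// and resourcelock.Interface; only the decision logic is shown.
type Record struct {
    HolderIdentity string
    RenewTime      time.Time
}

type Lock interface {
    Get() (*Record, error)
    Create(Record) error
    Update(Record) error
}

// tryAcquireOrRenew reports whether we hold the lease after this call.
func tryAcquireOrRenew(lock Lock, id string, leaseDuration time.Duration, now time.Time) bool {
    desired := Record{HolderIdentity: id, RenewTime: now}
    observed, err := lock.Get()
    if err != nil {
        // No lock object yet: try to create one with ourselves as the leader.
        return lock.Create(desired) == nil
    }
    // The lock exists: is another holder's lease still valid?
    if observed.HolderIdentity != "" && observed.HolderIdentity != id &&
        observed.RenewTime.Add(leaseDuration).After(now) {
        return false // valid leader in place; stay a follower until the next period
    }
    // No valid leader, or we are already the leader: take over or renew.
    return lock.Update(desired) == nil
}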
Renewing the Lease
After a candidate becomes the leader, it must periodically renew the lock object; it remains the leader only as long as renewal keeps succeeding, otherwise it falls back to being a follower.
-
// renew loops calling tryAcquireOrRenew and returns immediately when tryAcquireOrRenew fails or ctx signals done.
func (le *LeaderElector) renew(ctx context.Context) {
    ctx, cancel := context.WithCancel(ctx)
    defer cancel()
    wait.Until(func() {
        timeoutCtx, timeoutCancel := context.WithTimeout(ctx, le.config.RenewDeadline)
        defer timeoutCancel()
        err := wait.PollImmediateUntil(le.config.RetryPeriod, func() (bool, error) {
            return le.tryAcquireOrRenew(timeoutCtx), nil
        }, timeoutCtx.Done())
        le.maybeReportTransition()
        desc := le.config.Lock.Describe()
        if err == nil {
            klog.V(5).Infof("successfully renewed lease %v", desc)
            return
        }
        le.config.Lock.RecordEvent("stopped leading")
        le.metrics.leaderOff(le.config.Name)
        klog.Infof("failed to renew lease %v: %v", desc, err)
        cancel()
    }, le.config.RetryPeriod, ctx.Done())
    // if we hold the lease, give it up
    if le.config.ReleaseOnCancel {
        le.release()
    }
}
Releasing the Lock
So that the lock it holds is released when the program exits, leader election implements lock release in the release func.
When the ReleaseOnCancel option in LeaderElectionConfig is true (its zero value is false; components that want fast failover set it explicitly), release is called before exit.
Only the leader may release the lock object; releasing updates the lock with a record that names no leader and carries a one-second lease duration:
-
// release attempts to release the leader lease if we have acquired it.
func (le *LeaderElector) release() bool {
    if !le.IsLeader() {
        return true
    }
    now := metav1.Now()
    leaderElectionRecord := rl.LeaderElectionRecord{
        LeaderTransitions:    le.observedRecord.LeaderTransitions,
        LeaseDurationSeconds: 1,
        RenewTime:            now,
        AcquireTime:          now,
    }
    if err := le.config.Lock.Update(context.TODO(), leaderElectionRecord); err != nil {
        klog.Errorf("Failed to release lock: %v", err)
        return false
    }
    le.setObservedRecord(&leaderElectionRecord)
    return true
}