Kubernetes 控制器深度剖析

概述

Kubernetes 的控制器是实现声明式 API 的核心机制。控制器通过”reconcile”(调和)模式,持续将期望状态与实际状态进行调和,确保集群始终维持在期望状态。Kubernetes 提供了众多内置控制器,每个控制器负责特定资源的管理。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
┌─────────────────────────────────────────────────────────────────────────┐
│ Kubernetes 控制器架构图 │
├─────────────────────────────────────────────────────────────────────────┤
│ │
│ ┌─────────────────────┐ │
│ │ kube-controller- │ │
│ │ manager │ │
│ └──────────┬──────────┘ │
│ │ │
│ ┌───────────────────────────┼───────────────────────────┐ │
│ │ │ │ │
│ ┌──▼────────┐ ┌──────────────▼────────────┐ ┌────────────▼────────┐ │
│ │ Node │ │ Deployment │ │ ReplicaSet │ │
│ │ Controller│ │ Controller │ │ Controller │ │
│ └───────────┘ └───────────────────────────┘ └────────────────────┘ │
│ │ │ │
│ ┌──▼────────┐ ┌──────────────┐ ┌────────────┐ ┌──────────────▼───┐ │
│ │ Route │ │ Service │ │ Endpoint │ │ Job │ │
│ │ Controller│ │ Controller │ │ Controller│ │ Controller │ │
│ └───────────┘ └──────────────┘ └────────────┘ └──────────────────┘ │
│ │ │ │
│ ┌──▼────────┐ ┌──────────────┐ ┌────────────┐ ┌──────────────▼───┐ │
│ │ PV/ PVC │ │ Namespace │ │ Garbage │ │ CronJob │ │
│ │ Controller│ │ Controller │ │ Collector │ │ Controller │ │
│ └───────────┘ └──────────────┘ └────────────┘ └──────────────────┘ │
│ │ │ │
│ ┌──▼────────┐ ┌──────────────┐ ┌────────────┐ ┌──────────────▼───┐ │
│ │ Token │ │ PVC │ │ DaemonSet │ │ StatefulSet │ │
│ │ Controller│ │ Protection │ │ Controller│ │ Controller │ │
│ └───────────┘ └──────────────┘ └────────────┘ └──────────────────┘ │
│ │
└─────────────────────────────────────────────────────────────────────────┘

1. 控制器通用模式

1.1 Reconcile 模式

所有 Kubernetes 控制器都遵循相同的 reconcile 模式:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
┌─────────────────────────────────────────────────────────────────────────┐
│ Reconcile 模式流程 │
├─────────────────────────────────────────────────────────────────────────┤
│ │
│ 期望状态 (Spec) 实际状态 (Status) │
│ │ │ │
│ │ │ │
│ └──────────────┬─────────────────────────┘ │
│ │ │
│ ▼ │
│ ┌─────────────────────┐ │
│ │ Controller │ │
│ │ Reconcile │ │
│ └──────────┬──────────┘ │
│ │ │
│ ┌─────────────┼─────────────┐ │
│ │ │ │ │
│ ▼ ▼ ▼ │
│ ┌──────────┐ ┌──────────┐ ┌──────────┐ │
│ │ 观察 │ │ 分析 │ │ 执行 │ │
│ │ 当前 │ │ 差异 │ │ 操作 │ │
│ │ 状态 │ │ │ │ │ │
│ └──────────┘ └──────────┘ └──────────┘ │
│ │
└─────────────────────────────────────────────────────────────────────────┘

1.2 控制器实现模板

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
// pkg/controller/controller_utils.go
type Controller interface {
Run(ctx context.Context, workers int)
}

type ControllerImpl struct {
client clientset.Interface
queue workqueue.RateLimitingInterface
informer cache.SharedIndexInformer
reconciler func(ctx context.Context, key string) error
}

func (c *ControllerImpl) Run(ctx context.Context, workers int) {
// 启动 informers
go c.informer.Run(ctx.Done())

// 等待缓存同步
if !cache.WaitForCacheSync(ctx.Done(), c.informer.HasSynced) {
return
}

// 启动 worker 协程
for i := 0; i < workers; i++ {
go wait.Until(c.worker, time.Second, ctx.Done())
}
}

func (c *ControllerImpl) worker() {
for c.processNextItem() {
// 持续处理队列中的任务
}
}

func (c *ControllerImpl) processNextItem() bool {
key, quit := c.queue.Get()
if quit {
return false
}
defer c.queue.Done(key)

// 执行 reconcile
if err := c.reconciler(context.TODO(), key.(string)); err != nil {
// 处理错误,可能重新入队
c.queue.AddRateLimited(key)
}
return true
}

2. Deployment 控制器

2.1 Deployment 控制器架构

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
┌─────────────────────────────────────────────────────────────────────────┐
│ Deployment 控制器流程 │
├─────────────────────────────────────────────────────────────────────────┤
│ │
│ Deployment ReplicaSet Pod │
│ │ │ │ │
│ │ 1. 创建/更新 │ │ │
│ │ ───────────────────────────► │ │ │
│ │ │ 2. 创建 Pod │ │
│ │ │ ───────────────────────────► │ │
│ │ │ │ │
│ │ 3. 滚动更新策略 │ │ │
│ │ ───────────────────────────► │ │ │
│ │ │ 4. 创建新 ReplicaSet │ │
│ │ │ ───────────────────────────► │ │
│ │ │ │ │
│ │ │ 5. 逐步替换 Pod │ │
│ │ │ ───────────────────────────► │ │
│ │ │ │ │
│ │ 6. 清理旧 ReplicaSet │ │ │
│ │ ───────────────────────────► │ │ │
│ │ │ │ │
└─────────────────────────────────────────────────────────────────────────┘

2.2 滚动更新策略

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
// pkg/controller/deployment/deployment_controller.go
// 滚动更新策略计算
type RollingUpdateDeployment struct {
// 最大不可用 Pod 数
MaxUnavailable *int32
// 最大多出的 Pod 数
MaxSurge *int32
}

// 计算可用副本数
func (d *Deployment) calculateAvailableReplicas(
availableReplicas int32,
maxUnavailable int32,
) int32 {
// 可用副本 = 总副本数 - 最大不可用数
return availableReplicas - maxUnavailable
}

// 计算需要创建的新 Pod 数
func (d *Deployment) calculateNewReplicas(
allRSs []*apps.ReplicaSet,
newRSAvailableReplicas int32,
) (int32, error) {
maxSurge := maxUnavailable := int32(0)

// 计算 maxSurge 和 maxUnavailable
if d.Spec.Strategy.RollingUpdate != nil {
maxSurge = *d.Spec.Strategy.RollingUpdate.MaxSurge
maxUnavailable = *d.Spec.Strategy.RollingUpdate.MaxUnavailable
}

// 总期望副本数
desiredReplicas := d.Spec.Replicas

// 新 ReplicaSet 需要创建的 Pod 数
newReplicas := desiredReplicas - newRSAvailableReplicas

// 考虑 surge 的情况
totalAvailable := newRSAvailableReplicas + availableRS.Replicas
if totalAvailable < desiredReplicas-maxUnavailable {
// 需要扩容
newReplicas = min(newReplicas+maxSurge, desiredReplicas)
}

return newReplicas, nil
}

2.3 Deployment 状态机

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
// Deployment 状态转换
// 1. normal -> progressing (新 ReplicaSet 创建)
// 2. progressing -> complete (所有 Pod 都可用)
// 3. progressing -> paused (用户暂停)
// 4. paused -> progressing (用户恢复)
// 5. any -> scaling (副本数变化)

func (dc *DeploymentController) checkRollingUpdateConditions(d *apps.Deployment) error {
// 检查是否满足完成条件
if d.Spec.ProgressDeadlineSeconds != nil {
// 检查是否在截止时间内完成
if d.Status.ObservedGeneration < d.Generation {
// 仍在处理中
}
}
}

3. ReplicaSet 控制器

3.1 ReplicaSet 控制器逻辑

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
// pkg/controller/replicaset/replica_set.go
func (rsc *ReplicaSetController) reconcile(
ctx context.Context,
rs *apps.ReplicaSet,
) error {
// 1. 获取关联的 Pod 列表
pods, err := rsc.getPodMapForRS(rs)
if err != nil {
return err
}

// 2. 分类 Pod
manageReplicasDiff := len(pods) - int(*rs.Spec.Replicas)
filteredPods := filterActivePods(pods)

// 3. 删除不需要的 Pod
if manageReplicasDiff > 0 {
// 需要删除多余的 Pod
deletionCount := manageReplicasDiff
rsc.deletePods(filteredPods, deletionCount)
}

// 4. 创建缺失的 Pod
if manageReplicasDiff < 0 {
// 需要创建缺失的 Pod
wantReplicas := -manageReplicasDiff
rsc.createPods(rs, filteredPods, wantReplicas)
}

// 5. 更新状态
return rsc.updateReplicaSetStatus(rs, filteredPods)
}

3.2 Pod 选择与匹配

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
// ReplicaSet 使用 selector 匹配 Pod
func (rsc *ReplicaSetController) getPodMapForRS(rs *apps.ReplicaSet) ([]*v1.Pod, error) {
// 使用 informer 的索引获取匹配的 Pod
pods, err := rsc.podLister.Pods(rs.Namespace).List(rs.Spec.Selector)
if err != nil {
return nil, err
}

// 双重检查 selector 是否匹配
// 防止索引错误导致的误匹配
filtered := make([]*v1.Pod, 0)
for _, pod := range pods {
if rs.Spec.Selector.Matches(labels.Set(pod.Labels)) {
filtered = append(filtered, pod)
}
}
return filtered, nil
}

4. StatefulSet 控制器

4.1 StatefulSet 控制器核心逻辑

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
// pkg/controller/statefulset/statefulset_control.go
type StatefulSetControl interface {
UpdateStatefulSet(
ctx context.Context,
set *apps.StatefulSet,
pods []*v1.Pod,
) (*apps.StatefulSetStatus, error)
}

func (ssc *defaultStatefulSetControl) UpdateStatefulSet(
ctx context.Context,
set *apps.StatefulSet,
pods []*v1.Pod,
) (*apps.StatefulSetStatus, error) {
// 1. 获取/创建 PVC
if err := ssc.pvcListerSynced(); err != nil {
return nil, err
}

// 2. 排序 Pod(确保有序性)
// Ordinal: 索引越小越早启动
// DeleteOrdinal: 索引越大越早删除
pods = sortPods(pods, set)

// 3. 执行调和
for i := range pods {
if isTerminating(pods[i]) {
// 等待 Pod 终止
continue
}
if isPending(pods[i]) {
// 创建/更新 Pod
}
}

// 4. 创建缺失的 Pod
if len(pods) < int(*set.Spec.Replicas) {
return ssc.createPod(set, ordinal)
}

// 5. 更新状态
return ssc.calculateStatus(set, pods), nil
}

4.2 有序部署与终止

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
┌─────────────────────────────────────────────────────────────────────────┐
│ StatefulSet 有序部署流程 │
├─────────────────────────────────────────────────────────────────────────┤
│ │
│ 部署顺序 (Pod-0 -> Pod-1 -> Pod-2 -> ...): │
│ │
│ ┌────────────────────────────────────────────────────────────────┐ │
│ │ │ │
│ │ Pod-0 ┌─────┐ │ │
│ │ ─────────►│Running│ ─────────────────────────► Ready │ │
│ │ └─────┘ │ │
│ │ │ │ │
│ │ ▼ │ │
│ │ Pod-1 启动 (等待 Pod-0 Ready) │ │
│ │ │ │ │
│ │ ▼ │ │
│ │ Pod-2 启动 (等待 Pod-1 Ready) │ │
│ │ ... │ │
│ │ │ │
│ └────────────────────────────────────────────────────────────────┘ │
│ │
│ 终止顺序 (Pod-N -> Pod-(N-1) -> ... -> Pod-0): │
│ │
│ ┌────────────────────────────────────────────────────────────────┐ │
│ │ │ │
│ │ Pod-N 终止 ──► Pod-(N-1) 终止 ──► ... ──► Pod-0 终止 │ │
│ │ │ │ │ │ │
│ │ ▼ ▼ ▼ │ │
│ │ 先删除 再删除 最后删除 │ │
│ │ │ │
│ └────────────────────────────────────────────────────────────────┘ │
│ │
└─────────────────────────────────────────────────────────────────────────┘

5. DaemonSet 控制器

5.1 DaemonSet 控制器逻辑

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
// pkg/controller/daemonset/daemonset_controller.go
func (dsc *DaemonSetController) Run(ctx context.Context, workers int) {
// 1. 启动所有 Informer
// 2. 等待缓存同步
// 3. 启动 worker 协程
}

func (dsc *DaemonSetController) worker() {
for dsc.processNextWorkItem() {
// 持续处理
}
}

func (dsc *DaemonSetController) syncDaemonSet(
ctx context.Context,
key string,
) error {
// 1. 获取 DaemonSet
namespace, name, err := cache.SplitMetaNamespaceKey(key)
ds, err := dsc.dsLister.DaemonSets(namespace).Get(name)

// 2. 获取需要运行 Pod 的节点
// 考虑:
// - NodeSelector
// - Node Affinity
// - Taints/Tolerations
nodes, err := dsc.getNodesForDaemonSet(ds)

// 3. 获取现有的 Pod
pods, err := dsc.getPodDaemonSetMap(ds)

// 4. 调和差异
return dsc.manageDaemonSet(ctx, ds, nodes, pods)
}

5.2 节点选择算法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
func (dsc *DaemonSetController) getNodesForDaemonSet(
ds *apps.DaemonSet,
) ([]*v1.Node, error) {
// 1. 获取所有节点
allNodes, err := dsc.nodeLister.List(labels.Everything())
if err != nil {
return nil, err
}

// 2. 过滤未 Ready 的节点
readyNodes := filterReadyNodes(allNodes)

// 3. 应用 NodeSelector
if ds.Spec.Template.Spec.NodeSelector != nil {
selectedNodes := make([]*v1.Node, 0)
for _, node := range readyNodes {
if labels.Selected(node.Labels, ds.Spec.Template.Spec.NodeSelector) {
selectedNodes = append(selectedNodes, node)
}
}
return selectedNodes, nil
}

// 4. 应用污点容忍
return readyNodes, nil
}

6. Job 控制器

6.1 Job 控制器逻辑

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
// pkg/controller/job/job_controller.go
func (jm *JobController) syncJob(ctx context.Context, key string) error {
// 1. 获取 Job
ns, name, _ := cache.SplitMetaNamespaceKey(key)
job, _ := jm.jobLister.Jobs(ns).Get(name)

// 2. 获取关联的 Pod
pods, _ := jm.getPodsForJob(job)

// 3. 计算活跃 Pod 数
active := countActivePods(pods)
failed := countFailedPods(pods)
succeeded := countSucceededPods(pods)

// 4. 管理 Pod 数量
if active > *job.Spec.Parallelism {
// 杀死多余的活跃 Pod
jm.deleteExtraPods(pods, active-*job.Spec.Parallelism)
}

// 5. 创建缺失的 Pod
if active < *job.Spec.Parallelism {
if !isJobSuspended(job) {
jm.createNewPod(job, pods)
}
}

// 6. 更新 Job 状态
return jm.updateJobStatus(job, active, failed, succeeded)
}

6.2 Job 完成判断

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
// Job 完成条件判断
func isJobFinished(job *batch.Job) (bool, *batch.JobCondition) {
// 条件 1: 成功完成
if job.Spec.Completions != nil {
if succeeded >= *job.Spec.Completions {
return true, &batch.JobCondition{
Type: batch.JobComplete,
Status: v1.ConditionTrue,
}
}
} else {
// 未设置 completions,只要有一个成功就算完成
if succeeded > 0 && active == 0 {
return true, &batch.JobCondition{
Type: batch.JobComplete,
Status: v1.ConditionTrue,
}
}
}

// 条件 2: 失败
if job.Spec.BackoffLimit != nil {
if failed >= *job.Spec.BackoffLimit {
return true, &batch.JobCondition{
Type: batch.JobFailed,
Status: v1.ConditionTrue,
}
}
}

return false, nil
}

7. CronJob 控制器

7.1 CronJob 控制器架构

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
┌─────────────────────────────────────────────────────────────────────────┐
│ CronJob 控制器流程 │
├─────────────────────────────────────────────────────────────────────────┤
│ │
│ ┌──────────────┐ ┌──────────────┐ │
│ │ CronJob │ │ CronJob │ │
│ │ Informer │ │ Controller │ │
│ └──────┬───────┘ └──────┬───────┘ │
│ │ │ │
│ │ Watch │ │
│ └─────────────────────────────────│──────────────────────────► │
│ │ │
│ ▼ │
│ ┌────────────────────────┐ │
│ │ 检查调度时间 │ │
│ │ (每 10 秒检查一次) │ │
│ └───────────┬────────────┘ │
│ │ │
│ ┌──────────────┴──────────────┐ │
│ │ │ │
│ ▼ ▼ │
│ 应该运行? 已经运行? │
│ (是) (否) │
│ │ │ │
│ ▼ │ │
│ 创建 Job 跳过 │
│ │ │ │
│ ▼ │ │
│ 创建 Pod(s) ◄────────────────────────┘ │
│ │
└─────────────────────────────────────────────────────────────────────────┘

7.2 CronJob 调度逻辑

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
// pkg/controller/cronjob/cronjob_controller.go
func (jm *CronJobController) syncAll() {
// 每 10 秒同步一次
cronJobs, _ := jm.cjLister.List(labels.Everything())

for _, cj := range cronJobs {
jm.syncOne(cj)
}
}

func (jm *CronJobController) syncOne(cj *batch.CronJob) error {
// 1. 检查是否暂停
if cj.Spec.Suspend != nil && *cj.Spec.Suspend {
return nil
}

// 2. 获取最近应该运行的时间
times, err := getRecentScheduledTimes(cj)
if err != nil {
return err
}

// 3. 获取最近一次运行的 Job
js, _ := jm.jobLister.Jobs(cj.Namespace).List(labels.Everything())
recentJob := getRecentJob(cj, js)

// 4. 检查是否应该创建新 Job
for _, t := range times {
if t.After(recentJob.Created) && t.Before(now) {
// 创建新 Job
job, err := jm.createJob(cj, t)
if err != nil {
return err
}
// 记录创建的 Job
jm.recorder.Eventf(cj, v1.EventTypeNormal, "SuccessfulCreate", "Created job %s", job.Name)
}
}

// 5. 清理旧的 Job
if cj.Spec.SuccessfulJobsHistoryLimit != nil {
cleanOldJobs(cj, js)
}

return nil
}

8. Service 控制器

8.1 Service 控制器逻辑

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
// pkg/controller/service/controller.go
func (sc *ServiceController) syncService(ctx context.Context, key string) error {
// 1. 获取 Service
namespace, name, _ := cache.SplitMetaNamespaceKey(key)
svc, err := sc.serviceLister.Services(namespace).Get(name)
if err != nil {
if errors.IsNotFound(err) {
// Service 已删除,从缓存获取完整对象来清理 EndpointSlice
cachedSvc, exists := sc.cache.Get(key)
if !exists {
return nil // 已清理,无需处理
}
return sc.deleteEndpointSlices(cachedSvc.(*v1.Service))
}
return err
}

// 2. 检查是否需要创建 EndpointSlice
if !isServiceActive(svc) {
return nil
}

// 3. 获取匹配的 Pod
pods, err := sc.getPodsForService(svc)

// 4. 创建/更新 EndpointSlice
return sc.syncEndpoints(svc, pods)
}

func (sc *ServiceController) syncEndpoints(
svc *v1.Service,
pods []*v1.Pod,
) error {
// 1. 计算 Endpoint 配置
endpoints := buildEndpoints(svc, pods)

// 2. 获取/创建 EndpointSlice
slices, _ := sc.endpointSliceLister.Slices(svc.Namespace, "")

// 3. 更新 EndpointSlice
return sc.updateEndpointSlices(svc, endpoints, slices)
}

9. Node 控制器

9.1 Node 控制器职责

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
// pkg/controller/node/node_controller.go
type NodeController struct {
// 节点租用相关
nodeLister cache.GenericLister
nodeInformer cache.SharedIndexInformer

// 污点管理
taintManager *controller.NoExecuteTaintManager

// 节点状态估计
nodeHealthData map[string]*nodeHealthData
}

// NodeController 主要功能:
// 1. 节点注册 - 新节点自动注册
// 2. 节点状态同步 - 定期同步节点状态
// 3. 节点租赁管理 - 处理节点心跳
// 4. 污点管理 - 执行 NoExecute 污点
// 5. 驱逐管理 - 删除不可达节点的 Pod

9.2 节点状态判断

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
// 节点状态判断逻辑
func (nc *NodeController) DoEviction(ctx context.Context, nodeName string) error {
// 1. 节点状态变为 NotReady
// 2. 等待 podEvictionTimeout (默认 5 分钟)
// 3. 开始驱逐 Pod

pods, _ := nc.podLister.Pods(v1.NamespaceAll).List(labels.Everything())
for _, pod := range pods {
if pod.Spec.NodeName == nodeName {
if !isPodEvictable(pod) {
continue
}
// 驱逐 Pod
nc.evictPod(pod)
}
}
}

// 污点控制器
func (nc *taintManager) Run(ctx context.Context) {
// 监听 Pod 和 Node 变化
// 根据污点状态驱逐 Pod
}

10. 控制器通用工具

10.1 WorkQueue 实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
// pkg/controller/controller_utils.go
// RateLimitingInterface 用于控制重试频率

// 1. 直接限速
queue.AddRateLimited(key)

// 2. 指数退避
queue.AddAfter(key, delay)

// 3. 懒重试
queue.Add(key) // 如果已在队列中不会重复添加

// 示例:指数退避实现
func enqueueWithBackoff(queue workqueue.RateLimitingInterface, key string, err error) {
baseDelay := time.Second
maxDelay := 5 * time.Minute

// 计算延迟时间
delay := baseDelay * time.Duration(math.Pow(2, retryCount))
if delay > maxDelay {
delay = maxDelay
}

queue.AddAfter(key, delay)
}

10.2 Informer 机制

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
// pkg/controller/controller_utils.go
// 创建 Informer 的通用方法

func NewFilteredSharedIndexInformer(
client clientset.Interface,
resourceVersion string,
) cache.SharedIndexInformer {
return cache.NewSharedIndexInformer(
&cache.ListWatch{
ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
return client.CoreV1().Namespaces().List(context.TODO(), options)
},
WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
return client.CoreV1().Namespaces().Watch(context.TODO(), options)
},
},
&v1.Pod{},
resyncPeriod,
indexers,
)
}

11. 控制器协作关系

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
┌─────────────────────────────────────────────────────────────────────────┐
│ 控制器协作关系图 │
├─────────────────────────────────────────────────────────────────────────┤
│ │
│ ┌─────────────┐ │
│ │ Deployment │ │
│ │ Controller │ │
│ └──────┬──────┘ │
│ │ │
│ ▼ │
│ ┌─────────────┐ │
│ │ ReplicaSet │ │
│ │ Controller │ │
│ └──────┬──────┘ │
│ │ │
│ ┌─────────────────┼─────────────────┐ │
│ │ │ │ │
│ ▼ ▼ ▼ │
│ ┌─────────┐ ┌─────────┐ ┌─────────┐ │
│ │ Pod-1 │ │ Pod-2 │ │ Pod-N │ │
│ └─────────┘ └─────────┘ └─────────┘ │
│ │ │ │ │
│ └─────────────────┼─────────────────┘ │
│ ▼ │
│ ┌─────────────┐ │
│ │ Endpoint │ │
│ │ Controller │ │
│ └──────┬──────┘ │
│ │ │
│ ▼ │
│ ┌─────────────┐ │
│ │ Endpoint │ │
│ │ Slice │ │
│ └─────────────┘ │
│ │
└─────────────────────────────────────────────────────────────────────────┘

12. 关键源码路径

控制器 源码路径
Deployment pkg/controller/deployment/
ReplicaSet pkg/controller/replicaset/
StatefulSet pkg/controller/statefulset/
DaemonSet pkg/controller/daemonset/
Job pkg/controller/job/
CronJob pkg/controller/cronjob/
Service pkg/controller/service/
Node pkg/controller/node/
PV/PVC pkg/controller/volume/
GC pkg/controller/garbagecollector/
Controller Manager cmd/kube-controller-manager/

面试题

基础题

1. Kubernetes 控制器的核心思想是什么?

Kubernetes 控制器的核心思想是”声明式 API + 状态调和(Reconcile)”。用户声明期望状态(如需要 3 个 Pod 副本),控制器持续监控实际状态与期望状态的差异,并采取行动使两者一致。这种模式的关键点:

  • 声明式:用户描述”做什么”而非”怎么做”
  • 持续调和:控制器持续运行,快速响应状态变化
  • 最终一致:允许短暂的不一致,控制器会逐步调和

2. Deployment 和 ReplicaSet 的区别是什么?

  • Deployment:更高级别的资源,管理 ReplicaSet 的创建、更新和删除,支持滚动更新、回滚等高级功能
  • ReplicaSet:低级别资源,确保指定数量的 Pod 副本运行,通常不直接使用,而是由 Deployment 管理

Deployment 通过管理 ReplicaSet 来间接管理 Pod,而 ReplicaSet 直接管理 Pod。

3. StatefulSet 与 Deployment 的主要区别是什么?

  1. 稳定的网络标识:StatefulSet Pod 有稳定的唯一主机名(pod-name.statefulset-name.namespace.svc.cluster.local)
  2. 有序部署和终止:Pod 按序号顺序部署/终止,保证数据一致性
  3. 稳定的持久存储:使用 PVC,删除 Pod 后数据保留
  4. 独立的 PVC:每个 Pod 有自己独立的 PVC,不共享

4. DaemonSet 的典型使用场景有哪些?

  • 日志收集器:如 Fluentd、Filebeat
  • 监控代理:如 Prometheus Node Exporter、Datadog Agent
  • 网络插件:如 kube-proxy、Calico node
  • 存储插件:如 Ceph、GlusterFS 客户端
  • 日志轮转:如 logrotate

5. Job 和 CronJob 的区别是什么?

  • Job:一次性任务,执行完成后 Pod 保持成功状态
  • CronJob:定时任务,按 cron 表达式定期执行,基于 Job 创建

CronJob 在每个调度时间点创建一个 Job,Job 再创建 Pod 执行实际任务。

中级题

6. Deployment 的滚动更新策略是如何工作的?

Deployment 滚动更新通过 maxUnavailablemaxSurge 控制:

1
2
3
4
5
6
spec:
strategy:
type: RollingUpdate
rollingUpdate:
maxUnavailable: 25% # 最大不可用 Pod 数
maxSurge: 25% # 最大多出的 Pod 数

工作流程:

  1. 创建新的 ReplicaSet(rs-new), replicas = 期望数
  2. 逐步从旧 ReplicaSet 转移 Pod 到新 ReplicaSet
  3. 同时保持总数不超过 期望数 + maxSurge
  4. 同时保证可用数不低于 期望数 - maxUnavailable
  5. 完成后删除旧 ReplicaSet

7. Kubernetes 如何保证 Job 至少执行一次?

Job 控制器通过以下机制保证至少执行一次:

  1. 活跃 Pod 追踪:跟踪正在运行的 Pod 数量
  2. Pod 失败重试:失败的 Pod 会被重新创建(受 backoffLimit 限制)
  3. 指数退避:连续失败时延迟增加重试间隔
  4. 优雅终止:Pod 终止信号给予清理时间
  5. 挂起检测:当 Job 从挂起状态恢复时,会创建缺失的 Pod

8. StatefulSet 的 PVC 生命周期是怎样的?

  1. 创建时:StatefulSet controller 在创建 Pod 前创建 PVC
  2. 运行时:PVC 与 Pod 绑定,Pod 使用 PVC
  3. 删除时:默认保留 PVC(由 PVC Protection 控制器保护)
  4. 清理时:需要手动删除 PVC 或配置 persistentVolumeClaimRetentionPolicy
1
2
3
4
spec:
persistentVolumeClaimRetentionPolicy:
whenDeleted: Retain # Delete 时保留 PVC
whenScaled: Retain # 缩容时保留 PVC

9. 控制器如何使用 Informer 和 WorkQueue?

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
// 典型控制器结构
type Controller struct {
informer cache.SharedIndexInformer
lister cache.GenericLister
queue workqueue.RateLimitingInterface
}

func (c *Controller) Run() {
// 1. 启动 Informer
go c.informer.Run(stopCh)

// 2. 等待缓存同步
cache.WaitForCacheSync(stopCh, c.informer.HasSynced)

// 3. 启动 Worker
for i := 0; i < workers; i++ {
go wait.Until(c.worker, time.Second, stopCh)
}
}

func (c *Controller) worker() {
for c.processItem() {
// 持续从队列获取任务处理
}
}

Informer 负责监听 API Server 变化并缓存本地副本,WorkQueue 存储待处理的任务,Worker 协程从队列获取任务执行 reconcile。

10. Node 控制器如何处理节点故障?

节点故障处理流程:

  1. 检测失败:节点心跳超时(默认 40 秒标记为 Unknown)
  2. 标记状态:节点状态更新为 ConditionUnknown
  3. 等待驱逐:等待 podEvictionTimeout(默认 5 分钟)
  4. 执行驱逐:在所有匹配节点上删除 Pod
  5. Pod 重建:由 Replicaset/DaemonSet 控制器在新节点重建

关键时间点:40 秒 Unknown → 5 分钟后开始驱逐 → Replicaset 重建 Pod

高级题

11. 分析 Deployment 控制器如何实现回滚机制?

Deployment 使用 ReplicaSet 版本历史实现回滚:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
// Revision 记录在 annotation 中
const RevisionAnnotation = "deployment.kubernetes.io/revision"

// 回滚到指定版本
func (dc *DeploymentController) rollbackTo(
revision int64,
) error {
// 1. 获取历史 ReplicaSet
oldRS, err := dc.getReplicaSetByRevision(revision)
if err != nil {
return err
}

// 2. 获取当前活跃的 ReplicaSet
activeRS, err := dc.getActiveReplicaSet(d)
if err != nil {
return err
}

// 3. 将 Deployment 的 Pod 模板切换到旧 ReplicaSet 的模板
d.Spec.Template = oldRS.Spec.Template

// 4. 更新 Deployment
_, err = dc.client.AppsV1().Deployments(d.Namespace).Update(ctx, d)

// 5. 标记旧 RS 为活跃,新 RS 缩容
dc.scaleReplicaSet(oldRS, *d.Spec.Replicas)
dc.scaleReplicaSet(activeRS, 0)

return nil
}

每个滚动更新都会创建新的 ReplicaSet,历史 ReplicaSet 会被保留(默认保留 10 个),用户可以通过 kubectl rollout undo 回滚到任意历史版本。

12. StatefulSet 的顺序保证是如何实现的?

StatefulSet 控制器通过以下机制保证有序性:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
// 1. Pod 索引管理
func getOrdinal(pod *v1.Pod) int {
// 从 Pod 名称解析索引: statefulset-name-ordinal
parts := strings.Split(pod.Name, "-")
ordinal, _ := strconv.Atoi(parts[len(parts)-1])
return ordinal
}

// 2. 部署时:必须等前一个 Pod Ready 才启动下一个
func shouldLaunchPod(set *apps.StatefulSet, ordinal int) bool {
if ordinal == 0 {
return true // 第一个 Pod 可以直接启动
}
// 检查前一个 Pod 是否 Ready
prevPod := getPodByOrdinal(set, ordinal-1)
return isPodReady(prevPod)
}

// 3. 终止时:必须等后面的 Pod 先终止才终止当前 Pod
func shouldTerminatePod(set *apps.StatefulSet, ordinal int) bool {
if ordinal == getMaxOrdinal(set) {
return true // 最后一个 Pod 可以直接终止
}
// 检查后面是否有 Pod 仍在运行
for i := ordinal + 1; i <= getMaxOrdinal(set); i++ {
pod := getPodByOrdinal(set, i)
if !isPodTerminated(pod) {
return false
}
}
return true
}

13. CronJob 控制器的并发策略是如何工作的?

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
// CronJob 并发策略
const (
// AllowConcurrent: 允许并发执行
AllowConcurrent = ""

// ForbidConcurrent: 禁止并发,新 Job 必须等旧的完成
ForbidConcurrent = "Forbid"

// ReplaceConcurrent: 替换正在运行的 Job
ReplaceConcurrent = "Replace"
)

func (jm *CronJobController) allowStart(
cj *batch.CronJob,
scheduledTime time.Time,
) bool {
switch cj.Spec.ConcurrencyPolicy {
case batch.AllowConcurrent:
return true

case batch.ForbidConcurrent:
// 检查是否有正在运行的 Job
runningJobs := getRunningJobs(cj, scheduledTime)
return len(runningJobs) == 0

case batch.ReplaceConcurrent:
// 停止正在运行的 Job
stopRunningJobs(cj)
return true
}
return true
}

场景题

14. 如何排查 Deployment 的滚动更新卡住的问题?

排查步骤:

  1. 检查 Deployment 状态:kubectl describe deployment <name>
  2. 查看滚动更新事件:检查是否有错误信息
  3. 检查 ReplicaSet:kubectl get rs
  4. 常见原因:
    • 镜像拉取失败:检查 Pod 事件和镜像配置
    • 资源不足:节点资源耗尽
    • 健康检查失败:Liveness/Readiness Probe 配置错误
    • 挂载卷失败:PVC 未绑定或挂载超时
    • maxUnavailable=0 且 maxSurge=0:配置冲突导致无法更新
  5. 检查历史记录:kubectl rollout history deployment <name>

15. 如果需要暂停和恢复 Deployment 滚动更新?

1
2
3
4
5
6
7
8
9
10
11
12
13
14
# 暂停
kubectl rollout pause deployment/<name>

# 恢复
kubectl rollout resume deployment/<name>

# 查看状态
kubectl rollout status deployment/<name>

# 回滚到上一版本
kubectl rollout undo deployment/<name>

# 回滚到指定版本
kubectl rollout undo deployment/<name> --to-revision=2

暂停期间对 Deployment 的修改会生效但不执行滚动更新,恢复后会按最新配置继续滚动。

16. 如何设计一个支持金丝雀发布的 Deployment 策略?

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
apiVersion: apps/v1
kind: Deployment
metadata:
name: myapp-v1
spec:
replicas: 4
strategy:
type: RollingUpdate
rollingUpdate:
maxUnavailable: 0
maxSurge: 1
selector:
matchLabels:
app: myapp
template:
metadata:
labels:
app: myapp
version: v1
spec:
containers:
- name: myapp
image: myapp:v1
---
apiVersion: apps/v1
kind: Deployment
metadata:
name: myapp-v2-canary
spec:
replicas: 1 # 金丝雀副本数
strategy:
type: RollingUpdate
selector:
matchLabels:
app: myapp
template:
metadata:
labels:
app: myapp
version: v2
spec:
containers:
- name: myapp
image: myapp:v2
---
apiVersion: v1
kind: Service
metadata:
name: myapp
spec:
selector:
app: myapp
ports:
- port: 80

通过控制 v2 的副本数,可以控制进入金丝雀版本的流量比例。