EndpointSlice 控制器流程

本文档描述 Kubernetes EndpointSlice 控制器如何让 Service 感知后端 Pod 变化的完整流程。

概述

EndpointSlice 控制器负责:

  • 监听 Service 和 Pod 变化
  • 生成和管理 EndpointSlice 资源
  • 记录 Service 后端 Pod 的 IP 地址和端口
flowchart TD
    subgraph APIServer["API Server"]
        A["Service 资源"]
        B["Pod 资源"]
        C["EndpointSlice 资源"]
        D["Node 资源"]
    end

    subgraph EndpointSliceController["EndpointSlice Controller"]
        E["Watch Service"]
        F["Watch Pod"]
        G["Watch Node"]
        H["Reconciler"]
        I["serviceQueue"]
    end

    subgraph Consumers["消费者"]
        J["kube-proxy"]
        K["CoreDNS"]
        L["Ingress Controller"]
    end

    E -->|触发| I
    F -->|触发| I
    G -->|触发| I
    I --> H
    H -->|Create/Update/Delete| C

    C -->|Watch| J
    C -->|Watch| K
    C -->|Watch| L

    style H fill:#c8e6c9
    style C fill:#fff3e0

EndpointSlice vs Endpoints

特性 Endpoints (旧) EndpointSlice (新)
单对象端点限制 ~1000 100 (可配置)
可扩展性 有限
拓扑感知 支持 Zone/Topology
双栈 IP 不支持 支持 IPv4/IPv6
性能 大规模集群有瓶颈 优化

流程详解

1. EndpointSlice Controller 架构

代码路径: pkg/controller/endpointslice/endpointslice_controller.go

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
// Controller 管理 selector-based service endpoint slices
type Controller struct {
client clientset.Interface
eventBroadcaster record.EventBroadcaster
eventRecorder record.EventRecorder

// Lister 从 shared informer 获取资源
serviceLister corelisters.ServiceLister
servicesSynced cache.InformerSynced
podLister corelisters.PodLister
podsSynced cache.InformerSynced
endpointSliceLister discoverylisters.EndpointSliceLister
endpointSlicesSynced cache.InformerSynced
nodeLister corelisters.NodeLister
nodesSynced cache.InformerSynced

// EndpointSlice 跟踪器
endpointSliceTracker *endpointsliceutil.EndpointSliceTracker

// Reconciler 用于协调 EndpointSlice 变更
reconciler *endpointslicerec.Reconciler

// 工作队列
serviceQueue workqueue.RateLimitingInterface
topologyQueue workqueue.RateLimitingInterface

// 每个 EndpointSlice 最大端点数
maxEndpointsPerSlice int32

// 拓扑缓存(用于 TopologyAwareHints)
topologyCache *topologycache.TopologyCache
}

2. 事件处理流程

sequenceDiagram
    participant API as API Server
    participant ESC as EndpointSlice Controller
    participant Queue as serviceQueue
    participant Reconciler as Reconciler
    participant ES as EndpointSlice

    API->>ESC: Pod Add/Update/Delete 事件
    ESC->>ESC: addPod / updatePod / deletePod
    ESC->>Queue: Enqueue Service Key

    API->>ESC: Service Add/Update/Delete 事件
    ESC->>ESC: onServiceUpdate / onServiceDelete
    ESC->>Queue: Enqueue Service Key

    Queue->>ESC: processNextServiceWorkItem
    ESC->>ESC: syncService(key)

    ESC->>Reconciler: Reconcile(Service, Pods, existingSlices)
    Reconciler->>ES: Create/Update/Delete EndpointSlices
    ES->>API: 写入 API Server

3. syncService 核心逻辑

代码路径: pkg/controller/endpointslice/endpointslice_controller.go:345

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
func (c *Controller) syncService(logger klog.Logger, key string) error {
startTime := time.Now()
defer func() {
logger.V(4).Info("Finished syncing service endpoint slices", "key", key, "elapsedTime", time.Since(startTime))
}()

namespace, name, err := cache.SplitMetaNamespaceKey(key)

// 获取 Service
service, err := c.serviceLister.Services(namespace).Get(name)
if apierrors.IsNotFound(err) {
// Service 已删除,清理所有 EndpointSlice
return c.deleteEndpointSlicesForService(namespace, name)
}

// 获取 Service 关联的 EndpointSlice
esList, err := c.endpointSliceTracker.EndpointSlicesForService(namespace, name)

// 获取匹配的 Pod
pods, err := c.podLister.Pods(namespace).List(labels.Set(service.Spec.Selector).AsSelectorPreValidated())

// 调用 Reconciler 协调 EndpointSlice
return c.reconciler.Reconcile(logger, service, pods, esList)
}

4. Pod 选择与过滤

代码路径: staging/src/k8s.io/endpointslice/util/controller_utils.go

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
// ShouldPodBeInEndpoints 判断 Pod 是否应包含在 Endpoints 中
func ShouldPodBeInEndpoints(pod *v1.Pod, includeTerminating bool) bool {
// 终止状态的 Pod 不包含
if isPodTerminal(pod) {
return false
}

// 没有 IP 的 Pod 不包含
if len(pod.Status.PodIP) == 0 && len(pod.Status.PodIPs) == 0 {
return false
}

// 正在删除的 Pod(可选包含)
if !includeTerminating && pod.DeletionTimestamp != nil {
return false
}

return true
}

// GetPodServiceMemberships 获取 Pod 匹配的所有 Service
func GetPodServiceMemberships(serviceLister v1listers.ServiceLister, pod *v1.Pod) (sets.String, error) {
set := sets.String{}
services, err := serviceLister.Services(pod.Namespace).List(labels.Everything())

for _, service := range services {
if service.Spec.Selector == nil {
continue // nil selector 不匹配任何 Pod
}
if labels.ValidatedSetSelector(service.Spec.Selector).Matches(labels.Set(pod.Labels)) {
key, _ := cache.DeletionHandlingMetaNamespaceKeyFunc(service)
set.Insert(key)
}
}
return set, nil
}

5. EndpointSlice 数据结构

API 定义: api/discovery/v1/types.go

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
type EndpointSlice struct {
metav1.TypeMeta
metav1.ObjectMeta

// 地址类型: IPv4, IPv6, FQDN
AddressType AddressType

// 端点列表
Endpoints []Endpoint

// 端口列表
Ports []EndpointPort
}

type Endpoint struct {
// IP 地址列表
Addresses []string

// 端点条件
Conditions EndpointConditions

// 主机名
Hostname *string

// 目标引用
TargetRef *v1.ObjectReference

// 节点名
NodeName *string

// Zone 信息
Zone *string

// 拓扑提示
Hints *EndpointHints
}

type EndpointConditions struct {
Ready *bool
Serving *bool
Terminating *bool
}

6. Reconciler 协调逻辑

代码路径: staging/src/k8s.io/endpointslice/reconciler.go

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
func (r *Reconciler) Reconcile(logger klog.Logger, service *v1.Service, pods []*v1.Pod, existingSlices []*discovery.EndpointSlice) error {
// 1. 计算期望的端点
desiredEndpoints := r.computeDesiredEndpoints(pods, service)

// 2. 比较 existingSlices 和 desiredEndpoints
slicesToUpdate, slicesToCreate, slicesToDelete := r.computeChanges(existingSlices, desiredEndpoints)

// 3. 执行变更
for _, slice := range slicesToCreate {
_, err := r.client.DiscoveryV1().EndpointSlice(slice.Namespace).Create(ctx, slice, metav1.CreateOptions{})
}

for _, slice := range slicesToUpdate {
_, err := r.client.DiscoveryV1().EndpointSlice(slice.Namespace).Update(ctx, slice, metav1.UpdateOptions{})
}

for _, slice := range slicesToDelete {
err := r.client.DiscoveryV1().EndpointSlice(slice.Namespace).Delete(ctx, slice.Name, metav1.DeleteOptions{})
}

return nil
}

7. 拓扑感知 (TopologyAwareHints)

flowchart TD
    subgraph Nodes["节点分布"]
        N1["Node-Zone-A"]
        N2["Node-Zone-A"]
        N3["Node-Zone-B"]
        N4["Node-Zone-B"]
    end

    subgraph EndpointSlices["EndpointSlice"]
        E1["Endpoint (Zone-A)"]
        E2["Endpoint (Zone-A)"]
        E3["Endpoint (Zone-B)"]
        E4["Endpoint (Zone-B)"]
    end

    subgraph Consumers["消费者"]
        C1["Client in Zone-A"]
        C2["Client in Zone-B"]
    end

    N1 --> E1
    N2 --> E2
    N3 --> E3
    N4 --> E4

    C1 -->|优先访问| E1
    C1 -->|优先访问| E2
    C2 -->|优先访问| E3
    C2 -->|优先访问| E4

EndpointSlice 的 Hints 字段支持拓扑感知路由:

1
2
3
4
5
6
7
endpoints:
- addresses:
- "10.244.1.5"
zone: "zone-a"
hints:
forZones:
- name: "zone-a"

8. kube-proxy 如何使用 EndpointSlice

代码路径: pkg/proxy/endpointslicecache/endpointslice_cache.go

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
type EndpointSliceCache struct {
lock sync.RWMutex

// 按 Service 分组的 tracker
trackerByServiceMap map[types.NamespacedName]*endpointSliceTracker

hostname string
ipFamily v1.IPFamily

makeEndpointInfo func(*BaseEndpointInfo, *ServicePortName) Endpoint
recorder record.EventRecorder
}

// updatePending 更新 pending 的 EndpointSlice
func (cache *EndpointSliceCache) updatePending(endpointSlice *discovery.EndpointSlice, remove bool) bool {
serviceKey, sliceKey, err := endpointSliceCacheKeys(endpointSlice)

esInfo := newEndpointSliceInfo(endpointSlice, remove)

cache.lock.Lock()
defer cache.lock.Unlock()

if _, ok := cache.trackerByServiceMap[serviceKey]; !ok {
cache.trackerByServiceMap[serviceKey] = newEndpointSliceTracker()
}

changed := cache.esInfoChanged(serviceKey, sliceKey, esInfo)

if changed {
cache.trackerByServiceMap[serviceKey].pending[sliceKey] = esInfo
}

return changed
}

// checkoutChanges 返回所有 pending 的变更
func (cache *EndpointSliceCache) checkoutChanges() map[types.NamespacedName]*endpointsChange {
changes := make(map[types.NamespacedName]*endpointsChange)

cache.lock.Lock()
defer cache.lock.Unlock()

for serviceNN, esTracker := range cache.trackerByServiceMap {
if len(esTracker.pending) == 0 {
continue
}

change := &endpointsChange{}
change.previous = cache.getEndpointsMap(serviceNN, esTracker.applied)

for name, sliceInfo := range esTracker.pending {
if sliceInfo.Remove {
delete(esTracker.applied, name)
} else {
esTracker.applied[name] = sliceInfo
}
delete(esTracker.pending, name)
}

change.current = cache.getEndpointsMap(serviceNN, esTracker.applied)
changes[serviceNN] = change
}

return changes
}

关键代码锚点

功能 文件路径
EndpointSlice Controller pkg/controller/endpointslice/endpointslice_controller.go:187
syncService 入口 pkg/controller/endpointslice/endpointslice_controller.go:345
Reconciler staging/src/k8s.io/endpointslice/reconciler.go
Pod 过滤逻辑 staging/src/k8s.io/endpointslice/util/controller_utils.go:94
kube-proxy EndpointSliceCache pkg/proxy/endpointslicecache/endpointslice_cache.go

EndpointSlice 示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
apiVersion: discovery.k8s.io/v1
kind: EndpointSlice
metadata:
name: my-service-abc123
namespace: default
labels:
kubernetes.io/service-name: my-service
endpointslice.kubernetes.io/managed-by: endpointslice-controller.k8s.io
addressType: IPv4
endpoints:
- addresses:
- "10.244.1.5"
conditions:
ready: true
serving: true
terminating: false
targetRef:
kind: Pod
name: my-pod-1
namespace: default
nodeName: node-1
zone: zone-a
- addresses:
- "10.244.2.3"
conditions:
ready: true
targetRef:
kind: Pod
name: my-pod-2
namespace: default
nodeName: node-2
zone: zone-b
ports:
- name: http
port: 8080
protocol: TCP

触发条件总结

事件 触发条件
Pod 创建 Pod labels 匹配 Service selector
Pod 更新 Pod IP、Ready 状态、Labels 变化
Pod 删除 Pod 被删除
Service 创建 Service 有 selector
Service 更新 Service selector 或 ports 变化
Service 删除 清理关联的 EndpointSlice

高频面试题

Q1: EndpointSlice 和传统的 Endpoints 有什么区别?

参考答案:

特性 Endpoints (v1) EndpointSlice (discovery/v1)
可扩展性 单对象 ~1000 端点限制 可分片,每片最多 1000 端点
地址类型 混合 IPv4/IPv6 按 AddressType 分离
拓扑感知 支持 Zone/NodeName
状态条件 只有 Ready Ready/Serving/Terminating
管理者标识 LabelManagedBy 标签
性能 大规模集群瓶颈 优化,增量更新

核心优势:

  • 解决大规模集群(10万+ Pod)的性能问题
  • 支持拓扑感知路由(TopologyAwareHints)
  • 更好的双栈 IPv4/IPv6 支持

Q2: EndpointSlice 控制器是如何工作的?

参考答案:

  1. 监听资源变化:

    • Service Informer:监听 Service 创建/更新/删除
    • Pod Informer:监听 Pod 变化
    • Node Informer:监听节点变化(拓扑感知)
  2. 触发同步:

    • Pod 变化 → GetPodServiceMemberships 找到匹配的 Service
    • Service 变化 → 直接触发该 Service 的同步
    • 将 Service Key 加入 serviceQueue
  3. 同步逻辑 (syncService):

    • 获取 Service 关联的 Pod 列表
    • 获取现有的 EndpointSlice 列表
    • 调用 Reconciler.Reconcile() 计算 diff
    • 执行 Create/Update/Delete 操作
  4. Pod 过滤规则:

    • 终止状态(Terminating)的 Pod 不包含
    • 没有 IP 的 Pod 不包含
    • 未 Ready 的 Pod 根据条件判断

Q3: 什么是拓扑感知路由 (TopologyAwareHints)?

参考答案:
拓扑感知路由让客户端优先访问同一 Zone 的后端 Pod,减少跨 Zone 流量成本。

工作原理:

  1. EndpointSlice Controller 计算每个 Zone 的端点分布
  2. 为每个端点添加 hints.forZones 字段
  3. kube-proxy 根据客户端所在 Zone 过滤端点

启用方式:

1
2
3
4
5
apiVersion: v1
kind: Service
metadata:
annotations:
service.kubernetes.io/topology-mode: Auto

EndpointSlice 示例:

1
2
3
4
5
6
endpoints:
- addresses: ["10.244.1.5"]
zone: "zone-a"
hints:
forZones:
- name: "zone-a" # 只向 zone-a 的客户端推荐此端点

Q4: Pod 的 Ready、Serving、Terminating 条件分别是什么含义?

参考答案:

1
2
3
4
5
type EndpointConditions struct {
Ready *bool // Pod 就绪,可以接收流量
Serving *bool // Pod 正在服务(即使 Terminating 也可接收)
Terminating *bool // Pod 正在终止
}
状态 Ready Serving Terminating 说明
正常运行 true true false 接收所有流量
未就绪 false false false 不接收流量
优雅终止中 false true true 可接收优雅终止流量

应用场景:

  • publishNotReadyAddresses: true 时,即使 Ready=false 也会包含
  • Serving 状态用于支持优雅终止期间的流量

Q5: kube-proxy 如何使用 EndpointSlice?

参考答案:

  1. Watch EndpointSlice:

    • kube-proxy 通过 Informer 监听 EndpointSlice 变化
    • 变化事件触发 OnEndpointSliceAdd/Update/Delete
  2. EndpointSliceCache:

    • 维护 trackerByServiceMap 按 Service 分组
    • 每个 Service 有 applied(已应用)和 pending(待处理)状态
    • checkoutChanges() 返回所有待处理的变更
  3. 更新规则:

    • 计算变更前后 的端点差异
    • 更新 iptables/IPVS 规则
    • 标记 pending 为 applied

代码路径: pkg/proxy/endpointslicecache/endpointslice_cache.go

Q6: 如何排查 Service 无法访问后端 Pod 的问题?

参考答案:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
# 1. 检查 Service 和 EndpointSlice
kubectl get svc <service-name>
kubectl get endpointslice -l kubernetes.io/service-name=<service-name>

# 2. 检查 Pod 标签是否匹配 Service Selector
kubectl get pods -l app=<label-value> --show-labels

# 3. 检查 Pod 是否 Ready
kubectl get pods -l app=<label-value>
kubectl describe pod <pod-name>

# 4. 检查 EndpointSlice 控制器日志
kubectl logs -n kube-system <kube-controller-manager-pod> | grep endpointslice

# 5. 常见原因:
# - Pod 标签不匹配 Service Selector
# - Pod 未 Ready(readinessProbe 失败)
# - Pod 没有 IP(正在启动中)
# - Service 没有定义 Selector(需要手动创建 EndpointSlice)
# - EndpointSlice 控制器异常

延伸阅读