kube-controller-manager (EndpointSlice)

Based on Kubernetes 1.25

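The EndpointSlice controller splits a large Endpoints object into multiple EndpointSlice objects, which addresses the scalability problems of keeping all of a Service's endpoints in a single Endpoints object.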

  • All Pod addresses associated with a Service are stored in one or more EndpointSlice objects.

Controller initialization

At initialization the controller registers event handlers on the informers for:

  • Service
  • EndpointSlice
  • Pod

Main execution logic (syncService)

  1. c.serviceLister.Services

    Get the Service object.

  2. c.podLister.Pods

    Get the Pods selected by the Service.

  3. c.endpointSliceLister.EndpointSlices

    Get the EndpointSlices that belong to the Service.

  4. dropEndpointSlicesPendingDeletion

    Filter out EndpointSlices that are being deleted.

  5. c.reconciler.reconcile

    Compute and update each EndpointSlice.

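    The steps of reconcile are:

    1. getAddressTypesForService

      Determine the IP address types the Service supports (IPv4, IPv6, or dual-stack), based on the Service's Spec.IPFamilies.

    2. serviceSupportedAddressTypes

      Identify the existing EndpointSlices whose address type the Service does not support.

    3. reconcileByAddressType

      Compute and update the EndpointSlices for each address type.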

  6. for _, sliceToDelete := range slicesToDelete

    Delete the EndpointSlices whose address type no longer matches the Service.
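
The filtering and partitioning in the steps above can be modelled with plain Go types. This is a minimal sketch under stated assumptions, not the controller's code: the Slice struct and the functions dropPendingDeletion, supportedAddressTypes, and partitionByAddressType are hypothetical stand-ins for discovery.EndpointSlice and the logic inside reconcile, and a non-nil deletion timestamp is reduced to a boolean.

```go
package main

import "fmt"

// Slice is a hypothetical, simplified stand-in for discovery.EndpointSlice.
type Slice struct {
	Name        string
	AddressType string // "IPv4" or "IPv6"
	Deleting    bool   // models a non-nil DeletionTimestamp
}

// dropPendingDeletion keeps only the slices that are not being deleted.
func dropPendingDeletion(slices []Slice) []Slice {
	kept := make([]Slice, 0, len(slices))
	for _, s := range slices {
		if !s.Deleting {
			kept = append(kept, s)
		}
	}
	return kept
}

// supportedAddressTypes turns the Service's IP families into a set
// (assumption: the families are already known; no defaulting is modelled).
func supportedAddressTypes(ipFamilies []string) map[string]struct{} {
	set := make(map[string]struct{}, len(ipFamilies))
	for _, f := range ipFamilies {
		set[f] = struct{}{}
	}
	return set
}

// partitionByAddressType groups slices by address type and collects the
// slices whose address type the Service no longer supports.
func partitionByAddressType(slices []Slice, supported map[string]struct{}) (map[string][]Slice, []Slice) {
	byType := map[string][]Slice{}
	var toDelete []Slice
	for _, s := range slices {
		if _, ok := supported[s.AddressType]; !ok {
			toDelete = append(toDelete, s)
			continue
		}
		byType[s.AddressType] = append(byType[s.AddressType], s)
	}
	return byType, toDelete
}

func main() {
	existing := []Slice{
		{Name: "web-abc", AddressType: "IPv4"},
		{Name: "web-def", AddressType: "IPv6"},
		{Name: "web-old", AddressType: "IPv4", Deleting: true},
	}
	live := dropPendingDeletion(existing)
	byType, toDelete := partitionByAddressType(live, supportedAddressTypes([]string{"IPv4"}))
	fmt.Println("live:", len(live), "IPv4:", len(byType["IPv4"]), "to delete:", len(toDelete))
}
```

In this model a Service that switches from dual-stack to IPv4-only would see its IPv6 slice land in the delete list, while its IPv4 slices go on to per-address-type reconciliation.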

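Computation and population in the EndpointSlice controller

The computation and population logic of the EndpointSlice controller lives in the reconcileByAddressType func and consists of two main parts:

  1. Compute the N:M mapping <Service port set : Pod IP address set>
  2. Populate the EndpointSlices

Computing the N:M mapping <Service port set : Pod IP address set>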

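  1. The EndpointSlice controller iterates over every Pod.

    For each Pod, getEndpointPorts returns the set of target ports that the Pod uses.

    getEndpointPorts: the controller iterates over each Port of the Service and looks up the Pod's target port number for that Port.

  2. The EndpointSlice controller computes a hash of the target port set and records, in desiredMetaByPortMap, the mapping from that hash to the actual target port set.

  3. The EndpointSlice controller calls the podToEndpoint func to build the data structure that records the Pod's IP addresses.

  4. Using the hash of the target port set as the key, the EndpointSlice controller inserts that endpoint into desiredEndpointsByPortMap.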
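
To make the port resolution and hashing steps concrete, here is a minimal sketch. It assumes simplified, hypothetical types (ServicePort, Pod, EndpointPort) and hypothetical helpers (endpointPorts, portMapKey). A target port is modelled as either a number or a container port name, and the key is a SHA-256 of the sorted port list, which is only an illustration of an order-independent hash, not the hashing the controller uses.

```go
package main

import (
	"crypto/sha256"
	"encoding/hex"
	"fmt"
	"sort"
	"strings"
)

// ServicePort is a hypothetical, trimmed-down Service port.
// If TargetPortName is set it is resolved against the Pod; otherwise
// TargetPortNum is used directly.
type ServicePort struct {
	Name           string
	TargetPortNum  int
	TargetPortName string
}

// Pod is a hypothetical Pod that only carries its named container ports.
type Pod struct {
	Name           string
	ContainerPorts map[string]int // container port name -> number
}

// EndpointPort is the resolved (name, number) pair for one Service port.
type EndpointPort struct {
	Name string
	Port int
}

// endpointPorts walks every Service port and resolves the Pod's target
// port for it; named ports the Pod does not expose are skipped.
func endpointPorts(ports []ServicePort, pod Pod) []EndpointPort {
	var out []EndpointPort
	for _, sp := range ports {
		num := sp.TargetPortNum
		if sp.TargetPortName != "" {
			n, ok := pod.ContainerPorts[sp.TargetPortName]
			if !ok {
				continue
			}
			num = n
		}
		out = append(out, EndpointPort{Name: sp.Name, Port: num})
	}
	return out
}

// portMapKey derives a key from a port set that does not depend on order.
func portMapKey(ports []EndpointPort) string {
	parts := make([]string, 0, len(ports))
	for _, p := range ports {
		parts = append(parts, fmt.Sprintf("%s:%d", p.Name, p.Port))
	}
	sort.Strings(parts)
	sum := sha256.Sum256([]byte(strings.Join(parts, ",")))
	return hex.EncodeToString(sum[:])
}

func main() {
	svcPorts := []ServicePort{
		{Name: "http", TargetPortName: "web"},
		{Name: "metrics", TargetPortNum: 9090},
	}
	pods := []Pod{
		{Name: "a", ContainerPorts: map[string]int{"web": 8080}},
		{Name: "b", ContainerPorts: map[string]int{"web": 8080}},
		{Name: "c", ContainerPorts: map[string]int{"web": 8443}},
	}
	for _, pod := range pods {
		ports := endpointPorts(svcPorts, pod)
		fmt.Println(pod.Name, ports, portMapKey(ports)[:8])
	}
}
```

Pods a and b resolve the named port "web" to the same number and therefore share a key, while pod c resolves it differently and ends up under a separate key. This is why one Service can need several groups of EndpointSlices.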

  • Ref:https://github.com/kubernetes/kubernetes/blob/88e994f6bf8fc88114c5b733e09afea339bea66d/pkg/controller/endpointslice/reconciler.go#L156

    // Build data structures for desired state.
    desiredMetaByPortMap := map[endpointutil.PortMapKey]*endpointMeta{}
    desiredEndpointsByPortMap := map[endpointutil.PortMapKey]endpointsliceutil.EndpointSet{}

    for _, pod := range pods {
        includeTerminating := service.Spec.PublishNotReadyAddresses || utilfeature.DefaultFeatureGate.Enabled(features.EndpointSliceTerminatingCondition)
        if !endpointutil.ShouldPodBeInEndpoints(pod, includeTerminating) {
            continue
        }

        endpointPorts := getEndpointPorts(service, pod)
        epHash := endpointutil.NewPortMapKey(endpointPorts)
        if _, ok := desiredEndpointsByPortMap[epHash]; !ok {
            desiredEndpointsByPortMap[epHash] = endpointsliceutil.EndpointSet{}
        }

        if _, ok := desiredMetaByPortMap[epHash]; !ok {
            desiredMetaByPortMap[epHash] = &endpointMeta{
                AddressType: addressType,
                Ports:       endpointPorts,
            }
        }

        node, err := r.nodeLister.Get(pod.Spec.NodeName)
        if err != nil {
            // we are getting the information from the local informer,
            // an error different than IsNotFound should not happen
            if !errors.IsNotFound(err) {
                return err
            }
            // If the Node specified by the Pod doesn't exist we want to requeue the Service so we
            // retry later, but also update the EndpointSlice without the problematic Pod.
            // Theoretically, the pod Garbage Collector will remove the Pod, but we want to avoid
            // situations where a reference from a Pod to a missing node can leave the EndpointSlice
            // stuck forever.
            // On the other side, if the service.Spec.PublishNotReadyAddresses is set we just add the
            // Pod, since the user is explicitly indicating that the Pod address should be published.
            if !service.Spec.PublishNotReadyAddresses {
                klog.Warningf("skipping Pod %s for Service %s/%s: Node %s Not Found", pod.Name, service.Namespace, service.Name, pod.Spec.NodeName)
                errs = append(errs, fmt.Errorf("skipping Pod %s for Service %s/%s: Node %s Not Found", pod.Name, service.Namespace, service.Name, pod.Spec.NodeName))
                continue
            }
        }
        endpoint := podToEndpoint(pod, node, service, addressType)
        if len(endpoint.Addresses) > 0 {
            desiredEndpointsByPortMap[epHash].Insert(&endpoint)
        }
    }
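
The grouping loop above can be reduced to a small stand-alone model. This sketch is illustrative only: the Pod struct, its PortKey field (standing in for the result of endpointutil.NewPortMapKey), the nodes map (standing in for the node lister), and groupPods are all hypothetical, and an endpoint is reduced to a single IP string.

```go
package main

import "fmt"

// Pod is a hypothetical, flattened view of the fields the loop needs.
type Pod struct {
	Name     string
	IP       string
	NodeName string
	PortKey  string // stands in for the hash of the Pod's target port set
}

// groupPods builds the desired endpoint sets keyed by port key.
// A Pod whose Node is unknown is skipped and reported as an error,
// unless publishNotReady is set, in which case it is added anyway.
func groupPods(pods []Pod, nodes map[string]bool, publishNotReady bool) (map[string][]string, []error) {
	desired := map[string][]string{}
	var errs []error
	for _, pod := range pods {
		// The key is registered before the node check, so a port key can
		// end up with an empty endpoint set.
		if _, ok := desired[pod.PortKey]; !ok {
			desired[pod.PortKey] = []string{}
		}
		if !nodes[pod.NodeName] && !publishNotReady {
			errs = append(errs, fmt.Errorf("skipping Pod %s: Node %s not found", pod.Name, pod.NodeName))
			continue
		}
		if pod.IP != "" {
			desired[pod.PortKey] = append(desired[pod.PortKey], pod.IP)
		}
	}
	return desired, errs
}

func main() {
	pods := []Pod{
		{Name: "a", IP: "10.0.0.1", NodeName: "n1", PortKey: "k1"},
		{Name: "b", IP: "10.0.0.2", NodeName: "n1", PortKey: "k1"},
		{Name: "c", IP: "10.0.0.3", NodeName: "gone", PortKey: "k2"},
	}
	desired, errs := groupPods(pods, map[string]bool{"n1": true}, false)
	fmt.Println(desired, len(errs))
}
```

Collecting the error instead of returning immediately matches the intent stated in the upstream comments: the remaining Pods are still processed, and the recorded error lets the Service be retried later.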

Populating the EndpointSlice objects

  • Ref:https://github.com/kubernetes/kubernetes/blob/88e994f6bf8fc88114c5b733e09afea339bea66d/pkg/controller/endpointslice/reconciler.go#L206

    for portMap, desiredEndpoints := range desiredEndpointsByPortMap {
        numEndpoints := len(desiredEndpoints)
        pmSlicesToCreate, pmSlicesToUpdate, pmSlicesToDelete, added, removed := r.reconcileByPortMapping(
            service, existingSlicesByPortMap[portMap], desiredEndpoints, desiredMetaByPortMap[portMap])

        totalAdded += added
        totalRemoved += removed

        spMetrics.Set(portMap, metrics.EfficiencyInfo{
            Endpoints: numEndpoints,
            Slices:    len(existingSlicesByPortMap[portMap]) + len(pmSlicesToCreate) - len(pmSlicesToDelete),
        })

        slicesToCreate = append(slicesToCreate, pmSlicesToCreate...)
        slicesToUpdate = append(slicesToUpdate, pmSlicesToUpdate...)
        slicesToDelete = append(slicesToDelete, pmSlicesToDelete...)
    }
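
The excerpt above delegates the per-port-set work to reconcileByPortMapping, whose internals are not covered here. As a rough intuition for what "added", "removed", and the slice count mean, the following sketch models a deliberately simplified version. It drops endpoints that are no longer desired from existing slices and packs the missing ones into new slices with a size cap. The function reconcileSlices and its behavior are assumptions for illustration; in particular, unlike the real controller, it never fills free space in existing slices.

```go
package main

import "fmt"

// reconcileSlices is a hypothetical, simplified model of per-port-set
// reconciliation. existing holds the endpoint IPs of each current slice,
// desired is the wanted endpoint set, maxPerSlice caps new slices.
func reconcileSlices(existing [][]string, desired []string, maxPerSlice int) (kept, created [][]string, added, removed int) {
	if maxPerSlice < 1 {
		maxPerSlice = 1 // guard against a non-positive cap
	}
	want := make(map[string]bool, len(desired))
	for _, ip := range desired {
		want[ip] = true
	}

	// Pass 1: keep desired endpoints in place, count the rest as removed.
	placed := map[string]bool{}
	for _, slice := range existing {
		var keep []string
		for _, ip := range slice {
			if want[ip] && !placed[ip] {
				keep = append(keep, ip)
				placed[ip] = true
			} else {
				removed++
			}
		}
		if len(keep) > 0 {
			kept = append(kept, keep) // emptied slices are dropped
		}
	}

	// Pass 2: pack endpoints that are not placed yet into new slices.
	var pending []string
	for _, ip := range desired {
		if !placed[ip] {
			pending = append(pending, ip)
			placed[ip] = true
		}
	}
	added = len(pending)
	for len(pending) > 0 {
		n := maxPerSlice
		if len(pending) < n {
			n = len(pending)
		}
		created = append(created, pending[:n])
		pending = pending[n:]
	}
	return kept, created, added, removed
}

func main() {
	existing := [][]string{{"10.0.0.1", "10.0.0.9"}}
	desired := []string{"10.0.0.1", "10.0.0.2", "10.0.0.3", "10.0.0.4"}
	kept, created, added, removed := reconcileSlices(existing, desired, 2)
	fmt.Println(kept, created, added, removed)
	fmt.Println("slices after reconcile:", len(kept)+len(created))
}
```

The final count in main mirrors the shape of the Slices metric in the excerpt: existing slices plus created ones minus deleted ones.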

TODO: more to be added…