Kube-controller-manager(Endpoint)

基于1.25

Endpoint Controller主要控制Endpoint资源

  • Endpoints资源对象主要包含Subsets属性,用于存储同命名空间内同名称的Service资源对象关联的Pod的IP地址和端口号
  • Endpoints Controller为每一个Service资源对象创建一个与命名空间相同的Endpoints对象,并且在Service和Pod变化的时候,重新统计对应Service关联的Pod的IP地址和端口号,更新到Endpoints的Subsets属性中
  • 这些地址和端口被各个节点的kube-proxy用于更新本机的iptables或ipvs

控制器初始化

  • Service
  • Endpoints
  • Pod

主要执行逻辑

  1. e.serviceLister.Sevice

    获取Service

  2. e.podLister.Pods

    获取Service资源对象关联的POd

  3. For _,pod:=range pods

    遍历Pod统计所有的IP地址和端口号

    Endpoint Controller遍历Service关联的所有Pod资源对象,获取Pod的IP,调用FindPort func找到各个Service的Pod目的端口号,并且调用addEndpoitSuset func生成<Service 端口号:Pod IP地址>。一个Service 资源对象有多个Pod,将会生成多个<Service端口号:Pod IP地址>,放在subsets数组

    • Ref:https://github.com/kubernetes/kubernetes/blob/88e994f6bf8fc88114c5b733e09afea339bea66d/pkg/controller/endpoint/endpoints_controller.go#L419

      for _, pod := range pods {
      if !endpointutil.ShouldPodBeInEndpoints(pod, service.Spec.PublishNotReadyAddresses) {
      klog.V(5).Infof("Pod %s/%s is not included on endpoints for Service %s/%s", pod.Namespace, pod.Name, service.Namespace, service.Name)
      continue
      }

      ep, err := podToEndpointAddressForService(service, pod)
      if err != nil {
      // this will happen, if the cluster runs with some nodes configured as dual stack and some as not
      // such as the case of an upgrade..
      klog.V(2).Infof("Failed to find endpoint for service:%s with ClusterIP:%s on pod:%s with error:%v", service.Name, service.Spec.ClusterIP, klog.KObj(pod), err)
      continue
      }

      epa := *ep
      if endpointutil.ShouldSetHostname(pod, service) {
      epa.Hostname = pod.Spec.Hostname
      }

      // Allow headless service not to have ports.
      if len(service.Spec.Ports) == 0 {
      if service.Spec.ClusterIP == api.ClusterIPNone {
      subsets, totalReadyEps, totalNotReadyEps = addEndpointSubset(subsets, pod, epa, nil, service.Spec.PublishNotReadyAddresses)
      // No need to repack subsets for headless service without ports.
      }
      } else {
      for i := range service.Spec.Ports {
      servicePort := &service.Spec.Ports[i]
      portNum, err := podutil.FindPort(pod, servicePort)
      if err != nil {
      klog.V(4).Infof("Failed to find port for service %s/%s: %v", service.Namespace, service.Name, err)
      continue
      }
      epp := endpointPortFromServicePort(servicePort, portNum)

      var readyEps, notReadyEps int
      subsets, readyEps, notReadyEps = addEndpointSubset(subsets, pod, epa, epp, service.Spec.PublishNotReadyAddresses)
      totalReadyEps = totalReadyEps + readyEps
      totalNotReadyEps = totalNotReadyEps + notReadyEps
      }
      }
      }
      subsets = endpoints.RepackSubsets(subsets)
  4. endpoints.RepackSubsets

计算Endpoints资源对象的Subsets

  • 每个EndpointSubset都记录了<Service端口号:Pod IP地址集合>的一个 N:M

  • Ref:https://github.com/kubernetes/kubernetes/blob/88e994f6bf8fc88114c5b733e09afea339bea66d/pkg/apis/core/types.go#L4087

    // EndpointSubset is a group of addresses with a common set of ports.  The
    // expanded set of endpoints is the Cartesian product of Addresses x Ports.
    // For example, given:
    //
    // {
    // Addresses: [{"ip": "10.10.1.1"}, {"ip": "10.10.2.2"}],
    // Ports: [{"name": "a", "port": 8675}, {"name": "b", "port": 309}]
    // }
    //
    // The resulting set of endpoints can be viewed as:
    //
    // a: [ 10.10.1.1:8675, 10.10.2.2:8675 ],
    // b: [ 10.10.1.1:309, 10.10.2.2:309 ]
    type EndpointSubset struct {
    Addresses []EndpointAddress
    NotReadyAddresses []EndpointAddress
    Ports []EndpointPort
    }
  1. e.endpointsLister.Endpoints

    获取Endpoints资源对象

  2. truncateEndpoints

    截断subsets数组中超出数量限制的IP地址。Endpoints资源对象的subsets数组最多存1000个IP地址,超出数量的IP地址将被删除

  3. e.client.CoreV1().Endpoints

    创建或更新Endpints资源对象

Subsets属性的计算

  1. subsets []v1.EndpointSubset

    获取<端口号:IP地址>集合。再调用RepackSubsets的前一步,Endpoint Controller已经计算出subsets数组,数字的每一个元素一个<Service端口号:Pod IP地址>

  2. portToAddrReadyMap

    计算<端口号:IP地址集合>的map

  3. addrReadyMapKeyToPorts

    计算<IP地址集合的Hash:端口集合>的Map

  4. final:=[]v1.EndpointSubset{}

    计算最终的subset数组