Kube-controller-manager(Endpoint)
Kube-controller-manager(Endpoint)
基于1.25
Endpoint Controller主要控制Endpoint资源
- Endpoints资源对象主要包含Subsets属性,用于存储同命名空间内同名称的Service资源对象关联的Pod的IP地址和端口号
- Endpoints Controller为每一个Service资源对象创建一个与命名空间相同的Endpoints对象,并且在Service和Pod变化的时候,重新统计对应Service关联的Pod的IP地址和端口号,更新到Endpoints的Subsets属性中
- 这些地址和端口被各个节点的kube-proxy用于更新本机的iptables或ipvs
控制器初始化
- Service
- Endpoints
- Pod
主要执行逻辑
e.serviceLister.Sevice
获取Service
e.podLister.Pods
获取Service资源对象关联的POd
For _,pod:=range pods
遍历Pod统计所有的IP地址和端口号
Endpoint Controller遍历Service关联的所有Pod资源对象,获取Pod的IP,调用FindPort func找到各个Service的Pod目的端口号,并且调用addEndpoitSuset func生成<Service 端口号:Pod IP地址>。一个Service 资源对象有多个Pod,将会生成多个<Service端口号:Pod IP地址>,放在subsets数组
-
for _, pod := range pods {
if !endpointutil.ShouldPodBeInEndpoints(pod, service.Spec.PublishNotReadyAddresses) {
klog.V(5).Infof("Pod %s/%s is not included on endpoints for Service %s/%s", pod.Namespace, pod.Name, service.Namespace, service.Name)
continue
}
ep, err := podToEndpointAddressForService(service, pod)
if err != nil {
// this will happen, if the cluster runs with some nodes configured as dual stack and some as not
// such as the case of an upgrade..
klog.V(2).Infof("Failed to find endpoint for service:%s with ClusterIP:%s on pod:%s with error:%v", service.Name, service.Spec.ClusterIP, klog.KObj(pod), err)
continue
}
epa := *ep
if endpointutil.ShouldSetHostname(pod, service) {
epa.Hostname = pod.Spec.Hostname
}
// Allow headless service not to have ports.
if len(service.Spec.Ports) == 0 {
if service.Spec.ClusterIP == api.ClusterIPNone {
subsets, totalReadyEps, totalNotReadyEps = addEndpointSubset(subsets, pod, epa, nil, service.Spec.PublishNotReadyAddresses)
// No need to repack subsets for headless service without ports.
}
} else {
for i := range service.Spec.Ports {
servicePort := &service.Spec.Ports[i]
portNum, err := podutil.FindPort(pod, servicePort)
if err != nil {
klog.V(4).Infof("Failed to find port for service %s/%s: %v", service.Namespace, service.Name, err)
continue
}
epp := endpointPortFromServicePort(servicePort, portNum)
var readyEps, notReadyEps int
subsets, readyEps, notReadyEps = addEndpointSubset(subsets, pod, epa, epp, service.Spec.PublishNotReadyAddresses)
totalReadyEps = totalReadyEps + readyEps
totalNotReadyEps = totalNotReadyEps + notReadyEps
}
}
}
subsets = endpoints.RepackSubsets(subsets)
-
endpoints.RepackSubsets
计算Endpoints资源对象的Subsets
每个EndpointSubset都记录了<Service端口号:Pod IP地址集合>的一个 N:M
-
// EndpointSubset is a group of addresses with a common set of ports. The
// expanded set of endpoints is the Cartesian product of Addresses x Ports.
// For example, given:
//
// {
// Addresses: [{"ip": "10.10.1.1"}, {"ip": "10.10.2.2"}],
// Ports: [{"name": "a", "port": 8675}, {"name": "b", "port": 309}]
// }
//
// The resulting set of endpoints can be viewed as:
//
// a: [ 10.10.1.1:8675, 10.10.2.2:8675 ],
// b: [ 10.10.1.1:309, 10.10.2.2:309 ]
type EndpointSubset struct {
Addresses []EndpointAddress
NotReadyAddresses []EndpointAddress
Ports []EndpointPort
}
e.endpointsLister.Endpoints
获取Endpoints资源对象
truncateEndpoints
截断subsets数组中超出数量限制的IP地址。Endpoints资源对象的subsets数组最多存1000个IP地址,超出数量的IP地址将被删除
e.client.CoreV1().Endpoints
创建或更新Endpints资源对象
Subsets属性的计算
subsets []v1.EndpointSubset
获取<端口号:IP地址>集合。再调用RepackSubsets的前一步,Endpoint Controller已经计算出subsets数组,数字的每一个元素一个<Service端口号:Pod IP地址>
portToAddrReadyMap
计算<端口号:IP地址集合>的map
addrReadyMapKeyToPorts
计算<IP地址集合的Hash:端口集合>的Map
final:=[]v1.EndpointSubset{}
计算最终的subset数组