K8s-kube-proxy(初始化过程)

基于1.25

kube-proxy也使用Cobra解析用户输入参数,初始化Options对象、验证参数有效性,调用Run启动

  • Ref:https://github.com/kubernetes/kubernetes/blob/88e994f6bf8fc88114c5b733e09afea339bea66d/cmd/kube-proxy/app/server.go#L483

    // NewProxyCommand creates a *cobra.Command object with default parameters
    func NewProxyCommand() *cobra.Command {
    opts := NewOptions()

    cmd := &cobra.Command{
    Use: "kube-proxy",
    Long: `The Kubernetes network proxy runs on each node. This
    reflects services as defined in the Kubernetes API on each node and can do simple
    TCP, UDP, and SCTP stream forwarding or round robin TCP, UDP, and SCTP forwarding across a set of backends.
    Service cluster IPs and ports are currently found through Docker-links-compatible
    environment variables specifying ports opened by the service proxy. There is an optional
    addon that provides cluster DNS for these cluster IPs. The user must create a service
    with the apiserver API to configure the proxy.`,
    RunE: func(cmd *cobra.Command, args []string) error {
    // Print version and exit immediately if --version was requested.
    verflag.PrintAndExitIfRequested()
    cliflag.PrintFlags(cmd.Flags())

    // Platform-specific setup (Windows service registration); a no-op elsewhere.
    if err := initForOS(opts.WindowsService); err != nil {
    return fmt.Errorf("failed os init: %w", err)
    }

    // Complete fills in defaulted/derived option fields before validation.
    if err := opts.Complete(); err != nil {
    return fmt.Errorf("failed complete: %w", err)
    }

    // Validate rejects inconsistent or out-of-range configuration.
    if err := opts.Validate(); err != nil {
    return fmt.Errorf("failed validate: %w", err)
    }

    // Run blocks for the lifetime of the proxy server.
    if err := opts.Run(); err != nil {
    klog.ErrorS(err, "Error running ProxyServer")
    return err
    }

    return nil
    },
    // kube-proxy takes no positional arguments; reject any non-empty arg.
    Args: func(cmd *cobra.Command, args []string) error {
    for _, arg := range args {
    if len(arg) > 0 {
    return fmt.Errorf("%q does not take any arguments, got %q", cmd.CommandPath(), args)
    }
    }
    return nil
    },
    }

    // Apply configuration defaults before registering flags so the flag
    // help output shows the defaulted values.
    var err error
    opts.config, err = opts.ApplyDefaults(opts.config)
    if err != nil {
    klog.ErrorS(err, "Unable to create flag defaults")
    // ACTION REQUIRED: Exit code changed from 255 to 1
    os.Exit(1)
    }

    fs := cmd.Flags()
    opts.AddFlags(fs)
    fs.AddGoFlagSet(goflag.CommandLine) // for --boot-id-file and --machine-id-file

    // Enable shell completion of --config with common config-file extensions.
    _ = cmd.MarkFlagFilename("config", "yaml", "yml", "json")

    return cmd
    }

生成iptables、ipvs、Kernel、IP Set接口

kube-proxy需要调用iptables、ipset等命令,但是这些调用都被封装为了接口

utiliptables.Interface

封装了iptables等命令,包含创建、删除iptables链、iptables规则

  • Ref:https://github.com/kubernetes/kubernetes/blob/88e994f6bf8fc88114c5b733e09afea339bea66d/pkg/util/iptables/iptables.go#L49

    // Interface is an injectable interface for running iptables commands.  Implementations must be goroutine-safe.
    type Interface interface {
    // EnsureChain checks if the specified chain exists and, if not, creates it. If the chain existed, return true.
    EnsureChain(table Table, chain Chain) (bool, error)
    // FlushChain clears the specified chain. If the chain did not exist, return error.
    FlushChain(table Table, chain Chain) error
    // DeleteChain deletes the specified chain. If the chain did not exist, return error.
    DeleteChain(table Table, chain Chain) error
    // ChainExists tests whether the specified chain exists, returning an error if it
    // does not, or if it is unable to check.
    ChainExists(table Table, chain Chain) (bool, error)
    // EnsureRule checks if the specified rule is present and, if not, creates it. If the rule existed, return true.
    EnsureRule(position RulePosition, table Table, chain Chain, args ...string) (bool, error)
    // DeleteRule checks if the specified rule is present and, if so, deletes it.
    DeleteRule(table Table, chain Chain, args ...string) error
    // IsIPv6 returns true if this is managing ipv6 tables.
    IsIPv6() bool
    // Protocol returns the IP family this instance is managing.
    Protocol() Protocol
    // SaveInto calls `iptables-save` for table and stores result in a given buffer.
    SaveInto(table Table, buffer *bytes.Buffer) error
    // Restore runs `iptables-restore` passing data through []byte.
    // table is the Table to restore
    // data should be formatted like the output of SaveInto()
    // flush sets the presence of the "--noflush" flag. see: FlushFlag
    // counters sets the "--counters" flag. see: RestoreCountersFlag
    Restore(table Table, data []byte, flush FlushFlag, counters RestoreCountersFlag) error
    // RestoreAll is the same as Restore except that no table is specified.
    RestoreAll(data []byte, flush FlushFlag, counters RestoreCountersFlag) error
    // Monitor detects when the given iptables tables have been flushed by an external
    // tool (e.g. a firewall reload) by creating canary chains and polling to see if
    // they have been deleted. (Specifically, it polls tables[0] every interval until
    // the canary has been deleted from there, then waits a short additional time for
    // the canaries to be deleted from the remaining tables as well. You can optimize
    // the polling by listing a relatively empty table in tables[0]). When a flush is
    // detected, this calls the reloadFunc so the caller can reload their own iptables
    // rules. If it is unable to create the canary chains (either initially or after
    // a reload) it will log an error and stop monitoring.
    // (This function should be called from a goroutine.)
    Monitor(canary Chain, tables []Table, reloadFunc func(), interval time.Duration, stopCh <-chan struct{})
    // HasRandomFully reveals whether `-j MASQUERADE` takes the
    // `--random-fully` option. This is helpful to work around a
    // Linux kernel bug that sometimes causes multiple flows to get
    // mapped to the same IP:PORT and consequently some suffer packet
    // drops.
    HasRandomFully() bool

    // Present checks if the kernel supports the iptables interface
    // (i.e. the default table and chain exist for this IP family).
    Present() bool
    }

接口的实现:

  • Ref:https://github.com/kubernetes/kubernetes/blob/88e994f6bf8fc88114c5b733e09afea339bea66d/pkg/util/iptables/iptables.go#L251

    // EnsureChain is part of Interface.
    func (runner *runner) EnsureChain(table Table, chain Chain) (bool, error) {
    fullArgs := makeFullArgs(table, chain)

    runner.mu.Lock()
    defer runner.mu.Unlock()

    out, err := runner.run(opCreateChain, fullArgs)
    if err != nil {
    // `iptables -N` exits with status 1 when the chain already exists;
    // treat that as success and report that the chain existed.
    if ee, ok := err.(utilexec.ExitError); ok {
    if ee.Exited() && ee.ExitStatus() == 1 {
    return true, nil
    }
    }
    return false, fmt.Errorf("error creating chain %q: %v: %s", chain, err, out)
    }
    // Chain was newly created.
    return false, nil
    }

    // FlushChain is part of Interface. It removes every rule from the given
    // chain in the given table, returning an error (with the command output
    // embedded) if the underlying iptables invocation fails.
    func (runner *runner) FlushChain(table Table, chain Chain) error {
        runner.mu.Lock()
        defer runner.mu.Unlock()

        if out, err := runner.run(opFlushChain, makeFullArgs(table, chain)); err != nil {
            return fmt.Errorf("error flushing chain %q: %v: %s", chain, err, out)
        }
        return nil
    }

    // DeleteChain is part of Interface. It removes the given chain from the
    // given table, returning an error (with the command output embedded) if
    // the underlying iptables invocation fails.
    func (runner *runner) DeleteChain(table Table, chain Chain) error {
        runner.mu.Lock()
        defer runner.mu.Unlock()

        if out, err := runner.run(opDeleteChain, makeFullArgs(table, chain)); err != nil {
            return fmt.Errorf("error deleting chain %q: %v: %s", chain, err, out)
        }
        return nil
    }

    // EnsureRule is part of Interface.
    func (runner *runner) EnsureRule(position RulePosition, table Table, chain Chain, args ...string) (bool, error) {
    fullArgs := makeFullArgs(table, chain, args...)

    runner.mu.Lock()
    defer runner.mu.Unlock()

    // Check-then-act is safe with respect to this runner because the mutex
    // serializes all rule mutations made through it.
    exists, err := runner.checkRule(table, chain, args...)
    if err != nil {
    return false, err
    }
    if exists {
    // Rule already present: report true and do not re-add it.
    return true, nil
    }
    // position selects append (-A) vs. insert (-I) semantics.
    out, err := runner.run(operation(position), fullArgs)
    if err != nil {
    return false, fmt.Errorf("error appending rule: %v: %s", err, out)
    }
    // Rule was newly created.
    return false, nil
    }

    // DeleteRule is part of Interface.
    func (runner *runner) DeleteRule(table Table, chain Chain, args ...string) error {
    fullArgs := makeFullArgs(table, chain, args...)

    runner.mu.Lock()
    defer runner.mu.Unlock()

    // Deleting a nonexistent rule is treated as success, so probe first.
    exists, err := runner.checkRule(table, chain, args...)
    if err != nil {
    return err
    }
    if !exists {
    // Nothing to delete.
    return nil
    }
    out, err := runner.run(opDeleteRule, fullArgs)
    if err != nil {
    return fmt.Errorf("error deleting rule: %v: %s", err, out)
    }
    return nil
    }

    // IsIPv6 is part of Interface. It reports whether this runner manages the
    // IPv6 (ip6tables) family.
    func (runner *runner) IsIPv6() bool {
    return runner.protocol == ProtocolIPv6
    }

    // Protocol is part of Interface. It returns the IP family this runner manages.
    func (runner *runner) Protocol() Protocol {
    return runner.protocol
    }

    // SaveInto is part of Interface.
    func (runner *runner) SaveInto(table Table, buffer *bytes.Buffer) error {
    runner.mu.Lock()
    defer runner.mu.Unlock()

    // Warn in the logs if the save takes unusually long (> 2s).
    trace := utiltrace.New("iptables save")
    defer trace.LogIfLong(2 * time.Second)

    // run and return
    iptablesSaveCmd := iptablesSaveCommand(runner.protocol)
    args := []string{"-t", string(table)}
    klog.V(4).Infof("running %s %v", iptablesSaveCmd, args)
    cmd := runner.exec.Command(iptablesSaveCmd, args...)
    // Stream stdout directly into the caller's buffer; capture stderr
    // separately so it is appended to the buffer only on failure.
    cmd.SetStdout(buffer)
    stderrBuffer := bytes.NewBuffer(nil)
    cmd.SetStderr(stderrBuffer)

    err := cmd.Run()
    if err != nil {
    stderrBuffer.WriteTo(buffer) // ignore error, since we need to return the original error
    }
    return err
    }

    // Restore is part of Interface. It runs iptables-restore restricted to a
    // single table (via the `-T` flag) with the given flush/counters options.
    func (runner *runner) Restore(table Table, data []byte, flush FlushFlag, counters RestoreCountersFlag) error {
        return runner.restoreInternal([]string{"-T", string(table)}, data, flush, counters)
    }

    // RestoreAll is part of Interface. It is the same as Restore except that
    // no `-T` flag is passed, so iptables-restore acts on every table present
    // in the input data.
    func (runner *runner) RestoreAll(data []byte, flush FlushFlag, counters RestoreCountersFlag) error {
        return runner.restoreInternal([]string{}, data, flush, counters)
    }

    // iptablesLocker is the minimal interface for releasing a held lock via
    // Close. (NOTE(review): presumably guards iptables-restore invocations —
    // confirm at the call sites, which are outside this excerpt.)
    type iptablesLocker interface {
    Close() error
    }

utilipvs.Interface

封装了ipvs的接口命令,包含创建、更新和删除ipvs VirtualServer等

  • Ref:https://github.com/kubernetes/kubernetes/blob/88e994f6bf8fc88114c5b733e09afea339bea66d/pkg/util/ipvs/ipvs.go#L29

    // Interface is an injectable interface for running ipvs commands.  Implementations must be goroutine-safe.
    type Interface interface {
    // Flush clears all virtual servers in system. return occurred error immediately.
    Flush() error
    // AddVirtualServer creates the specified virtual server.
    AddVirtualServer(*VirtualServer) error
    // UpdateVirtualServer updates an already existing virtual server. If the virtual server does not exist, return error.
    UpdateVirtualServer(*VirtualServer) error
    // DeleteVirtualServer deletes the specified virtual server. If the virtual server does not exist, return error.
    DeleteVirtualServer(*VirtualServer) error
    // GetVirtualServer, given a partial virtual server, returns the full
    // virtual server information currently configured in the system.
    GetVirtualServer(*VirtualServer) (*VirtualServer, error)
    // GetVirtualServers lists all virtual servers in the system.
    GetVirtualServers() ([]*VirtualServer, error)
    // AddRealServer creates the specified real server for the specified virtual server.
    AddRealServer(*VirtualServer, *RealServer) error
    // GetRealServers returns all real servers for the specified virtual server.
    GetRealServers(*VirtualServer) ([]*RealServer, error)
    // DeleteRealServer deletes the specified real server from the specified virtual server.
    DeleteRealServer(*VirtualServer, *RealServer) error
    // UpdateRealServer updates the specified real server from the specified virtual server.
    UpdateRealServer(*VirtualServer, *RealServer) error
    // ConfigureTimeouts is the equivalent to running "ipvsadm --set" to configure tcp, tcpfin and udp timeouts.
    ConfigureTimeouts(time.Duration, time.Duration, time.Duration) error
    }

ipvs实现:

  • Ref:https://github.com/kubernetes/kubernetes/blob/88e994f6bf8fc88114c5b733e09afea339bea66d/pkg/util/ipvs/ipvs_linux.go#L46


    // AddVirtualServer is part of ipvs.Interface. It registers a new virtual
    // server with the kernel IPVS table.
    func (runner *runner) AddVirtualServer(vs *VirtualServer) error {
        // Convert the proxy-level VirtualServer up front so conversion
        // failures never touch the ipvs handle.
        service, err := toIPVSService(vs)
        if err != nil {
            return fmt.Errorf("could not convert local virtual server to IPVS service: %w", err)
        }

        runner.mu.Lock()
        defer runner.mu.Unlock()
        return runner.ipvsHandle.NewService(service)
    }

    // UpdateVirtualServer is part of ipvs.Interface. It updates an existing
    // virtual server in the kernel IPVS table.
    func (runner *runner) UpdateVirtualServer(vs *VirtualServer) error {
        // Convert the proxy-level VirtualServer up front so conversion
        // failures never touch the ipvs handle.
        service, err := toIPVSService(vs)
        if err != nil {
            return fmt.Errorf("could not convert local virtual server to IPVS service: %w", err)
        }

        runner.mu.Lock()
        defer runner.mu.Unlock()
        return runner.ipvsHandle.UpdateService(service)
    }

    // DeleteVirtualServer is part of ipvs.Interface. It removes the given
    // virtual server from the kernel IPVS table.
    func (runner *runner) DeleteVirtualServer(vs *VirtualServer) error {
        // Convert the proxy-level VirtualServer up front so conversion
        // failures never touch the ipvs handle.
        service, err := toIPVSService(vs)
        if err != nil {
            return fmt.Errorf("could not convert local virtual server to IPVS service: %w", err)
        }

        runner.mu.Lock()
        defer runner.mu.Unlock()
        return runner.ipvsHandle.DelService(service)
    }

    // GetVirtualServer is part of ipvs.Interface.
    func (runner *runner) GetVirtualServer(vs *VirtualServer) (*VirtualServer, error) {
    svc, err := toIPVSService(vs)
    if err != nil {
    return nil, fmt.Errorf("could not convert local virtual server to IPVS service: %w", err)
    }
    // Hold the lock only around the ipvs handle call, not the conversions.
    runner.mu.Lock()
    ipvsSvc, err := runner.ipvsHandle.GetService(svc)
    runner.mu.Unlock()

    if err != nil {
    return nil, fmt.Errorf("could not get IPVS service: %w", err)
    }
    // Convert the kernel-side service back to the proxy-level representation.
    vServ, err := toVirtualServer(ipvsSvc)
    if err != nil {
    return nil, fmt.Errorf("could not convert IPVS service to local virtual server: %w", err)
    }
    return vServ, nil
    }

    // GetVirtualServers is part of ipvs.Interface. It lists every virtual
    // server currently configured in the kernel IPVS table.
    func (runner *runner) GetVirtualServers() ([]*VirtualServer, error) {
        // Hold the lock only around the ipvs handle call.
        runner.mu.Lock()
        ipvsServices, err := runner.ipvsHandle.GetServices()
        runner.mu.Unlock()
        if err != nil {
            return nil, fmt.Errorf("could not get IPVS services: %w", err)
        }

        // Convert each kernel-side service to the proxy-level representation.
        virtualServers := make([]*VirtualServer, 0, len(ipvsServices))
        for _, ipvsSvc := range ipvsServices {
            vs, err := toVirtualServer(ipvsSvc)
            if err != nil {
                return nil, fmt.Errorf("could not convert IPVS service to local virtual server: %w", err)
            }
            virtualServers = append(virtualServers, vs)
        }
        return virtualServers, nil
    }

    // Flush is part of ipvs.Interface. It clears all virtual servers in a
    // single call through the ipvs handle. (The stale upstream comment about
    // deleting services one by one no longer matches this implementation.)
    func (runner *runner) Flush() error {
    runner.mu.Lock()
    defer runner.mu.Unlock()
    return runner.ipvsHandle.Flush()
    }

    // AddRealServer is part of ipvs.Interface. It adds the given real server
    // as a destination of the given virtual server.
    func (runner *runner) AddRealServer(vs *VirtualServer, rs *RealServer) error {
        // Convert both proxy-level objects up front so conversion failures
        // never touch the ipvs handle.
        service, err := toIPVSService(vs)
        if err != nil {
            return fmt.Errorf("could not convert local virtual server to IPVS service: %w", err)
        }
        destination, err := toIPVSDestination(rs)
        if err != nil {
            return fmt.Errorf("could not convert local real server to IPVS destination: %w", err)
        }

        runner.mu.Lock()
        defer runner.mu.Unlock()
        return runner.ipvsHandle.NewDestination(service, destination)
    }

    // DeleteRealServer is part of ipvs.Interface. It removes the given real
    // server from the destinations of the given virtual server.
    func (runner *runner) DeleteRealServer(vs *VirtualServer, rs *RealServer) error {
        // Convert both proxy-level objects up front so conversion failures
        // never touch the ipvs handle.
        service, err := toIPVSService(vs)
        if err != nil {
            return fmt.Errorf("could not convert local virtual server to IPVS service: %w", err)
        }
        destination, err := toIPVSDestination(rs)
        if err != nil {
            return fmt.Errorf("could not convert local real server to IPVS destination: %w", err)
        }

        runner.mu.Lock()
        defer runner.mu.Unlock()
        return runner.ipvsHandle.DelDestination(service, destination)
    }

    // UpdateRealServer is part of ipvs.Interface.
    func (runner *runner) UpdateRealServer(vs *VirtualServer, rs *RealServer) error {
    svc, err := toIPVSService(vs)
    if err != nil {
    return fmt.Errorf("could not convert local virtual server to IPVS service: %w", err)
    }
    dst, err := toIPVSDestination(rs)
    if err != nil {
    return fmt.Errorf("could not convert local real server to IPVS destination: %w", err)
    }
    // Mutate the kernel state under the runner's lock.
    runner.mu.Lock()
    defer runner.mu.Unlock()
    return runner.ipvsHandle.UpdateDestination(svc, dst)
    }

    // GetRealServers is part of ipvs.Interface. It returns all real servers
    // configured as destinations of the given virtual server.
    func (runner *runner) GetRealServers(vs *VirtualServer) ([]*RealServer, error) {
        svc, err := toIPVSService(vs)
        if err != nil {
            return nil, fmt.Errorf("could not convert local virtual server to IPVS service: %w", err)
        }

        // Hold the lock only around the ipvs handle call.
        runner.mu.Lock()
        destinations, err := runner.ipvsHandle.GetDestinations(svc)
        runner.mu.Unlock()
        if err != nil {
            return nil, fmt.Errorf("could not get IPVS destination for service: %w", err)
        }

        // TODO: aggregate errors?
        realServers := make([]*RealServer, 0, len(destinations))
        for _, d := range destinations {
            rs, err := toRealServer(d)
            if err != nil {
                return nil, fmt.Errorf("could not convert IPVS destination to local real server: %w", err)
            }
            realServers = append(realServers, rs)
        }
        return realServers, nil
    }

    // ConfigureTimeouts is the equivalent to running "ipvsadm --set" to
    // configure tcp, tcpfin and udp connection-tracking timeouts.
    func (runner *runner) ConfigureTimeouts(tcpTimeout, tcpFinTimeout, udpTimeout time.Duration) error {
        return runner.ipvsHandle.SetConfig(&libipvs.Config{
            TimeoutTCP:    tcpTimeout,
            TimeoutTCPFin: tcpFinTimeout,
            TimeoutUDP:    udpTimeout,
        })
    }

    // toVirtualServer converts an IPVS Service to the equivalent VirtualServer structure.
    func toVirtualServer(svc *libipvs.Service) (*VirtualServer, error) {
    if svc == nil {
    return nil, errors.New("ipvs svc should not be empty")
    }
    vs := &VirtualServer{
    Address: svc.Address,
    Port: svc.Port,
    Scheduler: svc.SchedName,
    Protocol: protocolToString(Protocol(svc.Protocol)),
    Timeout: svc.Timeout,
    }

    // Test Flags >= 0x2, valid Flags ranges [0x2, 0x3]
    // Every service returned by the kernel must carry FlagHashed (0x2);
    // its absence indicates an unexpected/malformed flags value.
    if svc.Flags&FlagHashed == 0 {
    return nil, fmt.Errorf("Flags of successfully created IPVS service should be >= %d since every service is hashed into the service table", FlagHashed)
    }
    // Sub Flags to 0x2
    // 011 -> 001, 010 -> 000
    // Strip the internal FlagHashed bit before exposing the flags.
    vs.Flags = ServiceFlags(svc.Flags &^ uint32(FlagHashed))

    // A nil address means the wildcard address; normalize it to the zero
    // IP of the service's address family.
    if vs.Address == nil {
    if svc.AddressFamily == unix.AF_INET {
    vs.Address = net.IPv4zero
    } else {
    vs.Address = net.IPv6zero
    }
    }
    return vs, nil
    }
    ...

判断是否支持ipvs代理模式

kube-proxy判断当前宿主节点是否支持ipvs代理模式。需要依赖上一步的kernelHandler

CanUseIPVSProxier 负责判断是否支持ipvs代理模式,主要检查宿主节点的内核加载情况和IPSet版本:

  • 检查宿主节点的内核加载情况:需要的内核模块包括`ip_vs`、`ip_vs_rr`、`ip_vs_wrr`、`ip_vs_sh`,以及`nf_conntrack_ipv4`(内核版本低于4.19)或`nf_conntrack`(内核版本不低于4.19)
  • 检查IPSet版本:kube-proxy通过ipset --version获取版本信息,如果版本低于6.0则无法满足ipvs代理模式的要求

获取宿主节点的Hostname

kube-proxy通过GetHostname获取宿主节点的hostname

生成KubeClient和EventClient

kube-proxy需要和kube-apiserver通信获取和更新一些资源对象以及输出一些Event,所以需要KubeClient和EventClient

  • Ref:https://github.com/kubernetes/kubernetes/blob/88e994f6bf8fc88114c5b733e09afea339bea66d/cmd/kube-proxy/app/server.go#L562

    // createClients creates a kube client and an event client from the given config and masterOverride.
    // TODO remove masterOverride when CLI flags are removed.
    func createClients(config componentbaseconfig.ClientConnectionConfiguration, masterOverride string) (clientset.Interface, v1core.EventsGetter, error) {
    var kubeConfig *rest.Config
    var err error

    if len(config.Kubeconfig) == 0 && len(masterOverride) == 0 {
    klog.InfoS("Neither kubeconfig file nor master URL was specified, falling back to in-cluster config")
    // Use the service-account-based in-cluster configuration.
    kubeConfig, err = rest.InClusterConfig()
    } else {
    // This creates a client, first loading any specified kubeconfig
    // file, and then overriding the Master flag, if non-empty.
    kubeConfig, err = clientcmd.NewNonInteractiveDeferredLoadingClientConfig(
    &clientcmd.ClientConfigLoadingRules{ExplicitPath: config.Kubeconfig},
    &clientcmd.ConfigOverrides{ClusterInfo: clientcmdapi.Cluster{Server: masterOverride}}).ClientConfig()
    }
    if err != nil {
    return nil, nil, err
    }

    // Apply client-side connection tuning from the component config.
    kubeConfig.AcceptContentTypes = config.AcceptContentTypes
    kubeConfig.ContentType = config.ContentType
    kubeConfig.QPS = config.QPS
    kubeConfig.Burst = int(config.Burst)
    // Build the main kube client.
    client, err := clientset.NewForConfig(kubeConfig)
    if err != nil {
    return nil, nil, err
    }
    // Build a second client from the same rest config; only its CoreV1
    // events interface is handed back to the caller.
    eventClient, err := clientset.NewForConfig(kubeConfig)
    if err != nil {
    return nil, nil, err
    }

    return client, eventClient.CoreV1(), nil
    }

获取宿主节点的IP地址

kube-proxy通过detectNodeIP获取当前宿主节点的IP地址

  • 如果设置了--bind-address启动参数,则直接使用该参数的值
  • 如果没有设置--bind-address启动参数,则获取K8s集群中当前宿主节点的Node资源对象Status记录的IP地址
  • 如果上述两种方式都没有获取成功,则使用127.0.0.1作为宿主节点的IP地址

确定代理模式

尽管kube-proxy启动参数中设置了代理模式,但是宿主节点未必支持该模式

  • 在无法支持指定模式下,需要回退到支持的代理模式中

  • 回退的顺序:ipvs->iptables->userspace

  • 如果启动参数指定的是userspace模式,则直接使用

  • 如果启动参数指定的是iptables模式,则调用tryIPTablesProxy函数判定宿主节点能否支持iptables模式,如果不能,回退到userspace模式

    • 在tryIPTablesProxy函数中,检查/proc/sys/net/ipv4/conf/all/route_localnet是否存在,如果存在则认为支持iptables模式
  • 如果启动参数指定ipvs,则调用tryIPVSProxy判断是否支持,否则回退到iptables

  • Ref:https://github.com/kubernetes/kubernetes/blob/88e994f6bf8fc88114c5b733e09afea339bea66d/cmd/kube-proxy/app/server_others.go#L583

    // getProxyMode resolves the user-requested proxy mode to one the node can
    // actually support, falling back ipvs -> iptables -> userspace.
    func getProxyMode(proxyMode string, canUseIPVS bool, kcompat iptables.KernelCompatTester) string {
    switch proxyMode {
    case proxyModeUserspace:
    // userspace requires no kernel support checks.
    return proxyModeUserspace
    case proxyModeIPTables:
    return tryIPTablesProxy(kcompat)
    case proxyModeIPVS:
    return tryIPVSProxy(canUseIPVS, kcompat)
    }
    // Unrecognized or empty mode: default to iptables (with its own fallback).
    klog.InfoS("Unknown proxy mode, assuming iptables proxy", "proxyMode", proxyMode)
    return tryIPTablesProxy(kcompat)
    }

确定本地数据包判定方法

LocalMode标识kube-proxy以何种方法判定当前的流量来自于当前节点,分别有四种

  • ClusterCIDR(默认模式):当流量来自于Pod网段的时候,认为流量来自于宿主节点

  • NodeCIDR:当流量来自于当前节点的被分配的Pod网段时,认为流量来自于宿主节点

  • BridgeInterface:当流量来自于某个网桥,认为流量来自于宿主节点

  • InterfaceNamePrefix:当流量来自于某些具有前缀的网络接口的时候,认为流量来自于宿主节点

  • Ref:https://github.com/kubernetes/kubernetes/blob/88e994f6bf8fc88114c5b733e09afea339bea66d/cmd/kube-proxy/app/server_others.go#L167

    ...	
    if detectLocalMode == proxyconfigapi.LocalModeNodeCIDR {
    klog.InfoS("Watching for node, awaiting podCIDR allocation", "hostname", hostname)
    nodeInfo, err = waitForPodCIDR(client, hostname)
    if err != nil {
    return nil, err
    }
    podCIDRs = nodeInfo.Spec.PodCIDRs
    klog.InfoS("NodeInfo", "PodCIDR", nodeInfo.Spec.PodCIDR, "PodCIDRs", nodeInfo.Spec.PodCIDRs)
    }
    ...

确定IP协议栈

kube-proxy判断IPv4/IPv6 双栈协议还是单栈来运行

  • 如果是userspace模式,直接以单栈运行,协议栈类型由节点自身的单IP类型决定

  • 如果是ipvs或者iptables,调用Present(),分别检查iptables和ip6tables下是否存在nat表和POSTROUTING链,确定是双栈还是单栈

  • Ref:https://github.com/kubernetes/kubernetes/blob/88e994f6bf8fc88114c5b733e09afea339bea66d/cmd/kube-proxy/app/server_others.go#L188

    ...
    if proxyMode != proxyModeUserspace {
    // Create iptables handlers for both families, one is already created
    // Always ordered as IPv4, IPv6
    if primaryProtocol == utiliptables.ProtocolIPv4 {
    ipt[0] = iptInterface
    ipt[1] = utiliptables.New(execer, utiliptables.ProtocolIPv6)
    } else {
    ipt[0] = utiliptables.New(execer, utiliptables.ProtocolIPv4)
    ipt[1] = iptInterface
    }

    for _, perFamilyIpt := range ipt {
    if !perFamilyIpt.Present() {
    klog.V(0).InfoS("kube-proxy running in single-stack mode, this ipFamily is not supported", "ipFamily", perFamilyIpt.Protocol())
    dualStack = false
    }
    }
    }
    ...
  • Ref:https://github.com/kubernetes/kubernetes/blob/88e994f6bf8fc88114c5b733e09afea339bea66d/pkg/util/iptables/iptables.go#L736

    // Present tests if iptables is supported on the current kernel by checking
    // for the existence of a default table and chain (nat/POSTROUTING); a
    // failed lookup means this IP family is unsupported.
    func (runner *runner) Present() bool {
        _, err := runner.ChainExists(TableNAT, ChainPostrouting)
        return err == nil
    }

生成Proxier结构体

确定IP协议栈和代理模式之后,需要生成kube-proxy对应的结构体Proxier

  • Ref:https://github.com/kubernetes/kubernetes/blob/88e994f6bf8fc88114c5b733e09afea339bea66d/cmd/kube-proxy/app/server_others.go#L207

    if proxyMode == proxyModeIPTables {
    klog.V(0).InfoS("Using iptables Proxier")
    if config.IPTables.MasqueradeBit == nil {
    // MasqueradeBit must be specified or defaulted.
    return nil, fmt.Errorf("unable to read IPTables MasqueradeBit from config")
    }

    if dualStack {
    klog.V(0).InfoS("kube-proxy running in dual-stack mode", "ipFamily", iptInterface.Protocol())
    klog.V(0).InfoS("Creating dualStackProxier for iptables")
    // Always ordered to match []ipt
    var localDetectors [2]proxyutiliptables.LocalTrafficDetector
    localDetectors, err = getDualStackLocalDetectorTuple(detectLocalMode, config, ipt, nodeInfo)
    if err != nil {
    return nil, fmt.Errorf("unable to create proxier: %v", err)
    }

    // TODO this has side effects that should only happen when Run() is invoked.
    proxier, err = iptables.NewDualStackProxier(
    ipt,
    utilsysctl.New(),
    execer,
    config.IPTables.SyncPeriod.Duration,
    config.IPTables.MinSyncPeriod.Duration,
    config.IPTables.MasqueradeAll,
    int(*config.IPTables.MasqueradeBit),
    localDetectors,
    hostname,
    nodeIPTuple(config.BindAddress),
    recorder,
    healthzServer,
    config.NodePortAddresses,
    )
    } else {
    // Create a single-stack proxier if and only if the node does not support dual-stack (i.e, no iptables support).
    var localDetector proxyutiliptables.LocalTrafficDetector
    localDetector, err = getLocalDetector(detectLocalMode, config, iptInterface, nodeInfo)
    if err != nil {
    return nil, fmt.Errorf("unable to create proxier: %v", err)
    }

    // TODO this has side effects that should only happen when Run() is invoked.
    proxier, err = iptables.NewProxier(
    iptInterface,
    utilsysctl.New(),
    execer,
    config.IPTables.SyncPeriod.Duration,
    config.IPTables.MinSyncPeriod.Duration,
    config.IPTables.MasqueradeAll,
    int(*config.IPTables.MasqueradeBit),
    localDetector,
    hostname,
    nodeIP,
    recorder,
    healthzServer,
    config.NodePortAddresses,
    )
    }

    if err != nil {
    return nil, fmt.Errorf("unable to create proxier: %v", err)
    }
    proxymetrics.RegisterMetrics()
    } else if proxyMode == proxyModeIPVS {
    klog.V(0).InfoS("Using ipvs Proxier")
    if dualStack {
    klog.V(0).InfoS("Creating dualStackProxier for ipvs")

    nodeIPs := nodeIPTuple(config.BindAddress)

    // Always ordered to match []ipt
    var localDetectors [2]proxyutiliptables.LocalTrafficDetector
    localDetectors, err = getDualStackLocalDetectorTuple(detectLocalMode, config, ipt, nodeInfo)
    if err != nil {
    return nil, fmt.Errorf("unable to create proxier: %v", err)
    }

    proxier, err = ipvs.NewDualStackProxier(
    ipt,
    ipvsInterface,
    ipsetInterface,
    utilsysctl.New(),
    execer,
    config.IPVS.SyncPeriod.Duration,
    config.IPVS.MinSyncPeriod.Duration,
    config.IPVS.ExcludeCIDRs,
    config.IPVS.StrictARP,
    config.IPVS.TCPTimeout.Duration,
    config.IPVS.TCPFinTimeout.Duration,
    config.IPVS.UDPTimeout.Duration,
    config.IPTables.MasqueradeAll,
    int(*config.IPTables.MasqueradeBit),
    localDetectors,
    hostname,
    nodeIPs,
    recorder,
    healthzServer,
    config.IPVS.Scheduler,
    config.NodePortAddresses,
    kernelHandler,
    )
    } else {
    var localDetector proxyutiliptables.LocalTrafficDetector
    localDetector, err = getLocalDetector(detectLocalMode, config, iptInterface, nodeInfo)
    if err != nil {
    return nil, fmt.Errorf("unable to create proxier: %v", err)
    }

    proxier, err = ipvs.NewProxier(
    iptInterface,
    ipvsInterface,
    ipsetInterface,
    utilsysctl.New(),
    execer,
    config.IPVS.SyncPeriod.Duration,
    config.IPVS.MinSyncPeriod.Duration,
    config.IPVS.ExcludeCIDRs,
    config.IPVS.StrictARP,
    config.IPVS.TCPTimeout.Duration,
    config.IPVS.TCPFinTimeout.Duration,
    config.IPVS.UDPTimeout.Duration,
    config.IPTables.MasqueradeAll,
    int(*config.IPTables.MasqueradeBit),
    localDetector,
    hostname,
    nodeIP,
    recorder,
    healthzServer,
    config.IPVS.Scheduler,
    config.NodePortAddresses,
    kernelHandler,
    )
    }
    if err != nil {
    return nil, fmt.Errorf("unable to create proxier: %v", err)
    }
    proxymetrics.RegisterMetrics()
    ...