K8s-kube-proxy(初始化过程)
K8s-kube-proxy(初始化过程)
基于1.25
kube-proxy也使用Cobra解析用户输入参数,初始化Options对象、验证参数有效性,调用Run启动
-
// NewProxyCommand builds the kube-proxy *cobra.Command: it creates the default
// options, attaches the run and argument-validation callbacks, applies config
// defaults and registers the command-line flags.
func NewProxyCommand() *cobra.Command {
	opts := NewOptions()

	// runProxy does the OS-specific init, then completes, validates and runs the options.
	runProxy := func(cmd *cobra.Command, args []string) error {
		verflag.PrintAndExitIfRequested()
		cliflag.PrintFlags(cmd.Flags())

		if err := initForOS(opts.WindowsService); err != nil {
			return fmt.Errorf("failed os init: %w", err)
		}
		if err := opts.Complete(); err != nil {
			return fmt.Errorf("failed complete: %w", err)
		}
		if err := opts.Validate(); err != nil {
			return fmt.Errorf("failed validate: %w", err)
		}

		err := opts.Run()
		if err != nil {
			klog.ErrorS(err, "Error running ProxyServer")
		}
		return err
	}

	// rejectArgs fails on the first non-empty positional argument; empty
	// strings are tolerated.
	rejectArgs := func(cmd *cobra.Command, args []string) error {
		for i := range args {
			if len(args[i]) == 0 {
				continue
			}
			return fmt.Errorf("%q does not take any arguments, got %q", cmd.CommandPath(), args)
		}
		return nil
	}

	cmd := &cobra.Command{
		Use: "kube-proxy",
		Long: `The Kubernetes network proxy runs on each node. This
reflects services as defined in the Kubernetes API on each node and can do simple
TCP, UDP, and SCTP stream forwarding or round robin TCP, UDP, and SCTP forwarding across a set of backends.
Service cluster IPs and ports are currently found through Docker-links-compatible
environment variables specifying ports opened by the service proxy. There is an optional
addon that provides cluster DNS for these cluster IPs. The user must create a service
with the apiserver API to configure the proxy.`,
		RunE: runProxy,
		Args: rejectArgs,
	}

	var err error
	if opts.config, err = opts.ApplyDefaults(opts.config); err != nil {
		klog.ErrorS(err, "Unable to create flag defaults")
		// ACTION REQUIRED: Exit code changed from 255 to 1
		os.Exit(1)
	}

	flags := cmd.Flags()
	opts.AddFlags(flags)
	flags.AddGoFlagSet(goflag.CommandLine) // for --boot-id-file and --machine-id-file

	// The result of marking --config as a filename flag is deliberately discarded.
	_ = cmd.MarkFlagFilename("config", "yaml", "yml", "json")
	return cmd
}
生成iptables、ipvs、Kernel、IP Set接口
kube-proxy需要调用iptables、ipset等命令,但是这些命令都被封装成了接口
-
// Interface-typed handles for iptables, IPVS, the kernel handler and ipset.
var (
	iptInterface   utiliptables.Interface
	ipvsInterface  utilipvs.Interface
	kernelHandler  ipvs.KernelHandler
	ipsetInterface utilipset.Interface
)
utiliptables.Interface
封装了iptables等命令,包含创建、删除iptables链、iptables规则
-
// Interface is an injectable interface for running iptables commands. Implementations must be goroutine-safe.
// It covers chain and rule management, save/restore of whole tables, flush
// monitoring, and capability probes.
type Interface interface {
// EnsureChain checks if the specified chain exists and, if not, creates it.
// The returned bool is true if the chain already existed.
EnsureChain(table Table, chain Chain) (bool, error)
// FlushChain clears the specified chain. If the chain did not exist, return error.
FlushChain(table Table, chain Chain) error
// DeleteChain deletes the specified chain. If the chain did not exist, return error.
DeleteChain(table Table, chain Chain) error
// ChainExists tests whether the specified chain exists, returning an error if it
// does not, or if it is unable to check.
ChainExists(table Table, chain Chain) (bool, error)
// EnsureRule checks if the specified rule is present and, if not, creates it.
// The returned bool is true if the rule already existed.
EnsureRule(position RulePosition, table Table, chain Chain, args ...string) (bool, error)
// DeleteRule checks if the specified rule is present and, if so, deletes it.
DeleteRule(table Table, chain Chain, args ...string) error
// IsIPv6 returns true if this is managing ipv6 tables.
IsIPv6() bool
// Protocol returns the IP family this instance is managing.
Protocol() Protocol
// SaveInto calls `iptables-save` for table and stores result in a given buffer.
SaveInto(table Table, buffer *bytes.Buffer) error
// Restore runs `iptables-restore` passing data through []byte.
// table is the Table to restore
// data should be formatted like the output of SaveInto()
// flush sets the presence of the "--noflush" flag. see: FlushFlag
// counters sets the "--counters" flag. see: RestoreCountersFlag
Restore(table Table, data []byte, flush FlushFlag, counters RestoreCountersFlag) error
// RestoreAll is the same as Restore except that no table is specified.
RestoreAll(data []byte, flush FlushFlag, counters RestoreCountersFlag) error
// Monitor detects when the given iptables tables have been flushed by an external
// tool (e.g. a firewall reload) by creating canary chains and polling to see if
// they have been deleted. (Specifically, it polls tables[0] every interval until
// the canary has been deleted from there, then waits a short additional time for
// the canaries to be deleted from the remaining tables as well. You can optimize
// the polling by listing a relatively empty table in tables[0]). When a flush is
// detected, this calls the reloadFunc so the caller can reload their own iptables
// rules. If it is unable to create the canary chains (either initially or after
// a reload) it will log an error and stop monitoring.
// (This function should be called from a goroutine.)
Monitor(canary Chain, tables []Table, reloadFunc func(), interval time.Duration, stopCh <-chan struct{})
// HasRandomFully reveals whether `-j MASQUERADE` takes the
// `--random-fully` option. This is helpful to work around a
// Linux kernel bug that sometimes causes multiple flows to get
// mapped to the same IP:PORT and consequently some suffer packet
// drops.
HasRandomFully() bool
// Present checks if the kernel supports the iptables interface.
Present() bool
}
接口的实现:
-
// EnsureChain is part of Interface. It attempts to create the chain and
// returns true when the chain was already present.
func (runner *runner) EnsureChain(table Table, chain Chain) (bool, error) {
	fullArgs := makeFullArgs(table, chain)

	runner.mu.Lock()
	defer runner.mu.Unlock()

	out, err := runner.run(opCreateChain, fullArgs)
	if err == nil {
		return false, nil
	}
	// A normal exit with status 1 is interpreted as "the chain already exists".
	if ee, ok := err.(utilexec.ExitError); ok && ee.Exited() && ee.ExitStatus() == 1 {
		return true, nil
	}
	return false, fmt.Errorf("error creating chain %q: %v: %s", chain, err, out)
}
// FlushChain is part of Interface. It runs the flush operation on the chain
// under the runner lock and wraps any failure together with the command output.
func (runner *runner) FlushChain(table Table, chain Chain) error {
	fullArgs := makeFullArgs(table, chain)

	runner.mu.Lock()
	defer runner.mu.Unlock()

	if out, err := runner.run(opFlushChain, fullArgs); err != nil {
		return fmt.Errorf("error flushing chain %q: %v: %s", chain, err, out)
	}
	return nil
}
// DeleteChain is part of Interface. It runs the delete-chain operation under
// the runner lock and wraps any failure together with the command output.
func (runner *runner) DeleteChain(table Table, chain Chain) error {
	fullArgs := makeFullArgs(table, chain)

	runner.mu.Lock()
	defer runner.mu.Unlock()

	if out, err := runner.run(opDeleteChain, fullArgs); err != nil {
		return fmt.Errorf("error deleting chain %q: %v: %s", chain, err, out)
	}
	return nil
}
// EnsureRule is part of Interface. It returns true if the rule was already
// present; otherwise it adds the rule at the requested position and returns false.
func (runner *runner) EnsureRule(position RulePosition, table Table, chain Chain, args ...string) (bool, error) {
	fullArgs := makeFullArgs(table, chain, args...)

	runner.mu.Lock()
	defer runner.mu.Unlock()

	// Check first so an existing rule is never added twice.
	switch exists, err := runner.checkRule(table, chain, args...); {
	case err != nil:
		return false, err
	case exists:
		return true, nil
	}

	if out, err := runner.run(operation(position), fullArgs); err != nil {
		return false, fmt.Errorf("error appending rule: %v: %s", err, out)
	}
	return false, nil
}
// DeleteRule is part of Interface. A rule that is not present is treated as
// already deleted and yields no error.
func (runner *runner) DeleteRule(table Table, chain Chain, args ...string) error {
	fullArgs := makeFullArgs(table, chain, args...)

	runner.mu.Lock()
	defer runner.mu.Unlock()

	present, err := runner.checkRule(table, chain, args...)
	switch {
	case err != nil:
		return err
	case !present:
		return nil
	}

	if out, err := runner.run(opDeleteRule, fullArgs); err != nil {
		return fmt.Errorf("error deleting rule: %v: %s", err, out)
	}
	return nil
}
// IsIPv6 is part of Interface. It reports whether the runner's protocol is ProtocolIPv6.
func (runner *runner) IsIPv6() bool {
return runner.protocol == ProtocolIPv6
}
// Protocol is part of Interface. It returns the protocol stored on the runner.
func (runner *runner) Protocol() Protocol {
return runner.protocol
}
// SaveInto is part of Interface. It runs the iptables-save command for the
// given table with stdout directed into buffer. If the command fails, the
// captured stderr is appended to buffer and the run error is returned.
func (runner *runner) SaveInto(table Table, buffer *bytes.Buffer) error {
	runner.mu.Lock()
	defer runner.mu.Unlock()

	trace := utiltrace.New("iptables save")
	defer trace.LogIfLong(2 * time.Second)

	saveCmd := iptablesSaveCommand(runner.protocol)
	saveArgs := []string{"-t", string(table)}
	klog.V(4).Infof("running %s %v", saveCmd, saveArgs)

	var stderr bytes.Buffer
	cmd := runner.exec.Command(saveCmd, saveArgs...)
	cmd.SetStdout(buffer)
	cmd.SetStderr(&stderr)

	runErr := cmd.Run()
	if runErr != nil {
		// The WriteTo error is ignored on purpose: the original run error is
		// what must be returned to the caller.
		stderr.WriteTo(buffer)
	}
	return runErr
}
// Restore is part of Interface. It restores a single table by passing
// "-T <table>" to the shared restore implementation.
func (runner *runner) Restore(table Table, data []byte, flush FlushFlag, counters RestoreCountersFlag) error {
	tableArgs := []string{"-T", string(table)}
	return runner.restoreInternal(tableArgs, data, flush, counters)
}
// RestoreAll is part of Interface. It calls the shared restore implementation
// with an empty (non-nil) argument list, i.e. without selecting a table.
func (runner *runner) RestoreAll(data []byte, flush FlushFlag, counters RestoreCountersFlag) error {
	return runner.restoreInternal([]string{}, data, flush, counters)
}
// iptablesLocker is anything that can be closed, reporting an error on failure.
// NOTE(review): presumably this represents a held iptables lock that is
// released via Close — confirm against the locking code, which is not shown here.
type iptablesLocker interface {
Close() error
}
utilipvs.Interface
封装了ipvs的接口命令,包含创建、更新和删除ipvs VirtualServer等
-
// Interface is an injectable interface for running ipvs commands. Implementations must be goroutine-safe.
// It covers virtual-server and real-server management plus timeout configuration.
type Interface interface {
// Flush clears all virtual servers in the system and returns any error that occurred immediately.
Flush() error
// AddVirtualServer creates the specified virtual server.
AddVirtualServer(*VirtualServer) error
// UpdateVirtualServer updates an already existing virtual server. If the virtual server does not exist, return error.
UpdateVirtualServer(*VirtualServer) error
// DeleteVirtualServer deletes the specified virtual server. If the virtual server does not exist, return error.
DeleteVirtualServer(*VirtualServer) error
// GetVirtualServer, given a partial virtual server, returns the matching virtual server information from the system.
GetVirtualServer(*VirtualServer) (*VirtualServer, error)
// GetVirtualServers lists all virtual servers in the system.
GetVirtualServers() ([]*VirtualServer, error)
// AddRealServer creates the specified real server for the specified virtual server.
AddRealServer(*VirtualServer, *RealServer) error
// GetRealServers returns all real servers for the specified virtual server.
GetRealServers(*VirtualServer) ([]*RealServer, error)
// DeleteRealServer deletes the specified real server from the specified virtual server.
DeleteRealServer(*VirtualServer, *RealServer) error
// UpdateRealServer updates the specified real server from the specified virtual server.
UpdateRealServer(*VirtualServer, *RealServer) error
// ConfigureTimeouts is the equivalent to running "ipvsadm --set" to configure tcp, tcpfin and udp timeouts.
// The three durations are passed in that order: tcp, tcpfin, udp.
ConfigureTimeouts(time.Duration, time.Duration, time.Duration) error
}
ipvs实现:
-
// AddVirtualServer is part of ipvs.Interface. It converts vs to an IPVS
// service and creates it through the handle while holding the runner lock.
func (runner *runner) AddVirtualServer(vs *VirtualServer) error {
	ipvsSvc, convErr := toIPVSService(vs)
	if convErr != nil {
		return fmt.Errorf("could not convert local virtual server to IPVS service: %w", convErr)
	}

	runner.mu.Lock()
	defer runner.mu.Unlock()
	return runner.ipvsHandle.NewService(ipvsSvc)
}
// UpdateVirtualServer is part of ipvs.Interface. It converts vs to an IPVS
// service and updates it through the handle while holding the runner lock.
func (runner *runner) UpdateVirtualServer(vs *VirtualServer) error {
	ipvsSvc, convErr := toIPVSService(vs)
	if convErr != nil {
		return fmt.Errorf("could not convert local virtual server to IPVS service: %w", convErr)
	}

	runner.mu.Lock()
	defer runner.mu.Unlock()
	return runner.ipvsHandle.UpdateService(ipvsSvc)
}
// DeleteVirtualServer is part of ipvs.Interface. It converts vs to an IPVS
// service and deletes it through the handle while holding the runner lock.
func (runner *runner) DeleteVirtualServer(vs *VirtualServer) error {
	ipvsSvc, convErr := toIPVSService(vs)
	if convErr != nil {
		return fmt.Errorf("could not convert local virtual server to IPVS service: %w", convErr)
	}

	runner.mu.Lock()
	defer runner.mu.Unlock()
	return runner.ipvsHandle.DelService(ipvsSvc)
}
// GetVirtualServer is part of ipvs.Interface. It looks up the IPVS service
// matching vs and converts the result back to a VirtualServer.
func (runner *runner) GetVirtualServer(vs *VirtualServer) (*VirtualServer, error) {
	query, err := toIPVSService(vs)
	if err != nil {
		return nil, fmt.Errorf("could not convert local virtual server to IPVS service: %w", err)
	}

	// The lock is held only around the handle call; conversions run outside it.
	runner.mu.Lock()
	found, err := runner.ipvsHandle.GetService(query)
	runner.mu.Unlock()
	if err != nil {
		return nil, fmt.Errorf("could not get IPVS service: %w", err)
	}

	result, err := toVirtualServer(found)
	if err != nil {
		return nil, fmt.Errorf("could not convert IPVS service to local virtual server: %w", err)
	}
	return result, nil
}
// GetVirtualServers is part of ipvs.Interface. It fetches every IPVS service
// from the handle and converts each one; the first conversion failure aborts
// the whole call. The returned slice is non-nil even when empty.
func (runner *runner) GetVirtualServers() ([]*VirtualServer, error) {
	// The lock is held only around the handle call; conversions run outside it.
	runner.mu.Lock()
	services, err := runner.ipvsHandle.GetServices()
	runner.mu.Unlock()
	if err != nil {
		return nil, fmt.Errorf("could not get IPVS services: %w", err)
	}

	result := []*VirtualServer{}
	for i := range services {
		converted, convErr := toVirtualServer(services[i])
		if convErr != nil {
			return nil, fmt.Errorf("could not convert IPVS service to local virtual server: %w", convErr)
		}
		result = append(result, converted)
	}
	return result, nil
}
// Flush is part of ipvs.Interface. It delegates to the IPVS handle's Flush in
// a single call while holding the runner lock, and returns that call's error.
func (runner *runner) Flush() error {
runner.mu.Lock()
defer runner.mu.Unlock()
return runner.ipvsHandle.Flush()
}
// AddRealServer is part of ipvs.Interface. It converts vs and rs to their
// IPVS forms and adds the destination to the service under the runner lock.
func (runner *runner) AddRealServer(vs *VirtualServer, rs *RealServer) error {
	ipvsSvc, err := toIPVSService(vs)
	if err != nil {
		return fmt.Errorf("could not convert local virtual server to IPVS service: %w", err)
	}
	ipvsDst, err := toIPVSDestination(rs)
	if err != nil {
		return fmt.Errorf("could not convert local real server to IPVS destination: %w", err)
	}

	runner.mu.Lock()
	defer runner.mu.Unlock()
	return runner.ipvsHandle.NewDestination(ipvsSvc, ipvsDst)
}
// DeleteRealServer is part of ipvs.Interface. It converts vs and rs to their
// IPVS forms and removes the destination from the service under the runner lock.
func (runner *runner) DeleteRealServer(vs *VirtualServer, rs *RealServer) error {
	ipvsSvc, err := toIPVSService(vs)
	if err != nil {
		return fmt.Errorf("could not convert local virtual server to IPVS service: %w", err)
	}
	ipvsDst, err := toIPVSDestination(rs)
	if err != nil {
		return fmt.Errorf("could not convert local real server to IPVS destination: %w", err)
	}

	runner.mu.Lock()
	defer runner.mu.Unlock()
	return runner.ipvsHandle.DelDestination(ipvsSvc, ipvsDst)
}
// UpdateRealServer is part of ipvs.Interface. It converts vs and rs to their
// IPVS forms and updates the destination on the service under the runner lock.
func (runner *runner) UpdateRealServer(vs *VirtualServer, rs *RealServer) error {
	ipvsSvc, err := toIPVSService(vs)
	if err != nil {
		return fmt.Errorf("could not convert local virtual server to IPVS service: %w", err)
	}
	ipvsDst, err := toIPVSDestination(rs)
	if err != nil {
		return fmt.Errorf("could not convert local real server to IPVS destination: %w", err)
	}

	runner.mu.Lock()
	defer runner.mu.Unlock()
	return runner.ipvsHandle.UpdateDestination(ipvsSvc, ipvsDst)
}
// GetRealServers is part of ipvs.Interface. It lists the destinations of the
// IPVS service matching vs and converts each to a RealServer; the first
// conversion failure aborts the call. The returned slice is non-nil even when empty.
func (runner *runner) GetRealServers(vs *VirtualServer) ([]*RealServer, error) {
	ipvsSvc, err := toIPVSService(vs)
	if err != nil {
		return nil, fmt.Errorf("could not convert local virtual server to IPVS service: %w", err)
	}

	// The lock is held only around the handle call; conversions run outside it.
	runner.mu.Lock()
	destinations, err := runner.ipvsHandle.GetDestinations(ipvsSvc)
	runner.mu.Unlock()
	if err != nil {
		return nil, fmt.Errorf("could not get IPVS destination for service: %w", err)
	}

	realServers := []*RealServer{}
	for i := range destinations {
		rs, convErr := toRealServer(destinations[i])
		// TODO: aggregate errors?
		if convErr != nil {
			return nil, fmt.Errorf("could not convert IPVS destination to local real server: %w", convErr)
		}
		realServers = append(realServers, rs)
	}
	return realServers, nil
}
// ConfigureTimeouts is the equivalent to running "ipvsadm --set" to configure tcp, tcpfin and udp timeouts.
// It packs the three durations into an IPVS config and applies it through the handle.
func (runner *runner) ConfigureTimeouts(tcpTimeout, tcpFinTimeout, udpTimeout time.Duration) error {
	ipvsConfig := &libipvs.Config{
		TimeoutTCP:    tcpTimeout,
		TimeoutTCPFin: tcpFinTimeout,
		TimeoutUDP:    udpTimeout,
	}

	// Every other method on this runner serializes its ipvsHandle call with
	// runner.mu, and the interface requires goroutine-safe implementations,
	// so take the same lock here instead of calling the handle unguarded.
	runner.mu.Lock()
	defer runner.mu.Unlock()
	return runner.ipvsHandle.SetConfig(ipvsConfig)
}
// toVirtualServer converts an IPVS Service to the equivalent VirtualServer
// structure. A nil service, or a service whose Flags lack FlagHashed, is an error.
func toVirtualServer(svc *libipvs.Service) (*VirtualServer, error) {
	if svc == nil {
		return nil, errors.New("ipvs svc should not be empty")
	}

	result := &VirtualServer{
		Address:   svc.Address,
		Port:      svc.Port,
		Scheduler: svc.SchedName,
		Protocol:  protocolToString(Protocol(svc.Protocol)),
		Timeout:   svc.Timeout,
	}

	// The hashed bit must be set on the service's Flags.
	if hashed := svc.Flags & FlagHashed; hashed == 0 {
		return nil, fmt.Errorf("Flags of successfully created IPVS service should be >= %d since every service is hashed into the service table", FlagHashed)
	}
	// Clear the hashed bit so only the remaining flag bits are exposed
	// (e.g. 011 -> 001, 010 -> 000).
	result.Flags = ServiceFlags(svc.Flags &^ uint32(FlagHashed))

	// Substitute the zero address of the service's family when none is set.
	if result.Address == nil {
		result.Address = net.IPv6zero
		if svc.AddressFamily == unix.AF_INET {
			result.Address = net.IPv4zero
		}
	}
	return result, nil
}
...
判断是否支持ipvs代理模式
kube-proxy判断当前宿主节点是否支持ipvs代理模式。需要依赖上一步的kernelHandler
-
// Probe whether the IPVS proxier can be used, based on the kernel handler,
// the ipset interface and the configured IPVS scheduler.
canUseIPVS, err := ipvs.CanUseIPVSProxier(kernelHandler, ipsetInterface, config.IPVS.Scheduler)
// The probe error is only logged here, and only when IPVS mode was the configured mode.
if string(config.Mode) == proxyModeIPVS && err != nil {
klog.ErrorS(err, "Can't use the IPVS proxier")
}
CanUseIPVSProxier 负责判断是否支持ipvs代理模式,主要检查宿主节点的内核加载情况和IPSet版本:
- 检查宿主节点的内核模块加载情况:需要的内核模块为 `ip_vs`、`ip_vs_rr`、`ip_vs_wrr`、`ip_vs_sh`,以及 `nf_conntrack_ipv4`(内核版本低于4.19)或 `nf_conntrack`(内核版本不低于4.19)
- 检查IPSet版本:kube-proxy 通过
ipset --version
获取版本信息,如果版本低于6.0,则无法满足ipvs代理模式的要求
获取宿主节点的Hostname
kube-proxy通过GetHostname获取宿主节点的hostname
如果设置
--hostname-override
参数,则直接使用这个参数-
// Resolve the hostname via GetHostname, passing the configured
// HostnameOverride; a failure is returned to the caller.
hostname, err := utilnode.GetHostname(config.HostnameOverride)
if err != nil {
return nil, err
}
生成KubeClient和EventClient
kube-proxy需要和kube-apiserver通信获取和更新一些资源对象以及输出一些Event,所以需要KubeClient和EventClient
-
// createClients creates a kube client and an event client from the given config and masterOverride.
// With neither a kubeconfig path nor a master override it falls back to the in-cluster config.
// TODO remove masterOverride when CLI flags are removed.
func createClients(config componentbaseconfig.ClientConnectionConfiguration, masterOverride string) (clientset.Interface, v1core.EventsGetter, error) {
	var (
		kubeConfig *rest.Config
		err        error
	)

	switch {
	case len(config.Kubeconfig) == 0 && len(masterOverride) == 0:
		klog.InfoS("Neither kubeconfig file nor master URL was specified, falling back to in-cluster config")
		// Use the in-cluster configuration.
		kubeConfig, err = rest.InClusterConfig()
	default:
		// Load any specified kubeconfig file first, then override the
		// server address with masterOverride if it is non-empty.
		loadingRules := &clientcmd.ClientConfigLoadingRules{ExplicitPath: config.Kubeconfig}
		overrides := &clientcmd.ConfigOverrides{ClusterInfo: clientcmdapi.Cluster{Server: masterOverride}}
		kubeConfig, err = clientcmd.NewNonInteractiveDeferredLoadingClientConfig(loadingRules, overrides).ClientConfig()
	}
	if err != nil {
		return nil, nil, err
	}

	// Copy the connection tuning from the component config onto the REST config.
	kubeConfig.AcceptContentTypes = config.AcceptContentTypes
	kubeConfig.ContentType = config.ContentType
	kubeConfig.QPS = config.QPS
	kubeConfig.Burst = int(config.Burst)

	// Build the kube client.
	kubeClient, err := clientset.NewForConfig(kubeConfig)
	if err != nil {
		return nil, nil, err
	}

	// Build a second clientset from the same config; only its CoreV1 client
	// is returned, as the events getter.
	eventClientset, err := clientset.NewForConfig(kubeConfig)
	if err != nil {
		return nil, nil, err
	}

	return kubeClient, eventClientset.CoreV1(), nil
}
获取宿主节点的IP地址
kube-proxy通过detectNodeIP
获取当前宿主节点的IP地址
- 如果设置了
--bind-address
启动参数,则直接使用该参数的值 - 如果没有设置
--bind-address
启动参数,则获取K8s集群中当前宿主节点的Node资源对象Status记录的IP地址 - 如果上述两种方式都没有获取成功, 则使用
127.0.0.1
作为宿主节点的IP地址
确定代理模式
尽管kube-proxy启动参数中设置了代理模式,但是宿主节点未必支持该模式
在无法支持指定模式下,需要回退到支持的代理模式中
回退的顺序:ipvs->iptables->userspace
如果启动参数指定的是userspace模式,则直接使用
如果启动参数指定的是iptables模式,则调用
tryIPTablesProxy
函数判定宿主节点能否支持iptables模式,如果不能,回退到userspace模式
- 在tryIPTablesProxy函数中,检查
/proc/sys/net/ipv4/conf/all/route_localnet
是否存在,如果存在则认为支持iptables模式
如果启动参数指定ipvs,则调用
tryIPVSProxy
判断是否支持,不然回退到iptables-
// getProxyMode maps the requested proxy mode to the mode that will be used.
// Userspace is returned as-is; iptables and IPVS are delegated to their
// try* helpers; any other value is logged and handled like iptables.
func getProxyMode(proxyMode string, canUseIPVS bool, kcompat iptables.KernelCompatTester) string {
	switch proxyMode {
	case proxyModeUserspace:
		return proxyModeUserspace
	case proxyModeIPVS:
		return tryIPVSProxy(canUseIPVS, kcompat)
	case proxyModeIPTables:
		return tryIPTablesProxy(kcompat)
	default:
		klog.InfoS("Unknown proxy mode, assuming iptables proxy", "proxyMode", proxyMode)
		return tryIPTablesProxy(kcompat)
	}
}
确定本地数据包判定方法
LocalMode标识kube-proxy以何种方法判定流量来自于当前节点,共有四种:
ClusterCIDR(默认模式):当流量来自于集群的Pod网段时,认为流量来自于宿主节点
NodeCIDR:当流量来自于当前节点的被分配的Pod网段时,认为流量来自于宿主节点
BridgeInterface:当流量来自于某个网桥,认为流量来自于宿主节点
InterfaceNamePrefix:当流量来自于某些具有前缀的网络接口的时候,认为流量来自于宿主节点
-
...
// In NodeCIDR local-detection mode, call waitForPodCIDR for this hostname and
// take the pod CIDRs from the returned node's spec; a failure is returned to the caller.
if detectLocalMode == proxyconfigapi.LocalModeNodeCIDR {
klog.InfoS("Watching for node, awaiting podCIDR allocation", "hostname", hostname)
nodeInfo, err = waitForPodCIDR(client, hostname)
if err != nil {
return nil, err
}
podCIDRs = nodeInfo.Spec.PodCIDRs
klog.InfoS("NodeInfo", "PodCIDR", nodeInfo.Spec.PodCIDR, "PodCIDRs", nodeInfo.Spec.PodCIDRs)
}
...
确定IP协议栈
kube-proxy判断IPv4/IPv6 双栈协议还是单栈来运行
如果是userspace模式,直接单栈运行,网络协议栈是节点属于单IP类型
如果是ipvs或者iptables,调用
Present()
,分别检查iptables和ip6tables下是否存在nat表和POSTROUTING链,确定是双栈还是单栈-
...
// For every mode except userspace, build an iptables handle per IP family and
// drop to single-stack if either family's handle reports it is not present.
if proxyMode != proxyModeUserspace {
// Create iptables handlers for both families, one is already created
// Always ordered as IPv4, IPv6
if primaryProtocol == utiliptables.ProtocolIPv4 {
ipt[0] = iptInterface
ipt[1] = utiliptables.New(execer, utiliptables.ProtocolIPv6)
} else {
ipt[0] = utiliptables.New(execer, utiliptables.ProtocolIPv4)
ipt[1] = iptInterface
}
// A single unsupported family is enough to disable dual-stack; the loop
// keeps going so each unsupported family is logged.
for _, perFamilyIpt := range ipt {
if !perFamilyIpt.Present() {
klog.V(0).InfoS("kube-proxy running in single-stack mode, this ipFamily is not supported", "ipFamily", perFamilyIpt.Protocol())
dualStack = false
}
}
}
... -
// Present reports whether iptables is usable on the current kernel, judged by
// whether the default NAT table's POSTROUTING chain can be found without error.
func (runner *runner) Present() bool {
	_, err := runner.ChainExists(TableNAT, ChainPostrouting)
	return err == nil
}
生成Proxier结构体
确定IP协议栈和代理模式之后,需要生成kube-proxy对应的结构体Proxier
-
// Build the proxier for the selected proxy mode. Each mode has a dual-stack
// variant (per-family iptables handles and local detectors) and a
// single-stack variant (the primary iptables handle and one local detector).
if proxyMode == proxyModeIPTables {
klog.V(0).InfoS("Using iptables Proxier")
if config.IPTables.MasqueradeBit == nil {
// MasqueradeBit must be specified or defaulted.
return nil, fmt.Errorf("unable to read IPTables MasqueradeBit from config")
}
if dualStack {
klog.V(0).InfoS("kube-proxy running in dual-stack mode", "ipFamily", iptInterface.Protocol())
klog.V(0).InfoS("Creating dualStackProxier for iptables")
// Always ordered to match []ipt
var localDetectors [2]proxyutiliptables.LocalTrafficDetector
localDetectors, err = getDualStackLocalDetectorTuple(detectLocalMode, config, ipt, nodeInfo)
if err != nil {
return nil, fmt.Errorf("unable to create proxier: %v", err)
}
// TODO this has side effects that should only happen when Run() is invoked.
proxier, err = iptables.NewDualStackProxier(
ipt,
utilsysctl.New(),
execer,
config.IPTables.SyncPeriod.Duration,
config.IPTables.MinSyncPeriod.Duration,
config.IPTables.MasqueradeAll,
int(*config.IPTables.MasqueradeBit),
localDetectors,
hostname,
nodeIPTuple(config.BindAddress),
recorder,
healthzServer,
config.NodePortAddresses,
)
} else {
// Create a single-stack proxier if and only if the node does not support dual-stack (i.e, no iptables support).
var localDetector proxyutiliptables.LocalTrafficDetector
localDetector, err = getLocalDetector(detectLocalMode, config, iptInterface, nodeInfo)
if err != nil {
return nil, fmt.Errorf("unable to create proxier: %v", err)
}
// TODO this has side effects that should only happen when Run() is invoked.
proxier, err = iptables.NewProxier(
iptInterface,
utilsysctl.New(),
execer,
config.IPTables.SyncPeriod.Duration,
config.IPTables.MinSyncPeriod.Duration,
config.IPTables.MasqueradeAll,
int(*config.IPTables.MasqueradeBit),
localDetector,
hostname,
nodeIP,
recorder,
healthzServer,
config.NodePortAddresses,
)
}
// Shared error check for whichever iptables constructor ran above.
if err != nil {
return nil, fmt.Errorf("unable to create proxier: %v", err)
}
proxymetrics.RegisterMetrics()
} else if proxyMode == proxyModeIPVS {
// NOTE(review): unlike the iptables branch, this branch dereferences
// config.IPTables.MasqueradeBit without a visible nil check — confirm it
// is always defaulted before this point.
klog.V(0).InfoS("Using ipvs Proxier")
if dualStack {
klog.V(0).InfoS("Creating dualStackProxier for ipvs")
nodeIPs := nodeIPTuple(config.BindAddress)
// Always ordered to match []ipt
var localDetectors [2]proxyutiliptables.LocalTrafficDetector
localDetectors, err = getDualStackLocalDetectorTuple(detectLocalMode, config, ipt, nodeInfo)
if err != nil {
return nil, fmt.Errorf("unable to create proxier: %v", err)
}
proxier, err = ipvs.NewDualStackProxier(
ipt,
ipvsInterface,
ipsetInterface,
utilsysctl.New(),
execer,
config.IPVS.SyncPeriod.Duration,
config.IPVS.MinSyncPeriod.Duration,
config.IPVS.ExcludeCIDRs,
config.IPVS.StrictARP,
config.IPVS.TCPTimeout.Duration,
config.IPVS.TCPFinTimeout.Duration,
config.IPVS.UDPTimeout.Duration,
config.IPTables.MasqueradeAll,
int(*config.IPTables.MasqueradeBit),
localDetectors,
hostname,
nodeIPs,
recorder,
healthzServer,
config.IPVS.Scheduler,
config.NodePortAddresses,
kernelHandler,
)
} else {
var localDetector proxyutiliptables.LocalTrafficDetector
localDetector, err = getLocalDetector(detectLocalMode, config, iptInterface, nodeInfo)
if err != nil {
return nil, fmt.Errorf("unable to create proxier: %v", err)
}
proxier, err = ipvs.NewProxier(
iptInterface,
ipvsInterface,
ipsetInterface,
utilsysctl.New(),
execer,
config.IPVS.SyncPeriod.Duration,
config.IPVS.MinSyncPeriod.Duration,
config.IPVS.ExcludeCIDRs,
config.IPVS.StrictARP,
config.IPVS.TCPTimeout.Duration,
config.IPVS.TCPFinTimeout.Duration,
config.IPVS.UDPTimeout.Duration,
config.IPTables.MasqueradeAll,
int(*config.IPTables.MasqueradeBit),
localDetector,
hostname,
nodeIP,
recorder,
healthzServer,
config.IPVS.Scheduler,
config.NodePortAddresses,
kernelHandler,
)
}
// Shared error check for whichever ipvs constructor ran above.
if err != nil {
return nil, fmt.Errorf("unable to create proxier: %v", err)
}
proxymetrics.RegisterMetrics()
...