K8s-client-go:WorkQueue工作队列

WorkQueue队列与普通的FIFO队列,添加了标记和去重功能。有如下特性:

  1. 有序:按照顺序处理元素
  2. 去重:相同元素同一时间不会被重复处理
  3. 并发性:多生产者和消费者
  4. 标记机制:ShutDown方法通过信号量通知队列不再接收新的元素,并通知metric goroutine退出
  5. 延迟:支持延迟队列,延迟一段时间将元素存入队列
  6. 限速:支持限速队列,元素存入队列时进行速率限制。限制一个元素出现排队(REenqueued)的次数
  7. Metric:支持metric监控指标,可用Prometheus监控

WorkQueue支持三种队列,并且提供三种接口:

  • Interface:FIFO队列,支持去重
  • Delaying Interface:延迟队列接口,基于Interface接口限制
  • RateLimtingInterface:限速接口队列

FIFO基本队列

位置:k8s.io/client-go/util/workqueue

1
2
3
4
5
6
7
8
9
10
11
12
13
14
type Interface interface {
//给队列添加元素
Add(item interface{})
// 获取队列长度
Len()
// 获取队列头部元素
Get() (item interface{}, shutdown bool)
// 标记队列该元素已经被处理
Done(item interface{})
// 关闭队列
ShutDown()
// 查询队列释放正在被关闭
ShuttingDown() bool
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
// Type is a work queue (see the package comment).
type Type struct {
// queue defines the order in which we will work on items. Every
// element of queue should be in the dirty set and not in the
// processing set.
// 定义了处理项的顺序
queue []t

// dirty defines all of the items that need to be processed.
// 当前处理的集合
dirty set

// Things that are currently being processed are in the processing set.
// These things may be simultaneously in the dirty set. When we finish
// processing something and remove it from this set, we'll check if
// it's in the dirty set, and if so, add it to the queue.
processing set

cond *sync.Cond

shuttingDown bool

metrics queueMetrics

unfinishedWorkUpdatePeriod time.Duration
clock clock.Clock
}

延迟队列

位置:k8s.io/client-go/util/workqueue/delaying_queue.go

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
// DelayingInterface is an Interface that can Add an item at a later time. This makes it easier to
// requeue items after failures without ending up in a hot-loop.
type DelayingInterface interface {
Interface
// AddAfter adds an item to the workqueue after the indicated duration has passed
// 指定持续时间之后向工作队列添加元素
AddAfter(item interface{}, duration time.Duration)
}


// delayingType wraps an Interface and provides delayed re-enquing
// 包装了一个接口提供延迟的查询
type delayingType struct {
Interface

// clock tracks time for delayed firing
// 时间跟踪延迟时间
clock clock.Clock

// stopCh lets us signal a shutdown to the waiting loop
// stopCh 等待循环的关机信号
stopCh chan struct{}
// stopOnce guarantees we only signal shutdown a single time
// stopOnce保证只发出一次关机信号
stopOnce sync.Once

// heartbeat ensures we wait no more than maxWait before firing
// 确保我们触发不会超过max Wait
heartbeat clock.Ticker

// waitingForAddCh is a buffered channel that feeds waitingForAdd
// 提供waitingForAdd的缓冲通道
waitingForAddCh chan *waitFor

// metrics counts the number of retries
// 度量重试次数
metrics retryMetrics
}

// waitFor 保存要添加的数据和应该添加的时间
type waitFor struct {
data t // 要添加的元素数据
readyAt time.Time // 应该被添加的时间点
index int // 优先队列(heap)中的索引
}

限速队列

位置:k8s.io/client-go/util/workqueue/default_rate_limiters.go

1
2
3
4
5
6
// client-go/util/workqueue/default_rate_limiters.go
type RateLimiter interface {
When(item interface{}) time.Duration // 获取item元素应该等待多长时间
Forget(item interface{}) // 表示元素已经完成了重试,不管是成功还是失败都会停止跟踪,也就是抛弃该元素
NumRequeues(item interface{}) int // 返回元素失败的次数(也就是放入队列的次数)
}

主要提供了四种限速策略:

  1. 令牌桶算法(BucketRateLimiter)
  2. 排队指数算法(ItemExponential FailureRateLimter)
  3. 计数器算法(ItemFastSlowRateLimter)
  4. 混合模式(MaxOfRateLimter)多种限速算法混合使用

令牌桶算法

主要通过go的golang.org/x/time/rate实现。

往令牌桶中固定速率添加token,多余的token会溢出。每次从token从桶中取出。

1
&BucketRateLimiter{Limiter: rate.NewLimiter(rate.Limit(10), 100)},

排队指数算法

指数排队算法,把相同的元素的排队数作为指数,排队数增大,速度限制呈指数级增长。但是最大不会超过maxDelay。元素排队的统计是有限制周期的,一个限速周期是指数从执行AddRateLimter方法到执行完Forget方法之间的速度。如果该元素被Forget方法处理完,则清空排队数。

位置:k8s.io/client-go/util/workqueue/default_rate_limiters.go

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
exp := r.failures[item]
r.failures[item] = r.failures[item] + 1

// The backoff is capped such that 'calculated' value never overflows.
backoff := float64(r.baseDelay.Nanoseconds()) * math.Pow(2, float64(exp))
if backoff > math.MaxInt64 {
return r.maxDelay
}

calculated := time.Duration(backoff)
if calculated > r.maxDelay {
return r.maxDelay
}

return calculated

计数器算法

计数器算法就是限制一段时间内允许通过的元素数量。但是WorkQueue添加了fastslow速率

1
2
3
4
5
6
7
8
9
10
11
12
func (r *ItemFastSlowRateLimiter) When(item interface{}) time.Duration {
r.failuresLock.Lock()
defer r.failuresLock.Unlock()

r.failures[item] = r.failures[item] + 1

if r.failures[item] <= r.maxFastAttempts {
return r.fastDelay
}

return r.slowDelay
}