Prometheus-notifier告警推送

基于v3.5

工作原理

sequenceDiagram
    participant Q as Queue
    participant M as Manager
    participant AM as Alertmanager
    participant AL as Alertmanager API
Q->>M: nextBatch()
M->>M: 分批处理告警
M->>AM: sendAll()
AM->>AM: 应用特定于 AM 的 relabel
AM->>AL: HTTP POST /api/v2/alerts
AL-->>AM: Response
AM->>M: 返回发送结果

关键特性

  1. 告警队列管理
    有界队列,防止内存无限增长
    支持队列满时的丢弃策略
    可配置的批处理大小
  2. Relabeling(重新标记)
    发送前对告警应用 relabel 配置
    支持每个 Alertmanager 集合的特定 relabel 规则
  3. 高可用性
    支持多个 Alertmanager 实例
    自动去重避免重复通知
    故障转移机制
  4. 服务发现集成
    与 Prometheus 服务发现机制集成
    动态更新 Alertmanager 实例列表
    这个 notifier 模块实现了 Prometheus 告警通知的核心功能,确保告警能够可靠地发送到 Alertmanager 集群

详细流程

告警控制器

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
// Manager 是负责将警报通知分发到警报管理服务的结构体。
type Manager struct {
queue []*Alert // 存储待发送的警报队列
opts *Options // 管理器的配置选项

metrics *alertMetrics // 指标监控相关数据

more chan struct{} // 用于通知有更多警报需要处理的通道
mtx sync.RWMutex // 读写锁,保护队列和警报管理器集合的并发访问

stopOnce *sync.Once // 确保 Stop 方法只执行一次
stopRequested chan struct{} // 接收停止信号的通道

alertmanagers map[string]*alertmanagerSet // 存储不同配置对应的警报管理器集合
logger *slog.Logger // 日志记录器
}

初始化

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
		notifierManager = notifier.NewManager(&cfg.notifier, logger.With("component", "notifier"))

// NewManager 是管理器的构造函数。
func NewManager(o *Options, logger *slog.Logger) *Manager {
// 如果未设置请求发送函数,则使用默认的 do 函数
if o.Do == nil {
o.Do = do
}
// 如果未提供 MaxBatchSize 或其值小于等于 0,则设置为默认值 DefaultMaxBatchSize
if o.MaxBatchSize <= 0 {
o.MaxBatchSize = DefaultMaxBatchSize
}
// 如果未提供日志记录器,则使用空日志记录器(不记录任何日志)
if logger == nil {
logger = promslog.NewNopLogger()
}

// 初始化 Manager 实例
n := &Manager{
queue: make([]*Alert, 0, o.QueueCapacity), // 初始化警报队列,容量为配置的队列容量
more: make(chan struct{}, 1), // 创建一个缓冲大小为 1 的通道,用于通知有更多警报需要处理
stopRequested: make(chan struct{}), // 创建一个无缓冲通道,用于接收停止信号
stopOnce: &sync.Once{}, // 初始化 stopOnce,确保 Stop 方法只执行一次
opts: o, // 设置配置选项
logger: logger, // 设置日志记录器
}

// 定义获取当前队列长度的函数
queueLenFunc := func() float64 { return float64(n.queueLen()) }
// 定义获取已发现的 Alertmanager 数量的函数
alertmanagersDiscoveredFunc := func() float64 { return float64(len(n.Alertmanagers())) }

// 初始化指标监控相关数据
n.metrics = newAlertMetrics(
o.Registerer, // 注册指标的注册器
o.QueueCapacity, // 队列容量
queueLenFunc, // 获取队列长度的函数
alertmanagersDiscoveredFunc, // 获取已发现 Alertmanager 数量的函数
)

// 返回初始化完成的 Manager 实例
return n
}

Run启动

  • targetUpdateLoop 工作流程
sequenceDiagram
    participant TUL as targetUpdateLoop
    participant CH as tsets 通道
    participant R as reload()
    participant AM as Alertmanager 集合
loop 持续运行
    TUL->>CH: 监听目标组更新
    CH-->>TUL: 接收目标组数据
    TUL->>R: 调用 reload 方法
    R->>AM: 同步 Alertmanager 集合
end
  • sendLoop 工作流程
sequenceDiagram
    participant SL as sendLoop
    participant MC as more 通道
    participant SOB as sendOneBatch()
    participant SA as sendAll()
    participant DQ as drainQueue()
    participant SR as stopRequested 通道
loop 持续运行
    SL->>SR: 检查停止信号
    SR-->>SL: 未停止
    SL->>MC: 监听 more 通道
    MC-->>SL: 有数据
    SL->>SOB: 调用 sendOneBatch
    SOB->>SA: 调用 sendAll 发送警报
    SA-->>SOB: 发送完成
    SOB->>SL: 检查队列是否还有数据
end

SR-->>SL: 收到停止信号
SL->>DQ: 调用 drainQueue 排空队列
  • sendAll() 并发发送机制
graph LR
    SA[sendAll] --> A[遍历 Alertmanager 集合]
    A --> B{集合是否为空}
    B -- 不为空 --> C[应用 Alert Relabel]
    C --> D[转换为 OpenAPI 格式]
    D --> E[JSON 序列化]
    E --> F[遍历集合中的实例]
    F --> G[启动并发 Goroutine]
    G --> H[发送 HTTP 请求]
    H --> I[更新指标]
    I --> J[等待所有 Goroutine 完成]
    J --> K[返回发送结果]
B -- 为空 --> A
  • 队列管理机制
graph TD
    S[Send 方法] --> RL[应用 Relabel]
    RL --> CC{检查容量}
    CC -- 超过 --> DO[丢弃旧警报]
    CC -- 未超过 --> AQ[添加到队列]
    DO --> AQ
    AQ --> SM[发送 more 信号]
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
// Run dispatches notifications continuously, returning once Stop has been called and all
// pending notifications have been drained from the queue (if draining is enabled).
//
// Dispatching of notifications occurs in parallel to processing target updates to avoid one starving the other.
// Refer to https://github.com/prometheus/prometheus/issues/13676 for more details.
// Run dispatches notifications continuously, returning once Stop has been called and all
// pending notifications have been drained from the queue (if draining is enabled).
//
// Dispatching of notifications occurs in parallel to processing target updates to avoid one starving the other.
// Refer to https://github.com/prometheus/prometheus/issues/13676 for more details.
func (n *Manager) Run(tsets <-chan map[string][]*targetgroup.Group) {
// 创建一个WaitGroup用于等待两个goroutine完成
wg := sync.WaitGroup{}
// 添加两个任务到WaitGroup中
wg.Add(2)

// 启动第一个goroutine处理目标更新
go func() {
// goroutine结束时调用Done减少WaitGroup计数
defer wg.Done()
// 处理目标更新循环
n.targetUpdateLoop(tsets)
}()

// 启动第二个goroutine处理通知发送
go func() {
// goroutine结束时调用Done减少WaitGroup计数
defer wg.Done()
// 处理通知发送循环
n.sendLoop()
// 发送循环结束后排空队列
n.drainQueue()
}()

// 等待两个goroutine都完成
wg.Wait()
// 记录通知管理器已停止的日志
n.logger.Info("Notification manager stopped")
}


// sendLoop continuously consumes the notifications queue and sends alerts to
// the configured Alertmanagers.
// sendLoop 持续消费通知队列并将警报发送到配置的 Alertmanager。
func (n *Manager) sendLoop() {
for {
// If we've been asked to stop, that takes priority over sending any further notifications.
// 如果收到停止请求,则优先处理停止,不再发送任何通知。
select {
case <-n.stopRequested:
return
default:
// 嵌套select确保在处理消息前再次检查是否需要停止
select {
case <-n.stopRequested:
return

case <-n.more:
// 发送一批警报通知
n.sendOneBatch()

// If the queue still has items left, kick off the next iteration.
// 如果队列中还有剩余项,则触发下一次迭代。
if n.queueLen() > 0 {
n.setMore()
}
}
}
}
}[]