Alertmanager-告警发送

基于v0.28

启动顺序

sequenceDiagram
    participant Alert as 新告警
    participant Dispatcher as Dispatcher
    participant Route as Route
    participant AggrGroup as aggrGroup
    participant NotifyStage as notify.Stage
    participant Notifier as Notifier(邮件/微信等)

    Alert->>Dispatcher: 新告警流入
    Dispatcher->>Route: 路由树匹配
    Route-->>Dispatcher: 匹配到的 Route
    Dispatcher->>AggrGroup: 插入/创建分组
    Note over AggrGroup: run 协程定时触发通知
    AggrGroup->>NotifyStage: flush(alerts...) 触发通知管道
    loop 通知管道链路
        NotifyStage->>Notifier: 调用具体 Notifier 发送
        Notifier-->>NotifyStage: 返回发送结果
    end
    NotifyStage-->>AggrGroup: 返回整体通知结果
    AggrGroup->>AggrGroup: 删除已解决告警
    AggrGroup-->>Dispatcher: 分组生命周期管理

工作原理

flowchart TD
    A["收到告警分组"] --> B["GossipSettle<br/>等待集群稳定"]
    B --> C["InhibitStage<br/>抑制规则过滤"]
    C --> D["TimeActiveStage<br/>激活时间判断"]
    D --> E["TimeMuteStage<br/>静默时间判断"]
    E --> F["SilenceStage<br/>静默规则过滤"]
    F --> G["FanoutStage<br/>并发分发到各通知渠道"]
    G --> H["WaitStage<br/>等待/延迟"]
    H --> I["DedupStage<br/>去重"]
    I --> J["RetryStage<br/>失败重试"]
    J --> K["各通知集成(如邮件、Webhook、Slack等)"]
    K --> L["SetNotifiesStage<br/>记录通知日志"]

详细流程

万恶之源启动

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
// Build the notification pipeline by handing the receivers, the wait
// function, the inhibitor, the silencer, the time-interval intervener, the
// marker, the notification log and the cluster peer to the pipeline builder.
pipeline := pipelineBuilder.New(
receivers,
waitFunc,
inhibitor,
silencer,
intervener,
marker,
notificationLog,
pipelinePeer,
)

// New builds one notification pipeline per receiver and returns them as a
// RoutingStage keyed by receiver name.
//
// Every pipeline is a MultiStage made of, in order: the gossip settle stage,
// the inhibitor mute stage, the time-active stage, the time-mute stage, the
// silencer mute stage and the receiver-specific stage produced by
// createReceiverStage. The first five stages are constructed once and shared
// by all receivers; only the last one is built per receiver.
//
// Before returning, the builder's metrics are initialized for the given
// receivers.
func (pb *PipelineBuilder) New(
	receivers map[string][]Integration,
	wait func() time.Duration,
	inhibitor *inhibit.Inhibitor,
	silencer *silence.Silencer,
	intervener *timeinterval.Intervener,
	marker types.GroupMarker,
	notificationLog NotificationLog,
	peer Peer,
) RoutingStage {
	// Receiver-independent stages, created a single time.
	var (
		gossipSettle = NewGossipSettleStage(peer)
		inhibitStage = NewMuteStage(inhibitor, pb.metrics)
		timeActive   = NewTimeActiveStage(intervener, marker, pb.metrics)
		timeMute     = NewTimeMuteStage(intervener, marker, pb.metrics)
		silenceStage = NewMuteStage(silencer, pb.metrics)
	)

	routing := make(RoutingStage, len(receivers))
	for name, integrations := range receivers {
		routing[name] = MultiStage{
			gossipSettle,
			inhibitStage,
			timeActive,
			timeMute,
			silenceStage,
			createReceiverStage(name, integrations, wait, notificationLog, pb.metrics),
		}
	}

	pb.metrics.InitializeFor(receivers)

	return routing
}


// createReceiverStage builds the FanoutStage for a single receiver.
//
// The fanout holds one MultiStage per integration, each made of, in order:
//  1. a WaitStage driven by wait,
//  2. a DedupStage backed by notificationLog,
//  3. a RetryStage wrapping the integration,
//  4. a SetNotifiesStage that writes to notificationLog.
//
// The dedup and set-notifies stages of one integration share the same
// nflogpb.Receiver, filled with the receiver name, the integration name and
// the integration index. With no integrations the returned FanoutStage is nil.
func createReceiverStage(
	name string,
	integrations []Integration,
	wait func() time.Duration,
	notificationLog NotificationLog,
	metrics *Metrics,
) Stage {
	var fanout FanoutStage
	for idx := range integrations {
		// Index into the slice instead of ranging by value: the dedup stage
		// is handed a pointer to the slice element itself.
		integration := &integrations[idx]

		receiver := &nflogpb.Receiver{
			GroupName:   name,
			Integration: integration.Name(),
			Idx:         uint32(integration.Index()),
		}

		fanout = append(fanout, MultiStage{
			NewWaitStage(wait),
			NewDedupStage(integration, notificationLog, receiver),
			NewRetryStage(*integration, name, metrics),
			NewSetNotifiesStage(notificationLog, receiver),
		})
	}
	return fanout
}

MuteStage

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
// MuteStage filters out alerts that its Muter reports as muted, so that no
// notification is sent for them.
type MuteStage struct {
muter types.Muter // consulted for every alert, e.g. an inhibitor or a silencer
metrics *Metrics // metrics collector used to count suppressed notifications
}

// NewMuteStage returns a new MuteStage that consults m for every alert and
// reports suppressed notifications through metrics.
func NewMuteStage(m types.Muter, metrics *Metrics) *MuteStage {
	stage := new(MuteStage)
	stage.muter = m
	stage.metrics = metrics
	return stage
}

// Exec implements the Stage interface.
//
// Every alert whose labels are muted by the stage's Muter is removed from the
// pipeline; the remaining alerts are returned in their original order. When
// at least one alert was muted, the suppressed-notifications counter is
// increased by the number of muted alerts and a debug line is logged. The
// reason label is derived from the concrete Muter type (silencer or
// inhibitor) and stays empty for any other implementation.
//
// The context is returned unchanged and the error is always nil.
func (n *MuteStage) Exec(ctx context.Context, logger *slog.Logger, alerts ...*types.Alert) (context.Context, []*types.Alert, error) {
	var passed, suppressed []*types.Alert
	for _, alert := range alerts {
		// TODO(fabxc): increment total alerts counter.
		if n.muter.Mutes(alert.Labels) {
			// Do not send the alert if muted.
			// TODO(fabxc): increment muted alerts counter if muted.
			suppressed = append(suppressed, alert)
			continue
		}
		passed = append(passed, alert)
	}

	// Nothing was muted: no metric to update, nothing to log.
	if len(suppressed) == 0 {
		return ctx, passed, nil
	}

	reason := ""
	switch n.muter.(type) {
	case *silence.Silencer:
		reason = SuppressedReasonSilence
	case *inhibit.Inhibitor:
		reason = SuppressedReasonInhibition
	}

	suppressedCount := float64(len(suppressed))
	n.metrics.numNotificationSuppressedTotal.WithLabelValues(reason).Add(suppressedCount)
	logger.Debug("Notifications will not be sent for muted alerts", "alerts", fmt.Sprintf("%v", suppressed), "reason", reason)

	return ctx, passed, nil
}

TimeActiveStage

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66

// TimeActiveStage checks whether the current time lies inside one of the
// route's active time intervals and drops all alerts when it does not.
// It shares its underlying structure with timeStage.
type TimeActiveStage timeStage

// NewTimeActiveStage returns a TimeActiveStage that evaluates time intervals
// with muter, records the muted state on marker and reports suppressed
// notifications through metrics.
func NewTimeActiveStage(muter types.TimeMuter, marker types.GroupMarker, metrics *Metrics) *TimeActiveStage {
	return &TimeActiveStage{
		muter:   muter,
		marker:  marker,
		metrics: metrics,
	}
}

// Exec implements the Stage interface for TimeActiveStage.
//
// It reads the route ID, the group key, the active time interval names and
// the current timestamp from ctx:
//   - a missing route ID or group key is an error and no alerts are returned;
//   - missing or empty active time interval names mean the route is always
//     active, so the alerts pass through untouched;
//   - a missing timestamp, or a failure while evaluating the intervals, is
//     returned as an error together with the unmodified alerts.
//
// Otherwise the group's muted state is updated on the marker. When the route
// is currently not active, the group is marked as muted by all of its active
// time intervals, the suppressed-notifications counter is increased by the
// number of alerts, and all alerts are dropped from the pipeline.
func (tas TimeActiveStage) Exec(ctx context.Context, l *slog.Logger, alerts ...*types.Alert) (context.Context, []*types.Alert, error) {
	routeID, haveRoute := RouteID(ctx)
	if !haveRoute {
		return ctx, nil, errors.New("route ID missing")
	}

	groupKey, haveKey := GroupKey(ctx)
	if !haveKey {
		return ctx, nil, errors.New("group key missing")
	}

	// Without any active time interval the route is always active.
	intervalNames, haveIntervals := ActiveTimeIntervalNames(ctx)
	if !haveIntervals || len(intervalNames) == 0 {
		return ctx, alerts, nil
	}

	now, haveNow := Now(ctx)
	if !haveNow {
		return ctx, alerts, errors.New("missing now timestamp")
	}

	active, _, err := tas.muter.Mutes(intervalNames, now)
	if err != nil {
		return ctx, alerts, err
	}

	// mutedBy stays nil while the route is active.
	var mutedBy []string
	if active {
		tas.marker.SetMuted(routeID, groupKey, mutedBy)
		return ctx, alerts, nil
	}

	// Outside of every active interval: the group counts as muted by all of
	// them, and every alert is removed from the pipeline.
	mutedBy = intervalNames
	tas.marker.SetMuted(routeID, groupKey, mutedBy)

	dropped := len(alerts)
	tas.metrics.numNotificationSuppressedTotal.WithLabelValues(SuppressedReasonActiveTimeInterval).Add(float64(dropped))
	l.Debug("Notifications not sent, route is not within active time", "alerts", dropped)

	return ctx, nil, nil
}

WaitStage

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
// WaitStage pauses the pipeline for a period of time before letting the
// alerts continue.
type WaitStage struct {
wait func() time.Duration // returns how long to wait; called on every Exec
}

// NewWaitStage returns a new WaitStage whose delay is produced by wait.
func NewWaitStage(wait func() time.Duration) *WaitStage {
	stage := new(WaitStage)
	stage.wait = wait
	return stage
}

// Exec implements the Stage interface.
//
// It blocks for the duration returned by the stage's wait function and then
// passes the alerts through unchanged. If ctx is done before the wait has
// elapsed, it returns no alerts together with ctx.Err().
//
// An explicit timer is used instead of time.After so that it can be stopped
// as soon as Exec returns; otherwise a cancelled context would leave the
// timer pending until the full wait duration has elapsed.
func (ws *WaitStage) Exec(ctx context.Context, _ *slog.Logger, alerts ...*types.Alert) (context.Context, []*types.Alert, error) {
	timer := time.NewTimer(ws.wait())
	defer timer.Stop()

	select {
	case <-timer.C:
		return ctx, alerts, nil
	case <-ctx.Done():
		return ctx, nil, ctx.Err()
	}
}

SetNotifiesStage

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
// SetNotifiesStage records the notification state of a group in the
// notification log; DedupStage queries the same log to decide whether a
// notification is needed.
type SetNotifiesStage struct {
nflog NotificationLog // notification log that Exec writes to
recv *nflogpb.Receiver // receiver the log entries are recorded for
}

// NewSetNotifiesStage returns a new SetNotifiesStage that logs to l on behalf
// of the receiver recv.
func NewSetNotifiesStage(l NotificationLog, recv *nflogpb.Receiver) *SetNotifiesStage {
	stage := new(SetNotifiesStage)
	stage.nflog = l
	stage.recv = recv
	return stage
}

// Exec implements the Stage interface.
//
// It reads the group key, the firing alerts, the resolved alerts and the
// repeat interval from ctx and writes them to the notification log for the
// stage's receiver, with an expiry of twice the repeat interval. If any of
// the four values is missing from ctx, an error is returned and no alerts
// are passed on. Otherwise the alerts are returned unchanged, together with
// whatever error the log write produced.
func (n SetNotifiesStage) Exec(ctx context.Context, l *slog.Logger, alerts ...*types.Alert) (context.Context, []*types.Alert, error) {
	groupKey, found := GroupKey(ctx)
	if !found {
		return ctx, nil, errors.New("group key missing")
	}

	firingAlerts, found := FiringAlerts(ctx)
	if !found {
		return ctx, nil, errors.New("firing alerts missing")
	}

	resolvedAlerts, found := ResolvedAlerts(ctx)
	if !found {
		return ctx, nil, errors.New("resolved alerts missing")
	}

	repeatInterval, found := RepeatInterval(ctx)
	if !found {
		return ctx, nil, errors.New("repeat interval missing")
	}

	logErr := n.nflog.Log(n.recv, groupKey, firingAlerts, resolvedAlerts, 2*repeatInterval)
	return ctx, alerts, logErr
}

DedupStage

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
// DedupStage uses the notification log to avoid sending the same group of
// alerts repeatedly: alerts are only passed on when needsUpdate reports that
// a notification is required.
type DedupStage struct {
rs ResolvedSender // reports whether resolved alerts should be sent
nflog NotificationLog // notification log queried for the previous entry
recv *nflogpb.Receiver // receiver used to look up log entries

now func() time.Time // returns the current time; a field so it can be replaced
hash func(*types.Alert) uint64 // computes the hash that identifies an alert
}

// NewDedupStage returns a new DedupStage for the receiver recv, backed by the
// notification log l. The stage uses utcNow as its clock and hashAlert to
// identify alerts.
func NewDedupStage(rs ResolvedSender, l NotificationLog, recv *nflogpb.Receiver) *DedupStage {
	stage := &DedupStage{rs: rs, nflog: l, recv: recv}
	stage.now = utcNow
	stage.hash = hashAlert
	return stage
}

// utcNow reports the current time with its location set to UTC.
func utcNow() time.Time {
	now := time.Now()
	return now.UTC()
}

// hashBuffer wraps the scratch byte slice used while hashing an alert.
type hashBuffer struct {
	buf []byte
}

// hashBuffers is a pool of hashBuffer values so that hashing can reuse
// scratch space instead of allocating it on every call. A freshly created
// buffer is empty and has a capacity of 1024 bytes.
var hashBuffers = sync.Pool{
	New: func() any {
		hb := new(hashBuffer)
		hb.buf = make([]byte, 0, 1024)
		return hb
	},
}

// hashAlert returns a hash computed from the alert's label set.
//
// The label names are sorted first, so the result does not depend on map
// iteration order. Each name and each value is followed by a 0xff separator
// byte, and the resulting byte sequence is hashed with xxhash.
//
// Scratch space is borrowed from hashBuffers and handed back on return. The
// pooled buffer's own slice is never reassigned, so any growth beyond its
// capacity stays local to this call.
func hashAlert(a *types.Alert) uint64 {
	const sep = '\xff'

	scratch := hashBuffers.Get().(*hashBuffer)
	defer hashBuffers.Put(scratch)

	sortedNames := make(model.LabelNames, 0, len(a.Labels))
	for name := range a.Labels {
		sortedNames = append(sortedNames, name)
	}
	sort.Sort(sortedNames)

	data := scratch.buf[:0]
	for i := range sortedNames {
		name := sortedNames[i]
		data = append(data, string(name)...)
		data = append(data, sep)
		data = append(data, string(a.Labels[name])...)
		data = append(data, sep)
	}

	return xxhash.Sum64(data)
}

// needsUpdate reports whether a notification has to be sent for the current
// firing and resolved alert sets, given the last notification log entry.
//
// The rules are checked in this order; the first one that applies decides:
//  1. No previous entry: notify only if there is at least one firing alert.
//  2. The firing alerts are not a subset of the previously logged firing
//     alerts: notify.
//  3. Nothing is firing now: notify only if the previous entry still lists
//     firing alerts.
//  4. Resolved notifications are enabled and the resolved alerts are not a
//     subset of the previously logged resolved alerts: notify.
//  5. Otherwise notify only if the previous entry is older than repeat.
func (n *DedupStage) needsUpdate(entry *nflogpb.Entry, firing, resolved map[uint64]struct{}, repeat time.Duration) bool {
	switch {
	case entry == nil:
		return len(firing) > 0
	case !entry.IsFiringSubset(firing):
		return true
	case len(firing) == 0:
		return len(entry.FiringAlerts) > 0
	case n.rs.SendResolved() && !entry.IsResolvedSubset(resolved):
		return true
	default:
		repeatCutoff := n.now().Add(-repeat)
		return entry.Timestamp.Before(repeatCutoff)
	}
}

// Exec implements the Stage interface.
//
// It hashes every alert, splits the hashes into firing and resolved, and
// stores both lists in the returned context. It then queries the notification
// log for the entry belonging to this group key and receiver, and passes the
// alerts on only if needsUpdate says a notification is required; otherwise no
// alerts are returned.
//
// Errors are returned when the group key or the repeat interval is missing
// from ctx, when the log query fails with anything other than
// nflog.ErrNotFound, or when the query yields more than one entry.
func (n *DedupStage) Exec(ctx context.Context, _ *slog.Logger, alerts ...*types.Alert) (context.Context, []*types.Alert, error) {
	groupKey, found := GroupKey(ctx)
	if !found {
		return ctx, nil, errors.New("group key missing")
	}

	repeat, found := RepeatInterval(ctx)
	if !found {
		return ctx, nil, errors.New("repeat interval missing")
	}

	// The slices go into the context; the sets are used for subset checks.
	var (
		firingHashes   = make([]uint64, 0)
		resolvedHashes = make([]uint64, 0)
		firingSet      = make(map[uint64]struct{})
		resolvedSet    = make(map[uint64]struct{})
	)
	for _, alert := range alerts {
		h := n.hash(alert)
		if !alert.Resolved() {
			firingHashes = append(firingHashes, h)
			firingSet[h] = struct{}{}
			continue
		}
		resolvedHashes = append(resolvedHashes, h)
		resolvedSet[h] = struct{}{}
	}

	ctx = WithFiringAlerts(ctx, firingHashes)
	ctx = WithResolvedAlerts(ctx, resolvedHashes)

	// A not-found error is tolerated; any other query failure is returned.
	entries, err := n.nflog.Query(nflog.QGroupKey(groupKey), nflog.QReceiver(n.recv))
	if err != nil && !errors.Is(err, nflog.ErrNotFound) {
		return ctx, nil, err
	}

	// At most one log entry is expected per group key and receiver.
	if len(entries) > 1 {
		return ctx, nil, fmt.Errorf("unexpected entry result size %d", len(entries))
	}
	var previous *nflogpb.Entry
	if len(entries) == 1 {
		previous = entries[0]
	}

	if !n.needsUpdate(previous, firingSet, resolvedSet, repeat) {
		return ctx, nil, nil
	}
	return ctx, alerts, nil
}

RetryStage

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
// RetryStage sends notifications through a single integration and records
// notification metrics for it.
// NOTE(review): the original comment says failed notifications are retried
// with exponential backoff, and only for retryable errors, until success or
// timeout. That logic lives in r.exec, which is not shown here — confirm.
type RetryStage struct {
integration Integration // integration that performs the actual notification
groupName string // group name passed to NewRetryStage
metrics *Metrics // metrics collector for notification counters
labelValues []string // metric label values: integration name, optionally followed by the receiver name
}

// NewRetryStage returns a new RetryStage for the integration i.
//
// The stage's metric label values start with the integration name. When the
// metrics feature flags enable receiver names in metrics, the integration's
// receiver name is added as a second label value.
func NewRetryStage(i Integration, groupName string, metrics *Metrics) *RetryStage {
	stage := &RetryStage{
		integration: i,
		groupName:   groupName,
		metrics:     metrics,
		labelValues: []string{i.Name()},
	}

	if metrics.ff.EnableReceiverNamesInMetrics() {
		stage.labelValues = append(stage.labelValues, i.receiverName)
	}

	return stage
}

// Exec implements the Stage interface.
//
// It increments the notifications counter, delegates the actual work to
// r.exec and returns that call's results unchanged. When r.exec fails, the
// failed-notifications counter is incremented as well, labelled with the
// reason carried by an *ErrorWithReason found in the error chain, or with the
// default reason when there is none.
func (r RetryStage) Exec(ctx context.Context, l *slog.Logger, alerts ...*types.Alert) (context.Context, []*types.Alert, error) {
	r.metrics.numNotifications.WithLabelValues(r.labelValues...).Inc()

	ctx, alerts, err := r.exec(ctx, l, alerts...)

	reason := DefaultReason.String()
	if err != nil {
		var withReason *ErrorWithReason
		if errors.As(err, &withReason) {
			reason = withReason.Reason.String()
		}

		// The failure counter takes the stage's label values plus the reason.
		failureLabels := append(r.labelValues, reason)
		r.metrics.numTotalFailedNotifications.WithLabelValues(failureLabels...).Inc()
	}

	return ctx, alerts, err
}