Alertmanager-dispatch告警分组

基于v0.28

总体流程

flowchart TD
A[告警流入 Alertmanager] --> B[dispatch 路由树匹配]
B --> C{匹配到哪些 Route?}
C -- 无匹配 --> Z[丢弃或默认处理]
C -- 有匹配 --> D[分组聚合(按 group_by 标签)]
D --> E[分组等待 group_wait]
E --> F[分组内聚合告警]
F --> G{到达 group_interval 或有新告警?}
G -- 否 --> F
G -- 是 --> H[触发通知管道]
H --> I[通知发送(邮件/微信/钉钉等)]
I --> J[分组生命周期管理]
J --> K{分组内还有活跃告警?}
K -- 有 --> F
K -- 无 --> L[销毁分组,释放资源]

subgraph 说明
direction LR
X1[Route: 路由规则,决定分组、接收器、静默等]
X2[group_by: 分组标签]
X3[group_wait: 首次通知等待]
X4[group_interval: 分组内通知间隔]
X5[repeat_interval: 重复通知间隔]
X1 --> X2
X1 --> X3
X1 --> X4
X1 --> X5
end

style H fill:#f9f,stroke:#333,stroke-width:2px
style I fill:#bbf,stroke:#333,stroke-width:2px
style D fill:#ffd,stroke:#333,stroke-width:1px
style F fill:#ffd,stroke:#333,stroke-width:1px
style G fill:#ffd,stroke:#333,stroke-width:1px
style J fill:#ffd,stroke:#333,stroke-width:1px
style K fill:#ffd,stroke:#333,stroke-width:1px
style L fill:#eee,stroke:#333,stroke-width:1px
style Z fill:#eee,stroke:#333,stroke-width:1px

三大参数

sequenceDiagram
participant Alert as 新告警到达
participant AM as Alertmanager
participant User as 通知接收者

Alert->>AM: 新分组第1条告警
Note right of AM: 进入 group_wait 等待期
AM-->>AM: 收敛更多同分组告警
AM->>User: group_wait 到期后首次通知

Alert->>AM: 分组内又有新告警
Note right of AM: 若距离上次通知 < group_interval,等待
AM->>User: group_interval 到期后再次通知

loop 分组内有未恢复告警
AM->>User: repeat_interval 到期后重复通知
end
graph LR
A[group_wait] -->|首次收敛窗口| B[首次通知]
B -->|新告警| C{距离上次通知}
C -- < group_interval --> D[等待]
C -- ≥ group_interval --> E[追加通知]
E -->|未恢复| F{距离上次重复}
F -- < repeat_interval --> G[等待]
F -- ≥ repeat_interval --> H[重复通知]

运行原理

启动顺序

sequenceDiagram
participant Alert as 新告警
participant Dispatcher as Dispatcher
participant Route as Route
participant GroupMap as aggrGroupsPerRoute
participant AggrGroup as aggrGroup
participant Notify as 通知管道

Alert->>Dispatcher: 新告警流入
Dispatcher->>Route: 路由树匹配
Route-->>Dispatcher: 匹配到的 Route 列表
loop 对每个匹配 Route
Dispatcher->>GroupMap: 查找/创建分组 (groupLabels)
alt 分组已存在
Dispatcher->>AggrGroup: insert(alert)
else 新分组
Dispatcher->>GroupMap: 新建 aggrGroup
Dispatcher->>AggrGroup: insert(alert)
AggrGroup->>AggrGroup: 启动 run 协程,定时通知
end
end

Note over AggrGroup: run 协程定时触发通知
AggrGroup->>Notify: flush(alerts...) 触发通知
Notify-->>AggrGroup: 通知结果
AggrGroup->>AggrGroup: 删除已解决告警

Note over Dispatcher,AggrGroup: Dispatcher 定期 doMaintenance 清理空分组
Dispatcher->>GroupMap: 检查分组是否为空
alt 分组为空
Dispatcher->>AggrGroup: stop()
Dispatcher->>GroupMap: 删除分组
end

分组告警器

// Dispatcher 负责将告警分组、聚合、路由、分发到通知管道。
type Dispatcher struct {
route *Route // 路由树根节点
alerts provider.Alerts // 告警数据源
stage notify.Stage // 通知管道(Stage)
marker types.GroupMarker // 分组状态标记器
metrics *DispatcherMetrics // 监控指标
limits Limits // 分组数量限制

timeout func(time.Duration) time.Duration // 通知超时控制

mtx sync.RWMutex
aggrGroupsPerRoute map[*Route]map[model.Fingerprint]*aggrGroup // 路由-分组映射
aggrGroupsNum int // 当前分组总数

done chan struct{} // 停止信号
ctx context.Context // 主 context
cancel func() // 停止函数

logger *slog.Logger // 日志
}

万恶之源启动

	var disp *dispatch.Dispatcher
defer func() {
disp.Stop()
}()
// 启动dispatcher和inhibitor主循环
disp = dispatch.NewDispatcher(alerts, routes, pipeline, marker, timeoutFunc, nil, logger, dispMetrics)
routes.Walk(func(r *dispatch.Route) {
if r.RouteOpts.RepeatInterval > *retention {
configLogger.Warn(
"repeat_interval is greater than the data retention period. It can lead to notifications being repeated more often than expected.",
"repeat_interval",
r.RouteOpts.RepeatInterval,
"retention",
*retention,
"route",
r.Key(),
)
}

go disp.Run()


// Run 启动 Dispatcher,开始处理流入的告警。
func (d *Dispatcher) Run() {
d.done = make(chan struct{})

d.mtx.Lock()
d.aggrGroupsPerRoute = map[*Route]map[model.Fingerprint]*aggrGroup{}
d.aggrGroupsNum = 0
d.metrics.aggrGroups.Set(0)
d.ctx, d.cancel = context.WithCancel(context.Background())
d.mtx.Unlock()

d.run(d.alerts.Subscribe())
close(d.done)
}

处理告警流

// run 主循环,处理告警流和定期维护。
func (d *Dispatcher) run(it provider.AlertIterator) {
maintenance := time.NewTicker(30 * time.Second)
defer maintenance.Stop()

defer it.Close()

for {
select {
case alert, ok := <-it.Next():
if !ok {
// 迭代器关闭,可能是数据源关闭或出错
if err := it.Err(); err != nil {
d.logger.Error("Error on alert update", "err", err)
}
return
}

d.logger.Debug("Received alert", "alert", alert)

// 记录错误但继续处理
if err := it.Err(); err != nil {
d.logger.Error("Error on alert update", "err", err)
continue
}

now := time.Now()
for _, r := range d.route.Match(alert.Labels) {
d.processAlert(alert, r)
}
d.metrics.processingDuration.Observe(time.Since(now).Seconds())

case <-maintenance.C:
d.doMaintenance()
case <-d.ctx.Done():
return
}
}
}

路由匹配

// Route 表示路由树中的一个节点,定义了如何匹配和处理告警。
// 每个 Route 可以有多个子路由,形成树状结构。
type Route struct {
parent *Route // 父节点指针,便于递归和路径生成

// 当前路由节点的配置参数(如分组、接收器、时间区间等)
RouteOpts RouteOpts

// 匹配器集合,只有满足这些匹配条件的告警才会被此路由处理
Matchers labels.Matchers

// Continue 表示如果为 true,告警匹配到本节点后还会继续匹配同级其他路由
Continue bool

// 子路由列表,按配置顺序排列
Routes []*Route
}

// Match 对路由树进行深度优先(左-右)搜索,返回所有匹配给定标签集的路由节点。
// lset: 告警的标签集合
// 返回: 匹配到的所有路由节点(可能有多个,按树结构顺序)
func (r *Route) Match(lset model.LabelSet) []*Route {
if !r.Matchers.Matches(lset) {
return nil // 当前节点不匹配,直接返回
}

var all []*Route

for _, cr := range r.Routes {
matches := cr.Match(lset)

all = append(all, matches...)

// 如果子节点匹配且 Continue=false,则不再继续同级其他子节点
if matches != nil && !cr.Continue {
break
}
}

// 如果没有任何子节点匹配,则当前节点本身为匹配节点
if len(all) == 0 {
all = append(all, r)
}

return all
}

告警插入

// processAlert 负责将告警插入到对应分组(aggrGroup),如无则新建分组。
func (d *Dispatcher) processAlert(alert *types.Alert, route *Route) {
groupLabels := getGroupLabels(alert, route)

fp := groupLabels.Fingerprint()

d.mtx.Lock()
defer d.mtx.Unlock()

routeGroups, ok := d.aggrGroupsPerRoute[route]
if !ok {
routeGroups = map[model.Fingerprint]*aggrGroup{}
d.aggrGroupsPerRoute[route] = routeGroups
}

ag, ok := routeGroups[fp]
if ok {
ag.insert(alert)
return
}

// 新分组前先检查分组上限
if limit := d.limits.MaxNumberOfAggregationGroups(); limit > 0 && d.aggrGroupsNum >= limit {
d.metrics.aggrGroupLimitReached.Inc()
d.logger.Error("Too many aggregation groups, cannot create new group for alert", "groups", d.aggrGroupsNum, "limit", limit, "alert", alert.Name())
return
}

ag = newAggrGroup(d.ctx, groupLabels, route, d.timeout, d.logger)
routeGroups[fp] = ag
d.aggrGroupsNum++
d.metrics.aggrGroups.Inc()

// 先插入首条告警,再启动分组的 run 协程,保证 run 时分组已存在告警
ag.insert(alert)

go ag.run(func(ctx context.Context, alerts ...*types.Alert) bool {
_, _, err := d.stage.Exec(ctx, d.logger, alerts...)
if err != nil {
logger := d.logger.With("num_alerts", len(alerts), "err", err)
if errors.Is(ctx.Err(), context.Canceled) {
// reload/shutdown 时 context 被取消,降级为 debug 日志
logger.Debug("Notify for alerts failed")
} else {
logger.Error("Notify for alerts failed")
}
}
return err == nil
})
}

// insert 插入告警到分组。
func (ag *aggrGroup) insert(alert *types.Alert) {
if err := ag.alerts.Set(alert); err != nil {
ag.logger.Error("error on set alert", "err", err)
}

// 如果该告警的等待时间已过,立即触发通知
ag.mtx.Lock()
defer ag.mtx.Unlock()
if !ag.hasFlushed && alert.StartsAt.Add(ag.opts.GroupWait).Before(time.Now()) {
ag.next.Reset(0)
}
}

定时触发

// run 启动分组的通知循环,定时触发通知。
func (ag *aggrGroup) run(nf notifyFunc) {
defer close(ag.done)
defer ag.next.Stop()

for {
select {
case now := <-ag.next.C:
// 到达通知时间,构造 context 并触发通知
ctx, cancel := context.WithTimeout(ag.ctx, ag.timeout(ag.opts.GroupInterval))

// 以定时器时间为准,保证通知时序一致
ctx = notify.WithNow(ctx, now)

// 填充通知上下文信息
ctx = notify.WithGroupKey(ctx, ag.GroupKey())
ctx = notify.WithGroupLabels(ctx, ag.labels)
ctx = notify.WithReceiverName(ctx, ag.opts.Receiver)
ctx = notify.WithRepeatInterval(ctx, ag.opts.RepeatInterval)
ctx = notify.WithMuteTimeIntervals(ctx, ag.opts.MuteTimeIntervals)
ctx = notify.WithActiveTimeIntervals(ctx, ag.opts.ActiveTimeIntervals)
ctx = notify.WithRouteID(ctx, ag.routeID)

// 重置定时器,准备下次通知
ag.mtx.Lock()
ag.next.Reset(ag.opts.GroupInterval)
ag.hasFlushed = true
ag.mtx.Unlock()

ag.flush(func(alerts ...*types.Alert) bool {
return nf(ctx, alerts...)
})

cancel()

case <-ag.ctx.Done():
return
}
}
}

触发告警

// flush 触发通知并清理已解决的告警。
func (ag *aggrGroup) flush(notify func(...*types.Alert) bool) {
if ag.empty() {
return
}

var (
alerts = ag.alerts.List()
alertsSlice = make(types.AlertSlice, 0, len(alerts))
resolvedSlice = make(types.AlertSlice, 0, len(alerts))
now = time.Now()
)
for _, alert := range alerts {
a := *alert
// 保证已解决的告警不会再次被通知
if a.ResolvedAt(now) {
resolvedSlice = append(resolvedSlice, &a)
} else {
a.EndsAt = time.Time{}
}
alertsSlice = append(alertsSlice, &a)
}
sort.Stable(alertsSlice)

ag.logger.Debug("flushing", "alerts", fmt.Sprintf("%v", alertsSlice))

if notify(alertsSlice...) {
// 通知成功后,删除所有已解决的告警,避免重复通知
if err := ag.alerts.DeleteIfNotModified(resolvedSlice); err != nil {
ag.logger.Error("error on delete alerts", "err", err)
}
}
}