Alertmanager-inhibitor告警抑制

基于v0.28

总体流程

flowchart TD
A[告警流入 Alertmanager] --> B{是否匹配抑制规则?}
B -- 否 --> E[正常进入分发/通知流程]
B -- 是 --> C[遍历所有 InhibitRule]
C --> D{目标告警是否匹配 TargetMatchers?}
D -- 否 --> C
D -- 是 --> F{是否存在源告警满足 SourceMatchers 且 Equal 标签相等?}
F -- 否 --> C
F -- 是 --> G[该告警被抑制, 不再通知]
C -->|遍历完所有规则| H{是否被抑制?}
H -- 是 --> G
H -- 否 --> E

subgraph 规则说明
direction LR
X1[InhibitRule: 定义 SourceMatchers, TargetMatchers, Equal标签]
X2[SourceMatchers: 源告警匹配条件]
X3[TargetMatchers: 目标告警匹配条件]
X4[Equal: 源/目标需相等的标签名]
X1 --> X2
X1 --> X3
X1 --> X4
end

style G fill:#f9f,stroke:#333,stroke-width:2px
style E fill:#bbf,stroke:#333,stroke-width:2px
style B fill:#ffd,stroke:#333,stroke-width:1px
style D fill:#ffd,stroke:#333,stroke-width:1px
style F fill:#ffd,stroke:#333,stroke-width:1px
style H fill:#ffd,stroke:#333,stroke-width:1px

原理

告警抑制器

// Inhibitor 负责根据当前活跃告警和抑制规则,判断某个标签集合(告警)是否应被抑制(muted)。
// 实现了 Muter 接口。
// alerts: 告警提供者,负责订阅和获取活跃告警
// rules: 抑制规则集合
// marker: 告警标记器,用于标记告警状态(如 inhibited)
// logger: 日志记录器
// mtx: 用于并发控制的读写锁
// cancel: 停止后台协程的取消函数
type Inhibitor struct {
alerts provider.Alerts // 告警数据源
rules []*InhibitRule // 抑制规则集合
marker types.AlertMarker // 告警标记器
logger *slog.Logger // 日志

mtx sync.RWMutex // 并发控制
cancel func() // 停止后台任务的取消函数
}

告警抑制启动inhibitor.Run()

// Run 启动 Inhibitor 的后台处理逻辑,包括规则缓存维护和告警监听。
func (ih *Inhibitor) Run() {
var (
g run.Group
ctx context.Context
)

ih.mtx.Lock()
ctx, ih.cancel = context.WithCancel(context.Background())
ih.mtx.Unlock()
runCtx, runCancel := context.WithCancel(ctx)

// 启动每条规则的缓存维护协程,定期清理过期告警
for _, rule := range ih.rules {
go rule.scache.Run(runCtx, 15*time.Minute)
}

g.Add(func() error {
ih.run(runCtx)
return nil
}, func(err error) {
runCancel()
})

if err := g.Run(); err != nil {
ih.logger.Warn("error running inhibitor", "err", err)
}
}

抑制规则启动

告警抑制规则

// Alerts 结构体提供了基于内存的告警存储,支持并发安全、定期 GC、回调等功能。
// 主要用于 Alertmanager 内部的活跃告警缓存和抑制模块的源告警缓存。
//
// c: 以告警指纹为 key 的告警 map
// cb: GC 后的回调函数,参数为被移除的已解决告警切片
type Alerts struct {
sync.Mutex
c map[model.Fingerprint]*types.Alert // 告警存储,key 为指纹
cb func([]types.Alert) // GC 回调函数
}

运行

// 启动每条规则的缓存维护协程,定期清理过期告警
for _, rule := range ih.rules {
go rule.scache.Run(runCtx, 15*time.Minute)
}


// Run 启动 GC 循环,定期清理已解决的告警。
// ctx: 上下文,用于取消 GC 循环
// interval: GC 间隔,必须大于零,否则 panic
func (a *Alerts) Run(ctx context.Context, interval time.Duration) {
t := time.NewTicker(interval)
defer t.Stop()
for {
select {
case <-ctx.Done():
return
case <-t.C:
a.GC()
}
}
}

// GC 执行垃圾回收,移除所有已解决的告警,并返回这些告警。
// 返回值: 被移除的已解决告警切片
func (a *Alerts) GC() []types.Alert {
a.Lock()
var resolved []types.Alert
for fp, alert := range a.c {
if alert.Resolved() {
delete(a.c, fp)
resolved = append(resolved, types.Alert{
Alert: model.Alert{
Labels: alert.Labels.Clone(),
Annotations: alert.Annotations.Clone(),
StartsAt: alert.StartsAt,
EndsAt: alert.EndsAt,
GeneratorURL: alert.GeneratorURL,
},
UpdatedAt: alert.UpdatedAt,
Timeout: alert.Timeout,
})
}
}
a.Unlock()
a.cb(resolved)
return resolved
}

监听抑制规则

// run 负责监听告警流的变化,并实时更新抑制规则的缓存。
// ctx: 上下文,用于取消监听
func (ih *Inhibitor) run(ctx context.Context) {
it := ih.alerts.Subscribe() // 订阅告警流
defer it.Close()

for {
select {
case <-ctx.Done():
return
case a := <-it.Next():
if err := it.Err(); err != nil {
ih.logger.Error("Error iterating alerts", "err", err)
continue
}
// 更新每条抑制规则的源告警缓存
for _, r := range ih.rules {
if r.SourceMatchers.Matches(a.Labels) {
if err := r.scache.Set(a); err != nil {
ih.logger.Error("error on set alert", "err", err)
}
}
}
}
}
}

// Alerts 提供对一组告警的访问,所有方法均为 goroutine 安全。
// 该结构体是 Alertmanager 内存告警存储的核心实现,支持订阅、GC、并发写入、回调等。
//
// cancel: 用于关闭后台 GC 协程的取消函数
// mtx: 并发互斥锁
// alerts: 实际存储告警的 Alerts(store 层)
// marker: 告警状态标记器
// listeners: 订阅告警变化的监听器集合
// next: 下一个监听器的自增 id
// callback: 存储前/后/删除的回调接口
// logger: 日志对象
type Alerts struct {
cancel context.CancelFunc

mtx sync.Mutex

alerts *store.Alerts
marker types.AlertMarker

listeners map[int]listeningAlerts
next int

callback AlertStoreCallback

logger *slog.Logger
}


// Subscribe 返回一个活跃告警的迭代器,包含所有未解决且未通知的告警。
// 结果不保证有序。
func (a *Alerts) Subscribe() provider.AlertIterator {
a.mtx.Lock()
defer a.mtx.Unlock()
var (
done = make(chan struct{})
alerts = a.alerts.List()
ch = make(chan *types.Alert, max(len(alerts), alertChannelLength))
)

for _, a := range alerts {
ch <- a
}

a.listeners[a.next] = listeningAlerts{alerts: ch, done: done}
a.next++

return provider.NewAlertIterator(ch, done, nil)
}