Alertmanager-告警发送

基于 Alertmanager v0.28 源码

告警处理时序

sequenceDiagram
participant Alert as 新告警
participant Dispatcher as Dispatcher
participant Route as Route
participant AggrGroup as aggrGroup
participant NotifyStage as notify.Stage
participant Notifier as Notifier(邮件/微信等)

Alert->>Dispatcher: 新告警流入
Dispatcher->>Route: 路由树匹配
Route-->>Dispatcher: 匹配到的 Route
Dispatcher->>AggrGroup: 插入/创建分组
Note over AggrGroup: run 协程定时触发通知
AggrGroup->>NotifyStage: flush(alerts...) 触发通知管道
loop 通知管道链路
NotifyStage->>Notifier: 调用具体 Notifier 发送
Notifier-->>NotifyStage: 返回发送结果
end
NotifyStage-->>AggrGroup: 返回整体通知结果
AggrGroup->>AggrGroup: 删除已解决告警
AggrGroup-->>Dispatcher: 分组生命周期管理

工作原理

flowchart TD
A["收到告警分组"] --> B["GossipSettle<br/>等待集群稳定"]
B --> C["InhibitStage(MuteStage + Inhibitor)<br/>抑制规则过滤"]
C --> D["TimeActiveStage<br/>激活时间判断"]
D --> E["TimeMuteStage<br/>静默时间判断"]
E --> F["SilenceStage(MuteStage + Silencer)<br/>静默规则过滤"]
F --> K["FanoutStage<br/>并发分发到各通知集成,每个集成独立执行下列子流水线"]
K --> G["WaitStage<br/>等待/延迟"]
G --> H["DedupStage<br/>去重"]
H --> I["RetryStage<br/>调用通知集成发送,失败重试"]
I --> L["各通知集成(如邮件、Webhook、Slack等)"]
I --> J["SetNotifiesStage<br/>记录通知日志"]

详细流程

入口:构建通知管道(pipeline)

// Build the notification pipeline: PipelineBuilder.New returns one
// RoutingStage entry per receiver, wiring in the shared inhibitor, silencer,
// time-interval intervener, group marker, notification log and cluster peer.
pipeline := pipelineBuilder.New(
receivers,
waitFunc,
inhibitor,
silencer,
intervener,
marker,
notificationLog,
pipelinePeer,
)

// New assembles one notification pipeline per receiver and returns them keyed
// by receiver name. Each pipeline is a MultiStage that runs, in order:
//   - the gossip-settle stage,
//   - the inhibition mute stage,
//   - the active-time-interval stage,
//   - the mute-time-interval stage,
//   - the silence mute stage,
//   - the receiver-specific stage built by createReceiverStage.
//
// The first five stages are constructed once and shared by every receiver.
func (pb *PipelineBuilder) New(
	receivers map[string][]Integration,
	wait func() time.Duration,
	inhibitor *inhibit.Inhibitor,
	silencer *silence.Silencer,
	intervener *timeinterval.Intervener,
	marker types.GroupMarker,
	notificationLog NotificationLog,
	peer Peer,
) RoutingStage {
	// Receiver-independent stages, shared across all pipelines.
	settle := NewGossipSettleStage(peer)
	inhibition := NewMuteStage(inhibitor, pb.metrics)
	activeTime := NewTimeActiveStage(intervener, marker, pb.metrics)
	muteTime := NewTimeMuteStage(intervener, marker, pb.metrics)
	silencing := NewMuteStage(silencer, pb.metrics)

	stages := make(RoutingStage, len(receivers))
	for receiver, integrations := range receivers {
		perReceiver := createReceiverStage(receiver, integrations, wait, notificationLog, pb.metrics)
		stages[receiver] = MultiStage{settle, inhibition, activeTime, muteTime, silencing, perReceiver}
	}

	pb.metrics.InitializeFor(receivers)
	return stages
}


// createReceiverStage builds the stage run for a single receiver: a
// FanoutStage holding one MultiStage per integration. Each per-integration
// MultiStage runs, in order:
//  1. WaitStage        – waits for the duration returned by wait;
//  2. DedupStage       – drops the alerts if the notification log shows no
//     notification is due;
//  3. RetryStage       – delivery through the integration, with retries;
//  4. SetNotifiesStage – records the notification in the notification log.
//
// DedupStage and SetNotifiesStage share the same nflogpb.Receiver, which
// identifies the (receiver name, integration name, integration index) triple.
func createReceiverStage(
	name string,
	integrations []Integration,
	wait func() time.Duration,
	notificationLog NotificationLog,
	metrics *Metrics,
) Stage {
	var fanout FanoutStage
	for idx := range integrations {
		integration := &integrations[idx]
		receiver := &nflogpb.Receiver{
			GroupName:   name,
			Integration: integration.Name(),
			Idx:         uint32(integration.Index()),
		}
		fanout = append(fanout, MultiStage{
			NewWaitStage(wait),
			NewDedupStage(integration, notificationLog, receiver),
			NewRetryStage(*integration, name, metrics),
			NewSetNotifiesStage(notificationLog, receiver),
		})
	}
	return fanout
}

MuteStage

// MuteStage drops alerts that its Muter reports as muted. The same type is
// used with both the inhibitor and the silencer; the Muter decides which.
type MuteStage struct {
	muter   types.Muter // decides whether a label set is muted (e.g. inhibitor or silencer)
	metrics *Metrics    // receives suppression counters
}

// NewMuteStage returns a new MuteStage that filters alerts through m and
// reports suppressions to metrics.
func NewMuteStage(m types.Muter, metrics *Metrics) *MuteStage {
	stage := &MuteStage{metrics: metrics}
	stage.muter = m
	return stage
}

// Exec implements the Stage interface. It passes on only the alerts the muter
// does not mute. If any alerts were muted, it adds their count to the
// suppressed-notifications metric under a reason derived from the concrete
// muter type (silencer or inhibitor; empty for any other type) and logs them
// at debug level. It never returns an error.
func (n *MuteStage) Exec(ctx context.Context, logger *slog.Logger, alerts ...*types.Alert) (context.Context, []*types.Alert, error) {
	var kept, muted []*types.Alert
	for _, alert := range alerts {
		// TODO(fabxc): increment total alerts counter.
		if !n.muter.Mutes(alert.Labels) {
			kept = append(kept, alert)
			continue
		}
		// TODO(fabxc): increment muted alerts counter if muted.
		muted = append(muted, alert)
	}

	if len(muted) == 0 {
		return ctx, kept, nil
	}

	// Muter types other than the silencer and inhibitor get an empty reason label.
	reason := ""
	if _, isSilencer := n.muter.(*silence.Silencer); isSilencer {
		reason = SuppressedReasonSilence
	} else if _, isInhibitor := n.muter.(*inhibit.Inhibitor); isInhibitor {
		reason = SuppressedReasonInhibition
	}
	n.metrics.numNotificationSuppressedTotal.WithLabelValues(reason).Add(float64(len(muted)))
	logger.Debug("Notifications will not be sent for muted alerts", "alerts", fmt.Sprintf("%v", muted), "reason", reason)

	return ctx, kept, nil
}

TimeActiveStage


// TimeActiveStage gates a route on its active time intervals: when the current
// time is outside all of them, every alert is dropped. It has the same
// underlying representation as timeStage.
type TimeActiveStage timeStage

// NewTimeActiveStage returns a TimeActiveStage that evaluates intervals with
// muter, records the group's muted state in marker, and reports suppressed
// notifications to metrics.
func NewTimeActiveStage(muter types.TimeMuter, marker types.GroupMarker, metrics *Metrics) *TimeActiveStage {
	return &TimeActiveStage{
		muter:   muter,
		marker:  marker,
		metrics: metrics,
	}
}

// Exec implements the Stage interface for TimeActiveStage.
//
// It requires a route ID and a group key in ctx. If ctx carries no active time
// interval names, or an empty list, the route is treated as always active and
// alerts pass through unchanged. Otherwise the current time from ctx is checked
// against those intervals:
//   - inside at least one of them: the group is marked as not muted and the
//     alerts continue;
//   - outside all of them: the group is marked as muted by every listed
//     interval, the alerts are counted as suppressed, and none are returned.
func (tas TimeActiveStage) Exec(ctx context.Context, l *slog.Logger, alerts ...*types.Alert) (context.Context, []*types.Alert, error) {
	routeID, ok := RouteID(ctx)
	if !ok {
		return ctx, nil, errors.New("route ID missing")
	}
	gkey, ok := GroupKey(ctx)
	if !ok {
		return ctx, nil, errors.New("group key missing")
	}

	// No active intervals configured means the route is always active.
	intervals, ok := ActiveTimeIntervalNames(ctx)
	if !ok || len(intervals) == 0 {
		return ctx, alerts, nil
	}

	now, ok := Now(ctx)
	if !ok {
		return ctx, alerts, errors.New("missing now timestamp")
	}

	// The first result is used as "now lies within an active interval".
	active, _, err := tas.muter.Mutes(intervals, now)
	if err != nil {
		return ctx, alerts, err
	}

	if active {
		// Within an active interval: record no muting intervals and continue.
		tas.marker.SetMuted(routeID, gkey, nil)
		return ctx, alerts, nil
	}

	// Outside every active interval: the group is muted by all of them and its
	// alerts are removed from the pipeline.
	tas.marker.SetMuted(routeID, gkey, intervals)
	tas.metrics.numNotificationSuppressedTotal.WithLabelValues(SuppressedReasonActiveTimeInterval).Add(float64(len(alerts)))
	l.Debug("Notifications not sent, route is not within active time", "alerts", len(alerts))
	return ctx, nil, nil
}

WaitStage

// WaitStage delays the pipeline by the duration returned from its wait
// function before passing alerts on. Cancelling the context aborts the wait.
type WaitStage struct {
	wait func() time.Duration // called on every Exec to obtain the delay
}

// NewWaitStage returns a new WaitStage.
func NewWaitStage(wait func() time.Duration) *WaitStage {
	stage := new(WaitStage)
	stage.wait = wait
	return stage
}

// Exec implements the Stage interface. It blocks for ws.wait() and then
// returns the alerts unchanged. If ctx is cancelled first, it returns no
// alerts and ctx.Err().
func (ws *WaitStage) Exec(ctx context.Context, _ *slog.Logger, alerts ...*types.Alert) (context.Context, []*types.Alert, error) {
	// An explicit timer (rather than time.After) is stopped as soon as we
	// return. On cancellation the timer is released immediately instead of
	// lingering until it would have fired (pre-Go 1.23 timer semantics).
	timer := time.NewTimer(ws.wait())
	defer timer.Stop()

	select {
	case <-timer.C:
		return ctx, alerts, nil
	case <-ctx.Done():
		return ctx, nil, ctx.Err()
	}
}

SetNotifiesStage

// SetNotifiesStage records what was notified in the notification log, the same
// log that DedupStage queries to decide whether a notification is due.
type SetNotifiesStage struct {
	nflog NotificationLog   // log that receives the entry
	recv  *nflogpb.Receiver // receiver the entry is recorded under
}

// NewSetNotifiesStage returns a new instance of a SetNotifiesStage.
func NewSetNotifiesStage(l NotificationLog, recv *nflogpb.Receiver) *SetNotifiesStage {
	stage := SetNotifiesStage{recv: recv, nflog: l}
	return &stage
}

// Exec implements the Stage interface. It reads the following values from ctx:
//   - the group key;
//   - the firing and resolved alert hashes;
//   - the repeat interval.
//
// It then writes an entry for n.recv to the notification log, with an expiry
// of twice the repeat interval. The alerts are passed through unchanged,
// together with any error returned by the notification log. If any of the
// context values is missing, it returns an error and no alerts.
func (n SetNotifiesStage) Exec(ctx context.Context, l *slog.Logger, alerts ...*types.Alert) (context.Context, []*types.Alert, error) {
	gkey, ok := GroupKey(ctx)
	if !ok {
		return ctx, nil, errors.New("group key missing")
	}
	firingHashes, ok := FiringAlerts(ctx)
	if !ok {
		return ctx, nil, errors.New("firing alerts missing")
	}
	resolvedHashes, ok := ResolvedAlerts(ctx)
	if !ok {
		return ctx, nil, errors.New("resolved alerts missing")
	}
	repeatInterval, ok := RepeatInterval(ctx)
	if !ok {
		return ctx, nil, errors.New("repeat interval missing")
	}

	err := n.nflog.Log(n.recv, gkey, firingHashes, resolvedHashes, repeatInterval*2)
	return ctx, alerts, err
}

DedupStage

// DedupStage suppresses a notification when the notification log shows the
// same group was already notified to this receiver. It lets alerts through
// only when something relevant changed or the repeat interval has elapsed.
type DedupStage struct {
	rs    ResolvedSender    // reports whether resolved notifications should be sent
	nflog NotificationLog   // log queried for the group's previous entry
	recv  *nflogpb.Receiver // receiver identity used in log queries

	now  func() time.Time          // clock; a field so it can be replaced (e.g. in tests)
	hash func(*types.Alert) uint64 // alert fingerprint used to build firing/resolved sets
}

// NewDedupStage returns a DedupStage backed by the notification log l. It uses
// utcNow as its clock and hashAlert as its fingerprint function.
func NewDedupStage(rs ResolvedSender, l NotificationLog, recv *nflogpb.Receiver) *DedupStage {
	stage := &DedupStage{rs: rs, nflog: l, recv: recv}
	stage.now = utcNow
	stage.hash = hashAlert
	return stage
}

// utcNow is the default DedupStage clock: the current wall-clock time,
// expressed in UTC.
func utcNow() time.Time {
	current := time.Now()
	return current.UTC()
}

// hashBuffer wraps a reusable scratch slice for building hash input.
type hashBuffer struct {
	buf []byte
}

// hashBuffers pools hashBuffer values so hashing does not need a fresh scratch
// slice on every call. Newly created buffers are empty with 1 KiB of capacity.
var hashBuffers = sync.Pool{
	New: func() any {
		return &hashBuffer{buf: make([]byte, 0, 1024)}
	},
}

// hashAlert returns a 64-bit xxhash of the alert's label set. The hash is used
// by DedupStage to identify alerts.
//
// Label names are sorted first, so the result does not depend on map iteration
// order. Each name and each value is followed by a 0xff separator byte.
// Assuming label names and values are valid UTF-8, 0xff cannot appear inside
// them.
func hashAlert(a *types.Alert) uint64 {
	const sep = '\xff'

	hb := hashBuffers.Get().(*hashBuffer)
	defer hashBuffers.Put(hb)
	b := hb.buf[:0] // reuse the pooled backing array

	names := make(model.LabelNames, 0, len(a.Labels))
	for ln := range a.Labels {
		names = append(names, ln)
	}
	sort.Sort(names)

	for _, ln := range names {
		b = append(b, string(ln)...)
		b = append(b, sep)
		b = append(b, string(a.Labels[ln])...)
		b = append(b, sep)
	}

	sum := xxhash.Sum64(b)
	// Store the (possibly grown) slice back before the deferred Put. Otherwise
	// any growth beyond the pooled capacity is discarded and large label sets
	// reallocate on every call.
	hb.buf = b
	return sum
}

// needsUpdate reports whether a notification must be sent. Its inputs are the
// group's last notification-log entry (nil if none), the hash sets of the
// currently firing and resolved alerts, and the repeat interval. The checks
// apply in this order:
//   - no previous entry: notify only if something is firing;
//   - the current firing set is not a subset of the last entry's firing set
//     (per entry.IsFiringSubset): notify;
//   - nothing is firing now: notify only if the last entry had firing alerts.
//     This check runs before, and independently of, the SendResolved setting;
//   - send_resolved is enabled and the resolved set is not a subset of the
//     last entry's (per entry.IsResolvedSubset): notify;
//   - otherwise notify only once the last entry is older than the repeat
//     interval.
func (n *DedupStage) needsUpdate(entry *nflogpb.Entry, firing, resolved map[uint64]struct{}, repeat time.Duration) bool {
	switch {
	case entry == nil:
		return len(firing) > 0
	case !entry.IsFiringSubset(firing):
		return true
	case len(firing) == 0:
		return len(entry.FiringAlerts) > 0
	case n.rs.SendResolved() && !entry.IsResolvedSubset(resolved):
		return true
	default:
		return entry.Timestamp.Before(n.now().Add(-repeat))
	}
}

// Exec implements the Stage interface. Its steps are:
//  1. Fingerprint each alert with n.hash and place it in the firing or
//     resolved set.
//  2. Store both hash lists in the returned context.
//  3. Look up this group's entry for n.recv in the notification log.
//  4. Pass the alerts through only if needsUpdate reports that a notification
//     is due; otherwise return no alerts.
//
// It returns an error when the group key or repeat interval is missing from
// ctx, when the log query fails with anything other than nflog.ErrNotFound, or
// when the query yields more than one entry.
func (n *DedupStage) Exec(ctx context.Context, _ *slog.Logger, alerts ...*types.Alert) (context.Context, []*types.Alert, error) {
	gkey, ok := GroupKey(ctx)
	if !ok {
		return ctx, nil, errors.New("group key missing")
	}
	repeatInterval, ok := RepeatInterval(ctx)
	if !ok {
		return ctx, nil, errors.New("repeat interval missing")
	}

	var (
		firingSet   = make(map[uint64]struct{})
		resolvedSet = make(map[uint64]struct{})
		firing      = []uint64{}
		resolved    = []uint64{}
	)
	for _, alert := range alerts {
		h := n.hash(alert)
		if !alert.Resolved() {
			firing = append(firing, h)
			firingSet[h] = struct{}{}
			continue
		}
		resolved = append(resolved, h)
		resolvedSet[h] = struct{}{}
	}

	ctx = WithResolvedAlerts(WithFiringAlerts(ctx, firing), resolved)

	entries, err := n.nflog.Query(nflog.QGroupKey(gkey), nflog.QReceiver(n.recv))
	if err != nil && !errors.Is(err, nflog.ErrNotFound) {
		return ctx, nil, err
	}
	if len(entries) > 1 {
		return ctx, nil, fmt.Errorf("unexpected entry result size %d", len(entries))
	}

	// At most one entry remains; nil means the group was never logged for this receiver.
	var last *nflogpb.Entry
	if len(entries) == 1 {
		last = entries[0]
	}

	if !n.needsUpdate(last, firingSet, resolvedSet, repeatInterval) {
		return ctx, nil, nil
	}
	return ctx, alerts, nil
}

RetryStage

// RetryStage sends notifications through a single integration and retries
// failed attempts. The retry policy is implemented in its exec method. The
// stage also maintains the per-integration notification metrics.
type RetryStage struct {
	integration Integration // integration that performs the send
	groupName   string      // name given at construction (createReceiverStage passes the receiver name)
	metrics     *Metrics    // notification counters
	labelValues []string    // metric labels: integration name, plus receiver name when enabled
}

// NewRetryStage returns a new instance of a RetryStage.
func NewRetryStage(i Integration, groupName string, metrics *Metrics) *RetryStage {
	lv := []string{i.Name()}
	withReceiverName := metrics.ff.EnableReceiverNamesInMetrics()
	if withReceiverName {
		lv = append(lv, i.receiverName)
	}

	return &RetryStage{
		labelValues: lv,
		metrics:     metrics,
		groupName:   groupName,
		integration: i,
	}
}

// Exec implements the Stage interface. It counts the notification attempt,
// delegates the actual delivery and retry handling to r.exec, and on failure
// increments the failed-notifications counter. That counter is labelled with
// the failure reason: DefaultReason, unless the error wraps an
// *ErrorWithReason. The context, alerts and error from r.exec are returned
// as-is.
func (r RetryStage) Exec(ctx context.Context, l *slog.Logger, alerts ...*types.Alert) (context.Context, []*types.Alert, error) {
	r.metrics.numNotifications.WithLabelValues(r.labelValues...).Inc()

	ctx, alerts, err := r.exec(ctx, l, alerts...)
	if err != nil {
		failureReason := DefaultReason.String()
		var e *ErrorWithReason
		if errors.As(err, &e) {
			failureReason = e.Reason.String()
		}
		// r.labelValues' backing array is shared by every Exec call on this
		// stage, and RetryStage values are copied by the value receiver. Capping
		// the slice at its length forces append to allocate a fresh array rather
		// than write into the shared one if it ever has spare capacity.
		n := len(r.labelValues)
		failedLabels := append(r.labelValues[:n:n], failureReason)
		r.metrics.numTotalFailedNotifications.WithLabelValues(failedLabels...).Inc()
	}

	return ctx, alerts, err
}