Prometheus-Rules告警评估

基于v3.5

工作原理

graph TD
    A[Rule Files] --> B[Manager.LoadGroups]
    B --> C[Parse Rule Groups]
    C --> D[Create Rule Objects]
    D --> E[Analyse Rule Dependencies]
    E --> F[Build Dependency Map]
    F --> G[Set Rule Dependencies]
    
    H[Manager.Run] --> I[Group Evaluation Loop]
    I --> J[Split Rules into Batches]
    J --> K{Concurrent Evaluation Enabled?}
    K -->|Yes| L[Concurrent Rule Execution]
    K -->|No| M[Sequential Rule Execution]
    
    L --> N[Acquire Concurrency Slot]
    N --> O[Evaluate Rule]
    O --> P[Release Concurrency Slot]
    P --> Q[Gather Results]
    
    M --> R[Evaluate Rules Sequentially]
    R --> Q
    
    Q --> S[Store Results]
    S --> T{Alert Rule?}
    T -->|Yes| U[Check Alert Conditions]
    U --> V[Send Alerts if Needed]
    T -->|No| W[Recording Rule]
    W --> X[Store Time Series]
    
    Y[Restore State] --> Z[Restore Alert For State]
    
    subgraph "Loading Phase"
        B
        C
        D
        E
        F
        G
    end
    
    subgraph "Execution Phase"
        H
        I
        J
        K
        L
        M
        N
        O
        P
        Q
        R
        S
    end
    
    subgraph "Output Phase"
        T
        U
        V
        W
        X
    end
    
    subgraph "Recovery Phase"
        Y
        Z
    end
    
    style Loading Phase fill:#e1f5fe
    style Execution Phase fill:#f3e5f5
    style Output Phase fill:#e8f5e8
    style Recovery Phase fill:#fff3e0

这个图表展示了Prometheus规则系统的工作流程:

  1. 加载阶段(Loading Phase):
    从规则文件加载规则组
    解析规则并创建规则对象
    分析规则间的依赖关系并构建依赖映射
  2. 执行阶段(Execution Phase):
    管理器运行并启动规则组评估循环
    根据依赖关系将规则分割成可并发执行的批次
    根据配置决定是并发执行还是顺序执行规则
  3. 输出阶段(Output Phase):
    存储规则执行结果
    对于告警规则,检查触发条件并发送告警
    对于记录规则,存储时间序列数据
  4. 恢复阶段(Recovery Phase):
    在Prometheus重启时恢复告警状态
    规则依赖分析允许系统优化执行顺序,使无依赖关系的规则可以并发执行,提高评估效率。同时,系统通过并发控制机制确保不会因为过多并发查询而压垮系统。

启动顺序

sequenceDiagram
    participant User as 用户
    participant Manager as Manager
    participant Group as Group
    participant Rule as Rule
    participant Storage as 存储

    User->>Manager: 创建Manager实例
    activate Manager
    Note right of Manager: NewManager(options)

    User->>Manager: 调用Update()加载规则文件
    activate Manager
    Manager->>Manager: LoadGroups()
    Note right of Manager: 解析规则文件
    
    loop 遍历每个规则文件
        Manager->>Manager: 解析Group
        loop 遍历Group中的规则
            Manager->>Rule: 创建AlertingRule/RecordingRule
            activate Rule
            Rule-->>Manager: 返回Rule对象
            deactivate Rule
        end
        
        Manager->>Manager: AnalyseRules(rules)
        Note right of Manager: 分析规则依赖关系
        
        Manager->>Group: 创建Group实例
        activate Group
        Group-->>Manager: 返回Group
    end
    
    Manager-->>User: 返回groups和errors
    
    User->>Manager: 调用Run()启动管理器
    Note right of Manager: 启动后台goroutine
    
    Manager->>Manager: start()
    activate Manager
    
    loop 评估循环
        Manager->>Group: Eval(评估时间)
        activate Group
        
        Group->>Group: SplitGroupIntoBatches()
        Note right of Group: 将规则分组成可并发批次
        
        loop 按批次执行规则
            Group->>Rule: Evaluate()
            activate Rule
            Rule->>Storage: 查询数据
            activate Storage
            Storage-->>Rule: 返回查询结果
            deactivate Storage
            
            Rule->>Rule: 执行规则逻辑
            Rule->>Storage: 存储结果
            activate Storage
            Storage-->>Rule: 确认存储
            deactivate Storage
            
            Rule-->>Group: 返回评估结果
            deactivate Rule
        end
        
        Group->>Manager: 完成一次评估
        deactivate Group
        
        Manager->>Manager: 等待下次评估间隔
    end
    
    User->>Manager: 调用Stop()停止
    Manager->>Manager: 停止所有goroutine
    deactivate Manager

详细流程

告警规则控制器

1
2
3
4
5
6
7
8
9
10
11
12
13
// The Manager manages recording and alerting rules.
// Manager 负责管理记录规则和告警规则。
type Manager struct {
opts *ManagerOptions // 管理器选项配置
groups map[string]*Group // 存储规则组,键为文件名和组名的组合
mtx sync.RWMutex // 读写锁,用于保护对groups的并发访问
block chan struct{} // 阻塞通道,用于控制规则评估的启动时机
done chan struct{} // 完成通道,用于通知规则管理器停止运行
restored bool // 标记是否已恢复规则状态(如for状态)
restoreNewRuleGroups bool // 标记是否需要恢复新增规则组的状态

logger *slog.Logger // 日志记录器
}

初始化

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
ruleManager = rules.NewManager(&rules.ManagerOptions{
Appendable: fanoutStorage,
Queryable: localStorage,
QueryFunc: rules.EngineQueryFunc(queryEngine, fanoutStorage),
NotifyFunc: rules.SendAlerts(notifierManager, cfg.web.ExternalURL.String()),
Context: ctxRule,
ExternalURL: cfg.web.ExternalURL,
Registerer: prometheus.DefaultRegisterer,
Logger: logger.With("component", "rule manager"),
OutageTolerance: time.Duration(cfg.outageTolerance),
ForGracePeriod: time.Duration(cfg.forGracePeriod),
ResendDelay: time.Duration(cfg.resendDelay),
MaxConcurrentEvals: cfg.maxConcurrentEvals,
ConcurrentEvalsEnabled: cfg.enableConcurrentRuleEval,
DefaultRuleQueryOffset: func() time.Duration {
return time.Duration(cfgFile.GlobalConfig.RuleQueryOffset)
},
})

初始化

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
// NewManager returns an implementation of Manager, ready to be started
// by calling the Run method.
func NewManager(o *ManagerOptions) *Manager {
if o.Metrics == nil {
o.Metrics = NewGroupMetrics(o.Registerer)
}

if o.GroupLoader == nil {
o.GroupLoader = FileLoader{}
}

if o.RuleConcurrencyController == nil {
if o.ConcurrentEvalsEnabled {
o.RuleConcurrencyController = newRuleConcurrencyController(o.MaxConcurrentEvals)
} else {
o.RuleConcurrencyController = sequentialRuleEvalController{}
}
}

if o.RuleDependencyController == nil {
o.RuleDependencyController = ruleDependencyController{}
}

if o.Logger == nil {
o.Logger = promslog.NewNopLogger()
}

m := &Manager{
groups: map[string]*Group{},
opts: o,
block: make(chan struct{}),
done: make(chan struct{}),
logger: o.Logger,
restoreNewRuleGroups: o.RestoreNewRuleGroups,
}

return m
}

启动Run

1
2
3
4
5
6
7
8
9
10
11
12

// Run starts processing of the rule manager. It is blocking.
func (m *Manager) Run() {
m.logger.Info("Starting rule manager...")
m.start()
<-m.done
}

func (m *Manager) start() {
close(m.block)
}

更新Update-加载配置文件

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
 {
name: "rules",
reloader: func(cfg *config.Config) error {
if agentMode {
// No-op in Agent mode
return nil
}

// Get all rule files matching the configuration paths.
var files []string
for _, pat := range cfg.RuleFiles {
fs, err := filepath.Glob(pat)
if err != nil {
// The only error can be a bad pattern.
return fmt.Errorf("error retrieving rule files for %s: %w", pat, err)
}
files = append(files, fs...)
}
return ruleManager.Update(
time.Duration(cfg.GlobalConfig.EvaluationInterval),
files,
cfg.GlobalConfig.ExternalLabels,
externalURL,
nil,
)
},
}

// Update 根据配置要求更新规则管理器的状态。
// 如果加载新规则失败,则恢复旧规则集。
// 如果管理器已经停止,此方法将不执行任何操作。
func (m *Manager) Update(interval time.Duration, files []string, externalLabels labels.Labels, externalURL string, groupEvalIterationFunc GroupEvalIterationFunc) error {
m.mtx.Lock()
defer m.mtx.Unlock()

// 我们不能更新一个已停止的管理器
select {
case <-m.done:
return nil
default:
}

// 加载新的规则组
groups, errs := m.LoadGroups(interval, externalLabels, externalURL, groupEvalIterationFunc, false, files...)

// 如果加载过程中出现错误,记录错误并返回
if errs != nil {
for _, e := range errs {
m.logger.Error("loading groups failed", "err", e)
}
return errors.New("error loading rules, previous rule set restored")
}
m.restored = true

// 使用 WaitGroup 等待所有 goroutine 完成
var wg sync.WaitGroup
for _, newg := range groups {
// 如果存在具有相同标识符的旧组,
// 检查新组是否与旧组相等,如果相等则跳过。
// 如果不相等,则停止旧组并等待其完成当前迭代。
// 然后将状态复制到新组。
gn := GroupKey(newg.file, newg.name)
oldg, ok := m.groups[gn]
delete(m.groups, gn)

if ok && oldg.Equals(newg) {
groups[gn] = oldg
continue
}

wg.Add(1)
go func(newg *Group) {
if ok {
oldg.stop()
newg.CopyState(oldg)
}
wg.Done()
// 等待规则管理器被指示运行后再开始评估。
// 这是为了避免在存储尚未完全启动时运行查询。
<-m.block
newg.run(m.opts.Context)
}(newg)
}

// 停止剩余的旧组。
wg.Add(len(m.groups))
for n, oldg := range m.groups {
go func(n string, g *Group) {
g.markStale = true
g.stop()
if m := g.metrics; m != nil {
m.IterationsMissed.DeleteLabelValues(n)
m.IterationsScheduled.DeleteLabelValues(n)
m.EvalTotal.DeleteLabelValues(n)
m.EvalFailures.DeleteLabelValues(n)
m.GroupInterval.DeleteLabelValues(n)
m.GroupLastEvalTime.DeleteLabelValues(n)
m.GroupLastDuration.DeleteLabelValues(n)
m.GroupRules.DeleteLabelValues(n)
m.GroupSamples.DeleteLabelValues((n))
}
wg.Done()
}(n, oldg)
}

// 等待所有 goroutine 完成
wg.Wait()
m.groups = groups

return nil
}

初始化告警配置

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
// LoadGroups 从文件列表中读取规则组。
// 该函数接收以下参数:
// - interval: 默认的规则组评估间隔
// - externalLabels: 外部标签,用于补充规则的标签
// - externalURL: 外部 URL,用于告警规则中的链接
// - groupEvalIterationFunc: 规则组评估迭代函数
// - ignoreUnknownFields: 是否忽略规则文件中的未知字段
// - filenames: 要加载的规则文件列表
// 返回值:
// - map[string]*Group: 加载成功的规则组映射,键为文件名和组名的组合
// - []error: 加载过程中遇到的错误列表
func (m *Manager) LoadGroups(
interval time.Duration, externalLabels labels.Labels, externalURL string, groupEvalIterationFunc GroupEvalIterationFunc, ignoreUnknownFields bool, filenames ...string,
) (map[string]*Group, []error) {
// 创建一个用于存储规则组的映射
groups := make(map[string]*Group)

// 判断是否应该恢复规则状态
shouldRestore := !m.restored || m.restoreNewRuleGroups

// 遍历所有规则文件
for _, fn := range filenames {
// 使用 GroupLoader 加载规则文件
rgs, errs := m.opts.GroupLoader.Load(fn, ignoreUnknownFields)
if errs != nil {
// 如果加载过程中出现错误,则返回错误
return nil, errs
}

// 遍历文件中的所有规则组
for _, rg := range rgs.Groups {
// 设置规则组的评估间隔,如果规则组中未指定则使用默认间隔
itv := interval
if rg.Interval != 0 {
itv = time.Duration(rg.Interval)
}

// 创建规则列表
rules := make([]Rule, 0, len(rg.Rules))
// 遍历规则组中的所有规则
for _, r := range rg.Rules {
// 解析规则表达式
expr, err := m.opts.GroupLoader.Parse(r.Expr)
if err != nil {
// 如果解析失败,则返回错误
return nil, []error{fmt.Errorf("%s: %w", fn, err)}
}

// 合并规则组标签和规则标签
mLabels := FromMaps(rg.Labels, r.Labels)

// 判断是否为告警规则
if r.Alert != "" {
// 创建告警规则并添加到规则列表中
rules = append(rules, NewAlertingRule(
r.Alert,
expr,
time.Duration(r.For),
time.Duration(r.KeepFiringFor),
mLabels,
labels.FromMap(r.Annotations),
externalLabels,
externalURL,
!shouldRestore,
m.logger.With("alert", r.Alert),
))
continue
}
// 如果不是告警规则,则创建记录规则并添加到规则列表中
rules = append(rules, NewRecordingRule(
r.Record,
expr,
mLabels,
))
}

// 分析规则之间的依赖关系并将结果存储在规则本身
m.opts.RuleDependencyController.AnalyseRules(rules)

// 创建新的规则组并存储到映射中
groups[GroupKey(fn, rg.Name)] = NewGroup(GroupOptions{
Name: rg.Name,
File: fn,
Interval: itv,
Limit: rg.Limit,
Rules: rules,
ShouldRestore: shouldRestore,
Opts: m.opts,
QueryOffset: (*time.Duration)(rg.QueryOffset),
done: m.done,
EvalIterationFunc: groupEvalIterationFunc,
})
}
}

// 返回加载成功的规则组映射和空的错误列表
return groups, nil
}

Eval告警评估

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
// Eval runs a single evaluation cycle in which all rules are evaluated sequentially.
// Rules can be evaluated concurrently if the `concurrent-rule-eval` feature flag is enabled.
func (g *Group) Eval(ctx context.Context, ts time.Time) {
var (
samplesTotal atomic.Float64 // 用于统计本次评估周期中所有规则产生的样本总数
ruleQueryOffset = g.QueryOffset() // 获取规则查询的时间偏移量
)
// 定义一个内部函数 eval,用于执行单个规则的评估
eval := func(i int, rule Rule, cleanup func()) {
// 如果 cleanup 函数不为空,则在函数退出时执行它
if cleanup != nil {
defer cleanup()
}

// 创建带有规则名称和索引的日志记录器
logger := g.logger.With("name", rule.Name(), "index", i)
// 使用 OpenTelemetry 启动一个新的 span 来追踪规则评估过程
ctx, sp := otel.Tracer("").Start(ctx, "rule")
// 设置 span 的属性为规则名称
sp.SetAttributes(attribute.String("name", rule.Name()))
// 在函数退出时结束 span 并记录评估耗时
defer func(t time.Time) {
sp.End()

since := time.Since(t) // 计算评估耗时
g.metrics.EvalDuration.Observe(since.Seconds()) // 更新评估耗时指标
rule.SetEvaluationDuration(since) // 设置规则的评估耗时
rule.SetEvaluationTimestamp(t) // 设置规则的评估时间戳
}(time.Now())

// 如果 span 被采样且有 trace ID,则将 trace ID 添加到日志记录器中
if sp.SpanContext().IsSampled() && sp.SpanContext().HasTraceID() {
logger = logger.With("trace_id", sp.SpanContext().TraceID())
}

// 增加规则评估总次数的计数器
g.metrics.EvalTotal.WithLabelValues(GroupKey(g.File(), g.Name())).Inc()

// 执行规则评估
vector, err := rule.Eval(ctx, ruleQueryOffset, ts, g.opts.QueryFunc, g.opts.ExternalURL, g.Limit())
if err != nil {
// 如果评估出错,设置规则健康状态为不良,并记录错误
rule.SetHealth(HealthBad)
rule.SetLastError(err)
sp.SetStatus(codes.Error, err.Error()) // 在 span 中记录错误状态
g.metrics.EvalFailures.WithLabelValues(GroupKey(g.File(), g.Name())).Inc() // 增加评估失败计数器

// Canceled queries are intentional termination of queries. This normally
// happens on shutdown and thus we skip logging of any errors here.
// 检查是否为取消的查询错误,如果不是则记录警告日志
var eqc promql.ErrQueryCanceled
if !errors.As(err, &eqc) {
logger.Warn("Evaluating rule failed", "rule", rule, "err", err)
}
return
}
// 规则评估成功,设置健康状态为良好,并清除最后错误
rule.SetHealth(HealthGood)
rule.SetLastError(nil)
samplesTotal.Add(float64(len(vector))) // 累加样本数量

// 如果是告警规则,则发送告警
if ar, ok := rule.(*AlertingRule); ok {
ar.sendAlerts(ctx, ts, g.opts.ResendDelay, g.interval, g.opts.NotifyFunc)
}
// 初始化用于统计不同类型错误的计数器
var (
numOutOfOrder = 0 // 乱序样本数
numTooOld = 0 // 过旧样本数
numDuplicates = 0 // 重复样本数
)

// 获取一个 appender 用于将评估结果写入存储
app := g.opts.Appendable.Appender(ctx)
// 创建一个 map 用于记录当前评估返回的系列
seriesReturned := make(map[string]labels.Labels, len(g.seriesInPreviousEval[i]))
// 在函数退出时提交 appender 的更改
defer func() {
if err := app.Commit(); err != nil {
// 如果提交失败,设置规则健康状态为不良,并记录错误
rule.SetHealth(HealthBad)
rule.SetLastError(err)
sp.SetStatus(codes.Error, err.Error())
g.metrics.EvalFailures.WithLabelValues(GroupKey(g.File(), g.Name())).Inc()

logger.Warn("Rule sample appending failed", "err", err)
return
}
// 提交成功,更新上一次评估返回的系列
g.seriesInPreviousEval[i] = seriesReturned
}()

// 遍历评估结果中的每个样本
for _, s := range vector {
var err error
// 根据样本类型(直方图或普通样本)选择不同的追加方法
if s.H != nil {
_, err = app.AppendHistogram(0, s.Metric, s.T, nil, s.H)
} else {
app.SetOptions(g.appOpts) // 设置追加选项
_, err = app.Append(0, s.Metric, s.T, s.F)
}

if err != nil {
// 如果追加失败,设置规则健康状态为不良,并记录错误
rule.SetHealth(HealthBad)
rule.SetLastError(err)
sp.SetStatus(codes.Error, err.Error())
unwrappedErr := errors.Unwrap(err) // 解包错误以获取底层错误
if unwrappedErr == nil {
unwrappedErr = err
}
// 根据不同的错误类型进行分类统计和日志记录
switch {
case errors.Is(unwrappedErr, storage.ErrOutOfOrderSample):
numOutOfOrder++
logger.Debug("Rule evaluation result discarded", "err", err, "sample", s)
case errors.Is(unwrappedErr, storage.ErrTooOldSample):
numTooOld++
logger.Debug("Rule evaluation result discarded", "err", err, "sample", s)
case errors.Is(unwrappedErr, storage.ErrDuplicateSampleForTimestamp):
numDuplicates++
logger.Debug("Rule evaluation result discarded", "err", err, "sample", s)
default:
logger.Warn("Rule evaluation result discarded", "err", err, "sample", s)
}
} else {
// 追加成功,将样本的标签集添加到 seriesReturned 中
buf := [1024]byte{}
seriesReturned[string(s.Metric.Bytes(buf[:]))] = s.Metric
}
}
// 记录并警告不同类型的样本错误
if numOutOfOrder > 0 {
logger.Warn("Error on ingesting out-of-order result from rule evaluation", "num_dropped", numOutOfOrder)
}
if numTooOld > 0 {
logger.Warn("Error on ingesting too old result from rule evaluation", "num_dropped", numTooOld)
}
if numDuplicates > 0 {
logger.Warn("Error on ingesting results from rule evaluation with different value but same timestamp", "num_dropped", numDuplicates)
}

// 遍历上一次评估返回的系列,检查是否有系列不再出现
for metric, lset := range g.seriesInPreviousEval[i] {
if _, ok := seriesReturned[metric]; !ok {
// 如果系列不再出现,则标记为过期(stale)
_, err = app.Append(0, lset, timestamp.FromTime(ts.Add(-ruleQueryOffset)), math.Float64frombits(value.StaleNaN))
unwrappedErr := errors.Unwrap(err)
if unwrappedErr == nil {
unwrappedErr = err
}
// 根据错误类型决定是否记录日志
switch {
case unwrappedErr == nil:
case errors.Is(unwrappedErr, storage.ErrOutOfOrderSample),
errors.Is(unwrappedErr, storage.ErrTooOldSample),
errors.Is(unwrappedErr, storage.ErrDuplicateSampleForTimestamp):
// Do not count these in logging, as this is expected if series
// is exposed from a different rule.
default:
logger.Warn("Adding stale sample failed", "sample", lset.String(), "err", err)
}
}
}
}

// 初始化等待组,用于并发控制
var wg sync.WaitGroup
// 获取规则并发控制器,如果未设置则使用顺序执行控制器
ctrl := g.opts.RuleConcurrencyController
if ctrl == nil {
ctrl = sequentialRuleEvalController{}
}

// 将规则组拆分为多个批次,以便并发执行
batches := ctrl.SplitGroupIntoBatches(ctx, g)
if len(batches) == 0 {
// Sequential evaluation when batches aren't set.
// This is the behaviour without a defined RuleConcurrencyController
// 如果没有设置批次,则顺序执行所有规则
for i, rule := range g.rules {
// Check if the group has been stopped.
// 检查规则组是否已被停止
select {
case <-g.done:
return
default:
}
eval(i, rule, nil) // 顺序执行每个规则
}
} else {
// Concurrent evaluation.
// 如果设置了批次,则并发执行规则
for _, batch := range batches {
for _, ruleIndex := range batch {
// Check if the group has been stopped.
// 检查规则组是否已被停止
select {
case <-g.done:
wg.Wait() // 等待所有并发任务完成
return
default:
}
rule := g.rules[ruleIndex]
// 如果批次大小大于1且控制器允许并发执行,则启动 goroutine
if len(batch) > 1 && ctrl.Allow(ctx, g, rule) {
wg.Add(1)

go eval(ruleIndex, rule, func() {
wg.Done() // 任务完成时减少等待组计数
ctrl.Done(ctx) // 通知控制器任务完成
})
} else {
eval(ruleIndex, rule, nil) // 否则顺序执行
}
}
// It is important that we finish processing any rules in this current batch - before we move into the next one.
// 等待当前批次的所有规则执行完成后再处理下一个批次
wg.Wait()
}
}

// 更新组样本总数指标
g.metrics.GroupSamples.WithLabelValues(GroupKey(g.File(), g.Name())).Set(samplesTotal.Load())
// 清理过期的系列
g.cleanupStaleSeries(ctx, ts)
}

AlertRule

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
// Eval evaluates the rule expression and then creates pending alerts and fires
// or removes previously pending alerts accordingly.
func (r *AlertingRule) Eval(ctx context.Context, queryOffset time.Duration, ts time.Time, query QueryFunc, externalURL *url.URL, limit int) (promql.Vector, error) {
// 将当前规则详情添加到上下文中,用于追踪规则来源
ctx = NewOriginContext(ctx, NewRuleDetail(r))

// 执行查询,获取当前时间点的指标数据
res, err := query(ctx, r.vector.String(), ts.Add(-queryOffset))
if err != nil {
return nil, err
}

// resultFPs 用于记录当前查询结果中存在的告警指纹,便于后续清理已解决的告警
resultFPs := map[uint64]struct{}{}

// 初始化标签构建器和临时向量结果
lb := labels.NewBuilder(labels.EmptyLabels())
sb := labels.NewScratchBuilder(0)
var vec promql.Vector

// alerts 存储当前查询产生的新告警或更新的告警实例
alerts := make(map[uint64]*Alert, len(res))

// 遍历查询结果,为每个样本创建或更新告警
for _, smpl := range res {
// 提取样本的标签,用于模板渲染
l := smpl.Metric.Map()

// 构造模板数据,包含标签、外部标签、外部URL和样本值
tmplData := template.AlertTemplateData(l, r.externalLabels, r.externalURL, smpl)

// 定义模板中使用的便利变量,简化用户使用
defs := []string{
"{{$labels := .Labels}}",
"{{$externalLabels := .ExternalLabels}}",
"{{$externalURL := .ExternalURL}}",
"{{$value := .Value}}",
}

// expand 函数用于展开模板字符串
expand := func(text string) string {
tmpl := template.NewTemplateExpander(
ctx,
strings.Join(append(defs, text), ""),
"__alert_"+r.Name(),
tmplData,
model.Time(timestamp.FromTime(ts)),
template.QueryFunc(query),
externalURL,
nil,
)
result, err := tmpl.Expand()
if err != nil {
// 模板展开失败时记录警告日志并返回错误信息
result = fmt.Sprintf("<error expanding template: %s>", err)
r.logger.Warn("Expanding alert template failed", "err", err, "data", tmplData)
}
return result
}

// 重置标签构建器并删除指标名称标签
lb.Reset(smpl.Metric)
lb.Del(labels.MetricName)

// 应用规则定义的标签,对标签值进行模板展开
r.labels.Range(func(l labels.Label) {
lb.Set(l.Name, expand(l.Value))
})
// 设置告警名称标签
lb.Set(labels.AlertName, r.Name())

// 重置并处理注解标签,同样进行模板展开
sb.Reset()
r.annotations.Range(func(a labels.Label) {
sb.Add(a.Name, expand(a.Value))
})
annotations := sb.Labels()

// 获取处理后的标签集合并计算其哈希值作为告警的唯一标识
lbs := lb.Labels()
h := lbs.Hash()
resultFPs[h] = struct{}{}

// 检查是否已存在相同标签集的告警,避免重复
if _, ok := alerts[h]; ok {
return nil, errors.New("vector contains metrics with the same labelset after applying alert labels")
}

// 创建新的告警实例
alerts[h] = &Alert{
Labels: lbs,
Annotations: annotations,
ActiveAt: ts,
State: StatePending,
Value: smpl.F,
}
}

// 锁定活动告警映射,确保并发安全
r.activeMtx.Lock()
defer r.activeMtx.Unlock()

// 更新已存在的告警状态或添加新的告警
for h, a := range alerts {
// 如果已存在相同标签的活跃告警,则更新其值和注解
if alert, ok := r.active[h]; ok && alert.State != StateInactive {
alert.Value = a.Value
alert.Annotations = a.Annotations
continue
}

// 否则添加新的告警到活动告警映射中
r.active[h] = a
}

var numActivePending int
// 遍历所有活动告警,处理告警状态转换和清理
for fp, a := range r.active {
if _, ok := resultFPs[fp]; !ok {
// 当前指纹不在查询结果中,表示该告警已不再触发

// 根据 keepFiringFor 参数判断是否应继续保持触发状态
var keepFiring bool
if a.State == StateFiring && r.keepFiringFor > 0 {
// 如果是首次需要保持触发,记录开始时间
if a.KeepFiringSince.IsZero() {
a.KeepFiringSince = ts
}
// 判断是否仍在保持触发的时间窗口内
if ts.Sub(a.KeepFiringSince) < r.keepFiringFor {
keepFiring = true
}
}

// 对于已解决的告警,保留一段时间以确保 Alertmanager 能正确接收解决通知
// 这有助于提高系统在网络问题或 Alertmanager 重启时的容错能力
if a.State == StatePending || (!a.ResolvedAt.IsZero() && ts.Sub(a.ResolvedAt) > resolvedRetention) {
delete(r.active, fp)
}
// 如果告警不是非活动状态且不需要保持触发,则标记为非活动
if a.State != StateInactive && !keepFiring {
a.State = StateInactive
a.ResolvedAt = ts
}
// 如果不需要保持触发,跳过后续处理
if !keepFiring {
continue
}
} else {
// 告警仍在触发,重置保持触发时间
a.KeepFiringSince = time.Time{}
}

// 增加活跃或待定告警计数
numActivePending++

// 如果告警处于待定状态且已超过保持时间,则转为触发状态
if a.State == StatePending && ts.Sub(a.ActiveAt) >= r.holdDuration {
a.State = StateFiring
a.FiredAt = ts
}

// 如果规则已恢复,生成告警的时间序列样本
if r.restored.Load() {
vec = append(vec, r.sample(a, ts.Add(-queryOffset)))
vec = append(vec, r.forStateSample(a, ts.Add(-queryOffset), float64(a.ActiveAt.Unix())))
}
}

// 如果设置了告警数量限制且当前活跃告警数超过限制,则清空所有告警并返回错误
if limit > 0 && numActivePending > limit {
r.active = map[uint64]*Alert{}
return nil, fmt.Errorf("exceeded limit of %d with %d alerts", limit, numActivePending)
}

return vec, nil
}

RecordingRule

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
// Eval evaluates the rule and then overrides the metric names and labels accordingly.
// Eval 函数负责计算规则,并根据规则定义覆盖指标名称和标签。
func (rule *RecordingRule) Eval(ctx context.Context, queryOffset time.Duration, ts time.Time, query QueryFunc, _ *url.URL, limit int) (promql.Vector, error) {
// 将当前规则详情添加到上下文中,用于追踪和日志记录
ctx = NewOriginContext(ctx, NewRuleDetail(rule))

// 执行查询函数,获取原始的指标数据向量
// 查询的时间戳会根据 queryOffset 进行调整
vector, err := query(ctx, rule.vector.String(), ts.Add(-queryOffset))
if err != nil {
// 如果查询出错,则更新规则健康状态并返回错误
rule.SetHealth(HealthBad)
rule.SetLastError(err)
return nil, err
}

// 创建一个标签构建器,用于修改或设置新的标签
lb := labels.NewBuilder(labels.EmptyLabels())

// 遍历查询结果中的每一个样本
for i := range vector {
sample := &vector[i]

// 重置标签构建器为当前样本的标签
lb.Reset(sample.Metric)

// 设置新的指标名称为规则中定义的名称
lb.Set(labels.MetricName, rule.name)

// 遍历规则中定义的所有标签,并将其设置到当前样本上
rule.labels.Range(func(l labels.Label) {
lb.Set(l.Name, l.Value)
})

// 更新当前样本的标签集合
sample.Metric = lb.Labels()
}

// 检查应用标签后是否存在重复的标签集(即相同的指标名称和标签组合)
// 如果存在,则返回错误以避免产生冲突的指标
if vector.ContainsSameLabelset() {
err = errors.New("vector contains metrics with the same labelset after applying rule labels")
rule.SetHealth(HealthBad)
rule.SetLastError(err)
return nil, err
}

// 获取结果系列的数量
numSeries := len(vector)

// 如果设置了限制并且结果数量超过限制,则返回错误
if limit > 0 && numSeries > limit {
err = fmt.Errorf("exceeded limit of %d with %d series", limit, numSeries)
rule.SetHealth(HealthBad)
rule.SetLastError(err)
return nil, err
}

// 设置规则健康状态为良好,并清除之前的错误
rule.SetHealth(HealthGood)
rule.SetLastError(nil)
return vector, nil
}