Prometheus-PromQL

基于v3.5

工作原理

graph TD
    A[Query Input] --> B[Parser]
    B --> C[Engine]
    C --> D[Query Execution]
    D --> E[Result Output]

    %% Parser Module
    subgraph Parser["Parser (parser/)"]
        B1[Lexical Analysis] --> B2[Syntax Parsing]
        B2 --> B3[AST Generation]
        B3 --> B4[Validation]
    end

    %% Engine Module
    subgraph Engine["Engine (engine.go)"]
        C1[Query Initialization] --> C2[Query Planning]
        C2 --> C3[Query Evaluation]
        C3 --> C4[Result Aggregation]
    end

    %% Query Execution Module
    subgraph QueryExecution["Query Execution"]
        D1[Querier Interaction] --> D2[Series Selection]
        D2 --> D3[Expression Evaluation]
        D3 --> D4[Data Aggregation]
    end

    %% Core Functions Mapping
    B2 -- "generated_parser.y.go" --> F1[Parse Expression]
    B3 -- "ast.go" --> F2[Generate AST]
    C1 -- "engine.go" --> F3[NewInstantQuery/NewRangeQuery]
    C2 -- "engine.go" --> F4[execEvalStmt]
    C3 -- "engine.go" --> F5[evaluator.Eval]
    D1 -- "engine.go" --> F6[populateSeries]
    D3 -- "engine.go" --> F7[calculateDuration]
    D4 -- "engine.go" --> F8[handleAggregationError]

启动顺序

graph TD
    A[Engine初始化] --> B[配置EngineOpts]
    B --> C[创建Query对象]
    C --> D[执行Query.Exec]
    D --> E[调用Engine.exec]
    E --> F[检查上下文是否完成]
    F --> G[插入Query到ActiveQueryTracker]
    G --> H[执行查询准备阶段]
    H --> I[调用execEvalStmt执行查询]
    I --> J[创建Evaluator]
    J --> K[调用Evaluator.Eval]
    K --> L[处理查询结果]
    L --> M[记录查询日志]
    M --> N[返回查询结果]
  1. Engine初始化 (NewEngine)
    初始化 Engine 对象时,传入 EngineOpts 配置(如超时时间、最大样本数等)。
    关键字段包括 timeout、maxSamplesPerQuery、activeQueryTracker 等。
  2. 创建Query对象
    通过 Engine.NewInstantQuery 或 Engine.NewRangeQuery 创建查询对象。
    查询对象包含解析后的 PromQL 表达式和相关的查询选项。
  3. 执行Query.Exec
    调用 Query.Exec(ctx) 方法开始执行查询。
    此方法会调用 Engine.exec 来处理查询的生命周期。
  4. 检查上下文是否完成
    在 Engine.exec 中,首先检查上下文是否被取消或超时。
    如果上下文已完成,则直接返回错误。
  5. 插入Query到ActiveQueryTracker
    将查询插入到 ActiveQueryTracker 中以跟踪活跃查询。
    如果达到最大并发查询限制,此步骤会阻塞直到有空闲槽位。
  6. 执行查询准备阶段
    调用 Engine.execEvalStmt 执行查询的准备阶段。
    包括解析表达式、构建查询器(Querier)、设置时间范围等。
  7. 调用Evaluator.Eval
    创建 Evaluator 对象并调用其 Eval 方法。
    根据表达式的类型(如 Matrix、Vector、Scalar),执行相应的计算逻辑。
    处理查询结果
  8. 根据表达式类型,将结果转换为 Matrix、Vector 或 Scalar。
    如果结果是 Matrix,可能需要进一步排序或过滤。
  9. 记录查询日志
    将查询的统计信息和结果记录到日志中。
    日志内容包括查询字符串、执行时间、警告信息等。
  10. 返回查询结果
    最终返回查询结果,包含数据值、警告信息和可能的错误

详细流程

初始化

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
queryEngine = promql.NewEngine(opts)

// Engine handles the lifetime of queries from beginning to end.
// It is connected to a querier.
type Engine struct {
logger *slog.Logger // 日志记录器,用于记录引擎运行过程中的日志信息
metrics *engineMetrics // 引擎指标收集器,用于收集和暴露引擎的运行指标
timeout time.Duration // 查询超时时间,超过该时间查询将被取消
maxSamplesPerQuery int // 单个查询允许的最大样本数,用于限制查询的资源消耗
activeQueryTracker QueryTracker // 活跃查询跟踪器,用于跟踪和管理正在执行的查询
queryLogger QueryLogger // 查询日志记录器,用于记录查询的执行日志
queryLoggerLock sync.RWMutex // 查询日志记录器的读写锁,保证并发安全
lookbackDelta time.Duration // 查询回溯时间差,用于确定查询的时间范围
noStepSubqueryIntervalFn func(rangeMillis int64) int64 // 无步长子查询的时间间隔函数,用于计算子查询的时间间隔
enableAtModifier bool // 是否启用 @ 修改器,允许用户指定查询的时间点
enableNegativeOffset bool // 是否启用负偏移量,允许用户指定查询的时间偏移
enablePerStepStats bool // 是否启用每步统计信息,用于收集查询每步的执行统计
enableDelayedNameRemoval bool // 是否启用延迟删除名称标签,用于在查询结束时才删除名称标签
enableTypeAndUnitLabels bool // 是否启用类型和单位标签,用于支持基于类型和单位的查询决策
}

// NewEngine returns a new engine.
func NewEngine(opts EngineOpts) *Engine {
// 如果没有提供日志记录器,则使用空的日志记录器
if opts.Logger == nil {
opts.Logger = promslog.NewNopLogger()
}

// 创建一个用于记录查询持续时间的 SummaryVec 指标,按 slice 标签分组
queryResultSummary := prometheus.NewSummaryVec(prometheus.SummaryOpts{
Namespace: namespace, // 指标命名空间
Subsystem: subsystem, // 指标子系统
Name: "query_duration_seconds", // 指标名称
Help: "Query timings", // 指标帮助信息
Objectives: map[float64]float64{0.5: 0.05, 0.9: 0.01, 0.99: 0.001}, // 分位数目标
},
[]string{"slice"}, // 标签键列表
)

// 初始化引擎的监控指标
metrics := &engineMetrics{
// 当前正在执行或等待的查询数量
currentQueries: prometheus.NewGauge(prometheus.GaugeOpts{
Namespace: namespace,
Subsystem: subsystem,
Name: "queries",
Help: "The current number of queries being executed or waiting.",
}),
// 查询日志是否启用的状态
queryLogEnabled: prometheus.NewGauge(prometheus.GaugeOpts{
Namespace: namespace,
Subsystem: subsystem,
Name: "query_log_enabled",
Help: "State of the query log.",
}),
// 查询日志失败的总次数
queryLogFailures: prometheus.NewCounter(prometheus.CounterOpts{
Namespace: namespace,
Subsystem: subsystem,
Name: "query_log_failures_total",
Help: "The number of query log failures.",
}),
// 最大并发查询数
maxConcurrentQueries: prometheus.NewGauge(prometheus.GaugeOpts{
Namespace: namespace,
Subsystem: subsystem,
Name: "queries_concurrent_max",
Help: "The max number of concurrent queries.",
}),
// 所有查询加载的样本总数
querySamples: prometheus.NewCounter(prometheus.CounterOpts{
Namespace: namespace,
Subsystem: subsystem,
Name: "query_samples_total",
Help: "The total number of samples loaded by all queries.",
}),
// 查询在队列中等待的时间
queryQueueTime: queryResultSummary.WithLabelValues("queue_time"),
// 查询准备时间
queryPrepareTime: queryResultSummary.WithLabelValues("prepare_time"),
// 查询内部评估时间
queryInnerEval: queryResultSummary.WithLabelValues("inner_eval"),
// 查询结果排序时间
queryResultSort: queryResultSummary.WithLabelValues("result_sort"),
}

// 如果提供了活动查询跟踪器,设置最大并发查询数;否则设为 -1 表示无限制
if t := opts.ActiveQueryTracker; t != nil {
metrics.maxConcurrentQueries.Set(float64(t.GetMaxConcurrent()))
} else {
metrics.maxConcurrentQueries.Set(-1)
}

// 如果 LookbackDelta 为 0,则设置为默认值,并记录调试日志
if opts.LookbackDelta == 0 {
opts.LookbackDelta = defaultLookbackDelta
if l := opts.Logger; l != nil {
l.Debug("Lookback delta is zero, setting to default value", "value", defaultLookbackDelta)
}
}

// 如果提供了注册表,则注册所有指标
if opts.Reg != nil {
opts.Reg.MustRegister(
metrics.currentQueries,
metrics.maxConcurrentQueries,
metrics.queryLogEnabled,
metrics.queryLogFailures,
metrics.querySamples,
queryResultSummary,
)
}

// 返回初始化完成的 Engine 实例
return &Engine{
timeout: opts.Timeout, // 查询超时时间
logger: opts.Logger, // 日志记录器
metrics: metrics, // 监控指标
maxSamplesPerQuery: opts.MaxSamples, // 每个查询的最大样本数
activeQueryTracker: opts.ActiveQueryTracker, // 活动查询跟踪器
lookbackDelta: opts.LookbackDelta, // 查找回溯时间差
noStepSubqueryIntervalFn: opts.NoStepSubqueryIntervalFn, // 子查询间隔函数
enableAtModifier: opts.EnableAtModifier, // 是否启用 @ 修饰符
enableNegativeOffset: opts.EnableNegativeOffset, // 是否启用负偏移
enablePerStepStats: opts.EnablePerStepStats, // 是否启用每步统计
enableDelayedNameRemoval: opts.EnableDelayedNameRemoval, // 是否启用延迟名称移除
enableTypeAndUnitLabels: opts.EnableTypeAndUnitLabels, // 是否启用类型和单位标签
}
}