Prometheus Scrape: Metrics Collection

Based on Prometheus v3.5

How It Works

graph TD
    A[ScrapePool] --> B[Target Discovery]
    A --> C[Scrape Loop Management]
    A --> D[HTTP Client]
B --> B1[Sync Targets]
B1 --> B2[Target Group Processing]
B2 --> B3[Target Creation/Deduplication]
B3 --> B4[Active Targets Update]

C --> C1[Loop Creation]
C1 --> C2[Scrape Interval Setup]
C2 --> C3[Scraper Initialization]

D --> D1[Target Scraper]
D1 --> D2[HTTP Request]
D2 --> D3[Metrics Endpoint]

C3 --> E[Scrape Execution]
E --> E1[Context Timeout Setup]
E1 --> E2[HTTP GET Request]
E2 --> E3[Response Reading]

E3 --> F[Response Processing]
F --> F1[Content Type Detection]
F1 --> F2[Parser Selection]
F2 --> F3[Metric Parsing]

F3 --> G[Cache Management]
G --> G1[Series Caching]
G1 --> G2[Label Processing]
G2 --> G3[Staleness Tracking]

F3 --> H[Storage Append]
H --> H1[Sample Validation]
H1 --> H2[Label Mutation]
H2 --> H3[Appender Interface]
H3 --> H4[TSDB Storage]

H --> I[Exemplar Handling]
I --> I1[Exemplar Parsing]
I1 --> I2[Exemplar Storage]

H --> J[Metadata Handling]
J --> J1[Type/Help/Unit]
J1 --> J2[Metadata Cache]

E --> K[Report Metrics]
K --> K1[Scrape Duration]
K1 --> K2[Scrape Health]
K2 --> K3[Sample Counters]

F --> L[Error Handling]
L --> L1[Staleness Markers]
L1 --> L2[Sample Limit Check]
L2 --> L3[Retry Logic]

style A fill:#e1f5fe
style E fill:#f3e5f5
style F fill:#e8f5e8
style H fill:#fff3e0
style K fill:#fce4ec

classDef process fill:#bbdefb,stroke:#333;
classDef data fill:#c8e6c9,stroke:#333;
classDef storage fill:#ffccbc,stroke:#333;
classDef reporting fill:#f8bbd0,stroke:#333;

class C1,C2,C3,E1,E2,E3 process;
class F1,F2,F3,G1,G2,G3,H1,H2,I1,J1 data;
class H3,H4,I2,J2 storage;
class K1,K2,K3 reporting;
  1. Target Discovery & Sync:

    • Obtain target groups from service discovery
    • Process and deduplicate the targets
    • Update the list of active targets
  2. Scrape Loop Management:

    • Create a scrape loop for each target
    • Set the scrape interval and timeout
    • Initialize the scraper
  3. HTTP Client & Request:

    • Use the configured HTTP client
    • Send a GET request to the target endpoint
    • Read the response data
  4. Scrape Execution:

    • Set up the context timeout
    • Send the HTTP request
    • Read and process the response (a minimal sketch follows this list)
  5. Response Processing:

    • Detect the content type
    • Select the appropriate parser
    • Parse the metric data
  6. Cache Management:

    • Cache series information
    • Handle label changes
    • Track staleness
  7. Storage Append:

    • Validate samples
    • Apply label mutations
    • Write to the TSDB via the Appender interface
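
To make steps 3 through 5 concrete, here is a minimal, self-contained sketch of a single "scrape" using only the standard library. It is not Prometheus's actual targetScraper (which adds buffer pooling, compression handling, body-size limits and more); the target URL is just an assumed example. It bounds the request with a context timeout, sends a GET with an Accept header, reads the body, and returns the Content-Type that would drive parser selection.

// Minimal scrape sketch, for illustration only.
package main

import (
    "context"
    "fmt"
    "io"
    "net/http"
    "time"
)

func scrapeOnce(ctx context.Context, client *http.Client, url string) ([]byte, string, error) {
    // Step 4: bound the whole scrape with a context timeout.
    ctx, cancel := context.WithTimeout(ctx, 10*time.Second)
    defer cancel()

    req, err := http.NewRequestWithContext(ctx, http.MethodGet, url, nil)
    if err != nil {
        return nil, "", err
    }
    // Step 3: advertise the exposition format(s) we can parse.
    req.Header.Set("Accept", "text/plain;version=0.0.4")

    resp, err := client.Do(req)
    if err != nil {
        return nil, "", err
    }
    defer resp.Body.Close()

    body, err := io.ReadAll(resp.Body)
    if err != nil {
        return nil, "", err
    }
    // Step 5: the Content-Type header decides which parser is selected.
    return body, resp.Header.Get("Content-Type"), nil
}

func main() {
    // Assumed example endpoint; replace with any /metrics URL.
    body, contentType, err := scrapeOnce(context.Background(), http.DefaultClient, "http://localhost:9090/metrics")
    if err != nil {
        fmt.Println("scrape failed:", err)
        return
    }
    fmt.Printf("got %d bytes, Content-Type: %s\n", len(body), contentType)
}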

Detailed Flow

Scrape Manager Initialization

type Manager struct {
    opts      *Options           // scrape manager options
    logger    *slog.Logger       // logger
    append    storage.Appendable // storage interface used to append scraped data
    graceShut chan struct{}      // channel used for graceful shutdown

    offsetSeed             uint64                                        // global offset seed, used to spread scrape load in HA setups
    mtxScrape              sync.Mutex                                    // guards the fields below
    scrapeConfigs          map[string]*config.ScrapeConfig               // scrape configs keyed by job name
    scrapePools            map[string]*scrapePool                        // scrape pools keyed by job name
    newScrapeFailureLogger func(string) (*logging.JSONFileLogger, error) // factory for scrape-failure loggers
    scrapeFailureLoggers   map[string]FailureLogger                      // scrape-failure loggers keyed by log file path
    targetSets             map[string][]*targetgroup.Group               // target groups keyed by job name
    buffers                *pool.Pool                                    // byte buffer pool

    triggerReload chan struct{} // channel used to trigger a reload

    metrics *scrapeMetrics // scrape metrics collector
}



scrapeManager, err := scrape.NewManager(
    &cfg.scrape,
    logger.With("component", "scrape manager"),
    logging.NewJSONFileLogger,
    fanoutStorage,
    prometheus.DefaultRegisterer,
)

func NewManager(o *Options, logger *slog.Logger, newScrapeFailureLogger func(string) (*logging.JSONFileLogger, error), app storage.Appendable, registerer prometheus.Registerer) (*Manager, error) {
    // Fall back to default options if none are given.
    if o == nil {
        o = &Options{}
    }
    // Fall back to a no-op logger if none is given.
    if logger == nil {
        logger = promslog.NewNopLogger()
    }

    // Create the scrape metrics collector.
    sm, err := newScrapeMetrics(registerer)
    if err != nil {
        return nil, fmt.Errorf("failed to create scrape manager due to error: %w", err)
    }

    // Initialize the Manager struct.
    m := &Manager{
        append:                 app,                                   // storage interface
        opts:                   o,                                     // options
        logger:                 logger,                                // logger
        newScrapeFailureLogger: newScrapeFailureLogger,                // factory for failure loggers
        scrapeConfigs:          make(map[string]*config.ScrapeConfig), // scrape config cache
        scrapePools:            make(map[string]*scrapePool),          // scrape pool cache
        graceShut:              make(chan struct{}),                   // graceful shutdown channel
        triggerReload:          make(chan struct{}, 1),                // reload trigger channel (buffered)
        metrics:                sm,                                    // metrics collector
        buffers:                pool.New(1e3, 100e6, 3, func(sz int) interface{} { return make([]byte, 0, sz) }), // byte buffer pool
    }

    // Register the Manager as the gatherer for the target metadata cache.
    m.metrics.setTargetMetadataCacheGatherer(m)

    return m, nil
}
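
For context, the constructor call above mirrors how cmd/prometheus/main.go builds the scrape manager. A rough, simplified wiring sketch follows (not the exact main.go code: error handling, the run group, and reload plumbing are omitted; cfg, logger, scrapeManager, and discoveryManager are assumed to already exist). The manager receives its configuration via ApplyConfig and is driven by the discovery manager's target-group channel.

// Simplified wiring sketch under the assumptions stated above.
if err := scrapeManager.ApplyConfig(cfg); err != nil {
    logger.Error("applying scrape configuration failed", "err", err)
}

go func() {
    // Run blocks, consuming target-group updates from service discovery
    // until the manager is stopped.
    if err := scrapeManager.Run(discoveryManager.SyncCh()); err != nil {
        logger.Error("scrape manager stopped", "err", err)
    }
}()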

Run Startup

func (m *Manager) Run(tsets <-chan map[string][]*targetgroup.Group) error {
    go m.reloader()
    for {
        select {
        case ts := <-tsets:
            // Update the target sets.
            m.updateTsets(ts)

            // Trigger a reload via a non-blocking send.
            select {
            case m.triggerReload <- struct{}{}:
            default:
            }

        case <-m.graceShut:
            // Return nil on graceful shutdown.
            return nil
        }
    }
}

func (m *Manager) reloader() {
    reloadIntervalDuration := m.opts.DiscoveryReloadInterval
    if reloadIntervalDuration == model.Duration(0) {
        reloadIntervalDuration = model.Duration(5 * time.Second)
    }

    ticker := time.NewTicker(time.Duration(reloadIntervalDuration))

    defer ticker.Stop()

    for {
        select {
        case <-m.graceShut:
            return
        case <-ticker.C:
            select {
            case <-m.triggerReload:
                m.reload()
            case <-m.graceShut:
                return
            }
        }
    }
}


// reload reloads the scrape configuration and syncs the target groups.
func (m *Manager) reload() {
    // Lock to protect concurrent access to the scrape pools.
    m.mtxScrape.Lock()

    // Used to wait for all sync operations to finish.
    var wg sync.WaitGroup

    // Iterate over all target sets.
    for setName, groups := range m.targetSets {
        // Check whether a scrape pool already exists for this set.
        if _, ok := m.scrapePools[setName]; !ok {
            // Look up the scrape config.
            scrapeConfig, ok := m.scrapeConfigs[setName]
            if !ok {
                // No config found: log the error and move on to the next set.
                m.logger.Error("error reloading target set", "err", "invalid config id:"+setName)
                continue
            }

            // Check for conflicting configuration options.
            if scrapeConfig.ConvertClassicHistogramsToNHCBEnabled() && m.opts.EnableCreatedTimestampZeroIngestion {
                // TODO(krajorama): fix https://github.com/prometheus/prometheus/issues/15137
                // Converting classic histograms to NHCB and ingesting created-timestamp zero
                // samples cannot be enabled at the same time; log the error and skip this set.
                m.logger.Error("error reloading target set", "err", "cannot convert classic histograms to native histograms with custom buckets and ingest created timestamp zero samples at the same time due to https://github.com/prometheus/prometheus/issues/15137")
                continue
            }

            // Increment the scrape pool counter.
            m.metrics.targetScrapePools.Inc()

            // Create a new scrape pool.
            sp, err := newScrapePool(scrapeConfig, m.append, m.offsetSeed, m.logger.With("scrape_pool", setName), m.buffers, m.opts, m.metrics)
            if err != nil {
                // Creation failed: bump the failure counter and log the error.
                m.metrics.targetScrapePoolsFailed.Inc()
                m.logger.Error("error creating new scrape pool", "err", err, "scrape_pool", setName)
                continue
            }

            // Register the new scrape pool with the manager.
            m.scrapePools[setName] = sp

            // Attach the scrape-failure logger.
            if l, ok := m.scrapeFailureLoggers[scrapeConfig.ScrapeFailureLogFile]; ok {
                sp.SetScrapeFailureLogger(l)
            } else {
                // No logger found; this is a bug that should be reported upstream.
                sp.logger.Error("No logger found. This is a bug in Prometheus that should be reported upstream.", "scrape_pool", setName)
            }
        }

        wg.Add(1)

        // Run the sync in parallel as these take a while and at high load can't catch up.
        go func(sp *scrapePool, groups []*targetgroup.Group) {
            // Sync the target groups.
            sp.Sync(groups)
            wg.Done()
        }(m.scrapePools[setName], groups)
    }

    m.mtxScrape.Unlock()

    // Wait for all sync operations to complete.
    wg.Wait()
}
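
The Run/reloader pair implements a simple debounce: every target-set update does a non-blocking send into the buffered triggerReload channel, and reloader drains that channel at most once per tick, so a burst of discovery updates collapses into a single reload. Below is a small, self-contained sketch of the same channel pattern. It is generic illustration code, not Prometheus code, and it simplifies the consumer side (no shutdown channel, non-blocking drain).

// Debounce sketch: many trigger sends coalesce into at most one action per tick.
package main

import (
    "fmt"
    "time"
)

func main() {
    trigger := make(chan struct{}, 1) // buffered: one pending signal is enough
    done := make(chan struct{})

    // Producer: fire a burst of triggers.
    go func() {
        for i := 0; i < 10; i++ {
            select {
            case trigger <- struct{}{}: // non-blocking send
            default: // a reload is already pending; drop the signal
            }
            time.Sleep(10 * time.Millisecond)
        }
        time.Sleep(300 * time.Millisecond)
        close(done)
    }()

    // Consumer: act on the pending signal at most once per tick.
    ticker := time.NewTicker(100 * time.Millisecond)
    defer ticker.Stop()
    for {
        select {
        case <-done:
            return
        case <-ticker.C:
            select {
            case <-trigger:
                fmt.Println("reload at", time.Now().Format("15:04:05.000"))
            default:
            }
        }
    }
}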

Scraping

// Sync converts target groups into actual scrape targets and synchronizes
// the currently running scraper with the resulting set and returns all scraped and dropped targets.
func (sp *scrapePool) Sync(tgs []*targetgroup.Group) {
    sp.mtx.Lock()
    defer sp.mtx.Unlock()
    start := time.Now()

    sp.targetMtx.Lock()
    var all []*Target
    var targets []*Target
    lb := labels.NewBuilderWithSymbolTable(sp.symbolTable)
    sp.droppedTargets = []*Target{}
    sp.droppedTargetsCount = 0
    for _, tg := range tgs {
        // Build targets from the target group and collect any failures.
        targets, failures := TargetsFromGroup(tg, sp.config, targets, lb)
        for _, err := range failures {
            sp.logger.Error("Creating target failed", "err", err)
        }
        // Record the target sync failure metric.
        sp.metrics.targetSyncFailed.WithLabelValues(sp.config.JobName).Add(float64(len(failures)))
        for _, t := range targets {
            // Replicate .Labels().IsEmpty() with a loop here to avoid generating garbage.
            nonEmpty := false
            t.LabelsRange(func(_ labels.Label) { nonEmpty = true })
            switch {
            case nonEmpty:
                // Non-empty label set: this is an active target.
                all = append(all, t)
            default:
                // Empty label set: keep the dropped target only if configured to do so.
                if sp.config.KeepDroppedTargets == 0 || uint(len(sp.droppedTargets)) < sp.config.KeepDroppedTargets {
                    sp.droppedTargets = append(sp.droppedTargets, t)
                }
                // Count the dropped target.
                sp.droppedTargetsCount++
            }
        }
    }
    // Record the number of symbol table entries.
    sp.metrics.targetScrapePoolSymbolTableItems.WithLabelValues(sp.config.JobName).Set(float64(sp.symbolTable.Len()))
    sp.targetMtx.Unlock()
    // Sync all targets.
    sp.sync(all)
    // Check the symbol table.
    sp.checkSymbolTable()

    // Record how long this sync took.
    sp.metrics.targetSyncIntervalLength.WithLabelValues(sp.config.JobName).Observe(
        time.Since(start).Seconds(),
    )
    // Increment the scrape pool sync counter.
    sp.metrics.targetScrapePoolSyncsCounter.WithLabelValues(sp.config.JobName).Inc()
}

// sync takes a list of potentially duplicated targets, deduplicates them, starts
// scrape loops for new targets, and stops scrape loops for targets that have
// disappeared. It returns after all stopped scrape loops have terminated.
func (sp *scrapePool) sync(targets []*Target) {
    var (
        uniqueLoops   = make(map[uint64]loop)                              // loops for unique targets
        interval      = time.Duration(sp.config.ScrapeInterval)            // scrape interval
        timeout       = time.Duration(sp.config.ScrapeTimeout)             // scrape timeout
        bodySizeLimit = int64(sp.config.BodySizeLimit)                     // response body size limit
        sampleLimit   = int(sp.config.SampleLimit)                         // sample limit
        bucketLimit   = int(sp.config.NativeHistogramBucketLimit)          // native histogram bucket limit
        maxSchema     = pickSchema(sp.config.NativeHistogramMinBucketFactor) // pick the histogram schema
        labelLimits   = &labelLimits{ // label limit configuration
            labelLimit:            int(sp.config.LabelLimit),            // total number of labels
            labelNameLengthLimit:  int(sp.config.LabelNameLengthLimit),  // label name length
            labelValueLengthLimit: int(sp.config.LabelValueLengthLimit), // label value length
        }
        honorLabels              = sp.config.HonorLabels              // prefer target labels on conflict
        honorTimestamps          = sp.config.HonorTimestamps          // prefer timestamps exposed by the target
        enableCompression        = sp.config.EnableCompression        // enable compression
        trackTimestampsStaleness = sp.config.TrackTimestampsStaleness // track staleness of explicit timestamps
        mrc                      = sp.config.MetricRelabelConfigs     // metric relabel configs
        fallbackScrapeProtocol   = sp.config.ScrapeFallbackProtocol.HeaderMediaType() // fallback scrape protocol
        alwaysScrapeClassicHist  = sp.config.AlwaysScrapeClassicHistogramsEnabled()   // always scrape classic histograms
        convertClassicHistToNHCB = sp.config.ConvertClassicHistogramsToNHCBEnabled()  // convert classic histograms to NHCB
    )

    sp.targetMtx.Lock()
    for _, t := range targets {
        hash := t.hash()

        if _, ok := sp.activeTargets[hash]; !ok {
            // The scrape interval and timeout labels are set to the config's values initially,
            // so whether changed via relabeling or not, they'll exist and hold the correct values
            // for every target.
            var err error
            interval, timeout, err = t.intervalAndTimeout(interval, timeout)
            s := &targetScraper{
                Target:               t,             // target instance
                client:               sp.client,     // HTTP client
                timeout:              timeout,       // scrape timeout
                bodySizeLimit:        bodySizeLimit, // response body size limit
                acceptHeader:         acceptHeader(sp.config.ScrapeProtocols, sp.escapingScheme), // Accept header
                acceptEncodingHeader: acceptEncodingHeader(enableCompression),                    // Accept-Encoding header
                metrics:              sp.metrics,    // metrics collector
            }
            l := sp.newLoop(scrapeLoopOptions{
                target:                   t,
                scraper:                  s,
                sampleLimit:              sampleLimit,
                bucketLimit:              bucketLimit,
                maxSchema:                maxSchema,
                labelLimits:              labelLimits,
                honorLabels:              honorLabels,
                honorTimestamps:          honorTimestamps,
                enableCompression:        enableCompression,
                trackTimestampsStaleness: trackTimestampsStaleness,
                mrc:                      mrc,
                interval:                 interval,
                timeout:                  timeout,
                alwaysScrapeClassicHist:  alwaysScrapeClassicHist,
                convertClassicHistToNHCB: convertClassicHistToNHCB,
                fallbackScrapeProtocol:   fallbackScrapeProtocol,
            })
            if err != nil {
                l.setForcedError(err)
            }
            l.setScrapeFailureLogger(sp.scrapeFailureLogger)

            sp.activeTargets[hash] = t
            sp.loops[hash] = l

            uniqueLoops[hash] = l
        } else {
            // This might be a duplicated target.
            if _, ok := uniqueLoops[hash]; !ok {
                uniqueLoops[hash] = nil
            }
            // Need to keep the most updated ScrapeConfig for
            // displaying labels in the Service Discovery web page.
            sp.activeTargets[hash].SetScrapeConfig(sp.config, t.tLabels, t.tgLabels)
        }
    }

    var wg sync.WaitGroup

    // Stop and remove old targets and scraper loops.
    for hash := range sp.activeTargets {
        if _, ok := uniqueLoops[hash]; !ok {
            wg.Add(1)
            go func(l loop) {
                l.stop()
                wg.Done()
            }(sp.loops[hash])

            delete(sp.loops, hash)
            delete(sp.activeTargets, hash)
        }
    }

    sp.targetMtx.Unlock()

    sp.metrics.targetScrapePoolTargetsAdded.WithLabelValues(sp.config.JobName).Set(float64(len(uniqueLoops)))
    forcedErr := sp.refreshTargetLimitErr()
    for _, l := range sp.loops {
        l.setForcedError(forcedErr)
    }
    for _, l := range uniqueLoops {
        if l != nil {
            go l.run(nil)
        }
    }
    // Wait for all potentially stopped scrapers to terminate.
    // This covers the case of flapping targets. If the server is under high load, a new scraper
    // may be active and tries to insert. The old scraper that didn't terminate yet could still
    // be inserting a previous sample set.
    wg.Wait()
}
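
The reconcile structure of sync() is a general pattern: index the desired targets by hash, start loops for hashes that are new, and stop and wait for loops whose hash no longer appears. The stripped-down, self-contained sketch below shows that reconciliation with a hypothetical worker type; it is not Prometheus's loop interface or Target hashing, just the same shape.

// Reconciliation sketch with a hypothetical worker type.
package main

import (
    "fmt"
    "hash/fnv"
    "sync"
)

type worker struct {
    addr string
    stop chan struct{}
}

func hashAddr(addr string) uint64 {
    h := fnv.New64a()
    h.Write([]byte(addr))
    return h.Sum64()
}

func reconcile(active map[uint64]*worker, desired []string) {
    unique := make(map[uint64]struct{})

    // Start workers for new (deduplicated) targets.
    for _, addr := range desired {
        h := hashAddr(addr)
        if _, seen := unique[h]; seen {
            continue // duplicate target within the same update
        }
        unique[h] = struct{}{}
        if _, ok := active[h]; !ok {
            active[h] = &worker{addr: addr, stop: make(chan struct{})}
            fmt.Println("start", addr)
        }
    }

    // Stop workers whose target disappeared, and wait for them to finish.
    var wg sync.WaitGroup
    for h, w := range active {
        if _, ok := unique[h]; !ok {
            wg.Add(1)
            go func(w *worker) {
                defer wg.Done()
                close(w.stop) // signal the worker's loop to exit
                fmt.Println("stop", w.addr)
            }(w)
            delete(active, h)
        }
    }
    wg.Wait()
}

func main() {
    active := make(map[uint64]*worker)
    reconcile(active, []string{"10.0.0.1:9100", "10.0.0.2:9100", "10.0.0.2:9100"})
    reconcile(active, []string{"10.0.0.2:9100", "10.0.0.3:9100"})
}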