Prometheus-Discoverer服务发现

基于v3.5

启动流程

sequenceDiagram
    participant main.go
    participant Manager
    participant Provider
    participant Discoverer
    participant TargetGroup
main.go->>Manager: NewManager() 
main.go->>Manager: ApplyConfig()

Manager->>Provider: 创建Provider实例
Manager->>Provider: startProvider()

Provider->>Discoverer: 初始化具体发现器(如K8s/Consul等)
Provider->>Discoverer: Run(ctx, updatesCh)

loop 发现循环
    Discoverer->>TargetGroup: 获取目标组
    Discoverer->>Provider: 发送更新(updatesCh<-tgs)
end

Provider->>Manager: 触发更新(triggerSend<-{})

Manager->>Manager: updateGroup()
Manager->>Manager: allGroups()
Manager->>main.go: 同步更新(syncCh<-groups)

main.go->>scrape: 使用更新后的目标组

工作原理

graph TD
    A[主程序] -->|1. 创建| B[Discovery Manager]
    A -->|2. 配置| C[Configs]
subgraph Discovery Manager
    B -->|启动| D[Provider 1]
    B -->|启动| E[Provider 2]
    B -->|...| F[Provider N]
    
    D -->|注册| G[Discoverer K8s]
    E -->|注册| H[Discoverer Consul]
    F -->|注册| I[Discoverer Static]
    
    B -->|管理| J[Target Groups缓存]
    B -->|同步| K[Sync Channel]
end

subgraph 服务发现实现
    G -->|监听| L[Kubernetes API]
    H -->|查询| M[Consul 服务目录]
    I -->|加载| N[静态配置文件]
end

L -->|推送更新| D
M -->|推送更新| E
N -->|推送更新| F

D -->|3. 更新| J
E -->|3. 更新| J
F -->|3. 更新| J

J -->|4. 聚合| B
B -->|5. 分发| K
K -->|6. 消费| O[Scrape Manager]

详细流程

核心结构体

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
// Manager maintains a set of discovery providers and sends each update to a map channel.
// Targets are grouped by the target set name.
// Manager 管理多个服务发现提供者,并将目标更新发送到同步通道
// 目标按目标集合名称分组
type Manager struct {
logger *slog.Logger // 日志记录器,用于输出运行日志
name string // 管理器名称,用于标识和区分不同管理器实例
httpOpts []config.HTTPClientOption // HTTP客户端配置选项,传递给服务发现提供者

mtx sync.RWMutex // 读写锁,保护管理器状态的并发访问
ctx context.Context // 上下文,用于控制管理器的生命周期

// 目标组缓存,按poolKey(集合名+提供者名)分组存储
// 某些发现器(如k8s)只发送特定目标组的更新,所以需要通过tg.Source来识别要更新的组
targets map[poolKey]map[string]*targetgroup.Group
targetsMtx sync.Mutex // 保护targets的互斥锁

providers []*Provider // 当前管理的服务发现提供者列表

// 同步通道,用于发送目标更新
// 键是scrape配置中的job名称,值是目标组列表
syncCh chan map[string][]*targetgroup.Group

updatert time.Duration // 更新间隔时间,用于节流控制
// 注意:此变量应仅在单元测试中修改

triggerSend chan struct{} // 触发发送信号通道,当有新的更新到达时通知发送器

lastProvider uint // 提供者计数器,记录管理器生命周期内注册的提供者数量

registerer prometheus.Registerer // Prometheus指标注册器,用于注册和暴露指标

metrics *Metrics // 管理器自身的指标
sdMetrics map[string]DiscovererMetrics // 各服务发现机制的指标
}

discovery启动

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
// 创建用于抓取目标的服务发现管理器
// ctxScrape: 抓取组件的上下文
// logger: 带"scrape"组件标识的日志记录器
// prometheus.DefaultRegisterer: 默认的指标注册器
// sdMetrics: 服务发现相关指标
// discovery.Name("scrape"): 设置管理器名称为"scrape"
discoveryManagerScrape = discovery.NewManager(ctxScrape, logger.With("component", "discovery manager scrape"), prometheus.DefaultRegisterer, sdMetrics, discovery.Name("scrape"))
if discoveryManagerScrape == nil {
logger.Error("failed to create a discovery manager scrape")
os.Exit(1) // 创建失败时退出程序
}

// 创建用于通知Alertmanager的服务发现管理器
// ctxNotify: 通知组件的上下文
// logger: 带"notify"组件标识的日志记录器
// prometheus.DefaultRegisterer: 默认的指标注册器
// sdMetrics: 服务发现相关指标
// discovery.Name("notify"): 设置管理器名称为"notify"
discoveryManagerNotify = discovery.NewManager(ctxNotify, logger.With("component", "discovery manager notify"), prometheus.DefaultRegisterer, sdMetrics, discovery.Name("notify"))
if discoveryManagerNotify == nil {
logger.Error("failed to create a discovery manager notify")
os.Exit(1) // 创建失败时退出程序
}


{
name: "scrape_sd",
reloader: func(cfg *config.Config) error {
c := make(map[string]discovery.Configs)
scfgs, err := cfg.GetScrapeConfigs()
if err != nil {
return err
}
for _, v := range scfgs {
c[v.JobName] = v.ServiceDiscoveryConfigs
}
return discoveryManagerScrape.ApplyConfig(c)
},
}

ApplyConfig应用服务发现配置

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
// ApplyConfig 应用新的服务发现配置
// 检查已运行的提供者是否匹配新配置,保留匹配的提供者
// 停止不需要的提供者,启动新配置要求的提供者
// 参数cfg: 按集合名称分组的服务发现配置
func (m *Manager) ApplyConfig(cfg map[string]Configs) error {
// 加锁保证配置更新的原子性
m.mtx.Lock()
defer m.mtx.Unlock()

// 统计配置失败的次数
var failedCount int
for name, scfg := range cfg {
failedCount += m.registerProviders(scfg, name)
}
// 更新失败配置的指标
m.metrics.FailedConfigs.Set(float64(failedCount))

var (
wg sync.WaitGroup // 用于等待旧提供者完全停止
newProviders []*Provider // 保存更新后的提供者列表
)

// 遍历当前所有提供者
for _, prov := range m.providers {
// 取消不再需要的提供者
if len(prov.newSubs) == 0 {
wg.Add(1)
prov.done = func() {
wg.Done() // 提供者清理完成后通知
}
prov.cancel() // 取消提供者的运行
continue
}

// 保留仍需要的提供者
newProviders = append(newProviders, prov)

// 保存参考目标组,用于初始化新订阅的目标
var refTargets map[string]*targetgroup.Group
prov.mu.Lock()

// 更新目标组缓存
m.targetsMtx.Lock()
for s := range prov.subs {
refTargets = m.targets[poolKey{s, prov.name}]
// 移除不再订阅的目标组
if _, ok := prov.newSubs[s]; !ok {
delete(m.targets, poolKey{s, prov.name})
m.metrics.DiscoveredTargets.DeleteLabelValues(m.name, s)
}
}
// 为新订阅初始化目标和指标
for s := range prov.newSubs {
if _, ok := prov.subs[s]; !ok {
m.metrics.DiscoveredTargets.WithLabelValues(s).Set(0)
}
if l := len(refTargets); l > 0 {
m.targets[poolKey{s, prov.name}] = make(map[string]*targetgroup.Group, l)
for k, v := range refTargets {
m.targets[poolKey{s, prov.name}][k] = v
}
}
}
m.targetsMtx.Unlock()

// 更新订阅关系
prov.subs = prov.newSubs
prov.newSubs = map[string]struct{}{}
prov.mu.Unlock()

// 启动尚未运行的提供者
if !prov.IsStarted() {
m.startProvider(m.ctx, prov)
}
}

// 触发更新通知,确保下游管理器能及时获取最新状态
if len(m.providers) > 0 {
select {
case m.triggerSend <- struct{}{}:
default:
}
}

// 更新提供者列表
m.providers = newProviders
// 等待所有旧提供者完成清理
wg.Wait()

return nil
}

常见几种Discoverer实现原理

File模式

graph TD
    A[File SD Config] --> B[Discovery实例]
    B --> C[文件监听和刷新]
subgraph "初始化阶段"
    A
    A1[SDConfig配置] --> A
    A2[路径匹配规则] --> A
    A3[刷新间隔设置] --> A
end

subgraph "核心组件"
    B
    B1[fsnotify.Watcher] --> B
    B2[时间戳管理] --> B
    B3[指标收集器] --> B
end

subgraph "文件监控流程"
    C
    C1[文件列表获取] --> C
    C2[文件变化监听] --> C
    C3[定期刷新机制] --> C
    C4[目标组解析] --> C
    
    C1 --> D[filepath.Glob匹配]
    C2 --> E[fsnotify事件处理]
    C3 --> F[定时器触发]
    C4 --> G[JSON/YAML解析]
end

subgraph "事件处理"
    E
    E1[文件修改事件] --> E
    E2[文件删除事件] --> E
    E3[文件创建事件] --> E
    E --> H[触发refresh]
end

subgraph "输出结果"
    I[targetgroup.Group流] --> J[Prometheus服务发现]
    G --> I
    H --> I
end

style A fill:#e1f5fe
style B fill:#f3e5f5
style C fill:#e8f5e8
style I fill:#fff3e0