Prometheus-项目启动main.go

基于v3.5

启动顺序

flowchart TD
    A[启动 main 函数] --> B[解析命令行参数/配置文件]
    B --> C[初始化日志系统]
    C --> D[初始化通知系统]
    D --> E[校验参数和配置]
    E --> F[初始化核心组件]
    F --> F1[本地存储 localStorage]
    F --> F2[抓取管理器 scrapeManager]
    F --> F3[远程存储 remoteStorage]
    F --> F4[规则管理器 ruleManager]
    F --> F5[Web 服务 webHandler]
    F --> F6[服务发现 discoveryManager]
    F --> F7[告警通知 notifierManager]
    F --> F8[Tracing 管理 tracingManager]
    F1 & F2 & F3 & F4 & F5 & F6 & F7 & F8 --> G[注册重载器 reloaders]
    G --> H[启动各组件 goroutine]
    H --> H1[Web 服务启动]
    H --> H2[抓取管理器启动]
    H --> H3[规则管理器启动]
    H --> H4[通知管理器启动]
    H --> H5[服务发现启动]
    H --> H6[Tracing 启动]
    H1 & H2 & H3 & H4 & H5 & H6 --> I[监听信号/优雅退出]
    I --> J[关闭各组件/资源释放]
    J --> K[日志输出:See you next time!]

详细流程

解析命令行

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
// 构造 flagConfig 结构体,保存所有命令行参数和配置。
// cfg holds all command-line and config file options.
cfg := flagConfig{
notifier: notifier.Options{
Registerer: prometheus.DefaultRegisterer,
},
web: web.Options{
Registerer: prometheus.DefaultRegisterer,
Gatherer: prometheus.DefaultGatherer,
},
promslogConfig: promslog.Config{},
}

// 创建 kingpin 应用对象,用于命令行参数解析。
// Create kingpin application for CLI parsing.
a := kingpin.New(filepath.Base(os.Args[0]), "The Prometheus monitoring server").UsageWriter(os.Stdout)

初始化日志

1
2
3
4
// 初始化日志系统。
// Initialize logger.
logger := promslog.New(&cfg.promslogConfig)
slog.SetDefault(logger)

初始化通知系统

1
2
3
4
5
6
// 初始化通知系统。
// Initialize notifications system.
notifs := notifications.NewNotifications(cfg.maxNotificationsSubscribers, prometheus.DefaultRegisterer)
cfg.web.NotificationsSub = notifs.Sub
cfg.web.NotificationsGetter = notifs.Get
notifs.AddNotification(notifications.StartingUp)

特性门控

1
2
3
4
5
6
// 根据 featureList 设置实验特性。
// Set feature flags from featureList.
if err := cfg.setFeatureListOptions(logger); err != nil {
fmt.Fprintf(os.Stderr, "Error parsing feature list: %s\n", err)
os.Exit(1)
}

配置文件校验

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
// 校验 agent/server 模式下参数合法性。
// Validate agent/server mode flags.
if agentMode && len(serverOnlyFlags) > 0 {
fmt.Fprintf(os.Stderr, "The following flag(s) can not be used in agent mode: %q", serverOnlyFlags)
os.Exit(3)
}

if !agentMode && len(agentOnlyFlags) > 0 {
fmt.Fprintf(os.Stderr, "The following flag(s) can only be used in agent mode: %q", agentOnlyFlags)
os.Exit(3)
}

if cfg.memlimitRatio <= 0.0 || cfg.memlimitRatio > 1.0 {
fmt.Fprintf(os.Stderr, "--auto-gomemlimit.ratio must be greater than 0 and less than or equal to 1.")
os.Exit(1)
}


// 检查配置文件有效性,提前报错避免后续组件启动。
// Throw error for invalid config before starting other components.
var cfgFile *config.Config
if cfgFile, err = config.LoadFile(cfg.configFile, agentMode, promslog.NewNopLogger()); err != nil {
absPath, pathErr := filepath.Abs(cfg.configFile)
if pathErr != nil {
absPath = cfg.configFile
}
logger.Error(fmt.Sprintf("Error loading config (--config.file=%s)", cfg.configFile), "file", absPath, "err", err)
os.Exit(2)
}

非Agent模式参数设置

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
// 非 agent 模式下,设置 TSDB 保留策略和相关参数。
if !agentMode {
if cfg.tsdb.RetentionDuration == 0 && cfg.tsdb.MaxBytes == 0 {
cfg.tsdb.RetentionDuration = defaultRetentionDuration
logger.Info("No time or size retention was set so using the default time retention", "duration", defaultRetentionDuration)
}

// 检查保留时间溢出,最大 100 年。
if cfg.tsdb.RetentionDuration < 0 {
y, err := model.ParseDuration("100y")
if err != nil {
panic(err)
}
cfg.tsdb.RetentionDuration = y
logger.Warn("Time retention value is too high. Limiting to: " + y.String())
}

// 设置最大数据块时长(MaxBlockDuration,为时间跨度而非字节大小)。
if cfg.tsdb.MaxBlockDuration == 0 {
maxBlockDuration, err := model.ParseDuration("31d")
if err != nil {
panic(err)
}
if cfg.tsdb.RetentionDuration != 0 && cfg.tsdb.RetentionDuration/10 < maxBlockDuration {
maxBlockDuration = cfg.tsdb.RetentionDuration / 10
}
cfg.tsdb.MaxBlockDuration = maxBlockDuration
}

// 延迟压缩参数检查。
if cfg.tsdb.EnableDelayedCompaction && (cfg.tsdb.CompactionDelayMaxPercent > 100 || cfg.tsdb.CompactionDelayMaxPercent <= 0) {
logger.Warn("The --storage.tsdb.delayed-compaction.max-percent should have a value between 1 and 100. Using default", "default", tsdb.DefaultCompactionDelayMaxPercent)
cfg.tsdb.CompactionDelayMaxPercent = tsdb.DefaultCompactionDelayMaxPercent
}

cfg.tsdb.WALCompressionType = parseCompressionType(tsdbWALCompression, tsdbWALCompressionType)
} else {
cfg.agent.WALCompressionType = parseCompressionType(agentWALCompression, agentWALCompressionType)
}

初始化主要存储、抓取、远程存储等核心组件

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
// 初始化主要存储、抓取、远程存储等核心组件。
// Initialize main storage, scrape manager, remote storage, etc.
var (
localStorage = &readyStorage{stats: tsdb.NewDBStats()}
scraper = &readyScrapeManager{}
remoteStorage = remote.NewStorage(logger.With("component", "remote"), prometheus.DefaultRegisterer, localStorage.StartTime, localStoragePath, time.Duration(cfg.RemoteFlushDeadline), scraper)
fanoutStorage = storage.NewFanout(logger, localStorage, remoteStorage)
)

var (
ctxWeb, cancelWeb = context.WithCancel(context.Background())
ctxRule = context.Background()

notifierManager = notifier.NewManager(&cfg.notifier, logger.With("component", "notifier"))

ctxScrape, cancelScrape = context.WithCancel(context.Background())
ctxNotify, cancelNotify = context.WithCancel(context.Background())
discoveryManagerScrape *discovery.Manager
discoveryManagerNotify *discovery.Manager
)

// 注册 Kubernetes SD 相关指标。
err = discovery.RegisterK8sClientMetricsWithPrometheus(prometheus.DefaultRegisterer)
if err != nil {
logger.Error("failed to register Kubernetes client metrics", "err", err)
os.Exit(1)
}

sdMetrics, err := discovery.CreateAndRegisterSDMetrics(prometheus.DefaultRegisterer)
if err != nil {
logger.Error("failed to register service discovery metrics", "err", err)
os.Exit(1)
}

discoveryManagerScrape = discovery.NewManager(ctxScrape, logger.With("component", "discovery manager scrape"), prometheus.DefaultRegisterer, sdMetrics, discovery.Name("scrape"))
if discoveryManagerScrape == nil {
logger.Error("failed to create a discovery manager scrape")
os.Exit(1)
}

discoveryManagerNotify = discovery.NewManager(ctxNotify, logger.With("component", "discovery manager notify"), prometheus.DefaultRegisterer, sdMetrics, discovery.Name("notify"))
if discoveryManagerNotify == nil {
logger.Error("failed to create a discovery manager notify")
os.Exit(1)
}

scrapeManager, err := scrape.NewManager(
&cfg.scrape,
logger.With("component", "scrape manager"),
logging.NewJSONFileLogger,
fanoutStorage,
prometheus.DefaultRegisterer,
)
if err != nil {
logger.Error("failed to create a scrape manager", "err", err)
os.Exit(1)
}

var (
tracingManager = tracing.NewManager(logger)

queryEngine *promql.Engine
ruleManager *rules.Manager
)


// 非 agent 模式下,初始化 PromQL 引擎和规则管理器。
if !agentMode {
opts := promql.EngineOpts{
Logger: logger.With("component", "query engine"),
Reg: prometheus.DefaultRegisterer,
MaxSamples: cfg.queryMaxSamples,
Timeout: time.Duration(cfg.queryTimeout),
ActiveQueryTracker: promql.NewActiveQueryTracker(localStoragePath, cfg.queryConcurrency, logger.With("component", "activeQueryTracker")),
LookbackDelta: time.Duration(cfg.lookbackDelta),
NoStepSubqueryIntervalFn: noStepSubqueryInterval.Get,
// EnableAtModifier and EnableNegativeOffset have to be
// always on for regular PromQL as of Prometheus v2.33.
EnableAtModifier: true,
EnableNegativeOffset: true,
EnablePerStepStats: cfg.enablePerStepStats,
EnableDelayedNameRemoval: cfg.promqlEnableDelayedNameRemoval,
}

queryEngine = promql.NewEngine(opts)

ruleManager = rules.NewManager(&rules.ManagerOptions{
Appendable: fanoutStorage,
Queryable: localStorage,
QueryFunc: rules.EngineQueryFunc(queryEngine, fanoutStorage),
NotifyFunc: rules.SendAlerts(notifierManager, cfg.web.ExternalURL.String()),
Context: ctxRule,
ExternalURL: cfg.web.ExternalURL,
Registerer: prometheus.DefaultRegisterer,
Logger: logger.With("component", "rule manager"),
OutageTolerance: time.Duration(cfg.outageTolerance),
ForGracePeriod: time.Duration(cfg.forGracePeriod),
ResendDelay: time.Duration(cfg.resendDelay),
MaxConcurrentEvals: cfg.maxConcurrentEvals,
ConcurrentEvalsEnabled: cfg.enableConcurrentRuleEval,
DefaultRuleQueryOffset: func() time.Duration {
return time.Duration(cfgFile.GlobalConfig.RuleQueryOffset)
},
})
}

webHandler初始化

1
2
// Web handler 依赖 ScrapeManager,需在其后初始化。
webHandler := web.New(logger.With("component", "web"), &cfg.web)