Prometheus Source Code: Metrics Scraping

Based on release-3.0.

The metrics scraping flow

  1. Construct the ScrapeManager instance
  2. Load the configuration
  3. Start the ScrapeManager instance (a simplified sketch of this wiring follows the list)
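
The sketch below models how these three steps fit together. It is a self-contained toy, not the real scrape package: toyManager, newToyManager, and the simplified Config/Group types are invented here purely to show the order of construction, configuration, and the run loop that consumes target-group updates; the real scrape.NewManager, Manager.ApplyConfig, and Manager.Run have richer signatures.

package main

import "fmt"

// Toy stand-ins for the real config.Config and discovery targetgroup.Group types.
type Config struct{ Jobs []string }
type Group struct{ Targets []string }

// toyManager mimics only the shape of scrape.Manager's lifecycle.
type toyManager struct {
    jobs map[string]bool
}

// Step 1: construct the manager.
func newToyManager() *toyManager {
    return &toyManager{jobs: map[string]bool{}}
}

// Step 2: load the configuration (here we only record which jobs exist).
func (m *toyManager) ApplyConfig(cfg *Config) error {
    for _, j := range cfg.Jobs {
        m.jobs[j] = true
    }
    return nil
}

// Step 3: run, consuming target-group updates from service discovery
// until the updates channel is closed.
func (m *toyManager) Run(tsets <-chan map[string][]*Group) {
    for ts := range tsets {
        for job, groups := range ts {
            if m.jobs[job] {
                fmt.Printf("job %q: %d target group(s)\n", job, len(groups))
            }
        }
    }
}

func main() {
    mgr := newToyManager()                               // 1. construct
    _ = mgr.ApplyConfig(&Config{Jobs: []string{"node"}}) // 2. load config

    tsets := make(chan map[string][]*Group)
    go func() {
        tsets <- map[string][]*Group{"node": {{Targets: []string{"localhost:9100"}}}}
        close(tsets)
    }()
    mgr.Run(tsets) // 3. run until the updates channel closes
}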

The ScrapeManager is responsible for maintaining the scrapePools and managing the lifecycle of the scrape components.

The ScrapeManager instance is created by calling the NewManager function:

// File path: scrape/manager.go
type Manager struct {
    opts *Options
    // Logger used by the manager.
    logger log.Logger
    // Storage appender that scraped samples are written into.
    append    storage.Appendable
    graceShut chan struct{}

    offsetSeed uint64 // Global offsetSeed seed is used to spread scrape workload across HA setup.
    // Lock used to synchronize access.
    mtxScrape sync.Mutex // Guards the fields below.
    // Scrape configurations, keyed by job_name.
    scrapeConfigs map[string]*config.ScrapeConfig
    // Scrape pools, keyed by job_name.
    scrapePools map[string]*scrapePool
    targetSets  map[string][]*targetgroup.Group
    buffers     *pool.Pool

    triggerReload chan struct{}

    metrics *scrapeMetrics
}
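
The targetSets and triggerReload fields drive the update path: target-group updates delivered by service discovery are stored in targetSets, a non-blocking send on the buffered triggerReload channel marks that a reload is needed, and a background goroutine performs the actual reload on a timer, so bursts of updates coalesce into a single reload. The snippet below is a self-contained sketch of that coalescing pattern (coalescer and its fields are invented names, not the manager's code).

package main

import (
    "fmt"
    "sync"
    "time"
)

// coalescer illustrates the triggerReload pattern: many rapid update
// notifications collapse into at most one pending reload signal.
type coalescer struct {
    mtx           sync.Mutex
    pending       int // stand-in for the buffered target-set state
    triggerReload chan struct{}
}

func newCoalescer() *coalescer {
    return &coalescer{triggerReload: make(chan struct{}, 1)}
}

// update records new state and signals a reload without ever blocking the sender.
func (c *coalescer) update() {
    c.mtx.Lock()
    c.pending++
    c.mtx.Unlock()

    select {
    case c.triggerReload <- struct{}{}: // first signal wins
    default: // a reload is already pending; drop the extra signal
    }
}

// reloader drains the signal on a timer tick and performs one reload per batch.
func (c *coalescer) reloader(done <-chan struct{}) {
    ticker := time.NewTicker(10 * time.Millisecond)
    defer ticker.Stop()
    for {
        select {
        case <-ticker.C:
            select {
            case <-c.triggerReload:
                c.mtx.Lock()
                fmt.Printf("reloading after %d update(s)\n", c.pending)
                c.pending = 0
                c.mtx.Unlock()
            default:
            }
        case <-done:
            return
        }
    }
}

func main() {
    c := newCoalescer()
    done := make(chan struct{})
    go c.reloader(done)
    for i := 0; i < 5; i++ { // five rapid updates typically produce a single reload
        c.update()
    }
    time.Sleep(30 * time.Millisecond)
    close(done)
}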

Loading the scrape configuration

The scrape service is configured from the scrape_configs (ScrapeConfig) entries in prometheus.yml; configuration updates are applied by calling ApplyConfig.

// File path: config/config.go
type ScrapeConfig struct {
    // The job name to which the job label is set by default.
    JobName string `yaml:"job_name"`
    // Indicator whether the scraped metrics should remain unmodified.
    // If a label to be attached already exists with the same name, this decides
    // whether the original label is kept rather than overwritten.
    HonorLabels bool `yaml:"honor_labels,omitempty"`
    // Indicator whether the scraped timestamps should be respected.
    HonorTimestamps bool `yaml:"honor_timestamps"`
    // Indicator whether to track the staleness of the scraped timestamps.
    TrackTimestampsStaleness bool `yaml:"track_timestamps_staleness"`
    // A set of query parameters with which the target is scraped.
    Params url.Values `yaml:"params,omitempty"`
    // How frequently to scrape the targets of this scrape config.
    ScrapeInterval model.Duration `yaml:"scrape_interval,omitempty"`
    // The timeout for scraping targets of this config.
    ScrapeTimeout model.Duration `yaml:"scrape_timeout,omitempty"`
    // The protocols to negotiate during a scrape. It tells clients what
    // protocol are accepted by Prometheus and with what preference (most wanted is first).
    // Supported values (case sensitive): PrometheusProto, OpenMetricsText0.0.1,
    // OpenMetricsText1.0.0, PrometheusText0.0.4.
    ScrapeProtocols []ScrapeProtocol `yaml:"scrape_protocols,omitempty"`
    // Whether to scrape a classic histogram that is also exposed as a native histogram.
    ScrapeClassicHistograms bool `yaml:"scrape_classic_histograms,omitempty"`
    // The HTTP resource path on which to fetch metrics from targets.
    MetricsPath string `yaml:"metrics_path,omitempty"`
    // The URL scheme with which to fetch metrics from targets.
    Scheme string `yaml:"scheme,omitempty"`
    // Indicator whether to request compressed response from the target.
    EnableCompression bool `yaml:"enable_compression"`
    // An uncompressed response body larger than this many bytes will cause the
    // scrape to fail. 0 means no limit.
    BodySizeLimit units.Base2Bytes `yaml:"body_size_limit,omitempty"`
    // More than this many samples post metric-relabeling will cause the scrape to
    // fail. 0 means no limit.
    // Per-scrape cap on the number of samples; the scrape still fails if the
    // limit is exceeded after metric relabeling.
    SampleLimit uint `yaml:"sample_limit,omitempty"`
    // More than this many targets after the target relabeling will cause the
    // scrapes to fail. 0 means no limit.
    TargetLimit uint `yaml:"target_limit,omitempty"`
    // More than this many labels post metric-relabeling will cause the scrape to
    // fail. 0 means no limit.
    LabelLimit uint `yaml:"label_limit,omitempty"`
    // More than this label name length post metric-relabeling will cause the
    // scrape to fail. 0 means no limit.
    LabelNameLengthLimit uint `yaml:"label_name_length_limit,omitempty"`
    // More than this label value length post metric-relabeling will cause the
    // scrape to fail. 0 means no limit.
    LabelValueLengthLimit uint `yaml:"label_value_length_limit,omitempty"`
    // If there are more than this many buckets in a native histogram,
    // buckets will be merged to stay within the limit.
    NativeHistogramBucketLimit uint `yaml:"native_histogram_bucket_limit,omitempty"`
    // If the growth factor of one bucket to the next is smaller than this,
    // buckets will be merged to increase the factor sufficiently.
    NativeHistogramMinBucketFactor float64 `yaml:"native_histogram_min_bucket_factor,omitempty"`
    // Keep no more than this many dropped targets per job.
    // 0 means no limit.
    KeepDroppedTargets uint `yaml:"keep_dropped_targets,omitempty"`
    // Allow UTF8 Metric and Label Names.
    MetricNameValidationScheme string `yaml:"metric_name_validation_scheme,omitempty"`

    // We cannot do proper Go type embedding below as the parser will then parse
    // values arbitrarily into the overflow maps of further-down types.
    // Service discovery configurations.
    ServiceDiscoveryConfigs discovery.Configs `yaml:"-"`
    // HTTP client settings.
    HTTPClientConfig config.HTTPClientConfig `yaml:",inline"`

    // List of target relabel configurations.
    RelabelConfigs []*relabel.Config `yaml:"relabel_configs,omitempty"`
    // List of metric relabel configurations.
    MetricRelabelConfigs []*relabel.Config `yaml:"metric_relabel_configs,omitempty"`
}
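
Each entry under scrape_configs in the configuration file is unmarshalled into one ScrapeConfig value via the yaml struct tags shown above. The sketch below demonstrates the mapping with a trimmed-down mirror of the struct: miniScrapeConfig is an invented name, only a few fields are included, plain Go types replace model.Duration, and gopkg.in/yaml.v3 is used directly; the real decoding in config/config.go additionally applies defaults and validation.

package main

import (
    "fmt"

    "gopkg.in/yaml.v3"
)

// miniScrapeConfig mirrors a handful of ScrapeConfig fields; the yaml tags
// follow the struct above, but the types are simplified for this example.
type miniScrapeConfig struct {
    JobName        string `yaml:"job_name"`
    ScrapeInterval string `yaml:"scrape_interval,omitempty"` // model.Duration in the real struct
    MetricsPath    string `yaml:"metrics_path,omitempty"`
    Scheme         string `yaml:"scheme,omitempty"`
    SampleLimit    uint   `yaml:"sample_limit,omitempty"`
}

// rawJob is one scrape_configs entry as it would appear in prometheus.yml.
const rawJob = `
job_name: node
scrape_interval: 15s
metrics_path: /metrics
scheme: http
sample_limit: 5000
`

func main() {
    var cfg miniScrapeConfig
    if err := yaml.Unmarshal([]byte(rawJob), &cfg); err != nil {
        panic(err)
    }
    fmt.Printf("%+v\n", cfg)
}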

// File path: scrape/manager.go
// ApplyConfig loads the scrape configuration.
func (m *Manager) ApplyConfig(cfg *config.Config) error {
    m.mtxScrape.Lock()
    defer m.mtxScrape.Unlock()

    scfgs, err := cfg.GetScrapeConfigs()
    if err != nil {
        return err
    }

    c := make(map[string]*config.ScrapeConfig)
    for _, scfg := range scfgs {
        c[scfg.JobName] = scfg
    }
    m.scrapeConfigs = c

    if err := m.setOffsetSeed(cfg.GlobalConfig.ExternalLabels); err != nil {
        return err
    }

    // Cleanup and reload pool if the configuration has changed.
    var failed bool
    for name, sp := range m.scrapePools {
        switch cfg, ok := m.scrapeConfigs[name]; {
        case !ok:
            sp.stop()
            delete(m.scrapePools, name)
        case !reflect.DeepEqual(sp.config, cfg):
            err := sp.reload(cfg)
            if err != nil {
                level.Error(m.logger).Log("msg", "error reloading scrape pool", "err", err, "scrape_pool", name)
                failed = true
            }
        }
    }

    if failed {
        return errors.New("failed to apply the new configuration")
    }
    return nil
}
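
Note that ApplyConfig only stops pools whose job has disappeared and reloads pools whose config has changed; it does not create pools for newly added jobs. New scrapePools are created later, when the manager syncs the target groups delivered by service discovery and finds a job that has a configuration but no pool yet. The sketch below models that lazy-creation step with toy types (jobConfig, pool, and reconcile are invented names, not the manager's code).

package main

import "fmt"

// Toy stand-ins for *config.ScrapeConfig and *scrapePool.
type jobConfig struct{ name string }
type pool struct{ job string }

// reconcile creates a pool for every job that has a config and targets but no
// pool yet, mirroring the lazy creation performed when target sets arrive.
func reconcile(configs map[string]*jobConfig, pools map[string]*pool, targetSets map[string][]string) {
    for job, targets := range targetSets {
        if _, ok := pools[job]; ok {
            continue // pool already exists; it would just be re-synced
        }
        cfg, ok := configs[job]
        if !ok {
            fmt.Printf("job %q has targets but no scrape config, skipping\n", job)
            continue
        }
        pools[job] = &pool{job: cfg.name}
        fmt.Printf("created pool for job %q with %d target(s)\n", job, len(targets))
    }
}

func main() {
    configs := map[string]*jobConfig{"node": {name: "node"}}
    pools := map[string]*pool{}
    reconcile(configs, pools, map[string][]string{
        "node":   {"localhost:9100"},
        "orphan": {"localhost:1234"}, // discovered targets without a matching job
    })
}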

Reloading the configuration

// File path: scrape/scrape.go
func (sp *scrapePool) reload(cfg *config.ScrapeConfig) error {
    sp.mtx.Lock()
    defer sp.mtx.Unlock()
    sp.metrics.targetScrapePoolReloads.Inc()
    start := time.Now()

    client, err := config_util.NewClientFromConfig(cfg.HTTPClientConfig, cfg.JobName, sp.httpOpts...)
    if err != nil {
        sp.metrics.targetScrapePoolReloadsFailed.Inc()
        return fmt.Errorf("error creating HTTP client: %w", err)
    }

    reuseCache := reusableCache(sp.config, cfg)
    sp.config = cfg
    oldClient := sp.client
    sp.client = client

    sp.metrics.targetScrapePoolTargetLimit.WithLabelValues(sp.config.JobName).Set(float64(sp.config.TargetLimit))

    sp.restartLoops(reuseCache)
    oldClient.CloseIdleConnections()
    sp.metrics.targetReloadIntervalLength.WithLabelValues(time.Duration(sp.config.ScrapeInterval).String()).Observe(
        time.Since(start).Seconds(),
    )
    return nil
}
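
One detail worth noting in reload is the handling of the HTTP client: a new client is built from the updated HTTPClientConfig and swapped in before the loops are restarted, and only afterwards are the old client's idle keep-alive connections closed, so restarted loops use the new settings while connections held by the old client are released. The sketch below shows the same swap-then-cleanup pattern with a plain net/http client (clientHolder and its reload method are hypothetical names, not the scrapePool code).

package main

import (
    "net/http"
    "sync"
    "time"
)

// clientHolder shows the swap-then-cleanup pattern used when a pool is reloaded.
type clientHolder struct {
    mtx    sync.Mutex
    client *http.Client
}

// reload installs a freshly configured client and closes the old client's idle
// connections so future requests use the new transport settings.
func (h *clientHolder) reload(timeout time.Duration) {
    newClient := &http.Client{Timeout: timeout}

    h.mtx.Lock()
    oldClient := h.client
    h.client = newClient
    h.mtx.Unlock()

    if oldClient != nil {
        oldClient.CloseIdleConnections() // drop keep-alive connections held by the old client
    }
}

func main() {
    h := &clientHolder{client: &http.Client{Timeout: 10 * time.Second}}
    h.reload(30 * time.Second)
}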