// File Path: prometheus/discovery/discovery

type Discoverer interface {
	// Run hands a channel to the discovery provider (Consul, DNS, etc.) through which
	// it can send updated target groups. It must return when the context is canceled.
	// It should not close the update channel on returning.
	Run(ctx context.Context, up chan<- []*targetgroup.Group)
}
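To make the contract concrete, here is a minimal sketch of a custom Discoverer, assuming a hypothetical staticDiscoverer type and made-up addresses. It follows the three rules from the comment above: send updates on up, return when the context is canceled, and never close the channel.

package example

import (
	"context"

	"github.com/prometheus/common/model"
	"github.com/prometheus/prometheus/discovery/targetgroup"
)

// staticDiscoverer is a hypothetical Discoverer that announces a fixed
// set of targets once and then idles until it is canceled.
type staticDiscoverer struct {
	addrs []string
}

func (d *staticDiscoverer) Run(ctx context.Context, up chan<- []*targetgroup.Group) {
	tg := &targetgroup.Group{Source: "static/example"}
	for _, a := range d.addrs {
		tg.Targets = append(tg.Targets, model.LabelSet{model.AddressLabel: model.LabelValue(a)})
	}
	// Send the initial state, unless we are canceled first.
	select {
	case up <- []*targetgroup.Group{tg}:
	case <-ctx.Done():
		return
	}
	// No further updates; block until canceled. Note: we do not close(up).
	<-ctx.Done()
}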
All discovered services are converted into targetgroup.Group values:
// File Path: prometheus/discovery/targetgroup/targetgroup.go

type Group struct {
	// Targets is a list of targets identified by a label set. Each target is
	// uniquely identifiable in the group by its address label.
	// The target service addresses, usually URLs.
	Targets []model.LabelSet
	// Labels is a set of labels that is common across all targets in the group.
	// The shared label information.
	Labels model.LabelSet
	// Source is an identifier that describes a group of targets.
	// The path/origin information for the group.
	Source string
}
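For instance, two instances of the same service would collapse into one Group: the shared labels go into Labels, the per-instance addresses into Targets, and Source names where the group came from. The service name and addresses below are made up for illustration.

package example

import (
	"github.com/prometheus/common/model"
	"github.com/prometheus/prometheus/discovery/targetgroup"
)

// newWebGroup builds a Group for a made-up "web" service with two instances.
func newWebGroup() *targetgroup.Group {
	return &targetgroup.Group{
		Source: "consul/dc1/web", // identifies the origin of this group
		Labels: model.LabelSet{
			"__meta_consul_service": "web", // common across all targets
		},
		Targets: []model.LabelSet{
			{model.AddressLabel: "10.0.0.1:8080"},
			{model.AddressLabel: "10.0.0.2:8080"},
		},
	}
}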
// File Path: prometheus/discovery/manager.go (fields of the Manager struct)

	// Some Discoverers (e.g. k8s) send only the updates for a given target group,
	// so we use map[tg.Source]*targetgroup.Group to know which group to update.
	// The discovered target services.
	targets    map[poolKey]map[string]*targetgroup.Group
	targetsMtx sync.Mutex

	// providers keeps track of SD providers.
	providers []*Provider

	// The sync channel sends the updates as a map where the key is the job value from the scrape config.
	// Discovered target services are pushed to the consumer through this channel.
	syncCh chan map[string][]*targetgroup.Group

	// How long to wait before sending updates to the channel. The variable
	// should only be modified in unit tests.
	updatert time.Duration

	// The triggerSend channel signals to the Manager that new updates have been received from providers.
	triggerSend chan struct{}

	// lastProvider counts providers registered during Manager's lifetime.
	lastProvider uint

	// A registerer for all service discovery metrics.
	registerer prometheus.Registerer
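The downstream consumer (in Prometheus, the scrape manager) receives these updates by ranging over the channel exposed via the Manager's SyncCh() method; the map key is the scrape-config job name. A minimal sketch of such a consumer follows; the consume helper and its log output are illustrative.

package example

import (
	"fmt"

	"github.com/prometheus/prometheus/discovery"
)

// consume drains the Manager's sync channel. Each message carries the
// full, current set of target groups per scrape job, not a delta.
func consume(mgr *discovery.Manager) {
	for tsets := range mgr.SyncCh() {
		for job, groups := range tsets {
			fmt.Printf("job %q now has %d target groups\n", job, len(groups))
		}
	}
}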
	var failedCount int
	for name, scfg := range cfg {
		// Register the provider implementations for this job.
		failedCount += m.registerProviders(scfg, name)
	}
	m.metrics.FailedConfigs.Set(float64(failedCount))
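Here cfg maps each scrape-job name to that job's discovery configurations, so registration happens per job. As a hedged sketch (the job name and address are made up, and the signature reflects recent upstream versions), a caller might wire up a job with a static provider like this; discovery.StaticConfig is the Config implementation that wraps a fixed list of target groups.

package example

import (
	"github.com/prometheus/common/model"
	"github.com/prometheus/prometheus/discovery"
)

// applyExample registers one static provider under the job "example-job".
// StaticConfig satisfies discovery.Config, so it flows through the same
// registerProviders path as Consul, DNS, and the other mechanisms.
func applyExample(mgr *discovery.Manager) error {
	cfg := map[string]discovery.Configs{
		"example-job": {
			discovery.StaticConfig{
				{Targets: []model.LabelSet{{model.AddressLabel: "localhost:9100"}}},
			},
		},
	}
	return mgr.ApplyConfig(cfg)
}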
	var (
		wg sync.WaitGroup
		// keep shows if we keep any providers after reload.
		keep         bool
		newProviders []*Provider
	)
	for _, prov := range m.providers {
		// Cancel obsolete providers.
		if len(prov.newSubs) == 0 {
			wg.Add(1)
			prov.done = func() {
				wg.Done()
			}
			prov.cancel()
			continue
		}
		newProviders = append(newProviders, prov)
		// refTargets keeps reference targets used to populate new subs' targets
		var refTargets map[string]*targetgroup.Group
		prov.mu.Lock()
		m.targetsMtx.Lock()
		for s := range prov.subs {
			keep = true
			refTargets = m.targets[poolKey{s, prov.name}]
			// Remove obsolete subs' targets.
			if _, ok := prov.newSubs[s]; !ok {
				delete(m.targets, poolKey{s, prov.name})
				m.metrics.DiscoveredTargets.DeleteLabelValues(m.name, s)
			}
		}
		// Set metrics and targets for new subs.
		for s := range prov.newSubs {
			if _, ok := prov.subs[s]; !ok {
				m.metrics.DiscoveredTargets.WithLabelValues(s).Set(0)
			}
			if l := len(refTargets); l > 0 {
				m.targets[poolKey{s, prov.name}] = make(map[string]*targetgroup.Group, l)
				for k, v := range refTargets {
					m.targets[poolKey{s, prov.name}][k] = v
				}
			}
		}
		m.targetsMtx.Unlock()
		prov.subs = prov.newSubs
		prov.newSubs = map[string]struct{}{}
		prov.mu.Unlock()
		if !prov.IsStarted() {
			m.startProvider(m.ctx, prov)
		}
	}
	// Currently downstream managers expect full target state upon config reload, so we must oblige.
	// While startProvider does pull the trigger, it may take some time to do so, therefore
	// we pull the trigger as soon as possible so that downstream managers can populate their state.
	// See https://github.com/prometheus/prometheus/pull/8639 for details.
	if keep {
		select {
		case m.triggerSend <- struct{}{}:
		default:
		}
	}
	m.providers = newProviders
	wg.Wait()
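Note the non-blocking send: the select with an empty default drops the signal if one is already pending, so a config reload can never stall on a slow receiver, and repeated triggers coalesce (upstream creates triggerSend with a buffer of one). The same idiom in isolation, with hypothetical names:

package example

// notifier shows the coalescing-signal idiom used above. With a buffer
// of one, any number of trigger() calls between two waits collapse into
// a single wake-up, and trigger() itself never blocks.
type notifier struct {
	ch chan struct{}
}

func newNotifier() *notifier {
	return &notifier{ch: make(chan struct{}, 1)}
}

func (n *notifier) trigger() {
	select {
	case n.ch <- struct{}{}: // slot free: signal recorded
	default: // signal already pending: coalesce
	}
}

func (n *notifier) wait() <-chan struct{} { return n.ch }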
// File Path: prometheus/discovery/consul/consul.go (Discovery.Run, excerpt)

	if len(d.watchedServices) == 0 || len(d.watchedTags) != 0 {
		// We need to watch the catalog.
		ticker := time.NewTicker(d.refreshInterval)

		// Watched services and their cancellation functions.
		services := make(map[string]func())
		var lastIndex uint64

		for {
			select {
			case <-ctx.Done():
				ticker.Stop()
				return
			default:
				d.watchServices(ctx, ch, &lastIndex, services)
				<-ticker.C
			}
		}
	} else {
		// We only have fully defined services.
		for _, name := range d.watchedServices {
			d.watchService(ctx, ch, name)
		}
		<-ctx.Done()
	}
}
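Stripped of the Consul specifics, the catalog branch is the usual poll-until-canceled loop: do one watch pass, sleep on the ticker, and exit when the context is done. A generic sketch of the shape, where pollLoop and fn are hypothetical stand-ins for the loop and watchServices:

package example

import (
	"context"
	"time"
)

// pollLoop runs fn once per interval until ctx is canceled. As in the
// catalog-watch loop above, cancellation is only observed at the top of
// each pass, after the ticker fires.
func pollLoop(ctx context.Context, interval time.Duration, fn func(context.Context)) {
	ticker := time.NewTicker(interval)
	defer ticker.Stop()
	for {
		select {
		case <-ctx.Done():
			return
		default:
			fn(ctx)
			<-ticker.C
		}
	}
}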