Prometheus源码:服务发现

服务发现的定义

流程:

scrape发现服务进行统一管理:Prometheus对所支持的发现服务都抽象出Discoverer接口,各scrape发现服务都必须实现该接口并用于服务发现。

Discoverer接口定义了Run()方法:

// File Path: prometheus/discovery/discovery.go
// Discoverer is the interface a service discovery provider implements.
// It declares a single method, Run, through which the provider publishes
// the target groups it discovers.
type Discoverer interface {
// Run hands a channel to the discovery provider (Consul, DNS, etc.) through which
// it can send updated target groups. It must return when the context is canceled.
// It should not close the update channel on returning.
//
// ctx signals cancellation; up is the send-only channel on which slices of
// target groups are delivered.
Run(ctx context.Context, up chan<- []*targetgroup.Group)
}

所有发现的服务都会转换为如下的targetgroup.Group结构:

// File Path: prometheus/discovery/targetgroup/targetgroup.go
// Group is a set of targets that share a common label set and a source
// identifier.
type Group struct {
// Targets is a list of targets identified by a label set. Each target is
// uniquely identifiable in the group by its address label.
// Target service addresses (per the original note, typically URLs).
Targets []model.LabelSet
// Labels is a set of labels that is common across all targets in the group.
// Label information shared by the group.
Labels model.LabelSet
// Source is an identifier that describes a group of targets.
// NOTE(review): the original note calls this "path information" — confirm
// what each provider actually stores here.
Source string
}

启动manager

服务管理都在manager.go中启动

// File Path:prometheus/discovery/manager.go
// startProvider launches the discoverer owned by p together with the goroutine
// that consumes its updates.
//
// A cancellable child of ctx is derived for the provider and its cancel
// function is stored in p.cancel, so the provider can be stopped by calling
// p.cancel(). Both goroutines receive the derived context and share one
// unbuffered channel of target-group slices.
func (m *Manager) startProvider(ctx context.Context, p *Provider) {
	subs := fmt.Sprintf("%v", p.subs)
	level.Debug(m.logger).Log("msg", "Starting provider", "provider", p.name, "subs", subs)

	provCtx, stop := context.WithCancel(ctx)
	// Keep the cancel function on the provider so it can be stopped later.
	p.cancel = stop

	// Carries refreshed target groups from the discoverer to the updater.
	groupCh := make(chan []*targetgroup.Group)

	// Run the concrete discovery implementation.
	go p.d.Run(provCtx, groupCh)
	// Feed whatever it discovers back into the manager.
	go m.updater(provCtx, p, groupCh)
}

manager的定义:

// File Path:prometheus/discovery/manager.go
// Manager holds the state needed to run a set of service discovery providers
// and to publish the target groups they discover.
type Manager struct {
// Logger used by the manager.
logger log.Logger
name string
httpOpts []config.HTTPClientOption
// Read-write lock; ApplyConfig holds it for its whole duration.
mtx sync.RWMutex
// Context handed to providers when they are started.
ctx context.Context

// Some Discoverers(e.g. k8s) send only the updates for a given target group,
// so we use map[tg.Source]*targetgroup.Group to know which group to update.
// Discovered target groups, keyed first by poolKey and then by a string key.
targets map[poolKey]map[string]*targetgroup.Group
// Locked by ApplyConfig around its accesses to targets.
targetsMtx sync.Mutex

// providers keeps track of SD providers.
providers []*Provider
// The sync channel sends the updates as a map where the key is the job value from the scrape config.
// Discovered target groups are delivered to the consumer over this channel.
syncCh chan map[string][]*targetgroup.Group

// How long to wait before sending updates to the channel. The variable
// should only be modified in unit tests.
updatert time.Duration

// The triggerSend channel signals to the Manager that new updates have been received from providers.
triggerSend chan struct{}

// lastProvider counts providers registered during Manager's lifetime.
lastProvider uint

// A registerer for all service discovery metrics.
registerer prometheus.Registerer

// Manager-level metrics; ApplyConfig updates FailedConfigs and DiscoveredTargets.
metrics *Metrics
// NOTE(review): presumably per-mechanism discoverer metrics keyed by mechanism name — confirm.
sdMetrics map[string]DiscovererMetrics
}

构建启动服务发现

Prometheus 在初始化过程中会构建 discoveryManagerScrape, 并通过调用ApplyConfig方法完成对Discoverer的构建:

// File Path:prometheus/discovery/manager.go
// ApplyConfig reconciles the manager's providers with cfg.
//
// For every entry of cfg it calls registerProviders and records the summed
// failure count in the FailedConfigs gauge. It then walks the current
// providers:
//
//   - a provider with no new subscribers is canceled and dropped; ApplyConfig
//     waits (via a WaitGroup released by the provider's done callback) for all
//     such providers before returning;
//   - every other provider is kept: targets of subscribers that disappeared
//     are deleted, new subscribers are seeded with a copy of the reference
//     targets of an existing subscriber, newSubs is promoted to subs, and the
//     provider is started if it is not running yet.
//
// If at least one kept provider already had subscribers, a non-blocking send
// on triggerSend asks for the full target state to be pushed downstream.
//
// m.mtx is held for the whole call. The method always returns nil.
func (m *Manager) ApplyConfig(cfg map[string]Configs) error {
	m.mtx.Lock()
	defer m.mtx.Unlock()

	var failedCount int
	for name, scfg := range cfg {
		// Register the providers described by this config set.
		failedCount += m.registerProviders(scfg, name)
	}
	m.metrics.FailedConfigs.Set(float64(failedCount))

	var (
		wg sync.WaitGroup
		// keep shows if we keep any providers after reload.
		keep         bool
		newProviders []*Provider
	)
	for _, prov := range m.providers {
		// Cancel obsolete providers.
		if len(prov.newSubs) == 0 {
			wg.Add(1)
			prov.done = func() {
				wg.Done()
			}
			prov.cancel()
			continue
		}
		newProviders = append(newProviders, prov)
		// refTargets keeps reference targets used to populate new subs' targets
		var refTargets map[string]*targetgroup.Group
		prov.mu.Lock()

		m.targetsMtx.Lock()
		for s := range prov.subs {
			keep = true
			refTargets = m.targets[poolKey{s, prov.name}]
			// Remove obsolete subs' targets.
			if _, ok := prov.newSubs[s]; !ok {
				delete(m.targets, poolKey{s, prov.name})
				// Address the gauge with the same single label value used by
				// WithLabelValues(s) below. Passing an extra value makes
				// DeleteLabelValues match nothing, leaving a stale series.
				// NOTE(review): assumes DiscoveredTargets has exactly one
				// variable label — confirm against the metric definition.
				m.metrics.DiscoveredTargets.DeleteLabelValues(s)
			}
		}
		// Set metrics and targets for new subs.
		for s := range prov.newSubs {
			if _, ok := prov.subs[s]; !ok {
				m.metrics.DiscoveredTargets.WithLabelValues(s).Set(0)
			}
			if l := len(refTargets); l > 0 {
				m.targets[poolKey{s, prov.name}] = make(map[string]*targetgroup.Group, l)
				for k, v := range refTargets {
					m.targets[poolKey{s, prov.name}][k] = v
				}
			}
		}
		m.targetsMtx.Unlock()

		prov.subs = prov.newSubs
		prov.newSubs = map[string]struct{}{}
		prov.mu.Unlock()
		if !prov.IsStarted() {
			m.startProvider(m.ctx, prov)
		}
	}
	// Currently downstream managers expect full target state upon config reload, so we must oblige.
	// While startProvider does pull the trigger, it may take some time to do so, therefore
	// we pull the trigger as soon as possible so that downstream managers can populate their state.
	// See https://github.com/prometheus/prometheus/pull/8639 for details.
	if keep {
		select {
		case m.triggerSend <- struct{}{}:
		default:
		}
	}
	m.providers = newProviders
	// Block until every canceled provider has signaled completion.
	wg.Wait()

	return nil
}

具体的Discoverer实现负责推送更新消息(以Consul为例)

// consul 实现
// Run starts discovery and sends target-group updates on ch until ctx is
// canceled. Its signature matches the Discoverer interface's Run method.
//
// If d.finalizer is set it is deferred, so it runs when Run returns. After
// d.initialize(ctx), one of two modes is chosen:
//
//   - when services are explicitly listed and no tags are watched, watchService
//     is called once per listed service and Run then blocks until ctx is done;
//   - otherwise the catalog is watched: watchServices is called immediately and
//     then once per d.refreshInterval tick.
//
// Run does not close ch.
func (d *Discovery) Run(ctx context.Context, ch chan<- []*targetgroup.Group) {
	if d.finalizer != nil {
		defer d.finalizer()
	}
	d.initialize(ctx)

	if len(d.watchedServices) != 0 && len(d.watchedTags) == 0 {
		// We only have fully defined services.
		for _, name := range d.watchedServices {
			d.watchService(ctx, ch, name)
		}
		<-ctx.Done()
		return
	}

	// We need to watch the catalog.
	ticker := time.NewTicker(d.refreshInterval)
	defer ticker.Stop()

	// Watched services and their cancellation functions.
	services := make(map[string]func())
	var lastIndex uint64

	for ctx.Err() == nil {
		d.watchServices(ctx, ch, &lastIndex, services)
		// Wait for the next tick, but give up immediately on cancellation so
		// Run does not linger for up to a full refresh interval.
		select {
		case <-ctx.Done():
		case <-ticker.C:
		}
	}
}