Alertmanager 主入口:cmd/alertmanager/main.go

基于v0.28

关键模块说明

  1. API

    • 入口:api/v2/
    • 功能:接收Prometheus警报、提供管理接口
  2. AlertRouter

    • 路径:/dispatch

    • 核心逻辑:深度优先、从左到右遍历路由树,返回所有匹配的路由节点(子路由设置 continue 时可同时匹配多个)

      func (r *Route) Match(lset model.LabelSet) []*Route
  3. Inhibitor

    • 路径:/inhibit

    • 核心逻辑:

      func (ih *Inhibitor) Mutes(lset model.LabelSet) bool
  4. DispatchTree

    • 路径:/dispatch
    • 特性:多级路由树结构
  5. Notification Pipeline

    • 路径:/notify
    • 关键时间参数(路由配置项,并非管道阶段;分别控制同一告警分组的首次发送等待、后续批次发送间隔和重复通知间隔):
      • GroupWait
      • GroupInterval
      • RepeatInterval
  6. Notifier

    • 集成:/notify/<type>

    • 实现示例:

      // Illustrative example of a notifier integration (email): it keeps the
      // receiver's email configuration and the templates used to render messages.
      // NOTE(review): the upstream type in notify/email appears to be named `Email`
      // and to carry extra fields (e.g. a logger and hostname) — confirm before
      // relying on this excerpt.
      type EmailNotifier struct {
      // Email receiver configuration for this integration.
      conf *config.EmailConfig
      // Templates used to render the notification content.
      tmpl *template.Template
      }
  7. 持久化层

    • Silences:/silence
    • Notification Log:/nflog
    • 存储方式:状态在内存中维护,按 --data.maintenance-interval 定期快照到 --storage.path 下的文件(nflog、silences),并不使用 LevelDB/BoltDB
  8. 集群通信

    • 协议:Gossip (memberlist)
    • 同步内容:静默状态、通知日志

核心步骤拆分

flowchart TD
A["启动 main()"] --> B["调用 run()"]
B --> C["解析命令行参数"]
C --> D["初始化日志 logger"]
D --> E["解析实验性功能开关"]
E --> F["自动设置 GOMEMLIMIT/GOMAXPROCS"]
F --> G["创建数据目录"]
G --> H["初始化集群 Peer (可选)"]
H --> I["初始化通知日志 nflog"]
I --> J["初始化静默 silences"]
J --> K["启动静默和通知日志维护协程"]
K --> L["加入集群 (可选)"]
L --> M["初始化内存告警存储 alerts"]
M --> N["初始化 API 服务"]
N --> O["计算外部访问 URL"]
O --> P["配置文件热加载与订阅"]
P --> Q["注册 UI、API 路由"]
Q --> R["启动 HTTP 服务"]
R --> S["信号监听,支持热加载和优雅退出"]

初始化flag

// Command-line flags. kingpin.Flag returns pointers that are only filled in by
// kingpin.Parse() below, which is why every later use dereferences them (*flag).
var (
// Storage, retention, maintenance and alert GC.
configFile = kingpin.Flag("config.file", "Alertmanager configuration file name.").Default("alertmanager.yml").String()
dataDir = kingpin.Flag("storage.path", "Base path for data storage.").Default("data/").String()
retention = kingpin.Flag("data.retention", "How long to keep data for.").Default("120h").Duration()
maintenanceInterval = kingpin.Flag("data.maintenance-interval", "Interval between garbage collection and snapshotting to disk of the silences and the notification logs.").Default("15m").Duration()
maxSilences = kingpin.Flag("silences.max-silences", "Maximum number of silences, including expired silences. If negative or zero, no limit is set.").Default("0").Int()
maxSilenceSizeBytes = kingpin.Flag("silences.max-silence-size-bytes", "Maximum silence size in bytes. If negative or zero, no limit is set.").Default("0").Int()
alertGCInterval = kingpin.Flag("alerts.gc-interval", "Interval between alert GC.").Default("30m").Duration()

// Web server: listen-address flags from webflag.AddFlags (default :9093),
// external URL, route prefix and request limits.
webConfig = webflag.AddFlags(kingpin.CommandLine, ":9093")
externalURL = kingpin.Flag("web.external-url", "The URL under which Alertmanager is externally reachable (for example, if Alertmanager is served via a reverse proxy). Used for generating relative and absolute links back to Alertmanager itself. If the URL has a path portion, it will be used to prefix all HTTP endpoints served by Alertmanager. If omitted, relevant URL components will be derived automatically.").String()
routePrefix = kingpin.Flag("web.route-prefix", "Prefix for the internal routes of web endpoints. Defaults to path of --web.external-url.").String()
getConcurrency = kingpin.Flag("web.get-concurrency", "Maximum number of GET requests processed concurrently. If negative or zero, the limit is GOMAXPROC or 8, whichever is larger.").Default("0").Int()
httpTimeout = kingpin.Flag("web.timeout", "Timeout for HTTP requests. If negative or zero, no timeout is set.").Default("0").Duration()

// Ratio used by the auto-GOMEMLIMIT experimental feature; must lie in (0, 1].
memlimitRatio = kingpin.Flag("auto-gomemlimit.ratio", "The ratio of reserved GOMEMLIMIT memory to the detected maximum container or system memory. The value must be greater than 0 and less than or equal to 1.").
Default("0.9").Float64()

// Cluster (gossip / HA) settings. An empty --cluster.listen-address disables HA mode.
clusterBindAddr = kingpin.Flag("cluster.listen-address", "Listen address for cluster. Set to empty string to disable HA mode.").
Default(defaultClusterAddr).String()
clusterAdvertiseAddr = kingpin.Flag("cluster.advertise-address", "Explicit address to advertise in cluster.").String()
peers = kingpin.Flag("cluster.peer", "Initial peers (may be repeated).").Strings()
peerTimeout = kingpin.Flag("cluster.peer-timeout", "Time to wait between peers to send notifications.").Default("15s").Duration()
gossipInterval = kingpin.Flag("cluster.gossip-interval", "Interval between sending gossip messages. By lowering this value (more frequent) gossip messages are propagated across the cluster more quickly at the expense of increased bandwidth.").Default(cluster.DefaultGossipInterval.String()).Duration()
pushPullInterval = kingpin.Flag("cluster.pushpull-interval", "Interval for gossip state syncs. Setting this interval lower (more frequent) will increase convergence speeds across larger clusters at the expense of increased bandwidth usage.").Default(cluster.DefaultPushPullInterval.String()).Duration()
tcpTimeout = kingpin.Flag("cluster.tcp-timeout", "Timeout for establishing a stream connection with a remote node for a full state sync, and for stream read and write operations.").Default(cluster.DefaultTCPTimeout.String()).Duration()
probeTimeout = kingpin.Flag("cluster.probe-timeout", "Timeout to wait for an ack from a probed node before assuming it is unhealthy. This should be set to 99-percentile of RTT (round-trip time) on your network.").Default(cluster.DefaultProbeTimeout.String()).Duration()
probeInterval = kingpin.Flag("cluster.probe-interval", "Interval between random node probes. Setting this lower (more frequent) will cause the cluster to detect failed nodes more quickly at the expense of increased bandwidth usage.").Default(cluster.DefaultProbeInterval.String()).Duration()
// NOTE(review): settle-timeout defaults to cluster.DefaultPushPullInterval rather than a dedicated constant.
settleTimeout = kingpin.Flag("cluster.settle-timeout", "Maximum time to wait for cluster connections to settle before evaluating notifications.").Default(cluster.DefaultPushPullInterval.String()).Duration()
reconnectInterval = kingpin.Flag("cluster.reconnect-interval", "Interval between attempting to reconnect to lost peers.").Default(cluster.DefaultReconnectInterval.String()).Duration()
peerReconnectTimeout = kingpin.Flag("cluster.reconnect-timeout", "Length of time to attempt to reconnect to a lost peer.").Default(cluster.DefaultReconnectTimeout.String()).Duration()
tlsConfigFile = kingpin.Flag("cluster.tls-config", "[EXPERIMENTAL] Path to config yaml file that can enable mutual TLS within the gossip protocol.").Default("").String()
allowInsecureAdvertise = kingpin.Flag("cluster.allow-insecure-public-advertise-address-discovery", "[EXPERIMENTAL] Allow alertmanager to discover and listen on a public IP address.").Bool()
label = kingpin.Flag("cluster.label", "The cluster label is an optional string to include on each packet and stream. It uniquely identifies the cluster and prevents cross-communication issues when sending gossip messages.").Default("").String()
// Experimental features; parsed into ff by featurecontrol.NewFlags below.
featureFlags = kingpin.Flag("enable-feature", fmt.Sprintf("Comma-separated experimental features to enable. Valid options: %s", strings.Join(featurecontrol.AllowedFlags, ", "))).Default("").String()
)

// Register promslog's logging flags on the same command line and print usage to stdout.
// NOTE(review): promslogConfig is declared outside this excerpt — confirm its definition.
promslogflag.AddFlags(kingpin.CommandLine, &promslogConfig)
kingpin.CommandLine.UsageWriter(os.Stdout)

// --version output, -h as the short form of --help, then parse the command line
// into the flag pointers above.
kingpin.Version(version.Print("alertmanager"))
kingpin.CommandLine.GetFlag("help").Short('h')
kingpin.Parse()
// Initialize the logger from the parsed logging flags.
logger := promslog.New(&promslogConfig)

logger.Info("Starting Alertmanager", "version", version.Info())
logger.Info("Build context", "build_context", version.BuildContext())

// Parse --enable-feature; if the list cannot be parsed, startup aborts with exit code 1.
ff, err := featurecontrol.NewFlags(logger, *featureFlags)
if err != nil {
logger.Error("error parsing the feature flag list", "err", err)
return 1
}
// Pass the parsed feature flags on to the compat package.
compat.InitFromFlags(logger, ff)

容器内实验特性开启

  • 针对容器环境:通过实验特性开关自动设置 Go 内存上限(GOMEMLIMIT)和 GOMAXPROCS,使其与容器的资源限制相匹配
// Parse the experimental feature switches.
// NOTE: this repeats the feature-flag parsing from the flag-initialization excerpt.
// It is shown again only because the auto-GOMEMLIMIT/GOMAXPROCS checks below read ff.
// A second `ff, err :=` in the same scope would not compile, so the real run()
// contains it only once.
ff, err := featurecontrol.NewFlags(logger, *featureFlags)
if err != nil {
logger.Error("error parsing the feature flag list", "err", err)
return 1
}
compat.InitFromFlags(logger, ff)

根据容器资源限制自动设置 GOMEMLIMIT / GOMAXPROCS


// Automatically set GOMEMLIMIT when the corresponding experimental feature is enabled.
if ff.EnableAutoGOMEMLIMIT() {
// Reject ratios outside (0, 1] before touching the runtime; this is fatal.
if *memlimitRatio <= 0.0 || *memlimitRatio > 1.0 {
logger.Error("--auto-gomemlimit.ratio must be greater than 0 and less than or equal to 1.")
return 1
}

// Set the limit to ratio x the detected memory limit, trying the cgroup limit
// first and falling back to system memory. A failure is only logged as a warning.
if _, err := memlimit.SetGoMemLimitWithOpts(
memlimit.WithRatio(*memlimitRatio),
memlimit.WithProvider(
memlimit.ApplyFallback(
memlimit.FromCgroup,
memlimit.FromSystem,
),
),
); err != nil {
logger.Warn("automemlimit", "msg", "Failed to set GOMEMLIMIT automatically", "err", err)
}
}

// Automatically set GOMAXPROCS when the corresponding experimental feature is enabled.
if ff.EnableAutoGOMAXPROCS() {
// Adapt maxprocs' printf-style logger to structured logging, dropping its "maxprocs: " prefix.
l := func(format string, a ...interface{}) {
logger.Info("automaxprocs", "msg", fmt.Sprintf(strings.TrimPrefix(format, "maxprocs: "), a...))
}
// A failure is logged as a warning, not treated as fatal.
if _, err := maxprocs.Set(maxprocs.Logger(l)); err != nil {
logger.Warn("Failed to set GOMAXPROCS automatically", "err", err)
}
}

创建数据目录

// Create the data directory (--storage.path) under which the nflog and silences
// snapshot files are written. Startup fails if it cannot be created.
err = os.MkdirAll(*dataDir, 0o777)
if err != nil {
logger.Error("Unable to create data directory", "err", err)
return 1
}

HA模式(可选)

// HA mode is optional: with an empty --cluster.listen-address no peer is created
// and peer stays nil (later code checks `peer != nil`).
if *clusterBindAddr != "" {
// Create the cluster peer, enabling HA mode.
// NOTE(review): tlsTransportConfig is built outside this excerpt, presumably from
// --cluster.tls-config — confirm.
peer, err = cluster.Create(
logger.With("component", "cluster"),
prometheus.DefaultRegisterer,
*clusterBindAddr,
*clusterAdvertiseAddr,
*peers,
true, // NOTE(review): positional bool whose meaning is not visible here — check cluster.Create's signature.
*pushPullInterval,
*gossipInterval,
*tcpTimeout,
*probeTimeout,
*probeInterval,
tlsTransportConfig,
*allowInsecureAdvertise,
*label,
)
// Failing to create the gossip mesh is fatal.
if err != nil {
logger.Error("unable to initialize gossip mesh", "err", err)
return 1
}
// Record that clustering is enabled (presumably a Prometheus gauge — confirm).
clusterEnabled.Set(1)
}

初始化通知日志(nflog)

// ----------------------
// Initialize the notification log (nflog).
// ----------------------
// The log is snapshotted to <storage.path>/nflog, keeps entries for --data.retention,
// and is later passed to the notification pipeline (pipelineBuilder.New).
notificationLogOpts := nflog.Options{
SnapshotFile: filepath.Join(*dataDir, "nflog"),
Retention: *retention,
Logger: logger.With("component", "nflog"),
Metrics: prometheus.DefaultRegisterer,
}

// Build the log from the options; a failure aborts startup, like the silences setup.
notificationLog, err := nflog.New(notificationLogOpts)
if err != nil {
logger.Error("error creating notification log", "err", err)
return 1
}
// In HA mode, register the log as gossiped cluster state under the key "nfl"
// and broadcast local updates to the other peers.
if peer != nil {
c := peer.AddState("nfl", notificationLog, prometheus.DefaultRegisterer)
notificationLog.SetBroadcast(c.Broadcast)
}

// Periodically garbage-collect and snapshot the log every --data.maintenance-interval.
// The goroutine stops when stopc is closed and is awaited via wg.
wg.Add(1)
go func() {
notificationLog.Maintenance(*maintenanceInterval, filepath.Join(*dataDir, "nflog"), stopc, nil)
wg.Done()
}()

初始化静默(silence)管理

// ----------------------
// Initialize silence management.
// ----------------------
// Silences are snapshotted to <storage.path>/silences and kept for --data.retention.
// The limits are supplied as functions that simply return the parsed flag values.
// Per the flag help, zero or negative means "no limit".
silenceOpts := silence.Options{
SnapshotFile: filepath.Join(*dataDir, "silences"),
Retention: *retention,
Limits: silence.Limits{
MaxSilences: func() int { return *maxSilences },
MaxSilenceSizeBytes: func() int { return *maxSilenceSizeBytes },
},
Logger: logger.With("component", "silences"),
Metrics: prometheus.DefaultRegisterer,
}

// Failing to create the silence store aborts startup.
silences, err := silence.New(silenceOpts)
if err != nil {
logger.Error("error creating silence", "err", err)
return 1
}
// In HA mode, register silences as gossiped cluster state under the key "sil"
// and broadcast local changes to the other peers.
if peer != nil {
c := peer.AddState("sil", silences, prometheus.DefaultRegisterer)
silences.SetBroadcast(c.Broadcast)
}

启动静默和通知日志的定期维护协程

// Start the periodic maintenance goroutine for silences: GC and snapshot to disk
// every --data.maintenance-interval, until stopc is closed.
// NOTE(review): the notification log needs an equivalent notificationLog.Maintenance
// goroutine, started the same way right after nflog.New.
wg.Add(1)
go func() {
silences.Maintenance(*maintenanceInterval, filepath.Join(*dataDir, "silences"), stopc, nil)
wg.Done()
}()

// When run() returns, close stopc to signal every maintenance goroutine, then wait
// for them to exit (presumably so they can finish their final snapshot).
defer func() {
close(stopc)
wg.Wait()
}()

集群Peer状态监听注册完成后,正式加入集群

// ----------------------
// Join the cluster only after the peer's state (the AddState calls) has been registered.
// ----------------------
if peer != nil {
err = peer.Join(
*reconnectInterval,
*peerReconnectTimeout,
)
// Failing to join is not fatal; it is only logged as a warning.
if err != nil {
logger.Warn("unable to join gossip mesh", "err", err)
}
// Let gossip settle in the background, bounded by --cluster.settle-timeout.
ctx, cancel := context.WithTimeout(context.Background(), *settleTimeout)
// When run() returns, cancel the settle context and leave the mesh, waiting at most 10s.
defer func() {
cancel()
if err := peer.Leave(10 * time.Second); err != nil {
logger.Warn("unable to leave gossip mesh", "err", err)
}
}()
// NOTE(review): the second argument is 10x the gossip interval, presumably the
// settle polling interval — confirm in cluster.Peer.Settle.
go peer.Settle(ctx, *gossipInterval*10)
}

初始化内存告警存储

// ----------------------
// Initialize the in-memory alert store.
// ----------------------
// Alerts are garbage-collected every --alerts.gc-interval. marker is created outside
// this excerpt. The store is closed when run() returns.
alerts, err := mem.NewAlerts(context.Background(), marker, *alertGCInterval, nil, logger, prometheus.DefaultRegisterer)
if err != nil {
logger.Error("error creating memory provider", "err", err)
return 1
}
defer alerts.Close()

groupFn 用于API分组查询

// groupFn lets the API query alert groups from the dispatcher. The closure captures
// the disp variable itself (declared outside this excerpt), not its current value.
// It therefore always queries whichever dispatcher the latest config reload installed.
groupFn := func(routeFilter func(*dispatch.Route) bool, alertFilter func(*types.Alert, time.Time) bool) (dispatch.AlertGroups, map[model.Fingerprint][]string) {
return disp.Groups(routeFilter, alertFilter)
}

初始化API服务

// ----------------------
// Initialize the API service.
// ----------------------
// clusterPeer is the peer handed to the API (Options.Peer). It is assigned only when
// peer is non-nil. Storing a nil *cluster.Peer in the interface would otherwise give
// a non-nil interface value wrapping a nil pointer.
var clusterPeer cluster.ClusterPeer
if peer != nil {
clusterPeer = peer
}

// From here on, the local variable api shadows the api package.
// Timeout and Concurrency come from --web.timeout and --web.get-concurrency.
api, err := api.New(api.Options{
Alerts: alerts,
Silences: silences,
AlertStatusFunc: marker.Status,
GroupMutedFunc: marker.Muted,
Peer: clusterPeer,
Timeout: *httpTimeout,
Concurrency: *getConcurrency,
Logger: logger.With("component", "api"),
Registry: prometheus.DefaultRegisterer,
GroupFunc: groupFn,
})
if err != nil {
logger.Error("failed to create API", "err", err)
return 1
}

计算外部访问URL

// ----------------------
// Compute the externally reachable URL.
// ----------------------
// Per --web.external-url's help text, missing URL components are derived automatically.
// extURL receives os.Hostname and the first web listen address for that purpose.
// NOTE(review): indexing [0] assumes at least one listen address is configured.
amURL, err := extURL(logger, os.Hostname, (*webConfig.WebListenAddresses)[0], *externalURL)
if err != nil {
logger.Error("failed to determine external URL", "err", err)
return 1
}
logger.Debug("external url", "externalUrl", amURL.String())

通知管道

// ----------------------
// Helpers for the notification pipeline.
// ----------------------
// waitFunc is how long this instance waits before sending. It is zero without a cluster.
// In HA mode clusterWait derives it from the peer and --cluster.peer-timeout
// ("Time to wait between peers to send notifications").
waitFunc := func() time.Duration { return 0 }
if peer != nil {
waitFunc = clusterWait(peer, *peerTimeout)
}
// timeoutFunc raises a notification timeout to at least notify.MinTimeout and adds
// this instance's cluster wait on top.
timeoutFunc := func(d time.Duration) time.Duration {
if d < notify.MinTimeout {
d = notify.MinTimeout
}
return d + waitFunc()
}

// Start out nil; they are (re)assigned by the config subscriber on every reload.
var (
inhibitor *inhibit.Inhibitor
tmpl *template.Template
)

// Created once and reused across config reloads.
dispMetrics := dispatch.NewDispatcherMetrics(false, prometheus.DefaultRegisterer)
pipelineBuilder := notify.NewPipelineBuilder(prometheus.DefaultRegisterer, ff)
configLogger := logger.With("component", "configuration")

配置文件热加载与订阅,动态重建路由、通知管道、静默、抑制等

// ----------------------
// Config hot reload: on every (re)load the subscriber below rebuilds templates,
// receivers, time intervals, the inhibitor, the silencer, the notification pipeline
// and the dispatcher from the new config.
// ----------------------
configCoordinator := config.NewCoordinator(
*configFile,
prometheus.DefaultRegisterer,
configLogger,
)
configCoordinator.Subscribe(func(conf *config.Config) error {
// Parse the template files. This assigns the outer tmpl and err variables.
// Returning an error rejects the new config.
tmpl, err = template.FromGlobs(conf.Templates)
if err != nil {
return fmt.Errorf("failed to parse templates: %w", err)
}
tmpl.ExternalURL = amURL

// Build the routing tree and collect every receiver referenced by some route.
routes := dispatch.NewRoute(conf.Route, nil)
activeReceivers := make(map[string]struct{})
routes.Walk(func(r *dispatch.Route) {
activeReceivers[r.RouteOpts.Receiver] = struct{}{}
})

// Map each referenced receiver name to its notification integrations.
receivers := make(map[string][]notify.Integration, len(activeReceivers))
var integrationsNum int
for _, rcv := range conf.Receivers {
if _, found := activeReceivers[rcv.Name]; !found {
// Receivers that no route references are logged and skipped, not built.
configLogger.Info("skipping creation of receiver not referenced by any route", "receiver", rcv.Name)
continue
}
integrations, err := receiver.BuildReceiverIntegrations(rcv, tmpl, logger)
if err != nil {
return err
}
// Receiver names are assumed unique, so no entry is overwritten here
// (presumably enforced by config validation — confirm).
receivers[rcv.Name] = integrations
integrationsNum += len(integrations)
}

// Merge mute_time_intervals and time_intervals into one name -> intervals map.
// time_intervals are written last, so they win on a duplicate name.
timeIntervals := make(map[string][]timeinterval.TimeInterval, len(conf.MuteTimeIntervals)+len(conf.TimeIntervals))
for _, ti := range conf.MuteTimeIntervals {
timeIntervals[ti.Name] = ti.TimeIntervals
}

for _, ti := range conf.TimeIntervals {
timeIntervals[ti.Name] = ti.TimeIntervals
}

intervener := timeinterval.NewIntervener(timeIntervals)

// Stop the previous inhibitor and dispatcher. On the very first load both are still
// nil, so this presumably relies on Stop being nil-receiver safe — confirm.
inhibitor.Stop()
disp.Stop()

// Create the new inhibitor (from the config's inhibit rules) and silencer.
inhibitor = inhibit.NewInhibitor(alerts, conf.InhibitRules, marker, logger)
silencer := silence.NewSilencer(silences, marker, logger)

// Assign only when peer is non-nil. Storing a nil *cluster.Peer in the notify.Peer
// interface would produce a non-nil interface wrapping a nil pointer.
var pipelinePeer notify.Peer
if peer != nil {
pipelinePeer = peer
}

// Build the notification pipeline for the new config.
pipeline := pipelineBuilder.New(
receivers,
waitFunc,
inhibitor,
silencer,
intervener,
marker,
notificationLog,
pipelinePeer,
)

// Export how many receivers, integrations and inhibition rules are configured.
configuredReceivers.Set(float64(len(activeReceivers)))
configuredIntegrations.Set(float64(integrationsNum))
configuredInhibitionRules.Set(float64(len(conf.InhibitRules)))

// Hand the new config to the API. Also pass a callback that runs inhibitor.Mutes and
// silencer.Mutes on a label set; its results are discarded (presumably called for the
// side effect of updating marker state — confirm).
api.Update(conf, func(labels model.LabelSet) {
inhibitor.Mutes(labels)
silencer.Mutes(labels)
})

// Create the new dispatcher, then warn about suspicious per-route timing settings.
disp = dispatch.NewDispatcher(alerts, routes, pipeline, marker, timeoutFunc, nil, logger, dispMetrics)
routes.Walk(func(r *dispatch.Route) {
if r.RouteOpts.RepeatInterval > *retention {
configLogger.Warn(
"repeat_interval is greater than the data retention period. It can lead to notifications being repeated more often than expected.",
"repeat_interval",
r.RouteOpts.RepeatInterval,
"retention",
*retention,
"route",
r.Key(),
)
}

if r.RouteOpts.RepeatInterval < r.RouteOpts.GroupInterval {
configLogger.Warn(
"repeat_interval is less than group_interval. Notifications will not repeat until the next group_interval.",
"repeat_interval",
r.RouteOpts.RepeatInterval,
"group_interval",
r.RouteOpts.GroupInterval,
"route",
r.Key(),
)
}
})

// Start the dispatcher and inhibitor loops in the background.
go disp.Run()
go inhibitor.Run()

return nil
})

// Initial load. A failure here at startup is fatal. Later reloads (SIGHUP or the web
// UI) only report their errors.
if err := configCoordinator.Reload(); err != nil {
return 1
}

路由前缀处理

// ----------------------
// Route prefix handling.
// ----------------------
// Default --web.route-prefix to the path of the external URL, then normalize it to
// exactly one leading slash and no trailing slash ("" -> "/", "am/" -> "/am").
if *routePrefix == "" {
*routePrefix = amURL.Path
}
*routePrefix = "/" + strings.Trim(*routePrefix, "/")
logger.Debug("route prefix", "routePrefix", *routePrefix)

注册UI、API路由

// ----------------------
// Register UI and API routes.
// ----------------------
router := route.New().WithInstrumentation(instrumentHandler)
// When served under a non-root prefix, redirect "/" to the prefix and mount every
// further route below it.
if *routePrefix != "/" {
router.Get("/", func(w http.ResponseWriter, r *http.Request) {
http.Redirect(w, r, *routePrefix, http.StatusFound)
})
router = router.WithPrefix(*routePrefix)
}

// ui.Register receives webReload. A reload request sends a reply channel on it, and
// the signal loop answers on that channel with the reload result.
webReload := make(chan chan error)

ui.Register(router, webReload, logger)
reactapp.Register(router, logger)

// Here api is the API value created earlier (it shadows the api package).
mux := api.Register(router, *routePrefix)

srv := &http.Server{Handler: mux}
// srvc is closed by the server goroutine if serving fails; the signal loop then
// returns exit code 1.
srvc := make(chan struct{})

启动HTTP服务

// ----------------------
// Start the HTTP server.
// ----------------------
// Serve in the background using the listen settings registered by webflag.AddFlags.
// Any error other than http.ErrServerClosed is logged and signalled by closing srvc.
go func() {
if err := web.ListenAndServe(srv, webConfig, logger); !errors.Is(err, http.ErrServerClosed) {
logger.Error("Listen error", "err", err)
close(srvc)
}
// NOTE(review): this defer is registered only after ListenAndServe has returned,
// so it just closes the server as this goroutine exits.
defer func() {
if err := srv.Close(); err != nil {
logger.Error("Error on closing the server", "err", err)
}
}()
}()

信号监听,支持热加载和优雅退出

// ----------------------
// Signal handling: SIGHUP and web requests trigger a config reload; SIGINT/SIGTERM
// exit gracefully.
// ----------------------
// The channels are buffered because os/signal never blocks when delivering. An
// unbuffered channel could therefore drop a signal that arrives while the loop is busy.
var (
hup = make(chan os.Signal, 1)
term = make(chan os.Signal, 1)
)
signal.Notify(hup, syscall.SIGHUP)
signal.Notify(term, os.Interrupt, syscall.SIGTERM)

for {
select {
case <-hup:
// ignore error, already logged in `reload()`
_ = configCoordinator.Reload()
case errc := <-webReload:
// Reply to the web UI with the reload result.
errc <- configCoordinator.Reload()
case <-term:
// Also reached on os.Interrupt (SIGINT), even though the message says SIGTERM.
// Returning 0 lets run()'s deferred cleanup (leaving the mesh, stopping
// maintenance, closing the alert store) run.
logger.Info("Received SIGTERM, exiting gracefully...")
return 0
case <-srvc:
// The HTTP server failed; exit with an error code.
return 1
}
}