func (d *Dispatcher) processAlert(alert *types.Alert, route *Route) { groupLabels := getGroupLabels(alert, route)
fp := groupLabels.Fingerprint()
d.mtx.Lock() defer d.mtx.Unlock()
routeGroups, ok := d.aggrGroupsPerRoute[route] if !ok { routeGroups = map[model.Fingerprint]*aggrGroup{} d.aggrGroupsPerRoute[route] = routeGroups }
ag, ok := routeGroups[fp] if ok { ag.insert(alert) return }
if limit := d.limits.MaxNumberOfAggregationGroups(); limit > 0 && d.aggrGroupsNum >= limit { d.metrics.aggrGroupLimitReached.Inc() d.logger.Error("Too many aggregation groups, cannot create new group for alert", "groups", d.aggrGroupsNum, "limit", limit, "alert", alert.Name()) return }
ag = newAggrGroup(d.ctx, groupLabels, route, d.timeout, d.logger) routeGroups[fp] = ag d.aggrGroupsNum++ d.metrics.aggrGroups.Inc()
ag.insert(alert)
go ag.run(func(ctx context.Context, alerts ...*types.Alert) bool { _, _, err := d.stage.Exec(ctx, d.logger, alerts...) if err != nil { logger := d.logger.With("num_alerts", len(alerts), "err", err) if errors.Is(ctx.Err(), context.Canceled) { logger.Debug("Notify for alerts failed") } else { logger.Error("Notify for alerts failed") } } return err == nil }) }
func (ag *aggrGroup) insert(alert *types.Alert) { if err := ag.alerts.Set(alert); err != nil { ag.logger.Error("error on set alert", "err", err) }
ag.mtx.Lock() defer ag.mtx.Unlock() if !ag.hasFlushed && alert.StartsAt.Add(ag.opts.GroupWait).Before(time.Now()) { ag.next.Reset(0) } }
|