type DedupStage struct { rs ResolvedSender nflog NotificationLog recv *nflogpb.Receiver
now func() time.Time hash func(*types.Alert) uint64 }
func NewDedupStage(rs ResolvedSender, l NotificationLog, recv *nflogpb.Receiver) *DedupStage { return &DedupStage{ rs: rs, nflog: l, recv: recv, now: utcNow, hash: hashAlert, } }
func utcNow() time.Time { return time.Now().UTC() }
type hashBuffer struct { buf []byte }
var hashBuffers = sync.Pool{ New: func() interface{} { return &hashBuffer{buf: make([]byte, 0, 1024)} }, }
func hashAlert(a *types.Alert) uint64 { const sep = '\xff'
hb := hashBuffers.Get().(*hashBuffer) defer hashBuffers.Put(hb) b := hb.buf[:0]
names := make(model.LabelNames, 0, len(a.Labels)) for ln := range a.Labels { names = append(names, ln) } sort.Sort(names)
for _, ln := range names { b = append(b, string(ln)...) b = append(b, sep) b = append(b, string(a.Labels[ln])...) b = append(b, sep) }
return xxhash.Sum64(b) }
func (n *DedupStage) needsUpdate(entry *nflogpb.Entry, firing, resolved map[uint64]struct{}, repeat time.Duration) bool { if entry == nil { return len(firing) > 0 }
if !entry.IsFiringSubset(firing) { return true }
if len(firing) == 0 { return len(entry.FiringAlerts) > 0 }
if n.rs.SendResolved() && !entry.IsResolvedSubset(resolved) { return true }
return entry.Timestamp.Before(n.now().Add(-repeat)) }
func (n *DedupStage) Exec(ctx context.Context, _ *slog.Logger, alerts ...*types.Alert) (context.Context, []*types.Alert, error) { gkey, ok := GroupKey(ctx) if !ok { return ctx, nil, errors.New("group key missing") }
repeatInterval, ok := RepeatInterval(ctx) if !ok { return ctx, nil, errors.New("repeat interval missing") }
firingSet := map[uint64]struct{}{} resolvedSet := map[uint64]struct{}{} firing := []uint64{} resolved := []uint64{}
for _, a := range alerts { hash := n.hash(a) if a.Resolved() { resolved = append(resolved, hash) resolvedSet[hash] = struct{}{} } else { firing = append(firing, hash) firingSet[hash] = struct{}{} } }
ctx = WithFiringAlerts(ctx, firing) ctx = WithResolvedAlerts(ctx, resolved)
entries, err := n.nflog.Query(nflog.QGroupKey(gkey), nflog.QReceiver(n.recv)) if err != nil && !errors.Is(err, nflog.ErrNotFound) { return ctx, nil, err }
var entry *nflogpb.Entry switch len(entries) { case 0: case 1: entry = entries[0] default: return ctx, nil, fmt.Errorf("unexpected entry result size %d", len(entries)) }
if n.needsUpdate(entry, firingSet, resolvedSet, repeatInterval) { return ctx, alerts, nil } return ctx, nil, nil }
|