Skip to content

Commit 9b57efb

Browse files
committed
feat(Metrics&Alert): 补充写入service_states缓存
1 parent 166ce3e commit 9b57efb

File tree

3 files changed

+42
-0
lines changed

3 files changed

+42
-0
lines changed

internal/alerting/service/receiver/README.md

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -170,9 +170,12 @@ func (h *Handler) AlertmanagerWebhook(c *gin.Context) {
170170
// 仅记录错误,不阻断主流程
171171
}
172172
// 6) 写通到 Redis(不阻塞主流程,失败仅记录日志)
173+
// alert_issues
173174
if err := h.cache.WriteIssue(c, row, a); err != nil {
174175
// 仅记录错误,避免影响 Alertmanager 重试逻辑
175176
}
177+
// service_states
178+
_ = h.cache.WriteServiceState(c, a.Labels["service"], a.Labels["service_version"], row.AlertSince, "Error")
176179
MarkSeen(key) // 记忆幂等键
177180
created++
178181
}
@@ -383,6 +386,10 @@ key 设计与 TTL:
383386
- alert:idemp:{fingerprint}|{startsAtRFC3339Nano} → "1",TTL 10m(用于分布式幂等 SETNX)
384387
- alert:index:open → Set(issues...),无 TTL(恢复时再移除)
385388
- alert:index:svc:{service}:open → Set(issues...),无 TTL
389+
// service_states 缓存
390+
- service_state:{service}:{version} → JSON(service/version/report_at/health_state),TTL 3d
391+
- service_state:index:service:{service} → Set(keys)
392+
- service_state:index:health:{health_state} → Set(keys)
386393

387394
cache.go(示例):
388395

@@ -446,6 +453,9 @@ redis-cli --raw keys 'alert:*'
446453
redis-cli --raw get alert:issue:<id>
447454
redis-cli --raw smembers alert:index:open | head -n 10
448455
redis-cli ttl alert:issue:<id>
456+
redis-cli --raw keys 'service_state:*'
457+
redis-cli --raw get service_state:serviceA:v1.3.7
458+
redis-cli --raw smembers service_state:index:health:Error
449459
```
450460

451461

internal/alerting/service/receiver/cache.go

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,13 +15,17 @@ import (
1515
// AlertIssueCache is the cache write interface used by the alert receiver.
// Two implementations are visible alongside it: NoopCache (does nothing) and
// Cache (Redis-backed).
type AlertIssueCache interface {
1616
// WriteIssue writes the issue row r, together with the alert a it was
// created from, through to the cache.
WriteIssue(ctx context.Context, r *AlertIssueRow, a AMAlert) error
1717
// TryMarkIdempotent tries to claim the idempotency marker for alert a and
// reports whether this call claimed it. The Redis implementation uses
// SETNX with a 10 minute expiry.
TryMarkIdempotent(ctx context.Context, a AMAlert) (bool, error)
18+
// WriteServiceState records the health state of one service version as of
// reportAt.
WriteServiceState(ctx context.Context, service, version string, reportAt time.Time, healthState string) error
1819
}
1920

2021
// NoopCache is a no-op implementation of AlertIssueCache.
2122
type NoopCache struct{}
2223

2324
func (NoopCache) WriteIssue(ctx context.Context, r *AlertIssueRow, a AMAlert) error { return nil }
2425
func (NoopCache) TryMarkIdempotent(ctx context.Context, a AMAlert) (bool, error) { return true, nil }
26+
func (NoopCache) WriteServiceState(ctx context.Context, service, version string, reportAt time.Time, healthState string) error {
27+
return nil
28+
}
2529

2630
// Cache implements AlertIssueCache using Redis.
2731
type Cache struct{ R *redis.Client }
@@ -79,3 +83,30 @@ func (c *Cache) TryMarkIdempotent(ctx context.Context, a AMAlert) (bool, error)
7983
ok, err := c.R.SetNX(ctx, k, "1", 10*time.Minute).Result()
8084
return ok, err
8185
}
86+
87+
// WriteServiceState writes the service state snapshot into Redis and maintains simple indices.
88+
func (c *Cache) WriteServiceState(ctx context.Context, service, version string, reportAt time.Time, healthState string) error {
89+
if c == nil || c.R == nil {
90+
return nil
91+
}
92+
s := strings.TrimSpace(service)
93+
v := strings.TrimSpace(version)
94+
key := "service_state:" + s + ":" + v
95+
payload := map[string]any{
96+
"service": s,
97+
"version": v,
98+
"report_at": reportAt,
99+
"health_state": healthState,
100+
}
101+
b, _ := json.Marshal(payload)
102+
pipe := c.R.Pipeline()
103+
pipe.Set(ctx, key, b, 72*time.Hour)
104+
if s != "" {
105+
pipe.SAdd(ctx, "service_state:index:service:"+s, key)
106+
}
107+
if healthState != "" {
108+
pipe.SAdd(ctx, "service_state:index:health:"+healthState, key)
109+
}
110+
_, err := pipe.Exec(ctx)
111+
return err
112+
}

internal/alerting/service/receiver/handler.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -66,6 +66,7 @@ func (h *Handler) AlertmanagerWebhook(c *fox.Context) {
6666
version := strings.TrimSpace(a.Labels["service_version"]) // optional
6767
if service != "" {
6868
_ = w.UpsertServiceState(c.Request.Context(), service, version, row.AlertSince, "Error")
69+
_ = h.cache.WriteServiceState(c.Request.Context(), service, version, row.AlertSince, "Error")
6970
}
7071
}
7172
// Write-through to cache. Errors are ignored to avoid impacting webhook ack.

0 commit comments

Comments
 (0)