1- 下面是一份可以直接放进 alerting/service/receiver/README.md 的「落地实施计划」。它把 Prometheus → Alertmanager →(POST JSON)→ /receiver 的数据接收、解析校验、以及结构化插入 PostgreSQL 的每一步拆清楚,并按你的目录给出需要新建的文件与代码骨架。
2-
3- ⸻
4-
5- 🧭 端到端验证(Docker Postgres + 本服务)
1+ 🧭 端到端验证(Docker Postgres + Redis + 本服务)
62
73以下步骤演示从 Alertmanager Webhook 到数据库落库的完整链路验证:
84
@@ -14,6 +10,12 @@ docker run --name zeroops-pg \
1410 -p 5432:5432 -d postgres:16
1511```
1612
13+ 1b) 启动 Redis(Docker)
14+
15+ ``` bash
16+ docker run --name zeroops-redis -p 6379:6379 -d redis:7-alpine
17+ ```
18+
17192 ) 初始化告警相关表
1820运行集成测试(需 Postgres 实例与 ` -tags=integration ` )可验证插入成功:
1921``` bash
@@ -25,6 +27,7 @@ go test ./internal/alerting/service/receiver -tags=integration -run TestPgDAO_In
2527``` bash
2628export DB_HOST=localhost DB_PORT=5432 DB_USER=postgres DB_PASSWORD=postgres DB_NAME=zeroops DB_SSLMODE=disable
2729export ALERT_WEBHOOK_BASIC_USER=alert ALERT_WEBHOOK_BASIC_PASS=REDACTED
30+ export REDIS_ADDR=localhost:6379 REDIS_PASSWORD=" " REDIS_DB=0
2831nohup go run ./cmd/zeroops -- 1> /tmp/zeroops.out 2>&1 &
2932```
3033
@@ -50,7 +53,7 @@ curl -u alert:REDACTED -H 'Content-Type: application/json' \
5053}'
5154```
5255
53- 5 ) 在数据库中验证(应看到一行 Open/P1/InProcessing 且标题匹配的记录)
56+ 5 ) 在数据库中验证(应看到一行 Open/P1/Pending 且标题匹配的记录)
5457
5558``` bash
5659docker exec -i zeroops-pg psql -U postgres -d zeroops -c \
@@ -75,7 +78,7 @@ receiver/ — 从 Alertmanager Webhook 到 alert_issues 入库的实施计划
7578
7679目标:当 Alertmanager 向本服务发起 POST JSON 时,第一次创建告警记录并落表 alert_issues,字段规则:
7780 • state 默认 Open
78- • alertState 默认 InProcessing
81+ • alertState 默认 Pending
7982 • 其余字段按 webhook 请求体解析、校验后写入
8083
8184本计划仅覆盖「首次创建」逻辑;resolved(恢复)更新逻辑可在后续补充(例如切换 state=Closed、alertState=Restored)。
@@ -96,6 +99,7 @@ alerting/
9699 ├─ validator.go # 字段校验(必填/枚举/时间格式等)
97100 ├─ mapper.go # 映射:AM payload → alert_issues 行记录
98101 ├─ dao.go # DB 访问(Insert/Query/事务/重试)
102+ ├─ cache.go # Redis 客户端与写通缓存(Write-through)
99103 ├─ idempotency.go # 幂等键生成与“已处理”快速判断(应用层)
100104 └─ errors.go # 统一错误定义(参数错误/DB错误等)
101105
@@ -116,9 +120,10 @@ handler.go
116120
117121type Handler struct {
118122 dao * DAO
123+ cache * Cache // Redis 写通
119124}
120125
121- func NewHandler(dao * DAO) * Handler { return &Handler{dao: dao} }
126+ func NewHandler(dao * DAO, cache * Cache ) * Handler { return &Handler{dao: dao, cache: cache } }
122127
123128func (h * Handler) AlertmanagerWebhook(c * gin.Context) {
124129 var req AMWebhook // dto.go 中定义的 Alertmanager 请求体结构
@@ -154,11 +159,20 @@ func (h *Handler) AlertmanagerWebhook(c *gin.Context) {
154159 continue
155160 }
156161
157- // 4) 插入 DB(第一次创建强制 state=Open, alertState=InProcessing )
162+ // 4) 插入 DB(第一次创建强制 state=Open, alertState=Pending )
158163 if err := h.dao.InsertAlertIssue(c, row); err != nil {
159164 // 若唯一约束冲突/网络抖动等,记录后继续
160165 continue
161166 }
167+ // 5) 同步写入 service_states(health_state=Error;detail/resolved_at/correlation_id 留空)
168+ // service 从 labels.service 取;version 可从 labels.service_version 取(可空)
169+ if err := h.dao.UpsertServiceState(c, a.Labels["service"], a.Labels["service_version"], row.AlertSince, "Error"); err != nil {
170+ // 仅记录错误,不阻断主流程
171+ }
172+ // 6) 写通到 Redis(不阻塞主流程,失败仅记录日志)
173+ if err := h.cache.WriteIssue(c, row, a); err != nil {
174+ // 仅记录错误,避免影响 Alertmanager 重试逻辑
175+ }
162176 MarkSeen(key) // 记忆幂等键
163177 created++
164178 }
@@ -204,7 +218,7 @@ type AlertIssueRow struct {
204218 ID string // uuid
205219 State string // enum: Open/Closed (首次固定 Open)
206220 Level string // varchar(32): P0/P1/P2/Warning
207- AlertState string // enum: InProcessing/Restored/AutoRestored(首次固定 InProcessing )
221+ AlertState string // enum: Pending/ InProcessing/Restored/AutoRestored(首次固定 Pending )
208222 Title string // varchar(255)
209223 LabelJSON json.RawMessage // json: 标准化后的 [ {key,value}]
210224 AlertSince time.Time // timestamp: 用 StartsAt
@@ -291,7 +305,7 @@ func MapToAlertIssueRow(w *AMWebhook, a *AMAlert) (*AlertIssueRow, error) {
291305 return &AlertIssueRow{
292306 ID: uuid.NewString(),
293307 State: "Open",
294- AlertState: "InProcessing ",
308+ AlertState: "Pending ",
295309 Level: level,
296310 Title: title,
297311 LabelJSON: b,
@@ -329,7 +343,7 @@ type DAO struct{ DB *pgxpool.Pool }
329343func (d * DAO) InsertAlertIssue(ctx context.Context, r * AlertIssueRow) error {
330344 const q = `
331345 INSERT INTO alert_issues
332- (id, state, level, alertState , title, label, alertSince )
346+ (id, state, level, alert_state , title, labels, alert_since )
333347 VALUES
334348 ($1, $2, $3, $4, $5, $6, $7)
335349 `
@@ -345,13 +359,104 @@ func (d *DAO) InsertAlertIssue(ctx context.Context, r *AlertIssueRow) error {
345359
346360⸻
347361
348- ⑧ 成功/失败返回与日志
362+ ⑧ Redis 缓存写通(Write-through)与分布式幂等
363+
364+ 目标:在成功写入 PostgreSQL 后,将关键数据写入 Redis,既为前端查询提供加速缓存,也为后续定时任务提供快速读取能力;同时用 Redis 提供跨实例幂等控制。
365+
366+ 依赖:
367+
368+ ``` bash
369+ go get github.com/redis/go-redis/v9
370+ ```
371+
372+ 配置(环境变量):
373+
374+ ```
375+ REDIS_ADDR=localhost:6379
376+ REDIS_PASSWORD=""
377+ REDIS_DB=0
378+ ```
379+
380+ key 设计与 TTL:
381+
382+ - alert:issue:{id} → JSON(AlertIssueRow + 补充字段),TTL 3d
383+ - alert:idemp:{fingerprint}|{startsAtRFC3339Nano} → "1",TTL 10m(用于分布式幂等 SETNX)
384+ - alert:index: open → Set(issues...),无 TTL(恢复时再移除)
385+ - alert:index:svc:{service}: open → Set(issues...),无 TTL
386+
387+ cache.go(示例):
388+
389+ ``` go
390+ type Cache struct { R *redis.Client }
391+
392+ func NewCacheFromEnv () *Cache {
393+ db , _ := strconv.Atoi (os.Getenv (" REDIS_DB" ))
394+ c := redis.NewClient (&redis.Options {Addr: os.Getenv (" REDIS_ADDR" ), Password: os.Getenv (" REDIS_PASSWORD" ), DB: db})
395+ return &Cache{R: c}
396+ }
397+
398+ // 写通:issue 主键对象 + 索引集合
399+ func (c *Cache ) WriteIssue (ctx context .Context , r *AlertIssueRow , a AMAlert ) error {
400+ if c == nil || c.R == nil { return nil }
401+ key := " alert:issue:" + r.ID
402+ payload := map [string ]any{
403+ " id" : r.ID , " state" : r.State , " level" : r.Level , " alertState" : r.AlertState ,
404+ " title" : r.Title , " labels" : json.RawMessage (r.LabelJSON ), " alertSince" : r.AlertSince ,
405+ " fingerprint" : a.Fingerprint , " service" : a.Labels [" service" ], " alertname" : a.Labels [" alertname" ],
406+ }
407+ b , _ := json.Marshal (payload)
408+ svc := strings.TrimSpace (a.Labels [" service" ])
409+ pipe := c.R .Pipeline ()
410+ pipe.Set (ctx, key, b, 72 *time.Hour )
411+ pipe.SAdd (ctx, " alert:index:open" , r.ID )
412+ if svc != " " {
413+ pipe.SAdd (ctx, " alert:index:svc:" +svc+" :open" , r.ID )
414+ }
415+ _ , err := pipe.Exec (ctx)
416+ return err
417+ }
418+
419+ // 分布式幂等:SETNX + TTL
420+ func (c *Cache ) TryMarkIdempotent (ctx context .Context , a AMAlert ) (bool , error ) {
421+ if c == nil || c.R == nil { return true , nil }
422+ k := " alert:idemp:" + a.Fingerprint + " |" + a.StartsAt .UTC ().Format (time.RFC3339Nano )
423+ ok , err := c.R .SetNX (ctx, k, " 1" , 10 *time.Minute ).Result ()
424+ return ok, err
425+ }
426+ ```
427+
428+ 在 handler 中接入(伪码):
429+
430+ ``` go
431+ // 幂等短路(跨实例)
432+ if ok , _ := h.cache .TryMarkIdempotent (c, a); !ok {
433+ continue
434+ }
435+ // DB 成功后写通 Redis
436+ _ = h.cache .WriteIssue (c, row, a)
437+ ```
438+
439+ 失败处理:Redis 失败不影响 HTTP 主流程(Alertmanager 侧重试依赖 2xx),但需要日志打点与告警;后续可在定时任务做补偿(扫描最近 N 分钟的 DB 记录回填 Redis)。
440+
441+ 快速验证:
442+
443+ ``` bash
444+ # 触发一次 webhook 后在 Redis 查看
445+ redis-cli --raw keys ' alert:*'
446+ redis-cli --raw get alert:issue:< id>
447+ redis-cli --raw smembers alert:index:open | head -n 10
448+ redis-cli ttl alert:issue:< id>
449+ ```
450+
451+ ⸻
452+
453+ ⑨ 成功/失败返回与日志
349454 • 返回:统一 200 {"ok": true, "created": <n >},即使个别记录失败也快速返回,避免 Alertmanager 阻塞重试。
350455 • 日志:按 alertname/service/severity/fingerprint 打点;错误包含 SQLSTATE/堆栈;统计接收/解析/插入耗时分位。
351456
352457⸻
353458
354- ⑨ 最小联调(人工模拟)
459+ ⑩ 最小联调(人工模拟)
355460
356461firing 模拟:
357462
@@ -363,7 +468,13 @@ curl -X POST http://localhost:8080/v1/integrations/alertmanager/webhook \
363468 "alerts":[
364469 {
365470 "status":"firing",
366- "labels":{"alertname":"HighRequestLatency","service":"serviceA","severity":"P1","idc":"yzh"},
471+ "labels":{
472+ "alertname":"HighRequestLatency",
473+ "service":"serviceA",
474+ "severity":"P1",
475+ "idc":"yzh",
476+ "service_version": "v1.3.7"
477+ },
367478 "annotations":{"summary":"p95 latency over threshold","description":"apitime p95 > 450ms"},
368479 "startsAt":"2025-05-05T11:00:00Z",
369480 "endsAt":"0001-01-01T00:00:00Z",
@@ -378,10 +489,22 @@ curl -X POST http://localhost:8080/v1/integrations/alertmanager/webhook \
378489
379490入库后,alert_issues 里应看到:
380491 • state=Open
381- • alertState=InProcessing
492+ • alertState=Pending
382493 • level=P1
383494 • title="p95 latency over threshold"
384495 • label 中包含 am_fingerprint/generatorURL/groupKey/...
385496 • alertSince=2025-05-05 11:00:00+00
386497
498+ 同时,service_states 里应看到/更新(按 service+version):
499+ • service=serviceA
500+ • version=(若 labels 中有 service_version 则为其值,否则为空字符串)
501+ • report_at=与 alert_since 一致(若已存在则保留更早的 report_at)
502+ • health_state=Error
503+ • detail/resolved_at/correlation_id 为空
504+
505+ Redis 中应看到:
506+ • key: alert:issue:<id > 值为 JSON 且 TTL≈3 天
507+ • 集合 alert:index: open 中包含 <id >
508+ • 若有 service=serviceA,则 alert:index:svc:serviceA: open 包含 <id >
509+
387510⸻
0 commit comments