Skip to content

Commit 7ae82d1

Browse files
committed
优化promethues与数据库操作顺序,改进错误处理,改进存储类型转换
1 parent af45248 commit 7ae82d1

File tree

4 files changed

+352
-65
lines changed

4 files changed

+352
-65
lines changed

docs/alerting/database-design.md

Lines changed: 12 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -76,13 +76,13 @@
7676

7777
### 4) alert_rules(告警规则表)
7878

79-
字段名 类型 说明
79+
| 字段名 | 类型 | 说明 |
8080
|--------|------|------|
81-
namevarchar(255)主键,告警规则名称
82-
descriptiontext可读标题,可拼接渲染为可读的 title
83-
exprtext左侧业务指标表达式,(通常对应 PromQL 左侧的聚合,如 sum(apitime) by (service, version))
84-
|op|varchar(4)阈值比较方式(枚举:>, <, =, !=)
85-
severityvarchar(32)告警等级,通常进入告警的 labels.severity
81+
| name | varchar(255) | 主键,告警规则名称 |
82+
| description | text | 可读标题,可拼接渲染为可读的 title |
83+
| expr | text | 左侧业务指标表达式,(通常对应 PromQL 左侧的聚合,如 sum(apitime) by (service, version)) |
84+
| op | varchar(4) | 阈值比较方式(枚举:>, <, =, !=) |
85+
| severity | varchar(32) | 告警等级,通常进入告警的 labels.severity |
8686

8787
**约束建议:**
8888
- CHECK 约束:`op IN ('>', '<', '=', '!=')`
@@ -91,11 +91,12 @@
9191

9292
### 5) alert_rule_metas(规则阈值元信息表)
9393

94-
字段名 类型 说明
95-
alert_name varchar(255) 关联 `alert_rules.name`
96-
labels jsonb 适用标签(示例:{"service":"s3","version":"v1"});为空 `{}` 表示全局
97-
threshold numeric 阈值(会被渲染成特定规则的 threshold metric 数值)
98-
watch_time interval 持续时长(映射 Prometheus rule 的 for:)
94+
| 字段名 | 类型 | 说明 |
95+
|--------|------|------|
96+
| alert_name | varchar(255) | 关联 `alert_rules.name` |
97+
| labels | jsonb | 适用标签(示例:{"service":"s3","version":"v1"});为空 `{}` 表示全局 |
98+
| threshold | numeric | 阈值(会被渲染成特定规则的 threshold metric 数值) |
99+
| watch_time | interval | 持续时长(映射 Prometheus rule 的 for:) |
99100

100101
**约束与索引建议:**
101102
- FOREIGN KEY: `(alert_name)` REFERENCES `alert_rules(name)` ON DELETE CASCADE

internal/alerting/service/ruleset/manager.go

Lines changed: 71 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -32,20 +32,35 @@ func (m *Manager) AddAlertRule(ctx context.Context, r *AlertRule) error {
3232
if r == nil || r.Name == "" {
3333
return fmt.Errorf("invalid rule")
3434
}
35+
// First ensure the rule is added to Prometheus successfully
36+
// This guarantees Prometheus has the correct data even if DB write fails
37+
if err := m.prom.AddToPrometheus(ctx, r); err != nil {
38+
return fmt.Errorf("failed to add rule to Prometheus: %w", err)
39+
}
40+
// Then persist to database
41+
// If this fails, the rule will still be in Prometheus, which is better than
42+
// having it in DB but not in Prometheus (which would cause missing alerts)
3543
if err := m.store.CreateRule(ctx, r); err != nil {
36-
return err
44+
return fmt.Errorf("failed to create rule in database: %w", err)
3745
}
38-
return m.prom.AddToPrometheus(ctx, r)
46+
return nil
3947
}
4048

4149
func (m *Manager) DeleteAlertRule(ctx context.Context, name string) error {
4250
if name == "" {
4351
return fmt.Errorf("invalid name")
4452
}
53+
// First remove from Prometheus to stop alerting immediately
54+
// This prevents false alerts if DB deletion fails
55+
if err := m.prom.DeleteFromPrometheus(ctx, name); err != nil {
56+
return fmt.Errorf("failed to delete rule from Prometheus: %w", err)
57+
}
58+
// Then remove from database
59+
// If this fails, the rule is already removed from Prometheus (no false alerts)
4560
if err := m.store.DeleteRule(ctx, name); err != nil {
46-
return err
61+
return fmt.Errorf("failed to delete rule from database: %w", err)
4762
}
48-
return m.prom.DeleteFromPrometheus(ctx, name)
63+
return nil
4964
}
5065

5166
func (m *Manager) AddToPrometheus(ctx context.Context, r *AlertRule) error {
@@ -66,33 +81,54 @@ func (m *Manager) UpsertRuleMetas(ctx context.Context, meta *AlertRuleMeta) erro
6681
if err := validateMeta(meta); err != nil {
6782
return err
6883
}
84+
85+
// First, get the old meta for change logging
86+
oldList, err := m.store.GetMetas(ctx, meta.AlertName, meta.Labels)
87+
if err != nil {
88+
return err
89+
}
90+
var old *AlertRuleMeta
91+
if len(oldList) > 0 {
92+
old = oldList[0]
93+
}
94+
95+
// Prepare change log parameters outside of transaction to minimize lock time
96+
var changeLog *ChangeLog
97+
if old != nil || meta != nil {
98+
changeLog = m.prepareChangeLog(old, meta)
99+
}
100+
101+
// First ensure the meta is synced to Prometheus successfully
102+
// This guarantees Prometheus has the correct threshold data even if DB write fails
103+
if err := m.prom.SyncMetaToPrometheus(ctx, meta); err != nil {
104+
return fmt.Errorf("failed to sync meta to Prometheus: %w", err)
105+
}
106+
107+
// Then persist to database within a transaction
108+
// If this fails, the meta will still be in Prometheus, which is better than
109+
// having it in DB but not in Prometheus (which would cause incorrect thresholds)
69110
return m.store.WithTx(ctx, func(tx Store) error {
70-
oldList, err := tx.GetMetas(ctx, meta.AlertName, meta.Labels)
71-
if err != nil {
72-
return err
73-
}
74-
var old *AlertRuleMeta
75-
if len(oldList) > 0 {
76-
old = oldList[0]
77-
}
78111
_, err = tx.UpsertMeta(ctx, meta)
79112
if err != nil {
80113
return err
81114
}
82-
if err := m.RecordMetaChangeLog(ctx, old, meta); err != nil {
83-
return err
84-
}
85-
if err := m.prom.SyncMetaToPrometheus(ctx, meta); err != nil {
86-
return err
115+
// Insert pre-prepared change log
116+
if changeLog != nil {
117+
if err := tx.InsertChangeLog(ctx, changeLog); err != nil {
118+
return err
119+
}
87120
}
88121
return nil
89122
})
90123
}
91124

92-
func (m *Manager) RecordMetaChangeLog(ctx context.Context, oldMeta, newMeta *AlertRuleMeta) error {
125+
// prepareChangeLog prepares change log parameters outside of transaction to minimize lock time
126+
func (m *Manager) prepareChangeLog(oldMeta, newMeta *AlertRuleMeta) *ChangeLog {
93127
if newMeta == nil {
94128
return nil
95129
}
130+
131+
// Prepare all parameters outside of transaction
96132
var oldTh, newTh *float64
97133
var oldW, newW *time.Duration
98134
if oldMeta != nil {
@@ -103,18 +139,31 @@ func (m *Manager) RecordMetaChangeLog(ctx context.Context, oldMeta, newMeta *Ale
103139
newTh = &newMeta.Threshold
104140
newW = &newMeta.WatchTime
105141
}
106-
log := &ChangeLog{
107-
ID: fmt.Sprintf("%s-%s-%d", newMeta.AlertName, CanonicalLabelKey(newMeta.Labels), time.Now().UnixNano()),
142+
143+
// Generate ID and timestamp outside of transaction
144+
now := time.Now()
145+
changeTime := now.UTC()
146+
id := fmt.Sprintf("%s-%s-%d", newMeta.AlertName, CanonicalLabelKey(newMeta.Labels), now.UnixNano())
147+
148+
return &ChangeLog{
149+
ID: id,
108150
AlertName: newMeta.AlertName,
109151
ChangeType: classifyChange(oldMeta, newMeta),
110152
Labels: newMeta.Labels,
111153
OldThreshold: oldTh,
112154
NewThreshold: newTh,
113155
OldWatch: oldW,
114156
NewWatch: newW,
115-
ChangeTime: time.Now().UTC(),
157+
ChangeTime: changeTime,
158+
}
159+
}
160+
161+
func (m *Manager) RecordMetaChangeLog(ctx context.Context, oldMeta, newMeta *AlertRuleMeta) error {
162+
changeLog := m.prepareChangeLog(oldMeta, newMeta)
163+
if changeLog == nil {
164+
return nil
116165
}
117-
return m.store.InsertChangeLog(ctx, log)
166+
return m.store.InsertChangeLog(ctx, changeLog)
118167
}
119168

120169
func classifyChange(oldMeta, newMeta *AlertRuleMeta) string {

internal/alerting/service/ruleset/store_pg.go

Lines changed: 81 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -6,12 +6,14 @@ import (
66
"fmt"
77
"time"
88

9+
"github.com/jackc/pgx/v5/pgtype"
910
abd "github.com/qiniu/zeroops/internal/alerting/database"
1011
)
1112

1213
// PgStore is a PostgreSQL-backed Store implementation using the alerting database wrapper.
1314
// Note: The current database wrapper does not expose transactions; WithTx acts as a simple wrapper.
1415
// For production-grade atomicity, extend the database wrapper to support sql.Tx and wire it here.
16+
// This implementation uses pgx native types to avoid manual parsing of PostgreSQL interval types.
1517
type PgStore struct {
1618
DB *abd.Database
1719
}
@@ -76,16 +78,22 @@ func (s *PgStore) DeleteRule(ctx context.Context, name string) error {
7678
}
7779

7880
func (s *PgStore) UpsertMeta(ctx context.Context, m *AlertRuleMeta) (bool, error) {
79-
labelsJSON, _ := json.Marshal(m.Labels)
81+
labelsJSON, err := json.Marshal(m.Labels)
82+
if err != nil {
83+
return false, fmt.Errorf("marshal labels: %w", err)
84+
}
85+
86+
// Convert time.Duration to pgtype.Interval
87+
interval := durationToPgInterval(m.WatchTime)
88+
8089
const q = `
8190
INSERT INTO alert_rule_metas(alert_name, labels, threshold, watch_time)
8291
VALUES ($1, $2::jsonb, $3, $4)
8392
ON CONFLICT (alert_name, labels) DO UPDATE SET
8493
threshold=EXCLUDED.threshold,
85-
watch_time=EXCLUDED.watch_time,
86-
updated_at=now()
94+
watch_time=EXCLUDED.watch_time
8795
`
88-
_, err := s.DB.ExecContext(ctx, q, m.AlertName, string(labelsJSON), m.Threshold, m.WatchTime)
96+
_, err = s.DB.ExecContext(ctx, q, m.AlertName, string(labelsJSON), m.Threshold, interval)
8997
if err != nil {
9098
return false, fmt.Errorf("upsert meta: %w", err)
9199
}
@@ -94,7 +102,10 @@ func (s *PgStore) UpsertMeta(ctx context.Context, m *AlertRuleMeta) (bool, error
94102
}
95103

96104
func (s *PgStore) GetMetas(ctx context.Context, name string, labels LabelMap) ([]*AlertRuleMeta, error) {
97-
labelsJSON, _ := json.Marshal(labels)
105+
labelsJSON, err := json.Marshal(labels)
106+
if err != nil {
107+
return nil, fmt.Errorf("marshal labels for get: %w", err)
108+
}
98109
const q = `
99110
SELECT alert_name, labels, threshold, watch_time
100111
FROM alert_rule_metas
@@ -110,61 +121,99 @@ func (s *PgStore) GetMetas(ctx context.Context, name string, labels LabelMap) ([
110121
var alertName string
111122
var labelsRaw string
112123
var threshold float64
113-
var watch any
124+
var watch pgtype.Interval
114125
if err := rows.Scan(&alertName, &labelsRaw, &threshold, &watch); err != nil {
115126
return nil, fmt.Errorf("scan meta: %w", err)
116127
}
117128
lm := LabelMap{}
118-
_ = json.Unmarshal([]byte(labelsRaw), &lm)
129+
if err := json.Unmarshal([]byte(labelsRaw), &lm); err != nil {
130+
return nil, fmt.Errorf("unmarshal labels: %w", err)
131+
}
119132
meta := &AlertRuleMeta{AlertName: alertName, Labels: lm, Threshold: threshold}
120-
// best-effort: watch_time may come back as string or duration; we try string -> duration
121-
switch v := watch.(type) {
122-
case string:
123-
if d, err := timeParseDurationPG(v); err == nil {
124-
meta.WatchTime = d
125-
}
133+
134+
// Convert pgtype.Interval to time.Duration
135+
if duration, err := pgIntervalToDuration(watch); err == nil {
136+
meta.WatchTime = duration
126137
}
127138
res = append(res, meta)
128139
}
129140
return res, nil
130141
}
131142

132143
func (s *PgStore) DeleteMeta(ctx context.Context, name string, labels LabelMap) error {
133-
labelsJSON, _ := json.Marshal(labels)
144+
labelsJSON, err := json.Marshal(labels)
145+
if err != nil {
146+
return fmt.Errorf("marshal labels: %w", err)
147+
}
134148
const q = `DELETE FROM alert_rule_metas WHERE alert_name=$1 AND labels=$2::jsonb`
135-
_, err := s.DB.ExecContext(ctx, q, name, string(labelsJSON))
149+
_, err = s.DB.ExecContext(ctx, q, name, string(labelsJSON))
136150
if err != nil {
137151
return fmt.Errorf("delete meta: %w", err)
138152
}
139153
return nil
140154
}
141155

142156
func (s *PgStore) InsertChangeLog(ctx context.Context, log *ChangeLog) error {
143-
labelsJSON, _ := json.Marshal(log.Labels)
157+
labelsJSON, err := json.Marshal(log.Labels)
158+
if err != nil {
159+
return fmt.Errorf("marshal labels for changelog: %w", err)
160+
}
161+
162+
// Convert time.Duration to pgtype.Interval for old and new watch times
163+
var oldWatch, newWatch *pgtype.Interval
164+
if log.OldWatch != nil {
165+
interval := durationToPgInterval(*log.OldWatch)
166+
oldWatch = &interval
167+
}
168+
if log.NewWatch != nil {
169+
interval := durationToPgInterval(*log.NewWatch)
170+
newWatch = &interval
171+
}
172+
144173
const q = `
145174
INSERT INTO alert_meta_change_logs(id, alert_name, change_type, labels, old_threshold, new_threshold, old_watch, new_watch, change_time)
146175
VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9)
147176
`
148-
_, err := s.DB.ExecContext(ctx, q, log.ID, log.AlertName, log.ChangeType, string(labelsJSON), log.OldThreshold, log.NewThreshold, log.OldWatch, log.NewWatch, log.ChangeTime)
177+
_, err = s.DB.ExecContext(ctx, q, log.ID, log.AlertName, log.ChangeType, string(labelsJSON), log.OldThreshold, log.NewThreshold, oldWatch, newWatch, log.ChangeTime)
149178
if err != nil {
150179
return fmt.Errorf("insert change log: %w", err)
151180
}
152181
return nil
153182
}
154183

155-
// timeParseDurationPG parses a small subset of PostgreSQL interval text output into time.Duration.
156-
// Supported examples: "01:02:03", "02:03", "3600 seconds". Best-effort only.
157-
func timeParseDurationPG(s string) (time.Duration, error) {
158-
// HH:MM:SS
159-
var h, m int
160-
var sec float64
161-
if n, _ := fmt.Sscanf(s, "%d:%d:%f", &h, &m, &sec); n >= 2 {
162-
d := time.Duration(h)*time.Hour + time.Duration(m)*time.Minute + time.Duration(sec*float64(time.Second))
163-
return d, nil
164-
}
165-
var seconds float64
166-
if n, _ := fmt.Sscanf(s, "%f seconds", &seconds); n == 1 {
167-
return time.Duration(seconds * float64(time.Second)), nil
168-
}
169-
return 0, fmt.Errorf("unsupported interval format: %s", s)
184+
// durationToPgInterval converts a time.Duration to pgtype.Interval.
185+
// Note: This conversion assumes the duration represents a fixed time period.
186+
// For durations that include months or years, this conversion may not be accurate.
187+
func durationToPgInterval(d time.Duration) pgtype.Interval {
188+
// Convert to total microseconds first
189+
totalMicroseconds := d.Microseconds()
190+
191+
// Calculate days and remaining microseconds
192+
days := totalMicroseconds / (24 * 60 * 60 * 1000000) // 24 hours * 60 minutes * 60 seconds * 1,000,000 microseconds
193+
remainingMicroseconds := totalMicroseconds % (24 * 60 * 60 * 1000000)
194+
195+
return pgtype.Interval{
196+
Microseconds: remainingMicroseconds,
197+
Days: int32(days),
198+
Months: 0, // Duration doesn't include months
199+
Valid: true,
200+
}
201+
}
202+
203+
// pgIntervalToDuration converts a pgtype.Interval to time.Duration.
204+
// This function returns an error if the interval contains months or years,
205+
// as these cannot be accurately converted to a fixed duration.
206+
func pgIntervalToDuration(interval pgtype.Interval) (time.Duration, error) {
207+
if !interval.Valid {
208+
return 0, fmt.Errorf("interval is not valid")
209+
}
210+
211+
// Check if the interval contains months or years
212+
if interval.Months != 0 {
213+
return 0, fmt.Errorf("cannot convert interval with months to duration: %d months", interval.Months)
214+
}
215+
216+
// Convert to duration
217+
totalMicroseconds := interval.Microseconds + int64(interval.Days)*24*60*60*1000000
218+
return time.Duration(totalMicroseconds) * time.Microsecond, nil
170219
}

0 commit comments

Comments
 (0)