Skip to content

Commit fdecb96

Browse files
authored
Merge pull request coroot#755 from coroot/fix_752
metric cache: prevent updater from stopping on Prometheus errors
2 parents 11487f6 + 162b58f commit fdecb96

File tree

1 file changed

+105
-107
lines changed

1 file changed

+105
-107
lines changed

cache/updater.go

Lines changed: 105 additions & 107 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@ func (c *Cache) updater() {
4040
if project.Multicluster() {
4141
continue
4242
}
43+
ids[project.Id] = true
4344
promClient, err := c.getPromClient(project)
4445
if err != nil {
4546
klog.Warningln(err)
@@ -51,7 +52,6 @@ func (c *Cache) updater() {
5152
klog.Errorln(err)
5253
continue
5354
}
54-
ids[project.Id] = true
5555
_, ok := workers.Load(project.Id)
5656
workers.Store(project.Id, project)
5757
if !ok {
@@ -76,6 +76,108 @@ func (c *Cache) getPromClient(project *db.Project) (prom.Client, error) {
7676
return prom.NewClient(project.PrometheusConfig(c.globalPrometheus), project.ClickHouseConfig(c.globalClickHouse))
7777
}
7878

79+
func (c *Cache) projectUpdateIteration(project *db.Project, step timeseries.Duration) error {
80+
states, err := c.loadStates(project.Id)
81+
if err != nil {
82+
return fmt.Errorf("could not get query states: %w", err)
83+
84+
}
85+
checkConfigs, err := c.db.GetCheckConfigs(project.Id)
86+
if err != nil {
87+
return fmt.Errorf("could not get check configs: %w", err)
88+
}
89+
90+
queries := slices.Clone(constructor.QUERIES)
91+
for appId := range checkConfigs {
92+
availabilityCfg, _ := checkConfigs.GetAvailability(appId)
93+
if availabilityCfg.Custom {
94+
queries = append(queries, constructor.Q("", availabilityCfg.Total()), constructor.Q("", availabilityCfg.Failed()))
95+
}
96+
latencyCfg, _ := checkConfigs.GetLatency(appId, project.CalcApplicationCategory(appId))
97+
if latencyCfg.Custom {
98+
queries = append(queries, constructor.Q("", latencyCfg.Histogram(), "le"))
99+
}
100+
}
101+
102+
var recordingRules []constructor.Query
103+
for q := range constructor.RecordingRules {
104+
recordingRules = append(recordingRules, constructor.Q("", q))
105+
}
106+
107+
actualQueries := map[string]bool{}
108+
now := timeseries.Now()
109+
for _, q := range append(queries, recordingRules...) {
110+
actualQueries[q.Query] = true
111+
state := states[q.Query]
112+
if state == nil {
113+
state = &PrometheusQueryState{ProjectId: project.Id, Query: q.Query, LastTs: now.Add(-BackFillInterval)}
114+
if err := c.saveState(state); err != nil {
115+
return fmt.Errorf("failed to create query state: %w", err)
116+
}
117+
states[q.Query] = state
118+
}
119+
}
120+
for q, s := range states {
121+
if actualQueries[q] {
122+
continue
123+
}
124+
if err := c.deleteState(s); err != nil {
125+
klog.Warningln("failed to delete obsolete query state:", err)
126+
continue
127+
}
128+
}
129+
130+
promClient, err := c.getPromClient(project)
131+
if err != nil {
132+
return err
133+
}
134+
defer promClient.Close()
135+
si, err := getScrapeInterval(promClient)
136+
if err != nil {
137+
klog.Errorln(err)
138+
} else if si != step {
139+
step = si
140+
c.lock.Lock()
141+
if c.byProject[project.Id] == nil {
142+
c.lock.Unlock()
143+
return fmt.Errorf("unknown project: %s", project.Id)
144+
}
145+
c.byProject[project.Id].step = step
146+
c.lock.Unlock()
147+
}
148+
wg := sync.WaitGroup{}
149+
tasks := make(chan UpdateTask)
150+
to := now.Add(-step)
151+
for i := 0; i < QueryConcurrency; i++ {
152+
wg.Add(1)
153+
go func() {
154+
defer wg.Done()
155+
for task := range tasks {
156+
c.download(to, promClient, project.Id, step, task)
157+
}
158+
}()
159+
}
160+
for _, q := range queries {
161+
tasks <- UpdateTask{query: q, state: states[q.Query]}
162+
}
163+
close(tasks)
164+
wg.Wait()
165+
166+
cacheTo, err := c.getMinUpdateTimeWithoutRecordingRules(project.Id)
167+
if err != nil {
168+
return err
169+
}
170+
if cacheTo.IsZero() {
171+
return nil
172+
}
173+
c.processRecordingRules(cacheTo, project, step, states)
174+
select {
175+
case c.updates <- project.Id:
176+
default:
177+
}
178+
return nil
179+
}
180+
79181
func (c *Cache) updaterWorker(projects *sync.Map, projectId db.ProjectId, step timeseries.Duration) {
80182
c.lock.Lock()
81183
if projData := c.byProject[projectId]; projData == nil {
@@ -98,114 +200,10 @@ func (c *Cache) updaterWorker(projects *sync.Map, projectId db.ProjectId, step t
98200
klog.Infoln("stopping worker for project:", projectId)
99201
return
100202
}
101-
102203
project := p.(*db.Project)
103-
states, err := c.loadStates(projectId)
104-
if err != nil {
105-
klog.Errorln("could not get query states:", err)
106-
return
107-
}
108-
checkConfigs, err := c.db.GetCheckConfigs(projectId)
109-
if err != nil {
110-
klog.Errorln("could not get check configs:", err)
111-
return
112-
}
113-
114-
queries := slices.Clone(constructor.QUERIES)
115-
for appId := range checkConfigs {
116-
availabilityCfg, _ := checkConfigs.GetAvailability(appId)
117-
if availabilityCfg.Custom {
118-
queries = append(queries, constructor.Q("", availabilityCfg.Total()), constructor.Q("", availabilityCfg.Failed()))
119-
}
120-
latencyCfg, _ := checkConfigs.GetLatency(appId, project.CalcApplicationCategory(appId))
121-
if latencyCfg.Custom {
122-
queries = append(queries, constructor.Q("", latencyCfg.Histogram(), "le"))
123-
}
124-
}
125-
126-
var recordingRules []constructor.Query
127-
for q := range constructor.RecordingRules {
128-
recordingRules = append(recordingRules, constructor.Q("", q))
129-
}
130-
131-
actualQueries := map[string]bool{}
132-
now := timeseries.Now()
133-
for _, q := range append(queries, recordingRules...) {
134-
actualQueries[q.Query] = true
135-
state := states[q.Query]
136-
if state == nil {
137-
state = &PrometheusQueryState{ProjectId: projectId, Query: q.Query, LastTs: now.Add(-BackFillInterval)}
138-
if err := c.saveState(state); err != nil {
139-
klog.Errorln("failed to create query state:", err)
140-
return
141-
}
142-
states[q.Query] = state
143-
}
144-
}
145-
for q, s := range states {
146-
if actualQueries[q] {
147-
continue
148-
}
149-
if err := c.deleteState(s); err != nil {
150-
klog.Warningln("failed to delete obsolete query state:", err)
151-
continue
152-
}
204+
if err := c.projectUpdateIteration(project, step); err != nil {
205+
klog.Errorln(err)
153206
}
154-
155-
func() {
156-
promClient, err := c.getPromClient(project)
157-
if err != nil {
158-
return
159-
}
160-
defer promClient.Close()
161-
si, err := getScrapeInterval(promClient)
162-
if err != nil {
163-
klog.Errorln(err)
164-
} else if si != step {
165-
step = si
166-
c.lock.Lock()
167-
if c.byProject[projectId] == nil {
168-
c.lock.Unlock()
169-
klog.Warningln("unknown project:", projectId)
170-
return
171-
}
172-
c.byProject[projectId].step = step
173-
c.lock.Unlock()
174-
}
175-
wg := sync.WaitGroup{}
176-
tasks := make(chan UpdateTask)
177-
to := now.Add(-step)
178-
for i := 0; i < QueryConcurrency; i++ {
179-
wg.Add(1)
180-
go func() {
181-
defer wg.Done()
182-
for task := range tasks {
183-
c.download(to, promClient, project.Id, step, task)
184-
}
185-
}()
186-
}
187-
for _, q := range queries {
188-
tasks <- UpdateTask{query: q, state: states[q.Query]}
189-
}
190-
close(tasks)
191-
wg.Wait()
192-
193-
cacheTo, err := c.getMinUpdateTimeWithoutRecordingRules(project.Id)
194-
if err != nil {
195-
klog.Errorln(err)
196-
return
197-
}
198-
if cacheTo.IsZero() {
199-
return
200-
}
201-
c.processRecordingRules(cacheTo, project, step, states)
202-
203-
select {
204-
case c.updates <- project.Id:
205-
default:
206-
}
207-
}()
208-
209207
duration := time.Since(start)
210208
klog.Infof("%s: cache updated in %s", projectId, duration.Truncate(time.Millisecond))
211209
refreshInterval := step

0 commit comments

Comments
 (0)