Commit e8de600

0xgouda and pashagolub authored
[-] fix source config update handling (#1106)
* Shut down old workers for a source on configs change; ensures all gatherers are re-run with up-to-date configs
* Add unit tests for gatherer restart on source configs change
* Add integration tests for source config change handling

Co-authored-by: Pavlo Golub <[email protected]>
1 parent 59c4b1f commit e8de600
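
The gist of the change, per the reaper.go diff below: when LoadSources finds that a source's configuration differs from the previous reload, it shuts down that source's gatherers so they are recreated with the fresh settings. The following is a minimal, self-contained sketch of that pattern; sourceConfig, reaper, and the reflect.DeepEqual comparison are illustrative stand-ins, not the actual pgwatch API.

// Minimal sketch of the restart-on-config-change pattern.
// The types and the equality check are hypothetical stand-ins for
// pgwatch's sources.SourceConn / Reaper internals.
package main

import (
	"context"
	"fmt"
	"reflect"
)

type sourceConfig struct {
	Name    string
	ConnStr string
	Metrics map[string]float64 // metric name -> interval in seconds
}

type reaper struct {
	prev    map[string]sourceConfig       // configs seen on the previous reload
	cancels map[string]context.CancelFunc // running gatherers, keyed by source
}

// reload keeps gatherers of unchanged sources running and cancels the
// gatherers of any source whose config changed, so they are recreated
// with the updated settings.
func (r *reaper) reload(ctx context.Context, newConfigs []sourceConfig) {
	for _, cfg := range newConfigs {
		if old, ok := r.prev[cfg.Name]; ok && reflect.DeepEqual(old, cfg) {
			continue // unchanged: leave existing workers alone
		}
		if cancel, ok := r.cancels[cfg.Name]; ok {
			fmt.Printf("source %s changed, restarting gatherers\n", cfg.Name)
			cancel() // stop the old workers for this source
			delete(r.cancels, cfg.Name)
		}
		workerCtx, cancel := context.WithCancel(ctx)
		_ = workerCtx // a real implementation would start the gatherers here
		r.cancels[cfg.Name] = cancel
		r.prev[cfg.Name] = cfg
	}
}

func main() {
	r := &reaper{prev: map[string]sourceConfig{}, cancels: map[string]context.CancelFunc{}}
	base := sourceConfig{Name: "test_source", Metrics: map[string]float64{"test_metric": 1}}
	r.reload(context.Background(), []sourceConfig{base})

	changed := base
	changed.Metrics = map[string]float64{"test_metric": 2}  // interval bumped, like the integration test
	r.reload(context.Background(), []sourceConfig{changed}) // prints the restart message
}

In pgwatch itself the shutdown path is ShutdownOldWorkers and CloseResourcesForRemovedMonitoredDBs, shown in the diffs below.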

File tree

4 files changed, +533 −22 lines changed

cmd/pgwatch/main_integration_test.go

Lines changed: 267 additions & 0 deletions
@@ -204,3 +204,270 @@ metrics:
 		assert.GreaterOrEqual(t, count, len(specialNames), "expected metrics to be stored in test_metric table")
 	})
 }
+
+// TestMain_Integration_ConfigChanges tests that source configuration changes
+// are properly applied to running gatherers
+func TestMain_Integration_ConfigChanges(t *testing.T) {
+	tempDir := t.TempDir()
+
+	pg, tearDown, err := testutil.SetupPostgresContainer()
+	require.NoError(t, err)
+	defer tearDown()
+
+	connStr, err := pg.ConnectionString(testutil.TestContext, "sslmode=disable")
+	require.NoError(t, err)
+
+	var gotExit int32
+	Exit = func(code int) { gotExit = int32(code) }
+	defer func() { Exit = os.Exit }()
+
+	metricsYaml := filepath.Join(tempDir, "metrics.yaml")
+	sourcesYaml := filepath.Join(tempDir, "sources.yaml")
+
+	require.NoError(t, os.WriteFile(metricsYaml, []byte(`
+metrics:
+  test_metric:
+    sqls:
+      11: select (extract(epoch from now()) * 1e9)::int8 as epoch_ns, 1 as value
+presets:
+  test_preset:
+    metrics:
+      test_metric: 1
+`), 0644))
+
+	require.NoError(t, os.WriteFile(sourcesYaml, []byte(`
+- name: test_source
+  conn_str: `+connStr+`
+  kind: postgres
+  is_enabled: true
+  custom_tags:
+    environment: production
+    version: "1.0"
+  custom_metrics:
+    test_metric: 1
+`), 0644))
+
+	os.Args = []string{
+		"pgwatch",
+		"--metrics", metricsYaml,
+		"--sources", sourcesYaml,
+		"--sink", connStr,
+		"--refresh=2",
+		"--web-disable",
+	}
+
+	sinkConn, err := pgx.Connect(context.Background(), connStr)
+	require.NoError(t, err)
+	defer sinkConn.Close(context.Background())
+
+	go main()
+
+	// Below tests are expected to run sequentially and depend on
+	// data generated by each other
+
+	t.Run("Ensure tag changes are applied", func(t *testing.T) {
+		// Wait for some initial metrics to be written
+		time.Sleep(2 * time.Second)
+
+		var tagData map[string]string
+		err = sinkConn.QueryRow(context.Background(),
+			`SELECT tag_data FROM test_metric
+			WHERE dbname = 'test_source'
+			ORDER BY time DESC LIMIT 1`).Scan(&tagData)
+		require.NoError(t, err)
+		assert.Equal(t, "production", tagData["environment"], "initial environment tag should be 'production'")
+		assert.Equal(t, "1.0", tagData["version"], "initial version tag should be '1.0'")
+
+		// Update custom_tags
+		require.NoError(t, os.WriteFile(sourcesYaml, []byte(`
+- name: test_source
+  conn_str: `+connStr+`
+  kind: postgres
+  is_enabled: true
+  custom_tags:
+    environment: staging
+    version: "2.0"
+    new_tag: added
+  custom_metrics:
+    test_metric: 1
+`), 0644))
+
+		// Wait for config reload and new metrics with updated tags
+		time.Sleep(3 * time.Second)
+
+		err = sinkConn.QueryRow(context.Background(),
+			`SELECT tag_data FROM test_metric
+			WHERE dbname = 'test_source'
+			ORDER BY time DESC LIMIT 1`).Scan(&tagData)
+		require.NoError(t, err)
+		assert.Equal(t, "staging", tagData["environment"], "updated environment tag should be 'staging'")
+		assert.Equal(t, "2.0", tagData["version"], "updated version tag should be '2.0'")
+		assert.Equal(t, "added", tagData["new_tag"], "new_tag should be present")
+	})
+
+	t.Run("Ensure metric interval changes are applied", func(t *testing.T) {
+		// Get collection interval before change
+		var epochNsBefore []int64
+		rows, err := sinkConn.Query(context.Background(),
+			`SELECT (data->>'epoch_ns')::bigint as epoch_ns
+			FROM test_metric
+			WHERE dbname = 'test_source'
+			ORDER BY time DESC LIMIT 2`)
+		require.NoError(t, err)
+		for rows.Next() {
+			var epochNs int64
+			require.NoError(t, rows.Scan(&epochNs))
+			epochNsBefore = append(epochNsBefore, epochNs)
+		}
+		rows.Close()
+		require.GreaterOrEqual(t, len(epochNsBefore), 2, "we need at least 2 measurements")
+
+		// Calculate interval before change
+		intervalBefore := float64(epochNsBefore[0] - epochNsBefore[1]) / 1e9
+		assert.InDelta(t, 1.0, intervalBefore, 0.5, "interval should be approximately 1 second")
+
+		// Change interval to 2 seconds
+		require.NoError(t, os.WriteFile(sourcesYaml, []byte(`
+- name: test_source
+  conn_str: `+connStr+`
+  kind: postgres
+  is_enabled: true
+  custom_metrics:
+    test_metric: 2
+`), 0644))
+
+		time.Sleep(5 * time.Second)
+
+		// Get collection interval after change
+		var epochNsAfter []int64
+		rows, err = sinkConn.Query(context.Background(),
+			`SELECT (data->>'epoch_ns')::bigint as epoch_ns
+			FROM test_metric
+			WHERE dbname = 'test_source'
+			ORDER BY time DESC LIMIT 2`)
+		require.NoError(t, err)
+		for rows.Next() {
+			var epochNs int64
+			require.NoError(t, rows.Scan(&epochNs))
+			epochNsAfter = append(epochNsAfter, epochNs)
+		}
+		rows.Close()
+		require.GreaterOrEqual(t, len(epochNsAfter), 2, "we need at least 2 measurements after interval change")
+
+		// Calculate interval after change
+		intervalAfter := float64(epochNsAfter[0] - epochNsAfter[1]) / 1e9
+		assert.InDelta(t, 2.0, intervalAfter, 0.5, "new interval should be approximately 2 seconds")
+		assert.Greater(t, intervalAfter, intervalBefore, "new interval should be greater than old interval")
+	})
+
+	t.Run("Ensure conn str changes are applied", func(t *testing.T) {
+		// Count rows before connection string change
+		var countBefore int
+		err = sinkConn.QueryRow(context.Background(),
+			`SELECT count(*) FROM test_metric WHERE dbname = 'test_source'`).Scan(&countBefore)
+		require.NoError(t, err)
+		require.Greater(t, countBefore, 0)
+
+		// Change to invalid connection string
+		require.NoError(t, os.WriteFile(sourcesYaml, []byte(`
+- name: test_source
+  conn_str: postgres://invalid:invalid@localhost:59999/nonexistent
+  kind: postgres
+  is_enabled: true
+  custom_metrics:
+    test_metric: 1
+`), 0644))
+
+		// Wait for config reload and failed metric fetches
+		time.Sleep(4 * time.Second)
+
+		// Count rows after connection string change
+		var countAfter int
+		err = sinkConn.QueryRow(context.Background(),
+			`SELECT count(*) FROM public.test_metric WHERE dbname = 'test_source'`).Scan(&countAfter)
+		require.NoError(t, err)
+
+		assert.LessOrEqual(t, countAfter - countBefore, 2)
+	})
+
+	t.Run("Ensure preset intervals updates are applied - issue #1091", func(t *testing.T) {
+		require.NoError(t, os.WriteFile(sourcesYaml, []byte(`
+- name: test_source
+  conn_str: `+connStr+`
+  kind: postgres
+  is_enabled: true
+  custom_tags:
+    version: "1.0"
+  preset_metrics: test_preset
+`), 0644))
+
+		// Wait for reload and some metrics collection
+		time.Sleep(4 * time.Second)
+
+		var epochNsBefore []int64
+		rows, err := sinkConn.Query(context.Background(),
+			`SELECT (data->>'epoch_ns')::bigint as epoch_ns
+			FROM public.test_metric
+			WHERE dbname = 'test_source'
+			ORDER BY time DESC LIMIT 2`)
+		require.NoError(t, err)
+		for rows.Next() {
+			var epochNs int64
+			require.NoError(t, rows.Scan(&epochNs))
+			epochNsBefore = append(epochNsBefore, epochNs)
+		}
+		rows.Close()
+		require.GreaterOrEqual(t, len(epochNsBefore), 2, "should have at least 2 measurements")
+
+		// Calculate interval before change
+		intervalBefore := float64(epochNsBefore[0] - epochNsBefore[1]) / 1e9
+		assert.InDelta(t, 1.0, intervalBefore, 0.5, "interval should be approximately 1 second")
+
+		require.NoError(t, os.WriteFile(sourcesYaml, []byte(`
+- name: test_source
+  conn_str: `+connStr+`
+  kind: postgres
+  is_enabled: true
+  custom_tags:
+    version: "2.0" # to force a reload - triggering the bug
+  preset_metrics: test_preset
+`), 0644))
+
+		require.NoError(t, os.WriteFile(metricsYaml, []byte(`
+metrics:
+  test_metric:
+    sqls:
+      11: select (extract(epoch from now()) * 1e9)::int8 as epoch_ns, 1 as value
+presets:
+  test_preset:
+    metrics:
+      test_metric: 2
+`), 0644))
+
+		// Wait for config reload and some metrics
+		time.Sleep(5 * time.Second)
+
+		var epochNsAfter []int64
+		rows, err = sinkConn.Query(context.Background(),
+			`SELECT (data->>'epoch_ns')::bigint as epoch_ns
+			FROM public.test_metric
+			WHERE dbname = 'test_source'
+			ORDER BY time DESC LIMIT 2`)
+		require.NoError(t, err)
+		for rows.Next() {
+			var epochNs int64
+			require.NoError(t, rows.Scan(&epochNs))
+			epochNsAfter = append(epochNsAfter, epochNs)
+		}
+		rows.Close()
+		require.GreaterOrEqual(t, len(epochNsAfter), 2, "should have at least 2 measurements")
+
+		// Calculate interval after change
+		intervalAfter := float64(epochNsAfter[0] - epochNsAfter[1]) / 1e9
+		assert.InDelta(t, 2.0, intervalAfter, 0.5, "interval should be approximately 2 seconds")
+	})
+
+	cancel()
+	<-mainCtx.Done()
+	assert.Equal(t, cmdopts.ExitCodeOK, gotExit)
+}
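
To run just this test locally, something like the following should work; it assumes Docker (or a compatible container runtime) is available for the Postgres container started by testutil.SetupPostgresContainer, and the exact flags or build tags may differ from the project's CI setup:

	go test ./cmd/pgwatch/ -run TestMain_Integration_ConfigChanges -v -timeout 10m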

internal/reaper/database.go

Lines changed: 4 additions & 4 deletions
@@ -527,18 +527,18 @@ func TryCreateMetricsFetchingHelpers(ctx context.Context, md *sources.SourceConn
 	return
 }
 
-func (r *Reaper) CloseResourcesForRemovedMonitoredDBs(shutDownDueToRoleChange map[string]bool) {
+func (r *Reaper) CloseResourcesForRemovedMonitoredDBs(hostsToShutDown map[string]bool) {
 	for _, prevDB := range r.prevLoopMonitoredDBs {
 		if r.monitoredSources.GetMonitoredDatabase(prevDB.Name) == nil { // removed from config
 			prevDB.Conn.Close()
 			_ = r.SinksWriter.SyncMetric(prevDB.Name, "", sinks.DeleteOp)
 		}
 	}
 
-	for roleChangedDB := range shutDownDueToRoleChange {
-		if db := r.monitoredSources.GetMonitoredDatabase(roleChangedDB); db != nil {
+	for toShutDownDB := range hostsToShutDown {
+		if db := r.monitoredSources.GetMonitoredDatabase(toShutDownDB); db != nil {
 			db.Conn.Close()
 		}
-		_ = r.SinksWriter.SyncMetric(roleChangedDB, "", sinks.DeleteOp)
+		_ = r.SinksWriter.SyncMetric(toShutDownDB, "", sinks.DeleteOp)
 	}
 }

internal/reaper/reaper.go

Lines changed: 19 additions & 10 deletions
@@ -88,7 +88,7 @@ func (r *Reaper) Reap(ctx context.Context) {
 	if r.Logging.LogLevel == "debug" {
 		r.PrintMemStats()
 	}
-	if err = r.LoadSources(); err != nil {
+	if err = r.LoadSources(ctx); err != nil {
 		logger.WithError(err).Error("could not refresh active sources, using last valid cache")
 	}
 	if err = r.LoadMetrics(); err != nil {
@@ -232,7 +232,7 @@ func (r *Reaper) CreateSourceHelpers(ctx context.Context, srcL log.Logger, monit
 
 }
 
-func (r *Reaper) ShutdownOldWorkers(ctx context.Context, hostsToShutDownDueToRoleChange map[string]bool) {
+func (r *Reaper) ShutdownOldWorkers(ctx context.Context, hostsToShutDown map[string]bool) {
 	logger := r.logger
 	// loop over existing channels and stop workers if DB or metric removed from config
 	// or state change makes it uninteresting
@@ -241,31 +241,36 @@ func (r *Reaper) ShutdownOldWorkers(ctx context.Context, hostsToShutDownDueToRol
 		var currentMetricConfig map[string]float64
 		var md *sources.SourceConn
 		var dbRemovedFromConfig bool
-		singleMetricDisabled := false
+		var metricRemovedFromPreset bool
 		splits := strings.Split(dbMetric, dbMetricJoinStr)
 		db := splits[0]
 		metric := splits[1]
 
-		_, wholeDbShutDownDueToRoleChange := hostsToShutDownDueToRoleChange[db]
-		if !wholeDbShutDownDueToRoleChange {
+		_, wholeDbShutDown := hostsToShutDown[db]
+		if !wholeDbShutDown {
			md = r.monitoredSources.GetMonitoredDatabase(db)
			if md == nil { // normal removing of DB from config
				dbRemovedFromConfig = true
				logger.Debugf("DB %s removed from config, shutting down all metric worker processes...", db)
			}
		}
 
-		if !(wholeDbShutDownDueToRoleChange || dbRemovedFromConfig) { // maybe some single metric was disabled
+		// Detects metrics removed from a preset definition.
+		//
+		// If not using presets, a metric removed from configs will
+		// be detected earlier by `LoadSources()` as configs change that
+		// triggers a restart and get passed in `hostsToShutDown`.
+		if !(wholeDbShutDown || dbRemovedFromConfig) {
			if md.IsInRecovery && len(md.MetricsStandby) > 0 {
				currentMetricConfig = md.MetricsStandby
			} else {
				currentMetricConfig = md.Metrics
			}
			interval, isMetricActive := currentMetricConfig[metric]
-			singleMetricDisabled = !isMetricActive || interval <= 0
+			metricRemovedFromPreset = !isMetricActive || interval <= 0
		}
 
-		if ctx.Err() != nil || wholeDbShutDownDueToRoleChange || dbRemovedFromConfig || singleMetricDisabled {
+		if ctx.Err() != nil || wholeDbShutDown || dbRemovedFromConfig || metricRemovedFromPreset {
			logger.WithField("source", db).WithField("metric", metric).Info("stopping gatherer...")
			cancelFunc()
			delete(r.cancelFuncs, dbMetric)
@@ -276,7 +281,7 @@ func (r *Reaper) ShutdownOldWorkers(ctx context.Context, hostsToShutDownDueToRol
 	}
 
 	// Destroy conn pools and metric writers
-	r.CloseResourcesForRemovedMonitoredDBs(hostsToShutDownDueToRoleChange)
+	r.CloseResourcesForRemovedMonitoredDBs(hostsToShutDown)
 }
 
 func (r *Reaper) reapMetricMeasurements(ctx context.Context, md *sources.SourceConn, metricName string) {
@@ -379,7 +384,7 @@ func (r *Reaper) reapMetricMeasurements(ctx context.Context, md *sources.SourceC
 }
 
 // LoadSources loads sources from the reader
-func (r *Reaper) LoadSources() (err error) {
+func (r *Reaper) LoadSources(ctx context.Context) (err error) {
 	if DoesEmergencyTriggerfileExist(r.Metrics.EmergencyPauseTriggerfile) {
 		r.logger.Warningf("Emergency pause triggerfile detected at %s, ignoring currently configured DBs", r.Metrics.EmergencyPauseTriggerfile)
 		r.monitoredSources = make(sources.SourceConns, 0)
@@ -408,6 +413,10 @@ func (r *Reaper) LoadSources() (err error) {
 			newSrcs[i] = md
 			continue
 		}
+		// Source configs changed, stop all running gatherers to trigger a restart
+		// TODO: Optimize this for single metric addition/deletion/interval-change cases to not do a full restart
+		r.logger.WithField("source", md.Name).Info("Source configs changed, restarting all gatherers...")
+		r.ShutdownOldWorkers(ctx, map[string]bool{md.Name: true})
 	}
 	r.monitoredSources = newSrcs
 	r.logger.WithField("sources", len(r.monitoredSources)).Info("sources refreshed")