267 changes: 267 additions & 0 deletions cmd/pgwatch/main_integration_test.go
@@ -204,3 +204,270 @@ metrics:
assert.GreaterOrEqual(t, count, len(specialNames), "expected metrics to be stored in test_metric table")
})
}

// TestMain_Integration_ConfigChanges tests that source configuration changes
// are properly applied to running gatherers
func TestMain_Integration_ConfigChanges(t *testing.T) {
tempDir := t.TempDir()

pg, tearDown, err := testutil.SetupPostgresContainer()
require.NoError(t, err)
defer tearDown()

connStr, err := pg.ConnectionString(testutil.TestContext, "sslmode=disable")
require.NoError(t, err)

var gotExit int32
Exit = func(code int) { gotExit = int32(code) }
defer func() { Exit = os.Exit }()

metricsYaml := filepath.Join(tempDir, "metrics.yaml")
sourcesYaml := filepath.Join(tempDir, "sources.yaml")

require.NoError(t, os.WriteFile(metricsYaml, []byte(`
metrics:
test_metric:
sqls:
11: select (extract(epoch from now()) * 1e9)::int8 as epoch_ns, 1 as value
presets:
test_preset:
metrics:
test_metric: 1
`), 0644))

require.NoError(t, os.WriteFile(sourcesYaml, []byte(`
- name: test_source
conn_str: `+connStr+`
kind: postgres
is_enabled: true
custom_tags:
environment: production
version: "1.0"
custom_metrics:
test_metric: 1
`), 0644))

os.Args = []string{
"pgwatch",
"--metrics", metricsYaml,
"--sources", sourcesYaml,
"--sink", connStr,
"--refresh=2",
"--web-disable",
}

sinkConn, err := pgx.Connect(context.Background(), connStr)
require.NoError(t, err)
defer sinkConn.Close(context.Background())

go main()

// The subtests below are expected to run sequentially and depend on
// data generated by the preceding ones

t.Run("Ensure tag changes are applied", func(t *testing.T) {
// Wait for some initial metrics to be written
time.Sleep(2 * time.Second)

var tagData map[string]string
err = sinkConn.QueryRow(context.Background(),
`SELECT tag_data FROM test_metric
WHERE dbname = 'test_source'
ORDER BY time DESC LIMIT 1`).Scan(&tagData)
require.NoError(t, err)
assert.Equal(t, "production", tagData["environment"], "initial environment tag should be 'production'")
assert.Equal(t, "1.0", tagData["version"], "initial version tag should be '1.0'")

// Update custom_tags
require.NoError(t, os.WriteFile(sourcesYaml, []byte(`
- name: test_source
conn_str: `+connStr+`
kind: postgres
is_enabled: true
custom_tags:
environment: staging
version: "2.0"
new_tag: added
custom_metrics:
test_metric: 1
`), 0644))

// Wait for config reload and new metrics with updated tags
time.Sleep(3 * time.Second)

err = sinkConn.QueryRow(context.Background(),
`SELECT tag_data FROM test_metric
WHERE dbname = 'test_source'
ORDER BY time DESC LIMIT 1`).Scan(&tagData)
require.NoError(t, err)
assert.Equal(t, "staging", tagData["environment"], "updated environment tag should be 'staging'")
assert.Equal(t, "2.0", tagData["version"], "updated version tag should be '2.0'")
assert.Equal(t, "added", tagData["new_tag"], "new_tag should be present")
})

t.Run("Ensure metric interval changes are applied", func(t *testing.T) {
// Get collection interval before change
var epochNsBefore []int64
rows, err := sinkConn.Query(context.Background(),
`SELECT (data->>'epoch_ns')::bigint as epoch_ns
FROM test_metric
WHERE dbname = 'test_source'
ORDER BY time DESC LIMIT 2`)
require.NoError(t, err)
for rows.Next() {
var epochNs int64
require.NoError(t, rows.Scan(&epochNs))
epochNsBefore = append(epochNsBefore, epochNs)
}
rows.Close()
require.GreaterOrEqual(t, len(epochNsBefore), 2, "we need at least 2 measurements")

// Calculate interval before change
intervalBefore := float64(epochNsBefore[0] - epochNsBefore[1]) / 1e9
assert.InDelta(t, 1.0, intervalBefore, 0.5, "interval should be approximately 1 second")

// Change interval to 2 seconds
require.NoError(t, os.WriteFile(sourcesYaml, []byte(`
- name: test_source
conn_str: `+connStr+`
kind: postgres
is_enabled: true
custom_metrics:
test_metric: 2
`), 0644))

time.Sleep(5 * time.Second)

// Get collection interval after change
var epochNsAfter []int64
rows, err = sinkConn.Query(context.Background(),
`SELECT (data->>'epoch_ns')::bigint as epoch_ns
FROM test_metric
WHERE dbname = 'test_source'
ORDER BY time DESC LIMIT 2`)
require.NoError(t, err)
for rows.Next() {
var epochNs int64
require.NoError(t, rows.Scan(&epochNs))
epochNsAfter = append(epochNsAfter, epochNs)
}
rows.Close()
require.GreaterOrEqual(t, len(epochNsAfter), 2, "we need at least 2 measurements after interval change")

// Calculate interval after change
intervalAfter := float64(epochNsAfter[0] - epochNsAfter[1]) / 1e9
assert.InDelta(t, 2.0, intervalAfter, 0.5, "new interval should be approximately 2 seconds")
assert.Greater(t, intervalAfter, intervalBefore, "new interval should be greater than old interval")
})

t.Run("Ensure conn str changes are applied", func(t *testing.T) {
// Count rows before connection string change
var countBefore int
err = sinkConn.QueryRow(context.Background(),
`SELECT count(*) FROM test_metric WHERE dbname = 'test_source'`).Scan(&countBefore)
require.NoError(t, err)
require.Greater(t, countBefore, 0)

// Change to invalid connection string
require.NoError(t, os.WriteFile(sourcesYaml, []byte(`
- name: test_source
conn_str: postgres://invalid:invalid@localhost:59999/nonexistent
kind: postgres
is_enabled: true
custom_metrics:
test_metric: 1
`), 0644))

// Wait for config reload and failed metric fetches
time.Sleep(4 * time.Second)

// Count rows after connection string change
var countAfter int
err = sinkConn.QueryRow(context.Background(),
`SELECT count(*) FROM public.test_metric WHERE dbname = 'test_source'`).Scan(&countAfter)
require.NoError(t, err)

assert.LessOrEqual(t, countAfter-countBefore, 2) // at most a couple of in-flight rows may land after the conn string breaks
})

t.Run("Ensure preset intervals updates are applied - issue #1091", func(t *testing.T) {
require.NoError(t, os.WriteFile(sourcesYaml, []byte(`
- name: test_source
conn_str: `+connStr+`
kind: postgres
is_enabled: true
custom_tags:
version: "1.0"
preset_metrics: test_preset
`), 0644))

// Wait for reload and some metrics collection
time.Sleep(4 * time.Second)

var epochNsBefore []int64
rows, err := sinkConn.Query(context.Background(),
`SELECT (data->>'epoch_ns')::bigint as epoch_ns
FROM public.test_metric
WHERE dbname = 'test_source'
ORDER BY time DESC LIMIT 2`)
require.NoError(t, err)
for rows.Next() {
var epochNs int64
require.NoError(t, rows.Scan(&epochNs))
epochNsBefore = append(epochNsBefore, epochNs)
}
rows.Close()
require.GreaterOrEqual(t, len(epochNsBefore), 2, "should have at least 2 measurements")

// Calculate interval before change
intervalBefore := float64(epochNsBefore[0] - epochNsBefore[1]) / 1e9
assert.InDelta(t, 1.0, intervalBefore, 0.5, "interval should be approximately 1 second")

require.NoError(t, os.WriteFile(sourcesYaml, []byte(`
- name: test_source
conn_str: `+connStr+`
kind: postgres
is_enabled: true
custom_tags:
version: "2.0" # to force a reload - triggering the bug
preset_metrics: test_preset
`), 0644))

require.NoError(t, os.WriteFile(metricsYaml, []byte(`
metrics:
test_metric:
sqls:
11: select (extract(epoch from now()) * 1e9)::int8 as epoch_ns, 1 as value
presets:
test_preset:
metrics:
test_metric: 2
`), 0644))

// Wait for config reload and some metrics
time.Sleep(5 * time.Second)

var epochNsAfter []int64
rows, err = sinkConn.Query(context.Background(),
`SELECT (data->>'epoch_ns')::bigint as epoch_ns
FROM public.test_metric
WHERE dbname = 'test_source'
ORDER BY time DESC LIMIT 2`)
require.NoError(t, err)
for rows.Next() {
var epochNs int64
require.NoError(t, rows.Scan(&epochNs))
epochNsAfter = append(epochNsAfter, epochNs)
}
rows.Close()
require.GreaterOrEqual(t, len(epochNsAfter), 2, "should have at least 2 measurements")

// Calculate interval after change
intervalAfter := float64(epochNsAfter[0] - epochNsAfter[1]) / 1e9
assert.InDelta(t, 2.0, intervalAfter, 0.5, "interval should be approximately 2 seconds")
})

cancel()
<-mainCtx.Done()
assert.Equal(t, cmdopts.ExitCodeOK, gotExit)
}
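
An aside for readers of the test above (not part of the diff): go main() is testable here because the command exposes small test seams, a swappable Exit function and a cancellable root context. The real declarations live in the unexpanded part of cmd/pgwatch; the sketch below only illustrates the assumed pattern, with run() as a hypothetical stand-in for the real entry point.

// A minimal sketch, NOT the pgwatch sources: the names Exit, mainCtx and cancel
// appear in the test body; everything else here (run and its wiring) is an assumption.
package main

import (
	"context"
	"os"
)

// Exit is a swappable exit hook: production code calls it like os.Exit, while the
// test replaces it with a closure that just records the exit code.
var Exit = os.Exit

// mainCtx and cancel form the root context; the test calls cancel() to ask main()
// to shut down, then waits on mainCtx.Done() before asserting the recorded code.
var mainCtx, cancel = context.WithCancel(context.Background())

func main() {
	Exit(run(mainCtx))
}

// run stands in for the real entry point, which keeps gathering metrics until the
// root context is cancelled and then returns an exit code (cmdopts.ExitCodeOK on a
// clean shutdown).
func run(ctx context.Context) int {
	<-ctx.Done()
	return 0
}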
8 changes: 4 additions & 4 deletions internal/reaper/database.go
@@ -527,18 +527,18 @@ func TryCreateMetricsFetchingHelpers(ctx context.Context, md *sources.SourceConn
return
}

func (r *Reaper) CloseResourcesForRemovedMonitoredDBs(shutDownDueToRoleChange map[string]bool) {
func (r *Reaper) CloseResourcesForRemovedMonitoredDBs(hostsToShutDown map[string]bool) {
for _, prevDB := range r.prevLoopMonitoredDBs {
if r.monitoredSources.GetMonitoredDatabase(prevDB.Name) == nil { // removed from config
prevDB.Conn.Close()
_ = r.SinksWriter.SyncMetric(prevDB.Name, "", sinks.DeleteOp)
}
}

for roleChangedDB := range shutDownDueToRoleChange {
if db := r.monitoredSources.GetMonitoredDatabase(roleChangedDB); db != nil {
for toShutDownDB := range hostsToShutDown {
if db := r.monitoredSources.GetMonitoredDatabase(toShutDownDB); db != nil {
db.Conn.Close()
}
_ = r.SinksWriter.SyncMetric(roleChangedDB, "", sinks.DeleteOp)
_ = r.SinksWriter.SyncMetric(toShutDownDB, "", sinks.DeleteOp)
}
}
29 changes: 19 additions & 10 deletions internal/reaper/reaper.go
@@ -88,7 +88,7 @@ func (r *Reaper) Reap(ctx context.Context) {
if r.Logging.LogLevel == "debug" {
r.PrintMemStats()
}
if err = r.LoadSources(); err != nil {
if err = r.LoadSources(ctx); err != nil {
logger.WithError(err).Error("could not refresh active sources, using last valid cache")
}
if err = r.LoadMetrics(); err != nil {
@@ -232,7 +232,7 @@ func (r *Reaper) CreateSourceHelpers(ctx context.Context, srcL log.Logger, monit

}

func (r *Reaper) ShutdownOldWorkers(ctx context.Context, hostsToShutDownDueToRoleChange map[string]bool) {
func (r *Reaper) ShutdownOldWorkers(ctx context.Context, hostsToShutDown map[string]bool) {
logger := r.logger
// loop over existing channels and stop workers if DB or metric removed from config
// or state change makes it uninteresting
@@ -241,31 +241,36 @@ func (r *Reaper) ShutdownOldWorkers(ctx context.Context, hostsToShutDownDueToRol
var currentMetricConfig map[string]float64
var md *sources.SourceConn
var dbRemovedFromConfig bool
singleMetricDisabled := false
var metricRemovedFromPreset bool
splits := strings.Split(dbMetric, dbMetricJoinStr)
db := splits[0]
metric := splits[1]

_, wholeDbShutDownDueToRoleChange := hostsToShutDownDueToRoleChange[db]
if !wholeDbShutDownDueToRoleChange {
_, wholeDbShutDown := hostsToShutDown[db]
if !wholeDbShutDown {
md = r.monitoredSources.GetMonitoredDatabase(db)
if md == nil { // normal removing of DB from config
dbRemovedFromConfig = true
logger.Debugf("DB %s removed from config, shutting down all metric worker processes...", db)
}
}

if !(wholeDbShutDownDueToRoleChange || dbRemovedFromConfig) { // maybe some single metric was disabled
// Detects metrics removed from a preset definition.
//
// When presets are not used, a metric removed from the configs is
// detected earlier by `LoadSources()`: the config change triggers a
// restart and the source is passed in via `hostsToShutDown`.
if !(wholeDbShutDown || dbRemovedFromConfig) {
if md.IsInRecovery && len(md.MetricsStandby) > 0 {
currentMetricConfig = md.MetricsStandby
} else {
currentMetricConfig = md.Metrics
}
interval, isMetricActive := currentMetricConfig[metric]
singleMetricDisabled = !isMetricActive || interval <= 0
metricRemovedFromPreset = !isMetricActive || interval <= 0
}

if ctx.Err() != nil || wholeDbShutDownDueToRoleChange || dbRemovedFromConfig || singleMetricDisabled {
if ctx.Err() != nil || wholeDbShutDown || dbRemovedFromConfig || metricRemovedFromPreset {
logger.WithField("source", db).WithField("metric", metric).Info("stopping gatherer...")
cancelFunc()
delete(r.cancelFuncs, dbMetric)
@@ -276,7 +281,7 @@ }
}

// Destroy conn pools and metric writers
r.CloseResourcesForRemovedMonitoredDBs(hostsToShutDownDueToRoleChange)
r.CloseResourcesForRemovedMonitoredDBs(hostsToShutDown)
}
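
For context (again not part of the diff): the keys that ShutdownOldWorkers splits with dbMetricJoinStr are built when gatherers are started, outside this hunk. The self-contained sketch below only illustrates that per-(source, metric) cancel-map pattern; the separator value, the miniReaper type and its methods are assumptions, not pgwatch's actual Reaper.

// Generic illustration of the per-(source, metric) cancel map that the loop above
// tears down; only the key convention (name + dbMetricJoinStr + metric) is taken
// from the code above.
package main

import (
	"context"
	"fmt"
	"strings"
	"time"
)

const dbMetricJoinStr = "::" // placeholder separator; the real constant is defined elsewhere in the reaper package

type miniReaper struct {
	cancelFuncs map[string]context.CancelFunc
}

// startGatherer registers one cancellable context per (source, metric) pair so a
// later shutdown can cancel exactly that gatherer.
func (r *miniReaper) startGatherer(ctx context.Context, source, metric string) {
	gctx, cancel := context.WithCancel(ctx)
	r.cancelFuncs[source+dbMetricJoinStr+metric] = cancel
	go func() {
		<-gctx.Done()
		fmt.Println("gatherer stopped:", source, metric)
	}()
}

// shutdownSource cancels and forgets every gatherer belonging to one source,
// mirroring what passing {source: true} to ShutdownOldWorkers achieves.
func (r *miniReaper) shutdownSource(source string) {
	for key, cancel := range r.cancelFuncs {
		if strings.Split(key, dbMetricJoinStr)[0] == source {
			cancel()
			delete(r.cancelFuncs, key)
		}
	}
}

func main() {
	r := &miniReaper{cancelFuncs: map[string]context.CancelFunc{}}
	r.startGatherer(context.Background(), "test_source", "test_metric")
	r.shutdownSource("test_source")
	time.Sleep(100 * time.Millisecond) // let the goroutine log before the program exits
}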

func (r *Reaper) reapMetricMeasurements(ctx context.Context, md *sources.SourceConn, metricName string) {
@@ -379,7 +384,7 @@ func (r *Reaper) reapMetricMeasurements(ctx context.Context, md *sources.SourceC
}

// LoadSources loads sources from the reader
func (r *Reaper) LoadSources() (err error) {
func (r *Reaper) LoadSources(ctx context.Context) (err error) {
if DoesEmergencyTriggerfileExist(r.Metrics.EmergencyPauseTriggerfile) {
r.logger.Warningf("Emergency pause triggerfile detected at %s, ignoring currently configured DBs", r.Metrics.EmergencyPauseTriggerfile)
r.monitoredSources = make(sources.SourceConns, 0)
@@ -408,6 +413,10 @@ func (r *Reaper) LoadSources() (err error) {
newSrcs[i] = md
continue
}
// Source config changed: stop this source's running gatherers to trigger a restart with the new settings
// TODO: Optimize for single metric addition/deletion/interval-change cases so a full restart is not needed
r.logger.WithField("source", md.Name).Info("Source configs changed, restarting all gatherers...")
r.ShutdownOldWorkers(ctx, map[string]bool{md.Name: true})
}
r.monitoredSources = newSrcs
r.logger.WithField("sources", len(r.monitoredSources)).Info("sources refreshed")
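
To make the intent of those new lines concrete: when a reloaded source definition differs from the previous generation, the reaper now scopes the restart to just that source by passing it to ShutdownOldWorkers. The comparison itself sits in lines not shown in this hunk; the standalone sketch below is only an assumed shape of such a check (the Source struct and sourceChanged helper are hypothetical, their fields are the ones exercised by the tests above).

// A sketch of the kind of comparison that could decide "source config changed" in
// LoadSources; not pgwatch's actual implementation.
package main

import (
	"fmt"
	"reflect"
)

type Source struct {
	Name          string
	ConnStr       string
	CustomTags    map[string]string
	Metrics       map[string]float64 // metric name -> interval in seconds
	PresetMetrics string
}

// sourceChanged reports whether a reloaded source differs from the previous
// generation in any way that should restart its gatherers.
func sourceChanged(prev, next Source) bool {
	return prev.ConnStr != next.ConnStr ||
		prev.PresetMetrics != next.PresetMetrics ||
		!reflect.DeepEqual(prev.CustomTags, next.CustomTags) ||
		!reflect.DeepEqual(prev.Metrics, next.Metrics)
}

func main() {
	prev := Source{Name: "test_source", CustomTags: map[string]string{"version": "1.0"}}
	next := Source{Name: "test_source", CustomTags: map[string]string{"version": "2.0"}}
	if sourceChanged(prev, next) {
		// In the reaper this is where ShutdownOldWorkers(ctx, map[string]bool{name: true})
		// is called so the gatherers restart with the new tags/intervals/conn string.
		fmt.Println("restart gatherers for", next.Name)
	}
}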