diff --git a/cmd/pgwatch/main_integration_test.go b/cmd/pgwatch/main_integration_test.go index 016ec91d7..5ee28fcac 100644 --- a/cmd/pgwatch/main_integration_test.go +++ b/cmd/pgwatch/main_integration_test.go @@ -204,3 +204,270 @@ metrics: assert.GreaterOrEqual(t, count, len(specialNames), "expected metrics to be stored in test_metric table") }) } + +// TestMain_Integration_ConfigChanges tests that source configuration changes +// are properly applied to running gatherers +func TestMain_Integration_ConfigChanges(t *testing.T) { + tempDir := t.TempDir() + + pg, tearDown, err := testutil.SetupPostgresContainer() + require.NoError(t, err) + defer tearDown() + + connStr, err := pg.ConnectionString(testutil.TestContext, "sslmode=disable") + require.NoError(t, err) + + var gotExit int32 + Exit = func(code int) { gotExit = int32(code) } + defer func() { Exit = os.Exit }() + + metricsYaml := filepath.Join(tempDir, "metrics.yaml") + sourcesYaml := filepath.Join(tempDir, "sources.yaml") + + require.NoError(t, os.WriteFile(metricsYaml, []byte(` +metrics: + test_metric: + sqls: + 11: select (extract(epoch from now()) * 1e9)::int8 as epoch_ns, 1 as value +presets: + test_preset: + metrics: + test_metric: 1 +`), 0644)) + + require.NoError(t, os.WriteFile(sourcesYaml, []byte(` +- name: test_source + conn_str: `+connStr+` + kind: postgres + is_enabled: true + custom_tags: + environment: production + version: "1.0" + custom_metrics: + test_metric: 1 +`), 0644)) + + os.Args = []string{ + "pgwatch", + "--metrics", metricsYaml, + "--sources", sourcesYaml, + "--sink", connStr, + "--refresh=2", + "--web-disable", + } + + sinkConn, err := pgx.Connect(context.Background(), connStr) + require.NoError(t, err) + defer sinkConn.Close(context.Background()) + + go main() + + // Below tests are expected to run sequentially and depend on + // data generated by each other + + t.Run("Ensure tag changes are applied", func(t *testing.T) { + // Wait for some initial metrics to be written + time.Sleep(2 * time.Second) + + var tagData map[string]string + err = sinkConn.QueryRow(context.Background(), + `SELECT tag_data FROM test_metric + WHERE dbname = 'test_source' + ORDER BY time DESC LIMIT 1`).Scan(&tagData) + require.NoError(t, err) + assert.Equal(t, "production", tagData["environment"], "initial environment tag should be 'production'") + assert.Equal(t, "1.0", tagData["version"], "initial version tag should be '1.0'") + + // Update custom_tags + require.NoError(t, os.WriteFile(sourcesYaml, []byte(` +- name: test_source + conn_str: `+connStr+` + kind: postgres + is_enabled: true + custom_tags: + environment: staging + version: "2.0" + new_tag: added + custom_metrics: + test_metric: 1 +`), 0644)) + + // Wait for config reload and new metrics with updated tags + time.Sleep(3 * time.Second) + + err = sinkConn.QueryRow(context.Background(), + `SELECT tag_data FROM test_metric + WHERE dbname = 'test_source' + ORDER BY time DESC LIMIT 1`).Scan(&tagData) + require.NoError(t, err) + assert.Equal(t, "staging", tagData["environment"], "updated environment tag should be 'staging'") + assert.Equal(t, "2.0", tagData["version"], "updated version tag should be '2.0'") + assert.Equal(t, "added", tagData["new_tag"], "new_tag should be present") + }) + + t.Run("Ensure metric interval changes are applied", func(t *testing.T) { + // Get collection interval before change + var epochNsBefore []int64 + rows, err := sinkConn.Query(context.Background(), + `SELECT (data->>'epoch_ns')::bigint as epoch_ns + FROM test_metric + WHERE dbname = 'test_source' + ORDER BY time DESC LIMIT 2`) + require.NoError(t, err) + for rows.Next() { + var epochNs int64 + require.NoError(t, rows.Scan(&epochNs)) + epochNsBefore = append(epochNsBefore, epochNs) + } + rows.Close() + require.GreaterOrEqual(t, len(epochNsBefore), 2, "we need at least 2 measurements") + + // Calculate interval before change + intervalBefore := float64(epochNsBefore[0] - epochNsBefore[1]) / 1e9 + assert.InDelta(t, 1.0, intervalBefore, 0.5, "interval should be approximately 1 second") + + // Change interval to 2 seconds + require.NoError(t, os.WriteFile(sourcesYaml, []byte(` +- name: test_source + conn_str: `+connStr+` + kind: postgres + is_enabled: true + custom_metrics: + test_metric: 2 +`), 0644)) + + time.Sleep(5 * time.Second) + + // Get collection interval after change + var epochNsAfter []int64 + rows, err = sinkConn.Query(context.Background(), + `SELECT (data->>'epoch_ns')::bigint as epoch_ns + FROM test_metric + WHERE dbname = 'test_source' + ORDER BY time DESC LIMIT 2`) + require.NoError(t, err) + for rows.Next() { + var epochNs int64 + require.NoError(t, rows.Scan(&epochNs)) + epochNsAfter = append(epochNsAfter, epochNs) + } + rows.Close() + require.GreaterOrEqual(t, len(epochNsAfter), 2, "we need at least 2 measurements after interval change") + + // Calculate interval after change + intervalAfter := float64(epochNsAfter[0] - epochNsAfter[1]) / 1e9 + assert.InDelta(t, 2.0, intervalAfter, 0.5, "new interval should be approximately 2 seconds") + assert.Greater(t, intervalAfter, intervalBefore, "new interval should be greater than old interval") + }) + + t.Run("Ensure conn str changes are applied", func(t *testing.T) { + // Count rows before connection string change + var countBefore int + err = sinkConn.QueryRow(context.Background(), + `SELECT count(*) FROM test_metric WHERE dbname = 'test_source'`).Scan(&countBefore) + require.NoError(t, err) + require.Greater(t, countBefore, 0) + + // Change to invalid connection string + require.NoError(t, os.WriteFile(sourcesYaml, []byte(` +- name: test_source + conn_str: postgres://invalid:invalid@localhost:59999/nonexistent + kind: postgres + is_enabled: true + custom_metrics: + test_metric: 1 +`), 0644)) + + // Wait for config reload and failed metric fetches + time.Sleep(4 * time.Second) + + // Count rows after connection string change + var countAfter int + err = sinkConn.QueryRow(context.Background(), + `SELECT count(*) FROM public.test_metric WHERE dbname = 'test_source'`).Scan(&countAfter) + require.NoError(t, err) + + assert.LessOrEqual(t, countAfter - countBefore, 2) + }) + + t.Run("Ensure preset intervals updates are applied - issue #1091", func(t *testing.T) { + require.NoError(t, os.WriteFile(sourcesYaml, []byte(` +- name: test_source + conn_str: `+connStr+` + kind: postgres + is_enabled: true + custom_tags: + version: "1.0" + preset_metrics: test_preset +`), 0644)) + + // Wait for reload and some metrics collection + time.Sleep(4 * time.Second) + + var epochNsBefore []int64 + rows, err := sinkConn.Query(context.Background(), + `SELECT (data->>'epoch_ns')::bigint as epoch_ns + FROM public.test_metric + WHERE dbname = 'test_source' + ORDER BY time DESC LIMIT 2`) + require.NoError(t, err) + for rows.Next() { + var epochNs int64 + require.NoError(t, rows.Scan(&epochNs)) + epochNsBefore = append(epochNsBefore, epochNs) + } + rows.Close() + require.GreaterOrEqual(t, len(epochNsBefore), 2, "should have at least 2 measurements") + + // Calculate interval before change + intervalBefore := float64(epochNsBefore[0] - epochNsBefore[1]) / 1e9 + assert.InDelta(t, 1.0, intervalBefore, 0.5, "interval should be approximately 1 second") + + require.NoError(t, os.WriteFile(sourcesYaml, []byte(` +- name: test_source + conn_str: `+connStr+` + kind: postgres + is_enabled: true + custom_tags: + version: "2.0" # to force a reload - triggering the bug + preset_metrics: test_preset +`), 0644)) + + require.NoError(t, os.WriteFile(metricsYaml, []byte(` +metrics: + test_metric: + sqls: + 11: select (extract(epoch from now()) * 1e9)::int8 as epoch_ns, 1 as value +presets: + test_preset: + metrics: + test_metric: 2 +`), 0644)) + + // Wait for config reload and some metrics + time.Sleep(5 * time.Second) + + var epochNsAfter []int64 + rows, err = sinkConn.Query(context.Background(), + `SELECT (data->>'epoch_ns')::bigint as epoch_ns + FROM public.test_metric + WHERE dbname = 'test_source' + ORDER BY time DESC LIMIT 2`) + require.NoError(t, err) + for rows.Next() { + var epochNs int64 + require.NoError(t, rows.Scan(&epochNs)) + epochNsAfter = append(epochNsAfter, epochNs) + } + rows.Close() + require.GreaterOrEqual(t, len(epochNsAfter), 2, "should have at least 2 measurements") + + // Calculate interval after change + intervalAfter := float64(epochNsAfter[0] - epochNsAfter[1]) / 1e9 + assert.InDelta(t, 2.0, intervalAfter, 0.5, "interval should be approximately 2 seconds") + }) + + cancel() + <-mainCtx.Done() + assert.Equal(t, cmdopts.ExitCodeOK, gotExit) +} diff --git a/internal/reaper/database.go b/internal/reaper/database.go index 48bdfa8f1..08c6b8eae 100644 --- a/internal/reaper/database.go +++ b/internal/reaper/database.go @@ -527,7 +527,7 @@ func TryCreateMetricsFetchingHelpers(ctx context.Context, md *sources.SourceConn return } -func (r *Reaper) CloseResourcesForRemovedMonitoredDBs(shutDownDueToRoleChange map[string]bool) { +func (r *Reaper) CloseResourcesForRemovedMonitoredDBs(hostsToShutDown map[string]bool) { for _, prevDB := range r.prevLoopMonitoredDBs { if r.monitoredSources.GetMonitoredDatabase(prevDB.Name) == nil { // removed from config prevDB.Conn.Close() @@ -535,10 +535,10 @@ func (r *Reaper) CloseResourcesForRemovedMonitoredDBs(shutDownDueToRoleChange ma } } - for roleChangedDB := range shutDownDueToRoleChange { - if db := r.monitoredSources.GetMonitoredDatabase(roleChangedDB); db != nil { + for toShutDownDB := range hostsToShutDown { + if db := r.monitoredSources.GetMonitoredDatabase(toShutDownDB); db != nil { db.Conn.Close() } - _ = r.SinksWriter.SyncMetric(roleChangedDB, "", sinks.DeleteOp) + _ = r.SinksWriter.SyncMetric(toShutDownDB, "", sinks.DeleteOp) } } diff --git a/internal/reaper/reaper.go b/internal/reaper/reaper.go index ee0b21eac..3b292787b 100644 --- a/internal/reaper/reaper.go +++ b/internal/reaper/reaper.go @@ -88,7 +88,7 @@ func (r *Reaper) Reap(ctx context.Context) { if r.Logging.LogLevel == "debug" { r.PrintMemStats() } - if err = r.LoadSources(); err != nil { + if err = r.LoadSources(ctx); err != nil { logger.WithError(err).Error("could not refresh active sources, using last valid cache") } if err = r.LoadMetrics(); err != nil { @@ -232,7 +232,7 @@ func (r *Reaper) CreateSourceHelpers(ctx context.Context, srcL log.Logger, monit } -func (r *Reaper) ShutdownOldWorkers(ctx context.Context, hostsToShutDownDueToRoleChange map[string]bool) { +func (r *Reaper) ShutdownOldWorkers(ctx context.Context, hostsToShutDown map[string]bool) { logger := r.logger // loop over existing channels and stop workers if DB or metric removed from config // or state change makes it uninteresting @@ -241,13 +241,13 @@ func (r *Reaper) ShutdownOldWorkers(ctx context.Context, hostsToShutDownDueToRol var currentMetricConfig map[string]float64 var md *sources.SourceConn var dbRemovedFromConfig bool - singleMetricDisabled := false + var metricRemovedFromPreset bool splits := strings.Split(dbMetric, dbMetricJoinStr) db := splits[0] metric := splits[1] - _, wholeDbShutDownDueToRoleChange := hostsToShutDownDueToRoleChange[db] - if !wholeDbShutDownDueToRoleChange { + _, wholeDbShutDown := hostsToShutDown[db] + if !wholeDbShutDown { md = r.monitoredSources.GetMonitoredDatabase(db) if md == nil { // normal removing of DB from config dbRemovedFromConfig = true @@ -255,17 +255,22 @@ func (r *Reaper) ShutdownOldWorkers(ctx context.Context, hostsToShutDownDueToRol } } - if !(wholeDbShutDownDueToRoleChange || dbRemovedFromConfig) { // maybe some single metric was disabled + // Detects metrics removed from a preset definition. + // + // If not using presets, a metric removed from configs will + // be detected earlier by `LoadSources()` as configs change that + // triggers a restart and get passed in `hostsToShutDown`. + if !(wholeDbShutDown || dbRemovedFromConfig) { if md.IsInRecovery && len(md.MetricsStandby) > 0 { currentMetricConfig = md.MetricsStandby } else { currentMetricConfig = md.Metrics } interval, isMetricActive := currentMetricConfig[metric] - singleMetricDisabled = !isMetricActive || interval <= 0 + metricRemovedFromPreset = !isMetricActive || interval <= 0 } - if ctx.Err() != nil || wholeDbShutDownDueToRoleChange || dbRemovedFromConfig || singleMetricDisabled { + if ctx.Err() != nil || wholeDbShutDown || dbRemovedFromConfig || metricRemovedFromPreset { logger.WithField("source", db).WithField("metric", metric).Info("stopping gatherer...") cancelFunc() delete(r.cancelFuncs, dbMetric) @@ -276,7 +281,7 @@ func (r *Reaper) ShutdownOldWorkers(ctx context.Context, hostsToShutDownDueToRol } // Destroy conn pools and metric writers - r.CloseResourcesForRemovedMonitoredDBs(hostsToShutDownDueToRoleChange) + r.CloseResourcesForRemovedMonitoredDBs(hostsToShutDown) } func (r *Reaper) reapMetricMeasurements(ctx context.Context, md *sources.SourceConn, metricName string) { @@ -379,7 +384,7 @@ func (r *Reaper) reapMetricMeasurements(ctx context.Context, md *sources.SourceC } // LoadSources loads sources from the reader -func (r *Reaper) LoadSources() (err error) { +func (r *Reaper) LoadSources(ctx context.Context) (err error) { if DoesEmergencyTriggerfileExist(r.Metrics.EmergencyPauseTriggerfile) { r.logger.Warningf("Emergency pause triggerfile detected at %s, ignoring currently configured DBs", r.Metrics.EmergencyPauseTriggerfile) r.monitoredSources = make(sources.SourceConns, 0) @@ -408,6 +413,10 @@ func (r *Reaper) LoadSources() (err error) { newSrcs[i] = md continue } + // Source configs changed, stop all running gatherers to trigger a restart + // TODO: Optimize this for single metric addition/deletion/interval-change cases to not do a full restart + r.logger.WithField("source", md.Name).Info("Source configs changed, restarting all gatherers...") + r.ShutdownOldWorkers(ctx, map[string]bool{md.Name: true}) } r.monitoredSources = newSrcs r.logger.WithField("sources", len(r.monitoredSources)).Info("sources refreshed") diff --git a/internal/reaper/reaper_test.go b/internal/reaper/reaper_test.go index c92bc5fc2..e4fca566b 100644 --- a/internal/reaper/reaper_test.go +++ b/internal/reaper/reaper_test.go @@ -9,8 +9,10 @@ import ( "github.com/cybertec-postgresql/pgwatch/v5/internal/cmdopts" "github.com/cybertec-postgresql/pgwatch/v5/internal/log" "github.com/cybertec-postgresql/pgwatch/v5/internal/metrics" + "github.com/cybertec-postgresql/pgwatch/v5/internal/sinks" "github.com/cybertec-postgresql/pgwatch/v5/internal/sources" "github.com/cybertec-postgresql/pgwatch/v5/internal/testutil" + "github.com/pashagolub/pgxmock/v4" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" ) @@ -22,7 +24,7 @@ func TestReaper_LoadSources(t *testing.T) { pausefile := filepath.Join(t.TempDir(), "pausefile") require.NoError(t, os.WriteFile(pausefile, []byte("foo"), 0644)) r := NewReaper(ctx, &cmdopts.Options{Metrics: metrics.CmdOpts{EmergencyPauseTriggerfile: pausefile}}) - assert.NoError(t, r.LoadSources()) + assert.NoError(t, r.LoadSources(ctx)) assert.True(t, len(r.monitoredSources) == 0, "Expected no monitored sources when pause trigger file exists") }) @@ -33,7 +35,7 @@ func TestReaper_LoadSources(t *testing.T) { }, } r := NewReaper(ctx, &cmdopts.Options{SourcesReaderWriter: reader}) - assert.Error(t, r.LoadSources()) + assert.Error(t, r.LoadSources(ctx)) assert.Equal(t, 0, len(r.monitoredSources), "Expected no monitored sources after error") }) @@ -47,7 +49,7 @@ func TestReaper_LoadSources(t *testing.T) { } r := NewReaper(ctx, &cmdopts.Options{SourcesReaderWriter: reader}) - assert.NoError(t, r.LoadSources()) + assert.NoError(t, r.LoadSources(ctx)) assert.Equal(t, 2, len(r.monitoredSources), "Expected two monitored sources after successful load") assert.NotNil(t, r.monitoredSources.GetMonitoredDatabase(source1.Name)) assert.NotNil(t, r.monitoredSources.GetMonitoredDatabase(source2.Name)) @@ -63,11 +65,11 @@ func TestReaper_LoadSources(t *testing.T) { } r := NewReaper(ctx, &cmdopts.Options{SourcesReaderWriter: reader}) - assert.NoError(t, r.LoadSources()) + assert.NoError(t, r.LoadSources(ctx)) assert.Equal(t, 2, len(r.monitoredSources), "Expected two monitored sources after first load") // Load again with the same sources - assert.NoError(t, r.LoadSources()) + assert.NoError(t, r.LoadSources(ctx)) assert.Equal(t, 2, len(r.monitoredSources), "Expected still two monitored sources after second load") }) @@ -83,15 +85,248 @@ func TestReaper_LoadSources(t *testing.T) { } r := NewReaper(ctx, &cmdopts.Options{SourcesReaderWriter: newReader, Sources: sources.CmdOpts{Groups: []string{"group1", "group2"}}}) - assert.NoError(t, r.LoadSources()) + assert.NoError(t, r.LoadSources(ctx)) assert.Equal(t, 4, len(r.monitoredSources), "Expected four monitored sources after load") r = NewReaper(ctx, &cmdopts.Options{SourcesReaderWriter: newReader, Sources: sources.CmdOpts{Groups: []string{"group1"}}}) - assert.NoError(t, r.LoadSources()) + assert.NoError(t, r.LoadSources(ctx)) assert.Equal(t, 3, len(r.monitoredSources), "Expected three monitored sources after group filtering") r = NewReaper(ctx, &cmdopts.Options{SourcesReaderWriter: newReader}) - assert.NoError(t, r.LoadSources()) + assert.NoError(t, r.LoadSources(ctx)) assert.Equal(t, 4, len(r.monitoredSources), "Expected four monitored sources after resetting groups") }) + + t.Run("Test source config changes trigger restart", func(t *testing.T) { + baseSource := sources.Source{ + Name: "TestSource", + IsEnabled: true, + Kind: sources.SourcePostgres, + ConnStr: "postgres://localhost:5432/testdb", + Metrics: map[string]float64{"cpu": 10, "memory": 20}, + MetricsStandby: map[string]float64{"cpu": 30}, + CustomTags: map[string]string{"env": "test"}, + Group: "default", + } + + testCases := []struct { + name string + modifySource func(s *sources.Source) + expectCancel bool + }{ + { + name: "custom tags change", + modifySource: func(s *sources.Source) { + s.CustomTags = map[string]string{"env": "production"} + }, + expectCancel: true, + }, + { + name: "custom tags add new tag", + modifySource: func(s *sources.Source) { + s.CustomTags = map[string]string{"env": "test", "region": "us-east"} + }, + expectCancel: true, + }, + { + name: "custom tags remove tag", + modifySource: func(s *sources.Source) { + s.CustomTags = map[string]string{} + }, + expectCancel: true, + }, + { + name: "preset metrics change", + modifySource: func(s *sources.Source) { + s.PresetMetrics = "exhaustive" + }, + expectCancel: true, + }, + { + name: "preset standby metrics change", + modifySource: func(s *sources.Source) { + s.PresetMetricsStandby = "standby-preset" + }, + expectCancel: true, + }, + { + name: "connection string change", + modifySource: func(s *sources.Source) { + s.ConnStr = "postgres://localhost:5433/newdb" + }, + expectCancel: true, + }, + { + name: "custom metrics change interval", + modifySource: func(s *sources.Source) { + s.Metrics = map[string]float64{"cpu": 15, "memory": 20} + }, + expectCancel: true, + }, + { + name: "custom metrics add new metric", + modifySource: func(s *sources.Source) { + s.Metrics = map[string]float64{"cpu": 10, "memory": 20, "disk": 30} + }, + expectCancel: true, + }, + { + name: "custom metrics remove metric", + modifySource: func(s *sources.Source) { + s.Metrics = map[string]float64{"cpu": 10} + }, + expectCancel: true, + }, + { + name: "standby metrics change", + modifySource: func(s *sources.Source) { + s.MetricsStandby = map[string]float64{"cpu": 60} + }, + expectCancel: true, + }, + { + name: "group change", + modifySource: func(s *sources.Source) { + s.Group = "new-group" + }, + expectCancel: true, + }, + { + name: "kind change", + modifySource: func(s *sources.Source) { + s.Kind = sources.SourcePgBouncer + }, + expectCancel: true, + }, + { + name: "only if master change", + modifySource: func(s *sources.Source) { + s.OnlyIfMaster = true + }, + expectCancel: true, + }, + { + name: "no change - same config", + modifySource: func(_ *sources.Source) { + // No modifications - source stays the same + }, + expectCancel: false, + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + initialSource := *baseSource.Clone() + initialReader := &testutil.MockSourcesReaderWriter{ + GetSourcesFunc: func() (sources.Sources, error) { + return sources.Sources{initialSource}, nil + }, + } + + r := NewReaper(ctx, &cmdopts.Options{ + SourcesReaderWriter: initialReader, + SinksWriter: &sinks.MultiWriter{}, + }) + assert.NoError(t, r.LoadSources(ctx)) + assert.Equal(t, 1, len(r.monitoredSources), "Expected one monitored source after initial load") + + mockConn, err := pgxmock.NewPool() + require.NoError(t, err) + mockConn.ExpectClose() + r.monitoredSources[0].Conn = mockConn + + // Add a mock cancel function for a metric gatherer + cancelCalled := make(map[string]bool) + for metric := range initialSource.Metrics { + dbMetric := initialSource.Name + "¤¤¤" + metric + r.cancelFuncs[dbMetric] = func() { + cancelCalled[dbMetric] = true + } + } + + // Create modified source + modifiedSource := *baseSource.Clone() + tc.modifySource(&modifiedSource) + + modifiedReader := &testutil.MockSourcesReaderWriter{ + GetSourcesFunc: func() (sources.Sources, error) { + return sources.Sources{modifiedSource}, nil + }, + } + r.SourcesReaderWriter = modifiedReader + + // Reload sources + assert.NoError(t, r.LoadSources(ctx)) + assert.Equal(t, 1, len(r.monitoredSources), "Expected one monitored source after reload") + assert.Equal(t, modifiedSource, r.monitoredSources[0].Source) + + for metric := range initialSource.Metrics { + dbMetric := initialSource.Name + "¤¤¤" + metric + assert.Equal(t, tc.expectCancel, cancelCalled[dbMetric]) + if tc.expectCancel { + assert.Nil(t, mockConn.ExpectationsWereMet(), "Expected all mock expectations to be met") + _, exists := r.cancelFuncs[dbMetric] + assert.False(t, exists, "Expected cancel func to be removed from map after cancellation") + } + } + }) + } + }) + + t.Run("Test only changed source cancelled in multi-source setup", func(t *testing.T) { + source1 := sources.Source{ + Name: "Source1", + IsEnabled: true, + Kind: sources.SourcePostgres, + ConnStr: "postgres://localhost:5432/db1", + Metrics: map[string]float64{"cpu": 10}, + } + source2 := sources.Source{ + Name: "Source2", + IsEnabled: true, + Kind: sources.SourcePostgres, + ConnStr: "postgres://localhost:5432/db2", + Metrics: map[string]float64{"memory": 20}, + } + + initialReader := &testutil.MockSourcesReaderWriter{ + GetSourcesFunc: func() (sources.Sources, error) { + return sources.Sources{source1, source2}, nil + }, + } + + r := NewReaper(ctx, &cmdopts.Options{ + SourcesReaderWriter: initialReader, + SinksWriter: &sinks.MultiWriter{}, + }) + assert.NoError(t, r.LoadSources(ctx)) + + // Set mock connections for both sources to avoid nil pointer on Close() + mockConn1, err := pgxmock.NewPool() + require.NoError(t, err) + mockConn1.ExpectClose() + r.monitoredSources[0].Conn = mockConn1 + + source1Cancelled := false + source2Cancelled := false + r.cancelFuncs[source1.Name+"¤¤¤"+"cpu"] = func() { source1Cancelled = true } + r.cancelFuncs[source2.Name+"¤¤¤"+"memory"] = func() { source2Cancelled = true } + + // Only modify source1 + modifiedSource1 := *source1.Clone() + modifiedSource1.ConnStr = "postgres://localhost:5433/db1_new" + + modifiedReader := &testutil.MockSourcesReaderWriter{ + GetSourcesFunc: func() (sources.Sources, error) { + return sources.Sources{modifiedSource1, source2}, nil + }, + } + r.SourcesReaderWriter = modifiedReader + + assert.NoError(t, r.LoadSources(ctx)) + + assert.True(t, source1Cancelled, "Source1 should be cancelled due to config change") + assert.False(t, source2Cancelled, "Source2 should NOT be cancelled as it was not modified") + assert.Nil(t, mockConn1.ExpectationsWereMet(), "Expected all mock expectations to be met") + }) }