@@ -9,8 +9,10 @@ import (
99 "github.com/cybertec-postgresql/pgwatch/v3/internal/cmdopts"
1010 "github.com/cybertec-postgresql/pgwatch/v3/internal/log"
1111 "github.com/cybertec-postgresql/pgwatch/v3/internal/metrics"
12+ "github.com/cybertec-postgresql/pgwatch/v3/internal/sinks"
1213 "github.com/cybertec-postgresql/pgwatch/v3/internal/sources"
1314 "github.com/cybertec-postgresql/pgwatch/v3/internal/testutil"
15+ "github.com/pashagolub/pgxmock/v4"
1416 "github.com/stretchr/testify/assert"
1517 "github.com/stretchr/testify/require"
1618)
@@ -22,7 +24,7 @@ func TestReaper_LoadSources(t *testing.T) {
2224 pausefile := filepath .Join (t .TempDir (), "pausefile" )
2325 require .NoError (t , os .WriteFile (pausefile , []byte ("foo" ), 0644 ))
2426 r := NewReaper (ctx , & cmdopts.Options {Metrics : metrics.CmdOpts {EmergencyPauseTriggerfile : pausefile }})
25- assert .NoError (t , r .LoadSources ())
27+ assert .NoError (t , r .LoadSources (ctx ))
2628 assert .True (t , len (r .monitoredSources ) == 0 , "Expected no monitored sources when pause trigger file exists" )
2729 })
2830
@@ -33,7 +35,7 @@ func TestReaper_LoadSources(t *testing.T) {
3335 },
3436 }
3537 r := NewReaper (ctx , & cmdopts.Options {SourcesReaderWriter : reader })
36- assert .Error (t , r .LoadSources ())
38+ assert .Error (t , r .LoadSources (ctx ))
3739 assert .Equal (t , 0 , len (r .monitoredSources ), "Expected no monitored sources after error" )
3840 })
3941
@@ -47,7 +49,7 @@ func TestReaper_LoadSources(t *testing.T) {
4749 }
4850
4951 r := NewReaper (ctx , & cmdopts.Options {SourcesReaderWriter : reader })
50- assert .NoError (t , r .LoadSources ())
52+ assert .NoError (t , r .LoadSources (ctx ))
5153 assert .Equal (t , 2 , len (r .monitoredSources ), "Expected two monitored sources after successful load" )
5254 assert .NotNil (t , r .monitoredSources .GetMonitoredDatabase (source1 .Name ))
5355 assert .NotNil (t , r .monitoredSources .GetMonitoredDatabase (source2 .Name ))
@@ -63,11 +65,11 @@ func TestReaper_LoadSources(t *testing.T) {
6365 }
6466
6567 r := NewReaper (ctx , & cmdopts.Options {SourcesReaderWriter : reader })
66- assert .NoError (t , r .LoadSources ())
68+ assert .NoError (t , r .LoadSources (ctx ))
6769 assert .Equal (t , 2 , len (r .monitoredSources ), "Expected two monitored sources after first load" )
6870
6971 // Load again with the same sources
70- assert .NoError (t , r .LoadSources ())
72+ assert .NoError (t , r .LoadSources (ctx ))
7173 assert .Equal (t , 2 , len (r .monitoredSources ), "Expected still two monitored sources after second load" )
7274 })
7375
@@ -83,15 +85,248 @@ func TestReaper_LoadSources(t *testing.T) {
8385 }
8486
8587 r := NewReaper (ctx , & cmdopts.Options {SourcesReaderWriter : newReader , Sources : sources.CmdOpts {Groups : []string {"group1" , "group2" }}})
86- assert .NoError (t , r .LoadSources ())
88+ assert .NoError (t , r .LoadSources (ctx ))
8789 assert .Equal (t , 4 , len (r .monitoredSources ), "Expected four monitored sources after load" )
8890
8991 r = NewReaper (ctx , & cmdopts.Options {SourcesReaderWriter : newReader , Sources : sources.CmdOpts {Groups : []string {"group1" }}})
90- assert .NoError (t , r .LoadSources ())
92+ assert .NoError (t , r .LoadSources (ctx ))
9193 assert .Equal (t , 3 , len (r .monitoredSources ), "Expected three monitored sources after group filtering" )
9294
9395 r = NewReaper (ctx , & cmdopts.Options {SourcesReaderWriter : newReader })
94- assert .NoError (t , r .LoadSources ())
96+ assert .NoError (t , r .LoadSources (ctx ))
9597 assert .Equal (t , 4 , len (r .monitoredSources ), "Expected four monitored sources after resetting groups" )
9698 })
99+
100+ t .Run ("Test source config changes trigger restart" , func (t * testing.T ) {
101+ baseSource := sources.Source {
102+ Name : "TestSource" ,
103+ IsEnabled : true ,
104+ Kind : sources .SourcePostgres ,
105+ ConnStr : "postgres://localhost:5432/testdb" ,
106+ Metrics : map [string ]float64 {"cpu" : 10 , "memory" : 20 },
107+ MetricsStandby : map [string ]float64 {"cpu" : 30 },
108+ CustomTags : map [string ]string {"env" : "test" },
109+ Group : "default" ,
110+ }
111+
112+ testCases := []struct {
113+ name string
114+ modifySource func (s * sources.Source )
115+ expectCancel bool
116+ }{
117+ {
118+ name : "custom tags change" ,
119+ modifySource : func (s * sources.Source ) {
120+ s .CustomTags = map [string ]string {"env" : "production" }
121+ },
122+ expectCancel : true ,
123+ },
124+ {
125+ name : "custom tags add new tag" ,
126+ modifySource : func (s * sources.Source ) {
127+ s .CustomTags = map [string ]string {"env" : "test" , "region" : "us-east" }
128+ },
129+ expectCancel : true ,
130+ },
131+ {
132+ name : "custom tags remove tag" ,
133+ modifySource : func (s * sources.Source ) {
134+ s .CustomTags = map [string ]string {}
135+ },
136+ expectCancel : true ,
137+ },
138+ {
139+ name : "preset metrics change" ,
140+ modifySource : func (s * sources.Source ) {
141+ s .PresetMetrics = "exhaustive"
142+ },
143+ expectCancel : true ,
144+ },
145+ {
146+ name : "preset standby metrics change" ,
147+ modifySource : func (s * sources.Source ) {
148+ s .PresetMetricsStandby = "standby-preset"
149+ },
150+ expectCancel : true ,
151+ },
152+ {
153+ name : "connection string change" ,
154+ modifySource : func (s * sources.Source ) {
155+ s .ConnStr = "postgres://localhost:5433/newdb"
156+ },
157+ expectCancel : true ,
158+ },
159+ {
160+ name : "custom metrics change interval" ,
161+ modifySource : func (s * sources.Source ) {
162+ s .Metrics = map [string ]float64 {"cpu" : 15 , "memory" : 20 }
163+ },
164+ expectCancel : true ,
165+ },
166+ {
167+ name : "custom metrics add new metric" ,
168+ modifySource : func (s * sources.Source ) {
169+ s .Metrics = map [string ]float64 {"cpu" : 10 , "memory" : 20 , "disk" : 30 }
170+ },
171+ expectCancel : true ,
172+ },
173+ {
174+ name : "custom metrics remove metric" ,
175+ modifySource : func (s * sources.Source ) {
176+ s .Metrics = map [string ]float64 {"cpu" : 10 }
177+ },
178+ expectCancel : true ,
179+ },
180+ {
181+ name : "standby metrics change" ,
182+ modifySource : func (s * sources.Source ) {
183+ s .MetricsStandby = map [string ]float64 {"cpu" : 60 }
184+ },
185+ expectCancel : true ,
186+ },
187+ {
188+ name : "group change" ,
189+ modifySource : func (s * sources.Source ) {
190+ s .Group = "new-group"
191+ },
192+ expectCancel : true ,
193+ },
194+ {
195+ name : "kind change" ,
196+ modifySource : func (s * sources.Source ) {
197+ s .Kind = sources .SourcePgBouncer
198+ },
199+ expectCancel : true ,
200+ },
201+ {
202+ name : "only if master change" ,
203+ modifySource : func (s * sources.Source ) {
204+ s .OnlyIfMaster = true
205+ },
206+ expectCancel : true ,
207+ },
208+ {
209+ name : "no change - same config" ,
210+ modifySource : func (_ * sources.Source ) {
211+ // No modifications - source stays the same
212+ },
213+ expectCancel : false ,
214+ },
215+ }
216+
217+ for _ , tc := range testCases {
218+ t .Run (tc .name , func (t * testing.T ) {
219+ initialSource := * baseSource .Clone ()
220+ initialReader := & testutil.MockSourcesReaderWriter {
221+ GetSourcesFunc : func () (sources.Sources , error ) {
222+ return sources.Sources {initialSource }, nil
223+ },
224+ }
225+
226+ r := NewReaper (ctx , & cmdopts.Options {
227+ SourcesReaderWriter : initialReader ,
228+ SinksWriter : & sinks.MultiWriter {},
229+ })
230+ assert .NoError (t , r .LoadSources (ctx ))
231+ assert .Equal (t , 1 , len (r .monitoredSources ), "Expected one monitored source after initial load" )
232+
233+ mockConn , err := pgxmock .NewPool ()
234+ require .NoError (t , err )
235+ mockConn .ExpectClose ()
236+ r .monitoredSources [0 ].Conn = mockConn
237+
238+ // Add a mock cancel function for a metric gatherer
239+ cancelCalled := make (map [string ]bool )
240+ for metric := range initialSource .Metrics {
241+ dbMetric := initialSource .Name + "¤¤¤" + metric
242+ r .cancelFuncs [dbMetric ] = func () {
243+ cancelCalled [dbMetric ] = true
244+ }
245+ }
246+
247+ // Create modified source
248+ modifiedSource := * baseSource .Clone ()
249+ tc .modifySource (& modifiedSource )
250+
251+ modifiedReader := & testutil.MockSourcesReaderWriter {
252+ GetSourcesFunc : func () (sources.Sources , error ) {
253+ return sources.Sources {modifiedSource }, nil
254+ },
255+ }
256+ r .SourcesReaderWriter = modifiedReader
257+
258+ // Reload sources
259+ assert .NoError (t , r .LoadSources (ctx ))
260+ assert .Equal (t , 1 , len (r .monitoredSources ), "Expected one monitored source after reload" )
261+ assert .Equal (t , modifiedSource , r .monitoredSources [0 ].Source )
262+
263+ for metric := range initialSource .Metrics {
264+ dbMetric := initialSource .Name + "¤¤¤" + metric
265+ assert .Equal (t , tc .expectCancel , cancelCalled [dbMetric ])
266+ if tc .expectCancel {
267+ assert .Nil (t , mockConn .ExpectationsWereMet (), "Expected all mock expectations to be met" )
268+ _ , exists := r .cancelFuncs [dbMetric ]
269+ assert .False (t , exists , "Expected cancel func to be removed from map after cancellation" )
270+ }
271+ }
272+ })
273+ }
274+ })
275+
276+ t .Run ("Test only changed source cancelled in multi-source setup" , func (t * testing.T ) {
277+ source1 := sources.Source {
278+ Name : "Source1" ,
279+ IsEnabled : true ,
280+ Kind : sources .SourcePostgres ,
281+ ConnStr : "postgres://localhost:5432/db1" ,
282+ Metrics : map [string ]float64 {"cpu" : 10 },
283+ }
284+ source2 := sources.Source {
285+ Name : "Source2" ,
286+ IsEnabled : true ,
287+ Kind : sources .SourcePostgres ,
288+ ConnStr : "postgres://localhost:5432/db2" ,
289+ Metrics : map [string ]float64 {"memory" : 20 },
290+ }
291+
292+ initialReader := & testutil.MockSourcesReaderWriter {
293+ GetSourcesFunc : func () (sources.Sources , error ) {
294+ return sources.Sources {source1 , source2 }, nil
295+ },
296+ }
297+
298+ r := NewReaper (ctx , & cmdopts.Options {
299+ SourcesReaderWriter : initialReader ,
300+ SinksWriter : & sinks.MultiWriter {},
301+ })
302+ assert .NoError (t , r .LoadSources (ctx ))
303+
304+ // Set mock connections for both sources to avoid nil pointer on Close()
305+ mockConn1 , err := pgxmock .NewPool ()
306+ require .NoError (t , err )
307+ mockConn1 .ExpectClose ()
308+ r .monitoredSources [0 ].Conn = mockConn1
309+
310+ source1Cancelled := false
311+ source2Cancelled := false
312+ r .cancelFuncs [source1 .Name + "¤¤¤" + "cpu" ] = func () { source1Cancelled = true }
313+ r .cancelFuncs [source2 .Name + "¤¤¤" + "memory" ] = func () { source2Cancelled = true }
314+
315+ // Only modify source1
316+ modifiedSource1 := * source1 .Clone ()
317+ modifiedSource1 .ConnStr = "postgres://localhost:5433/db1_new"
318+
319+ modifiedReader := & testutil.MockSourcesReaderWriter {
320+ GetSourcesFunc : func () (sources.Sources , error ) {
321+ return sources.Sources {modifiedSource1 , source2 }, nil
322+ },
323+ }
324+ r .SourcesReaderWriter = modifiedReader
325+
326+ assert .NoError (t , r .LoadSources (ctx ))
327+
328+ assert .True (t , source1Cancelled , "Source1 should be cancelled due to config change" )
329+ assert .False (t , source2Cancelled , "Source2 should NOT be cancelled as it was not modified" )
330+ assert .Nil (t , mockConn1 .ExpectationsWereMet (), "Expected all mock expectations to be met" )
331+ })
97332}
0 commit comments