Skip to content

Commit da9f50c

Browse files
ultrotterGuido Trotter
andauthored
Fix erroneous channels close (#4733)
* Fix erroneous channels close while we did fix the goroutine leak at bulk import, in case of errors, unfortunately we broke the non-error path, since the silence channel needed to be closed, so that addSilenceWorker would terminate the loop, and and wg.Wait would work solve this by having just one cleanup function, that gets called on defer, but also manually before returning, ensuring the error count is correct, and all workers have indeed been collected. Fixes issue introduced in #4556 Signed-off-by: Guido Trotter <[email protected]> * Add import silence cli tests Signed-off-by: Guido Trotter <[email protected]> --------- Signed-off-by: Guido Trotter <[email protected]> Co-authored-by: Guido Trotter <[email protected]>
1 parent 1e01172 commit da9f50c

File tree

4 files changed

+180
-4
lines changed

4 files changed

+180
-4
lines changed

cli/silence_import.go

Lines changed: 13 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -98,12 +98,21 @@ func (c *silenceImportCmd) bulkImport(ctx context.Context, _ *kingpin.ParseConte
9898

9999
amclient := NewAlertmanagerClient(alertmanagerURL)
100100
silencec := make(chan *models.PostableSilence, 100)
101-
defer close(silencec)
102-
103101
errc := make(chan error, 100)
104-
defer close(errc)
105102

106103
var wg sync.WaitGroup
104+
cleanupDone := false
105+
106+
closeChannels := func() {
107+
if cleanupDone {
108+
return
109+
}
110+
close(silencec)
111+
wg.Wait()
112+
close(errc)
113+
cleanupDone = true
114+
}
115+
defer closeChannels()
107116
for w := 0; w < c.workers; w++ {
108117
wg.Add(1)
109118
go func() {
@@ -138,7 +147,7 @@ func (c *silenceImportCmd) bulkImport(ctx context.Context, _ *kingpin.ParseConte
138147
count++
139148
}
140149

141-
wg.Wait()
150+
closeChannels()
142151

143152
if errCount > 0 {
144153
return fmt.Errorf("couldn't import %v out of %v silences", errCount, count)

test/cli/acceptance.go

Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -590,6 +590,18 @@ func (am *Alertmanager) QuerySilence(match ...string) ([]TestSilence, error) {
590590
return parseSilenceQueryResponse(out)
591591
}
592592

593+
// QueryExpiredSilence queries expired silences using the 'amtool silence query --expired --within' command.
594+
func (am *Alertmanager) QueryExpiredSilence(match ...string) ([]TestSilence, error) {
595+
amURLFlag := "--alertmanager.url=" + am.getURL("/")
596+
args := append([]string{amURLFlag, "silence", "query", "--expired", "--within=1h"}, match...)
597+
cmd := exec.Command(amtool, args...)
598+
out, err := cmd.CombinedOutput()
599+
if err != nil {
600+
am.t.Error("Silence query command failed: ", err)
601+
}
602+
return parseSilenceQueryResponse(out)
603+
}
604+
593605
var silenceHeaderFields = []string{"ID", "Matchers", "Ends At", "Created By", "Comment"}
594606

595607
func parseSilenceQueryResponse(data []byte) ([]TestSilence, error) {
@@ -658,6 +670,30 @@ func (am *Alertmanager) expireSilenceCommand(sil *TestSilence) ([]byte, error) {
658670
return cmd.CombinedOutput()
659671
}
660672

673+
// ExportSilences exports all silences to JSON format using 'amtool silence query -o json'.
674+
func (am *Alertmanager) ExportSilences() ([]byte, error) {
675+
amURLFlag := "--alertmanager.url=" + am.getURL("/")
676+
args := []string{amURLFlag, "silence", "query", "-o", "json"}
677+
cmd := exec.Command(amtool, args...)
678+
return cmd.Output()
679+
}
680+
681+
// ImportSilences imports silences from a JSON file using 'amtool silence import'.
682+
func (am *Alertmanager) ImportSilences(filename string) ([]byte, error) {
683+
amURLFlag := "--alertmanager.url=" + am.getURL("/")
684+
args := []string{amURLFlag, "silence", "import", filename}
685+
cmd := exec.Command(amtool, args...)
686+
return cmd.CombinedOutput()
687+
}
688+
689+
// ExpireSilenceByID expires a silence by its ID using 'amtool silence expire'.
690+
func (am *Alertmanager) ExpireSilenceByID(id string) ([]byte, error) {
691+
amURLFlag := "--alertmanager.url=" + am.getURL("/")
692+
args := []string{amURLFlag, "silence", "expire", id}
693+
cmd := exec.Command(amtool, args...)
694+
return cmd.CombinedOutput()
695+
}
696+
661697
// UpdateConfig rewrites the configuration file for the Alertmanager cluster. It
662698
// does not initiate config reloading.
663699
func (amc *AlertmanagerCluster) UpdateConfig(conf string) {

test/cli/acceptance/cli_test.go

Lines changed: 126 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -271,3 +271,129 @@ receivers:
271271
require.EqualError(t, err, "exit status 1")
272272
require.Equal(t, "amtool: error: Failed to parse labels: unexpected open or close brace: {foo=bar}\n\n", string(out))
273273
}
274+
275+
func TestSilenceImport(t *testing.T) {
276+
t.Parallel()
277+
278+
conf := `
279+
route:
280+
receiver: "default"
281+
group_by: [alertname]
282+
group_wait: 1s
283+
group_interval: 1s
284+
repeat_interval: 1ms
285+
286+
receivers:
287+
- name: "default"
288+
webhook_configs:
289+
- url: 'http://%s'
290+
send_resolved: true
291+
`
292+
293+
at := NewAcceptanceTest(t, &AcceptanceOpts{
294+
Tolerance: 1 * time.Second,
295+
})
296+
co := at.Collector("webhook")
297+
wh := NewWebhook(co)
298+
299+
amc := at.AlertmanagerCluster(fmt.Sprintf(conf, wh.Address()), 1)
300+
require.NoError(t, amc.Start())
301+
defer amc.Terminate()
302+
303+
am := amc.Members()[0]
304+
305+
// Add some test silences
306+
silence1 := Silence(0, 4).Match("alertname=test1", "severity=warning").Comment("test silence 1")
307+
silence2 := Silence(0, 4).Match("alertname=test2", "severity=critical").Comment("test silence 2")
308+
309+
am.SetSilence(0, silence1)
310+
am.SetSilence(0, silence2)
311+
312+
// Export silences to JSON file
313+
tmpDir := t.TempDir()
314+
exportFile := tmpDir + "/silences.json"
315+
316+
exportOut, err := am.ExportSilences()
317+
require.NoError(t, err)
318+
319+
// Write to file
320+
err = os.WriteFile(exportFile, exportOut, 0o644)
321+
require.NoError(t, err)
322+
323+
// Query current silences to get their IDs, then expire them
324+
sils, err := am.QuerySilence()
325+
require.NoError(t, err)
326+
require.Len(t, sils, 2)
327+
silIDs := make([]string, 0, len(sils))
328+
329+
// Expire all silences by ID
330+
for _, sil := range sils {
331+
id := sil.ID()
332+
_, err := am.ExpireSilenceByID(id)
333+
require.NoError(t, err)
334+
silIDs = append(silIDs, id)
335+
}
336+
337+
// Verify silences show as expired
338+
sils, err = am.QueryExpiredSilence()
339+
require.NoError(t, err)
340+
// Silences should still be queryable but in expired state
341+
require.Len(t, sils, 2, "expired silences should still be queryable")
342+
// Check that the silences are actually expired (endsAt is in the past or equal to now)
343+
now := float64(time.Now().Unix())
344+
for _, sil := range sils {
345+
require.Contains(t, silIDs, sil.ID(), "silence ID should be in the expired list")
346+
require.LessOrEqual(t, sil.EndsAt(), now, "silence %s should be expired", sil.ID())
347+
}
348+
349+
// Import silences back
350+
importOut, err := am.ImportSilences(exportFile)
351+
require.NoError(t, err, "import failed: %s", string(importOut))
352+
353+
// Verify silences were imported
354+
sils, err = am.QuerySilence()
355+
require.NoError(t, err)
356+
require.GreaterOrEqual(t, len(sils), 2, "expected at least 2 silences after import")
357+
}
358+
359+
func TestSilenceImportInvalidJSON(t *testing.T) {
360+
t.Parallel()
361+
362+
conf := `
363+
route:
364+
receiver: "default"
365+
group_by: [alertname]
366+
group_wait: 1s
367+
group_interval: 1s
368+
repeat_interval: 1ms
369+
370+
receivers:
371+
- name: "default"
372+
webhook_configs:
373+
- url: 'http://%s'
374+
send_resolved: true
375+
`
376+
377+
at := NewAcceptanceTest(t, &AcceptanceOpts{
378+
Tolerance: 1 * time.Second,
379+
})
380+
co := at.Collector("webhook")
381+
wh := NewWebhook(co)
382+
383+
amc := at.AlertmanagerCluster(fmt.Sprintf(conf, wh.Address()), 1)
384+
require.NoError(t, amc.Start())
385+
defer amc.Terminate()
386+
387+
am := amc.Members()[0]
388+
389+
// Create file with invalid JSON
390+
tmpDir := t.TempDir()
391+
invalidFile := tmpDir + "/invalid.json"
392+
err := os.WriteFile(invalidFile, []byte(`[{"broken": "json"`), 0o644)
393+
require.NoError(t, err)
394+
395+
// Try to import - should fail
396+
out, err := am.ImportSilences(invalidFile)
397+
require.Error(t, err, "import should fail with invalid JSON")
398+
require.Contains(t, string(out), "couldn't unmarshal", "error message should mention JSON parsing")
399+
}

test/cli/mock.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -111,6 +111,11 @@ func (s *TestSilence) ID() string {
111111
return s.id
112112
}
113113

114+
// EndsAt gets the silence end time.
115+
func (s *TestSilence) EndsAt() float64 {
116+
return s.endsAt
117+
}
118+
114119
// TestAlert models a model.Alert with relative times.
115120
type TestAlert struct {
116121
labels models.LabelSet

0 commit comments

Comments
 (0)