Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion pkg/monitor/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -249,7 +249,7 @@ func (mon *monitor) workOne(ctx context.Context, log *logrus.Entry, doc *api.Ope
}

monitors = append(monitors, c, nsgMon)
allJobsDone := make(chan bool)
allJobsDone := make(chan bool, 1)
onPanic := func(m monitoring.Monitor) {
// emit a failed worker metric on panic
mon.m.EmitGauge("monitor."+m.MonitorName()+".failedworker", 1, dims)
Expand Down
38 changes: 38 additions & 0 deletions pkg/monitor/worker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,44 @@ func TestExecute(t *testing.T) {
assert.True(t, triggeredFail)
}

type slowMonitor struct{}

func (m *slowMonitor) Monitor(ctx context.Context) error {
<-ctx.Done()
return ctx.Err()
}

func (m *slowMonitor) MonitorName() string {
return "slowMonitor"
}

// TestExecuteReturnsWhenNoReceiver verifies that the execute goroutine does not
// leak when nobody reads from the done channel (the timeout path in workOne).
// With an unbuffered channel this test would hang forever.
func TestExecuteReturnsWhenNoReceiver(t *testing.T) {
_, log := testlog.New()

ctx, cancel := context.WithCancel(context.Background())
onPanic := func(m monitoring.Monitor) {}

// Buffered channel: execute can send without a receiver
done := make(chan bool, 1)
go execute(ctx, log, done, []monitoring.Monitor{&slowMonitor{}}, onPanic)

// Simulate workOne's timeout path: cancel the context and never read from done
cancel()

// The execute goroutine must exit within a reasonable time
assert.Eventually(t, func() bool {
select {
case <-done:
return true
default:
return false
}
}, 2*time.Second, 10*time.Millisecond, "execute goroutine leaked: blocked sending on done channel")
}

func TestChangefeedOperations(t *testing.T) {
// Setup test environment
env := SetupTestEnvironment(t)
Expand Down
Loading