From f60b6811045b26c7f29f7fe26851e12218974da6 Mon Sep 17 00:00:00 2001 From: Louis Coquerelle Date: Fri, 20 Feb 2026 15:32:19 +0100 Subject: [PATCH] fix(collector): propagate shutdown context to worker hostname resolution --- pkg/collector/runner/runner.go | 15 ++++++++++++++- pkg/collector/worker/worker.go | 8 +++++--- pkg/collector/worker/worker_test.go | 23 ++++++++++++----------- 3 files changed, 31 insertions(+), 15 deletions(-) diff --git a/pkg/collector/runner/runner.go b/pkg/collector/runner/runner.go index ee5a17e40822d4..dc1fde2a3398fe 100644 --- a/pkg/collector/runner/runner.go +++ b/pkg/collector/runner/runner.go @@ -7,6 +7,7 @@ package runner import ( + "context" "fmt" "sync" @@ -55,12 +56,18 @@ type Runner struct { schedulerLock sync.RWMutex // Lock around operations on the scheduler utilizationMonitor *worker.UtilizationMonitor // Monitor in charge of checking the worker utilization utilizationLogLimit *log.Limit // Log limiter for utilization warnings + // ctx is cancelled when the runner stops, providing a cancellation signal + // to any context-aware operation inside workers (e.g. hostname resolution). + ctx context.Context + cancel context.CancelFunc } // NewRunner takes the number of desired goroutines processing incoming checks. 
func NewRunner(senderManager sender.SenderManager, haAgent haagent.Component, healthPlatform healthplatform.Component) *Runner { numWorkers := pkgconfigsetup.Datadog().GetInt("check_runners") + ctx, cancel := context.WithCancel(context.Background()) + r := &Runner{ senderManager: senderManager, haAgent: haAgent, @@ -73,6 +80,8 @@ func NewRunner(senderManager sender.SenderManager, haAgent haagent.Component, he checksTracker: tracker.NewRunningChecksTracker(), utilizationMonitor: worker.NewUtilizationMonitor(pkgconfigsetup.Datadog().GetFloat64("check_runner_utilization_threshold")), utilizationLogLimit: log.NewLogLimit(1, pkgconfigsetup.Datadog().GetDuration("check_runner_utilization_warning_cooldown")), + ctx: ctx, + cancel: cancel, } if !r.isStaticWorkerCount { @@ -149,7 +158,7 @@ func (r *Runner) newWorker() (*worker.Worker, error) { go func() { defer r.removeWorker(worker.ID) - worker.Run() + worker.Run(r.ctx) }() return worker, nil @@ -196,6 +205,10 @@ func (r *Runner) Stop() { return } + // Cancel the runner context to unblock any context-aware operations in workers + // (e.g. hostname resolution via EC2 IMDS) that may be waiting on I/O. + r.cancel() + log.Infof("Runner %d is shutting down...", r.id) close(r.pendingChecksChan) diff --git a/pkg/collector/worker/worker.go b/pkg/collector/worker/worker.go index 6affb74e1c4027..608ecb49a18243 100644 --- a/pkg/collector/worker/worker.go +++ b/pkg/collector/worker/worker.go @@ -138,8 +138,10 @@ func newWorkerWithOptions( }, nil } -// Run waits for checks and run them as long as they arrive on the channel -func (w *Worker) Run() { +// Run waits for checks and runs them as long as they arrive on the channel. +// The provided ctx is used for cancellable operations such as hostname resolution; +// it should be cancelled when the agent shuts down. 
+func (w *Worker) Run(ctx context.Context) { log.Debugf("Runner %d, worker %d: Ready to process checks...", w.runnerID, w.ID) alpha := 0.25 // converges to 99.98% of constant input in 30 iterations. @@ -206,7 +208,7 @@ func (w *Worker) Run() { serviceCheckTags := []string{"check:" + check.String(), "dd_enable_check_intake:true"} serviceCheckStatus := servicecheck.ServiceCheckOK - hname, _ := hostname.Get(context.TODO()) + hname, _ := hostname.Get(ctx) if len(checkWarnings) != 0 { expvars.AddWarningsCount(len(checkWarnings)) diff --git a/pkg/collector/worker/worker_test.go b/pkg/collector/worker/worker_test.go index 240fc375c64cbf..8fc2380006cfdd 100644 --- a/pkg/collector/worker/worker_test.go +++ b/pkg/collector/worker/worker_test.go @@ -8,6 +8,7 @@ package worker import ( "bufio" "bytes" + "context" "errors" "expvar" "fmt" @@ -186,7 +187,7 @@ func TestWorkerInitExpvarStats(t *testing.T) { worker, err := NewWorker(aggregator.NewNoOpSenderManager(), haagentmock.NewMockHaAgent(), healthplatformmock.Mock(t), 1, idx, pendingChecksChan, checksTracker, mockShouldAddStatsFunc, 0) assert.Nil(t, err) - worker.Run() + worker.Run(context.Background()) }(i) } @@ -264,7 +265,7 @@ func TestWorker(t *testing.T) { wg.Add(1) go func() { defer wg.Done() - worker.Run() + worker.Run(context.Background()) }() wg.Wait() @@ -329,7 +330,7 @@ func TestWorkerUtilizationExpvars(t *testing.T) { wg.Add(1) go func() { defer wg.Done() - worker.Run() + worker.Run(context.Background()) }() // Clean things up @@ -403,7 +404,7 @@ func TestWorkerErrorAndWarningHandling(t *testing.T) { wg.Add(1) go func() { defer wg.Done() - worker.Run() + worker.Run(context.Background()) }() wg.Wait() @@ -446,7 +447,7 @@ func TestWorkerConcurrentCheckScheduling(t *testing.T) { worker, err := NewWorker(aggregator.NewNoOpSenderManager(), haagentmock.NewMockHaAgent(), healthplatformmock.Mock(t), 100, 200, pendingChecksChan, checksTracker, mockShouldAddStatsFunc, 0) require.Nil(t, err) - worker.Run() + 
worker.Run(context.Background()) assert.Equal(t, 0, testCheck.RunCount()) assert.Equal(t, 0, int(expvars.GetRunsCount())) @@ -502,7 +503,7 @@ func TestWorkerStatsAddition(t *testing.T) { worker, err := NewWorker(aggregator.NewNoOpSenderManager(), haagentmock.NewMockHaAgent(), healthplatformmock.Mock(t), 100, 200, pendingChecksChan, checksTracker, shouldAddStatsFunc, 0) require.Nil(t, err) - worker.Run() + worker.Run(context.Background()) for c, statsExpected := range map[check.Check]bool{ longRunningCheckNoErrorNoWarning: false, @@ -591,7 +592,7 @@ func TestWorkerServiceCheckSending(t *testing.T) { wg.Add(1) go func() { defer wg.Done() - worker.Run() + worker.Run(context.Background()) }() wg.Wait() @@ -634,7 +635,7 @@ func TestWorkerSenderNil(t *testing.T) { require.Nil(t, err) // Implicit assertion that we don't panic - worker.Run() + worker.Run(context.Background()) // Quick sanity check assert.Equal(t, 1, int(expvars.GetRunsCount())) @@ -677,7 +678,7 @@ func TestWorkerServiceCheckSendingLongRunningTasks(t *testing.T) { ) require.Nil(t, err) - worker.Run() + worker.Run(context.Background()) // Quick sanity check assert.Equal(t, 1, int(expvars.GetRunsCount())) @@ -762,7 +763,7 @@ func TestWorker_HaIntegration(t *testing.T) { wg.Add(1) go func() { defer wg.Done() - worker.Run() + worker.Run(context.Background()) }() wg.Wait() @@ -860,7 +861,7 @@ func TestWorkerWatchdogWarningLog(t *testing.T) { wg.Add(1) go func() { defer wg.Done() - worker.Run() + worker.Run(context.Background()) }() time.Sleep(tt.checkDuration)