Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 14 additions & 1 deletion pkg/collector/runner/runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
package runner

import (
"context"
"fmt"

"sync"
Expand Down Expand Up @@ -55,12 +56,18 @@ type Runner struct {
schedulerLock sync.RWMutex // Lock around operations on the scheduler
utilizationMonitor *worker.UtilizationMonitor // Monitor in charge of checking the worker utilization
utilizationLogLimit *log.Limit // Log limiter for utilization warnings
// ctx is cancelled when the runner stops, providing a cancellation signal
// to any context-aware operation inside workers (e.g. hostname resolution).
ctx context.Context
cancel context.CancelFunc
}

// NewRunner takes the number of desired goroutines processing incoming checks.
func NewRunner(senderManager sender.SenderManager, haAgent haagent.Component, healthPlatform healthplatform.Component) *Runner {
numWorkers := pkgconfigsetup.Datadog().GetInt("check_runners")

ctx, cancel := context.WithCancel(context.Background())

r := &Runner{
senderManager: senderManager,
haAgent: haAgent,
Expand All @@ -73,6 +80,8 @@ func NewRunner(senderManager sender.SenderManager, haAgent haagent.Component, he
checksTracker: tracker.NewRunningChecksTracker(),
utilizationMonitor: worker.NewUtilizationMonitor(pkgconfigsetup.Datadog().GetFloat64("check_runner_utilization_threshold")),
utilizationLogLimit: log.NewLogLimit(1, pkgconfigsetup.Datadog().GetDuration("check_runner_utilization_warning_cooldown")),
ctx: ctx,
cancel: cancel,
}

if !r.isStaticWorkerCount {
Expand Down Expand Up @@ -149,7 +158,7 @@ func (r *Runner) newWorker() (*worker.Worker, error) {
go func() {
defer r.removeWorker(worker.ID)

worker.Run()
worker.Run(r.ctx)
}()

return worker, nil
Expand Down Expand Up @@ -196,6 +205,10 @@ func (r *Runner) Stop() {
return
}

// Cancel the runner context to unblock any context-aware operations in workers
// (e.g. hostname resolution via EC2 IMDS) that may be waiting on I/O.
r.cancel()

log.Infof("Runner %d is shutting down...", r.id)
close(r.pendingChecksChan)

Expand Down
8 changes: 5 additions & 3 deletions pkg/collector/worker/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -138,8 +138,10 @@ func newWorkerWithOptions(
}, nil
}

// Run waits for checks and run them as long as they arrive on the channel
func (w *Worker) Run() {
// Run waits for checks and run them as long as they arrive on the channel.
// The provided ctx is used for cancellable operations such as hostname resolution;
// it should be cancelled when the agent shuts down.
func (w *Worker) Run(ctx context.Context) {
log.Debugf("Runner %d, worker %d: Ready to process checks...", w.runnerID, w.ID)

alpha := 0.25 // converges to 99.98% of constant input in 30 iterations.
Expand Down Expand Up @@ -206,7 +208,7 @@ func (w *Worker) Run() {
serviceCheckTags := []string{"check:" + check.String(), "dd_enable_check_intake:true"}
serviceCheckStatus := servicecheck.ServiceCheckOK

hname, _ := hostname.Get(context.TODO())
hname, _ := hostname.Get(ctx)

if len(checkWarnings) != 0 {
expvars.AddWarningsCount(len(checkWarnings))
Expand Down
23 changes: 12 additions & 11 deletions pkg/collector/worker/worker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ package worker
import (
"bufio"
"bytes"
"context"
"errors"
"expvar"
"fmt"
Expand Down Expand Up @@ -186,7 +187,7 @@ func TestWorkerInitExpvarStats(t *testing.T) {
worker, err := NewWorker(aggregator.NewNoOpSenderManager(), haagentmock.NewMockHaAgent(), healthplatformmock.Mock(t), 1, idx, pendingChecksChan, checksTracker, mockShouldAddStatsFunc, 0)
assert.Nil(t, err)

worker.Run()
worker.Run(context.Background())
}(i)
}

Expand Down Expand Up @@ -264,7 +265,7 @@ func TestWorker(t *testing.T) {
wg.Add(1)
go func() {
defer wg.Done()
worker.Run()
worker.Run(context.Background())
}()

wg.Wait()
Expand Down Expand Up @@ -329,7 +330,7 @@ func TestWorkerUtilizationExpvars(t *testing.T) {
wg.Add(1)
go func() {
defer wg.Done()
worker.Run()
worker.Run(context.Background())
}()

// Clean things up
Expand Down Expand Up @@ -403,7 +404,7 @@ func TestWorkerErrorAndWarningHandling(t *testing.T) {
wg.Add(1)
go func() {
defer wg.Done()
worker.Run()
worker.Run(context.Background())
}()

wg.Wait()
Expand Down Expand Up @@ -446,7 +447,7 @@ func TestWorkerConcurrentCheckScheduling(t *testing.T) {
worker, err := NewWorker(aggregator.NewNoOpSenderManager(), haagentmock.NewMockHaAgent(), healthplatformmock.Mock(t), 100, 200, pendingChecksChan, checksTracker, mockShouldAddStatsFunc, 0)
require.Nil(t, err)

worker.Run()
worker.Run(context.Background())

assert.Equal(t, 0, testCheck.RunCount())
assert.Equal(t, 0, int(expvars.GetRunsCount()))
Expand Down Expand Up @@ -502,7 +503,7 @@ func TestWorkerStatsAddition(t *testing.T) {
worker, err := NewWorker(aggregator.NewNoOpSenderManager(), haagentmock.NewMockHaAgent(), healthplatformmock.Mock(t), 100, 200, pendingChecksChan, checksTracker, shouldAddStatsFunc, 0)
require.Nil(t, err)

worker.Run()
worker.Run(context.Background())

for c, statsExpected := range map[check.Check]bool{
longRunningCheckNoErrorNoWarning: false,
Expand Down Expand Up @@ -591,7 +592,7 @@ func TestWorkerServiceCheckSending(t *testing.T) {
wg.Add(1)
go func() {
defer wg.Done()
worker.Run()
worker.Run(context.Background())
}()

wg.Wait()
Expand Down Expand Up @@ -634,7 +635,7 @@ func TestWorkerSenderNil(t *testing.T) {
require.Nil(t, err)

// Implicit assertion that we don't panic
worker.Run()
worker.Run(context.Background())

// Quick sanity check
assert.Equal(t, 1, int(expvars.GetRunsCount()))
Expand Down Expand Up @@ -677,7 +678,7 @@ func TestWorkerServiceCheckSendingLongRunningTasks(t *testing.T) {
)
require.Nil(t, err)

worker.Run()
worker.Run(context.Background())

// Quick sanity check
assert.Equal(t, 1, int(expvars.GetRunsCount()))
Expand Down Expand Up @@ -762,7 +763,7 @@ func TestWorker_HaIntegration(t *testing.T) {
wg.Add(1)
go func() {
defer wg.Done()
worker.Run()
worker.Run(context.Background())
}()

wg.Wait()
Expand Down Expand Up @@ -860,7 +861,7 @@ func TestWorkerWatchdogWarningLog(t *testing.T) {
wg.Add(1)
go func() {
defer wg.Done()
worker.Run()
worker.Run(context.Background())
}()

time.Sleep(tt.checkDuration)
Expand Down