Skip to content

Commit d9be88a

Browse files
authored
fix(workloadmeta): fix data race and duplicate registration in process collector telemetry (#48093)
### What does this PR do? Moves the `discovered_services` telemetry gauge initialization from the process collector's `Start()` method to the `newProcessCollector()` constructor. This fixes both a data race and a duplicate metrics registration panic. ### Motivation Enabling `discovery.enabled` by default on Linux caused the process workloadmeta collector to start in tests where it previously didn't. This exposed two latent bugs: 1. **Data race**: `Start()` is called from `startCandidatesWithRetry` in a background goroutine (`store.go:74`). It calls `telemetry.NewGaugeWithOpts()` → `GetCompatComponent()` → `newTelemetry()`, which reads the global `registry` variable without holding the mutex. Meanwhile, the fx Stop hook calls `Reset()`, which writes to `registry` under the mutex. Since the background goroutine is fire-and-forget (the fx Start hook returns immediately), fx Stop can begin while the goroutine is still running. 2. **Duplicate registration panic** (with `-count=N`): Leaked background goroutines from test run N survive into test run N+1. After `Reset()` creates a new registry between runs, both the leaked goroutine and the new test's collector try to register the same gauge on the same new registry. Both issues are fixed by moving the gauge initialization to the constructor, which runs synchronously during fx construction — before Start and well before Stop. ### Describe how you validated your changes Reproduced the data race by running: ``` dda inv test --targets=./cmd/agent/subcommands/processchecks/ --race --extra-args="-count=20" ``` After the fix, confirmed 80/80 passes across multiple batches with the race detector enabled. Co-authored-by: vincent.whitchurch <vincent.whitchurch@datadoghq.com>
1 parent c43b6e2 commit d9be88a

File tree

1 file changed

+16
-9
lines changed

1 file changed

+16
-9
lines changed

comp/core/workloadmeta/collectors/internal/process/process_collector.go

Lines changed: 16 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -92,6 +92,16 @@ type Event struct {
9292
}
9393

9494
func newProcessCollector(id string, catalog workloadmeta.AgentType, clock clock.Clock, processProbe procutil.Probe, config pkgconfigmodel.Reader, systemProbeConfig pkgconfigmodel.Reader) collector {
95+
var discoveredServicesGauge telemetry.Gauge
96+
if serviceDiscoveryEnabled(systemProbeConfig) {
97+
discoveredServicesGauge = telemetry.NewGaugeWithOpts(
98+
collectorID,
99+
"discovered_services",
100+
[]string{},
101+
"Number of discovered alive services.",
102+
telemetry.DefaultOptions,
103+
)
104+
}
95105
return collector{
96106
id: id,
97107
catalog: catalog,
@@ -108,6 +118,7 @@ func newProcessCollector(id string, catalog workloadmeta.AgentType, clock clock.
108118
ignoredPids: make(core.PidSet),
109119
pidHeartbeats: make(map[int32]time.Time),
110120
knownInjectionStatusPids: make(core.PidSet),
121+
metricDiscoveredServices: discoveredServicesGauge,
111122
}
112123
}
113124

@@ -159,7 +170,11 @@ func (c *collector) isProcessCollectionEnabled() bool {
159170

160171
// isServiceDiscoveryEnabled returns a boolean indicating if service discovery is enabled
161172
func (c *collector) isServiceDiscoveryEnabled() bool {
162-
return c.systemProbeConfig.GetBool("discovery.enabled")
173+
return serviceDiscoveryEnabled(c.systemProbeConfig)
174+
}
175+
176+
func serviceDiscoveryEnabled(systemProbeConfig pkgconfigmodel.Reader) bool {
177+
return systemProbeConfig.GetBool("discovery.enabled")
163178
}
164179

165180
// isGPUMonitoringEnabled returns a boolean indicating if GPU monitoring is enabled
@@ -213,14 +228,6 @@ func (c *collector) Start(ctx context.Context, store workloadmeta.Component) err
213228

214229
if c.isServiceDiscoveryEnabled() {
215230
serviceCollectionInterval := c.getServiceCollectionInterval()
216-
// Initialize service discovery metric
217-
c.metricDiscoveredServices = telemetry.NewGaugeWithOpts(
218-
collectorID,
219-
"discovered_services",
220-
[]string{},
221-
"Number of discovered alive services.",
222-
telemetry.DefaultOptions,
223-
)
224231

225232
if c.isProcessCollectionEnabled() || c.isLanguageCollectionEnabled() {
226233
log.Debug("Starting cached service collection (process collection enabled)")

0 commit comments

Comments
 (0)