Skip to content

Commit 62bf5e5

Browse files
authored
[ AGNTLOG-462 ] Fix auditor Flush() race condition during transport restart (#46882)
## What does this PR do? In response to flaky CI test failures in `TestRestartTestSuite`, this PR fixes a race condition in the auditor's `Flush()` method that caused stale offsets to be written to disk during transport restarts. Previously, `Flush()` wrote the in-memory registry directly, missing payloads that destinations had already sent to the auditor channel but the `run()` goroutine hadn't consumed yet. This caused stale offsets on disk after `partialStop()`, leading to duplicate log processing after a TCP-to-HTTP transport restart. `Flush()` now sends a synchronous request through the auditor's `run()` goroutine event loop. The goroutine drains buffered `inputChan` payloads (bounded by a `len()` snapshot), updates the in-memory registry, then writes to disk. When the auditor is stopped, it falls back to a direct `flushRegistry()` call. ## Motivation Resolves AGNTLOG-462. `TestRestartTestSuite` sub-tests (`TestPartialStop_FlushesRegistryToDisk`, `TestRestart_FlushesAuditor`) were failing intermittently on macOS ARM64 and IoT Linux x64 CI runners. Investigation revealed the test failures exposed a real production race condition: the `LogsSent` metric is incremented by destinations before payloads reach the auditor's `inputChan`, so `partialStop()` calling `Flush()` immediately after stopping destinations could write a registry missing the latest offsets. ## Describe how you validated your changes Existing automated tests were relied on. ## Additional Notes - `Flush()` is now blocking (waits for the `run()` goroutine to complete drain + write). All existing callers already treated it as synchronous. - Must not be called concurrently with `Stop()` (not a new constraint -- this was already the case). Co-authored-by: ryan.hall <ryan.hall@datadoghq.com>
1 parent 73423a7 commit 62bf5e5

File tree

2 files changed

+56
-5
lines changed

2 files changed

+56
-5
lines changed

comp/logs/auditor/impl/auditor.go

Lines changed: 41 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,7 @@ type registryAuditor struct {
5454
kubeHealthRegistrar kubehealthdef.Component
5555
chansMutex sync.Mutex
5656
inputChan chan *message.Payload
57+
flushRequestChan chan chan struct{}
5758
registry map[string]*RegistryEntry
5859
tailedSources map[string]bool
5960
registryPath string
@@ -136,19 +137,34 @@ func (a *registryAuditor) Stop() {
136137
}
137138
}
138139

139-
// Flush immediately writes the current registry to disk.
140-
// This is useful to ensure all file positions are committed before a restart,
141-
// preventing duplicate log processing.
140+
// Flush drains all pending payloads from the input channel, updates the
141+
// in-memory registry, then writes it to disk. It blocks until complete.
142+
// When the auditor is stopped (run loop not active), it falls back to
143+
// writing the current in-memory registry directly.
144+
//
145+
// Must not be called concurrently with Stop.
142146
func (a *registryAuditor) Flush() {
143-
if err := a.flushRegistry(); err != nil {
144-
a.log.Warnf("Failed to flush auditor registry: %v", err)
147+
a.chansMutex.Lock()
148+
reqChan := a.flushRequestChan
149+
a.chansMutex.Unlock()
150+
151+
if reqChan == nil {
152+
if err := a.flushRegistry(); err != nil {
153+
a.log.Warnf("Failed to flush auditor registry: %v", err)
154+
}
155+
return
145156
}
157+
158+
done := make(chan struct{})
159+
reqChan <- done
160+
<-done
146161
}
147162

148163
func (a *registryAuditor) createChannels() {
149164
a.chansMutex.Lock()
150165
defer a.chansMutex.Unlock()
151166
a.inputChan = make(chan *message.Payload, a.messageChannelSize)
167+
a.flushRequestChan = make(chan chan struct{})
152168
a.done = make(chan struct{})
153169
}
154170

@@ -164,6 +180,7 @@ func (a *registryAuditor) closeChannels() {
164180
a.done = nil
165181
}
166182
a.inputChan = nil
183+
a.flushRequestChan = nil
167184
}
168185

169186
// GetFingerprint returns the fingerprint for a given identifier,
@@ -293,6 +310,25 @@ func (a *registryAuditor) run() {
293310
a.log.Warn(err)
294311
}
295312
}
313+
case responseChan := <-a.flushRequestChan:
314+
n := len(a.inputChan)
315+
for i := 0; i < n; i++ {
316+
select {
317+
case payload := <-a.inputChan:
318+
for _, msg := range payload.MessageMetas {
319+
var fingerprint types.Fingerprint
320+
if msg.Origin.Fingerprint != nil {
321+
fingerprint = *msg.Origin.Fingerprint
322+
}
323+
a.updateRegistry(msg.Origin.Identifier, msg.Origin.Offset, msg.Origin.LogSource.Config.TailingMode, msg.IngestionTimestamp, fingerprint)
324+
}
325+
default:
326+
}
327+
}
328+
if err := a.flushRegistry(); err != nil {
329+
a.log.Warnf("Flush: failed to flush registry: %v", err)
330+
}
331+
close(responseChan)
296332
}
297333
}
298334
}
Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,15 @@
1+
# Each section from every release note are combined when the
2+
# CHANGELOG.rst is rendered. So the text needs to be worded so that
3+
# it does not depend on any information only available in another
4+
# section. This may mean repeating some details, but each section
5+
# must be readable independently of the other.
6+
#
7+
# Each section note must be formatted as reStructuredText.
8+
---
9+
fixes:
10+
- |
11+
Fixed a race condition in the logs auditor where ``Flush()`` could write a
12+
stale registry to disk during a transport restart. The auditor now drains
13+
all pending payloads from its input channel before flushing, ensuring file
14+
offsets are up to date and reducing duplicate log processing after a
15+
TCP-to-HTTP transport switch.

0 commit comments

Comments
 (0)