Skip to content

Commit e0b64ae

Browse files
DSMS-90: Flush DSM stats on Lambda Stop (#7054)
## Summary of changes We intermittently drop DSM checkpoints when running via Lambda. We don't flush DSM stats on Lambda end, which can cause us to lose that data if the invocation doesn't occur during a flush interval. It's the same logic as for trace submission. ## Reason for change ## Implementation details Adds a FlushAsync call to the DataStreamsManager. ## Test coverage Mostly manual testing, I was invoking a lambda at a regular cadence and expecting to see a steady number of messages. With the changes in the PR, the rate is constant. When the changes were disabled, then we started to see gaps in DSM metrics. There's probably a more thorough way of testing this, but I think this is good enough. **Bug:** Number of DSM tracked messages (Missing data points in the middle invocation, which uses the old logic): <img width="1511" height="366" alt="Screenshot 2025-07-28 at 3 12 45 pm" src="https://github.com/user-attachments/assets/579af9dc-021e-4cb8-982a-4d80a5b40fcf" /> Lambda Invocations (Constant for both): <img width="2382" height="616" alt="Screenshot 2025-07-28 at 3 12 57 pm" src="https://github.com/user-attachments/assets/e29caec9-053b-450b-b4a4-564a6dba75be" /> ## Other details <!-- Fixes #{issue} --> <!-- ⚠️ Note: where possible, please obtain 2 approvals prior to merging. Unless CODEOWNERS specifies otherwise, for external teams it is typically best to have one review from a team member, and one review from apm-dotnet. Trivial changes do not require 2 reviews. -->
1 parent c82159d commit e0b64ae

File tree

5 files changed

+73
-3
lines changed

5 files changed

+73
-3
lines changed

tracer/src/Datadog.Trace/ClrProfiler/AutoInstrumentation/AWS/Lambda/LambdaCommon.cs

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -71,9 +71,12 @@ internal static async Task EndInvocationAsync(string returnValue, Exception exce
7171
{
7272
try
7373
{
74-
await Tracer.Instance.TracerManager.AgentWriter.FlushTracesAsync()
75-
.WaitAsync(TimeSpan.FromSeconds(ServerlessMaxWaitingFlushTime))
76-
.ConfigureAwait(false);
74+
await Task.WhenAll(
75+
Tracer.Instance.TracerManager.AgentWriter.FlushTracesAsync()
76+
.WaitAsync(TimeSpan.FromSeconds(ServerlessMaxWaitingFlushTime)),
77+
Tracer.Instance.TracerManager.DataStreamsManager.FlushAsync()
78+
.WaitAsync(TimeSpan.FromSeconds(ServerlessMaxWaitingFlushTime)))
79+
.ConfigureAwait(false);
7780
}
7881
catch (Exception ex)
7982
{

tracer/src/Datadog.Trace/DataStreamsMonitoring/DataStreamsManager.cs

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -75,6 +75,22 @@ public async Task DisposeAsync()
7575
await writer.DisposeAsync().ConfigureAwait(false);
7676
}
7777

78+
public async Task FlushAsync()
79+
{
80+
if (!IsEnabled)
81+
{
82+
return;
83+
}
84+
85+
var writer = Volatile.Read(ref _writer);
86+
if (writer is null)
87+
{
88+
return;
89+
}
90+
91+
await writer.FlushAsync().ConfigureAwait(false);
92+
}
93+
7894
/// <summary>
7995
/// Trys to extract a <see cref="PathwayContext"/>, from the provided <paramref name="headers"/>
8096
/// If data streams is disabled, or no pathway is present, returns null.

tracer/src/Datadog.Trace/DataStreamsMonitoring/DataStreamsWriter.cs

Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,11 +34,13 @@ internal class DataStreamsWriter : IDataStreamsWriter
3434
private readonly bool _isInDefaultState;
3535

3636
private readonly TaskCompletionSource<bool> _processExit = new(TaskCreationOptions.RunContinuationsAsynchronously);
37+
private readonly SemaphoreSlim _flushSemaphore = new(1, 1);
3738
private MemoryStream? _serializationBuffer;
3839
private long _pointsDropped;
3940
private int _flushRequested;
4041
private Task? _processTask;
4142
private Timer? _flushTimer;
43+
private TaskCompletionSource<bool>? _currentFlushTcs;
4244

4345
private int _isSupported = SupportState.Unknown;
4446
private bool _isInitialized;
@@ -152,6 +154,7 @@ public async Task DisposeAsync()
152154
_flushTimer?.Dispose();
153155
#endif
154156
await FlushAndCloseAsync().ConfigureAwait(false);
157+
_flushSemaphore.Dispose();
155158
}
156159

157160
private async Task FlushAndCloseAsync()
@@ -184,6 +187,45 @@ private async Task FlushAndCloseAsync()
184187
}
185188
}
186189

190+
public async Task FlushAsync()
191+
{
192+
await _flushSemaphore.WaitAsync().ConfigureAwait(false);
193+
try
194+
{
195+
var timeout = TimeSpan.FromSeconds(5);
196+
197+
if (_processExit.Task.IsCompleted)
198+
{
199+
return;
200+
}
201+
202+
if (!Volatile.Read(ref _isInitialized) || _processTask == null)
203+
{
204+
return;
205+
}
206+
207+
var tcs = new TaskCompletionSource<bool>(TaskCreationOptions.RunContinuationsAsynchronously);
208+
Interlocked.Exchange(ref _currentFlushTcs, tcs);
209+
210+
RequestFlush();
211+
212+
var completedTask = await Task.WhenAny(
213+
tcs.Task,
214+
_processExit.Task,
215+
Task.Delay(timeout)).ConfigureAwait(false);
216+
217+
if (completedTask != tcs.Task)
218+
{
219+
Log.Error("Data streams flush timeout after {Timeout}ms", timeout.TotalMilliseconds);
220+
}
221+
}
222+
finally
223+
{
224+
_currentFlushTcs = null;
225+
_flushSemaphore.Release();
226+
}
227+
}
228+
187229
private void RequestFlush()
188230
{
189231
Interlocked.Exchange(ref _flushRequested, 1);
@@ -248,6 +290,8 @@ private async Task ProcessQueueLoopAsync()
248290
if (flushRequested == 1)
249291
{
250292
await WriteToApiAsync().ConfigureAwait(false);
293+
var currentFlushTcs = Volatile.Read(ref _currentFlushTcs);
294+
currentFlushTcs?.TrySetResult(true);
251295
FlushComplete?.Invoke(this, EventArgs.Empty);
252296
}
253297
}

tracer/src/Datadog.Trace/DataStreamsMonitoring/IDataStreamsWriter.cs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,5 +15,7 @@ internal interface IDataStreamsWriter
1515

1616
void AddBacklog(in BacklogPoint point);
1717

18+
Task FlushAsync();
19+
1820
Task DisposeAsync();
1921
}

tracer/test/Datadog.Trace.Tests/DataStreamsMonitoring/DataStreamsManagerTests.cs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -332,5 +332,10 @@ public async Task DisposeAsync()
332332
Interlocked.Increment(ref _disposeCount);
333333
await Task.Yield();
334334
}
335+
336+
public async Task FlushAsync()
337+
{
338+
await Task.Yield();
339+
}
335340
}
336341
}

0 commit comments

Comments
 (0)