Skip to content

Commit d4f1460

Browse files
author
Jade Wang
committed
address comments
1 parent 29c7c37 commit d4f1460

File tree

8 files changed

+194
-102
lines changed

8 files changed

+194
-102
lines changed

csharp/src/Telemetry/CircuitBreakerManager.cs

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -85,5 +85,15 @@ public CircuitBreaker GetCircuitBreaker(string host)
8585

8686
return _circuitBreakers.GetOrAdd(host, _ => new CircuitBreaker());
8787
}
88+
89+
/// <summary>
90+
/// Removes the circuit breaker for the specified host.
91+
/// Called when the last client for a host is released to prevent memory leaks.
92+
/// </summary>
93+
/// <param name="host">The host identifier.</param>
94+
public void RemoveCircuitBreaker(string host)
95+
{
96+
_circuitBreakers.TryRemove(host, out _);
97+
}
8898
}
8999
}

csharp/src/Telemetry/CircuitBreakerTelemetryExporter.cs

Lines changed: 14 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -47,11 +47,6 @@ namespace AdbcDrivers.Databricks.Telemetry
4747
/// </remarks>
4848
internal sealed class CircuitBreakerTelemetryExporter : ITelemetryExporter
4949
{
50-
/// <summary>
51-
/// Activity source for circuit breaker telemetry tracing.
52-
/// </summary>
53-
private static readonly ActivitySource s_activitySource = new ActivitySource("AdbcDrivers.Databricks.CircuitBreakerTelemetryExporter");
54-
5550
private readonly ITelemetryExporter _innerExporter;
5651
private readonly CircuitBreaker _circuitBreaker;
5752
private readonly string _host;
@@ -116,26 +111,11 @@ public async Task<bool> ExportAsync(IReadOnlyList<TelemetryFrontendLog> logs, Ca
116111
return true;
117112
}
118113

119-
// Check circuit state before attempting export
120-
if (_circuitBreaker.State == CircuitBreakerState.Open)
121-
{
122-
// Circuit is open - silently drop events (log at DEBUG level)
123-
Activity.Current?.AddEvent(new ActivityEvent("telemetry.export.circuit_open",
124-
tags: new ActivityTagsCollection
125-
{
126-
{ "host", _host },
127-
{ "log_count", logs.Count },
128-
{ "action", "dropped" }
129-
}));
130-
131-
// Return true because dropping is not a failure - it's expected behavior
132-
return true;
133-
}
134-
135114
try
136115
{
137116
// Execute through circuit breaker
138117
// The circuit breaker will track failures and open if threshold is reached
118+
// Polly handles the open-circuit case internally by throwing BrokenCircuitException
139119
bool result = await _circuitBreaker.ExecuteAsync(async () =>
140120
{
141121
bool success = await _innerExporter.ExportAsync(logs, ct).ConfigureAwait(false);
@@ -154,22 +134,30 @@ public async Task<bool> ExportAsync(IReadOnlyList<TelemetryFrontendLog> logs, Ca
154134
}
155135
catch (BrokenCircuitException)
156136
{
157-
// Circuit just opened - log and silently drop
158-
Activity.Current?.AddEvent(new ActivityEvent("telemetry.export.circuit_opened",
137+
// Circuit is open - silently drop events
138+
Activity.Current?.AddEvent(new ActivityEvent("telemetry.export.circuit_open",
159139
tags: new ActivityTagsCollection
160140
{
161141
{ "host", _host },
162142
{ "log_count", logs.Count },
163143
{ "action", "dropped" }
164144
}));
165145

166-
// Return true because dropping is expected behavior when circuit opens
146+
// Return true because dropping is expected behavior when circuit is open
167147
return true;
168148
}
169149
catch (OperationCanceledException)
170150
{
171-
// Don't swallow cancellation - let it propagate
172-
throw;
151+
// Cancellation should not impact driver behavior; treat as a no-op.
152+
Activity.Current?.AddEvent(new ActivityEvent("telemetry.export.canceled",
153+
tags: new ActivityTagsCollection
154+
{
155+
{ "host", _host },
156+
{ "log_count", logs.Count },
157+
{ "action", "canceled" }
158+
}));
159+
160+
return true;
173161
}
174162
catch (Exception ex)
175163
{

csharp/src/Telemetry/TelemetryClient.cs

Lines changed: 90 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,7 @@ namespace AdbcDrivers.Databricks.Telemetry
3737
/// Key Behaviors:
3838
/// - Constructor initializes all pipeline components in correct order
3939
/// - ExportAsync() delegates to the circuit breaker-protected exporter
40-
/// - CloseAsync() performs graceful shutdown: flush pending metrics, cancel background tasks, dispose resources
40+
/// - CloseAsync() performs graceful shutdown: flush pending metrics, wait for exports, dispose resources
4141
/// - All exceptions are swallowed per telemetry requirement
4242
/// </para>
4343
/// <para>
@@ -46,18 +46,15 @@ namespace AdbcDrivers.Databricks.Telemetry
4646
/// </remarks>
4747
internal sealed class TelemetryClient : ITelemetryClient
4848
{
49-
/// <summary>
50-
/// Activity source for tracing telemetry client operations.
51-
/// </summary>
52-
private static readonly ActivitySource s_activitySource = new ActivitySource("AdbcDrivers.Databricks.TelemetryClient");
53-
54-
private readonly DatabricksTelemetryExporter _databricksExporter;
55-
private readonly CircuitBreakerTelemetryExporter _circuitBreakerExporter;
5649
private readonly ITelemetryExporter _effectiveExporter;
5750
private readonly ConcurrentQueue<TelemetryFrontendLog> _pendingLogs;
5851
private readonly int _batchSize;
52+
private readonly bool _enabled;
5953
private readonly CancellationTokenSource _cts;
60-
private volatile bool _disposed;
54+
private readonly SemaphoreSlim _flushSemaphore = new SemaphoreSlim(1, 1);
55+
private readonly Timer? _flushTimer;
56+
private int _disposed;
57+
private int _queueCount;
6158

6259
/// <summary>
6360
/// Creates a new TelemetryClient with the specified configuration and HTTP client.
@@ -94,18 +91,29 @@ public TelemetryClient(
9491
_cts = new CancellationTokenSource();
9592
_pendingLogs = new ConcurrentQueue<TelemetryFrontendLog>();
9693
_batchSize = configuration.BatchSize;
94+
_enabled = configuration.Enabled;
9795

9896
try
9997
{
10098
// Initialize pipeline components in order:
10199
// 1. DatabricksTelemetryExporter (innermost - does the HTTP export)
102-
_databricksExporter = new DatabricksTelemetryExporter(httpClient, host, isAuthenticated, configuration);
100+
var databricksExporter = new DatabricksTelemetryExporter(httpClient, host, isAuthenticated, configuration);
103101

104102
// 2. CircuitBreakerTelemetryExporter (wraps exporter with circuit breaker protection)
105-
_circuitBreakerExporter = new CircuitBreakerTelemetryExporter(_databricksExporter, host);
103+
var circuitBreakerExporter = new CircuitBreakerTelemetryExporter(databricksExporter, host);
106104

107105
// Use override exporter if provided (for testing), otherwise use circuit breaker exporter
108-
_effectiveExporter = exporterOverride ?? (ITelemetryExporter)_circuitBreakerExporter;
106+
_effectiveExporter = exporterOverride ?? (ITelemetryExporter)circuitBreakerExporter;
107+
108+
// Start periodic flush timer if interval is configured
109+
if (configuration.FlushIntervalMs > 0)
110+
{
111+
_flushTimer = new Timer(
112+
_ => { _ = FlushAsync(_cts.Token); },
113+
null,
114+
configuration.FlushIntervalMs,
115+
configuration.FlushIntervalMs);
116+
}
109117

110118
Activity.Current?.AddEvent(new ActivityEvent("telemetry.client.initialized",
111119
tags: new ActivityTagsCollection
@@ -122,6 +130,7 @@ public TelemetryClient(
122130
try
123131
{
124132
_cts?.Dispose();
133+
_flushTimer?.Dispose();
125134
}
126135
catch
127136
{
@@ -152,19 +161,20 @@ public TelemetryClient(
152161
/// </remarks>
153162
public void Enqueue(TelemetryFrontendLog log)
154163
{
155-
if (_disposed || log == null)
164+
if (_disposed != 0 || !_enabled || log == null)
156165
{
157166
return;
158167
}
159168

160169
try
161170
{
162171
_pendingLogs.Enqueue(log);
172+
int count = Interlocked.Increment(ref _queueCount);
163173

164-
// Trigger flush if batch size reached
165-
if (_pendingLogs.Count >= _batchSize)
174+
// Trigger flush if batch size reached (single flush at a time via semaphore)
175+
if (count >= _batchSize)
166176
{
167-
_ = Task.Run(() => FlushAsync(CancellationToken.None));
177+
_ = FlushAsync(_cts.Token);
168178
}
169179
}
170180
catch (Exception ex)
@@ -197,23 +207,33 @@ public void Enqueue(TelemetryFrontendLog log)
197207
/// </remarks>
198208
public async Task FlushAsync(CancellationToken ct = default)
199209
{
200-
if (_disposed)
210+
if (_disposed != 0)
211+
{
212+
return;
213+
}
214+
215+
// Ensure only one flush runs at a time
216+
if (!await _flushSemaphore.WaitAsync(0, ct).ConfigureAwait(false))
201217
{
202218
return;
203219
}
204220

205221
try
206222
{
223+
// Use linked token so both caller cancellation and client shutdown are respected
224+
using var linkedCts = CancellationTokenSource.CreateLinkedTokenSource(ct, _cts.Token);
225+
207226
// Flush directly enqueued logs (V3 direct-object path)
208227
List<TelemetryFrontendLog> logsToFlush = new List<TelemetryFrontendLog>();
209-
while (_pendingLogs.TryDequeue(out TelemetryFrontendLog? log))
228+
while (_pendingLogs.TryDequeue(out TelemetryFrontendLog log))
210229
{
211230
logsToFlush.Add(log);
212231
}
232+
Interlocked.Add(ref _queueCount, -logsToFlush.Count);
213233

214234
if (logsToFlush.Count > 0)
215235
{
216-
await _effectiveExporter.ExportAsync(logsToFlush, ct).ConfigureAwait(false);
236+
await _effectiveExporter.ExportAsync(logsToFlush, linkedCts.Token).ConfigureAwait(false);
217237
}
218238

219239
Activity.Current?.AddEvent(new ActivityEvent("telemetry.client.flush_completed"));
@@ -228,6 +248,10 @@ public async Task FlushAsync(CancellationToken ct = default)
228248
{ "error.type", ex.GetType().Name }
229249
}));
230250
}
251+
finally
252+
{
253+
_flushSemaphore.Release();
254+
}
231255
}
232256

233257
/// <summary>
@@ -241,32 +265,60 @@ public async Task FlushAsync(CancellationToken ct = default)
241265
/// </para>
242266
/// <para>
243267
/// The close operation performs the following steps:
244-
/// 1. Cancel any pending background tasks
245-
/// 2. Dispose all resources (cancellation token source)
268+
/// 1. Flush all remaining queued events
269+
/// 2. Stop the periodic flush timer
270+
/// 3. Dispose all resources (cancellation token source, semaphore, timer)
246271
/// </para>
247272
/// <para>
248273
/// This method is idempotent - calling it multiple times is safe and has no effect
249-
/// after the first call completes.
274+
/// after the first call completes. Uses Interlocked.Exchange for atomic check-then-set.
250275
/// </para>
251276
/// <para>
252277
/// This method never throws exceptions. All errors during close are caught and
253278
/// logged internally.
254279
/// </para>
255280
/// </remarks>
256-
public Task CloseAsync()
281+
public async Task CloseAsync()
257282
{
258-
if (_disposed)
283+
if (Interlocked.Exchange(ref _disposed, 1) == 1)
259284
{
260-
return Task.CompletedTask;
285+
return;
261286
}
262287

263-
_disposed = true;
264-
265288
try
266289
{
267290
Activity.Current?.AddEvent(new ActivityEvent("telemetry.client.closing"));
268291

269-
// Cancel any pending background tasks
292+
// Stop the periodic flush timer first
293+
try
294+
{
295+
_flushTimer?.Dispose();
296+
}
297+
catch
298+
{
299+
// Swallow timer dispose exceptions
300+
}
301+
302+
// Flush remaining queued events before shutdown
303+
try
304+
{
305+
// Temporarily reset _disposed to allow FlushAsync to proceed
306+
Interlocked.Exchange(ref _disposed, 0);
307+
await FlushAsync(CancellationToken.None).ConfigureAwait(false);
308+
Interlocked.Exchange(ref _disposed, 1);
309+
}
310+
catch (Exception ex)
311+
{
312+
Interlocked.Exchange(ref _disposed, 1);
313+
Activity.Current?.AddEvent(new ActivityEvent("telemetry.client.final_flush_error",
314+
tags: new ActivityTagsCollection
315+
{
316+
{ "error.message", ex.Message },
317+
{ "error.type", ex.GetType().Name }
318+
}));
319+
}
320+
321+
// Cancel the token to signal any in-flight operations
270322
try
271323
{
272324
_cts.Cancel();
@@ -295,7 +347,7 @@ public Task CloseAsync()
295347
}
296348
finally
297349
{
298-
// Ensure CancellationTokenSource is disposed even if other operations fail
350+
// Ensure resources are disposed even if other operations fail
299351
try
300352
{
301353
_cts.Dispose();
@@ -304,9 +356,16 @@ public Task CloseAsync()
304356
{
305357
// Swallow CTS dispose exceptions
306358
}
307-
}
308359

309-
return Task.CompletedTask;
360+
try
361+
{
362+
_flushSemaphore.Dispose();
363+
}
364+
catch
365+
{
366+
// Swallow semaphore dispose exceptions
367+
}
368+
}
310369
}
311370

312371
/// <summary>

csharp/src/Telemetry/TelemetryClientHolder.cs

Lines changed: 29 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -14,23 +14,30 @@
1414
* limitations under the License.
1515
*/
1616

17+
using System.Threading;
18+
1719
namespace AdbcDrivers.Databricks.Telemetry
1820
{
1921
/// <summary>
2022
/// Holds a telemetry client and its reference count.
2123
/// Used by TelemetryClientManager to track how many connections are using a client.
2224
/// </summary>
2325
/// <remarks>
24-
/// Thread Safety: The _refCount field is accessed via Interlocked operations to ensure
25-
/// thread-safe increment and decrement operations from concurrent connections.
26+
/// Thread Safety: Access to the _refCount field is coordinated via Interlocked operations
27+
/// (AddRef/Release) for thread-safe increment and decrement from concurrent connections.
2628
/// </remarks>
2729
internal sealed class TelemetryClientHolder
2830
{
2931
/// <summary>
3032
/// Reference count tracking the number of connections using this client.
31-
/// Must be accessed via Interlocked operations for thread safety.
33+
/// Access to this field must be done via AddRef/Release methods for thread safety.
34+
/// </summary>
35+
private int _refCount = 1;
36+
37+
/// <summary>
38+
/// Gets the current reference count.
3239
/// </summary>
33-
internal int _refCount = 1;
40+
public int RefCount => Volatile.Read(ref _refCount);
3441

3542
/// <summary>
3643
/// Gets the telemetry client instance.
@@ -45,5 +52,23 @@ public TelemetryClientHolder(ITelemetryClient client)
4552
{
4653
Client = client;
4754
}
55+
56+
/// <summary>
57+
/// Atomically increments the reference count.
58+
/// </summary>
59+
/// <returns>The new reference count after incrementing.</returns>
60+
public int AddRef()
61+
{
62+
return Interlocked.Increment(ref _refCount);
63+
}
64+
65+
/// <summary>
66+
/// Atomically decrements the reference count.
67+
/// </summary>
68+
/// <returns>The new reference count after decrementing.</returns>
69+
public int Release()
70+
{
71+
return Interlocked.Decrement(ref _refCount);
72+
}
4873
}
4974
}

0 commit comments

Comments
 (0)