
Commit 4362397

msrathore-db and claude committed
Fix Activity propagation in RetryHttpHandler for client logging
Address reviewer feedback by implementing proper Activity tracing infrastructure following the CloudFetchDownloader pattern.

Changes:
- RetryHttpHandler now implements the IActivityTracer interface
- Accepts an IActivityTracer parameter to delegate to the connection's trace
- Wraps retry logic in TraceActivityAsync for proper Activity creation
- DatabricksConnection passes 'this' to the RetryHttpHandler constructor
- Updated all unit tests with MockActivityTracer

This ensures Activity-based logging works reliably in client scenarios like PowerBI Desktop by properly propagating trace context through the handler chain, rather than relying on Activity.Current, which may be null.

Co-Authored-By: Claude <noreply@anthropic.com>
1 parent f3721e5 commit 4362397
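The closing rationale of the commit message turns on Activity.Current being null in host applications such as PowerBI Desktop. A minimal, self-contained sketch (not part of this commit; the ActivitySource name and console output are illustrative only) of why tags set through the ambient activity can be silently lost:

```csharp
using System;
using System.Diagnostics;

class ActivityCurrentDemo
{
    static void Main()
    {
        // In a host that never started an Activity, Activity.Current is null,
        // so any tags set through it are silently dropped.
        Activity? implicitActivity = Activity.Current;
        Console.WriteLine($"Activity.Current: {(implicitActivity is null ? "null" : implicitActivity.OperationName)}");

        // StartActivity also returns null unless an ActivityListener samples the source,
        // which is why the handler delegates to an explicit IActivityTracer instead.
        using var source = new ActivitySource("Example.RetryHttp"); // hypothetical source name
        using Activity? explicitActivity = source.StartActivity("RetryHttp");
        explicitActivity?.SetTag("http.retry.attempt", 1);
        Console.WriteLine($"StartActivity: {(explicitActivity is null ? "null (no listener)" : "sampled")}");
    }
}
```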

3 files changed: +140 lines, -108 lines


csharp/src/Drivers/Databricks/DatabricksConnection.cs

Lines changed: 2 additions & 2 deletions
```diff
@@ -590,8 +590,8 @@ protected override HttpMessageHandler CreateHttpHandler()
             {
                 // Add retry handler for 408, 502, 503, 504 responses with Retry-After support
                 // This must be INSIDE ThriftErrorMessageHandler so retries happen before exceptions are thrown
-                baseHandler = new RetryHttpHandler(baseHandler, TemporarilyUnavailableRetryTimeout);
-                baseAuthHandler = new RetryHttpHandler(baseAuthHandler, TemporarilyUnavailableRetryTimeout);
+                baseHandler = new RetryHttpHandler(baseHandler, this, TemporarilyUnavailableRetryTimeout);
+                baseAuthHandler = new RetryHttpHandler(baseAuthHandler, this, TemporarilyUnavailableRetryTimeout);
             }

             // Add Thrift error message handler AFTER retry handler (OUTSIDE in the chain)
```
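The comments in this hunk explain an ordering constraint: the retry handler must sit inside ThriftErrorMessageHandler so a retryable status is retried before it is surfaced as an exception. A minimal, runnable sketch of why DelegatingHandler nesting gives that ordering, using hypothetical stand-in handlers (StubTransport, LoggingHandler) rather than the driver's real ones:

```csharp
using System;
using System.Net;
using System.Net.Http;
using System.Threading;
using System.Threading.Tasks;

// Stand-ins for the handlers in the diff (names are hypothetical).
// A DelegatingHandler added later wraps the ones added earlier, so the inner
// handler is the last to see the request and the first to see the response,
// which is why the retry handler must be INSIDE the error-message handler
// to retry a 503 before it would be turned into an exception.
class StubTransport : HttpMessageHandler
{
    protected override Task<HttpResponseMessage> SendAsync(HttpRequestMessage request, CancellationToken ct) =>
        Task.FromResult(new HttpResponseMessage(HttpStatusCode.ServiceUnavailable)); // always 503
}

class LoggingHandler : DelegatingHandler
{
    private readonly string _name;
    public LoggingHandler(string name, HttpMessageHandler inner) : base(inner) => _name = name;

    protected override async Task<HttpResponseMessage> SendAsync(HttpRequestMessage request, CancellationToken ct)
    {
        HttpResponseMessage response = await base.SendAsync(request, ct);
        Console.WriteLine($"{_name} saw {(int)response.StatusCode}");
        return response;
    }
}

class HandlerOrderDemo
{
    static async Task Main()
    {
        HttpMessageHandler chain = new StubTransport();
        chain = new LoggingHandler("retry handler (inner)", chain); // added first => inner, sees the 503 first
        chain = new LoggingHandler("error handler (outer)", chain); // added last  => outer, sees it second
        using var client = new HttpClient(chain);
        await client.GetAsync("http://localhost/"); // prints the inner line before the outer line
    }
}
```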

csharp/src/Drivers/Databricks/RetryHttpHandler.cs

Lines changed: 110 additions & 98 deletions
```diff
@@ -22,15 +22,17 @@
 using System.Threading;
 using System.Threading.Tasks;
 using System.IO;
+using Apache.Arrow.Adbc.Tracing;

 namespace Apache.Arrow.Adbc.Drivers.Databricks
 {
     /// <summary>
     /// HTTP handler that implements retry behavior for 408, 502, 503, and 504 responses.
     /// Uses Retry-After header if present, otherwise uses exponential backoff.
     /// </summary>
-    internal class RetryHttpHandler : DelegatingHandler
+    internal class RetryHttpHandler : DelegatingHandler, IActivityTracer
     {
+        private readonly IActivityTracer _activityTracer;
         private readonly int _retryTimeoutSeconds;
         private readonly int _initialBackoffSeconds = 1;
         private readonly int _maxBackoffSeconds = 32;
@@ -39,141 +41,151 @@ internal class RetryHttpHandler : DelegatingHandler
         /// Initializes a new instance of the <see cref="RetryHttpHandler"/> class.
         /// </summary>
         /// <param name="innerHandler">The inner handler to delegate to.</param>
+        /// <param name="activityTracer">The activity tracer for logging and diagnostics.</param>
         /// <param name="retryTimeoutSeconds">Maximum total time in seconds to retry before failing.</param>
-        public RetryHttpHandler(HttpMessageHandler innerHandler, int retryTimeoutSeconds)
+        public RetryHttpHandler(HttpMessageHandler innerHandler, IActivityTracer activityTracer, int retryTimeoutSeconds)
             : base(innerHandler)
         {
+            _activityTracer = activityTracer;
             _retryTimeoutSeconds = retryTimeoutSeconds;
         }

+        // IActivityTracer implementation - delegates to the connection
+        ActivityTrace IActivityTracer.Trace => _activityTracer.Trace;
+        string? IActivityTracer.TraceParent => _activityTracer.TraceParent;
+        string IActivityTracer.AssemblyVersion => _activityTracer.AssemblyVersion;
+        string IActivityTracer.AssemblyName => _activityTracer.AssemblyName;
+
         /// <summary>
         /// Sends an HTTP request to the inner handler with retry logic for retryable status codes.
         /// </summary>
         protected override async Task<HttpResponseMessage> SendAsync(
             HttpRequestMessage request,
             CancellationToken cancellationToken)
         {
-            Activity? activity = Activity.Current;
-
-            // Clone the request content if it's not null so we can reuse it for retries
-            var requestContentClone = request.Content != null
-                ? await CloneHttpContentAsync(request.Content)
-                : null;
-
-            HttpResponseMessage response;
-            string? lastErrorMessage = null;
-            DateTime startTime = DateTime.UtcNow;
-            int attemptCount = 0;
-            int currentBackoffSeconds = _initialBackoffSeconds;
-            int totalRetrySeconds = 0;
-
-            do
+            return await this.TraceActivityAsync(async activity =>
             {
-                // Set the content for each attempt (if needed)
-                if (requestContentClone != null && request.Content == null)
+                // Clone the request content if it's not null so we can reuse it for retries
+                var requestContentClone = request.Content != null
+                    ? await CloneHttpContentAsync(request.Content)
+                    : null;
+
+                HttpResponseMessage response;
+                string? lastErrorMessage = null;
+                DateTime startTime = DateTime.UtcNow;
+                int attemptCount = 0;
+                int currentBackoffSeconds = _initialBackoffSeconds;
+                int totalRetrySeconds = 0;
+
+                do
                 {
-                    request.Content = await CloneHttpContentAsync(requestContentClone);
-                }
+                    // Set the content for each attempt (if needed)
+                    if (requestContentClone != null && request.Content == null)
+                    {
+                        request.Content = await CloneHttpContentAsync(requestContentClone);
+                    }

-                response = await base.SendAsync(request, cancellationToken);
+                    response = await base.SendAsync(request, cancellationToken);

-                // If it's not a retryable status code, return immediately
-                if (!IsRetryableStatusCode(response.StatusCode))
-                {
-                    // Only log retry summary if retries occurred
-                    if (attemptCount > 0)
+                    // If it's not a retryable status code, return immediately
+                    if (!IsRetryableStatusCode(response.StatusCode))
                     {
-                        activity?.SetTag("http.retry.total_attempts", attemptCount);
-                        activity?.SetTag("http.response.status_code", (int)response.StatusCode);
+                        // Only log retry summary if retries occurred
+                        if (attemptCount > 0)
+                        {
+                            activity?.SetTag("http.retry.total_attempts", attemptCount);
+                            activity?.SetTag("http.response.status_code", (int)response.StatusCode);
+                        }
+                        return response;
                     }
-                    return response;
-                }

-                attemptCount++;
+                    attemptCount++;

-                activity?.SetTag("http.retry.attempt", attemptCount);
-                activity?.SetTag("http.response.status_code", (int)response.StatusCode);
+                    activity?.SetTag("http.retry.attempt", attemptCount);
+                    activity?.SetTag("http.response.status_code", (int)response.StatusCode);

-                // Check if we've exceeded the timeout
-                TimeSpan elapsedTime = DateTime.UtcNow - startTime;
-                if (_retryTimeoutSeconds > 0 && elapsedTime.TotalSeconds > _retryTimeoutSeconds)
-                {
-                    // We've exceeded the timeout, so break out of the loop
-                    break;
-                }
+                    // Check if we've exceeded the timeout
+                    TimeSpan elapsedTime = DateTime.UtcNow - startTime;
+                    if (_retryTimeoutSeconds > 0 && elapsedTime.TotalSeconds > _retryTimeoutSeconds)
+                    {
+                        // We've exceeded the timeout, so break out of the loop
+                        break;
+                    }

-                int waitSeconds;
+                    int waitSeconds;

-                // Check for Retry-After header
-                if (response.Headers.TryGetValues("Retry-After", out var retryAfterValues))
-                {
-                    // Parse the Retry-After value
-                    string retryAfterValue = string.Join(",", retryAfterValues);
-                    if (int.TryParse(retryAfterValue, out int retryAfterSeconds) && retryAfterSeconds > 0)
+                    // Check for Retry-After header
+                    if (response.Headers.TryGetValues("Retry-After", out var retryAfterValues))
                     {
-                        // Use the Retry-After value
-                        waitSeconds = retryAfterSeconds;
-                        lastErrorMessage = $"Service temporarily unavailable (HTTP {(int)response.StatusCode}). Using server-specified retry after {waitSeconds} seconds. Attempt {attemptCount}.";
+                        // Parse the Retry-After value
+                        string retryAfterValue = string.Join(",", retryAfterValues);
+                        if (int.TryParse(retryAfterValue, out int retryAfterSeconds) && retryAfterSeconds > 0)
+                        {
+                            // Use the Retry-After value
+                            waitSeconds = retryAfterSeconds;
+                            lastErrorMessage = $"Service temporarily unavailable (HTTP {(int)response.StatusCode}). Using server-specified retry after {waitSeconds} seconds. Attempt {attemptCount}.";
+                        }
+                        else
+                        {
+                            // Invalid Retry-After value, use exponential backoff
+                            waitSeconds = CalculateBackoffWithJitter(currentBackoffSeconds);
+                            lastErrorMessage = $"Service temporarily unavailable (HTTP {(int)response.StatusCode}). Invalid Retry-After header, using exponential backoff of {waitSeconds} seconds. Attempt {attemptCount}.";
+                        }
                     }
                     else
                     {
-                        // Invalid Retry-After value, use exponential backoff
+                        // No Retry-After header, use exponential backoff
                         waitSeconds = CalculateBackoffWithJitter(currentBackoffSeconds);
-                        lastErrorMessage = $"Service temporarily unavailable (HTTP {(int)response.StatusCode}). Invalid Retry-After header, using exponential backoff of {waitSeconds} seconds. Attempt {attemptCount}.";
+                        lastErrorMessage = $"Service temporarily unavailable (HTTP {(int)response.StatusCode}). Using exponential backoff of {waitSeconds} seconds. Attempt {attemptCount}.";
                     }
-                }
-                else
-                {
-                    // No Retry-After header, use exponential backoff
-                    waitSeconds = CalculateBackoffWithJitter(currentBackoffSeconds);
-                    lastErrorMessage = $"Service temporarily unavailable (HTTP {(int)response.StatusCode}). Using exponential backoff of {waitSeconds} seconds. Attempt {attemptCount}.";
-                }

-                // Dispose the response before retrying
-                response.Dispose();
+                    // Dispose the response before retrying
+                    response.Dispose();

-                // Reset the request content for the next attempt
-                request.Content = null;
+                    // Reset the request content for the next attempt
+                    request.Content = null;

-                // Update total retry time
-                totalRetrySeconds += waitSeconds;
-                if (_retryTimeoutSeconds > 0 && totalRetrySeconds > _retryTimeoutSeconds)
-                {
-                    // We've exceeded the timeout, so break out of the loop
-                    break;
-                }
+                    // Update total retry time
+                    totalRetrySeconds += waitSeconds;
+                    if (_retryTimeoutSeconds > 0 && totalRetrySeconds > _retryTimeoutSeconds)
+                    {
+                        // We've exceeded the timeout, so break out of the loop
+                        break;
+                    }

-                // Wait for the calculated time
-                await Task.Delay(TimeSpan.FromSeconds(waitSeconds), cancellationToken);
+                    // Wait for the calculated time
+                    await Task.Delay(TimeSpan.FromSeconds(waitSeconds), cancellationToken);

-                // Increase backoff for next attempt (exponential backoff)
-                currentBackoffSeconds = Math.Min(currentBackoffSeconds * 2, _maxBackoffSeconds);
-            } while (!cancellationToken.IsCancellationRequested);
+                    // Increase backoff for next attempt (exponential backoff)
+                    currentBackoffSeconds = Math.Min(currentBackoffSeconds * 2, _maxBackoffSeconds);
+                } while (!cancellationToken.IsCancellationRequested);

-            activity?.SetTag("http.retry.total_attempts", attemptCount);
-            activity?.SetTag("http.response.status_code", (int)response.StatusCode);
+                activity?.SetTag("http.retry.total_attempts", attemptCount);
+                activity?.SetTag("http.response.status_code", (int)response.StatusCode);

-            // If we get here, we've either exceeded the timeout or been cancelled
-            if (cancellationToken.IsCancellationRequested)
-            {
-                activity?.SetTag("http.retry.outcome", "cancelled");
-                var cancelEx = new OperationCanceledException("Request cancelled during retry wait", cancellationToken);
-                activity?.AddException(cancelEx, [
-                    new("error.context", "http.retry.cancelled"),
-                    new("attempts", attemptCount)
-                ]);
-            }
+                // If we get here, we've either exceeded the timeout or been cancelled
+                if (cancellationToken.IsCancellationRequested)
+                {
+                    activity?.SetTag("http.retry.outcome", "cancelled");
+                    var cancelEx = new OperationCanceledException("Request cancelled during retry wait", cancellationToken);
+                    activity?.AddException(cancelEx, [
+                        new("error.context", "http.retry.cancelled"),
+                        new("attempts", attemptCount)
+                    ]);
+                    throw cancelEx;
+                }

-            // Timeout exceeded
-            activity?.SetTag("http.retry.outcome", "timeout_exceeded");
-            var exception = new DatabricksException(lastErrorMessage ?? "Service temporarily unavailable and retry timeout exceeded", AdbcStatusCode.IOError).SetSqlState("08001");
-            activity?.AddException(exception, [
-                new("error.context", "http.retry.timeout_exceeded"),
-                new("attempts", attemptCount),
-                new("total_retry_seconds", totalRetrySeconds),
-                new("timeout_seconds", _retryTimeoutSeconds)
-            ]);
-            throw exception;
+                // Timeout exceeded
+                activity?.SetTag("http.retry.outcome", "timeout_exceeded");
+                var exception = new DatabricksException(lastErrorMessage ?? "Service temporarily unavailable and retry timeout exceeded", AdbcStatusCode.IOError).SetSqlState("08001");
+                activity?.AddException(exception, [
+                    new("error.context", "http.retry.timeout_exceeded"),
+                    new("attempts", attemptCount),
+                    new("total_retry_seconds", totalRetrySeconds),
+                    new("timeout_seconds", _retryTimeoutSeconds)
+                ]);
+                throw exception;
+            }, activityName: "RetryHttp");
         }

         /// <summary>
```
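The retry loop above calls CalculateBackoffWithJitter, which lies outside this diff. A hedged sketch of what such a helper could look like, assuming roughly plus or minus 50% jitter around the current backoff; the driver's actual implementation may differ:

```csharp
using System;

// Hypothetical sketch of the jittered backoff helper referenced in the retry loop;
// the real CalculateBackoffWithJitter in the driver may be implemented differently.
static class BackoffSketch
{
    private static readonly Random s_random = new Random();

    // currentBackoffSeconds starts at 1 and is doubled by the caller up to 32
    // (see _initialBackoffSeconds/_maxBackoffSeconds in the diff); jitter keeps
    // many clients from retrying in lockstep after the same 503.
    public static int CalculateBackoffWithJitter(int currentBackoffSeconds)
    {
        double jitterFactor = 0.5 + s_random.NextDouble();               // in [0.5, 1.5)
        int waitSeconds = (int)Math.Round(currentBackoffSeconds * jitterFactor);
        return Math.Max(1, waitSeconds);                                 // never wait less than 1 second
    }
}
```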
