Skip to content

Commit f0a75e8

Browse files
committed
Added connection params of rateLimitRetry and rateLimitRetryTimeout
1 parent 9051acf commit f0a75e8

File tree

4 files changed

+189
-40
lines changed

4 files changed

+189
-40
lines changed

csharp/src/Drivers/Databricks/DatabricksConnection.cs

Lines changed: 41 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -73,7 +73,9 @@ internal class DatabricksConnection : SparkHttpConnection
7373
private const long DefaultMaxBytesPerFetchRequest = 400 * 1024 * 1024; // 400MB
7474
private long _maxBytesPerFetchRequest = DefaultMaxBytesPerFetchRequest;
7575
private const bool DefaultRetryOnUnavailable = true;
76+
private const bool DefaultRateLimitRetry = true;
7677
private const int DefaultTemporarilyUnavailableRetryTimeout = 900;
78+
private const int DefaultRateLimitRetryTimeout = 120;
7779
private bool _useDescTableExtended = false;
7880

7981
// Trace propagation configuration
@@ -540,15 +542,25 @@ protected internal override bool TrySetGetDirectResults(IRequest request)
540542
public bool RunAsyncInThrift => _runAsyncInThrift;
541543

542544
/// <summary>
543-
/// Gets a value indicating whether to retry requests that receive a 503 response with a Retry-After header.
545+
/// Gets a value indicating whether to retry requests that receive retryable responses (408, 502, 503, 504).
544546
/// </summary>
545547
protected bool TemporarilyUnavailableRetry { get; private set; } = DefaultRetryOnUnavailable;
546548

547549
/// <summary>
548-
/// Gets the maximum total time in seconds to retry 503 responses before failing.
550+
/// Gets the maximum total time in seconds to retry retryable responses (408, 502, 503, 504) before failing.
549551
/// </summary>
550552
protected int TemporarilyUnavailableRetryTimeout { get; private set; } = DefaultTemporarilyUnavailableRetryTimeout;
551553

554+
/// <summary>
555+
/// Gets a value indicating whether to retry requests that receive HTTP 429 responses.
556+
/// </summary>
557+
protected bool RateLimitRetry { get; private set; } = DefaultRateLimitRetry;
558+
559+
/// <summary>
560+
/// Gets the maximum total time in seconds to retry HTTP 429 responses before failing. A value of 0 retries indefinitely.
561+
/// </summary>
562+
protected int RateLimitRetryTimeout { get; private set; } = DefaultRateLimitRetryTimeout;
563+
552564
protected override HttpMessageHandler CreateHttpHandler()
553565
{
554566
HttpMessageHandler baseHandler = base.CreateHttpHandler();
@@ -565,7 +577,7 @@ protected override HttpMessageHandler CreateHttpHandler()
565577
// Current chain order (outermost to innermost):
566578
// 1. OAuth handlers (OAuthDelegatingHandler, etc.) - only on baseHandler for API requests
567579
// 2. ThriftErrorMessageHandler - extracts x-thriftserver-error-message and throws descriptive exceptions
568-
// 3. RetryHttpHandler - retries 408, 502, 503, 504 with Retry-After support
580+
// 3. RetryHttpHandler - retries 408, 429, 502, 503, 504 with Retry-After support
569581
// 4. TracingDelegatingHandler - propagates W3C trace context
570582
// 5. Base HTTP handler - actual network communication
571583
//
@@ -586,16 +598,16 @@ protected override HttpMessageHandler CreateHttpHandler()
586598
baseAuthHandler = new TracingDelegatingHandler(baseAuthHandler, this, _traceParentHeaderName, _traceStateEnabled);
587599
}
588600

589-
if (TemporarilyUnavailableRetry)
601+
if (TemporarilyUnavailableRetry || RateLimitRetry)
590602
{
591-
// Add retry handler for 408, 502, 503, 504 responses with Retry-After support
603+
// Add retry handler for 408, 429, 502, 503, 504 responses with Retry-After support
592604
// This must be INSIDE ThriftErrorMessageHandler so retries happen before exceptions are thrown
593-
baseHandler = new RetryHttpHandler(baseHandler, TemporarilyUnavailableRetryTimeout);
594-
baseAuthHandler = new RetryHttpHandler(baseAuthHandler, TemporarilyUnavailableRetryTimeout);
605+
baseHandler = new RetryHttpHandler(baseHandler, TemporarilyUnavailableRetryTimeout, RateLimitRetryTimeout, TemporarilyUnavailableRetry, RateLimitRetry);
606+
baseAuthHandler = new RetryHttpHandler(baseAuthHandler, TemporarilyUnavailableRetryTimeout, RateLimitRetryTimeout, TemporarilyUnavailableRetry, RateLimitRetry);
595607
}
596608

597609
// Add Thrift error message handler AFTER retry handler (OUTSIDE in the chain)
598-
// This ensures retryable status codes (408, 502, 503, 504) are retried by RetryHttpHandler
610+
// This ensures retryable status codes (408, 429, 502, 503, 504) are retried by RetryHttpHandler
599611
// before ThriftErrorMessageHandler throws exceptions with Thrift error messages
600612
baseHandler = new ThriftErrorMessageHandler(baseHandler);
601613
baseAuthHandler = new ThriftErrorMessageHandler(baseAuthHandler);
@@ -900,6 +912,16 @@ protected override void ValidateOptions()
900912
TemporarilyUnavailableRetry = tempUnavailableRetryValue;
901913
}
902914

915+
if (Properties.TryGetValue(DatabricksParameters.RateLimitRetry, out string? rateLimitRetryStr))
916+
{
917+
if (!bool.TryParse(rateLimitRetryStr, out bool rateLimitRetryValue))
918+
{
919+
throw new ArgumentOutOfRangeException(DatabricksParameters.RateLimitRetry, rateLimitRetryStr,
920+
$"must be a value of false (disabled) or true (enabled). Default is true.");
921+
}
922+
923+
RateLimitRetry = rateLimitRetryValue;
924+
}
903925

904926
if (Properties.TryGetValue(DatabricksParameters.TemporarilyUnavailableRetryTimeout, out string? tempUnavailableRetryTimeoutStr))
905927
{
@@ -912,6 +934,17 @@ protected override void ValidateOptions()
912934
TemporarilyUnavailableRetryTimeout = tempUnavailableRetryTimeoutValue;
913935
}
914936

937+
if (Properties.TryGetValue(DatabricksParameters.RateLimitRetryTimeout, out string? rateLimitRetryTimeoutStr))
938+
{
939+
if (!int.TryParse(rateLimitRetryTimeoutStr, out int rateLimitRetryTimeoutValue) ||
940+
rateLimitRetryTimeoutValue < 0)
941+
{
942+
throw new ArgumentOutOfRangeException(DatabricksParameters.RateLimitRetryTimeout, rateLimitRetryTimeoutStr,
943+
$"must be a value of 0 (retry indefinitely) or a positive integer representing seconds. Default is 120 seconds (2 minutes).");
944+
}
945+
RateLimitRetryTimeout = rateLimitRetryTimeoutValue;
946+
}
947+
915948
// When TemporarilyUnavailableRetry is enabled, we need to make sure connection timeout (which is used to cancel the HttpConnection) is equal
916949
// or greater than TemporarilyUnavailableRetryTimeout so that it won't time out before the server startup timeout (TemporarilyUnavailableRetryTimeout)
917950
if (TemporarilyUnavailableRetry && TemporarilyUnavailableRetryTimeout * 1000 > ConnectTimeoutMilliseconds)

csharp/src/Drivers/Databricks/DatabricksParameters.cs

Lines changed: 16 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -99,17 +99,31 @@ public class DatabricksParameters : SparkParameters
9999
/// </summary>
100100
public const string ServerSidePropertyPrefix = "adbc.databricks.ssp_";
101101

102-
/// Controls whether to retry requests that receive a 503 response with a Retry-After header.
102+
/// <summary>
103+
/// Controls whether to retry requests that receive retryable responses (408, 502, 503, 504).
103104
/// Default value is true (enabled). Set to false to disable retry behavior.
104105
/// </summary>
105106
public const string TemporarilyUnavailableRetry = "adbc.spark.temporarily_unavailable_retry";
106107

107108
/// <summary>
108-
/// Maximum total time in seconds to retry 503 responses before failing.
109+
/// Maximum total time in seconds to retry retryable responses (408, 502, 503, 504) before failing.
109110
/// Default value is 900 seconds (15 minutes). Set to 0 to retry indefinitely.
110111
/// </summary>
111112
public const string TemporarilyUnavailableRetryTimeout = "adbc.spark.temporarily_unavailable_retry_timeout";
112113

114+
/// <summary>
115+
/// Controls whether to retry requests that receive an HTTP 429 (TooManyRequests) response.
116+
/// Default value is true. Set to false to disable rate limit retry behavior.
117+
/// </summary>
118+
public const string RateLimitRetry = "adbc.databricks.rate_limit_retry";
119+
120+
/// <summary>
121+
/// The number of seconds that the connector waits before stopping an attempt to retry an operation
122+
/// when the operation receives an HTTP 429 response.
123+
/// Default value is 120 seconds. Set to 0 to retry indefinitely.
124+
/// </summary>
125+
public const string RateLimitRetryTimeout = "adbc.databricks.rate_limit_retry_timeout";
126+
113127
/// <summary>
114128
/// Maximum number of parallel downloads for CloudFetch operations.
115129
/// Default value is 3 if not specified.

csharp/src/Drivers/Databricks/RetryHttpHandler.cs

Lines changed: 57 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -25,24 +25,44 @@
2525
namespace Apache.Arrow.Adbc.Drivers.Databricks
2626
{
2727
/// <summary>
28-
/// HTTP handler that implements retry behavior for 408, 502, 503, and 504 responses.
28+
/// HTTP handler that implements retry behavior for 408, 429, 502, 503, and 504 responses.
2929
/// Uses Retry-After header if present, otherwise uses exponential backoff.
3030
/// </summary>
3131
internal class RetryHttpHandler : DelegatingHandler
3232
{
3333
private readonly int _retryTimeoutSeconds;
34+
private readonly int _rateLimitRetryTimeoutSeconds;
35+
private readonly bool _retryTemporarilyUnavailableEnabled;
36+
private readonly bool _rateLimitRetryEnabled;
3437
private readonly int _initialBackoffSeconds = 1;
3538
private readonly int _maxBackoffSeconds = 32;
3639

3740
/// <summary>
3841
/// Initializes a new instance of the <see cref="RetryHttpHandler"/> class.
3942
/// </summary>
4043
/// <param name="innerHandler">The inner handler to delegate to.</param>
41-
/// <param name="retryTimeoutSeconds">Maximum total time in seconds to retry before failing.</param>
42-
public RetryHttpHandler(HttpMessageHandler innerHandler, int retryTimeoutSeconds)
44+
/// <param name="retryTimeoutSeconds">Maximum total time in seconds to retry retryable responses (408, 502, 503, 504) before failing.</param>
45+
/// <param name="rateLimitRetryTimeoutSeconds">Maximum total time in seconds to retry HTTP 429 responses before failing.</param>
46+
public RetryHttpHandler(HttpMessageHandler innerHandler, int retryTimeoutSeconds, int rateLimitRetryTimeoutSeconds)
47+
: this(innerHandler, retryTimeoutSeconds, rateLimitRetryTimeoutSeconds, true, true)
48+
{
49+
}
50+
51+
/// <summary>
52+
/// Initializes a new instance of the <see cref="RetryHttpHandler"/> class.
53+
/// </summary>
54+
/// <param name="innerHandler">The inner handler to delegate to.</param>
55+
/// <param name="retryTimeoutSeconds">Maximum total time in seconds to retry retryable responses (408, 502, 503, 504) before failing.</param>
56+
/// <param name="rateLimitRetryTimeoutSeconds">Maximum total time in seconds to retry 429 (rate limit) responses before failing.</param>
57+
/// <param name="retryTemporarilyUnavailableEnabled">Whether to retry temporarily unavailable (408, 502, 503, 504) responses.</param>
58+
/// <param name="rateLimitRetryEnabled">Whether to retry HTTP 429 responses.</param>
59+
public RetryHttpHandler(HttpMessageHandler innerHandler, int retryTimeoutSeconds, int rateLimitRetryTimeoutSeconds, bool retryTemporarilyUnavailableEnabled, bool rateLimitRetryEnabled)
4360
: base(innerHandler)
4461
{
4562
_retryTimeoutSeconds = retryTimeoutSeconds;
63+
_rateLimitRetryTimeoutSeconds = rateLimitRetryTimeoutSeconds;
64+
_retryTemporarilyUnavailableEnabled = retryTemporarilyUnavailableEnabled;
65+
_rateLimitRetryEnabled = rateLimitRetryEnabled;
4666
}
4767

4868
/// <summary>
@@ -59,10 +79,10 @@ protected override async Task<HttpResponseMessage> SendAsync(
5979

6080
HttpResponseMessage response;
6181
string? lastErrorMessage = null;
62-
DateTime startTime = DateTime.UtcNow;
6382
int attemptCount = 0;
6483
int currentBackoffSeconds = _initialBackoffSeconds;
65-
int totalRetrySeconds = 0;
84+
int totalServiceUnavailableRetrySeconds = 0;
85+
int totalTooManyRequestsRetrySeconds = 0;
6686

6787
do
6888
{
@@ -82,14 +102,6 @@ protected override async Task<HttpResponseMessage> SendAsync(
82102

83103
attemptCount++;
84104

85-
// Check if we've exceeded the timeout
86-
TimeSpan elapsedTime = DateTime.UtcNow - startTime;
87-
if (_retryTimeoutSeconds > 0 && elapsedTime.TotalSeconds > _retryTimeoutSeconds)
88-
{
89-
// We've exceeded the timeout, so break out of the loop
90-
break;
91-
}
92-
93105
int waitSeconds;
94106

95107
// Check for Retry-After header
@@ -123,12 +135,27 @@ protected override async Task<HttpResponseMessage> SendAsync(
123135
// Reset the request content for the next attempt
124136
request.Content = null;
125137

126-
// Update total retry time
127-
totalRetrySeconds += waitSeconds;
128-
if (_retryTimeoutSeconds > 0 && totalRetrySeconds > _retryTimeoutSeconds)
138+
// Check if we would exceed the timeout after waiting, based on error type
139+
bool isTooManyRequests = response.StatusCode == HttpStatusCode.TooManyRequests;
140+
if (isTooManyRequests)
141+
{
142+
// Check 429 rate limit timeout
143+
if (_rateLimitRetryTimeoutSeconds > 0 && totalTooManyRequestsRetrySeconds + waitSeconds > _rateLimitRetryTimeoutSeconds)
144+
{
145+
// Waiting again would exceed the rate limit retry timeout, so break out of the loop
146+
break;
147+
}
148+
totalTooManyRequestsRetrySeconds += waitSeconds;
149+
}
150+
else
129151
{
130-
// We've exceeded the timeout, so break out of the loop
131-
break;
152+
// Check service unavailable timeout for other retryable errors (408, 502, 503, 504)
153+
if (_retryTimeoutSeconds > 0 && totalServiceUnavailableRetrySeconds + waitSeconds > _retryTimeoutSeconds)
154+
{
155+
// Waiting again would exceed the retry timeout, so break out of the loop
156+
break;
157+
}
158+
totalServiceUnavailableRetrySeconds += waitSeconds;
132159
}
133160

134161
// Wait for the calculated time
@@ -153,10 +180,18 @@ protected override async Task<HttpResponseMessage> SendAsync(
153180
/// </summary>
154181
private bool IsRetryableStatusCode(HttpStatusCode statusCode)
155182
{
156-
return statusCode == HttpStatusCode.RequestTimeout || // 408
157-
statusCode == HttpStatusCode.BadGateway || // 502
158-
statusCode == HttpStatusCode.ServiceUnavailable || // 503
159-
statusCode == HttpStatusCode.GatewayTimeout; // 504
183+
// Check too many requests separately
184+
if (statusCode == HttpStatusCode.TooManyRequests) // 429
185+
return _rateLimitRetryEnabled;
186+
187+
// Check other retryable codes
188+
if (statusCode == HttpStatusCode.RequestTimeout || // 408
189+
statusCode == HttpStatusCode.BadGateway || // 502
190+
statusCode == HttpStatusCode.ServiceUnavailable || // 503
191+
statusCode == HttpStatusCode.GatewayTimeout) // 504
192+
return _retryTemporarilyUnavailableEnabled;
193+
194+
return false;
160195
}
161196

162197
/// <summary>

0 commit comments

Comments
 (0)