Skip to content

Commit b2e4bfd

Browse files
authored
Add retry logic to RestService (#2353)
* Add retry logic to RestService * Minor fix * Minor fix
1 parent cfafad4 commit b2e4bfd

File tree

5 files changed

+241
-15
lines changed

5 files changed

+241
-15
lines changed

src/Confluent.SchemaRegistry.Encryption/CachedDekRegistryClient.cs

Lines changed: 59 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,21 @@ public class CachedDekRegistryClient : IDekRegistryClient
4848
/// </summary>
4949
public const int DefaultTimeout = 30000;
5050

51+
/// <summary>
52+
/// The default maximum number of retries.
53+
/// </summary>
54+
public const int DefaultMaxRetries = RestService.DefaultMaxRetries;
55+
56+
/// <summary>
57+
/// The default maximum time to wait for the first retry (jitter may shorten the actual wait).
58+
/// </summary>
59+
public const int DefaultRetriesWaitMs = RestService.DefaultRetriesWaitMs;
60+
61+
/// <summary>
62+
/// The default maximum time to wait for any retry.
63+
/// </summary>
64+
public const int DefaultRetriesMaxWaitMs = RestService.DefaultRetriesMaxWaitMs;
65+
5166
/// <summary>
5267
/// The default maximum capacity of the local cache.
5368
/// </summary>
@@ -105,6 +120,45 @@ public CachedDekRegistryClient(IEnumerable<KeyValuePair<string, string>> config,
105120
$"Configured value for {SchemaRegistryConfig.PropertyNames.SchemaRegistryRequestTimeoutMs} must be an integer.");
106121
}
107122

123+
var maxRetriesMaybe = config.FirstOrDefault(prop =>
124+
prop.Key.ToLower() == SchemaRegistryConfig.PropertyNames.SchemaRegistryMaxRetries);
125+
int maxRetries;
126+
try
127+
{
128+
maxRetries = maxRetriesMaybe.Value == null ? DefaultMaxRetries : Convert.ToInt32(maxRetriesMaybe.Value);
129+
}
130+
catch (FormatException)
131+
{
132+
throw new ArgumentException(
133+
$"Configured value for {SchemaRegistryConfig.PropertyNames.SchemaRegistryMaxRetries} must be an integer.");
134+
}
135+
136+
var retriesWaitMsMaybe = config.FirstOrDefault(prop =>
137+
prop.Key.ToLower() == SchemaRegistryConfig.PropertyNames.SchemaRegistryRetriesWaitMs);
138+
int retriesWaitMs;
139+
try
140+
{
141+
retriesWaitMs = retriesWaitMsMaybe.Value == null ? DefaultRetriesWaitMs : Convert.ToInt32(retriesWaitMsMaybe.Value);
142+
}
143+
catch (FormatException)
144+
{
145+
throw new ArgumentException(
146+
$"Configured value for {SchemaRegistryConfig.PropertyNames.SchemaRegistryRetriesWaitMs} must be an integer.");
147+
}
148+
149+
var retriesMaxWaitMsMaybe = config.FirstOrDefault(prop =>
150+
prop.Key.ToLower() == SchemaRegistryConfig.PropertyNames.SchemaRegistryRetriesMaxWaitMs);
151+
int retriesMaxWaitMs;
152+
try
153+
{
154+
retriesMaxWaitMs = retriesMaxWaitMsMaybe.Value == null ? DefaultRetriesMaxWaitMs : Convert.ToInt32(retriesMaxWaitMsMaybe.Value);
155+
}
156+
catch (FormatException)
157+
{
158+
throw new ArgumentException(
159+
$"Configured value for {SchemaRegistryConfig.PropertyNames.SchemaRegistryRetriesMaxWaitMs} must be an integer.");
160+
}
161+
108162
var identityMapCapacityMaybe = config.FirstOrDefault(prop =>
109163
prop.Key.ToLower() == SchemaRegistryConfig.PropertyNames.SchemaRegistryMaxCachedSchemas);
110164
try
@@ -210,6 +264,9 @@ public CachedDekRegistryClient(IEnumerable<KeyValuePair<string, string>> config,
210264

211265
if (property.Key != SchemaRegistryConfig.PropertyNames.SchemaRegistryUrl &&
212266
property.Key != SchemaRegistryConfig.PropertyNames.SchemaRegistryRequestTimeoutMs &&
267+
property.Key != SchemaRegistryConfig.PropertyNames.SchemaRegistryMaxRetries &&
268+
property.Key != SchemaRegistryConfig.PropertyNames.SchemaRegistryRetriesWaitMs &&
269+
property.Key != SchemaRegistryConfig.PropertyNames.SchemaRegistryRetriesMaxWaitMs &&
213270
property.Key != SchemaRegistryConfig.PropertyNames.SchemaRegistryMaxCachedSchemas &&
214271
property.Key != SchemaRegistryConfig.PropertyNames.SchemaRegistryLatestCacheTtlSecs &&
215272
property.Key != SchemaRegistryConfig.PropertyNames.SchemaRegistryBasicAuthCredentialsSource &&
@@ -242,7 +299,8 @@ public CachedDekRegistryClient(IEnumerable<KeyValuePair<string, string>> config,
242299

243300
var sslCaLocation = config.FirstOrDefault(prop => prop.Key.ToLower() == SchemaRegistryConfig.PropertyNames.SslCaLocation).Value;
244301
var sslCaCertificate = string.IsNullOrEmpty(sslCaLocation) ? null : new X509Certificate2(sslCaLocation);
245-
this.restService = new DekRestService(schemaRegistryUris, timeoutMs, authenticationHeaderValueProvider, SetSslConfig(config), sslVerify, sslCaCertificate, proxy);
302+
this.restService = new DekRestService(schemaRegistryUris, timeoutMs, authenticationHeaderValueProvider,
303+
SetSslConfig(config), sslVerify, sslCaCertificate, proxy, maxRetries, retriesWaitMs, retriesMaxWaitMs);
246304
}
247305

248306
/// <summary>

src/Confluent.SchemaRegistry.Encryption/Rest/DekRestService.cs

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -30,9 +30,11 @@ public class DekRestService : RestService
3030
/// </summary>
3131
public DekRestService(string schemaRegistryUrl, int timeoutMs,
3232
IAuthenticationHeaderValueProvider authenticationHeaderValueProvider, List<X509Certificate2> certificates,
33-
bool enableSslCertificateVerification, X509Certificate2 sslCaCertificate = null, IWebProxy proxy = null) :
33+
bool enableSslCertificateVerification, X509Certificate2 sslCaCertificate = null, IWebProxy proxy = null,
34+
int maxRetries = DefaultMaxRetries, int retriesWaitMs = DefaultRetriesWaitMs,
35+
int retriesMaxWaitMs = DefaultRetriesMaxWaitMs) :
3436
base(schemaRegistryUrl, timeoutMs, authenticationHeaderValueProvider, certificates,
35-
enableSslCertificateVerification, sslCaCertificate, proxy)
37+
enableSslCertificateVerification, sslCaCertificate, proxy, maxRetries, retriesWaitMs, retriesMaxWaitMs)
3638
{
3739
}
3840

src/Confluent.SchemaRegistry/CachedSchemaRegistryClient.cs

Lines changed: 59 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -88,6 +88,21 @@ public class CachedSchemaRegistryClient : ISchemaRegistryClient
8888
/// </summary>
8989
public const int DefaultTimeout = 30000;
9090

91+
/// <summary>
92+
/// The default maximum number of retries.
93+
/// </summary>
94+
public const int DefaultMaxRetries = RestService.DefaultMaxRetries;
95+
96+
/// <summary>
97+
/// The default maximum time to wait for the first retry (jitter may shorten the actual wait).
98+
/// </summary>
99+
public const int DefaultRetriesWaitMs = RestService.DefaultRetriesWaitMs;
100+
101+
/// <summary>
102+
/// The default maximum time to wait for any retry.
103+
/// </summary>
104+
public const int DefaultRetriesMaxWaitMs = RestService.DefaultRetriesMaxWaitMs;
105+
91106
/// <summary>
92107
/// The default maximum capacity of the local schema cache.
93108
/// </summary>
@@ -221,6 +236,45 @@ public CachedSchemaRegistryClient(IEnumerable<KeyValuePair<string, string>> conf
221236
$"Configured value for {SchemaRegistryConfig.PropertyNames.SchemaRegistryRequestTimeoutMs} must be an integer.");
222237
}
223238

239+
var maxRetriesMaybe = config.FirstOrDefault(prop =>
240+
prop.Key.ToLower() == SchemaRegistryConfig.PropertyNames.SchemaRegistryMaxRetries);
241+
int maxRetries;
242+
try
243+
{
244+
maxRetries = maxRetriesMaybe.Value == null ? DefaultMaxRetries : Convert.ToInt32(maxRetriesMaybe.Value);
245+
}
246+
catch (FormatException)
247+
{
248+
throw new ArgumentException(
249+
$"Configured value for {SchemaRegistryConfig.PropertyNames.SchemaRegistryMaxRetries} must be an integer.");
250+
}
251+
252+
var retriesWaitMsMaybe = config.FirstOrDefault(prop =>
253+
prop.Key.ToLower() == SchemaRegistryConfig.PropertyNames.SchemaRegistryRetriesWaitMs);
254+
int retriesWaitMs;
255+
try
256+
{
257+
retriesWaitMs = retriesWaitMsMaybe.Value == null ? DefaultRetriesWaitMs : Convert.ToInt32(retriesWaitMsMaybe.Value);
258+
}
259+
catch (FormatException)
260+
{
261+
throw new ArgumentException(
262+
$"Configured value for {SchemaRegistryConfig.PropertyNames.SchemaRegistryRetriesWaitMs} must be an integer.");
263+
}
264+
265+
var retriesMaxWaitMsMaybe = config.FirstOrDefault(prop =>
266+
prop.Key.ToLower() == SchemaRegistryConfig.PropertyNames.SchemaRegistryRetriesMaxWaitMs);
267+
int retriesMaxWaitMs;
268+
try
269+
{
270+
retriesMaxWaitMs = retriesMaxWaitMsMaybe.Value == null ? DefaultRetriesMaxWaitMs : Convert.ToInt32(retriesMaxWaitMsMaybe.Value);
271+
}
272+
catch (FormatException)
273+
{
274+
throw new ArgumentException(
275+
$"Configured value for {SchemaRegistryConfig.PropertyNames.SchemaRegistryRetriesMaxWaitMs} must be an integer.");
276+
}
277+
224278
var identityMapCapacityMaybe = config.FirstOrDefault(prop =>
225279
prop.Key.ToLower() == SchemaRegistryConfig.PropertyNames.SchemaRegistryMaxCachedSchemas);
226280
try
@@ -340,6 +394,9 @@ public CachedSchemaRegistryClient(IEnumerable<KeyValuePair<string, string>> conf
340394

341395
if (property.Key != SchemaRegistryConfig.PropertyNames.SchemaRegistryUrl &&
342396
property.Key != SchemaRegistryConfig.PropertyNames.SchemaRegistryRequestTimeoutMs &&
397+
property.Key != SchemaRegistryConfig.PropertyNames.SchemaRegistryMaxRetries &&
398+
property.Key != SchemaRegistryConfig.PropertyNames.SchemaRegistryRetriesWaitMs &&
399+
property.Key != SchemaRegistryConfig.PropertyNames.SchemaRegistryRetriesMaxWaitMs &&
343400
property.Key != SchemaRegistryConfig.PropertyNames.SchemaRegistryMaxCachedSchemas &&
344401
property.Key != SchemaRegistryConfig.PropertyNames.SchemaRegistryLatestCacheTtlSecs &&
345402
property.Key != SchemaRegistryConfig.PropertyNames.SchemaRegistryBasicAuthCredentialsSource &&
@@ -372,7 +429,8 @@ public CachedSchemaRegistryClient(IEnumerable<KeyValuePair<string, string>> conf
372429

373430
var sslCaLocation = config.FirstOrDefault(prop => prop.Key.ToLower() == SchemaRegistryConfig.PropertyNames.SslCaLocation).Value;
374431
var sslCaCertificate = string.IsNullOrEmpty(sslCaLocation) ? null : new X509Certificate2(sslCaLocation);
375-
this.restService = new RestService(schemaRegistryUris, timeoutMs, authenticationHeaderValueProvider, SetSslConfig(config), sslVerify, sslCaCertificate, proxy);
432+
this.restService = new RestService(schemaRegistryUris, timeoutMs, authenticationHeaderValueProvider,
433+
SetSslConfig(config), sslVerify, sslCaCertificate, proxy, maxRetries, retriesWaitMs, retriesMaxWaitMs);
376434
}
377435

378436
/// <summary>

src/Confluent.SchemaRegistry/Rest/RestService.cs

Lines changed: 64 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,14 @@ public class RestService : IRestService
3434

3535
private static readonly string acceptHeader = string.Join(", ", Versions.PreferredResponseTypes);
3636

37+
public const int DefaultMaxRetries = 2;
38+
39+
public const int DefaultRetriesWaitMs = 1000;
40+
41+
public const int DefaultRetriesMaxWaitMs = 20000;
42+
43+
private static Random random = new Random();
44+
3745
/// <summary>
3846
/// The index of the last client successfully used (or random if none worked).
3947
/// </summary>
@@ -51,15 +59,25 @@ public class RestService : IRestService
5159
/// </summary>
5260
private readonly IAuthenticationHeaderValueProvider authenticationHeaderValueProvider;
5361

62+
private int maxRetries;
63+
64+
private int retriesWaitMs;
65+
66+
private int retriesMaxWaitMs;
5467

5568
/// <summary>
5669
/// Initializes a new instance of the RestService class.
5770
/// </summary>
5871
public RestService(string schemaRegistryUrl, int timeoutMs,
5972
IAuthenticationHeaderValueProvider authenticationHeaderValueProvider, List<X509Certificate2> certificates,
60-
bool enableSslCertificateVerification, X509Certificate2 sslCaCertificate = null, IWebProxy proxy = null)
73+
bool enableSslCertificateVerification, X509Certificate2 sslCaCertificate = null, IWebProxy proxy = null,
74+
int maxRetries = DefaultMaxRetries, int retriesWaitMs = DefaultRetriesWaitMs,
75+
int retriesMaxWaitMs = DefaultRetriesMaxWaitMs)
6176
{
6277
this.authenticationHeaderValueProvider = authenticationHeaderValueProvider;
78+
this.maxRetries = maxRetries;
79+
this.retriesWaitMs = retriesWaitMs;
80+
this.retriesMaxWaitMs = retriesMaxWaitMs;
6381

6482
this.clients = schemaRegistryUrl
6583
.Split(',')
@@ -91,11 +109,11 @@ private static HttpClientHandler CreateHandler(List<X509Certificate2> certificat
91109
if (!enableSslCertificateVerification)
92110
{
93111
handler.ServerCertificateCustomValidationCallback = (_, __, ___, ____) => { return true; };
94-
}
112+
}
95113
else if (sslCaCertificate != null)
96114
{
97-
handler.ServerCertificateCustomValidationCallback = (_, __, chain, policyErrors) => {
98-
115+
handler.ServerCertificateCustomValidationCallback = (_, __, chain, policyErrors) => {
116+
99117
if (policyErrors == SslPolicyErrors.None)
100118
{
101119
return true;
@@ -105,7 +123,7 @@ private static HttpClientHandler CreateHandler(List<X509Certificate2> certificat
105123
if (chain.ChainElements.Count < 2)
106124
{
107125
return false;
108-
}
126+
}
109127
var connectionCertHash = chain.ChainElements[1].Certificate.GetCertHash();
110128

111129

@@ -123,7 +141,7 @@ private static HttpClientHandler CreateHandler(List<X509Certificate2> certificat
123141
return false;
124142
}
125143
}
126-
return true;
144+
return true;
127145
};
128146
}
129147

@@ -205,12 +223,9 @@ private async Task<HttpResponseMessage> ExecuteOnOneInstanceAsync(Func<HttpReque
205223

206224
try
207225
{
208-
response = await clients[clientIndex]
209-
.SendAsync(createRequest())
210-
.ConfigureAwait(continueOnCapturedContext: false);
226+
response = await SendRequest(clients[clientIndex], createRequest);
211227

212-
if (response.StatusCode == HttpStatusCode.OK ||
213-
response.StatusCode == HttpStatusCode.NoContent)
228+
if (IsSuccess((int)response.StatusCode))
214229
{
215230
lock (lastClientUsedLock)
216231
{
@@ -293,6 +308,44 @@ await response.Content.ReadAsStringAsync()
293308
throw new HttpRequestException(aggregatedErrorMessage);
294309
}
295310

311+
/// <summary>
/// Sends the request produced by <paramref name="createRequest"/> through
/// <paramref name="client"/>, retrying while the response status is retriable.
/// </summary>
/// <remarks>
/// The request is always sent at least once. After the initial attempt, at most
/// <c>maxRetries</c> further attempts are made, each preceded by the delay returned
/// by <see cref="CalculateRetryDelay"/>. A new request is created for every attempt.
/// Exceptions thrown while sending are not caught here and propagate to the caller.
/// </remarks>
/// <param name="client">The client used to send the request.</param>
/// <param name="createRequest">Factory invoked once per attempt to build the request.</param>
/// <returns>
/// The response of the last attempt made: a successful response, a response with a
/// non-retriable status, or the final retriable failure once retries are exhausted.
/// </returns>
private async Task<HttpResponseMessage> SendRequest(
    HttpClient client, Func<HttpRequestMessage> createRequest)
{
    // A zero or negative setting disables retries but must still send the request once;
    // returning without sending would hand a null response to the caller.
    int retriesAllowed = Math.Max(0, maxRetries);

    for (int attempt = 0; ; attempt++)
    {
        HttpResponseMessage response = await client
            .SendAsync(createRequest())
            .ConfigureAwait(continueOnCapturedContext: false);

        int statusCode = (int)response.StatusCode;
        if (IsSuccess(statusCode) || !IsRetriable(statusCode) || attempt >= retriesAllowed)
        {
            // Returning here on the last allowed attempt avoids a pointless back-off delay.
            return response;
        }

        // This response is being discarded in favour of a retry; release it first.
        response.Dispose();

        await Task.Delay(CalculateRetryDelay(retriesWaitMs, retriesMaxWaitMs, attempt))
            .ConfigureAwait(continueOnCapturedContext: false);
    }
}
329+
330+
/// <summary>
/// Tells whether an HTTP status code lies in the 2xx range (200 to 299 inclusive).
/// </summary>
/// <param name="statusCode">The numeric HTTP status code.</param>
/// <returns>true for 200 through 299; false otherwise.</returns>
private static bool IsSuccess(int statusCode)
{
    if (statusCode < 200)
    {
        return false;
    }

    return statusCode <= 299;
}
334+
335+
/// <summary>
/// Tells whether a request that ended with the given HTTP status code should be retried.
/// </summary>
/// <param name="statusCode">The numeric HTTP status code.</param>
/// <returns>true for 408, 429, 500, 502, 503 and 504; false for every other code.</returns>
private static bool IsRetriable(int statusCode)
{
    switch (statusCode)
    {
        case 408: // Request Timeout
        case 429: // Too Many Requests
        case 500: // Internal Server Error
        case 502: // Bad Gateway
        case 503: // Service Unavailable
        case 504: // Gateway Timeout
            return true;
        default:
            return false;
    }
}
340+
341+
/// <summary>
/// Computes how long to wait, in milliseconds, before the next retry.
/// </summary>
/// <remarks>
/// The delay is a random fraction (from <c>Random.NextDouble</c>) of
/// <paramref name="baseDelayMs"/> multiplied by 2 to the power of
/// <paramref name="retriesAttempted"/>, capped at <paramref name="maxDelayMs"/>.
/// </remarks>
/// <param name="baseDelayMs">Upper bound of the delay before the first retry.</param>
/// <param name="maxDelayMs">Upper bound of the delay before any retry.</param>
/// <param name="retriesAttempted">Number of retries already made (0 for the first retry).</param>
/// <returns>The delay in milliseconds, converted to an integer.</returns>
protected static int CalculateRetryDelay(int baseDelayMs, int maxDelayMs, int retriesAttempted)
{
    double jitter;

    // The Random instance is shared by all callers, so access to it is serialized.
    lock (random)
    {
        jitter = random.NextDouble();
    }

    double uncappedDelayMs = jitter * Math.Pow(2, retriesAttempted) * baseDelayMs;
    double delayMs = Math.Min(uncappedDelayMs, maxDelayMs);
    return Convert.ToInt32(delayMs);
}
296349

297350
/// <remarks>
298351
/// Used for end points that return a json object { ... }

src/Confluent.SchemaRegistry/SchemaRegistryConfig.cs

Lines changed: 55 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,28 @@ public static class PropertyNames
4444
/// </summary>
4545
public const string SchemaRegistryRequestTimeoutMs = "schema.registry.request.timeout.ms";
4646

47+
/// <summary>
48+
/// Specifies the maximum number of retries for a request.
49+
///
50+
/// default: 2
51+
/// </summary>
52+
public const string SchemaRegistryMaxRetries = "schema.registry.max.retries";
53+
54+
/// <summary>
55+
/// Specifies the maximum time to wait for the first retry.
56+
/// When jitter is applied, the actual wait may be less.
57+
///
58+
/// default: 1000
59+
/// </summary>
60+
public const string SchemaRegistryRetriesWaitMs = "schema.registry.retries.wait.ms";
61+
62+
/// <summary>
63+
/// Specifies the maximum time to wait for any retry.
64+
///
65+
/// default: 20000
66+
/// </summary>
67+
public const string SchemaRegistryRetriesMaxWaitMs = "schema.registry.retries.max.wait.ms";
68+
4769
/// <summary>
4870
/// Specifies the maximum number of schemas CachedSchemaRegistryClient
4971
/// should cache locally.
@@ -187,6 +209,39 @@ public int? RequestTimeoutMs
187209
set { SetObject(SchemaRegistryConfig.PropertyNames.SchemaRegistryRequestTimeoutMs, value?.ToString()); }
188210
}
189211

212+
/// <summary>
213+
/// Specifies the maximum number of retries for a request.
214+
///
215+
/// default: 2
216+
/// </summary>
217+
public int? MaxRetries
218+
{
219+
get { return GetInt(SchemaRegistryConfig.PropertyNames.SchemaRegistryMaxRetries); }
220+
set { SetObject(SchemaRegistryConfig.PropertyNames.SchemaRegistryMaxRetries, value?.ToString()); }
221+
}
222+
223+
/// <summary>
224+
/// Specifies the maximum time to wait for the first retry.
/// When jitter is applied, the actual wait may be less.
225+
///
226+
/// default: 1000
227+
/// </summary>
228+
public int? RetriesWaitMs
229+
{
230+
get { return GetInt(SchemaRegistryConfig.PropertyNames.SchemaRegistryRetriesWaitMs); }
231+
set { SetObject(SchemaRegistryConfig.PropertyNames.SchemaRegistryRetriesWaitMs, value?.ToString()); }
232+
}
233+
234+
/// <summary>
235+
/// Specifies the maximum time to wait for any retry.
236+
///
237+
/// default: 20000
238+
/// </summary>
239+
public int? RetriesMaxWaitMs
240+
{
241+
get { return GetInt(SchemaRegistryConfig.PropertyNames.SchemaRegistryRetriesMaxWaitMs); }
242+
set { SetObject(SchemaRegistryConfig.PropertyNames.SchemaRegistryRetriesMaxWaitMs, value?.ToString()); }
243+
}
244+
190245
/// <summary>
191246
/// File or directory path to CA certificate(s) for verifying the schema registry's key.
192247
///

0 commit comments

Comments
 (0)