Skip to content

Commit 7aac43f

Browse files
authored
DG-21472: use SocketsHttpHandler to support connection pooling for Schema Registry client, for .NET 6.0+ (#2502)
* update * config
1 parent c173ba5 commit 7aac43f

File tree

5 files changed

+186
-56
lines changed

5 files changed

+186
-56
lines changed

src/Confluent.SchemaRegistry.Encryption/CachedDekRegistryClient.cs

Lines changed: 31 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -25,24 +25,24 @@
2525
namespace Confluent.SchemaRegistry.Encryption
2626
{
2727
public record KekId(string Name, bool LookupDeletedKeks);
28-
28+
2929
public record DekId(string KekName, string Subject, int? Version, DekFormat? DekFormat, bool LookupDeletedDeks);
30-
30+
3131
/// <summary>
3232
/// A caching DEK Registry client.
3333
/// </summary>
3434
public class CachedDekRegistryClient : IDekRegistryClient
3535
{
3636
private DekRestService restService;
37-
37+
3838
private int identityMapCapacity;
39-
39+
4040
private readonly IDictionary<KekId, RegisteredKek> keks = new Dictionary<KekId, RegisteredKek>();
4141

4242
private readonly IDictionary<DekId, RegisteredDek> deks = new Dictionary<DekId, RegisteredDek>();
4343

4444
private readonly SemaphoreSlim cacheMutex = new SemaphoreSlim(1);
45-
45+
4646
/// <summary>
4747
/// The default timeout value for Schema Registry REST API calls.
4848
/// </summary>
@@ -63,6 +63,11 @@ public class CachedDekRegistryClient : IDekRegistryClient
6363
/// </summary>
6464
public const int DefaultRetriesMaxWaitMs = RestService.DefaultRetriesMaxWaitMs;
6565

66+
/// <summary>
67+
/// The default maximum number of connections per server.
68+
/// </summary>
69+
public const int DefaultMaxConnectionsPerServer = 20;
70+
6671
/// <summary>
6772
/// The default maximum capacity of the local cache.
6873
/// </summary>
@@ -159,6 +164,19 @@ public CachedDekRegistryClient(IEnumerable<KeyValuePair<string, string>> config,
159164
$"Configured value for {SchemaRegistryConfig.PropertyNames.SchemaRegistryRetriesMaxWaitMs} must be an integer.");
160165
}
161166

167+
var maxConnectionsPerServerMaybe = config.FirstOrDefault(prop =>
168+
prop.Key.ToLower() == SchemaRegistryConfig.PropertyNames.SchemaRegistryMaxConnectionsPerServer);
169+
int maxConnectionsPerServer;
170+
try
171+
{
172+
maxConnectionsPerServer = maxConnectionsPerServerMaybe.Value == null ? DefaultMaxConnectionsPerServer : Convert.ToInt32(maxConnectionsPerServerMaybe.Value);
173+
}
174+
catch (FormatException)
175+
{
176+
throw new ArgumentException(
177+
$"Configured value for {SchemaRegistryConfig.PropertyNames.SchemaRegistryMaxConnectionsPerServer} must be an integer.");
178+
}
179+
162180
var identityMapCapacityMaybe = config.FirstOrDefault(prop =>
163181
prop.Key.ToLower() == SchemaRegistryConfig.PropertyNames.SchemaRegistryMaxCachedSchemas);
164182
try
@@ -188,6 +206,7 @@ public CachedDekRegistryClient(IEnumerable<KeyValuePair<string, string>> config,
188206
property.Key != SchemaRegistryConfig.PropertyNames.SchemaRegistryMaxRetries &&
189207
property.Key != SchemaRegistryConfig.PropertyNames.SchemaRegistryRetriesWaitMs &&
190208
property.Key != SchemaRegistryConfig.PropertyNames.SchemaRegistryRetriesMaxWaitMs &&
209+
property.Key != SchemaRegistryConfig.PropertyNames.SchemaRegistryMaxConnectionsPerServer &&
191210
property.Key != SchemaRegistryConfig.PropertyNames.SchemaRegistryMaxCachedSchemas &&
192211
property.Key != SchemaRegistryConfig.PropertyNames.SchemaRegistryLatestCacheTtlSecs &&
193212
property.Key != SchemaRegistryConfig.PropertyNames.SchemaRegistryBasicAuthCredentialsSource &&
@@ -229,7 +248,7 @@ public CachedDekRegistryClient(IEnumerable<KeyValuePair<string, string>> config,
229248
var sslCaLocation = config.FirstOrDefault(prop => prop.Key.ToLower() == SchemaRegistryConfig.PropertyNames.SslCaLocation).Value;
230249
var sslCaCertificate = string.IsNullOrEmpty(sslCaLocation) ? null : new X509Certificate2(sslCaLocation);
231250
this.restService = new DekRestService(schemaRegistryUris, timeoutMs, authenticationHeaderValueProvider,
232-
SetSslConfig(config), sslVerify, sslCaCertificate, proxy, maxRetries, retriesWaitMs, retriesMaxWaitMs);
251+
SetSslConfig(config), sslVerify, sslCaCertificate, proxy, maxRetries, retriesWaitMs, retriesMaxWaitMs, maxConnectionsPerServer);
233252
}
234253

235254
/// <summary>
@@ -246,10 +265,10 @@ public CachedDekRegistryClient(IEnumerable<KeyValuePair<string, string>> config)
246265

247266
/// <remarks>
248267
/// This is to make sure memory doesn't explode in the case of incorrect usage.
249-
///
250-
/// It's behavior is pretty extreme - remove everything and start again if the
268+
///
269+
/// It's behavior is pretty extreme - remove everything and start again if the
251270
/// cache gets full. However, in practical situations this is not expected.
252-
///
271+
///
253272
/// TODO: Implement an LRU Cache here or something instead.
254273
/// </remarks>
255274
private bool CleanCacheIfFull()
@@ -327,7 +346,7 @@ public Task<List<string>> GetDeksAsync(string kekName, bool ignoreDeletedDeks)
327346

328347
/// <inheritdoc/>
329348
public Task<List<int>> GetDekVersionsAsync(string kekName, string subject, DekFormat? algorithm,
330-
bool ignoreDeletedDeks)
349+
bool ignoreDeletedDeks)
331350
=> restService.GetDekVersionsAsync(kekName, subject, algorithm, ignoreDeletedDeks);
332351

333352
/// <inheritdoc/>
@@ -397,7 +416,7 @@ public Task<RegisteredDek> CreateDekAsync(string kekName, Dek dek)
397416
this.deks.Remove(new DekId(kekName, dek.Subject, -1, dek.Algorithm, true));
398417
}
399418
}
400-
419+
401420
/// <summary>
402421
/// Releases unmanaged resources owned by this CachedSchemaRegistryClient instance.
403422
/// </summary>
@@ -423,4 +442,4 @@ protected virtual void Dispose(bool disposing)
423442
}
424443
}
425444
}
426-
}
445+
}

src/Confluent.SchemaRegistry.Encryption/Rest/DekRestService.cs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -32,9 +32,9 @@ public DekRestService(string schemaRegistryUrl, int timeoutMs,
3232
IAuthenticationHeaderValueProvider authenticationHeaderValueProvider, List<X509Certificate2> certificates,
3333
bool enableSslCertificateVerification, X509Certificate2 sslCaCertificate = null, IWebProxy proxy = null,
3434
int maxRetries = DefaultMaxRetries, int retriesWaitMs = DefaultRetriesWaitMs,
35-
int retriesMaxWaitMs = DefaultRetriesMaxWaitMs) :
35+
int retriesMaxWaitMs = DefaultRetriesMaxWaitMs, int maxConnectionsPerServer = DefaultMaxConnectionsPerServer) :
3636
base(schemaRegistryUrl, timeoutMs, authenticationHeaderValueProvider, certificates,
37-
enableSslCertificateVerification, sslCaCertificate, proxy, maxRetries, retriesWaitMs, retriesMaxWaitMs)
37+
enableSslCertificateVerification, sslCaCertificate, proxy, maxRetries, retriesWaitMs, retriesMaxWaitMs, maxConnectionsPerServer)
3838
{
3939
}
4040

@@ -110,4 +110,4 @@ protected internal static IAuthenticationHeaderValueProvider
110110
authenticationHeaderValueProvider, maxRetries, retriesWaitMs, retriesMaxWaitMs);
111111
}
112112
}
113-
}
113+
}

src/Confluent.SchemaRegistry/CachedSchemaRegistryClient.cs

Lines changed: 34 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -74,12 +74,12 @@ private record struct SchemaGuid(string Guid, string Format);
7474

7575
private readonly ConcurrentDictionary<string /*subject*/, ConcurrentDictionary<int, Task<RegisteredSchema>>> schemaByVersionBySubject =
7676
new ConcurrentDictionary<string, ConcurrentDictionary<int, Task<RegisteredSchema>>>();
77-
77+
7878
private readonly ConcurrentDictionary<string /*subject*/, ConcurrentDictionary<Schema, Task<RegisteredSchema>>> registeredSchemaBySchemaBySubject =
7979
new ConcurrentDictionary<string, ConcurrentDictionary<Schema, Task<RegisteredSchema>>>();
8080

8181
private readonly MemoryCache latestVersionBySubject = new MemoryCache(new MemoryCacheOptions());
82-
82+
8383
private readonly MemoryCache latestWithMetadataBySubject = new MemoryCache(new MemoryCacheOptions());
8484

8585
private SubjectNameStrategyDelegate keySubjectNameStrategy;
@@ -106,6 +106,11 @@ private record struct SchemaGuid(string Guid, string Format);
106106
/// </summary>
107107
public const int DefaultRetriesMaxWaitMs = RestService.DefaultRetriesMaxWaitMs;
108108

109+
/// <summary>
110+
/// The default maximum number of connections per server.
111+
/// </summary>
112+
public const int DefaultMaxConnectionsPerServer = 20;
113+
109114
/// <summary>
110115
/// The default maximum capacity of the local schema cache.
111116
/// </summary>
@@ -131,7 +136,6 @@ private record struct SchemaGuid(string Guid, string Format);
131136
/// </summary>
132137
public const SubjectNameStrategy DefaultValueSubjectNameStrategy = SubjectNameStrategy.Topic;
133138

134-
135139
/// <inheritdoc />
136140
public IEnumerable<KeyValuePair<string, string>> Config
137141
=> config;
@@ -208,7 +212,7 @@ public CachedSchemaRegistryClient(IEnumerable<KeyValuePair<string, string>> conf
208212
{
209213
throw new ArgumentNullException("config");
210214
}
211-
215+
212216
this.config = config;
213217
this.authHeaderProvider = authenticationHeaderValueProvider;
214218
this.proxy = proxy;
@@ -278,6 +282,19 @@ public CachedSchemaRegistryClient(IEnumerable<KeyValuePair<string, string>> conf
278282
$"Configured value for {SchemaRegistryConfig.PropertyNames.SchemaRegistryRetriesMaxWaitMs} must be an integer.");
279283
}
280284

285+
var maxConnectionsPerServerMaybe = config.FirstOrDefault(prop =>
286+
prop.Key.ToLower() == SchemaRegistryConfig.PropertyNames.SchemaRegistryMaxConnectionsPerServer);
287+
int maxConnectionsPerServer;
288+
try
289+
{
290+
maxConnectionsPerServer = maxConnectionsPerServerMaybe.Value == null ? DefaultMaxConnectionsPerServer : Convert.ToInt32(maxConnectionsPerServerMaybe.Value);
291+
}
292+
catch (FormatException)
293+
{
294+
throw new ArgumentException(
295+
$"Configured value for {SchemaRegistryConfig.PropertyNames.SchemaRegistryMaxConnectionsPerServer} must be an integer.");
296+
}
297+
281298
var identityMapCapacityMaybe = config.FirstOrDefault(prop =>
282299
prop.Key.ToLower() == SchemaRegistryConfig.PropertyNames.SchemaRegistryMaxCachedSchemas);
283300
try
@@ -305,7 +322,7 @@ public CachedSchemaRegistryClient(IEnumerable<KeyValuePair<string, string>> conf
305322
throw new ArgumentException(
306323
$"Configured value for {SchemaRegistryConfig.PropertyNames.SchemaRegistryLatestCacheTtlSecs} must be an integer.");
307324
}
308-
325+
309326
authenticationHeaderValueProvider = RestService.AuthenticationHeaderValueProvider(
310327
config, authenticationHeaderValueProvider, maxRetries, retriesWaitMs, retriesMaxWaitMs);
311328

@@ -321,6 +338,7 @@ public CachedSchemaRegistryClient(IEnumerable<KeyValuePair<string, string>> conf
321338
property.Key != SchemaRegistryConfig.PropertyNames.SchemaRegistryMaxRetries &&
322339
property.Key != SchemaRegistryConfig.PropertyNames.SchemaRegistryRetriesWaitMs &&
323340
property.Key != SchemaRegistryConfig.PropertyNames.SchemaRegistryRetriesMaxWaitMs &&
341+
property.Key != SchemaRegistryConfig.PropertyNames.SchemaRegistryMaxConnectionsPerServer &&
324342
property.Key != SchemaRegistryConfig.PropertyNames.SchemaRegistryMaxCachedSchemas &&
325343
property.Key != SchemaRegistryConfig.PropertyNames.SchemaRegistryLatestCacheTtlSecs &&
326344
property.Key != SchemaRegistryConfig.PropertyNames.SchemaRegistryBasicAuthCredentialsSource &&
@@ -362,7 +380,7 @@ public CachedSchemaRegistryClient(IEnumerable<KeyValuePair<string, string>> conf
362380
var sslCaLocation = config.FirstOrDefault(prop => prop.Key.ToLower() == SchemaRegistryConfig.PropertyNames.SslCaLocation).Value;
363381
var sslCaCertificate = string.IsNullOrEmpty(sslCaLocation) ? null : new X509Certificate2(sslCaLocation);
364382
this.restService = new RestService(schemaRegistryUris, timeoutMs, authenticationHeaderValueProvider,
365-
SetSslConfig(config), sslVerify, sslCaCertificate, proxy, maxRetries, retriesWaitMs, retriesMaxWaitMs);
383+
SetSslConfig(config), sslVerify, sslCaCertificate, proxy, maxRetries, retriesWaitMs, retriesMaxWaitMs, maxConnectionsPerServer);
366384
}
367385

368386
/// <summary>
@@ -393,10 +411,10 @@ public CachedSchemaRegistryClient(IEnumerable<KeyValuePair<string, string>> conf
393411

394412
/// <remarks>
395413
/// This is to make sure memory doesn't explode in the case of incorrect usage.
396-
///
397-
/// It's behavior is pretty extreme - remove everything and start again if the
414+
///
415+
/// It's behavior is pretty extreme - remove everything and start again if the
398416
/// cache gets full. However, in practical situations this is not expected.
399-
///
417+
///
400418
/// TODO: Implement an LRU Cache here or something instead.
401419
/// </remarks>
402420
private bool CleanCacheIfFull()
@@ -504,9 +522,9 @@ public async Task<RegisteredSchema> LookupSchemaAsync(string subject, Schema sch
504522
registeredSchemaBySchema.TryRemove(schema, out registeredSchema);
505523
}
506524
}
507-
525+
508526
CleanCacheIfFull();
509-
527+
510528
registeredSchemaBySchema = registeredSchemaBySchemaBySubject.GetOrAdd(subject, _ => new ConcurrentDictionary<Schema, Task<RegisteredSchema>>());
511529
return await registeredSchemaBySchema.GetOrAddAsync(schema, _ => restService.LookupSchemaAsync(subject, schema, ignoreDeletedSchemas, normalize)).ConfigureAwait(continueOnCapturedContext: false);
512530
}
@@ -519,7 +537,7 @@ public async Task<Schema> GetSchemaAsync(int id, string format = null)
519537
{
520538
return await schema.ConfigureAwait(false);
521539
}
522-
540+
523541
CleanCacheIfFull();
524542
return await schemaById.GetOrAddAsync(schemaId, _ => restService.GetSchemaAsync(id, format)).ConfigureAwait(continueOnCapturedContext: false);
525543
}
@@ -546,7 +564,7 @@ public async Task<Schema> GetSchemaByGuidAsync(string guid, string format = null
546564
{
547565
return await schema.ConfigureAwait(false);
548566
}
549-
567+
550568
return await schemaByGuid.GetOrAddAsync(schemaGuid, _ => restService.GetSchemaByGuidAsync(guid, format)).ConfigureAwait(continueOnCapturedContext: false);
551569
}
552570

@@ -561,13 +579,13 @@ public async Task<RegisteredSchema> GetRegisteredSchemaAsync(string subject, int
561579
return await schema.ConfigureAwait(false);
562580
}
563581
}
564-
582+
565583
CleanCacheIfFull();
566584
schemaByVersion = schemaByVersionBySubject.GetOrAdd(subject, _ => new ConcurrentDictionary<int, Task<RegisteredSchema>>());
567585
return await schemaByVersion.GetOrAddAsync(version, async _ =>
568586
{
569587
var schema = await restService.GetSchemaAsync(subject, version).ConfigureAwait(continueOnCapturedContext: false);
570-
588+
571589
// We already have the schema so we can add it to the cache.
572590
var format = GetSchemaFormat(schema.SchemaString);
573591
schemaById.TryAdd(new SchemaId(schema.Id, format), Task.FromResult(schema.Schema));
@@ -717,4 +735,4 @@ protected virtual void Dispose(bool disposing)
717735
}
718736
}
719737
}
720-
}
738+
}

0 commit comments

Comments
 (0)