Skip to content

Commit 224a561

Browse files
authored
Improve lock utilization on the consumer hot path (#2370)
1 parent 7d82888 commit 224a561

File tree

2 files changed

+16
-5
lines changed

2 files changed

+16
-5
lines changed

src/Confluent.SchemaRegistry/AsyncSerde.cs

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
#pragma warning disable CS0618
1919

2020
using System;
21+
using System.Collections.Concurrent;
2122
using System.Collections.Generic;
2223
using System.IO;
2324
using System.Linq;
@@ -41,7 +42,7 @@ public abstract class AsyncSerde<TParsedSchema>
4142

4243
protected SemaphoreSlim serdeMutex = new SemaphoreSlim(1);
4344

44-
private readonly IDictionary<Schema, TParsedSchema> parsedSchemaCache = new Dictionary<Schema, TParsedSchema>();
45+
private readonly IDictionary<Schema, TParsedSchema> parsedSchemaCache = new ConcurrentDictionary<Schema, TParsedSchema>();
4546
private SemaphoreSlim parsedSchemaMutex = new SemaphoreSlim(1);
4647

4748
protected AsyncSerde(ISchemaRegistryClient schemaRegistryClient, SerdeConfig config, RuleRegistry ruleRegistry = null)
@@ -98,10 +99,15 @@ protected string GetSubjectName(string topic, bool isKey, string recordType)
9899

99100
protected async Task<TParsedSchema> GetParsedSchema(Schema schema)
100101
{
102+
if (parsedSchemaCache.TryGetValue(schema, out TParsedSchema parsedSchema))
103+
{
104+
return parsedSchema;
105+
}
106+
101107
await parsedSchemaMutex.WaitAsync().ConfigureAwait(continueOnCapturedContext: false);
102108
try
103109
{
104-
if (!parsedSchemaCache.TryGetValue(schema, out TParsedSchema parsedSchema))
110+
if (!parsedSchemaCache.TryGetValue(schema, out parsedSchema))
105111
{
106112
if (parsedSchemaCache.Count > schemaRegistryClient.MaxCachedSchemas)
107113
{

src/Confluent.SchemaRegistry/CachedSchemaRegistryClient.cs

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
using System.Threading.Tasks;
2424
using System.Linq;
2525
using System;
26+
using System.Collections.Concurrent;
2627
using System.Net;
2728
using System.Threading;
2829
using System.Security.Cryptography.X509Certificates;
@@ -65,7 +66,7 @@ public class CachedSchemaRegistryClient : ISchemaRegistryClient
6566
private IRestService restService;
6667
private int identityMapCapacity;
6768
private int latestCacheTtlSecs;
68-
private readonly Dictionary<int, Schema> schemaById = new Dictionary<int, Schema>();
69+
private readonly ConcurrentDictionary<int, Schema> schemaById = new ConcurrentDictionary<int, Schema>();
6970

7071
private readonly Dictionary<string /*subject*/, Dictionary<Schema, int>> idBySchemaBySubject =
7172
new Dictionary<string, Dictionary<Schema, int>>();
@@ -657,11 +658,15 @@ public async Task<Schema> GetSchemaAsync(int id, string format = null)
657658
/// <inheritdoc/>
658659
public async Task<Schema> GetSchemaBySubjectAndIdAsync(string subject, int id, string format = null)
659660
{
661+
if (this.schemaById.TryGetValue(id, out Schema schema) && checkSchemaMatchesFormat(format, schema.SchemaString))
662+
{
663+
return schema;
664+
}
665+
660666
await cacheMutex.WaitAsync().ConfigureAwait(continueOnCapturedContext: false);
661667
try
662668
{
663-
if (!this.schemaById.TryGetValue(id, out Schema schema) ||
664-
!checkSchemaMatchesFormat(format, schema.SchemaString))
669+
if (!this.schemaById.TryGetValue(id, out schema) || !checkSchemaMatchesFormat(format, schema.SchemaString))
665670
{
666671
CleanCacheIfFull();
667672
schema = (await restService.GetSchemaBySubjectAndIdAsync(subject, id, format)

0 commit comments

Comments
 (0)