Skip to content

Commit 7338563

Browse files
Havretrayokota
andauthored
Use ConcurrentDictionary as default cache for Schema Registry client (#2433)
Co-authored-by: Robert Yokota <[email protected]>
1 parent 25afb38 commit 7338563

File tree

1 file changed

+54
-20
lines changed

1 file changed

+54
-20
lines changed

src/Confluent.SchemaRegistry/CachedSchemaRegistryClient.cs

Lines changed: 54 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -69,14 +69,14 @@ public class CachedSchemaRegistryClient : ISchemaRegistryClient
6969
private int latestCacheTtlSecs;
7070
private readonly ConcurrentDictionary<int, Schema> schemaById = new ConcurrentDictionary<int, Schema>();
7171

72-
private readonly Dictionary<string /*subject*/, Dictionary<Schema, int>> idBySchemaBySubject =
73-
new Dictionary<string, Dictionary<Schema, int>>();
72+
private readonly ConcurrentDictionary<string /*subject*/, ConcurrentDictionary<Schema, int>> idBySchemaBySubject =
73+
new ConcurrentDictionary<string, ConcurrentDictionary<Schema, int>>();
7474

75-
private readonly Dictionary<string /*subject*/, Dictionary<int, RegisteredSchema>> schemaByVersionBySubject =
76-
new Dictionary<string, Dictionary<int, RegisteredSchema>>();
75+
private readonly ConcurrentDictionary<string /*subject*/, ConcurrentDictionary<int, RegisteredSchema>> schemaByVersionBySubject =
76+
new ConcurrentDictionary<string, ConcurrentDictionary<int, RegisteredSchema>>();
7777

78-
private readonly Dictionary<string /*subject*/, Dictionary<Schema, RegisteredSchema>> registeredSchemaBySchemaBySubject =
79-
new Dictionary<string, Dictionary<Schema, RegisteredSchema>>();
78+
private readonly ConcurrentDictionary<string /*subject*/, ConcurrentDictionary<Schema, RegisteredSchema>> registeredSchemaBySchemaBySubject =
79+
new ConcurrentDictionary<string, ConcurrentDictionary<Schema, RegisteredSchema>>();
8080

8181
private readonly MemoryCache latestVersionBySubject = new MemoryCache(new MemoryCacheOptions());
8282

@@ -603,13 +603,21 @@ public Task<int> GetSchemaIdAsync(string subject, string avroSchema, bool normal
603603
/// <inheritdoc/>
604604
public async Task<int> GetSchemaIdAsync(string subject, Schema schema, bool normalize = false)
605605
{
606+
if (idBySchemaBySubject.TryGetValue(subject, out var idBySchema))
607+
{
608+
if (idBySchema.TryGetValue(schema, out int schemaId))
609+
{
610+
return schemaId;
611+
}
612+
}
613+
606614
await cacheMutex.WaitAsync().ConfigureAwait(continueOnCapturedContext: false);
607615
try
608616
{
609-
if (!this.idBySchemaBySubject.TryGetValue(subject, out Dictionary<Schema, int> idBySchema))
617+
if (!this.idBySchemaBySubject.TryGetValue(subject, out idBySchema))
610618
{
611-
idBySchema = new Dictionary<Schema, int>();
612-
this.idBySchemaBySubject.Add(subject, idBySchema);
619+
idBySchema = new ConcurrentDictionary<Schema, int>();
620+
this.idBySchemaBySubject.TryAdd(subject, idBySchema);
613621
}
614622

615623
// TODO: The following could be optimized in the usual case where idBySchema only
@@ -640,13 +648,21 @@ public async Task<int> GetSchemaIdAsync(string subject, Schema schema, bool norm
640648
/// <inheritdoc/>
641649
public async Task<int> RegisterSchemaAsync(string subject, Schema schema, bool normalize = false)
642650
{
651+
if (idBySchemaBySubject.TryGetValue(subject, out var idBySchema))
652+
{
653+
if (idBySchema.TryGetValue(schema, out var schemaId))
654+
{
655+
return schemaId;
656+
}
657+
}
658+
643659
await cacheMutex.WaitAsync().ConfigureAwait(continueOnCapturedContext: false);
644660
try
645661
{
646-
if (!this.idBySchemaBySubject.TryGetValue(subject, out Dictionary<Schema, int> idBySchema))
662+
if (!this.idBySchemaBySubject.TryGetValue(subject, out idBySchema))
647663
{
648-
idBySchema = new Dictionary<Schema, int>();
649-
this.idBySchemaBySubject[subject] = idBySchema;
664+
idBySchema = new ConcurrentDictionary<Schema, int>();
665+
idBySchemaBySubject.TryAdd(subject, idBySchema);
650666
}
651667

652668
// TODO: This could be optimized in the usual case where idBySchema only
@@ -703,13 +719,21 @@ private bool checkSchemaMatchesFormat(string format, string schemaString)
703719
public async Task<RegisteredSchema> LookupSchemaAsync(string subject, Schema schema, bool ignoreDeletedSchemas,
704720
bool normalize = false)
705721
{
722+
if (registeredSchemaBySchemaBySubject.TryGetValue(subject, out var registeredSchemaBySchema))
723+
{
724+
if (registeredSchemaBySchema.TryGetValue(schema, out var registeredSchema))
725+
{
726+
return registeredSchema;
727+
}
728+
}
729+
706730
await cacheMutex.WaitAsync().ConfigureAwait(continueOnCapturedContext: false);
707731
try
708732
{
709-
if (!registeredSchemaBySchemaBySubject.TryGetValue(subject, out var registeredSchemaBySchema))
733+
if (!registeredSchemaBySchemaBySubject.TryGetValue(subject, out registeredSchemaBySchema))
710734
{
711735
CleanCacheIfFull();
712-
registeredSchemaBySchema = new Dictionary<Schema, RegisteredSchema>();
736+
registeredSchemaBySchema = new ConcurrentDictionary<Schema, RegisteredSchema>();
713737
registeredSchemaBySchemaBySubject[subject] = registeredSchemaBySchema;
714738
}
715739
if (!registeredSchemaBySchema.TryGetValue(schema, out var registeredSchema))
@@ -729,10 +753,15 @@ public async Task<RegisteredSchema> LookupSchemaAsync(string subject, Schema sch
729753
/// <inheritdoc/>
730754
public async Task<Schema> GetSchemaAsync(int id, string format = null)
731755
{
756+
if (schemaById.TryGetValue(id, out var schema) && checkSchemaMatchesFormat(format, schema.SchemaString))
757+
{
758+
return schema;
759+
}
760+
732761
await cacheMutex.WaitAsync().ConfigureAwait(continueOnCapturedContext: false);
733762
try
734763
{
735-
if (!this.schemaById.TryGetValue(id, out Schema schema) ||
764+
if (!this.schemaById.TryGetValue(id, out schema) ||
736765
!checkSchemaMatchesFormat(format, schema.SchemaString))
737766
{
738767
CleanCacheIfFull();
@@ -753,7 +782,7 @@ public async Task<Schema> GetSchemaAsync(int id, string format = null)
753782
/// <inheritdoc/>
754783
public async Task<Schema> GetSchemaBySubjectAndIdAsync(string subject, int id, string format = null)
755784
{
756-
if (this.schemaById.TryGetValue(id, out Schema schema) && checkSchemaMatchesFormat(format, schema.SchemaString))
785+
if (this.schemaById.TryGetValue(id, out var schema) && checkSchemaMatchesFormat(format, schema.SchemaString))
757786
{
758787
return schema;
759788
}
@@ -781,19 +810,24 @@ public async Task<Schema> GetSchemaBySubjectAndIdAsync(string subject, int id, s
781810
/// <inheritdoc/>
782811
public async Task<RegisteredSchema> GetRegisteredSchemaAsync(string subject, int version, bool ignoreDeletedSchemas = true)
783812
{
813+
if (schemaByVersionBySubject.TryGetValue(subject, out var schemaByVersion) &&
814+
schemaByVersion.TryGetValue(version, out var schema))
815+
{
816+
return schema;
817+
}
818+
784819
await cacheMutex.WaitAsync().ConfigureAwait(continueOnCapturedContext: false);
785820
try
786821
{
787822
CleanCacheIfFull();
788823

789-
if (!schemaByVersionBySubject.TryGetValue(subject,
790-
out Dictionary<int, RegisteredSchema> schemaByVersion))
824+
if (!schemaByVersionBySubject.TryGetValue(subject, out schemaByVersion))
791825
{
792-
schemaByVersion = new Dictionary<int, RegisteredSchema>();
826+
schemaByVersion = new ConcurrentDictionary<int, RegisteredSchema>();
793827
schemaByVersionBySubject[subject] = schemaByVersion;
794828
}
795829

796-
if (!schemaByVersion.TryGetValue(version, out RegisteredSchema schema))
830+
if (!schemaByVersion.TryGetValue(version, out schema))
797831
{
798832
schema = await restService.GetSchemaAsync(subject, version)
799833
.ConfigureAwait(continueOnCapturedContext: false);

0 commit comments

Comments
 (0)