Skip to content

Commit ea395b8

Browse files
authored
Refactor schema caching to use a composite key for SchemaId and format (#2436)
1 parent 8678c00 commit ea395b8

File tree

1 file changed

+21
-31
lines changed

1 file changed

+21
-31
lines changed

src/Confluent.SchemaRegistry/CachedSchemaRegistryClient.cs

Lines changed: 21 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,8 @@ namespace Confluent.SchemaRegistry
5858
/// </summary>
5959
public class CachedSchemaRegistryClient : ISchemaRegistryClient
6060
{
61+
private record struct SchemaId(int Id, string Format);
62+
6163
private readonly List<SchemaReference> EmptyReferencesList = new List<SchemaReference>();
6264

6365
private IEnumerable<KeyValuePair<string, string>> config;
@@ -67,7 +69,7 @@ public class CachedSchemaRegistryClient : ISchemaRegistryClient
6769
private IRestService restService;
6870
private int identityMapCapacity;
6971
private int latestCacheTtlSecs;
70-
private readonly ConcurrentDictionary<int, Schema> schemaById = new ConcurrentDictionary<int, Schema>();
72+
private readonly ConcurrentDictionary<SchemaId, Schema> schemaById = new ConcurrentDictionary<SchemaId, Schema>();
7173

7274
private readonly ConcurrentDictionary<string /*subject*/, ConcurrentDictionary<Schema, int>> idBySchemaBySubject =
7375
new ConcurrentDictionary<string, ConcurrentDictionary<Schema, int>>();
@@ -632,7 +634,9 @@ public async Task<int> GetSchemaIdAsync(string subject, Schema schema, bool norm
632634
var registeredSchema = await restService.LookupSchemaAsync(subject, schema, true, normalize)
633635
.ConfigureAwait(continueOnCapturedContext: false);
634636
idBySchema[schema] = registeredSchema.Id;
635-
schemaById[registeredSchema.Id] = registeredSchema.Schema;
637+
638+
var format = GetSchemaFormat(schema.SchemaString);
639+
schemaById.TryAdd(new SchemaId(registeredSchema.Id, format), registeredSchema.Schema);
636640
schemaId = registeredSchema.Id;
637641
}
638642

@@ -695,26 +699,11 @@ public Task<int> RegisterSchemaAsync(string subject, string avroSchema, bool nor
695699
/// <summary>
696700
/// Check if the given schema string matches a given format name.
697701
/// </summary>
698-
private bool checkSchemaMatchesFormat(string format, string schemaString)
702+
private static string GetSchemaFormat(string schemaString)
699703
{
700-
// if a format isn't specified, then assume text is desired.
701-
if (format == null)
702-
{
703-
// If schemaString is not Base64, infer the schemaString format is text.
704-
return !Utils.IsBase64String(schemaString);
705-
}
706-
else
707-
{
708-
if (format != "serialized")
709-
{
710-
throw new ArgumentException($"Invalid schema format was specified: {format}.");
711-
}
712-
713-
return Utils.IsBase64String(schemaString);
714-
}
704+
return Utils.IsBase64String(schemaString) ? "serialized" : null;
715705
}
716706

717-
718707
/// <inheritdoc/>
719708
public async Task<RegisteredSchema> LookupSchemaAsync(string subject, Schema schema, bool ignoreDeletedSchemas,
720709
bool normalize = false)
@@ -753,21 +742,20 @@ public async Task<RegisteredSchema> LookupSchemaAsync(string subject, Schema sch
753742
/// <inheritdoc/>
754743
public async Task<Schema> GetSchemaAsync(int id, string format = null)
755744
{
756-
if (schemaById.TryGetValue(id, out var schema) && checkSchemaMatchesFormat(format, schema.SchemaString))
745+
var schemaId = new SchemaId(id, format);
746+
if (schemaById.TryGetValue(schemaId, out var schema))
757747
{
758748
return schema;
759749
}
760750

761751
await cacheMutex.WaitAsync().ConfigureAwait(continueOnCapturedContext: false);
762752
try
763753
{
764-
if (!this.schemaById.TryGetValue(id, out schema) ||
765-
!checkSchemaMatchesFormat(format, schema.SchemaString))
754+
if (!this.schemaById.TryGetValue(schemaId, out schema))
766755
{
767756
CleanCacheIfFull();
768-
schema = (await restService.GetSchemaAsync(id, format)
769-
.ConfigureAwait(continueOnCapturedContext: false));
770-
schemaById[id] = schema;
757+
schema = await restService.GetSchemaAsync(id, format).ConfigureAwait(continueOnCapturedContext: false);
758+
schemaById.TryAdd(schemaId, schema);
771759
}
772760

773761
return schema;
@@ -782,20 +770,21 @@ public async Task<Schema> GetSchemaAsync(int id, string format = null)
782770
/// <inheritdoc/>
783771
public async Task<Schema> GetSchemaBySubjectAndIdAsync(string subject, int id, string format = null)
784772
{
785-
if (this.schemaById.TryGetValue(id, out var schema) && checkSchemaMatchesFormat(format, schema.SchemaString))
773+
var schemaId = new SchemaId(id, format);
774+
if (this.schemaById.TryGetValue(schemaId, out var schema))
786775
{
787776
return schema;
788777
}
789778

790779
await cacheMutex.WaitAsync().ConfigureAwait(continueOnCapturedContext: false);
791780
try
792781
{
793-
if (!this.schemaById.TryGetValue(id, out schema) || !checkSchemaMatchesFormat(format, schema.SchemaString))
782+
if (!this.schemaById.TryGetValue(schemaId, out schema))
794783
{
795784
CleanCacheIfFull();
796-
schema = (await restService.GetSchemaBySubjectAndIdAsync(subject, id, format)
797-
.ConfigureAwait(continueOnCapturedContext: false));
798-
schemaById[id] = schema;
785+
schema = await restService.GetSchemaBySubjectAndIdAsync(subject, id, format)
786+
.ConfigureAwait(continueOnCapturedContext: false);
787+
schemaById.TryAdd(schemaId, schema);
799788
}
800789

801790
return schema;
@@ -832,7 +821,8 @@ public async Task<RegisteredSchema> GetRegisteredSchemaAsync(string subject, int
832821
schema = await restService.GetSchemaAsync(subject, version)
833822
.ConfigureAwait(continueOnCapturedContext: false);
834823
schemaByVersion[version] = schema;
835-
schemaById[schema.Id] = schema.Schema;
824+
var format = GetSchemaFormat(schema.SchemaString);
825+
schemaById.TryAdd(new SchemaId(schema.Id, format), schema.Schema);
836826
}
837827

838828
return schema;

0 commit comments

Comments
 (0)