Skip to content

Commit e5561f8

Browse files
authored
Minor refactoring to ensure latest schema can change (confluentinc#2340)
* Minor refactoring to ensure latest schema can change * Minor cleanup * Minor cleanup
1 parent c28ff3f commit e5561f8

File tree

4 files changed

+71
-78
lines changed

4 files changed

+71
-78
lines changed

src/Confluent.SchemaRegistry.Serdes.Avro/GenericSerializerImpl.cs

Lines changed: 24 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,7 @@ namespace Confluent.SchemaRegistry.Serdes
3131
{
3232
internal class GenericSerializerImpl : AsyncSerializer<GenericRecord, Avro.Schema>
3333
{
34-
private Dictionary<Avro.RecordSchema, string> knownSchemas = new Dictionary<global::Avro.RecordSchema, string>();
34+
private Dictionary<Avro.Schema, string> knownSchemas = new Dictionary<global::Avro.Schema, string>();
3535
private HashSet<KeyValuePair<string, string>> registeredSchemas = new HashSet<KeyValuePair<string, string>>();
3636
private Dictionary<string, int> schemaIds = new Dictionary<string, int>();
3737

@@ -90,7 +90,7 @@ public async Task<byte[]> Serialize(string topic, Headers headers, GenericRecord
9090
int schemaId;
9191
string subject;
9292
RegisteredSchema latestSchema = null;
93-
Avro.RecordSchema writerSchema;
93+
Avro.Schema writerSchema;
9494
await serdeMutex.WaitAsync().ConfigureAwait(continueOnCapturedContext: false);
9595
try
9696
{
@@ -132,31 +132,28 @@ public async Task<byte[]> Serialize(string topic, Headers headers, GenericRecord
132132
// object reference, not value.
133133

134134
subject = GetSubjectName(topic, isKey, data.Schema.Fullname);
135+
var subjectSchemaPair = new KeyValuePair<string, string>(subject, writerSchemaString);
135136
latestSchema = await GetReaderSchema(subject)
136137
.ConfigureAwait(continueOnCapturedContext: false);
137-
138-
var subjectSchemaPair = new KeyValuePair<string, string>(subject, writerSchemaString);
139-
if (!registeredSchemas.Contains(subjectSchemaPair))
138+
if (latestSchema != null)
139+
{
140+
schemaId = latestSchema.Id;
141+
}
142+
else if (!registeredSchemas.Contains(subjectSchemaPair))
140143
{
141144
int newSchemaId;
142-
if (latestSchema != null)
145+
146+
// first usage: register/get schema to check compatibility
147+
if (autoRegisterSchema)
143148
{
144-
newSchemaId = latestSchema.Id;
149+
newSchemaId = await schemaRegistryClient
150+
.RegisterSchemaAsync(subject, writerSchemaString, normalizeSchemas)
151+
.ConfigureAwait(continueOnCapturedContext: false);
145152
}
146153
else
147154
{
148-
// first usage: register/get schema to check compatibility
149-
if (autoRegisterSchema)
150-
{
151-
newSchemaId = await schemaRegistryClient
152-
.RegisterSchemaAsync(subject, writerSchemaString, normalizeSchemas)
153-
.ConfigureAwait(continueOnCapturedContext: false);
154-
}
155-
else
156-
{
157-
newSchemaId = await schemaRegistryClient.GetSchemaIdAsync(subject, writerSchemaString, normalizeSchemas)
158-
.ConfigureAwait(continueOnCapturedContext: false);
159-
}
155+
newSchemaId = await schemaRegistryClient.GetSchemaIdAsync(subject, writerSchemaString, normalizeSchemas)
156+
.ConfigureAwait(continueOnCapturedContext: false);
160157
}
161158

162159
if (!schemaIds.ContainsKey(writerSchemaString))
@@ -171,8 +168,13 @@ public async Task<byte[]> Serialize(string topic, Headers headers, GenericRecord
171168
}
172169

173170
registeredSchemas.Add(subjectSchemaPair);
171+
172+
schemaId = schemaIds[writerSchemaString];
173+
}
174+
else
175+
{
176+
schemaId = schemaIds[writerSchemaString];
174177
}
175-
schemaId = schemaIds[writerSchemaString];
176178
}
177179
finally
178180
{
@@ -181,10 +183,10 @@ public async Task<byte[]> Serialize(string topic, Headers headers, GenericRecord
181183

182184
if (latestSchema != null)
183185
{
184-
var schema = await GetParsedSchema(latestSchema);
186+
writerSchema = await GetParsedSchema(latestSchema);
185187
FieldTransformer fieldTransformer = async (ctx, transform, message) =>
186188
{
187-
return await AvroUtils.Transform(ctx, schema, message, transform).ConfigureAwait(false);
189+
return await AvroUtils.Transform(ctx, writerSchema, message, transform).ConfigureAwait(false);
188190
};
189191
data = await ExecuteRules(isKey, subject, topic, headers, RuleMode.Write, null,
190192
latestSchema, data, fieldTransformer)

src/Confluent.SchemaRegistry.Serdes.Avro/SpecificSerializerImpl.cs

Lines changed: 13 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,6 @@
2121
using System.Collections.Generic;
2222
using System.IO;
2323
using System.Net;
24-
using System.Reflection;
2524
using System.Threading.Tasks;
2625
using Avro.IO;
2726
using Avro.Specific;
@@ -208,23 +207,20 @@ public async Task<byte[]> Serialize(string topic, Headers headers, T data, bool
208207
latestSchema = await GetReaderSchema(subject)
209208
.ConfigureAwait(continueOnCapturedContext: false);
210209

211-
if (!currentSchemaData.SubjectsRegistered.Contains(subject))
210+
if (latestSchema != null)
212211
{
213-
if (latestSchema != null)
214-
{
215-
currentSchemaData.WriterSchemaId = latestSchema.Id;
216-
}
217-
else
218-
{
219-
// first usage: register/get schema to check compatibility
220-
currentSchemaData.WriterSchemaId = autoRegisterSchema
221-
? await schemaRegistryClient
222-
.RegisterSchemaAsync(subject, currentSchemaData.WriterSchemaString, normalizeSchemas)
223-
.ConfigureAwait(continueOnCapturedContext: false)
224-
: await schemaRegistryClient
225-
.GetSchemaIdAsync(subject, currentSchemaData.WriterSchemaString, normalizeSchemas)
226-
.ConfigureAwait(continueOnCapturedContext: false);
227-
}
212+
currentSchemaData.WriterSchemaId = latestSchema.Id;
213+
}
214+
else if (!currentSchemaData.SubjectsRegistered.Contains(subject))
215+
{
216+
// first usage: register/get schema to check compatibility
217+
currentSchemaData.WriterSchemaId = autoRegisterSchema
218+
? await schemaRegistryClient
219+
.RegisterSchemaAsync(subject, currentSchemaData.WriterSchemaString, normalizeSchemas)
220+
.ConfigureAwait(continueOnCapturedContext: false)
221+
: await schemaRegistryClient
222+
.GetSchemaIdAsync(subject, currentSchemaData.WriterSchemaString, normalizeSchemas)
223+
.ConfigureAwait(continueOnCapturedContext: false);
228224

229225
currentSchemaData.SubjectsRegistered.Add(subject);
230226
}

src/Confluent.SchemaRegistry.Serdes.Json/JsonSerializer.cs

Lines changed: 14 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -207,23 +207,21 @@ public override async Task<byte[]> SerializeAsync(T value, SerializationContext
207207
latestSchema = await GetReaderSchema(subject, new Schema(schemaText, ReferenceList, SchemaType.Json))
208208
.ConfigureAwait(continueOnCapturedContext: false);
209209

210-
if (!subjectsRegistered.Contains(subject))
210+
if (latestSchema != null)
211211
{
212-
if (latestSchema != null)
213-
{
214-
schemaId = latestSchema.Id;
215-
}
216-
else
217-
{
218-
// first usage: register/get schema to check compatibility
219-
schemaId = autoRegisterSchema
220-
? await schemaRegistryClient.RegisterSchemaAsync(subject,
221-
new Schema(this.schemaText, ReferenceList, SchemaType.Json), normalizeSchemas)
222-
.ConfigureAwait(continueOnCapturedContext: false)
223-
: await schemaRegistryClient.GetSchemaIdAsync(subject,
224-
new Schema(this.schemaText, ReferenceList, SchemaType.Json), normalizeSchemas)
225-
.ConfigureAwait(continueOnCapturedContext: false);
226-
}
212+
schemaId = latestSchema.Id;
213+
}
214+
else if (!subjectsRegistered.Contains(subject))
215+
{
216+
// first usage: register/get schema to check compatibility
217+
schemaId = autoRegisterSchema
218+
? await schemaRegistryClient.RegisterSchemaAsync(subject,
219+
new Schema(this.schemaText, ReferenceList, SchemaType.Json), normalizeSchemas)
220+
.ConfigureAwait(continueOnCapturedContext: false)
221+
: await schemaRegistryClient.GetSchemaIdAsync(subject,
222+
new Schema(this.schemaText, ReferenceList, SchemaType.Json), normalizeSchemas)
223+
.ConfigureAwait(continueOnCapturedContext: false);
224+
227225
subjectsRegistered.Add(subject);
228226
}
229227
}

src/Confluent.SchemaRegistry.Serdes.Protobuf/ProtobufSerializer.cs

Lines changed: 20 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -261,32 +261,29 @@ public override async Task<byte[]> SerializeAsync(T value, SerializationContext
261261
latestSchema = await GetReaderSchema(subject)
262262
.ConfigureAwait(continueOnCapturedContext: false);
263263

264-
if (!subjectsRegistered.Contains(subject))
264+
if (latestSchema != null)
265265
{
266-
if (latestSchema != null)
267-
{
268-
schemaId = latestSchema.Id;
269-
}
270-
else
271-
{
272-
var references =
273-
await RegisterOrGetReferences(value.Descriptor.File, context, autoRegisterSchema, skipKnownTypes)
274-
.ConfigureAwait(continueOnCapturedContext: false);
266+
schemaId = latestSchema.Id;
267+
}
268+
else if (!subjectsRegistered.Contains(subject))
269+
{
270+
var references =
271+
await RegisterOrGetReferences(value.Descriptor.File, context, autoRegisterSchema, skipKnownTypes)
272+
.ConfigureAwait(continueOnCapturedContext: false);
275273

276-
// first usage: register/get schema to check compatibility
277-
schemaId = autoRegisterSchema
278-
? await schemaRegistryClient.RegisterSchemaAsync(subject,
279-
new Schema(value.Descriptor.File.SerializedData.ToBase64(), references,
280-
SchemaType.Protobuf), normalizeSchemas)
281-
.ConfigureAwait(continueOnCapturedContext: false)
282-
: await schemaRegistryClient.GetSchemaIdAsync(subject,
283-
new Schema(value.Descriptor.File.SerializedData.ToBase64(), references,
284-
SchemaType.Protobuf), normalizeSchemas)
285-
.ConfigureAwait(continueOnCapturedContext: false);
274+
// first usage: register/get schema to check compatibility
275+
schemaId = autoRegisterSchema
276+
? await schemaRegistryClient.RegisterSchemaAsync(subject,
277+
new Schema(value.Descriptor.File.SerializedData.ToBase64(), references,
278+
SchemaType.Protobuf), normalizeSchemas)
279+
.ConfigureAwait(continueOnCapturedContext: false)
280+
: await schemaRegistryClient.GetSchemaIdAsync(subject,
281+
new Schema(value.Descriptor.File.SerializedData.ToBase64(), references,
282+
SchemaType.Protobuf), normalizeSchemas)
283+
.ConfigureAwait(continueOnCapturedContext: false);
286284

287-
// note: different values for schemaId should never be seen here.
288-
// TODO: but fail fast may be better here.
289-
}
285+
// note: different values for schemaId should never be seen here.
286+
// TODO: but fail fast may be better here.
290287

291288
subjectsRegistered.Add(subject);
292289
}

0 commit comments

Comments
 (0)