Skip to content

Commit 7567e89

Browse files
authored
Add normalize.schemas config to SR client (#1874)
* Add normalize.schemas config to SR client * Fix tests * Fix tests * Incorporate review feedback * Remove extra line
1 parent 89b7c98 commit 7567e89

File tree

19 files changed

+200
-58
lines changed

19 files changed

+200
-58
lines changed

src/Confluent.SchemaRegistry.Serdes.Avro/AvroSerializer.cs

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@ namespace Confluent.SchemaRegistry.Serdes
3838
public class AvroSerializer<T> : IAsyncSerializer<T>
3939
{
4040
private bool autoRegisterSchema = true;
41+
private bool normalizeSchemas = false;
4142
private bool useLatestVersion = false;
4243
private int initialBufferSize = DefaultInitialBufferSize;
4344
private SubjectNameStrategyDelegate subjectNameStrategy = null;
@@ -111,6 +112,7 @@ public AvroSerializer(ISchemaRegistryClient schemaRegistryClient, AvroSerializer
111112

112113
if (config.BufferBytes != null) { this.initialBufferSize = config.BufferBytes.Value; }
113114
if (config.AutoRegisterSchemas != null) { this.autoRegisterSchema = config.AutoRegisterSchemas.Value; }
115+
if (config.NormalizeSchemas != null) { this.normalizeSchemas = config.NormalizeSchemas.Value; }
114116
if (config.UseLatestVersion != null) { this.useLatestVersion = config.UseLatestVersion.Value; }
115117
if (config.SubjectNameStrategy != null) { this.subjectNameStrategy = config.SubjectNameStrategy.Value.ToDelegate(); }
116118

@@ -154,8 +156,8 @@ public async Task<byte[]> SerializeAsync(T value, SerializationContext context)
154156
if (serializerImpl == null)
155157
{
156158
serializerImpl = typeof(T) == typeof(GenericRecord)
157-
? (IAvroSerializerImpl<T>)new GenericSerializerImpl(schemaRegistryClient, autoRegisterSchema, useLatestVersion, initialBufferSize, subjectNameStrategy)
158-
: new SpecificSerializerImpl<T>(schemaRegistryClient, autoRegisterSchema, useLatestVersion, initialBufferSize, subjectNameStrategy);
159+
? (IAvroSerializerImpl<T>)new GenericSerializerImpl(schemaRegistryClient, autoRegisterSchema, normalizeSchemas, useLatestVersion, initialBufferSize, subjectNameStrategy)
160+
: new SpecificSerializerImpl<T>(schemaRegistryClient, autoRegisterSchema, normalizeSchemas, useLatestVersion, initialBufferSize, subjectNameStrategy);
159161
}
160162

161163
return await serializerImpl.Serialize(context.Topic, value, context.Component == MessageComponentType.Key).ConfigureAwait(continueOnCapturedContext: false);

src/Confluent.SchemaRegistry.Serdes.Avro/AvroSerializerConfig.cs

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,14 @@ public static class PropertyNames
5353
/// </summary>
5454
public const string AutoRegisterSchemas = "avro.serializer.auto.register.schemas";
5555

56+
/// <summary>
57+
/// Specifies whether to normalize schemas, which will transform schemas
58+
/// to have a consistent format, including ordering properties and references.
59+
///
60+
/// default: false
61+
/// </summary>
62+
public const string NormalizeSchemas = "avro.serializer.normalize.schemas";
63+
5664
/// <summary>
5765
/// Specifies whether or not the Avro serializer should use the latest subject
5866
/// version for serialization.
@@ -113,6 +121,19 @@ public bool? AutoRegisterSchemas
113121
}
114122

115123

124+
/// <summary>
125+
/// Specifies whether to normalize schemas, which will transform schemas
126+
/// to have a consistent format, including ordering properties and references.
127+
///
128+
/// default: false
129+
/// </summary>
130+
public bool? NormalizeSchemas
131+
{
132+
get { return GetBool(PropertyNames.NormalizeSchemas); }
133+
set { SetObject(PropertyNames.NormalizeSchemas, value); }
134+
}
135+
136+
116137
/// <summary>
117138
/// Specifies whether or not the Avro serializer should use the latest subject
118139
/// version for serialization.

src/Confluent.SchemaRegistry.Serdes.Avro/GenericSerializerImpl.cs

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@ internal class GenericSerializerImpl : IAvroSerializerImpl<GenericRecord>
3434
{
3535
private ISchemaRegistryClient schemaRegistryClient;
3636
private bool autoRegisterSchema;
37+
private bool normalizeSchemas;
3738
private bool useLatestVersion;
3839
private int initialBufferSize;
3940
private SubjectNameStrategyDelegate subjectNameStrategy;
@@ -47,12 +48,14 @@ internal class GenericSerializerImpl : IAvroSerializerImpl<GenericRecord>
4748
public GenericSerializerImpl(
4849
ISchemaRegistryClient schemaRegistryClient,
4950
bool autoRegisterSchema,
51+
bool normalizeSchemas,
5052
bool useLatestVersion,
5153
int initialBufferSize,
5254
SubjectNameStrategyDelegate subjectNameStrategy)
5355
{
5456
this.schemaRegistryClient = schemaRegistryClient;
5557
this.autoRegisterSchema = autoRegisterSchema;
58+
this.normalizeSchemas = normalizeSchemas;
5659
this.useLatestVersion = useLatestVersion;
5760
this.initialBufferSize = initialBufferSize;
5861
this.subjectNameStrategy = subjectNameStrategy;
@@ -146,12 +149,12 @@ public async Task<byte[]> Serialize(string topic, GenericRecord data, bool isKey
146149
if (autoRegisterSchema)
147150
{
148151
newSchemaId = await schemaRegistryClient
149-
.RegisterSchemaAsync(subject, writerSchemaString)
152+
.RegisterSchemaAsync(subject, writerSchemaString, normalizeSchemas)
150153
.ConfigureAwait(continueOnCapturedContext: false);
151154
}
152155
else
153156
{
154-
newSchemaId = await schemaRegistryClient.GetSchemaIdAsync(subject, writerSchemaString)
157+
newSchemaId = await schemaRegistryClient.GetSchemaIdAsync(subject, writerSchemaString, normalizeSchemas)
155158
.ConfigureAwait(continueOnCapturedContext: false);
156159
}
157160
}

src/Confluent.SchemaRegistry.Serdes.Avro/SpecificSerializerImpl.cs

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -82,6 +82,7 @@ public SpecificWriter<T> AvroWriter
8282

8383
private ISchemaRegistryClient schemaRegistryClient;
8484
private bool autoRegisterSchema;
85+
private bool normalizeSchemas;
8586
private bool useLatestVersion;
8687
private int initialBufferSize;
8788
private SubjectNameStrategyDelegate subjectNameStrategy;
@@ -98,12 +99,14 @@ public SpecificWriter<T> AvroWriter
9899
public SpecificSerializerImpl(
99100
ISchemaRegistryClient schemaRegistryClient,
100101
bool autoRegisterSchema,
102+
bool normalizeSchemas,
101103
bool useLatestVersion,
102104
int initialBufferSize,
103105
SubjectNameStrategyDelegate subjectNameStrategy)
104106
{
105107
this.schemaRegistryClient = schemaRegistryClient;
106108
this.autoRegisterSchema = autoRegisterSchema;
109+
this.normalizeSchemas = normalizeSchemas;
107110
this.useLatestVersion = useLatestVersion;
108111
this.initialBufferSize = initialBufferSize;
109112
this.subjectNameStrategy = subjectNameStrategy;
@@ -222,10 +225,10 @@ public async Task<byte[]> Serialize(string topic, T data, bool isKey)
222225
// first usage: register/get schema to check compatibility
223226
currentSchemaData.WriterSchemaId = autoRegisterSchema
224227
? await schemaRegistryClient
225-
.RegisterSchemaAsync(subject, currentSchemaData.WriterSchemaString)
228+
.RegisterSchemaAsync(subject, currentSchemaData.WriterSchemaString, normalizeSchemas)
226229
.ConfigureAwait(continueOnCapturedContext: false)
227230
: await schemaRegistryClient
228-
.GetSchemaIdAsync(subject, currentSchemaData.WriterSchemaString)
231+
.GetSchemaIdAsync(subject, currentSchemaData.WriterSchemaString, normalizeSchemas)
229232
.ConfigureAwait(continueOnCapturedContext: false);
230233
}
231234

src/Confluent.SchemaRegistry.Serdes.Json/JsonSerializer.cs

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,7 @@ public class JsonSerializer<T> : IAsyncSerializer<T> where T : class
5858
private const int DefaultInitialBufferSize = 1024;
5959

6060
private bool autoRegisterSchema = true;
61+
private bool normalizeSchemas = false;
6162
private bool useLatestVersion = false;
6263
private int initialBufferSize = DefaultInitialBufferSize;
6364
private SubjectNameStrategyDelegate subjectNameStrategy = null;
@@ -113,6 +114,7 @@ public JsonSerializer(ISchemaRegistryClient schemaRegistryClient, JsonSerializer
113114

114115
if (config.BufferBytes != null) { this.initialBufferSize = config.BufferBytes.Value; }
115116
if (config.AutoRegisterSchemas != null) { this.autoRegisterSchema = config.AutoRegisterSchemas.Value; }
117+
if (config.NormalizeSchemas != null) { this.normalizeSchemas = config.NormalizeSchemas.Value; }
116118
if (config.UseLatestVersion != null) { this.useLatestVersion = config.UseLatestVersion.Value; }
117119
if (config.SubjectNameStrategy != null) { this.subjectNameStrategy = config.SubjectNameStrategy.Value.ToDelegate(); }
118120

@@ -180,10 +182,10 @@ public async Task<byte[]> SerializeAsync(T value, SerializationContext context)
180182
// first usage: register/get schema to check compatibility
181183
schemaId = autoRegisterSchema
182184
? await schemaRegistryClient.RegisterSchemaAsync(subject,
183-
new Schema(this.schemaText, EmptyReferencesList, SchemaType.Json))
185+
new Schema(this.schemaText, EmptyReferencesList, SchemaType.Json), normalizeSchemas)
184186
.ConfigureAwait(continueOnCapturedContext: false)
185187
: await schemaRegistryClient.GetSchemaIdAsync(subject,
186-
new Schema(this.schemaText, EmptyReferencesList, SchemaType.Json))
188+
new Schema(this.schemaText, EmptyReferencesList, SchemaType.Json), normalizeSchemas)
187189
.ConfigureAwait(continueOnCapturedContext: false);
188190

189191
// TODO: It may be better to fail fast if conflicting values for schemaId are seen here.

src/Confluent.SchemaRegistry.Serdes.Json/JsonSerializerConfig.cs

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,14 @@ public static class PropertyNames
5353
/// </summary>
5454
public const string AutoRegisterSchemas = "json.serializer.auto.register.schemas";
5555

56+
/// <summary>
57+
/// Specifies whether to normalize schemas, which will transform schemas
58+
/// to have a consistent format, including ordering properties and references.
59+
///
60+
/// default: false
61+
/// </summary>
62+
public const string NormalizeSchemas = "json.serializer.normalize.schemas";
63+
5664
/// <summary>
5765
/// Specifies whether or not the JSON serializer should use the latest subject
5866
/// version for serialization.
@@ -113,6 +121,19 @@ public bool? AutoRegisterSchemas
113121
}
114122

115123

124+
/// <summary>
125+
/// Specifies whether to normalize schemas, which will transform schemas
126+
/// to have a consistent format, including ordering properties and references.
127+
///
128+
/// default: false
129+
/// </summary>
130+
public bool? NormalizeSchemas
131+
{
132+
get { return GetBool(PropertyNames.NormalizeSchemas); }
133+
set { SetObject(PropertyNames.NormalizeSchemas, value); }
134+
}
135+
136+
116137
/// <summary>
117138
/// Specifies whether or not the JSON serializer should use the latest subject
118139
/// version for serialization.

src/Confluent.SchemaRegistry.Serdes.Protobuf/ProtobufSerializer.cs

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,7 @@ namespace Confluent.SchemaRegistry.Serdes
5555
private const int DefaultInitialBufferSize = 1024;
5656

5757
private bool autoRegisterSchema = true;
58+
private bool normalizeSchemas = false;
5859
private bool useLatestVersion = false;
5960
private bool skipKnownTypes = false;
6061
private bool useDeprecatedFormat = false;
@@ -96,6 +97,7 @@ public ProtobufSerializer(ISchemaRegistryClient schemaRegistryClient, ProtobufSe
9697

9798
if (config.BufferBytes != null) { this.initialBufferSize = config.BufferBytes.Value; }
9899
if (config.AutoRegisterSchemas != null) { this.autoRegisterSchema = config.AutoRegisterSchemas.Value; }
100+
if (config.NormalizeSchemas != null) { this.normalizeSchemas = config.NormalizeSchemas.Value; }
99101
if (config.UseLatestVersion != null) { this.useLatestVersion = config.UseLatestVersion.Value; }
100102
if (config.SkipKnownTypes != null) { this.skipKnownTypes = config.SkipKnownTypes.Value; }
101103
if (config.UseDeprecatedFormat != null) { this.useDeprecatedFormat = config.UseDeprecatedFormat.Value; }
@@ -207,9 +209,9 @@ private async Task<List<SchemaReference>> RegisterOrGetReferences(FileDescriptor
207209
var subject = referenceSubjectNameStrategy(context, dependency.Name);
208210
var schema = new Schema(dependency.SerializedData.ToBase64(), dependencyReferences, SchemaType.Protobuf);
209211
var schemaId = autoRegisterSchema
210-
? await schemaRegistryClient.RegisterSchemaAsync(subject, schema).ConfigureAwait(continueOnCapturedContext: false)
211-
: await schemaRegistryClient.GetSchemaIdAsync(subject, schema).ConfigureAwait(continueOnCapturedContext: false);
212-
var registeredDependentSchema = await schemaRegistryClient.LookupSchemaAsync(subject, schema, true).ConfigureAwait(continueOnCapturedContext: false);
212+
? await schemaRegistryClient.RegisterSchemaAsync(subject, schema, normalizeSchemas).ConfigureAwait(continueOnCapturedContext: false)
213+
: await schemaRegistryClient.GetSchemaIdAsync(subject, schema, normalizeSchemas).ConfigureAwait(continueOnCapturedContext: false);
214+
var registeredDependentSchema = await schemaRegistryClient.LookupSchemaAsync(subject, schema, true, normalizeSchemas).ConfigureAwait(continueOnCapturedContext: false);
213215
return new SchemaReference(dependency.Name, subject, registeredDependentSchema.Version);
214216
};
215217
tasks.Add(t(fileDescriptor));
@@ -288,11 +290,11 @@ await RegisterOrGetReferences(value.Descriptor.File, context, autoRegisterSchema
288290
schemaId = autoRegisterSchema
289291
? await schemaRegistryClient.RegisterSchemaAsync(subject,
290292
new Schema(value.Descriptor.File.SerializedData.ToBase64(), references,
291-
SchemaType.Protobuf))
293+
SchemaType.Protobuf), normalizeSchemas)
292294
.ConfigureAwait(continueOnCapturedContext: false)
293295
: await schemaRegistryClient.GetSchemaIdAsync(subject,
294296
new Schema(value.Descriptor.File.SerializedData.ToBase64(), references,
295-
SchemaType.Protobuf))
297+
SchemaType.Protobuf), normalizeSchemas)
296298
.ConfigureAwait(continueOnCapturedContext: false);
297299

298300
// note: different values for schemaId should never be seen here.

src/Confluent.SchemaRegistry.Serdes.Protobuf/ProtobufSerializerConfig.cs

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,14 @@ public static class PropertyNames
5353
/// </summary>
5454
public const string AutoRegisterSchemas = "protobuf.serializer.auto.register.schemas";
5555

56+
/// <summary>
57+
/// Specifies whether to normalize schemas, which will transform schemas
58+
/// to have a consistent format, including ordering properties and references.
59+
///
60+
/// default: false
61+
/// </summary>
62+
public const string NormalizeSchemas = "protobuf.serializer.normalize.schemas";
63+
5664
/// <summary>
5765
/// Specifies whether or not the Protobuf serializer should use the latest subject
5866
/// version for serialization.
@@ -133,6 +141,19 @@ public bool? AutoRegisterSchemas
133141
set { SetObject(PropertyNames.AutoRegisterSchemas, value); }
134142
}
135143

144+
145+
/// <summary>
146+
/// Specifies whether to normalize schemas, which will transform schemas
147+
/// to have a consistent format, including ordering properties and references.
148+
///
149+
/// default: false
150+
/// </summary>
151+
public bool? NormalizeSchemas
152+
{
153+
get { return GetBool(PropertyNames.NormalizeSchemas); }
154+
set { SetObject(PropertyNames.NormalizeSchemas, value); }
155+
}
156+
136157

137158
/// <summary>
138159
/// Specifies whether or not the Protobuf serializer should use the latest subject

0 commit comments

Comments
 (0)