Skip to content

Commit 3f10fc8

Browse files
authored
Support for schema id in header (#2465)
* Support schema ID in header * Minor fix * Minor cleanup * Minor cleanup * Minor cleanup * Minor cleanup * Add test * Minor renaming * Add test * Fix tests * Add wait to test * Fix test * Fix test * Incorporate review feedback
1 parent 6d6321f commit 3f10fc8

33 files changed

+1178
-254
lines changed

src/Confluent.SchemaRegistry.Serdes.Avro/AvroDeserializerConfig.cs

Lines changed: 35 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,6 @@
1717
using System;
1818
using System.Collections.Generic;
1919
using System.Linq;
20-
using Confluent.Kafka;
2120

2221

2322
namespace Confluent.SchemaRegistry.Serdes
@@ -53,6 +52,12 @@ public static class PropertyNames
5352
/// Possible values: <see cref="Confluent.SchemaRegistry.SubjectNameStrategy" />
5453
/// </summary>
5554
public const string SubjectNameStrategy = "avro.deserializer.subject.name.strategy";
55+
56+
/// <summary>
57+
/// The schema id name strategy to use to serialize the ID/GUID.
58+
/// Possible values: <see cref="Confluent.SchemaRegistry.SchemaIdDeserializerStrategy" />
59+
/// </summary>
60+
public const string SchemaIdStrategy = "avro.deserializer.schema.id.strategy";
5661
}
5762

5863

@@ -124,5 +129,34 @@ public SubjectNameStrategy? SubjectNameStrategy
124129
else { this.properties[PropertyNames.SubjectNameStrategy] = value.ToString(); }
125130
}
126131
}
132+
133+
134+
/// <summary>
135+
/// Schema id strategy.
136+
///
137+
/// default: SchemaIdDeserializerStrategy.Dual
138+
/// </summary>
139+
public SchemaIdDeserializerStrategy? SchemaIdStrategy
140+
{
141+
get
142+
{
143+
var r = Get(PropertyNames.SchemaIdStrategy);
144+
if (r == null) { return null; }
145+
else
146+
{
147+
SchemaIdDeserializerStrategy result;
148+
if (!Enum.TryParse<SchemaIdDeserializerStrategy>(r, out result))
149+
throw new ArgumentException(
150+
$"Unknown ${PropertyNames.SchemaIdStrategy} value: {r}.");
151+
else
152+
return result;
153+
}
154+
}
155+
set
156+
{
157+
if (value == null) { this.properties.Remove(PropertyNames.SchemaIdStrategy); }
158+
else { this.properties[PropertyNames.SchemaIdStrategy] = value.ToString(); }
159+
}
160+
}
127161
}
128162
}

src/Confluent.SchemaRegistry.Serdes.Avro/AvroSerializer.cs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -107,6 +107,7 @@ public AvroSerializer(ISchemaRegistryClient schemaRegistryClient, AvroSerializer
107107
property.Key != AvroSerializerConfig.PropertyNames.UseLatestWithMetadata &&
108108
property.Key != AvroSerializerConfig.PropertyNames.BufferBytes &&
109109
property.Key != AvroSerializerConfig.PropertyNames.SubjectNameStrategy &&
110+
property.Key != AvroSerializerConfig.PropertyNames.SchemaIdStrategy &&
110111
property.Key != AvroSerializerConfig.PropertyNames.NormalizeSchemas)
111112
{
112113
throw new ArgumentException($"AvroSerializer: unknown configuration property {property.Key}");

src/Confluent.SchemaRegistry.Serdes.Avro/AvroSerializerConfig.cs

Lines changed: 35 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,6 @@
1717
using System;
1818
using System.Collections.Generic;
1919
using System.Linq;
20-
using Confluent.Kafka;
2120

2221

2322
namespace Confluent.SchemaRegistry.Serdes
@@ -89,6 +88,12 @@ public static class PropertyNames
8988
/// Possible values: <see cref="Confluent.SchemaRegistry.SubjectNameStrategy" />
9089
/// </summary>
9190
public const string SubjectNameStrategy = "avro.serializer.subject.name.strategy";
91+
92+
/// <summary>
93+
/// The schema id name strategy to use to serialize the ID/GUID.
94+
/// Possible values: <see cref="Confluent.SchemaRegistry.SchemaIdSerializerStrategy" />
95+
/// </summary>
96+
public const string SchemaIdStrategy = "avro.serializer.schema.id.strategy";
9297
}
9398

9499

@@ -189,7 +194,7 @@ public IDictionary<string, string> UseLatestWithMetadata
189194

190195
/// <summary>
191196
/// Subject name strategy.
192-
///
197+
///
193198
/// default: SubjectNameStrategy.Topic
194199
/// </summary>
195200
public SubjectNameStrategy? SubjectNameStrategy
@@ -215,5 +220,33 @@ public SubjectNameStrategy? SubjectNameStrategy
215220
}
216221
}
217222

223+
224+
/// <summary>
225+
/// Schema id strategy.
226+
///
227+
/// default: SchemaIdSerializerStrategy.Prefix
228+
/// </summary>
229+
public SchemaIdSerializerStrategy? SchemaIdStrategy
230+
{
231+
get
232+
{
233+
var r = Get(PropertyNames.SchemaIdStrategy);
234+
if (r == null) { return null; }
235+
else
236+
{
237+
SchemaIdSerializerStrategy result;
238+
if (!Enum.TryParse<SchemaIdSerializerStrategy>(r, out result))
239+
throw new ArgumentException(
240+
$"Unknown ${PropertyNames.SchemaIdStrategy} value: {r}.");
241+
else
242+
return result;
243+
}
244+
}
245+
set
246+
{
247+
if (value == null) { this.properties.Remove(PropertyNames.SchemaIdStrategy); }
248+
else { this.properties[PropertyNames.SchemaIdStrategy] = value.ToString(); }
249+
}
250+
}
218251
}
219252
}

src/Confluent.SchemaRegistry.Serdes.Avro/GenericDeserializerImpl.cs

Lines changed: 9 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,6 @@
1717
using System;
1818
using System.Collections.Generic;
1919
using System.IO;
20-
using System.Net;
2120
using System.Text;
2221
using System.Threading.Tasks;
2322
using Avro.IO;
@@ -48,6 +47,7 @@ public GenericDeserializerImpl(
4847
if (config.UseLatestVersion != null) { this.useLatestVersion = config.UseLatestVersion.Value; }
4948
if (config.UseLatestWithMetadata != null) { this.useLatestWithMetadata = config.UseLatestWithMetadata; }
5049
if (config.SubjectNameStrategy != null) { this.subjectNameStrategy = config.SubjectNameStrategy.Value.ToDelegate(); }
50+
if (config.SchemaIdStrategy != null) { this.schemaIdDeserializer = config.SchemaIdStrategy.Value.ToDeserializer(); }
5151
}
5252

5353
public override async Task<GenericRecord> DeserializeAsync(ReadOnlyMemory<byte> data, bool isNull,
@@ -66,11 +66,6 @@ public async Task<GenericRecord> Deserialize(string topic, Headers headers, byte
6666
// Note: topic is not necessary for deserialization (or knowing if it's a key
6767
// or value) only the schema id is needed.
6868

69-
if (array.Length < 5)
70-
{
71-
throw new InvalidDataException($"Expecting data framing of length 5 bytes or more but total data size is {array.Length} bytes");
72-
}
73-
7469
string subject = GetSubjectName(topic, isKey, null);
7570
RegisteredSchema latestSchema = null;
7671
if (subject != null)
@@ -85,17 +80,12 @@ public async Task<GenericRecord> Deserialize(string topic, Headers headers, byte
8580
Avro.Schema readerSchema;
8681
GenericRecord data;
8782
IList<Migration> migrations = new List<Migration>();
88-
using (var stream = new MemoryStream(array))
89-
using (var reader = new BinaryReader(stream))
83+
SerializationContext context = new SerializationContext(
84+
isKey ? MessageComponentType.Key : MessageComponentType.Value, topic, headers);
85+
SchemaId writerId = new SchemaId(SchemaType.Avro);
86+
using (var stream = schemaIdDeserializer.Deserialize(array, context, ref writerId))
9087
{
91-
var magicByte = reader.ReadByte();
92-
if (magicByte != Constants.MagicByte)
93-
{
94-
throw new InvalidDataException($"Expecting data with Confluent Schema Registry framing. Magic byte was {array[0]}, expecting {Constants.MagicByte}");
95-
}
96-
var writerId = IPAddress.NetworkToHostOrder(reader.ReadInt32());
97-
98-
(writerSchemaJson, writerSchema) = await GetSchema(subject, writerId);
88+
(writerSchemaJson, writerSchema) = await GetWriterSchema(subject, writerId);
9989
if (subject == null)
10090
{
10191
subject = GetSubjectName(topic, isKey, writerSchema.Fullname);
@@ -130,15 +120,15 @@ public async Task<GenericRecord> Deserialize(string topic, Headers headers, byte
130120

131121
jsonString = Encoding.UTF8.GetString(jsonStream.ToArray());
132122
}
133-
123+
134124
JToken json = JToken.Parse(jsonString);
135125
json = await ExecuteMigrations(migrations, isKey, subject, topic, headers, json)
136126
.ContinueWith(t => (JToken)t.Result)
137127
.ConfigureAwait(continueOnCapturedContext: false);
138128
readerSchemaJson = latestSchema;
139129
readerSchema = await GetParsedSchema(readerSchemaJson);
140130
Avro.IO.Decoder decoder = new JsonDecoder(readerSchema, json.ToString(Formatting.None));
141-
131+
142132
datumReader = new GenericReader<GenericRecord>(readerSchema, readerSchema);
143133
data = datumReader.Read(default(GenericRecord), decoder);
144134
}
@@ -158,7 +148,7 @@ public async Task<GenericRecord> Deserialize(string topic, Headers headers, byte
158148
data = datumReader.Read(default(GenericRecord), new BinaryDecoder(stream));
159149
}
160150
}
161-
151+
162152
FieldTransformer fieldTransformer = async (ctx, transform, message) =>
163153
{
164154
return await AvroUtils.Transform(ctx, readerSchema, message, transform).ConfigureAwait(false);

src/Confluent.SchemaRegistry.Serdes.Avro/GenericSerializerImpl.cs

Lines changed: 15 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,6 @@
2020
using System;
2121
using System.Collections.Generic;
2222
using System.IO;
23-
using System.Net;
2423
using System.Threading.Tasks;
2524
using Confluent.Kafka;
2625
using Avro.Generic;
@@ -33,8 +32,8 @@ internal class GenericSerializerImpl : AsyncSerializer<GenericRecord, Avro.Schem
3332
{
3433
private Dictionary<Avro.Schema, string> knownSchemas =
3534
new Dictionary<global::Avro.Schema, string>();
36-
private Dictionary<KeyValuePair<string, string>, int> registeredSchemas =
37-
new Dictionary<KeyValuePair<string, string>, int>();
35+
private Dictionary<KeyValuePair<string, string>, SchemaId> registeredSchemas =
36+
new Dictionary<KeyValuePair<string, string>, SchemaId>();
3837

3938
public GenericSerializerImpl(
4039
ISchemaRegistryClient schemaRegistryClient,
@@ -50,6 +49,7 @@ public GenericSerializerImpl(
5049
if (config.UseLatestVersion != null) { this.useLatestVersion = config.UseLatestVersion.Value; }
5150
if (config.UseLatestWithMetadata != null) { this.useLatestWithMetadata = config.UseLatestWithMetadata; }
5251
if (config.SubjectNameStrategy != null) { this.subjectNameStrategy = config.SubjectNameStrategy.Value.ToDelegate(); }
52+
if (config.SchemaIdStrategy != null) { this.schemaIdSerializer = config.SchemaIdStrategy.Value.ToSerializer(); }
5353

5454
if (this.useLatestVersion && this.autoRegisterSchema)
5555
{
@@ -89,9 +89,9 @@ public async Task<byte[]> Serialize(string topic, Headers headers, GenericRecord
8989
{
9090
try
9191
{
92-
int schemaId;
92+
SchemaId schemaId;
9393
string subject;
94-
RegisteredSchema latestSchema = null;
94+
RegisteredSchema latestSchema;
9595
Avro.Schema writerSchema;
9696
await serdeMutex.WaitAsync().ConfigureAwait(continueOnCapturedContext: false);
9797
try
@@ -137,19 +137,21 @@ public async Task<byte[]> Serialize(string topic, Headers headers, GenericRecord
137137
.ConfigureAwait(continueOnCapturedContext: false);
138138
if (latestSchema != null)
139139
{
140-
schemaId = latestSchema.Id;
140+
schemaId = new SchemaId(SchemaType.Avro, latestSchema.Id, latestSchema.Guid);
141141
}
142142
else if (!registeredSchemas.TryGetValue(subjectSchemaPair, out schemaId))
143143
{
144144
// first usage: register/get schema to check compatibility
145-
schemaId = autoRegisterSchema
145+
var inputSchema = new Schema(writerSchemaString, new List<SchemaReference>(), SchemaType.Avro);
146+
var outputSchema = autoRegisterSchema
146147
? await schemaRegistryClient
147-
.RegisterSchemaAsync(subject, writerSchemaString, normalizeSchemas)
148+
.RegisterSchemaWithResponseAsync(subject, inputSchema, normalizeSchemas)
148149
.ConfigureAwait(continueOnCapturedContext: false)
149150
: await schemaRegistryClient
150-
.GetSchemaIdAsync(subject, writerSchemaString, normalizeSchemas)
151+
.LookupSchemaAsync(subject, inputSchema, normalizeSchemas)
151152
.ConfigureAwait(continueOnCapturedContext: false);
152153

154+
schemaId = new SchemaId(SchemaType.Avro, outputSchema.Id, outputSchema.Guid);
153155
registeredSchemas.Add(subjectSchemaPair, schemaId);
154156
}
155157
}
@@ -171,13 +173,13 @@ public async Task<byte[]> Serialize(string topic, Headers headers, GenericRecord
171173
}
172174

173175
using (var stream = new MemoryStream(initialBufferSize))
174-
using (var writer = new BinaryWriter(stream))
175176
{
176-
stream.WriteByte(Constants.MagicByte);
177-
writer.Write(IPAddress.HostToNetworkOrder(schemaId));
178177
new GenericWriter<GenericRecord>(writerSchema)
179178
.Write(data, new BinaryEncoder(stream));
180-
return stream.ToArray();
179+
byte[] payload = stream.ToArray();
180+
SerializationContext context = new SerializationContext(
181+
isKey ? MessageComponentType.Key : MessageComponentType.Value, topic, headers);
182+
return schemaIdSerializer.Serialize(payload, context, schemaId);
181183
}
182184
}
183185
catch (AggregateException e)

src/Confluent.SchemaRegistry.Serdes.Avro/SpecificDeserializerImpl.cs

Lines changed: 8 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -17,10 +17,7 @@
1717
using System;
1818
using System.Collections.Generic;
1919
using System.IO;
20-
using System.Net;
21-
using System.Reflection;
2220
using System.Text;
23-
using System.Threading;
2421
using System.Threading.Tasks;
2522
using Avro.Specific;
2623
using Avro.IO;
@@ -101,6 +98,7 @@ public SpecificDeserializerImpl(
10198
if (config.UseLatestVersion != null) { this.useLatestVersion = config.UseLatestVersion.Value; }
10299
if (config.UseLatestWithMetadata != null) { this.useLatestWithMetadata = config.UseLatestWithMetadata; }
103100
if (config.SubjectNameStrategy != null) { this.subjectNameStrategy = config.SubjectNameStrategy.Value.ToDelegate(); }
101+
if (config.SchemaIdStrategy != null) { this.schemaIdDeserializer = config.SchemaIdStrategy.Value.ToDeserializer(); }
104102
}
105103

106104
public override async Task<T> DeserializeAsync(ReadOnlyMemory<byte> data, bool isNull,
@@ -119,12 +117,6 @@ public async Task<T> Deserialize(string topic, Headers headers, byte[] array, bo
119117
// Note: topic is not necessary for deserialization (or knowing if it's a key
120118
// or value) only the schema id is needed.
121119

122-
if (array.Length < 5)
123-
{
124-
throw new InvalidDataException(
125-
$"Expecting data framing of length 5 bytes or more but total data size is {array.Length} bytes");
126-
}
127-
128120
string subject = GetSubjectName(topic, isKey, null);
129121
RegisteredSchema latestSchema = null;
130122
if (subject != null)
@@ -133,21 +125,16 @@ public async Task<T> Deserialize(string topic, Headers headers, byte[] array, bo
133125
.ConfigureAwait(continueOnCapturedContext: false);
134126
}
135127

136-
Schema writerSchemaJson = null;
137-
Avro.Schema writerSchema = null;
128+
Schema writerSchemaJson;
129+
Avro.Schema writerSchema;
138130
object data;
139131
IList<Migration> migrations = new List<Migration>();
140-
using (var stream = new MemoryStream(array))
141-
using (var reader = new BinaryReader(stream))
132+
SerializationContext context = new SerializationContext(
133+
isKey ? MessageComponentType.Key : MessageComponentType.Value, topic, headers);
134+
SchemaId writerId = new SchemaId(SchemaType.Avro);
135+
using (var stream = schemaIdDeserializer.Deserialize(array, context, ref writerId))
142136
{
143-
var magicByte = reader.ReadByte();
144-
if (magicByte != Constants.MagicByte)
145-
{
146-
throw new InvalidDataException($"Expecting data with Confluent Schema Registry framing. Magic byte was {array[0]}, expecting {Constants.MagicByte}");
147-
}
148-
var writerId = IPAddress.NetworkToHostOrder(reader.ReadInt32());
149-
150-
(writerSchemaJson, writerSchema) = await GetSchema(subject, writerId);
137+
(writerSchemaJson, writerSchema) = await GetWriterSchema(subject, writerId);
151138
if (subject == null)
152139
{
153140
subject = GetSubjectName(topic, isKey, writerSchema.Fullname);

0 commit comments

Comments
 (0)