Skip to content

Commit 7b4ba94

Browse files
authored
Rework SchemaId encoding in headers to minimize heap allocations (#2480)
1 parent 6ed642c commit 7b4ba94

File tree

16 files changed

+567
-495
lines changed

16 files changed

+567
-495
lines changed

src/Confluent.SchemaRegistry.Serdes.Avro/GenericDeserializerImpl.cs

Lines changed: 54 additions & 48 deletions
Original file line number | Diff line number | Diff line change
@@ -47,19 +47,19 @@ public GenericDeserializerImpl(
4747
if (config.UseLatestVersion != null) { this.useLatestVersion = config.UseLatestVersion.Value; }
4848
if (config.UseLatestWithMetadata != null) { this.useLatestWithMetadata = config.UseLatestWithMetadata; }
4949
if (config.SubjectNameStrategy != null) { this.subjectNameStrategy = config.SubjectNameStrategy.Value.ToDelegate(); }
50-
if (config.SchemaIdStrategy != null) { this.schemaIdDeserializer = config.SchemaIdStrategy.Value.ToDeserializer(); }
50+
if (config.SchemaIdStrategy != null) { this.schemaIdDecoder = config.SchemaIdStrategy.Value.ToDeserializer(); }
5151
}
5252

5353
public override async Task<GenericRecord> DeserializeAsync(ReadOnlyMemory<byte> data, bool isNull,
5454
SerializationContext context)
5555
{
5656
return isNull
5757
? default
58-
: await Deserialize(context.Topic, context.Headers, data.ToArray(),
58+
: await Deserialize(context.Topic, context.Headers, data,
5959
context.Component == MessageComponentType.Key).ConfigureAwait(false);
6060
}
6161

62-
public async Task<GenericRecord> Deserialize(string topic, Headers headers, byte[] array, bool isKey)
62+
public async Task<GenericRecord> Deserialize(string topic, Headers headers, ReadOnlyMemory<byte> array, bool isKey)
6363
{
6464
try
6565
{
@@ -83,68 +83,74 @@ public async Task<GenericRecord> Deserialize(string topic, Headers headers, byte
8383
SerializationContext context = new SerializationContext(
8484
isKey ? MessageComponentType.Key : MessageComponentType.Value, topic, headers);
8585
SchemaId writerId = new SchemaId(SchemaType.Avro);
86-
using (var stream = schemaIdDeserializer.Deserialize(array, context, ref writerId))
86+
var payload = schemaIdDecoder.Decode(array, context, ref writerId);
87+
88+
(writerSchemaJson, writerSchema) = await GetWriterSchema(subject, writerId).ConfigureAwait(false);
89+
if (subject == null)
8790
{
88-
(writerSchemaJson, writerSchema) = await GetWriterSchema(subject, writerId).ConfigureAwait(false);
89-
if (subject == null)
91+
subject = GetSubjectName(topic, isKey, writerSchema.Fullname);
92+
if (subject != null)
9093
{
91-
subject = GetSubjectName(topic, isKey, writerSchema.Fullname);
92-
if (subject != null)
93-
{
94-
latestSchema = await GetReaderSchema(subject)
95-
.ConfigureAwait(continueOnCapturedContext: false);
96-
}
97-
}
98-
99-
if (latestSchema != null)
100-
{
101-
migrations = await GetMigrations(subject, writerSchemaJson, latestSchema)
94+
latestSchema = await GetReaderSchema(subject)
10295
.ConfigureAwait(continueOnCapturedContext: false);
10396
}
97+
}
10498

105-
DatumReader<GenericRecord> datumReader;
106-
if (migrations.Count > 0)
99+
if (latestSchema != null)
100+
{
101+
migrations = await GetMigrations(subject, writerSchemaJson, latestSchema)
102+
.ConfigureAwait(continueOnCapturedContext: false);
103+
}
104+
105+
DatumReader<GenericRecord> datumReader;
106+
if (migrations.Count > 0)
107+
{
108+
using (var stream = new MemoryStream(payload.ToArray()))
107109
{
108110
data = new GenericReader<GenericRecord>(writerSchema, writerSchema)
109111
.Read(default(GenericRecord), new BinaryDecoder(stream));
112+
}
113+
114+
string jsonString;
115+
using (var jsonStream = new MemoryStream())
116+
{
117+
GenericRecord record = data;
118+
DatumWriter<object> datumWriter = new GenericDatumWriter<object>(writerSchema);
110119

111-
string jsonString;
112-
using (var jsonStream = new MemoryStream())
113-
{
114-
GenericRecord record = data;
115-
DatumWriter<object> datumWriter = new GenericDatumWriter<object>(writerSchema);
120+
JsonEncoder encoder = new JsonEncoder(writerSchema, jsonStream);
121+
datumWriter.Write(record, encoder);
122+
encoder.Flush();
116123

117-
JsonEncoder encoder = new JsonEncoder(writerSchema, jsonStream);
118-
datumWriter.Write(record, encoder);
119-
encoder.Flush();
124+
jsonString = Encoding.UTF8.GetString(jsonStream.ToArray());
125+
}
120126

121-
jsonString = Encoding.UTF8.GetString(jsonStream.ToArray());
122-
}
127+
JToken json = JToken.Parse(jsonString);
128+
json = await ExecuteMigrations(migrations, isKey, subject, topic, headers, json)
129+
.ContinueWith(t => (JToken)t.Result)
130+
.ConfigureAwait(continueOnCapturedContext: false);
131+
readerSchemaJson = latestSchema;
132+
readerSchema = await GetParsedSchema(readerSchemaJson).ConfigureAwait(false);
133+
Avro.IO.Decoder decoder = new JsonDecoder(readerSchema, json.ToString(Formatting.None));
123134

124-
JToken json = JToken.Parse(jsonString);
125-
json = await ExecuteMigrations(migrations, isKey, subject, topic, headers, json)
126-
.ContinueWith(t => (JToken)t.Result)
127-
.ConfigureAwait(continueOnCapturedContext: false);
135+
datumReader = new GenericReader<GenericRecord>(readerSchema, readerSchema);
136+
data = datumReader.Read(default(GenericRecord), decoder);
137+
}
138+
else
139+
{
140+
if (latestSchema != null)
141+
{
128142
readerSchemaJson = latestSchema;
129143
readerSchema = await GetParsedSchema(readerSchemaJson).ConfigureAwait(false);
130-
Avro.IO.Decoder decoder = new JsonDecoder(readerSchema, json.ToString(Formatting.None));
131-
132-
datumReader = new GenericReader<GenericRecord>(readerSchema, readerSchema);
133-
data = datumReader.Read(default(GenericRecord), decoder);
134144
}
135145
else
136146
{
137-
if (latestSchema != null)
138-
{
139-
readerSchemaJson = latestSchema;
140-
readerSchema = await GetParsedSchema(readerSchemaJson).ConfigureAwait(false);
141-
}
142-
else
143-
{
144-
readerSchemaJson = writerSchemaJson;
145-
readerSchema = writerSchema;
146-
}
147-
datumReader = await GetDatumReader(writerSchema, readerSchema).ConfigureAwait(false);
147+
readerSchemaJson = writerSchemaJson;
148+
readerSchema = writerSchema;
149+
}
150+
datumReader = await GetDatumReader(writerSchema, readerSchema).ConfigureAwait(false);
151+
152+
using (var stream = new MemoryStream(payload.ToArray()))
153+
{
148154
data = datumReader.Read(default(GenericRecord), new BinaryDecoder(stream));
149155
}
150156
}

src/Confluent.SchemaRegistry.Serdes.Avro/GenericSerializerImpl.cs

Lines changed: 13 additions & 5 deletions
Original file line number | Diff line number | Diff line change
@@ -49,7 +49,7 @@ public GenericSerializerImpl(
4949
if (config.UseLatestVersion != null) { this.useLatestVersion = config.UseLatestVersion.Value; }
5050
if (config.UseLatestWithMetadata != null) { this.useLatestWithMetadata = config.UseLatestWithMetadata; }
5151
if (config.SubjectNameStrategy != null) { this.subjectNameStrategy = config.SubjectNameStrategy.Value.ToDelegate(); }
52-
if (config.SchemaIdStrategy != null) { this.schemaIdSerializer = config.SchemaIdStrategy.Value.ToSerializer(); }
52+
if (config.SchemaIdStrategy != null) { this.schemaIdEncoder = config.SchemaIdStrategy.Value.ToEncoder(); }
5353

5454
if (this.useLatestVersion && this.autoRegisterSchema)
5555
{
@@ -171,15 +171,23 @@ public async Task<byte[]> Serialize(string topic, Headers headers, GenericRecord
171171
latestSchema, data, fieldTransformer)
172172
.ConfigureAwait(continueOnCapturedContext: false);
173173
}
174+
175+
SerializationContext context = new SerializationContext(
176+
isKey ? MessageComponentType.Key : MessageComponentType.Value, topic, headers);
174177

175178
using (var stream = new MemoryStream(initialBufferSize))
176179
{
177180
new GenericWriter<GenericRecord>(writerSchema)
178181
.Write(data, new BinaryEncoder(stream));
179-
byte[] payload = stream.ToArray();
180-
SerializationContext context = new SerializationContext(
181-
isKey ? MessageComponentType.Key : MessageComponentType.Value, topic, headers);
182-
return schemaIdSerializer.Serialize(payload, context, schemaId);
182+
183+
var schemaIdSize = schemaIdEncoder.CalculateSize(ref schemaId);
184+
var serializedMessageSize = (int) stream.Length;
185+
186+
var buffer = new byte[schemaIdSize + serializedMessageSize];
187+
schemaIdEncoder.Encode(buffer, ref context, ref schemaId);
188+
stream.GetBuffer().AsSpan(0, serializedMessageSize).CopyTo(buffer.AsSpan(schemaIdSize));
189+
190+
return buffer;
183191
}
184192
}
185193
catch (AggregateException e)

src/Confluent.SchemaRegistry.Serdes.Avro/SpecificDeserializerImpl.cs

Lines changed: 51 additions & 43 deletions
Original file line number | Diff line number | Diff line change
@@ -98,19 +98,19 @@ public SpecificDeserializerImpl(
9898
if (config.UseLatestVersion != null) { this.useLatestVersion = config.UseLatestVersion.Value; }
9999
if (config.UseLatestWithMetadata != null) { this.useLatestWithMetadata = config.UseLatestWithMetadata; }
100100
if (config.SubjectNameStrategy != null) { this.subjectNameStrategy = config.SubjectNameStrategy.Value.ToDelegate(); }
101-
if (config.SchemaIdStrategy != null) { this.schemaIdDeserializer = config.SchemaIdStrategy.Value.ToDeserializer(); }
101+
if (config.SchemaIdStrategy != null) { this.schemaIdDecoder = config.SchemaIdStrategy.Value.ToDeserializer(); }
102102
}
103103

104104
public override async Task<T> DeserializeAsync(ReadOnlyMemory<byte> data, bool isNull,
105105
SerializationContext context)
106106
{
107107
return isNull
108108
? default
109-
: await Deserialize(context.Topic, context.Headers, data.ToArray(),
109+
: await Deserialize(context.Topic, context.Headers, data,
110110
context.Component == MessageComponentType.Key).ConfigureAwait(false);
111111
}
112112

113-
public async Task<T> Deserialize(string topic, Headers headers, byte[] array, bool isKey)
113+
public async Task<T> Deserialize(string topic, Headers headers, ReadOnlyMemory<byte> array, bool isKey)
114114
{
115115
try
116116
{
@@ -132,58 +132,66 @@ public async Task<T> Deserialize(string topic, Headers headers, byte[] array, bo
132132
SerializationContext context = new SerializationContext(
133133
isKey ? MessageComponentType.Key : MessageComponentType.Value, topic, headers);
134134
SchemaId writerId = new SchemaId(SchemaType.Avro);
135-
using (var stream = schemaIdDeserializer.Deserialize(array, context, ref writerId))
135+
var payload = schemaIdDecoder.Decode(array, context, ref writerId);
136+
137+
(writerSchemaJson, writerSchema) = await GetWriterSchema(subject, writerId).ConfigureAwait(false);
138+
if (subject == null)
136139
{
137-
(writerSchemaJson, writerSchema) = await GetWriterSchema(subject, writerId).ConfigureAwait(false);
138-
if (subject == null)
140+
subject = GetSubjectName(topic, isKey, writerSchema.Fullname);
141+
if (subject != null)
139142
{
140-
subject = GetSubjectName(topic, isKey, writerSchema.Fullname);
141-
if (subject != null)
142-
{
143-
latestSchema = await GetReaderSchema(subject)
144-
.ConfigureAwait(continueOnCapturedContext: false);
145-
}
146-
}
147-
148-
if (latestSchema != null)
149-
{
150-
migrations = await GetMigrations(subject, writerSchemaJson, latestSchema)
143+
latestSchema = await GetReaderSchema(subject)
151144
.ConfigureAwait(continueOnCapturedContext: false);
152145
}
146+
}
147+
148+
if (latestSchema != null)
149+
{
150+
migrations = await GetMigrations(subject, writerSchemaJson, latestSchema)
151+
.ConfigureAwait(continueOnCapturedContext: false);
152+
}
153153

154-
DatumReader<T> datumReader = null;
155-
if (migrations.Count > 0)
154+
DatumReader<T> datumReader = null;
155+
if (migrations.Count > 0)
156+
{
157+
// TODO: We should be able to write a wrapper around ReadOnlyMemory<byte> that allows us to
158+
// pass it to the BinaryDecoder without copying the data to a MemoryStream.
159+
using (var memoryStream = new MemoryStream(payload.ToArray()))
156160
{
157161
data = new GenericReader<GenericRecord>(writerSchema, writerSchema)
158-
.Read(default(GenericRecord), new BinaryDecoder(stream));
162+
.Read(default(GenericRecord), new BinaryDecoder(memoryStream));
163+
}
159164

160-
string jsonString = null;
161-
using (var jsonStream = new MemoryStream())
162-
{
163-
GenericRecord record = (GenericRecord)data;
164-
DatumWriter<object> datumWriter = new GenericDatumWriter<object>(writerSchema);
165+
string jsonString = null;
166+
using (var jsonStream = new MemoryStream())
167+
{
168+
GenericRecord record = (GenericRecord)data;
169+
DatumWriter<object> datumWriter = new GenericDatumWriter<object>(writerSchema);
165170

166-
JsonEncoder encoder = new JsonEncoder(writerSchema, jsonStream);
167-
datumWriter.Write(record, encoder);
168-
encoder.Flush();
171+
JsonEncoder encoder = new JsonEncoder(writerSchema, jsonStream);
172+
datumWriter.Write(record, encoder);
173+
encoder.Flush();
169174

170-
jsonString = Encoding.UTF8.GetString(jsonStream.ToArray());
171-
}
175+
jsonString = Encoding.UTF8.GetString(jsonStream.ToArray());
176+
}
172177

173-
JToken json = JToken.Parse(jsonString);
174-
json = await ExecuteMigrations(migrations, isKey, subject, topic, headers, json)
175-
.ContinueWith(t => (JToken)t.Result)
176-
.ConfigureAwait(continueOnCapturedContext: false);
177-
Avro.IO.Decoder decoder = new JsonDecoder(ReaderSchema, json.ToString(Formatting.None));
178+
JToken json = JToken.Parse(jsonString);
179+
json = await ExecuteMigrations(migrations, isKey, subject, topic, headers, json)
180+
.ContinueWith(t => (JToken)t.Result)
181+
.ConfigureAwait(continueOnCapturedContext: false);
182+
Avro.IO.Decoder decoder = new JsonDecoder(ReaderSchema, json.ToString(Formatting.None));
178183

179-
datumReader = new SpecificReader<T>(ReaderSchema, ReaderSchema);
180-
data = Read(datumReader, decoder);
181-
}
182-
else
183-
{
184-
datumReader = await GetDatumReader(writerSchema, ReaderSchema).ConfigureAwait(false);
185-
data = Read(datumReader, new BinaryDecoder(stream));
186-
}
184+
datumReader = new SpecificReader<T>(ReaderSchema, ReaderSchema);
185+
data = Read(datumReader, decoder);
186+
}
187+
else
188+
{
189+
datumReader = await GetDatumReader(writerSchema, ReaderSchema).ConfigureAwait(false);
190+
191+
// TODO: We should be able to write a wrapper around ReadOnlyMemory<byte> that allows us to
192+
// pass it to the BinaryDecoder without copying the data to a MemoryStream.
193+
using var stream = new MemoryStream(payload.ToArray());
194+
data = Read(datumReader, new BinaryDecoder(stream));
187195
}
188196

189197
Schema readerSchemaJson = latestSchema ?? writerSchemaJson;

src/Confluent.SchemaRegistry.Serdes.Avro/SpecificSerializerImpl.cs

Lines changed: 13 additions & 5 deletions
Original file line number | Diff line number | Diff line change
@@ -75,7 +75,7 @@ public SpecificSerializerImpl(
7575
if (config.UseLatestVersion != null) { this.useLatestVersion = config.UseLatestVersion.Value; }
7676
if (config.UseLatestWithMetadata != null) { this.useLatestWithMetadata = config.UseLatestWithMetadata; }
7777
if (config.SubjectNameStrategy != null) { this.subjectNameStrategy = config.SubjectNameStrategy.Value.ToDelegate(); }
78-
if (config.SchemaIdStrategy != null) { this.schemaIdSerializer = config.SchemaIdStrategy.Value.ToSerializer(); }
78+
if (config.SchemaIdStrategy != null) { this.schemaIdEncoder = config.SchemaIdStrategy.Value.ToEncoder(); }
7979

8080
if (this.useLatestVersion && this.autoRegisterSchema)
8181
{
@@ -214,13 +214,21 @@ public async Task<byte[]> Serialize(string topic, Headers headers, T data, bool
214214
.ConfigureAwait(continueOnCapturedContext: false);
215215
}
216216

217+
SerializationContext context = new SerializationContext(
218+
isKey ? MessageComponentType.Key : MessageComponentType.Value, topic, headers);
219+
217220
using (var stream = new MemoryStream(initialBufferSize))
218221
{
219222
currentSchemaData.AvroWriter.Write(data, new BinaryEncoder(stream));
220-
byte[] payload = stream.ToArray();
221-
SerializationContext context = new SerializationContext(
222-
isKey ? MessageComponentType.Key : MessageComponentType.Value, topic, headers);
223-
return schemaIdSerializer.Serialize(payload, context, schemaId);
223+
224+
var schemaIdSize = schemaIdEncoder.CalculateSize(ref schemaId);
225+
var serializedMessageSize = (int) stream.Length;
226+
227+
var buffer = new byte[schemaIdSize + serializedMessageSize];
228+
schemaIdEncoder.Encode(buffer, ref context, ref schemaId);
229+
stream.GetBuffer().AsSpan(0, serializedMessageSize).CopyTo(buffer.AsSpan(schemaIdSize));
230+
231+
return buffer;
224232
}
225233
}
226234
catch (AggregateException e)

0 commit comments

Comments
 (0)