Skip to content

Commit c173ba5

Browse files
authored
DGS-21268 Add support for full payload encryption (confluentinc#2499)
1 parent fa0f92a commit c173ba5

File tree

16 files changed

+1139
-603
lines changed

16 files changed

+1139
-603
lines changed

src/Confluent.SchemaRegistry.Encryption/EncryptionExecutor.cs

Lines changed: 595 additions & 0 deletions
Large diffs are not rendered by default.

src/Confluent.SchemaRegistry.Encryption/FieldEncryptionExecutor.cs

Lines changed: 34 additions & 508 deletions
Large diffs are not rendered by default.

src/Confluent.SchemaRegistry.Serdes.Avro/GenericDeserializerImpl.cs

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -95,6 +95,10 @@ private async Task<GenericRecord> Deserialize(string topic, Headers headers, Rea
9595
.ConfigureAwait(continueOnCapturedContext: false);
9696
}
9797
}
98+
payload = await ExecuteRules(isKey, subject, topic, headers, RulePhase.Encoding, RuleMode.Read,
99+
null, writerSchemaJson, payload, null)
100+
.ContinueWith(t => t.Result is byte[] bytes ? new ReadOnlyMemory<byte>(bytes) : (ReadOnlyMemory<byte>)t.Result)
101+
.ConfigureAwait(continueOnCapturedContext: false);
98102

99103
if (latestSchema != null)
100104
{
@@ -159,8 +163,8 @@ private async Task<GenericRecord> Deserialize(string topic, Headers headers, Rea
159163
{
160164
return await AvroUtils.Transform(ctx, readerSchema, message, transform).ConfigureAwait(false);
161165
};
162-
data = await ExecuteRules(isKey, subject, topic, headers, RuleMode.Read, null,
163-
readerSchemaJson, data, fieldTransformer)
166+
data = await ExecuteRules(isKey, subject, topic, headers, RuleMode.Read,
167+
null, readerSchemaJson, data, fieldTransformer)
164168
.ContinueWith(t => (GenericRecord)t.Result)
165169
.ConfigureAwait(continueOnCapturedContext: false);
166170

src/Confluent.SchemaRegistry.Serdes.Avro/GenericSerializerImpl.cs

Lines changed: 13 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -167,8 +167,9 @@ public async Task<byte[]> Serialize(string topic, Headers headers, GenericRecord
167167
{
168168
return await AvroUtils.Transform(ctx, writerSchema, message, transform).ConfigureAwait(false);
169169
};
170-
data = (GenericRecord) await ExecuteRules(isKey, subject, topic, headers, RuleMode.Write, null,
171-
latestSchema, data, fieldTransformer)
170+
data = await ExecuteRules(isKey, subject, topic, headers, RuleMode.Write,
171+
null, latestSchema, data, fieldTransformer)
172+
.ContinueWith(t => (GenericRecord)t.Result)
172173
.ConfigureAwait(continueOnCapturedContext: false);
173174
}
174175

@@ -180,14 +181,19 @@ public async Task<byte[]> Serialize(string topic, Headers headers, GenericRecord
180181
new GenericWriter<GenericRecord>(writerSchema)
181182
.Write(data, new BinaryEncoder(stream));
182183

184+
var buffer = await ExecuteRules(isKey, subject, topic, headers, RulePhase.Encoding, RuleMode.Write,
185+
null, latestSchema, stream.GetBuffer(), null)
186+
.ContinueWith(t => (byte[])t.Result)
187+
.ConfigureAwait(continueOnCapturedContext: false);
188+
183189
var schemaIdSize = schemaIdEncoder.CalculateSize(ref schemaId);
184-
var serializedMessageSize = (int) stream.Length;
190+
var serializedMessageSize = (int) buffer.Length;
185191

186-
var buffer = new byte[schemaIdSize + serializedMessageSize];
187-
schemaIdEncoder.Encode(buffer, ref context, ref schemaId);
188-
stream.GetBuffer().AsSpan(0, serializedMessageSize).CopyTo(buffer.AsSpan(schemaIdSize));
192+
var result = new byte[schemaIdSize + serializedMessageSize];
193+
schemaIdEncoder.Encode(result, ref context, ref schemaId);
194+
buffer.AsSpan(0, serializedMessageSize).CopyTo(result.AsSpan(schemaIdSize));
189195

190-
return buffer;
196+
return result;
191197
}
192198
}
193199
catch (AggregateException e)

src/Confluent.SchemaRegistry.Serdes.Avro/SpecificDeserializerImpl.cs

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -144,7 +144,11 @@ public async Task<T> Deserialize(string topic, Headers headers, ReadOnlyMemory<b
144144
.ConfigureAwait(continueOnCapturedContext: false);
145145
}
146146
}
147-
147+
payload = await ExecuteRules(isKey, subject, topic, headers, RulePhase.Encoding, RuleMode.Read,
148+
null, writerSchemaJson, payload, null)
149+
.ContinueWith(t => t.Result is byte[] bytes ? new ReadOnlyMemory<byte>(bytes) : (ReadOnlyMemory<byte>)t.Result)
150+
.ConfigureAwait(continueOnCapturedContext: false);
151+
148152
if (latestSchema != null)
149153
{
150154
migrations = await GetMigrations(subject, writerSchemaJson, latestSchema)
@@ -198,8 +202,8 @@ public async Task<T> Deserialize(string topic, Headers headers, ReadOnlyMemory<b
198202
{
199203
return await AvroUtils.Transform(ctx, readerSchema, message, transform).ConfigureAwait(false);
200204
};
201-
data = await ExecuteRules(isKey, subject, topic, headers, RuleMode.Read, null,
202-
readerSchemaJson, data, fieldTransformer)
205+
data = await ExecuteRules(isKey, subject, topic, headers, RuleMode.Read,
206+
null, readerSchemaJson, data, fieldTransformer)
203207
.ConfigureAwait(continueOnCapturedContext: false);
204208

205209
return (T) data;

src/Confluent.SchemaRegistry.Serdes.Avro/SpecificSerializerImpl.cs

Lines changed: 13 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -209,8 +209,9 @@ public async Task<byte[]> Serialize(string topic, Headers headers, T data, bool
209209
{
210210
return await AvroUtils.Transform(ctx, schema, message, transform).ConfigureAwait(false);
211211
};
212-
data = (T) await ExecuteRules(isKey, subject, topic, headers, RuleMode.Write, null,
213-
latestSchema, data, fieldTransformer)
212+
data = await ExecuteRules(isKey, subject, topic, headers, RuleMode.Write,
213+
null, latestSchema, data, fieldTransformer)
214+
.ContinueWith(t => (T)t.Result)
214215
.ConfigureAwait(continueOnCapturedContext: false);
215216
}
216217

@@ -221,14 +222,19 @@ public async Task<byte[]> Serialize(string topic, Headers headers, T data, bool
221222
{
222223
currentSchemaData.AvroWriter.Write(data, new BinaryEncoder(stream));
223224

225+
var buffer = await ExecuteRules(isKey, subject, topic, headers, RulePhase.Encoding, RuleMode.Write,
226+
null, latestSchema, stream.GetBuffer(), null)
227+
.ContinueWith(t => (byte[])t.Result)
228+
.ConfigureAwait(continueOnCapturedContext: false);
229+
224230
var schemaIdSize = schemaIdEncoder.CalculateSize(ref schemaId);
225-
var serializedMessageSize = (int) stream.Length;
231+
var serializedMessageSize = (int) buffer.Length;
226232

227-
var buffer = new byte[schemaIdSize + serializedMessageSize];
228-
schemaIdEncoder.Encode(buffer, ref context, ref schemaId);
229-
stream.GetBuffer().AsSpan(0, serializedMessageSize).CopyTo(buffer.AsSpan(schemaIdSize));
233+
var result = new byte[schemaIdSize + serializedMessageSize];
234+
schemaIdEncoder.Encode(result, ref context, ref schemaId);
235+
buffer.AsSpan(0, serializedMessageSize).CopyTo(result.AsSpan(schemaIdSize));
230236

231-
return buffer;
237+
return result;
232238
}
233239
}
234240
catch (AggregateException e)

src/Confluent.SchemaRegistry.Serdes.Json/JsonDeserializer.cs

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -205,6 +205,11 @@ public override async Task<T> DeserializeAsync(ReadOnlyMemory<byte> data, bool i
205205
}
206206
}
207207
}
208+
payload = await ExecuteRules(context.Component == MessageComponentType.Key,
209+
subject, context.Topic, context.Headers, RulePhase.Encoding, RuleMode.Read,
210+
null, writerSchemaJson, payload, null)
211+
.ContinueWith(t => t.Result is byte[] bytes ? new ReadOnlyMemory<byte>(bytes) : (ReadOnlyMemory<byte>)t.Result)
212+
.ConfigureAwait(continueOnCapturedContext: false);
208213

209214
if (latestSchema != null)
210215
{
@@ -269,9 +274,9 @@ public override async Task<T> DeserializeAsync(ReadOnlyMemory<byte> data, bool i
269274
{
270275
return await JsonUtils.Transform(ctx, readerSchema, "$", message, transform).ConfigureAwait(false);
271276
};
272-
value = await ExecuteRules(context.Component == MessageComponentType.Key, subject,
273-
context.Topic, context.Headers, RuleMode.Read, null,
274-
readerSchemaJson, value, fieldTransformer)
277+
value = await ExecuteRules(context.Component == MessageComponentType.Key,
278+
subject, context.Topic, context.Headers, RuleMode.Read,
279+
null, readerSchemaJson, value, fieldTransformer)
275280
.ContinueWith(t => (T)t.Result)
276281
.ConfigureAwait(continueOnCapturedContext: false);
277282
}

src/Confluent.SchemaRegistry.Serdes.Json/JsonSerializer.cs

Lines changed: 18 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -244,9 +244,10 @@ public override async Task<byte[]> SerializeAsync(T value, SerializationContext
244244
{
245245
return await JsonUtils.Transform(ctx, parsedSchema, "$", message, transform).ConfigureAwait(false);
246246
};
247-
value = (T) await ExecuteRules(context.Component == MessageComponentType.Key, subject,
248-
context.Topic, context.Headers, RuleMode.Write, null,
249-
latestSchema, value, fieldTransformer)
247+
value = await ExecuteRules(context.Component == MessageComponentType.Key,
248+
subject, context.Topic, context.Headers, RuleMode.Write,
249+
null, latestSchema, value, fieldTransformer)
250+
.ContinueWith(t => (T)t.Result)
250251
.ConfigureAwait(continueOnCapturedContext: false);
251252
}
252253
else
@@ -264,15 +265,22 @@ public override async Task<byte[]> SerializeAsync(T value, SerializationContext
264265
throw new InvalidDataException("Schema validation failed for properties: [" + string.Join(", ", validationResult.Select(r => r.Path)) + "]");
265266
}
266267
}
267-
268+
269+
var buffer = System.Text.Encoding.UTF8.GetBytes(serializedString);
270+
buffer = await ExecuteRules(context.Component == MessageComponentType.Key,
271+
subject, context.Topic, context.Headers, RulePhase.Encoding, RuleMode.Write,
272+
null, latestSchema, buffer, null)
273+
.ContinueWith(t => (byte[])t.Result)
274+
.ConfigureAwait(continueOnCapturedContext: false);
275+
268276
var schemaIdSize = schemaIdEncoder.CalculateSize(ref schemaId);
269-
var serializedMessageSize = System.Text.Encoding.UTF8.GetByteCount(serializedString);
270-
271-
var buffer = new byte[schemaIdSize + serializedMessageSize];
272-
schemaIdEncoder.Encode(buffer, ref context, ref schemaId);
273-
System.Text.Encoding.UTF8.GetBytes(serializedString).AsSpan().CopyTo(buffer.AsSpan(schemaIdSize));
277+
var serializedMessageSize = buffer.Length;
278+
279+
var result = new byte[schemaIdSize + serializedMessageSize];
280+
schemaIdEncoder.Encode(result, ref context, ref schemaId);
281+
buffer.AsSpan().CopyTo(result.AsSpan(schemaIdSize));
274282

275-
return buffer;
283+
return result;
276284
}
277285
catch (AggregateException e)
278286
{

src/Confluent.SchemaRegistry.Serdes.Protobuf/ProtobufDeserializer.cs

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -134,6 +134,12 @@ public override async Task<T> DeserializeAsync(ReadOnlyMemory<byte> data, bool i
134134
{
135135
(writerSchema, fdSet) = await GetWriterSchema(subject, writerId).ConfigureAwait(false);
136136
}
137+
payload = await ExecuteRules(context.Component == MessageComponentType.Key,
138+
subject, context.Topic, context.Headers, RulePhase.Encoding, RuleMode.Read,
139+
null, writerSchema, payload, null)
140+
.ContinueWith(t => t.Result is byte[] bytes ? new ReadOnlyMemory<byte>(bytes) : (ReadOnlyMemory<byte>)t.Result)
141+
.ConfigureAwait(continueOnCapturedContext: false);
142+
137143
message = new T();
138144
message.MergeFrom(payload.Span);
139145

@@ -143,8 +149,9 @@ public override async Task<T> DeserializeAsync(ReadOnlyMemory<byte> data, bool i
143149
{
144150
return await ProtobufUtils.Transform(ctx, fdSet, messageToTransform, transform).ConfigureAwait(false);
145151
};
146-
message = await ExecuteRules(context.Component == MessageComponentType.Key, subject, context.Topic, context.Headers, RuleMode.Read, null,
147-
writerSchema, message, fieldTransformer)
152+
message = await ExecuteRules(context.Component == MessageComponentType.Key,
153+
subject, context.Topic, context.Headers, RuleMode.Read,
154+
null, writerSchema, message, fieldTransformer)
148155
.ContinueWith(t => (T)t.Result)
149156
.ConfigureAwait(continueOnCapturedContext: false);
150157
}

src/Confluent.SchemaRegistry.Serdes.Protobuf/ProtobufSerializer.cs

Lines changed: 18 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -272,25 +272,31 @@ await RegisterOrGetReferences(value.Descriptor.File, context, autoRegisterSchema
272272
{
273273
return await ProtobufUtils.Transform(ctx, fdSet, message, transform).ConfigureAwait(false);
274274
};
275-
value = (T) await ExecuteRules(context.Component == MessageComponentType.Key, subject,
276-
context.Topic, context.Headers, RuleMode.Write, null,
277-
latestSchema, value, fieldTransformer)
275+
value = await ExecuteRules(context.Component == MessageComponentType.Key,
276+
subject, context.Topic, context.Headers, RuleMode.Write,
277+
null, latestSchema, value, fieldTransformer)
278+
.ContinueWith(t => (T)t.Result)
278279
.ConfigureAwait(continueOnCapturedContext: false);
279280
}
280-
281+
281282
schemaId.MessageIndexes = indexArray;
282283

284+
var buffer = new byte[value.CalculateSize()];
285+
value.WriteTo(buffer);
286+
buffer = await ExecuteRules(context.Component == MessageComponentType.Key,
287+
subject, context.Topic, context.Headers, RulePhase.Encoding, RuleMode.Write,
288+
null, latestSchema, buffer, null)
289+
.ContinueWith(t => (byte[])t.Result)
290+
.ConfigureAwait(continueOnCapturedContext: false);
291+
283292
var schemaIdSize = schemaIdEncoder.CalculateSize(ref schemaId);
284-
var serializedMessageSize = value.CalculateSize();
293+
var serializedMessageSize = buffer.Length;
285294

286-
var bufferSize = schemaIdSize // Schema ID size
287-
+ serializedMessageSize; // Serialized message size
288-
289-
var buffer = new byte[bufferSize];
290-
schemaIdEncoder.Encode(buffer, ref context, ref schemaId);
291-
value.WriteTo(buffer.AsSpan(schemaIdSize));
295+
var result = new byte[schemaIdSize + serializedMessageSize];
296+
schemaIdEncoder.Encode(result, ref context, ref schemaId);
297+
buffer.AsSpan().CopyTo(result.AsSpan(schemaIdSize));
292298

293-
return buffer;
299+
return result;
294300
}
295301
catch (AggregateException e)
296302
{

0 commit comments

Comments
 (0)