Skip to content

Commit 69e9b4c

Browse files
authored
Ensure proper reader schema in rules for deserializer (#2338)
* Ensure proper reader schema in rules for deserializer * Minor formatting * Add test
1 parent 2efbc4f commit 69e9b4c

File tree

4 files changed

+87
-13
lines changed

4 files changed

+87
-13
lines changed

src/Confluent.SchemaRegistry.Serdes.Avro/GenericDeserializerImpl.cs

Lines changed: 11 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -81,6 +81,8 @@ public async Task<GenericRecord> Deserialize(string topic, Headers headers, byte
8181

8282
Schema writerSchemaJson;
8383
Avro.Schema writerSchema;
84+
Schema readerSchemaJson;
85+
Avro.Schema readerSchema;
8486
GenericRecord data;
8587
IList<Migration> migrations = new List<Migration>();
8688
using (var stream = new MemoryStream(array))
@@ -133,22 +135,23 @@ public async Task<GenericRecord> Deserialize(string topic, Headers headers, byte
133135
json = await ExecuteMigrations(migrations, isKey, subject, topic, headers, json)
134136
.ContinueWith(t => (JToken)t.Result)
135137
.ConfigureAwait(continueOnCapturedContext: false);
136-
var latestSchemaAvro = await GetParsedSchema(latestSchema);
137-
Avro.IO.Decoder decoder = new JsonDecoder(latestSchemaAvro, json.ToString(Formatting.None));
138+
readerSchemaJson = latestSchema;
139+
readerSchema = await GetParsedSchema(readerSchemaJson);
140+
Avro.IO.Decoder decoder = new JsonDecoder(readerSchema, json.ToString(Formatting.None));
138141

139-
datumReader = new GenericReader<GenericRecord>(latestSchemaAvro, latestSchemaAvro);
142+
datumReader = new GenericReader<GenericRecord>(readerSchema, readerSchema);
140143
data = datumReader.Read(default(GenericRecord), decoder);
141144
}
142145
else
143146
{
144-
Avro.Schema readerSchema;
145147
if (latestSchema != null)
146148
{
147-
var latestSchemaAvro = await GetParsedSchema(latestSchema);
148-
readerSchema = latestSchemaAvro;
149+
readerSchemaJson = latestSchema;
150+
readerSchema = await GetParsedSchema(readerSchemaJson);
149151
}
150152
else
151153
{
154+
readerSchemaJson = writerSchemaJson;
152155
readerSchema = writerSchema;
153156
}
154157
datumReader = await GetDatumReader(writerSchema, readerSchema);
@@ -158,10 +161,10 @@ public async Task<GenericRecord> Deserialize(string topic, Headers headers, byte
158161

159162
FieldTransformer fieldTransformer = async (ctx, transform, message) =>
160163
{
161-
return await AvroUtils.Transform(ctx, writerSchema, message, transform).ConfigureAwait(false);
164+
return await AvroUtils.Transform(ctx, readerSchema, message, transform).ConfigureAwait(false);
162165
};
163166
data = await ExecuteRules(isKey, subject, topic, headers, RuleMode.Read, null,
164-
writerSchemaJson, data, fieldTransformer)
167+
readerSchemaJson, data, fieldTransformer)
165168
.ContinueWith(t => (GenericRecord)t.Result)
166169
.ConfigureAwait(continueOnCapturedContext: false);
167170

src/Confluent.SchemaRegistry.Serdes.Avro/SpecificDeserializerImpl.cs

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -199,12 +199,14 @@ public async Task<T> Deserialize(string topic, Headers headers, byte[] array, bo
199199
}
200200
}
201201

202-
FieldTransformer fieldTransformer = async (ctx, transform, message) =>
202+
Schema readerSchemaJson = latestSchema ?? writerSchemaJson;
203+
Avro.Schema readerSchema = latestSchema != null ? await GetParsedSchema(latestSchema) : writerSchema;
204+
FieldTransformer fieldTransformer = async (ctx, transform, message) =>
203205
{
204-
return await AvroUtils.Transform(ctx, writerSchema, message, transform).ConfigureAwait(false);
206+
return await AvroUtils.Transform(ctx, readerSchema, message, transform).ConfigureAwait(false);
205207
};
206208
data = await ExecuteRules(isKey, subject, topic, headers, RuleMode.Read, null,
207-
writerSchemaJson, data, fieldTransformer)
209+
readerSchemaJson, data, fieldTransformer)
208210
.ConfigureAwait(continueOnCapturedContext: false);
209211

210212
return (T) data;

src/Confluent.SchemaRegistry.Serdes.Json/JsonDeserializer.cs

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -270,13 +270,15 @@ public override async Task<T> DeserializeAsync(ReadOnlyMemory<byte> data, bool i
270270
}
271271
if (writerSchema != null)
272272
{
273+
Schema readerSchemaJson = latestSchema ?? writerSchema;
274+
JsonSchema readerSchema = latestSchema != null ? await GetParsedSchema(latestSchema) : writerSchemaJson;
273275
FieldTransformer fieldTransformer = async (ctx, transform, message) =>
274276
{
275-
return await JsonUtils.Transform(ctx, writerSchemaJson, "$", message, transform).ConfigureAwait(false);
277+
return await JsonUtils.Transform(ctx, readerSchema, "$", message, transform).ConfigureAwait(false);
276278
};
277279
value = await ExecuteRules(context.Component == MessageComponentType.Key, subject,
278280
context.Topic, context.Headers, RuleMode.Read, null,
279-
writerSchema, value, fieldTransformer)
281+
readerSchemaJson, value, fieldTransformer)
280282
.ContinueWith(t => (T)t.Result)
281283
.ConfigureAwait(continueOnCapturedContext: false);
282284
}

test/Confluent.SchemaRegistry.Serdes.UnitTests/SerializeDeserialize.cs

Lines changed: 67 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1307,6 +1307,73 @@ dekStore[new DekId("kek1", "topic-value", 1, DekFormat.AES256_GCM, false)] =
13071307
Assert.Equal("hello world", result["f1"]);
13081308
}
13091309

1310+
[Fact]
1311+
public void GenericRecordJSONataWithCEL()
1312+
{
1313+
var rule1To2 = "$merge([$sift($, function($v, $k) {$k != 'name'}), {'full_name': $.'name'}])";
1314+
1315+
var schemaStr = User._SCHEMA.ToString();
1316+
var schema = new RegisteredSchema("topic-value", 1, 1, schemaStr, SchemaType.Avro, null);
1317+
schema.Metadata = new Metadata(null, new Dictionary<string, string>
1318+
{
1319+
{ "application.version", "1"}
1320+
1321+
}, new HashSet<string>()
1322+
);
1323+
store[schemaStr] = 1;
1324+
var config1 = new AvroSerializerConfig
1325+
{
1326+
AutoRegisterSchemas = false,
1327+
UseLatestVersion = false,
1328+
UseLatestWithMetadata = new Dictionary<string, string>{ { "application.version", "1"} }
1329+
};
1330+
var serializer1 = new AvroSerializer<GenericRecord>(schemaRegistryClient, config1);
1331+
1332+
var user = new GenericRecord((RecordSchema) User._SCHEMA);
1333+
user.Add("name", "awesome");
1334+
user.Add("favorite_number", 100);
1335+
user.Add("favorite_color", "blue");
1336+
1337+
var newSchemaStr = NewUser._SCHEMA.ToString();
1338+
var newSchema = new RegisteredSchema("topic-value", 2, 2, newSchemaStr, SchemaType.Avro, null);
1339+
newSchema.Metadata = new Metadata(null, new Dictionary<string, string>
1340+
{
1341+
{ "application.version", "2"}
1342+
1343+
}, new HashSet<string>()
1344+
);
1345+
newSchema.RuleSet = new RuleSet(
1346+
new List<Rule>
1347+
{
1348+
new Rule("myRule1", RuleKind.Transform, RuleMode.Upgrade, "JSONATA", null,
1349+
null, rule1To2, null, null, false)
1350+
}, new List<Rule>
1351+
{
1352+
new Rule("myRule2", RuleKind.Transform, RuleMode.Read, "CEL_FIELD", null,
1353+
null, "name == 'full_name' ; value + '-suffix'", null, null, false)
1354+
}
1355+
);
1356+
var deserConfig2 = new AvroDeserializerConfig
1357+
{
1358+
UseLatestVersion = false,
1359+
UseLatestWithMetadata = new Dictionary<string, string>{ { "application.version", "2"} }
1360+
};
1361+
var deserializer2 = new AvroDeserializer<GenericRecord>(schemaRegistryClient, deserConfig2);
1362+
1363+
store[schemaStr] = 1;
1364+
store[newSchemaStr] = 2;
1365+
subjectStore["topic-value"] = new List<RegisteredSchema> { schema, newSchema };
1366+
1367+
Headers headers = new Headers();
1368+
var bytes = serializer1.SerializeAsync(user, new SerializationContext(MessageComponentType.Value, testTopic, headers)).Result;
1369+
1370+
var result2 = deserializer2.DeserializeAsync(bytes, false, new SerializationContext(MessageComponentType.Value, testTopic, headers)).Result;
1371+
1372+
Assert.Equal("awesome-suffix", result2["full_name"]);
1373+
Assert.Equal(user["favorite_color"], result2["favorite_color"]);
1374+
Assert.Equal(user["favorite_number"], result2["favorite_number"]);
1375+
}
1376+
13101377
[Fact]
13111378
public void GenericRecordJSONataFullyCompatible()
13121379
{

0 commit comments

Comments
 (0)