Skip to content

Commit cefe511

Browse files
authored
DGS-22343 Fix transformation of union of JSON refs (#2525)
* DGS-22343 Fix transformation of union of JSON refs * Minor cleanup
1 parent abde168 commit cefe511

File tree

2 files changed

+148
-3
lines changed

2 files changed

+148
-3
lines changed

src/Confluent.SchemaRegistry.Serdes.Json/JsonUtils.cs

Lines changed: 24 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -77,7 +77,20 @@ public static async Task<object> Transform(RuleContext ctx, JsonSchema schema, s
7777
if (schema.AllOf.Count > 0 || schema.AnyOf.Count > 0 || schema.OneOf.Count > 0)
7878
{
7979
JToken jsonObject = JToken.FromObject(message);
80-
foreach (JsonSchema subschema in schema.AllOf)
80+
ICollection<JsonSchema> subschemas;
81+
if (schema.AllOf.Count > 0)
82+
{
83+
subschemas = schema.AllOf;
84+
}
85+
else if (schema.AnyOf.Count > 0)
86+
{
87+
subschemas = schema.AnyOf;
88+
}
89+
else
90+
{
91+
subschemas = schema.OneOf;
92+
}
93+
foreach (JsonSchema subschema in subschemas)
8194
{
8295
var validator = new JsonSchemaValidator();
8396
var errors = validator.Validate(jsonObject, subschema);
@@ -105,14 +118,22 @@ public static async Task<object> Transform(RuleContext ctx, JsonSchema schema, s
105118
Transform(ctx, subschema, path + '[' + index + ']', elem, fieldTransform);
106119
return await Utils.TransformEnumerableAsync(message, transformer).ConfigureAwait(false);
107120
}
108-
else if (schema.IsObject)
121+
else if (schema.IsObject || schema.Properties.Count > 0)
109122
{
110123
foreach (var it in schema.Properties)
111124
{
112125
string fullName = path + '.' + it.Key;
113126
using (ctx.EnterField(message, fullName, it.Key, GetType(it.Value), GetInlineTags(it.Value)))
114127
{
115-
FieldAccessor fieldAccessor = new FieldAccessor(message.GetType(), it.Key);
128+
FieldAccessor fieldAccessor;
129+
try
130+
{
131+
fieldAccessor = new FieldAccessor(message.GetType(), it.Key);
132+
}
133+
catch (ArgumentException)
134+
{
135+
continue;
136+
}
116137
object value = fieldAccessor.GetFieldValue(message);
117138
object newValue = await Transform(ctx, it.Value, fullName, value, fieldTransform).ConfigureAwait(false);
118139
if (ctx.Rule.Kind == RuleKind.Condition)

test/Confluent.SchemaRegistry.Serdes.UnitTests/JsonSerializeDeserialize.cs

Lines changed: 124 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -757,6 +757,111 @@ public void CELFieldTransformWithNullable()
757757
Assert.Equal(user.FavoriteNumber, result.FavoriteNumber);
758758
}
759759

760+
[Fact]
761+
public void CELFieldTransformWithUnionOfRefs()
762+
{
763+
var schemaStr = @"{
764+
""type"": ""object"",
765+
""properties"": {
766+
""messageType"": {
767+
""type"": ""string""
768+
},
769+
""version"": {
770+
""type"": ""string""
771+
},
772+
""payload"": {
773+
""type"": ""object"",
774+
""oneOf"": [
775+
{
776+
""$ref"": ""#/$defs/authentication_request""
777+
},
778+
{
779+
""$ref"": ""#/$defs/authentication_status""
780+
}
781+
]
782+
}
783+
},
784+
""required"": [
785+
""payload"",
786+
""messageType"",
787+
""version""
788+
],
789+
""$defs"": {
790+
""authentication_request"": {
791+
""properties"": {
792+
""messageId"": {
793+
""type"": ""string"",
794+
""confluent:tags"": [""PII""]
795+
},
796+
""timestamp"": {
797+
""type"": ""integer"",
798+
""minimum"": 0
799+
},
800+
""requestId"": {
801+
""type"": ""string""
802+
}
803+
},
804+
""required"": [
805+
""messageId"",
806+
""timestamp""
807+
]
808+
},
809+
""authentication_status"": {
810+
""properties"": {
811+
""messageId"": {
812+
""type"": ""string"",
813+
""confluent:tags"": [""PII""]
814+
},
815+
""authType"": {
816+
""type"": [
817+
""string"",
818+
""null""
819+
]
820+
}
821+
},
822+
""required"": [
823+
""messageId"",
824+
""authType""
825+
]
826+
}
827+
}
828+
}";
829+
var schema = new RegisteredSchema("topic-value", 1, 1, schemaStr, SchemaType.Json, null);
830+
schema.RuleSet = new RuleSet(new List<Rule>(),
831+
new List<Rule>
832+
{
833+
new Rule("testCEL", RuleKind.Transform, RuleMode.Write, "CEL_FIELD", null, null,
834+
"typeName == 'STRING' ; value + '-suffix'", null, null, false)
835+
}
836+
);
837+
store[schemaStr] = 1;
838+
subjectStore["topic-value"] = new List<RegisteredSchema> { schema };
839+
var config = new JsonSerializerConfig
840+
{
841+
AutoRegisterSchemas = false,
842+
UseLatestVersion = true
843+
};
844+
var serializer = new JsonSerializer<Message>(schemaRegistryClient, config);
845+
var deserializer = new JsonDeserializer<Message>(schemaRegistryClient);
846+
847+
var msg = new Message
848+
{
849+
MessageType = "authentication_request",
850+
Version = "1.0",
851+
Payload = new Payload
852+
{
853+
MessageId = "12345",
854+
Timestamp = 1757410647
855+
}
856+
};
857+
858+
Headers headers = new Headers();
859+
var bytes = serializer.SerializeAsync(msg, new SerializationContext(MessageComponentType.Value, testTopic, headers)).Result;
860+
var result = deserializer.DeserializeAsync(bytes, false, new SerializationContext(MessageComponentType.Value, testTopic, headers)).Result;
861+
862+
Assert.Equal("12345-suffix", result.Payload.MessageId);
863+
}
864+
760865
[Fact]
761866
public void CELFieldTransformWithDef()
762867
{
@@ -1404,4 +1509,23 @@ class Address
14041509
public int DoorNumber { get; set; }
14051510
public string DoorPin { get; set; }
14061511
}
1512+
1513+
class Message
1514+
{
1515+
[JsonProperty("messageType")]
1516+
public string MessageType { get; set; }
1517+
[JsonProperty("version")]
1518+
public string Version { get; set; }
1519+
[JsonProperty("payload")]
1520+
public Payload Payload { get; set; }
1521+
}
1522+
1523+
class Payload
1524+
{
1525+
[JsonProperty("messageId")]
1526+
public string MessageId { get; set; }
1527+
[JsonProperty("timestamp")]
1528+
public int Timestamp { get; set; }
1529+
}
1530+
14071531
}

0 commit comments

Comments
 (0)