Skip to content

Commit 0836309

Browse files
authored
DGS-22343 Fix transformation of union of JSON refs (#1475)
1 parent 2d291dc commit 0836309

File tree

2 files changed

+151
-1
lines changed

2 files changed

+151
-1
lines changed

schemaregistry/serde/jsonschema/json_schema_test.go

Lines changed: 147 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -234,6 +234,75 @@ const (
234234
"title" : "Sample Event",
235235
"type" : "object"
236236
}
237+
`
238+
messageSchema = `
239+
{
240+
241+
"type": "object",
242+
"properties": {
243+
"messageType": {
244+
"type": "string"
245+
},
246+
"version": {
247+
"type": "string"
248+
},
249+
"payload": {
250+
"type": "object",
251+
"oneOf": [
252+
{
253+
"$ref": "#/$defs/authentication_request"
254+
},
255+
{
256+
"$ref": "#/$defs/authentication_status"
257+
}
258+
]
259+
}
260+
},
261+
"required": [
262+
"payload",
263+
"messageType",
264+
"version"
265+
],
266+
"$defs": {
267+
"authentication_request": {
268+
"properties": {
269+
"messageId": {
270+
"type": "string",
271+
"confluent:tags": ["PII"]
272+
},
273+
"timestamp": {
274+
"type": "integer",
275+
"minimum": 0
276+
},
277+
"requestId": {
278+
"type": "string"
279+
}
280+
},
281+
"required": [
282+
"messageId",
283+
"timestamp"
284+
]
285+
},
286+
"authentication_status": {
287+
"properties": {
288+
"messageId": {
289+
"type": "string",
290+
"confluent:tags": ["PII"]
291+
},
292+
"authType": {
293+
"type": [
294+
"string",
295+
"null"
296+
]
297+
}
298+
},
299+
"required": [
300+
"messageId",
301+
"authType"
302+
]
303+
}
304+
}
305+
}
237306
`
238307
)
239308

@@ -756,6 +825,70 @@ func TestJSONSchemaSerdeWithCELFieldTransformWithNullable(t *testing.T) {
756825
serde.MaybeFail("deserialization", err, serde.Expect(&newobj, &obj2))
757826
}
758827

828+
func TestJSONSchemaSerdeWithCELFieldTransformWithUnionOfRefs(t *testing.T) {
829+
serde.MaybeFail = serde.InitFailFunc(t)
830+
var err error
831+
832+
conf := schemaregistry.NewConfig("mock://")
833+
834+
client, err := schemaregistry.NewClient(conf)
835+
serde.MaybeFail("Schema Registry configuration", err)
836+
837+
serConfig := NewSerializerConfig()
838+
serConfig.AutoRegisterSchemas = false
839+
serConfig.UseLatestVersion = true
840+
ser, err := NewSerializer(client, serde.ValueSerde, serConfig)
841+
serde.MaybeFail("Serializer configuration", err)
842+
843+
encRule := schemaregistry.Rule{
844+
Name: "test-cel",
845+
Kind: "TRANSFORM",
846+
Mode: "WRITE",
847+
Type: "CEL_FIELD",
848+
Expr: "name == 'messageId' ; value + '-suffix'",
849+
}
850+
ruleSet := schemaregistry.RuleSet{
851+
DomainRules: []schemaregistry.Rule{encRule},
852+
}
853+
854+
info := schemaregistry.SchemaInfo{
855+
Schema: messageSchema,
856+
SchemaType: "JSON",
857+
RuleSet: &ruleSet,
858+
}
859+
860+
id, err := client.Register("topic1-value", info, false)
861+
serde.MaybeFail("Schema registration", err)
862+
if id <= 0 {
863+
t.Errorf("Expected valid schema id, found %d", id)
864+
}
865+
866+
payload := Payload{}
867+
payload.MessageID = "12345"
868+
payload.Timestamp = 12345
869+
obj := Message{}
870+
obj.MessageType = "authentication_request"
871+
obj.Version = "1.0"
872+
obj.Payload = payload
873+
874+
bytes, err := ser.Serialize("topic1", &obj)
875+
serde.MaybeFail("serialization", err)
876+
877+
deserConfig := NewDeserializerConfig()
878+
deser, err := NewDeserializer(client, serde.ValueSerde, deserConfig)
879+
serde.MaybeFail("Deserializer configuration", err)
880+
deser.Client = ser.Client
881+
882+
payload2 := Payload{}
883+
payload2.MessageID = "12345-suffix"
884+
payload2.Timestamp = 12345
885+
obj.Payload = payload2
886+
887+
var newobj Message
888+
err = deser.DeserializeInto("topic1", bytes, &newobj)
889+
serde.MaybeFail("deserialization", err, serde.Expect(&newobj, &obj))
890+
}
891+
759892
func TestJSONSchemaSerdeWithCELFieldTransformWithDef(t *testing.T) {
760893
serde.MaybeFail = serde.InitFailFunc(t)
761894
var err error
@@ -1896,3 +2029,17 @@ type JSONPerson struct {
18962029

18972030
Address Address `json:"address"`
18982031
}
2032+
2033+
type Message struct {
2034+
MessageType string `json:"messageType"`
2035+
2036+
Version string `json:"version"`
2037+
2038+
Payload Payload `json:"payload"`
2039+
}
2040+
2041+
type Payload struct {
2042+
MessageID string `json:"messageId"`
2043+
2044+
Timestamp int `json:"timestamp"`
2045+
}

schemaregistry/serde/jsonschema/json_schema_util.go

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -96,7 +96,7 @@ func transform(ctx serde.RuleContext, schema *jsonschema2.Schema, path string, m
9696
for propName, propSchema := range schema.Properties {
9797
structField, ok := fieldByNames[propName]
9898
if !ok {
99-
return nil, fmt.Errorf("json: missing field %s", propName)
99+
continue
100100
}
101101
err := transformField(ctx, path, propName, structField, val, propSchema, fieldTransform)
102102
if err != nil {
@@ -233,6 +233,9 @@ func isModernJSONSchema(draft *jsonschema2.Draft) bool {
233233
func getType(schema *jsonschema2.Schema) serde.FieldType {
234234
types := schema.Types
235235
if len(types) == 0 {
236+
if len(schema.Properties) > 0 {
237+
return serde.TypeRecord
238+
}
236239
return serde.TypeNull
237240
}
238241
if len(types) > 1 || len(schema.AllOf) > 0 || len(schema.AnyOf) > 0 || len(schema.OneOf) > 0 {

0 commit comments

Comments
 (0)