Skip to content

Commit 2d291dc

Browse files
authored
DGS-21988 Fix transformation of nullable JSON props (#1469)
1 parent 0e04e67 commit 2d291dc

File tree

2 files changed

+112
-0
lines changed

2 files changed

+112
-0
lines changed

schemaregistry/serde/jsonschema/json_schema_test.go

Lines changed: 84 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -64,6 +64,25 @@ const (
6464
}
6565
}
6666
}
67+
`
68+
demoSchemaWithNullable = `
69+
{
70+
"type": "object",
71+
"properties": {
72+
"IntField": { "type": "integer" },
73+
"DoubleField": { "type": "number" },
74+
"StringField": {
75+
"type": ["string", "null"],
76+
"confluent:tags": [ "PII" ]
77+
},
78+
"BoolField": { "type": "boolean" },
79+
"BytesField": {
80+
"type": "string",
81+
"contentEncoding": "base64",
82+
"confluent:tags": [ "PII" ]
83+
}
84+
}
85+
}
6786
`
6887
demoSchemaWithUnion = `
6988
{
@@ -672,6 +691,71 @@ func TestJSONSchemaSerdeWithCELFieldTransform(t *testing.T) {
672691
serde.MaybeFail("deserialization", err, serde.Expect(&newobj, &obj2))
673692
}
674693

694+
func TestJSONSchemaSerdeWithCELFieldTransformWithNullable(t *testing.T) {
695+
serde.MaybeFail = serde.InitFailFunc(t)
696+
var err error
697+
698+
conf := schemaregistry.NewConfig("mock://")
699+
700+
client, err := schemaregistry.NewClient(conf)
701+
serde.MaybeFail("Schema Registry configuration", err)
702+
703+
serConfig := NewSerializerConfig()
704+
serConfig.AutoRegisterSchemas = false
705+
serConfig.UseLatestVersion = true
706+
ser, err := NewSerializer(client, serde.ValueSerde, serConfig)
707+
serde.MaybeFail("Serializer configuration", err)
708+
709+
encRule := schemaregistry.Rule{
710+
Name: "test-cel",
711+
Kind: "TRANSFORM",
712+
Mode: "WRITE",
713+
Type: "CEL_FIELD",
714+
Expr: "name == 'StringField' ; value + '-suffix'",
715+
}
716+
ruleSet := schemaregistry.RuleSet{
717+
DomainRules: []schemaregistry.Rule{encRule},
718+
}
719+
720+
info := schemaregistry.SchemaInfo{
721+
Schema: demoSchemaWithNullable,
722+
SchemaType: "JSON",
723+
RuleSet: &ruleSet,
724+
}
725+
726+
id, err := client.Register("topic1-value", info, false)
727+
serde.MaybeFail("Schema registration", err)
728+
if id <= 0 {
729+
t.Errorf("Expected valid schema id, found %d", id)
730+
}
731+
732+
obj := JSONDemoSchema{}
733+
obj.IntField = 123
734+
obj.DoubleField = 45.67
735+
obj.StringField = "hi"
736+
obj.BoolField = true
737+
obj.BytesField = base64.StdEncoding.EncodeToString([]byte{1, 2})
738+
739+
bytes, err := ser.Serialize("topic1", &obj)
740+
serde.MaybeFail("serialization", err)
741+
742+
deserConfig := NewDeserializerConfig()
743+
deser, err := NewDeserializer(client, serde.ValueSerde, deserConfig)
744+
serde.MaybeFail("Deserializer configuration", err)
745+
deser.Client = ser.Client
746+
747+
obj2 := JSONDemoSchema{}
748+
obj2.IntField = 123
749+
obj2.DoubleField = 45.67
750+
obj2.StringField = "hi-suffix"
751+
obj2.BoolField = true
752+
obj2.BytesField = base64.StdEncoding.EncodeToString([]byte{1, 2})
753+
754+
var newobj JSONDemoSchema
755+
err = deser.DeserializeInto("topic1", bytes, &newobj)
756+
serde.MaybeFail("deserialization", err, serde.Expect(&newobj, &obj2))
757+
}
758+
675759
func TestJSONSchemaSerdeWithCELFieldTransformWithDef(t *testing.T) {
676760
serde.MaybeFail = serde.InitFailFunc(t)
677761
var err error

schemaregistry/serde/jsonschema/json_schema_util.go

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,19 @@ func transform(ctx serde.RuleContext, schema *jsonschema2.Schema, path string, m
3333
if fieldCtx != nil {
3434
fieldCtx.Type = getType(schema)
3535
}
36+
if len(schema.Types) > 1 {
37+
originalTypes := schema.Types
38+
subschema, err := validateSubtypes(schema, msg)
39+
if err != nil {
40+
return nil, err
41+
}
42+
if subschema != nil {
43+
result, err := transform(ctx, subschema, path, msg, fieldTransform)
44+
schema.Types = originalTypes // restore original types
45+
return result, err
46+
}
47+
schema.Types = originalTypes // restore original types
48+
}
3649
if len(schema.AllOf) > 0 {
3750
subschema, err := validateSubschemas(schema.AllOf, msg)
3851
if err != nil {
@@ -182,6 +195,21 @@ func transformArray(ctx serde.RuleContext, msg *reflect.Value, sch *jsonschema2.
182195
return msg, nil
183196
}
184197

198+
func validateSubtypes(schema *jsonschema2.Schema, msg *reflect.Value) (*jsonschema2.Schema, error) {
199+
val := deref(msg)
200+
for _, typ := range schema.Types {
201+
schema.Types = []string{typ}
202+
valid, err := validate(schema, val)
203+
if err != nil {
204+
return nil, err
205+
}
206+
if valid {
207+
return schema, nil
208+
}
209+
}
210+
return nil, nil
211+
}
212+
185213
func validateSubschemas(subschemas []*jsonschema2.Schema, msg *reflect.Value) (*jsonschema2.Schema, error) {
186214
val := deref(msg)
187215
for _, subschema := range subschemas {

0 commit comments

Comments
 (0)