Skip to content

Commit 3ae9609

Browse files
authored
DGS-22343 Fix transformation of union of JSON refs (#375)
* DGS-22343 Fix transformation of union of JSON refs * Minor cleanup
1 parent 749116f commit 3ae9609

File tree

2 files changed

+128
-6
lines changed

2 files changed

+128
-6
lines changed

schemaregistry/serde/json.ts

Lines changed: 11 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -404,13 +404,15 @@ async function transformField(ctx: RuleContext, path: string, propName: string,
404404
try {
405405
ctx.enterField(msg, fullName, propName, getType(propSchema), getInlineTags(propSchema))
406406
let value = msg[propName]
407-
const newVal = await transform(ctx, propSchema, fullName, value, fieldTransform)
408-
if (ctx.rule.kind === 'CONDITION') {
409-
if (newVal === false) {
410-
throw new RuleConditionError(ctx.rule)
407+
if (value != null) {
408+
const newVal = await transform(ctx, propSchema, fullName, value, fieldTransform)
409+
if (ctx.rule.kind === 'CONDITION') {
410+
if (newVal === false) {
411+
throw new RuleConditionError(ctx.rule)
412+
}
413+
} else {
414+
msg[propName] = newVal
411415
}
412-
} else {
413-
msg[propName] = newVal
414416
}
415417
} finally {
416418
ctx.leaveField()
@@ -453,6 +455,9 @@ function getType(schema: DereferencedJSONSchema): FieldType {
453455
return FieldType.NULL
454456
}
455457
if (schema.type == null) {
458+
if (schema.properties != null && Object.keys(schema.properties).length > 0) {
459+
return FieldType.RECORD
460+
}
456461
return FieldType.NULL
457462
}
458463
if (Array.isArray(schema.type)) {

schemaregistry/test/serde/json.spec.ts

Lines changed: 117 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -206,6 +206,75 @@ const defSchema = `
206206
"type" : "object"
207207
}
208208
`
209+
const messageSchema = `
210+
{
211+
212+
"type": "object",
213+
"properties": {
214+
"messageType": {
215+
"type": "string"
216+
},
217+
"version": {
218+
"type": "string"
219+
},
220+
"payload": {
221+
"type": "object",
222+
"oneOf": [
223+
{
224+
"$ref": "#/$defs/authentication_request"
225+
},
226+
{
227+
"$ref": "#/$defs/authentication_status"
228+
}
229+
]
230+
}
231+
},
232+
"required": [
233+
"payload",
234+
"messageType",
235+
"version"
236+
],
237+
"$defs": {
238+
"authentication_request": {
239+
"properties": {
240+
"messageId": {
241+
"type": "string",
242+
"confluent:tags": ["PII"]
243+
},
244+
"timestamp": {
245+
"type": "integer",
246+
"minimum": 0
247+
},
248+
"requestId": {
249+
"type": "string"
250+
}
251+
},
252+
"required": [
253+
"messageId",
254+
"timestamp"
255+
]
256+
},
257+
"authentication_status": {
258+
"properties": {
259+
"messageId": {
260+
"type": "string",
261+
"confluent:tags": ["PII"]
262+
},
263+
"authType": {
264+
"type": [
265+
"string",
266+
"null"
267+
]
268+
}
269+
},
270+
"required": [
271+
"messageId",
272+
"authType"
273+
]
274+
}
275+
}
276+
}
277+
`
209278

210279
describe('JsonSerializer', () => {
211280
afterEach(async () => {
@@ -543,6 +612,54 @@ describe('JsonSerializer', () => {
543612
expect(obj2.boolField).toEqual(obj.boolField);
544613
expect(obj2.bytesField).toEqual(obj.bytesField);
545614
})
615+
it('cel field transform with union of refs', async () => {
616+
let conf: ClientConfig = {
617+
baseURLs: [baseURL],
618+
cacheCapacity: 1000
619+
}
620+
let client = SchemaRegistryClient.newClient(conf)
621+
let serConfig: JsonSerializerConfig = {
622+
useLatestVersion: true,
623+
}
624+
let ser = new JsonSerializer(client, SerdeType.VALUE, serConfig)
625+
626+
let encRule: Rule = {
627+
name: 'test-cel',
628+
kind: 'TRANSFORM',
629+
mode: RuleMode.WRITE,
630+
type: 'CEL_FIELD',
631+
expr: "name == 'messageId' ; value + '-suffix'"
632+
}
633+
let ruleSet: RuleSet = {
634+
domainRules: [encRule]
635+
}
636+
637+
let info: SchemaInfo = {
638+
schemaType: 'JSON',
639+
schema: messageSchema,
640+
ruleSet
641+
}
642+
643+
await client.register(subject, info, false)
644+
645+
let obj = {
646+
messageType: 'authentication_request',
647+
version: '1.0',
648+
payload: {
649+
messageId: '12345',
650+
timestamp: 123456789
651+
}
652+
}
653+
let bytes = await ser.serialize(topic, obj)
654+
655+
let deserConfig: JsonDeserializerConfig = {}
656+
let deser = new JsonDeserializer(client, SerdeType.VALUE, deserConfig)
657+
let obj2 = await deser.deserialize(topic, bytes)
658+
expect(obj2.messageType).toEqual(obj.messageType);
659+
expect(obj2.version).toEqual(obj.version);
660+
expect(obj2.payload.messageId).toEqual('12345-suffix');
661+
expect(obj2.payload.timestamp).toEqual(obj.payload.timestamp);
662+
})
546663
it('basic encryption', async () => {
547664
let conf: ClientConfig = {
548665
baseURLs: [baseURL],

0 commit comments

Comments
 (0)