Skip to content

Commit 317e123

Browse files
authored
DGS-21988 Fix transformation of nullable JSON props (#359)
* DGS-21988 Fix transformation of nullable JSON props * Minor cleanup * Minor cleanup
1 parent 43f3ed9 commit 317e123

File tree

2 files changed

+149
-0
lines changed

2 files changed

+149
-0
lines changed

schemaregistry/serde/json.ts

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -332,6 +332,17 @@ async function transform(ctx: RuleContext, schema: DereferencedJSONSchema, path:
332332
if (fieldCtx != null) {
333333
fieldCtx.type = getType(schema)
334334
}
335+
if (schema.type != null && Array.isArray(schema.type) && schema.type.length > 0) {
336+
let originalType = schema.type
337+
let subschema = validateSubtypes(schema, msg)
338+
try {
339+
if (subschema != null) {
340+
return await transform(ctx, subschema, path, msg, fieldTransform)
341+
}
342+
} finally {
343+
schema.type = originalType
344+
}
345+
}
335346
if (schema.allOf != null && schema.allOf.length > 0) {
336347
let subschema = validateSubschemas(schema.allOf, msg)
337348
if (subschema != null) {
@@ -406,6 +417,25 @@ async function transformField(ctx: RuleContext, path: string, propName: string,
406417
}
407418
}
408419

420+
function validateSubtypes(schema: DereferencedJSONSchema, msg: any): DereferencedJSONSchema | null {
421+
if (typeof schema === 'boolean') {
422+
return null
423+
}
424+
if (schema.type == null || !Array.isArray(schema.type) || schema.type.length === 0) {
425+
return null
426+
}
427+
for (let typ of schema.type) {
428+
schema.type = typ
429+
try {
430+
validateJSON(msg, schema)
431+
return schema
432+
} catch (error) {
433+
// ignore
434+
}
435+
}
436+
return null
437+
}
438+
409439
function validateSubschemas(subschemas: DereferencedJSONSchema[], msg: any): DereferencedJSONSchema | null {
410440
for (let subschema of subschemas) {
411441
try {

schemaregistry/test/serde/json.spec.ts

Lines changed: 119 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,9 +25,13 @@ import {RuleRegistry} from "@confluentinc/schemaregistry/serde/rule-registry";
2525
import stringify from "json-stringify-deterministic";
2626
import {JsonataExecutor} from "@confluentinc/schemaregistry/rules/jsonata/jsonata-executor";
2727
import {clearKmsClients} from "@confluentinc/schemaregistry/rules/encryption/kms-registry";
28+
import {CelExecutor} from "../../rules/cel/cel-executor";
29+
import {CelFieldExecutor} from "../../rules/cel/cel-field-executor";
2830

2931
const encryptionExecutor = EncryptionExecutor.register()
3032
const fieldEncryptionExecutor = FieldEncryptionExecutor.register()
33+
CelExecutor.register()
34+
CelFieldExecutor.register()
3135
JsonataExecutor.register()
3236
LocalKmsDriver.register()
3337

@@ -73,6 +77,25 @@ const demoSchema = `
7377
}
7478
}
7579
`
80+
const demoSchemaWithNullable = `
81+
{
82+
"type": "object",
83+
"properties": {
84+
"intField": { "type": "integer" },
85+
"doubleField": { "type": "number" },
86+
"stringField": {
87+
"type": ["string", "null"],
88+
"confluent:tags": [ "PII" ]
89+
},
90+
"boolField": { "type": "boolean" },
91+
"bytesField": {
92+
"type": "string",
93+
"contentEncoding": "base64",
94+
"confluent:tags": [ "PII" ]
95+
}
96+
}
97+
}
98+
`
7699
const demoSchemaWithUnion = `
77100
{
78101
"type": "object",
@@ -424,6 +447,102 @@ describe('JsonSerializer', () => {
424447

425448
await expect(() => ser.serialize(topic, diffObj)).rejects.toThrow(SerializationError)
426449
})
450+
it('cel field transform', async () => {
451+
let conf: ClientConfig = {
452+
baseURLs: [baseURL],
453+
cacheCapacity: 1000
454+
}
455+
let client = SchemaRegistryClient.newClient(conf)
456+
let serConfig: JsonSerializerConfig = {
457+
useLatestVersion: true,
458+
}
459+
let ser = new JsonSerializer(client, SerdeType.VALUE, serConfig)
460+
461+
let encRule: Rule = {
462+
name: 'test-cel',
463+
kind: 'TRANSFORM',
464+
mode: RuleMode.WRITE,
465+
type: 'CEL_FIELD',
466+
expr: "name == 'stringField' ; value + '-suffix'"
467+
}
468+
let ruleSet: RuleSet = {
469+
domainRules: [encRule]
470+
}
471+
472+
let info: SchemaInfo = {
473+
schemaType: 'JSON',
474+
schema: demoSchema,
475+
ruleSet
476+
}
477+
478+
await client.register(subject, info, false)
479+
480+
let obj = {
481+
intField: 123,
482+
doubleField: 45.67,
483+
stringField: 'hi',
484+
boolField: true,
485+
bytesField: Buffer.from([0, 0, 0, 1]).toString('base64')
486+
}
487+
let bytes = await ser.serialize(topic, obj)
488+
489+
let deserConfig: JsonDeserializerConfig = {}
490+
let deser = new JsonDeserializer(client, SerdeType.VALUE, deserConfig)
491+
let obj2 = await deser.deserialize(topic, bytes)
492+
expect(obj2.intField).toEqual(obj.intField);
493+
expect(obj2.doubleField).toBeCloseTo(obj.doubleField, 0.001);
494+
expect(obj2.stringField).toEqual('hi-suffix');
495+
expect(obj2.boolField).toEqual(obj.boolField);
496+
expect(obj2.bytesField).toEqual(obj.bytesField);
497+
})
498+
it('cel field transform with nullable', async () => {
499+
let conf: ClientConfig = {
500+
baseURLs: [baseURL],
501+
cacheCapacity: 1000
502+
}
503+
let client = SchemaRegistryClient.newClient(conf)
504+
let serConfig: JsonSerializerConfig = {
505+
useLatestVersion: true,
506+
}
507+
let ser = new JsonSerializer(client, SerdeType.VALUE, serConfig)
508+
509+
let encRule: Rule = {
510+
name: 'test-cel',
511+
kind: 'TRANSFORM',
512+
mode: RuleMode.WRITE,
513+
type: 'CEL_FIELD',
514+
expr: "name == 'stringField' ; value + '-suffix'"
515+
}
516+
let ruleSet: RuleSet = {
517+
domainRules: [encRule]
518+
}
519+
520+
let info: SchemaInfo = {
521+
schemaType: 'JSON',
522+
schema: demoSchemaWithNullable,
523+
ruleSet
524+
}
525+
526+
await client.register(subject, info, false)
527+
528+
let obj = {
529+
intField: 123,
530+
doubleField: 45.67,
531+
stringField: 'hi',
532+
boolField: true,
533+
bytesField: Buffer.from([0, 0, 0, 1]).toString('base64')
534+
}
535+
let bytes = await ser.serialize(topic, obj)
536+
537+
let deserConfig: JsonDeserializerConfig = {}
538+
let deser = new JsonDeserializer(client, SerdeType.VALUE, deserConfig)
539+
let obj2 = await deser.deserialize(topic, bytes)
540+
expect(obj2.intField).toEqual(obj.intField);
541+
expect(obj2.doubleField).toBeCloseTo(obj.doubleField, 0.001);
542+
expect(obj2.stringField).toEqual('hi-suffix');
543+
expect(obj2.boolField).toEqual(obj.boolField);
544+
expect(obj2.bytesField).toEqual(obj.bytesField);
545+
})
427546
it('basic encryption', async () => {
428547
let conf: ClientConfig = {
429548
baseURLs: [baseURL],

0 commit comments

Comments
 (0)