Skip to content

Commit 539279f

Browse files
authored
Add tests for JSON with def and avro union with ref (#185)
1 parent 465d93a commit 539279f

File tree

3 files changed

+213
-5
lines changed

3 files changed

+213
-5
lines changed

schemaregistry/serde/avro.ts

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -83,13 +83,13 @@ export class AvroSerializer extends Serializer implements AvroSerde {
8383
}
8484
}
8585
const [id, info] = await this.getId(topic, msg, schema)
86-
let avroSchema: avro.Type
86+
let avroType: avro.Type
8787
let deps: Map<string, string>
88-
[avroSchema, deps] = await this.toType(info)
88+
[avroType, deps] = await this.toType(info)
8989
const subject = this.subjectName(topic, info)
9090
msg = await this.executeRules(
9191
subject, topic, RuleMode.WRITE, null, info, msg, getInlineTags(info, deps))
92-
const msgBytes = avroSchema.toBuffer(msg)
92+
const msgBytes = avroType.toBuffer(msg)
9393
return this.writeBytes(id, msgBytes)
9494
}
9595

schemaregistry/test/serde/avro.spec.ts

Lines changed: 107 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -366,7 +366,7 @@ describe('AvroSerializer', () => {
366366
schemaType: 'AVRO',
367367
schema: demoSchema,
368368
}
369-
await client.register('demo-value', info , false)
369+
await client.register('demo-value', info, false)
370370

371371
info = {
372372
schemaType: 'AVRO',
@@ -377,7 +377,7 @@ describe('AvroSerializer', () => {
377377
version: 1
378378
}]
379379
}
380-
await client.register(subject, info , false)
380+
await client.register(subject, info, false)
381381

382382
let nested = {
383383
intField: 123,
@@ -454,6 +454,111 @@ describe('AvroSerializer', () => {
454454
const obj2 = await deser.deserialize(topic, bytes);
455455
expect(obj2.color).toEqual(obj.color);
456456
})
457+
it('serialize reference', async () => {
458+
let conf: ClientConfig = {
459+
baseURLs: [baseURL],
460+
cacheCapacity: 1000
461+
}
462+
let client = SchemaRegistryClient.newClient(conf)
463+
let ser = new AvroSerializer(client, SerdeType.VALUE, {useLatestVersion: true})
464+
465+
let info: SchemaInfo = {
466+
schemaType: 'AVRO',
467+
schema: demoSchema,
468+
}
469+
await client.register('demo-value', info, false)
470+
471+
info = {
472+
schemaType: 'AVRO',
473+
schema: rootPointerSchema,
474+
references: [{
475+
name: 'DemoSchema',
476+
subject: 'demo-value',
477+
version: 1
478+
}]
479+
}
480+
await client.register(subject, info, false)
481+
482+
let nested = {
483+
intField: 123,
484+
doubleField: 45.67,
485+
stringField: 'hi',
486+
boolField: true,
487+
bytesField: Buffer.from([1, 2]),
488+
}
489+
let obj = {
490+
otherField: nested
491+
}
492+
let bytes = await ser.serialize(topic, obj)
493+
494+
let deser = new AvroDeserializer(client, SerdeType.VALUE, {})
495+
let obj2 = await deser.deserialize(topic, bytes)
496+
expect(obj2.otherField.intField).toEqual(nested.intField);
497+
expect(obj2.otherField.doubleField).toBeCloseTo(nested.doubleField, 0.001);
498+
expect(obj2.otherField.stringField).toEqual(nested.stringField);
499+
expect(obj2.otherField.boolField).toEqual(nested.boolField);
500+
expect(obj2.otherField.bytesField).toEqual(nested.bytesField);
501+
})
502+
it('serialize union with references', async () => {
503+
let conf: ClientConfig = {
504+
baseURLs: [baseURL],
505+
cacheCapacity: 1000
506+
}
507+
let client = SchemaRegistryClient.newClient(conf)
508+
let ser = new AvroSerializer(client, SerdeType.VALUE, {useLatestVersion: true})
509+
510+
let info: SchemaInfo = {
511+
schemaType: 'AVRO',
512+
schema: demoSchema,
513+
}
514+
await client.register('demo-value', info, false)
515+
516+
info = {
517+
schemaType: 'AVRO',
518+
schema: complexSchema,
519+
}
520+
await client.register('complex-value', info, false)
521+
522+
info = {
523+
schemaType: 'AVRO',
524+
schema: '[ "DemoSchema", "ComplexSchema" ]',
525+
references: [
526+
{
527+
name: 'DemoSchema',
528+
subject: 'demo-value',
529+
version: 1
530+
},
531+
{
532+
name: 'ComplexSchema',
533+
subject: 'complex-value',
534+
version: 1
535+
}]
536+
}
537+
await client.register(subject, info, false)
538+
539+
let obj = {
540+
intField: 123,
541+
doubleField: 45.67,
542+
stringField: 'hi',
543+
boolField: true,
544+
bytesField: Buffer.from([1, 2]),
545+
}
546+
// need to wrap union
547+
let union = { DemoSchema: obj }
548+
let bytes = await ser.serialize(topic, union)
549+
550+
let deser = new AvroDeserializer(client, SerdeType.VALUE, {})
551+
let union2 = await deser.deserialize(topic, bytes)
552+
553+
// need to unwrap union
554+
let obj2 = union2.DemoSchema
555+
556+
expect(obj2.intField).toEqual(obj.intField);
557+
expect(obj2.doubleField).toBeCloseTo(obj.doubleField, 0.001);
558+
expect(obj2.stringField).toEqual(obj.stringField);
559+
expect(obj2.boolField).toEqual(obj.boolField);
560+
expect(obj2.bytesField).toEqual(obj.bytesField);
561+
})
457562
it('schema evolution', async () => {
458563
let conf: ClientConfig = {
459564
baseURLs: [baseURL],

schemaregistry/test/serde/json.spec.ts

Lines changed: 103 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -145,6 +145,38 @@ const complexSchema = `
145145
}
146146
}
147147
`
148+
const defSchema = `
149+
{
150+
"$schema" : "http://json-schema.org/draft-07/schema#",
151+
"additionalProperties" : false,
152+
"definitions" : {
153+
"Address" : {
154+
"additionalProperties" : false,
155+
"properties" : {
156+
"doornumber" : {
157+
"type" : "integer"
158+
},
159+
"doorpin" : {
160+
"confluent:tags" : [ "PII" ],
161+
"type" : "string"
162+
}
163+
},
164+
"type" : "object"
165+
}
166+
},
167+
"properties" : {
168+
"address" : {
169+
"$ref" : "#/definitions/Address"
170+
},
171+
"name" : {
172+
"confluent:tags" : [ "PII" ],
173+
"type" : "string"
174+
}
175+
},
176+
"title" : "Sample Event",
177+
"type" : "object"
178+
}
179+
`
148180

149181
describe('JsonSerializer', () => {
150182
afterEach(async () => {
@@ -501,6 +533,77 @@ describe('JsonSerializer', () => {
501533
obj2 = await deser.deserialize(topic, bytes)
502534
expect(obj2).not.toEqual(obj);
503535
})
536+
it('encryption with def', async () => {
537+
let conf: ClientConfig = {
538+
baseURLs: [baseURL],
539+
cacheCapacity: 1000
540+
}
541+
let client = SchemaRegistryClient.newClient(conf)
542+
let serConfig: JsonSerializerConfig = {
543+
useLatestVersion: true,
544+
ruleConfig: {
545+
secret: 'mysecret'
546+
}
547+
}
548+
let ser = new JsonSerializer(client, SerdeType.VALUE, serConfig)
549+
let dekClient = fieldEncryptionExecutor.client!
550+
551+
let encRule: Rule = {
552+
name: 'test-encrypt',
553+
kind: 'TRANSFORM',
554+
mode: RuleMode.WRITEREAD,
555+
type: 'ENCRYPT',
556+
tags: ['PII'],
557+
params: {
558+
'encrypt.kek.name': 'kek1',
559+
'encrypt.kms.type': 'local-kms',
560+
'encrypt.kms.key.id': 'mykey',
561+
},
562+
onFailure: 'ERROR,NONE'
563+
}
564+
let ruleSet: RuleSet = {
565+
domainRules: [encRule]
566+
}
567+
568+
let info: SchemaInfo = {
569+
schemaType: 'JSON',
570+
schema: defSchema,
571+
ruleSet
572+
}
573+
574+
await client.register(subject, info, false)
575+
576+
let addr = {
577+
doornumber: 123,
578+
doorpin: 'hi'
579+
}
580+
let obj = {
581+
address: addr,
582+
name: 'bob'
583+
}
584+
let bytes = await ser.serialize(topic, obj)
585+
586+
// reset encrypted field
587+
obj.name = 'bob'
588+
obj.address.doorpin = 'hi'
589+
590+
let deserConfig: JsonDeserializerConfig = {
591+
ruleConfig: {
592+
secret: 'mysecret'
593+
}
594+
}
595+
let deser = new JsonDeserializer(client, SerdeType.VALUE, deserConfig)
596+
fieldEncryptionExecutor.client = dekClient
597+
let obj2 = await deser.deserialize(topic, bytes)
598+
expect(obj2).toEqual(obj)
599+
600+
clearKmsClients()
601+
let registry = new RuleRegistry()
602+
registry.registerExecutor(new FieldEncryptionExecutor())
603+
deser = new JsonDeserializer(client, SerdeType.VALUE, {}, registry)
604+
obj2 = await deser.deserialize(topic, bytes)
605+
expect(obj2).not.toEqual(obj);
606+
})
504607
it('encryption with union', async () => {
505608
let conf: ClientConfig = {
506609
baseURLs: [baseURL],

0 commit comments

Comments
 (0)