Skip to content

Commit 465d93a

Browse files
authored
Propertly handle wrapped Avro unions when performing field transformations (#186)
1 parent 9d172db commit 465d93a

File tree

2 files changed

+96
-5
lines changed

2 files changed

+96
-5
lines changed

schemaregistry/serde/avro.ts

Lines changed: 16 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -386,14 +386,25 @@ function resolveUnion(schema: Type, msg: any): Type | null {
386386
if (schema.typeName === 'union:unwrapped') {
387387
const union = schema as UnwrappedUnionType
388388
unionTypes = union.types.slice()
389+
if (unionTypes != null) {
390+
for (let i = 0; i < unionTypes.length; i++) {
391+
if (unionTypes[i].isValid(msg)) {
392+
return unionTypes[i]
393+
}
394+
}
395+
}
389396
} else if (schema.typeName === 'union:wrapped') {
390397
const union = schema as WrappedUnionType
391398
unionTypes = union.types.slice()
392-
}
393-
if (unionTypes != null) {
394-
for (let i = 0; i < unionTypes.length; i++) {
395-
if (unionTypes[i].isValid(msg)) {
396-
return unionTypes[i]
399+
if (typeof msg === 'object') {
400+
let keys = Object.keys(msg)
401+
if (keys.length === 1) {
402+
let name = keys[0]
403+
for (let i = 0; i < unionTypes.length; i++) {
404+
if (unionTypes[i].branchName === name) {
405+
return unionTypes[i]
406+
}
407+
}
397408
}
398409
}
399410
}

schemaregistry/test/serde/avro.spec.ts

Lines changed: 80 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -244,6 +244,31 @@ const complexSchema = `
244244
]
245245
}
246246
`
247+
const unionFieldSchema = `
248+
{
249+
"type": "record",
250+
"name": "UnionTest",
251+
"namespace": "test",
252+
"fields": [
253+
{
254+
"name": "color",
255+
"type": [
256+
"string",
257+
{
258+
"type": "enum",
259+
"name": "Color",
260+
"symbols": [
261+
"RED",
262+
"BLUE"
263+
]
264+
}
265+
],
266+
"default": "BLUE"
267+
268+
}
269+
],
270+
"version": "1"
271+
}`;
247272

248273
class FakeClock extends Clock {
249274
fixedNow: number = 0
@@ -374,6 +399,61 @@ describe('AvroSerializer', () => {
374399
expect(obj2.otherField.boolField).toEqual(nested.boolField);
375400
expect(obj2.otherField.bytesField).toEqual(nested.bytesField);
376401
})
402+
it('field with union with non-applicable rule', async () => {
403+
const conf: ClientConfig = {
404+
baseURLs: [baseURL],
405+
cacheCapacity: 1000
406+
};
407+
const client = SchemaRegistryClient.newClient(conf)
408+
const serConfig: AvroSerializerConfig = {
409+
useLatestVersion: true,
410+
ruleConfig: {
411+
secret: 'mysecret'
412+
}
413+
};
414+
const ser = new AvroSerializer(client, SerdeType.VALUE, serConfig);
415+
const dekClient = fieldEncryptionExecutor.client!;
416+
417+
const encRule: Rule = {
418+
name: 'test-encrypt',
419+
kind: 'TRANSFORM',
420+
mode: RuleMode.WRITEREAD,
421+
type: 'ENCRYPT',
422+
tags: ['PII'],
423+
params: {
424+
'encrypt.kek.name': 'kek1',
425+
'encrypt.kms.type': 'local-kms',
426+
'encrypt.kms.key.id': 'mykey',
427+
},
428+
onFailure: 'ERROR,ERROR'
429+
};
430+
const ruleSet: RuleSet = {
431+
domainRules: [encRule]
432+
};
433+
434+
const info = {
435+
schemaType: 'AVRO',
436+
schema: unionFieldSchema,
437+
ruleSet
438+
};
439+
440+
await client.register(subject, info, false);
441+
442+
const obj = {
443+
color: {"test.Color": "BLUE"}
444+
};
445+
const bytes = await ser.serialize(topic, obj);
446+
447+
const deserConfig: AvroDeserializerConfig = {
448+
ruleConfig: {
449+
secret: 'mysecret'
450+
}
451+
};
452+
const deser = new AvroDeserializer(client, SerdeType.VALUE, deserConfig);
453+
fieldEncryptionExecutor.client = dekClient;
454+
const obj2 = await deser.deserialize(topic, bytes);
455+
expect(obj2.color).toEqual(obj.color);
456+
})
377457
it('schema evolution', async () => {
378458
let conf: ClientConfig = {
379459
baseURLs: [baseURL],

0 commit comments

Comments
 (0)