Skip to content

Commit 1f12bbe

Browse files
authored
Handle records nested in arrays/maps when searching for tags (#231)
1 parent 1abd90d commit 1f12bbe

File tree

2 files changed

+125
-22
lines changed

2 files changed

+125
-22
lines changed

schemaregistry/serde/avro.ts

Lines changed: 30 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -430,30 +430,38 @@ function getInlineTagsRecursively(ns: string, name: string, schema: any, tags: M
430430
}
431431
} else if (typeof schema === 'object') {
432432
const type = schema['type']
433-
if (type === 'record') {
434-
let recordNs = schema['namespace']
435-
let recordName = schema['name']
436-
if (recordNs === undefined) {
437-
recordNs = impliedNamespace(name)
438-
}
439-
if (recordNs == null) {
440-
recordNs = ns
441-
}
442-
if (recordNs !== '' && !recordName.startsWith(recordNs)) {
443-
recordName = recordNs + '.' + recordName
444-
}
445-
const fields = schema['fields']
446-
for (const field of fields) {
447-
const fieldTags = field['confluent:tags']
448-
const fieldName = field['name']
449-
if (fieldTags !== undefined && fieldName !== undefined) {
450-
tags.set(recordName + '.' + fieldName, new Set(fieldTags))
433+
switch (type) {
434+
case 'array':
435+
getInlineTagsRecursively(ns, name, schema['items'], tags)
436+
break;
437+
case 'map':
438+
getInlineTagsRecursively(ns, name, schema['values'], tags)
439+
break;
440+
case 'record':
441+
let recordNs = schema['namespace']
442+
let recordName = schema['name']
443+
if (recordNs === undefined) {
444+
recordNs = impliedNamespace(name)
451445
}
452-
const fieldType = field['type']
453-
if (fieldType !== undefined) {
454-
getInlineTagsRecursively(recordNs, recordName, fieldType, tags)
446+
if (recordNs == null) {
447+
recordNs = ns
455448
}
456-
}
449+
if (recordNs !== '' && !recordName.startsWith(recordNs)) {
450+
recordName = recordNs + '.' + recordName
451+
}
452+
const fields = schema['fields']
453+
for (const field of fields) {
454+
const fieldTags = field['confluent:tags']
455+
const fieldName = field['name']
456+
if (fieldTags !== undefined && fieldName !== undefined) {
457+
tags.set(recordName + '.' + fieldName, new Set(fieldTags))
458+
}
459+
const fieldType = field['type']
460+
if (fieldType !== undefined) {
461+
getInlineTagsRecursively(recordNs, recordName, fieldType, tags)
462+
}
463+
}
464+
break;
457465
}
458466
}
459467
}

schemaregistry/test/serde/avro.spec.ts

Lines changed: 95 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -270,6 +270,44 @@ const unionFieldSchema = `
270270
"version": "1"
271271
}`;
272272

273+
const complexNestedSchema = `
274+
{
275+
"type": "record",
276+
"name": "UnionTest",
277+
"namespace": "test",
278+
"fields": [
279+
{
280+
"name": "emails",
281+
"type": [
282+
"null",
283+
{
284+
"type": "array",
285+
"items": {
286+
"type": "record",
287+
"name": "Email",
288+
"fields": [
289+
{
290+
"name": "email",
291+
"type": [
292+
"null",
293+
"string"
294+
],
295+
"doc": "Email address",
296+
"default": null,
297+
"confluent:tags": [
298+
"PII"
299+
]
300+
}
301+
]
302+
}
303+
}
304+
],
305+
"doc": "Communication Email",
306+
"default": null
307+
}
308+
]
309+
}`;
310+
273311
class FakeClock extends Clock {
274312
fixedNow: number = 0
275313

@@ -1358,6 +1396,63 @@ describe('AvroSerializer', () => {
13581396
expect(obj2.mapField).toEqual({ 'key': 'world' });
13591397
expect(obj2.unionField).toEqual(null);
13601398
})
1399+
it('complex nested encryption', async () => {
1400+
let conf: ClientConfig = {
1401+
baseURLs: [baseURL],
1402+
cacheCapacity: 1000
1403+
}
1404+
let client = SchemaRegistryClient.newClient(conf)
1405+
let serConfig: AvroSerializerConfig = {
1406+
useLatestVersion: true,
1407+
ruleConfig: {
1408+
secret: 'mysecret'
1409+
}
1410+
}
1411+
let ser = new AvroSerializer(client, SerdeType.VALUE, serConfig)
1412+
let dekClient = fieldEncryptionExecutor.client!
1413+
1414+
let encRule: Rule = {
1415+
name: 'test-encrypt',
1416+
kind: 'TRANSFORM',
1417+
mode: RuleMode.WRITEREAD,
1418+
type: 'ENCRYPT',
1419+
tags: ['PII'],
1420+
params: {
1421+
'encrypt.kek.name': 'kek1',
1422+
'encrypt.kms.type': 'local-kms',
1423+
'encrypt.kms.key.id': 'mykey',
1424+
},
1425+
onFailure: 'ERROR,NONE'
1426+
}
1427+
let ruleSet: RuleSet = {
1428+
domainRules: [encRule]
1429+
}
1430+
1431+
let info: SchemaInfo = {
1432+
schemaType: 'AVRO',
1433+
schema: complexNestedSchema,
1434+
ruleSet
1435+
}
1436+
1437+
await client.register(subject, info, false)
1438+
1439+
let obj = {
1440+
emails: [ {
1441+
1442+
} ],
1443+
}
1444+
let bytes = await ser.serialize(topic, obj)
1445+
1446+
let deserConfig: AvroDeserializerConfig = {
1447+
ruleConfig: {
1448+
secret: 'mysecret'
1449+
}
1450+
}
1451+
let deser = new AvroDeserializer(client, SerdeType.VALUE, deserConfig)
1452+
fieldEncryptionExecutor.client = dekClient
1453+
let obj2 = await deser.deserialize(topic, bytes)
1454+
expect(obj2.emails[0].email).toEqual('[email protected]');
1455+
})
13611456
it('jsonata fully compatible', async () => {
13621457
let rule1To2 = "$merge([$sift($, function($v, $k) {$k != 'size'}), {'height': $.'size'}])"
13631458
let rule2To1 = "$merge([$sift($, function($v, $k) {$k != 'height'}), {'size': $.'height'}])"

0 commit comments

Comments
 (0)