Skip to content

Commit 0e48f63

Browse files
authored
Add complex encryption tests (#89)
1 parent 7350d0a commit 0e48f63

File tree

7 files changed

+450
-24
lines changed

7 files changed

+450
-24
lines changed

schemaregistry/rest-service.ts

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@ export interface BearerAuthCredentials {
2424
//TODO: Consider retry policy, may need additional libraries on top of Axios
2525
export interface ClientConfig {
2626
baseURLs: string[],
27-
cacheCapacity: number,
27+
cacheCapacity?: number,
2828
cacheLatestTtlSecs?: number,
2929
isForward?: boolean,
3030
createAxiosDefaults?: CreateAxiosDefaults,
@@ -37,7 +37,7 @@ export class RestService {
3737
private OAuthClient?: OAuthClient;
3838
private bearerAuth: boolean = false;
3939

40-
constructor(baseURLs: string[], isForward?: boolean, axiosDefaults?: CreateAxiosDefaults,
40+
constructor(baseURLs: string[], isForward?: boolean, axiosDefaults?: CreateAxiosDefaults,
4141
bearerAuthCredentials?: BearerAuthCredentials) {
4242
this.client = axios.create(axiosDefaults);
4343
this.baseURLs = baseURLs;
@@ -53,7 +53,7 @@ export class RestService {
5353
'Confluent-Identity-Pool-Id': bearerAuthCredentials.identityPool,
5454
'target-sr-cluster': bearerAuthCredentials.schemaRegistryLogicalCluster
5555
});
56-
this.OAuthClient = new OAuthClient(bearerAuthCredentials.clientId, bearerAuthCredentials.clientSecret,
56+
this.OAuthClient = new OAuthClient(bearerAuthCredentials.clientId, bearerAuthCredentials.clientSecret,
5757
bearerAuthCredentials.tokenHost, bearerAuthCredentials.tokenPath, bearerAuthCredentials.scope);
5858
}
5959
}

schemaregistry/rules/encryption/dekregistry/dekregistry-client.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -65,7 +65,7 @@ class DekRegistryClient implements Client {
6565

6666
constructor(config: ClientConfig) {
6767
const cacheOptions = {
68-
max: config.cacheCapacity,
68+
max: config.cacheCapacity !== undefined ? config.cacheCapacity : 1000,
6969
...(config.cacheLatestTtlSecs !== undefined && { maxAge: config.cacheLatestTtlSecs * 1000 }),
7070
};
7171

schemaregistry/schemaregistry-client.ts

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -161,11 +161,11 @@ export class SchemaRegistryClient implements Client {
161161
constructor(config: ClientConfig) {
162162
this.clientConfig = config
163163
const cacheOptions = {
164-
max: config.cacheCapacity,
164+
max: config.cacheCapacity !== undefined ? config.cacheCapacity : 1000,
165165
...(config.cacheLatestTtlSecs !== undefined && { maxAge: config.cacheLatestTtlSecs * 1000 })
166166
};
167167

168-
this.restService = new RestService(config.baseURLs, config.isForward, config.createAxiosDefaults,
168+
this.restService = new RestService(config.baseURLs, config.isForward, config.createAxiosDefaults,
169169
config.bearerAuthCredentials);
170170

171171
this.schemaToIdCache = new LRUCache(cacheOptions);

schemaregistry/serde/avro.ts

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -245,9 +245,9 @@ async function transform(ctx: RuleContext, schema: Type, msg: any, fieldTransfor
245245
return await Promise.all(array.map(item => transform(ctx, arraySchema.itemsType, item, fieldTransform)))
246246
case 'map':
247247
const mapSchema = schema as MapType
248-
const map = msg as Map<string, any>
248+
const map = msg as { [key: string]: any }
249249
for (const key of Object.keys(map)) {
250-
map.set(key, await transform(ctx, mapSchema.valuesType, map.get(key), fieldTransform))
250+
map[key] = await transform(ctx, mapSchema.valuesType, map[key], fieldTransform)
251251
}
252252
return map
253253
case 'record':

schemaregistry/serde/json.ts

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -262,7 +262,8 @@ async function toType(
262262
const json = JSON.parse(info.schema)
263263
const spec = json.$schema
264264
let schema
265-
if (spec === 'http://json-schema.org/draft/2020-12/schema') {
265+
if (spec === 'http://json-schema.org/draft/2020-12/schema'
266+
|| spec === 'https://json-schema.org/draft/2020-12/schema') {
266267
schema = await dereferenceJSONSchemaDraft2020_12(json, { retrieve })
267268
} else {
268269
schema = await dereferenceJSONSchemaDraft07(json, { retrieve })
@@ -302,6 +303,7 @@ async function transform(ctx: RuleContext, schema: DereferencedJSONSchema, path:
302303
for (let i = 0; i < msg.length; i++) {
303304
msg[i] = await transform(ctx, schema.items, path, msg[i], fieldTransform)
304305
}
306+
return msg
305307
}
306308
}
307309
if (schema.$ref != null) {
@@ -355,7 +357,7 @@ async function transformField(ctx: RuleContext, path: string, propName: string,
355357
function validateSubschemas(subschemas: DereferencedJSONSchema[], msg: any): DereferencedJSONSchema | null {
356358
for (let subschema of subschemas) {
357359
try {
358-
validateJSON(subschema, msg)
360+
validateJSON(msg, subschema)
359361
return subschema
360362
} catch (error) {
361363
// ignore

test/schemaregistry/serde/avro.spec.ts

Lines changed: 204 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -190,6 +190,60 @@ const demoSchemaWithUnion = `
190190
]
191191
}
192192
`
193+
const schemaEvolution1 = `
194+
{
195+
"name": "SchemaEvolution",
196+
"type": "record",
197+
"fields": [
198+
{
199+
"name": "fieldToDelete",
200+
"type": "string"
201+
}
202+
]
203+
}
204+
`
205+
const schemaEvolution2 = `
206+
{
207+
"name": "SchemaEvolution",
208+
"type": "record",
209+
"fields": [
210+
{
211+
"name": "newOptionalField",
212+
"type": ["string", "null"],
213+
"default": "optional"
214+
}
215+
]
216+
}
217+
`
218+
const complexSchema = `
219+
{
220+
"name": "ComplexSchema",
221+
"type": "record",
222+
"fields": [
223+
{
224+
"name": "arrayField",
225+
"type": {
226+
"type": "array",
227+
"items": "string"
228+
},
229+
"confluent:tags": [ "PII" ]
230+
},
231+
{
232+
"name": "mapField",
233+
"type": {
234+
"type": "map",
235+
"values": "string"
236+
},
237+
"confluent:tags": [ "PII" ]
238+
},
239+
{
240+
"name": "unionField",
241+
"type": ["null", "string"],
242+
"confluent:tags": [ "PII" ]
243+
}
244+
]
245+
}
246+
`
193247

194248
class FakeClock extends Clock {
195249
fixedNow: number = 0
@@ -320,6 +374,38 @@ describe('AvroSerializer', () => {
320374
expect(obj2.otherField.boolField).toEqual(nested.boolField);
321375
expect(obj2.otherField.bytesField).toEqual(nested.bytesField);
322376
})
377+
it('schema evolution', async () => {
378+
let conf: ClientConfig = {
379+
baseURLs: [baseURL],
380+
cacheCapacity: 1000
381+
}
382+
let client = SchemaRegistryClient.newClient(conf)
383+
let ser = new AvroSerializer(client, SerdeType.VALUE, {useLatestVersion: true})
384+
385+
let obj = {
386+
fieldToDelete: "bye",
387+
}
388+
let info: SchemaInfo = {
389+
schemaType: 'AVRO',
390+
schema: schemaEvolution1,
391+
}
392+
393+
await client.register(subject, info, false)
394+
395+
let bytes = await ser.serialize(topic, obj)
396+
397+
info = {
398+
schemaType: 'AVRO',
399+
schema: schemaEvolution2,
400+
}
401+
402+
await client.register(subject, info, false)
403+
404+
let deser = new AvroDeserializer(client, SerdeType.VALUE, {useLatestVersion: true})
405+
let obj2 = await deser.deserialize(topic, bytes)
406+
expect(obj2.fieldToDelete).toEqual(undefined);
407+
expect(obj2.newOptionalField).toEqual("optional");
408+
})
323409
it('basic encryption', async () => {
324410
let conf: ClientConfig = {
325411
baseURLs: [baseURL],
@@ -876,6 +962,124 @@ describe('AvroSerializer', () => {
876962
expect(obj2.boolField).toEqual(obj.boolField);
877963
expect(obj2.bytesField).toEqual(obj.bytesField);
878964
})
965+
it('complex encryption', async () => {
966+
let conf: ClientConfig = {
967+
baseURLs: [baseURL],
968+
cacheCapacity: 1000
969+
}
970+
let client = SchemaRegistryClient.newClient(conf)
971+
let serConfig: AvroSerializerConfig = {
972+
useLatestVersion: true,
973+
ruleConfig: {
974+
secret: 'mysecret'
975+
}
976+
}
977+
let ser = new AvroSerializer(client, SerdeType.VALUE, serConfig)
978+
let dekClient = fieldEncryptionExecutor.client!
979+
980+
let encRule: Rule = {
981+
name: 'test-encrypt',
982+
kind: 'TRANSFORM',
983+
mode: RuleMode.WRITEREAD,
984+
type: 'ENCRYPT',
985+
tags: ['PII'],
986+
params: {
987+
'encrypt.kek.name': 'kek1',
988+
'encrypt.kms.type': 'local-kms',
989+
'encrypt.kms.key.id': 'mykey',
990+
},
991+
onFailure: 'ERROR,NONE'
992+
}
993+
let ruleSet: RuleSet = {
994+
domainRules: [encRule]
995+
}
996+
997+
let info: SchemaInfo = {
998+
schemaType: 'AVRO',
999+
schema: complexSchema,
1000+
ruleSet
1001+
}
1002+
1003+
await client.register(subject, info, false)
1004+
1005+
let obj = {
1006+
arrayField: [ 'hello' ],
1007+
mapField: { 'key': 'world' },
1008+
unionField: 'bye',
1009+
}
1010+
let bytes = await ser.serialize(topic, obj)
1011+
1012+
let deserConfig: AvroDeserializerConfig = {
1013+
ruleConfig: {
1014+
secret: 'mysecret'
1015+
}
1016+
}
1017+
let deser = new AvroDeserializer(client, SerdeType.VALUE, deserConfig)
1018+
fieldEncryptionExecutor.client = dekClient
1019+
let obj2 = await deser.deserialize(topic, bytes)
1020+
expect(obj2.arrayField).toEqual([ 'hello' ]);
1021+
expect(obj2.mapField).toEqual({ 'key': 'world' });
1022+
expect(obj2.unionField).toEqual('bye');
1023+
})
1024+
it('complex encryption with null', async () => {
1025+
let conf: ClientConfig = {
1026+
baseURLs: [baseURL],
1027+
cacheCapacity: 1000
1028+
}
1029+
let client = SchemaRegistryClient.newClient(conf)
1030+
let serConfig: AvroSerializerConfig = {
1031+
useLatestVersion: true,
1032+
ruleConfig: {
1033+
secret: 'mysecret'
1034+
}
1035+
}
1036+
let ser = new AvroSerializer(client, SerdeType.VALUE, serConfig)
1037+
let dekClient = fieldEncryptionExecutor.client!
1038+
1039+
let encRule: Rule = {
1040+
name: 'test-encrypt',
1041+
kind: 'TRANSFORM',
1042+
mode: RuleMode.WRITEREAD,
1043+
type: 'ENCRYPT',
1044+
tags: ['PII'],
1045+
params: {
1046+
'encrypt.kek.name': 'kek1',
1047+
'encrypt.kms.type': 'local-kms',
1048+
'encrypt.kms.key.id': 'mykey',
1049+
},
1050+
onFailure: 'ERROR,NONE'
1051+
}
1052+
let ruleSet: RuleSet = {
1053+
domainRules: [encRule]
1054+
}
1055+
1056+
let info: SchemaInfo = {
1057+
schemaType: 'AVRO',
1058+
schema: complexSchema,
1059+
ruleSet
1060+
}
1061+
1062+
await client.register(subject, info, false)
1063+
1064+
let obj = {
1065+
arrayField: [ 'hello' ],
1066+
mapField: { 'key': 'world' },
1067+
unionField: null
1068+
}
1069+
let bytes = await ser.serialize(topic, obj)
1070+
1071+
let deserConfig: AvroDeserializerConfig = {
1072+
ruleConfig: {
1073+
secret: 'mysecret'
1074+
}
1075+
}
1076+
let deser = new AvroDeserializer(client, SerdeType.VALUE, deserConfig)
1077+
fieldEncryptionExecutor.client = dekClient
1078+
let obj2 = await deser.deserialize(topic, bytes)
1079+
expect(obj2.arrayField).toEqual([ 'hello' ]);
1080+
expect(obj2.mapField).toEqual({ 'key': 'world' });
1081+
expect(obj2.unionField).toEqual(null);
1082+
})
8791083
it('jsonata fully compatible', async () => {
8801084
let rule1To2 = "$merge([$sift($, function($v, $k) {$k != 'size'}), {'height': $.'size'}])"
8811085
let rule2To1 = "$merge([$sift($, function($v, $k) {$k != 'height'}), {'size': $.'height'}])"

0 commit comments

Comments
 (0)