diff --git a/schemaregistry/rules/encryption/encrypt-executor.ts b/schemaregistry/rules/encryption/encrypt-executor.ts index 0e718705..aca488df 100644 --- a/schemaregistry/rules/encryption/encrypt-executor.ts +++ b/schemaregistry/rules/encryption/encrypt-executor.ts @@ -33,6 +33,8 @@ const ENCRYPT_KMS_TYPE = 'encrypt.kms.type' const ENCRYPT_DEK_ALGORITHM = 'encrypt.dek.algorithm' // EncryptDekExpiryDays represents dek expiry days const ENCRYPT_DEK_EXPIRY_DAYS = 'encrypt.dek.expiry.days' +// EncryptAlternateKmsKeyIds represents alternate kms key IDs +const ENCRYPT_ALTERNATE_KMS_KEY_IDS = 'encrypt.alternate.kms.key.ids' // MillisInDay represents number of milliseconds in a day const MILLIS_IN_DAY = 24 * 60 * 60 * 1000 @@ -387,7 +389,7 @@ export class EncryptionExecutorTransform { } let encryptedDek: Buffer | null = null if (!kek.shared) { - kmsClient = getKmsClient(this.executor.config!, kek) + kmsClient = new KmsClientWrapper(this.executor.config!, kek) // Generate new dek const rawDek = this.cryptor.generateKey() encryptedDek = await kmsClient.encrypt(rawDek) @@ -407,7 +409,7 @@ export class EncryptionExecutorTransform { const keyMaterialBytes = await this.executor.client!.getDekKeyMaterialBytes(dek) if (keyMaterialBytes == null) { if (kmsClient == null) { - kmsClient = getKmsClient(this.executor.config!, kek) + kmsClient = new KmsClientWrapper(this.executor.config!, kek) } const encryptedKeyMaterialBytes = await this.executor.client!.getDekEncryptedKeyMaterialBytes(dek) const rawDek = await kmsClient.decrypt(encryptedKeyMaterialBytes!) @@ -579,8 +581,8 @@ export class EncryptionExecutorTransform { } } -function getKmsClient(config: Map, kek: Kek): KmsClient { - let keyUrl = kek.kmsType + '://' + kek.kmsKeyId +function getKmsClient(config: Map, kmsType: string, kmsKeyId: string): KmsClient { + let keyUrl = kmsType + '://' + kmsKeyId let kmsClient = Registry.getKmsClient(keyUrl) if (kmsClient == null) { let kmsDriver = Registry.getKmsDriver(keyUrl) @@ -641,3 +643,64 @@ export class FieldEncryptionExecutorTransform implements FieldTransform { } } +export class KmsClientWrapper implements KmsClient { + private config: Map + private kek: Kek + private kekId: string + private kmsKeyIds: string[] + + constructor(config: Map, kek: Kek) { + this.config = config + this.kek = kek + this.kekId = kek.kmsType + '://' + kek.kmsKeyId + this.kmsKeyIds = this.getKmsKeyIds() + } + + getKmsKeyIds(): string[] { + let kmsKeyIds = [this.kek.kmsKeyId!] + let alternateKmsKeyIds: string | undefined + if (this.kek.kmsProps != null) { + alternateKmsKeyIds = this.kek.kmsProps[ENCRYPT_ALTERNATE_KMS_KEY_IDS] + } + if (alternateKmsKeyIds == null) { + alternateKmsKeyIds = this.config.get(ENCRYPT_ALTERNATE_KMS_KEY_IDS) + } + if (alternateKmsKeyIds != null) { + kmsKeyIds = kmsKeyIds.concat(alternateKmsKeyIds.split(',').map(id => id.trim())) + } + return kmsKeyIds + } + + supported(keyUri: string): boolean { + return this.kekId === keyUri + } + + async encrypt(rawKey: Buffer): Promise { + for (let i = 0; i < this.kmsKeyIds.length; i++) { + try { + let kmsClient = getKmsClient(this.config, this.kek.kmsType!, this.kmsKeyIds[i]) + return await kmsClient.encrypt(rawKey) + } catch (e) { + if (i === this.kmsKeyIds.length - 1) { + throw new RuleError(`failed to encrypt key with all KMS keys: ${e}`) + } + } + } + throw new RuleError('no KEK found for encryption') + } + + async decrypt(encryptedKey: Buffer): Promise { + for (let i = 0; i < this.kmsKeyIds.length; i++) { + try { + let kmsClient = getKmsClient(this.config, this.kek.kmsType!, this.kmsKeyIds[i]) + return await kmsClient.decrypt(encryptedKey) + } catch (e) { + if (i === this.kmsKeyIds.length - 1) { + throw new RuleError(`failed to decrypt key with all KMS keys: ${e}`) + } + } + } + throw new RuleError('no KEK found for decryption') + } +} + diff --git a/schemaregistry/test/serde/avro.spec.ts b/schemaregistry/test/serde/avro.spec.ts index a07d8e99..7580939e 100644 --- a/schemaregistry/test/serde/avro.spec.ts +++ b/schemaregistry/test/serde/avro.spec.ts @@ -1193,6 +1193,69 @@ describe('AvroSerializer', () => { expect(obj2.boolField).toEqual(obj.boolField); expect(obj2.bytesField).toEqual(obj.bytesField); }) + it('encryption with alternate keks', async () => { + let conf: ClientConfig = { + baseURLs: [baseURL], + cacheCapacity: 1000 + } + let client = SchemaRegistryClient.newClient(conf) + let serConfig: AvroSerializerConfig = { + useLatestVersion: true, + ruleConfig: { + secret: 'mysecret', + 'encrypt.alternate.kms.key.ids': 'mykey2,mykey3' + } + } + let ser = new AvroSerializer(client, SerdeType.VALUE, serConfig) + let dekClient = encryptionExecutor.client! + + let encRule: Rule = { + name: 'test-encrypt', + kind: 'TRANSFORM', + mode: RuleMode.WRITEREAD, + type: 'ENCRYPT_PAYLOAD', + params: { + 'encrypt.kek.name': 'kek1', + 'encrypt.kms.type': 'local-kms', + 'encrypt.kms.key.id': 'mykey', + }, + onFailure: 'ERROR,NONE' + } + let ruleSet: RuleSet = { + encodingRules: [encRule] + } + + let info: SchemaInfo = { + schemaType: 'AVRO', + schema: demoSchema, + ruleSet + } + + await client.register(subject, info, false) + + let obj = { + intField: 123, + doubleField: 45.67, + stringField: 'hi', + boolField: true, + bytesField: Buffer.from([1, 2]), + } + let bytes = await ser.serialize(topic, obj) + + let deserConfig: AvroDeserializerConfig = { + ruleConfig: { + secret: 'mysecret' + } + } + let deser = new AvroDeserializer(client, SerdeType.VALUE, deserConfig) + encryptionExecutor.client = dekClient + let obj2 = await deser.deserialize(topic, bytes) + expect(obj2.intField).toEqual(obj.intField); + expect(obj2.doubleField).toBeCloseTo(obj.doubleField, 0.001); + expect(obj2.stringField).toEqual(obj.stringField); + expect(obj2.boolField).toEqual(obj.boolField); + expect(obj2.bytesField).toEqual(obj.bytesField); + }) it('deterministic encryption', async () => { let conf: ClientConfig = { baseURLs: [baseURL],