Skip to content

Commit 86993ac

Browse files
authored
Refactor encryption executor (#347)
1 parent 18d3e21 commit 86993ac

File tree

9 files changed

+374
-88
lines changed

9 files changed

+374
-88
lines changed

schemaregistry/rules/encryption/encrypt-executor.ts

Lines changed: 81 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ import {
66
MAGIC_BYTE_V0,
77
RuleContext,
88
RuleError,
9+
RuleExecutor,
910
} from "../../serde/serde";
1011
import {RuleMode,} from "../../schemaregistry-client";
1112
import {DekClient, Dek, DekRegistryClient, Kek} from "./dekregistry/dekregistry-client";
@@ -61,29 +62,29 @@ export class Clock {
6162
}
6263
}
6364

64-
export class FieldEncryptionExecutor extends FieldRuleExecutor {
65+
export class EncryptionExecutor implements RuleExecutor {
66+
config: Map<string, string> | null = null
6567
client: DekClient | null = null
6668
clock: Clock
6769

6870
/**
6971
* Register the field encryption executor with the rule registry.
7072
*/
71-
static register(): FieldEncryptionExecutor {
73+
static register(): EncryptionExecutor {
7274
return this.registerWithClock(new Clock())
7375
}
7476

75-
static registerWithClock(clock: Clock): FieldEncryptionExecutor {
76-
const executor = new FieldEncryptionExecutor(clock)
77+
static registerWithClock(clock: Clock): EncryptionExecutor {
78+
const executor = new EncryptionExecutor(clock)
7779
RuleRegistry.registerRuleExecutor(executor)
7880
return executor
7981
}
8082

8183
constructor(clock: Clock = new Clock()) {
82-
super()
8384
this.clock = clock
8485
}
8586

86-
override configure(clientConfig: ClientConfig, config: Map<string, string>) {
87+
configure(clientConfig: ClientConfig, config: Map<string, string>) {
8788
if (this.client != null) {
8889
if (!deepEqual(this.client.config(), clientConfig)) {
8990
throw new RuleError('executor already configured')
@@ -110,20 +111,23 @@ export class FieldEncryptionExecutor extends FieldRuleExecutor {
110111
}
111112
}
112113

113-
override type(): string {
114-
return 'ENCRYPT'
114+
type(): string {
115+
return 'ENCRYPT_PAYLOAD'
115116
}
116117

117-
override newTransform(ctx: RuleContext): FieldTransform {
118+
async transform(ctx: RuleContext, msg: any): Promise<any> {
119+
const transform = this.newTransform(ctx)
120+
return await transform.transform(ctx, FieldType.BYTES, msg)
121+
}
122+
123+
newTransform(ctx: RuleContext): EncryptionExecutorTransform {
118124
const cryptor = this.getCryptor(ctx)
119125
const kekName = this.getKekName(ctx)
120126
const dekExpiryDays = this.getDekExpiryDays(ctx)
121-
const transform =
122-
new FieldEncryptionExecutorTransform(this, cryptor, kekName, dekExpiryDays)
123-
return transform
127+
return new EncryptionExecutorTransform(this, cryptor, kekName, dekExpiryDays)
124128
}
125129

126-
override async close(): Promise<void> {
130+
async close(): Promise<void> {
127131
if (this.client != null) {
128132
await this.client.close()
129133
}
@@ -135,8 +139,7 @@ export class FieldEncryptionExecutor extends FieldRuleExecutor {
135139
if (dekAlgorithmStr != null) {
136140
dekAlgorithm = DekFormat[dekAlgorithmStr as keyof typeof DekFormat]
137141
}
138-
const cryptor = new Cryptor(dekAlgorithm)
139-
return cryptor
142+
return new Cryptor(dekAlgorithm)
140143
}
141144

142145
private getKekName(ctx: RuleContext): string {
@@ -269,15 +272,15 @@ export class Cryptor {
269272
}
270273
}
271274

272-
export class FieldEncryptionExecutorTransform implements FieldTransform {
273-
private executor: FieldEncryptionExecutor
275+
export class EncryptionExecutorTransform {
276+
private executor: EncryptionExecutor
274277
private cryptor: Cryptor
275278
private kekName: string
276279
private kek: Kek | null = null
277280
private dekExpiryDays: number
278281

279282
constructor(
280-
executor: FieldEncryptionExecutor,
283+
executor: EncryptionExecutor,
281284
cryptor: Cryptor,
282285
kekName: string,
283286
dekExpiryDays: number,
@@ -481,15 +484,15 @@ export class FieldEncryptionExecutorTransform implements FieldTransform {
481484
(now - dek.ts!) / MILLIS_IN_DAY >= this.dekExpiryDays
482485
}
483486

484-
async transform(ctx: RuleContext, fieldCtx: FieldContext, fieldValue: any): Promise<any> {
487+
async transform(ctx: RuleContext, fieldType: FieldType, fieldValue: any): Promise<any> {
485488
if (fieldValue == null) {
486489
return null
487490
}
488491
switch (ctx.ruleMode) {
489492
case RuleMode.WRITE: {
490-
let plaintext = this.toBytes(fieldCtx.type, fieldValue)
493+
let plaintext = this.toBytes(fieldType, fieldValue)
491494
if (plaintext == null) {
492-
throw new RuleError(`type ${fieldCtx.type} not supported for encryption`)
495+
throw new RuleError(`type ${fieldType} not supported for encryption`)
493496
}
494497
let version: number | null = null
495498
if (this.isDekRotated()) {
@@ -501,18 +504,18 @@ export class FieldEncryptionExecutorTransform implements FieldTransform {
501504
if (this.isDekRotated()) {
502505
ciphertext = this.prefixVersion(dek.version!, ciphertext)
503506
}
504-
if (fieldCtx.type === FieldType.STRING) {
507+
if (fieldType === FieldType.STRING) {
505508
return ciphertext.toString('base64')
506509
} else {
507-
return this.toObject(fieldCtx.type, ciphertext)
510+
return this.toObject(fieldType, ciphertext)
508511
}
509512
}
510513
case RuleMode.READ: {
511514
let ciphertext
512-
if (fieldCtx.type === FieldType.STRING) {
515+
if (fieldType === FieldType.STRING) {
513516
ciphertext = Buffer.from(fieldValue, 'base64')
514517
} else {
515-
ciphertext = this.toBytes(fieldCtx.type, fieldValue)
518+
ciphertext = this.toBytes(fieldType, fieldValue)
516519
}
517520
if (ciphertext == null) {
518521
return fieldValue
@@ -528,7 +531,7 @@ export class FieldEncryptionExecutorTransform implements FieldTransform {
528531
let dek = await this.getOrCreateDek(ctx, version)
529532
let keyMaterialBytes = await this.executor.client!.getDekKeyMaterialBytes(dek)
530533
let plaintext = await this.cryptor.decrypt(keyMaterialBytes!, ciphertext)
531-
return this.toObject(fieldCtx.type, plaintext)
534+
return this.toObject(fieldType, plaintext)
532535
}
533536
default:
534537
throw new RuleError(`unsupported rule mode ${ctx.ruleMode}`)
@@ -586,3 +589,55 @@ function getKmsClient(config: Map<string, string>, kek: Kek): KmsClient {
586589
}
587590
return kmsClient
588591
}
592+
593+
export class FieldEncryptionExecutor extends FieldRuleExecutor {
594+
executor: EncryptionExecutor
595+
596+
/**
597+
* Register the field encryption executor with the rule registry.
598+
*/
599+
static register(): FieldEncryptionExecutor {
600+
return this.registerWithClock(new Clock())
601+
}
602+
603+
static registerWithClock(clock: Clock): FieldEncryptionExecutor {
604+
const executor = new FieldEncryptionExecutor(clock)
605+
RuleRegistry.registerRuleExecutor(executor)
606+
return executor
607+
}
608+
609+
constructor(clock: Clock = new Clock()) {
610+
super()
611+
this.executor = new EncryptionExecutor(clock)
612+
}
613+
614+
override configure(clientConfig: ClientConfig, config: Map<string, string>) {
615+
this.executor.configure(clientConfig, config)
616+
}
617+
618+
override type(): string {
619+
return 'ENCRYPT'
620+
}
621+
622+
override newTransform(ctx: RuleContext): FieldTransform {
623+
const executorTransform = this.executor.newTransform(ctx)
624+
return new FieldEncryptionExecutorTransform(executorTransform)
625+
}
626+
627+
override async close(): Promise<void> {
628+
return this.executor.close()
629+
}
630+
}
631+
632+
export class FieldEncryptionExecutorTransform implements FieldTransform {
633+
private executorTransform: EncryptionExecutorTransform
634+
635+
constructor(executorTransform: EncryptionExecutorTransform) {
636+
this.executorTransform = executorTransform
637+
}
638+
639+
async transform(ctx: RuleContext, fieldCtx: FieldContext, fieldValue: any): Promise<any> {
640+
return await this.executorTransform.transform(ctx, fieldCtx.type, fieldValue)
641+
}
642+
}
643+

schemaregistry/schemaregistry-client.ts

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,12 @@ export interface Rule {
4646
disabled?: boolean
4747
}
4848

49+
export enum RulePhase {
50+
MIGRATION = 'MIGRATION',
51+
DOMAIN = 'DOMAIN',
52+
ENCODING = 'ENCODING',
53+
}
54+
4955
export enum RuleMode {
5056
UPGRADE = 'UPGRADE',
5157
DOWNGRADE = 'DOWNGRADE',
@@ -111,6 +117,7 @@ export interface Metadata {
111117
export interface RuleSet {
112118
migrationRules?: Rule[];
113119
domainRules?: Rule[];
120+
encodingRules?: Rule[];
114121
}
115122

116123
/**

schemaregistry/serde/avro.ts

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@ import {
77
Serializer, SerializerConfig
88
} from "./serde";
99
import {
10-
Client, RuleMode,
10+
Client, RuleMode, RulePhase,
1111
SchemaInfo
1212
} from "../schemaregistry-client";
1313
import avro, {ForSchemaOptions, Type, types} from "avsc";
@@ -93,7 +93,9 @@ export class AvroSerializer extends Serializer implements AvroSerde {
9393
const subject = this.subjectName(topic, info)
9494
msg = await this.executeRules(
9595
subject, topic, RuleMode.WRITE, null, info, msg, getInlineTags(info, deps))
96-
const msgBytes = avroType.toBuffer(msg)
96+
let msgBytes = avroType.toBuffer(msg)
97+
msgBytes = await this.executeRulesWithPhase(
98+
subject, topic, RulePhase.ENCODING, RuleMode.WRITE, null, info, msgBytes, null)
9799
return this.serializeSchemaId(topic, msgBytes, schemaId, headers)
98100
}
99101

@@ -186,6 +188,8 @@ export class AvroDeserializer extends Deserializer implements AvroSerde {
186188
const [info, bytesRead] = await this.getWriterSchema(topic, payload, schemaId, headers)
187189
payload = payload.subarray(bytesRead)
188190
const subject = this.subjectName(topic, info)
191+
payload = await this.executeRulesWithPhase(
192+
subject, topic, RulePhase.ENCODING, RuleMode.READ, null, info, payload, null)
189193
const readerMeta = await this.getReaderSchema(subject)
190194
let migrations: Migration[] = []
191195
if (readerMeta != null) {

schemaregistry/serde/json.ts

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@ import {
77
Serializer, SerializerConfig
88
} from "./serde";
99
import {
10-
Client, RuleMode,
10+
Client, RuleMode, RulePhase,
1111
SchemaInfo
1212
} from "../schemaregistry-client";
1313
import Ajv, {ErrorObject} from "ajv";
@@ -109,13 +109,15 @@ export class JsonSerializer extends Serializer implements JsonSerde {
109109
const [schemaId, info] = await this.getSchemaId(JSON_TYPE, topic, msg, schema)
110110
const subject = this.subjectName(topic, info)
111111
msg = await this.executeRules(subject, topic, RuleMode.WRITE, null, info, msg, null)
112-
const msgBytes = Buffer.from(JSON.stringify(msg))
113112
if ((this.conf as JsonSerdeConfig).validate) {
114113
const validate = await this.toValidateFunction(info)
115114
if (validate != null && !validate(msg)) {
116115
throw new SerializationError('Invalid message')
117116
}
118117
}
118+
let msgBytes = Buffer.from(JSON.stringify(msg))
119+
msgBytes = await this.executeRulesWithPhase(
120+
subject, topic, RulePhase.ENCODING, RuleMode.WRITE, null, info, msgBytes, null)
119121
return this.serializeSchemaId(topic, msgBytes, schemaId, headers)
120122
}
121123

@@ -198,6 +200,8 @@ export class JsonDeserializer extends Deserializer implements JsonSerde {
198200
const [info, bytesRead] = await this.getWriterSchema(topic, payload, schemaId, headers)
199201
payload = payload.subarray(bytesRead)
200202
const subject = this.subjectName(topic, info)
203+
payload = await this.executeRulesWithPhase(
204+
subject, topic, RulePhase.ENCODING, RuleMode.READ, null, info, payload, null)
201205
const readerMeta = await this.getReaderSchema(subject)
202206
let migrations: Migration[] = []
203207
if (readerMeta != null) {

schemaregistry/serde/protobuf.ts

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@ import {
99
SerializerConfig
1010
} from "./serde";
1111
import {
12-
Client, Reference, RuleMode,
12+
Client, Reference, RuleMode, RulePhase,
1313
SchemaInfo,
1414
SchemaMetadata
1515
} from "../schemaregistry-client";
@@ -165,7 +165,9 @@ export class ProtobufSerializer extends Serializer implements ProtobufSerde {
165165
const subject = this.subjectName(topic, info)
166166
msg = await this.executeRules(subject, topic, RuleMode.WRITE, null, info, msg, null)
167167
schemaId.messageIndexes = this.toMessageIndexArray(messageDesc)
168-
const msgBytes = Buffer.from(toBinary(messageDesc, msg))
168+
let msgBytes = Buffer.from(toBinary(messageDesc, msg))
169+
msgBytes = await this.executeRulesWithPhase(
170+
subject, topic, RulePhase.ENCODING, RuleMode.WRITE, null, info, msgBytes, null)
169171
return this.serializeSchemaId(topic, msgBytes, schemaId, headers)
170172
}
171173

@@ -381,6 +383,8 @@ export class ProtobufDeserializer extends Deserializer implements ProtobufSerde
381383
const messageDesc = this.toMessageDescFromIndexes(fd, schemaId.messageIndexes!)
382384

383385
const subject = this.subjectName(topic, info)
386+
payload = await this.executeRulesWithPhase(
387+
subject, topic, RulePhase.ENCODING, RuleMode.READ, null, info, payload, null)
384388
const readerMeta = await this.getReaderSchema(subject, 'serialized')
385389

386390
const msgBytes = payload

0 commit comments

Comments
 (0)