Skip to content

Commit 3701656

Browse files
authored
Add ability to override disable flag and actions on a rule (#200)
* Add ability to override disable flag and actions on a rule * Add test
1 parent 647bead commit 3701656

File tree

3 files changed

+80
-4
lines changed

3 files changed

+80
-4
lines changed

schemaregistry/serde/rule-registry.ts

Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,22 @@
11
import {RuleAction, RuleExecutor} from "./serde";
22

3+
/**
4+
* RuleOverride represents a rule override
5+
*/
6+
export interface RuleOverride {
7+
type: string
8+
onSuccess?: string
9+
onFailure?: string
10+
disabled?: boolean
11+
}
12+
313
/**
414
* RuleRegistry is used to register and fetch rule executors and actions.
515
*/
616
export class RuleRegistry {
717
private ruleExecutors: Map<string, RuleExecutor> = new Map<string, RuleExecutor>()
818
private ruleActions: Map<string, RuleAction> = new Map<string, RuleAction>()
19+
private ruleOverrides: Map<string, RuleOverride> = new Map<string, RuleOverride>()
920

1021
private static globalInstance: RuleRegistry = new RuleRegistry()
1122

@@ -55,12 +66,36 @@ export class RuleRegistry {
5566
return Array.from(this.ruleActions.values())
5667
}
5768

69+
/**
70+
* registerOverride is used to register a new rule override.
71+
* @param ruleOverride - the rule override to register
72+
*/
73+
public registerOverride(ruleOverride: RuleOverride): void {
74+
this.ruleOverrides.set(ruleOverride.type, ruleOverride)
75+
}
76+
77+
/**
78+
* getOverride fetches a rule override by a given name.
79+
* @param name - the name of the rule override to fetch
80+
*/
81+
public getOverride(name: string): RuleOverride | undefined {
82+
return this.ruleOverrides.get(name)
83+
}
84+
85+
/**
86+
* getOverrides fetches all rule overrides
87+
*/
88+
public getOverrides(): RuleOverride[] {
89+
return Array.from(this.ruleOverrides.values())
90+
}
91+
5892
/**
5993
* clear clears all registered rules
6094
*/
6195
public clear(): void {
6296
this.ruleExecutors.clear()
6397
this.ruleActions.clear()
98+
this.ruleOverrides.clear()
6499
}
65100

66101
/**
@@ -85,4 +120,12 @@ export class RuleRegistry {
85120
public static registerRuleAction(ruleAction: RuleAction): void {
86121
RuleRegistry.globalInstance.registerAction(ruleAction)
87122
}
123+
124+
/**
125+
* registerRuleOverride is used to register a new rule override globally.
126+
* @param ruleOverride - the rule override to register
127+
*/
128+
public static registerRuleOverride(ruleOverride: RuleOverride): void {
129+
RuleRegistry.globalInstance.registerOverride(ruleOverride)
130+
}
88131
}

schemaregistry/serde/serde.ts

Lines changed: 28 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -136,7 +136,7 @@ export abstract class Serde {
136136
this.serdeType === SerdeType.KEY, ruleMode, rule, i, rules, inlineTags, this.fieldTransformer!)
137137
let ruleExecutor = this.ruleRegistry.getExecutor(rule.type)
138138
if (ruleExecutor == null) {
139-
await this.runAction(ctx, ruleMode, rule, rule.onFailure, msg,
139+
await this.runAction(ctx, ruleMode, rule, this.getOnFailure(rule), msg,
140140
new Error(`could not find rule executor of type ${rule.type}`), 'ERROR')
141141
return msg
142142
}
@@ -152,18 +152,43 @@ export abstract class Serde {
152152
msg = result
153153
break
154154
}
155-
await this.runAction(ctx, ruleMode, rule, msg != null ? rule.onSuccess : rule.onFailure,
155+
await this.runAction(ctx, ruleMode, rule, msg != null
156+
? this.getOnSuccess(rule) : this.getOnFailure(rule),
156157
msg, null, msg != null ? 'NONE' : 'ERROR')
157158
} catch (error) {
158159
if (error instanceof SerializationError) {
159160
throw error
160161
}
161-
await this.runAction(ctx, ruleMode, rule, rule.onFailure, msg, error as Error, 'ERROR')
162+
await this.runAction(ctx, ruleMode, rule, this.getOnFailure(rule), msg, error as Error, 'ERROR')
162163
}
163164
}
164165
return msg
165166
}
166167

168+
getOnSuccess(rule: Rule): string | undefined {
169+
let override = this.ruleRegistry.getOverride(rule.type)
170+
if (override != null && override.onSuccess != null) {
171+
return override.onSuccess
172+
}
173+
return rule.onSuccess
174+
}
175+
176+
getOnFailure(rule: Rule): string | undefined {
177+
let override = this.ruleRegistry.getOverride(rule.type)
178+
if (override != null && override.onFailure != null) {
179+
return override.onFailure
180+
}
181+
return rule.onFailure
182+
}
183+
184+
isDisabled(rule: Rule): boolean | undefined {
185+
let override = this.ruleRegistry.getOverride(rule.type)
186+
if (override != null && override.disabled != null) {
187+
return override.disabled
188+
}
189+
return rule.disabled
190+
}
191+
167192
async runAction(ctx: RuleContext, ruleMode: RuleMode, rule: Rule, action: string | undefined,
168193
msg: any, err: Error | null, defaultAction: string): Promise<void> {
169194
let actionName = this.getRuleActionName(rule, ruleMode, action)

schemaregistry/test/serde/avro.spec.ts

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -659,9 +659,17 @@ describe('AvroSerializer', () => {
659659
expect(obj2.boolField).toEqual(obj.boolField);
660660
expect(obj2.bytesField).toEqual(obj.bytesField);
661661

662-
clearKmsClients()
663662
let registry = new RuleRegistry()
664663
registry.registerExecutor(new FieldEncryptionExecutor())
664+
registry.registerOverride({type: 'ENCRYPT', disabled: true})
665+
deser = new AvroDeserializer(client, SerdeType.VALUE, deserConfig, registry)
666+
obj2 = await deser.deserialize(topic, bytes)
667+
expect(obj2.stringField).not.toEqual(obj.stringField);
668+
expect(obj2.bytesField).not.toEqual(obj.bytesField);
669+
670+
clearKmsClients()
671+
registry = new RuleRegistry()
672+
registry.registerExecutor(new FieldEncryptionExecutor())
665673
deser = new AvroDeserializer(client, SerdeType.VALUE, {}, registry)
666674
obj2 = await deser.deserialize(topic, bytes)
667675
expect(obj2.stringField).not.toEqual(obj.stringField);

0 commit comments

Comments
 (0)