Skip to content

Commit 70872b9

Browse files
authored
Ensure multiple rules are supported (#1870)
1 parent 057640c commit 70872b9

File tree

2 files changed

+81
-3
lines changed

2 files changed

+81
-3
lines changed

src/confluent_kafka/schema_registry/serde.py

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -335,10 +335,8 @@ def _execute_rules(
335335
if rule.kind == RuleKind.CONDITION:
336336
if not result:
337337
raise RuleConditionError(rule)
338-
break
339338
elif rule.kind == RuleKind.TRANSFORM:
340339
message = result
341-
break
342340
self._run_action(
343341
ctx, rule_mode, rule,
344342
self._get_on_failure(rule) if message is None else self._get_on_success(rule),
@@ -377,7 +375,7 @@ def _run_action(
377375
action_name = self._get_rule_action_name(rule, rule_mode, action)
378376
if action_name is None:
379377
action_name = default_action
380-
rule_action = self._get_rule_action(self._rule_registry, action_name)
378+
rule_action = self._get_rule_action(ctx, action_name)
381379
if rule_action is None:
382380
log.error("Could not find rule action of type %s", action_name)
383381
raise RuleError(f"Could not find rule action of type {action_name}")

tests/schema_registry/test_avro_serdes.py

Lines changed: 80 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -929,6 +929,86 @@ def test_avro_encryption():
929929
assert obj == obj2
930930

931931

932+
def test_avro_encryption_cel():
933+
executor = FieldEncryptionExecutor.register_with_clock(FakeClock())
934+
935+
conf = {'url': _BASE_URL}
936+
client = SchemaRegistryClient.new_client(conf)
937+
ser_conf = {'auto.register.schemas': False, 'use.latest.version': True}
938+
rule_conf = {'secret': 'mysecret'}
939+
schema = {
940+
'type': 'record',
941+
'name': 'test',
942+
'fields': [
943+
{'name': 'intField', 'type': 'int'},
944+
{'name': 'doubleField', 'type': 'double'},
945+
{'name': 'stringField', 'type': 'string', 'confluent:tags': ['PII']},
946+
{'name': 'booleanField', 'type': 'boolean'},
947+
{'name': 'bytesField', 'type': 'bytes', 'confluent:tags': ['PII']},
948+
]
949+
}
950+
951+
rule1 = Rule(
952+
"test-cel",
953+
"",
954+
RuleKind.TRANSFORM,
955+
RuleMode.WRITE,
956+
"CEL_FIELD",
957+
None,
958+
None,
959+
"name == 'stringField' ; value + '-suffix'",
960+
None,
961+
None,
962+
False
963+
)
964+
rule2 = Rule(
965+
"test-encrypt",
966+
"",
967+
RuleKind.TRANSFORM,
968+
RuleMode.WRITEREAD,
969+
"ENCRYPT",
970+
["PII"],
971+
RuleParams({
972+
"encrypt.kek.name": "kek1",
973+
"encrypt.kms.type": "local-kms",
974+
"encrypt.kms.key.id": "mykey"
975+
}),
976+
None,
977+
None,
978+
"ERROR,NONE",
979+
False
980+
)
981+
client.register_schema(_SUBJECT, Schema(
982+
json.dumps(schema),
983+
"AVRO",
984+
[],
985+
None,
986+
RuleSet(None, [rule1, rule2])
987+
))
988+
989+
obj = {
990+
'intField': 123,
991+
'doubleField': 45.67,
992+
'stringField': 'hi',
993+
'booleanField': True,
994+
'bytesField': b'foobar',
995+
}
996+
ser = AvroSerializer(client, schema_str=None, conf=ser_conf, rule_conf=rule_conf)
997+
dek_client = executor.client
998+
ser_ctx = SerializationContext(_TOPIC, MessageField.VALUE)
999+
obj_bytes = ser(obj, ser_ctx)
1000+
1001+
# reset encrypted fields
1002+
assert obj['stringField'] != 'hi-suffix'
1003+
obj['stringField'] = 'hi-suffix'
1004+
obj['bytesField'] = b'foobar'
1005+
1006+
deser = AvroDeserializer(client, rule_conf=rule_conf)
1007+
executor.client = dek_client
1008+
obj2 = deser(obj_bytes, ser_ctx)
1009+
assert obj == obj2
1010+
1011+
9321012
def test_avro_encryption_dek_rotation():
9331013
executor = FieldEncryptionExecutor.register_with_clock(FakeClock())
9341014

0 commit comments

Comments
 (0)