Skip to content

Commit 3e3f17a

Browse files
authored
Ensure algorithm query param is passed for CSFLE (confluentinc#1889)
* Add missing algorithm query param * Add test
1 parent 4f25c8c commit 3e3f17a

File tree

3 files changed

+70
-2
lines changed

3 files changed

+70
-2
lines changed

src/confluent_kafka/schema_registry/rules/cel/cel_field_executor.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,7 @@ def _field_transform(self, ctx: RuleContext, field_ctx: FieldContext, field_valu
4545
"name": field_ctx.name,
4646
"typeName": field_ctx.type_name(),
4747
"tags": [celtypes.StringType(tag) for tag in field_ctx.tags],
48-
"message": msg_to_cel(field_value),
48+
"message": msg_to_cel(field_ctx.containing_message),
4949
}
5050
return self._executor.execute(ctx, field_value, args)
5151

src/confluent_kafka/schema_registry/rules/encryption/dek_registry/dek_registry_client.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -632,7 +632,7 @@ def get_dek(
632632
if dek is not None:
633633
return dek
634634

635-
query = {'deleted': deleted}
635+
query = {'algorithm': algorithm, 'deleted': deleted}
636636
response = self._rest_client.get('/dek-registry/v1/keks/{}/deks/{}/versions/{}'
637637
.format(urllib.parse.quote(kek_name),
638638
urllib.parse.quote(subject, safe=''),

tests/schema_registry/test_avro_serdes.py

Lines changed: 68 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -929,6 +929,74 @@ def test_avro_encryption():
929929
assert obj == obj2
930930

931931

932+
def test_avro_encryption_deterministic():
933+
executor = FieldEncryptionExecutor.register_with_clock(FakeClock())
934+
935+
conf = {'url': _BASE_URL}
936+
client = SchemaRegistryClient.new_client(conf)
937+
ser_conf = {'auto.register.schemas': False, 'use.latest.version': True}
938+
rule_conf = {'secret': 'mysecret'}
939+
schema = {
940+
'type': 'record',
941+
'name': 'test',
942+
'fields': [
943+
{'name': 'intField', 'type': 'int'},
944+
{'name': 'doubleField', 'type': 'double'},
945+
{'name': 'stringField', 'type': 'string', 'confluent:tags': ['PII']},
946+
{'name': 'booleanField', 'type': 'boolean'},
947+
{'name': 'bytesField', 'type': 'bytes', 'confluent:tags': ['PII']},
948+
]
949+
}
950+
951+
rule = Rule(
952+
"test-encrypt",
953+
"",
954+
RuleKind.TRANSFORM,
955+
RuleMode.WRITEREAD,
956+
"ENCRYPT",
957+
["PII"],
958+
RuleParams({
959+
"encrypt.kek.name": "kek1",
960+
"encrypt.kms.type": "local-kms",
961+
"encrypt.kms.key.id": "mykey",
962+
"encrypt.dek.algorithm": "AES256_SIV"
963+
}),
964+
None,
965+
None,
966+
"ERROR,NONE",
967+
False
968+
)
969+
client.register_schema(_SUBJECT, Schema(
970+
json.dumps(schema),
971+
"AVRO",
972+
[],
973+
None,
974+
RuleSet(None, [rule])
975+
))
976+
977+
obj = {
978+
'intField': 123,
979+
'doubleField': 45.67,
980+
'stringField': 'hi',
981+
'booleanField': True,
982+
'bytesField': b'foobar',
983+
}
984+
ser = AvroSerializer(client, schema_str=None, conf=ser_conf, rule_conf=rule_conf)
985+
dek_client = executor.client
986+
ser_ctx = SerializationContext(_TOPIC, MessageField.VALUE)
987+
obj_bytes = ser(obj, ser_ctx)
988+
989+
# reset encrypted fields
990+
assert obj['stringField'] != 'hi'
991+
obj['stringField'] = 'hi'
992+
obj['bytesField'] = b'foobar'
993+
994+
deser = AvroDeserializer(client, rule_conf=rule_conf)
995+
executor.client = dek_client
996+
obj2 = deser(obj_bytes, ser_ctx)
997+
assert obj == obj2
998+
999+
9321000
def test_avro_encryption_cel():
9331001
executor = FieldEncryptionExecutor.register_with_clock(FakeClock())
9341002

0 commit comments

Comments
 (0)