Skip to content

Commit 0d5810a

Browse files
authored
DGS-19519 Enhance error handling for CSFLE configure method (#1899)
* Enhance error handling for CSFLE configure method * Add test * Minor fix
1 parent 3855e6d commit 0d5810a

File tree

4 files changed

+50
-3
lines changed

4 files changed

+50
-3
lines changed

src/confluent_kafka/schema_registry/protobuf.py

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1041,7 +1041,9 @@ def get_type(fd: FieldDescriptor) -> FieldType:
10411041
FieldDescriptor.TYPE_UINT64, FieldDescriptor.TYPE_FIXED64,
10421042
FieldDescriptor.TYPE_SFIXED64):
10431043
return FieldType.LONG
1044-
if fd.type in (FieldDescriptor.TYPE_FLOAT, FieldDescriptor.TYPE_DOUBLE):
1044+
if fd.type == FieldDescriptor.TYPE_FLOAT:
1045+
return FieldType.FLOAT
1046+
if fd.type == FieldDescriptor.TYPE_DOUBLE:
10451047
return FieldType.DOUBLE
10461048
if fd.type == FieldDescriptor.TYPE_BOOL:
10471049
return FieldType.BOOLEAN

src/confluent_kafka/schema_registry/rules/encryption/dek_registry/dek_registry_client.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -454,6 +454,7 @@ class DekRegistryClient(object):
454454
""" # noqa: E501
455455

456456
def __init__(self, conf: dict):
457+
self._conf = conf
457458
self._rest_client = _RestClient(conf)
458459
self._kek_cache = _KekCache()
459460
self._dek_cache = _DekCache()
@@ -465,6 +466,9 @@ def __exit__(self, *args):
465466
if self._rest_client is not None:
466467
self._rest_client.session.close()
467468

469+
def config(self):
470+
return self._conf
471+
468472
def register_kek(
469473
self, name: str, kms_type: str, kms_key_id: str,
470474
shared: bool = False, kms_props: Dict[str, str] = None, doc: str = None

src/confluent_kafka/schema_registry/rules/encryption/encrypt_executor.py

Lines changed: 18 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -56,8 +56,24 @@ def __init__(self, clock: Clock = Clock()):
5656
self.clock = clock
5757

5858
def configure(self, client_conf: dict, rule_conf: dict):
59-
self.client = DekRegistryClient.new_client(client_conf)
60-
self.config = rule_conf
59+
if client_conf:
60+
if self.client:
61+
if self.client.config() != client_conf:
62+
raise RuleError("executor already configured")
63+
else:
64+
self.client = DekRegistryClient.new_client(client_conf)
65+
66+
if rule_conf:
67+
if self.config:
68+
for key, value in rule_conf.items():
69+
v = self.config.get(key)
70+
if v is not None:
71+
if v != value:
72+
raise RuleError(f"rule config key already set: {key}")
73+
else:
74+
self.config[key] = value
75+
else:
76+
self.config = rule_conf
6177

6278
def type(self) -> str:
6379
return "ENCRYPT"

tests/schema_registry/test_config.py

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,9 @@
1919
from httpx import BasicAuth
2020

2121
from confluent_kafka.schema_registry import SchemaRegistryClient
22+
from confluent_kafka.schema_registry.rules.encryption.encrypt_executor import \
23+
FieldEncryptionExecutor
24+
from confluent_kafka.schema_registry.serde import RuleError
2225

2326
TEST_URL = 'http://SchemaRegistry:65534'
2427
TEST_USERNAME = 'sr_user'
@@ -119,3 +122,25 @@ def test_config_unknown_prop():
119122

120123
with pytest.raises(ValueError, match=r"Unrecognized properties: (.*)"):
121124
SchemaRegistryClient(conf)
125+
126+
127+
def test_config_encrypt_executor():
128+
executor = FieldEncryptionExecutor()
129+
client_conf = {'url': 'mock://'}
130+
rule_conf = {'key': 'value'}
131+
executor.configure(client_conf, rule_conf)
132+
# configure with same args is fine
133+
executor.configure(client_conf, rule_conf)
134+
rule_conf2 = {'key2': 'value2'}
135+
# configure with additional rule_conf keys is fine
136+
executor.configure(client_conf, rule_conf2)
137+
138+
client_conf2 = {'url': 'mock://',
139+
'ssl.key.location': '/ssl/keys/client',
140+
'ssl.certificate.location': '/ssl/certs/client'}
141+
with pytest.raises(RuleError, match="executor already configured"):
142+
executor.configure(client_conf2, rule_conf)
143+
144+
rule_conf3 = {'key': 'value3'}
145+
with pytest.raises(RuleError, match="rule config key already set: key"):
146+
executor.configure(client_conf, rule_conf3)

0 commit comments

Comments
 (0)