Skip to content

Commit f026e72

Browse files
authored
Fix SR delete behavior with client-side caching
1 parent 51652e3 commit f026e72

File tree

5 files changed

+67
-5
lines changed

5 files changed

+67
-5
lines changed

src/confluent_kafka/schema_registry/_async/schema_registry_client.py

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1174,12 +1174,14 @@ async def delete_version(self, subject_name: str, version: int, permanent: bool
11741174
response = await self._rest_client.delete(
11751175
'subjects/{}/versions/{}?permanent=true'.format(_urlencode(subject_name), version)
11761176
)
1177-
self._cache.remove_by_subject_version(subject_name, version)
11781177
else:
11791178
response = await self._rest_client.delete(
11801179
'subjects/{}/versions/{}'.format(_urlencode(subject_name), version)
11811180
)
11821181

1182+
# Clear cache for both soft and hard deletes to maintain consistency
1183+
self._cache.remove_by_subject_version(subject_name, version)
1184+
11831185
return response
11841186

11851187
async def set_compatibility(self, subject_name: Optional[str] = None, level: Optional[str] = None) -> str:

src/confluent_kafka/schema_registry/_sync/schema_registry_client.py

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1174,12 +1174,14 @@ def delete_version(self, subject_name: str, version: int, permanent: bool = Fals
11741174
response = self._rest_client.delete(
11751175
'subjects/{}/versions/{}?permanent=true'.format(_urlencode(subject_name), version)
11761176
)
1177-
self._cache.remove_by_subject_version(subject_name, version)
11781177
else:
11791178
response = self._rest_client.delete(
11801179
'subjects/{}/versions/{}'.format(_urlencode(subject_name), version)
11811180
)
11821181

1182+
# Clear cache for both soft and hard deletes to maintain consistency
1183+
self._cache.remove_by_subject_version(subject_name, version)
1184+
11831185
return response
11841186

11851187
def set_compatibility(self, subject_name: Optional[str] = None, level: Optional[str] = None) -> str:

src/confluent_kafka/schema_registry/common/schema_registry_client.py

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -268,11 +268,12 @@ def remove_by_subject_version(self, subject: str, version: int):
268268

269269
with self.lock:
270270
if subject in self.rs_id_index:
271-
for schema_id, registered_schema in self.rs_id_index[subject].items():
271+
for schema_id, registered_schema in list(self.rs_id_index[subject].items()):
272272
if registered_schema.version == version:
273-
del self.rs_schema_index[subject][schema_id]
273+
del self.rs_id_index[subject][schema_id]
274+
274275
if subject in self.rs_schema_index:
275-
for schema, registered_schema in self.rs_schema_index[subject].items():
276+
for schema, registered_schema in list(self.rs_schema_index[subject].items()):
276277
if registered_schema.version == version:
277278
del self.rs_schema_index[subject][schema]
278279
rs = None
@@ -284,6 +285,7 @@ def remove_by_subject_version(self, subject: str, version: int):
284285
if subject in self.schema_id_index:
285286
if rs.schema_id in self.schema_id_index[subject]:
286287
del self.schema_id_index[subject][rs.schema_id]
288+
if subject in self.schema_index:
287289
if rs.schema in self.schema_index[subject]:
288290
del self.schema_index[subject][rs.schema]
289291

tests/integration/schema_registry/_async/test_api_client.py

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -422,6 +422,34 @@ async def test_api_delete_subject_version(kafka_cluster, load_file):
422422
assert subject not in await sr.get_subjects()
423423

424424

425+
async def test_api_delete_version_soft_then_hard(kafka_cluster, load_file):
426+
"""
427+
Performs a soft delete followed by a hard delete with cache populated and cleared correctly.
428+
"""
429+
sr = kafka_cluster.async_schema_registry()
430+
431+
schema = Schema(load_file('basic_schema.avsc'), schema_type='AVRO')
432+
subject = str(uuid1())
433+
434+
# Register schema and trigger cache population
435+
await sr.register_schema(subject, schema)
436+
registered = await sr.lookup_schema(subject, schema)
437+
version = registered.version
438+
assert sr._cache.get_registered_by_subject_version(subject, version) is not None
439+
440+
# Verify soft delete clears cache
441+
deleted_version = await sr.delete_version(subject, version, permanent=False)
442+
assert deleted_version == version
443+
assert sr._cache.get_registered_by_subject_version(subject, version) is None
444+
445+
# Verify hard delete proceeds without error
446+
deleted_version = await sr.delete_version(subject, version, permanent=True)
447+
assert deleted_version == version
448+
449+
# Verify subject is fully deleted
450+
assert subject not in await sr.get_subjects()
451+
452+
425453
async def test_api_subject_config_update(kafka_cluster, load_file):
426454
"""
427455
Updates a subjects compatibility policy then ensures the same policy

tests/integration/schema_registry/_sync/test_api_client.py

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -422,6 +422,34 @@ def test_api_delete_subject_version(kafka_cluster, load_file):
422422
assert subject not in sr.get_subjects()
423423

424424

425+
def test_api_delete_version_soft_then_hard(kafka_cluster, load_file):
426+
"""
427+
Performs a soft delete followed by a hard delete with cache populated and cleared correctly.
428+
"""
429+
sr = kafka_cluster.schema_registry()
430+
431+
schema = Schema(load_file('basic_schema.avsc'), schema_type='AVRO')
432+
subject = str(uuid1())
433+
434+
# Register schema and trigger cache population
435+
sr.register_schema(subject, schema)
436+
registered = sr.lookup_schema(subject, schema)
437+
version = registered.version
438+
assert sr._cache.get_registered_by_subject_version(subject, version) is not None
439+
440+
# Verify soft delete clears cache
441+
deleted_version = sr.delete_version(subject, version, permanent=False)
442+
assert deleted_version == version
443+
assert sr._cache.get_registered_by_subject_version(subject, version) is None
444+
445+
# Verify hard delete proceeds without error
446+
deleted_version = sr.delete_version(subject, version, permanent=True)
447+
assert deleted_version == version
448+
449+
# Verify subject is fully deleted
450+
assert subject not in sr.get_subjects()
451+
452+
425453
def test_api_subject_config_update(kafka_cluster, load_file):
426454
"""
427455
Updates a subjects compatibility policy then ensures the same policy

0 commit comments

Comments
 (0)