From f40ca953e9dbbaaf3511bfa70ac345eccff4ec6a Mon Sep 17 00:00:00 2001 From: Dana Powers Date: Wed, 4 Jun 2025 14:58:42 -0700 Subject: [PATCH 1/4] Test unauthorized topic metadata response --- test/test_cluster.py | 15 +++++++++++++++ 1 file changed, 15 insertions(+) diff --git a/test/test_cluster.py b/test/test_cluster.py index c57bd8f9f..730b27cb6 100644 --- a/test/test_cluster.py +++ b/test/test_cluster.py @@ -136,6 +136,21 @@ def test_metadata_v7(): assert cluster._partitions['topic-1'][0].leader_epoch == 0 +def test_unauthorized_topic(): + cluster = ClusterMetadata() + assert len(cluster.brokers()) == 0 + + cluster.update_metadata(MetadataResponse[0]( + [(0, 'foo', 12), (1, 'bar', 34)], + [(29, 'unauthorized-topic', [])])) # single topic w/ unauthorized error + + # broker metadata should get updated + assert len(cluster.brokers()) == 2 + + # topic should be added to unauthorized list + assert 'unauthorized-topic' in cluster.unauthorized_topics + + def test_collect_hosts__happy_path(): hosts = "127.0.0.1:1234,127.0.0.1" results = collect_hosts(hosts) From 2b391a11bad0274eb100c2286403c8d7e317e331 Mon Sep 17 00:00:00 2001 From: Dana Powers Date: Wed, 4 Jun 2025 14:59:15 -0700 Subject: [PATCH 2/4] Do not fail/ignore metadata update for single topic with error --- kafka/cluster.py | 17 +++++++++-------- 1 file changed, 9 insertions(+), 8 deletions(-) diff --git a/kafka/cluster.py b/kafka/cluster.py index d6ec82dba..3930d27cb 100644 --- a/kafka/cluster.py +++ b/kafka/cluster.py @@ -245,13 +245,6 @@ def update_metadata(self, metadata): Returns: None """ - # In the common case where we ask for a single topic and get back an - # error, we should fail the future - if len(metadata.topics) == 1 and metadata.topics[0][0] != Errors.NoError.errno: - error_code, topic = metadata.topics[0][:2] - error = Errors.for_code(error_code)(topic) - return self.failed_update(error) - if not metadata.brokers: log.warning("No broker metadata found in MetadataResponse -- ignoring.") return self.failed_update(Errors.MetadataEmptyBrokerList(metadata)) @@ -349,7 +342,15 @@ def update_metadata(self, metadata): self._last_successful_refresh_ms = now if f: - f.success(self) + # In the common case where we ask for a single topic and get back an + # error, we should fail the future + if len(metadata.topics) == 1 and metadata.topics[0][0] != Errors.NoError.errno: + error_code, topic = metadata.topics[0][:2] + error = Errors.for_code(error_code)(topic) + return f.failure(error) + else: + f.success(self) + log.debug("Updated cluster metadata to %s", self) for listener in self._listeners: From 2df2cc20d1dffa244fabe366acc4e4efcd288782 Mon Sep 17 00:00:00 2001 From: Dana Powers Date: Wed, 4 Jun 2025 15:19:41 -0700 Subject: [PATCH 3/4] dont return --- kafka/cluster.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/kafka/cluster.py b/kafka/cluster.py index 3930d27cb..ded8c6f96 100644 --- a/kafka/cluster.py +++ b/kafka/cluster.py @@ -347,7 +347,7 @@ def update_metadata(self, metadata): if len(metadata.topics) == 1 and metadata.topics[0][0] != Errors.NoError.errno: error_code, topic = metadata.topics[0][:2] error = Errors.for_code(error_code)(topic) - return f.failure(error) + f.failure(error) else: f.success(self) From e9b75d44548047d3a537313d7bbd473cd3e3c199 Mon Sep 17 00:00:00 2001 From: Dana Powers Date: Thu, 5 Jun 2025 09:27:03 -0700 Subject: [PATCH 4/4] Raise non-retriable errors from producer _wait_on_metadata --- kafka/producer/kafka.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/kafka/producer/kafka.py b/kafka/producer/kafka.py index 2416cc3ef..9401bd814 100644 --- a/kafka/producer/kafka.py +++ b/kafka/producer/kafka.py @@ -960,9 +960,11 @@ def _wait_on_metadata(self, topic, max_wait_ms): future.add_both(lambda e, *args: e.set(), metadata_event) self._sender.wakeup() metadata_event.wait(timer.timeout_ms / 1000) - if not metadata_event.is_set(): + if not future.is_done: raise Errors.KafkaTimeoutError( "Failed to update metadata after %.1f secs." % (max_wait_ms / 1000,)) + elif future.failed() and not future.retriable(): + raise future.exception elif topic in self._metadata.unauthorized_topics: raise Errors.TopicAuthorizationFailedError(set([topic])) else: