diff --git a/kafka/cluster.py b/kafka/cluster.py index d6ec82dba..ded8c6f96 100644 --- a/kafka/cluster.py +++ b/kafka/cluster.py @@ -245,13 +245,6 @@ def update_metadata(self, metadata): Returns: None """ - # In the common case where we ask for a single topic and get back an - # error, we should fail the future - if len(metadata.topics) == 1 and metadata.topics[0][0] != Errors.NoError.errno: - error_code, topic = metadata.topics[0][:2] - error = Errors.for_code(error_code)(topic) - return self.failed_update(error) - if not metadata.brokers: log.warning("No broker metadata found in MetadataResponse -- ignoring.") return self.failed_update(Errors.MetadataEmptyBrokerList(metadata)) @@ -349,7 +342,15 @@ def update_metadata(self, metadata): self._last_successful_refresh_ms = now if f: - f.success(self) + # In the common case where we ask for a single topic and get back an + # error, we should fail the future + if len(metadata.topics) == 1 and metadata.topics[0][0] != Errors.NoError.errno: + error_code, topic = metadata.topics[0][:2] + error = Errors.for_code(error_code)(topic) + f.failure(error) + else: + f.success(self) + log.debug("Updated cluster metadata to %s", self) for listener in self._listeners: diff --git a/kafka/producer/kafka.py b/kafka/producer/kafka.py index 2416cc3ef..9401bd814 100644 --- a/kafka/producer/kafka.py +++ b/kafka/producer/kafka.py @@ -960,9 +960,11 @@ def _wait_on_metadata(self, topic, max_wait_ms): future.add_both(lambda e, *args: e.set(), metadata_event) self._sender.wakeup() metadata_event.wait(timer.timeout_ms / 1000) - if not metadata_event.is_set(): + if not future.is_done: raise Errors.KafkaTimeoutError( "Failed to update metadata after %.1f secs." % (max_wait_ms / 1000,)) + elif future.failed() and not future.retriable(): + raise future.exception elif topic in self._metadata.unauthorized_topics: raise Errors.TopicAuthorizationFailedError(set([topic])) else: diff --git a/test/test_cluster.py b/test/test_cluster.py index c57bd8f9f..730b27cb6 100644 --- a/test/test_cluster.py +++ b/test/test_cluster.py @@ -136,6 +136,21 @@ def test_metadata_v7(): assert cluster._partitions['topic-1'][0].leader_epoch == 0 +def test_unauthorized_topic(): + cluster = ClusterMetadata() + assert len(cluster.brokers()) == 0 + + cluster.update_metadata(MetadataResponse[0]( + [(0, 'foo', 12), (1, 'bar', 34)], + [(29, 'unauthorized-topic', [])])) # single topic w/ unauthorized error + + # broker metadata should get updated + assert len(cluster.brokers()) == 2 + + # topic should be added to unauthorized list + assert 'unauthorized-topic' in cluster.unauthorized_topics + + def test_collect_hosts__happy_path(): hosts = "127.0.0.1:1234,127.0.0.1" results = collect_hosts(hosts)