From 159ee440a62d4d1209c46ff6cb26d4f21cb891a8 Mon Sep 17 00:00:00 2001 From: Dana Powers Date: Wed, 29 Apr 2026 15:30:54 -0700 Subject: [PATCH] Eliminate NoBrokersAvailableError --- kafka/consumer/group.py | 2 +- kafka/coordinator/base.py | 4 ++-- kafka/errors.py | 5 ----- kafka/producer/kafka.py | 2 +- 4 files changed, 4 insertions(+), 9 deletions(-) diff --git a/kafka/consumer/group.py b/kafka/consumer/group.py index 89ca93fa5..f3d0c4d91 100644 --- a/kafka/consumer/group.py +++ b/kafka/consumer/group.py @@ -212,7 +212,7 @@ class KafkaConsumer: None, the client will attempt to determine the broker version via ApiVersionsRequest API or, for brokers earlier than 0.10, probing various known APIs. Dynamic version checking is performed eagerly - during __init__ and can raise NoBrokersAvailableError if no connection + during __init__ and can raise KafkaTimeoutError if no connection was made before timeout (see api_version_auto_timeout_ms below). Different versions enable different functionality. diff --git a/kafka/coordinator/base.py b/kafka/coordinator/base.py index bb8fb4284..6034ac519 100644 --- a/kafka/coordinator/base.py +++ b/kafka/coordinator/base.py @@ -289,7 +289,7 @@ def ensure_coordinator_ready(self, timeout_ms=None): if self.config['api_version'] < (0, 8, 2): maybe_coordinator_id = self._client.least_loaded_node() if maybe_coordinator_id is None: - future = Future().failure(Errors.NoBrokersAvailable()) + future = Future().failure(Errors.NodeNotReadyError('coordinator')) else: self.coordinator_id = maybe_coordinator_id self._client.maybe_connect(self.coordinator_id) @@ -796,7 +796,7 @@ def _send_group_coordinator_request(self): """ node_id = self._client.least_loaded_node() if node_id is None: - return Future().failure(Errors.NoBrokersAvailable()) + return Future().failure(Errors.NodeNotReadyError('coordinator')) elif not self._client.ready(node_id, metadata_priority=False): e = Errors.NodeNotReadyError(node_id) diff --git a/kafka/errors.py b/kafka/errors.py index bea1c814f..f565e35a7 100644 --- a/kafka/errors.py +++ b/kafka/errors.py @@ -62,11 +62,6 @@ class MetadataEmptyBrokerList(KafkaError): retriable = True -class NoBrokersAvailable(KafkaError): - retriable = True - invalid_metadata = True - - class NoOffsetForPartitionError(KafkaError): pass diff --git a/kafka/producer/kafka.py b/kafka/producer/kafka.py index fd55483dd..5bca69ace 100644 --- a/kafka/producer/kafka.py +++ b/kafka/producer/kafka.py @@ -327,7 +327,7 @@ class KafkaProducer: None, the client will attempt to determine the broker version via ApiVersionsRequest API or, for brokers earlier than 0.10, probing various known APIs. Dynamic version checking is performed eagerly - during __init__ and can raise NoBrokersAvailableError if no connection + during __init__ and can raise KafkaTimeoutError if no connection was made before timeout (see api_version_auto_timeout_ms below). Different versions enable different functionality.