diff --git a/kafka/cluster.py b/kafka/cluster.py index 438baf29d..4b07cc749 100644 --- a/kafka/cluster.py +++ b/kafka/cluster.py @@ -236,7 +236,7 @@ def update_metadata(self, metadata): """ # In the common case where we ask for a single topic and get back an # error, we should fail the future - if len(metadata.topics) == 1 and metadata.topics[0][0] != 0: + if len(metadata.topics) == 1 and metadata.topics[0][0] != Errors.NoError.errno: error_code, topic = metadata.topics[0][:2] error = Errors.for_code(error_code)(topic) return self.failed_update(error) diff --git a/kafka/errors.py b/kafka/errors.py index b33cf51e2..5586e4113 100644 --- a/kafka/errors.py +++ b/kafka/errors.py @@ -512,15 +512,15 @@ def _iter_broker_errors(): def for_code(error_code): - return kafka_errors.get(error_code, UnknownError) - - -def check_error(response): - if isinstance(response, Exception): - raise response - if response.error: - error_class = kafka_errors.get(response.error, UnknownError) - raise error_class(response) + if error_code in kafka_errors: + return kafka_errors[error_code] + else: + # The broker error code was not found in our list. This can happen when connecting + # to a newer broker (with new error codes), or simply because our error list is + # not complete. + # + # To avoid dropping the error code, create a dynamic error class w/ errno override. + return type('UnrecognizedBrokerError', (UnknownError,), {'errno': error_code}) RETRY_BACKOFF_ERROR_TYPES = (