Skip to content

Commit cc70e6a

Browse files
committed
Attach the message associated with an error to KafkaException.
Append the name of the target topic to error messages when it may be relevant. This latter feature uses a hard-coded list of error codes, which is not ideal, but seems tolerable, as this only affects the exception error message, is never technically wrong (since the error arose from a message being sent to the referenced topic), the full information is always available though the `error` and `message` subobjects, and the set of relevant errors changes infrequently.
1 parent 135eb29 commit cc70e6a

File tree

1 file changed

+55
-7
lines changed

1 file changed

+55
-7
lines changed

adc/errors.py

Lines changed: 55 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -34,21 +34,69 @@ def log_delivery_errors(
3434
def raise_delivery_errors(kafka_error: confluent_kafka.KafkaError,
3535
msg: confluent_kafka.Message) -> None:
3636
if kafka_error is not None:
37-
raise KafkaException.from_kafka_error(kafka_error)
37+
raise KafkaException.from_kafka_error(kafka_error, msg)
3838
elif msg.error() is not None:
39-
raise KafkaException.from_kafka_error(msg.error())
39+
raise KafkaException.from_kafka_error(msg.error(), msg)
40+
41+
42+
def _get_topic_related_errors():
43+
"""Build a set of all Kafka error codes which seem to relate to a specific topic.
44+
45+
This uses a list extracted from all documented error codes up to confluent_kafka v2.4,
46+
but some of these errors did not exist or were not exposed in earlier versions.
47+
To maintain backward compatibility, this function checks whether each error exists before
48+
attempting to otherwise refer to it.
49+
"""
50+
err_names = [
51+
"_UNKNOWN_TOPIC",
52+
"_NO_OFFSET",
53+
"_LOG_TRUNCATION",
54+
"OFFSET_OUT_OF_RANGE",
55+
"UNKNOWN_TOPIC_OR_PART",
56+
"NOT_LEADER_FOR_PARTITION",
57+
"TOPIC_EXCEPTION",
58+
"NOT_ENOUGH_REPLICAS",
59+
"NOT_ENOUGH_REPLICAS_AFTER_APPEND",
60+
"INVALID_COMMIT_OFFSET_SIZE",
61+
"TOPIC_AUTHORIZATION_FAILED",
62+
"TOPIC_ALREADY_EXISTS",
63+
"INVALID_PARTITIONS",
64+
"INVALID_REPLICATION_FACTOR",
65+
"INVALID_REPLICA_ASSIGNMENT",
66+
"REASSIGNMENT_IN_PROGRESS",
67+
"TOPIC_DELETION_DISABLED",
68+
"OFFSET_NOT_AVAILABLE",
69+
"PREFERRED_LEADER_NOT_AVAILABLE",
70+
"NO_REASSIGNMENT_IN_PROGRESS",
71+
"GROUP_SUBSCRIBED_TO_TOPIC",
72+
"UNSTABLE_OFFSET_COMMIT",
73+
"UNKNOWN_TOPIC_ID",
74+
]
75+
errors = set()
76+
for name in err_names:
77+
if hasattr(confluent_kafka.KafkaError, name):
78+
errors.add(getattr(confluent_kafka.KafkaError, name))
79+
else:
80+
logger.debug(f"{name} does not exist in confluent_kafka version "
81+
f"{confluent_kafka.__version__} ({confluent_kafka.libversion()})")
82+
return errors
4083

4184

4285
class KafkaException(Exception):
4386
@classmethod
44-
def from_kafka_error(cls, error):
45-
return cls(error)
87+
def from_kafka_error(cls, error, msg=None):
88+
return cls(error, msg)
89+
90+
topic_related_errors = _get_topic_related_errors()
4691

47-
def __init__(self, error):
92+
def __init__(self, error, msg=None):
4893
self.error = error
4994
self.name = error.name()
5095
self.reason = error.str()
5196
self.retriable = error.retriable()
5297
self.fatal = error.fatal()
53-
msg = f"Error communicating with Kafka: code={self.name} {self.reason}"
54-
super(KafkaException, self).__init__(msg)
98+
self.message = msg
99+
ex_msg = f"Error communicating with Kafka: code={self.name} {self.reason}"
100+
if msg and error.code() in KafkaException.topic_related_errors:
101+
ex_msg += f" on topic {msg.topic()}"
102+
super(KafkaException, self).__init__(ex_msg)

0 commit comments

Comments
 (0)