Skip to content

Commit a0d969f

Browse files
rnpridgeonRyan P
authored andcommitted
Make KafkaError instantiable
1 parent 7b9a95a commit a0d969f

File tree

8 files changed

+263
-103
lines changed

8 files changed

+263
-103
lines changed

confluent_kafka/__init__.py

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,11 +18,10 @@
1818

1919
from .deserializing_consumer import DeserializingConsumer
2020
from .serializing_producer import SerializingProducer
21+
from .error import KafkaException, KafkaError
2122

2223
from .cimpl import (Producer,
2324
Consumer,
24-
KafkaError,
25-
KafkaException,
2625
Message,
2726
TopicPartition,
2827
libversion,

confluent_kafka/deserializing_consumer.py

Lines changed: 15 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -16,11 +16,11 @@
1616
# limitations under the License.
1717
#
1818

19-
from confluent_kafka.cimpl import (KafkaError,
20-
Consumer as _ConsumerImpl)
21-
from .error import ConsumeError
22-
from .serialization import (SerializationError,
23-
SerializationContext,
19+
from confluent_kafka.cimpl import Consumer as _ConsumerImpl
20+
from .error import (ConsumeError,
21+
KeyDeserializationError,
22+
ValueDeserializationError)
23+
from .serialization import (SerializationContext,
2424
MessageField)
2525

2626

@@ -115,6 +115,12 @@ def poll(self, timeout=-1):
115115
:py:class:`Message` or None on timeout
116116
117117
Raises:
118+
KeyDeserializationError: If an error occurs during key
119+
deserialization.
120+
121+
ValueDeserializationError: If an error occurs during value
122+
deserialization.
123+
118124
ConsumeError if an error was encountered while polling.
119125
120126
"""
@@ -131,20 +137,16 @@ def poll(self, timeout=-1):
131137
if self._value_deserializer is not None:
132138
try:
133139
value = self._value_deserializer(value, ctx)
134-
except SerializationError as se:
135-
raise ConsumeError(KafkaError._VALUE_DESERIALIZATION,
136-
reason=se.message,
137-
message=msg)
140+
except Exception as se:
141+
raise ValueDeserializationError(exception=se, message=msg)
138142

139143
key = msg.key()
140144
ctx.field = MessageField.KEY
141145
if self._key_deserializer is not None:
142146
try:
143147
key = self._key_deserializer(key, ctx)
144-
except SerializationError as se:
145-
raise ConsumeError(KafkaError._KEY_DESERIALIZATION,
146-
reason=se.message,
147-
message=msg)
148+
except Exception as se:
149+
raise KeyDeserializationError(exception=se, message=msg)
148150

149151
msg.set_key(key)
150152
msg.set_value(value)

confluent_kafka/error.py

Lines changed: 105 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -15,11 +15,12 @@
1515
# See the License for the specific language governing permissions and
1616
# limitations under the License.
1717
#
18+
from confluent_kafka.cimpl import KafkaException, KafkaError
1819

20+
from confluent_kafka.serialization import SerializationError
1921

20-
class ConsumeError(Exception):
21-
__slots__ = ['message', 'reason', 'error_code']
2222

23+
class ConsumeError(KafkaException):
2324
"""
2425
Wraps all errors encountered during the consumption of a message.
2526
@@ -28,25 +29,113 @@ class ConsumeError(Exception):
2829
may be retrieved from the ``message`` attribute.
2930
3031
Args:
31-
error (KafkaError): The error that occurred.
32+
error_code (KafkaError): Error code indicating the type of error.
3233
33-
message (Message, optional): The message returned from the broker.
34+
exception(Exception, optional): The original exception
3435
35-
reason (str): String description of the error.
36+
message (Message, optional): The Kafka Message returned from the broker.
3637
3738
"""
39+
def __init__(self, error_code, exception=None, message=None):
40+
if exception is not None:
41+
kafka_error = KafkaError(error_code, repr(exception))
42+
self.exception = exception
43+
else:
44+
kafka_error = KafkaError(error_code)
45+
self.exception = None
3846

39-
def __init__(self, error, reason=None, message=None):
40-
self.error = error
41-
if reason is None:
42-
reason = error.str()
43-
44-
self.reason = reason
47+
super(ConsumeError, self).__init__(kafka_error)
4548
self.message = message
4649

47-
def __repr__(self):
48-
return str(self)
50+
@property
51+
def code(self):
52+
return self.code()
53+
54+
@property
55+
def name(self):
56+
return self.name()
57+
58+
59+
class KeyDeserializationError(ConsumeError, SerializationError):
60+
"""
61+
Wraps all errors encountered during the deserialization of a Kafka
62+
Message's key.
63+
64+
Args:
65+
exception(Exception, optional): The original exception
66+
67+
message (Message, optional): The Kafka Message returned from the broker.
68+
69+
"""
70+
def __init__(self, exception=None, message=None):
71+
super(KeyDeserializationError, self).__init__(
72+
KafkaError._KEY_DESERIALIZATION, exception=exception, message=message)
73+
74+
75+
class ValueDeserializationError(ConsumeError, SerializationError):
76+
"""
77+
Wraps all errors encountered during the deserialization of a Kafka
78+
Message's value.
79+
80+
Args:
81+
exception(Exception, optional): The original exception
82+
83+
message (Message, optional): The Kafka Message returned from the broker.
84+
85+
"""
86+
def __init__(self, exception=None, message=None):
87+
super(ValueDeserializationError, self).__init__(
88+
KafkaError._VALUE_DESERIALIZATION, exception=exception, message=message)
89+
90+
91+
class ProduceError(KafkaException):
92+
"""
93+
Wraps all errors encountered when Producing messages.
94+
95+
Args:
96+
error_code (KafkaError): Error code indicating the type of error.
97+
98+
exception(Exception, optional): The original exception.
99+
100+
"""
101+
def __init__(self, error_code, exception=None):
102+
if exception is not None:
103+
kafka_error = KafkaError(error_code, repr(exception))
104+
self.exception = exception
105+
else:
106+
kafka_error = KafkaError(error_code)
107+
self.exception = None
108+
109+
super(ProduceError, self).__init__(kafka_error)
110+
111+
@property
112+
def code(self):
113+
return self.code()
114+
115+
@property
116+
def name(self):
117+
return self.name()
118+
49119

50-
def __str__(self):
51-
return "{} (KafkaError code {})".format(self.reason,
52-
self.error)
120+
class KeySerializationError(ProduceError, SerializationError):
121+
"""
122+
Wraps all errors encountered during the serialization of a Message key.
123+
124+
Args:
125+
exception (Exception): The exception that occurred during serialization.
126+
"""
127+
def __init__(self, exception=None):
128+
super(KeySerializationError, self).__init__(
129+
KafkaError._KEY_SERIALIZATION, exception=exception)
130+
131+
132+
class ValueSerializationError(ProduceError, SerializationError):
133+
"""
134+
Wraps all errors encountered during the serialization of a Message value.
135+
136+
Args:
137+
exception (Exception): The exception that occurred during serialization.
138+
"""
139+
def __init__(self, exception=None):
140+
super(ValueSerializationError, self).__init__(
141+
KafkaError._VALUE_SERIALIZATION, exception=exception)

confluent_kafka/serialization/__init__.py

Lines changed: 2 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
# limitations under the License.
1717
#
1818
import struct as _struct
19+
from confluent_kafka.error import KafkaException
1920

2021
__all__ = ['Deserializer',
2122
'IntegerDeserializer',
@@ -61,35 +62,8 @@ def __init__(self, topic, field):
6162
self.field = field
6263

6364

64-
class SerializationError(Exception):
65+
class SerializationError(KafkaException):
6566
"""Generic error from serializer package"""
66-
67-
def __init__(self, message):
68-
self.message = message
69-
70-
def __repr__(self):
71-
return '{klass}(error={error})'.format(
72-
klass=self.__class__.__name__,
73-
error=self.message
74-
)
75-
76-
def __str__(self):
77-
return self.message
78-
79-
80-
class KeySerializationError(SerializationError):
81-
pass
82-
83-
84-
class KeyDeserializationError(SerializationError):
85-
pass
86-
87-
88-
class ValueSerializationError(SerializationError):
89-
pass
90-
91-
92-
class ValueDeserializationError(SerializationError):
9367
pass
9468

9569

confluent_kafka/serializing_producer.py

Lines changed: 17 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -16,9 +16,11 @@
1616
# limitations under the License.
1717
#
1818

19-
from .cimpl import Producer as _ProducerImpl
19+
from confluent_kafka.cimpl import Producer as _ProducerImpl
2020
from .serialization import (MessageField,
2121
SerializationContext)
22+
from .error import (KeySerializationError,
23+
ValueSerializationError)
2224

2325

2426
class SerializingProducer(_ProducerImpl):
@@ -152,16 +154,26 @@ def produce(self, topic, key=None, value=None, partition=-1,
152154
the application should call :py:func:`SerializingProducer.Poll`
153155
and try again.
154156
155-
KafkaException: for other errors, see exception code
157+
KeySerializationError: If an error occurs during key serialization.
158+
159+
ValueSerializationError: If an error occurs during value
160+
serialization.
161+
162+
ProduceException: For all other errors
156163
157164
"""
158165
ctx = SerializationContext(topic, MessageField.KEY)
159166
if self._key_serializer is not None:
160-
key = self._key_serializer(key, ctx)
161-
167+
try:
168+
key = self._key_serializer(key, ctx)
169+
except Exception as se:
170+
raise KeySerializationError(se)
162171
ctx.field = MessageField.VALUE
163172
if self._value_serializer is not None:
164-
value = self._value_serializer(value, ctx)
173+
try:
174+
value = self._value_serializer(value, ctx)
175+
except Exception as se:
176+
raise ValueSerializationError(se)
165177

166178
super(SerializingProducer, self).produce(topic, value, key,
167179
headers=headers,

0 commit comments

Comments
 (0)