
Commit 3cda236

Topics refactoring (#896)
1 parent 4898a91 commit 3cda236

6 files changed: +121 additions, -190 deletions


quixstreams/dataframe/dataframe.py

Lines changed: 12 additions & 12 deletions
```diff
@@ -505,31 +505,31 @@ def group_by(
         self,
         key: str,
         name: Optional[str] = ...,
-        value_deserializer: Optional[DeserializerType] = ...,
-        key_deserializer: Optional[DeserializerType] = ...,
-        value_serializer: Optional[SerializerType] = ...,
-        key_serializer: Optional[SerializerType] = ...,
+        value_deserializer: DeserializerType = ...,
+        key_deserializer: DeserializerType = ...,
+        value_serializer: SerializerType = ...,
+        key_serializer: SerializerType = ...,
     ) -> "StreamingDataFrame": ...
 
     @overload
     def group_by(
         self,
         key: Callable[[Any], Any],
         name: str,
-        value_deserializer: Optional[DeserializerType] = ...,
-        key_deserializer: Optional[DeserializerType] = ...,
-        value_serializer: Optional[SerializerType] = ...,
-        key_serializer: Optional[SerializerType] = ...,
+        value_deserializer: DeserializerType = ...,
+        key_deserializer: DeserializerType = ...,
+        value_serializer: SerializerType = ...,
+        key_serializer: SerializerType = ...,
     ) -> "StreamingDataFrame": ...
 
     def group_by(
         self,
         key: Union[str, Callable[[Any], Any]],
         name: Optional[str] = None,
-        value_deserializer: Optional[DeserializerType] = "json",
-        key_deserializer: Optional[DeserializerType] = "json",
-        value_serializer: Optional[SerializerType] = "json",
-        key_serializer: Optional[SerializerType] = "json",
+        value_deserializer: DeserializerType = "json",
+        key_deserializer: DeserializerType = "json",
+        value_serializer: SerializerType = "json",
+        key_serializer: SerializerType = "json",
     ) -> "StreamingDataFrame":
         """
         "Groups" messages by re-keying them via the provided group_by operation
```

quixstreams/models/serializers/exceptions.py

Lines changed: 0 additions & 8 deletions
```diff
@@ -22,8 +22,6 @@
     "KeySerializationError",
     "ValueSerializationError",
     "ValueDeserializationError",
-    "SerializerIsNotProvidedError",
-    "DeserializerIsNotProvidedError",
     "IgnoreMessage",
 )
 
@@ -45,12 +43,6 @@ class ValueDeserializationError(
 ): ...
 
 
-class SerializerIsNotProvidedError(exceptions.QuixException): ...
-
-
-class DeserializerIsNotProvidedError(exceptions.QuixException): ...
-
-
 class IgnoreMessage(exceptions.QuixException):
     """
     Raise this exception from Deserializer.__call__ in order to ignore the processing
```
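
A hedged migration note: any code importing the two removed exceptions has to drop those imports. Together with the `topic.py` changes further down, an invalid serializer option now fails fast with a `ValueError` when the `Topic` is constructed. A minimal sketch (the topic name and the bogus option are made up, and it assumes `Topic` is exported from `quixstreams.models.topics`):

```python
from quixstreams.models.topics import Topic

# SerializerIsNotProvidedError / DeserializerIsNotProvidedError no longer exist;
# misconfiguration now surfaces as ValueError at Topic construction time.
try:
    Topic(name="example-topic", value_serializer="not-a-serializer")
except ValueError as exc:
    print(exc)  # lists the valid options from the SERIALIZERS registry
```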

quixstreams/models/topics/manager.py

Lines changed: 8 additions & 8 deletions
```diff
@@ -136,10 +136,10 @@ def topic_config(
     def topic(
         self,
         name: str,
-        value_deserializer: Optional[DeserializerType] = None,
-        key_deserializer: Optional[DeserializerType] = "bytes",
-        value_serializer: Optional[SerializerType] = None,
-        key_serializer: Optional[SerializerType] = "bytes",
+        value_deserializer: DeserializerType = "json",
+        key_deserializer: DeserializerType = "bytes",
+        value_serializer: SerializerType = "json",
+        key_serializer: SerializerType = "bytes",
         create_config: Optional[TopicConfig] = None,
         timestamp_extractor: Optional[TimestampExtractor] = None,
     ) -> Topic:
@@ -204,10 +204,10 @@ def repartition_topic(
         operation: str,
         stream_id: str,
         config: TopicConfig,
-        value_deserializer: Optional[DeserializerType] = "json",
-        key_deserializer: Optional[DeserializerType] = "json",
-        value_serializer: Optional[SerializerType] = "json",
-        key_serializer: Optional[SerializerType] = "json",
+        value_deserializer: DeserializerType = "json",
+        key_deserializer: DeserializerType = "json",
+        value_serializer: SerializerType = "json",
+        key_serializer: SerializerType = "json",
     ) -> Topic:
         """
         Create an internal repartition topic.
```
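
A rough sketch of what the new defaults mean for callers, assuming the usual path of `Application.topic()` delegating to `TopicManager.topic()` (broker address and topic names are placeholders): the value side now defaults to `"json"` instead of `None`, while the key side stays `"bytes"`.

```python
from quixstreams import Application

app = Application(broker_address="localhost:9092")  # placeholder broker

# "json" values and "bytes" keys out of the box; previously the value
# serializer/deserializer defaulted to None and had to be supplied explicitly.
events = app.topic("events")

# Overrides still accept a registered string option or a Serializer/Deserializer instance.
raw = app.topic("raw-frames", value_serializer="bytes", value_deserializer="bytes")
```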

quixstreams/models/topics/topic.py

Lines changed: 68 additions & 98 deletions
```diff
@@ -10,16 +10,12 @@
 from quixstreams.models.serializers import (
     DESERIALIZERS,
     SERIALIZERS,
-    BytesDeserializer,
-    BytesSerializer,
     Deserializer,
-    DeserializerIsNotProvidedError,
     DeserializerType,
     IgnoreMessage,
     MessageField,
     SerializationContext,
     Serializer,
-    SerializerIsNotProvidedError,
     SerializerType,
 )
 from quixstreams.models.timestamps import TimestampType
@@ -57,29 +53,39 @@ def as_dict(self):
         return dataclasses.asdict(self)
 
 
-def _get_serializer(serializer: Optional[SerializerType]) -> Optional[Serializer]:
+def _resolve_serializer(serializer: SerializerType) -> Serializer:
     if isinstance(serializer, str):
         try:
             return SERIALIZERS[serializer]()
         except KeyError:
             raise ValueError(
-                f"Unknown deserializer option '{serializer}'; "
+                f'Unknown serializer option "{serializer}"; '
                 f"valid options are {list(SERIALIZERS.keys())}"
             )
+    elif not isinstance(serializer, Serializer):
+        raise ValueError(
+            f"Serializer must be either one of {list(SERIALIZERS.keys())} "
+            f'or a subclass of Serializer; got "{serializer}"'
+        )
     return serializer
 
 
-def _get_deserializer(
-    deserializer: Optional[DeserializerType],
-) -> Optional[Deserializer]:
+def _resolve_deserializer(
+    deserializer: DeserializerType,
+) -> Deserializer:
     if isinstance(deserializer, str):
         try:
             return DESERIALIZERS[deserializer]()
         except KeyError:
             raise ValueError(
-                f"Unknown deserializer option '{deserializer}'; "
+                f'Unknown deserializer option "{deserializer}"; '
                 f"valid options are {list(DESERIALIZERS.keys())}"
             )
+    elif not isinstance(deserializer, Deserializer):
+        raise ValueError(
+            f"Deserializer must be either one of {list(DESERIALIZERS.keys())} "
+            f'or a subclass of Deserializer; got "{deserializer}"'
+        )
     return deserializer
```
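
A minimal sketch of the resolution rules these renamed helpers implement, inferred from the branches added above (the functions are module-private and imported here purely for illustration; `JSONSerializer` is assumed to be exported from `quixstreams.models.serializers`):

```python
from quixstreams.models.serializers import JSONSerializer
from quixstreams.models.topics.topic import _resolve_deserializer, _resolve_serializer

# A registered string option is looked up in SERIALIZERS/DESERIALIZERS and instantiated.
print(_resolve_serializer("json"))  # -> a JSONSerializer instance

# An already-constructed Serializer/Deserializer passes through unchanged.
custom = JSONSerializer()
assert _resolve_serializer(custom) is custom

# Anything else, including the None that the old Optional hints allowed,
# now raises ValueError instead of flowing through silently.
try:
    _resolve_deserializer(None)
except ValueError as exc:
    print(exc)
```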

```diff
@@ -103,10 +109,10 @@ def __init__(
         name: str,
         topic_type: TopicType = TopicType.REGULAR,
         create_config: Optional[TopicConfig] = None,
-        value_deserializer: Optional[DeserializerType] = None,
-        key_deserializer: Optional[DeserializerType] = BytesDeserializer(),
-        value_serializer: Optional[SerializerType] = None,
-        key_serializer: Optional[SerializerType] = BytesSerializer(),
+        value_deserializer: DeserializerType = "json",
+        key_deserializer: DeserializerType = "bytes",
+        value_serializer: SerializerType = "json",
+        key_serializer: SerializerType = "bytes",
         timestamp_extractor: Optional[TimestampExtractor] = None,
         quix_name: str = "",
     ):
@@ -133,10 +139,10 @@ def __init__(
         self.quix_name = quix_name or name
         self._create_config = copy.deepcopy(create_config)
         self._broker_config: Optional[TopicConfig] = None
-        self._value_deserializer = _get_deserializer(value_deserializer)
-        self._key_deserializer = _get_deserializer(key_deserializer)
-        self._value_serializer = _get_serializer(value_serializer)
-        self._key_serializer = _get_serializer(key_serializer)
+        self._value_deserializer = _resolve_deserializer(value_deserializer)
+        self._key_deserializer = _resolve_deserializer(key_deserializer)
+        self._value_serializer = _resolve_serializer(value_serializer)
+        self._key_serializer = _resolve_serializer(key_serializer)
         self._timestamp_extractor = timestamp_extractor
         self._type = topic_type
@@ -202,36 +208,27 @@ def row_serialize(self, row: Row, key: Any) -> KafkaMessage:
         :param key: message key to serialize
         :return: KafkaMessage object with serialized values
         """
-        if self._key_serializer is None:
-            raise SerializerIsNotProvidedError(
-                f'Key serializer is not provided for topic "{self.name}"'
-            )
-        if self._value_serializer is None:
-            raise SerializerIsNotProvidedError(
-                f'Value serializer is not provided for topic "{self.name}"'
-            )
 
+        serialization_ctx = SerializationContext(
+            topic=self.name, field=MessageField.KEY, headers=row.headers
+        )
         # Try to serialize the key only if it's not None
         # If key is None then pass it as is
         # Otherwise, different serializers may serialize None differently
         if key is None:
             key_serialized = None
         else:
-            key_ctx = SerializationContext(
-                topic=self.name, field=MessageField.KEY, headers=row.headers
-            )
-            key_serialized = self._key_serializer(key, ctx=key_ctx)
+            key_serialized = self._key_serializer(key, ctx=serialization_ctx)
 
         # Update message headers with headers supplied by the value serializer.
         extra_headers = self._value_serializer.extra_headers
         headers = merge_headers(row.headers, extra_headers)
-        value_ctx = SerializationContext(
-            topic=self.name, field=MessageField.VALUE, headers=row.headers
-        )
+        serialization_ctx.field = MessageField.VALUE
+        value_serialized = self._value_serializer(row.value, ctx=serialization_ctx)
 
         return KafkaMessage(
             key=key_serialized,
-            value=self._value_serializer(row.value, ctx=value_ctx),
+            value=value_serialized,
             headers=headers,
         )
```
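
The pattern introduced in `row_serialize`, shown in isolation: one `SerializationContext` is created for the key and then mutated to `MessageField.VALUE`, instead of building a second, nearly identical context. A minimal sketch (topic name and payloads are invented; it assumes `JSONSerializer` is exported alongside `SerializationContext` and `MessageField`):

```python
from quixstreams.models.serializers import (
    JSONSerializer,
    MessageField,
    SerializationContext,
)

serializer = JSONSerializer()
ctx = SerializationContext(topic="example-topic", field=MessageField.KEY, headers=None)

key_bytes = serializer({"id": 1}, ctx=ctx)

# Flip the field in place and reuse the same context for the value.
ctx.field = MessageField.VALUE
value_bytes = serializer({"amount": 9.99}, ctx=ctx)
```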

```diff
@@ -244,50 +241,46 @@ def row_deserialize(
         :param message: an object with interface of `confluent_kafka.Message`
         :return: Row, list of Rows or None if the message is ignored.
         """
-        if self._key_deserializer is None:
-            raise DeserializerIsNotProvidedError(
-                f'Key deserializer is not provided for topic "{self.name}"'
-            )
-        if self._value_deserializer is None:
-            raise DeserializerIsNotProvidedError(
-                f'Value deserializer is not provided for topic "{self.name}"'
-            )
-
         headers = message.headers()
+        topic = message.topic()
+        partition = message.partition()
+        offset = message.offset()
 
+        serialization_ctx = SerializationContext(
+            topic=topic, field=MessageField.KEY, headers=headers
+        )
         if (key_bytes := message.key()) is None:
             key_deserialized = None
         else:
-            key_ctx = SerializationContext(
-                topic=message.topic(), field=MessageField.KEY, headers=headers
+            key_deserialized = self._key_deserializer(
+                value=key_bytes, ctx=serialization_ctx
             )
-            key_deserialized = self._key_deserializer(value=key_bytes, ctx=key_ctx)
 
         if (value_bytes := message.value()) is None:
             value_deserialized = None
         else:
-            value_ctx = SerializationContext(
-                topic=message.topic(), field=MessageField.VALUE, headers=headers
-            )
+            # Reuse the SerializationContext object here to avoid creating a new
+            # one with almost the same fields
+            serialization_ctx.field = MessageField.VALUE
             try:
                 value_deserialized = self._value_deserializer(
-                    value=value_bytes, ctx=value_ctx
+                    value=value_bytes, ctx=serialization_ctx
                 )
             except IgnoreMessage:
                 # Ignore message completely if deserializer raised IgnoreValueError.
                 logger.debug(
                     'Ignore incoming message: partition="%s[%s]" offset="%s"',
-                    message.topic(),
-                    message.partition(),
-                    message.offset(),
+                    topic,
+                    partition,
+                    offset,
                 )
                 return None
 
         timestamp_type, timestamp_ms = message.timestamp()
         message_context = MessageContext(
-            topic=message.topic(),
-            partition=message.partition(),
-            offset=message.offset(),
+            topic=topic,
+            partition=partition,
+            offset=offset,
             size=len(message),
             leader_epoch=message.leader_epoch(),
         )
@@ -312,8 +305,8 @@ def row_deserialize(
             )
             return rows
 
-        if self._timestamp_extractor:
-            timestamp_ms = self._timestamp_extractor(
+        if (timestamp_extractor := self._timestamp_extractor) is not None:
+            timestamp_ms = timestamp_extractor(
                 value_deserialized, headers, timestamp_ms, TimestampType(timestamp_type)
             )
@@ -332,24 +325,13 @@ def serialize(
         headers: Optional[Headers] = None,
         timestamp_ms: Optional[int] = None,
     ) -> KafkaMessage:
-        if self._key_serializer:
-            key_ctx = SerializationContext(
-                topic=self.name, field=MessageField.KEY, headers=headers
-            )
-            key = self._key_serializer(key, ctx=key_ctx)
-        elif key is not None:
-            raise SerializerIsNotProvidedError(
-                f'Key serializer is not provided for topic "{self.name}"'
-            )
-        if self._value_serializer:
-            value_ctx = SerializationContext(
-                topic=self.name, field=MessageField.VALUE, headers=headers
-            )
-            value = self._value_serializer(value, ctx=value_ctx)
-        elif value is not None:
-            raise SerializerIsNotProvidedError(
-                f'Value serializer is not provided for topic "{self.name}"'
-            )
+        serialization_ctx = SerializationContext(
+            topic=self.name, field=MessageField.KEY, headers=headers
+        )
+        key = self._key_serializer(key, ctx=serialization_ctx)
+        serialization_ctx.field = MessageField.VALUE
+        value = self._value_serializer(value, ctx=serialization_ctx)
+
         return KafkaMessage(
             key=key,
             value=value,
```
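
A rough usage sketch of the simplified `serialize()` path (topic name and payload invented; it assumes `Topic` is exported from `quixstreams.models.topics`): because every `Topic` now resolves concrete serializers at construction time, the `SerializerIsNotProvidedError` branches removed in this hunk are no longer reachable.

```python
from quixstreams.models.topics import Topic

topic = Topic(name="example-topic")  # "bytes" keys and "json" values by default
msg = topic.serialize(key=b"user-1", value={"clicks": 3})
print(msg.key, msg.value)  # serialized key/value bytes, no "serializer not provided" error
```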
```diff
@@ -358,30 +340,18 @@ def serialize(
         )
 
     def deserialize(self, message: SuccessfulConfluentKafkaMessageProto):
+        serialization_ctx = SerializationContext(
+            topic=message.topic(),
+            field=MessageField.KEY,
+            headers=message.headers(),
+        )
         if (key := message.key()) is not None:
-            if self._key_deserializer:
-                key_ctx = SerializationContext(
-                    topic=message.topic(),
-                    field=MessageField.KEY,
-                    headers=message.headers(),
-                )
-                key = self._key_deserializer(key, ctx=key_ctx)
-            else:
-                raise DeserializerIsNotProvidedError(
-                    f'Key deserializer is not provided for topic "{self.name}"'
-                )
+            key = self._key_deserializer(key, ctx=serialization_ctx)
+
         if (value := message.value()) is not None:
-            if self._value_deserializer:
-                value_ctx = SerializationContext(
-                    topic=message.topic(),
-                    field=MessageField.VALUE,
-                    headers=message.headers(),
-                )
-                value = self._value_deserializer(value, ctx=value_ctx)
-            else:
-                raise DeserializerIsNotProvidedError(
-                    f'Value deserializer is not provided for topic "{self.name}"'
-                )
+            serialization_ctx.field = MessageField.VALUE
+            value = self._value_deserializer(value, ctx=serialization_ctx)
+
         return KafkaMessage(
             key=key,
             value=value,
```
