Skip to content

Commit 48d1ace

Browse files
Feature: dynamic target topic routing (#976)
Co-authored-by: Daniil Gusev <[email protected]>
1 parent 9f95914 commit 48d1ace

File tree

3 files changed

+143
-4
lines changed

3 files changed

+143
-4
lines changed

docs/processing.md

Lines changed: 59 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -385,7 +385,65 @@ To publish the current value of the `StreamingDataFrame` to a topic, simply call
385385
from `Application.topic()`
386386
as an argument.
387387

388-
Similarly to other methods, `.to_topic()` creates a new `StreamingDataFrame` object.
388+
Similarly to other methods, `.to_topic()` creates a new `StreamingDataFrame` object:
389+
390+
```python
391+
from quixstreams import Application
392+
393+
app = Application(broker_address='localhost:9092')
394+
395+
# Declare input and output topics
396+
input_topic = app.topic('input', value_deserializer='json')
397+
output_topic = app.topic('output', value_serializer='json')
398+
399+
sdf = app.dataframe(input_topic)
400+
401+
# Produce data to the output topic
402+
sdf = sdf.to_topic(topic=output_topic)
403+
```
404+
405+
### Splitting data into multiple topics
406+
407+
To dynamically route messages to different topics based on the message content, you can provide a callable that returns a `Topic` object to the `StreamingDataFrame.to_topic()` method:
408+
409+
```python
410+
from quixstreams import Application
411+
from typing import Any
412+
413+
app = Application(broker_address='localhost:9092')
414+
415+
# Declare topics
416+
input_topic = app.topic('sensor-data', value_deserializer='json')
417+
normal_topic = app.topic('normal-readings', value_serializer='json')
418+
alert_topic = app.topic('high-temp-alerts', value_serializer='json')
419+
420+
sdf = app.dataframe(input_topic)
421+
422+
def route_by_temperature(value: Any, key: Any, timestamp: int, headers):
423+
"""
424+
Send messages to different topics based on temperature sensor value
425+
"""
426+
if value.get('temperature', 0) > 80:
427+
return alert_topic
428+
else:
429+
return normal_topic
430+
431+
sdf = sdf.to_topic(topic=route_by_temperature)
432+
433+
# You can also combine it together with `key` transformation to change the message key
434+
sdf = sdf.to_topic(
435+
topic=route_by_temperature,
436+
key=lambda value: f"sensor-{value['sensor_id']}"
437+
)
438+
```
439+
440+
!!! warning "Important Considerations"
441+
442+
We recommend declaring all `Topic` instances before staring the application instead of creating them dynamically within the passed callback.
443+
444+
Creating topics dynamically can lead to accidentally creating numerous topics and saturating the broker's partitions limits.
445+
Also, each `app.topic()` calls checks if the topic exists in the cluster and attempts to create the missing one when `auto_create_topics=True` is passed to the `Application`.
446+
389447

390448
### Changing Message Key Before Producing
391449

quixstreams/dataframe/dataframe.py

Lines changed: 41 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -667,8 +667,24 @@ def callback(value, *_):
667667

668668
return StreamingSeries.from_apply_callback(callback, sdf_id=id(self))
669669

670+
@overload
670671
def to_topic(
671-
self, topic: Topic, key: Optional[Callable[[Any], Any]] = None
672+
self,
673+
topic: Topic,
674+
key: Optional[Callable[[Any], Any]] = None,
675+
) -> "StreamingDataFrame": ...
676+
677+
@overload
678+
def to_topic(
679+
self,
680+
topic: Callable[[Any, Any, int, Any], Topic],
681+
key: Optional[Callable[[Any], Any]] = None,
682+
) -> "StreamingDataFrame": ...
683+
684+
def to_topic(
685+
self,
686+
topic: Union[Topic, Callable[[Any, Any, int, Any], Topic]],
687+
key: Optional[Callable[[Any], Any]] = None,
672688
) -> "StreamingDataFrame":
673689
"""
674690
Produce current value to a topic. You can optionally specify a new key.
@@ -692,18 +708,40 @@ def to_topic(
692708
sdf = sdf.to_topic(output_topic_0)
693709
# does not require reassigning
694710
sdf.to_topic(output_topic_1, key=lambda data: data["a_field"])
711+
712+
# Dynamic topic selection based on message content
713+
def select_topic(value, key, timestamp, headers):
714+
if value.get("priority") == "high":
715+
return output_topic_0
716+
else:
717+
return output_topic_1
718+
719+
sdf = sdf.to_topic(select_topic)
695720
```
696721
697-
:param topic: instance of `Topic`
722+
:param topic: instance of `Topic` or a callable that returns a `Topic`.
723+
If a callable is provided, it will receive four arguments:
724+
value, key, timestamp, and headers of the current message.
725+
The callable must return a `Topic` object.
726+
**Important**: We recommend declaring all `Topic` instances before
727+
staring the application instead of creating them dynamically
728+
within the passed callback. Creating topics dynamically can lead
729+
to accidentally creating numerous topics and
730+
saturating the broker's partitions limits.
698731
:param key: a callable to generate a new message key, optional.
699732
If passed, the return type of this callable must be serializable
700733
by `key_serializer` defined for this Topic object.
701734
By default, the current message key will be used.
702735
:return: the updated StreamingDataFrame instance (reassignment NOT required).
703736
"""
737+
if isinstance(topic, Topic):
738+
topic_callback = lambda value, orig_key, timestamp, headers: topic
739+
else:
740+
topic_callback = topic
741+
704742
return self._add_update(
705743
lambda value, orig_key, timestamp, headers: self._produce(
706-
topic=topic,
744+
topic=topic_callback(value, orig_key, timestamp, headers),
707745
value=value,
708746
key=orig_key if key is None else key(value),
709747
timestamp=timestamp,

tests/test_quixstreams/test_dataframe/test_dataframe.py

Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
from quixstreams.dataframe.windows.base import WindowResult
1818
from quixstreams.models import TopicConfig
1919
from quixstreams.models.topics.exceptions import TopicPartitionsMismatch
20+
from quixstreams.models.topics.topic import Topic
2021
from quixstreams.utils.stream_id import stream_id_from_strings
2122
from tests.utils import DummySink
2223

@@ -779,6 +780,48 @@ def test_to_topic_multiple_topics_out(
779780
assert consumed_row.key == key
780781
assert consumed_row.value == value
781782

783+
def test_to_topic_dynamic_topic(
784+
self,
785+
dataframe_factory,
786+
internal_consumer_factory,
787+
internal_producer_factory,
788+
topic_manager_topic_factory,
789+
message_context_factory,
790+
):
791+
topic_0 = topic_manager_topic_factory(
792+
value_serializer="json", value_deserializer="json"
793+
)
794+
topic_1 = topic_manager_topic_factory(
795+
value_serializer="json", value_deserializer="json"
796+
)
797+
798+
def topic_callback(value: Any, key: Any, timestamp: int, headers: Any) -> Topic:
799+
return topic_0 if value == 0 else topic_1
800+
801+
producer = internal_producer_factory()
802+
803+
sdf = dataframe_factory(producer=producer)
804+
sdf = sdf.to_topic(topic_callback)
805+
806+
key, timestamp = b"key", 10
807+
ctx = message_context_factory()
808+
809+
with producer:
810+
sdf.test(value=0, key=key, timestamp=timestamp, ctx=ctx)
811+
sdf.test(value=1, key=key, timestamp=timestamp, ctx=ctx)
812+
813+
with internal_consumer_factory(auto_offset_reset="earliest") as consumer:
814+
consumer.subscribe([topic_0])
815+
row_0 = consumer.poll_row(timeout=5.0)
816+
assert row_0.topic == topic_0.name
817+
assert row_0.value == 0
818+
819+
with internal_consumer_factory(auto_offset_reset="earliest") as consumer:
820+
consumer.subscribe([topic_1])
821+
row_1 = consumer.poll_row(timeout=5.0)
822+
assert row_1.topic == topic_1.name
823+
assert row_1.value == 1
824+
782825

783826
class TestStreamingDataframeStateful:
784827
def test_apply_stateful(

0 commit comments

Comments
 (0)