
Commit e1cf56e

Heartbeat basics
1 parent 8b9df35 commit e1cf56e

File tree: 6 files changed, +219 −13 lines

quixstreams/app.py

Lines changed: 54 additions & 0 deletions

@@ -7,6 +7,7 @@
 import uuid
 import warnings
 from collections import defaultdict
+from datetime import datetime
 from pathlib import Path
 from typing import Callable, List, Literal, Optional, Protocol, Tuple, Type, Union, cast

@@ -30,6 +31,8 @@
 from .logging import LogLevel, configure_logging
 from .models import (
     DeserializerType,
+    MessageContext,
+    Row,
     SerializerType,
     TimestampExtractor,
     Topic,

@@ -152,6 +155,7 @@ def __init__(
         topic_create_timeout: float = 60,
         processing_guarantee: ProcessingGuarantee = "at-least-once",
         max_partition_buffer_size: int = 10000,
+        heartbeat_interval: float = 0.0,
     ):
         """
         :param broker_address: Connection settings for Kafka.

@@ -220,6 +224,12 @@ def __init__(
             It is a soft limit, and the actual number of buffered messages can be up to x2 higher.
             Lower value decreases the memory use, but increases the latency.
             Default - `10000`.
+        :param heartbeat_interval: the interval (seconds) at which to send heartbeat messages.
+            The heartbeat timing starts counting from application start.
+            TODO: Save and respect last heartbeat timestamp.
+            The heartbeat is sent for every partition of every topic with registered heartbeat streams.
+            If the value is 0, no heartbeat messages will be sent.
+            Default - `0.0`.

         <br><br>***Error Handlers***<br>
         To handle errors, `Application` accepts callbacks triggered when

@@ -371,6 +381,10 @@ def __init__(
             recovery_manager=recovery_manager,
         )

+        self._heartbeat_active = heartbeat_interval > 0
+        self._heartbeat_interval = heartbeat_interval
+        self._heartbeat_last_sent = datetime.now().timestamp()
+
         self._source_manager = SourceManager()
         self._sink_manager = SinkManager()
         self._dataframe_registry = DataFrameRegistry()

@@ -900,6 +914,7 @@ def _run_dataframe(self, sink: Optional[VoidExecutor] = None):
         processing_context = self._processing_context
         source_manager = self._source_manager
         process_message = self._process_message
+        process_heartbeat = self._process_heartbeat
         printer = self._processing_context.printer
         run_tracker = self._run_tracker
         consumer = self._consumer

@@ -912,6 +927,9 @@ def _run_dataframe(self, sink: Optional[VoidExecutor] = None):
         )

         dataframes_composed = self._dataframe_registry.compose_all(sink=sink)
+        heartbeats_composed = self._dataframe_registry.compose_heartbeats()
+        if not heartbeats_composed:
+            self._heartbeat_active = False

         processing_context.init_checkpoint()
         run_tracker.set_as_running()

@@ -923,6 +941,7 @@ def _run_dataframe(self, sink: Optional[VoidExecutor] = None):
                     run_tracker.timeout_refresh()
                 else:
                     process_message(dataframes_composed)
+                    process_heartbeat(heartbeats_composed)
                 processing_context.commit_checkpoint()
                 consumer.resume_backpressured()
                 source_manager.raise_for_error()

@@ -1006,6 +1025,41 @@ def _process_message(self, dataframe_composed):
         if self._on_message_processed is not None:
             self._on_message_processed(topic_name, partition, offset)

+    def _process_heartbeat(self, heartbeats_composed):
+        if not self._heartbeat_active:
+            return
+
+        now = datetime.now().timestamp()
+        if self._heartbeat_last_sent > now - self._heartbeat_interval:
+            return
+
+        value, key, timestamp, headers = None, None, int(now * 1000), {}
+
+        for tp in self._consumer.assignment():
+            if executor := heartbeats_composed.get(tp.topic):
+                row = Row(
+                    value=value,
+                    key=key,
+                    timestamp=timestamp,
+                    context=MessageContext(
+                        topic=tp.topic,
+                        partition=tp.partition,
+                        offset=-1,  # TODO: get correct offsets
+                        size=-1,
+                    ),
+                    headers=headers,
+                )
+                context = copy_context()
+                context.run(set_message_context, row.context)
+                try:
+                    context.run(executor, value, key, timestamp, headers)
+                except Exception as exc:
+                    to_suppress = self._on_processing_error(exc, row, logger)
+                    if not to_suppress:
+                        raise
+
+        self._heartbeat_last_sent = now
+
     def _on_assign(self, _, topic_partitions: List[TopicPartition]):
         """
         Assign new topic partitions to consumer and state.
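The new `_process_heartbeat` hook is called on every loop iteration but only acts once per interval: it returns early while `self._heartbeat_last_sent > now - self._heartbeat_interval`, then emits one heartbeat row per assigned partition whose topic has a registered heartbeat stream. Below is a minimal standalone sketch of that rate-limiting check, written for illustration only (the class and names are not part of quixstreams):

import time


class HeartbeatTimer:
    """Illustrative re-creation of the interval gate used in _process_heartbeat."""

    def __init__(self, interval: float):
        self.interval = interval
        self.last_sent = time.time()  # the timer starts counting at creation ("application start")

    def due(self) -> bool:
        now = time.time()
        # Same comparison as in the diff: skip while the last heartbeat
        # still falls inside the current interval window.
        if self.last_sent > now - self.interval:
            return False
        self.last_sent = now
        return True


timer = HeartbeatTimer(interval=5.0)
if timer.due():
    print("emit heartbeat rows for all assigned partitions")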

quixstreams/core/stream/functions/transform.py

Lines changed: 81 additions & 8 deletions

@@ -1,7 +1,13 @@
 from typing import Any, Literal, Union, cast, overload

 from .base import StreamFunction
-from .types import TransformCallback, TransformExpandedCallback, VoidExecutor
+from .types import (
+    TransformCallback,
+    TransformExpandedCallback,
+    TransformHeartbeatCallback,
+    TransformHeartbeatExpandedCallback,
+    VoidExecutor,
+)

 __all__ = ("TransformFunction",)

@@ -23,28 +29,81 @@ class TransformFunction(StreamFunction):

     @overload
     def __init__(
-        self, func: TransformCallback, expand: Literal[False] = False
+        self,
+        func: TransformCallback,
+        expand: Literal[False] = False,
+        heartbeat: Literal[False] = False,
+    ) -> None: ...
+
+    @overload
+    def __init__(
+        self,
+        func: TransformExpandedCallback,
+        expand: Literal[True],
+        heartbeat: Literal[False] = False,
+    ) -> None: ...
+
+    @overload
+    def __init__(
+        self,
+        func: TransformHeartbeatCallback,
+        expand: Literal[False] = False,
+        heartbeat: Literal[True] = True,
     ) -> None: ...

     @overload
     def __init__(
-        self, func: TransformExpandedCallback, expand: Literal[True]
+        self,
+        func: TransformHeartbeatExpandedCallback,
+        expand: Literal[True],
+        heartbeat: Literal[True],
     ) -> None: ...

     def __init__(
         self,
-        func: Union[TransformCallback, TransformExpandedCallback],
+        func: Union[
+            TransformCallback,
+            TransformExpandedCallback,
+            TransformHeartbeatCallback,
+            TransformHeartbeatExpandedCallback,
+        ],
         expand: bool = False,
+        heartbeat: bool = False,
     ):
         super().__init__(func)

-        self.func: Union[TransformCallback, TransformExpandedCallback]
+        self.func: Union[
+            TransformCallback,
+            TransformExpandedCallback,
+            TransformHeartbeatCallback,
+            TransformHeartbeatExpandedCallback,
+        ]
         self.expand = expand
+        self.heartbeat = heartbeat

     def get_executor(self, *child_executors: VoidExecutor) -> VoidExecutor:
         child_executor = self._resolve_branching(*child_executors)

-        if self.expand:
+        if self.expand and self.heartbeat:
+            heartbeat_expanded_func = cast(
+                TransformHeartbeatExpandedCallback, self.func
+            )
+
+            def wrapper(
+                value: Any,
+                key: Any,
+                timestamp: int,
+                headers: Any,
+            ):
+                for (
+                    new_value,
+                    new_key,
+                    new_timestamp,
+                    new_headers,
+                ) in heartbeat_expanded_func(timestamp):
+                    child_executor(new_value, new_key, new_timestamp, new_headers)
+
+        elif self.expand:
             expanded_func = cast(TransformExpandedCallback, self.func)

             def wrapper(

@@ -57,8 +116,22 @@ def wrapper(
                 for new_value, new_key, new_timestamp, new_headers in result:
                     child_executor(new_value, new_key, new_timestamp, new_headers)

+        elif self.heartbeat:
+            heartbeat_func = cast(TransformHeartbeatCallback, self.func)
+
+            def wrapper(
+                value: Any,
+                key: Any,
+                timestamp: int,
+                headers: Any,
+            ):
+                new_value, new_key, new_timestamp, new_headers = heartbeat_func(
+                    timestamp
+                )
+                child_executor(new_value, new_key, new_timestamp, new_headers)
+
         else:
-            func = cast(TransformCallback, self.func)
+            regular_func = cast(TransformCallback, self.func)

             def wrapper(
                 value: Any,

@@ -67,7 +140,7 @@ def wrapper(
                 headers: Any,
             ):
                 # Execute a function on a single value and return its result
-                new_value, new_key, new_timestamp, new_headers = func(
+                new_value, new_key, new_timestamp, new_headers = regular_func(
                     value, key, timestamp, headers
                 )
                 child_executor(new_value, new_key, new_timestamp, new_headers)
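All four wrapper variants keep the same `(value, key, timestamp, headers)` executor signature; the heartbeat branches simply discard everything except `timestamp` before calling the user function. A stripped-down sketch of the non-expanded heartbeat branch, outside the Stream machinery (the helper and callbacks below are placeholders, not library code):

from typing import Any, Callable, Tuple

ChildExecutor = Callable[[Any, Any, int, Any], None]
HeartbeatCallback = Callable[[int], Tuple[Any, Any, int, Any]]


def make_heartbeat_wrapper(
    heartbeat_func: HeartbeatCallback, child_executor: ChildExecutor
) -> ChildExecutor:
    # Mirrors the `elif self.heartbeat:` branch: the incoming value/key/headers
    # are ignored and only the timestamp reaches the user callback.
    def wrapper(value: Any, key: Any, timestamp: int, headers: Any) -> None:
        new_value, new_key, new_timestamp, new_headers = heartbeat_func(timestamp)
        child_executor(new_value, new_key, new_timestamp, new_headers)

    return wrapper


# The callback sees only the timestamp and fabricates the full row.
wrapper = make_heartbeat_wrapper(
    lambda ts: ({"heartbeat": True}, None, ts, {}),
    lambda v, k, ts, h: print(v, k, ts, h),
)
wrapper(None, None, 1_700_000_000_000, {})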

quixstreams/core/stream/functions/types.py

Lines changed: 6 additions & 0 deletions

@@ -14,6 +14,8 @@
     "FilterWithMetadataCallback",
     "TransformCallback",
     "TransformExpandedCallback",
+    "TransformHeartbeatCallback",
+    "TransformHeartbeatExpandedCallback",
 )


@@ -35,6 +37,10 @@ def __bool__(self) -> bool: ...
 TransformExpandedCallback = Callable[
     [Any, Any, int, Any], Iterable[Tuple[Any, Any, int, Any]]
 ]
+TransformHeartbeatCallback = Callable[[int], Tuple[Any, Any, int, Any]]
+TransformHeartbeatExpandedCallback = Callable[
+    [int], Iterable[Tuple[Any, Any, int, Any]]
+]

 StreamCallback = Union[
     ApplyCallback,
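The two new aliases pin down the heartbeat callback shapes: one timestamp in, a single `(value, key, timestamp, headers)` tuple out, or an iterable of such tuples when used with `expand=True`. Example callbacks that satisfy them (the payloads are invented for illustration):

from typing import Any, Iterable, Tuple


# Satisfies TransformHeartbeatCallback: Callable[[int], Tuple[Any, Any, int, Any]]
def heartbeat_tick(timestamp: int) -> Tuple[Any, Any, int, Any]:
    return {"heartbeat": True}, None, timestamp, {}


# Satisfies TransformHeartbeatExpandedCallback: one timestamp in, several rows out.
def heartbeat_fanout(timestamp: int) -> Iterable[Tuple[Any, Any, int, Any]]:
    for seq in range(3):
        yield {"heartbeat": True, "seq": seq}, None, timestamp, {}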

quixstreams/core/stream/stream.py

Lines changed: 46 additions & 4 deletions

@@ -30,6 +30,8 @@
     TransformCallback,
     TransformExpandedCallback,
     TransformFunction,
+    TransformHeartbeatCallback,
+    TransformHeartbeatExpandedCallback,
     UpdateCallback,
     UpdateFunction,
     UpdateWithMetadataCallback,

@@ -249,18 +251,56 @@ def add_update(
         return self._add(update_func)

     @overload
-    def add_transform(self, func: TransformCallback, *, expand: Literal[False] = False):
+    def add_transform(
+        self,
+        func: TransformCallback,
+        *,
+        expand: Literal[False] = False,
+        heartbeat: Literal[False] = False,
+    ):
         pass

     @overload
-    def add_transform(self, func: TransformExpandedCallback, *, expand: Literal[True]):
+    def add_transform(
+        self,
+        func: TransformExpandedCallback,
+        *,
+        expand: Literal[True],
+        heartbeat: Literal[False] = False,
+    ):
         pass

+    @overload
     def add_transform(
         self,
-        func: Union[TransformCallback, TransformExpandedCallback],
+        func: TransformHeartbeatCallback,
+        *,
+        expand: Literal[False] = False,
+        heartbeat: Literal[True],
+    ):
+        pass
+
+    @overload
+    def add_transform(
+        self,
+        func: TransformHeartbeatExpandedCallback,
+        *,
+        expand: Literal[True],
+        heartbeat: Literal[True],
+    ):
+        pass
+
+    def add_transform(
+        self,
+        func: Union[
+            TransformCallback,
+            TransformExpandedCallback,
+            TransformHeartbeatCallback,
+            TransformHeartbeatExpandedCallback,
+        ],
         *,
         expand: bool = False,
+        heartbeat: bool = False,
     ) -> "Stream":
         """
         Add a "transform" function to the Stream, that will mutate the input value.

@@ -276,9 +316,11 @@ def add_transform(
         :param expand: if True, expand the returned iterable into individual items
             downstream. If returned value is not iterable, `TypeError` will be raised.
             Default - `False`.
+        :param heartbeat: if True, the callback is expected to accept timestamp only.
+            Default - `False`.
         :return: a new Stream derived from the current one
         """
-        return self._add(TransformFunction(func, expand=expand))  # type: ignore[call-overload]
+        return self._add(TransformFunction(func, expand=expand, heartbeat=heartbeat))  # type: ignore[call-overload]

     def merge(self, other: "Stream") -> "Stream":
         """

quixstreams/dataframe/dataframe.py

Lines changed: 4 additions & 0 deletions

@@ -1696,6 +1696,10 @@ def concat(self, other: "StreamingDataFrame") -> "StreamingDataFrame":
             stream=merged_stream, stream_id=merged_stream_id
         )

+    def concat_heartbeat(self, stream: Stream) -> "StreamingDataFrame":
+        self._registry.register_heartbeat(self, stream)
+        return self.__dataframe_clone__(stream=self.stream.merge(stream))
+
     def join_asof(
         self,
         right: "StreamingDataFrame",
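`concat_heartbeat` registers the heartbeat stream with the dataframe registry and merges it into the dataframe's own stream, so heartbeat rows travel through the same downstream operators as regular messages. A speculative end-to-end sketch of how the pieces in this commit might be wired together; the topic name and callback are invented, and the intended public API for building heartbeat streams may well differ:

from quixstreams import Application
from quixstreams.core.stream import Stream

# heartbeat_interval is the new Application parameter from this commit.
app = Application(broker_address="localhost:9092", heartbeat_interval=5.0)
topic = app.topic("sensor-readings", value_deserializer="json")
sdf = app.dataframe(topic)

# A timestamp-only callback producing the synthetic heartbeat row.
heartbeat_stream = Stream().add_transform(
    lambda ts: ({"heartbeat": True}, None, ts, {}),
    heartbeat=True,
)

# Merge heartbeat rows into the pipeline; downstream steps see both kinds of rows.
sdf = sdf.concat_heartbeat(heartbeat_stream)
sdf = sdf.update(lambda value: print(value))

app.run()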
