Skip to content

Commit 9be8da8

Browse files
committed
Rename heartbeat to wall clock
1 parent e1cf56e commit 9be8da8

File tree

6 files changed

+66
-66
lines changed

6 files changed

+66
-66
lines changed

quixstreams/app.py

Lines changed: 19 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -155,7 +155,7 @@ def __init__(
155155
topic_create_timeout: float = 60,
156156
processing_guarantee: ProcessingGuarantee = "at-least-once",
157157
max_partition_buffer_size: int = 10000,
158-
heartbeat_interval: float = 0.0,
158+
wall_clock_interval: float = 0.0,
159159
):
160160
"""
161161
:param broker_address: Connection settings for Kafka.
@@ -224,11 +224,11 @@ def __init__(
224224
It is a soft limit, and the actual number of buffered messages can be up to x2 higher.
225225
Lower value decreases the memory use, but increases the latency.
226226
Default - `10000`.
227-
:param heartbeat_interval: the interval (seconds) at which to send heartbeat messages.
228-
The heartbeat timing starts counting from application start.
229-
TODO: Save and respect last heartbeat timestamp.
230-
The heartbeat is sent for every partition of every topic with registered heartbeat streams.
231-
If the value is 0, no heartbeat messages will be sent.
227+
:param wall_clock_interval: the interval (seconds) at which to invoke
228+
the registered wall clock logic.
229+
The wall clock timing starts counting from application start.
230+
TODO: Save and respect last wall clock timestamp.
231+
If the value is 0, no wall clock logic will be invoked.
232232
Default - `0.0`.
233233
234234
<br><br>***Error Handlers***<br>
@@ -381,9 +381,9 @@ def __init__(
381381
recovery_manager=recovery_manager,
382382
)
383383

384-
self._heartbeat_active = heartbeat_interval > 0
385-
self._heartbeat_interval = heartbeat_interval
386-
self._heartbeat_last_sent = datetime.now().timestamp()
384+
self._wall_clock_active = wall_clock_interval > 0
385+
self._wall_clock_interval = wall_clock_interval
386+
self._wall_clock_last_sent = datetime.now().timestamp()
387387

388388
self._source_manager = SourceManager()
389389
self._sink_manager = SinkManager()
@@ -914,7 +914,7 @@ def _run_dataframe(self, sink: Optional[VoidExecutor] = None):
914914
processing_context = self._processing_context
915915
source_manager = self._source_manager
916916
process_message = self._process_message
917-
process_heartbeat = self._process_heartbeat
917+
process_wall_clock = self._process_wall_clock
918918
printer = self._processing_context.printer
919919
run_tracker = self._run_tracker
920920
consumer = self._consumer
@@ -927,9 +927,9 @@ def _run_dataframe(self, sink: Optional[VoidExecutor] = None):
927927
)
928928

929929
dataframes_composed = self._dataframe_registry.compose_all(sink=sink)
930-
heartbeats_composed = self._dataframe_registry.compose_heartbeats()
931-
if not heartbeats_composed:
932-
self._heartbeat_active = False
930+
wall_clock_executors = self._dataframe_registry.compose_wall_clock()
931+
if not wall_clock_executors:
932+
self._wall_clock_active = False
933933

934934
processing_context.init_checkpoint()
935935
run_tracker.set_as_running()
@@ -941,7 +941,7 @@ def _run_dataframe(self, sink: Optional[VoidExecutor] = None):
941941
run_tracker.timeout_refresh()
942942
else:
943943
process_message(dataframes_composed)
944-
process_heartbeat(heartbeats_composed)
944+
process_wall_clock(wall_clock_executors)
945945
processing_context.commit_checkpoint()
946946
consumer.resume_backpressured()
947947
source_manager.raise_for_error()
@@ -1025,18 +1025,18 @@ def _process_message(self, dataframe_composed):
10251025
if self._on_message_processed is not None:
10261026
self._on_message_processed(topic_name, partition, offset)
10271027

1028-
def _process_heartbeat(self, heartbeats_composed):
1029-
if not self._heartbeat_active:
1028+
def _process_wall_clock(self, wall_clock_executors):
1029+
if not self._wall_clock_active:
10301030
return
10311031

10321032
now = datetime.now().timestamp()
1033-
if self._heartbeat_last_sent > now - self._heartbeat_interval:
1033+
if self._wall_clock_last_sent > now - self._wall_clock_interval:
10341034
return
10351035

10361036
value, key, timestamp, headers = None, None, int(now * 1000), {}
10371037

10381038
for tp in self._consumer.assignment():
1039-
if executor := heartbeats_composed.get(tp.topic):
1039+
if executor := wall_clock_executors.get(tp.topic):
10401040
row = Row(
10411041
value=value,
10421042
key=key,
@@ -1058,7 +1058,7 @@ def _process_heartbeat(self, heartbeats_composed):
10581058
if not to_suppress:
10591059
raise
10601060

1061-
self._heartbeat_last_sent = now
1061+
self._wall_clock_last_sent = now
10621062

10631063
def _on_assign(self, _, topic_partitions: List[TopicPartition]):
10641064
"""

quixstreams/core/stream/functions/transform.py

Lines changed: 21 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -4,8 +4,8 @@
44
from .types import (
55
TransformCallback,
66
TransformExpandedCallback,
7-
TransformHeartbeatCallback,
8-
TransformHeartbeatExpandedCallback,
7+
TransformWallClockCallback,
8+
TransformWallClockExpandedCallback,
99
VoidExecutor,
1010
)
1111

@@ -32,61 +32,61 @@ def __init__(
3232
self,
3333
func: TransformCallback,
3434
expand: Literal[False] = False,
35-
heartbeat: Literal[False] = False,
35+
wall_clock: Literal[False] = False,
3636
) -> None: ...
3737

3838
@overload
3939
def __init__(
4040
self,
4141
func: TransformExpandedCallback,
4242
expand: Literal[True],
43-
heartbeat: Literal[False] = False,
43+
wall_clock: Literal[False] = False,
4444
) -> None: ...
4545

4646
@overload
4747
def __init__(
4848
self,
49-
func: TransformHeartbeatCallback,
49+
func: TransformWallClockCallback,
5050
expand: Literal[False] = False,
51-
heartbeat: Literal[True] = True,
51+
wall_clock: Literal[True] = True,
5252
) -> None: ...
5353

5454
@overload
5555
def __init__(
5656
self,
57-
func: TransformHeartbeatExpandedCallback,
57+
func: TransformWallClockExpandedCallback,
5858
expand: Literal[True],
59-
heartbeat: Literal[True],
59+
wall_clock: Literal[True],
6060
) -> None: ...
6161

6262
def __init__(
6363
self,
6464
func: Union[
6565
TransformCallback,
6666
TransformExpandedCallback,
67-
TransformHeartbeatCallback,
68-
TransformHeartbeatExpandedCallback,
67+
TransformWallClockCallback,
68+
TransformWallClockExpandedCallback,
6969
],
7070
expand: bool = False,
71-
heartbeat: bool = False,
71+
wall_clock: bool = False,
7272
):
7373
super().__init__(func)
7474

7575
self.func: Union[
7676
TransformCallback,
7777
TransformExpandedCallback,
78-
TransformHeartbeatCallback,
79-
TransformHeartbeatExpandedCallback,
78+
TransformWallClockCallback,
79+
TransformWallClockExpandedCallback,
8080
]
8181
self.expand = expand
82-
self.heartbeat = heartbeat
82+
self.wall_clock = wall_clock
8383

8484
def get_executor(self, *child_executors: VoidExecutor) -> VoidExecutor:
8585
child_executor = self._resolve_branching(*child_executors)
8686

87-
if self.expand and self.heartbeat:
88-
heartbeat_expanded_func = cast(
89-
TransformHeartbeatExpandedCallback, self.func
87+
if self.expand and self.wall_clock:
88+
wall_clock_expanded_func = cast(
89+
TransformWallClockExpandedCallback, self.func
9090
)
9191

9292
def wrapper(
@@ -100,7 +100,7 @@ def wrapper(
100100
new_key,
101101
new_timestamp,
102102
new_headers,
103-
) in heartbeat_expanded_func(timestamp):
103+
) in wall_clock_expanded_func(timestamp):
104104
child_executor(new_value, new_key, new_timestamp, new_headers)
105105

106106
elif self.expand:
@@ -116,16 +116,16 @@ def wrapper(
116116
for new_value, new_key, new_timestamp, new_headers in result:
117117
child_executor(new_value, new_key, new_timestamp, new_headers)
118118

119-
elif self.heartbeat:
120-
heartbeat_func = cast(TransformHeartbeatCallback, self.func)
119+
elif self.wall_clock:
120+
wall_clock_func = cast(TransformWallClockCallback, self.func)
121121

122122
def wrapper(
123123
value: Any,
124124
key: Any,
125125
timestamp: int,
126126
headers: Any,
127127
):
128-
new_value, new_key, new_timestamp, new_headers = heartbeat_func(
128+
new_value, new_key, new_timestamp, new_headers = wall_clock_func(
129129
timestamp
130130
)
131131
child_executor(new_value, new_key, new_timestamp, new_headers)

quixstreams/core/stream/functions/types.py

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -14,8 +14,8 @@
1414
"FilterWithMetadataCallback",
1515
"TransformCallback",
1616
"TransformExpandedCallback",
17-
"TransformHeartbeatCallback",
18-
"TransformHeartbeatExpandedCallback",
17+
"TransformWallClockCallback",
18+
"TransformWallClockExpandedCallback",
1919
)
2020

2121

@@ -37,8 +37,8 @@ def __bool__(self) -> bool: ...
3737
TransformExpandedCallback = Callable[
3838
[Any, Any, int, Any], Iterable[Tuple[Any, Any, int, Any]]
3939
]
40-
TransformHeartbeatCallback = Callable[[int], Tuple[Any, Any, int, Any]]
41-
TransformHeartbeatExpandedCallback = Callable[
40+
TransformWallClockCallback = Callable[[int], Tuple[Any, Any, int, Any]]
41+
TransformWallClockExpandedCallback = Callable[
4242
[int], Iterable[Tuple[Any, Any, int, Any]]
4343
]
4444

quixstreams/core/stream/stream.py

Lines changed: 13 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -30,8 +30,8 @@
3030
TransformCallback,
3131
TransformExpandedCallback,
3232
TransformFunction,
33-
TransformHeartbeatCallback,
34-
TransformHeartbeatExpandedCallback,
33+
TransformWallClockCallback,
34+
TransformWallClockExpandedCallback,
3535
UpdateCallback,
3636
UpdateFunction,
3737
UpdateWithMetadataCallback,
@@ -256,7 +256,7 @@ def add_transform(
256256
func: TransformCallback,
257257
*,
258258
expand: Literal[False] = False,
259-
heartbeat: Literal[False] = False,
259+
wall_clock: Literal[False] = False,
260260
):
261261
pass
262262

@@ -266,27 +266,27 @@ def add_transform(
266266
func: TransformExpandedCallback,
267267
*,
268268
expand: Literal[True],
269-
heartbeat: Literal[False] = False,
269+
wall_clock: Literal[False] = False,
270270
):
271271
pass
272272

273273
@overload
274274
def add_transform(
275275
self,
276-
func: TransformHeartbeatCallback,
276+
func: TransformWallClockCallback,
277277
*,
278278
expand: Literal[False] = False,
279-
heartbeat: Literal[True],
279+
wall_clock: Literal[True],
280280
):
281281
pass
282282

283283
@overload
284284
def add_transform(
285285
self,
286-
func: TransformHeartbeatExpandedCallback,
286+
func: TransformWallClockExpandedCallback,
287287
*,
288288
expand: Literal[True],
289-
heartbeat: Literal[True],
289+
wall_clock: Literal[True],
290290
):
291291
pass
292292

@@ -295,12 +295,12 @@ def add_transform(
295295
func: Union[
296296
TransformCallback,
297297
TransformExpandedCallback,
298-
TransformHeartbeatCallback,
299-
TransformHeartbeatExpandedCallback,
298+
TransformWallClockCallback,
299+
TransformWallClockExpandedCallback,
300300
],
301301
*,
302302
expand: bool = False,
303-
heartbeat: bool = False,
303+
wall_clock: bool = False,
304304
) -> "Stream":
305305
"""
306306
Add a "transform" function to the Stream, that will mutate the input value.
@@ -316,11 +316,11 @@ def add_transform(
316316
:param expand: if True, expand the returned iterable into individual items
317317
downstream. If returned value is not iterable, `TypeError` will be raised.
318318
Default - `False`.
319-
:param heartbeat: if True, the callback is expected to accept timestamp only.
319+
:param wall_clock: if True, the callback is expected to accept timestamp only.
320320
Default - `False`.
321321
:return: a new Stream derived from the current one
322322
"""
323-
return self._add(TransformFunction(func, expand=expand, heartbeat=heartbeat)) # type: ignore[call-overload]
323+
return self._add(TransformFunction(func, expand=expand, wall_clock=wall_clock)) # type: ignore[call-overload]
324324

325325
def merge(self, other: "Stream") -> "Stream":
326326
"""

quixstreams/dataframe/dataframe.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1696,8 +1696,8 @@ def concat(self, other: "StreamingDataFrame") -> "StreamingDataFrame":
16961696
stream=merged_stream, stream_id=merged_stream_id
16971697
)
16981698

1699-
def concat_heartbeat(self, stream: Stream) -> "StreamingDataFrame":
1700-
self._registry.register_heartbeat(self, stream)
1699+
def concat_wall_clock(self, stream: Stream) -> "StreamingDataFrame":
1700+
self._registry.register_wall_clock(self, stream)
17011701
return self.__dataframe_clone__(stream=self.stream.merge(stream))
17021702

17031703
def join_asof(

quixstreams/dataframe/registry.py

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@ class DataFrameRegistry:
2222

2323
def __init__(self) -> None:
2424
self._registry: dict[str, Stream] = {}
25-
self._heartbeat_registry: dict[str, Stream] = {}
25+
self._wall_clock_registry: dict[str, Stream] = {}
2626
self._topics: list[Topic] = []
2727
self._repartition_origins: set[str] = set()
2828
self._topics_to_stream_ids: dict[str, set[str]] = {}
@@ -70,19 +70,19 @@ def register_root(
7070
self._topics.append(topic)
7171
self._registry[topic.name] = dataframe.stream
7272

73-
def register_heartbeat(
73+
def register_wall_clock(
7474
self, dataframe: "StreamingDataFrame", stream: Stream
7575
) -> None:
7676
"""
77-
Register a heartbeat Stream for the given topic.
77+
Register a wall clock stream for the given topic.
7878
"""
7979
topics = dataframe.topics
8080
if len(topics) > 1:
8181
raise ValueError(
8282
f"Expected a StreamingDataFrame with one topic, got {len(topics)}"
8383
)
8484
topic = topics[0]
85-
self._heartbeat_registry[topic.name] = stream
85+
self._wall_clock_registry[topic.name] = stream
8686

8787
def register_groupby(
8888
self,
@@ -130,12 +130,12 @@ def compose_all(
130130
"""
131131
return self._compose(registry=self._registry, sink=sink)
132132

133-
def compose_heartbeats(self) -> dict[str, VoidExecutor]:
133+
def compose_wall_clock(self) -> dict[str, VoidExecutor]:
134134
"""
135-
Composes all the heartbeat Streams and returns a dict of format {<topic>: <VoidExecutor>}
135+
Composes all the wall clock streams and returns a dict of format {<topic>: <VoidExecutor>}
136136
:return: a {topic_name: composed} dict, where composed is a callable
137137
"""
138-
return self._compose(registry=self._heartbeat_registry)
138+
return self._compose(registry=self._wall_clock_registry)
139139

140140
def _compose(
141141
self, registry: dict[str, Stream], sink: Optional[VoidExecutor] = None

0 commit comments

Comments
 (0)