Skip to content

Commit 298df08

Browse files
committed
Update NodeSynchronizer to support syncing to the first node
This will be needed for ongoing operation of `coalesce` nodes. Signed-off-by: Sahas Subramanian <[email protected]>
1 parent 7c2cf3c commit 298df08

File tree

1 file changed

+64
-34
lines changed

1 file changed

+64
-34
lines changed

src/frequenz/sdk/timeseries/formulas/_base_ast_node.py

Lines changed: 64 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55

66
import abc
77
import asyncio
8+
import logging
89
from dataclasses import dataclass
910
from datetime import datetime
1011
from typing import Generic
@@ -14,6 +15,8 @@
1415
from ...timeseries import Sample
1516
from ...timeseries._base_types import QuantityT
1617

18+
_logger = logging.getLogger(__name__)
19+
1720

1821
@dataclass(kw_only=True)
1922
class AstNode(abc.ABC, Generic[QuantityT]):
@@ -48,57 +51,84 @@ class NodeSynchronizer(Generic[QuantityT]):
4851

4952
def __init__(self) -> None:
5053
"""Initialize this instance."""
51-
self._synchronized: bool = False
54+
self._synchronized_nodes: int = 0
55+
self._latest_values: dict[int, Sample[QuantityT] | QuantityT | None] = {}
5256

5357
async def evaluate(
5458
self,
5559
nodes: list[AstNode[QuantityT]],
56-
target_timestamp: datetime | None = None,
60+
sync_to_first_node: bool = False,
5761
) -> list[Sample[QuantityT] | QuantityT | None]:
5862
"""Synchronize and evaluate multiple AST nodes.
5963
6064
Args:
6165
nodes: The AST nodes to synchronize and evaluate.
62-
target_timestamp: An optional maximum timestamp to synchronize to.
66+
sync_to_first_node: If True, synchronize all nodes to the timestamp
67+
of the first node. If False, synchronize to the latest timestamp
68+
among all nodes.
6369
6470
Returns:
6571
A list containing the evaluated values of the nodes.
66-
67-
Raises:
68-
RuntimeError: If synchronization fails after multiple attempts.
6972
"""
70-
if not self._synchronized or target_timestamp is not None:
73+
if self._synchronized_nodes != len(nodes) or self._latest_values:
74+
_logger.debug(
75+
"Synchronizing %d AST nodes (sync_to_first_node=%s).",
76+
len(nodes),
77+
sync_to_first_node,
78+
)
7179
_ = await asyncio.gather(*(node.subscribe() for node in nodes))
72-
values = [await node.evaluate() for node in nodes]
80+
values: list[Sample[QuantityT] | QuantityT | None] = []
81+
for node in nodes:
82+
value = self._latest_values.pop(id(node), None)
83+
if value is None:
84+
value = await node.evaluate()
85+
values.append(value)
86+
87+
if sync_to_first_node:
88+
target_timestamp = None
89+
for value in values:
90+
if isinstance(value, Sample):
91+
target_timestamp = value.timestamp
92+
break
93+
else:
94+
target_timestamp = max(
95+
(value.timestamp for value in values if isinstance(value, Sample)),
96+
default=None,
97+
)
7398

74-
target_timestamp = max(
75-
(value.timestamp for value in values if isinstance(value, Sample)),
76-
default=None,
77-
)
7899
if target_timestamp is None:
79-
self._synchronized = True
100+
self._synchronized_nodes = len(nodes)
80101
return values
81102

82-
for i, value in enumerate(values):
83-
if isinstance(value, Sample):
84-
ctr = 0
85-
while ctr < 10 and value.timestamp < target_timestamp:
86-
value = await nodes[i].evaluate()
87-
if not isinstance(value, Sample):
88-
raise RuntimeError(
89-
"Subsequent AST node evaluation did not return a Sample"
90-
)
91-
values[i] = value
92-
ctr += 1
93-
if ctr >= 10 and value.timestamp < target_timestamp:
94-
raise RuntimeError(
95-
"Could not synchronize AST node evaluations after 10 tries"
96-
)
97-
if value.timestamp > target_timestamp:
98-
values[i] = Sample(target_timestamp, None)
99-
100-
self._synchronized = True
101-
102-
return values
103+
return await self._synchronize_to_timestamp(values, nodes, target_timestamp)
103104

104105
return [await node.evaluate() for node in nodes]
106+
107+
async def _synchronize_to_timestamp(
108+
self,
109+
values: list[Sample[QuantityT] | QuantityT | None],
110+
nodes: list[AstNode[QuantityT]],
111+
target_timestamp: datetime,
112+
) -> list[Sample[QuantityT] | QuantityT | None]:
113+
for i, value in enumerate(values):
114+
if isinstance(value, Sample):
115+
ctr = 0
116+
while ctr < 10 and value.timestamp < target_timestamp:
117+
value = await nodes[i].evaluate()
118+
if not isinstance(value, Sample):
119+
raise RuntimeError(
120+
"Subsequent AST node evaluation did not return a Sample"
121+
)
122+
values[i] = value
123+
ctr += 1
124+
if ctr >= 10 and value.timestamp < target_timestamp:
125+
raise RuntimeError(
126+
"Could not synchronize AST node evaluations after 10 tries"
127+
)
128+
if value.timestamp > target_timestamp:
129+
self._latest_values[id(nodes[i])] = value
130+
values[i] = Sample(target_timestamp, None)
131+
132+
self._synchronized_nodes = len(nodes)
133+
134+
return values

0 commit comments

Comments
 (0)