Skip to content

Commit d1806db

Browse files
committed
Ensure all streams get synchronized in FormulaEngine
In cases where there are multiple streams lagging by the same number of steps, only the one processed the last was getting synchronized to the match the latest timestamp. With this commit, `metrics_by_ts` stores a list of names for each timestamp, so that each of them can get synchronized to the latest. Signed-off-by: Sahas Subramanian <[email protected]>
1 parent 13455af commit d1806db

File tree

1 file changed

+7
-6
lines changed

1 file changed

+7
-6
lines changed

src/frequenz/sdk/timeseries/logical_meter/_formula_engine.py

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -91,19 +91,20 @@ async def _synchronize_metric_timestamps(
9191
name = metric.get_name()
9292
if result is None:
9393
raise RuntimeError(f"Stream closed for component: {name}")
94-
metrics_by_ts[result.timestamp] = name
94+
metrics_by_ts.setdefault(result.timestamp, []).append(name)
9595
latest_ts = max(metrics_by_ts)
9696

9797
# fetch the metrics with non-latest timestamps again until we have the values
9898
# for the same ts for all metrics.
99-
for metric_ts, name in metrics_by_ts.items():
99+
for metric_ts, names in metrics_by_ts.items():
100100
if metric_ts == latest_ts:
101101
continue
102-
fetcher = self._metric_fetchers[name]
103102
while metric_ts < latest_ts:
104-
next_val = await fetcher.fetch_next()
105-
assert next_val is not None
106-
metric_ts = next_val.timestamp
103+
for name in names:
104+
fetcher = self._metric_fetchers[name]
105+
next_val = await fetcher.fetch_next()
106+
assert next_val is not None
107+
metric_ts = next_val.timestamp
107108
if metric_ts > latest_ts:
108109
raise RuntimeError(
109110
"Unable to synchronize resampled metric timestamps, "

0 commit comments

Comments
 (0)