|
5 | 5 |
|
6 | 6 | from __future__ import annotations |
7 | 7 |
|
8 | | -from datetime import datetime |
9 | 8 | from math import isclose |
10 | | -from typing import overload |
11 | 9 |
|
12 | 10 | from frequenz.channels import Receiver |
13 | 11 |
|
14 | 12 | from frequenz.sdk.microgrid import _data_pipeline |
15 | 13 | from frequenz.sdk.microgrid.component import ComponentMetricId |
16 | | -from frequenz.sdk.timeseries import Sample, Sample3Phase |
| 14 | +from frequenz.sdk.timeseries import Sample |
17 | 15 | from frequenz.sdk.timeseries._formula_engine import ResampledFormulaBuilder |
18 | 16 |
|
19 | 17 |
|
@@ -48,47 +46,3 @@ def equal_float_lists(list1: list[float], list2: list[float]) -> bool: |
48 | 46 | and len(list1) == len(list2) |
49 | 47 | and all(isclose(v1, v2) for v1, v2 in zip(list1, list2)) |
50 | 48 | ) |
51 | | - |
52 | | - |
53 | | -@overload |
54 | | -async def synchronize_receivers( |
55 | | - receivers: list[Receiver[Sample]], |
56 | | -) -> None: |
57 | | - ... |
58 | | - |
59 | | - |
60 | | -@overload |
61 | | -async def synchronize_receivers( |
62 | | - receivers: list[Receiver[Sample3Phase]], |
63 | | -) -> None: |
64 | | - ... |
65 | | - |
66 | | - |
67 | | -async def synchronize_receivers( |
68 | | - receivers: list[Receiver[Sample]] | list[Receiver[Sample3Phase]], |
69 | | -) -> None: |
70 | | - """Check if given receivers are all returning the same timestamp. |
71 | | -
|
72 | | - If not, try to synchronize them. |
73 | | - """ |
74 | | - by_ts: dict[datetime, list[Receiver[Sample] | Receiver[Sample3Phase]]] = {} |
75 | | - for recv in receivers: |
76 | | - while True: |
77 | | - sample = await recv.receive() |
78 | | - assert sample is not None |
79 | | - if isinstance(sample, Sample) and sample.value is None: |
80 | | - continue |
81 | | - if isinstance(sample, Sample3Phase) and sample.value_p1 is None: |
82 | | - continue |
83 | | - by_ts.setdefault(sample.timestamp, []).append(recv) |
84 | | - break |
85 | | - latest_ts = max(by_ts) |
86 | | - |
87 | | - for sample_ts, recvs in by_ts.items(): |
88 | | - if sample_ts == latest_ts: |
89 | | - continue |
90 | | - while sample_ts < latest_ts: |
91 | | - for recv in recvs: |
92 | | - val = await recv.receive() |
93 | | - assert val is not None |
94 | | - sample_ts = val.timestamp |
0 commit comments