Skip to content

Commit 0cb59e1

Browse files
authored
Clean ups and improvements to the resampler (mostly) (#68)
This PR mainly moves classes from the resampler to the new `timeseries` package, but it also includes some nox and test improvements and fixes. It also uses the timestamp produced by the timer as the resampling timestamp, unifying it across all the resampling levels for full consistency.

- Allow passing positional arguments to nox sessions
- Move resampling actor to frequenz.sdk.actor
- Move resampler classes to frequenz.sdk.timeseries.resampler
- Remove unnecessary usage of pytz
- Fix sample ordering in tests
- Use more intuitive variable names in resampler tests
- Use easier to debug fake times in tests
- Allow the resampler to use an external timestamp
- Use the timestamp provided by the timer receiver
- Document ResamplingFunction
- Move Sample to frequenz.sdk.timeseries
2 parents bb114aa + a62350c commit 0cb59e1

34 files changed

+1223
-1206
lines changed

examples/sdk_resampling_example.py

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -13,10 +13,11 @@
1313

1414
from frequenz.sdk.actor import ChannelRegistry
1515
from frequenz.sdk.actor.data_sourcing import DataSourcingActor
16-
from frequenz.sdk.data_ingestion.resampling.component_metrics_resampling_actor import (
16+
from frequenz.sdk.actor.resampling import (
17+
ComponentMetricId,
18+
ComponentMetricRequest,
1719
ComponentMetricsResamplingActor,
1820
)
19-
from frequenz.sdk.data_pipeline import ComponentMetricId, ComponentMetricRequest
2021
from frequenz.sdk.microgrid import ComponentCategory, microgrid_api
2122

2223
HOST = "microgrid.sandbox.api.frequenz.io"

minimum-requirements-ci.txt

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
# CI must ensure these dependency versions are supported
22
frequenz-api-microgrid==0.11.0
3-
frequenz-channels==0.10.0
3+
frequenz-channels@git+https://github.com/frequenz-floss/[email protected]
44
google-api-python-client==2.15
55
grpcio==1.47
66
grpcio-tools==1.47
@@ -9,7 +9,6 @@ pandas==1.3.5
99
protobuf==3.20.2
1010
pyarrow==6.0.0
1111
pydantic==1.9.0
12-
pytz==2021.3
1312
sympy==1.10.1
1413
toml==0.10
1514
tqdm==4.38.0

noxfile.py

Lines changed: 23 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -53,8 +53,9 @@ def formatting(session: nox.Session, install_deps: bool = True) -> None:
5353
if install_deps:
5454
session.install(*FMT_DEPS)
5555

56-
session.run("black", "--check", *source_file_paths(session))
57-
session.run("isort", "--check", *source_file_paths(session))
56+
paths = source_file_paths(session)
57+
session.run("black", "--check", *paths)
58+
session.run("isort", "--check", *paths)
5859

5960

6061
@nox.session
@@ -65,13 +66,6 @@ def mypy(session: nox.Session, install_deps: bool = True) -> None:
6566
# fast local tests with `nox -R -e mypy`.
6667
session.install("-e", ".", "mypy", *PYTEST_DEPS)
6768

68-
# Since we use other packages in the frequenz namespace, we need to run the
69-
# checks for frequenz.sdk from the installed package instead of the src
70-
# directory.
71-
mypy_paths = [
72-
path for path in source_file_paths(session) if not path.startswith("src")
73-
]
74-
7569
mypy_cmd = [
7670
"mypy",
7771
"--ignore-missing-imports",
@@ -83,9 +77,21 @@ def mypy(session: nox.Session, install_deps: bool = True) -> None:
8377
"--strict",
8478
]
8579

80+
# If we have session arguments, we just use those...
81+
if session.posargs:
82+
session.run(*mypy_cmd, *session.posargs)
83+
return
84+
8685
# Runs on the installed package
8786
session.run(*mypy_cmd, "-p", "frequenz.sdk")
87+
8888
# Runs on the rest of the source folders
89+
# Since we use other packages in the frequenz namespace, we need to run the
90+
# checks for frequenz.sdk from the installed package instead of the src
91+
# directory.
92+
mypy_paths = [
93+
path for path in source_file_paths(session) if not path.startswith("src")
94+
]
8995
session.run(*mypy_cmd, *mypy_paths)
9096

9197

@@ -97,10 +103,11 @@ def pylint(session: nox.Session, install_deps: bool = True) -> None:
97103
# fast local tests with `nox -R -e pylint`.
98104
session.install("-e", ".", "pylint", *PYTEST_DEPS)
99105

106+
paths = source_file_paths(session)
100107
session.run(
101108
"pylint",
102109
"--extension-pkg-whitelist=pydantic",
103-
*source_file_paths(session),
110+
*paths,
104111
)
105112

106113

@@ -110,12 +117,14 @@ def docstrings(session: nox.Session, install_deps: bool = True) -> None:
110117
if install_deps:
111118
session.install(*DOCSTRING_DEPS, "toml")
112119

113-
session.run("pydocstyle", *source_file_paths(session))
120+
paths = source_file_paths(session)
121+
session.run("pydocstyle", *paths)
114122

115123
# Darglint checks that function argument and return values are documented.
116124
# This is needed only for the `src` dir, so we exclude the other top level
117-
# dirs that contain code.
118-
darglint_paths = filter(
125+
# dirs that contain code, unless some paths were specified by argument, in
126+
# which case we use those untouched.
127+
darglint_paths = session.posargs or filter(
119128
lambda path: not (path.startswith("tests") or path.startswith("benchmarks")),
120129
source_file_paths(session),
121130
)
@@ -158,4 +167,5 @@ def _pytest_impl(
158167
"--cov=frequenz.sdk",
159168
"--cov-report=term",
160169
f"--cov-report=html:.htmlcov-{max_or_min_deps}",
170+
*session.posargs,
161171
)

pyproject.toml

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@ classifiers = [
2727
requires-python = ">= 3.8, < 4"
2828
dependencies = [
2929
"frequenz-api-microgrid >= 0.11.0, < 0.12.0",
30-
"frequenz-channels >= 0.10.0, < 0.11.0",
30+
"frequenz-channels @ git+https://github.com/frequenz-floss/[email protected]",
3131
"google-api-python-client >= 2.15, < 3",
3232
"grpcio >= 1.47, < 2",
3333
"grpcio-tools >= 1.47, < 2",
@@ -36,7 +36,6 @@ dependencies = [
3636
"protobuf >= 3.20, < 4",
3737
"pyarrow >= 6.0.0, < 6.1",
3838
"pydantic >= 1.9",
39-
"pytz >= 2021.3",
4039
"sympy >= 1.10.1, < 2",
4140
"toml >= 0.10",
4241
"tqdm >= 4.38.0, < 5",

src/frequenz/sdk/actor/data_sourcing/__init__.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,10 @@
99
"""
1010

1111
from .data_sourcing import DataSourcingActor
12+
from .types import ComponentMetricId, ComponentMetricRequest
1213

1314
__all__ = [
1415
"DataSourcingActor",
16+
"ComponentMetricId",
17+
"ComponentMetricRequest",
1518
]

src/frequenz/sdk/actor/data_sourcing/data_sourcing.py

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -9,11 +9,9 @@
99

1010
from frequenz.channels import Receiver
1111

12-
from frequenz.sdk.actor import actor
13-
from frequenz.sdk.actor.channel_registry import ChannelRegistry
14-
from frequenz.sdk.data_pipeline import ComponentMetricRequest
15-
12+
from .. import ChannelRegistry, actor
1613
from .microgrid_api_source import MicrogridApiSource
14+
from .types import ComponentMetricRequest
1715

1816

1917
@actor

src/frequenz/sdk/actor/data_sourcing/microgrid_api_source.py

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -13,16 +13,17 @@
1313

1414
from frequenz.channels import Receiver, Sender
1515

16-
from frequenz.sdk.actor import ChannelRegistry
17-
from frequenz.sdk.data_pipeline import ComponentMetricId, ComponentMetricRequest, Sample
18-
from frequenz.sdk.microgrid import (
16+
from ...microgrid import (
1917
BatteryData,
2018
ComponentCategory,
2119
EVChargerData,
2220
InverterData,
2321
MeterData,
2422
microgrid_api,
2523
)
24+
from ...timeseries import Sample
25+
from .. import ChannelRegistry
26+
from .types import ComponentMetricId, ComponentMetricRequest
2627

2728
_MeterDataMethods: Dict[ComponentMetricId, Callable[[MeterData], float]] = {
2829
ComponentMetricId.ACTIVE_POWER: lambda msg: msg.active_power,

src/frequenz/sdk/data_pipeline/types.py renamed to src/frequenz/sdk/actor/data_sourcing/types.py

Lines changed: 0 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -73,16 +73,3 @@ def get_channel_name(self) -> str:
7373
A string denoting a channel name.
7474
"""
7575
return f"{self.component_id}::{self.metric_id.name}::{self.start_time}::{self.namespace}"
76-
77-
78-
@dataclass(frozen=True)
79-
class Sample:
80-
"""A measurement taken at a particular point in time.
81-
82-
The `value` could be `None` if a component is malfunctioning or data is
83-
lacking for another reason, but a sample still needs to be sent to have a
84-
coherent view on a group of component metrics for a particular timestamp.
85-
"""
86-
87-
timestamp: datetime
88-
value: Optional[float] = None

src/frequenz/sdk/data_ingestion/resampling/component_metrics_resampling_actor.py renamed to src/frequenz/sdk/actor/resampling.py

Lines changed: 14 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -15,14 +15,19 @@
1515

1616
from frequenz.channels import MergeNamed, Receiver, Select, Sender, Timer
1717

18-
from ...actor import ChannelRegistry, actor
19-
from ...data_pipeline import ComponentMetricRequest, Sample
20-
from .component_metric_group_resampler import ComponentMetricGroupResampler
21-
from .component_metric_resampler import ResamplingFunction
18+
from ..timeseries import GroupResampler, ResamplingFunction, Sample
19+
from . import ChannelRegistry, actor, data_sourcing
2220

2321
# Obtain the module logger through the logging manager (not by instantiating
# `logging.Logger` directly) so it is registered in the logger hierarchy,
# propagates records to the root handlers and honours the application's
# logging configuration.
logger = logging.getLogger(__name__)
2422

2523

24+
# Re-export the types from the data_sourcing actor as we use the same requests,
25+
# we are only forwarding them for now.
26+
ComponentMetricId = data_sourcing.ComponentMetricId
27+
28+
ComponentMetricRequest = data_sourcing.ComponentMetricRequest
29+
30+
2631
# pylint: disable=unused-argument
2732
def average(samples: Sequence[Sample], resampling_period_s: float) -> float:
2833
"""Calculate average of the provided values.
@@ -147,7 +152,7 @@ async def run() -> None:
147152
self._max_data_age_in_periods: float = max_data_age_in_periods
148153
self._resampling_function: ResamplingFunction = resampling_function
149154

150-
self._resampler = ComponentMetricGroupResampler(
155+
self._resampler = GroupResampler(
151156
resampling_period_s=resampling_period_s,
152157
max_data_age_in_periods=max_data_age_in_periods,
153158
initial_resampling_function=resampling_function,
@@ -209,10 +214,12 @@ async def run(self) -> None:
209214
component_data_receiver=MergeNamed(**self._input_receivers),
210215
)
211216
while await select.ready():
212-
if _ := select.resampling_timer:
217+
if msg := select.resampling_timer:
218+
assert msg.inner is not None, "The timer should never be 'closed'"
219+
timestamp = msg.inner
213220
awaitables = [
214221
self._output_senders[channel_name].send(sample)
215-
for channel_name, sample in self._resampler.resample()
222+
for channel_name, sample in self._resampler.resample(timestamp)
216223
]
217224
await asyncio.gather(*awaitables)
218225
if msg := select.component_data_receiver:

src/frequenz/sdk/data_handling/time_series.py

Lines changed: 12 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -9,14 +9,12 @@
99
"""
1010
from __future__ import annotations
1111

12-
import datetime
1312
import enum
1413
import logging
1514
from dataclasses import dataclass, field
15+
from datetime import datetime, timedelta, timezone
1616
from typing import Collection, Dict, Generic, Optional, Set, TypeVar
1717

18-
import pytz
19-
2018
from .formula import Formula
2119

2220
Key = TypeVar("Key")
@@ -104,13 +102,13 @@ class Status(enum.Enum):
104102
UNKNOWN = "unknown"
105103
ERROR = "error"
106104

107-
timestamp: datetime.datetime
105+
timestamp: datetime
108106
value: Optional[Value] = None
109107
status: Status = Status.VALID
110108
broken_component_ids: Set[int] = field(default_factory=set)
111109

112110
@staticmethod
113-
def create_error(timestamp: datetime.datetime) -> TimeSeriesEntry[Value]:
111+
def create_error(timestamp: datetime) -> TimeSeriesEntry[Value]:
114112
"""Create a `TimeSeriesEntry` that contains an error.
115113
116114
This can happen when the value would be NaN, e.g.
@@ -128,7 +126,7 @@ def create_error(timestamp: datetime.datetime) -> TimeSeriesEntry[Value]:
128126

129127
@staticmethod
130128
def create_unknown(
131-
timestamp: datetime.datetime, broken_component_ids: Optional[Set[int]] = None
129+
timestamp: datetime, broken_component_ids: Optional[Set[int]] = None
132130
) -> TimeSeriesEntry[Value]:
133131
"""Create a `TimeSeriesEntry` that contains an unknown value.
134132
@@ -174,11 +172,11 @@ class LatestEntryCache(Generic[Key, Value]):
174172

175173
def __init__(self) -> None:
176174
"""Initialize the class."""
177-
self._latest_timestamp = pytz.utc.localize(datetime.datetime.min)
175+
self._latest_timestamp = datetime.min.replace(tzinfo=timezone.utc)
178176
self._entries: Dict[Key, TimeSeriesEntry[Value]] = {}
179177

180178
@property
181-
def latest_timestamp(self) -> datetime.datetime:
179+
def latest_timestamp(self) -> datetime:
182180
"""Get the most recently observed timestamp across all keys in the cache.
183181
184182
Returns:
@@ -209,7 +207,7 @@ def __contains__(self, key: Key) -> bool:
209207
def get(
210208
self,
211209
key: Key,
212-
timedelta_tolerance: datetime.timedelta = datetime.timedelta.max,
210+
timedelta_tolerance: timedelta = timedelta.max,
213211
default: Optional[TimeSeriesEntry[Value]] = None,
214212
) -> CacheEntryLookupResult[Value]:
215213
"""Get the cached entry for the specified key, if any.
@@ -232,7 +230,7 @@ def get(
232230
retrieved from the cache has a timestamp greater than the latest saved
233231
timestamp across all cache keys.
234232
"""
235-
if timedelta_tolerance < datetime.timedelta(0):
233+
if timedelta_tolerance < timedelta(0):
236234
raise ValueError(
237235
f"timedelta_tolerance cannot be less than 0, but "
238236
f"{timedelta_tolerance} was provided"
@@ -320,7 +318,7 @@ def reset(self) -> None:
320318
slightly more efficient.
321319
"""
322320
self.clear()
323-
self._latest_timestamp = pytz.utc.localize(datetime.datetime.min)
321+
self._latest_timestamp = datetime.min.replace(tzinfo=timezone.utc)
324322

325323
def reset_latest_timestamp(self) -> bool:
326324
"""Reset the `latest_timestamp` property to the lowest possible value.
@@ -342,7 +340,7 @@ def reset_latest_timestamp(self) -> bool:
342340

343341
self._latest_timestamp = max(
344342
map(lambda x: x.timestamp, self._entries.values()),
345-
default=pytz.utc.localize(datetime.datetime.min),
343+
default=datetime.min.replace(tzinfo=timezone.utc),
346344
)
347345

348346
if self._latest_timestamp > previous:
@@ -423,7 +421,7 @@ def evaluate(
423421
cache: LatestEntryCache[str, Value],
424422
formula_name: str = "",
425423
symbol_to_symbol_mapping: Optional[Dict[str, SymbolMapping]] = None,
426-
timedelta_tolerance: datetime.timedelta = datetime.timedelta.max,
424+
timedelta_tolerance: timedelta = timedelta.max,
427425
default_entry: Optional[TimeSeriesEntry[Value]] = None,
428426
) -> Optional[TimeSeriesEntry[Value]]:
429427
"""Evaluate the formula using time-series values from the provided cache.
@@ -463,7 +461,7 @@ def evaluate(
463461
`timedelta_tolerance`
464462
"""
465463
kwargs: Dict[str, Optional[Value]] = {}
466-
timestamp = pytz.utc.localize(datetime.datetime.min)
464+
timestamp = datetime.min.replace(tzinfo=timezone.utc)
467465

468466
symbol_to_symbol_mapping = symbol_to_symbol_mapping or {}
469467
formula_broken_component_ids: Set[int] = set()

0 commit comments

Comments
 (0)