Skip to content
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 5 additions & 8 deletions benchmarks/timeseries/resampling.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,16 +9,13 @@

from frequenz.quantities import Quantity

from frequenz.sdk.timeseries import Sample
from frequenz.sdk.timeseries._resampling import (
ResamplerConfig,
SourceProperties,
_ResamplingHelper,
)
from frequenz.sdk.timeseries import ResamplerConfig
from frequenz.sdk.timeseries._resampling._base_types import SourceProperties
from frequenz.sdk.timeseries._resampling._resampler import _ResamplingHelper


def nop( # pylint: disable=unused-argument
samples: Sequence[Sample[Quantity]],
samples: Sequence[tuple[datetime, Quantity]],
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I wonder if a NoNoneSample type or so might be a nice abstraction here. I can imagine there are actually many places that would enjoy not having to check for None after they initially received the data. I can certainly use it in FCR as well.

For now its fine.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, in any case here we want to get rid of even the Quantity, we shouldn't really use Sample either, at least until we can use Sample[float].

If we go with the None/not None sample, maybe it can be OptionalSample (None values allowed) and Sample (can't have a None value) to follow Python's old Optional as an alias of | None. Another option, which I'm not sure if it is doable or not, is Sample[Power] vs Sample[Power | None]. I have the feeling we tried to make that work and failed, but maybe I'm just remembering wrongly.

resampler_config: ResamplerConfig,
source_properties: SourceProperties,
) -> float:
Expand Down Expand Up @@ -46,7 +43,7 @@ def _do_work() -> None:
for _n_resample in range(resamples):
for _n_sample in range(samples):
now = now + timedelta(seconds=1 / samples)
helper.add_sample(Sample(now, Quantity(0.0)))
helper.add_sample((now, Quantity(0.0)))
helper.resample(now)

print(timeit(_do_work, number=5))
Expand Down
3 changes: 1 addition & 2 deletions examples/battery_pool.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,9 +33,8 @@ async def main() -> None:
receivers = [
battery_pool.soc.new_receiver(limit=1),
battery_pool.capacity.new_receiver(limit=1),
# pylint: disable=protected-access
# pylint: disable-next=protected-access
battery_pool._system_power_bounds.new_receiver(limit=1),
# pylint: enable=protected-access
]

async for metric in merge(*receivers):
Expand Down
2 changes: 1 addition & 1 deletion src/frequenz/sdk/actor/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -597,7 +597,7 @@ async def main() -> None: # (6)!
[_run]: #the-_run-method
"""

from ..timeseries._resampling import ResamplerConfig
from ..timeseries._resampling._config import ResamplerConfig
from ._actor import Actor
from ._background_service import BackgroundService
from ._run_utils import run
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -394,8 +394,8 @@ def _is_inverter_state_correct(self, msg: InverterData) -> bool:
True if inverter is in correct state. False otherwise.
"""
# Component state is not exposed to the user.
# pylint: disable=protected-access
state = msg.component_state
# pylint: disable-next=protected-access
if state not in BatteryStatusTracker._inverter_valid_state:
if self._last_status == ComponentStatusEnum.WORKING:
_logger.warning(
Expand All @@ -404,7 +404,6 @@ def _is_inverter_state_correct(self, msg: InverterData) -> bool:
state.name,
)
return False
# pylint: enable=protected-access
return True

def _is_battery_state_correct(self, msg: BatteryData) -> bool:
Expand All @@ -417,8 +416,8 @@ def _is_battery_state_correct(self, msg: BatteryData) -> bool:
True if battery is in correct state. False otherwise.
"""
# Component state is not exposed to the user.
# pylint: disable=protected-access
state = msg.component_state
# pylint: disable-next=protected-access
if state not in BatteryStatusTracker._battery_valid_state:
if self._last_status == ComponentStatusEnum.WORKING:
_logger.warning(
Expand All @@ -439,7 +438,6 @@ def _is_battery_state_correct(self, msg: BatteryData) -> bool:
)
return False
return True
# pylint: enable=protected-access

def _is_timestamp_outdated(self, timestamp: datetime) -> bool:
"""Return if timestamp is to old.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -461,7 +461,8 @@ def _compute_battery_availability_ratio(

return battery_availability_ratio, total_battery_availability_ratio

def _distribute_power( # pylint: disable=too-many-arguments
# pylint: disable-next=too-many-arguments,too-many-locals,too-many-branches,too-many-statements
def _distribute_power(
self,
*,
components: list[InvBatPair],
Expand All @@ -470,7 +471,6 @@ def _distribute_power( # pylint: disable=too-many-arguments
incl_bounds: dict[ComponentId, Power],
excl_bounds: dict[ComponentId, Power],
) -> DistributionResult:
# pylint: disable=too-many-locals,too-many-branches,too-many-statements
"""Distribute power between given components.

After this method power should be distributed between batteries
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,7 @@
_logger = logging.getLogger(__name__)


class PowerDistributingActor(Actor):
# pylint: disable=too-many-instance-attributes
class PowerDistributingActor(Actor): # pylint: disable=too-many-instance-attributes
"""Actor to distribute the power between components in a microgrid.

One instance of the actor can handle only one component category and type,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -158,16 +158,17 @@ def _add_system_bounds_tracker(self, component_ids: frozenset[ComponentId]) -> N
NotImplementedError: When the pool type is not supported.
"""
bounds_receiver: Receiver[SystemBounds]
# pylint: disable=protected-access
if self._component_category is ComponentCategory.BATTERY:
battery_pool = _data_pipeline.new_battery_pool(
priority=-sys.maxsize - 1, component_ids=component_ids
)
# pylint: disable-next=protected-access
bounds_receiver = battery_pool._system_power_bounds.new_receiver()
elif self._component_category is ComponentCategory.EV_CHARGER:
ev_charger_pool = _data_pipeline.new_ev_charger_pool(
priority=-sys.maxsize - 1, component_ids=component_ids
)
# pylint: disable-next=protected-access
bounds_receiver = ev_charger_pool._system_power_bounds.new_receiver()
elif (
self._component_category is ComponentCategory.INVERTER
Expand All @@ -176,8 +177,8 @@ def _add_system_bounds_tracker(self, component_ids: frozenset[ComponentId]) -> N
pv_pool = _data_pipeline.new_pv_pool(
priority=-sys.maxsize - 1, component_ids=component_ids
)
# pylint: disable-next=protected-access
bounds_receiver = pv_pool._system_power_bounds.new_receiver()
# pylint: enable=protected-access
else:
err = (
"PowerManagingActor: Unsupported component category: "
Expand Down
10 changes: 6 additions & 4 deletions src/frequenz/sdk/microgrid/_power_wrapper.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,14 +9,16 @@
from datetime import timedelta

from frequenz.channels import Broadcast

# pylint seems to think this is a cyclic import, but it is not.
#
# pylint: disable=cyclic-import
from frequenz.client.microgrid import ComponentCategory, ComponentType

from .._internal._channels import ChannelRegistry, ReceiverFetcher

# pylint seems to think this is a cyclic import, but it is not.
#
# pylint: disable-next=cyclic-import
from . import _power_managing, connection_manager

# pylint: disable-next=cyclic-import
from ._power_distributing import (
ComponentPoolStatus,
PowerDistributingActor,
Expand Down
6 changes: 4 additions & 2 deletions src/frequenz/sdk/microgrid/_resampling.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,11 @@

from .._internal._asyncio import cancel_and_await
from .._internal._channels import ChannelRegistry
from ..actor import Actor
from ..actor._actor import Actor
from ..timeseries import Sample
from ..timeseries._resampling import Resampler, ResamplerConfig, ResamplingError
from ..timeseries._resampling._config import ResamplerConfig
from ..timeseries._resampling._exceptions import ResamplingError
from ..timeseries._resampling._resampler import Resampler
from ._data_sourcing import ComponentMetricRequest

_logger = logging.getLogger(__name__)
Expand Down
5 changes: 4 additions & 1 deletion src/frequenz/sdk/timeseries/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,8 @@
from ._fuse import Fuse
from ._moving_window import MovingWindow
from ._periodic_feature_extractor import PeriodicFeatureExtractor
from ._resampling import ResamplerConfig
from ._resampling._base_types import SourceProperties
from ._resampling._config import ResamplerConfig, ResamplingFunction

__all__ = [
"Bounds",
Expand All @@ -49,7 +50,9 @@
"PeriodicFeatureExtractor",
"ResamplerConfig",
"ReceiverFetcher",
"ResamplingFunction",
"Sample",
"Sample3Phase",
"SourceProperties",
"UNIX_EPOCH",
]
3 changes: 2 additions & 1 deletion src/frequenz/sdk/timeseries/_moving_window.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,8 @@

from ..actor._background_service import BackgroundService
from ._base_types import UNIX_EPOCH, Sample
from ._resampling import Resampler, ResamplerConfig
from ._resampling._config import ResamplerConfig
from ._resampling._resampler import Resampler
from ._ringbuffer import OrderedRingBuffer

_logger = logging.getLogger(__name__)
Expand Down
4 changes: 4 additions & 0 deletions src/frequenz/sdk/timeseries/_resampling/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
# License: MIT
# Copyright © 2024 Frequenz Energy-as-a-Service GmbH

"""Timeseries resampling package."""
58 changes: 58 additions & 0 deletions src/frequenz/sdk/timeseries/_resampling/_base_types.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
# License: MIT
# Copyright © 2024 Frequenz Energy-as-a-Service GmbH

"""Resampler base types."""

from collections.abc import AsyncIterator, Callable, Coroutine
from dataclasses import dataclass
from datetime import datetime, timedelta

from frequenz.quantities import Quantity

from .._base_types import Sample

Source = AsyncIterator[Sample[Quantity]]
"""A source for a timeseries.

A timeseries can be received sample by sample in a streaming way
using a source.
"""

Sink = Callable[[Sample[Quantity]], Coroutine[None, None, None]]
"""A sink for a timeseries.

A new timeseries can be generated by sending samples to a sink.

This should be an `async` callable, for example:

```python
async some_sink(Sample) -> None:
...
```

Args:
sample (Sample): A sample to be sent out.
"""


@dataclass
class SourceProperties:
"""Properties of a resampling source."""

sampling_start: datetime | None = None
"""The time when resampling started for this source.

`None` means it didn't started yet.
"""

received_samples: int = 0
"""Total samples received by this source so far."""

sampling_period: timedelta | None = None
"""The sampling period of this source.

This means we receive (on average) one sample for this source every
`sampling_period` time.

`None` means it is unknown.
"""
Loading