Skip to content

Commit 3a266f8

Browse files
committed
Migrate DataSourcingActor to the Actor class
Signed-off-by: Leandro Lucarella <[email protected]>
1 parent cba33f7 commit 3a266f8

File tree

4 files changed

+61
-62
lines changed

4 files changed

+61
-62
lines changed

benchmarks/timeseries/benchmark_datasourcing.py

Lines changed: 16 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -113,23 +113,22 @@ async def consume(channel: Receiver[Any]) -> None:
113113
await request_sender.send(request)
114114
consume_tasks.append(asyncio.create_task(consume(recv_channel)))
115115

116-
DataSourcingActor(request_receiver, channel_registry)
117-
118-
await asyncio.gather(*consume_tasks)
119-
120-
time_taken = perf_counter() - start_time
121-
122-
await mock_grid.cleanup()
123-
124-
print(f"Samples Sent: {samples_sent}, time taken: {time_taken}")
125-
print(f"Samples per second: {samples_sent / time_taken}")
126-
print(
127-
"Expected samples: "
128-
f"{num_expected_messages}, missing: {num_expected_messages - samples_sent}"
129-
)
130-
print(
131-
f"Missing per EVC: {(num_expected_messages - samples_sent) / num_ev_chargers}"
132-
)
116+
async with DataSourcingActor(request_receiver, channel_registry):
117+
await asyncio.gather(*consume_tasks)
118+
119+
time_taken = perf_counter() - start_time
120+
121+
await mock_grid.cleanup()
122+
123+
print(f"Samples Sent: {samples_sent}, time taken: {time_taken}")
124+
print(f"Samples per second: {samples_sent / time_taken}")
125+
print(
126+
"Expected samples: "
127+
f"{num_expected_messages}, missing: {num_expected_messages - samples_sent}"
128+
)
129+
print(
130+
f"Missing per EVC: {(num_expected_messages - samples_sent) / num_ev_chargers}"
131+
)
133132

134133

135134
def parse_args() -> Tuple[int, int, bool]:

src/frequenz/sdk/actor/_data_sourcing/data_sourcing.py

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -5,13 +5,12 @@
55

66
from frequenz.channels import Receiver
77

8+
from .._actor import Actor
89
from .._channel_registry import ChannelRegistry
9-
from .._decorator import actor
1010
from .microgrid_api_source import ComponentMetricRequest, MicrogridApiSource
1111

1212

13-
@actor
14-
class DataSourcingActor:
13+
class DataSourcingActor(Actor):
1514
"""An actor that provides data streams of metrics as time series."""
1615

1716
def __init__(
@@ -26,10 +25,11 @@ def __init__(
2625
registry: A channel registry. To be replaced by a singleton
2726
instance.
2827
"""
28+
super().__init__()
2929
self._request_receiver = request_receiver
3030
self._microgrid_api_source = MicrogridApiSource(registry)
3131

32-
async def run(self) -> None:
32+
async def _run(self) -> None:
3333
"""Run the actor."""
3434
async for request in self._request_receiver:
3535
await self._microgrid_api_source.add_metric(request)

src/frequenz/sdk/microgrid/_data_pipeline.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -266,17 +266,17 @@ def _resampling_request_sender(self) -> Sender[ComponentMetricRequest]:
266266

267267
async def _start(self) -> None:
268268
"""Start the data pipeline actors."""
269+
if self._data_sourcing_actor:
270+
await self._data_sourcing_actor.actor.start()
269271
if self._resampling_actor:
270272
await self._resampling_actor.actor.start()
271273

272274
async def _stop(self) -> None:
273275
"""Stop the data pipeline actors."""
274-
# pylint: disable=protected-access
275276
if self._data_sourcing_actor:
276-
await self._data_sourcing_actor.actor._stop() # type: ignore
277+
await self._data_sourcing_actor.actor.stop()
277278
if self._resampling_actor:
278279
await self._resampling_actor.actor.stop()
279-
# pylint: enable=protected-access
280280

281281

282282
_DATA_PIPELINE: _DataPipeline | None = None

tests/actor/test_data_sourcing.py

Lines changed: 38 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -57,41 +57,41 @@ async def test_data_sourcing_actor(self) -> None:
5757

5858
registry = ChannelRegistry(name="test-registry")
5959

60-
DataSourcingActor(req_chan.new_receiver(), registry)
61-
active_power_request = ComponentMetricRequest(
62-
"test-namespace", 4, ComponentMetricId.ACTIVE_POWER, None
63-
)
64-
active_power_recv = registry.new_receiver(
65-
active_power_request.get_channel_name()
66-
)
67-
await req_sender.send(active_power_request)
68-
69-
soc_request = ComponentMetricRequest(
70-
"test-namespace", 9, ComponentMetricId.SOC, None
71-
)
72-
soc_recv = registry.new_receiver(soc_request.get_channel_name())
73-
await req_sender.send(soc_request)
74-
75-
soc2_request = ComponentMetricRequest(
76-
"test-namespace", 9, ComponentMetricId.SOC, None
77-
)
78-
soc2_recv = registry.new_receiver(soc2_request.get_channel_name())
79-
await req_sender.send(soc2_request)
80-
81-
for _ in range(3):
82-
sample = await soc_recv.receive()
83-
assert sample is not None
84-
assert 9.0 == sample.value.base_value
85-
86-
sample = await soc2_recv.receive()
87-
assert sample is not None
88-
assert 9.0 == sample.value.base_value
89-
90-
sample = await active_power_recv.receive()
91-
assert sample is not None
92-
assert 100.0 == sample.value.base_value
93-
94-
assert await server.graceful_shutdown()
95-
connection_manager._CONNECTION_MANAGER = ( # pylint: disable=protected-access
96-
None
97-
)
60+
async with DataSourcingActor(req_chan.new_receiver(), registry):
61+
active_power_request = ComponentMetricRequest(
62+
"test-namespace", 4, ComponentMetricId.ACTIVE_POWER, None
63+
)
64+
active_power_recv = registry.new_receiver(
65+
active_power_request.get_channel_name()
66+
)
67+
await req_sender.send(active_power_request)
68+
69+
soc_request = ComponentMetricRequest(
70+
"test-namespace", 9, ComponentMetricId.SOC, None
71+
)
72+
soc_recv = registry.new_receiver(soc_request.get_channel_name())
73+
await req_sender.send(soc_request)
74+
75+
soc2_request = ComponentMetricRequest(
76+
"test-namespace", 9, ComponentMetricId.SOC, None
77+
)
78+
soc2_recv = registry.new_receiver(soc2_request.get_channel_name())
79+
await req_sender.send(soc2_request)
80+
81+
for _ in range(3):
82+
sample = await soc_recv.receive()
83+
assert sample is not None
84+
assert 9.0 == sample.value.base_value
85+
86+
sample = await soc2_recv.receive()
87+
assert sample is not None
88+
assert 9.0 == sample.value.base_value
89+
90+
sample = await active_power_recv.receive()
91+
assert sample is not None
92+
assert 100.0 == sample.value.base_value
93+
94+
assert await server.graceful_shutdown()
95+
connection_manager._CONNECTION_MANAGER = ( # pylint: disable=protected-access
96+
None
97+
)

0 commit comments

Comments
 (0)