Skip to content

Commit ef43a71

Browse files
authored
resampling: Don't subscribe twice to the same source (#123)
When receiving a subcription, we need to check if we are already handling a subscription with the same request parameters to avoid pushing the data twice to the same channel. Fixes #115.
2 parents bc9dd63 + 6c12205 commit ef43a71

File tree

5 files changed

+123
-46
lines changed

5 files changed

+123
-46
lines changed

pyproject.toml

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -108,5 +108,8 @@ module = [
108108
ignore_missing_imports = true
109109

110110
[[tool.mypy.overrides]]
111-
module = "async_solipsism"
111+
module = [
112+
"async_solipsism",
113+
"async_solipsism.*",
114+
]
112115
ignore_missing_imports = true

src/frequenz/sdk/actor/_resampling.py

Lines changed: 18 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -64,24 +64,37 @@ def __init__( # pylint: disable=too-many-arguments
6464
*relevant* samples at a given time. The result of the function
6565
is what is sent as the resampled data.
6666
"""
67-
self._channel_registry = channel_registry
68-
self._resampling_period_s = resampling_period_s
67+
self._channel_registry: ChannelRegistry = channel_registry
68+
self._resampling_period_s: float = resampling_period_s
6969
self._max_data_age_in_periods: float = max_data_age_in_periods
7070
self._resampling_function: ResamplingFunction = resampling_function
71-
self._data_sourcing_request_sender = data_sourcing_request_sender
72-
self._resampling_request_receiver = resampling_request_receiver
73-
self._resampler = Resampler(
71+
self._data_sourcing_request_sender: Sender[
72+
ComponentMetricRequest
73+
] = data_sourcing_request_sender
74+
self._resampling_request_receiver: Receiver[
75+
ComponentMetricRequest
76+
] = resampling_request_receiver
77+
self._resampler: Resampler = Resampler(
7478
resampling_period_s=resampling_period_s,
7579
max_data_age_in_periods=max_data_age_in_periods,
7680
resampling_function=resampling_function,
7781
)
82+
self._active_req_channels: set[str] = set()
7883

7984
async def _subscribe(self, request: ComponentMetricRequest) -> None:
8085
"""Request data for a component metric.
8186
8287
Args:
8388
request: The request for component metric data.
8489
"""
90+
request_channel_name = request.get_channel_name()
91+
92+
# If we are already handling this request, there is nothing to do.
93+
if request_channel_name in self._active_req_channels:
94+
return
95+
96+
self._active_req_channels.add(request_channel_name)
97+
8598
data_source_request = dataclasses.replace(
8699
request, namespace=request.namespace + ":Source"
87100
)

tests/actor/test_resampling.py

Lines changed: 99 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99
import async_solipsism
1010
import pytest
1111
import time_machine
12+
from async_solipsism.socket import asyncio
1213
from frequenz.channels import Broadcast
1314

1415
from frequenz.sdk.actor import (
@@ -42,49 +43,19 @@ def _now() -> datetime:
4243
return datetime.now(timezone.utc)
4344

4445

45-
# Even when it could be refactored to use smaller functions, I'm allowing
46-
# too many statements because it makes following failures in tests more easy
47-
# when the code is very flat.
48-
async def test_component_metrics_resampling_actor( # pylint: disable=too-many-statements
46+
async def _assert_resampling_works(
47+
channel_registry: ChannelRegistry,
4948
fake_time: time_machine.Coordinates,
49+
*,
50+
resampling_chan_name: str,
51+
data_source_chan_name: str,
5052
) -> None:
51-
"""Run main functions that initializes and creates everything."""
52-
53-
channel_registry = ChannelRegistry(name="test")
54-
data_source_req_chan = Broadcast[ComponentMetricRequest]("data-source-req")
55-
data_source_req_recv = data_source_req_chan.new_receiver()
56-
resampling_req_chan = Broadcast[ComponentMetricRequest]("resample-req")
57-
resampling_req_sender = resampling_req_chan.new_sender()
58-
59-
resampling_actor = ComponentMetricsResamplingActor(
60-
channel_registry=channel_registry,
61-
data_sourcing_request_sender=data_source_req_chan.new_sender(),
62-
resampling_request_receiver=resampling_req_chan.new_receiver(),
63-
resampling_period_s=0.2,
64-
max_data_age_in_periods=2,
65-
)
66-
67-
subs_req = ComponentMetricRequest(
68-
namespace="Resampling",
69-
component_id=9,
70-
metric_id=ComponentMetricId.SOC,
71-
start_time=None,
72-
)
73-
74-
await resampling_req_sender.send(subs_req)
75-
data_source_req = await data_source_req_recv.receive()
76-
assert data_source_req is not None
77-
assert data_source_req == dataclasses.replace(
78-
subs_req, namespace="Resampling:Source"
79-
)
80-
81-
timeseries_receiver = channel_registry.new_receiver(subs_req.get_channel_name())
82-
timeseries_sender = channel_registry.new_sender(data_source_req.get_channel_name())
53+
timeseries_receiver = channel_registry.new_receiver(resampling_chan_name)
54+
timeseries_sender = channel_registry.new_sender(data_source_chan_name)
8355

8456
fake_time.shift(0.2)
8557
new_sample = await timeseries_receiver.receive() # At 0.2s (timer)
86-
assert new_sample is not None
87-
assert new_sample.value is None
58+
assert new_sample == Sample(_now(), None)
8859

8960
fake_time.shift(0.1)
9061
sample = Sample(_now(), 3) # ts = 0.3s
@@ -105,6 +76,7 @@ async def test_component_metrics_resampling_actor( # pylint: disable=too-many-s
10576
assert new_sample is not None
10677
assert new_sample.value == 3.5 # avg(3, 4)
10778
assert new_sample.timestamp >= sample.timestamp
79+
assert new_sample.timestamp == _now()
10880

10981
fake_time.shift(0.05)
11082
await timeseries_sender.send(Sample(_now(), 8)) # ts = 0.65s
@@ -118,19 +90,108 @@ async def test_component_metrics_resampling_actor( # pylint: disable=too-many-s
11890
assert new_sample is not None
11991
assert new_sample.value == 5.5 # avg(4, 8, 1, 9)
12092
assert new_sample.timestamp >= sample.timestamp
93+
assert new_sample.timestamp == _now()
12194

12295
# No more samples sent
12396
fake_time.shift(0.2)
12497
new_sample = await timeseries_receiver.receive() # At 1.0s (timer)
12598
assert new_sample is not None
12699
assert new_sample.value == 6 # avg(8, 1, 9)
127100
assert new_sample.timestamp >= sample.timestamp
101+
assert new_sample.timestamp == _now()
128102

129103
# No more samples sent
130104
fake_time.shift(0.2)
131105
new_sample = await timeseries_receiver.receive() # At 1.2s (timer)
132106
assert new_sample is not None
133107
assert new_sample.value is None
108+
assert new_sample.timestamp == _now()
109+
110+
111+
async def test_single_request(
112+
fake_time: time_machine.Coordinates,
113+
) -> None:
114+
"""Run main functions that initializes and creates everything."""
115+
116+
channel_registry = ChannelRegistry(name="test")
117+
data_source_req_chan = Broadcast[ComponentMetricRequest]("data-source-req")
118+
data_source_req_recv = data_source_req_chan.new_receiver()
119+
resampling_req_chan = Broadcast[ComponentMetricRequest]("resample-req")
120+
resampling_req_sender = resampling_req_chan.new_sender()
121+
122+
resampling_actor = ComponentMetricsResamplingActor(
123+
channel_registry=channel_registry,
124+
data_sourcing_request_sender=data_source_req_chan.new_sender(),
125+
resampling_request_receiver=resampling_req_chan.new_receiver(),
126+
resampling_period_s=0.2,
127+
max_data_age_in_periods=2,
128+
)
129+
130+
subs_req = ComponentMetricRequest(
131+
namespace="Resampling",
132+
component_id=9,
133+
metric_id=ComponentMetricId.SOC,
134+
start_time=None,
135+
)
136+
137+
await resampling_req_sender.send(subs_req)
138+
data_source_req = await data_source_req_recv.receive()
139+
assert data_source_req is not None
140+
assert data_source_req == dataclasses.replace(
141+
subs_req, namespace="Resampling:Source"
142+
)
143+
144+
await _assert_resampling_works(
145+
channel_registry,
146+
fake_time,
147+
resampling_chan_name=subs_req.get_channel_name(),
148+
data_source_chan_name=data_source_req.get_channel_name(),
149+
)
150+
151+
await resampling_actor._stop() # type: ignore # pylint: disable=no-member,protected-access
152+
await resampling_actor._resampler.stop() # pylint: disable=protected-access
153+
154+
155+
async def test_duplicate_request(
156+
fake_time: time_machine.Coordinates,
157+
) -> None:
158+
"""Run main functions that initializes and creates everything."""
159+
160+
channel_registry = ChannelRegistry(name="test")
161+
data_source_req_chan = Broadcast[ComponentMetricRequest]("data-source-req")
162+
data_source_req_recv = data_source_req_chan.new_receiver()
163+
resampling_req_chan = Broadcast[ComponentMetricRequest]("resample-req")
164+
resampling_req_sender = resampling_req_chan.new_sender()
165+
166+
resampling_actor = ComponentMetricsResamplingActor(
167+
channel_registry=channel_registry,
168+
data_sourcing_request_sender=data_source_req_chan.new_sender(),
169+
resampling_request_receiver=resampling_req_chan.new_receiver(),
170+
resampling_period_s=0.2,
171+
max_data_age_in_periods=2,
172+
)
173+
174+
subs_req = ComponentMetricRequest(
175+
namespace="Resampling",
176+
component_id=9,
177+
metric_id=ComponentMetricId.SOC,
178+
start_time=None,
179+
)
180+
181+
await resampling_req_sender.send(subs_req)
182+
data_source_req = await data_source_req_recv.receive()
183+
184+
# Send duplicate request
185+
await resampling_req_sender.send(subs_req)
186+
with pytest.raises(asyncio.TimeoutError):
187+
await asyncio.wait_for(data_source_req_recv.receive(), timeout=0.1)
188+
189+
await _assert_resampling_works(
190+
channel_registry,
191+
fake_time,
192+
resampling_chan_name=subs_req.get_channel_name(),
193+
data_source_chan_name=data_source_req.get_channel_name(),
194+
)
134195

135196
await resampling_actor._stop() # type: ignore # pylint: disable=no-member,protected-access
136197
await resampling_actor._resampler.stop() # pylint: disable=protected-access

tests/timeseries/test_resampling.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
# tests/timeseries/test_resampling.py:93: License: MIT
1+
# License: MIT
22
# Copyright © 2022 Frequenz Energy-as-a-Service GmbH
33

44
"""

tests/utils/_a_sequence.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
# tests/timeseries/test_resampling.py:93: License: MIT
1+
# License: MIT
22
# Copyright © 2022 Frequenz Energy-as-a-Service GmbH
33

44
"""Helper class to compare two sequences without caring about the underlying type."""

0 commit comments

Comments
 (0)