Skip to content

Commit 0efa1e4

Browse files
committed
resampling: Don't subcribe twice to the same source
When receiving a subcription, we need to check if we are already handling a subscription with the same request parameters to avoid pushing the data twice to the same channel. Fixes #115. Signed-off-by: Leandro Lucarella <[email protected]>
1 parent f692491 commit 0efa1e4

File tree

2 files changed

+108
-38
lines changed

2 files changed

+108
-38
lines changed

src/frequenz/sdk/actor/_resampling.py

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -79,13 +79,22 @@ def __init__( # pylint: disable=too-many-arguments
7979
max_data_age_in_periods=max_data_age_in_periods,
8080
resampling_function=resampling_function,
8181
)
82+
self._active_req_channels: set[str] = set()
8283

8384
async def _subscribe(self, request: ComponentMetricRequest) -> None:
8485
"""Request data for a component metric.
8586
8687
Args:
8788
request: The request for component metric data.
8889
"""
90+
request_channel_name = request.get_channel_name()
91+
92+
# If we are already handling this request, there is nothing to do.
93+
if request_channel_name in self._active_req_channels:
94+
return
95+
96+
self._active_req_channels.add(request_channel_name)
97+
8998
data_source_request = dataclasses.replace(
9099
request, namespace=request.namespace + ":Source"
91100
)

tests/actor/test_resampling.py

Lines changed: 99 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99
import async_solipsism
1010
import pytest
1111
import time_machine
12+
from async_solipsism.socket import asyncio
1213
from frequenz.channels import Broadcast
1314

1415
from frequenz.sdk.actor import (
@@ -42,49 +43,19 @@ def _now() -> datetime:
4243
return datetime.now(timezone.utc)
4344

4445

45-
# Even when it could be refactored to use smaller functions, I'm allowing
46-
# too many statements because it makes following failures in tests more easy
47-
# when the code is very flat.
48-
async def test_component_metrics_resampling_actor( # pylint: disable=too-many-statements
46+
async def _assert_resampling_works(
47+
channel_registry: ChannelRegistry,
4948
fake_time: time_machine.Coordinates,
49+
*,
50+
resampling_chan_name: str,
51+
data_source_chan_name: str,
5052
) -> None:
51-
"""Run main functions that initializes and creates everything."""
52-
53-
channel_registry = ChannelRegistry(name="test")
54-
data_source_req_chan = Broadcast[ComponentMetricRequest]("data-source-req")
55-
data_source_req_recv = data_source_req_chan.new_receiver()
56-
resampling_req_chan = Broadcast[ComponentMetricRequest]("resample-req")
57-
resampling_req_sender = resampling_req_chan.new_sender()
58-
59-
resampling_actor = ComponentMetricsResamplingActor(
60-
channel_registry=channel_registry,
61-
data_sourcing_request_sender=data_source_req_chan.new_sender(),
62-
resampling_request_receiver=resampling_req_chan.new_receiver(),
63-
resampling_period_s=0.2,
64-
max_data_age_in_periods=2,
65-
)
66-
67-
subs_req = ComponentMetricRequest(
68-
namespace="Resampling",
69-
component_id=9,
70-
metric_id=ComponentMetricId.SOC,
71-
start_time=None,
72-
)
73-
74-
await resampling_req_sender.send(subs_req)
75-
data_source_req = await data_source_req_recv.receive()
76-
assert data_source_req is not None
77-
assert data_source_req == dataclasses.replace(
78-
subs_req, namespace="Resampling:Source"
79-
)
80-
81-
timeseries_receiver = channel_registry.new_receiver(subs_req.get_channel_name())
82-
timeseries_sender = channel_registry.new_sender(data_source_req.get_channel_name())
53+
timeseries_receiver = channel_registry.new_receiver(resampling_chan_name)
54+
timeseries_sender = channel_registry.new_sender(data_source_chan_name)
8355

8456
fake_time.shift(0.2)
8557
new_sample = await timeseries_receiver.receive() # At 0.2s (timer)
86-
assert new_sample is not None
87-
assert new_sample.value is None
58+
assert new_sample == Sample(_now(), None)
8859

8960
fake_time.shift(0.1)
9061
sample = Sample(_now(), 3) # ts = 0.3s
@@ -105,6 +76,7 @@ async def test_component_metrics_resampling_actor( # pylint: disable=too-many-s
10576
assert new_sample is not None
10677
assert new_sample.value == 3.5 # avg(3, 4)
10778
assert new_sample.timestamp >= sample.timestamp
79+
assert new_sample.timestamp == _now()
10880

10981
fake_time.shift(0.05)
11082
await timeseries_sender.send(Sample(_now(), 8)) # ts = 0.65s
@@ -118,19 +90,108 @@ async def test_component_metrics_resampling_actor( # pylint: disable=too-many-s
11890
assert new_sample is not None
11991
assert new_sample.value == 5.5 # avg(4, 8, 1, 9)
12092
assert new_sample.timestamp >= sample.timestamp
93+
assert new_sample.timestamp == _now()
12194

12295
# No more samples sent
12396
fake_time.shift(0.2)
12497
new_sample = await timeseries_receiver.receive() # At 1.0s (timer)
12598
assert new_sample is not None
12699
assert new_sample.value == 6 # avg(8, 1, 9)
127100
assert new_sample.timestamp >= sample.timestamp
101+
assert new_sample.timestamp == _now()
128102

129103
# No more samples sent
130104
fake_time.shift(0.2)
131105
new_sample = await timeseries_receiver.receive() # At 1.2s (timer)
132106
assert new_sample is not None
133107
assert new_sample.value is None
108+
assert new_sample.timestamp == _now()
109+
110+
111+
async def test_single_request(
112+
fake_time: time_machine.Coordinates,
113+
) -> None:
114+
"""Run main functions that initializes and creates everything."""
115+
116+
channel_registry = ChannelRegistry(name="test")
117+
data_source_req_chan = Broadcast[ComponentMetricRequest]("data-source-req")
118+
data_source_req_recv = data_source_req_chan.new_receiver()
119+
resampling_req_chan = Broadcast[ComponentMetricRequest]("resample-req")
120+
resampling_req_sender = resampling_req_chan.new_sender()
121+
122+
resampling_actor = ComponentMetricsResamplingActor(
123+
channel_registry=channel_registry,
124+
data_sourcing_request_sender=data_source_req_chan.new_sender(),
125+
resampling_request_receiver=resampling_req_chan.new_receiver(),
126+
resampling_period_s=0.2,
127+
max_data_age_in_periods=2,
128+
)
129+
130+
subs_req = ComponentMetricRequest(
131+
namespace="Resampling",
132+
component_id=9,
133+
metric_id=ComponentMetricId.SOC,
134+
start_time=None,
135+
)
136+
137+
await resampling_req_sender.send(subs_req)
138+
data_source_req = await data_source_req_recv.receive()
139+
assert data_source_req is not None
140+
assert data_source_req == dataclasses.replace(
141+
subs_req, namespace="Resampling:Source"
142+
)
143+
144+
await _assert_resampling_works(
145+
channel_registry,
146+
fake_time,
147+
resampling_chan_name=subs_req.get_channel_name(),
148+
data_source_chan_name=data_source_req.get_channel_name(),
149+
)
150+
151+
await resampling_actor._stop() # type: ignore # pylint: disable=no-member,protected-access
152+
await resampling_actor._resampler.stop() # pylint: disable=protected-access
153+
154+
155+
async def test_duplicate_request(
156+
fake_time: time_machine.Coordinates,
157+
) -> None:
158+
"""Run main functions that initializes and creates everything."""
159+
160+
channel_registry = ChannelRegistry(name="test")
161+
data_source_req_chan = Broadcast[ComponentMetricRequest]("data-source-req")
162+
data_source_req_recv = data_source_req_chan.new_receiver()
163+
resampling_req_chan = Broadcast[ComponentMetricRequest]("resample-req")
164+
resampling_req_sender = resampling_req_chan.new_sender()
165+
166+
resampling_actor = ComponentMetricsResamplingActor(
167+
channel_registry=channel_registry,
168+
data_sourcing_request_sender=data_source_req_chan.new_sender(),
169+
resampling_request_receiver=resampling_req_chan.new_receiver(),
170+
resampling_period_s=0.2,
171+
max_data_age_in_periods=2,
172+
)
173+
174+
subs_req = ComponentMetricRequest(
175+
namespace="Resampling",
176+
component_id=9,
177+
metric_id=ComponentMetricId.SOC,
178+
start_time=None,
179+
)
180+
181+
await resampling_req_sender.send(subs_req)
182+
data_source_req = await data_source_req_recv.receive()
183+
184+
# Send duplicate request
185+
await resampling_req_sender.send(subs_req)
186+
with pytest.raises(asyncio.TimeoutError):
187+
await asyncio.wait_for(data_source_req_recv.receive(), timeout=0.1)
188+
189+
await _assert_resampling_works(
190+
channel_registry,
191+
fake_time,
192+
resampling_chan_name=subs_req.get_channel_name(),
193+
data_source_chan_name=data_source_req.get_channel_name(),
194+
)
134195

135196
await resampling_actor._stop() # type: ignore # pylint: disable=no-member,protected-access
136197
await resampling_actor._resampler.stop() # pylint: disable=protected-access

0 commit comments

Comments
 (0)