Skip to content

Commit af8facb

Browse files
Use a bigger request receiver buffer size in data pipeline actors (#302)
2 parents ae128f3 + e206df5 commit af8facb

File tree

1 file changed

+14
-2
lines changed

1 file changed

+14
-2
lines changed

src/frequenz/sdk/microgrid/_data_pipeline.py

Lines changed: 14 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,13 @@
2929
from ..timeseries.ev_charger_pool import EVChargerPool
3030
from ..timeseries.logical_meter import LogicalMeter
3131

32+
_REQUEST_RECV_BUFFER_SIZE = 500
33+
"""The maximum number of requests that can be queued in the request receiver.
34+
35+
A larger buffer size means that the DataSourcing and Resampling actors don't drop
36+
requests and will be able to keep up with higher request rates in larger installations.
37+
"""
38+
3239

3340
@dataclass
3441
class _ActorInfo:
@@ -131,7 +138,10 @@ def _data_sourcing_request_sender(self) -> Sender[ComponentMetricRequest]:
131138
"Data Pipeline: Data Sourcing Actor Request Channel"
132139
)
133140
actor = DataSourcingActor(
134-
request_receiver=channel.new_receiver(), registry=self._channel_registry
141+
request_receiver=channel.new_receiver(
142+
maxsize=_REQUEST_RECV_BUFFER_SIZE
143+
),
144+
registry=self._channel_registry,
135145
)
136146
self._data_sourcing_actor = _ActorInfo(actor, channel)
137147
return self._data_sourcing_actor.channel.new_sender()
@@ -153,7 +163,9 @@ def _resampling_request_sender(self) -> Sender[ComponentMetricRequest]:
153163
actor = ComponentMetricsResamplingActor(
154164
channel_registry=self._channel_registry,
155165
data_sourcing_request_sender=self._data_sourcing_request_sender(),
156-
resampling_request_receiver=channel.new_receiver(),
166+
resampling_request_receiver=channel.new_receiver(
167+
maxsize=_REQUEST_RECV_BUFFER_SIZE
168+
),
157169
config=self._resampler_config,
158170
)
159171
self._resampling_actor = _ActorInfo(actor, channel)

0 commit comments

Comments
 (0)