|
6 | 6 | import asyncio |
7 | 7 | import dataclasses |
8 | 8 | import logging |
9 | | -import math |
10 | | -from typing import Dict, Sequence |
| 9 | +from typing import Sequence, Set |
11 | 10 |
|
12 | 11 | from frequenz.channels import Receiver, Sender |
13 | | -from frequenz.channels.util import MergeNamed, Select, Timer |
14 | 12 |
|
15 | | -from ..timeseries import GroupResampler, ResamplingFunction, Sample |
| 13 | +from frequenz.sdk.util.asyncio import cancel_and_await |
| 14 | + |
| 15 | +from ..timeseries import Sample |
| 16 | +from ..timeseries.resampling import Resampler, ResamplingError, ResamplingFunction |
16 | 17 | from ._channel_registry import ChannelRegistry |
17 | 18 | from ._data_sourcing import ComponentMetricRequest |
18 | 19 | from ._decorator import actor |
@@ -141,97 +142,111 @@ async def run() -> None: |
141 | 142 | self._resampling_period_s = resampling_period_s |
142 | 143 | self._max_data_age_in_periods: float = max_data_age_in_periods |
143 | 144 | self._resampling_function: ResamplingFunction = resampling_function |
144 | | - |
145 | | - self._resampler = GroupResampler( |
| 145 | + self._resampler = Resampler( |
146 | 146 | resampling_period_s=resampling_period_s, |
147 | 147 | max_data_age_in_periods=max_data_age_in_periods, |
148 | | - initial_resampling_function=resampling_function, |
| 148 | + resampling_function=resampling_function, |
149 | 149 | ) |
150 | 150 |
|
151 | | - self._input_receivers: Dict[str, Receiver[Sample]] = {} |
152 | | - self._output_senders: Dict[str, Sender[Sample]] = {} |
153 | | - self._resampling_timer = Timer(interval=self._resampling_period_s) |
154 | | - |
155 | 151 | async def _subscribe(self, request: ComponentMetricRequest) -> None: |
156 | 152 | """Subscribe for data for a specific time series. |
157 | 153 |
|
158 | 154 | Args: |
159 | 155 | request: subscription request for a specific component metric |
160 | 156 | """ |
161 | | - channel_name = request.get_channel_name() |
162 | | - |
163 | | - data_source_request = dataclasses.replace(request, **dict(namespace="Source")) |
| 157 | + data_source_request = dataclasses.replace( |
| 158 | + request, namespace=request.namespace + ":Source" |
| 159 | + ) |
164 | 160 | data_source_channel_name = data_source_request.get_channel_name() |
165 | | - if channel_name not in self._input_receivers: |
166 | | - await self._subscription_sender.send(data_source_request) |
167 | | - receiver: Receiver[Sample] = self._channel_registry.new_receiver( |
168 | | - data_source_channel_name |
169 | | - ) |
170 | | - self._input_receivers[data_source_channel_name] = receiver |
171 | | - self._resampler.add_time_series(time_series_id=data_source_channel_name) |
| 161 | + await self._subscription_sender.send(data_source_request) |
| 162 | + receiver = self._channel_registry.new_receiver(data_source_channel_name) |
172 | 163 |
|
173 | | - if channel_name not in self._output_senders: |
174 | | - sender: Sender[Sample] = self._channel_registry.new_sender(channel_name) |
175 | | - # This means that the `sender` will be sending samples to the channel with |
176 | | - # name `channel_name` based on samples collected from the channel named |
177 | | - # `data_source_channel_name` |
178 | | - self._output_senders[data_source_channel_name] = sender |
| 164 | + # This is a temporary hack until the Sender implementation uses |
| 165 | + # exceptions to report errors. |
| 166 | + sender = self._channel_registry.new_sender(request.get_channel_name()) |
179 | 167 |
|
180 | | - def _is_sample_valid(self, sample: Sample) -> bool: |
181 | | - """Check if the provided sample is valid. |
| 168 | + async def sink_adapter(sample: Sample) -> None: |
| 169 | + if not await sender.send(sample): |
| 170 | + raise Exception(f"Error while sending with sender {sender}", sender) |
182 | 171 |
|
183 | | - Args: |
184 | | - sample: sample to be validated |
| 172 | + self._resampler.add_timeseries(receiver, sink_adapter) |
185 | 173 |
|
186 | | - Returns: |
187 | | - True if the sample is valid, False otherwise |
188 | | - """ |
189 | | - if sample.value is None or math.isnan(sample.value): |
190 | | - return False |
191 | | - return True |
| 174 | + async def _process_resampling_requests(self) -> None: |
| 175 | + """Process resampling data requests.""" |
| 176 | + async for request in self._subscription_receiver: |
| 177 | + await self._subscribe(request) |
192 | 178 |
|
193 | 179 | async def run(self) -> None: |
194 | | - """Run the actor. |
| 180 | + """Resample known component metrics and process resampling requests. |
| 181 | +
|
| 182 | + If there is a resampling error while resampling some component metric, |
| 183 | + then that metric will be discarded and not resampled any more. Any |
| 184 | + other error will be propagated (most likely ending in the actor being |
| 185 | + restarted). |
195 | 186 |
|
196 | 187 | Raises: |
197 | | - ConnectionError: When the provider of the subscription channel closes the |
198 | | - connection |
| 188 | + RuntimeError: If there is some unexpected error while resampling or |
| 189 | + handling requests. |
| 190 | +
|
| 191 | + # noqa: DAR401 error |
199 | 192 | """ |
200 | | - while True: |
201 | | - select = Select( |
202 | | - resampling_timer=self._resampling_timer, |
203 | | - subscription_receiver=self._subscription_receiver, |
204 | | - component_data_receiver=MergeNamed(**self._input_receivers), |
| 193 | + tasks_to_cancel: Set[asyncio.Task] = set() |
| 194 | + try: |
| 195 | + subscriptions_task = asyncio.create_task( |
| 196 | + self._process_resampling_requests() |
205 | 197 | ) |
206 | | - while await select.ready(): |
207 | | - if msg := select.resampling_timer: |
208 | | - assert msg.inner is not None, "The timer should never be 'closed'" |
209 | | - timestamp = msg.inner |
210 | | - awaitables = [ |
211 | | - self._output_senders[channel_name].send(sample) |
212 | | - for channel_name, sample in self._resampler.resample(timestamp) |
213 | | - ] |
214 | | - await asyncio.gather(*awaitables) |
215 | | - if msg := select.component_data_receiver: |
216 | | - if msg.inner is None: |
217 | | - # When this happens, then DataSourcingActor has closed the channel |
218 | | - # for sending data for a specific `ComponentMetricRequest`, |
219 | | - # which may need to be handled properly here, e.g. unsubscribe |
220 | | - continue |
221 | | - channel_name, sample = msg.inner |
222 | | - if self._is_sample_valid(sample=sample): |
223 | | - self._resampler.add_sample( |
224 | | - time_series_id=channel_name, |
225 | | - sample=sample, |
226 | | - ) |
227 | | - if msg := select.subscription_receiver: |
228 | | - if msg.inner is None: |
229 | | - raise ConnectionError( |
230 | | - "Subscription channel connection has been closed!" |
| 198 | + tasks_to_cancel.add(subscriptions_task) |
| 199 | + |
| 200 | + while True: |
| 201 | + resampling_task = asyncio.create_task(self._resampler.resample()) |
| 202 | + tasks_to_cancel.add(resampling_task) |
| 203 | + done, _ = await asyncio.wait( |
| 204 | + [resampling_task, subscriptions_task], |
| 205 | + return_when=asyncio.FIRST_COMPLETED, |
| 206 | + ) |
| 207 | + |
| 208 | + if subscriptions_task in done: |
| 209 | + tasks_to_cancel.remove(subscriptions_task) |
| 210 | + raise RuntimeError( |
| 211 | + "There was a problem with the subscriptions channel." |
| 212 | + ) |
| 213 | + |
| 214 | + if resampling_task in done: |
| 215 | + tasks_to_cancel.remove(resampling_task) |
| 216 | + # The resampler shouldn't end without an exception |
| 217 | + error = resampling_task.exception() |
| 218 | + assert ( |
| 219 | + error is not None |
| 220 | + ), "The resample() function shouldn't exit normally." |
| 221 | + |
| 222 | + # We don't know what to do with something other than |
| 223 | + # ResamplingError, so propagate the exception if that is the |
| 224 | + # case. |
| 225 | + if not isinstance(error, ResamplingError): |
| 226 | + raise error |
| 227 | + for source, source_error in error.exceptions.items(): |
| 228 | + logger.error( |
| 229 | + "Error resampling source %s, removing source...", source |
231 | 230 | ) |
232 | | - await self._subscribe(request=msg.inner) |
233 | | - # Breaking out from the loop is required to regenerate |
234 | | - # component_data_receivers to be able to fulfil this |
235 | | - # subscription (later can be optimized by checking if |
236 | | - # an output channel already existed in the `subscribe()` method) |
237 | | - break |
| 231 | + removed = self._resampler.remove_timeseries(source) |
| 232 | + if not removed: |
| 233 | + logger.warning( |
| 234 | + "Got an exception from an unknown source: " |
| 235 | + "source=%r, exception=%r", |
| 236 | + source, |
| 237 | + source_error, |
| 238 | + ) |
| 239 | + # The resampling_task will be re-created if we reached this point |
| 240 | + finally: |
| 241 | + await asyncio.gather(*[cancel_and_await(t) for t in tasks_to_cancel]) |
| 242 | + |
| 243 | + # XXX: Here we should probably do a: pylint: disable=fixme |
| 244 | + # await self._resampler.stop() |
| 245 | + # But since the actor will be restarted, the internal state would |
| 246 | + # be broken if we stop the resampler. |
| 247 | + # |
| 248 | + # We have an even bigger problem with this naive restarting |
| 249 | + # approach, as restarting this actor without really resetting its |
| 250 | + # state would be mostly the same as not really leaving the run() |
| 251 | + # method and just swallow any exception, which doesn't look super |
| 252 | + # smart. |
0 commit comments