|
3 | 3 |
|
4 | 4 | """Timeseries resampler.""" |
5 | 5 |
|
| 6 | +from __future__ import annotations |
| 7 | + |
6 | 8 | import asyncio |
7 | 9 | import logging |
8 | 10 | import math |
9 | 11 | from collections import deque |
10 | 12 | from datetime import datetime, timedelta |
11 | | -from typing import Callable, Deque, Dict, Sequence, Union, cast |
| 13 | +from typing import Callable, Sequence |
12 | 14 |
|
13 | 15 | from frequenz.channels.util import Timer |
14 | 16 |
|
@@ -90,7 +92,7 @@ class ResamplingError(RuntimeError): |
90 | 92 | """ |
91 | 93 |
|
92 | 94 | def __init__( |
93 | | - self, exceptions: Dict[Source, Union[Exception, asyncio.CancelledError]] |
| 95 | + self, exceptions: dict[Source, Exception | asyncio.CancelledError] |
94 | 96 | ) -> None: |
95 | 97 | """Create an instance. |
96 | 98 |
|
@@ -166,7 +168,7 @@ def __init__( |
166 | 168 | self._resampling_period_s = resampling_period_s |
167 | 169 | self._max_data_age_in_periods: float = max_data_age_in_periods |
168 | 170 | self._resampling_function: ResamplingFunction = resampling_function |
169 | | - self._resamplers: Dict[Source, _StreamingHelper] = {} |
| 171 | + self._resamplers: dict[Source, _StreamingHelper] = {} |
170 | 172 | self._timer: Timer = Timer(self._resampling_period_s) |
171 | 173 |
|
172 | 174 | async def stop(self) -> None: |
@@ -240,19 +242,13 @@ async def resample(self, *, one_shot: bool = False) -> None: |
240 | 242 | *[r.resample(timer_timestamp) for r in self._resamplers.values()], |
241 | 243 | return_exceptions=True, |
242 | 244 | ) |
243 | | - # We need to cast this because Python is not smart enough to figure |
244 | | - # out the key value can't be None (not even adding another |
245 | | - # condition checking for None in the dict expression). |
246 | | - exceptions = cast( |
247 | | - Dict[Source, Union[Exception, asyncio.CancelledError]], |
248 | | - { |
249 | | - source: results[i] |
250 | | - for i, source in enumerate(self._resamplers) |
251 | | - # CancelledError inherits from BaseException, but we don't want |
252 | | - # to catch *all* BaseExceptions here. |
253 | | - if isinstance(results[i], (Exception, asyncio.CancelledError)) |
254 | | - }, |
255 | | - ) |
| 245 | + exceptions = { |
| 246 | + source: results[i] |
| 247 | + for i, source in enumerate(self._resamplers) |
| 248 | + # CancelledError inherits from BaseException, but we don't want |
| 249 | + # to catch *all* BaseExceptions here. |
| 250 | + if isinstance(results[i], (Exception, asyncio.CancelledError)) |
| 251 | + } |
256 | 252 | if exceptions: |
257 | 253 | raise ResamplingError(exceptions) |
258 | 254 | if one_shot: |
@@ -293,7 +289,7 @@ def __init__( |
293 | 289 | """ |
294 | 290 | self._resampling_period_s = resampling_period_s |
295 | 291 | self._max_data_age_in_periods: float = max_data_age_in_periods |
296 | | - self._buffer: Deque[Sample] = deque() |
| 292 | + self._buffer: deque[Sample] = deque() |
297 | 293 | self._resampling_function: ResamplingFunction = resampling_function |
298 | 294 |
|
299 | 295 | def add_sample(self, sample: Sample) -> None: |
|
0 commit comments