|
18 | 18 |
|
19 | 19 |
|
20 | 20 | from __future__ import annotations |
21 | | -from typing import Callable, TypeVar, Awaitable |
| 21 | +from typing import ( |
| 22 | + Any, |
| 23 | + Awaitable, |
| 24 | + Callable, |
| 25 | + cast, |
| 26 | + overload, |
| 27 | + Self, |
| 28 | + TypeVar, |
| 29 | + Type, |
| 30 | +) |
22 | 31 |
|
23 | 32 | import asyncio |
| 33 | +import inspect |
| 34 | +import warnings |
24 | 35 |
|
25 | 36 |
|
26 | 37 | _T = TypeVar('_T') |
@@ -140,3 +151,188 @@ async def debounce( |
140 | 151 | batch = [] |
141 | 152 | last_signal = t |
142 | 153 | target_time = None |
| 154 | + |
| 155 | + |
| 156 | +_Owner = TypeVar("_Owner") |
| 157 | +HandlerFunction = Callable[[], Awaitable[None]] |
| 158 | +HandlerMethod = Callable[[Any], Awaitable[None]] |
| 159 | + |
| 160 | + |
| 161 | +class ExclusiveTask: |
| 162 | + """Manages to run a repeatable task once at a time.""" |
| 163 | + |
| 164 | + _handler: HandlerFunction |
| 165 | + _task: asyncio.Task | None |
| 166 | + _scheduled: bool |
| 167 | + _stop_requested: bool |
| 168 | + |
| 169 | + def __init__(self, handler: HandlerFunction) -> None: |
| 170 | + self._handler = handler |
| 171 | + self._task = None |
| 172 | + self._scheduled = False |
| 173 | + self._stop_requested = False |
| 174 | + |
| 175 | + @property |
| 176 | + def scheduled(self) -> bool: |
| 177 | + return self._scheduled |
| 178 | + |
| 179 | + async def _run(self) -> None: |
| 180 | + if self._scheduled and not self._stop_requested: |
| 181 | + self._scheduled = False |
| 182 | + else: |
| 183 | + return |
| 184 | + try: |
| 185 | + await self._handler() |
| 186 | + finally: |
| 187 | + if self._scheduled and not self._stop_requested: |
| 188 | + self._task = asyncio.create_task(self._run()) |
| 189 | + else: |
| 190 | + self._task = None |
| 191 | + |
| 192 | + def schedule(self) -> None: |
| 193 | + """Schedule to run the task as soon as possible. |
| 194 | +
|
| 195 | + If already scheduled, nothing happens; it won't queue up. |
| 196 | +
|
| 197 | + If the task is already running, it will be scheduled to run again as |
| 198 | + soon as the running task is done. |
| 199 | + """ |
| 200 | + if not self._stop_requested: |
| 201 | + self._scheduled = True |
| 202 | + if self._task is None: |
| 203 | + self._task = asyncio.create_task(self._run()) |
| 204 | + |
| 205 | + async def stop(self) -> None: |
| 206 | + """Cancel scheduled task and wait for the running one to finish. |
| 207 | +
|
| 208 | + After an ExclusiveTask is stopped, no more new schedules are allowed. |
| 209 | + Note: "cancel scheduled task" only means setting self._scheduled to |
| 210 | + False; if an asyncio task is scheduled, stop() will still wait for it. |
| 211 | + """ |
| 212 | + self._scheduled = False |
| 213 | + self._stop_requested = True |
| 214 | + if self._task is not None: |
| 215 | + await self._task |
| 216 | + |
| 217 | + |
| 218 | +class ExclusiveTaskProperty: |
| 219 | + _method: HandlerMethod |
| 220 | + _name: str | None |
| 221 | + |
| 222 | + def __init__( |
| 223 | + self, method: HandlerMethod, *, slot: str | None = None |
| 224 | + ) -> None: |
| 225 | + self._method = method |
| 226 | + self._name = slot |
| 227 | + |
| 228 | + def __set_name__(self, owner: Type[_Owner], name: str) -> None: |
| 229 | + if (slots := getattr(owner, "__slots__", None)) is not None: |
| 230 | + if self._name is None: |
| 231 | + raise TypeError("missing slot in @exclusive_task()") |
| 232 | + if self._name not in slots: |
| 233 | + raise TypeError( |
| 234 | + f"slot {self._name!r} must be defined in __slots__" |
| 235 | + ) |
| 236 | + |
| 237 | + if self._name is None: |
| 238 | + self._name = name |
| 239 | + |
| 240 | + @overload |
| 241 | + def __get__(self, instance: None, owner: Type[_Owner]) -> Self: ... |
| 242 | + |
| 243 | + @overload |
| 244 | + def __get__( |
| 245 | + self, instance: _Owner, owner: Type[_Owner] |
| 246 | + ) -> ExclusiveTask: ... |
| 247 | + |
| 248 | + def __get__( |
| 249 | + self, instance: _Owner | None, owner: Type[_Owner] |
| 250 | + ) -> ExclusiveTask | Self: |
| 251 | + # getattr on the class |
| 252 | + if instance is None: |
| 253 | + return self |
| 254 | + |
| 255 | + assert self._name is not None |
| 256 | + |
| 257 | + # getattr on an object with __dict__ |
| 258 | + if (d := getattr(instance, "__dict__", None)) is not None: |
| 259 | + if rv := d.get(self._name, None): |
| 260 | + return rv |
| 261 | + rv = ExclusiveTask(self._method.__get__(instance, owner)) |
| 262 | + d[self._name] = rv |
| 263 | + return rv |
| 264 | + |
| 265 | + # getattr on an object with __slots__ |
| 266 | + else: |
| 267 | + if rv := getattr(instance, self._name, None): |
| 268 | + return rv |
| 269 | + rv = ExclusiveTask(self._method.__get__(instance, owner)) |
| 270 | + setattr(instance, self._name, rv) |
| 271 | + return rv |
| 272 | + |
| 273 | + |
| 274 | +ExclusiveTaskDecorator = Callable[ |
| 275 | + [HandlerFunction | HandlerMethod], ExclusiveTask | ExclusiveTaskProperty |
| 276 | +] |
| 277 | + |
| 278 | + |
| 279 | +def _exclusive_task( |
| 280 | + handler: HandlerFunction | HandlerMethod, *, slot: str | None |
| 281 | +) -> ExclusiveTask | ExclusiveTaskProperty: |
| 282 | + sig = inspect.signature(handler) |
| 283 | + params = list(sig.parameters.values()) |
| 284 | + if len(params) == 0: |
| 285 | + handler = cast(HandlerFunction, handler) |
| 286 | + if slot is not None: |
| 287 | + warnings.warn( |
| 288 | + "slot is specified but unused in @exclusive_task()", |
| 289 | + stacklevel=2, |
| 290 | + ) |
| 291 | + return ExclusiveTask(handler) |
| 292 | + elif len(params) == 1 and params[0].kind in ( |
| 293 | + inspect.Parameter.POSITIONAL_ONLY, |
| 294 | + inspect.Parameter.POSITIONAL_OR_KEYWORD, |
| 295 | + ): |
| 296 | + handler = cast(HandlerMethod, handler) |
| 297 | + return ExclusiveTaskProperty(handler, slot=slot) |
| 298 | + else: |
| 299 | + raise TypeError("bad signature") |
| 300 | + |
| 301 | + |
| 302 | +@overload |
| 303 | +def exclusive_task(handler: HandlerFunction) -> ExclusiveTask: ... |
| 304 | + |
| 305 | + |
| 306 | +@overload |
| 307 | +def exclusive_task( |
| 308 | + handler: HandlerMethod, *, slot: str | None = None |
| 309 | +) -> ExclusiveTaskProperty: ... |
| 310 | + |
| 311 | + |
| 312 | +@overload |
| 313 | +def exclusive_task(*, slot: str | None = None) -> ExclusiveTaskDecorator: ... |
| 314 | + |
| 315 | + |
| 316 | +def exclusive_task( |
| 317 | + handler: HandlerFunction | HandlerMethod | None = None, |
| 318 | + *, |
| 319 | + slot: str | None = None, |
| 320 | +) -> ExclusiveTask | ExclusiveTaskProperty | ExclusiveTaskDecorator: |
| 321 | + """Convert an async function into an ExclusiveTask. |
| 322 | +
|
| 323 | + This decorator can be applied to either top-level functions or methods |
| 324 | + in a class. In the latter case, the exclusiveness is bound to each object |
| 325 | + of the owning class. If the owning class defines __slots__, you must also |
| 326 | + define an extra slot to store the exclusive state and tell exclusive_task() |
| 327 | + by providing the `slot` argument. |
| 328 | + """ |
| 329 | + if handler is None: |
| 330 | + |
| 331 | + def decorator( |
| 332 | + handler: HandlerFunction | HandlerMethod, |
| 333 | + ) -> ExclusiveTask | ExclusiveTaskProperty: |
| 334 | + return _exclusive_task(handler, slot=slot) |
| 335 | + |
| 336 | + return decorator |
| 337 | + |
| 338 | + return _exclusive_task(handler, slot=slot) |
0 commit comments