Skip to content

Commit c41d075

Browse files
committed
fix lint and type
1 parent 266c20e commit c41d075

File tree

1 file changed

+9
-23
lines changed

1 file changed

+9
-23
lines changed

cadence/workflow/deterministic_event_loop.py

Lines changed: 9 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,15 @@
11
from asyncio import AbstractEventLoop, Handle, futures, tasks
2-
from asyncio import Future
32
from contextvars import Context
43
import logging
54
import collections
65
import asyncio.events as events
76
import threading
87
from typing import Callable
8+
from typing_extensions import Unpack, TypeVarTuple
99

1010
logger = logging.getLogger(__name__)
1111

12+
_Ts = TypeVarTuple("_Ts")
1213

1314
class DeterministicEventLoop(AbstractEventLoop):
1415
"""
@@ -19,19 +20,17 @@ class DeterministicEventLoop(AbstractEventLoop):
1920
"""
2021

2122
def __init__(self):
22-
self._thread_id: int = None # indicate if the event loop is running
23+
self._thread_id = None # indicate if the event loop is running
2324
self._debug = False
2425
self._ready = collections.deque[events.Handle]()
2526
self._stopping = False
2627
self._closed = False
2728

28-
def call_soon(self, callback: Callable, *args, context : Context | None = None) -> Handle:
29-
self._call_soon(callback, args, context)
29+
def call_soon(self, callback: Callable[[Unpack[_Ts]], object], *args: Unpack[_Ts], context: Context | None = None) -> Handle:
30+
return self._call_soon(callback, args, context)
3031

31-
def _call_soon(self, callback, args, context):
32+
def _call_soon(self, callback, args, context) -> Handle:
3233
handle = events.Handle(callback, args, self, context)
33-
if handle._source_traceback:
34-
del handle._source_traceback[-1]
3534
self._ready.append(handle)
3635
return handle
3736

@@ -50,8 +49,7 @@ def run_forever(self):
5049
finally:
5150
self._run_forever_cleanup()
5251

53-
54-
def run_until_complete(self, future: Future):
52+
def run_until_complete(self, future):
5553
"""Run until the Future is done.
5654
5755
If the argument is a coroutine, it is wrapped in a Task.
@@ -67,10 +65,6 @@ def run_until_complete(self, future: Future):
6765

6866
new_task = not futures.isfuture(future)
6967
future = tasks.ensure_future(future, loop=self)
70-
if new_task:
71-
# An exception is raised if the future didn't complete, so there
72-
# is no need to log the "destroy pending task" message
73-
future._log_destroy_pending = False
7468

7569
future.add_done_callback(_run_until_complete_cb)
7670
try:
@@ -100,15 +94,7 @@ def create_task(self, coro, **kwargs):
10094
if kwargs.get("eager_start", False):
10195
raise RuntimeError("eager_start in create_task is not supported for deterministic event loop")
10296

103-
task = tasks.Task(coro, loop=self, **kwargs)
104-
if task._source_traceback:
105-
del task._source_traceback[-1]
106-
try:
107-
return task
108-
finally:
109-
# gh-128552: prevent a refcycle of
110-
# task.exception().__traceback__->BaseEventLoop.create_task->task
111-
del task
97+
return tasks.Task(coro, loop=self, **kwargs)
11298

11399
def create_future(self):
114100
return futures.Future(loop=self)
@@ -166,7 +152,7 @@ def is_closed(self):
166152
"""Returns True if the event loop was closed."""
167153
return self._closed
168154

169-
def _run_until_complete_cb(fut: Future):
155+
def _run_until_complete_cb(fut):
170156
if not fut.cancelled():
171157
exc = fut.exception()
172158
if isinstance(exc, (SystemExit, KeyboardInterrupt)):

0 commit comments

Comments
 (0)