Skip to content

Commit d1d8902

Browse files
committed
deterministic-el
1 parent 3509bc9 commit d1d8902

File tree

2 files changed

+171
-45
lines changed

2 files changed

+171
-45
lines changed

cadence/async/event_loop.py

Lines changed: 0 additions & 45 deletions
This file was deleted.
Lines changed: 171 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,171 @@
1+
from asyncio import AbstractEventLoop, BaseEventLoop, EventLoop, futures, tasks
2+
from asyncio import Future
3+
import logging
4+
import collections
5+
import asyncio.events as events
6+
import threading
7+
8+
logger = logging.getLogger(__name__)
9+
10+
11+
class DeterministicEventLoop(AbstractEventLoop):
12+
"""
13+
This is a basic FIFO implementation of event loop that does not allow I/O or timer operations.
14+
As a result, it's theoretically deterministic. This event loop is not useful directly without async events processing inside the loop.
15+
16+
Code is mostly copied from asyncio.BaseEventLoop without I/O or timer operations.
17+
"""
18+
19+
def __init__(self):
20+
self._thread_id: int = None # indicate if the event loop is running
21+
self._debug = False
22+
self._ready = collections.deque[events.Handle]()
23+
self._stopping = False
24+
self._closed = False
25+
26+
def call_soon(self, callback, *args, context=None):
27+
self._call_soon(callback, args, context)
28+
29+
def _call_soon(self, callback, args, context):
30+
handle = events.Handle(callback, args, self, context)
31+
if handle._source_traceback:
32+
del handle._source_traceback[-1]
33+
self._ready.append(handle)
34+
return handle
35+
36+
def get_debug(self):
37+
return self._debug
38+
39+
40+
def run_forever(self):
41+
"""Run until stop() is called."""
42+
self._run_forever_setup()
43+
try:
44+
while True:
45+
self._run_once()
46+
if self._stopping:
47+
break
48+
finally:
49+
self._run_forever_cleanup()
50+
51+
52+
def run_until_complete(self, future: Future):
53+
"""Run until the Future is done.
54+
55+
If the argument is a coroutine, it is wrapped in a Task.
56+
57+
WARNING: It would be disastrous to call run_until_complete()
58+
with the same coroutine twice -- it would wrap it in two
59+
different Tasks and that can't be good.
60+
61+
Return the Future's result, or raise its exception.
62+
"""
63+
self._check_closed()
64+
self._check_running()
65+
66+
new_task = not futures.isfuture(future)
67+
future = tasks.ensure_future(future, loop=self)
68+
if new_task:
69+
# An exception is raised if the future didn't complete, so there
70+
# is no need to log the "destroy pending task" message
71+
future._log_destroy_pending = False
72+
73+
future.add_done_callback(_run_until_complete_cb)
74+
try:
75+
self.run_forever()
76+
except:
77+
if new_task and future.done() and not future.cancelled():
78+
# The coroutine raised a BaseException. Consume the exception
79+
# to not log a warning, the caller doesn't have access to the
80+
# local task.
81+
future.exception()
82+
raise
83+
finally:
84+
future.remove_done_callback(_run_until_complete_cb)
85+
if not future.done():
86+
raise RuntimeError('Event loop stopped before Future completed.')
87+
88+
return future.result()
89+
90+
def create_task(self, coro, **kwargs):
91+
"""Schedule a coroutine object.
92+
93+
Return a task object.
94+
"""
95+
self._check_closed()
96+
97+
# NOTE: eager_start is not supported for deterministic event loop
98+
if kwargs.get("eager_start", False):
99+
raise RuntimeError("eager_start in create_task is not supported for deterministic event loop")
100+
101+
task = tasks.Task(coro, loop=self, **kwargs)
102+
if task._source_traceback:
103+
del task._source_traceback[-1]
104+
try:
105+
return task
106+
finally:
107+
# gh-128552: prevent a refcycle of
108+
# task.exception().__traceback__->BaseEventLoop.create_task->task
109+
del task
110+
111+
def _run_once(self):
112+
ntodo = len(self._ready)
113+
for i in range(ntodo):
114+
handle = self._ready.popleft()
115+
if handle._cancelled:
116+
continue
117+
handle._run()
118+
119+
def _run_forever_setup(self):
120+
self._check_closed()
121+
self._check_running()
122+
self._thread_id = threading.get_ident()
123+
events._set_running_loop(self)
124+
125+
def _run_forever_cleanup(self):
126+
self._stopping = False
127+
self._thread_id = None
128+
events._set_running_loop(None)
129+
130+
def stop(self):
131+
self._stopping = True
132+
133+
def _check_closed(self):
134+
if self._closed:
135+
raise RuntimeError('Event loop is closed')
136+
137+
def _check_running(self):
138+
if self.is_running():
139+
raise RuntimeError('This event loop is already running')
140+
if events._get_running_loop() is not None:
141+
raise RuntimeError(
142+
'Cannot run the event loop while another loop is running')
143+
144+
def is_running(self):
145+
return (self._thread_id is not None)
146+
147+
def close(self):
148+
"""Close the event loop.
149+
The event loop must not be running.
150+
"""
151+
if self.is_running():
152+
raise RuntimeError("Cannot close a running event loop")
153+
if self._closed:
154+
return
155+
if self._debug:
156+
logger.debug("Close %r", self)
157+
self._closed = True
158+
self._ready.clear()
159+
160+
def is_closed(self):
161+
"""Returns True if the event loop was closed."""
162+
return self._closed
163+
164+
def _run_until_complete_cb(fut: Future):
165+
if not fut.cancelled():
166+
exc = fut.exception()
167+
if isinstance(exc, (SystemExit, KeyboardInterrupt)):
168+
# Issue #22429: run_forever() already finished, no need to
169+
# stop it.
170+
return
171+
fut.get_loop().stop()

0 commit comments

Comments
 (0)