diff --git a/cadence/workflow/deterministic_event_loop.py b/cadence/workflow/deterministic_event_loop.py new file mode 100644 index 0000000..f0af8e3 --- /dev/null +++ b/cadence/workflow/deterministic_event_loop.py @@ -0,0 +1,162 @@ +from asyncio import AbstractEventLoop, Handle, futures, tasks +from contextvars import Context +import logging +import collections +import asyncio.events as events +import threading +from typing import Callable +from typing_extensions import Unpack, TypeVarTuple + +logger = logging.getLogger(__name__) + +_Ts = TypeVarTuple("_Ts") + +class DeterministicEventLoop(AbstractEventLoop): + """ + This is a basic FIFO implementation of event loop that does not allow I/O or timer operations. + As a result, it's theoretically deterministic. This event loop is not useful directly without async events processing inside the loop. + + Code is mostly copied from asyncio.BaseEventLoop without I/O or timer operations. + """ + + def __init__(self): + self._thread_id = None # indicate if the event loop is running + self._debug = False + self._ready = collections.deque[events.Handle]() + self._stopping = False + self._closed = False + + def call_soon(self, callback: Callable[[Unpack[_Ts]], object], *args: Unpack[_Ts], context: Context | None = None) -> Handle: + return self._call_soon(callback, args, context) + + def _call_soon(self, callback, args, context) -> Handle: + handle = events.Handle(callback, args, self, context) + self._ready.append(handle) + return handle + + def get_debug(self): + return self._debug + + def set_debug(self, enabled: bool): + self._debug = enabled + + def run_forever(self): + """Run until stop() is called.""" + self._run_forever_setup() + try: + while True: + self._run_once() + if self._stopping: + break + finally: + self._run_forever_cleanup() + + def run_until_complete(self, future): + """Run until the Future is done. + + If the argument is a coroutine, it is wrapped in a Task. + + WARNING: It would be disastrous to call run_until_complete() + with the same coroutine twice -- it would wrap it in two + different Tasks and that can't be good. + + Return the Future's result, or raise its exception. + """ + self._check_closed() + self._check_running() + + new_task = not futures.isfuture(future) + future = tasks.ensure_future(future, loop=self) + + future.add_done_callback(_run_until_complete_cb) + try: + self.run_forever() + except: + if new_task and future.done() and not future.cancelled(): + # The coroutine raised a BaseException. Consume the exception + # to not log a warning, the caller doesn't have access to the + # local task. + future.exception() + raise + finally: + future.remove_done_callback(_run_until_complete_cb) + if not future.done(): + raise RuntimeError('Event loop stopped before Future completed.') + + return future.result() + + def create_task(self, coro, **kwargs): + """Schedule a coroutine object. + + Return a task object. + """ + self._check_closed() + + # NOTE: eager_start is not supported for deterministic event loop + if kwargs.get("eager_start", False): + raise RuntimeError("eager_start in create_task is not supported for deterministic event loop") + + return tasks.Task(coro, loop=self, **kwargs) + + def create_future(self): + return futures.Future(loop=self) + + def _run_once(self): + ntodo = len(self._ready) + for i in range(ntodo): + handle = self._ready.popleft() + if handle._cancelled: + continue + handle._run() + + def _run_forever_setup(self): + self._check_closed() + self._check_running() + self._thread_id = threading.get_ident() + events._set_running_loop(self) + + def _run_forever_cleanup(self): + self._stopping = False + self._thread_id = None + events._set_running_loop(None) + + def stop(self): + self._stopping = True + + def _check_closed(self): + if self._closed: + raise RuntimeError('Event loop is closed') + + def _check_running(self): + if self.is_running(): + raise RuntimeError('This event loop is already running') + if events._get_running_loop() is not None: + raise RuntimeError( + 'Cannot run the event loop while another loop is running') + + def is_running(self): + return (self._thread_id is not None) + + def close(self): + """Close the event loop. + The event loop must not be running. + """ + if self.is_running(): + raise RuntimeError("Cannot close a running event loop") + if self._closed: + return + if self._debug: + logger.debug("Close %r", self) + self._closed = True + self._ready.clear() + + def is_closed(self): + """Returns True if the event loop was closed.""" + return self._closed + +def _run_until_complete_cb(fut): + if not fut.cancelled(): + exc = fut.exception() + if isinstance(exc, (SystemExit, KeyboardInterrupt)): + return + fut.get_loop().stop() diff --git a/tests/cadence/workflow/test_deterministic_event_loop.py b/tests/cadence/workflow/test_deterministic_event_loop.py new file mode 100644 index 0000000..51fcb68 --- /dev/null +++ b/tests/cadence/workflow/test_deterministic_event_loop.py @@ -0,0 +1,72 @@ +import pytest +import asyncio +from cadence.workflow.deterministic_event_loop import DeterministicEventLoop + + +async def coro_append(results: list, i: int): + results.append(i) + +async def coro_await(size: int): + results = [] + for i in range(size): + await coro_append(results, i) + return results + +async def coro_await_future(future: asyncio.Future): + return await future + +async def coro_await_task(size: int): + results = [] + for i in range(size): + asyncio.create_task(coro_append(results, i)) + return results + +class TestDeterministicEventLoop: + """Test suite for DeterministicEventLoop using table-driven tests.""" + + def setup_method(self): + """Setup method called before each test.""" + self.loop = DeterministicEventLoop() + + def teardown_method(self): + """Teardown method called after each test.""" + if not self.loop.is_closed(): + self.loop.close() + assert self.loop.is_closed() is True + + def test_call_soon(self): + """Test _run_once executes single callback.""" + results = [] + expected = [] + for i in range(10000): + expected.append(i) + self.loop.call_soon(lambda x=i: results.append(x)) + + self.loop._run_once() + + assert results == expected + assert self.loop.is_running() is False + + def test_run_until_complete(self): + size = 10000 + results = self.loop.run_until_complete(coro_await(size)) + assert results == list(range(size)) + assert self.loop.is_running() is False + assert self.loop.is_closed() is False + + @pytest.mark.parametrize("result, exception, expected, expected_exception", + [(10000, None, 10000, None), (None, ValueError("test"), None, ValueError)]) + def test_create_future(self, result, exception, expected, expected_exception): + future = self.loop.create_future() + if expected_exception is not None: + with pytest.raises(expected_exception): + future.set_exception(exception) + self.loop.run_until_complete(coro_await_future(future)) + else: + future.set_result(result) + assert self.loop.run_until_complete(coro_await_future(future)) == expected + + def test_create_task(self): + size = 10000 + results = self.loop.run_until_complete(coro_await_task(size)) + assert results == list(range(size))