Skip to content
This repository was archived by the owner on Mar 26, 2025. It is now read-only.
Open
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 5 additions & 9 deletions hatchet_sdk/clients/dispatcher/action_listener.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
)
from hatchet_sdk.contracts.dispatcher_pb2_grpc import DispatcherStub
from hatchet_sdk.logger import logger
from hatchet_sdk.utils.aio_utils import create_new_event_loop, get_active_event_loop
from hatchet_sdk.utils.backoff import exp_backoff_sleep

from ...loader import ClientConfig
Expand Down Expand Up @@ -184,15 +185,10 @@ async def heartbeat(self):
async def start_heartbeater(self):
if self.heartbeat_task is not None:
return

try:
loop = asyncio.get_event_loop()
except RuntimeError as e:
if str(e).startswith("There is no current event loop in thread"):
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
else:
raise e
loop = get_active_event_loop(should_raise=False)
if not loop:
loop = create_new_event_loop()
asyncio.set_event_loop(loop)
self.heartbeat_task = loop.create_task(self.heartbeat())

def __aiter__(self):
Expand Down
4 changes: 3 additions & 1 deletion hatchet_sdk/clients/event_ts.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
import asyncio
from typing import Any

from hatchet_sdk.utils.aio_utils import get_active_event_loop


class Event_ts(asyncio.Event):
"""
Expand All @@ -10,7 +12,7 @@ class Event_ts(asyncio.Event):
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
if self._loop is None:
self._loop = asyncio.get_event_loop()
self._loop = get_active_event_loop()

def set(self):
if not self._loop.is_closed():
Expand Down
3 changes: 2 additions & 1 deletion hatchet_sdk/clients/rest_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
WorkflowRunsCancelRequest,
)
from hatchet_sdk.clients.rest.models.workflow_version import WorkflowVersion
from hatchet_sdk.utils.aio_utils import create_new_event_loop


class AsyncRestApi:
Expand Down Expand Up @@ -246,7 +247,7 @@ async def events_replay(self, event_ids: list[str] | EventList) -> EventList:

class RestApi:
def __init__(self, host: str, api_key: str, tenant_id: str):
self._loop = asyncio.new_event_loop()
self._loop = create_new_event_loop()
self._thread = threading.Thread(target=self._run_event_loop, daemon=True)
self._thread.start()

Expand Down
51 changes: 46 additions & 5 deletions hatchet_sdk/utils/aio_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ async def run(*args, loop=None, executor=None, **kwargs):
The result of the function call.
"""
if loop is None:
loop = asyncio.get_running_loop()
loop = get_active_event_loop()

if inspect.iscoroutinefunction(func):
# Wrap the coroutine to run it in an executor
Expand All @@ -80,7 +80,7 @@ def __init__(self):
Initializes the EventLoopThread by creating an event loop
and setting up a thread to run the loop.
"""
self.loop = asyncio.new_event_loop()
self.loop = create_new_event_loop()
self.thread = Thread(target=self.run_loop_in_thread, args=(self.loop,))

def __enter__(self) -> asyncio.AbstractEventLoop:
Expand Down Expand Up @@ -111,7 +111,7 @@ def run_loop_in_thread(self, loop: asyncio.AbstractEventLoop) -> None:
loop.run_forever()


def get_active_event_loop() -> asyncio.AbstractEventLoop | None:
def get_active_event_loop(should_raise=True) -> asyncio.AbstractEventLoop | None:
"""
Get the active event loop.

Expand All @@ -120,9 +120,50 @@ def get_active_event_loop() -> asyncio.AbstractEventLoop | None:
event loop in the current thread.
"""
try:
return asyncio.get_event_loop()
loop = asyncio.get_event_loop()
patch_exception_handler(loop)
return loop
except RuntimeError as e:
if str(e).startswith("There is no current event loop in thread"):
if (
str(e).startswith("There is no current event loop in thread")
and not should_raise
):
return None
else:
raise e


def create_new_event_loop() -> asyncio.AbstractEventLoop | None:
"""
Create a new event loop.

Returns:
asyncio.AbstractEventLoop: The new event loop.
"""
loop = asyncio.new_event_loop()
patch_exception_handler(loop)
return loop


def patch_exception_handler(loop: asyncio.AbstractEventLoop) -> None:
"""
Patch the asyncio exception handler to ignore `BlockingIOError: [Errno 35] Resource temporarily unavailable`
errors caused by `aio.grpc` when using multiple event loops in separate threads.

This error arises from a Cython implementation detail in `aio.Channel.__init__`, where a `socket.recv(1)` call
succeeds only on the first invocation. Subsequent calls result in the mentioned error, but this does not
impact the functionality of the library and can be safely ignored.

References:
- https://github.com/grpc/grpc/issues/25364
- https://github.com/grpc/grpc/pull/36096
"""

def exception_handler(loop: asyncio.AbstractEventLoop, context: dict) -> None:
if "exception" in context:
err = f"{type(context['exception']).__name__}: {context['exception']}"
if err == "BlockingIOError: [Errno 35] Resource temporarily unavailable":
return
loop.default_exception_handler(context)

loop.set_exception_handler(exception_handler)
6 changes: 4 additions & 2 deletions hatchet_sdk/worker/action_listener_process.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
)
from hatchet_sdk.loader import ClientConfig
from hatchet_sdk.logger import logger
from hatchet_sdk.utils.aio_utils import get_active_event_loop, patch_exception_handler
from hatchet_sdk.utils.backoff import exp_backoff_sleep

ACTION_EVENT_RETRY_COUNT = 5
Expand Down Expand Up @@ -70,7 +71,8 @@ def __post_init__(self):
if self.debug:
logger.setLevel(logging.DEBUG)

loop = asyncio.get_event_loop()
loop = get_active_event_loop()
patch_exception_handler(loop)
loop.add_signal_handler(signal.SIGINT, noop_handler)
loop.add_signal_handler(signal.SIGTERM, noop_handler)
loop.add_signal_handler(
Expand Down Expand Up @@ -111,7 +113,7 @@ async def start(self, retry_attempt=0):

# TODO move event methods to separate class
async def _get_event(self):
loop = asyncio.get_running_loop()
loop = get_active_event_loop()
return await loop.run_in_executor(None, self.event_queue.get)

async def start_event_send_loop(self):
Expand Down
3 changes: 2 additions & 1 deletion hatchet_sdk/worker/runner/run_loop_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
from hatchet_sdk.clients.dispatcher.action_listener import Action
from hatchet_sdk.loader import ClientConfig
from hatchet_sdk.logger import logger
from hatchet_sdk.utils.aio_utils import get_active_event_loop
from hatchet_sdk.worker.runner.runner import Runner
from hatchet_sdk.worker.runner.utils.capture_logs import capture_logs

Expand Down Expand Up @@ -50,7 +51,7 @@ async def async_start(self, retry_count=1):

async def _async_start(self, retry_count=1):
logger.info("starting runner...")
self.loop = asyncio.get_running_loop()
self.loop = get_active_event_loop()
k = self.loop.create_task(self._start_action_loop())

def cleanup(self):
Expand Down
34 changes: 20 additions & 14 deletions hatchet_sdk/worker/runner/runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
)
from hatchet_sdk.loader import ClientConfig
from hatchet_sdk.logger import logger
from hatchet_sdk.utils.aio_utils import create_new_event_loop, get_active_event_loop
from hatchet_sdk.v2.callable import DurableContext
from hatchet_sdk.worker.action_listener_process import ActionEvent

Expand Down Expand Up @@ -284,19 +285,24 @@ async def async_wrapped_action_func(
) or asyncio.iscoroutinefunction(action_func):
return await action_func(context)
else:
pfunc = functools.partial(
# we must copy the context vars to the new thread, as only asyncio natively supports
# contextvars
copy_context_vars,
contextvars.copy_context().items(),
self.thread_action_func,
context,
action_func,
action,
)

loop = asyncio.get_event_loop()
res = await loop.run_in_executor(self.thread_pool, pfunc)
def thread_action_wrapper():
loop = create_new_event_loop()
asyncio.set_event_loop(loop)

wr.set(context.workflow_run_id())
sr.set(context.step_run_id)

try:
result = action_func(context)
return result
finally:
loop.close()

loop = get_active_event_loop()
res = await loop.run_in_executor(
self.thread_pool, thread_action_wrapper
)

return res
except Exception as e:
Expand Down Expand Up @@ -363,7 +369,7 @@ async def handle_start_step_run(self, action: Action):
)
)

loop = asyncio.get_event_loop()
loop = get_active_event_loop()
task = loop.create_task(
self.async_wrapped_action_func(
context, action_func, action, action.step_run_id
Expand Down Expand Up @@ -406,7 +412,7 @@ async def handle_start_group_key_run(self, action: Action):
)
)

loop = asyncio.get_event_loop()
loop = get_active_event_loop()
task = loop.create_task(
self.async_wrapped_action_func(
context, action_func, action, action.get_group_key_run_id
Expand Down
13 changes: 7 additions & 6 deletions hatchet_sdk/worker/worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
from hatchet_sdk.contracts.workflows_pb2 import CreateWorkflowVersionOpts
from hatchet_sdk.loader import ClientConfig
from hatchet_sdk.logger import logger
from hatchet_sdk.utils.aio_utils import create_new_event_loop, get_active_event_loop
from hatchet_sdk.v2.callable import HatchetCallable
from hatchet_sdk.worker.action_listener_process import worker_action_listener_process
from hatchet_sdk.worker.runner.run_loop_manager import WorkerActionRunLoopManager
Expand Down Expand Up @@ -102,15 +103,15 @@ def action_function(context):
def status(self) -> WorkerStatus:
return self._status

def setup_loop(self, loop: asyncio.AbstractEventLoop = None):
try:
loop = loop or asyncio.get_running_loop()
self.loop = loop
def setup_loop(self, loop: asyncio.AbstractEventLoop | None = None):
loop = loop or get_active_event_loop(should_raise=False)
if loop:
created_loop = False
logger.debug("using existing event loop")
self.loop = loop
return created_loop
except RuntimeError:
self.loop = asyncio.new_event_loop()
else:
self.loop = create_new_event_loop()
logger.debug("creating new event loop")
asyncio.set_event_loop(self.loop)
created_loop = True
Expand Down
2 changes: 1 addition & 1 deletion hatchet_sdk/workflow_run.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ def result(self) -> Coroutine:
return self.workflow_listener.result(self.workflow_run_id)

def sync_result(self) -> dict:
loop = get_active_event_loop()
loop = get_active_event_loop(should_raise=False)
if loop is None:
with EventLoopThread() as loop:
coro = self.workflow_listener.result(self.workflow_run_id)
Expand Down
Loading