Skip to content
This repository was archived by the owner on Mar 26, 2025. It is now read-only.
Draft
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 7 additions & 7 deletions hatchet_sdk/worker/runner/runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,11 @@
import json
import logging
import traceback
from concurrent.futures import ThreadPoolExecutor
from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor
from enum import Enum
from io import StringIO
from logging import StreamHandler
from multiprocessing import Queue
from threading import Thread, current_thread
from multiprocessing import Process, Queue, current_process
from typing import Any, Callable, Coroutine, Dict

from hatchet_sdk.client import new_client_raw
Expand Down Expand Up @@ -140,8 +139,8 @@ def __init__(
self.event_queue = event_queue

# The thread pool is used for synchronous functions which need to run concurrently
self.thread_pool = ThreadPoolExecutor(max_workers=max_runs)
self.threads: Dict[str, Thread] = {} # Store run ids and threads
self.thread_pool = ProcessPoolExecutor(max_workers=max_runs)
self.threads: Dict[str, Process] = {} # Store run ids and threads

self.killing = False
self.handle_kill = handle_kill
Expand Down Expand Up @@ -262,12 +261,12 @@ def inner_callback(task: asyncio.Task):

def thread_action_func(self, context, action_func, action: Action):
if action.step_run_id is not None and action.step_run_id != "":
self.threads[action.step_run_id] = current_thread()
self.threads[action.step_run_id] = current_process()
elif (
action.get_group_key_run_id is not None
and action.get_group_key_run_id != ""
):
self.threads[action.get_group_key_run_id] = current_thread()
self.threads[action.get_group_key_run_id] = current_process()

return action_func(context)

Expand All @@ -284,6 +283,7 @@ async def async_wrapped_action_func(
) or asyncio.iscoroutinefunction(action_func):
return await action_func(context)
else:

pfunc = functools.partial(
# we must copy the context vars to the new thread, as only asyncio natively supports
# contextvars
Expand Down