This repository was archived by the owner on Mar 26, 2025. It is now read-only.
39 changes: 26 additions & 13 deletions hatchet_sdk/worker/worker.py
@@ -29,6 +29,8 @@
 from hatchet_sdk.worker.action_listener_process import worker_action_listener_process
 from hatchet_sdk.worker.runner.run_loop_manager import WorkerActionRunLoopManager
 from hatchet_sdk.workflow import WorkflowInterface
+from asyncio.exceptions import CancelledError
+
 
 T = TypeVar("T")
 
@@ -124,7 +126,7 @@ def register_workflow(self, workflow: TWorkflow) -> None:
             sys.exit(1)
 
         def create_action_function(
-            action_func: Callable[..., T]
+            action_func: Callable[..., T],
         ) -> Callable[[Context], T]:
             def action_function(context: Context) -> T:
                 return action_func(workflow, context)
@@ -312,8 +314,8 @@ async def _check_listener_health(self) -> None:
 
     ## Cleanup methods
     def _setup_signal_handlers(self) -> None:
-        signal.signal(signal.SIGTERM, self._handle_exit_signal)
-        signal.signal(signal.SIGINT, self._handle_exit_signal)
+        signal.signal(signal.SIGTERM, self._handle_force_quit_signal)
+        signal.signal(signal.SIGINT, self._handle_force_quit_signal)
         signal.signal(signal.SIGQUIT, self._handle_force_quit_signal)
 
     def _handle_exit_signal(self, signum: int, frame: FrameType | None) -> None:
@@ -323,7 +325,7 @@ def _handle_exit_signal(self, signum: int, frame: FrameType | None) -> None:
 
     def _handle_force_quit_signal(self, signum: int, frame: FrameType | None) -> None:
         logger.info("received SIGQUIT...")
-        self.exit_forcefully()
+        self.loop.create_task(self.exit_forcefully())
 
     async def close(self) -> None:
         logger.info(f"closing worker '{self.name}'...")
@@ -340,7 +342,7 @@ async def exit_gracefully(self) -> None:
         logger.debug(f"gracefully stopping worker: {self.name}")
 
         if self.killing:
-            return self.exit_forcefully()
+            return await self.exit_forcefully()
 
         self.killing = True
 
@@ -357,20 +359,31 @@
 
         logger.info("👋")
 
-    def exit_forcefully(self) -> None:
+    async def exit_forcefully(self) -> None:
Contributor

I'm pretty sure that this being async will cause the same problem as you're seeing now if exit_forcefully is called synchronously. You can reproduce this if you change _setup_signal_handlers to this:

    def _setup_signal_handlers(self) -> None:
        signal.signal(signal.SIGTERM, self._handle_force_quit_signal)
        signal.signal(signal.SIGINT, self._handle_force_quit_signal)
        signal.signal(signal.SIGQUIT, self._handle_force_quit_signal)

and then start any worker and then try to shut it off. I'm pretty sure that changing _handle_force_quit_signal to something like this will fix it, but haven't tested super thoroughly:

    def _handle_force_quit_signal(self, signum: int, frame: FrameType | None) -> None:
        logger.info("received SIGQUIT...")
        self.loop.create_task(self.exit_forcefully())
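A minimal, runnable sketch of the pattern suggested above: a synchronous signal handler cannot await, so it hands the async shutdown off to the running event loop with create_task. Names here are illustrative stand-ins, not the SDK's (SIGUSR1 is used only to make the demo safe to run):

```python
import asyncio
import signal

shutdown_ran = []

async def exit_forcefully():
    # Stand-in for the worker's async shutdown routine.
    shutdown_ran.append(True)

def handle_force_quit_signal(signum, frame):
    # Runs synchronously in the main thread; schedule the coroutine
    # on the running loop instead of calling it directly.
    loop.create_task(exit_forcefully())

async def main():
    global loop
    loop = asyncio.get_running_loop()
    signal.signal(signal.SIGUSR1, handle_force_quit_signal)
    signal.raise_signal(signal.SIGUSR1)  # simulate e.g. SIGQUIT
    await asyncio.sleep(0.05)  # let the scheduled task run

asyncio.run(main())
print("shutdown ran:", bool(shutdown_ran))
```

Note this only works when the loop is running in the same thread that receives the signal, which is the usual single-threaded worker setup.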

Author

Yeah, this change works. I will revert the change to a sync func.

Contributor

Okay, nice! To be clear - I think what you have basically works, and it should be all good if you modify _handle_force_quit_signal to handle things async similarly to what you've done already.

Author

Hi @mrkaye97, this has been done.

Contributor

Thanks! Just to clarify, this was an example but I didn't mean we should persist this change - sorry for being unclear on that!

Ultimately I think we should use _handle_exit_signal for SIGINT and SIGTERM; I was just using the force-quit handler as an example to make it easier to test.

         self.killing = True
 
         logger.debug(f"forcefully stopping worker: {self.name}")
-        self.close()
+        try:
+            await self.close()
 
-        if self.action_listener_process:
-            self.action_listener_process.kill()  # Forcefully kill the process
+            if self.action_listener_process:
+                self.action_listener_process.kill()  # Forcefully kill the process
 
-        logger.info("👋")
+            logger.info("👋")
 
-        sys.exit(
-            1
-        )  # Exit immediately TODO - should we exit with 1 here, there may be other workers to cleanup
+        except CancelledError:
+            logger.warning("Shutdown process was cancelled, ensuring cleanup...")
+            raise  # Allow proper propagation of cancellation
+
+        except Exception as e:
+            logger.error(f"Unexpected error during shutdown: {e}")
+
+        finally:
+            logger.info("Worker cleanup finished, allowing normal exit.")
+            # sys.exit(
+            #     1
+            # ) # Exit immediately TODO - should we exit with 1 here, there may be other workers to cleanup
+            os._exit(1)
Contributor

I think that using sys.exit is preferred over os._exit, since os._exit will immediately terminate the worker without waiting for other cleanup tasks to happen. WDYT?

Author

True, I will play around with it and revert in a PR.

Author

Hi @mrkaye97,

So I reverted back to sys.exit(1); however, there were errors pointing back to how sys.exit(1) raises a SystemExit that conflicts with async tasks, and in my case it also affects the uvicorn server.

I did some research, and (per ChatGPT) here is the deduction for both:

Method      | Behavior
sys.exit(1) | Raises SystemExit, allowing Python to clean up, but can break async tasks.
os._exit(1) | Immediately kills the process, skipping cleanup, avoiding CancelledError issues.

Because async tasks are still running when your worker shuts down, sys.exit(1) isn't safe in this case. os._exit(1) ensures an immediate, clean exit without leaving leaked semaphores.

We face this because, unlike the FastAPI example Hatchet provided, we use a startup event (or lifespan) to start the worker alongside our backend app.
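A small, runnable illustration of the trade-off discussed above: sys.exit() only raises SystemExit, so surrounding code (or asyncio's task machinery) can intercept it and the process keeps running, whereas os._exit() would terminate immediately and skip handlers like this one. The names below are illustrative:

```python
import sys

def shutdown():
    # sys.exit does not kill the process directly; it raises SystemExit(1).
    sys.exit(1)

caught = None
try:
    shutdown()
except SystemExit as exc:
    # Cleanup code still runs here; in an async app this is where the
    # exception can end up trapped by a task instead of stopping the process.
    caught = exc.code

print("caught SystemExit with code:", caught)
```

This interception is exactly why an exit raised inside a still-running event loop can surface as task errors rather than a clean shutdown.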

Author

Here sys.exit(1) affects the integrity of the logs: it does not just exit, it raises SystemExit, which is expected. However, we have added Hatchet to the startup and shutdown events in our FastAPI app, so it affects our application and either prevents restarts or makes shutdown take too long (even when there is no active job running).

Here is what our startup and shutdown events look like:


@app.on_event("startup")
async def startup_event():
    await redis.ping()
    suppress_vebose_logs()
    # Startup Hatchet based on server type
    if settings.SERVER_TYPE.value == ServerType.workflow_server.value:
        hatchet_init()


@app.on_event("shutdown")
async def shut_down_event():
    await redis.close()  # Clean up
    cleanup_tmp_dir()
    remove_mw_logs()


This way we can start Hatchet in a separate thread while our FastAPI server runs in its own thread.

This is different from the example currently in the Hatchet docs.

So this is what exposed the issue with sys.exit(1).
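The setup described above can be sketched as follows. This is a hypothetical skeleton, not the SDK's API: DummyWorker stands in for the real worker start/stop calls, and the thread hooks stand in for the FastAPI startup/shutdown events:

```python
import threading
import time

class DummyWorker:
    """Stand-in for a long-lived worker with a blocking run loop."""

    def __init__(self) -> None:
        self._stop = threading.Event()

    def start(self) -> None:
        # Placeholder for the worker's blocking poll loop.
        while not self._stop.is_set():
            time.sleep(0.01)

    def stop(self) -> None:
        self._stop.set()

worker = DummyWorker()
thread = threading.Thread(target=worker.start, daemon=True)
thread.start()  # e.g. from the startup/lifespan hook

# ... the web app serves requests on the main thread ...

worker.stop()   # e.g. from the shutdown hook
thread.join(timeout=1)
print("worker thread alive:", thread.is_alive())
```

Running the worker in a daemon thread keeps the web server's event loop free, but it is also why a sys.exit(1) raised from the worker side can interfere with the parent application.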

Contributor

Hmm - unfortunately, I think that we don't want to optimize around this method of running Hatchet. In general, we advise running Hatchet as its own worker rather than as a subprocess of (e.g.) a FastAPI app, in which case we wouldn't need to worry about sys.exit causing problems with the parent FastAPI application.



def register_on_worker(callable: HatchetCallable[T], worker: Worker) -> None: