diff --git a/hatchet_sdk/worker/worker.py b/hatchet_sdk/worker/worker.py index b6ec1531..babd5428 100644 --- a/hatchet_sdk/worker/worker.py +++ b/hatchet_sdk/worker/worker.py @@ -29,6 +29,8 @@ from hatchet_sdk.worker.action_listener_process import worker_action_listener_process from hatchet_sdk.worker.runner.run_loop_manager import WorkerActionRunLoopManager from hatchet_sdk.workflow import WorkflowInterface +from asyncio.exceptions import CancelledError + T = TypeVar("T") @@ -124,7 +126,7 @@ def register_workflow(self, workflow: TWorkflow) -> None: sys.exit(1) def create_action_function( - action_func: Callable[..., T] + action_func: Callable[..., T], ) -> Callable[[Context], T]: def action_function(context: Context) -> T: return action_func(workflow, context) @@ -312,8 +314,8 @@ async def _check_listener_health(self) -> None: ## Cleanup methods def _setup_signal_handlers(self) -> None: - signal.signal(signal.SIGTERM, self._handle_exit_signal) - signal.signal(signal.SIGINT, self._handle_exit_signal) + signal.signal(signal.SIGTERM, self._handle_force_quit_signal) + signal.signal(signal.SIGINT, self._handle_force_quit_signal) signal.signal(signal.SIGQUIT, self._handle_force_quit_signal) def _handle_exit_signal(self, signum: int, frame: FrameType | None) -> None: @@ -323,7 +325,7 @@ def _handle_exit_signal(self, signum: int, frame: FrameType | None) -> None: def _handle_force_quit_signal(self, signum: int, frame: FrameType | None) -> None: logger.info("received SIGQUIT...") - self.exit_forcefully() + self.loop.create_task(self.exit_forcefully()) async def close(self) -> None: logger.info(f"closing worker '{self.name}'...") @@ -340,7 +342,7 @@ async def exit_gracefully(self) -> None: logger.debug(f"gracefully stopping worker: {self.name}") if self.killing: - return self.exit_forcefully() + return await self.exit_forcefully() self.killing = True @@ -357,20 +359,31 @@ async def exit_gracefully(self) -> None: logger.info("👋") - def exit_forcefully(self) -> None: + async def exit_forcefully(self) -> None: self.killing = True logger.debug(f"forcefully stopping worker: {self.name}") + try: + await self.close() - self.close() + if self.action_listener_process: + self.action_listener_process.kill() # Forcefully kill the process - if self.action_listener_process: - self.action_listener_process.kill() # Forcefully kill the process + logger.info("👋") - logger.info("👋") - sys.exit( - 1 - ) # Exit immediately TODO - should we exit with 1 here, there may be other workers to cleanup + except CancelledError: + logger.warning("Shutdown process was cancelled, ensuring cleanup...") + raise # Allow proper propagation of cancellation + + except Exception as e: + logger.error(f"Unexpected error during shutdown: {e}") + + finally: + logger.info("Worker cleanup finished, allowing normal exit.") + # sys.exit( + # 1 + # ) # Exit immediately TODO - should we exit with 1 here, there may be other workers to cleanup + os._exit(1) def register_on_worker(callable: HatchetCallable[T], worker: Worker) -> None: