coroutine additions and exits handle #328
base: main
Changes from 1 commit
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| | | @@ -29,6 +29,7 @@ |
| | |   from hatchet_sdk.worker.action_listener_process import worker_action_listener_process |
| | |   from hatchet_sdk.worker.runner.run_loop_manager import WorkerActionRunLoopManager |
| | |   from hatchet_sdk.workflow import WorkflowInterface |
| | | + from asyncio.exceptions import CancelledError |
| | |   T = TypeVar("T") |
| | | @@ -124,7 +125,7 @@ def register_workflow(self, workflow: TWorkflow) -> None: |
| | |   sys.exit(1) |
| | |   def create_action_function( |
| | | - action_func: Callable[..., T] |
| | | + action_func: Callable[..., T], |
| | |   ) -> Callable[[Context], T]: |
| | |   def action_function(context: Context) -> T: |
| | |   return action_func(workflow, context) |
| | | @@ -340,7 +341,7 @@ async def exit_gracefully(self) -> None: |
| | |   logger.debug(f"gracefully stopping worker: {self.name}") |
| | |   if self.killing: |
| | | - return self.exit_forcefully() |
| | | + return await self.exit_forcefully() |
| | |   self.killing = True |
| | | @@ -357,20 +358,31 @@ async def exit_gracefully(self) -> None: |
| | |   logger.info("👋") |
| | | - def exit_forcefully(self) -> None: |
| | | + async def exit_forcefully(self) -> None: |
| | |   self.killing = True |
| | |   logger.debug(f"forcefully stopping worker: {self.name}") |
| | | + try: |
| | | + await self.close() |
| | | - self.close() |
| | | + if self.action_listener_process: |
| | | + self.action_listener_process.kill() # Forcefully kill the process |
| | | - if self.action_listener_process: |
| | | - self.action_listener_process.kill() # Forcefully kill the process |
| | | + logger.info("👋") |
| | | + # sys.exit( |
| | | + # 1 |
| | | + # ) # Exit immediately TODO - should we exit with 1 here, there may be other workers to cleanup |
| | | - logger.info("👋") |
| | | - sys.exit( |
| | | - 1 |
| | | - ) # Exit immediately TODO - should we exit with 1 here, there may be other workers to cleanup |
| | | + except CancelledError: |
| | | + logger.warning("Shutdown process was cancelled, ensuring cleanup...") |
| | | + raise # Allow proper propagation of cancellation |
| | | + except Exception as e: |
| | | + logger.error(f"Unexpected error during shutdown: {e}") |
| | | + finally: |
| | | + logger.info("Worker cleanup finished, allowing normal exit.") |
| | | + os._exit(1) |

I think that using […]

True, I will play around with it and revert in a PR.

Hi @mrkaye97, I reverted back to `sys.exit(1)`; however, there were errors that pointed back to how `sys.exit(1)` raises `SystemExit`, which conflicts with async tasks, and in my case it also affects the uvicorn server. I did some research, and from ChatGPT here is the deduction for both. Because async tasks are still running when your worker shuts down, […] We face this because, unlike the FastAPI example Hatchet provided, we use a start-up event or lifespan to start the worker alongside our backend app.

Here `sys.exit(1)` affects the integrity of the logs. It does not just exit; it raises `SystemExit`, which is expected. However, we have added Hatchet to the startup and shutdown events in our FastAPI app. It consequently affects our application: it either prevents restarts or takes too long to quit (considering there is no active job running). Here is what our startup and shutdown event looks like. […] This way we can start Hatchet in its own thread and have our FastAPI server running in its own thread. This is different from the example currently on Hatchet. So this was what exposed the issue with […]

Hmm - unfortunately, I think that we don't want to optimize around this method of running Hatchet. In general, we advise running Hatchet as its own worker and not as a subprocess (e.g.) in a FastAPI app, in which case we wouldn't need to worry about […]

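
To make the `sys.exit(1)` versus `os._exit(1)` trade-off in this thread concrete, here is a minimal sketch. It is not Hatchet code; the helper names `run_snippet` and `catch_sys_exit` and the two snippet constants are made up for illustration. It relies only on documented standard-library behaviour: `sys.exit()` raises `SystemExit`, so `finally` blocks still run and a caller can intercept it, while `os._exit()` ends the process immediately without running cleanup handlers or flushing buffers.

```python
import subprocess
import sys
from typing import Tuple

# Two child-process programs (hypothetical examples): each tries to exit
# from inside a try/finally block.
SYS_EXIT_SNIPPET = (
    "import sys\n"
    "try:\n"
    "    sys.exit(1)\n"
    "finally:\n"
    "    print('cleanup ran')\n"
)
OS_EXIT_SNIPPET = (
    "import os\n"
    "try:\n"
    "    os._exit(1)\n"
    "finally:\n"
    "    print('cleanup ran')\n"
)


def run_snippet(snippet: str) -> Tuple[int, str]:
    """Run a snippet in a fresh interpreter and return (exit code, stdout)."""
    proc = subprocess.run(
        [sys.executable, "-c", snippet], capture_output=True, text=True
    )
    return proc.returncode, proc.stdout


def catch_sys_exit() -> object:
    """Show that sys.exit() is an exception the caller can intercept."""
    try:
        sys.exit(1)
    except SystemExit as exc:
        return exc.code
```

Running both snippets through `run_snippet` gives the same exit code, but only the `sys.exit` variant prints its cleanup message. By the same reasoning, an `os._exit(1)` placed in a `finally` block, as in the diff above, would presumably end the process before a re-raised `CancelledError` can propagate to any caller.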
| | |   def register_on_worker(callable: HatchetCallable[T], worker: Worker) -> None: |

I'm pretty sure that this being async will cause the same problem as you're seeing now if `exit_forcefully` is called synchronously. You can reproduce this if you change `_setup_signal_handlers` to this: […] and then start any worker and then try to shut it off. I'm pretty sure that changing `_handle_force_quit_signal` to something like this will fix it, but haven't tested super thoroughly: […]

Yeah, this change works. I will revert the change to a sync func.

Okay nice! To be clear - I think what you have basically works, and it should be all good if you modify this `_handle_force_kill_signal` to handle things async similarly to what you've done already.

Hi @mrkaye97, this has been done.

Thanks! Just to clarify, this was an example but I didn't mean we should persist this change - sorry for being unclear on that! Ultimately I think we should use `_handle_exit_signal` for `SIGINT` and `SIGTERM`, I was just using that as an example to make it easier to test.
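
The snippets the reviewer posted in this thread did not survive extraction, so the following is not a reconstruction of them. It is only a rough sketch of the general problem discussed: a synchronous signal handler that calls an `async def` method gets back a coroutine object that never runs unless it is scheduled on the event loop. `DemoWorker` and its attributes are hypothetical stand-ins, not the Hatchet SDK `Worker`, and using `loop.call_soon_threadsafe` is an assumption of this sketch (on Unix, `loop.add_signal_handler` is another option).

```python
import asyncio
from typing import Optional


class DemoWorker:
    """Hypothetical stand-in for a worker; not the Hatchet SDK class."""

    def __init__(self, loop: asyncio.AbstractEventLoop) -> None:
        self.loop = loop
        self.killing = False
        self.closed = False
        self.shutdown_task: Optional[asyncio.Task] = None
        self.stopped = asyncio.Event()

    async def close(self) -> None:
        await asyncio.sleep(0)  # placeholder for real async cleanup
        self.closed = True

    async def exit_forcefully(self) -> None:
        self.killing = True
        try:
            await self.close()
        finally:
            self.stopped.set()

    def _start_force_quit(self) -> None:
        # Runs inside the loop, so creating the task here is safe.
        self.shutdown_task = self.loop.create_task(self.exit_forcefully())

    def _handle_force_quit_signal(self, signum: int = 0, frame: object = None) -> None:
        # A bare `self.exit_forcefully()` here would only build a coroutine
        # object and never execute it. Hand the work to the loop instead.
        self.loop.call_soon_threadsafe(self._start_force_quit)


async def demo() -> DemoWorker:
    worker = DemoWorker(asyncio.get_running_loop())
    # Call the handler directly, the way the signal module would call it.
    worker._handle_force_quit_signal()
    await asyncio.wait_for(worker.stopped.wait(), timeout=1)
    return worker


def run_demo() -> DemoWorker:
    return asyncio.run(demo())
```

In a real process the handler would be registered with `signal.signal(signal.SIGINT, worker._handle_force_quit_signal)`; the demo calls it directly so the behaviour can be checked without sending a signal.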