coroutine additions and exits handle #328
base: main
Conversation
Thanks for putting in this great change! I left a couple of comments about things to look into - excited to see this ship 😄
finally:
    logger.info("Worker cleanup finished, allowing normal exit.")
    os._exit(1)
I think that using sys.exit is preferred over os._exit, since os._exit immediately terminates the worker without waiting for other cleanup tasks to finish. WDYT?
True, I will play around with it and revert in a PR.
Hi @mrkaye97,
I reverted back to sys.exit(1), but there were errors pointing back to how sys.exit(1) raises a SystemExit that conflicts with async tasks, and in my case it also affects the uvicorn server.
I did some research, and here is the deduction for both (summarized with help from ChatGPT):
| Method | Behavior |
| --- | --- |
| sys.exit(1) | Raises SystemExit, allowing Python to clean up, but can break async tasks. |
| os._exit(1) | Immediately kills the process, skipping cleanup, avoiding CancelledError issues. |
Because async tasks are still running when your worker shuts down, sys.exit(1) isn't safe in this case. os._exit(1) ensures an immediate, clean exit without leaving leaked semaphores.
We face this because, unlike the FastAPI example Hatchet provides, we use a startup event (or lifespan) to start the worker alongside our backend app.
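To make the trade-off concrete, here is a minimal, self-contained sketch (illustrative only, not code from this PR; the task and timings are invented) of how the two calls behave when async tasks are still pending:

import asyncio
import os
import sys


async def background_job() -> None:
    # Stand-in for a long-running worker task that is still pending at shutdown.
    await asyncio.sleep(60)


async def main() -> None:
    asyncio.create_task(background_job())
    await asyncio.sleep(0.1)

    # sys.exit(1) raises SystemExit; asyncio.run() then tears down the loop
    # and cancels the pending task, which surfaces as cancelled-task noise
    # (and, in a setup like this one, can interfere with uvicorn's shutdown).
    # sys.exit(1)

    # os._exit(1) terminates the process immediately, skipping Python-level
    # cleanup (atexit hooks, finalizers), so nothing gets cancelled.
    os._exit(1)


asyncio.run(main())

Swapping in the commented-out sys.exit(1) line should reproduce the cancelled-task warnings described above.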
Here, sys.exit(1) affects the integrity of the logs: it does not just exit, it raises SystemExit, which is expected. However, we have added Hatchet to the startup and shutdown events in our FastAPI app, so it also affects our application and either prevents restarts or makes shutdown take too long (even when there is no active job running).
Here is what our startup and shutdown events look like:
@app.on_event("startup")
async def startup_event():
    await redis.ping()
    suppress_vebose_logs()
    # Startup Hatchet based on server type
    if settings.SERVER_TYPE.value == ServerType.workflow_server.value:
        hatchet_init()


@app.on_event("shutdown")
async def shut_down_event():
    await redis.close()  # Clean up
    cleanup_tmp_dir()
    remove_mw_logs()
This way we can start Hatchet in its own thread and keep our FastAPI server running in its own thread, which is different from the example Hatchet currently provides. This is what exposed the issue with sys.exit(1).
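hatchet_init() above is the commenter's own helper and its body is not shown in the thread; a rough, hypothetical sketch of the pattern being described (running the Hatchet worker in its own thread next to the FastAPI app) might look like the following, assuming the SDK's usual Hatchet()/worker()/start() entrypoints and a made-up worker name:

import threading

from hatchet_sdk import Hatchet

hatchet = Hatchet()
worker = hatchet.worker("backend-worker")


def hatchet_init() -> None:
    # Start the Hatchet worker in a daemon thread so the FastAPI/uvicorn
    # event loop in the main thread stays free; worker.start() is assumed
    # to block for the lifetime of the worker.
    threading.Thread(target=worker.start, daemon=True).start()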
Hmm - unfortunately, I think that we don't want to optimize around this method of running Hatchet. In general, we advise running Hatchet as its own worker and not as a subprocess inside (e.g.) a FastAPI app, in which case we wouldn't need to worry about sys.exit causing problems with the parent FastAPI application.
  logger.info("👋")

- def exit_forcefully(self) -> None:
+ async def exit_forcefully(self) -> None:
I'm pretty sure that this being async will cause the same problem as you're seeing now if exit_forcefully is called synchronously. You can reproduce this if you change _setup_signal_handlers to this:
def _setup_signal_handlers(self) -> None:
    signal.signal(signal.SIGTERM, self._handle_force_quit_signal)
    signal.signal(signal.SIGINT, self._handle_force_quit_signal)
    signal.signal(signal.SIGQUIT, self._handle_force_quit_signal)

and then start any worker and try to shut it off. I'm pretty sure that changing _handle_force_quit_signal to something like this will fix it, but haven't tested super thoroughly:
def _handle_force_quit_signal(self, signum: int, frame: FrameType | None) -> None:
    logger.info("received SIGQUIT...")
    self.loop.create_task(self.exit_forcefully())
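As a self-contained illustration of the pattern suggested above (scheduling an async shutdown path from a synchronous signal handler via create_task), here is a toy script; it is a sketch only, and the names, cleanup body, and sleep interval are placeholders, not code from this PR:

import asyncio
import functools
import os
import signal
from types import FrameType


async def exit_forcefully() -> None:
    # Async cleanup would go here; os._exit skips Python-level teardown,
    # mirroring the finally block in the diff at the top of this thread.
    print("worker cleanup finished, exiting")
    os._exit(1)


def handle_force_quit(
    loop: asyncio.AbstractEventLoop, signum: int, frame: FrameType | None
) -> None:
    # Signal handlers are synchronous, so schedule the coroutine on the
    # running loop instead of trying to await it here.
    loop.create_task(exit_forcefully())


async def main() -> None:
    loop = asyncio.get_running_loop()
    handler = functools.partial(handle_force_quit, loop)
    signal.signal(signal.SIGINT, handler)
    signal.signal(signal.SIGTERM, handler)
    while True:
        # Stand-in for the worker's ongoing activity; regular wakeups also
        # let the scheduled exit task run promptly.
        await asyncio.sleep(0.5)


asyncio.run(main())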
Yeah, this change works. I will revert the change to a sync func.
okay nice! to be clear - I think what you have basically works, and it should be all good if you modify this _handle_force_kill_signal to handle things async similarly to what you've done already
Hi @mrkaye97, this has been done.
Thanks! Just to clarify, this was an example but I didn't mean we should persist this change - sorry for being unclear on that!
Ultimately I think we should use _handle_exit_signal for SIGINT and SIGTERM, I was just using that as an example to make it easier to test
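Based on that suggestion, the final signal wiring would presumably look something like the sketch below (graceful handlers on SIGINT/SIGTERM, force quit only on SIGQUIT); this is an illustration of the intent, not the PR's actual code:

def _setup_signal_handlers(self) -> None:
    # Graceful shutdown path on SIGINT/SIGTERM; force quit only on SIGQUIT.
    signal.signal(signal.SIGTERM, self._handle_exit_signal)
    signal.signal(signal.SIGINT, self._handle_exit_signal)
    signal.signal(signal.SIGQUIT, self._handle_force_quit_signal)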
Hi, currently at work we are integrating Hatchet as our job orchestrator.
We are including Hatchet in our backend system, which is a FastAPI application.
The additions are:
Here are the error tracebacks that served as the basis for the additions.
There was also a cleanup addition to handle exit.