-
Notifications
You must be signed in to change notification settings - Fork 6.8k
[serve] Allow same event loop handle shutdown from sync context #55551
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[serve] Allow same event loop handle shutdown from sync context #55551
Conversation
Signed-off-by: akyang-anyscale <[email protected]>
Signed-off-by: akyang-anyscale <[email protected]>
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Code Review
This pull request aims to allow handle.shutdown()
to be called from a synchronous context even when the handle is associated with an event loop. The changes in _private/client.py
and tests/conftest.py
are simple parameter removals that are consistent with the main change. However, the core logic modification in handle.py
introduces a couple of issues. Specifically, it incorrectly handles the case where no event loop is running by checking for None
from asyncio.get_running_loop()
, which actually raises a RuntimeError
. Additionally, it removes a crucial check that prevents deadlocks when the method is called from within an asyncio task. I've provided a suggestion to fix these issues.
Signed-off-by: akyang-anyscale <[email protected]>
Signed-off-by: akyang-anyscale <[email protected]>
Signed-off-by: akyang-anyscale <[email protected]>
Signed-off-by: akyang-anyscale <[email protected]>
Signed-off-by: alexyang <[email protected]>
Signed-off-by: akyang-anyscale <[email protected]>
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
thank you
python/ray/serve/_private/router.py
Outdated
loop = asyncio.new_event_loop() | ||
asyncio.set_event_loop(loop) | ||
try: | ||
return loop.run_until_complete(self._asyncio_router.shutdown()) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
discussed offline with @akyang-anyscale, _asyncio_router.shutdown()
is not threadsafe
Signed-off-by: akyang-anyscale <[email protected]>
Signed-off-by: akyang-anyscale <[email protected]>
Signed-off-by: akyang-anyscale <[email protected]>
Signed-off-by: akyang-anyscale <[email protected]>
Signed-off-by: akyang-anyscale <[email protected]>
Signed-off-by: akyang-anyscale <[email protected]>
Signed-off-by: akyang-anyscale <[email protected]>
@aslonnie @elliot-barn @khluu can I get a CI team review? |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
lgtm pending adding a public api
Signed-off-by: akyang-anyscale <[email protected]>
Signed-off-by: akyang-anyscale <[email protected]>
Signed-off-by: akyang-anyscale <[email protected]>
Signed-off-by: akyang-anyscale <[email protected]>
Signed-off-by: akyang-anyscale <[email protected]>
await asyncio.wait_for( | ||
self._controller.graceful_shutdown.remote(), timeout=timeout_s | ||
) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
will this cancel the ray task on timeout?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
don't believe so, does ray.get with timeout also cancel the ray task if timeout is hit?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
is that the desired behavior? I would've assumed we want the shutdown task to continue after the timeout
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
yeah it's not the desired behavior. But asyncio.wait_for
will cancel the asyncio task when timeout is reached, and I don't know if that affects the underlying ray task.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I tested this out locally and the ray task continued to run.
import asyncio
import ray
import time
@ray.remote
def foo():
time.sleep(5)
print("hello")
async def bar(timeout):
await asyncio.wait_for(foo.remote(), timeout=timeout)
try:
print("starting timeout=10")
asyncio.run(bar(10))
except TimeoutError:
print("timed out!")
try:
print("starting timeout=3")
asyncio.run(bar(3))
except TimeoutError:
print("timed out!")
time.sleep(10)
hello gets printed twice
Signed-off-by: akyang-anyscale <[email protected]>
Signed-off-by: akyang-anyscale <[email protected]>
Signed-off-by: akyang-anyscale <[email protected]>
…project#55551) ## Why are these changes needed? In the case user calls `serve.shutdown()`, we'd still want to be able to shutdown the handle if user has initialized it running in the same event loop. The current behaior may throw a runtime error. In order to block on the shutdown result in the same event loop without causing deadlock, the shutdown sequence in `CurrentLoopRouter` needs to happen in a separate thread (instead of the same event loop). <!-- Please give a short summary of the change and the problem this solves. --> ## Related issue number <!-- For example: "Closes ray-project#1234" --> ## Checks - [ ] I've signed off every commit(by using the -s flag, i.e., `git commit -s`) in this PR. - [ ] I've run `scripts/format.sh` to lint the changes in this PR. - [ ] I've included any doc changes needed for https://docs.ray.io/en/master/. - [ ] I've added any new APIs to the API Reference. For example, if I added a method in Tune, I've added it in `doc/source/tune/api/` under the corresponding `.rst` file. - [ ] I've made sure the tests are passing. Note that there might be a few flaky tests, see the recent failures at https://flakey-tests.ray.io/ - Testing Strategy - [ ] Unit tests - [ ] Release tests - [ ] This PR is not tested :( --------- Signed-off-by: akyang-anyscale <[email protected]> Signed-off-by: alexyang <[email protected]> Signed-off-by: Lehui Liu <[email protected]>
Why are these changes needed?
In the case user calls
serve.shutdown()
, we'd still want to be able to shutdown the handle if user has initialized it running in the same event loop. The current behaior may throw a runtime error.In order to block on the shutdown result in the same event loop without causing deadlock, the shutdown sequence in
CurrentLoopRouter
needs to happen in a separate thread (instead of the same event loop).Related issue number
Checks
git commit -s
) in this PR.scripts/format.sh
to lint the changes in this PR.method in Tune, I've added it in
doc/source/tune/api/
under thecorresponding
.rst
file.