feat(serve): migrate server implementation to use asyncio and grpc.aio for asynchronous handling #6
Conversation
Hello @david20571015, I'm Gemini Code Assist[^1]! I'm currently reviewing this pull request and will post my feedback shortly. In the meantime, here's a summary to help you and other reviewers quickly get up to speed!
Summary of Changes
This pull request migrates the server implementation to use asyncio and grpc.aio for asynchronous handling. This involves changes to the README.md to update the execution command, modifications to the search and summarize services to use asynchronous contexts and calls, updates to pyproject.toml to remove the serve script, and significant changes to scripts/serve.py to implement the asynchronous server.
Highlights
- **Asynchronous Migration:** The core change is the migration from a synchronous to an asynchronous server implementation using `asyncio` and `grpc.aio`.
- **Service Updates:** The `Search` and `Summarize` methods in their respective services are updated to be asynchronous.
- **Script Changes:** The `scripts/serve.py` file is heavily modified to support the new asynchronous server setup.
Changelog
Click here to see the changelog
- README.md
  - Updated the execution command to use `python3 scripts/serve.py` instead of `uv run serve`.
- llm_backend/search/service.py
  - Made the `Search` method asynchronous by defining it as `async def Search`.
  - Updated the `context` parameter type to `grpc.aio.ServicerContext`.
  - Used `await retriever.aretrieve(prompt)` to asynchronously retrieve results.
- llm_backend/summarize/service.py
  - Made the `Summarize` method asynchronous by defining it as `async def Summarize`.
  - Updated the `context` parameter type to `grpc.aio.ServicerContext`.
  - Used `str(self.summarizer.aget_response(self.query_str, texts))` to asynchronously get the summary.
- pyproject.toml
  - Removed the `serve` script entry from `project.scripts`.
- scripts/serve.py
  - Imported `asyncio`.
  - Removed the `start_server` and `serve` functions.
  - Modified the main execution block to use `asyncio.run` to start the asynchronous server.
  - Implemented graceful shutdown using `server.stop()`.
Using Gemini Code Assist
The full guide for Gemini Code Assist can be found on our documentation page; here are some quick tips.
Invoking Gemini
You can request assistance from Gemini at any point in your pull request by creating an issue comment (i.e. a comment on the pull request page) using either /gemini <command> or @gemini-code-assist <command>. Below is a summary of the supported commands.
| Feature | Command | Description |
|---|---|---|
| Code Review | `/gemini review` | Performs a code review for the current pull request in its current state. |
| Pull Request Summary | `/gemini summary` | Provides a summary of the current pull request in its current state. |
| Comment | `@gemini-code-assist` | Responds in comments when explicitly tagged, both in issue comments and review comments. |
| Help | `/gemini help` | Displays a list of available commands. |
Customization
To customize the Gemini Code Assist for GitHub experience, repository maintainers can create a configuration file and/or provide a custom code review style guide (such as PEP-8 for Python) by creating and adding files to a .gemini/ folder in the base of the repository. Detailed instructions can be found here.
Limitations & Feedback
Gemini Code Assist is currently in preview and may make mistakes. Please leave feedback on any instances where its feedback is incorrect or counterproductive. You can react with 👍 and 👎 on @gemini-code-assist comments to provide feedback.
You can also get AI-powered code generation, chat, as well as code reviews directly in the IDE at no cost with the Gemini Code Assist IDE Extension.
Trivia time!
What is the main purpose of asyncio in Python?
Click here for the answer
The asyncio module is used to write concurrent code using the async/await syntax. It is often a perfect fit for IO-bound and high-level structured network code.
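As a minimal illustration of that IO-bound fit (a hypothetical `fetch` helper, not code from this PR):

```python
import asyncio


async def fetch(name: str, delay: float) -> str:
    # Simulate an IO-bound call (network, disk) with a non-blocking sleep.
    await asyncio.sleep(delay)
    return name


async def main() -> list[str]:
    # gather runs both "requests" concurrently: total wall time is
    # roughly max(delays), not their sum.
    return list(await asyncio.gather(fetch("a", 0.05), fetch("b", 0.05)))


results = asyncio.run(main())
print(results)  # ['a', 'b']
```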
Footnotes

[^1]: Review the Privacy Notices, Generative AI Prohibited Use Policy, Terms of Service, and learn how to configure Gemini Code Assist in GitHub here. Gemini can make mistakes, so double check it and use code with caution.
Code Review
The pull request migrates the server implementation to use asyncio and grpc.aio for asynchronous handling, which is a significant improvement for performance and scalability. The changes look good overall, but there are a few areas that could be improved.
Summary of Findings
- **Error Handling in Asynchronous Context:** The code initializes an event loop but doesn't explicitly handle potential exceptions that might occur within the `serve` function or the cleanup coroutines. It's important to ensure that exceptions are caught and logged appropriately to prevent the application from crashing.
- **Graceful Shutdown:** The graceful shutdown implementation could be improved by ensuring all tasks are properly cancelled and awaited before closing the loop. This can prevent resource leaks and ensure a clean exit.
Merge Readiness
The pull request introduces important changes for asynchronous handling. While the core functionality seems to be implemented correctly, I recommend addressing the error handling and graceful shutdown improvements before merging. I am unable to directly approve the pull request, and other reviewers should also examine the code before merging.
```python
    loop.run_until_complete(serve(config, logger))
finally:
    loop.run_until_complete(*_cleanup_coroutines)
    loop.close()
```
It's important to handle exceptions that might occur within the serve function to prevent the application from crashing. Consider adding a try-except block around the loop.run_until_complete(serve(config, logger)) call to catch and log any exceptions.
```python
try:
    loop.run_until_complete(serve(config, logger))
except Exception as e:
    logger.error(f"Unhandled exception during server execution: {e}")
finally:
    loop.run_until_complete(*_cleanup_coroutines)
    loop.close()
```

```python
async def server_graceful_shutdown():
    logging.info("Starting graceful shutdown...")
    await server.stop(3)
```
Consider adding a timeout to server.stop() to prevent indefinite blocking in case of issues during shutdown. Also, it might be useful to log any exceptions that occur during the shutdown process.
```python
logging.info("Starting graceful shutdown...")
try:
    await server.stop(3)
except Exception as e:
    logging.error(f"Error during graceful shutdown: {e}")
```

```python
_cleanup_coroutines = []
try:
    loop.run_until_complete(serve(config, logger))
finally:
    loop.run_until_complete(*_cleanup_coroutines)
    loop.close()
```
Ensure all tasks are properly cancelled and awaited before closing the loop in the finally block. This can prevent resource leaks and ensure a clean exit. You can use asyncio.gather to await all cleanup coroutines concurrently.
```python
finally:
    try:
        # awaiting is not possible in a synchronous finally block, so
        # drive the gathered cleanup coroutines through the loop instead
        loop.run_until_complete(asyncio.gather(*_cleanup_coroutines))
    except Exception as e:
        logger.error(f"Error during cleanup: {e}")
    finally:
        loop.close()
```
```diff
  retriever = self.index.as_retriever(similarity_top_k=similarity_top_k)
- results: list[NodeWithScore] = retriever.retrieve(prompt)
+ results: list[NodeWithScore] = await retriever.aretrieve(prompt)
```
Consider adding error handling around the retriever.aretrieve call to gracefully handle potential exceptions during retrieval.
```python
try:
    results: list[NodeWithScore] = await retriever.aretrieve(prompt)
except Exception as e:
    logger.error(f"Error during retrieval: {e}")
    return search_pb2.SearchResponse(results=[])  # Or handle the error appropriately
```
llm_backend/summarize/service.py (outdated)
```diff
  ):
      texts = self.content_formatter(request.contents)
- summary = str(self.summarizer.get_response(self.query_str, texts))
+ summary = str(self.summarizer.aget_response(self.query_str, texts))
```
Consider adding error handling around the self.summarizer.aget_response call to gracefully handle potential exceptions during summarization.
```python
try:
    summary = str(self.summarizer.aget_response(self.query_str, texts))
except Exception as e:
    logger.error(f"Error during summarization: {e}")
    return summarize_pb2.SummarizeResponse(summary="")  # Or handle the error appropriately
```
PR Overview
This PR migrates the server implementation to use asyncio and grpc.aio for asynchronous handling and updates the corresponding gRPC service methods.
- Migrate from synchronous gRPC server to asynchronous using grpc.aio.
- Update gRPC service methods in Summarize and Search to asynchronous versions.
- Update README and pyproject.toml to reflect the new server entry point and usage.
Reviewed Changes
| File | Description |
|---|---|
| scripts/serve.py | Migrates server to asyncio and modifies shutdown handling |
| README.md | Updates command usage for the async server implementation |
| pyproject.toml | Removes old serve script configuration |
| llm_backend/summarize/service.py | Updates Summarize service method to be asynchronous |
| llm_backend/search/service.py | Updates Search service method to be asynchronous |
Copilot reviewed 5 out of 5 changed files in this pull request and generated 2 comments.
```python
try:
    loop.run_until_complete(serve(config, logger))
finally:
    loop.run_until_complete(*_cleanup_coroutines)
```
The shutdown coroutines are passed directly to run_until_complete using the unpacking operator, which is incorrect. Use 'loop.run_until_complete(asyncio.gather(*_cleanup_coroutines))' to correctly await all cleanup coroutines.
```diff
- loop.run_until_complete(*_cleanup_coroutines)
+ loop.run_until_complete(asyncio.gather(*_cleanup_coroutines))
```
llm_backend/summarize/service.py (outdated)
```diff
  ):
      texts = self.content_formatter(request.contents)
- summary = str(self.summarizer.get_response(self.query_str, texts))
+ summary = str(self.summarizer.aget_response(self.query_str, texts))
```
Since 'Summarize' is now async, the asynchronous call to 'aget_response' should be awaited. Change the line to 'summary = str(await self.summarizer.aget_response(self.query_str, texts))'.
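A small sketch of why the `await` matters, using a hypothetical `aget_response` stub rather than the real summarizer: without `await`, `str()` stringifies the coroutine object itself instead of the summary it would produce.

```python
import asyncio


async def aget_response(query: str, texts: list[str]) -> str:
    # Hypothetical stand-in for self.summarizer.aget_response in the PR.
    await asyncio.sleep(0)
    return f"summary of {len(texts)} texts"


async def main() -> tuple[str, str]:
    coro = aget_response("q", ["t1", "t2"])
    # The bug from the diff: stringifying the un-awaited coroutine.
    wrong = str(coro)
    # The fix: await the coroutine, then stringify its result.
    right = str(await coro)
    return wrong, right


wrong, right = asyncio.run(main())
print(wrong.startswith("<coroutine"), right)  # True summary of 2 texts
```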
No description provided.