-
Notifications
You must be signed in to change notification settings - Fork 407
[Transfer Engine]Coro rpc communicator #795
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Open
JasonZhang517
wants to merge
68
commits into
kvcache-ai:main
Choose a base branch
from
JasonZhang517:coro_rpc_communicator
base: main
Could not load branches
Branch not found: {{ refName }}
Loading
Could not load tags
Nothing to show
Loading
Are you sure you want to change the base?
Some commits from the old base branch may be removed from the timeline,
and old review comments may become outdated.
Open
Changes from 63 commits
Commits
Show all changes
68 commits
Select commit
Hold shift + click to select a range
e63fa88
add basic implementation and python interfaces
JasonZhang517 22e59a0
add real communicator logic and interface
JasonZhang517 998a1c0
add integration test
JasonZhang517 f8cee4d
added tensor rebuild logic
JasonZhang517 81cbfdd
add tensor rebuild and tests
JasonZhang517 714ef16
removed useless lines
JasonZhang517 59c5105
removed blank lines
JasonZhang517 06a672f
turn single client creation to client pool
JasonZhang517 1c225b9
merged conflicts
JasonZhang517 1fc1ee0
Merge remote-tracking branch 'upstream/main' into coro_rpc_communicator
JasonZhang517 f5deb6c
change std::future to async_simple coro lazy
JasonZhang517 65ee0c8
1. removed some data copies 2. replace synchronous thread with asynch…
JasonZhang517 8c061e3
1.added tests to ci 2. removed CLAUDE.md
JasonZhang517 aed64b1
add pybind installation in CI
JasonZhang517 08146aa
fixed clang spelling
JasonZhang517 584e386
fixed communicator spelling
JasonZhang517 e58e2c3
fixed redundant functions in transfer engine py
JasonZhang517 8953508
fixed ci path
JasonZhang517 b2afec2
fixed memory copy bugs, removed client pool creation
JasonZhang517 fe165e9
convert memorycopy to memoryview
JasonZhang517 a1d4c64
set attachment for handleDataTransfer
JasonZhang517 9ca0282
removed duplicate lock
JasonZhang517 afdb741
replace string copy with stringview
JasonZhang517 8ab6dfa
reformat the code
JasonZhang517 ca4d006
removed tensor rebuild logic
JasonZhang517 22c418d
removed unnecessary copies
JasonZhang517 fafe32a
reformat the code with clang
JasonZhang517 7ebe733
remove useless files
JasonZhang517 5af45f2
updated tests
JasonZhang517 101c031
replaced std io with LOG io
JasonZhang517 5fd5bb8
convert string_view to size in the handledata
JasonZhang517 98ca700
convert string_view to size in the handledatatransfer
JasonZhang517 fdd5a18
remove monitor
JasonZhang517 9cd2e63
removed redundent lines
JasonZhang517 6e1efa1
removed ndim restrictions
JasonZhang517 ff96760
1. removed old test scripts 2. add new test scripts for the communica…
JasonZhang517 2efd542
removed pybind11 in .github/workflows/ci.yml
JasonZhang517 cd85ad7
fixed cmakelists.txt
JasonZhang517 4158094
removed cross compilation problem
JasonZhang517 dd83623
removed cmakelists.txt
JasonZhang517 45987ba
refactor the test
JasonZhang517 4d218a3
added packet size as parameters
JasonZhang517 393e5b8
remove pybind dependency and integration
JasonZhang517 e925202
readd cmakelists.txt to connector folder
JasonZhang517 dc479af
fixed CMakeLists.txt
JasonZhang517 6383910
fixed pybind modules
JasonZhang517 d4caef8
reformat headers
JasonZhang517 e52aa02
fixed CMakeLists.txt
JasonZhang517 fa10062
add indication of protocol
JasonZhang517 e72c7a0
fixed rdma bugs
JasonZhang517 732fe0e
fixed some minor bugs
JasonZhang517 c40b13a
removed Chinese comments
JasonZhang517 aa62943
remove useless files
JasonZhang517 ff9525a
removed unnecessary include in CMakeLists.txt
JasonZhang517 2b83a76
refactored client and server initialization methods
JasonZhang517 ed76b8b
fixed server initialization pool size
JasonZhang517 600db58
fixed interface pybind bugs
JasonZhang517 abd15a1
fixed pybind
JasonZhang517 20b1bc2
update pybind11 parameters to -Dpybind11_DIR
JasonZhang517 fc7dde6
update cmakelists
JasonZhang517 c790f21
local py312 test complete
JasonZhang517 255a5a9
adjused workflow
JasonZhang517 61b03da
adjusted Cmakelists.txxt
JasonZhang517 2ac0105
restored some useless changes
JasonZhang517 41e871f
remove Chinese comments
JasonZhang517 ba6bd7f
remove blank lines
JasonZhang517 384e3b1
Merge remote-tracking branch 'origin/coro_rpc_communicator' into coro…
JasonZhang517 2130bf0
Merge branch 'main' into coro_rpc_communicator
JasonZhang517 File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Some comments aren't visible on the classic Files Changed page.
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
102 changes: 102 additions & 0 deletions
102
mooncake-transfer-engine/include/transport/coro_rpc_connector/cororpc_communicator.h
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,102 @@ | ||
| #pragma once | ||
|
|
||
| #include <memory> | ||
| #include <string> | ||
| #include <vector> | ||
| #include <future> | ||
| #include <unordered_map> | ||
| #include <functional> | ||
| #include <pybind11/pybind11.h> | ||
| #include <pybind11/pytypes.h> | ||
| #include <ylt/coro_rpc/coro_rpc_client.hpp> | ||
| #include <ylt/coro_rpc/coro_rpc_server.hpp> | ||
| #include <ylt/coro_io/client_pool.hpp> | ||
| #include <ylt/coro_io/coro_io.hpp> | ||
| #include <async_simple/coro/Lazy.h> | ||
|
|
||
| namespace mooncake { | ||
|
|
||
| struct TensorInfo { | ||
| void* data_ptr = nullptr; | ||
| std::vector<size_t> shape; | ||
| std::string dtype; | ||
| size_t total_bytes = 0; | ||
| }; | ||
|
|
||
| struct result { | ||
| int code = 0; | ||
| std::string err_msg; | ||
| }; | ||
|
|
||
| struct Config { | ||
| std::string listen_address; | ||
| size_t thread_count = 0; | ||
| size_t timeout_seconds = 30; | ||
| size_t pool_size = 10; | ||
| }; | ||
|
|
||
| template <typename T> | ||
| struct SimpleContext { | ||
| coro_rpc::context<T> context_; | ||
| void response_msg() { context_.response_msg(); } | ||
| }; | ||
|
|
||
| class CoroRPCCommunicator { | ||
| public: | ||
| class Impl { | ||
| public: | ||
| Config config; | ||
| bool is_server_started = false; | ||
|
|
||
| std::unique_ptr<coro_rpc::coro_rpc_server> server_; | ||
|
|
||
| std::function<void(std::string_view, std::string_view)> | ||
| data_receive_callback; | ||
|
|
||
| pybind11::handle py_callback; | ||
|
|
||
| void handleDataTransfer(coro_rpc::context<void> context, | ||
| std::string_view data); | ||
| void handleTensorTransfer(coro_rpc::context<void> context); | ||
| void handleDataTransferWithAttachment(coro_rpc::context<void> context, | ||
| std::string_view data); | ||
| void handleTensorTransferWithAttachment( | ||
| coro_rpc::context<void> context); | ||
| }; | ||
|
|
||
| CoroRPCCommunicator(); | ||
| ~CoroRPCCommunicator(); | ||
|
|
||
| bool initialize(const Config& config); | ||
| bool startServerImpl(bool is_async = true); | ||
| bool startServer(); | ||
| bool startServerAsync(); | ||
| void stopServer(); | ||
|
|
||
| int sendData(const std::string& target_address, const void* data, | ||
| size_t data_size); | ||
| async_simple::coro::Lazy<result> sendDataAsync( | ||
| const std::string& target_address, const void* data, size_t data_size); | ||
|
|
||
| int sendTensor(const std::string& target_address, | ||
| const pybind11::object& tensor); | ||
| async_simple::coro::Lazy<int> sendTensorAsync( | ||
| const std::string& target_address, const TensorInfo& tensor); | ||
|
|
||
| int receiveData(const std::string& source_address, void* buffer, | ||
| size_t buffer_size, int timeout_ms = -1); | ||
| async_simple::coro::Lazy<std::string> receiveDataAsync( | ||
| const std::string& source_address, int timeout_ms = -1); | ||
|
|
||
| void setDataReceiveCallback( | ||
| std::function<void(std::string_view, std::string_view)> callback); | ||
|
|
||
| std::shared_ptr<Impl> getImpl() { return impl_; } | ||
|
|
||
| private: | ||
| std::shared_ptr<coro_io::client_pools<coro_rpc::coro_rpc_client>> | ||
| client_pools_; | ||
| std::shared_ptr<Impl> impl_; | ||
| }; | ||
|
|
||
| } // namespace mooncake |
94 changes: 94 additions & 0 deletions
94
mooncake-transfer-engine/include/transport/coro_rpc_connector/cororpc_interface.h
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,94 @@ | ||
| #pragma once | ||
|
|
||
| #include <memory> | ||
| #include <string> | ||
| #include <vector> | ||
| #include <pybind11/pybind11.h> | ||
| #include <pybind11/pytypes.h> | ||
|
|
||
| namespace mooncake { | ||
|
|
||
| struct Config; | ||
|
|
||
| class CoroRPCInterface { | ||
| public: | ||
| struct ReceivedData { | ||
| std::string source_address; | ||
| std::string data; | ||
| size_t data_size = 0; | ||
|
|
||
| pybind11::bytes getBytes() const { return pybind11::bytes(data); } | ||
JasonZhang517 marked this conversation as resolved.
Show resolved
Hide resolved
|
||
| pybind11::memoryview getMemoryView() const { | ||
| return pybind11::memoryview::from_memory( | ||
| const_cast<char*>(data.data()), data.size(), true); | ||
| } | ||
| }; | ||
|
|
||
| struct ReceivedTensor { | ||
| std::string source_address; | ||
| std::string data; | ||
| std::vector<size_t> shape; | ||
| std::string dtype; | ||
| size_t total_bytes = 0; | ||
| size_t getDataSize() const { return data.size(); } | ||
| pybind11::bytes getDataAsBytes() const { return pybind11::bytes(data); } | ||
JasonZhang517 marked this conversation as resolved.
Show resolved
Hide resolved
|
||
| pybind11::memoryview getMemoryView() const { | ||
| return pybind11::memoryview::from_memory( | ||
| const_cast<char*>(data.data()), data.size(), true); | ||
| } | ||
| }; | ||
|
|
||
| class Impl; | ||
|
|
||
| CoroRPCInterface(); | ||
| ~CoroRPCInterface(); | ||
|
|
||
| bool initialize(const std::string& listen_address = "", | ||
| size_t thread_count = 0, size_t timeout_seconds = 30, | ||
| size_t pool_size = 10); | ||
|
|
||
| // Convenience methods for common use cases | ||
| bool initializeClient(size_t pool_size = 10, size_t timeout_seconds = 30); | ||
| bool initializeServer(const std::string& listen_address, | ||
| size_t thread_count = 8, size_t timeout_seconds = 30); | ||
|
|
||
| bool startServer(); | ||
| bool startServerAsync(); | ||
| bool startServerImpl(bool is_async = true); | ||
| void stopServer(); | ||
|
|
||
| int sendData(const std::string& target_address, pybind11::handle data); | ||
| pybind11::object sendDataAsync(std::string& target_address, | ||
| pybind11::handle data, | ||
| pybind11::handle loop); | ||
|
|
||
| int sendTensor(const std::string& target_address, pybind11::handle tensor); | ||
| pybind11::object sendTensorAsync(std::string& target_address, | ||
| pybind11::handle tensor, | ||
| pybind11::handle loop); | ||
|
|
||
| void setDataReceiveCallback(pybind11::function callback); | ||
| void setTensorReceiveCallback(pybind11::function callback); | ||
|
|
||
| void handleIncomingData(std::string_view source_address, | ||
| std::string_view data); | ||
| void handleIncomingTensor(std::string_view source_address, | ||
| std::string_view data, | ||
| const std::vector<size_t>& shape, | ||
| std::string_view dtype); | ||
|
|
||
| private: | ||
| std::unique_ptr<Impl> impl_; | ||
| }; | ||
|
|
||
| std::unique_ptr<CoroRPCInterface> createRPCClient(uint64_t local_rank = 0, | ||
| uint64_t world_size = 1); | ||
| std::unique_ptr<CoroRPCInterface> createRPCServer(uint64_t local_rank = 0, | ||
| uint64_t world_size = 1); | ||
|
|
||
| } // namespace mooncake | ||
|
|
||
| namespace pybind11 { | ||
| class module_; | ||
| } | ||
| void bind_coro_rpc_interface(pybind11::module_& m); | ||
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Oops, something went wrong.
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
why move it here?