- 
                Notifications
    You must be signed in to change notification settings 
- Fork 407
[Transfer Engine]Coro rpc communicator #795
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
          
     Open
      
        
      
            JasonZhang517
  wants to merge
  68
  commits into
  kvcache-ai:main
  
    
      
        
          
  
    
      Choose a base branch
      
     
    
      
        
      
      
        
          
          
        
        
          
            
              
              
              
  
           
        
        
          
            
              
              
           
        
       
     
  
        
          
            
          
            
          
        
       
    
      
from
JasonZhang517:coro_rpc_communicator
  
      
      
   
  
    
  
  
  
 
  
      
    base: main
Could not load branches
            
              
  
    Branch not found: {{ refName }}
  
            
                
      Loading
              
            Could not load tags
            
            
              Nothing to show
            
              
  
            
                
      Loading
              
            Are you sure you want to change the base?
            Some commits from the old base branch may be removed from the timeline,
            and old review comments may become outdated.
          
          
  
     Open
                    Changes from 2 commits
      Commits
    
    
            Show all changes
          
          
            68 commits
          
        
        Select commit
          Hold shift + click to select a range
      
      e63fa88
              
                add basic implementation and python interfaces
              
              
                JasonZhang517 22e59a0
              
                add real communicator logic and interface
              
              
                JasonZhang517 998a1c0
              
                add integration test
              
              
                JasonZhang517 f8cee4d
              
                added tensor rebuild logic
              
              
                JasonZhang517 81cbfdd
              
                add tensor rebuild and tests
              
              
                JasonZhang517 714ef16
              
                removed useless lines
              
              
                JasonZhang517 59c5105
              
                removed blank lines
              
              
                JasonZhang517 06a672f
              
                turn single client creation to client pool
              
              
                JasonZhang517 1c225b9
              
                merged conflicts
              
              
                JasonZhang517 1fc1ee0
              
                Merge remote-tracking branch 'upstream/main' into coro_rpc_communicator
              
              
                JasonZhang517 f5deb6c
              
                change std::future to async_simple coro lazy
              
              
                JasonZhang517 65ee0c8
              
                1. removed some data copies 2. replace synchronous thread with asynch…
              
              
                JasonZhang517 8c061e3
              
                1.added tests to ci 2. removed CLAUDE.md
              
              
                JasonZhang517 aed64b1
              
                add pybind installation in CI
              
              
                JasonZhang517 08146aa
              
                fixed clang spelling
              
              
                JasonZhang517 584e386
              
                fixed communicator spelling
              
              
                JasonZhang517 e58e2c3
              
                fixed redundant functions in transfer engine py
              
              
                JasonZhang517 8953508
              
                fixed ci path
              
              
                JasonZhang517 b2afec2
              
                fixed memory copy bugs, removed client pool creation
              
              
                JasonZhang517 fe165e9
              
                convert memorycopy to memoryview
              
              
                JasonZhang517 a1d4c64
              
                set attachment for handleDataTransfer
              
              
                JasonZhang517 9ca0282
              
                removed duplicate lock
              
              
                JasonZhang517 afdb741
              
                replace string copy with stringview
              
              
                JasonZhang517 8ab6dfa
              
                reformat the code
              
              
                JasonZhang517 ca4d006
              
                removed tensor rebuild logic
              
              
                JasonZhang517 22c418d
              
                removed unnecessary copies
              
              
                JasonZhang517 fafe32a
              
                reformat the code with clang
              
              
                JasonZhang517 7ebe733
              
                remove useless files
              
              
                JasonZhang517 5af45f2
              
                updated tests
              
              
                JasonZhang517 101c031
              
                replaced std io with LOG io
              
              
                JasonZhang517 5fd5bb8
              
                convert string_view to size in the handledata
              
              
                JasonZhang517 98ca700
              
                convert string_view to size in the handledatatransfer
              
              
                JasonZhang517 fdd5a18
              
                remove monitor
              
              
                JasonZhang517 9cd2e63
              
                removed redundent lines
              
              
                JasonZhang517 6e1efa1
              
                removed ndim restrictions
              
              
                JasonZhang517 ff96760
              
                1. removed old test scripts 2. add new test scripts for the communica…
              
              
                JasonZhang517 2efd542
              
                removed pybind11 in .github/workflows/ci.yml
              
              
                JasonZhang517 cd85ad7
              
                fixed cmakelists.txt
              
              
                JasonZhang517 4158094
              
                removed cross compilation problem
              
              
                JasonZhang517 dd83623
              
                removed cmakelists.txt
              
              
                JasonZhang517 45987ba
              
                refactor the test
              
              
                JasonZhang517 4d218a3
              
                added packet size as parameters
              
              
                JasonZhang517 393e5b8
              
                remove pybind dependency and integration
              
              
                JasonZhang517 e925202
              
                readd cmakelists.txt to connector folder
              
              
                JasonZhang517 dc479af
              
                fixed CMakeLists.txt
              
              
                JasonZhang517 6383910
              
                fixed pybind modules
              
              
                JasonZhang517 d4caef8
              
                reformat headers
              
              
                JasonZhang517 e52aa02
              
                fixed CMakeLists.txt
              
              
                JasonZhang517 fa10062
              
                add indication of protocol
              
              
                JasonZhang517 e72c7a0
              
                fixed rdma bugs
              
              
                JasonZhang517 732fe0e
              
                fixed some minor bugs
              
              
                JasonZhang517 c40b13a
              
                removed Chinese comments
              
              
                JasonZhang517 aa62943
              
                remove useless files
              
              
                JasonZhang517 ff9525a
              
                removed unnecessary include in CMakeLists.txt
              
              
                JasonZhang517 2b83a76
              
                refactored client and server initialization methods
              
              
                JasonZhang517 ed76b8b
              
                fixed server initialization pool size
              
              
                JasonZhang517 600db58
              
                fixed interface pybind bugs
              
              
                JasonZhang517 abd15a1
              
                fixed pybind
              
              
                JasonZhang517 20b1bc2
              
                update pybind11 parameters to -Dpybind11_DIR
              
              
                JasonZhang517 fc7dde6
              
                update cmakelists
              
              
                JasonZhang517 c790f21
              
                local py312 test complete
              
              
                JasonZhang517 255a5a9
              
                adjused workflow
              
              
                JasonZhang517 61b03da
              
                adjusted Cmakelists.txxt
              
              
                JasonZhang517 2ac0105
              
                restored some useless changes
              
              
                JasonZhang517 41e871f
              
                remove Chinese comments
              
              
                JasonZhang517 ba6bd7f
              
                remove blank lines
              
              
                JasonZhang517 384e3b1
              
                Merge remote-tracking branch 'origin/coro_rpc_communicator' into coro…
              
              
                JasonZhang517 2130bf0
              
                Merge branch 'main' into coro_rpc_communicator
              
              
                JasonZhang517 File filter
Filter by extension
Conversations
          Failed to load comments.   
        
        
          
      Loading
        
  Jump to
        
          Jump to file
        
      
      
          Failed to load files.   
        
        
          
      Loading
        
  Diff view
Diff view
          Some comments aren't visible on the classic Files Changed page.
        
There are no files selected for viewing
  
    
      This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
      Learn more about bidirectional Unicode characters
    
  
  
    
              | Original file line number | Diff line number | Diff line change | 
|---|---|---|
|  | @@ -198,3 +198,5 @@ mooncake-wheel/mooncake/transfer_engine_bench | |
|  | ||
| # Claude Code Memory | ||
| CLAUDE.md | ||
|  | ||
|  | ||
  
    
      This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
      Learn more about bidirectional Unicode characters
    
  
  
    
              
  
    
      This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
      Learn more about bidirectional Unicode characters
    
  
  
    
              
        
          
          
            88 changes: 88 additions & 0 deletions
          
          88 
        
  mooncake-transfer-engine/include/transport/coro_rpc_connector/cororpc_communicator.h
  
  
      
      
   
        
      
      
    
  
    
      This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
      Learn more about bidirectional Unicode characters
    
  
  
    
              | Original file line number | Diff line number | Diff line change | 
|---|---|---|
| @@ -0,0 +1,88 @@ | ||
| #pragma once | ||
|  | ||
| #include <memory> | ||
| #include <string> | ||
| #include <vector> | ||
| #include <future> | ||
| #include <unordered_map> | ||
| #include <pybind11/pybind11.h> | ||
| #include <ylt/coro_rpc/coro_rpc_client.hpp> | ||
| #include <ylt/coro_rpc/coro_rpc_server.hpp> | ||
| #include <ylt/coro_io/client_pool.hpp> | ||
|  | ||
| namespace mooncake { | ||
|  | ||
| struct TensorInfo { | ||
| void* data_ptr = nullptr; | ||
| std::vector<size_t> shape; | ||
| std::string dtype; | ||
| size_t total_bytes = 0; | ||
| }; | ||
|  | ||
| struct result { | ||
| int code = 0; | ||
| std::string err_msg; | ||
| }; | ||
|  | ||
| struct Config { | ||
| std::string listen_address; | ||
| size_t thread_count = 0; | ||
| size_t timeout_seconds = 30; | ||
| size_t pool_size = 10; | ||
| }; | ||
|  | ||
| template<typename T> | ||
| struct SimpleContext { | ||
| coro_rpc::context<T> context_; | ||
| void response_msg() { context_.response_msg(); } | ||
| }; | ||
|  | ||
| class CoroRPCCommunicator { | ||
| public: | ||
| class Impl { | ||
| public: | ||
| Config config; | ||
| bool is_server_started = false; | ||
|  | ||
| // 真实的 coro_rpc 组件 | ||
| std::unique_ptr<coro_rpc::coro_rpc_server> server_; | ||
| std::shared_ptr<coro_io::client_pool<coro_rpc::coro_rpc_client>> client_pool_; | ||
| std::unordered_map<std::string, coro_rpc::coro_rpc_client> clients_; | ||
|  | ||
| void handleDataTransfer(coro_rpc::context<void> context, std::string_view data); | ||
| void handleTensorTransfer(coro_rpc::context<void> context); | ||
| void handleDataTransferWithAttachment(coro_rpc::context<void> context, std::string_view data); | ||
| void handleTensorTransferWithAttachment(coro_rpc::context<void> context); | ||
| }; | ||
|  | ||
| CoroRPCCommunicator(); | ||
| ~CoroRPCCommunicator(); | ||
|  | ||
| bool initialize(const Config& config); | ||
| bool startServer(); | ||
| bool startServerAsync(); | ||
| void stopServer(); | ||
|  | ||
| bool addRemoteConnection(const std::string& remote_address); | ||
| void removeRemoteConnection(const std::string& remote_address); | ||
| bool isConnected(const std::string& remote_address); | ||
|  | ||
| int sendData(const std::string& target_address, const void* data, size_t data_size); | ||
| std::future<result> sendDataAsync(const std::string& target_address, const void* data, size_t data_size); | ||
|  | ||
| int sendTensor(const std::string& target_address, const pybind11::object& tensor); | ||
| std::future<int> sendTensorAsync(const std::string& target_address, const TensorInfo& tensor); | ||
|  | ||
| int receiveData(const std::string& source_address, void* buffer, size_t buffer_size, int timeout_ms = -1); | ||
| std::future<std::string> receiveDataAsync(const std::string& source_address, int timeout_ms = -1); | ||
|  | ||
| std::shared_ptr<Impl> getImpl() { return impl_; } | ||
|  | ||
| private: | ||
| std::shared_ptr<Impl> impl_; | ||
| }; | ||
|  | ||
| std::unique_ptr<CoroRPCCommunicator> createClientPool(size_t pool_size = 10, size_t timeout_seconds = 30); | ||
| std::unique_ptr<CoroRPCCommunicator> createServer(const std::string& listen_address, size_t thread_count = 0); | ||
|  | ||
| } // namespace mooncake | 
        
          
          
            87 changes: 87 additions & 0 deletions
          
          87 
        
  mooncake-transfer-engine/include/transport/coro_rpc_connector/cororpc_interface.h
  
  
      
      
   
        
      
      
    
  
    
      This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
      Learn more about bidirectional Unicode characters
    
  
  
    
              | Original file line number | Diff line number | Diff line change | 
|---|---|---|
| @@ -0,0 +1,87 @@ | ||
| #pragma once | ||
|  | ||
| #include <memory> | ||
| #include <string> | ||
| #include <vector> | ||
| #include <pybind11/pybind11.h> | ||
|  | ||
| namespace mooncake { | ||
|  | ||
| struct Config; | ||
|  | ||
| class CoroRPCInterface { | ||
| public: | ||
| struct ReceivedData { | ||
| std::string source_address; | ||
| std::string data; | ||
| size_t data_size = 0; | ||
|  | ||
| pybind11::bytes getBytes() const { | ||
| return pybind11::bytes(data); | ||
| } | ||
| }; | ||
|  | ||
| struct ReceivedTensor { | ||
| std::string source_address; | ||
| std::string data; | ||
| std::vector<size_t> shape; | ||
| std::string dtype; | ||
| size_t total_bytes = 0; | ||
|  | ||
| pybind11::object rebuildTensor() const; | ||
|         
                  JasonZhang517 marked this conversation as resolved.
              Outdated
          
            Show resolved
            Hide resolved | ||
|  | ||
| private: | ||
| pybind11::object rebuildTensorInternal() const; | ||
| }; | ||
|  | ||
| class Impl; | ||
|  | ||
| CoroRPCInterface(); | ||
| ~CoroRPCInterface(); | ||
|  | ||
| // 初始化 | ||
| bool initialize(const std::string& listen_address = "", | ||
| size_t thread_count = 0, | ||
| size_t timeout_seconds = 30, | ||
| size_t pool_size = 10); | ||
|  | ||
| bool startServer(); | ||
| bool startServerAsync(); | ||
| void stopServer(); | ||
|  | ||
| bool addRemoteConnection(const std::string& remote_address); | ||
| void removeRemoteConnection(const std::string& remote_address); | ||
| bool isConnected(const std::string& remote_address); | ||
|  | ||
| int sendData(const std::string& target_address, pybind11::bytes data); | ||
| pybind11::object sendDataAsync(const std::string& target_address, | ||
| pybind11::bytes data, | ||
| pybind11::handle loop); | ||
|  | ||
| int sendTensor(const std::string& target_address, pybind11::handle tensor); | ||
| pybind11::object sendTensorAsync(const std::string& target_address, | ||
| pybind11::handle tensor, | ||
| pybind11::handle loop); | ||
|  | ||
| void setDataReceiveCallback(pybind11::function callback); | ||
| void setTensorReceiveCallback(pybind11::function callback); | ||
|  | ||
| void handleIncomingData(const std::string& source_address, | ||
| const std::string& data); | ||
| void handleIncomingTensor(const std::string& source_address, | ||
| const std::string& data, | ||
| const std::vector<size_t>& shape, | ||
| const std::string& dtype); | ||
|  | ||
| private: | ||
| std::unique_ptr<Impl> impl_; | ||
| }; | ||
|  | ||
| std::unique_ptr<CoroRPCInterface> createRPCClient(size_t pool_size = 10, size_t timeout_seconds = 30); | ||
| std::unique_ptr<CoroRPCInterface> createRPCServer(const std::string& listen_address, size_t thread_count = 0); | ||
|  | ||
| } // namespace mooncake | ||
|  | ||
| // Forward declaration for pybind11 integration | ||
| namespace pybind11 { class module_; } | ||
| void bind_coro_rpc_interface(pybind11::module_ &m); | ||
  
    
      This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
      Learn more about bidirectional Unicode characters
    
  
  
    
              
        
          
          
            38 changes: 38 additions & 0 deletions
          
          38 
        
  mooncake-transfer-engine/src/transport/coro_rpc_connector/CMakeLists.txt
  
  
      
      
   
        
      
      
    
  
    
      This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
      Learn more about bidirectional Unicode characters
    
  
  
    
              | Original file line number | Diff line number | Diff line change | 
|---|---|---|
| @@ -0,0 +1,38 @@ | ||
| # Find Python and pybind11 for the binding code | ||
| find_package(Python3 COMPONENTS Interpreter Development REQUIRED) | ||
| find_package(pybind11 QUIET) | ||
| if(NOT pybind11_FOUND) | ||
| execute_process( | ||
| COMMAND ${Python3_EXECUTABLE} -m pybind11 --cmakedir | ||
| OUTPUT_VARIABLE pybind11_DIR | ||
| OUTPUT_STRIP_TRAILING_WHITESPACE | ||
| RESULT_VARIABLE pybind11_RESULT | ||
| ) | ||
| if(pybind11_RESULT EQUAL 0) | ||
| find_package(pybind11 REQUIRED PATHS ${pybind11_DIR}) | ||
| else() | ||
| message(FATAL_ERROR "pybind11 not found. Please install with: pip install pybind11") | ||
| endif() | ||
| endif() | ||
|  | ||
| # Create object library for coro_rpc_connector | ||
| set(CORO_RPC_SOURCES | ||
| cororpc_interface.cpp | ||
| cororpc_communicator.cpp | ||
| ) | ||
|  | ||
| add_library(coro_rpc_connector OBJECT ${CORO_RPC_SOURCES}) | ||
|  | ||
| target_compile_features(coro_rpc_connector PRIVATE cxx_std_20) | ||
| target_compile_options(coro_rpc_connector PRIVATE -O3 -Wall) | ||
|  | ||
| target_include_directories(coro_rpc_connector PRIVATE | ||
| ${CMAKE_CURRENT_SOURCE_DIR}/../../../include | ||
| ${Python3_INCLUDE_DIRS} | ||
| ) | ||
|  | ||
| target_link_libraries(coro_rpc_connector PRIVATE | ||
| yalantinglibs::yalantinglibs | ||
| pthread | ||
| pybind11::module | ||
| ) | 
      
      Oops, something went wrong.
        
    
  
      
      Oops, something went wrong.
        
    
  
  Add this suggestion to a batch that can be applied as a single commit.
  This suggestion is invalid because no changes were made to the code.
  Suggestions cannot be applied while the pull request is closed.
  Suggestions cannot be applied while viewing a subset of changes.
  Only one suggestion per line can be applied in a batch.
  Add this suggestion to a batch that can be applied as a single commit.
  Applying suggestions on deleted lines is not supported.
  You must change the existing code in this line in order to create a valid suggestion.
  Outdated suggestions cannot be applied.
  This suggestion has been applied or marked resolved.
  Suggestions cannot be applied from pending reviews.
  Suggestions cannot be applied on multi-line comments.
  Suggestions cannot be applied while the pull request is queued to merge.
  Suggestion cannot be applied right now. Please check back later.
  
    
  
    
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why need to add these files here?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
if we want to use coro_rpc_connector as a submodule of mooncake transfer engine, we have to compile it here