Skip to content
Open
Show file tree
Hide file tree
Changes from 64 commits
Commits
Show all changes
68 commits
Select commit Hold shift + click to select a range
e63fa88
add basic implementation and python interfaces
JasonZhang517 Aug 29, 2025
22e59a0
add real communicator logic and interface
JasonZhang517 Sep 1, 2025
998a1c0
add integration test
JasonZhang517 Sep 1, 2025
f8cee4d
added tensor rebuild logic
JasonZhang517 Sep 1, 2025
81cbfdd
add tensor rebuild and tests
JasonZhang517 Sep 2, 2025
714ef16
removed useless lines
JasonZhang517 Sep 2, 2025
59c5105
removed blank lines
JasonZhang517 Sep 2, 2025
06a672f
turn single client creation to client pool
JasonZhang517 Sep 2, 2025
1c225b9
merged conflicts
JasonZhang517 Sep 2, 2025
1fc1ee0
Merge remote-tracking branch 'upstream/main' into coro_rpc_communicator
JasonZhang517 Sep 2, 2025
f5deb6c
change std::future to async_simple coro lazy
JasonZhang517 Sep 2, 2025
65ee0c8
1. removed some data copies 2. replace synchronous thread with asynch…
JasonZhang517 Sep 4, 2025
8c061e3
1.added tests to ci 2. removed CLAUDE.md
JasonZhang517 Sep 4, 2025
aed64b1
add pybind installation in CI
JasonZhang517 Sep 4, 2025
08146aa
fixed clang spelling
JasonZhang517 Sep 4, 2025
584e386
fixed communicator spelling
JasonZhang517 Sep 4, 2025
e58e2c3
fixed redundant functions in transfer engine py
JasonZhang517 Sep 4, 2025
8953508
fixed ci path
JasonZhang517 Sep 4, 2025
b2afec2
fixed memory copy bugs, removed client pool creation
JasonZhang517 Sep 8, 2025
fe165e9
convert memorycopy to memoryview
JasonZhang517 Sep 8, 2025
a1d4c64
set attachment for handleDataTransfer
JasonZhang517 Sep 8, 2025
9ca0282
removed duplicate lock
JasonZhang517 Sep 8, 2025
afdb741
replace string copy with stringview
JasonZhang517 Sep 8, 2025
8ab6dfa
reformat the code
JasonZhang517 Sep 8, 2025
ca4d006
removed tensor rebuild logic
JasonZhang517 Sep 9, 2025
22c418d
removed unnecessary copies
JasonZhang517 Sep 9, 2025
fafe32a
reformat the code with clang
JasonZhang517 Sep 9, 2025
7ebe733
remove useless files
JasonZhang517 Sep 9, 2025
5af45f2
updated tests
JasonZhang517 Sep 9, 2025
101c031
replaced std io with LOG io
JasonZhang517 Sep 10, 2025
5fd5bb8
convert string_view to size in the handledata
JasonZhang517 Sep 10, 2025
98ca700
convert string_view to size in the handledatatransfer
JasonZhang517 Sep 10, 2025
fdd5a18
remove monitor
JasonZhang517 Sep 10, 2025
9cd2e63
removed redundent lines
JasonZhang517 Sep 10, 2025
6e1efa1
removed ndim restrictions
JasonZhang517 Sep 10, 2025
ff96760
1. removed old test scripts 2. add new test scripts for the communica…
JasonZhang517 Sep 10, 2025
2efd542
removed pybind11 in .github/workflows/ci.yml
JasonZhang517 Sep 11, 2025
cd85ad7
fixed cmakelists.txt
JasonZhang517 Sep 11, 2025
4158094
removed cross compilation problem
JasonZhang517 Sep 11, 2025
dd83623
removed cmakelists.txt
JasonZhang517 Sep 11, 2025
45987ba
refactor the test
JasonZhang517 Sep 11, 2025
4d218a3
added packet size as parameters
JasonZhang517 Sep 11, 2025
393e5b8
remove pybind dependency and integration
JasonZhang517 Sep 11, 2025
e925202
readd cmakelists.txt to connector folder
JasonZhang517 Sep 11, 2025
dc479af
fixed CMakeLists.txt
JasonZhang517 Sep 11, 2025
6383910
fixed pybind modules
JasonZhang517 Sep 11, 2025
d4caef8
reformat headers
JasonZhang517 Sep 12, 2025
e52aa02
fixed CMakeLists.txt
JasonZhang517 Sep 12, 2025
fa10062
add indication of protocol
JasonZhang517 Sep 12, 2025
e72c7a0
fixed rdma bugs
JasonZhang517 Sep 12, 2025
732fe0e
fixed some minor bugs
JasonZhang517 Sep 12, 2025
c40b13a
removed Chinese comments
JasonZhang517 Sep 15, 2025
aa62943
remove useless files
JasonZhang517 Sep 15, 2025
ff9525a
removed unnecessary include in CMakeLists.txt
JasonZhang517 Sep 15, 2025
2b83a76
refactored client and server initialization methods
JasonZhang517 Sep 15, 2025
ed76b8b
fixed server initialization pool size
JasonZhang517 Sep 15, 2025
600db58
fixed interface pybind bugs
JasonZhang517 Sep 15, 2025
abd15a1
fixed pybind
JasonZhang517 Sep 17, 2025
20b1bc2
update pybind11 parameters to -Dpybind11_DIR
JasonZhang517 Sep 23, 2025
fc7dde6
update cmakelists
JasonZhang517 Sep 23, 2025
c790f21
local py312 test complete
JasonZhang517 Sep 24, 2025
255a5a9
adjused workflow
JasonZhang517 Sep 24, 2025
61b03da
adjusted Cmakelists.txxt
JasonZhang517 Sep 24, 2025
2ac0105
restored some useless changes
JasonZhang517 Sep 28, 2025
41e871f
remove Chinese comments
JasonZhang517 Oct 10, 2025
ba6bd7f
remove blank lines
JasonZhang517 Oct 15, 2025
384e3b1
Merge remote-tracking branch 'origin/coro_rpc_communicator' into coro…
JasonZhang517 Oct 15, 2025
2130bf0
Merge branch 'main' into coro_rpc_communicator
JasonZhang517 Oct 24, 2025
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -228,6 +228,7 @@ jobs:
sudo bash -x dependencies.sh -y
shell: bash


- name: Build transfer engine only
run: |
cd mooncake-transfer-engine
Expand Down
2 changes: 1 addition & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -197,4 +197,4 @@ mooncake-wheel/mooncake/mooncake_master
mooncake-wheel/mooncake/transfer_engine_bench

# Claude Code Memory
CLAUDE.md
CLAUDE.md
19 changes: 18 additions & 1 deletion CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,24 @@ option(WITH_STORE "build mooncake store library and sample code" ON)
option(WITH_P2P_STORE "build p2p store library and sample code" OFF)
option(WITH_RUST_EXAMPLE "build the Rust interface and sample code for the transfer engine" OFF)

add_subdirectory(${CMAKE_CURRENT_SOURCE_DIR}/extern/pybind11)
# pybind11: prefer system/finder (>=2.13), then vendored extern, finally FetchContent
find_package(pybind11 2.13 CONFIG QUIET)

if (NOT pybind11_FOUND)
if (EXISTS ${CMAKE_CURRENT_SOURCE_DIR}/extern/pybind11/CMakeLists.txt)
message(STATUS "Using vendored pybind11 from extern/pybind11")
add_subdirectory(${CMAKE_CURRENT_SOURCE_DIR}/extern/pybind11)
else()
include(FetchContent)
message(STATUS "Fetching pybind11 (v2.13.6) via FetchContent")
FetchContent_Declare(pybind11
GIT_REPOSITORY https://github.com/pybind/pybind11.git
GIT_TAG v2.13.6
GIT_SHALLOW TRUE
)
FetchContent_MakeAvailable(pybind11)
endif()
endif()
set(PYTHON_EXECUTABLE "python3")
execute_process(
COMMAND ${PYTHON_EXECUTABLE} -c "import sys; print(sys.path[-1])"
Expand Down
10 changes: 9 additions & 1 deletion mooncake-integration/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,9 @@ endif()

include_directories("/usr/include/jsoncpp")
include_directories("./")
include_directories("../mooncake-transfer-engine/include")

find_package(Python3 COMPONENTS Interpreter Development REQUIRED)

set(CMAKE_INSTALL_RPATH_USE_LINK_PATH TRUE)
set(CMAKE_BUILD_WITH_INSTALL_RPATH TRUE)
Expand All @@ -30,11 +32,17 @@ message("${PYTHON_SYS_PATH}")
set(PYTHON_PACKAGE_NAME "mooncake")

pybind11_add_module(engine ${SOURCES} ${CACHE_ALLOCATOR_SOURCES}
transfer_engine/transfer_engine_py.cpp
transfer_engine/transfer_engine_py.cpp
$<TARGET_OBJECTS:coro_rpc_connector>
)

target_include_directories(engine PRIVATE
${Python3_INCLUDE_DIRS}
)

target_link_libraries(engine PUBLIC
transfer_engine
coro_rpc_connector
glog::glog
gflags::gflags
)
Expand Down
61 changes: 61 additions & 0 deletions mooncake-integration/transfer_engine/transfer_engine_py.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,11 @@

#include <pybind11/stl.h>

// Include coro_rpc_interface headers
#include "transport/coro_rpc_connector/cororpc_interface.h"

using namespace pybind11::literals;

#ifdef USE_MNNVL
#include "transport/nvlink_transport/nvlink_transport.h"
static void *allocateMemory(size_t size) {
Expand Down Expand Up @@ -639,6 +644,59 @@ std::string TransferEnginePy::getLocalTopology() {

namespace py = pybind11;

// Forward declaration for coro_rpc_interface binding function
void bind_coro_rpc_interface(py::module_ &m);

// Implementation of coro_rpc_interface binding function
void bind_coro_rpc_interface(py::module_ &m) {
using namespace mooncake;

py::class_<CoroRPCInterface::ReceivedData>(m, "ReceivedData")
.def(py::init<>())
.def_readonly("source_address",
&CoroRPCInterface::ReceivedData::source_address)
.def_readonly("data_size", &CoroRPCInterface::ReceivedData::data_size)
.def("get_bytes", &CoroRPCInterface::ReceivedData::getBytes);

py::class_<CoroRPCInterface::ReceivedTensor>(m, "ReceivedTensor")
.def(py::init<>())
.def_readonly("source_address",
&CoroRPCInterface::ReceivedTensor::source_address)
.def_readonly("shape", &CoroRPCInterface::ReceivedTensor::shape)
.def_readonly("dtype", &CoroRPCInterface::ReceivedTensor::dtype)
.def_readonly("total_bytes",
&CoroRPCInterface::ReceivedTensor::total_bytes)
.def("get_data_size", &CoroRPCInterface::ReceivedTensor::getDataSize)
.def("get_data_as_bytes",
&CoroRPCInterface::ReceivedTensor::getDataAsBytes);

py::class_<CoroRPCInterface>(m, "CoroRPCInterface")
.def(py::init<>())
.def("initialize", &CoroRPCInterface::initialize,
"listen_address"_a = "", "thread_count"_a = 0,
"timeout_seconds"_a = 30, "pool_size"_a = 10)
.def("initialize_client", &CoroRPCInterface::initializeClient,
"pool_size"_a = 10, "timeout_seconds"_a = 30)
.def("initialize_server", &CoroRPCInterface::initializeServer,
"listen_address"_a, "thread_count"_a = 8, "timeout_seconds"_a = 30)
.def("start_server", &CoroRPCInterface::startServer)
.def("start_server_async", &CoroRPCInterface::startServerAsync)
.def("stop_server", &CoroRPCInterface::stopServer)
.def("send_data", &CoroRPCInterface::sendData)
.def("send_data_async", &CoroRPCInterface::sendDataAsync)
.def("send_tensor", &CoroRPCInterface::sendTensor)
.def("send_tensor_async", &CoroRPCInterface::sendTensorAsync)
.def("set_data_receive_callback",
&CoroRPCInterface::setDataReceiveCallback)
.def("set_tensor_receive_callback",
&CoroRPCInterface::setTensorReceiveCallback);

m.def("create_rpc_client", &createRPCClient, "pool_size"_a = 10,
"timeout_seconds"_a = 30);
m.def("create_rpc_server", &createRPCServer, "listen_address"_a,
"thread_count"_a = 0);
}

PYBIND11_MODULE(engine, m) {
py::enum_<TransferEnginePy::TransferOpcode> transfer_opcode(
m, "TransferOpcode", py::arithmetic());
Expand Down Expand Up @@ -688,4 +746,7 @@ PYBIND11_MODULE(engine, m) {
&TransferEnginePy::getFirstBufferAddress);

adaptor_cls.attr("TransferOpcode") = transfer_opcode;

// Bind coro_rpc_interface directly to the main module
bind_coro_rpc_interface(m);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,102 @@
#pragma once

#include <memory>
#include <string>
#include <vector>
#include <future>
#include <unordered_map>
#include <functional>
#include <pybind11/pybind11.h>
#include <pybind11/pytypes.h>
#include <ylt/coro_rpc/coro_rpc_client.hpp>
#include <ylt/coro_rpc/coro_rpc_server.hpp>
#include <ylt/coro_io/client_pool.hpp>
#include <ylt/coro_io/coro_io.hpp>
#include <async_simple/coro/Lazy.h>

namespace mooncake {

struct TensorInfo {
void* data_ptr = nullptr;
std::vector<size_t> shape;
std::string dtype;
size_t total_bytes = 0;
};

struct result {
int code = 0;
std::string err_msg;
};

struct Config {
std::string listen_address;
size_t thread_count = 0;
size_t timeout_seconds = 30;
size_t pool_size = 10;
};

template <typename T>
struct SimpleContext {
coro_rpc::context<T> context_;
void response_msg() { context_.response_msg(); }
};

class CoroRPCCommunicator {
public:
class Impl {
public:
Config config;
bool is_server_started = false;

std::unique_ptr<coro_rpc::coro_rpc_server> server_;

std::function<void(std::string_view, std::string_view)>
data_receive_callback;

pybind11::handle py_callback;

void handleDataTransfer(coro_rpc::context<void> context,
std::string_view data);
void handleTensorTransfer(coro_rpc::context<void> context);
void handleDataTransferWithAttachment(coro_rpc::context<void> context,
std::string_view data);
void handleTensorTransferWithAttachment(
coro_rpc::context<void> context);
};

CoroRPCCommunicator();
~CoroRPCCommunicator();

bool initialize(const Config& config);
bool startServerImpl(bool is_async = true);
bool startServer();
bool startServerAsync();
void stopServer();

int sendData(const std::string& target_address, const void* data,
size_t data_size);
async_simple::coro::Lazy<result> sendDataAsync(
const std::string& target_address, const void* data, size_t data_size);

int sendTensor(const std::string& target_address,
const pybind11::object& tensor);
async_simple::coro::Lazy<int> sendTensorAsync(
const std::string& target_address, const TensorInfo& tensor);

int receiveData(const std::string& source_address, void* buffer,
size_t buffer_size, int timeout_ms = -1);
async_simple::coro::Lazy<std::string> receiveDataAsync(
const std::string& source_address, int timeout_ms = -1);

void setDataReceiveCallback(
std::function<void(std::string_view, std::string_view)> callback);

std::shared_ptr<Impl> getImpl() { return impl_; }

private:
std::shared_ptr<coro_io::client_pools<coro_rpc::coro_rpc_client>>
client_pools_;
std::shared_ptr<Impl> impl_;
};

} // namespace mooncake
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
#pragma once

#include <memory>
#include <string>
#include <vector>
#include <pybind11/pybind11.h>
#include <pybind11/pytypes.h>

namespace mooncake {

struct Config;

class CoroRPCInterface {
public:
struct ReceivedData {
std::string source_address;
std::string data;
size_t data_size = 0;

pybind11::bytes getBytes() const { return pybind11::bytes(data); }
pybind11::memoryview getMemoryView() const {
return pybind11::memoryview::from_memory(
const_cast<char*>(data.data()), data.size(), true);
}
};

struct ReceivedTensor {
std::string source_address;
std::string data;
std::vector<size_t> shape;
std::string dtype;
size_t total_bytes = 0;
size_t getDataSize() const { return data.size(); }
pybind11::bytes getDataAsBytes() const { return pybind11::bytes(data); }
pybind11::memoryview getMemoryView() const {
return pybind11::memoryview::from_memory(
const_cast<char*>(data.data()), data.size(), true);
}
};

class Impl;

CoroRPCInterface();
~CoroRPCInterface();

bool initialize(const std::string& listen_address = "",
size_t thread_count = 0, size_t timeout_seconds = 30,
size_t pool_size = 10);

// Convenience methods for common use cases
bool initializeClient(size_t pool_size = 10, size_t timeout_seconds = 30);
bool initializeServer(const std::string& listen_address,
size_t thread_count = 8, size_t timeout_seconds = 30);

bool startServer();
bool startServerAsync();
bool startServerImpl(bool is_async = true);
void stopServer();

int sendData(const std::string& target_address, pybind11::handle data);
pybind11::object sendDataAsync(std::string& target_address,
pybind11::handle data,
pybind11::handle loop);

int sendTensor(const std::string& target_address, pybind11::handle tensor);
pybind11::object sendTensorAsync(std::string& target_address,
pybind11::handle tensor,
pybind11::handle loop);

void setDataReceiveCallback(pybind11::function callback);
void setTensorReceiveCallback(pybind11::function callback);

void handleIncomingData(std::string_view source_address,
std::string_view data);
void handleIncomingTensor(std::string_view source_address,
std::string_view data,
const std::vector<size_t>& shape,
std::string_view dtype);

private:
std::unique_ptr<Impl> impl_;
};

std::unique_ptr<CoroRPCInterface> createRPCClient(uint64_t local_rank = 0,
uint64_t world_size = 1);
std::unique_ptr<CoroRPCInterface> createRPCServer(uint64_t local_rank = 0,
uint64_t world_size = 1);

} // namespace mooncake

namespace pybind11 {
class module_;
}
void bind_coro_rpc_interface(pybind11::module_& m);
22 changes: 21 additions & 1 deletion mooncake-transfer-engine/src/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,21 @@ file(GLOB ENGINE_SOURCES "*.cpp")
add_subdirectory(common)
add_subdirectory(transport)

# Find Python for pybind11 support
find_package(Python3 COMPONENTS Interpreter Development REQUIRED)
find_package(pybind11 QUIET)
if(NOT pybind11_FOUND)
execute_process(
COMMAND ${Python3_EXECUTABLE} -m pybind11 --cmakedir
OUTPUT_VARIABLE pybind11_DIR
OUTPUT_STRIP_TRAILING_WHITESPACE
RESULT_VARIABLE pybind11_RESULT
)
if(pybind11_RESULT EQUAL 0)
find_package(pybind11 REQUIRED PATHS ${pybind11_DIR})
endif()
endif()

SET(CMAKE_INSTALL_RPATH_USE_LINK_PATH TRUE)

add_library(transfer_engine ${ENGINE_SOURCES} $<TARGET_OBJECTS:transport>)
Expand Down Expand Up @@ -36,9 +51,14 @@ endif()
target_link_libraries(
transfer_engine
PUBLIC
base transport rdma_transport ibverbs glog::glog gflags::gflags pthread JsonCpp::JsonCpp numa yalantinglibs::yalantinglibs
base transport rdma_transport ibverbs glog::glog gflags::gflags pthread JsonCpp::JsonCpp numa yalantinglibs::yalantinglibs ${Python3_LIBRARIES}
)

# Add pybind11 headers if pybind11 is available
if(TARGET pybind11::headers)
target_link_libraries(transfer_engine PUBLIC pybind11::headers)
endif()

if (USE_CUDA)
target_include_directories(transfer_engine PRIVATE /usr/local/cuda/include)
target_link_libraries(transfer_engine PUBLIC cuda cudart rt)
Expand Down
Loading
Loading