Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions .clang-format
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
---
BasedOnStyle: Google
IndentWidth: 4
TabWidth: 4
UseTab: Never
ColumnLimit: 80
SortIncludes: false
...
33 changes: 31 additions & 2 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -312,10 +312,10 @@ jobs:
runs-on: ubuntu-22.04
steps:
- uses: actions/checkout@v4

- name: Set up Docker Buildx
uses: docker/setup-buildx-action@v2

- name: Build Docker image
run: docker build -t mooncake-app .

Expand All @@ -327,3 +327,32 @@ jobs:
uses: actions/checkout@v4
- name: Spell Check Repo
uses: crate-ci/[email protected]

clang-format:
name: Check code format
runs-on: ubuntu-22.04
steps:
- name: Checkout Actions Repository
uses: actions/checkout@v4

- name: Install clang-format 20
run: |
wget https://apt.llvm.org/llvm.sh
chmod +x llvm.sh
sudo ./llvm.sh 20
sudo apt-get install -y clang-format-20

- name: run clang-format-20
run: |
# the old clang-format-14 which is the defaut version in ubuntu 22.04,
# is inconsistent with clang-format-20.
ls -lh /usr/bin/clang-format*
clang-format --version
clang-format-20 --version
# skip cachelib_memory_allocator
find . -type f \( -name "*.h" -o -name "*.cpp" \) | grep -v cachelib_memory_allocator | xargs clang-format-20 -style=file -i
if ! git diff --exit-code; then
echo "Please follow the .clang-format code style, try clang-format -i FILENAME"
exit 1
fi
shell: bash
130 changes: 71 additions & 59 deletions mooncake-integration/transfer_engine/transfer_engine_py.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -223,36 +223,34 @@ int TransferEnginePy::transferSyncRead(const char *target_hostname,
TransferOpcode::READ);
}

int TransferEnginePy::batchTransferSyncWrite(const char *target_hostname,
std::vector<uintptr_t> buffers,
std::vector<uintptr_t> peer_buffer_addresses,
std::vector<size_t> lengths) {
return batchTransferSync(target_hostname, buffers, peer_buffer_addresses, lengths,
TransferOpcode::WRITE);
int TransferEnginePy::batchTransferSyncWrite(
const char *target_hostname, std::vector<uintptr_t> buffers,
std::vector<uintptr_t> peer_buffer_addresses, std::vector<size_t> lengths) {
return batchTransferSync(target_hostname, buffers, peer_buffer_addresses,
lengths, TransferOpcode::WRITE);
}

int TransferEnginePy::batchTransferSyncRead(const char *target_hostname,
std::vector<uintptr_t> buffers,
std::vector<uintptr_t> peer_buffer_addresses,
std::vector<size_t> lengths) {
return batchTransferSync(target_hostname, buffers, peer_buffer_addresses, lengths,
TransferOpcode::READ);
int TransferEnginePy::batchTransferSyncRead(
const char *target_hostname, std::vector<uintptr_t> buffers,
std::vector<uintptr_t> peer_buffer_addresses, std::vector<size_t> lengths) {
return batchTransferSync(target_hostname, buffers, peer_buffer_addresses,
lengths, TransferOpcode::READ);
}

batch_id_t TransferEnginePy::batchTransferAsyncWrite(const char *target_hostname,
const std::vector<uintptr_t> &buffers,
const std::vector<uintptr_t> &peer_buffer_addresses,
const std::vector<size_t> &lengths) {
return batchTransferAsync(target_hostname, buffers, peer_buffer_addresses, lengths,
TransferOpcode::WRITE);
batch_id_t TransferEnginePy::batchTransferAsyncWrite(
const char *target_hostname, const std::vector<uintptr_t> &buffers,
const std::vector<uintptr_t> &peer_buffer_addresses,
const std::vector<size_t> &lengths) {
return batchTransferAsync(target_hostname, buffers, peer_buffer_addresses,
lengths, TransferOpcode::WRITE);
}

batch_id_t TransferEnginePy::batchTransferAsyncRead(const char *target_hostname,
const std::vector<uintptr_t> &buffers,
const std::vector<uintptr_t> &peer_buffer_addresses,
const std::vector<size_t> &lengths) {
return batchTransferAsync(target_hostname, buffers, peer_buffer_addresses, lengths,
TransferOpcode::READ);
batch_id_t TransferEnginePy::batchTransferAsyncRead(
const char *target_hostname, const std::vector<uintptr_t> &buffers,
const std::vector<uintptr_t> &peer_buffer_addresses,
const std::vector<size_t> &lengths) {
return batchTransferAsync(target_hostname, buffers, peer_buffer_addresses,
lengths, TransferOpcode::READ);
}

int TransferEnginePy::transferSync(const char *target_hostname,
Expand Down Expand Up @@ -328,11 +326,10 @@ int TransferEnginePy::transferSync(const char *target_hostname,
return -1;
}

int TransferEnginePy::batchTransferSync(const char *target_hostname,
std::vector<uintptr_t> buffers,
std::vector<uintptr_t> peer_buffer_addresses,
std::vector<size_t> lengths,
TransferOpcode opcode) {
int TransferEnginePy::batchTransferSync(
const char *target_hostname, std::vector<uintptr_t> buffers,
std::vector<uintptr_t> peer_buffer_addresses, std::vector<size_t> lengths,
TransferOpcode opcode) {
pybind11::gil_scoped_release release;
Transport::SegmentHandle handle;
{
Expand All @@ -346,8 +343,10 @@ int TransferEnginePy::batchTransferSync(const char *target_hostname,
}
}

if (buffers.size() != peer_buffer_addresses.size() || buffers.size() != lengths.size()) {
LOG(ERROR) << "buffers, peer_buffer_addresses and lengths have different size";
if (buffers.size() != peer_buffer_addresses.size() ||
buffers.size() != lengths.size()) {
LOG(ERROR)
<< "buffers, peer_buffer_addresses and lengths have different size";
return -1;
}

Expand Down Expand Up @@ -397,12 +396,14 @@ int TransferEnginePy::batchTransferSync(const char *target_hostname,
completed = true;
}
auto current_ts = getCurrentTimeInNano();
const int64_t timeout = transfer_timeout_nsec_ + total_length; // 1GiB per second
const int64_t timeout =
transfer_timeout_nsec_ + total_length; // 1GiB per second
if (current_ts - start_ts > timeout) {
LOG(INFO) << "Sync batch data transfer timeout after "
LOG(INFO) << "Sync batch data transfer timeout after "
<< current_ts - start_ts << "ns";
// TODO: as @doujiang24 mentioned, early free(while there are still waiting tasks)
// the batch_id may fail and cause memory leak(a known issue).
// TODO: as @doujiang24 mentioned, early free(while there are
// still waiting tasks) the batch_id may fail and cause memory
// leak(a known issue).
if (!already_freed) {
engine_->freeBatchID(batch_id);
}
Expand All @@ -413,11 +414,10 @@ int TransferEnginePy::batchTransferSync(const char *target_hostname,
return -1;
}

batch_id_t TransferEnginePy::batchTransferAsync(const char *target_hostname,
const std::vector<uintptr_t>& buffers,
const std::vector<uintptr_t>& peer_buffer_addresses,
const std::vector<size_t>& lengths,
TransferOpcode opcode) {
batch_id_t TransferEnginePy::batchTransferAsync(
const char *target_hostname, const std::vector<uintptr_t> &buffers,
const std::vector<uintptr_t> &peer_buffer_addresses,
const std::vector<size_t> &lengths, TransferOpcode opcode) {
pybind11::gil_scoped_release release;
Transport::SegmentHandle handle;
{
Expand All @@ -431,8 +431,10 @@ batch_id_t TransferEnginePy::batchTransferAsync(const char *target_hostname,
}
}

if (buffers.size() != peer_buffer_addresses.size() || buffers.size() != lengths.size()) {
LOG(ERROR) << "buffers, peer_buffer_addresses and lengths have different size";
if (buffers.size() != peer_buffer_addresses.size() ||
buffers.size() != lengths.size()) {
LOG(ERROR)
<< "buffers, peer_buffer_addresses and lengths have different size";
return 0;
}

Expand Down Expand Up @@ -474,7 +476,8 @@ batch_id_t TransferEnginePy::batchTransferAsync(const char *target_hostname,
return batch_id;
}

int TransferEnginePy::getBatchTransferStatus(const std::vector<batch_id_t>& batch_ids) {
int TransferEnginePy::getBatchTransferStatus(
const std::vector<batch_id_t> &batch_ids) {
pybind11::gil_scoped_release release;
TransferStatus status;
std::unordered_map<batch_id_t, int64_t> timeout_table{};
Expand All @@ -494,7 +497,7 @@ int TransferEnginePy::getBatchTransferStatus(const std::vector<batch_id_t>& batc
}

bool failed_or_timeout = false;
std::unordered_set<batch_id_t> remove_ids {};
std::unordered_set<batch_id_t> remove_ids{};
while (!timeout_table.empty() && !failed_or_timeout) {
for (auto &entry : timeout_table) {
auto batch_desc = reinterpret_cast<BatchDesc *>(entry.first);
Expand All @@ -511,8 +514,8 @@ int TransferEnginePy::getBatchTransferStatus(const std::vector<batch_id_t>& batc
}
auto current_ts = getCurrentTimeInNano();
if (current_ts - batch_desc->start_timestamp > entry.second) {
LOG(INFO) << "Sync batch data transfer timeout after "
<< current_ts - batch_desc->start_timestamp << "ns";
LOG(INFO) << "Sync batch data transfer timeout after "
<< current_ts - batch_desc->start_timestamp << "ns";
failed_or_timeout = true;
}
}
Expand Down Expand Up @@ -582,22 +585,24 @@ int TransferEnginePy::transferCheckStatus(batch_id_t batch_id) {
}
}

int TransferEnginePy::batchRegisterMemory(std::vector<uintptr_t> buffer_addresses,
std::vector<size_t> capacities) {
int TransferEnginePy::batchRegisterMemory(
std::vector<uintptr_t> buffer_addresses, std::vector<size_t> capacities) {
pybind11::gil_scoped_release release;
auto batch_size = buffer_addresses.size();
std::vector<BufferEntry> buffers;
for (size_t i = 0; i < batch_size; i ++ ) {
buffers.push_back(BufferEntry{(void *)buffer_addresses[i], capacities[i]});
for (size_t i = 0; i < batch_size; i++) {
buffers.push_back(
BufferEntry{(void *)buffer_addresses[i], capacities[i]});
}
return engine_->registerLocalMemoryBatch(buffers, kWildcardLocation);
}

int TransferEnginePy::batchUnregisterMemory(std::vector<uintptr_t> buffer_addresses) {
int TransferEnginePy::batchUnregisterMemory(
std::vector<uintptr_t> buffer_addresses) {
pybind11::gil_scoped_release release;
auto batch_size = buffer_addresses.size();
std::vector<void *> buffers;
for (size_t i = 0; i < batch_size; i ++ ) {
for (size_t i = 0; i < batch_size; i++) {
buffers.push_back(reinterpret_cast<char *>(buffer_addresses[i]));
}
return engine_->unregisterLocalMemoryBatch(buffers);
Expand Down Expand Up @@ -641,14 +646,19 @@ PYBIND11_MODULE(engine, m) {
.def("free_managed_buffer", &TransferEnginePy::freeManagedBuffer)
.def("transfer_sync_write", &TransferEnginePy::transferSyncWrite)
.def("transfer_sync_read", &TransferEnginePy::transferSyncRead)
.def("batch_transfer_sync_write", &TransferEnginePy::batchTransferSyncWrite)
.def("batch_transfer_sync_read", &TransferEnginePy::batchTransferSyncRead)
.def("batch_transfer_async_write", &TransferEnginePy::batchTransferAsyncWrite)
.def("batch_transfer_async_read", &TransferEnginePy::batchTransferAsyncRead)
.def("batch_transfer_sync_write",
&TransferEnginePy::batchTransferSyncWrite)
.def("batch_transfer_sync_read",
&TransferEnginePy::batchTransferSyncRead)
.def("batch_transfer_async_write",
&TransferEnginePy::batchTransferAsyncWrite)
.def("batch_transfer_async_read",
&TransferEnginePy::batchTransferAsyncRead)
.def("transfer_sync", &TransferEnginePy::transferSync)
.def("batch_transfer_sync", &TransferEnginePy::batchTransferSync)
.def("batch_transfer_async", &TransferEnginePy::batchTransferAsync)
.def("get_batch_transfer_status", &TransferEnginePy::getBatchTransferStatus)
.def("get_batch_transfer_status",
&TransferEnginePy::getBatchTransferStatus)
.def("transfer_submit_write",
&TransferEnginePy::transferSubmitWrite)
.def("transfer_check_status",
Expand All @@ -658,8 +668,10 @@ PYBIND11_MODULE(engine, m) {
&TransferEnginePy::readBytesFromBuffer)
.def("register_memory", &TransferEnginePy::registerMemory)
.def("unregister_memory", &TransferEnginePy::unregisterMemory)
.def("batch_register_memory", &TransferEnginePy::batchRegisterMemory)
.def("batch_unregister_memory", &TransferEnginePy::batchUnregisterMemory)
.def("batch_register_memory",
&TransferEnginePy::batchRegisterMemory)
.def("batch_unregister_memory",
&TransferEnginePy::batchUnregisterMemory)
.def("get_first_buffer_address",
&TransferEnginePy::getFirstBufferAddress);

Expand Down
49 changes: 25 additions & 24 deletions mooncake-integration/transfer_engine/transfer_engine_py.h
Original file line number Diff line number Diff line change
Expand Up @@ -68,14 +68,16 @@ class TransferEnginePy {
int transferSyncWrite(const char *target_hostname, uintptr_t buffer,
uintptr_t peer_buffer_address, size_t length);

batch_id_t transferSubmitWrite(const char *target_hostname, uintptr_t buffer,
uintptr_t peer_buffer_address, size_t length);
batch_id_t transferSubmitWrite(const char *target_hostname,
uintptr_t buffer,
uintptr_t peer_buffer_address,
size_t length);

int transferCheckStatus(batch_id_t batch_id);

int transferSyncRead(const char *target_hostname, uintptr_t buffer,
uintptr_t peer_buffer_address, size_t length);

int batchTransferSyncWrite(const char *target_hostname,
std::vector<uintptr_t> buffers,
std::vector<uintptr_t> peer_buffer_addresses,
Expand All @@ -86,35 +88,33 @@ class TransferEnginePy {
std::vector<uintptr_t> peer_buffer_addresses,
std::vector<size_t> lengths);

batch_id_t batchTransferAsyncWrite(const char *target_hostname,
const std::vector<uintptr_t> &buffers,
const std::vector<uintptr_t> &peer_buffer_addresses,
const std::vector<size_t> &lengths);
batch_id_t batchTransferAsyncWrite(
const char *target_hostname, const std::vector<uintptr_t> &buffers,
const std::vector<uintptr_t> &peer_buffer_addresses,
const std::vector<size_t> &lengths);

batch_id_t batchTransferAsyncRead(const char *target_hostname,
const std::vector<uintptr_t> &buffers,
const std::vector<uintptr_t> &peer_buffer_addresses,
const std::vector<size_t> &lengths);
batch_id_t batchTransferAsyncRead(
const char *target_hostname, const std::vector<uintptr_t> &buffers,
const std::vector<uintptr_t> &peer_buffer_addresses,
const std::vector<size_t> &lengths);

int transferSync(const char *target_hostname, uintptr_t buffer,
uintptr_t peer_buffer_address, size_t length,
TransferOpcode opcode);
// Known issue: in a few inference engines and benchmarks, accuracy
// may be affected when using the batchTransferSync API. We currently

// Known issue: in a few inference engines and benchmarks, accuracy
// may be affected when using the batchTransferSync API. We currently
// found this issue only in multi-node NVLink transfers.
int batchTransferSync(const char *target_hostname,
std::vector<uintptr_t> buffers,
std::vector<uintptr_t> peer_buffer_addresses,
std::vector<size_t> lengths,
TransferOpcode opcode);

batch_id_t batchTransferAsync(const char *target_hostname,
const std::vector<uintptr_t> &buffers,
const std::vector<uintptr_t> &peer_buffer_addresses,
const std::vector<size_t> &lengths,
TransferOpcode opcode);

std::vector<size_t> lengths, TransferOpcode opcode);

batch_id_t batchTransferAsync(
const char *target_hostname, const std::vector<uintptr_t> &buffers,
const std::vector<uintptr_t> &peer_buffer_addresses,
const std::vector<size_t> &lengths, TransferOpcode opcode);

int getBatchTransferStatus(const std::vector<batch_id_t> &batch_ids);

uintptr_t getFirstBufferAddress(const std::string &segment_name);
Expand All @@ -138,7 +138,8 @@ class TransferEnginePy {
// must be called before TransferEnginePy::~TransferEnginePy()
int unregisterMemory(uintptr_t buffer_addr);

int batchRegisterMemory(std::vector<uintptr_t> buffer_addresses, std::vector<size_t> capacities);
int batchRegisterMemory(std::vector<uintptr_t> buffer_addresses,
std::vector<size_t> capacities);

int batchUnregisterMemory(std::vector<uintptr_t> buffer_addresses);

Expand Down
Loading
Loading