diff --git a/mooncake-integration/transfer_engine/transfer_engine_py.cpp b/mooncake-integration/transfer_engine/transfer_engine_py.cpp index ca02559e7..ef6d0cbfb 100644 --- a/mooncake-integration/transfer_engine/transfer_engine_py.cpp +++ b/mooncake-integration/transfer_engine/transfer_engine_py.cpp @@ -207,6 +207,15 @@ int TransferEnginePy::freeManagedBuffer(uintptr_t buffer_addr, size_t length) { return 0; } +int TransferEnginePy::freeRemoteSegment(const char *target_hostname) { + std::lock_guard guard(mutex_); + if (handle_map_.count(target_hostname)) { + engine_->closeSegment(handle_map_[target_hostname]); + handle_map_.erase(target_hostname); + } + return 0; +} + int TransferEnginePy::transferSyncWrite(const char *target_hostname, uintptr_t buffer, uintptr_t peer_buffer_address, @@ -684,6 +693,7 @@ PYBIND11_MODULE(engine, m) { .def("batch_unregister_memory", &TransferEnginePy::batchUnregisterMemory) .def("get_local_topology", &TransferEnginePy::getLocalTopology) + .def("free_remote_segment", &TransferEnginePy::freeRemoteSegment) .def("get_first_buffer_address", &TransferEnginePy::getFirstBufferAddress); diff --git a/mooncake-integration/transfer_engine/transfer_engine_py.h b/mooncake-integration/transfer_engine/transfer_engine_py.h index 0c2f83eee..6ba429126 100644 --- a/mooncake-integration/transfer_engine/transfer_engine_py.h +++ b/mooncake-integration/transfer_engine/transfer_engine_py.h @@ -143,6 +143,8 @@ class TransferEnginePy { int batchUnregisterMemory(std::vector buffer_addresses); + int markDead(const char *target_hostname); + std::string getLocalTopology(); private: diff --git a/mooncake-transfer-engine/include/transport/ascend_transport/ascend_direct_transport/ascend_direct_transport.h b/mooncake-transfer-engine/include/transport/ascend_transport/ascend_direct_transport/ascend_direct_transport.h index 93c9b9c0d..a736bce48 100644 --- a/mooncake-transfer-engine/include/transport/ascend_transport/ascend_direct_transport/ascend_direct_transport.h +++ b/mooncake-transfer-engine/include/transport/ascend_transport/ascend_direct_transport/ascend_direct_transport.h @@ -81,6 +81,8 @@ class AscendDirectTransport : public Transport { void processSliceList(const std::vector &slice_list); + int closeSegment(Transport::SegmentHandle handle) override; + private: int InitAdxlEngine(); diff --git a/mooncake-transfer-engine/include/transport/ascend_transport/hccl_transport/hccl_transport.h b/mooncake-transfer-engine/include/transport/ascend_transport/hccl_transport/hccl_transport.h index 94a09787f..c566bf4c4 100644 --- a/mooncake-transfer-engine/include/transport/ascend_transport/hccl_transport/hccl_transport.h +++ b/mooncake-transfer-engine/include/transport/ascend_transport/hccl_transport/hccl_transport.h @@ -77,6 +77,8 @@ class HcclTransport : public Transport { int unregisterLocalMemoryBatch( const std::vector &addr_list) override; + int closeSegment(Transport::SegmentHandle handle) override; + private: int allocateLocalSegmentID(); diff --git a/mooncake-transfer-engine/include/transport/cxl_transport/cxl_transport.h b/mooncake-transfer-engine/include/transport/cxl_transport/cxl_transport.h index db06d64c1..839c6983f 100644 --- a/mooncake-transfer-engine/include/transport/cxl_transport/cxl_transport.h +++ b/mooncake-transfer-engine/include/transport/cxl_transport/cxl_transport.h @@ -84,6 +84,8 @@ class CxlTransport : public Transport { bool validateMemoryBounds(void *dest, void *src, size_t size); + int closeSegment(Transport::SegmentHandle handle) override; + private: void *cxl_base_addr; size_t cxl_dev_size; diff --git a/mooncake-transfer-engine/include/transport/nvlink_transport/nvlink_transport.h b/mooncake-transfer-engine/include/transport/nvlink_transport/nvlink_transport.h index eddac0ac2..bff81b492 100644 --- a/mooncake-transfer-engine/include/transport/nvlink_transport/nvlink_transport.h +++ b/mooncake-transfer-engine/include/transport/nvlink_transport/nvlink_transport.h @@ -72,6 +72,8 @@ class NvlinkTransport : public Transport { const char* getName() const override { return "nvlink"; } + int closeSegment(Transport::SegmentHandle handle) override; + private: std::atomic_bool running_; diff --git a/mooncake-transfer-engine/include/transport/nvmeof_transport/nvmeof_transport.h b/mooncake-transfer-engine/include/transport/nvmeof_transport/nvmeof_transport.h index 120e25db6..a946efa16 100644 --- a/mooncake-transfer-engine/include/transport/nvmeof_transport/nvmeof_transport.h +++ b/mooncake-transfer-engine/include/transport/nvmeof_transport/nvmeof_transport.h @@ -59,6 +59,8 @@ class NVMeoFTransport : public Transport { uint64_t target_start, TransferRequest::OpCode op, TransferTask &task, const char *file_path); + int closeSegment(Transport::SegmentHandle handle) override; + private: void startTransfer(Slice *slice); diff --git a/mooncake-transfer-engine/include/transport/rdma_transport/rdma_transport.h b/mooncake-transfer-engine/include/transport/rdma_transport/rdma_transport.h index 4a411e3b2..6afd35d48 100644 --- a/mooncake-transfer-engine/include/transport/rdma_transport/rdma_transport.h +++ b/mooncake-transfer-engine/include/transport/rdma_transport/rdma_transport.h @@ -71,6 +71,8 @@ class RdmaTransport : public Transport { int unregisterLocalMemoryBatch( const std::vector &addr_list) override; + int closeSegment(Transport::SegmentHandle handle) override; + // TRANSFER Status submitTransfer(BatchID batch_id, diff --git a/mooncake-transfer-engine/include/transport/tcp_transport/tcp_transport.h b/mooncake-transfer-engine/include/transport/tcp_transport/tcp_transport.h index adf96c241..fec20a866 100644 --- a/mooncake-transfer-engine/include/transport/tcp_transport/tcp_transport.h +++ b/mooncake-transfer-engine/include/transport/tcp_transport/tcp_transport.h @@ -55,6 +55,8 @@ class TcpTransport : public Transport { Status getTransferStatus(BatchID batch_id, size_t task_id, TransferStatus &status) override; + int closeSegment(Transport::SegmentHandle handle) override; + private: int install(std::string &local_server_name, std::shared_ptr meta, diff --git a/mooncake-transfer-engine/include/transport/transport.h b/mooncake-transfer-engine/include/transport/transport.h index 29c2aadb4..cf96cd83c 100644 --- a/mooncake-transfer-engine/include/transport/transport.h +++ b/mooncake-transfer-engine/include/transport/transport.h @@ -245,6 +245,11 @@ class Transport { virtual Status getTransferStatus(BatchID batch_id, size_t task_id, TransferStatus &status) = 0; + /// @brief Close a segment handle. + /// @param handle The segment handle to close. + /// @return 0 on success, -1 on failure. + virtual int closeSegment(Transport::SegmentHandle handle) = 0; + std::shared_ptr &meta() { return metadata_; } struct BufferEntry { diff --git a/mooncake-transfer-engine/src/transfer_engine.cpp b/mooncake-transfer-engine/src/transfer_engine.cpp index 384b7d35f..39cf0f672 100644 --- a/mooncake-transfer-engine/src/transfer_engine.cpp +++ b/mooncake-transfer-engine/src/transfer_engine.cpp @@ -430,6 +430,14 @@ int TransferEngine::unregisterLocalMemoryBatch( return 0; } +int TransferEngine::closeSegment(Transport::SegmentHandle handle) { + for (auto &transport : multi_transports_->listTransports()) { + int ret = transport->closeSegment(handle); + if (ret < 0) return ret; + } + return 0; +} + #ifdef WITH_METRICS // Helper function to convert string to lowercase for case-insensitive // comparison diff --git a/mooncake-transfer-engine/src/transport/ascend_transport/ascend_direct_transport/ascend_direct_transport.cpp b/mooncake-transfer-engine/src/transport/ascend_transport/ascend_direct_transport/ascend_direct_transport.cpp index 5dfe0f388..55e41d04e 100644 --- a/mooncake-transfer-engine/src/transport/ascend_transport/ascend_direct_transport/ascend_direct_transport.cpp +++ b/mooncake-transfer-engine/src/transport/ascend_transport/ascend_direct_transport/ascend_direct_transport.cpp @@ -432,6 +432,10 @@ void AscendDirectTransport::workerThread() { LOG(INFO) << "AscendDirectTransport worker thread stopped"; } +int AscendDirectTransport::closeSegment(Transport::SegmentHandle handle) { + return 0; +} + void AscendDirectTransport::processSliceList( const std::vector &slice_list) { if (slice_list.empty()) { diff --git a/mooncake-transfer-engine/src/transport/ascend_transport/hccl_transport/hccl_transport.cpp b/mooncake-transfer-engine/src/transport/ascend_transport/hccl_transport/hccl_transport.cpp index b1e1b09a6..236d8de1d 100644 --- a/mooncake-transfer-engine/src/transport/ascend_transport/hccl_transport/hccl_transport.cpp +++ b/mooncake-transfer-engine/src/transport/ascend_transport/hccl_transport/hccl_transport.cpp @@ -495,6 +495,10 @@ Status HcclTransport::getTransferStatus(BatchID batch_id, size_t task_id, return Status::OK(); } +int HcclTransport::closeSegment(Transport::SegmentHandle handle) { + return 0; +} + int HcclTransport::registerLocalMemory(void *addr, size_t length, const std::string &location, bool remote_accessible, diff --git a/mooncake-transfer-engine/src/transport/cxl_transport/cxl_transport.cpp b/mooncake-transfer-engine/src/transport/cxl_transport/cxl_transport.cpp index f08c5720c..3e5fa2ada 100644 --- a/mooncake-transfer-engine/src/transport/cxl_transport/cxl_transport.cpp +++ b/mooncake-transfer-engine/src/transport/cxl_transport/cxl_transport.cpp @@ -261,6 +261,10 @@ int CxlTransport::unregisterLocalMemory(void *addr, bool update_metadata) { return metadata_->removeLocalMemoryBuffer(addr, update_metadata); } +int CxlTransport::closeSegment(Transport::SegmentHandle handle) { + return 0; +} + int CxlTransport::registerLocalMemoryBatch( const std::vector &buffer_list, const std::string &location) { diff --git a/mooncake-transfer-engine/src/transport/nvlink_transport/nvlink_transport.cpp b/mooncake-transfer-engine/src/transport/nvlink_transport/nvlink_transport.cpp index 282fdf72f..b914e74a9 100644 --- a/mooncake-transfer-engine/src/transport/nvlink_transport/nvlink_transport.cpp +++ b/mooncake-transfer-engine/src/transport/nvlink_transport/nvlink_transport.cpp @@ -542,6 +542,20 @@ int NvlinkTransport::unregisterLocalMemoryBatch( return metadata_->updateLocalSegmentDesc(); } +int NvlinkTransport::closeSegment(Transport::SegmentHandle handle) { + // close all opened ipc handles for this SegmentHandle. + for (auto &entry : remap_entries_) { + if (entry.first.first == handle) { + cudaError_t err = cudaIpcCloseMemHandle(entry.second.shm_addr); + if (err != cudaSuccess) { + LOG(ERROR) << "NvlinkTransport: cudaIpcCloseMemHandle failed: " + << cudaGetErrorString(err); + } + } + } + return 0; +} + void *NvlinkTransport::allocatePinnedLocalMemory(size_t size) { if (!supportFabricMem()) { void *ptr = nullptr; diff --git a/mooncake-transfer-engine/src/transport/nvmeof_transport/nvmeof_transport.cpp b/mooncake-transfer-engine/src/transport/nvmeof_transport/nvmeof_transport.cpp index 8f51aeb5c..52053db28 100644 --- a/mooncake-transfer-engine/src/transport/nvmeof_transport/nvmeof_transport.cpp +++ b/mooncake-transfer-engine/src/transport/nvmeof_transport/nvmeof_transport.cpp @@ -239,6 +239,10 @@ int NVMeoFTransport::unregisterLocalMemory(void *addr, bool update_metadata) { return 0; } +int NVMeoFTransport::closeSegment(Transport::SegmentHandle handle) { + return 0; +} + void NVMeoFTransport::addSliceToTask(void *source_addr, uint64_t slice_len, uint64_t target_start, TransferRequest::OpCode op, diff --git a/mooncake-transfer-engine/src/transport/rdma_transport/rdma_transport.cpp b/mooncake-transfer-engine/src/transport/rdma_transport/rdma_transport.cpp index faf80f14b..492cca24c 100644 --- a/mooncake-transfer-engine/src/transport/rdma_transport/rdma_transport.cpp +++ b/mooncake-transfer-engine/src/transport/rdma_transport/rdma_transport.cpp @@ -407,6 +407,10 @@ Status RdmaTransport::getTransferStatus(BatchID batch_id, size_t task_id, return Status::OK(); } +int RdmaTransport::closeSegment(Transport::SegmentHandle handle) { + return 0; +} + RdmaTransport::SegmentID RdmaTransport::getSegmentID( const std::string &segment_name) { return metadata_->getSegmentID(segment_name); diff --git a/mooncake-transfer-engine/src/transport/tcp_transport/tcp_transport.cpp b/mooncake-transfer-engine/src/transport/tcp_transport/tcp_transport.cpp index 71c2176ed..eff2c319e 100644 --- a/mooncake-transfer-engine/src/transport/tcp_transport/tcp_transport.cpp +++ b/mooncake-transfer-engine/src/transport/tcp_transport/tcp_transport.cpp @@ -318,6 +318,10 @@ int TcpTransport::allocateLocalSegmentID(int tcp_data_port) { return 0; } +int TcpTransport::closeSegment(Transport::SegmentHandle handle) { + return 0; +} + int TcpTransport::registerLocalMemory(void *addr, size_t length, const std::string &location, bool remote_accessible,