From 70c1d4de68be79bfbcd772ca3828ac4cb3bc78a1 Mon Sep 17 00:00:00 2001
From: Binyang Li
Date: Tue, 4 Nov 2025 19:57:34 +0000
Subject: [PATCH 01/12] add ipc cache

---
 src/include/registered_memory.hpp |  1 +
 src/registered_memory.cc          | 58 ++++++++++++++++++++++++++-----
 2 files changed, 50 insertions(+), 9 deletions(-)

diff --git a/src/include/registered_memory.hpp b/src/include/registered_memory.hpp
index 77cbf09b2..3500fbbfa 100644
--- a/src/include/registered_memory.hpp
+++ b/src/include/registered_memory.hpp
@@ -55,6 +55,7 @@ struct RegisteredMemory::Impl {
   bool isCuMemMapAlloc;
   TransportFlags transports;
   std::vector<TransportInfo> transportInfos;
+  std::shared_ptr<void> peerHandle;
   // Only used for IB transport
   std::unordered_map> ibMrMap;

diff --git a/src/registered_memory.cc b/src/registered_memory.cc
index 3cca7e49f..b96598449 100644
--- a/src/registered_memory.cc
+++ b/src/registered_memory.cc
@@ -7,6 +7,7 @@

 #include
 #include
+#include
 #include

 #include "api.h"
@@ -28,7 +29,22 @@
   } \
 } while (false)

+namespace std {
+template <>
+struct hash<cudaIpcMemHandle_t> {
+  size_t operator()(const cudaIpcMemHandle_t& handle) const {
+    std::string_view view(handle.reserved, sizeof(handle.reserved));
+    return std::hash<std::string_view>{}(view);
+  }
+};
+}  // namespace std
+
+inline bool operator==(const cudaIpcMemHandle_t& lhs, const cudaIpcMemHandle_t& rhs) {
+  return std::memcmp(lhs.reserved, rhs.reserved, sizeof(lhs.reserved)) == 0;
+}
+
 namespace {
+
 CUmemAllocationHandleType getNvlsMemHandleType() {
 #if (CUDA_NVLS_API_AVAILABLE)
   if (mscclpp::detail::nvlsCompatibleMemHandleType & CU_MEM_HANDLE_TYPE_FABRIC) {
@@ -41,6 +57,37 @@ CUmemAllocationHandleType getNvlsMemHandleType() {
 #endif
 }

+std::shared_ptr<void> getPeerMemoryHandle(cudaIpcMemHandle_t ipcHandle) {
+  void* addr;
+  auto deleter = [](void* p) {
+    cudaError_t err = cudaIpcCloseMemHandle(p);
+    if (err != cudaSuccess) {
+      WARN("Failed to close CUDA IPC handle at pointer %p: %s", p, cudaGetErrorString(err));
+    } else {
+      INFO(MSCCLPP_P2P, "Closed CUDA IPC handle at pointer %p", p);
+    }
+  };
+#if defined(__HIP_PLATFORM_AMD__)
+  static std::unordered_map<cudaIpcMemHandle_t, std::weak_ptr<void>> peerMemoryHandleMap;
+  std::mutex mutex;
+  std::lock_guard lock(mutex);
+  auto it = peerMemoryHandleMap.find(ipcHandle);
+  if (it != peerMemoryHandleMap.end()) {
+    if (auto ptr = it->second.lock()) {
+      return ptr;
+    }
+    throw mscclpp::Error("Failed to get peer memory handle, may already be closed", mscclpp::ErrorCode::InvalidUsage);
+  }
+  MSCCLPP_CUDATHROW(cudaIpcOpenMemHandle(&addr, ipcHandle, cudaIpcMemLazyEnablePeerAccess));
+  std::shared_ptr<void> ptr = std::shared_ptr<void>(addr, deleter);
+  peerMemoryHandleMap[ipcHandle] = ptr;
+  return ptr;
+#else
+  MSCCLPP_CUDATHROW(cudaIpcOpenMemHandle(&addr, ipcHandle, cudaIpcMemLazyEnablePeerAccess));
+  return std::shared_ptr<void>(addr, deleter);
+#endif
+}
+
 }  // namespace

 namespace mscclpp {
@@ -256,8 +303,8 @@ RegisteredMemory::Impl::Impl(const std::vector<char>::const_iterator& begin,
     throw Error("Unexpected error", ErrorCode::InternalError);
 #endif  // !(CUDA_NVLS_API_AVAILABLE)
   } else if (getHostHash() == this->hostHash) {
-    MSCCLPP_CUDATHROW(cudaIpcOpenMemHandle(&base, entry.cudaIpcBaseHandle, cudaIpcMemLazyEnablePeerAccess));
-    this->data = static_cast<char*>(base) + entry.cudaIpcOffsetFromBase;
+    this->peerHandle = getPeerMemoryHandle(entry.cudaIpcBaseHandle);
+    this->data = static_cast<char*>(this->peerHandle.get()) + entry.cudaIpcOffsetFromBase;
   }
 }
 if (this->data != nullptr) {
@@ -291,13 +338,6 @@ RegisteredMemory::Impl::~Impl() {
     MSCCLPP_CULOG_WARN(cuMemUnmap((CUdeviceptr)base, size));
     MSCCLPP_CULOG_WARN(cuMemRelease(handle));
     MSCCLPP_CULOG_WARN(cuMemAddressFree((CUdeviceptr)base, size));
-  } else {
-    cudaError_t err = cudaIpcCloseMemHandle(base);
-    if (err != cudaSuccess) {
-      WARN("Failed to close CUDA IPC handle at pointer %p: %s", base, cudaGetErrorString(err));
-    } else {
-      INFO(MSCCLPP_P2P, "Closed CUDA IPC handle at pointer %p", base);
-    }
   }
   data = nullptr;
   fileDesc = -1;
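Patch 01 keys its HIP-side cache on `cudaIpcMemHandle_t`, an opaque struct whose only portable identity is its raw bytes, so it hashes the `reserved` array through a `std::string_view` and defines equality with `std::memcmp`. A minimal standalone sketch of that hashing scheme, with a hypothetical `OpaqueHandle` standing in for the CUDA type:

```cpp
#include <cstring>
#include <functional>
#include <string_view>
#include <unordered_map>

struct OpaqueHandle {  // stand-in for cudaIpcMemHandle_t
  char reserved[64];
};

namespace std {
template <>
struct hash<OpaqueHandle> {
  size_t operator()(const OpaqueHandle& h) const {
    // View the raw bytes as a string_view and reuse the library's string hash.
    return hash<string_view>{}(string_view(h.reserved, sizeof(h.reserved)));
  }
};
}  // namespace std

inline bool operator==(const OpaqueHandle& a, const OpaqueHandle& b) {
  // Equality must compare exactly the bytes the hash consumed.
  return std::memcmp(a.reserved, b.reserved, sizeof(a.reserved)) == 0;
}

int main() {
  std::unordered_map<OpaqueHandle, int> cache;
  OpaqueHandle h{};
  cache[h] = 1;
  return cache.count(h) == 1 ? 0 : 1;
}
```

The same byte view must feed both the hash and the equality check; any byte skipped by one but not the other would break the unordered_map invariants.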
From 1739f5a2145f6a8117526d3c4c295c6d0e9854df Mon Sep 17 00:00:00 2001
From: Binyang Li
Date: Wed, 5 Nov 2025 01:17:09 +0000
Subject: [PATCH 02/12] WIP

---
 src/registered_memory.cc | 12 +++++++++---
 1 file changed, 9 insertions(+), 3 deletions(-)

diff --git a/src/registered_memory.cc b/src/registered_memory.cc
index b96598449..cfb746f25 100644
--- a/src/registered_memory.cc
+++ b/src/registered_memory.cc
@@ -69,17 +69,20 @@ std::shared_ptr<void> getPeerMemoryHandle(cudaIpcMemHandle_t ipcHandle) {
   };
 #if defined(__HIP_PLATFORM_AMD__)
   static std::unordered_map<cudaIpcMemHandle_t, std::weak_ptr<void>> peerMemoryHandleMap;
-  std::mutex mutex;
+  static std::mutex mutex;
   std::lock_guard lock(mutex);
   auto it = peerMemoryHandleMap.find(ipcHandle);
   if (it != peerMemoryHandleMap.end()) {
     if (auto ptr = it->second.lock()) {
       return ptr;
     }
-    throw mscclpp::Error("Failed to get peer memory handle, may already be closed", mscclpp::ErrorCode::InvalidUsage);
   }
   MSCCLPP_CUDATHROW(cudaIpcOpenMemHandle(&addr, ipcHandle, cudaIpcMemLazyEnablePeerAccess));
-  std::shared_ptr<void> ptr = std::shared_ptr<void>(addr, deleter);
+  std::shared_ptr<void> ptr = std::shared_ptr<void>(addr, [ipcHandle, deleter](void* p) {
+    deleter(p);
+    std::lock_guard lock(mutex);
+    peerMemoryHandleMap.erase(ipcHandle);
+  });
   peerMemoryHandleMap[ipcHandle] = ptr;
   return ptr;
 #else
@@ -304,6 +307,9 @@ RegisteredMemory::Impl::Impl(const std::vector<char>::const_iterator& begin,
 #endif  // !(CUDA_NVLS_API_AVAILABLE)
   } else if (getHostHash() == this->hostHash) {
     this->peerHandle = getPeerMemoryHandle(entry.cudaIpcBaseHandle);
+    if (!this->peerHandle) {
+      throw Error("Failed to open CUDA IPC handle, may already be closed", ErrorCode::InvalidUsage);
+    }
     this->data = static_cast<char*>(this->peerHandle.get()) + entry.cudaIpcOffsetFromBase;
   }
 }

From 4ebe37ebfbc181dbc98ba37b308b043d158e44b6 Mon Sep 17 00:00:00 2001
From: Binyang Li
Date: Thu, 4 Dec 2025 11:36:32 -0800
Subject: [PATCH 03/12] Update src/registered_memory.cc

Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
---
 src/registered_memory.cc | 2 ++
 1 file changed, 2 insertions(+)

diff --git a/src/registered_memory.cc b/src/registered_memory.cc
index cfb746f25..ef4dc9275 100644
--- a/src/registered_memory.cc
+++ b/src/registered_memory.cc
@@ -75,6 +75,8 @@ std::shared_ptr<void> getPeerMemoryHandle(cudaIpcMemHandle_t ipcHandle) {
   if (it != peerMemoryHandleMap.end()) {
     if (auto ptr = it->second.lock()) {
       return ptr;
+    } else {
+      peerMemoryHandleMap.erase(it);
     }
   }
   MSCCLPP_CUDATHROW(cudaIpcOpenMemHandle(&addr, ipcHandle, cudaIpcMemLazyEnablePeerAccess));
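Patches 02 and 03 rely on `std::weak_ptr` semantics: the cache holds weak references so it never keeps a mapping alive on its own, and a `lock()` that comes back empty simply means the last user already closed the handle, so the stale entry is erased and the handle reopened rather than treated as an error. A tiny illustration of why `lock()` can fail:

```cpp
#include <cassert>
#include <memory>

int main() {
  std::weak_ptr<int> cached;
  {
    auto owner = std::make_shared<int>(5);
    cached = owner;
    assert(cached.lock() != nullptr);  // owner alive: lock() yields a shared_ptr
  }
  // Last owner gone: lock() returns an empty shared_ptr. A cache hit on such
  // an entry must erase it and reopen, which is exactly what patch 03 adds.
  assert(cached.lock() == nullptr);
  return 0;
}
```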
From bf513b4d65a42666b82bb1fd7a1475d7b72732dc Mon Sep 17 00:00:00 2001
From: Copilot <198982749+Copilot@users.noreply.github.com>
Date: Thu, 4 Dec 2025 11:41:42 -0800
Subject: [PATCH 04/12] [WIP] Address feedback on handle cache implementation
 (#700)
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

- [x] Move hash specialization and equality operator from std/global namespace to custom namespace
- [x] Update unordered_map to use custom hash and equality as template parameters
- [x] Add noexcept to equality operator
- [x] Verify the changes build correctly
- [x] Run code review and security checks

---------

Co-authored-by: copilot-swe-agent[bot] <198982749+Copilot@users.noreply.github.com>
Co-authored-by: Binyang2014 <9415966+Binyang2014@users.noreply.github.com>
Co-authored-by: Binyang Li
---
 src/registered_memory.cc | 20 +++++++++++---------
 1 file changed, 11 insertions(+), 9 deletions(-)

diff --git a/src/registered_memory.cc b/src/registered_memory.cc
index ef4dc9275..7542d13c8 100644
--- a/src/registered_memory.cc
+++ b/src/registered_memory.cc
@@ -29,21 +29,22 @@
   } \
 } while (false)

-namespace std {
-template <>
-struct hash<cudaIpcMemHandle_t> {
+namespace {
+
+// Custom hash and equality for cudaIpcMemHandle_t
+struct CudaIpcMemHandleHash {
   size_t operator()(const cudaIpcMemHandle_t& handle) const {
     std::string_view view(handle.reserved, sizeof(handle.reserved));
     return std::hash<std::string_view>{}(view);
   }
 };
-}  // namespace std

-inline bool operator==(const cudaIpcMemHandle_t& lhs, const cudaIpcMemHandle_t& rhs) {
-  return std::memcmp(lhs.reserved, rhs.reserved, sizeof(lhs.reserved)) == 0;
-}
+struct CudaIpcMemHandleEqual {
+  bool operator()(const cudaIpcMemHandle_t& lhs, const cudaIpcMemHandle_t& rhs) const noexcept {
+    return std::memcmp(lhs.reserved, rhs.reserved, sizeof(lhs.reserved)) == 0;
+  }
+};

-namespace {

 CUmemAllocationHandleType getNvlsMemHandleType() {
 #if (CUDA_NVLS_API_AVAILABLE)
@@ -68,7 +69,8 @@ std::shared_ptr<void> getPeerMemoryHandle(cudaIpcMemHandle_t ipcHandle) {
   }
 };
 #if defined(__HIP_PLATFORM_AMD__)
-  static std::unordered_map<cudaIpcMemHandle_t, std::weak_ptr<void>> peerMemoryHandleMap;
+  static std::unordered_map<cudaIpcMemHandle_t, std::weak_ptr<void>, CudaIpcMemHandleHash, CudaIpcMemHandleEqual>
+      peerMemoryHandleMap;
   static std::mutex mutex;
   std::lock_guard lock(mutex);
   auto it = peerMemoryHandleMap.find(ipcHandle);

From 21373259ec9e38fd42b2c80295e246d9c3718c7f Mon Sep 17 00:00:00 2001
From: Binyang Li
Date: Thu, 4 Dec 2025 19:43:41 +0000
Subject: [PATCH 05/12] WIP

---
 src/registered_memory.cc | 1 -
 1 file changed, 1 deletion(-)

diff --git a/src/registered_memory.cc b/src/registered_memory.cc
index 7542d13c8..e77f7f576 100644
--- a/src/registered_memory.cc
+++ b/src/registered_memory.cc
@@ -45,7 +45,6 @@ struct CudaIpcMemHandleEqual {
   }
 };

-
 CUmemAllocationHandleType getNvlsMemHandleType() {
 #if (CUDA_NVLS_API_AVAILABLE)
   if (mscclpp::detail::nvlsCompatibleMemHandleType & CU_MEM_HANDLE_TYPE_FABRIC) {
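The motivation for patch 04's refactor: the standard only blesses `std::hash` specializations for program-defined types, and `cudaIpcMemHandle_t` belongs to the CUDA headers, so supplying the hash and equality as the map's third and fourth template parameters from an anonymous namespace is the safer idiom. A compact sketch of the resulting declaration, with a hypothetical `VendorHandle` in place of the CUDA type:

```cpp
#include <cstring>
#include <functional>
#include <string_view>
#include <unordered_map>

struct VendorHandle {  // imagine this comes from a third-party header
  char reserved[64];
};

namespace {  // helpers stay out of std:: and the global namespace

struct VendorHandleHash {
  size_t operator()(const VendorHandle& h) const {
    return std::hash<std::string_view>{}(std::string_view(h.reserved, sizeof(h.reserved)));
  }
};

struct VendorHandleEqual {
  bool operator()(const VendorHandle& a, const VendorHandle& b) const noexcept {
    return std::memcmp(a.reserved, b.reserved, sizeof(a.reserved)) == 0;
  }
};

}  // namespace

int main() {
  // Hash and equality travel with the map type; nothing leaks into std.
  std::unordered_map<VendorHandle, int, VendorHandleHash, VendorHandleEqual> m;
  m.emplace(VendorHandle{}, 7);
  return m.size() == 1 ? 0 : 1;
}
```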
From b1029b97fca46df0c078402d7c963252c0a2c2d0 Mon Sep 17 00:00:00 2001
From: Binyang Li
Date: Thu, 4 Dec 2025 23:46:33 +0000
Subject: [PATCH 06/12] fix ut

---
 src/registered_memory.cc       | 3 ---
 test/mp_unit/executor_tests.cc | 1 -
 2 files changed, 4 deletions(-)

diff --git a/src/registered_memory.cc b/src/registered_memory.cc
index e77f7f576..c126211c3 100644
--- a/src/registered_memory.cc
+++ b/src/registered_memory.cc
@@ -310,9 +310,6 @@ RegisteredMemory::Impl::Impl(const std::vector<char>::const_iterator& begin,
 #endif  // !(CUDA_NVLS_API_AVAILABLE)
   } else if (getHostHash() == this->hostHash) {
     this->peerHandle = getPeerMemoryHandle(entry.cudaIpcBaseHandle);
-    if (!this->peerHandle) {
-      throw Error("Failed to open CUDA IPC handle, may already be closed", ErrorCode::InvalidUsage);
-    }
     this->data = static_cast<char*>(this->peerHandle.get()) + entry.cudaIpcOffsetFromBase;
   }
 }
diff --git a/test/mp_unit/executor_tests.cc b/test/mp_unit/executor_tests.cc
index 3df4422e3..d0040e966 100644
--- a/test/mp_unit/executor_tests.cc
+++ b/test/mp_unit/executor_tests.cc
@@ -49,7 +49,6 @@ void ExecutorTest::TearDown() {

 TEST_F(ExecutorTest, TwoNodesAllreduce) {
   if (gEnv->worldSize != 2 || gEnv->nRanksPerNode != 2) {
-    GTEST_SKIP() << "This test requires world size to be 2 and ranks per node to be 2";
     return;
   }
   std::string executablePath = getExecutablePath();
From 09d6b7076e22179b9ef6078a9bd33c4cd1d4abde Mon Sep 17 00:00:00 2001
From: Binyang Li
Date: Sat, 13 Dec 2025 09:35:44 +0000
Subject: [PATCH 07/12] fix ci

---
 python/mscclpp/utils.py    |  2 +-
 python/test/conftest.py    | 25 ++++++++++++++++++
 python/test/mscclpp_mpi.py | 54 +++++++++++++-------------------------
 3 files changed, 44 insertions(+), 37 deletions(-)
 create mode 100644 python/test/conftest.py

diff --git a/python/mscclpp/utils.py b/python/mscclpp/utils.py
index 08e3d4a9b..3a779a7a3 100644
--- a/python/mscclpp/utils.py
+++ b/python/mscclpp/utils.py
@@ -123,7 +123,7 @@ def _compile_cuda(self, source_file, output_file, std_version="c++17"):
         if self.macros:
             command += self.macros
         try:
-            subprocess.run(command, capture_output=True, text=True, check=True, bufsize=1)
+            subprocess.run(command, capture_output=True, text=True, check=True, bufsize=1, stdin=subprocess.DEVNULL)
             with open(f"{self._tempdir.name}/{output_file}", "rb") as f:
                 return f.read()
         except subprocess.CalledProcessError as e:
diff --git a/python/test/conftest.py b/python/test/conftest.py
new file mode 100644
index 000000000..12871b3b7
--- /dev/null
+++ b/python/test/conftest.py
@@ -0,0 +1,25 @@
+import atexit
+import mpi4py
+import os
+import sys
+
+mpi4py.rc.initialize = False
+mpi4py.rc.finalize = True
+
+import cupy as cp
+from mpi4py import MPI
+import pytest
+
+
+def pytest_configure(config):
+    """Initialize MPI before test collection."""
+    if not MPI.Is_initialized():
+        MPI.Init()
+        shm_comm = MPI.COMM_WORLD.Split_type(MPI.COMM_TYPE_SHARED, 0, MPI.INFO_NULL)
+        N_GPUS_PER_NODE = shm_comm.size
+        shm_comm.Free()
+        cp.cuda.Device(MPI.COMM_WORLD.rank % N_GPUS_PER_NODE).use()
+
+    if MPI.COMM_WORLD.rank != 0:
+        sys.stdout = open(os.devnull, "w")
+        sys.stderr = open(os.devnull, "w")
diff --git a/python/test/mscclpp_mpi.py b/python/test/mscclpp_mpi.py
index 21a7fff44..4002fc529 100644
--- a/python/test/mscclpp_mpi.py
+++ b/python/test/mscclpp_mpi.py
@@ -1,15 +1,8 @@
 # Copyright (c) Microsoft Corporation.
 # Licensed under the MIT license.

-import atexit
 import logging

-import cupy as cp
-import mpi4py
-
-mpi4py.rc.initialize = False
-mpi4py.rc.finalize = False
-
 from mpi4py import MPI
 import pytest

@@ -17,24 +10,7 @@

 logging.basicConfig(level=logging.INFO)

-
-def init_mpi():
-    if not MPI.Is_initialized():
-        MPI.Init()
-        shm_comm = MPI.COMM_WORLD.Split_type(MPI.COMM_TYPE_SHARED, 0, MPI.INFO_NULL)
-        N_GPUS_PER_NODE = shm_comm.size
-        shm_comm.Free()
-        cp.cuda.Device(MPI.COMM_WORLD.rank % N_GPUS_PER_NODE).use()
-
-
-# Define a function to finalize MPI
-def finalize_mpi():
-    if MPI.Is_initialized():
-        MPI.Finalize()
-
-
-# Register the function to be called on exit
-atexit.register(finalize_mpi)
+_mpi_group_cache = {}


 class MpiGroup:
@@ -50,9 +26,17 @@ def __init__(self, ranks: list = []):
 @pytest.fixture
 def mpi_group(request: pytest.FixtureRequest):
     MPI.COMM_WORLD.barrier()
-    if request.param is None:
-        pytest.skip(f"Skip for rank {MPI.COMM_WORLD.rank}")
-    yield request.param
+
+    mpi_group_obj = request.param
+    should_skip = mpi_group_obj.comm == MPI.COMM_NULL
+
+    try:
+        if should_skip:
+            pytest.skip(f"Skip for rank {MPI.COMM_WORLD.rank}")
+        yield request.param
+    finally:
+        if MPI.Is_initialized() and not MPI.Is_finalized():
+            MPI.COMM_WORLD.barrier()


 def parametrize_mpi_groups(*tuples: tuple):
@@ -62,14 +46,12 @@ def decorator(func):
             if MPI.COMM_WORLD.size < group_size:
                 logging.warning(f"MPI.COMM_WORLD.size < {group_size}, skip")
                 continue
-            mpi_group = MpiGroup(list(range(group_size)))
-            if mpi_group.comm == MPI.COMM_NULL:
-                mpi_groups.append(None)
-            else:
-                mpi_groups.append(mpi_group)
+            ranks = list(range(group_size))
+            ranks_key = tuple(ranks)
+            if ranks_key not in _mpi_group_cache:
+                _mpi_group_cache[ranks_key] = MpiGroup(ranks)
+
+            mpi_groups.append(_mpi_group_cache[ranks_key])
         return pytest.mark.parametrize("mpi_group", mpi_groups, indirect=True)(func)

     return decorator
-
-
-init_mpi()
From 01fcb329ef6610a7cb05961c0d7bae643103a33d Mon Sep 17 00:00:00 2001
From: Binyang Li
Date: Sat, 13 Dec 2025 09:53:38 +0000
Subject: [PATCH 08/12] address comment

---
 python/test/conftest.py           | 4 ++++
 src/include/registered_memory.hpp | 2 +-
 src/registered_memory.cc          | 4 ++--
 3 files changed, 7 insertions(+), 3 deletions(-)

diff --git a/python/test/conftest.py b/python/test/conftest.py
index 12871b3b7..b0ab0daed 100644
--- a/python/test/conftest.py
+++ b/python/test/conftest.py
@@ -1,3 +1,6 @@
+# Copyright (c) Microsoft Corporation.
+# Licensed under the MIT License.
+
 import atexit
 import mpi4py
 import os
 import sys
@@ -20,6 +23,7 @@ def pytest_configure(config):
         shm_comm.Free()
         cp.cuda.Device(MPI.COMM_WORLD.rank % N_GPUS_PER_NODE).use()

+    # only print process with rank 0 to avoid bad fd issue
     if MPI.COMM_WORLD.rank != 0:
         sys.stdout = open(os.devnull, "w")
         sys.stderr = open(os.devnull, "w")
diff --git a/src/include/registered_memory.hpp b/src/include/registered_memory.hpp
index 3500fbbfa..4d435f530 100644
--- a/src/include/registered_memory.hpp
+++ b/src/include/registered_memory.hpp
@@ -55,7 +55,7 @@ struct RegisteredMemory::Impl {
   bool isCuMemMapAlloc;
   TransportFlags transports;
   std::vector<TransportInfo> transportInfos;
-  std::shared_ptr<void> peerHandle;
+  std::shared_ptr<void> peerMemHandle;
   // Only used for IB transport
   std::unordered_map> ibMrMap;
diff --git a/src/registered_memory.cc b/src/registered_memory.cc
index c126211c3..1c03fbc0b 100644
--- a/src/registered_memory.cc
+++ b/src/registered_memory.cc
@@ -309,8 +309,8 @@ RegisteredMemory::Impl::Impl(const std::vector<char>::const_iterator& begin,
     throw Error("Unexpected error", ErrorCode::InternalError);
 #endif  // !(CUDA_NVLS_API_AVAILABLE)
   } else if (getHostHash() == this->hostHash) {
-    this->peerHandle = getPeerMemoryHandle(entry.cudaIpcBaseHandle);
-    this->data = static_cast<char*>(this->peerHandle.get()) + entry.cudaIpcOffsetFromBase;
+    this->peerMemHandle = getPeerMemoryHandle(entry.cudaIpcBaseHandle);
+    this->data = static_cast<char*>(this->peerMemHandle.get()) + entry.cudaIpcOffsetFromBase;
   }
 }
 if (this->data != nullptr) {

From 4acf3a90584e63794274b494919c8b6fc40a27b3 Mon Sep 17 00:00:00 2001
From: Binyang Li
Date: Sat, 13 Dec 2025 13:52:31 +0000
Subject: [PATCH 09/12] update

---
 test/mp_unit/executor_tests.cc | 7 +++----
 1 file changed, 3 insertions(+), 4 deletions(-)

diff --git a/test/mp_unit/executor_tests.cc b/test/mp_unit/executor_tests.cc
index d0040e966..a903ed08d 100644
--- a/test/mp_unit/executor_tests.cc
+++ b/test/mp_unit/executor_tests.cc
@@ -21,6 +21,9 @@ std::string getExecutablePath() {
 }  // namespace

 void ExecutorTest::SetUp() {
+  if (gEnv->worldSize != 2 || gEnv->nRanksPerNode != 2) {
+    GTEST_SKIP() << "This test requires world size to be 2 and ranks per node to be 2";
+  }
   MultiProcessTest::SetUp();
   MSCCLPP_CUDATHROW(cudaSetDevice(rankToLocalRank(gEnv->rank)));
@@ -43,14 +46,10 @@ void ExecutorTest::TearDown() {
     NpKit::Dump(npkitDumpDir);
     NpKit::Shutdown();
   }
-  executor.reset();
   MultiProcessTest::TearDown();
 }

 TEST_F(ExecutorTest, TwoNodesAllreduce) {
-  if (gEnv->worldSize != 2 || gEnv->nRanksPerNode != 2) {
-    return;
-  }
   std::string executablePath = getExecutablePath();
   std::filesystem::path path = executablePath;
   std::filesystem::path executionFilesPath =
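Patch 09 moves the environment gate from the test body into `SetUp()`, so an unsupported configuration is reported as a skip instead of a silent early `return`, and the expensive fixture setup never runs for it. A minimal sketch of the pattern; `worldSize()` is a hypothetical probe, not part of the repo:

```cpp
#include <gtest/gtest.h>

namespace {
int worldSize() { return 2; }  // hypothetical environment probe
}  // namespace

class GatedTest : public ::testing::Test {
 protected:
  void SetUp() override {
    // GTEST_SKIP() in SetUp() marks the test as skipped before any
    // expensive fixture setup runs; the test body never executes.
    if (worldSize() != 2) {
      GTEST_SKIP() << "This test requires exactly 2 ranks";
    }
    // per-test setup goes here, after the gate
  }
  void TearDown() override {
    // TearDown() still runs after a skipped SetUp(), so it must tolerate
    // whatever state the gate left uninitialized.
  }
};

TEST_F(GatedTest, RunsOnlyWithTwoRanks) { SUCCEED(); }
```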
From e283c5de68e38991d3afaa5df8bcf55b80a243 Mon Sep 17 00:00:00 2001
From: Binyang Li
Date: Mon, 15 Dec 2025 09:28:10 +0000
Subject: [PATCH 10/12] update for log

---
 src/include/logger.hpp   |  2 +-
 src/registered_memory.cc | 38 ++++++++++++++++++++------------------
 2 files changed, 21 insertions(+), 19 deletions(-)

diff --git a/src/include/logger.hpp b/src/include/logger.hpp
index 67a5fcdc6..87281f875 100644
--- a/src/include/logger.hpp
+++ b/src/include/logger.hpp
@@ -21,7 +21,7 @@ namespace mscclpp {

 typedef enum : unsigned int { NONE = 0, DEBUG, INFO, WARN, ERROR } LogLevel;

-typedef enum : std::size_t { ENV = 0, NET, CONN, EXEC, NCCL, COUNT } LogSubsys;
+typedef enum : std::size_t { ENV = 0, NET, CONN, P2P, EXEC, NCCL, COUNT } LogSubsys;

 namespace detail {

diff --git a/src/registered_memory.cc b/src/registered_memory.cc
index 1c03fbc0b..1308db641 100644
--- a/src/registered_memory.cc
+++ b/src/registered_memory.cc
@@ -12,21 +12,21 @@

 #include "api.h"
 #include "context.hpp"
-#include "debug.h"
+#include "logger.hpp"
 #include "serialization.hpp"
 #include "unix_socket.hpp"
 #include "utils_internal.hpp"

-#define MSCCLPP_CULOG_WARN(cmd) \
-  do { \
-    CUresult err = cmd; \
-    if (err != CUDA_SUCCESS) { \
-      const char* errStr; \
-      if (cuGetErrorString(err, &errStr) != CUDA_SUCCESS) { \
-        errStr = "failed to get error string"; \
-      } \
-      WARN("Call to " #cmd " failed, error is %s", errStr); \
-    } \
+#define MSCCLPP_CULOG_WARN(cmd) \
+  do { \
+    CUresult err = cmd; \
+    if (err != CUDA_SUCCESS) { \
+      const char* errStr; \
+      if (cuGetErrorString(err, &errStr) != CUDA_SUCCESS) { \
+        errStr = "failed to get error string"; \
+      } \
+      WARN(mscclpp::P2P, "Call to " #cmd " failed, error is ", errStr); \
+    } \
   } while (false)

 namespace {
@@ -62,12 +62,14 @@ std::shared_ptr<void> getPeerMemoryHandle(cudaIpcMemHandle_t ipcHandle) {
   auto deleter = [](void* p) {
     cudaError_t err = cudaIpcCloseMemHandle(p);
     if (err != cudaSuccess) {
-      WARN("Failed to close CUDA IPC handle at pointer %p: %s", p, cudaGetErrorString(err));
+      WARN(mscclpp::P2P, "Failed to close CUDA IPC handle at pointer ", std::hex, p, ": ", cudaGetErrorString(err));
     } else {
-      INFO(MSCCLPP_P2P, "Closed CUDA IPC handle at pointer %p", p);
+      INFO(mscclpp::P2P, "Closed CUDA IPC handle at pointer ", std::hex, p);
     }
   };
 #if defined(__HIP_PLATFORM_AMD__)
+  // Unlike Nvidia, ROCm will not reuse the same ipc handle for same memory region.
+  // We cache the opened ipc handles to avoid opening multiple times. (May exceed system limit on vm.max_map_count)
   static std::unordered_map<cudaIpcMemHandle_t, std::weak_ptr<void>, CudaIpcMemHandleHash, CudaIpcMemHandleEqual>
       peerMemoryHandleMap;
   static std::mutex mutex;
@@ -146,7 +148,7 @@ RegisteredMemory::Impl::Impl(void* data, size_t size, TransportFlags transports,
     transportInfo.ibLocal = true;
     transportInfo.ibMrInfo = this->ibMrMap[ibTransport]->getInfo();
     this->transportInfos.push_back(transportInfo);
-    INFO(MSCCLPP_NET, "IB mr for address %p with size %ld is registered", data, size);
+    INFO(NET, "IB mr for address ", data, " with size ", size, " is registered");
   };
   if (transports.has(Transport::IB0)) addIb(Transport::IB0);
   if (transports.has(Transport::IB1)) addIb(Transport::IB1);
@@ -280,8 +282,8 @@ RegisteredMemory::Impl::Impl(const std::vector<char>::const_iterator& begin,
       // TODO: only open handle if in same MNNVL domain
       CUresult err = cuMemImportFromShareableHandle(&handle, entry.shareableHandle, getNvlsMemHandleType());
       if (err != CUDA_SUCCESS) {
-        INFO(MSCCLPP_P2P, "Failed to import shareable handle from host: 0x%lx, may not be in the same MNNVL domain",
-             hostHash);
+        INFO(P2P, "Failed to import shareable handle from host: 0x", std::hex, hostHash,
+             ", may not be in the same MNNVL domain");
         return;
       }
     } else {
@@ -290,7 +292,7 @@ RegisteredMemory::Impl::Impl(const std::vector<char>::const_iterator& begin,
     } else {
       int fd = UnixSocketClient::instance().requestFd(UnixSocketServer::generateSocketPath(entry.rootPid), entry.rootFd);
-      INFO(MSCCLPP_P2P, "Get file descriptor %d from peer 0x%lx", fd, hostHash);
+      INFO(P2P, "Get file descriptor ", fd, " from peer 0x", std::hex, hostHash);
       MSCCLPP_CUTHROW(cuMemImportFromShareableHandle(&handle, reinterpret_cast<void*>(fd),
                                                      CU_MEM_HANDLE_TYPE_POSIX_FILE_DESCRIPTOR));
       close(fd);
@@ -314,7 +316,7 @@ RegisteredMemory::Impl::Impl(const std::vector<char>::const_iterator& begin,
   }
 }
 if (this->data != nullptr) {
-  INFO(MSCCLPP_P2P, "Opened CUDA IPC handle at pointer %p", this->data);
+  INFO(P2P, "Opened CUDA IPC handle at pointer ", this->data);
 }

From a1581e69680365fa2f92c4a274bd047a5b1a8bd1 Mon Sep 17 00:00:00 2001
From: Binyang Li
Date: Thu, 18 Dec 2025 10:56:24 +0800
Subject: [PATCH 11/12] Update python/test/conftest.py

Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
---
 python/test/conftest.py | 1 -
 1 file changed, 1 deletion(-)

diff --git a/python/test/conftest.py b/python/test/conftest.py
index b0ab0daed..89c1ac418 100644
--- a/python/test/conftest.py
+++ b/python/test/conftest.py
@@ -1,7 +1,6 @@
 # Copyright (c) Microsoft Corporation.
 # Licensed under the MIT License.

-import atexit
 import mpi4py
 import os
 import sys
From 9d508a08608d2d9da7e9c62f3c9a097cc47f798f Mon Sep 17 00:00:00 2001
From: Binyang Li
Date: Thu, 18 Dec 2025 04:23:28 +0000
Subject: [PATCH 12/12] address comments

---
 python/test/conftest.py    |  1 -
 python/test/mscclpp_mpi.py |  6 +++++-
 src/include/logger.hpp     |  2 ++
 src/registered_memory.cc   | 28 +++++++++++++++-------------
 4 files changed, 22 insertions(+), 15 deletions(-)

diff --git a/python/test/conftest.py b/python/test/conftest.py
index 89c1ac418..f8ecaabf4 100644
--- a/python/test/conftest.py
+++ b/python/test/conftest.py
@@ -10,7 +10,6 @@

 import cupy as cp
 from mpi4py import MPI
-import pytest


 def pytest_configure(config):
diff --git a/python/test/mscclpp_mpi.py b/python/test/mscclpp_mpi.py
index 4002fc529..8c18ea453 100644
--- a/python/test/mscclpp_mpi.py
+++ b/python/test/mscclpp_mpi.py
@@ -1,5 +1,5 @@
 # Copyright (c) Microsoft Corporation.
-# Licensed under the MIT license.
+# Licensed under the MIT License.

 import logging

@@ -22,6 +22,10 @@ def __init__(self, ranks: list = []):
         group = world_group.Incl(ranks)
         self.comm = MPI.COMM_WORLD.Create(group)

+    def __del__(self):
+        if self.comm != MPI.COMM_NULL and MPI.Is_initialized() and not MPI.Is_finalized():
+            self.comm.Free()
+

 @pytest.fixture
 def mpi_group(request: pytest.FixtureRequest):
diff --git a/src/include/logger.hpp b/src/include/logger.hpp
index 87281f875..c6306c32e 100644
--- a/src/include/logger.hpp
+++ b/src/include/logger.hpp
@@ -61,6 +61,8 @@ constexpr std::string_view logSubsysToString(LogSubsys subsys) {
       return "NET";
     case LogSubsys::CONN:
       return "CONN";
+    case LogSubsys::P2P:
+      return "P2P";
     case LogSubsys::EXEC:
       return "EXEC";
     case LogSubsys::NCCL:
diff --git a/src/registered_memory.cc b/src/registered_memory.cc
index 1308db641..284895e4f 100644
--- a/src/registered_memory.cc
+++ b/src/registered_memory.cc
@@ -9,6 +9,7 @@
 #include
 #include
 #include
+#include

 #include "api.h"
 #include "context.hpp"
@@ -70,25 +71,26 @@ std::shared_ptr<void> getPeerMemoryHandle(cudaIpcMemHandle_t ipcHandle) {
 #if defined(__HIP_PLATFORM_AMD__)
   // Unlike Nvidia, ROCm will not reuse the same ipc handle for same memory region.
   // We cache the opened ipc handles to avoid opening multiple times. (May exceed system limit on vm.max_map_count)
-  static std::unordered_map<cudaIpcMemHandle_t, std::weak_ptr<void>, CudaIpcMemHandleHash, CudaIpcMemHandleEqual>
-      peerMemoryHandleMap;
-  static std::mutex mutex;
-  std::lock_guard lock(mutex);
-  auto it = peerMemoryHandleMap.find(ipcHandle);
-  if (it != peerMemoryHandleMap.end()) {
+  static auto peerMemoryHandleMap = std::make_shared<
+      std::unordered_map<cudaIpcMemHandle_t, std::weak_ptr<void>, CudaIpcMemHandleHash, CudaIpcMemHandleEqual>>();
+  static auto mutex = std::make_shared<std::mutex>();
+  std::lock_guard lock(*mutex);
+  auto it = peerMemoryHandleMap->find(ipcHandle);
+  if (it != peerMemoryHandleMap->end()) {
     if (auto ptr = it->second.lock()) {
       return ptr;
     } else {
-      peerMemoryHandleMap.erase(it);
+      peerMemoryHandleMap->erase(it);
     }
   }
   MSCCLPP_CUDATHROW(cudaIpcOpenMemHandle(&addr, ipcHandle, cudaIpcMemLazyEnablePeerAccess));
-  std::shared_ptr<void> ptr = std::shared_ptr<void>(addr, [ipcHandle, deleter](void* p) {
-    deleter(p);
-    std::lock_guard lock(mutex);
-    peerMemoryHandleMap.erase(ipcHandle);
-  });
-  peerMemoryHandleMap[ipcHandle] = ptr;
+  std::shared_ptr<void> ptr =
+      std::shared_ptr<void>(addr, [ipcHandle, deleter, m = mutex, map = peerMemoryHandleMap](void* p) {
+        deleter(p);
+        std::lock_guard lock(*m);
+        map->erase(ipcHandle);
+      });
+  peerMemoryHandleMap->emplace(ipcHandle, ptr);
   return ptr;
 #else
   MSCCLPP_CUDATHROW(cudaIpcOpenMemHandle(&addr, ipcHandle, cudaIpcMemLazyEnablePeerAccess));
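Patch 10 above switches the call sites from printf-style format strings to a variadic, stream-style logging interface, where arguments such as `std::hex` and raw pointers are inserted as if into an `std::ostream`. A plausible shape for such a macro, folding its arguments into an `ostringstream`; this is a sketch under that assumption, not the actual `logger.hpp` implementation:

```cpp
#include <iostream>
#include <sstream>
#include <string_view>
#include <utility>

enum class LogSubsys { ENV, NET, CONN, P2P, EXEC, NCCL };

constexpr std::string_view subsysName(LogSubsys s) {
  switch (s) {
    case LogSubsys::P2P: return "P2P";
    case LogSubsys::NET: return "NET";
    default: return "???";
  }
}

// Fold every argument into one stream insertion, so manipulators like
// std::hex behave exactly as they would on std::cout.
template <typename... Args>
void logLine(std::string_view level, LogSubsys subsys, Args&&... args) {
  std::ostringstream oss;
  (oss << ... << std::forward<Args>(args));
  std::cerr << level << " [" << subsysName(subsys) << "] " << oss.str() << "\n";
}

#define SKETCH_INFO(subsys, ...) logLine("INFO", LogSubsys::subsys, __VA_ARGS__)

int main() {
  void* p = nullptr;
  SKETCH_INFO(P2P, "Closed CUDA IPC handle at pointer ", std::hex, p);
}
```

Because the arguments are forwarded to `operator<<`, there is no format string to keep in sync, which is why the patch can drop the `%p` and `%lx` specifiers.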
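The final hunk of patch 12 is about destruction order: a function-local static map can be destroyed before a late `shared_ptr` release runs its deleter, so the patch wraps the map and mutex in `shared_ptr`s and captures copies in each deleter, keeping both alive until the last handle is closed. A stripped-down sketch of that keep-alive capture, using a plain `int` in place of an opened IPC mapping:

```cpp
#include <memory>
#include <mutex>
#include <string>
#include <unordered_map>

using Cache = std::unordered_map<std::string, std::weak_ptr<int>>;

std::shared_ptr<int> open_cached(const std::string& key) {
  // shared_ptr-wrapped statics: the copies captured in each deleter keep the
  // map and mutex alive even if a resource is released after static
  // destruction has begun.
  static auto cache = std::make_shared<Cache>();
  static auto mutex = std::make_shared<std::mutex>();
  std::lock_guard<std::mutex> lock(*mutex);
  if (auto it = cache->find(key); it != cache->end()) {
    if (auto alive = it->second.lock()) return alive;  // still open: share it
    cache->erase(it);                                  // stale entry: drop it
  }
  auto res = std::shared_ptr<int>(new int(0), [key, m = mutex, c = cache](int* p) {
    delete p;  // "close" the underlying resource
    std::lock_guard<std::mutex> lock(*m);
    c->erase(key);  // 'c' and 'm' are co-owned by this deleter, so still valid
  });
  cache->emplace(key, res);
  return res;
}

int main() {
  auto a = open_cached("k");
  auto b = open_cached("k");
  return a.get() == b.get() ? 0 : 1;  // same underlying resource
}
```

Each outstanding deleter co-owns the map and mutex, so even if the statics inside `open_cached` are torn down first, the erase in the deleter still targets live objects.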