Skip to content
Open
Show file tree
Hide file tree
Changes from 9 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 25 additions & 0 deletions 1
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

remove this file

Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
#!/bin/bash
#
# Developer convenience script: configures the toolchain and nvFuser build
# environment variables, then installs the Python package in editable mode.
#
# Required environment:
#   BUILD_DIRECTORY  Directory that receives the build tree; the install tree
#                    is placed in "$BUILD_DIRECTORY/nvfuser".

# Fail fast instead of silently building into "" and installing into
# "/nvfuser" when BUILD_DIRECTORY is unset or empty.
: "${BUILD_DIRECTORY:?BUILD_DIRECTORY must be set to the build output directory}"

# Toolchain selection.
export CC=clang-20
export CXX=clang++-20
export CUDAHOSTCXX=/usr/bin/clang++-20
export LDFLAGS="-fuse-ld=mold"

# A bare `export NAME` only marks the variable for export; if it is unset,
# nothing is passed to child processes. Keep a caller-provided value and
# default to 1 otherwise.
# NOTE(review): assumes the build treats "1" as "enabled", like
# NVFUSER_BUILD_WITH_UCC below -- confirm.
export NVFUSER_BUILD_ENABLE_PCH="${NVFUSER_BUILD_ENABLE_PCH:-1}"

# UCC / UCX locations.
export UCC_HOME="/opt/hpcx/ucc"
export UCC_DIR="/opt/hpcx/ucc/lib/cmake/ucc"
export UCX_HOME="/opt/hpcx/ucx"
export UCX_DIR="/opt/hpcx/ucx/lib/cmake/ucx"

# export TORCH_CUDA_ARCH_LIST="9.0"

export NVFUSER_BUILD_WITH_UCC=1
# Quoted so that a BUILD_DIRECTORY containing spaces is kept intact.
export NVFUSER_BUILD_INSTALL_DIR="$BUILD_DIRECTORY/nvfuser"
export NVFUSER_BUILD_DIR="$BUILD_DIRECTORY"

# Enable debug mode; leave NVFUSER_BUILD_BUILD_TYPE empty for non-debug
# compilation.
export NVFUSER_BUILD_BUILD_TYPE=Debug
export RUN_CMAKE=""

pip install -v -e ./python --no-build-isolation
45 changes: 44 additions & 1 deletion csrc/multidevice/communicator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,13 @@
#include <numeric>

#ifdef NVFUSER_DISTRIBUTED
#if NVFUSER_CAN_REGISTER_C10D_PROCESS_GROUP
#include <torch/csrc/distributed/c10d/GroupRegistry.hpp>
#endif
#include <torch/csrc/distributed/c10d/PrefixStore.hpp>
#if NVFUSER_CAN_REGISTER_C10D_PROCESS_GROUP
#include <torch/csrc/distributed/c10d/ProcessGroup.hpp>
#endif
#include <torch/csrc/distributed/c10d/exception.h>
#ifdef USE_C10D_NCCL
#include <torch/csrc/distributed/c10d/ProcessGroupNCCL.hpp>
Expand Down Expand Up @@ -362,6 +368,12 @@ void Communicator::cleanup() {
pg_nccl->shutdown();
}
}
#endif
#if NVFUSER_CAN_REGISTER_C10D_PROCESS_GROUP
for (const auto& entry : process_groups_) {
c10d::unregister_process_group(entry.first);
}
process_groups_.clear();
Comment on lines +370 to +373
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P0 process_groups_ cleanup guard mismatch — compile error when NVFUSER_DISTRIBUTED is set without USE_DISTRIBUTED

process_groups_ is declared in communicator.h under #if defined(NVFUSER_DISTRIBUTED) && defined(USE_DISTRIBUTED), but the cleanup loop here lives under the broader #if defined(NVFUSER_DISTRIBUTED) (without the USE_DISTRIBUTED guard). When a build defines NVFUSER_DISTRIBUTED but not USE_DISTRIBUTED, process_groups_ does not exist as a member, yet this code tries to iterate over it — a hard compile error.

c10d::unregister_process_group (from GroupRegistry.hpp) is already included under #ifdef NVFUSER_DISTRIBUTED, so fixing just the guard on these lines is sufficient:

Suggested change
for (const auto& entry : process_groups_) {
c10d::unregister_process_group(entry.first);
}
process_groups_.clear();
#if defined(USE_DISTRIBUTED)
for (const auto& entry : process_groups_) {
c10d::unregister_process_group(entry.first);
}
process_groups_.clear();
#endif

(The surrounding #if defined(NVFUSER_DISTRIBUTED) / #endif already provides the outer distributed guard.)

#endif
backends_.clear();
}
Expand Down Expand Up @@ -402,6 +414,28 @@ c10d::Backend* Communicator::getBackendForTeam(
}();
#else
backends_[team_key] = nullptr;
#endif
#if NVFUSER_CAN_REGISTER_C10D_PROCESS_GROUP
std::optional<c10d::ProcessGroup::BackendType> pg_backend =
(b == CommunicatorBackend::kNccl)
? std::optional<c10d::ProcessGroup::BackendType>(
c10d::ProcessGroup::BackendType::NCCL)
: std::nullopt;
if (backends_[team_key] != nullptr && pg_backend.has_value()) {
auto rank_it = std::find(team.begin(), team.end(), deviceId());
RankType team_rank = std::distance(team.begin(), rank_it);

auto pg = c10::make_intrusive<c10d::ProcessGroup>(
c10::make_intrusive<c10d::PrefixStore>(team_key, store_),
team_rank,
static_cast<int>(team.size()));
pg->setBackend(c10::DeviceType::CUDA, *pg_backend, backends_[team_key]);
pg->setDefaultBackend(*pg_backend);
pg->setGroupName(team_key);

c10d::register_process_group(team_key, pg);
process_groups_[team_key] = std::move(pg);
}
#endif
}
return backends_.at(team_key).get();
Expand All @@ -424,4 +458,13 @@ void Communicator::barrier(std::optional<CommunicatorBackend> backend) {
getWorld(backend)->barrier(options)->wait();
}

} // namespace nvfuser
std::string Communicator::getSymmMemGroupKey(
std::optional<CommunicatorBackend> backend) {
std::vector<RankType> all_ranks(size_);
std::iota(all_ranks.begin(), all_ranks.end(), 0);
CommunicatorBackend b = backend.value_or(default_backend_);
(void)getBackendForTeam(all_ranks, b);
return getTeamKey(all_ranks, b);
}
Comment on lines +458 to +465
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P2 getSymmMemGroupKey body is incorrectly indented and missing trailing newline

The function body uses column-0 indentation, inconsistent with every other member function in this file. All statements should be indented at the standard 2-space level. Additionally, the file is missing a trailing newline (shown by \ No newline at end of file in the diff).

Suggested change
std::string Communicator::getSymmMemGroupKey(
std::optional<CommunicatorBackend> backend) {
std::vector<RankType> all_ranks(size_);
std::iota(all_ranks.begin(), all_ranks.end(), 0);
CommunicatorBackend b = backend.value_or(default_backend_);
(void)getBackendForTeam(all_ranks, b);
return getTeamKey(all_ranks, b);
}
std::string Communicator::getSymmMemGroupKey(
std::optional<CommunicatorBackend> backend) {
std::vector<RankType> all_ranks(size_);
std::iota(all_ranks.begin(), all_ranks.end(), 0);
CommunicatorBackend b = backend.value_or(default_backend_);
(void)getBackendForTeam(all_ranks, b);
return getTeamKey(all_ranks, b);
}

Note: If this suggestion doesn't match your team's coding style, reply to this and let me know. I'll remember it for next time!


} // namespace nvfuser
20 changes: 20 additions & 0 deletions csrc/multidevice/communicator.h
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,19 @@
#include <ATen/core/ivalue.h>
#include <c10/util/intrusive_ptr.h>

#if defined(NVFUSER_DISTRIBUTED) && \
__has_include(<torch/csrc/distributed/c10d/GroupRegistry.hpp>) && \
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What is the rationale behind defining NVFUSER_CAN_REGISTER_C10D_PROCESS_GROUP? In what scenario can the header be missing?

__has_include(<torch/csrc/distributed/c10d/ProcessGroup.hpp>)
#define NVFUSER_CAN_REGISTER_C10D_PROCESS_GROUP 1
#else
#define NVFUSER_CAN_REGISTER_C10D_PROCESS_GROUP 0
#endif

#ifdef NVFUSER_DISTRIBUTED
#include <torch/csrc/distributed/c10d/Backend.hpp>
#if NVFUSER_CAN_REGISTER_C10D_PROCESS_GROUP
#include <torch/csrc/distributed/c10d/ProcessGroup.hpp>
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This header should always be present, no?

#endif
#include <torch/csrc/distributed/c10d/TCPStore.hpp>
#include <torch/csrc/distributed/c10d/Work.hpp>
#else
Expand Down Expand Up @@ -110,6 +121,10 @@ class NVF_API Communicator {
c10d::Backend* getWorld(
std::optional<CommunicatorBackend> backend = std::nullopt);

// Returns the world process-group name for the given backend, i.e. the team
// key of the team that contains all ranks. When `backend` is std::nullopt the
// communicator's default backend is used. Calls getBackendForTeam() for that
// team before returning the key.
std::string getSymmMemGroupKey(
std::optional<CommunicatorBackend> backend = std::nullopt);

// returns if a backend is available for creation
bool isBackendAvailable(CommunicatorBackend backend) const {
if (backend == CommunicatorBackend::kUcc) {
Expand Down Expand Up @@ -153,6 +168,11 @@ class NVF_API Communicator {
c10::intrusive_ptr<c10d::TCPStore> store_;
// cache for the created backends. The keys are strings generated from Teams
std::unordered_map<std::string, c10::intrusive_ptr<c10d::Backend>> backends_;
#if NVFUSER_CAN_REGISTER_C10D_PROCESS_GROUP
// c10d process-group wrappers registered for symmetric-memory rendezvous.
std::unordered_map<std::string, c10::intrusive_ptr<c10d::ProcessGroup>>
process_groups_;
Comment on lines +161 to +164
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P1 Non-distributed build compilation failure: c10d::ProcessGroup not in mock

c10d::ProcessGroup is declared as the value type of process_groups_ without a #ifdef NVFUSER_DISTRIBUTED guard. In a non-distributed build, communicator.h falls through to c10d_mock.h, which defines c10d::Backend, c10d::TCPStore, and the options structs — but not c10d::ProcessGroup. The class declaration therefore fails to compile.

The same issue propagates into communicator.cpp:

  • cleanup() calls c10d::unregister_process_group(...) (not in mock) outside any #ifdef guard.
  • getBackendForTeam() uses c10d::ProcessGroup::BackendType, c10d::ProcessGroup, and c10d::register_process_group after the #endif that closes the distributed block.

The process_groups_ field and all code that touches it needs to be wrapped in #ifdef NVFUSER_DISTRIBUTED / #endif, or a minimal ProcessGroup stub must be added to c10d_mock.h.

// communicator.h — wrap the new field:
#ifdef NVFUSER_DISTRIBUTED
  std::unordered_map<std::string, c10::intrusive_ptr<c10d::ProcessGroup>>
      process_groups_;
#endif

And similarly guard the cleanup() loop and the new block in getBackendForTeam() in communicator.cpp.

#endif
};

} // namespace nvfuser
18 changes: 18 additions & 0 deletions csrc/multidevice/ipc_utils.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -191,4 +191,22 @@ MulticastProtocol getMulticastProtocol() {
return MulticastProtocol::BatchMemcpy;
}

// Maps EnableOption::SymmetricMemoryBackend and its argument to a
// SymmetricMemoryBackend value. Native is returned when the option is not
// enabled or when none of the recognized arguments is present.
SymmetricMemoryBackend getSymmetricMemoryBackend() {
  const auto option = EnableOption::SymmetricMemoryBackend;

  // Option not enabled at all: nothing else to inspect.
  if (!isOptionEnabled(option)) {
    return SymmetricMemoryBackend::Native;
  }

  // Arguments are probed in a fixed order; the first one present wins.
  if (hasEnableOptionArgument(option, "pytorch_nccl")) {
    return SymmetricMemoryBackend::PyTorchNccl;
  }
  if (hasEnableOptionArgument(option, "pytorch_nvshmem")) {
    return SymmetricMemoryBackend::PyTorchNvshmem;
  }
  if (hasEnableOptionArgument(option, "pytorch_cuda")) {
    return SymmetricMemoryBackend::PyTorchCuda;
  }

  // Enabled, but without a recognized argument.
  return SymmetricMemoryBackend::Native;
}

} // namespace nvfuser
13 changes: 13 additions & 0 deletions csrc/multidevice/ipc_utils.h
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,19 @@ enum class MulticastProtocol { Memcpy, Multimem, BatchMemcpy };

MulticastProtocol getMulticastProtocol();

// Backend for symmetric memory allocation and rendezvous.
// Native: Fuser's own CUDA VMM + IPC implementation (default, maintained).
// PyTorch*: Use PyTorch's symmetric memory (torch.distributed._symmetric_memory)
// with the given transport backend (Nccl, Nvshmem, or Cuda).
enum class SymmetricMemoryBackend {
// Fuser's own implementation; the default.
Native,
// PyTorch symmetric memory with the Nccl transport backend.
PyTorchNccl,
// PyTorch symmetric memory with the Nvshmem transport backend.
PyTorchNvshmem,
// PyTorch symmetric memory with the Cuda transport backend.
PyTorchCuda,
};

// Returns the symmetric-memory backend to use. See the definition in
// ipc_utils.cpp for how the value is chosen.
SymmetricMemoryBackend getSymmetricMemoryBackend();

// Creates a listening Unix domain socket bound to path.
// If path starts with '@', it uses the abstract namespace (replaced with \0).
// Returns the socket file descriptor.
Expand Down
Loading