diff --git a/src/BuildOnLinux.cmake b/src/BuildOnLinux.cmake
index aee7118f0..87b0fe345 100644
--- a/src/BuildOnLinux.cmake
+++ b/src/BuildOnLinux.cmake
@@ -16,6 +16,7 @@ macro(setup_common_libraries)
   if(USE_C10D_XCCL)
     target_compile_definitions(torch_xpu_ops PRIVATE USE_C10D_XCCL)
     target_link_libraries(torch_xpu_ops PUBLIC torch::xccl)
+    target_link_libraries(torch_xpu_ops PUBLIC fmt::fmt-header-only)
   endif()
   list(APPEND TORCH_XPU_OPS_LIBRARIES torch_xpu_ops)
 endmacro()
@@ -125,6 +126,7 @@ else()
   if(USE_C10D_XCCL)
     target_compile_definitions(torch_xpu_ops PRIVATE USE_C10D_XCCL)
     target_link_libraries(torch_xpu_ops PUBLIC torch::xccl)
+    target_link_libraries(torch_xpu_ops PUBLIC fmt::fmt-header-only)
   endif()
   install(TARGETS torch_xpu_ops DESTINATION "${TORCH_INSTALL_LIB_DIR}")
diff --git a/src/xccl/FlightRecorderXCCL.cpp b/src/xccl/FlightRecorderXCCL.cpp
new file mode 100644
index 000000000..29fccd990
--- /dev/null
+++ b/src/xccl/FlightRecorderXCCL.cpp
@@ -0,0 +1,21 @@
+#ifdef USE_C10D_XCCL
+
+#include <torch/csrc/distributed/c10d/FlightRecorder.hpp>
+#include <torch/csrc/distributed/c10d/FlightRecorderDetail.hpp>
+#include <xccl/ProcessGroupXCCL.hpp>
+
+namespace c10d {
+
+template <>
+float getDurationFromEvent(
+    at::xpu::XPUEvent& xcclStartEvent,
+    at::xpu::XPUEvent& xcclEndEvent) {
+  TORCH_CHECK(
+      xcclEndEvent.query(),
+      "getDuration can only be called after work has succeeded.")
+  return xcclStartEvent.elapsed_time(xcclEndEvent);
+}
+
+template struct FlightRecorder<at::xpu::XPUEvent>;
+} // namespace c10d
+#endif // USE_C10D_XCCL
diff --git a/src/xccl/ProcessGroupXCCL.cpp b/src/xccl/ProcessGroupXCCL.cpp
index 9ace6ccb9..240a54cea 100644
--- a/src/xccl/ProcessGroupXCCL.cpp
+++ b/src/xccl/ProcessGroupXCCL.cpp
@@ -1,11 +1,14 @@
 #ifdef USE_C10D_XCCL
 #include <comm/XPUGuard.h>
+#include <torch/csrc/distributed/c10d/FlightRecorder.hpp>
 #include <xccl/NanCheck_XPU.hpp>
 #include <xccl/ProcessGroupXCCL.hpp>

 namespace c10d {

+using FlightRecorderXCCL = FlightRecorder<at::xpu::XPUEvent>;
+
 namespace {

 #if defined(CCL_MAJOR_VERSION) && \
@@ -200,6 +203,17 @@ void syncStream(

 } // namespace

+std::string dump_xccl_trace(
+    bool includeCollectives,
+    bool includeStackTraces,
+    bool onlyActive) {
+  auto xcclDumpMap = std::unordered_map<
+      std::string,
+      std::unordered_map<std::string, std::string>>();
+  return FlightRecorderXCCL::get()->dump(
+      xcclDumpMap, includeCollectives, includeStackTraces, onlyActive);
+}
+
 constexpr int64_t kSynchronizeBusyWaitMillis = 10;
 thread_local uint64_t ProcessGroupXCCL::xcclActiveGroupCounter_ = 0;

@@ -303,6 +317,10 @@ bool ProcessGroupXCCL::WorkXCCL::wait(std::chrono::milliseconds timeout) {
   return true;
 }

+ProcessGroupXCCL::Options::Options()
+    : Backend::Options(XCCL_BACKEND_NAME) {}
+
+
 static std::atomic<size_t> process_group_id = 0;

 constexpr const char* MULTI_DEVICE_ERROR_MSG =
@@ -332,19 +350,28 @@ const std::string& ProcessGroupXCCL::logPrefix() const {
 ProcessGroupXCCL::ProcessGroupXCCL(
     const c10::intrusive_ptr<Store>& store,
     int rank,
-    int size)
+    int size,
+    c10::intrusive_ptr<Options> options)
     : Backend(rank, size),
       store_(store),
+      options_(std::move(options)),
       xcclCommCounter_(0),
       local_id_(process_group_id++) {
   logPrefix_ = createLogPrefix();
   blockingWait_ = getCvarBool(TORCH_XCCL_BLOCKING_WAIT, false);
+  traceBufferSize_ = getCvarInt({"TORCH_FR_BUFFER_SIZE"}, 2000);
+
+  this->setGroupUid(options_->group_name);
+  // In ProcessGroupNCCL the pg ranks are recorded on comm setup in each op;
+  // here we record them once, at construction.
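+  // Recording the rank mapping and XCCL version up front lets Flight
+  // Recorder dumps be correlated across ranks and library builds.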
+  const auto XcclVersion = getXcclVersion();
+  FlightRecorderXCCL::get()->record_pg_ranks(
+      std::make_tuple(pg_uid_, pg_desc_), groupRanks());
+  FlightRecorderXCCL::get()->record_accelerator_version(XcclVersion);
   enableNanCheck_ = getCvarBool(TORCH_XCCL_NAN_CHECK, false);
   init();
   const std::string OFF = "OFF";
   std::string torch_distributed_debug =
       getCvarString({"TORCH_DISTRIBUTED_DEBUG"}, OFF.c_str());
-  const auto XcclVersion = getXcclVersion();
   LOG(INFO) << logPrefix() << "ProcessGroupXCCL initialization options: "
             << "size: " << size << ", global rank: " << rank_;
@@ -353,9 +380,63 @@ ProcessGroupXCCL::ProcessGroupXCCL(
             << ", TORCH_XCCL_BLOCKING_WAIT: " << blockingWait_
             << ", TORCH_DISTRIBUTED_DEBUG: " << torch_distributed_debug
             << ", TORCH_XCCL_NAN_CHECK: " << enableNanCheck_;
+
+  // The heartbeat monitor thread dumps debug info on a write to the pipe
+  heartbeatMonitor_ = std::make_unique<HeartbeatMonitorXCCL>(this);
+  heartbeatMonitor_->start();
+}
+
+ProcessGroupXCCL::~ProcessGroupXCCL() {
+  heartbeatMonitor_->stop();
+  // Wait for all threads to finish before returning
+  heartbeatMonitor_->join();
 }

-ProcessGroupXCCL::~ProcessGroupXCCL() = default;
+bool ProcessGroupXCCL::dumpDebuggingInfo(bool includeStackTrace /*=true*/) {
+  STATIC_SCOPED_WAIT_COUNTER(pytorch.ProcessGroupXCCL__dumpDebuggingInfo);
+  LOG(ERROR)
+      << logPrefix()
+      << "ProcessGroupXCCL preparing to dump debug info. Include stack trace: "
+      << includeStackTrace;
+  if (traceBufferSize_ > 0) {
+    auto xcclTrace = dump_xccl_trace(true, includeStackTrace, false);
+    DebugInfoWriter& writer = DebugInfoWriter::getWriter(rank_);
+    LOG(INFO) << logPrefix() << "ProcessGroupXCCL dumping xccl trace to "
+              << writer.getWriterTarget();
+    writer.write(xcclTrace);
+    LOG(INFO) << logPrefix() << "Flight Recorder trace successfully dumped.";
+    return true;
+  }
+  return false;
+}
+
+const std::vector<uint64_t>& ProcessGroupXCCL::groupRanks() const {
+  if (options_->global_ranks_in_group.empty() && local_id_ == 0) {
+    static std::vector<uint64_t> globalRanks(size_);
+    std::iota(globalRanks.begin(), globalRanks.end(), 0);
+    return globalRanks;
+  }
+  return options_->global_ranks_in_group;
+}
+
+void ProcessGroupXCCL::setEnqueuedPgStatus(
+    c10::intrusive_ptr<ProcessGroupXCCL::WorkXCCL> work) {
+  pgStatus_->lastEnqueuedSeq = static_cast<int64_t>(work->getSequencenumber());
+  pgStatus_->lastEnqueuedWorkName = opTypeToString(work->opType_);
+  pgStatus_->lastEnqueuedNumelIn = work->numelIn_;
+  pgStatus_->lastEnqueuedNumelOut = work->numelOut_;
+}
+
+void ProcessGroupXCCL::setCompletedPgStatus(
+    c10::intrusive_ptr<ProcessGroupXCCL::WorkXCCL> work) {
+  pgStatus_->lastCompletedSeq = static_cast<int64_t>(work->getSequencenumber());
+  pgStatus_->lastCompletedWorkName = opTypeToString(work->opType_);
+  pgStatus_->lastCompletedNumelIn = work->numelIn_;
+  pgStatus_->lastCompletedNumelOut = work->numelOut_;
+  // To avoid complexity, we're not computing duration.
+  FlightRecorderXCCL::get()->retire_id(
+      work->trace_id_, /*compute_duration=*/false);
+}

 void ProcessGroupXCCL::setSequenceNumberForGroup() {}

@@ -384,6 +465,21 @@ c10::intrusive_ptr<ProcessGroupXCCL::WorkXCCL> ProcessGroupXCCL::initWork(
       profilingTitle,
       profilingTitle != nullptr
           ? std::optional<std::vector<at::Tensor>>(inputs)
           : std::nullopt);
+
+  r->trace_id_ = FlightRecorderXCCL::get()->record(
+      local_id_,
+      std::make_tuple(pg_uid_, pg_desc_), // PG name tuple
+      seqCollective_,
+      seqP2P_,
+      op_id_,
+      profilingTitle ? profilingTitle : "",
+      inputs,
+      outputs,
+      nullptr,
+      r->xcclEndEvent_.get(),
+      options_->timeout,
+      pgStatus_,
+      isP2P);
   return r;
 }

@@ -538,6 +634,7 @@ c10::intrusive_ptr<Work> ProcessGroupXCCL::endCoalescing(OpType optype) {
   groupEnd();

   work->xcclEndEvent_->record(stream);
+  setEnqueuedPgStatus(work);

   coalescing_state_ = 0;
   coalescedComm_ = nullptr;
@@ -572,6 +669,7 @@ c10::intrusive_ptr<Work> ProcessGroupXCCL::collective(
   if ((coalescing_state_ & CoalColl) == 0) {
     seqCollective_++;
   }
+  op_id_++;
   coalescing_state_ |= CoalColl;
   if (coalescedDevice_.index() < 0) {
     coalescedDevice_ = device;
@@ -614,6 +712,22 @@ c10::intrusive_ptr<Work> ProcessGroupXCCL::collective(
   c10::intrusive_ptr<ProcessGroupXCCL::WorkXCCL> work;
   work = initWork(device, rank_, opType, false, profilingTitle, inputs, outputs);

+  if (coalescing_state_) {
+    FlightRecorderXCCL::get()->record(
+        local_id_,
+        std::make_tuple(pg_uid_, pg_desc_), // PG name tuple
+        seqCollective_,
+        seqP2P_,
+        op_id_,
+        profilingTitle ? profilingTitle : "",
+        inputs,
+        outputs,
+        nullptr,
+        nullptr,
+        options_->timeout,
+        pgStatus_,
+        false);
+  }
   work->outputs_ = std::make_shared<std::vector<at::Tensor>>(outputs);

@@ -653,8 +767,22 @@ c10::intrusive_ptr<Work> ProcessGroupXCCL::collective(
   work->future_ = c10::make_intrusive<at::ivalue::Future>(
       c10::ListType::create(c10::TensorType::get()), devices);
   work->future_->markCompleted(at::IValue(*work->outputs_));
+  work->future_->addCallback(
+      [this, work](at::ivalue::Future&) {
+        this->setCompletedPgStatus(work);
+      });
   work->blockingWait_ = blockingWait_;
+  work->numelIn_ = 0;
+  work->numelOut_ = 0;
+  for (const auto& input : inputs) {
+    work->numelIn_ += input.numel();
+  }
+  for (const auto& output : outputs) {
+    work->numelOut_ += output.numel();
+  }
+  setEnqueuedPgStatus(work);
+
   return asyncOp ? work : nullptr;
 }

@@ -687,6 +815,7 @@ c10::intrusive_ptr<Work> ProcessGroupXCCL::pointToPoint(
     }
   }

+  op_id_++;
   auto comm = getXCCLComm(key, device, opType, p2pRank, isSendRecvSelf);

   if (coalescing_state_ & CoalActive) {
@@ -722,6 +851,21 @@ c10::intrusive_ptr<Work> ProcessGroupXCCL::pointToPoint(
     work->outputs_ = std::make_shared<std::vector<at::Tensor>>();
     work->outputs_->push_back(tensor);

+    work->trace_id_ = FlightRecorderXCCL::get()->record(
+        local_id_,
+        std::make_tuple(pg_uid_, pg_desc_), // PG name tuple
+        seqCollective_,
+        seqP2P_,
+        op_id_,
+        profilingTitle,
+        {tensor},
+        {tensor},
+        nullptr,
+        work->xcclEndEvent_.get(),
+        options_->timeout,
+        pgStatus_,
+        true);
+
     c10::OptionalDeviceGuard gpuGuard(device);

     c10::xpu::XPUCachingAllocator::recordStream(
@@ -737,8 +881,29 @@ c10::intrusive_ptr<Work> ProcessGroupXCCL::pointToPoint(
     work->future_ = c10::make_intrusive<at::ivalue::Future>(
         c10::ListType::create(c10::TensorType::get()), devices);
     work->future_->markCompleted(at::IValue(*work->outputs_));
+    work->future_->addCallback(
+        [this, work](at::ivalue::Future&) {
+          this->setCompletedPgStatus(work);
+        });
+
+    work->numelIn_ = work->numelOut_ = tensor.numel();
+    setEnqueuedPgStatus(work);

     return work;
   } else {
+    FlightRecorderXCCL::get()->record(
+        local_id_,
+        std::make_tuple(pg_uid_, pg_desc_), // PG name tuple
+        seqCollective_,
+        seqP2P_,
+        op_id_,
+        profilingTitle,
+        {tensor},
+        {tensor},
+        nullptr,
+        nullptr,
+        options_->timeout,
+        pgStatus_,
+        true);
     c10::OptionalDeviceGuard gpuGuard(device);

     c10::xpu::XPUCachingAllocator::recordStream(
@@ -2135,6 +2300,14 @@ c10::intrusive_ptr<Work> ProcessGroupXCCL::alltoall(
       "xccl:all_to_all");
 }

+std::string getXcclVersion() {
+  auto xccl_version = ccl::get_library_version();
+  std::string versionString = std::to_string(xccl_version.major) + "." +
+      std::to_string(xccl_version.minor) + "." +
+      std::to_string(xccl_version.update);
+  return versionString;
+}
+
 } // namespace c10d

 #endif // USE_C10D_XCCL
diff --git a/src/xccl/ProcessGroupXCCL.hpp b/src/xccl/ProcessGroupXCCL.hpp
index 919f3809e..42c199735 100644
--- a/src/xccl/ProcessGroupXCCL.hpp
+++ b/src/xccl/ProcessGroupXCCL.hpp
@@ -19,13 +19,19 @@
 #include <torch/csrc/distributed/c10d/Backend.hpp>
 #include <torch/csrc/distributed/c10d/PrefixStore.hpp>
 #include <torch/csrc/distributed/c10d/Store.hpp>
+#include <torch/csrc/distributed/c10d/FlightRecorder.hpp>
 #include <torch/csrc/distributed/c10d/logger.hpp>
+#include <xccl/ProcessGroupXCCLMonitor.hpp>

 namespace c10d {
 static std::vector<std::string> TORCH_XCCL_BLOCKING_WAIT = {
     "TORCH_XCCL_BLOCKING_WAIT",
     "XCCL_BLOCKING_WAIT"};

+static std::vector<std::string> TORCH_XCCL_COORD_CHECK_MILSEC = {
+    "TORCH_XCCL_COORD_CHECK_MILSEC",
+    "XCCL_COORD_CHECK_MILSEC"};
+
 using xcclComm_t = ccl::communicator;

 static std::vector<std::string> TORCH_XCCL_NAN_CHECK = {"TORCH_XCCL_NAN_CHECK"};
@@ -100,6 +106,9 @@ class TORCH_API ProcessGroupXCCL : public Backend {
     std::chrono::time_point<std::chrono::steady_clock> workStartTime_;
     uint64_t seq_;
     bool isP2P_;
+    std::optional<uint64_t> trace_id_;
+    size_t numelIn_ = -1;
+    size_t numelOut_ = -1;

    private:
     std::shared_ptr<std::vector<at::Tensor>> outputs_;
@@ -108,7 +117,22 @@ class TORCH_API ProcessGroupXCCL : public Backend {
     friend class ProcessGroupXCCL;
   };

-  ProcessGroupXCCL(const c10::intrusive_ptr<Store>& store, int rank, int size);
+  struct Options : public Backend::Options {
+    explicit Options();
+
+    static c10::intrusive_ptr<Options> create() {
+      return c10::make_intrusive<Options>();
+    }
+
+    std::vector<uint64_t> global_ranks_in_group;
+    std::string group_name;
+  };
+
+  ProcessGroupXCCL(
+      const c10::intrusive_ptr<Store>& store,
+      int rank,
+      int size,
+      c10::intrusive_ptr<Options> options = Options::create());

   C10_DEPRECATED ProcessGroupXCCL(
       const c10::intrusive_ptr<Store>& store,
@@ -389,6 +413,11 @@ class TORCH_API ProcessGroupXCCL : public Backend {

   c10::DeviceIndex guessDeviceId() const;

+  const std::vector<uint64_t>& groupRanks() const;
+  void setEnqueuedPgStatus(c10::intrusive_ptr<ProcessGroupXCCL::WorkXCCL> work);
+  void setCompletedPgStatus(c10::intrusive_ptr<ProcessGroupXCCL::WorkXCCL> work);
+  bool dumpDebuggingInfo(bool includeStackTrace = true);
+
  protected:
   std::unordered_map<std::string, at::xpu::XPUStream> xcclStreamsMap_;

@@ -407,10 +436,18 @@ class TORCH_API ProcessGroupXCCL : public Backend {
   static thread_local uint64_t xcclActiveGroupCounter_;
   uint64_t seqCollective_{0};
   uint64_t seqP2P_{0};
+  uint64_t op_id_{0};
   size_t local_id_;
   std::string logPrefix_;
+  const c10::intrusive_ptr<Options> options_;
+  std::shared_ptr<ProcessGroupStatus> pgStatus_ =
+      std::make_shared<ProcessGroupStatus>();
+  std::unique_ptr<HeartbeatMonitorXCCL> heartbeatMonitor_;
+  int traceBufferSize_;
   bool enableNanCheck_;

+  friend class HeartbeatMonitorXCCL;
+
  private:
   std::mutex kvs_mutex;

@@ -448,18 +485,18 @@ class TORCH_API ProcessGroupXCCL : public Backend {
     return kvs;
   }
 };
+
+// Dumps the comm traces and additional information about the ProcessGroup.
+TORCH_API std::string dump_xccl_trace(
+    bool includeCollectives,
+    bool includeStackTraces,
+    bool onlyActive);
+
+TORCH_API std::string getXcclVersion();
 } // namespace c10d

 namespace {

-inline std::string getXcclVersion() {
-  auto xccl_version = ccl::get_library_version();
-  std::string versionString = std::to_string(xccl_version.major) + "." +
-      std::to_string(xccl_version.minor) + "." +
-      std::to_string(xccl_version.update);
-  return versionString;
-}
-
 inline std::string reduceOpToString(c10d::ReduceOp op) {
   switch (op) {
     case c10d::ReduceOp::SUM:
diff --git a/src/xccl/ProcessGroupXCCLMonitor.cpp b/src/xccl/ProcessGroupXCCLMonitor.cpp
new file mode 100644
index 000000000..cefc6d402
--- /dev/null
+++ b/src/xccl/ProcessGroupXCCLMonitor.cpp
@@ -0,0 +1,66 @@
+#ifdef USE_C10D_XCCL
+
+#include <xccl/ProcessGroupXCCLMonitor.hpp>
+#include <xccl/ProcessGroupXCCL.hpp>
+namespace c10d {
+
+HeartbeatMonitorXCCL::HeartbeatMonitorXCCL(ProcessGroupXCCL* pg) {
+  pg_ = pg;
+  coordCheckIntervalMilSec_ = getCvarInt(TORCH_XCCL_COORD_CHECK_MILSEC, 1000);
+  LOG(INFO)
+      << pg_->logPrefix() << "HeartbeatMonitor environments: "
+      << "TORCH_XCCL_COORD_CHECK_MILSEC: " << coordCheckIntervalMilSec_;
+}
+
+void HeartbeatMonitorXCCL::stop() {
+  terminateHeartbeatMonitorThread_.store(true);
+  monitorWakeUpCV_.notify_one();
+}
+
+void HeartbeatMonitorXCCL::start() {
+  TORCH_CHECK(
+      !xcclHeartbeatMonitorThread_.joinable(),
+      "HeartbeatMonitor thread already started");
+  xcclHeartbeatMonitorThread_ =
+      std::thread(&HeartbeatMonitorXCCL::runLoop, this);
+}
+
+void HeartbeatMonitorXCCL::join() {
+  if (xcclHeartbeatMonitorThread_.joinable()) {
+    xcclHeartbeatMonitorThread_.join();
+    LOG(INFO) << pg_->logPrefix()
+              << "ProcessGroupXCCL heartbeat monitor thread joined.";
+  }
+}
+
+void HeartbeatMonitorXCCL::runLoop() {
+  c10::setThreadName("pt_xccl_heartbt");
+
+  std::optional<DumpPipe> dumpPipe = std::nullopt;
+  // Only one dump per process is needed; the first PG (local_id_ == 0) owns it
+  if (pg_->local_id_ == 0) {
+    // DumpPipe is one per-trainer process
+    dumpPipe.emplace(pg_->getRank());
+    while (true) {
+      std::unique_lock<std::mutex> lock(monitorMutex_);
+      if (monitorWakeUpCV_.wait_for(
+              lock, std::chrono::milliseconds(coordCheckIntervalMilSec_), [&] {
+                return terminateHeartbeatMonitorThread_.load();
+              })) {
+        return;
+      }
+      // An external write to this rank's pipe file triggers a debug-info dump
+      if (dumpPipe.has_value() && dumpPipe->shouldDump()) {
+        LOG(INFO) << pg_->logPrefix()
+                  << "Dump signal received through pipe, triggering FR dump.";
+        std::future<bool> fut = std::async(std::launch::async, [this]() {
+          return this->pg_->dumpDebuggingInfo();
+        });
+      }
+    }
+  }
+}
+
+} // namespace c10d
+
+#endif // USE_C10D_XCCL
diff --git a/src/xccl/ProcessGroupXCCLMonitor.hpp b/src/xccl/ProcessGroupXCCLMonitor.hpp
new file mode 100644
index 000000000..8924c4e43
--- /dev/null
+++ b/src/xccl/ProcessGroupXCCLMonitor.hpp
@@ -0,0 +1,93 @@
+#pragma once
+
+#include <fcntl.h>
+#include <sys/stat.h>
+#include <unistd.h>
+#include <torch/csrc/distributed/c10d/Utils.hpp>
+
+#ifdef USE_C10D_XCCL
+namespace c10d {
+
+// This definition will later be moved to a common header shared by the
+// NCCL/Gloo/XCCL ProcessGroups
+#if defined(__linux__)
+struct DumpPipe {
+  DumpPipe(int rank) {
+    std::string fileStem =
+        getCvarString({"TORCH_FR_DEBUG_INFO_PIPE_FILE"}, "");
+    if (fileStem.empty() ||
+        getCvarInt({"TORCH_FR_BUFFER_SIZE"}, 0) <= 0) {
+      return;
+    }
+    TORCH_CHECK(!fileStem.empty(), "TORCH_FR_DEBUG_INFO_PIPE_FILE is empty");
+    std::string filename = c10::str(fileStem, rank, ".pipe");
+    TORCH_CHECK(
+        unlink(filename.c_str()) != -1 || errno == ENOENT,
+        "Error removing existing named pipe ",
+        filename,
+        ", Error: ",
+        std::strerror(errno));
+    TORCH_CHECK(
+        mkfifo(filename.c_str(), 0666) != -1,
+        "Error creating named pipe ",
+        filename,
+        ", Error: ",
+        std::strerror(errno));
+    fd_ = open(filename.c_str(), O_RDONLY | O_NONBLOCK);
+    LOG(INFO) << "Pipe file " << filename
+              << " has been opened; write to it to trigger a ProcessGroup debug dump.";
+    TORCH_CHECK(fd_ != -1, "Error opening named pipe ", filename);
+  }
+  bool shouldDump() {
+    if (fd_ == -1) {
+      return false;
+    }
+    // NOLINTNEXTLINE(*array*)
+    char buf[128]{};
+    // Non-blocking read (O_NONBLOCK above). EINTR is ignored because we
+    // will poll again later.
+    ssize_t bytesRead = read(fd_, &buf, 128);
+    return bytesRead > 0;
+  }
+  ~DumpPipe() {
+    if (fd_ != -1) {
+      close(fd_);
+    }
+  }
+
+ private:
+  int fd_ = -1;
+};
+#else
+struct DumpPipe {
+  DumpPipe(int rank) {}
+  bool shouldDump() {
+    return false;
+  }
+};
+#endif
+
+class ProcessGroupXCCL;
+class HeartbeatMonitorXCCL {
+ public:
+  HeartbeatMonitorXCCL(ProcessGroupXCCL* pg);
+  virtual ~HeartbeatMonitorXCCL() = default;
+
+  std::string getXCCLTimeoutErrorMsg(const std::string& extraMsg);
+  void start();
+  void join();
+  virtual void runLoop();
+  void stop();
+
+ protected:
+  ProcessGroupXCCL* pg_;
+
+ private:
+  int coordCheckIntervalMilSec_;
+  std::condition_variable monitorWakeUpCV_;
+  std::mutex monitorMutex_;
+  std::thread xcclHeartbeatMonitorThread_;
+  std::atomic<bool> terminateHeartbeatMonitorThread_{false};
+};
+} // namespace c10d
+#endif // USE_C10D_XCCL