
Conversation

frost-intel (Contributor)

As a follow-up to #1867, this PR adds tests for the FlightRecorder on XCCL and moves some definitions from ProcessGroupXCCL::Options to Backend::Options.

These tests are largely based on pytorch/test/distributed/test_c10d_nccl.py, but omit the following tests:

  • test_short_json, since JSON dumps are not supported in ProcessGroupXCCL
  • test_trace_while_all_works_retired, since _wait_for_pending_works isn't supported by XCCL
  • test_trace_while_active, since XCCL hangs when an op is called on only one rank
  • test_trace_while_stuck, since XCCL hangs when an op is called on only one rank
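For reference, a minimal sketch of the test pattern these are based on, using the NCCL dump entry point from test_c10d_nccl.py (the XCCL equivalent isn't spelled out in this thread, and entry field names may vary across PyTorch versions):

```python
# Sketch only: mirrors the flight-recorder test pattern in
# pytorch/test/distributed/test_c10d_nccl.py; the XCCL suite is assumed
# to follow the same shape with an XCCL-specific dump entry point.
import pickle

import torch

# Flight recording must be enabled before the process group is created,
# e.g. os.environ["TORCH_NCCL_TRACE_BUFFER_SIZE"] = "1000"


def dump_trace_entries():
    """Deserialize the flight-recorder ring buffer (NCCL entry point shown)."""
    trace = pickle.loads(torch._C._distributed_c10d._dump_nccl_trace())
    return trace["entries"]


def summarize(entries):
    """Print the collective name, sequence id and state recorded per entry."""
    for e in entries:
        print(e["profiling_name"], e["collective_seq_id"], e["state"])
```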

frost-intel marked this pull request as ready for review August 27, 2025 20:36
Copilot AI review requested due to automatic review settings August 28, 2025 12:32
Copilot AI left a comment

Pull Request Overview

This PR adds FlightRecorder tests for XCCL (Intel XPU Collective Communications Library) as a follow-up to #1867. The tests validate flight recording functionality for distributed operations on Intel XPU devices, including trace dumping, timing, and various collective operations.

  • Adds comprehensive test suite for XCCL FlightRecorder functionality based on NCCL tests
  • Moves global_ranks_in_group and group_name from ProcessGroupXCCL::Options to Backend::Options
  • Adds conditional recording parameter to initWork method to control when flight recording occurs

Reviewed Changes

Copilot reviewed 3 out of 3 changed files in this pull request and generated 5 comments.

  • test/xpu/distributed/test_c10d_xccl.py: adds XCCLTraceTestBase and XCCLTraceTest classes with comprehensive flight recorder tests
  • src/xccl/ProcessGroupXCCL.hpp: removes group-specific options and adds a record parameter to the initWork method
  • src/xccl/ProcessGroupXCCL.cpp: implements conditional flight recording and fixes the sequence counting logic


self._verify_trace(
    t,
    include_collectives=include_collectives,
    is_json=True,
Copilot AI Aug 28, 2025

The is_json parameter is set to True but the test is using pickle format, not JSON. This should be False since pickle.loads() is used to deserialize the trace data.

Suggested change
is_json=True,
is_json=False,
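For context, a minimal sketch of the two parse paths the is_json flag selects between, shown with the NCCL dump entry points these tests are modeled on (the XCCL names may differ; the JSON entry point is the one ProcessGroupXCCL does not provide, per the PR description):

```python
import json
import pickle

import torch


def load_trace(is_json: bool):
    # Sketch only: entry points are the NCCL ones from test_c10d_nccl.py;
    # the XCCL suite is assumed to expose an equivalent pickle-based dump.
    if is_json:
        # JSON dumps come from a dedicated entry point, which ProcessGroupXCCL
        # does not support (hence test_short_json is skipped in this PR).
        return json.loads(torch._C._distributed_c10d._dump_nccl_trace_json())
    # The default dump is a pickled dict, so it must be parsed with pickle,
    # which is why is_json should be False in the flagged test.
    return pickle.loads(torch._C._distributed_c10d._dump_nccl_trace())
```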


Comment on lines +1100 to +1107
output_tensor = torch.zeros(sum_len, 2).to(self.rank)
expected_tensor = torch.ones(sum_len, 2).to(self.rank)
input_tensor = torch.ones(output_split_sizes[self.rank], 2).to(self.rank)

dist.all_gather(
    list(torch.split(output_tensor, output_split_sizes)), input_tensor
)
torch.xpu.synchronize(device=self.rank)
Copilot AI Aug 28, 2025

Using self.rank as device argument to .to() is incorrect. It should use self.local_device to properly specify the XPU device, similar to other tests in this file.

Suggested change
-output_tensor = torch.zeros(sum_len, 2).to(self.rank)
-expected_tensor = torch.ones(sum_len, 2).to(self.rank)
-input_tensor = torch.ones(output_split_sizes[self.rank], 2).to(self.rank)
-dist.all_gather(
-    list(torch.split(output_tensor, output_split_sizes)), input_tensor
-)
-torch.xpu.synchronize(device=self.rank)
+output_tensor = torch.zeros(sum_len, 2).to(self.local_device)
+expected_tensor = torch.ones(sum_len, 2).to(self.local_device)
+input_tensor = torch.ones(output_split_sizes[self.rank], 2).to(self.local_device)
+dist.all_gather(
+    list(torch.split(output_tensor, output_split_sizes)), input_tensor
+)
+torch.xpu.synchronize(device=self.local_device)
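A hypothetical illustration of the distinction behind this suggestion: the global rank is not, in general, a valid local device index, so tensors should be placed on an explicit local device. The helper name and mapping below are illustrative only, not taken from the PR:

```python
import torch


def local_device_for(rank: int, devices_per_node: int) -> torch.device:
    # Illustrative mapping only; the test base class is assumed to compute
    # self.local_device along similar lines.
    return torch.device("xpu", rank % devices_per_node)


# Global rank 3 on a node with 2 XPUs maps to xpu:1; passing the raw rank (3)
# as a device index would point past the devices that actually exist.
assert local_device_for(3, 2) == torch.device("xpu", 1)
```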


dist.all_gather(
    list(torch.split(output_tensor, output_split_sizes)), input_tensor
)
torch.xpu.synchronize(device=self.rank)
Copilot AI Aug 28, 2025

Using self.rank as device argument is incorrect. It should use self.local_device to properly specify the XPU device for synchronization.

Suggested change
torch.xpu.synchronize(device=self.rank)
torch.xpu.synchronize(device=self.local_device)


Comment on lines +1147 to +1148
output_tensors = torch.zeros(2, 2).to(self.rank)
input_tensors = [torch.ones(2, 2).to(self.rank) for _ in range(self.world_size)]
Copilot AI Aug 28, 2025

Using self.rank as device argument to .to() is incorrect. It should use self.local_device to properly specify the XPU device.

Suggested change
output_tensors = torch.zeros(2, 2).to(self.rank)
input_tensors = [torch.ones(2, 2).to(self.rank) for _ in range(self.world_size)]
output_tensors = torch.zeros(2, 2).to(self.local_device)
input_tensors = [torch.ones(2, 2).to(self.local_device) for _ in range(self.world_size)]


dist.reduce_scatter_tensor(output_tensors[i], input_tensors[i])
self.assertEqual(output_tensors, input_tensors[self.rank] * self.world_size)

torch.xpu.synchronize(device=self.rank)
Copilot AI Aug 28, 2025

Using self.rank as device argument is incorrect. It should use self.local_device to properly specify the XPU device for synchronization.

Suggested change
torch.xpu.synchronize(device=self.rank)
torch.xpu.synchronize(device=self.local_device)


@@ -171,7 +168,8 @@ class TORCH_API ProcessGroupXCCL : public Backend {
       bool isP2P,
       const char* profilingTitle = nullptr,
       const std::vector<at::Tensor>& inputs = {},
-      const std::vector<at::Tensor>& outputs = {});
+      const std::vector<at::Tensor>& outputs = {},
+      bool record = false);

Does it follow the same logic as the NCCL backend?

frost-intel (Contributor, Author)

Yes, NCCL uses the same logic. In some cases the work created by initWork needs to be recorded, but in other cases recording it causes improper access in the FlightRecorder.

auto device = inputs[0].device();
const auto key = std::to_string(device.index());
auto comm = getXCCLComm(key, device, opType);

if (!coalescing_state_) {


@Chao1Han Please help check why this condition was not needed previously.

Contributor

frost's modification is correct; this is indeed a bug. The community fixed it ten months ago, but since we had never dumped seqCollective_, we never noticed it.

Chao1Han (Contributor) commented Aug 29, 2025

Hi @frost-intel, could you apt install clang-format and format ProcessGroupXCCL.cpp/ProcessGroupXCCL.hpp before pushing the code? That would avoid periodic code-cleanup PRs like https://github.com/intel/torch-xpu-ops/pull/1960/files.
