Conversation

@wujingyue (Collaborator)

No description provided.

@wujingyue (Collaborator, Author)

!test

@github-actions

github-actions bot commented Jan 2, 2026

Review updated until commit b4adbf6

Description

  • Simplify getCUDAStream implementation using try_emplace and cleaner null checks

  • Add new AssignStreams pass for stream-parallel loop management

  • Remove HostIrLowering knob from StreamTest and move end-to-end tests to Python

  • Clean up test parameters by removing large memory test cases

Changes walkthrough

Relevant files

Enhancement (5 files)

  • assign_streams.cpp: New pass for assigning streams to stream-parallel loops (+64/-0)
  • evaluator.cpp: Simplify getCUDAStream with try_emplace and cleaner logic (+7/-14)
  • ir.cpp: Change Stream::toString to print pointer address (+1/-1)
  • passes.cpp: Add AssignStreams pass to runPasses pipeline (+2/-0)
  • assign_streams.h: Header file for new AssignStreams pass (+26/-0)

Cleanup (3 files)

  • allocate_and_deallocate.h: Remove unnecessary include of container.h (+0/-1)
  • ir.h: Remove scheduler/heuristic.h include (+0/-1)
  • communicator.h: Remove exceptions.h include (+0/-1)

Documentation (2 files)

  • internal_nodes.h: Add comment for Scope::insert method (+1/-1)
  • benchmark_utils.py: Update profiling command documentation (+4/-5)

Configuration changes (1 file)

  • CMakeLists.txt: Add new assign_streams.cpp and reorder source files (+2/-1)

Tests (4 files)

  • test_multidevice_lower_communication_cuda.cpp: Remove large memory test cases (128MB, 256MB) (+1/-3)
  • test_stream.cpp: Remove HostIrLowering knob and add explanatory comment (+4/-6)
  • test_stream.py: Remove nvfuser_direct_test parameter from test functions (+3/-3)
  • test_overlap.py: Add reference implementation and benchmark tests for row-parallel linear (+165/-25)

PR Reviewer Guide

Here are some key observations to aid the review process:

🧪 PR contains tests
⚡ Recommended focus areas for review
Stream Creation Simplification

The getCUDAStream method has been significantly simplified by replacing manual lookup-and-insert logic with try_emplace. While this is generally an improvement, verify that the new logic handles all edge cases, in particular streams whose index cannot be evaluated and repeated lookups of the same stream key.

c10::cuda::CUDAStream HostIrEvaluator::getCUDAStream(Stream* stream) {
  StreamKey stream_key = stream;
  // if stream points to an index, it represents the dynamic value of that index
  if (Val* index = stream->index(); index != nullptr) {
    auto value = expr_evaluator_.evaluate(index);
    NVF_ERROR(value.hasValue() && value.is<int64_t>());
    stream_key = value.as<int64_t>();
  }

  auto [it, inserted] =
      streams_.try_emplace(stream_key, c10::cuda::getStreamFromPool());
  return it->second;
}
Communicator Availability Check Removal

The PR removes the communicator_->is_available() check that was previously used when determining the device index. The null case still falls back to device 0, but the change assumes deviceId() is safe to call on a communicator that exists but is unavailable. Validate this assumption and ensure no runtime issues arise from this change.

      my_local_device_index_(
          communicator_ == nullptr ? 0 : communicator_->local_rank()),
      ipc_handle_cache_(expr_evaluator_),
      multicast_handle_cache_() {
  const DeviceIdxType device_index =
      communicator_ == nullptr ? 0 : communicator_->deviceId();
  if (isDebugDumpEnabled(DebugDumpOption::HostIr) && device_index == 0) {
New Stream Assignment Logic

A new AssignStreams pass has been added that modifies the host IR by inserting stream management operations. While this appears to be part of the stream parallelism feature, ensure the logic correctly handles stream synchronization and doesn't introduce race conditions or performance regressions.

void AssignStreams::runPass(Fusion* fusion) {
  auto* hic = dynamic_cast<HostIrContainer*>(fusion);
  NVF_CHECK(hic != nullptr);
  FusionGuard fg(hic);

  for (auto it = hic->topLevel().exprs().begin();
       it != hic->topLevel().exprs().end();) {
    auto next_it = std::next(it);

    auto* for_loop = dynamic_cast<ForLoop*>(*it);
    if (for_loop == nullptr) {
      it = next_it;
      continue;
    }

    // We should check that the loop is stream-parallel. This is not necessary
    // at this moment because all loops are stream-parallel. It is also hard
    // to do because hir::ForLoop doesn't point to the source IterDomain.

    auto* get_current_stream = IrBuilder::create<GetCurrentStream>();
    Stream* main_stream = get_current_stream->stream();
    hic->topLevel().insert(it, get_current_stream);

    // At the beginning of each iteration: set stream and synchronize with main
    // stream
    auto* worker_stream = IrBuilder::create<Stream>(for_loop->index());
    auto* set_stream = IrBuilder::create<SetCurrentStream>(worker_stream);
    auto* sync_main = IrBuilder::create<Synchronize>(main_stream);
    auto old_begin = for_loop->body().exprs().begin();
    for_loop->body().insert(old_begin, set_stream);
    for_loop->body().insert(old_begin, sync_main);

    // After the loop: create a joining loop to synchronize all worker streams
    hic->topLevel().insert(
        next_it, IrBuilder::create<SetCurrentStream>(main_stream));
    auto* join_loop = IrBuilder::create<ForLoop>(
        for_loop->index(), for_loop->start(), for_loop->stop());
    hic->topLevel().insert(next_it, join_loop);

    // In the joining loop: synchronize each worker stream
    auto* join_worker_stream = IrBuilder::create<Stream>(join_loop->index());
    auto* sync_worker = IrBuilder::create<Synchronize>(join_worker_stream);
    join_loop->body().push_back(sync_worker);

    it = next_it;
  }
}

@wujingyue wujingyue closed this Jan 5, 2026