From 26a4542d1bbeed712f89090e276bd0cac8edb2dc Mon Sep 17 00:00:00 2001 From: David Sidler Date: Tue, 29 Oct 2024 13:12:35 -0700 Subject: [PATCH] Add simple allgather example; fix initialization of output buffer --- .../mscclang/mscclpp/simple/allgather_ring.py | 70 +++++++++++++++++++ msccl/language/collectives.py | 4 +- msccl/language/mscclpp/ir.py | 2 +- 3 files changed, 73 insertions(+), 3 deletions(-) create mode 100644 examples/mscclang/mscclpp/simple/allgather_ring.py diff --git a/examples/mscclang/mscclpp/simple/allgather_ring.py b/examples/mscclang/mscclpp/simple/allgather_ring.py new file mode 100644 index 0000000..7023542 --- /dev/null +++ b/examples/mscclang/mscclpp/simple/allgather_ring.py @@ -0,0 +1,70 @@ +# Copyright (c) 2024 Advanced Micro Devices. +# Licensed under the MIT License. + +import argparse +from msccl.language import * +from msccl.topologies import * +from msccl.language.collectives import AllGather + +# Implementation of ring-based AllGather where data is being pushed using put() +def allgather_ring_push(size, in_place): + topology = fully_connected(size) + collective = AllGather(size, 1, in_place) + with MSCCLPPProgram(f"allgather_ring_push", topology, collective, 1): + # If not in-place copy local data chunk to output buffer + if not in_place: + for rank in range(0, size): + c = chunk(rank, Buffer.input, 0) + c.copy(rank, Buffer.output, rank) + # Iterate over steps + for step in range(0, size - 1): + for rank in range(0, size): + # Put & Signal + index = (rank - step) % size + c = chunk(rank, Buffer.output, index) + next_rank = (rank + 1) % size + c.put(next_rank, Buffer.output, index, sendtb=0) # TODO how does this guarantee that the buffer is ready? + c.signal(next_rank, Buffer.output, index, 0) + # Wait + prev_rank = (rank - 1) % size + recv_index = (rank - step - 1) % size + c = chunk(rank, Buffer.output, recv_index) + c.wait(prev_rank, Buffer.output, recv_index, 0) + Json() + Check() + +# Implementation of ring-based AllGather where data is being pulled using get() +def allgather_ring_pull(size, in_place): + topology = fully_connected(size) + collective = AllGather(size, 1, in_place) + with MSCCLPPProgram(f"allgather_ring_pull", topology, collective, 1): + # If not in-place copy local data chunk to output buffer + if not in_place: + for rank in range(0, size): + c = chunk(rank, Buffer.input, 0) + c.copy(rank, Buffer.output, rank) + # Iterate over steps + for step in range(0, size - 1): # size - 1): + for rank in range(0, size): + # Signal + index = (rank - step) % size + c = chunk(rank, Buffer.output, index) + next_rank = (rank + 1) % size + c.signal(next_rank, Buffer.output, index, 0) + # Wait & Get + prev_rank = (rank - 1) % size + recv_index = (rank - step - 1) % size + c = chunk(rank, Buffer.output, recv_index) + c.wait(prev_rank, Buffer.output, recv_index, 0) + c.get(prev_rank, Buffer.output, recv_index, recvtb=0) + Json() + Check() + +parser = argparse.ArgumentParser() +parser.add_argument('num_gpus', type=int, help ='number of gpus') +parser.add_argument('--in-place', type=bool, default=True, help='Do collective in-place?') + +args = parser.parse_args() + +# allgather_ring_push(args.num_gpus, args.in_place) +allgather_ring_pull(args.num_gpus, args.in_place) \ No newline at end of file diff --git a/msccl/language/collectives.py b/msccl/language/collectives.py index 6dc20cc..d588957 100755 --- a/msccl/language/collectives.py +++ b/msccl/language/collectives.py @@ -72,7 +72,7 @@ def init_buffers(self): if self.inplace: # Inplace AllGather only uses the output buffer for r in range(self.num_ranks): - output_buffer = [None] * (self.num_ranks * self.chunk_factor) + output_buffer = [Chunk(-1, -1, -1, -1)] * (self.num_ranks * self.chunk_factor) for ch in range(self.chunk_factor): output_buffer[r * self.chunk_factor + ch] = Chunk(r, ch, -1, r * self.chunk_factor + ch) buffers = { @@ -83,7 +83,7 @@ def init_buffers(self): else: for r in range(self.num_ranks): input_buffer = [None] * self.chunk_factor - output_buffer = [None] * (self.num_ranks * self.chunk_factor) + output_buffer = [Chunk(-1, -1, -1, -1)] * (self.num_ranks * self.chunk_factor) for ch in range(self.chunk_factor): input_buffer[ch] = Chunk(r, ch, -1, r * self.chunk_factor + ch) buffers = {Buffer.input: input_buffer, Buffer.output: output_buffer} diff --git a/msccl/language/mscclpp/ir.py b/msccl/language/mscclpp/ir.py index f5ba0fd..6c5c435 100644 --- a/msccl/language/mscclpp/ir.py +++ b/msccl/language/mscclpp/ir.py @@ -315,7 +315,7 @@ def remove_empty_fields(d): gpus.append(gpu_instance) obj = { "name": program.name, - "colletive": program.collective, + "collective": program.collective, "protocol": program.protocol, "inplace": program.inplace, "gpus": gpus,