From 1442fb35ed9f32e8ceca1517beb80e4242f67b69 Mon Sep 17 00:00:00 2001 From: rohanbabbar04 Date: Tue, 17 Feb 2026 23:40:16 +0530 Subject: [PATCH 01/13] Add MPI_Allgatherv --- pylops_mpi/Distributed.py | 5 ++--- pylops_mpi/signalprocessing/Fredholm1.py | 8 ++------ pylops_mpi/utils/_mpi.py | 17 ++++++++++++----- 3 files changed, 16 insertions(+), 14 deletions(-) diff --git a/pylops_mpi/Distributed.py b/pylops_mpi/Distributed.py index 569b6aa4..d81eb6c5 100644 --- a/pylops_mpi/Distributed.py +++ b/pylops_mpi/Distributed.py @@ -3,8 +3,8 @@ from mpi4py import MPI from pylops.utils import NDArray from pylops.utils import deps as pylops_deps # avoid namespace crashes with pylops_mpi.utils -from pylops_mpi.utils._mpi import (mpi_allreduce, mpi_allgather, mpi_bcast, mpi_send, mpi_recv, mpi_sendrecv, - _prepare_allgather_inputs, _unroll_allgather_recv) +from pylops_mpi.utils._mpi import mpi_allreduce, mpi_allgather, mpi_bcast, mpi_send, mpi_recv +from pylops_mpi.utils._common import _prepare_allgather_inputs, _unroll_allgather_recv from pylops_mpi.utils import deps cupy_message = pylops_deps.cupy_import("the DistributedArray module") @@ -32,7 +32,6 @@ class DistributedMixIn: MPI installation is not available). 
""" - def _allreduce(self, base_comm: MPI.Comm, base_comm_nccl: NcclCommunicatorType, diff --git a/pylops_mpi/signalprocessing/Fredholm1.py b/pylops_mpi/signalprocessing/Fredholm1.py index 2969e3c9..e70ce7cc 100644 --- a/pylops_mpi/signalprocessing/Fredholm1.py +++ b/pylops_mpi/signalprocessing/Fredholm1.py @@ -132,7 +132,7 @@ def _matvec(self, x: DistributedArray) -> DistributedArray: engine=y.engine)).ravel() return y - def _rmatvec(self, x: NDArray) -> NDArray: + def _rmatvec(self, x: DistributedArray) -> DistributedArray: ncp = get_module(x.engine) if x.partition not in [Partition.BROADCAST, Partition.UNSAFE_BROADCAST]: raise ValueError(f"x should have partition={Partition.BROADCAST},{Partition.UNSAFE_BROADCAST}" @@ -151,11 +151,7 @@ def _rmatvec(self, x: NDArray) -> NDArray: if hasattr(self, "GT"): y1 = ncp.matmul(self.GT, x) else: - y1 = ( - ncp.matmul(x.transpose(0, 2, 1).conj(), self.G) - .transpose(0, 2, 1) - .conj() - ) + y1 = ncp.matmul(self.G.transpose(0, 2, 1).conj(), x) else: y1 = ncp.squeeze(ncp.zeros((self.nsls[self.rank], self.ny, self.nz), dtype=self.dtype)) if hasattr(self, "GT"): diff --git a/pylops_mpi/utils/_mpi.py b/pylops_mpi/utils/_mpi.py index ad702d73..af157a2c 100644 --- a/pylops_mpi/utils/_mpi.py +++ b/pylops_mpi/utils/_mpi.py @@ -13,7 +13,6 @@ from pylops.utils import NDArray from pylops.utils.backend import get_module from pylops_mpi.utils import deps -from pylops_mpi.utils._common import _prepare_allgather_inputs, _unroll_allgather_recv def mpi_allgather(base_comm: MPI.Comm, @@ -45,11 +44,19 @@ def mpi_allgather(base_comm: MPI.Comm, """ if deps.cuda_aware_mpi_enabled or engine == "numpy": + ncp = get_module(engine) send_shapes = base_comm.allgather(send_buf.shape) - (padded_send, padded_recv) = _prepare_allgather_inputs(send_buf, send_shapes, engine=engine) - recv_buffer_to_use = recv_buf if recv_buf else padded_recv - _mpi_calls(base_comm, "Allgather", padded_send, recv_buffer_to_use, engine=engine) - return 
_unroll_allgather_recv(recv_buffer_to_use, padded_send.shape, send_shapes) + recvcounts = base_comm.allgather(send_buf.size) + recv_buf = recv_buf if recv_buf else ncp.zeros(sum(recvcounts), dtype=send_buf.dtype) + displs = [0] + for i in range(1, len(recvcounts)): + displs.append(displs[i - 1] + recvcounts[i - 1]) + _mpi_calls(base_comm, "Allgatherv", send_buf, + [recv_buf, recvcounts, list(displs), MPI._typedict[send_buf.dtype.char]], engine=engine) + return [ + recv_buf[displs[i]:displs[i] + recvcounts[i]].reshape(send_shapes[i]) + for i in range(base_comm.size) + ] else: # CuPy with non-CUDA-aware MPI if recv_buf is None: From ebcbe8fe803389fa15cde8b921ec136949c1aaa6 Mon Sep 17 00:00:00 2001 From: rohanbabbar04 Date: Wed, 18 Feb 2026 11:26:36 +0530 Subject: [PATCH 02/13] Add MPI_Allgather for uniform and MPI_Allgatherv for variable length arrays --- pylops_mpi/signalprocessing/Fredholm1.py | 6 +++++- pylops_mpi/utils/_mpi.py | 7 +++++-- 2 files changed, 10 insertions(+), 3 deletions(-) diff --git a/pylops_mpi/signalprocessing/Fredholm1.py b/pylops_mpi/signalprocessing/Fredholm1.py index e70ce7cc..ccd6fdd2 100644 --- a/pylops_mpi/signalprocessing/Fredholm1.py +++ b/pylops_mpi/signalprocessing/Fredholm1.py @@ -151,7 +151,11 @@ def _rmatvec(self, x: DistributedArray) -> DistributedArray: if hasattr(self, "GT"): y1 = ncp.matmul(self.GT, x) else: - y1 = ncp.matmul(self.G.transpose(0, 2, 1).conj(), x) + y1 = ( + ncp.matmul(x.transpose(0, 2, 1).conj(), self.G) + .transpose(0, 2, 1) + .conj() + ) else: y1 = ncp.squeeze(ncp.zeros((self.nsls[self.rank], self.ny, self.nz), dtype=self.dtype)) if hasattr(self, "GT"): diff --git a/pylops_mpi/utils/_mpi.py b/pylops_mpi/utils/_mpi.py index af157a2c..d160a352 100644 --- a/pylops_mpi/utils/_mpi.py +++ b/pylops_mpi/utils/_mpi.py @@ -48,11 +48,14 @@ def mpi_allgather(base_comm: MPI.Comm, send_shapes = base_comm.allgather(send_buf.shape) recvcounts = base_comm.allgather(send_buf.size) recv_buf = recv_buf if recv_buf else 
ncp.zeros(sum(recvcounts), dtype=send_buf.dtype) + if len(set(send_shapes)) == 1: + _mpi_calls(base_comm, "Allgather", send_buf.copy(), recv_buf, engine=engine) + return [chunk.reshape(send_shapes[0]) for chunk in ncp.split(recv_buf, base_comm.size)] displs = [0] for i in range(1, len(recvcounts)): displs.append(displs[i - 1] + recvcounts[i - 1]) - _mpi_calls(base_comm, "Allgatherv", send_buf, - [recv_buf, recvcounts, list(displs), MPI._typedict[send_buf.dtype.char]], engine=engine) + _mpi_calls(base_comm, "Allgatherv", send_buf.copy(), + [recv_buf, recvcounts, displs, MPI._typedict[send_buf.dtype.char]], engine=engine) return [ recv_buf[displs[i]:displs[i] + recvcounts[i]].reshape(send_shapes[i]) for i in range(base_comm.size) From b62fcbe537d6d2a3b304bba0ad9a76390866c87c Mon Sep 17 00:00:00 2001 From: rohanbabbar04 Date: Fri, 13 Mar 2026 23:22:13 +0530 Subject: [PATCH 03/13] Add ncp.ascontiguousarray in allgather --- pylops_mpi/utils/_mpi.py | 30 ++++++++++++++++-------------- 1 file changed, 16 insertions(+), 14 deletions(-) diff --git a/pylops_mpi/utils/_mpi.py b/pylops_mpi/utils/_mpi.py index d160a352..ffb0585f 100644 --- a/pylops_mpi/utils/_mpi.py +++ b/pylops_mpi/utils/_mpi.py @@ -7,7 +7,7 @@ "mpi_sendrecv" ] -from typing import Optional +from typing import List, Optional from mpi4py import MPI from pylops.utils import NDArray @@ -19,7 +19,7 @@ def mpi_allgather(base_comm: MPI.Comm, send_buf: NDArray, recv_buf: Optional[NDArray] = None, engine: str = "numpy", - ) -> NDArray: + ) -> List[NDArray]: """MPI_Allallgather/allallgather Dispatch allgather routine based on type of input and availability of @@ -39,8 +39,8 @@ def mpi_allgather(base_comm: MPI.Comm, Returns ------- - recv_buf : :obj:`numpy.ndarray` or :obj:`cupy.ndarray` - A buffer containing the gathered data from all ranks. + recv_buf : :obj:`list` + A list of arrays containing the gathered data from all ranks. 
""" if deps.cuda_aware_mpi_enabled or engine == "numpy": @@ -49,17 +49,19 @@ def mpi_allgather(base_comm: MPI.Comm, recvcounts = base_comm.allgather(send_buf.size) recv_buf = recv_buf if recv_buf else ncp.zeros(sum(recvcounts), dtype=send_buf.dtype) if len(set(send_shapes)) == 1: - _mpi_calls(base_comm, "Allgather", send_buf.copy(), recv_buf, engine=engine) + _mpi_calls(base_comm, "Allgather", ncp.ascontiguousarray(send_buf), recv_buf, engine=engine) return [chunk.reshape(send_shapes[0]) for chunk in ncp.split(recv_buf, base_comm.size)] - displs = [0] - for i in range(1, len(recvcounts)): - displs.append(displs[i - 1] + recvcounts[i - 1]) - _mpi_calls(base_comm, "Allgatherv", send_buf.copy(), - [recv_buf, recvcounts, displs, MPI._typedict[send_buf.dtype.char]], engine=engine) - return [ - recv_buf[displs[i]:displs[i] + recvcounts[i]].reshape(send_shapes[i]) - for i in range(base_comm.size) - ] + else: + # displs represent the starting offsets in recv_buf where data from each rank will be placed + displs = [0] + for i in range(1, len(recvcounts)): + displs.append(displs[i - 1] + recvcounts[i - 1]) + _mpi_calls(base_comm, "Allgatherv", ncp.ascontiguousarray(send_buf), + [recv_buf, recvcounts, displs, MPI._typedict[send_buf.dtype.char]], engine=engine) + return [ + recv_buf[displs[i]:displs[i] + recvcounts[i]].reshape(send_shapes[i]) + for i in range(base_comm.size) + ] else: # CuPy with non-CUDA-aware MPI if recv_buf is None: From 2f1225ad4ef95dc327031ce7d5f59b01449d77e3 Mon Sep 17 00:00:00 2001 From: rohanbabbar04 Date: Mon, 16 Mar 2026 20:00:09 +0530 Subject: [PATCH 04/13] Update docstring --- pylops_mpi/utils/_mpi.py | 10 +++++++--- 1 file changed, 7 insertions(+), 3 deletions(-) diff --git a/pylops_mpi/utils/_mpi.py b/pylops_mpi/utils/_mpi.py index ffb0585f..0bdd54fd 100644 --- a/pylops_mpi/utils/_mpi.py +++ b/pylops_mpi/utils/_mpi.py @@ -20,10 +20,14 @@ def mpi_allgather(base_comm: MPI.Comm, recv_buf: Optional[NDArray] = None, engine: str = "numpy", ) -> 
List[NDArray]: - """MPI_Allallgather/allallgather + """MPI_Allgather/allgather - Dispatch allgather routine based on type of input and availability of - CUDA-Aware MPI + Dispatch the appropriate allgather routine based on buffer sizes and + CUDA-aware MPI availability. + + If all ranks provide buffers of equal size, the standard `Allgather` + collective is used. Otherwise, `Allgatherv` is invoked to handle + variable-sized buffers. Parameters ---------- From f5093bd6ae4c6dadf76af2d1512d07e7271c1d6d Mon Sep 17 00:00:00 2001 From: rohanbabbar04 Date: Mon, 16 Mar 2026 20:07:09 +0530 Subject: [PATCH 05/13] Minor Fix --- pylops_mpi/Distributed.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pylops_mpi/Distributed.py b/pylops_mpi/Distributed.py index d81eb6c5..eda684b4 100644 --- a/pylops_mpi/Distributed.py +++ b/pylops_mpi/Distributed.py @@ -3,7 +3,7 @@ from mpi4py import MPI from pylops.utils import NDArray from pylops.utils import deps as pylops_deps # avoid namespace crashes with pylops_mpi.utils -from pylops_mpi.utils._mpi import mpi_allreduce, mpi_allgather, mpi_bcast, mpi_send, mpi_recv +from pylops_mpi.utils._mpi import mpi_allreduce, mpi_allgather, mpi_bcast, mpi_send, mpi_recv, mpi_sendrecv from pylops_mpi.utils._common import _prepare_allgather_inputs, _unroll_allgather_recv from pylops_mpi.utils import deps From 6ecbfdc0e2f97f94dc70c053acd2d284498a79c9 Mon Sep 17 00:00:00 2001 From: rohanbabbar04 Date: Wed, 18 Mar 2026 23:57:09 +0530 Subject: [PATCH 06/13] Add _prepare_allgather_inputs_mpi and update unroll gather recv --- pylops_mpi/Distributed.py | 4 +- pylops_mpi/utils/_common.py | 82 +++++++++++++++++++++++++++++-------- pylops_mpi/utils/_mpi.py | 18 +++----- pylops_mpi/utils/_nccl.py | 2 +- 4 files changed, 72 insertions(+), 34 deletions(-) diff --git a/pylops_mpi/Distributed.py b/pylops_mpi/Distributed.py index eda684b4..e444fb5c 100644 --- a/pylops_mpi/Distributed.py +++ b/pylops_mpi/Distributed.py @@ -146,7 +146,7 @@ def 
_allgather(self, send_shapes = base_comm.allgather(send_buf.shape) (padded_send, padded_recv) = _prepare_allgather_inputs(send_buf, send_shapes, engine="cupy") raw_recv = nccl_allgather(base_comm_nccl, padded_send, recv_buf if recv_buf else padded_recv) - return _unroll_allgather_recv(raw_recv, padded_send.shape, send_shapes) + return _unroll_allgather_recv(raw_recv, send_shapes, padded_send.shape, engine="cupy") else: if isinstance(send_buf, (tuple, list, int)): return base_comm.allgather(send_buf) @@ -188,7 +188,7 @@ def _allgather_subcomm(self, send_shapes = sub_comm._allgather_subcomm(send_buf.shape) (padded_send, padded_recv) = _prepare_allgather_inputs(send_buf, send_shapes, engine="cupy") raw_recv = nccl_allgather(sub_comm, padded_send, recv_buf if recv_buf else padded_recv) - return _unroll_allgather_recv(raw_recv, padded_send.shape, send_shapes) + return _unroll_allgather_recv(raw_recv, send_shapes, padded_send.shape, engine="cupy") else: return mpi_allgather(sub_comm, send_buf, recv_buf, engine) diff --git a/pylops_mpi/utils/_common.py b/pylops_mpi/utils/_common.py index 895265df..cab4681d 100644 --- a/pylops_mpi/utils/_common.py +++ b/pylops_mpi/utils/_common.py @@ -52,8 +52,44 @@ def _prepare_allgather_inputs(send_buf, send_buf_shapes, engine): return send_buf, recv_buf -def _unroll_allgather_recv(recv_buf, padded_send_buf_shape, send_buf_shapes) -> list: - r"""Unrolll recv_buf after Buffered Allgather (MPI and NCCL) +def _prepare_allgather_inputs_mpi(send_buf, send_buf_shapes, recvcounts, engine): + r""" + Prepare send_buf and recv_buf for MPI allgather (mpi_allgather) + + Parameters + ---------- + send_buf : :obj: `numpy.ndarray` or `cupy.ndarray` or array-like + The data buffer to be sent for allgather. 
+ send_buf_shapes: :obj:`list` + A list of shapes for each send_buf (used to calculate padding size) + recvcounts: :obj:`list` + The element counts per rank in mpi_allgather + engine : :obj:`str` + Engine used to store array (``numpy`` or ``cupy``) + + Returns + ------- + send_buf: :obj: `numpy.ndarray` or `cupy.ndarray` or array-like + A buffer containing the data and padded elements to be sent by this rank. + recv_buf : :obj: `numpy.ndarray` or `cupy.ndarray` or array-like + A buffer to gather data from all ranks. + displs : list, optional + The starting offsets in recv_buf where data from each rank in mpi_allgather + """ + ncp = get_module(engine) + recv_buf = ncp.zeros(sum(recvcounts), dtype=send_buf.dtype) + if len(set(send_buf_shapes)) == 1: + displs = None + else: + displs = [0] + for i in range(1, len(recvcounts)): + displs.append(displs[i - 1] + recvcounts[i - 1]) + return ncp.ascontiguousarray(send_buf), recv_buf, displs + + +def _unroll_allgather_recv(recv_buf, send_buf_shapes, padded_send_buf_shape=None, + recvcounts=None, displs=None, engine="numpy") -> list: + r"""Unroll recv_buf after Buffered Allgather (MPI and NCCL) Remove the padded elements in recv_buff, extract an individual array from each device and return them as a list of arrays Each GPU may send array with a different shape, so the return type has to be a list of array @@ -63,27 +99,37 @@ def _unroll_allgather_recv(recv_buf, padded_send_buf_shape, send_buf_shapes) -> ---------- recv_buf: :obj:`cupy.ndarray` or array-like The data buffer returned from nccl_allgather call - padded_send_buf_shape: :obj:`tuple`:int - The size of send_buf after padding used in nccl_allgather send_buf_shapes: :obj:`list` A list of original shapes for each GPU send_buf prior to padding - + padded_send_buf_shape : tuple, optional + The size of send_buf after padding used in nccl_allgather + recvcounts : list, optional + The element counts per rank in mpi_allgather + displs : list, optional + The starting offsets in 
recv_buf where data from each rank in mpi_allgather + engine : :obj:`str` + Engine used to store array (``numpy`` or ``cupy``) Returns ------- chunks: :obj:`list` A list of `cupy.ndarray` from each GPU with the padded element removed """ + ncp = get_module(engine) ndev = len(send_buf_shapes) - # extract an individual array from each device - chunk_size = np.prod(padded_send_buf_shape) - chunks = [ - recv_buf[i * chunk_size:(i + 1) * chunk_size] for i in range(ndev) - ] - - # Remove padding from each array: the padded value may appear somewhere - # in the middle of the flat array and thus the reshape and slicing for each dimension is required - for i in range(ndev): - slicing = tuple(slice(0, end) for end in send_buf_shapes[i]) - chunks[i] = chunks[i].reshape(padded_send_buf_shape)[slicing] - - return chunks + if padded_send_buf_shape is not None: + chunk_size = int(np.prod(padded_send_buf_shape)) + chunks = [ + recv_buf[i * chunk_size:(i + 1) * chunk_size] + for i in range(ndev) + ] + for i in range(ndev): + slicing = tuple(slice(0, end) for end in send_buf_shapes[i]) + chunks[i] = chunks[i].reshape(padded_send_buf_shape)[slicing] + return chunks + if displs is not None: + return [ + recv_buf[displs[i]:displs[i] + recvcounts[i]].reshape(send_buf_shapes[i]) + for i in range(ndev) + ] + chunks = ncp.split(recv_buf, ndev) + return [chunk.reshape(send_buf_shapes[0]) for chunk in chunks] diff --git a/pylops_mpi/utils/_mpi.py b/pylops_mpi/utils/_mpi.py index 0bdd54fd..680e2d04 100644 --- a/pylops_mpi/utils/_mpi.py +++ b/pylops_mpi/utils/_mpi.py @@ -13,6 +13,7 @@ from pylops.utils import NDArray from pylops.utils.backend import get_module from pylops_mpi.utils import deps +from pylops_mpi.utils._common import _unroll_allgather_recv, _prepare_allgather_inputs_mpi def mpi_allgather(base_comm: MPI.Comm, @@ -48,24 +49,15 @@ def mpi_allgather(base_comm: MPI.Comm, """ if deps.cuda_aware_mpi_enabled or engine == "numpy": - ncp = get_module(engine) send_shapes = 
base_comm.allgather(send_buf.shape) recvcounts = base_comm.allgather(send_buf.size) - recv_buf = recv_buf if recv_buf else ncp.zeros(sum(recvcounts), dtype=send_buf.dtype) + send_buf, recv_buf, displs = _prepare_allgather_inputs_mpi(send_buf, send_shapes, recvcounts, engine) if len(set(send_shapes)) == 1: - _mpi_calls(base_comm, "Allgather", ncp.ascontiguousarray(send_buf), recv_buf, engine=engine) - return [chunk.reshape(send_shapes[0]) for chunk in ncp.split(recv_buf, base_comm.size)] + _mpi_calls(base_comm, "Allgather", send_buf, recv_buf, engine=engine) else: - # displs represent the starting offsets in recv_buf where data from each rank will be placed - displs = [0] - for i in range(1, len(recvcounts)): - displs.append(displs[i - 1] + recvcounts[i - 1]) - _mpi_calls(base_comm, "Allgatherv", ncp.ascontiguousarray(send_buf), + _mpi_calls(base_comm, "Allgatherv", send_buf, [recv_buf, recvcounts, displs, MPI._typedict[send_buf.dtype.char]], engine=engine) - return [ - recv_buf[displs[i]:displs[i] + recvcounts[i]].reshape(send_shapes[i]) - for i in range(base_comm.size) - ] + return _unroll_allgather_recv(recv_buf, send_shapes, recvcounts=recvcounts, displs=displs, engine=engine) else: # CuPy with non-CUDA-aware MPI if recv_buf is None: diff --git a/pylops_mpi/utils/_nccl.py b/pylops_mpi/utils/_nccl.py index 11b374c7..d61e45ae 100644 --- a/pylops_mpi/utils/_nccl.py +++ b/pylops_mpi/utils/_nccl.py @@ -284,7 +284,7 @@ def nccl_asarray(nccl_comm, local_array, local_shapes, axis) -> cp.ndarray: send_buf, recv_buf = _prepare_allgather_inputs(local_array, local_shapes, engine="cupy") nccl_allgather(nccl_comm, send_buf, recv_buf) - chunks = _unroll_allgather_recv(recv_buf, send_buf.shape, local_shapes) + chunks = _unroll_allgather_recv(recv_buf, local_shapes, send_buf.shape, engine="cupy") # combine back to single global array return cp.concatenate(chunks, axis=axis) From 935059464a9ca5f007971f506187391986c9ce2e Mon Sep 17 00:00:00 2001 From: rohanbabbar04 Date: Thu, 19 
Mar 2026 19:54:22 +0530 Subject: [PATCH 07/13] Change _prepare_allgather_inputs to _prepare_allgather_inputs_nccl --- pylops_mpi/Distributed.py | 6 +++--- pylops_mpi/utils/_common.py | 5 +++-- pylops_mpi/utils/_nccl.py | 4 ++-- tests_nccl/test_ncclutils_nccl.py | 4 ++-- 4 files changed, 10 insertions(+), 9 deletions(-) diff --git a/pylops_mpi/Distributed.py b/pylops_mpi/Distributed.py index e444fb5c..e4f37507 100644 --- a/pylops_mpi/Distributed.py +++ b/pylops_mpi/Distributed.py @@ -4,7 +4,7 @@ from pylops.utils import NDArray from pylops.utils import deps as pylops_deps # avoid namespace crashes with pylops_mpi.utils from pylops_mpi.utils._mpi import mpi_allreduce, mpi_allgather, mpi_bcast, mpi_send, mpi_recv, mpi_sendrecv -from pylops_mpi.utils._common import _prepare_allgather_inputs, _unroll_allgather_recv +from pylops_mpi.utils._common import _prepare_allgather_inputs_nccl, _unroll_allgather_recv from pylops_mpi.utils import deps cupy_message = pylops_deps.cupy_import("the DistributedArray module") @@ -144,7 +144,7 @@ def _allgather(self, return nccl_allgather(base_comm_nccl, send_buf, recv_buf) else: send_shapes = base_comm.allgather(send_buf.shape) - (padded_send, padded_recv) = _prepare_allgather_inputs(send_buf, send_shapes, engine="cupy") + (padded_send, padded_recv) = _prepare_allgather_inputs_nccl(send_buf, send_shapes, engine="cupy") raw_recv = nccl_allgather(base_comm_nccl, padded_send, recv_buf if recv_buf else padded_recv) return _unroll_allgather_recv(raw_recv, send_shapes, padded_send.shape, engine="cupy") else: @@ -186,7 +186,7 @@ def _allgather_subcomm(self, return nccl_allgather(sub_comm, send_buf, recv_buf) else: send_shapes = sub_comm._allgather_subcomm(send_buf.shape) - (padded_send, padded_recv) = _prepare_allgather_inputs(send_buf, send_shapes, engine="cupy") + (padded_send, padded_recv) = _prepare_allgather_inputs_nccl(send_buf, send_shapes, engine="cupy") raw_recv = nccl_allgather(sub_comm, padded_send, recv_buf if recv_buf else 
padded_recv) return _unroll_allgather_recv(raw_recv, send_shapes, padded_send.shape, engine="cupy") else: diff --git a/pylops_mpi/utils/_common.py b/pylops_mpi/utils/_common.py index cab4681d..3446daa2 100644 --- a/pylops_mpi/utils/_common.py +++ b/pylops_mpi/utils/_common.py @@ -1,5 +1,6 @@ __all__ = [ - "_prepare_allgather_inputs", + "_prepare_allgather_inputs_nccl", + "_prepare_allgather_inputs_mpi", "_unroll_allgather_recv" ] @@ -9,7 +10,7 @@ # TODO: return type annotation for both cupy and numpy -def _prepare_allgather_inputs(send_buf, send_buf_shapes, engine): +def _prepare_allgather_inputs_nccl(send_buf, send_buf_shapes, engine): r""" Prepare send_buf and recv_buf for NCCL allgather (nccl_allgather) Buffered Allgather (MPI and NCCL) requires the sending buffer to have the same size for every device. diff --git a/pylops_mpi/utils/_nccl.py b/pylops_mpi/utils/_nccl.py index d61e45ae..5b9f49fd 100644 --- a/pylops_mpi/utils/_nccl.py +++ b/pylops_mpi/utils/_nccl.py @@ -16,7 +16,7 @@ import os import cupy as cp import cupy.cuda.nccl as nccl -from pylops_mpi.utils._common import _prepare_allgather_inputs, _unroll_allgather_recv +from pylops_mpi.utils._common import _prepare_allgather_inputs_nccl, _unroll_allgather_recv cupy_to_nccl_dtype = { "float32": nccl.NCCL_FLOAT32, @@ -282,7 +282,7 @@ def nccl_asarray(nccl_comm, local_array, local_shapes, axis) -> cp.ndarray: Global array gathered from all GPUs and concatenated along `axis`. 
""" - send_buf, recv_buf = _prepare_allgather_inputs(local_array, local_shapes, engine="cupy") + send_buf, recv_buf = _prepare_allgather_inputs_nccl(local_array, local_shapes, engine="cupy") nccl_allgather(nccl_comm, send_buf, recv_buf) chunks = _unroll_allgather_recv(recv_buf, local_shapes, send_buf.shape, engine="cupy") diff --git a/tests_nccl/test_ncclutils_nccl.py b/tests_nccl/test_ncclutils_nccl.py index 27cd221d..0ab29d6a 100644 --- a/tests_nccl/test_ncclutils_nccl.py +++ b/tests_nccl/test_ncclutils_nccl.py @@ -9,7 +9,7 @@ import pytest from pylops_mpi.utils._nccl import initialize_nccl_comm, nccl_allgather -from pylops_mpi.utils._common import _prepare_allgather_inputs, _unroll_allgather_recv +from pylops_mpi.utils._common import _prepare_allgather_inputs_nccl, _unroll_allgather_recv from pylops_mpi.utils.deps import nccl_enabled np.random.seed(42) @@ -90,7 +90,7 @@ def test_allgather_differentsize_withrecbuf(par): # Gathered array send_shapes = MPI.COMM_WORLD.allgather(local_array.shape) - send_buf, recv_buf = _prepare_allgather_inputs(local_array, send_shapes, engine="cupy") + send_buf, recv_buf = _prepare_allgather_inputs_nccl(local_array, send_shapes, engine="cupy") recv_buf = nccl_allgather(nccl_comm, send_buf, recv_buf) chunks = _unroll_allgather_recv(recv_buf, send_buf.shape, send_shapes) gathered_array = cp.concatenate(chunks) From ffcfcfac168c9bcf3ce9b3fd6ea05370155b3e80 Mon Sep 17 00:00:00 2001 From: rohanbabbar04 Date: Sat, 21 Mar 2026 13:57:17 +0530 Subject: [PATCH 08/13] Minor fix in tests_ncclutils --- tests_nccl/test_ncclutils_nccl.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests_nccl/test_ncclutils_nccl.py b/tests_nccl/test_ncclutils_nccl.py index 0ab29d6a..a796aea4 100644 --- a/tests_nccl/test_ncclutils_nccl.py +++ b/tests_nccl/test_ncclutils_nccl.py @@ -92,7 +92,7 @@ def test_allgather_differentsize_withrecbuf(par): send_shapes = MPI.COMM_WORLD.allgather(local_array.shape) send_buf, recv_buf = 
_prepare_allgather_inputs_nccl(local_array, send_shapes, engine="cupy") recv_buf = nccl_allgather(nccl_comm, send_buf, recv_buf) - chunks = _unroll_allgather_recv(recv_buf, send_buf.shape, send_shapes) + chunks = _unroll_allgather_recv(recv_buf, send_shapes, send_buf.shape, engine="cupy") gathered_array = cp.concatenate(chunks) # Compare with global array created in rank0 From 811506de166453016099b712b5f1188412d748bc Mon Sep 17 00:00:00 2001 From: Rohan Babbar Date: Sat, 21 Mar 2026 14:56:55 +0530 Subject: [PATCH 09/13] Remove int() in chunk size calculation --- pylops_mpi/utils/_common.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/pylops_mpi/utils/_common.py b/pylops_mpi/utils/_common.py index 3446daa2..3ce975eb 100644 --- a/pylops_mpi/utils/_common.py +++ b/pylops_mpi/utils/_common.py @@ -115,10 +115,9 @@ def _unroll_allgather_recv(recv_buf, send_buf_shapes, padded_send_buf_shape=None chunks: :obj:`list` A list of `cupy.ndarray` from each GPU with the padded element removed """ - ncp = get_module(engine) ndev = len(send_buf_shapes) if padded_send_buf_shape is not None: - chunk_size = int(np.prod(padded_send_buf_shape)) + chunk_size = np.prod(padded_send_buf_shape) chunks = [ recv_buf[i * chunk_size:(i + 1) * chunk_size] for i in range(ndev) @@ -132,5 +131,6 @@ def _unroll_allgather_recv(recv_buf, send_buf_shapes, padded_send_buf_shape=None recv_buf[displs[i]:displs[i] + recvcounts[i]].reshape(send_buf_shapes[i]) for i in range(ndev) ] + ncp = get_module(engine) chunks = ncp.split(recv_buf, ndev) return [chunk.reshape(send_buf_shapes[0]) for chunk in chunks] From 175a396b61993f425ed08fb7d84e2856212587ae Mon Sep 17 00:00:00 2001 From: rohanbabbar04 Date: Sun, 22 Mar 2026 16:16:04 +0530 Subject: [PATCH 10/13] Add comments and improve functions --- pylops_mpi/Distributed.py | 8 +- pylops_mpi/utils/_common.py | 138 +++++++----------------------- pylops_mpi/utils/_mpi.py | 51 +++++++++-- pylops_mpi/utils/_nccl.py | 49 ++++++++++- 
tests_nccl/test_ncclutils_nccl.py | 6 +- 5 files changed, 129 insertions(+), 123 deletions(-) diff --git a/pylops_mpi/Distributed.py b/pylops_mpi/Distributed.py index e4f37507..7ce69d01 100644 --- a/pylops_mpi/Distributed.py +++ b/pylops_mpi/Distributed.py @@ -4,7 +4,7 @@ from pylops.utils import NDArray from pylops.utils import deps as pylops_deps # avoid namespace crashes with pylops_mpi.utils from pylops_mpi.utils._mpi import mpi_allreduce, mpi_allgather, mpi_bcast, mpi_send, mpi_recv, mpi_sendrecv -from pylops_mpi.utils._common import _prepare_allgather_inputs_nccl, _unroll_allgather_recv +from pylops_mpi.utils._common import _unroll_allgather_recv from pylops_mpi.utils import deps cupy_message = pylops_deps.cupy_import("the DistributedArray module") @@ -12,7 +12,7 @@ if nccl_message is None and cupy_message is None: from pylops_mpi.utils._nccl import ( - nccl_allgather, nccl_allreduce, nccl_bcast, nccl_send, nccl_recv, nccl_sendrecv + nccl_allgather, nccl_allreduce, nccl_bcast, nccl_send, nccl_recv, nccl_sendrecv, _prepare_allgather_inputs_nccl ) from cupy.cuda.nccl import NcclCommunicator else: @@ -146,7 +146,7 @@ def _allgather(self, send_shapes = base_comm.allgather(send_buf.shape) (padded_send, padded_recv) = _prepare_allgather_inputs_nccl(send_buf, send_shapes, engine="cupy") raw_recv = nccl_allgather(base_comm_nccl, padded_send, recv_buf if recv_buf else padded_recv) - return _unroll_allgather_recv(raw_recv, send_shapes, padded_send.shape, engine="cupy") + return _unroll_allgather_recv(raw_recv, padded_send.shape, send_shapes) else: if isinstance(send_buf, (tuple, list, int)): return base_comm.allgather(send_buf) @@ -188,7 +188,7 @@ def _allgather_subcomm(self, send_shapes = sub_comm._allgather_subcomm(send_buf.shape) (padded_send, padded_recv) = _prepare_allgather_inputs_nccl(send_buf, send_shapes, engine="cupy") raw_recv = nccl_allgather(sub_comm, padded_send, recv_buf if recv_buf else padded_recv) - return _unroll_allgather_recv(raw_recv, send_shapes, 
padded_send.shape, engine="cupy") + return _unroll_allgather_recv(raw_recv, padded_send.shape, send_shapes) else: return mpi_allgather(sub_comm, send_buf, recv_buf, engine) diff --git a/pylops_mpi/utils/_common.py b/pylops_mpi/utils/_common.py index 3ce975eb..c14c5e99 100644 --- a/pylops_mpi/utils/_common.py +++ b/pylops_mpi/utils/_common.py @@ -1,136 +1,56 @@ __all__ = [ - "_prepare_allgather_inputs_nccl", - "_prepare_allgather_inputs_mpi", "_unroll_allgather_recv" ] - import numpy as np -from pylops.utils.backend import get_module - - -# TODO: return type annotation for both cupy and numpy -def _prepare_allgather_inputs_nccl(send_buf, send_buf_shapes, engine): - r""" Prepare send_buf and recv_buf for NCCL allgather (nccl_allgather) - Buffered Allgather (MPI and NCCL) requires the sending buffer to have the same size for every device. - Therefore, padding is required when the array is not evenly partitioned across - all the ranks. The padding is applied such that the each dimension of the sending buffers - is equal to the max size of that dimension across all ranks. - Similarly, each receiver buffer (recv_buf) is created with size equal to :math:n_rank \cdot send_buf.size - - Parameters - ---------- - send_buf : :obj: `numpy.ndarray` or `cupy.ndarray` or array-like - The data buffer from the local GPU to be sent for allgather. - send_buf_shapes: :obj:`list` - A list of shapes for each GPU send_buf (used to calculate padding size) - engine : :obj:`str` - Engine used to store array (``numpy`` or ``cupy``) - - Returns - ------- - send_buf: :obj:`cupy.ndarray` - A buffer containing the data and padded elements to be sent by this rank. - recv_buf : :obj:`cupy.ndarray` - An empty, padded buffer to gather data from all GPUs. 
- """ - ncp = get_module(engine) - sizes_each_dim = list(zip(*send_buf_shapes)) - send_shape = tuple(map(max, sizes_each_dim)) - pad_size = [ - (0, s_shape - l_shape) for s_shape, l_shape in zip(send_shape, send_buf.shape) - ] - - send_buf = ncp.pad( - send_buf, pad_size, mode="constant", constant_values=0 - ) - - ndev = len(send_buf_shapes) - recv_buf = ncp.zeros(ndev * send_buf.size, dtype=send_buf.dtype) - - return send_buf, recv_buf - - -def _prepare_allgather_inputs_mpi(send_buf, send_buf_shapes, recvcounts, engine): - r""" - Prepare send_buf and recv_buf for MPI allgather (mpi_allgather) - - Parameters - ---------- - send_buf : :obj: `numpy.ndarray` or `cupy.ndarray` or array-like - The data buffer to be sent for allgather. - send_buf_shapes: :obj:`list` - A list of shapes for each send_buf (used to calculate padding size) - recvcounts: :obj:`list` - The element counts per rank in mpi_allgather - engine : :obj:`str` - Engine used to store array (``numpy`` or ``cupy``) - - Returns - ------- - send_buf: :obj: `numpy.ndarray` or `cupy.ndarray` or array-like - A buffer containing the data and padded elements to be sent by this rank. - recv_buf : :obj: `numpy.ndarray` or `cupy.ndarray` or array-like - A buffer to gather data from all ranks. 
- displs : list, optional
-        The starting offsets in recv_buf where data from each rank in mpi_allgather
-    """
-    ncp = get_module(engine)
-    recv_buf = ncp.zeros(sum(recvcounts), dtype=send_buf.dtype)
-    if len(set(send_buf_shapes)) == 1:
-        displs = None
-    else:
-        displs = [0]
-        for i in range(1, len(recvcounts)):
-            displs.append(displs[i - 1] + recvcounts[i - 1])
-    return ncp.ascontiguousarray(send_buf), recv_buf, displs
-
-
-def _unroll_allgather_recv(recv_buf, send_buf_shapes, padded_send_buf_shape=None,
-                           recvcounts=None, displs=None, engine="numpy") -> list:
+def _unroll_allgather_recv(recv_buf, chunk_shape, send_buf_shapes, displs=None) -> list:
     r"""Unroll recv_buf after Buffered Allgather (MPI and NCCL)
-    Remove the padded elements in recv_buff, extract an individual array from each device and return them as a list of arrays
-    Each GPU may send array with a different shape, so the return type has to be a list of array
-    instead of the concatenated array.
+    Depending on the provided parameters, the function:
+    - uses ``displs`` and element counts to extract variable-sized chunks.
+    - removes padding and reshapes each chunk using ``chunk_shape``.
+
+    Each rank may send an array with a different shape, so the return type is a list of arrays
+    instead of a concatenated array.
 
     Parameters
     ----------
     recv_buf: :obj:`cupy.ndarray` or array-like
         The data buffer returned from nccl_allgather call
     send_buf_shapes: :obj:`list`
-        A list of original shapes for each GPU send_buf prior to padding
+        Original shapes of each rank's send_buf before any padding.
+    chunk_shape : tuple
+        Shape of each gathered chunk in recv_buf. 
This must match the shape + used to construct the gathered buffer: use the padded send buffer shape + when padding is required, or the original send buffer shape when no padding is used. displs : list, optional - The starting offsets in recv_buf where data from each rank in mpi_allgather - engine : :obj:`str` - Engine used to store array (``numpy`` or ``cupy``) + Starting offsets in recv_buf for each rank's data, used when chunks have + variable sizes (e.g., mpi_allgather with displacements). Returns ------- - chunks: :obj:`list` - A list of `cupy.ndarray` from each GPU with the padded element removed + chunks : list of ndarray + List of arrays (NumPy or CuPy, depending on ``engine``), one per rank, + reconstructed to their original shapes with any padding removed. """ ndev = len(send_buf_shapes) - if padded_send_buf_shape is not None: - chunk_size = np.prod(padded_send_buf_shape) + if displs is not None: + recvcounts = [int(np.prod(shape)) for shape in send_buf_shapes] + # Slice recv_buf using displacements and then reconstruct the original-shaped chunk. 
+ return [ + recv_buf[displs[i]:displs[i] + recvcounts[i]].reshape(send_buf_shapes[i]) + for i in range(ndev) + ] + else: + chunk_size = np.prod(chunk_shape) chunks = [ recv_buf[i * chunk_size:(i + 1) * chunk_size] for i in range(ndev) ] + # Remove padding from each array: the padded value may appear somewhere + # in the middle of the flat array and thus the reshape and slicing for each dimension is required for i in range(ndev): slicing = tuple(slice(0, end) for end in send_buf_shapes[i]) - chunks[i] = chunks[i].reshape(padded_send_buf_shape)[slicing] + chunks[i] = chunks[i].reshape(chunk_shape)[slicing] return chunks - if displs is not None: - return [ - recv_buf[displs[i]:displs[i] + recvcounts[i]].reshape(send_buf_shapes[i]) - for i in range(ndev) - ] - ncp = get_module(engine) - chunks = ncp.split(recv_buf, ndev) - return [chunk.reshape(send_buf_shapes[0]) for chunk in chunks] diff --git a/pylops_mpi/utils/_mpi.py b/pylops_mpi/utils/_mpi.py index 680e2d04..73f359b3 100644 --- a/pylops_mpi/utils/_mpi.py +++ b/pylops_mpi/utils/_mpi.py @@ -4,16 +4,18 @@ "mpi_bcast", "mpi_send", "mpi_recv", - "mpi_sendrecv" + "mpi_sendrecv", + "_prepare_allgather_inputs_mpi" ] from typing import List, Optional +import numpy as np from mpi4py import MPI from pylops.utils import NDArray from pylops.utils.backend import get_module from pylops_mpi.utils import deps -from pylops_mpi.utils._common import _unroll_allgather_recv, _prepare_allgather_inputs_mpi +from pylops_mpi.utils._common import _unroll_allgather_recv def mpi_allgather(base_comm: MPI.Comm, @@ -50,14 +52,13 @@ def mpi_allgather(base_comm: MPI.Comm, """ if deps.cuda_aware_mpi_enabled or engine == "numpy": send_shapes = base_comm.allgather(send_buf.shape) - recvcounts = base_comm.allgather(send_buf.size) - send_buf, recv_buf, displs = _prepare_allgather_inputs_mpi(send_buf, send_shapes, recvcounts, engine) + send_buf, recv_buf, displs, recvcounts = _prepare_allgather_inputs_mpi(send_buf, send_shapes, engine) if 
len(set(send_shapes)) == 1: _mpi_calls(base_comm, "Allgather", send_buf, recv_buf, engine=engine) else: _mpi_calls(base_comm, "Allgatherv", send_buf, [recv_buf, recvcounts, displs, MPI._typedict[send_buf.dtype.char]], engine=engine) - return _unroll_allgather_recv(recv_buf, send_shapes, recvcounts=recvcounts, displs=displs, engine=engine) + return _unroll_allgather_recv(recv_buf, send_buf.shape, send_shapes, displs) else: # CuPy with non-CUDA-aware MPI if recv_buf is None: @@ -301,3 +302,43 @@ def _mpi_calls(comm: MPI.Comm, func: str, *args, engine: Optional[str] = "numpy" ncp.cuda.Device().synchronize() mpi_func = getattr(comm, func) return mpi_func(*args, **kwargs) + + +def _prepare_allgather_inputs_mpi(send_buf, send_buf_shapes, engine): + r"""Prepare send_buf and recv_buf for MPI allgather (mpi_allgather) + + Buffered Allgather (MPI) supports both uniform and variable-sized data across ranks. Unlike NCCL, padding is + not required when array sizes differ. Instead, displacements are used to correctly place each rank’s data + within the received buffer. The function ensures that the send_buf is contiguous. + + Parameters + ---------- + send_buf : :obj: `numpy.ndarray` or `cupy.ndarray` or array-like + The data buffer to be sent for allgather. + send_buf_shapes: :obj:`list` + A list of shapes for each send_buf (used to calculate padding size) + engine : :obj:`str` + Engine used to store array (``numpy`` or ``cupy``) + + Returns + ------- + send_buf: :obj: `numpy.ndarray` or `cupy.ndarray` or array-like + A buffer containing the data and padded elements to be sent by this rank. + recv_buf : :obj: `numpy.ndarray` or `cupy.ndarray` or array-like + A buffer to gather data from all ranks. + displs : list, optional + Starting offsets in recv_buf for each rank's data, used when chunks have + variable sizes + recvcounts: :obj:`list` + A list of element counts from all ranks, where each entry corresponds to one rank. 
+ """ + ncp = get_module(engine) + recvcounts = [int(np.prod(shape)) for shape in send_buf_shapes] + recv_buf = ncp.zeros(sum(recvcounts), dtype=send_buf.dtype) + if len(set(send_buf_shapes)) == 1: + displs = None + else: + displs = [0] + for i in range(1, len(recvcounts)): + displs.append(displs[i - 1] + recvcounts[i - 1]) + return ncp.ascontiguousarray(send_buf), recv_buf, displs, recvcounts diff --git a/pylops_mpi/utils/_nccl.py b/pylops_mpi/utils/_nccl.py index 5b9f49fd..776a4dcf 100644 --- a/pylops_mpi/utils/_nccl.py +++ b/pylops_mpi/utils/_nccl.py @@ -8,7 +8,8 @@ "nccl_asarray", "nccl_send", "nccl_recv", - "nccl_sendrecv" + "nccl_sendrecv", + "_prepare_allgather_inputs_nccl" ] from enum import IntEnum @@ -16,7 +17,8 @@ import os import cupy as cp import cupy.cuda.nccl as nccl -from pylops_mpi.utils._common import _prepare_allgather_inputs_nccl, _unroll_allgather_recv +from pylops_mpi.utils._common import _unroll_allgather_recv +from pylops.utils.backend import get_module cupy_to_nccl_dtype = { "float32": nccl.NCCL_FLOAT32, @@ -356,3 +358,46 @@ def nccl_sendrecv(nccl_comm, sendbuf, dest, recvbuf, source): nccl_send(nccl_comm, sendbuf, dest, sendbuf.size) nccl_recv(nccl_comm, recvbuf, source) nccl.groupEnd() + + +def _prepare_allgather_inputs_nccl(send_buf, send_buf_shapes, engine): + r""" Prepare send_buf and recv_buf for NCCL allgather (nccl_allgather) + + Buffered Allgather (NCCL) requires the sending buffer to have the same size for every device. + Therefore, padding is required when the array is not evenly partitioned across + all the ranks. The padding is applied such that each dimension of the sending buffers + is equal to the max size of that dimension across all ranks. + + Similarly, each receiver buffer (recv_buf) is created with size equal to :math:n_rank \cdot send_buf.size + + Parameters + ---------- + send_buf : :obj: `numpy.ndarray` or `cupy.ndarray` or array-like + The data buffer from the local GPU to be sent for allgather. 
+ send_buf_shapes: :obj:`list` + A list of shapes for each GPU send_buf (used to calculate padding size) + engine : :obj:`str` + Engine used to store array (``numpy`` or ``cupy``) + + Returns + ------- + send_buf: :obj:`cupy.ndarray` + A buffer containing the data and padded elements to be sent by this rank. + recv_buf : :obj:`cupy.ndarray` + An empty, padded buffer to gather data from all GPUs. + """ + ncp = get_module(engine) + sizes_each_dim = list(zip(*send_buf_shapes)) + send_shape = tuple(map(max, sizes_each_dim)) + pad_size = [ + (0, s_shape - l_shape) for s_shape, l_shape in zip(send_shape, send_buf.shape) + ] + + send_buf = ncp.pad( + send_buf, pad_size, mode="constant", constant_values=0 + ) + + ndev = len(send_buf_shapes) + recv_buf = ncp.zeros(ndev * send_buf.size, dtype=send_buf.dtype) + + return send_buf, recv_buf diff --git a/tests_nccl/test_ncclutils_nccl.py b/tests_nccl/test_ncclutils_nccl.py index a796aea4..17647660 100644 --- a/tests_nccl/test_ncclutils_nccl.py +++ b/tests_nccl/test_ncclutils_nccl.py @@ -8,8 +8,8 @@ from numpy.testing import assert_allclose import pytest -from pylops_mpi.utils._nccl import initialize_nccl_comm, nccl_allgather -from pylops_mpi.utils._common import _prepare_allgather_inputs_nccl, _unroll_allgather_recv +from pylops_mpi.utils._nccl import initialize_nccl_comm, nccl_allgather, _prepare_allgather_inputs_nccl +from pylops_mpi.utils._common import _unroll_allgather_recv from pylops_mpi.utils.deps import nccl_enabled np.random.seed(42) @@ -92,7 +92,7 @@ def test_allgather_differentsize_withrecbuf(par): send_shapes = MPI.COMM_WORLD.allgather(local_array.shape) send_buf, recv_buf = _prepare_allgather_inputs_nccl(local_array, send_shapes, engine="cupy") recv_buf = nccl_allgather(nccl_comm, send_buf, recv_buf) - chunks = _unroll_allgather_recv(recv_buf, send_shapes, send_buf.shape, engine="cupy") + chunks = _unroll_allgather_recv(recv_buf, send_buf.shape, send_shapes) gathered_array = cp.concatenate(chunks) # Compare with 
global array created in rank0 From 4fbc5c94901a35909561a425552ae9319d5e21a2 Mon Sep 17 00:00:00 2001 From: rohanbabbar04 Date: Sun, 22 Mar 2026 16:38:25 +0530 Subject: [PATCH 11/13] Minor fix in nccl_asarray --- pylops_mpi/utils/_nccl.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pylops_mpi/utils/_nccl.py b/pylops_mpi/utils/_nccl.py index 776a4dcf..55636573 100644 --- a/pylops_mpi/utils/_nccl.py +++ b/pylops_mpi/utils/_nccl.py @@ -286,7 +286,7 @@ def nccl_asarray(nccl_comm, local_array, local_shapes, axis) -> cp.ndarray: send_buf, recv_buf = _prepare_allgather_inputs_nccl(local_array, local_shapes, engine="cupy") nccl_allgather(nccl_comm, send_buf, recv_buf) - chunks = _unroll_allgather_recv(recv_buf, local_shapes, send_buf.shape, engine="cupy") + chunks = _unroll_allgather_recv(recv_buf, send_buf.shape, local_shapes) # combine back to single global array return cp.concatenate(chunks, axis=axis) From 75abc1e7fad9e9ccb73285d25667fd921d8d4987 Mon Sep 17 00:00:00 2001 From: rohanbabbar04 Date: Sun, 22 Mar 2026 19:12:36 +0530 Subject: [PATCH 12/13] Minor change in the docstring --- pylops_mpi/utils/_common.py | 7 ++++--- pylops_mpi/utils/_nccl.py | 2 +- 2 files changed, 5 insertions(+), 4 deletions(-) diff --git a/pylops_mpi/utils/_common.py b/pylops_mpi/utils/_common.py index c14c5e99..87438fe9 100644 --- a/pylops_mpi/utils/_common.py +++ b/pylops_mpi/utils/_common.py @@ -10,7 +10,7 @@ def _unroll_allgather_recv(recv_buf, chunk_shape, send_buf_shapes, displs=None) Depending on the provided parameters, the function: - uses ``displs`` and element counts to extract variable-sized chunks. - - removes padding and reshapes each chunk using ``padded_send_buf_shape``. + - removes padding and reshapes each chunk using ``chunk_shape``. Each rank may send an array with a different shape, so the return type is a list of arrays instead of a concatenated array. 
@@ -20,11 +20,12 @@ def _unroll_allgather_recv(recv_buf, chunk_shape, send_buf_shapes, displs=None) recv_buf: :obj:`cupy.ndarray` or array-like The data buffer returned from nccl_allgather call send_buf_shapes: :obj:`list` - A list ofOriginal shapes of each rank's send_buf before any padding. + A list of original shapes of each rank's send_buf before any padding. chunk_shape : tuple Shape of each gathered chunk in recv_buf. This must match the shape used to construct the gathered buffer: use the padded send buffer shape - when padding is required, or the original send buffer shape when no padding is used. + when padding is required (e.g., nccl_allgather with padding), or the original send buffer + shape when no padding is used. displs : list, optional Starting offsets in recv_buf for each rank's data, used when chunks have variable sizes (e.g., mpi_allgather with displacements). diff --git a/pylops_mpi/utils/_nccl.py b/pylops_mpi/utils/_nccl.py index 55636573..d54f381a 100644 --- a/pylops_mpi/utils/_nccl.py +++ b/pylops_mpi/utils/_nccl.py @@ -17,8 +17,8 @@ import os import cupy as cp import cupy.cuda.nccl as nccl -from pylops_mpi.utils._common import _unroll_allgather_recv from pylops.utils.backend import get_module +from pylops_mpi.utils._common import _unroll_allgather_recv cupy_to_nccl_dtype = { "float32": nccl.NCCL_FLOAT32, From fe05882339a13a08386ae7d6032d4402fd0c7265 Mon Sep 17 00:00:00 2001 From: rohanbabbar04 Date: Sun, 22 Mar 2026 22:54:26 +0530 Subject: [PATCH 13/13] Change param name to buffer_chunk_shape --- pylops_mpi/utils/_common.py | 19 ++++++++++--------- 1 file changed, 10 insertions(+), 9 deletions(-) diff --git a/pylops_mpi/utils/_common.py b/pylops_mpi/utils/_common.py index 87438fe9..a5e378a7 100644 --- a/pylops_mpi/utils/_common.py +++ b/pylops_mpi/utils/_common.py @@ -5,7 +5,7 @@ import numpy as np -def _unroll_allgather_recv(recv_buf, chunk_shape, send_buf_shapes, displs=None) -> list: +def _unroll_allgather_recv(recv_buf, 
buffer_chunk_shape, send_buf_shapes, displs=None) -> list: r"""Unroll recv_buf after Buffered Allgather (MPI and NCCL) Depending on the provided parameters, the function: @@ -18,14 +18,14 @@ def _unroll_allgather_recv(recv_buf, chunk_shape, send_buf_shapes, displs=None) Parameters ---------- recv_buf: :obj:`cupy.ndarray` or array-like - The data buffer returned from nccl_allgather call + The data buffer returned from the allgather call send_buf_shapes: :obj:`list` A list of original shapes of each rank's send_buf before any padding. - chunk_shape : tuple - Shape of each gathered chunk in recv_buf. This must match the shape - used to construct the gathered buffer: use the padded send buffer shape - when padding is required (e.g., nccl_allgather with padding), or the original send buffer - shape when no padding is used. + buffer_chunk_shape : tuple + Shape of each rank’s data as stored in ``recv_buf``. This should match + the layout used during allgather: use the padded send buffer shape when + padding is applied (e.g., NCCL), or the original send buffer shape when + no padding is used. displs : list, optional Starting offsets in recv_buf for each rank's data, used when chunks have variable sizes (e.g., mpi_allgather with displacements). @@ -44,7 +44,8 @@ def _unroll_allgather_recv(recv_buf, chunk_shape, send_buf_shapes, displs=None) for i in range(ndev) ] else: - chunk_size = np.prod(chunk_shape) + # extract an individual array from each device + chunk_size = np.prod(buffer_chunk_shape) chunks = [ recv_buf[i * chunk_size:(i + 1) * chunk_size] for i in range(ndev) @@ -53,5 +54,5 @@ def _unroll_allgather_recv(recv_buf, chunk_shape, send_buf_shapes, displs=None) # in the middle of the flat array and thus the reshape and slicing for each dimension is required for i in range(ndev): slicing = tuple(slice(0, end) for end in send_buf_shapes[i]) - chunks[i] = chunks[i].reshape(chunk_shape)[slicing] + chunks[i] = chunks[i].reshape(buffer_chunk_shape)[slicing] return chunks