diff --git a/pylops_mpi/Distributed.py b/pylops_mpi/Distributed.py index 569b6aa4..7ce69d01 100644 --- a/pylops_mpi/Distributed.py +++ b/pylops_mpi/Distributed.py @@ -3,8 +3,8 @@ from mpi4py import MPI from pylops.utils import NDArray from pylops.utils import deps as pylops_deps # avoid namespace crashes with pylops_mpi.utils -from pylops_mpi.utils._mpi import (mpi_allreduce, mpi_allgather, mpi_bcast, mpi_send, mpi_recv, mpi_sendrecv, - _prepare_allgather_inputs, _unroll_allgather_recv) +from pylops_mpi.utils._mpi import mpi_allreduce, mpi_allgather, mpi_bcast, mpi_send, mpi_recv, mpi_sendrecv +from pylops_mpi.utils._common import _unroll_allgather_recv from pylops_mpi.utils import deps cupy_message = pylops_deps.cupy_import("the DistributedArray module") @@ -12,7 +12,7 @@ if nccl_message is None and cupy_message is None: from pylops_mpi.utils._nccl import ( - nccl_allgather, nccl_allreduce, nccl_bcast, nccl_send, nccl_recv, nccl_sendrecv + nccl_allgather, nccl_allreduce, nccl_bcast, nccl_send, nccl_recv, nccl_sendrecv, _prepare_allgather_inputs_nccl ) from cupy.cuda.nccl import NcclCommunicator else: @@ -32,7 +32,6 @@ class DistributedMixIn: MPI installation is not available). 
""" - def _allreduce(self, base_comm: MPI.Comm, base_comm_nccl: NcclCommunicatorType, @@ -145,7 +144,7 @@ def _allgather(self, return nccl_allgather(base_comm_nccl, send_buf, recv_buf) else: send_shapes = base_comm.allgather(send_buf.shape) - (padded_send, padded_recv) = _prepare_allgather_inputs(send_buf, send_shapes, engine="cupy") + (padded_send, padded_recv) = _prepare_allgather_inputs_nccl(send_buf, send_shapes, engine="cupy") raw_recv = nccl_allgather(base_comm_nccl, padded_send, recv_buf if recv_buf else padded_recv) return _unroll_allgather_recv(raw_recv, padded_send.shape, send_shapes) else: @@ -187,7 +186,7 @@ def _allgather_subcomm(self, return nccl_allgather(sub_comm, send_buf, recv_buf) else: send_shapes = sub_comm._allgather_subcomm(send_buf.shape) - (padded_send, padded_recv) = _prepare_allgather_inputs(send_buf, send_shapes, engine="cupy") + (padded_send, padded_recv) = _prepare_allgather_inputs_nccl(send_buf, send_shapes, engine="cupy") raw_recv = nccl_allgather(sub_comm, padded_send, recv_buf if recv_buf else padded_recv) return _unroll_allgather_recv(raw_recv, padded_send.shape, send_shapes) else: diff --git a/pylops_mpi/signalprocessing/Fredholm1.py b/pylops_mpi/signalprocessing/Fredholm1.py index 2969e3c9..ccd6fdd2 100644 --- a/pylops_mpi/signalprocessing/Fredholm1.py +++ b/pylops_mpi/signalprocessing/Fredholm1.py @@ -132,7 +132,7 @@ def _matvec(self, x: DistributedArray) -> DistributedArray: engine=y.engine)).ravel() return y - def _rmatvec(self, x: NDArray) -> NDArray: + def _rmatvec(self, x: DistributedArray) -> DistributedArray: ncp = get_module(x.engine) if x.partition not in [Partition.BROADCAST, Partition.UNSAFE_BROADCAST]: raise ValueError(f"x should have partition={Partition.BROADCAST},{Partition.UNSAFE_BROADCAST}" diff --git a/pylops_mpi/utils/_common.py b/pylops_mpi/utils/_common.py index 895265df..a5e378a7 100644 --- a/pylops_mpi/utils/_common.py +++ b/pylops_mpi/utils/_common.py @@ -1,89 +1,58 @@ __all__ = [ - 
"_prepare_allgather_inputs", "_unroll_allgather_recv" ] - import numpy as np -from pylops.utils.backend import get_module -# TODO: return type annotation for both cupy and numpy -def _prepare_allgather_inputs(send_buf, send_buf_shapes, engine): - r""" Prepare send_buf and recv_buf for NCCL allgather (nccl_allgather) +def _unroll_allgather_recv(recv_buf, buffer_chunk_shape, send_buf_shapes, displs=None) -> list: + r"""Unroll recv_buf after Buffered Allgather (MPI and NCCL) - Buffered Allgather (MPI and NCCL) requires the sending buffer to have the same size for every device. - Therefore, padding is required when the array is not evenly partitioned across - all the ranks. The padding is applied such that the each dimension of the sending buffers - is equal to the max size of that dimension across all ranks. + Depending on the provided parameters, the function: + - uses ``displs`` and element counts to extract variable-sized chunks. + - removes padding and reshapes each chunk using ``chunk_shape``. - Similarly, each receiver buffer (recv_buf) is created with size equal to :math:n_rank \cdot send_buf.size - - Parameters - ---------- - send_buf : :obj: `numpy.ndarray` or `cupy.ndarray` or array-like - The data buffer from the local GPU to be sent for allgather. - send_buf_shapes: :obj:`list` - A list of shapes for each GPU send_buf (used to calculate padding size) - engine : :obj:`str` - Engine used to store array (``numpy`` or ``cupy``) - - Returns - ------- - send_buf: :obj:`cupy.ndarray` - A buffer containing the data and padded elements to be sent by this rank. - recv_buf : :obj:`cupy.ndarray` - An empty, padded buffer to gather data from all GPUs. 
- """ - ncp = get_module(engine) - sizes_each_dim = list(zip(*send_buf_shapes)) - send_shape = tuple(map(max, sizes_each_dim)) - pad_size = [ - (0, s_shape - l_shape) for s_shape, l_shape in zip(send_shape, send_buf.shape) - ] - - send_buf = ncp.pad( - send_buf, pad_size, mode="constant", constant_values=0 - ) - - ndev = len(send_buf_shapes) - recv_buf = ncp.zeros(ndev * send_buf.size, dtype=send_buf.dtype) - - return send_buf, recv_buf - - -def _unroll_allgather_recv(recv_buf, padded_send_buf_shape, send_buf_shapes) -> list: - r"""Unrolll recv_buf after Buffered Allgather (MPI and NCCL) - - Remove the padded elements in recv_buff, extract an individual array from each device and return them as a list of arrays - Each GPU may send array with a different shape, so the return type has to be a list of array - instead of the concatenated array. + Each rank may send an array with a different shape, so the return type is a list of arrays + instead of a concatenated array. Parameters ---------- recv_buf: :obj:`cupy.ndarray` or array-like - The data buffer returned from nccl_allgather call - padded_send_buf_shape: :obj:`tuple`:int - The size of send_buf after padding used in nccl_allgather + The data buffer returned from the allgather call send_buf_shapes: :obj:`list` - A list of original shapes for each GPU send_buf prior to padding - + A list of original shapes of each rank's send_buf before any padding. + buffer_chunk_shape : tuple + Shape of each rank’s data as stored in ``recv_buf``. This should match + the layout used during allgather: use the padded send buffer shape when + padding is applied (e.g., NCCL), or the original send buffer shape when + no padding is used. + displs : list, optional + Starting offsets in recv_buf for each rank's data, used when chunks have + variable sizes (e.g., mpi_allgather with displacements). 
Returns ------- - chunks: :obj:`list` - A list of `cupy.ndarray` from each GPU with the padded element removed + chunks : list of ndarray + List of arrays (NumPy or CuPy, depending on ``engine``), one per rank, + reconstructed to their original shapes with any padding removed. """ ndev = len(send_buf_shapes) - # extract an individual array from each device - chunk_size = np.prod(padded_send_buf_shape) - chunks = [ - recv_buf[i * chunk_size:(i + 1) * chunk_size] for i in range(ndev) - ] - - # Remove padding from each array: the padded value may appear somewhere - # in the middle of the flat array and thus the reshape and slicing for each dimension is required - for i in range(ndev): - slicing = tuple(slice(0, end) for end in send_buf_shapes[i]) - chunks[i] = chunks[i].reshape(padded_send_buf_shape)[slicing] - - return chunks + if displs is not None: + recvcounts = [int(np.prod(shape)) for shape in send_buf_shapes] + # Slice recv_buf using displacements and then reconstruct the original-shaped chunk. 
+ return [ + recv_buf[displs[i]:displs[i] + recvcounts[i]].reshape(send_buf_shapes[i]) + for i in range(ndev) + ] + else: + # extract an individual array from each device + chunk_size = np.prod(buffer_chunk_shape) + chunks = [ + recv_buf[i * chunk_size:(i + 1) * chunk_size] + for i in range(ndev) + ] + # Remove padding from each array: the padded value may appear somewhere + # in the middle of the flat array and thus the reshape and slicing for each dimension is required + for i in range(ndev): + slicing = tuple(slice(0, end) for end in send_buf_shapes[i]) + chunks[i] = chunks[i].reshape(buffer_chunk_shape)[slicing] + return chunks diff --git a/pylops_mpi/utils/_mpi.py b/pylops_mpi/utils/_mpi.py index ad702d73..73f359b3 100644 --- a/pylops_mpi/utils/_mpi.py +++ b/pylops_mpi/utils/_mpi.py @@ -4,27 +4,33 @@ "mpi_bcast", "mpi_send", "mpi_recv", - "mpi_sendrecv" + "mpi_sendrecv", + "_prepare_allgather_inputs_mpi" ] -from typing import Optional +from typing import List, Optional +import numpy as np from mpi4py import MPI from pylops.utils import NDArray from pylops.utils.backend import get_module from pylops_mpi.utils import deps -from pylops_mpi.utils._common import _prepare_allgather_inputs, _unroll_allgather_recv +from pylops_mpi.utils._common import _unroll_allgather_recv def mpi_allgather(base_comm: MPI.Comm, send_buf: NDArray, recv_buf: Optional[NDArray] = None, engine: str = "numpy", - ) -> NDArray: - """MPI_Allallgather/allallgather + ) -> List[NDArray]: + """MPI_Allgather/allgather - Dispatch allgather routine based on type of input and availability of - CUDA-Aware MPI + Dispatch the appropriate allgather routine based on buffer sizes and + CUDA-aware MPI availability. + + If all ranks provide buffers of equal size, the standard `Allgather` + collective is used. Otherwise, `Allgatherv` is invoked to handle + variable-sized buffers. 
Parameters ---------- @@ -40,16 +46,19 @@ def mpi_allgather(base_comm: MPI.Comm, Returns ------- - recv_buf : :obj:`numpy.ndarray` or :obj:`cupy.ndarray` - A buffer containing the gathered data from all ranks. + recv_buf : :obj:`list` + A list of arrays containing the gathered data from all ranks. """ if deps.cuda_aware_mpi_enabled or engine == "numpy": send_shapes = base_comm.allgather(send_buf.shape) - (padded_send, padded_recv) = _prepare_allgather_inputs(send_buf, send_shapes, engine=engine) - recv_buffer_to_use = recv_buf if recv_buf else padded_recv - _mpi_calls(base_comm, "Allgather", padded_send, recv_buffer_to_use, engine=engine) - return _unroll_allgather_recv(recv_buffer_to_use, padded_send.shape, send_shapes) + send_buf, recv_buf, displs, recvcounts = _prepare_allgather_inputs_mpi(send_buf, send_shapes, engine) + if len(set(send_shapes)) == 1: + _mpi_calls(base_comm, "Allgather", send_buf, recv_buf, engine=engine) + else: + _mpi_calls(base_comm, "Allgatherv", send_buf, + [recv_buf, recvcounts, displs, MPI._typedict[send_buf.dtype.char]], engine=engine) + return _unroll_allgather_recv(recv_buf, send_buf.shape, send_shapes, displs) else: # CuPy with non-CUDA-aware MPI if recv_buf is None: @@ -293,3 +302,43 @@ def _mpi_calls(comm: MPI.Comm, func: str, *args, engine: Optional[str] = "numpy" ncp.cuda.Device().synchronize() mpi_func = getattr(comm, func) return mpi_func(*args, **kwargs) + + +def _prepare_allgather_inputs_mpi(send_buf, send_buf_shapes, engine): + r"""Prepare send_buf and recv_buf for MPI allgather (mpi_allgather) + + Buffered Allgather (MPI) supports both uniform and variable-sized data across ranks. Unlike NCCL, padding is + not required when array sizes differ. Instead, displacements are used to correctly place each rank’s data + within the received buffer. The function ensures that the send_buf is contiguous. 
+ + Parameters + ---------- + send_buf : :obj: `numpy.ndarray` or `cupy.ndarray` or array-like + The data buffer to be sent for allgather. + send_buf_shapes: :obj:`list` + A list of shapes for each send_buf (used to calculate receive counts and displacements) + engine : :obj:`str` + Engine used to store array (``numpy`` or ``cupy``) + + Returns + ------- + send_buf: :obj: `numpy.ndarray` or `cupy.ndarray` or array-like + A contiguous buffer containing the data to be sent by this rank (no padding is applied). + recv_buf : :obj: `numpy.ndarray` or `cupy.ndarray` or array-like + A buffer to gather data from all ranks. + displs : list or None + Starting offsets in recv_buf for each rank's data, used when chunks have + variable sizes (``None`` when all ranks send buffers of equal size). + recvcounts: :obj:`list` + A list of element counts from all ranks, where each entry corresponds to one rank. + """ + ncp = get_module(engine) + recvcounts = [int(np.prod(shape)) for shape in send_buf_shapes] + recv_buf = ncp.zeros(sum(recvcounts), dtype=send_buf.dtype) + if len(set(send_buf_shapes)) == 1: + displs = None + else: + displs = [0] + for i in range(1, len(recvcounts)): + displs.append(displs[i - 1] + recvcounts[i - 1]) + return ncp.ascontiguousarray(send_buf), recv_buf, displs, recvcounts diff --git a/pylops_mpi/utils/_nccl.py b/pylops_mpi/utils/_nccl.py index 11b374c7..d54f381a 100644 --- a/pylops_mpi/utils/_nccl.py +++ b/pylops_mpi/utils/_nccl.py @@ -8,7 +8,8 @@ "nccl_asarray", "nccl_send", "nccl_recv", - "nccl_sendrecv" + "nccl_sendrecv", + "_prepare_allgather_inputs_nccl" ] from enum import IntEnum @@ -16,7 +17,8 @@ import os import cupy as cp import cupy.cuda.nccl as nccl -from pylops_mpi.utils._common import _prepare_allgather_inputs, _unroll_allgather_recv +from pylops.utils.backend import get_module +from pylops_mpi.utils._common import _unroll_allgather_recv cupy_to_nccl_dtype = { "float32": nccl.NCCL_FLOAT32, @@ -282,7 +284,7 @@ def nccl_asarray(nccl_comm, local_array, local_shapes, axis) -> cp.ndarray: Global array gathered from all GPUs and
concatenated along `axis`. """ - send_buf, recv_buf = _prepare_allgather_inputs(local_array, local_shapes, engine="cupy") + send_buf, recv_buf = _prepare_allgather_inputs_nccl(local_array, local_shapes, engine="cupy") nccl_allgather(nccl_comm, send_buf, recv_buf) chunks = _unroll_allgather_recv(recv_buf, send_buf.shape, local_shapes) @@ -356,3 +358,46 @@ def nccl_sendrecv(nccl_comm, sendbuf, dest, recvbuf, source): nccl_send(nccl_comm, sendbuf, dest, sendbuf.size) nccl_recv(nccl_comm, recvbuf, source) nccl.groupEnd() + + +def _prepare_allgather_inputs_nccl(send_buf, send_buf_shapes, engine): + r""" Prepare send_buf and recv_buf for NCCL allgather (nccl_allgather) + + Buffered Allgather (NCCL) requires the sending buffer to have the same size for every device. + Therefore, padding is required when the array is not evenly partitioned across + all the ranks. The padding is applied such that each dimension of the sending buffers + is equal to the max size of that dimension across all ranks. + + Similarly, each receiver buffer (recv_buf) is created with size equal to :math:`n_{rank} \cdot send\_buf.size` + + Parameters + ---------- + send_buf : :obj: `numpy.ndarray` or `cupy.ndarray` or array-like + The data buffer from the local GPU to be sent for allgather. + send_buf_shapes: :obj:`list` + A list of shapes for each GPU send_buf (used to calculate padding size) + engine : :obj:`str` + Engine used to store array (``numpy`` or ``cupy``) + + Returns + ------- + send_buf: :obj:`cupy.ndarray` + A buffer containing the data and padded elements to be sent by this rank. + recv_buf : :obj:`cupy.ndarray` + An empty, padded buffer to gather data from all GPUs.
+ """ + ncp = get_module(engine) + sizes_each_dim = list(zip(*send_buf_shapes)) + send_shape = tuple(map(max, sizes_each_dim)) + pad_size = [ + (0, s_shape - l_shape) for s_shape, l_shape in zip(send_shape, send_buf.shape) + ] + + send_buf = ncp.pad( + send_buf, pad_size, mode="constant", constant_values=0 + ) + + ndev = len(send_buf_shapes) + recv_buf = ncp.zeros(ndev * send_buf.size, dtype=send_buf.dtype) + + return send_buf, recv_buf diff --git a/tests_nccl/test_ncclutils_nccl.py b/tests_nccl/test_ncclutils_nccl.py index 27cd221d..17647660 100644 --- a/tests_nccl/test_ncclutils_nccl.py +++ b/tests_nccl/test_ncclutils_nccl.py @@ -8,8 +8,8 @@ from numpy.testing import assert_allclose import pytest -from pylops_mpi.utils._nccl import initialize_nccl_comm, nccl_allgather -from pylops_mpi.utils._common import _prepare_allgather_inputs, _unroll_allgather_recv +from pylops_mpi.utils._nccl import initialize_nccl_comm, nccl_allgather, _prepare_allgather_inputs_nccl +from pylops_mpi.utils._common import _unroll_allgather_recv from pylops_mpi.utils.deps import nccl_enabled np.random.seed(42) @@ -90,7 +90,7 @@ def test_allgather_differentsize_withrecbuf(par): # Gathered array send_shapes = MPI.COMM_WORLD.allgather(local_array.shape) - send_buf, recv_buf = _prepare_allgather_inputs(local_array, send_shapes, engine="cupy") + send_buf, recv_buf = _prepare_allgather_inputs_nccl(local_array, send_shapes, engine="cupy") recv_buf = nccl_allgather(nccl_comm, send_buf, recv_buf) chunks = _unroll_allgather_recv(recv_buf, send_buf.shape, send_shapes) gathered_array = cp.concatenate(chunks)