Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 5 additions & 6 deletions pylops_mpi/Distributed.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,16 +3,16 @@
from mpi4py import MPI
from pylops.utils import NDArray
from pylops.utils import deps as pylops_deps # avoid namespace crashes with pylops_mpi.utils
from pylops_mpi.utils._mpi import (mpi_allreduce, mpi_allgather, mpi_bcast, mpi_send, mpi_recv, mpi_sendrecv,
_prepare_allgather_inputs, _unroll_allgather_recv)
from pylops_mpi.utils._mpi import mpi_allreduce, mpi_allgather, mpi_bcast, mpi_send, mpi_recv, mpi_sendrecv
from pylops_mpi.utils._common import _unroll_allgather_recv
from pylops_mpi.utils import deps

cupy_message = pylops_deps.cupy_import("the DistributedArray module")
nccl_message = deps.nccl_import("the DistributedArray module")

if nccl_message is None and cupy_message is None:
from pylops_mpi.utils._nccl import (
nccl_allgather, nccl_allreduce, nccl_bcast, nccl_send, nccl_recv, nccl_sendrecv
nccl_allgather, nccl_allreduce, nccl_bcast, nccl_send, nccl_recv, nccl_sendrecv, _prepare_allgather_inputs_nccl
)
from cupy.cuda.nccl import NcclCommunicator
else:
Expand All @@ -32,7 +32,6 @@ class DistributedMixIn:
MPI installation is not available).

"""

def _allreduce(self,
base_comm: MPI.Comm,
base_comm_nccl: NcclCommunicatorType,
Expand Down Expand Up @@ -145,7 +144,7 @@ def _allgather(self,
return nccl_allgather(base_comm_nccl, send_buf, recv_buf)
else:
send_shapes = base_comm.allgather(send_buf.shape)
(padded_send, padded_recv) = _prepare_allgather_inputs(send_buf, send_shapes, engine="cupy")
(padded_send, padded_recv) = _prepare_allgather_inputs_nccl(send_buf, send_shapes, engine="cupy")
raw_recv = nccl_allgather(base_comm_nccl, padded_send, recv_buf if recv_buf else padded_recv)
return _unroll_allgather_recv(raw_recv, padded_send.shape, send_shapes)
else:
Expand Down Expand Up @@ -187,7 +186,7 @@ def _allgather_subcomm(self,
return nccl_allgather(sub_comm, send_buf, recv_buf)
else:
send_shapes = sub_comm._allgather_subcomm(send_buf.shape)
(padded_send, padded_recv) = _prepare_allgather_inputs(send_buf, send_shapes, engine="cupy")
(padded_send, padded_recv) = _prepare_allgather_inputs_nccl(send_buf, send_shapes, engine="cupy")
raw_recv = nccl_allgather(sub_comm, padded_send, recv_buf if recv_buf else padded_recv)
return _unroll_allgather_recv(raw_recv, padded_send.shape, send_shapes)
else:
Expand Down
2 changes: 1 addition & 1 deletion pylops_mpi/signalprocessing/Fredholm1.py
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,7 @@ def _matvec(self, x: DistributedArray) -> DistributedArray:
engine=y.engine)).ravel()
return y

def _rmatvec(self, x: NDArray) -> NDArray:
def _rmatvec(self, x: DistributedArray) -> DistributedArray:
ncp = get_module(x.engine)
if x.partition not in [Partition.BROADCAST, Partition.UNSAFE_BROADCAST]:
raise ValueError(f"x should have partition={Partition.BROADCAST},{Partition.UNSAFE_BROADCAST}"
Expand Down
111 changes: 40 additions & 71 deletions pylops_mpi/utils/_common.py
Original file line number Diff line number Diff line change
@@ -1,89 +1,58 @@
__all__ = [
"_prepare_allgather_inputs",
"_unroll_allgather_recv"
]


import numpy as np
from pylops.utils.backend import get_module


# TODO: return type annotation for both cupy and numpy
def _prepare_allgather_inputs(send_buf, send_buf_shapes, engine):
r""" Prepare send_buf and recv_buf for NCCL allgather (nccl_allgather)
def _unroll_allgather_recv(recv_buf, buffer_chunk_shape, send_buf_shapes, displs=None) -> list:
r"""Unroll recv_buf after Buffered Allgather (MPI and NCCL)

Buffered Allgather (MPI and NCCL) requires the sending buffer to have the same size for every device.
Therefore, padding is required when the array is not evenly partitioned across
all the ranks. The padding is applied such that the each dimension of the sending buffers
is equal to the max size of that dimension across all ranks.
Depending on the provided parameters, the function:
- uses ``displs`` and element counts to extract variable-sized chunks.
- removes padding and reshapes each chunk using ``chunk_shape``.

Similarly, each receiver buffer (recv_buf) is created with size equal to :math:`n_{rank} \cdot send\_buf.size`

Parameters
----------
send_buf : :obj: `numpy.ndarray` or `cupy.ndarray` or array-like
The data buffer from the local GPU to be sent for allgather.
send_buf_shapes: :obj:`list`
A list of shapes for each GPU send_buf (used to calculate padding size)
engine : :obj:`str`
Engine used to store array (``numpy`` or ``cupy``)

Returns
-------
send_buf: :obj:`cupy.ndarray`
A buffer containing the data and padded elements to be sent by this rank.
recv_buf : :obj:`cupy.ndarray`
An empty, padded buffer to gather data from all GPUs.
"""
ncp = get_module(engine)
sizes_each_dim = list(zip(*send_buf_shapes))
send_shape = tuple(map(max, sizes_each_dim))
pad_size = [
(0, s_shape - l_shape) for s_shape, l_shape in zip(send_shape, send_buf.shape)
]

send_buf = ncp.pad(
send_buf, pad_size, mode="constant", constant_values=0
)

ndev = len(send_buf_shapes)
recv_buf = ncp.zeros(ndev * send_buf.size, dtype=send_buf.dtype)

return send_buf, recv_buf


def _unroll_allgather_recv(recv_buf, padded_send_buf_shape, send_buf_shapes) -> list:
r"""Unroll recv_buf after Buffered Allgather (MPI and NCCL)

Remove the padded elements in recv_buff, extract an individual array from each device and return them as a list of arrays
Each GPU may send array with a different shape, so the return type has to be a list of array
instead of the concatenated array.
Each rank may send an array with a different shape, so the return type is a list of arrays
instead of a concatenated array.

Parameters
----------
recv_buf: :obj:`cupy.ndarray` or array-like
The data buffer returned from nccl_allgather call
padded_send_buf_shape: :obj:`tuple`:int
The size of send_buf after padding used in nccl_allgather
The data buffer returned from the allgather call
send_buf_shapes: :obj:`list`
A list of original shapes for each GPU send_buf prior to padding

A list of original shapes of each rank's send_buf before any padding.
buffer_chunk_shape : tuple
Shape of each rank’s data as stored in ``recv_buf``. This should match
the layout used during allgather: use the padded send buffer shape when
padding is applied (e.g., NCCL), or the original send buffer shape when
no padding is used.
displs : list, optional
Starting offsets in recv_buf for each rank's data, used when chunks have
variable sizes (e.g., mpi_allgather with displacements).
Returns
-------
chunks: :obj:`list`
A list of `cupy.ndarray` from each GPU with the padded element removed
chunks : list of ndarray
List of arrays (NumPy or CuPy, depending on ``engine``), one per rank,
reconstructed to their original shapes with any padding removed.
"""
ndev = len(send_buf_shapes)
# extract an individual array from each device
chunk_size = np.prod(padded_send_buf_shape)
chunks = [
recv_buf[i * chunk_size:(i + 1) * chunk_size] for i in range(ndev)
]

# Remove padding from each array: the padded value may appear somewhere
# in the middle of the flat array and thus the reshape and slicing for each dimension is required
for i in range(ndev):
slicing = tuple(slice(0, end) for end in send_buf_shapes[i])
chunks[i] = chunks[i].reshape(padded_send_buf_shape)[slicing]

return chunks
if displs is not None:
recvcounts = [int(np.prod(shape)) for shape in send_buf_shapes]
# Slice recv_buf using displacements and then reconstruct the original-shaped chunk.
return [
recv_buf[displs[i]:displs[i] + recvcounts[i]].reshape(send_buf_shapes[i])
for i in range(ndev)
]
else:
# extract an individual array from each device
chunk_size = np.prod(buffer_chunk_shape)
chunks = [
recv_buf[i * chunk_size:(i + 1) * chunk_size]
for i in range(ndev)
]
# Remove padding from each array: the padded value may appear somewhere
# in the middle of the flat array and thus the reshape and slicing for each dimension is required
for i in range(ndev):
slicing = tuple(slice(0, end) for end in send_buf_shapes[i])
chunks[i] = chunks[i].reshape(buffer_chunk_shape)[slicing]
return chunks
75 changes: 62 additions & 13 deletions pylops_mpi/utils/_mpi.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,27 +4,33 @@
"mpi_bcast",
"mpi_send",
"mpi_recv",
"mpi_sendrecv"
"mpi_sendrecv",
"_prepare_allgather_inputs_mpi"
]

from typing import Optional
from typing import List, Optional
import numpy as np

from mpi4py import MPI
from pylops.utils import NDArray
from pylops.utils.backend import get_module
from pylops_mpi.utils import deps
from pylops_mpi.utils._common import _prepare_allgather_inputs, _unroll_allgather_recv
from pylops_mpi.utils._common import _unroll_allgather_recv


def mpi_allgather(base_comm: MPI.Comm,
send_buf: NDArray,
recv_buf: Optional[NDArray] = None,
engine: str = "numpy",
) -> NDArray:
"""MPI_Allallgather/allallgather
) -> List[NDArray]:
"""MPI_Allgather/allgather

Dispatch allgather routine based on type of input and availability of
CUDA-Aware MPI
Dispatch the appropriate allgather routine based on buffer sizes and
CUDA-aware MPI availability.

If all ranks provide buffers of equal size, the standard `Allgather`
collective is used. Otherwise, `Allgatherv` is invoked to handle
variable-sized buffers.

Parameters
----------
Expand All @@ -40,16 +46,19 @@ def mpi_allgather(base_comm: MPI.Comm,

Returns
-------
recv_buf : :obj:`numpy.ndarray` or :obj:`cupy.ndarray`
A buffer containing the gathered data from all ranks.
recv_buf : :obj:`list`
A list of arrays containing the gathered data from all ranks.

"""
if deps.cuda_aware_mpi_enabled or engine == "numpy":
send_shapes = base_comm.allgather(send_buf.shape)
(padded_send, padded_recv) = _prepare_allgather_inputs(send_buf, send_shapes, engine=engine)
recv_buffer_to_use = recv_buf if recv_buf else padded_recv
_mpi_calls(base_comm, "Allgather", padded_send, recv_buffer_to_use, engine=engine)
return _unroll_allgather_recv(recv_buffer_to_use, padded_send.shape, send_shapes)
send_buf, recv_buf, displs, recvcounts = _prepare_allgather_inputs_mpi(send_buf, send_shapes, engine)
if len(set(send_shapes)) == 1:
_mpi_calls(base_comm, "Allgather", send_buf, recv_buf, engine=engine)
else:
_mpi_calls(base_comm, "Allgatherv", send_buf,
[recv_buf, recvcounts, displs, MPI._typedict[send_buf.dtype.char]], engine=engine)
return _unroll_allgather_recv(recv_buf, send_buf.shape, send_shapes, displs)
else:
# CuPy with non-CUDA-aware MPI
if recv_buf is None:
Expand Down Expand Up @@ -293,3 +302,43 @@ def _mpi_calls(comm: MPI.Comm, func: str, *args, engine: Optional[str] = "numpy"
ncp.cuda.Device().synchronize()
mpi_func = getattr(comm, func)
return mpi_func(*args, **kwargs)


def _prepare_allgather_inputs_mpi(send_buf, send_buf_shapes, engine):
    r"""Prepare send_buf and recv_buf for MPI allgather (mpi_allgather)

    MPI supports both uniform (``Allgather``) and variable-sized
    (``Allgatherv``) data across ranks. Unlike NCCL, no padding is required
    when array sizes differ: displacements are used to place each rank's data
    at the right offset inside the flat receive buffer. The send buffer is
    made C-contiguous, as required by buffered MPI calls.

    Parameters
    ----------
    send_buf : :obj:`numpy.ndarray` or :obj:`cupy.ndarray` or array-like
        The data buffer to be sent for allgather.
    send_buf_shapes : :obj:`list`
        A list of shapes of every rank's send_buf (used to size recv_buf and
        to compute displacements).
    engine : :obj:`str`
        Engine used to store array (``numpy`` or ``cupy``)

    Returns
    -------
    send_buf : :obj:`numpy.ndarray` or :obj:`cupy.ndarray`
        A C-contiguous buffer containing the data to be sent by this rank.
    recv_buf : :obj:`numpy.ndarray` or :obj:`cupy.ndarray`
        A flat buffer large enough to gather the data from all ranks.
    displs : :obj:`list` or ``None``
        Starting offsets in recv_buf for each rank's data; ``None`` when all
        ranks send equally-shaped buffers (plain ``Allgather`` suffices).
    recvcounts : :obj:`list`
        A list of element counts, one entry per rank.
    """
    ncp = get_module(engine)
    recvcounts = [int(np.prod(shape)) for shape in send_buf_shapes]
    recv_buf = ncp.zeros(sum(recvcounts), dtype=send_buf.dtype)
    if len(set(send_buf_shapes)) == 1:
        # Uniform shapes: plain Allgather needs no displacements
        displs = None
    else:
        # Exclusive prefix sum of recvcounts gives each rank's offset
        displs = [0]
        for count in recvcounts[:-1]:
            displs.append(displs[-1] + count)
    return ncp.ascontiguousarray(send_buf), recv_buf, displs, recvcounts
51 changes: 48 additions & 3 deletions pylops_mpi/utils/_nccl.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,15 +8,17 @@
"nccl_asarray",
"nccl_send",
"nccl_recv",
"nccl_sendrecv"
"nccl_sendrecv",
"_prepare_allgather_inputs_nccl"
]

from enum import IntEnum
from mpi4py import MPI
import os
import cupy as cp
import cupy.cuda.nccl as nccl
from pylops_mpi.utils._common import _prepare_allgather_inputs, _unroll_allgather_recv
from pylops.utils.backend import get_module
from pylops_mpi.utils._common import _unroll_allgather_recv

cupy_to_nccl_dtype = {
"float32": nccl.NCCL_FLOAT32,
Expand Down Expand Up @@ -282,7 +284,7 @@ def nccl_asarray(nccl_comm, local_array, local_shapes, axis) -> cp.ndarray:
Global array gathered from all GPUs and concatenated along `axis`.
"""

send_buf, recv_buf = _prepare_allgather_inputs(local_array, local_shapes, engine="cupy")
send_buf, recv_buf = _prepare_allgather_inputs_nccl(local_array, local_shapes, engine="cupy")
nccl_allgather(nccl_comm, send_buf, recv_buf)
chunks = _unroll_allgather_recv(recv_buf, send_buf.shape, local_shapes)

Expand Down Expand Up @@ -356,3 +358,46 @@ def nccl_sendrecv(nccl_comm, sendbuf, dest, recvbuf, source):
nccl_send(nccl_comm, sendbuf, dest, sendbuf.size)
nccl_recv(nccl_comm, recvbuf, source)
nccl.groupEnd()


def _prepare_allgather_inputs_nccl(send_buf, send_buf_shapes, engine):
    r"""Prepare send_buf and recv_buf for NCCL allgather (nccl_allgather)

    NCCL's buffered Allgather requires every device to contribute a send
    buffer of identical size. When the array is unevenly partitioned across
    ranks, the local buffer is therefore zero-padded so that each of its
    dimensions matches the maximum extent of that dimension over all ranks.

    The receive buffer is allocated flat with room for one padded chunk per
    rank, i.e. :math:`n_{rank} \cdot send\_buf.size` elements.

    Parameters
    ----------
    send_buf : :obj:`numpy.ndarray` or :obj:`cupy.ndarray` or array-like
        The data buffer from the local GPU to be sent for allgather.
    send_buf_shapes : :obj:`list`
        Shapes of every rank's send_buf (used to compute the padded shape).
    engine : :obj:`str`
        Engine used to store array (``numpy`` or ``cupy``)

    Returns
    -------
    send_buf : :obj:`cupy.ndarray`
        The local data, zero-padded up to the common per-rank shape.
    recv_buf : :obj:`cupy.ndarray`
        An empty, padded buffer able to hold the gathered data of all GPUs.
    """
    xp = get_module(engine)
    # Per-dimension maximum over all ranks defines the common send shape
    common_shape = tuple(max(sizes) for sizes in zip(*send_buf_shapes))
    pad_width = [
        (0, target - actual)
        for target, actual in zip(common_shape, send_buf.shape)
    ]
    padded_send = xp.pad(send_buf, pad_width, mode="constant", constant_values=0)
    nranks = len(send_buf_shapes)
    recv_buf = xp.zeros(nranks * padded_send.size, dtype=padded_send.dtype)
    return padded_send, recv_buf
6 changes: 3 additions & 3 deletions tests_nccl/test_ncclutils_nccl.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,8 @@
from numpy.testing import assert_allclose
import pytest

from pylops_mpi.utils._nccl import initialize_nccl_comm, nccl_allgather
from pylops_mpi.utils._common import _prepare_allgather_inputs, _unroll_allgather_recv
from pylops_mpi.utils._nccl import initialize_nccl_comm, nccl_allgather, _prepare_allgather_inputs_nccl
from pylops_mpi.utils._common import _unroll_allgather_recv
from pylops_mpi.utils.deps import nccl_enabled

np.random.seed(42)
Expand Down Expand Up @@ -90,7 +90,7 @@ def test_allgather_differentsize_withrecbuf(par):

# Gathered array
send_shapes = MPI.COMM_WORLD.allgather(local_array.shape)
send_buf, recv_buf = _prepare_allgather_inputs(local_array, send_shapes, engine="cupy")
send_buf, recv_buf = _prepare_allgather_inputs_nccl(local_array, send_shapes, engine="cupy")
recv_buf = nccl_allgather(nccl_comm, send_buf, recv_buf)
chunks = _unroll_allgather_recv(recv_buf, send_buf.shape, send_shapes)
gathered_array = cp.concatenate(chunks)
Expand Down
Loading