Skip to content
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 4 additions & 5 deletions pylops_mpi/Distributed.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,8 @@
from mpi4py import MPI
from pylops.utils import NDArray
from pylops.utils import deps as pylops_deps # avoid namespace crashes with pylops_mpi.utils
from pylops_mpi.utils._mpi import (mpi_allreduce, mpi_allgather, mpi_bcast, mpi_send, mpi_recv, mpi_sendrecv,
_prepare_allgather_inputs, _unroll_allgather_recv)
from pylops_mpi.utils._mpi import mpi_allreduce, mpi_allgather, mpi_bcast, mpi_send, mpi_recv, mpi_sendrecv
from pylops_mpi.utils._common import _prepare_allgather_inputs, _unroll_allgather_recv
from pylops_mpi.utils import deps

cupy_message = pylops_deps.cupy_import("the DistributedArray module")
Expand Down Expand Up @@ -32,7 +32,6 @@ class DistributedMixIn:
MPI installation is not available).

"""

def _allreduce(self,
base_comm: MPI.Comm,
base_comm_nccl: NcclCommunicatorType,
Expand Down Expand Up @@ -147,7 +146,7 @@ def _allgather(self,
send_shapes = base_comm.allgather(send_buf.shape)
(padded_send, padded_recv) = _prepare_allgather_inputs(send_buf, send_shapes, engine="cupy")
raw_recv = nccl_allgather(base_comm_nccl, padded_send, recv_buf if recv_buf else padded_recv)
return _unroll_allgather_recv(raw_recv, padded_send.shape, send_shapes)
return _unroll_allgather_recv(raw_recv, send_shapes, padded_send.shape, engine="cupy")
else:
if isinstance(send_buf, (tuple, list, int)):
return base_comm.allgather(send_buf)
Expand Down Expand Up @@ -189,7 +188,7 @@ def _allgather_subcomm(self,
send_shapes = sub_comm._allgather_subcomm(send_buf.shape)
(padded_send, padded_recv) = _prepare_allgather_inputs(send_buf, send_shapes, engine="cupy")
raw_recv = nccl_allgather(sub_comm, padded_send, recv_buf if recv_buf else padded_recv)
return _unroll_allgather_recv(raw_recv, padded_send.shape, send_shapes)
return _unroll_allgather_recv(raw_recv, send_shapes, padded_send.shape, engine="cupy")
else:
return mpi_allgather(sub_comm, send_buf, recv_buf, engine)

Expand Down
2 changes: 1 addition & 1 deletion pylops_mpi/signalprocessing/Fredholm1.py
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,7 @@ def _matvec(self, x: DistributedArray) -> DistributedArray:
engine=y.engine)).ravel()
return y

def _rmatvec(self, x: NDArray) -> NDArray:
def _rmatvec(self, x: DistributedArray) -> DistributedArray:
ncp = get_module(x.engine)
if x.partition not in [Partition.BROADCAST, Partition.UNSAFE_BROADCAST]:
raise ValueError(f"x should have partition={Partition.BROADCAST},{Partition.UNSAFE_BROADCAST}"
Expand Down
82 changes: 64 additions & 18 deletions pylops_mpi/utils/_common.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,8 +52,44 @@ def _prepare_allgather_inputs(send_buf, send_buf_shapes, engine):
return send_buf, recv_buf


def _unroll_allgather_recv(recv_buf, padded_send_buf_shape, send_buf_shapes) -> list:
r"""Unrolll recv_buf after Buffered Allgather (MPI and NCCL)
def _prepare_allgather_inputs_mpi(send_buf, send_buf_shapes, recvcounts, engine):
    r"""Set up the buffers and displacements for an MPI allgather (``mpi_allgather``).

    Parameters
    ----------
    send_buf : :obj:`numpy.ndarray` or :obj:`cupy.ndarray` or array-like
        The data buffer this rank contributes to the allgather.
    send_buf_shapes : :obj:`list`
        Shapes of the send buffers of all ranks (used to decide whether
        a plain ``Allgather`` suffices or ``Allgatherv`` offsets are needed).
    recvcounts : :obj:`list`
        Number of elements contributed by each rank.
    engine : :obj:`str`
        Engine used to store array (``numpy`` or ``cupy``)

    Returns
    -------
    send_buf : :obj:`numpy.ndarray` or :obj:`cupy.ndarray` or array-like
        Contiguous buffer with the data to be sent by this rank.
    recv_buf : :obj:`numpy.ndarray` or :obj:`cupy.ndarray` or array-like
        Flat buffer sized to hold the gathered data of all ranks.
    displs : :obj:`list` or ``None``
        Start offset of each rank's data inside ``recv_buf``; ``None`` when
        every rank sends the same shape (the plain ``Allgather`` case).
    """
    ncp = get_module(engine)
    recv_buf = ncp.zeros(sum(recvcounts), dtype=send_buf.dtype)
    if len(set(send_buf_shapes)) == 1:
        # Identically-shaped contributions: no per-rank offsets required.
        displs = None
    else:
        # Exclusive prefix-sum of the counts gives each rank's offset.
        displs = [0]
        for count in recvcounts[:-1]:
            displs.append(displs[-1] + count)
    # MPI requires a contiguous send buffer.
    return ncp.ascontiguousarray(send_buf), recv_buf, displs


def _unroll_allgather_recv(recv_buf, send_buf_shapes, padded_send_buf_shape=None,
                           recvcounts=None, displs=None, engine="numpy") -> list:
    r"""Unroll recv_buf after a buffered allgather (MPI and NCCL)

    Split the flat gathered buffer back into one array per rank, removing any
    padding. Ranks may contribute differently-shaped arrays, so the result is
    returned as a list of arrays rather than a single stacked array.

    Parameters
    ----------
    recv_buf : :obj:`numpy.ndarray` or :obj:`cupy.ndarray` or array-like
        The flat data buffer produced by the allgather call.
    send_buf_shapes : :obj:`list`
        Original (unpadded) shape of each rank's send buffer.
    padded_send_buf_shape : tuple, optional
        Common padded shape used by ``nccl_allgather``; selects the
        padded/NCCL unrolling path when given.
    recvcounts : list, optional
        Element counts per rank from ``mpi_allgather`` (``Allgatherv`` path).
    displs : list, optional
        Start offset of each rank's data in ``recv_buf`` (``Allgatherv`` path).
    engine : :obj:`str`
        Engine used to store array (``numpy`` or ``cupy``)

    Returns
    -------
    chunks : :obj:`list`
        One array per rank with any padded elements removed.
    """
    ncp = get_module(engine)
    nranks = len(send_buf_shapes)
    if padded_send_buf_shape is not None:
        # NCCL / padded path: every rank sent a buffer padded to a common
        # shape. Carve out equal flat chunks, then trim each back to its
        # original extent along every dimension (padding can sit anywhere
        # in the flat layout, hence reshape + per-axis slicing).
        stride = int(np.prod(padded_send_buf_shape))
        out = []
        for rank, shape in enumerate(send_buf_shapes):
            flat = recv_buf[rank * stride:(rank + 1) * stride]
            trim = tuple(slice(0, dim) for dim in shape)
            out.append(flat.reshape(padded_send_buf_shape)[trim])
        return out
    if displs is not None:
        # Allgatherv path: counts and offsets locate each rank's data.
        return [
            recv_buf[start:start + count].reshape(shape)
            for start, count, shape in zip(displs, recvcounts, send_buf_shapes)
        ]
    # Equal-shape Allgather path: a simple even split suffices.
    return [piece.reshape(send_buf_shapes[0]) for piece in ncp.split(recv_buf, nranks)]
32 changes: 20 additions & 12 deletions pylops_mpi/utils/_mpi.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,24 +7,28 @@
"mpi_sendrecv"
]

from typing import Optional
from typing import List, Optional

from mpi4py import MPI
from pylops.utils import NDArray
from pylops.utils.backend import get_module
from pylops_mpi.utils import deps
from pylops_mpi.utils._common import _prepare_allgather_inputs, _unroll_allgather_recv
from pylops_mpi.utils._common import _unroll_allgather_recv, _prepare_allgather_inputs_mpi


def mpi_allgather(base_comm: MPI.Comm,
send_buf: NDArray,
recv_buf: Optional[NDArray] = None,
engine: str = "numpy",
) -> NDArray:
"""MPI_Allallgather/allallgather
) -> List[NDArray]:
"""MPI_Allgather/allgather

Dispatch allgather routine based on type of input and availability of
CUDA-Aware MPI
Dispatch the appropriate allgather routine based on buffer sizes and
CUDA-aware MPI availability.

If all ranks provide buffers of equal size, the standard `Allgather`
collective is used. Otherwise, `Allgatherv` is invoked to handle
variable-sized buffers.

Parameters
----------
Expand All @@ -40,16 +44,20 @@ def mpi_allgather(base_comm: MPI.Comm,

Returns
-------
recv_buf : :obj:`numpy.ndarray` or :obj:`cupy.ndarray`
A buffer containing the gathered data from all ranks.
recv_buf : :obj:`list`
A list of arrays containing the gathered data from all ranks.

"""
if deps.cuda_aware_mpi_enabled or engine == "numpy":
send_shapes = base_comm.allgather(send_buf.shape)
(padded_send, padded_recv) = _prepare_allgather_inputs(send_buf, send_shapes, engine=engine)
recv_buffer_to_use = recv_buf if recv_buf else padded_recv
_mpi_calls(base_comm, "Allgather", padded_send, recv_buffer_to_use, engine=engine)
return _unroll_allgather_recv(recv_buffer_to_use, padded_send.shape, send_shapes)
recvcounts = base_comm.allgather(send_buf.size)
send_buf, recv_buf, displs = _prepare_allgather_inputs_mpi(send_buf, send_shapes, recvcounts, engine)
if len(set(send_shapes)) == 1:
_mpi_calls(base_comm, "Allgather", send_buf, recv_buf, engine=engine)
else:
_mpi_calls(base_comm, "Allgatherv", send_buf,
[recv_buf, recvcounts, displs, MPI._typedict[send_buf.dtype.char]], engine=engine)
return _unroll_allgather_recv(recv_buf, send_shapes, recvcounts=recvcounts, displs=displs, engine=engine)
else:
# CuPy with non-CUDA-aware MPI
if recv_buf is None:
Expand Down
2 changes: 1 addition & 1 deletion pylops_mpi/utils/_nccl.py
Original file line number Diff line number Diff line change
Expand Up @@ -284,7 +284,7 @@ def nccl_asarray(nccl_comm, local_array, local_shapes, axis) -> cp.ndarray:

send_buf, recv_buf = _prepare_allgather_inputs(local_array, local_shapes, engine="cupy")
nccl_allgather(nccl_comm, send_buf, recv_buf)
chunks = _unroll_allgather_recv(recv_buf, send_buf.shape, local_shapes)
chunks = _unroll_allgather_recv(recv_buf, local_shapes, send_buf.shape, engine="cupy")

# combine back to single global array
return cp.concatenate(chunks, axis=axis)
Expand Down
Loading