8 changes: 4 additions & 4 deletions docs/source/gpu.rst
@@ -153,8 +153,8 @@ In the following, we provide a list of modules (i.e., operators and solvers) whe
   * - :class:`pylops_mpi.optimization.basic.cgls`
     - ✅
   * - :class:`pylops_mpi.signalprocessing.Fredhoml1`
-    - Planned ⏳
-  * - ISTA Solver
-    - Planned ⏳
+    - ✅
   * - Complex Numeric Data Type for NCCL
-    - Planned ⏳
+    - ✅
+  * - ISTA Solver
+    - Planned ⏳
18 changes: 15 additions & 3 deletions pylops_mpi/DistributedArray.py
@@ -14,7 +14,7 @@
nccl_message = deps.nccl_import("the DistributedArray module")

if nccl_message is None and cupy_message is None:
from pylops_mpi.utils._nccl import nccl_allgather, nccl_allreduce, nccl_asarray, nccl_bcast, nccl_split, nccl_send, nccl_recv
from pylops_mpi.utils._nccl import nccl_allgather, nccl_allreduce, nccl_asarray, nccl_bcast, nccl_split, nccl_send, nccl_recv, _prepare_nccl_allgather_inputs, _unroll_nccl_allgather_recv
from cupy.cuda.nccl import NcclCommunicator
else:
NcclCommunicator = Any
@@ -500,7 +500,13 @@ def _allgather(self, send_buf, recv_buf=None):
"""Allgather operation
"""
if deps.nccl_enabled and self.base_comm_nccl:
return nccl_allgather(self.base_comm_nccl, send_buf, recv_buf)
if isinstance(send_buf, (tuple, list, int)):
return nccl_allgather(self.base_comm_nccl, send_buf, recv_buf)
else:
send_shapes = self.base_comm.allgather(send_buf.shape)
(padded_send, padded_recv) = _prepare_nccl_allgather_inputs(send_buf, send_shapes)
raw_recv = nccl_allgather(self.base_comm_nccl, padded_send, recv_buf if recv_buf else padded_recv)
return _unroll_nccl_allgather_recv(raw_recv, padded_send.shape, send_shapes)
else:
if recv_buf is None:
return self.base_comm.allgather(send_buf)
@@ -511,7 +517,13 @@ def _allgather_subcomm(self, send_buf, recv_buf=None):
"""Allgather operation with subcommunicator
"""
if deps.nccl_enabled and getattr(self, "base_comm_nccl"):
return nccl_allgather(self.sub_comm, send_buf, recv_buf)
if isinstance(send_buf, (tuple, list, int)):
return nccl_allgather(self.sub_comm, send_buf, recv_buf)
else:
send_shapes = self._allgather_subcomm(send_buf.shape)
(padded_send, padded_recv) = _prepare_nccl_allgather_inputs(send_buf, send_shapes)
raw_recv = nccl_allgather(self.sub_comm, padded_send, recv_buf if recv_buf else padded_recv)
return _unroll_nccl_allgather_recv(raw_recv, padded_send.shape, send_shapes)
else:
if recv_buf is None:
return self.sub_comm.allgather(send_buf)
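
With this change, gathering an unevenly partitioned ndarray goes through the pad/allgather/unroll helpers, while shape tuples and plain integers still take the direct nccl_allgather path. A minimal sketch of the new branch is shown below (illustrative only, not part of the diff; the import path, constructor arguments and attributes are assumptions based on the surrounding code, and a CUDA machine with at least two ranks is assumed):

# Hypothetical two-rank sketch: a global size of 5 splits into local sizes 3 and 2,
# so the ndarray branch pads both blocks to 3 elements, gathers 2 * 3 values over
# NCCL, and unrolls the result back into per-rank arrays of the original sizes.
import cupy as cp
import pylops_mpi
from pylops_mpi.utils._nccl import initialize_nccl_comm

nccl_comm = initialize_nccl_comm()
x = pylops_mpi.DistributedArray(global_shape=5, base_comm_nccl=nccl_comm,
                                engine="cupy", dtype="float32")
x[:] = cp.ones(x.local_shape[0], dtype="float32")
chunks = x._allgather(x.local_array)   # list of per-rank cupy arrays, padding removed
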
14 changes: 10 additions & 4 deletions pylops_mpi/signalprocessing/Fredholm1.py
@@ -111,7 +111,10 @@ def _matvec(self, x: DistributedArray) -> DistributedArray:
if x.partition not in [Partition.BROADCAST, Partition.UNSAFE_BROADCAST]:
raise ValueError(f"x should have partition={Partition.BROADCAST},{Partition.UNSAFE_BROADCAST}"
f"Got {x.partition} instead...")
y = DistributedArray(global_shape=self.shape[0], partition=x.partition,
y = DistributedArray(global_shape=self.shape[0],
base_comm=x.base_comm,
base_comm_nccl=x.base_comm_nccl,
partition=x.partition,
engine=x.engine, dtype=self.dtype)
x = x.local_array.reshape(self.dims).squeeze()
x = x[self.islstart[self.rank]:self.islend[self.rank]]
@@ -125,15 +128,18 @@ def _matvec(self, x: DistributedArray) -> DistributedArray:
for isl in range(self.nsls[self.rank]):
y1[isl] = ncp.dot(self.G[isl], x[isl])
# gather results
y[:] = np.vstack(self.base_comm.allgather(y1)).ravel()
y[:] = ncp.vstack(y._allgather(y1)).ravel()
return y

def _rmatvec(self, x: NDArray) -> NDArray:
ncp = get_module(x.engine)
if x.partition not in [Partition.BROADCAST, Partition.UNSAFE_BROADCAST]:
raise ValueError(f"x should have partition={Partition.BROADCAST},{Partition.UNSAFE_BROADCAST}"
f"Got {x.partition} instead...")
y = DistributedArray(global_shape=self.shape[1], partition=x.partition,
y = DistributedArray(global_shape=self.shape[1],
base_comm=x.base_comm,
base_comm_nccl=x.base_comm_nccl,
partition=x.partition,
engine=x.engine, dtype=self.dtype)
x = x.local_array.reshape(self.dimsd).squeeze()
x = x[self.islstart[self.rank]:self.islend[self.rank]]
@@ -159,5 +165,5 @@ def _rmatvec(self, x: NDArray) -> NDArray:
y1[isl] = ncp.dot(x[isl].T.conj(), self.G[isl]).T.conj()

# gather results
y[:] = np.vstack(self.base_comm.allgather(y1)).ravel()
y[:] = ncp.vstack(y._allgather(y1)).ravel()
return y
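
The substance of this change: y is now constructed with the communicators taken from x, and the per-rank blocks y1 are gathered through y._allgather rather than base_comm.allgather, so with an NCCL communicator the gather stays on the GPU instead of round-tripping through host memory. A minimal sketch of the resulting pattern (names mirror the code above; illustrative only):

# Before: results crossed to the host and back
#   y[:] = np.vstack(self.base_comm.allgather(y1)).ravel()
# After: the DistributedArray routes the gather itself -- NCCL (device-to-device)
# when base_comm_nccl is set, mpi4py otherwise -- returning one block per rank.
gathered = y._allgather(y1)            # list with one block per rank
y[:] = ncp.vstack(gathered).ravel()    # every rank ends up with the full result
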
152 changes: 115 additions & 37 deletions pylops_mpi/utils/_nccl.py
@@ -1,21 +1,25 @@
__all__ = [
"_prepare_nccl_allgather_inputs",
"_unroll_nccl_allgather_recv",
"initialize_nccl_comm",
"nccl_split",
"nccl_allgather",
"nccl_allreduce",
"nccl_bcast",
"nccl_asarray",
"nccl_send",
"nccl_recv"
"nccl_recv",
]

from enum import IntEnum
from typing import Tuple
from mpi4py import MPI
import os
import numpy as np
import cupy as cp
import cupy.cuda.nccl as nccl


cupy_to_nccl_dtype = {
"float32": nccl.NCCL_FLOAT32,
"float64": nccl.NCCL_FLOAT64,
@@ -25,6 +29,9 @@
"int8": nccl.NCCL_INT8,
"uint32": nccl.NCCL_UINT32,
"uint64": nccl.NCCL_UINT64,
# sending complex array as float with 2x size
"complex64": nccl.NCCL_FLOAT32,
"complex128": nccl.NCCL_FLOAT64,
}


@@ -35,6 +42,106 @@ class NcclOp(IntEnum):
MIN = nccl.NCCL_MIN


def _nccl_buf_size(buf, count=None):
""" Get an appropriate buffer size according to the dtype of buf

Parameters
----------
buf : :obj:`cupy.ndarray` or array-like
The data buffer from the local GPU to be sent.

count : :obj:`int`, optional
Number of elements to send from `buf`, if not sending every element in `buf`.

Returns
-------
:obj:`int`
An appropriate number of elements to pass to NCCL for `buf`.
"""
if buf.dtype in ['complex64', 'complex128']:
return 2 * count if count else 2 * buf.size
else:
return count if count else buf.size
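
A quick illustration of the sizing rule (hypothetical values; assumes the helper above can be imported from pylops_mpi.utils._nccl): complex arrays are shipped as the matching real dtype, so their element count doubles.

import cupy as cp
from pylops_mpi.utils._nccl import _nccl_buf_size

buf = cp.ones(10, dtype="complex64")
assert _nccl_buf_size(buf) == 20          # sent as 20 float32 values
assert _nccl_buf_size(buf, count=4) == 8  # explicit counts double as well
assert _nccl_buf_size(cp.ones(10, dtype="float32")) == 10
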


def _prepare_nccl_allgather_inputs(send_buf, send_buf_shapes) -> Tuple[cp.ndarray, cp.ndarray]:
r""" Prepare send_buf and recv_buf for NCCL allgather (nccl_allgather)

NCCL's allGather requires the sending buffer to have the same size for every device.
Therefore, padding is required when the array is not evenly partitioned across
all the ranks. The padding is applied such that each dimension of the sending buffers
is equal to the max size of that dimension across all ranks.

Similarly, each receive buffer (recv_buf) is created with a size equal to :math:`n_{rank}` times the padded send_buf.size

Parameters
----------
send_buf : :obj:`cupy.ndarray` or array-like
The data buffer from the local GPU to be sent for allgather.
send_buf_shapes: :obj:`list`
A list of shapes for each GPU send_buf (used to calculate padding size)

Returns
-------
send_buf: :obj:`cupy.ndarray`
A buffer containing the data and padded elements to be sent by this rank.
recv_buf : :obj:`cupy.ndarray`
An empty, padded buffer to gather data from all GPUs.
"""
sizes_each_dim = list(zip(*send_buf_shapes))
send_shape = tuple(map(max, sizes_each_dim))
pad_size = [
(0, s_shape - l_shape) for s_shape, l_shape in zip(send_shape, send_buf.shape)
]

send_buf = cp.pad(
send_buf, pad_size, mode="constant", constant_values=0
)

# NCCL recommends one MPI process per GPU, so the size of the receive buffer can be inferred
ndev = len(send_buf_shapes)
recv_buf = cp.zeros(ndev * send_buf.size, dtype=send_buf.dtype)

return send_buf, recv_buf
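
For example (hypothetical shapes, two ranks): if rank 0 holds a (3, 5) block and rank 1 a (2, 5) block, every rank pads its block to (3, 5) and allocates a flat receive buffer for 2 * 15 elements. The snippet assumes the helper is importable from pylops_mpi.utils._nccl:

import cupy as cp
from pylops_mpi.utils._nccl import _prepare_nccl_allgather_inputs

send_buf = cp.ones((2, 5), dtype="float32")   # this rank's (smaller) block
shapes = [(3, 5), (2, 5)]                     # shapes gathered from all ranks
padded_send, recv = _prepare_nccl_allgather_inputs(send_buf, shapes)
print(padded_send.shape)                      # (3, 5), zero-padded along axis 0
print(recv.shape)                             # (30,) = 2 ranks * 15 padded elements
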


def _unroll_nccl_allgather_recv(recv_buf, padded_send_buf_shape, send_buf_shapes) -> list:
"""Unroll recv_buf after NCCL allgather (nccl_allgather)

Remove the padded elements in recv_buf, extract the individual array from each device, and return them as a list of arrays.
Each GPU may send an array with a different shape, so the return type has to be a list of arrays
instead of a single concatenated array.

Parameters
----------
recv_buf: :obj:`cupy.ndarray` or array-like
The data buffer returned from nccl_allgather call
padded_send_buf_shape : :obj:`tuple` of :obj:`int`
The shape of send_buf after the padding applied in nccl_allgather
send_buf_shapes: :obj:`list`
A list of original shapes for each GPU send_buf prior to padding

Returns
-------
chunks : :obj:`list`
A list of :obj:`cupy.ndarray`, one per GPU, with the padded elements removed
"""

ndev = len(send_buf_shapes)
# extract an individual array from each device
chunk_size = np.prod(padded_send_buf_shape)
chunks = [
recv_buf[i * chunk_size:(i + 1) * chunk_size] for i in range(ndev)
]

# Remove padding from each array: the padded value may appear somewhere
# in the middle of the flat array and thus the reshape and slicing for each dimension is required
for i in range(ndev):
slicing = tuple(slice(0, end) for end in send_buf_shapes[i])
chunks[i] = chunks[i].reshape(padded_send_buf_shape)[slicing]

return chunks
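
Continuing the hypothetical two-rank example from _prepare_nccl_allgather_inputs above: once nccl_allgather has filled the flat receive buffer, the helper splits it per rank, reshapes each chunk to the padded shape, and slices the zero padding away.

from pylops_mpi.utils._nccl import _unroll_nccl_allgather_recv

# recv, padded_send and shapes come from the previous sketch
chunks = _unroll_nccl_allgather_recv(recv, padded_send.shape, shapes)
print(chunks[0].shape)   # (3, 5): rank 0's block, nothing to strip
print(chunks[1].shape)   # (2, 5): rank 1's block, padding removed
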


def mpi_op_to_nccl(mpi_op) -> NcclOp:
""" Map MPI reduction operation to NCCL equivalent

@@ -155,7 +262,7 @@ def nccl_allgather(nccl_comm, send_buf, recv_buf=None) -> cp.ndarray:
nccl_comm.allGather(
send_buf.data.ptr,
recv_buf.data.ptr,
send_buf.size,
_nccl_buf_size(send_buf),
cupy_to_nccl_dtype[str(send_buf.dtype)],
cp.cuda.Stream.null.ptr,
)
@@ -193,7 +300,7 @@ def nccl_allreduce(nccl_comm, send_buf, recv_buf=None, op: MPI.Op = MPI.SUM) ->
nccl_comm.allReduce(
send_buf.data.ptr,
recv_buf.data.ptr,
send_buf.size,
_nccl_buf_size(send_buf),
cupy_to_nccl_dtype[str(send_buf.dtype)],
mpi_op_to_nccl(op),
cp.cuda.Stream.null.ptr,
@@ -220,7 +327,7 @@ def nccl_bcast(nccl_comm, local_array, index, value) -> None:
local_array[index] = value
nccl_comm.bcast(
local_array[index].data.ptr,
local_array[index].size,
_nccl_buf_size(local_array[index]),
cupy_to_nccl_dtype[str(local_array[index].dtype)],
0,
cp.cuda.Stream.null.ptr,
@@ -247,41 +354,12 @@ def nccl_asarray(nccl_comm, local_array, local_shapes, axis) -> cp.ndarray:
-------
final_array : :obj:`cupy.ndarray`
Global array gathered from all GPUs and concatenated along `axis`.

Notes
-----
NCCL's allGather requires the sending buffer to have the same size for every device.
Therefore, the padding is required when the array is not evenly partitioned across
all the ranks. The padding is applied such that the sending buffer has the size of
each dimension corresponding to the max possible size of that dimension.
"""
sizes_each_dim = list(zip(*local_shapes))

send_shape = tuple(map(max, sizes_each_dim))
pad_size = [
(0, s_shape - l_shape) for s_shape, l_shape in zip(send_shape, local_array.shape)
]

send_buf = cp.pad(
local_array, pad_size, mode="constant", constant_values=0
)

# NCCL recommends to use one MPI Process per GPU and so size of receiving buffer can be inferred
ndev = len(local_shapes)
recv_buf = cp.zeros(ndev * send_buf.size, dtype=send_buf.dtype)
send_buf, recv_buf = _prepare_nccl_allgather_inputs(local_array, local_shapes)
nccl_allgather(nccl_comm, send_buf, recv_buf)
chunks = _unroll_nccl_allgather_recv(recv_buf, send_buf.shape, local_shapes)

# extract an individual array from each device
chunk_size = np.prod(send_shape)
chunks = [
recv_buf[i * chunk_size:(i + 1) * chunk_size] for i in range(ndev)
]

# Remove padding from each array: the padded value may appear somewhere
# in the middle of the flat array and thus the reshape and slicing for each dimension is required
for i in range(ndev):
slicing = tuple(slice(0, end) for end in local_shapes[i])
chunks[i] = chunks[i].reshape(send_shape)[slicing]
# combine back to single global array
return cp.concatenate(chunks, axis=axis)

@@ -302,7 +380,7 @@ def nccl_send(nccl_comm, send_buf, dest, count):
Number of elements to send from `send_buf`.
"""
nccl_comm.send(send_buf.data.ptr,
count,
_nccl_buf_size(send_buf, count),
cupy_to_nccl_dtype[str(send_buf.dtype)],
dest,
cp.cuda.Stream.null.ptr
@@ -325,7 +403,7 @@ def nccl_recv(nccl_comm, recv_buf, source, count=None):
Number of elements to receive.
"""
nccl_comm.recv(recv_buf.data.ptr,
count,
_nccl_buf_size(recv_buf, count),
cupy_to_nccl_dtype[str(recv_buf.dtype)],
source,
cp.cuda.Stream.null.ptr
8 changes: 4 additions & 4 deletions tests_nccl/test_blockdiag_nccl.py
@@ -18,15 +18,15 @@
nccl_comm = initialize_nccl_comm()

par1 = {'ny': 101, 'nx': 101, 'dtype': np.float64}
# par1j = {'ny': 101, 'nx': 101, 'dtype': np.complex128}
par1j = {'ny': 101, 'nx': 101, 'dtype': np.complex128}
par2 = {'ny': 301, 'nx': 101, 'dtype': np.float64}
# par2j = {'ny': 301, 'nx': 101, 'dtype': np.complex128}
par2j = {'ny': 301, 'nx': 101, 'dtype': np.complex128}

np.random.seed(42)


@pytest.mark.mpi(min_size=2)
@pytest.mark.parametrize("par", [(par1), (par2)])
@pytest.mark.parametrize("par", [(par1), (par1j), (par2), (par2j)])
def test_blockdiag_nccl(par):
"""Test the MPIBlockDiag with NCCL"""
size = MPI.COMM_WORLD.Get_size()
@@ -71,7 +71,7 @@ def test_blockdiag_nccl(par):


@pytest.mark.mpi(min_size=2)
@pytest.mark.parametrize("par", [(par1), (par2)])
@pytest.mark.parametrize("par", [(par1), (par1j), (par2), (par2j)])
def test_stacked_blockdiag_nccl(par):
"""Tests for MPIStackedBlockDiag with NCCL"""
size = MPI.COMM_WORLD.Get_size()
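
Re-enabling par1j and par2j means these tests now also exercise complex128 data over NCCL, which is exactly what the new complex64/complex128 entries in cupy_to_nccl_dtype and the _nccl_buf_size helper make possible. A condensed, hedged sketch of what one parametrization exercises (simplified from the test body; the exact operator setup in the real test may differ, and nccl_comm comes from initialize_nccl_comm() as in the module above):

# Apply a block-diagonal of per-rank MatrixMult operators to a complex128
# DistributedArray backed by CuPy; collectives go through the NCCL communicator.
import cupy as cp
import numpy as np
import pylops
import pylops_mpi
from mpi4py import MPI

size = MPI.COMM_WORLD.Get_size()
par = {'ny': 101, 'nx': 101, 'dtype': np.complex128}   # i.e. par1j

Op = pylops.MatrixMult(cp.ones((par['ny'], par['nx']), dtype=par['dtype']))
BDiag = pylops_mpi.MPIBlockDiag(ops=[Op])
x = pylops_mpi.DistributedArray(global_shape=size * par['nx'],
                                base_comm_nccl=nccl_comm,
                                engine="cupy", dtype=par['dtype'])
x[:] = cp.ones(par['nx'], dtype=par['dtype'])
y = BDiag @ x          # forward on GPU, gathered over NCCL
xadj = BDiag.H @ y     # adjoint
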