8 changes: 4 additions & 4 deletions docs/source/gpu.rst
@@ -153,8 +153,8 @@ In the following, we provide a list of modules (i.e., operators and solvers) whe
* - :class:`pylops_mpi.optimization.basic.cgls`
- ✅
* - :class:`pylops_mpi.signalprocessing.Fredholm1`
- Planned ⏳
* - ISTA Solver
- Planned ⏳
- ✅
* - Complex Numeric Data Type for NCCL
- Planned ⏳
- ✅
* - ISTA Solver
- Planned ⏳
20 changes: 17 additions & 3 deletions pylops_mpi/DistributedArray.py
@@ -14,7 +14,7 @@
nccl_message = deps.nccl_import("the DistributedArray module")

if nccl_message is None and cupy_message is None:
from pylops_mpi.utils._nccl import nccl_allgather, nccl_allreduce, nccl_asarray, nccl_bcast, nccl_split, nccl_send, nccl_recv
from pylops_mpi.utils._nccl import nccl_allgather, nccl_allreduce, nccl_asarray, nccl_bcast, nccl_split, nccl_send, nccl_recv, _prepare_nccl_allgather_inputs, _unroll_nccl_allgather_recv
from cupy.cuda.nccl import NcclCommunicator
else:
NcclCommunicator = Any
@@ -500,7 +500,15 @@ def _allgather(self, send_buf, recv_buf=None):
"""Allgather operation
"""
if deps.nccl_enabled and self.base_comm_nccl:
return nccl_allgather(self.base_comm_nccl, send_buf, recv_buf)
if hasattr(send_buf, "shape"):
send_shapes = self.base_comm.allgather(send_buf.shape)
(padded_send, padded_recv) = _prepare_nccl_allgather_inputs(send_buf, send_shapes)
# TODO: Should we ignore recv_buf completely in this case ?
raw_recv = nccl_allgather(self.base_comm_nccl, padded_send, recv_buf if recv_buf is not None else padded_recv)
return _unroll_nccl_allgather_recv(raw_recv, padded_send.shape, send_shapes)
else:
# NCCL allgather still works when send_buf is a tuple (e.g. the local shapes sent by _nccl_local_shapes)
return nccl_allgather(self.base_comm_nccl, send_buf, recv_buf)
else:
if recv_buf is None:
return self.base_comm.allgather(send_buf)
@@ -511,7 +519,13 @@ def _allgather_subcomm(self, send_buf, recv_buf=None):
"""Allgather operation with subcommunicator
"""
if deps.nccl_enabled and getattr(self, "base_comm_nccl"):
return nccl_allgather(self.sub_comm, send_buf, recv_buf)
if hasattr(send_buf, "shape"):
send_shapes = self.base_comm.allgather(send_buf.shape)
Contributor: Are you sure this is not self.sub_comm?

Collaborator (author): Oh yes, you are right. Strangely enough, no test case catches that.

Contributor: Interesting, I would think here
xmaskedloc = arr.asarray(masked=True)
the test should fail... the original code (in the current main branch) looks correct, but your previous commit should have called
return nccl_asarray(self.sub_comm if masked else self.base_comm_nccl,
in the .asarray call... anyway, now it is correct 😄

(padded_send, padded_recv) = _prepare_nccl_allgather_inputs(send_buf, send_shapes)
raw_recv = nccl_allgather(self.sub_comm, padded_send, recv_buf if recv_buf is not None else padded_recv)
return _unroll_nccl_allgather_recv(raw_recv, padded_send.shape, send_shapes)
else:
return nccl_allgather(self.sub_comm, send_buf, recv_buf)
else:
if recv_buf is None:
return self.sub_comm.allgather(send_buf)
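The change above routes ndarray payloads in `_allgather` and `_allgather_subcomm` through a pad/allgather/unroll path, so unevenly partitioned DistributedArrays survive the NCCL collective. A minimal NumPy-only sketch of that logic — the real path uses CuPy arrays, `_prepare_nccl_allgather_inputs`/`_unroll_nccl_allgather_recv` and a NCCL communicator; the two "ranks" and their shapes below are invented for illustration:

import numpy as np

# two "ranks" with unevenly sized local arrays
local = [np.arange(3, dtype=np.float32),        # rank 0: shape (3,)
         np.arange(3, 5, dtype=np.float32)]     # rank 1: shape (2,)
shapes = [a.shape for a in local]               # what base_comm.allgather(send_buf.shape) returns

# pad every send buffer to the per-dimension maximum, as _prepare_nccl_allgather_inputs does
send_shape = tuple(map(max, zip(*shapes)))      # -> (3,)
padded = [np.pad(a, [(0, s - l) for s, l in zip(send_shape, a.shape)]) for a in local]

# NCCL allGather concatenates the equally sized buffers from all ranks
recv = np.concatenate([p.ravel() for p in padded])

# unroll: split into per-rank chunks and slice away the padding, as _unroll_nccl_allgather_recv does
chunk = int(np.prod(send_shape))
chunks = [recv[i * chunk:(i + 1) * chunk].reshape(send_shape)[tuple(slice(0, e) for e in shapes[i])]
          for i in range(len(shapes))]
print(chunks)   # the original per-rank arrays are recovered, padding removed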
14 changes: 10 additions & 4 deletions pylops_mpi/signalprocessing/Fredholm1.py
@@ -111,7 +111,10 @@ def _matvec(self, x: DistributedArray) -> DistributedArray:
if x.partition not in [Partition.BROADCAST, Partition.UNSAFE_BROADCAST]:
raise ValueError(f"x should have partition={Partition.BROADCAST},{Partition.UNSAFE_BROADCAST}"
f"Got {x.partition} instead...")
y = DistributedArray(global_shape=self.shape[0], partition=x.partition,
y = DistributedArray(global_shape=self.shape[0],
base_comm=x.base_comm,
base_comm_nccl=x.base_comm_nccl,
partition=x.partition,
engine=x.engine, dtype=self.dtype)
x = x.local_array.reshape(self.dims).squeeze()
x = x[self.islstart[self.rank]:self.islend[self.rank]]
@@ -125,15 +128,18 @@ def _matvec(self, x: DistributedArray) -> DistributedArray:
for isl in range(self.nsls[self.rank]):
y1[isl] = ncp.dot(self.G[isl], x[isl])
# gather results
y[:] = np.vstack(self.base_comm.allgather(y1)).ravel()
y[:] = ncp.vstack(y._allgather(y1)).ravel()
return y

def _rmatvec(self, x: NDArray) -> NDArray:
ncp = get_module(x.engine)
if x.partition not in [Partition.BROADCAST, Partition.UNSAFE_BROADCAST]:
raise ValueError(f"x should have partition={Partition.BROADCAST},{Partition.UNSAFE_BROADCAST}"
f"Got {x.partition} instead...")
y = DistributedArray(global_shape=self.shape[1], partition=x.partition,
y = DistributedArray(global_shape=self.shape[1],
base_comm=x.base_comm,
base_comm_nccl=x.base_comm_nccl,
partition=x.partition,
engine=x.engine, dtype=self.dtype)
x = x.local_array.reshape(self.dimsd).squeeze()
x = x[self.islstart[self.rank]:self.islend[self.rank]]
@@ -159,5 +165,5 @@ def _rmatvec(self, x: NDArray) -> NDArray:
y1[isl] = ncp.dot(x[isl].T.conj(), self.G[isl]).T.conj()

# gather results
y[:] = np.vstack(self.base_comm.allgather(y1)).ravel()
y[:] = ncp.vstack(y._allgather(y1)).ravel()
return y
142 changes: 108 additions & 34 deletions pylops_mpi/utils/_nccl.py
@@ -6,7 +6,9 @@
"nccl_bcast",
"nccl_asarray",
"nccl_send",
"nccl_recv"
"nccl_recv",
"_prepare_nccl_allgather_inputs",
"_unroll_nccl_allgather_recv"
]

from enum import IntEnum
@@ -25,6 +27,9 @@
"int8": nccl.NCCL_INT8,
"uint32": nccl.NCCL_UINT32,
"uint64": nccl.NCCL_UINT64,
# sending complex array as float with 2x size
"complex64": nccl.NCCL_FLOAT32,
"complex128": nccl.NCCL_FLOAT64,
}


@@ -35,6 +40,27 @@ class NcclOp(IntEnum):
MIN = nccl.NCCL_MIN


def _nccl_buf_size(buf, count=None):
""" Get an appropriate buffer size according to the dtype of buf

Parameters
----------
buf : :obj:`cupy.ndarray` or array-like
The data buffer from the local GPU to be sent.

count : :obj:`int`, optional
Number of elements to send from `buf`, if not sending every element of `buf`.

Returns
-------
:obj:`int`
An appropriate number of elements to send from `buf` for NCCL communication.
"""
if buf.dtype in ['complex64', 'complex128']:
return 2 * count if count else 2 * buf.size
else:
return count if count else buf.size

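complex64/complex128 are mapped to NCCL_FLOAT32/NCCL_FLOAT64 because a complex array is stored as interleaved (real, imag) pairs, so its payload can be shipped as plain floats of twice the length — exactly the doubled count `_nccl_buf_size` returns. A quick CuPy check of that assumption (a sketch only: it requires a GPU, and the values are arbitrary):

import cupy as cp

a = cp.ones(4, dtype=cp.complex64) * (1 + 2j)   # 4 complex64 elements
f = a.view(cp.float32)                          # same memory reinterpreted as floats
assert f.size == 2 * a.size                     # the doubled element count
assert a.tobytes() == f.tobytes()               # identical bytes go over the wire

Note that this reinterpretation is exact for allGather, send/recv and SUM reductions (real and imaginary parts add independently), but it would not be meaningful for PROD/MIN/MAX reductions on complex data.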

def mpi_op_to_nccl(mpi_op) -> NcclOp:
""" Map MPI reduction operation to NCCL equivalent

@@ -155,7 +181,7 @@ def nccl_allgather(nccl_comm, send_buf, recv_buf=None) -> cp.ndarray:
nccl_comm.allGather(
send_buf.data.ptr,
recv_buf.data.ptr,
send_buf.size,
_nccl_buf_size(send_buf),
cupy_to_nccl_dtype[str(send_buf.dtype)],
cp.cuda.Stream.null.ptr,
)
Expand Down Expand Up @@ -193,7 +219,7 @@ def nccl_allreduce(nccl_comm, send_buf, recv_buf=None, op: MPI.Op = MPI.SUM) ->
nccl_comm.allReduce(
send_buf.data.ptr,
recv_buf.data.ptr,
send_buf.size,
_nccl_buf_size(send_buf),
cupy_to_nccl_dtype[str(send_buf.dtype)],
mpi_op_to_nccl(op),
cp.cuda.Stream.null.ptr,
@@ -220,68 +246,116 @@ def nccl_bcast(nccl_comm, local_array, index, value) -> None:
local_array[index] = value
nccl_comm.bcast(
local_array[index].data.ptr,
local_array[index].size,
_nccl_buf_size(local_array[index]),
cupy_to_nccl_dtype[str(local_array[index].dtype)],
0,
cp.cuda.Stream.null.ptr,
)


def nccl_asarray(nccl_comm, local_array, local_shapes, axis) -> cp.ndarray:
"""Global view of the array
def _prepare_nccl_allgather_inputs(send_buf, send_buf_shapes) -> tuple[cp.ndarray, cp.ndarray]:
""" Preparing the send_buf and recv_buf for the NCCL allgather (nccl_allgather)

Gather all local GPU arrays into a single global array via NCCL all-gather.
NCCL's allGather requires the sending buffer to have the same size for every device.
Therefore, the padding is required when the array is not evenly partitioned across
all the ranks. The padding is applied such that the sending buffer has the size of
each dimension corresponding to the max possible size of that dimension.

The receive buffer (recv_buf) will have size n_rank * send_buf.size.

Parameters
----------
nccl_comm : :obj:`cupy.cuda.nccl.NcclCommunicator`
The NCCL communicator used for collective communication.
local_array : :obj:`cupy.ndarray`
The local array on the current GPU.
local_shapes : :obj:`list`
A list of shapes for each GPU local array (used to trim padding).
axis : :obj:`int`
The axis along which to concatenate the gathered arrays.
send_buf : :obj:`cupy.ndarray` or array-like
The data buffer from the local GPU to be sent for allgather.
send_buf_shapes: :obj:`list`
A list of shapes for each GPU send_buf (used to calculate padding size)

Returns
-------
final_array : :obj:`cupy.ndarray`
Global array gathered from all GPUs and concatenated along `axis`.
tuple[send_buf, recv_buf]: :obj:`tuple`
A tuple of (send_buf, recv_buf) with an appropriate size, shape and dtype for NCCL allgather

Notes
-----
NCCL's allGather requires the sending buffer to have the same size for every device.
Therefore, the padding is required when the array is not evenly partitioned across
all the ranks. The padding is applied such that the sending buffer has the size of
each dimension corresponding to the max possible size of that dimension.
"""
sizes_each_dim = list(zip(*local_shapes))

sizes_each_dim = list(zip(*send_buf_shapes))
send_shape = tuple(map(max, sizes_each_dim))
pad_size = [
(0, s_shape - l_shape) for s_shape, l_shape in zip(send_shape, local_array.shape)
(0, s_shape - l_shape) for s_shape, l_shape in zip(send_shape, send_buf.shape)
]

send_buf = cp.pad(
local_array, pad_size, mode="constant", constant_values=0
send_buf, pad_size, mode="constant", constant_values=0
)

# NCCL recommends one MPI process per GPU, so the size of the receiving buffer can be inferred
ndev = len(local_shapes)
ndev = len(send_buf_shapes)
recv_buf = cp.zeros(ndev * send_buf.size, dtype=send_buf.dtype)
nccl_allgather(nccl_comm, send_buf, recv_buf)

return (send_buf, recv_buf)

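To make the padding rule concrete, a small example calling the helper directly (a sketch only: it assumes a GPU with CuPy, and the shapes are invented). With per-rank shapes [(3, 2), (2, 2)] the common send shape is the per-dimension maximum (3, 2), so the rank holding the (2, 2) block gets one zero row of padding and every rank allocates a receive buffer of ndev * 6 elements:

import cupy as cp
from pylops_mpi.utils._nccl import _prepare_nccl_allgather_inputs

local = cp.ones((2, 2), dtype=cp.float32)       # this rank holds the smaller block
send_buf, recv_buf = _prepare_nccl_allgather_inputs(local, [(3, 2), (2, 2)])
print(send_buf.shape, recv_buf.shape)           # (3, 2) (12,)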

def _unroll_nccl_allgather_recv(recv_buf, padded_send_buf_shape, send_buf_shapes) -> list:
""" Remove the padded elements in recv_buff, extract an individual array from each device and return them as a list of arrays

Each GPU may send an array with a different shape, so the return type has to be a list of arrays
instead of a concatenated array.

Parameters
----------
recv_buf : :obj:`cupy.ndarray` or array-like
The data buffer returned from the nccl_allgather call
padded_send_buf_shape : :obj:`tuple` of :obj:`int`
The shape of send_buf after padding used in nccl_allgather
send_buf_shapes : :obj:`list`
A list of original shapes for each GPU send_buf prior to padding

Returns
-------
chunks: :obj:`list`
A list of `cupy.ndarray`, one per GPU, with the padded elements removed
"""

ndev = len(send_buf_shapes)
# extract an individual array from each device
chunk_size = np.prod(send_shape)
chunk_size = np.prod(padded_send_buf_shape)
chunks = [
recv_buf[i * chunk_size:(i + 1) * chunk_size] for i in range(ndev)
]

# Remove padding from each array: the padded values may appear somewhere in the middle
# of the flat array, so reshaping and slicing along each dimension are required
for i in range(ndev):
slicing = tuple(slice(0, end) for end in local_shapes[i])
chunks[i] = chunks[i].reshape(send_shape)[slicing]
slicing = tuple(slice(0, end) for end in send_buf_shapes[i])
chunks[i] = chunks[i].reshape(padded_send_buf_shape)[slicing]

return chunks

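Continuing the same example without an actual NCCL call: faking the gathered buffer by concatenating the two padded blocks, `_unroll_nccl_allgather_recv` strips the padding and returns each rank's original block (again a sketch; in real use the buffer is the one filled by `nccl_allgather`):

import cupy as cp
from pylops_mpi.utils._nccl import (_prepare_nccl_allgather_inputs,
                                    _unroll_nccl_allgather_recv)

shapes = [(3, 2), (2, 2)]
blocks = [cp.arange(6, dtype=cp.float32).reshape(3, 2),
          cp.arange(6, 10, dtype=cp.float32).reshape(2, 2)]
padded = [_prepare_nccl_allgather_inputs(b, shapes)[0] for b in blocks]
fake_recv = cp.concatenate([p.ravel() for p in padded])        # what allGather would hand back
chunks = _unroll_nccl_allgather_recv(fake_recv, padded[0].shape, shapes)
print([c.shape for c in chunks])                               # [(3, 2), (2, 2)]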

def nccl_asarray(nccl_comm, local_array, local_shapes, axis) -> cp.ndarray:
"""Global view of the array

Gather all local GPU arrays into a single global array via NCCL all-gather.

Parameters
----------
nccl_comm : :obj:`cupy.cuda.nccl.NcclCommunicator`
The NCCL communicator used for collective communication.
local_array : :obj:`cupy.ndarray`
The local array on the current GPU.
local_shapes : :obj:`list`
A list of shapes for each GPU local array (used to trim padding).
axis : :obj:`int`
The axis along which to concatenate the gathered arrays.

Returns
-------
final_array : :obj:`cupy.ndarray`
Global array gathered from all GPUs and concatenated along `axis`.
"""

(send_buf, recv_buf) = _prepare_nccl_allgather_inputs(local_array, local_shapes)
nccl_allgather(nccl_comm, send_buf, recv_buf)
chunks = _unroll_nccl_allgather_recv(recv_buf, send_buf.shape, local_shapes)

# combine back to single global array
return cp.concatenate(chunks, axis=axis)

@@ -302,7 +376,7 @@ def nccl_send(nccl_comm, send_buf, dest, count):
Number of elements to send from `send_buf`.
"""
nccl_comm.send(send_buf.data.ptr,
count,
_nccl_buf_size(send_buf, count),
cupy_to_nccl_dtype[str(send_buf.dtype)],
dest,
cp.cuda.Stream.null.ptr
@@ -325,7 +399,7 @@ def nccl_recv(nccl_comm, recv_buf, source, count=None):
Number of elements to receive.
"""
nccl_comm.recv(recv_buf.data.ptr,
count,
_nccl_buf_size(recv_buf, count),
cupy_to_nccl_dtype[str(recv_buf.dtype)],
source,
cp.cuda.Stream.null.ptr
8 changes: 4 additions & 4 deletions tests_nccl/test_blockdiag_nccl.py
@@ -18,15 +18,15 @@
nccl_comm = initialize_nccl_comm()

par1 = {'ny': 101, 'nx': 101, 'dtype': np.float64}
# par1j = {'ny': 101, 'nx': 101, 'dtype': np.complex128}
par1j = {'ny': 101, 'nx': 101, 'dtype': np.complex128}
par2 = {'ny': 301, 'nx': 101, 'dtype': np.float64}
# par2j = {'ny': 301, 'nx': 101, 'dtype': np.complex128}
par2j = {'ny': 301, 'nx': 101, 'dtype': np.complex128}

np.random.seed(42)


@pytest.mark.mpi(min_size=2)
@pytest.mark.parametrize("par", [(par1), (par2)])
@pytest.mark.parametrize("par", [(par1), (par1j), (par2), (par2j)])
def test_blockdiag_nccl(par):
"""Test the MPIBlockDiag with NCCL"""
size = MPI.COMM_WORLD.Get_size()
@@ -71,7 +71,7 @@ def test_blockdiag_nccl(par):


@pytest.mark.mpi(min_size=2)
@pytest.mark.parametrize("par", [(par1), (par2)])
@pytest.mark.parametrize("par", [(par1), (par1j), (par2), (par2j)])
def test_stacked_blockdiag_nccl(par):
"""Tests for MPIStackedBlogDiag with NCCL"""
size = MPI.COMM_WORLD.Get_size()