Commit 78d7538

feat: moved methods shared by _mpi and _nccl to _common

1 parent c852fc4 commit 78d7538

3 files changed: 117 additions & 95 deletions

pylops_mpi/utils/_common.py

Lines changed: 92 additions & 0 deletions

@@ -0,0 +1,92 @@
+__all__ = [
+    "_prepare_allgather_inputs",
+    "_unroll_allgather_recv"
+]
+
+from typing import Optional
+
+import numpy as np
+from mpi4py import MPI
+from pylops.utils.backend import get_module
+from pylops_mpi.utils import deps
+
+
+# TODO: return type annotation for both cupy and numpy
+def _prepare_allgather_inputs(send_buf, send_buf_shapes, engine):
+    r"""Prepare send_buf and recv_buf for a buffered allgather (MPI and NCCL)
+
+    A buffered allgather (MPI and NCCL) requires the send buffer to have the same size on every device.
+    Therefore, padding is required when the array is not evenly partitioned across
+    all the ranks. The padding is applied such that each dimension of the send buffer
+    is equal to the maximum size of that dimension across all ranks.
+
+    Similarly, each receive buffer (recv_buf) is created with size equal to :math:`n_{rank} \cdot send\_buf.size`
+
+    Parameters
+    ----------
+    send_buf : :obj:`numpy.ndarray` or :obj:`cupy.ndarray` or array-like
+        The data buffer from the local GPU to be sent for allgather.
+    send_buf_shapes : :obj:`list`
+        A list of the shapes of every rank's send_buf (used to calculate the padding size)
+    engine : :obj:`str`
+        Engine used to store the array (``numpy`` or ``cupy``)
+
+    Returns
+    -------
+    send_buf : :obj:`numpy.ndarray` or :obj:`cupy.ndarray`
+        A buffer containing the data and padded elements to be sent by this rank.
+    recv_buf : :obj:`numpy.ndarray` or :obj:`cupy.ndarray`
+        An empty, padded buffer to gather data from all ranks.
+    """
+    ncp = get_module(engine)
+    sizes_each_dim = list(zip(*send_buf_shapes))
+    send_shape = tuple(map(max, sizes_each_dim))
+    pad_size = [
+        (0, s_shape - l_shape) for s_shape, l_shape in zip(send_shape, send_buf.shape)
+    ]
+
+    send_buf = ncp.pad(
+        send_buf, pad_size, mode="constant", constant_values=0
+    )
+
+    ndev = len(send_buf_shapes)
+    recv_buf = ncp.zeros(ndev * send_buf.size, dtype=send_buf.dtype)
+
+    return send_buf, recv_buf
+
+
+def _unroll_allgather_recv(recv_buf, padded_send_buf_shape, send_buf_shapes) -> list:
+    r"""Unroll recv_buf after a buffered allgather (MPI and NCCL)
+
+    Remove the padded elements in recv_buf, extract the individual array sent by each device, and return them as a list of arrays.
+    Each device may send an array with a different shape, so the return type has to be a list of arrays
+    instead of a single concatenated array.
+
+    Parameters
+    ----------
+    recv_buf : :obj:`numpy.ndarray` or :obj:`cupy.ndarray` or array-like
+        The data buffer returned from the allgather call
+    padded_send_buf_shape : :obj:`tuple`
+        The shape of send_buf after the padding applied for allgather
+    send_buf_shapes : :obj:`list`
+        A list of the original shapes of every rank's send_buf prior to padding
+
+    Returns
+    -------
+    chunks : :obj:`list`
+        A list of arrays, one per rank, with the padded elements removed
+    """
+    ndev = len(send_buf_shapes)
+    # extract an individual array from each device
+    chunk_size = np.prod(padded_send_buf_shape)
+    chunks = [
+        recv_buf[i * chunk_size:(i + 1) * chunk_size] for i in range(ndev)
+    ]
+
+    # Remove padding from each array: the padded values may appear somewhere
+    # in the middle of the flat array, so a reshape and per-dimension slicing are required
+    for i in range(ndev):
+        slicing = tuple(slice(0, end) for end in send_buf_shapes[i])
+        chunks[i] = chunks[i].reshape(padded_send_buf_shape)[slicing]
+
+    return chunks
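For reviewers, a minimal sketch (not part of the commit) of how the two moved helpers pair up around a buffered allgather. It simulates a 2-rank gather in a single NumPy process, with hypothetical shapes, by concatenating the padded buffers in rank order the way Allgather would:

# Single-process stand-in for a 2-rank buffered allgather (hypothetical shapes).
import numpy as np

from pylops_mpi.utils._common import (
    _prepare_allgather_inputs,
    _unroll_allgather_recv,
)

# Unevenly partitioned local arrays on two simulated ranks
local_arrays = [np.arange(6.0).reshape(2, 3), np.arange(3.0).reshape(1, 3)]
shapes = [a.shape for a in local_arrays]  # [(2, 3), (1, 3)]

# Each rank pads its send buffer up to the per-dimension maximum, here (2, 3)
padded = [
    _prepare_allgather_inputs(a, shapes, engine="numpy")[0] for a in local_arrays
]

# Allgather stand-in: concatenate the flattened padded buffers in rank order
recv_buf = np.concatenate([p.ravel() for p in padded])

# Recover the original, unpadded per-rank arrays
chunks = _unroll_allgather_recv(recv_buf, padded[0].shape, shapes)
assert all(np.array_equal(c, a) for c, a in zip(chunks, local_arrays))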

pylops_mpi/utils/_mpi.py

Lines changed: 24 additions & 94 deletions

@@ -1,12 +1,10 @@
 __all__ = [
     "mpi_allgather",
     "mpi_allreduce",
-    # "mpi_bcast",
+    "mpi_bcast",
     # "mpi_asarray",
     "mpi_send",
     "mpi_recv",
-    "_prepare_allgather_inputs",
-    "_unroll_allgather_recv"
 ]
 
 from typing import Optional
@@ -15,87 +13,26 @@
 from mpi4py import MPI
 from pylops.utils.backend import get_module
 from pylops_mpi.utils import deps
+from pylops_mpi.utils._common import _prepare_allgather_inputs, _unroll_allgather_recv
 
 
-# TODO: return type annotation for both cupy and numpy
-def _prepare_allgather_inputs(send_buf, send_buf_shapes, engine):
-    r""" Prepare send_buf and recv_buf for NCCL allgather (nccl_allgather)
-
-    Buffered Allgather (MPI and NCCL) requires the sending buffer to have the same size for every device.
-    Therefore, padding is required when the array is not evenly partitioned across
-    all the ranks. The padding is applied such that the each dimension of the sending buffers
-    is equal to the max size of that dimension across all ranks.
-
-    Similarly, each receiver buffer (recv_buf) is created with size equal to :math:n_rank \cdot send_buf.size
-
-    Parameters
-    ----------
-    send_buf : :obj: `numpy.ndarray` or `cupy.ndarray` or array-like
-        The data buffer from the local GPU to be sent for allgather.
-    send_buf_shapes: :obj:`list`
-        A list of shapes for each GPU send_buf (used to calculate padding size)
-    engine : :obj:`str`
-        Engine used to store array (``numpy`` or ``cupy``)
-
-    Returns
-    -------
-    send_buf: :obj:`cupy.ndarray`
-        A buffer containing the data and padded elements to be sent by this rank.
-    recv_buf : :obj:`cupy.ndarray`
-        An empty, padded buffer to gather data from all GPUs.
-    """
-    ncp = get_module(engine)
-    sizes_each_dim = list(zip(*send_buf_shapes))
-    send_shape = tuple(map(max, sizes_each_dim))
-    pad_size = [
-        (0, s_shape - l_shape) for s_shape, l_shape in zip(send_shape, send_buf.shape)
-    ]
-
-    send_buf = ncp.pad(
-        send_buf, pad_size, mode="constant", constant_values=0
-    )
-
-    ndev = len(send_buf_shapes)
-    recv_buf = ncp.zeros(ndev * send_buf.size, dtype=send_buf.dtype)
-
-    return send_buf, recv_buf
-
-
-def _unroll_allgather_recv(recv_buf, padded_send_buf_shape, send_buf_shapes) -> list:
-    r"""Unrolll recv_buf after Buffered Allgather (MPI and NCCL)
-
-    Remove the padded elements in recv_buff, extract an individual array from each device and return them as a list of arrays
-    Each GPU may send array with a different shape, so the return type has to be a list of array
-    instead of the concatenated array.
-
-    Parameters
-    ----------
-    recv_buf: :obj:`cupy.ndarray` or array-like
-        The data buffer returned from nccl_allgather call
-    padded_send_buf_shape: :obj:`tuple`:int
-        The size of send_buf after padding used in nccl_allgather
-    send_buf_shapes: :obj:`list`
-        A list of original shapes for each GPU send_buf prior to padding
-
-    Returns
-    -------
-    chunks: :obj:`list`
-        A list of `cupy.ndarray` from each GPU with the padded element removed
-    """
-    ndev = len(send_buf_shapes)
-    # extract an individual array from each device
-    chunk_size = np.prod(padded_send_buf_shape)
-    chunks = [
-        recv_buf[i * chunk_size:(i + 1) * chunk_size] for i in range(ndev)
-    ]
+def mpi_allgather(base_comm: MPI.Comm,
+                  send_buf, recv_buf=None,
+                  engine: Optional[str] = "numpy") -> np.ndarray:
 
-    # Remove padding from each array: the padded value may appear somewhere
-    # in the middle of the flat array and thus the reshape and slicing for each dimension is required
-    for i in range(ndev):
-        slicing = tuple(slice(0, end) for end in send_buf_shapes[i])
-        chunks[i] = chunks[i].reshape(padded_send_buf_shape)[slicing]
+    if deps.cuda_aware_mpi_enabled or engine == "numpy":
+        send_shapes = base_comm.allgather(send_buf.shape)
+        (padded_send, padded_recv) = _prepare_allgather_inputs(send_buf, send_shapes, engine=engine)
+        recv_buffer_to_use = recv_buf if recv_buf else padded_recv
+        base_comm.Allgather(padded_send, recv_buffer_to_use)
+        return _unroll_allgather_recv(recv_buffer_to_use, padded_send.shape, send_shapes)
 
-    return chunks
+    else:
+        # CuPy with non-CUDA-aware MPI
+        if recv_buf is None:
+            return base_comm.allgather(send_buf)
+        base_comm.Allgather(send_buf, recv_buf)
+        return recv_buf
 
 
 def mpi_allreduce(base_comm: MPI.Comm,
@@ -142,23 +79,16 @@ def mpi_allreduce(base_comm: MPI.Comm,
     return recv_buf
 
 
-def mpi_allgather(base_comm: MPI.Comm,
-                  send_buf, recv_buf=None,
-                  engine: Optional[str] = "numpy") -> np.ndarray:
-
+def mpi_bcast(base_comm: MPI.Comm,
+              rank, local_array, index, value,
+              engine: Optional[str] = "numpy") -> np.ndarray:
     if deps.cuda_aware_mpi_enabled or engine == "numpy":
-        send_shapes = base_comm.allgather(send_buf.shape)
-        (padded_send, padded_recv) = _prepare_allgather_inputs(send_buf, send_shapes, engine=engine)
-        recv_buffer_to_use = recv_buf if recv_buf else padded_recv
-        base_comm.Allgather(padded_send, recv_buffer_to_use)
-        return _unroll_allgather_recv(recv_buffer_to_use, padded_send.shape, send_shapes)
-
+        if rank == 0:
+            local_array[index] = value
+        base_comm.Bcast(local_array[index])
     else:
         # CuPy with non-CUDA-aware MPI
-        if recv_buf is None:
-            return base_comm.allgather(send_buf)
-        base_comm.Allgather(send_buf, recv_buf)
-        return recv_buf
+        local_array[index] = base_comm.bcast(value)
 
 
 def mpi_send(base_comm: MPI.Comm,
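To make the refactored call path concrete, a minimal usage sketch (hypothetical driver script, not part of the commit; assumes pylops-mpi with these utilities installed and a launch such as `mpirun -n 2 python example.py`):

# Hypothetical two-rank driver for the refactored mpi_allgather.
import numpy as np
from mpi4py import MPI

from pylops_mpi.utils._mpi import mpi_allgather

comm = MPI.COMM_WORLD
rank = comm.Get_rank()

# Uneven partition: rank 0 holds 3 elements, rank 1 holds 2, which
# exercises the pad-then-unroll path inside mpi_allgather
local = np.arange(3.0) if rank == 0 else np.ones(2)

# With engine="numpy" the buffered path returns a list of per-rank arrays,
# already reshaped and stripped of padding
chunks = mpi_allgather(comm, local, engine="numpy")
if rank == 0:
    print([c.shape for c in chunks])  # [(3,), (2,)]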

pylops_mpi/utils/_nccl.py

Lines changed: 1 addition & 1 deletion

@@ -15,7 +15,7 @@
 import os
 import cupy as cp
 import cupy.cuda.nccl as nccl
-from pylops_mpi.utils._mpi import _prepare_allgather_inputs, _unroll_allgather_recv
+from pylops_mpi.utils._common import _prepare_allgather_inputs, _unroll_allgather_recv
 
 cupy_to_nccl_dtype = {
     "float32": nccl.NCCL_FLOAT32,
