# _common.py
# Public helpers of this module; _prepare_allgather_inputs_mpi was defined
# below but missing from the list, so it is added for consistency with the
# other two allgather helpers.
__all__ = [
    "_prepare_allgather_inputs",
    "_prepare_allgather_inputs_mpi",
    "_unroll_allgather_recv",
]
import numpy as np
from pylops.utils.backend import get_module
# TODO: return type annotation for both cupy and numpy
def _prepare_allgather_inputs(send_buf, send_buf_shapes, engine):
    r"""Prepare send_buf and recv_buf for NCCL allgather (nccl_allgather)

    Buffered Allgather (MPI and NCCL) requires every device to contribute a
    sending buffer of identical size. When the array is not evenly
    partitioned across all ranks, the local buffer is therefore zero-padded
    so that each of its dimensions equals the maximum extent of that
    dimension over all ranks. Similarly, the receiving buffer (recv_buf) is
    created with :math:`n_{rank} \cdot send\_buf.size` elements.

    Parameters
    ----------
    send_buf : :obj:`numpy.ndarray` or :obj:`cupy.ndarray` or array-like
        The data buffer from the local GPU to be sent for allgather.
    send_buf_shapes : :obj:`list`
        A list of shapes, one per rank's send_buf (used to compute the
        padding size).
    engine : :obj:`str`
        Engine used to store array (``numpy`` or ``cupy``)

    Returns
    -------
    send_buf : :obj:`cupy.ndarray`
        A buffer containing the data plus padding, to be sent by this rank.
    recv_buf : :obj:`cupy.ndarray`
        An empty, padded buffer to gather data from all GPUs.
    """
    ncp = get_module(engine)
    # The common buffer shape is the per-dimension maximum over all ranks.
    padded_shape = tuple(max(extents) for extents in zip(*send_buf_shapes))
    # Pad only at the trailing end of each dimension, filling with zeros.
    pad_width = [
        (0, full - local) for full, local in zip(padded_shape, send_buf.shape)
    ]
    padded_send = ncp.pad(send_buf, pad_width, mode="constant", constant_values=0)
    nranks = len(send_buf_shapes)
    recv_buf = ncp.zeros(nranks * padded_send.size, dtype=padded_send.dtype)
    return padded_send, recv_buf
def _prepare_allgather_inputs_mpi(send_buf, send_buf_shapes, recvcounts, engine):
    r"""Prepare send_buf and recv_buf for MPI allgather (mpi_allgather)

    Parameters
    ----------
    send_buf : :obj:`numpy.ndarray` or :obj:`cupy.ndarray` or array-like
        The data buffer to be sent for allgather.
    send_buf_shapes : :obj:`list`
        A list of shapes, one per rank's send_buf (used to calculate padding
        size).
    recvcounts : :obj:`list`
        The element counts per rank in mpi_allgather.
    engine : :obj:`str`
        Engine used to store array (``numpy`` or ``cupy``)

    Returns
    -------
    send_buf : :obj:`numpy.ndarray` or :obj:`cupy.ndarray` or array-like
        A contiguous buffer containing the data to be sent by this rank.
    recv_buf : :obj:`numpy.ndarray` or :obj:`cupy.ndarray` or array-like
        A buffer to gather data from all ranks.
    displs : list, optional
        The starting offsets in recv_buf where data from each rank lands
        (``None`` when every rank sends the same shape).
    """
    ncp = get_module(engine)
    recv_buf = ncp.zeros(sum(recvcounts), dtype=send_buf.dtype)
    if len(set(send_buf_shapes)) == 1:
        # All ranks contribute identically-shaped data: a plain Allgather
        # works and no displacements are required.
        displs = None
    else:
        # Allgatherv path: displacements are the running prefix sums of the
        # per-rank element counts (last count never contributes an offset).
        displs = [0]
        for count in recvcounts[:-1]:
            displs.append(displs[-1] + count)
    # MPI requires a contiguous send buffer.
    return ncp.ascontiguousarray(send_buf), recv_buf, displs
def _unroll_allgather_recv(recv_buf, send_buf_shapes, padded_send_buf_shape=None,
                           recvcounts=None, displs=None, engine="numpy") -> list:
    r"""Unroll recv_buf after Buffered Allgather (MPI and NCCL)

    Remove the padded elements in recv_buf, extract the individual array
    contributed by each device, and return them as a list of arrays. Each
    GPU may send an array with a different shape, so the return type has to
    be a list of arrays instead of one concatenated array.

    Parameters
    ----------
    recv_buf : :obj:`cupy.ndarray` or array-like
        The data buffer returned from the nccl_allgather call.
    send_buf_shapes : :obj:`list`
        A list of original shapes for each rank's send_buf prior to padding.
    padded_send_buf_shape : tuple, optional
        The size of send_buf after padding used in nccl_allgather.
    recvcounts : list, optional
        The element counts per rank in mpi_allgather.
    displs : list, optional
        The starting offsets in recv_buf where data from each rank lands in
        mpi_allgather.
    engine : :obj:`str`
        Engine used to store array (``numpy`` or ``cupy``)

    Returns
    -------
    chunks : :obj:`list`
        A list of arrays, one per rank, with the padded elements removed.
    """
    ncp = get_module(engine)
    nranks = len(send_buf_shapes)

    if padded_send_buf_shape is not None:
        # NCCL / padded path: every rank contributed exactly `stride`
        # elements; slice per rank, restore the padded shape, then trim each
        # dimension back to the rank's original extent.
        stride = int(np.prod(padded_send_buf_shape))
        chunks = []
        for rank, shape in enumerate(send_buf_shapes):
            padded = recv_buf[rank * stride:(rank + 1) * stride]
            trim = tuple(slice(0, extent) for extent in shape)
            chunks.append(padded.reshape(padded_send_buf_shape)[trim])
        return chunks

    if displs is not None:
        # Allgatherv path: per-rank offsets and counts locate each segment.
        return [
            recv_buf[offset:offset + count].reshape(shape)
            for offset, count, shape in zip(displs, recvcounts, send_buf_shapes)
        ]

    # Uniform Allgather: equal-sized segments that all share the same shape.
    return [segment.reshape(send_buf_shapes[0])
            for segment in ncp.split(recv_buf, nranks)]