-
Notifications
You must be signed in to change notification settings - Fork 6
Expand file tree
/
Copy path_nccl.py
More file actions
358 lines (301 loc) · 10.9 KB
/
_nccl.py
File metadata and controls
358 lines (301 loc) · 10.9 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
# Public API of this module: NCCL communicator setup plus thin NCCL
# counterparts of the MPI collectives/point-to-point calls used by pylops-mpi.
__all__ = [
    "_nccl_sync", "initialize_nccl_comm", "nccl_split",
    "nccl_allgather", "nccl_allreduce", "nccl_bcast",
    "nccl_asarray", "nccl_send", "nccl_recv", "nccl_sendrecv",
]
from enum import IntEnum
from mpi4py import MPI
import os
import cupy as cp
import cupy.cuda.nccl as nccl
from pylops_mpi.utils._common import _prepare_allgather_inputs, _unroll_allgather_recv
# Mapping from CuPy dtype names (as produced by ``str(arr.dtype)``) to the
# NCCL datatype enums expected by the raw NCCL communication calls below.
cupy_to_nccl_dtype = {
    "float32": nccl.NCCL_FLOAT32,
    "float64": nccl.NCCL_FLOAT64,
    "int32": nccl.NCCL_INT32,
    "int64": nccl.NCCL_INT64,
    "uint8": nccl.NCCL_UINT8,
    "int8": nccl.NCCL_INT8,
    "uint32": nccl.NCCL_UINT32,
    "uint64": nccl.NCCL_UINT64,
    # NCCL has no complex types: a complex array is sent as a float buffer
    # of twice the element count (see ``_nccl_buf_size``).
    "complex64": nccl.NCCL_FLOAT32,
    "complex128": nccl.NCCL_FLOAT64,
}
class NcclOp(IntEnum):
    """Reduction operations accepted by NCCL, wrapping the ``nccl.NCCL_*`` constants."""
    SUM = nccl.NCCL_SUM
    PROD = nccl.NCCL_PROD
    MAX = nccl.NCCL_MAX
    MIN = nccl.NCCL_MIN
def _nccl_buf_size(buf, count=None):
""" Get an appropriate buffer size according to the dtype of buf
Parameters
----------
buf : :obj:`cupy.ndarray` or array-like
The data buffer from the local GPU to be sent.
count : :obj:`int`, optional
Number of elements to send from `buf`, if not sending the every element in `buf`.
Returns:
-------
:obj:`int`
An appropriate number of elements to send from `send_buf` for NCCL communication.
"""
if buf.dtype in ['complex64', 'complex128']:
return 2 * count if count else 2 * buf.size
else:
return count if count else buf.size
def _nccl_sync():
    """A thin wrapper of CuPy's device synchronization for protected import"""
    # Nothing to synchronize when no GPU is visible to this process
    if cp.cuda.runtime.getDeviceCount() > 0:
        cp.cuda.runtime.deviceSynchronize()
def mpi_op_to_nccl(mpi_op) -> NcclOp:
    """Translate an MPI reduction operation into its NCCL counterpart.

    Parameters
    ----------
    mpi_op : :obj:`MPI.Op`
        A MPI reduction operation (e.g., MPI.SUM, MPI.PROD, MPI.MAX, MPI.MIN).

    Returns
    -------
    NcclOp : :obj:`IntEnum`
        A corresponding NCCL reduction operation.

    Raises
    ------
    ValueError
        If `mpi_op` has no NCCL equivalent.
    """
    # MPI.Op objects are singletons, so identity comparison is the
    # correct way to recognize them.
    for candidate, nccl_op in (
        (MPI.SUM, NcclOp.SUM),
        (MPI.PROD, NcclOp.PROD),
        (MPI.MAX, NcclOp.MAX),
        (MPI.MIN, NcclOp.MIN),
    ):
        if mpi_op is candidate:
            return nccl_op
    raise ValueError(f"Unsupported MPI.Op for NCCL: {mpi_op}")
def initialize_nccl_comm() -> nccl.NcclCommunicator:
    """Initialize NCCL world communicator for every GPU device.

    Each GPU must be managed by exactly one MPI process, i.e. the number of
    MPI processes launched must equal the number of GPUs taking part in the
    communication.

    Returns
    -------
    nccl_comm : :obj:`cupy.cuda.nccl.NcclCommunicator`
        A corresponding NCCL communicator
    """
    comm = MPI.COMM_WORLD
    rank = comm.Get_rank()
    size = comm.Get_size()
    # Create a communicator for ranks on the same node; its size is used to
    # fold the global rank onto the node-local device range below.
    node_comm = comm.Split_type(MPI.COMM_TYPE_SHARED)
    size_node = node_comm.Get_size()
    # Prefer the Open MPI launcher-provided local rank when present;
    # otherwise derive a device id from the global rank, wrapping around the
    # number of visible devices. NOTE(review): an env value of "0" is a
    # truthy string, so a set env var always wins over the fallback.
    device_id = int(
        os.environ.get("OMPI_COMM_WORLD_LOCAL_RANK")
        or (rank % size_node) % cp.cuda.runtime.getDeviceCount()
    )
    cp.cuda.Device(device_id).use()
    # Rank 0 generates the unique NCCL id and broadcasts it via MPI so that
    # all ranks join the same NCCL communicator.
    if rank == 0:
        with cp.cuda.Device(device_id):
            nccl_id_bytes = nccl.get_unique_id()
    else:
        nccl_id_bytes = None
    nccl_id_bytes = comm.bcast(nccl_id_bytes, root=0)
    nccl_comm = nccl.NcclCommunicator(size, nccl_id_bytes, rank)
    return nccl_comm
def nccl_split(mask) -> nccl.NcclCommunicator:
    """NCCL-equivalent of MPI.Split(): carve the world communicator into
    multiple NCCL subcommunicators.

    Parameters
    ----------
    mask : :obj:`list`
        Mask defining subsets of ranks to consider when performing 'global'
        operations on the distributed array such as dot product or norm.

    Returns
    -------
    sub_comm : :obj:`cupy.cuda.nccl.NcclCommunicator`
        Subcommunicator according to mask
    """
    world = MPI.COMM_WORLD
    world_rank = world.Get_rank()
    # Split at the MPI level first; the resulting MPI subcommunicator is
    # only used to agree on a NCCL id within each color group.
    mpi_sub = world.Split(color=mask[world_rank], key=world_rank)
    local_rank = mpi_sub.Get_rank()
    local_size = mpi_sub.Get_size()
    # The group leader creates the NCCL id; everyone else receives it.
    nccl_id_bytes = nccl.get_unique_id() if local_rank == 0 else None
    nccl_id_bytes = mpi_sub.bcast(nccl_id_bytes, root=0)
    return nccl.NcclCommunicator(local_size, nccl_id_bytes, local_rank)
def nccl_allgather(nccl_comm, send_buf, recv_buf=None) -> cp.ndarray:
    """NCCL equivalent of MPI_Allgather. Gathers data from all GPUs
    and distributes the concatenated result to all participants.

    Parameters
    ----------
    nccl_comm : :obj:`cupy.cuda.nccl.NcclCommunicator`
        The NCCL communicator over which data will be gathered.
    send_buf : :obj:`cupy.ndarray` or array-like
        The data buffer from the local GPU to be sent.
    recv_buf : :obj:`cupy.ndarray`, optional
        The buffer to receive data from all GPUs. If None, a new
        buffer will be allocated with the appropriate shape.

    Returns
    -------
    recv_buf : :obj:`cupy.ndarray`
        A buffer containing the gathered data from all GPUs.
    """
    if not isinstance(send_buf, cp.ndarray):
        send_buf = cp.asarray(send_buf)
    if recv_buf is None:
        # One send-sized slot per participating rank, laid out flat
        recv_buf = cp.zeros(nccl_comm.size() * send_buf.size, dtype=send_buf.dtype)
    nccl_comm.allGather(
        send_buf.data.ptr,
        recv_buf.data.ptr,
        _nccl_buf_size(send_buf),
        cupy_to_nccl_dtype[str(send_buf.dtype)],
        cp.cuda.Stream.null.ptr,
    )
    return recv_buf
def nccl_allreduce(nccl_comm, send_buf, recv_buf=None, op: MPI.Op = MPI.SUM) -> cp.ndarray:
    """NCCL equivalent of MPI_Allreduce. Applies a reduction operation
    (e.g., sum, max) across all GPUs and distributes the result.

    Parameters
    ----------
    nccl_comm : :obj:`cupy.cuda.nccl.NcclCommunicator`
        The NCCL communicator used for collective communication.
    send_buf : :obj:`cupy.ndarray` or array-like
        The data buffer from the local GPU to be reduced.
    recv_buf : :obj:`cupy.ndarray`, optional
        The buffer to store the result of the reduction. If None,
        a new buffer will be allocated with the appropriate shape.
    op : :obj:mpi4py.MPI.Op, optional
        The reduction operation to apply. Defaults to MPI.SUM.

    Returns
    -------
    recv_buf : :obj:`cupy.ndarray`
        A buffer containing the result of the reduction, broadcasted
        to all GPUs.
    """
    if not isinstance(send_buf, cp.ndarray):
        send_buf = cp.asarray(send_buf)
    if recv_buf is None:
        # Flat result buffer with one slot per element of send_buf
        recv_buf = cp.zeros(send_buf.size, dtype=send_buf.dtype)
    nccl_comm.allReduce(
        send_buf.data.ptr,
        recv_buf.data.ptr,
        _nccl_buf_size(send_buf),
        cupy_to_nccl_dtype[str(send_buf.dtype)],
        mpi_op_to_nccl(op),
        cp.cuda.Stream.null.ptr,
    )
    return recv_buf
def nccl_bcast(nccl_comm, send_buf, root: int = 0) -> None:
    """NCCL equivalent of MPI_Bcast for an array buffer.

    Parameters
    ----------
    nccl_comm : :obj:`cupy.cuda.nccl.NcclCommunicator`
        The NCCL communicator used for collective communication.
    send_buf: :obj:`numpy.ndarray` or :obj:`cupy.ndarray`
        The data buffer to be broadcasted to the other ranks from the broadcasting root rank.
    root: :obj:`int`
        The rank of the broadcasting process.
    """
    # Promote host arrays to device arrays; device arrays pass through
    if not isinstance(send_buf, cp.ndarray):
        send_buf = cp.asarray(send_buf)
    buf_count = _nccl_buf_size(send_buf)
    buf_dtype = cupy_to_nccl_dtype[str(send_buf.dtype)]
    nccl_comm.bcast(send_buf.data.ptr, buf_count, buf_dtype, root,
                    cp.cuda.Stream.null.ptr)
def nccl_asarray(nccl_comm, local_array, local_shapes, axis) -> cp.ndarray:
    """Global view of the array

    Gather all local GPU arrays into a single global array via NCCL all-gather.

    Parameters
    ----------
    nccl_comm : :obj:`cupy.cuda.nccl.NcclCommunicator`
        The NCCL communicator used for collective communication.
    local_array : :obj:`cupy.ndarray`
        The local array on the current GPU.
    local_shapes : :obj:`list`
        A list of shapes for each GPU local array (used to trim padding).
    axis : :obj:`int`
        The axis along which to concatenate the gathered arrays.

    Returns
    -------
    final_array : :obj:`cupy.ndarray`
        Global array gathered from all GPUs and concatenated along `axis`.
    """
    # Pad local arrays to a common shape so a single allGather call suffices
    padded_send, gathered = _prepare_allgather_inputs(local_array, local_shapes, engine="cupy")
    nccl_allgather(nccl_comm, padded_send, gathered)
    # Strip the padding back off each rank's contribution
    pieces = _unroll_allgather_recv(gathered, local_shapes, padded_send.shape, engine="cupy")
    # Combine back into one global array
    return cp.concatenate(pieces, axis=axis)
def nccl_send(nccl_comm, send_buf, dest, count):
    """NCCL equivalent of MPI_Send. Sends a specified number of elements
    from the buffer to a destination GPU device.

    Parameters
    ----------
    nccl_comm : :obj:`cupy.cuda.nccl.NcclCommunicator`
        The NCCL communicator used for point-to-point communication.
    send_buf : :obj:`cupy.ndarray`
        The array containing data to send.
    dest: :obj:`int`
        The rank of the destination GPU device.
    count : :obj:`int`
        Number of elements to send from `send_buf`.
    """
    buf_count = _nccl_buf_size(send_buf, count)
    buf_dtype = cupy_to_nccl_dtype[str(send_buf.dtype)]
    nccl_comm.send(send_buf.data.ptr, buf_count, buf_dtype, dest,
                   cp.cuda.Stream.null.ptr)
def nccl_recv(nccl_comm, recv_buf, source, count=None):
    """NCCL equivalent of MPI_Recv. Receives data from a source GPU device
    into the given buffer.

    Parameters
    ----------
    nccl_comm : :obj:`cupy.cuda.nccl.NcclCommunicator`
        The NCCL communicator used for point-to-point communication.
    recv_buf : :obj:`cupy.ndarray`
        The array to store the received data.
    source : :obj:`int`
        The rank of the source GPU device.
    count : :obj:`int`, optional
        Number of elements to receive; defaults to the whole buffer.
    """
    buf_count = _nccl_buf_size(recv_buf, count)
    buf_dtype = cupy_to_nccl_dtype[str(recv_buf.dtype)]
    nccl_comm.recv(recv_buf.data.ptr, buf_count, buf_dtype, source,
                   cp.cuda.Stream.null.ptr)
def nccl_sendrecv(nccl_comm, sendbuf, dest, recvbuf, source):
    """NCCL equivalent of MPI_SendRecv. Sends/Receives data in one combined call.

    Parameters
    ----------
    nccl_comm : :obj:`cupy.cuda.nccl.NcclCommunicator`
        The NCCL communicator used for point-to-point communication.
    sendbuf : :obj:`cupy.ndarray`
        The array containing data to send.
    dest: :obj:`int`
        The rank of the destination GPU device.
    recvbuf : :obj:`cupy.ndarray`
        The array to store the received data.
    source : :obj:`int`
        The rank of the source GPU device.
    """
    # Group the send and the recv so NCCL issues them as one fused
    # operation between the paired ranks.
    nccl.groupStart()
    nccl_send(nccl_comm, sendbuf, dest, sendbuf.size)
    nccl_recv(nccl_comm, recvbuf, source)
    nccl.groupEnd()