-
Notifications
You must be signed in to change notification settings - Fork 4
Expand file tree
/
Copy pathrestarted.py
More file actions
475 lines (392 loc) · 21.3 KB
/
restarted.py
File metadata and controls
475 lines (392 loc) · 21.3 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
import copy
import itertools
import torch
from torch.cuda.comm import broadcast_coalesced
import torch.distributed as dist
from torch.cuda.comm import broadcast_coalesced, reduce_add_coalesced
if dist.is_available():
from torch.distributed.distributed_c10d import _get_default_group
from torch.nn.modules import Module
from torch.nn.parallel.replicate import replicate
from torch.nn.parallel.scatter_gather import scatter_kwargs, gather
from torch.nn.parallel.parallel_apply import parallel_apply
from torch.cuda._utils import _get_device_index
def _find_tensors(obj):
    r"""
    Recursively find all tensors contained in the specified object.

    Tensors are collected depth-first from (possibly nested) lists, tuples
    and dict values, in container order. Any other object contributes
    nothing.

    Arguments:
        obj: a tensor, a list/tuple/dict of such objects, or anything else.

    Returns:
        list[Tensor]: the tensors found. A list is returned in every case,
        so the result can be measured with ``len`` and iterated more than
        once.
    """
    if isinstance(obj, torch.Tensor):
        return [obj]
    if isinstance(obj, (list, tuple)):
        return [found for item in obj for found in _find_tensors(item)]
    if isinstance(obj, dict):
        return [found for value in obj.values() for found in _find_tensors(value)]
    return []
def _flatten_tensors(tensors):
    """
    Flatten dense tensors into a contiguous 1D buffer. Assume tensors are of
    same dense type.

    Since inputs are dense, the resulting tensor will be a concatenated 1D
    buffer. Element-wise operation on this buffer will be equivalent to
    operating individually.

    Arguments:
        tensors (Iterable[Tensor]): dense tensors to flatten. Any iterable
            is accepted (it is materialised once), and the tensors do not
            have to be contiguous.

    Returns:
        A 1D buffer containing input tensors. The buffer never shares
        storage with the inputs.
    """
    # Materialise first: the documented input is an arbitrary iterable, but
    # len() and indexing below need a sequence.
    tensors = list(tensors)
    if len(tensors) == 1:
        # reshape() may return a view of the input, so clone to keep the
        # "fresh buffer" guarantee of the multi-tensor path.
        return tensors[0].reshape(-1).clone()
    # reshape(-1) instead of view(-1): view raises on non-contiguous tensors.
    return torch.cat([t.reshape(-1) for t in tensors], dim=0)
def _unflatten_tensors(flat, tensors):
    """
    Slice a flat buffer back into pieces shaped like the given tensors.

    The buffer is consumed front to back: each reference tensor claims the
    next ``numel()`` elements, reshaped to that tensor's size. The pieces
    are produced with ``narrow``/``view_as`` on ``flat``.

    Arguments:
        flat (Tensor): flattened dense tensors to unflatten, e.g. the
            output of ``_flatten_tensors``.
        tensors (Iterable[Tensor]): dense tensors whose sizes will be used
            to unflatten flat.

    Returns:
        Tuple of tensors with the same sizes as ``tensors`` and values
        taken from ``flat``.
    """
    def _pieces():
        cursor = 0
        for reference in tensors:
            span = reference.numel()
            yield flat.narrow(0, cursor, span).view_as(reference)
            cursor += span

    return tuple(_pieces())
class DistributedDataParallel(Module):
    r"""Implements distributed data parallelism that is based on
    ``torch.distributed`` package at the module level.

    This container parallelizes the application of the given module by
    splitting the input across the specified devices by chunking in the batch
    dimension. The module is replicated on each machine and each device, and
    each such replica handles a portion of the input.

    In this variant :meth:`forward` performs no synchronization by itself.
    Parameters are averaged across processes (and buffers re-broadcast) only
    when :meth:`_sync_period` is called; buffers alone can be re-broadcast
    with :meth:`_sync_buffers`.

    The batch size should be larger than the number of GPUs used locally.

    See also: :ref:`distributed-basics` and :ref:`cuda-nn-dataparallel-instead`.
    The same constraints on input as in :class:`torch.nn.DataParallel` apply.

    Creation of this class requires that ``torch.distributed`` to be already
    initialized, by calling :func:`torch.distributed.init_process_group`.

    ``DistributedDataParallel`` can be used in the following two ways:

    (1) Single-Process Multi-GPU

    In this case, a single process will be
    spawned on each host/node and each process will operate on all the GPUs
    of the node where it's running. To use ``DistributedDataParallel`` in
    this way, you can simply construct the model as the following:

        >>> torch.distributed.init_process_group(backend="nccl")
        >>> model = DistributedDataParallel(model) # device_ids will include all GPU devices by default

    (2) Multi-Process Single-GPU

    This is the highly recommended way to use ``DistributedDataParallel``, with
    multiple processes, each of which operates on a single GPU. This is
    currently the fastest approach to do data parallel training using PyTorch
    and applies to both single-node(multi-GPU) and multi-node data
    parallel training. It is proven to be significantly faster than
    :class:`torch.nn.DataParallel` for single-node multi-GPU data
    parallel training.

    Here is how to use it: on each host with N GPUs, you should spawn up N
    processes, while ensuring that each process individually works on a single GPU
    from 0 to N-1. Therefore, it is your job to ensure that your training script
    operates on a single given GPU by calling:

        >>> torch.cuda.set_device(i)

    where i is from 0 to N-1. In each process, you should refer the following
    to construct this module:

        >>> torch.distributed.init_process_group(backend='nccl', world_size=4, init_method='...')
        >>> model = DistributedDataParallel(model, device_ids=[i], output_device=i)

    In order to spawn up multiple processes per node, you can use either
    ``torch.distributed.launch`` or ``torch.multiprocessing.spawn``

    .. note:: ``nccl`` backend is currently the fastest and
        highly recommended backend to be used with Multi-Process Single-GPU
        distributed training and this applies to both single-node and multi-node
        distributed training

    .. note:: This module also supports mixed-precision distributed training.
        This means that your model can have different types of parameters such
        as mixed types of fp16 and fp32, the gradient reduction on these
        mixed types of parameters will just work fine.
        Also note that ``nccl`` backend is currently the fastest and highly
        recommended backend for fp16/fp32 mixed-precision training.

    .. note:: If you use ``torch.save`` on one process to checkpoint the module,
        and ``torch.load`` on some other processes to recover it, make sure that
        ``map_location`` is configured properly for every process. Without
        ``map_location``, ``torch.load`` would recover the module to devices
        where the module was saved from.

    .. warning::
        This module works only with the ``gloo`` and ``nccl`` backends.

    .. warning::
        Constructor, :meth:`_sync_period`, :meth:`_sync_buffers`, and
        differentiation of the output (or a function of the output of this
        module) may be distributed synchronization points. Take that into
        account in case different processes might be executing different
        code.

    .. warning::
        This module assumes all parameters are registered in the model by the
        time it is created. No parameters should be added nor removed later.
        Same applies to buffers.

    .. warning::
        This module assumes all parameters are registered in the model of each
        distributed processes are in the same order. The module itself will
        conduct gradient all-reduction following the reverse order of the
        registered parameters of the model. In other words, it is users'
        responsibility to ensure that each distributed process has the exact
        same model and thus the exact same parameter registration order.

    .. warning::
        This module assumes all buffers and gradients are dense.

    .. warning::
        This module doesn't work with :func:`torch.autograd.grad` (i.e. it will
        only work if gradients are to be accumulated in ``.grad`` attributes of
        parameters).

    .. warning::
        If you plan on using this module with a ``nccl`` backend or a ``gloo``
        backend (that uses Infiniband), together with a DataLoader that uses
        multiple workers, please change the multiprocessing start method to
        ``forkserver`` (Python 3 only) or ``spawn``. Unfortunately
        Gloo (that uses Infiniband) and NCCL2 are not fork safe, and you will
        likely experience deadlocks if you don't change this setting.

    .. warning::
        Forward and backward hooks defined on :attr:`module` and its submodules
        won't be invoked anymore, unless the hooks are initialized in the
        :meth:`forward` method.

    .. warning::
        You should never try to change your model's parameters after wrapping
        up your model with DistributedDataParallel. In other words, when
        wrapping up your model with DistributedDataParallel, the constructor of
        DistributedDataParallel will register the additional gradient
        reduction functions on all the parameters of the model itself at the
        time of construction. If you change the model's parameters after
        the DistributedDataParallel construction, this is not supported and
        unexpected behaviors can happen, since some parameters' gradient
        reduction functions might not get called.

    .. note::
        The module state (parameters and buffers) is broadcast across the
        process group once, at construction time. Afterwards nothing is
        synchronized automatically: :meth:`_sync_period` all-reduces and
        averages the parameters and, when ``broadcast_buffers`` is set,
        re-broadcasts the buffers (e.g. BatchNorm stats).

    Args:
        module (Module): module to be parallelized
        local (bool): accepted and stored as ``self.local``; not consulted
                      anywhere in this class. (default: ``True``)
        device_ids (list of int or torch.device): CUDA devices. This should
                   only be provided when the input module resides on a single
                   CUDA device. For single-device modules, the ``i``th
                   :attr:`module` replica is placed on ``device_ids[i]``. For
                   multi-device modules and CPU modules, device_ids must be None
                   or an empty list, and input data for the forward pass must be
                   placed on the correct device. (default: all devices for
                   single-device modules)
        output_device (int or torch.device): device location of output for
                      single-device CUDA modules. For multi-device modules and
                      CPU modules, it must be None, and the module itself
                      dictates the output location. (default: device_ids[0] for
                      single-device modules)
        dim (int): dimension along which inputs are scattered and outputs
                   are gathered. (default: 0)
        broadcast_buffers (bool): flag that enables syncing (broadcasting) buffers of
                          the module when :meth:`_sync_period` or
                          :meth:`_sync_buffers` is called.
                          (default: ``True``)
        process_group: the process group to be used for distributed data
                       all-reduction. If ``None``, the default process group, which
                       is created by ``torch.distributed.init_process_group``,
                       will be used. (default: ``None``)
        bucket_cap_mb: DistributedDataParallel will bucket parameters into
                       multiple buckets so that gradient reduction of each
                       bucket can potentially overlap with backward computation.
                       :attr:`bucket_cap_mb` controls the bucket size in MegaBytes (MB)
                       (default: 25)
        find_unused_parameters (bool): stored as
                               ``self.find_unused_parameters``; not consulted
                               by :meth:`forward` in this variant.
                               (default: ``False``)
        check_reduction: no longer used; accepted for backward
                         compatibility only. (default: ``False``)
        update_period (int): accepted and stored as ``self.update_period``;
                      not consulted anywhere in this class. Presumably the
                      number of steps between :meth:`_sync_period` calls made
                      by the training loop -- confirm against the caller.
                      (default: 10)

    Attributes:
        module (Module): the module to be parallelized

    Example::

        >>> torch.distributed.init_process_group(backend='nccl', world_size=4, init_method='...')
        >>> net = DistributedDataParallel(model, process_group=pg)
    """

    def __init__(self, module, local=True, device_ids=None,
                 output_device=None, dim=0, broadcast_buffers=True,
                 process_group=None, bucket_cap_mb=25,
                 find_unused_parameters=False,
                 check_reduction=False, update_period=10):
        super(DistributedDataParallel, self).__init__()

        param_devices = {p.device for p in module.parameters()}
        self.is_multi_device_module = len(param_devices) > 1
        # NOTE: vacuously True for a module without parameters.
        self.is_cuda = all(device.type == 'cuda' for device in param_devices)

        if not self.is_cuda or self.is_multi_device_module:
            assert not device_ids and not output_device, (
                "DistributedDataParallel device_ids and output_device arguments "
                "only work with single-device CUDA modules, but got "
                "device_ids {}, output_device {}, and module parameters {}."
            ).format(device_ids, output_device, param_devices)

            self.device_ids = None
            self.output_device = None
        else:
            # Use all devices by default for single-device CUDA modules
            if device_ids is None:
                device_ids = list(range(torch.cuda.device_count()))

            self.device_ids = [_get_device_index(x, True) for x in device_ids]

            if output_device is None:
                output_device = device_ids[0]

            self.output_device = _get_device_index(output_device, True)

        if self.is_multi_device_module:
            assert self.is_cuda, (
                "DistributedDataParallel with multi-device module only works "
                "with CUDA devices, but module parameters locate in {}."
            ).format(param_devices)

        if process_group is None:
            self.process_group = _get_default_group()
        else:
            self.process_group = process_group

        self.dim = dim
        self.module = module
        self.broadcast_buffers = broadcast_buffers
        self.find_unused_parameters = find_unused_parameters

        # Neither value is used inside this class; they are kept on the
        # instance so the caller can read them back instead of having them
        # silently discarded.
        self.local = local
        self.update_period = update_period

        # ``check_reduction`` is no longer used: the reducer ensures
        # reduction completes even if some parameters do not receive
        # gradients.

        MB = 1024 * 1024

        # used for intra-node param sync and inter-node sync as well
        self.broadcast_bucket_size = int(250 * MB)

        # reduction bucket size
        self.bucket_bytes_cap = int(bucket_cap_mb * MB)

        # Sync params and buffers
        module_states = list(self.module.state_dict().values())
        if module_states:
            self._dist_broadcast_coalesced(module_states,
                                           self.broadcast_bucket_size)

        self._ddp_init_helper()

    def _ddp_init_helper(self):
        """
        Initialization helper function that does the following:

        (1) replicating the module from device[0] to the other devices
        (2) collecting the parameters and buffers of every replica
        (3) bucketing the parameters for reductions
        (4) creating the reducer for the bucketed parameters
        (5) passing a handle of DDP to SyncBatchNorm Layer
        """
        if self.device_ids and len(self.device_ids) > 1:
            # only create replicas for single-device CUDA modules
            #
            # TODO: we don't need to replicate params in here. they're always going to
            # be broadcasted using larger blocks in broadcast_coalesced, so it might be
            # better to not pollute the caches with these small blocks
            self._module_copies = replicate(self.module, self.device_ids, detach=True)
            self._module_copies[0] = self.module

            for module_copy in self._module_copies[1:]:
                for param, copy_param in zip(self.module.parameters(), module_copy.parameters()):
                    copy_param.requires_grad = param.requires_grad
        else:
            self._module_copies = [self.module]

        self.modules_params = [list(m.parameters()) for m in self._module_copies]
        self.modules_buffers = [list(m.buffers()) for m in self._module_copies]

        param_list = [
            [p for p in replica.parameters() if p.requires_grad]
            for replica in self._module_copies]

        # The bucket size limit is specified in the constructor.
        # Additionally, we allow for a single small bucket for parameters
        # that are defined first, such that their gradients don't spill into
        # a much larger bucket, adding unnecessary latency after gradient
        # computation finishes. Experiments showed 1MB is a reasonable value.
        bucket_indices = dist._compute_bucket_assignment_by_size(
            param_list[0],
            [1024 * 1024, self.bucket_bytes_cap])

        # Note: reverse list of buckets because we want to approximate the
        # order in which their gradients are produced, and assume they
        # are used in the forward pass in the order they are defined.
        self.reducer = dist.Reducer(
            param_list,
            list(reversed(bucket_indices)),
            self.process_group)

        # passing a handle to torch.nn.SyncBatchNorm layer
        self._passing_sync_batchnorm_handle(self._module_copies)

    def __getstate__(self):
        """Return the instance state for pickling.

        The process group and the reducer are dropped from the state; they
        are recreated in :meth:`__setstate__`.

        Raises:
            RuntimeError: if the module does not use the default process
                group (see :meth:`_check_default_group`).
        """
        self._check_default_group()
        attrs = copy.copy(self.__dict__)
        del attrs['process_group']
        del attrs['reducer']
        return attrs

    def __setstate__(self, state):
        """Restore pickled state and rebuild the replicas and the reducer."""
        # If serializable, then the process group should be the default one
        self.process_group = _get_default_group()
        super(DistributedDataParallel, self).__setstate__(state)
        self._ddp_init_helper()

    def _check_default_group(self):
        """Ensure this module uses the default process group.

        Raises:
            RuntimeError: if ``self.process_group`` is not the default group,
                or if looking up the default group raises ``RuntimeError``.
        """
        pickle_not_supported = False
        try:
            if self.process_group != _get_default_group():
                pickle_not_supported = True
        except RuntimeError:
            pickle_not_supported = True

        if pickle_not_supported:
            raise RuntimeError("DDP Pickling/Unpickling are only supported "
                               "when using DDP with the default process "
                               "group. That is, when you have called "
                               "init_process_group and have not passed "
                               "process_group argument to DDP constructor")

    def forward(self, *inputs, **kwargs):
        """Run the wrapped module on the given inputs.

        With ``device_ids`` set, inputs and kwargs are scattered over those
        devices along ``self.dim``. A single device runs ``self.module`` on
        the first shard; several devices run the replicas in parallel and
        gather the outputs on ``self.output_device``. Without ``device_ids``
        the wrapped module is called directly.

        Buffers are not broadcast here; call :meth:`_sync_buffers` or
        :meth:`_sync_period` for that.
        """
        if not self.device_ids:
            return self.module(*inputs, **kwargs)

        inputs, kwargs = self.scatter(inputs, kwargs, self.device_ids)
        if len(self.device_ids) == 1:
            return self.module(*inputs[0], **kwargs[0])

        outputs = self.parallel_apply(self._module_copies[:len(inputs)], inputs, kwargs)
        return self.gather(outputs, self.output_device)

    def scatter(self, inputs, kwargs, device_ids):
        """Scatter ``inputs`` and ``kwargs`` over ``device_ids`` along ``self.dim``."""
        return scatter_kwargs(inputs, kwargs, device_ids, dim=self.dim)

    def parallel_apply(self, replicas, inputs, kwargs):
        """Apply each replica to its inputs on the matching leading device ids."""
        return parallel_apply(replicas, inputs, kwargs, self.device_ids[:len(replicas)])

    def gather(self, outputs, output_device):
        """Gather ``outputs`` onto ``output_device`` along ``self.dim``."""
        return gather(outputs, output_device, dim=self.dim)

    def train(self, mode=True):
        """Set training mode on this module and on every extra replica.

        Returns:
            DistributedDataParallel: ``self``, matching ``Module.train`` so
            that calls can be chained.
        """
        super(DistributedDataParallel, self).train(mode)
        for module in self._module_copies[1:]:
            module.train(mode)
        return self

    def _dist_broadcast_coalesced(self, tensors, buffer_size):
        """Broadcast ``tensors`` over ``self.process_group`` in coalesced chunks of ``buffer_size``."""
        dist._dist_broadcast_coalesced(self.process_group, tensors, buffer_size, False)

    def _sync_period(self):
        """Average the parameters across processes and reset their gradients.

        Does nothing when ``device_ids`` is empty or ``None`` (CPU and
        multi-device modules). Otherwise, under ``torch.no_grad()``:

        1. if ``broadcast_buffers`` is set and there are buffers, broadcast
           the buffers of the primary module;
        2. flatten the primary module's parameters, all-reduce them over
           ``self.process_group`` and divide by that group's size;
        3. copy each averaged value back into its parameter and zero the
           parameter's gradient, if it has one.
        """
        if not self.device_ids:
            return

        with torch.no_grad():
            if self.broadcast_buffers and len(self.modules_buffers[0]) > 0:
                # cross-node buffer sync
                self._dist_broadcast_coalesced(self.modules_buffers[0],
                                               self.broadcast_bucket_size)

            params = self.modules_params[0]
            if not params:
                return

            # _flatten_tensors already returns a fresh buffer, so it can be
            # reduced and scaled in place without touching the parameters.
            flat = _flatten_tensors(params)
            # Use the same group as the buffer broadcast above, so that a
            # custom process_group is honoured consistently.
            dist.all_reduce(flat, group=self.process_group)
            flat.div_(dist.get_world_size(self.process_group))

            for averaged, param in zip(_unflatten_tensors(flat, params), params):
                param.copy_(averaged)
                # Reset the gradient of the parameter itself. Iterating over
                # the parameter would walk its rows, whose ``.grad`` is
                # always None, and would raise on 0-dim parameters.
                if param.grad is not None:
                    param.grad.detach_()
                    param.grad.zero_()

    def _sync_buffers(self):
        """Broadcast the primary module's buffers if ``broadcast_buffers`` is set."""
        with torch.no_grad():
            # module buffer sync
            if self.broadcast_buffers and len(self.modules_buffers[0]) > 0:
                # cross-node buffer sync
                self._dist_broadcast_coalesced(self.modules_buffers[0],
                                               self.broadcast_bucket_size)

    def _passing_sync_batchnorm_handle(self, module_copies):
        """Tell every ``SyncBatchNorm`` layer how many local devices DDP uses.

        Raises:
            AssertionError: if a ``SyncBatchNorm`` layer is found in a
                non-CUDA module.
        """
        gpu_num = len(self.device_ids) if self.device_ids else 1
        for module in module_copies:
            for layer in module.modules():
                if isinstance(layer, torch.nn.modules.SyncBatchNorm):
                    assert self.is_cuda, "SyncBatchNorm layers only work with CUDA modules"
                    layer._specify_ddp_gpu_num(gpu_num)