
Commit aeb2dc2

Nccl2 dist API (#13506)

* add nccl2 dist api
* update apispec
* update
* update api spec
1 parent c66a8d2 commit aeb2dc2

File tree

3 files changed: +97 -18 lines changed


paddle/fluid/API.spec

Lines changed: 2 additions & 2 deletions
@@ -53,7 +53,7 @@ paddle.fluid.DistributeTranspiler.get_pserver_program ArgSpec(args=['self', 'end
 paddle.fluid.DistributeTranspiler.get_pserver_programs ArgSpec(args=['self', 'endpoint'], varargs=None, keywords=None, defaults=None)
 paddle.fluid.DistributeTranspiler.get_startup_program ArgSpec(args=['self', 'endpoint', 'pserver_program', 'startup_program'], varargs=None, keywords=None, defaults=(None, None))
 paddle.fluid.DistributeTranspiler.get_trainer_program ArgSpec(args=['self', 'wait_port'], varargs=None, keywords=None, defaults=(True,))
-paddle.fluid.DistributeTranspiler.transpile ArgSpec(args=['self', 'trainer_id', 'program', 'pservers', 'trainers', 'sync_mode', 'startup_program'], varargs=None, keywords=None, defaults=(None, '127.0.0.1:6174', 1, True, None))
+paddle.fluid.DistributeTranspiler.transpile ArgSpec(args=['self', 'trainer_id', 'program', 'pservers', 'trainers', 'sync_mode', 'startup_program', 'current_endpoint'], varargs=None, keywords=None, defaults=(None, '127.0.0.1:6174', 1, True, None, '127.0.0.1:6174'))
 paddle.fluid.memory_optimize ArgSpec(args=['input_program', 'skip_opt_set', 'print_log', 'level'], varargs=None, keywords=None, defaults=(None, False, 0))
 paddle.fluid.release_memory ArgSpec(args=['input_program', 'skip_opt_set'], varargs=None, keywords=None, defaults=(None,))
 paddle.fluid.DistributeTranspilerConfig.__init__

@@ -336,7 +336,7 @@ paddle.fluid.transpiler.DistributeTranspiler.get_pserver_program ArgSpec(args=['
 paddle.fluid.transpiler.DistributeTranspiler.get_pserver_programs ArgSpec(args=['self', 'endpoint'], varargs=None, keywords=None, defaults=None)
 paddle.fluid.transpiler.DistributeTranspiler.get_startup_program ArgSpec(args=['self', 'endpoint', 'pserver_program', 'startup_program'], varargs=None, keywords=None, defaults=(None, None))
 paddle.fluid.transpiler.DistributeTranspiler.get_trainer_program ArgSpec(args=['self', 'wait_port'], varargs=None, keywords=None, defaults=(True,))
-paddle.fluid.transpiler.DistributeTranspiler.transpile ArgSpec(args=['self', 'trainer_id', 'program', 'pservers', 'trainers', 'sync_mode', 'startup_program'], varargs=None, keywords=None, defaults=(None, '127.0.0.1:6174', 1, True, None))
+paddle.fluid.transpiler.DistributeTranspiler.transpile ArgSpec(args=['self', 'trainer_id', 'program', 'pservers', 'trainers', 'sync_mode', 'startup_program', 'current_endpoint'], varargs=None, keywords=None, defaults=(None, '127.0.0.1:6174', 1, True, None, '127.0.0.1:6174'))
 paddle.fluid.transpiler.memory_optimize ArgSpec(args=['input_program', 'skip_opt_set', 'print_log', 'level'], varargs=None, keywords=None, defaults=(None, False, 0))
 paddle.fluid.transpiler.release_memory ArgSpec(args=['input_program', 'skip_opt_set'], varargs=None, keywords=None, defaults=(None,))
 paddle.fluid.transpiler.HashName.__init__ ArgSpec(args=['self', 'pserver_endpoints'], varargs=None, keywords=None, defaults=None)
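
The spec change only appends a trailing current_endpoint argument with a default value, so existing pserver-mode callers are unaffected. A minimal sketch (not part of this commit) for checking the new signature against a build that contains it, assuming paddle.fluid is importable:

    import inspect

    import paddle.fluid as fluid

    # transpile() should now list current_endpoint last, with the default from API.spec.
    spec = inspect.getargspec(fluid.DistributeTranspiler.transpile)
    assert spec.args[-1] == "current_endpoint"
    assert spec.defaults[-1] == "127.0.0.1:6174"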

python/paddle/fluid/tests/unittests/test_dist_transpiler.py

Lines changed: 20 additions & 0 deletions
@@ -659,5 +659,25 @@ def transpiler_test_impl(self):
                 pserver2._slice_vars_and_attrs[idx][2].shape))
 
 
+class TestNCCL2Transpile(TranspilerTest):
+    def test_nccl2_transpile(self):
+        main = fluid.Program()
+        startup = fluid.Program()
+        with fluid.program_guard(main, startup):
+            self.net_conf()
+
+        config = fluid.DistributeTranspilerConfig()
+        config.mode = "nccl2"
+        t = fluid.DistributeTranspiler(config=config)
+        t.transpile(
+            0,
+            trainers="127.0.0.1:6174,127.0.0.1:6175",
+            current_endpoint="127.0.0.1:6174",
+            startup_program=startup)
+        print([op.type for op in startup.global_block().ops])
+        self.assertEqual(startup.global_block().ops[-1].type, "gen_nccl_id")
+        self.assertIsNotNone(startup.global_block().vars.get("NCCLID"))
+
+
 if __name__ == "__main__":
     unittest.main()
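
The new test covers only the success path. As the transpiler change below shows, _transpile_nccl2 raises ValueError for a negative trainer_id; a hypothetical companion test (not part of this commit) could pin that behavior down:

    class TestNCCL2TranspileBadTrainerId(TranspilerTest):
        # Hypothetical test: nccl2 mode should reject a negative trainer_id.
        def test_negative_trainer_id(self):
            config = fluid.DistributeTranspilerConfig()
            config.mode = "nccl2"
            t = fluid.DistributeTranspiler(config=config)
            with self.assertRaises(ValueError):
                t.transpile(
                    -1,
                    trainers="127.0.0.1:6174,127.0.0.1:6175",
                    current_endpoint="127.0.0.1:6174",
                    startup_program=fluid.Program())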

python/paddle/fluid/transpiler/distribute_transpiler.py

Lines changed: 75 additions & 16 deletions
@@ -136,6 +136,8 @@ class DistributeTranspilerConfig(object):
     slice_var_up = True
     split_method = None
     min_block_size = 8192
+    # supported modes: pserver, nccl2
+    mode = "pserver"
     print_log = False
 
 
@@ -144,27 +146,30 @@ class DistributeTranspiler(object):
     **DistributeTranspiler**
 
     Convert the fluid program to distributed data-parallelism programs.
+    Supports two modes: pserver mode and nccl2 mode.
 
-    The main_program will be transformed to use a remote parameter server
-    to do parameter optimization. And the optimization graph will be put
-    into a parameter server program.
+    In pserver mode, the main_program will be transformed to use a remote
+    parameter server to do parameter optimization. And the optimization
+    graph will be put into a parameter server program.
+
+    In nccl2 mode, the transpiler will append a NCCL_ID broadcasting
+    op in startup_program to share the NCCL_ID across the job nodes.
+    After transpile() is called in nccl2 mode, you ***must*** pass the
+    trainer_id and num_trainers arguments to ParallelExecutor to enable
+    NCCL2 distributed mode.
 
     Examples:
         .. code-block:: python
 
-           # Define your model before these codes.
-           port = os.getenv("PADDLE_PSERVER_PORT", "6174")
-           pserver_ips = os.getenv("PADDLE_PSERVER_IPS", "")
-           eplist = []
-           for ip in pserver_ips.split(","):
-               eplist.append(':'.join([ip, port]))
-           pserver_endpoints = ",".join(eplist)
-           trainers = int(os.getenv("PADDLE_TRAINERS"))
-           current_endpoint = os.getenv("PADDLE_CURRENT_IP", "") + ":" + port
-           trainer_id = int(os.getenv("PADDLE_TRAINER_ID", "0"))
+           # for pserver mode
+           pserver_endpoints = "192.168.0.1:6174,192.168.0.2:6174"
+           trainer_endpoints = "192.168.0.1:6174,192.168.0.2:6174"
+           current_endpoint = "192.168.0.1:6174"
+           trainer_id = 0
+           trainers = 4
            role = os.getenv("PADDLE_TRAINING_ROLE")
 
-           t = distribute_transpiler.DistributeTranspiler()
+           t = fluid.DistributeTranspiler()
           t.transpile(
                trainer_id, pservers=pserver_endpoints, trainers=trainers)
           if role == "PSERVER":
@@ -173,6 +178,18 @@ class DistributeTranspiler(object):
                                                    pserver_program)
           elif role == "TRAINER":
                trainer_program = t.get_trainer_program()
+
+           # for nccl2 mode
+           config = fluid.DistributeTranspilerConfig()
+           config.mode = "nccl2"
+           t = fluid.DistributeTranspiler(config=config)
+           t.transpile(trainer_id, trainers=trainer_endpoints, current_endpoint=current_endpoint)
+           exe = fluid.ParallelExecutor(
+               use_cuda,
+               loss_name=loss_var.name,
+               num_trainers=len(trainer_endpoints.split(",")),
+               trainer_id=trainer_id
+           )
     """
 
     def __init__(self, config=None):
@@ -190,13 +207,41 @@ def __init__(self, config=None):
         assert (self.config.min_block_size >= 8192)
         assert (self.config.split_method.__bases__[0] == PSDispatcher)
 
+    def _transpile_nccl2(self,
+                         trainer_id,
+                         trainers,
+                         current_endpoint,
+                         startup_program=None):
+        if not startup_program:
+            startup_program = default_startup_program()
+        if trainer_id >= 0:
+            worker_endpoints = trainers.split(",")
+            # send NCCL_ID to others or recv from trainer 0
+            worker_endpoints.remove(current_endpoint)
+
+            nccl_id_var = startup_program.global_block().create_var(
+                name="NCCLID", persistable=True, type=core.VarDesc.VarType.RAW)
+            startup_program.global_block().append_op(
+                type="gen_nccl_id",
+                inputs={},
+                outputs={"NCCLID": nccl_id_var},
+                attrs={
+                    "endpoint": current_endpoint,
+                    "endpoint_list": worker_endpoints,
+                    "trainer_id": trainer_id
+                })
+            return nccl_id_var
+        else:
+            raise ValueError("must set trainer_id >= 0")
+
     def transpile(self,
                   trainer_id,
                   program=None,
                   pservers="127.0.0.1:6174",
                   trainers=1,
                   sync_mode=True,
-                  startup_program=None):
+                  startup_program=None,
+                  current_endpoint="127.0.0.1:6174"):
         """
         Run the transpiler.
 
@@ -207,10 +252,15 @@ def transpile(self,
                 default is fluid.default_main_program().
             pservers (str): comma separated ip:port string for the pserver
                 list.
-            trainers (int): number of trainers in the distributed job.
+            trainers (int|str): in pserver mode this is the number of
+                trainers; in nccl2 mode this is a string of trainer
+                endpoints.
             sync_mode (bool): Do sync training or not, default is True.
             startup_program (Program|None): startup_program to transpile,
                 default is fluid.default_main_program().
+            current_endpoint (str): the current endpoint must be passed
+                when transpiling in nccl2 distributed mode. In pserver
+                mode this argument is not used.
         """
         if program is None:
             program = default_main_program()
@@ -220,6 +270,15 @@ def transpile(self,
         self.startup_program = startup_program
         self.origin_startup_program = self.startup_program.clone()
 
+        if self.config.mode == "nccl2":
+            assert (isinstance(trainers, str))
+            self._transpile_nccl2(
+                trainer_id,
+                trainers,
+                current_endpoint,
+                startup_program=startup_program)
+            return
+
         self.trainer_num = trainers
         self.sync_mode = sync_mode
         self.trainer_id = trainer_id
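
Taken together, the trainer-side flow in nccl2 mode is: set config.mode = "nccl2", call transpile() with the comma separated trainer endpoints and the current endpoint (which appends the gen_nccl_id op to the startup program), then pass num_trainers and trainer_id to ParallelExecutor. A consolidated sketch based on the docstring above; the endpoint strings are placeholders and loss_var is assumed to come from the model definition:

    import paddle.fluid as fluid

    trainer_endpoints = "192.168.0.1:6174,192.168.0.2:6174"  # placeholder endpoints
    current_endpoint = "192.168.0.1:6174"
    trainer_id = 0

    config = fluid.DistributeTranspilerConfig()
    config.mode = "nccl2"
    t = fluid.DistributeTranspiler(config=config)
    # Appends a gen_nccl_id op to the default startup program; no pserver programs are built.
    t.transpile(
        trainer_id,
        trainers=trainer_endpoints,
        current_endpoint=current_endpoint)

    exe = fluid.ParallelExecutor(
        use_cuda=True,
        loss_name=loss_var.name,  # loss_var assumed from the model definition
        num_trainers=len(trainer_endpoints.split(",")),
        trainer_id=trainer_id)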
