@@ -38,14 +38,14 @@ def split_dense_variable(var_list,
                          min_block_size=1024,
                          max_block_size=1048576):
     """
-        We may need to split dense tensor to one or several blocks and put
+        We may need to split dense tensor to one or more blocks and put
         them equally onto parameter server. One block is a sub-tensor
         aligned by dim[0] of the tensor.
-
+
         We need to have a minimal block size so that the calculations in
         the parameter server side can gain better performance. By default
-        mininum block size is 1024. The max block size is used to prevent
-        too large block that may causing send error.
+        minimum block size is 1024. The max block size is used to prevent
+        very large blocks that may cause send error.
     """
     blocks = []
     for var in var_list:
@@ -64,7 +64,7 @@ def split_dense_variable(var_list,
             remains = block_size % dim1
             if remains != 0:
                 block_size += dim1 - remains
-        # update split_count after align
+        # update split_count after aligning
         split_count = int(math.ceil(var_numel / float(block_size)))
         for block_id in xrange(split_count):
             curr_block_size = min(block_size, var_numel - (
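A minimal standalone sketch of the splitting rule this hunk documents (my own re-implementation for illustration, not the library code): pick a tentative block size from the element count and the number of parameter servers, round it up to a whole number of rows (a multiple of product(dim[1:])), then emit (block_id, size) pairs that cover the tensor. The function name and return format are made up for the example.

```python
import math
from functools import reduce

def split_blocks(shape, pserver_count, min_block_size=1024):
    # total number of elements in the dense tensor
    var_numel = reduce(lambda x, y: x * y, shape)
    # never create more blocks than min_block_size allows
    split_count = min(pserver_count, max(1, var_numel // min_block_size))
    block_size = int(math.ceil(var_numel / float(split_count)))
    if len(shape) >= 2:
        dim1 = reduce(lambda x, y: x * y, shape[1:])  # row width
        remains = block_size % dim1
        if remains != 0:
            block_size += dim1 - remains              # align to whole rows
    # update split_count after aligning
    split_count = int(math.ceil(var_numel / float(block_size)))
    return [(block_id, min(block_size, var_numel - block_id * block_size))
            for block_id in range(split_count)]

# e.g. a 1000 x 10 parameter split across 4 pservers -> 4 blocks of 2500 elements
print(split_blocks([1000, 10], pserver_count=4))
```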
@@ -83,18 +83,18 @@ def transpile(self,
                   trainers=1,
                   split_method=round_robin):
         """
-            Transpile the program to a distributed data-parallelism programs.
-            The main_program will be transform to use a remote parameter server
+            Transpile the program to distributed data-parallelism programs.
+            The main_program will be transformed to use a remote parameter server
             to do parameter optimization. And the optimization graph will be put
-            in to a parameter server program.
+            into a parameter server program.
 
-            Use different methods to split trainable varialbles to different
+            Use different methods to split trainable variables to different
             parameter servers.
 
             :param optimize_ops: op list of optimization, should be the
                                  return value of Optimizer.minimize
             :type optimize_ops: list
-            :param program: program to optimize, default default_main_program
+            :param program: program to optimize, default is default_main_program
             :param pservers: parameter server endpoints like "m1:6174,m2:6174"
             :type pservers: string
             :return: return a list of programs
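For context, a hedged usage sketch of the API this docstring describes. The import path, the `params_grads` argument, and the optimizer class name are assumptions for this era of Paddle fluid and may differ; `avg_cost` stands for a loss variable built by a network that is not shown here.

```python
import paddle.v2.fluid as fluid

# avg_cost: loss variable from a network defined earlier (not shown)
optimize_ops, params_grads = fluid.optimizer.SGD(learning_rate=0.01).minimize(avg_cost)

t = fluid.DistributeTranspiler()
t.transpile(optimize_ops,
            params_grads,
            pservers="192.168.0.1:6174,192.168.0.2:6174",
            trainers=2)

# each parameter server builds its own program from its endpoint;
# trainers keep a modified main program with split/send/concat ops
pserver_prog = t.get_pserver_program("192.168.0.1:6174")
trainer_prog = t.get_trainer_program()
```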
@@ -106,11 +106,11 @@ def transpile(self,
         self.trainers = trainers
         self.optimize_ops = optimize_ops
         # steps to transpile:
-        # 1. split variable to multiple blocks, align by product(dim[1:]) (width).
+        # 1. split variable to multiple blocks, aligned by product(dim[1:]) (width).
         # 2. modify trainer program add split_op to each Grad.
         # 3. append send_op to trainer.
         # 4. append concat_op to trainer to update local weights.
-        # 5. create new program as parameter server.
+        # 5. create new program for parameter server.
         # 6. create parameter server program by split_method generated endpoint->VarBlock
 
         pserver_endpoints = pservers.split(",")
@@ -136,10 +136,10 @@ def transpile(self,
         for b in param_blocks:
             varname, block_id, _ = b.split(":")
             send_outputs.append(param_var_mapping[varname][int(block_id)])
-        # let send_op know which endpoint to send which var, eplist is of the same
-        # order of send_inputs.
+        # let send_op know which endpoint to send which var to, eplist has the same
+        # order as send_inputs.
         eplist = split_method(send_inputs, pserver_endpoints)
-        # create mapping of endpoint -> splited var to create pserver side program
+        # create mapping of endpoint -> split var to create pserver side program
         self.param_grad_ep_mapping = dict()
         for i, ep in enumerate(eplist):
             param = send_outputs[i]
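The new comment says eplist[i] names the endpoint that send_inputs[i] goes to. A small sketch of what a round-robin split_method and the endpoint -> {"params", "grads"} mapping look like; the variable names, the "@GRAD" / ".block" naming, and which list holds params versus grads are illustrative inferences, not the library code.

```python
def round_robin(var_blocks, endpoints):
    # eplist[i] is the endpoint assigned to var_blocks[i],
    # so the result has the same order as the input list
    return [endpoints[i % len(endpoints)] for i in range(len(var_blocks))]

send_inputs = ["w@GRAD.block0", "w@GRAD.block1", "w@GRAD.block2", "b@GRAD.block0"]  # grad blocks sent out
send_outputs = ["w.block0", "w.block1", "w.block2", "b.block0"]                     # param blocks received back
pserver_endpoints = ["192.168.0.1:6174", "192.168.0.2:6174"]

eplist = round_robin(send_inputs, pserver_endpoints)

param_grad_ep_mapping = {}
for i, ep in enumerate(eplist):
    mapping = param_grad_ep_mapping.setdefault(ep, {"params": [], "grads": []})
    mapping["params"].append(send_outputs[i])
    mapping["grads"].append(send_inputs[i])

# first endpoint serves w.block0 and w.block2, second serves w.block1 and b.block0
print(param_grad_ep_mapping)
```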
@@ -149,6 +149,7 @@ def transpile(self,
             self.param_grad_ep_mapping[ep]["params"].append(param)
             self.param_grad_ep_mapping[ep]["grads"].append(grad)
 
+        # create send_op
         send_op = program.global_block().append_op(
             type="send",
             inputs={"X": send_inputs},
@@ -167,6 +168,7 @@ def transpile(self,
             attrs={"axis": 0})
 
     def _create_vars_from_blocklist(self, program, block_list):
+        # Create respective variables using the block_list
         block_map = dict()
         var_mapping = dict()
         for block_str in block_list:
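A sketch of the book-keeping this method appears to do, inferred from the "varname:block_id:size" strings unpacked earlier with b.split(":"). The grouping helper and the "<name>.block<i>" naming are assumptions for illustration.

```python
from collections import OrderedDict

def group_blocks(block_list):
    block_map = OrderedDict()              # varname -> [(block_id, size), ...]
    for block_str in block_list:
        varname, block_id, size = block_str.split(":")
        block_map.setdefault(varname, []).append((int(block_id), int(size)))

    var_mapping = {}
    for varname, blocks in block_map.items():
        if len(blocks) == 1:
            # a single block: the variable is not actually split
            var_mapping[varname] = [varname]
        else:
            var_mapping[varname] = [
                "%s.block%d" % (varname, block_id) for block_id, _ in blocks
            ]
    return var_mapping

print(group_blocks(["w:0:2500", "w:1:2500", "b:0:10"]))
# {'w': ['w.block0', 'w.block1'], 'b': ['b']}
```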
@@ -207,11 +209,12 @@ def _clone_var(self, block, var):
             dtype=var.dtype,
             type=var.type,
             lod_level=var.lod_level,
-            # HACK: let all param in pserver persistable so child
+            # HACK: let all param in pserver be persistable so the child
             # program in recv can get them
             persistable=True)
 
     def _append_split_op(self, program, gradblocks):
+        # Split variables that need to be split and append respective ops
         var_mapping = self._create_vars_from_blocklist(program, gradblocks)
         for varname, splited_vars in var_mapping.iteritems():
             # variable that don't need to split have empty splited_vars
@@ -248,6 +251,7 @@ def get_trainer_program(self):
         return self.program
 
     def _create_var_for_trainers(self, block, var, trainers):
+        # For each trainer, create the necessary variables
         var_list = []
         for i in xrange(trainers):
             var_each = block.create_var(
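A hedged sketch of the idea behind this helper: on the pserver, each incoming gradient gets one slot per trainer so updates from different trainers can be aggregated. The ".trainer_<i>" suffix and the helper name are assumptions used only for this example.

```python
def var_names_for_trainers(varname, trainers):
    # one per-trainer slot for the same gradient block
    return ["%s.trainer_%d" % (varname, i) for i in range(trainers)]

print(var_names_for_trainers("w@GRAD.block0", trainers=3))
# ['w@GRAD.block0.trainer_0', 'w@GRAD.block0.trainer_1', 'w@GRAD.block0.trainer_2']
```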
@@ -262,7 +266,7 @@ def _get_optimizer_input_shape(self, op_type, varkey, orig_shape,
                                    param_shape):
         """
         Returns the shape for optimizer inputs that need to be reshaped when
-        Param and Grad is splited to multiple servers.
+        Param and Grad is split to multiple servers.
         """
         # HACK(typhoonzero): Should use functions of corresponding optimizer in
         # optimizer.py to get the shape, do not bind this in the transpiler.
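A hedged sketch of the reshaping rule this docstring describes: optimizer state kept per parameter element (momentum, Adam moments, and the like) has to take the shape of the split parameter block, while scalar state (learning rate, beta powers) keeps its original shape. The input keys below are examples, not an exhaustive or authoritative list.

```python
PER_PARAM_STATE = {
    "momentum": {"Velocity"},
    "adam": {"Moment1", "Moment2"},
    "adagrad": {"Moment"},
}

def optimizer_input_shape(op_type, varkey, orig_shape, split_param_shape):
    # per-element state follows the split Param block; everything else is untouched
    if varkey in PER_PARAM_STATE.get(op_type, set()):
        return split_param_shape
    return orig_shape

# Adam moments follow the 2500-element block, LearningRate stays a scalar
print(optimizer_input_shape("adam", "Moment1", (10000,), (2500,)))   # (2500,)
print(optimizer_input_shape("adam", "LearningRate", (1,), (2500,)))  # (1,)
```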
@@ -300,7 +304,7 @@ def _is_op_on_pserver(self, endpoint, all_ops, idx):
             else:
                 for n in param_names:
                     if n.startswith(op.inputs["Param"].name + ".block") and \
-                        n != op.inputs["Param"].name:
+                            n != op.inputs["Param"].name:
                         return True
                 return False
         else:
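A minimal re-statement of the name test shown in this hunk: an optimize op whose "Param" input is named "w" belongs on a pserver that hosts either "w" itself or one of its split blocks "w.block<i>". The helper below is illustrative, not the method itself.

```python
def op_param_on_pserver(param_name, pserver_param_names):
    if param_name in pserver_param_names:
        return True
    # the pserver may hold only blocks of the parameter, e.g. "w.block0"
    return any(
        n.startswith(param_name + ".block") and n != param_name
        for n in pserver_param_names)

print(op_param_on_pserver("w", ["w.block0", "w.block2"]))  # True
print(op_param_on_pserver("b", ["w.block0", "w.block2"]))  # False
```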
@@ -396,7 +400,7 @@ def _append_pserver_ops(self, program, pserver_program, opt_op, endpoint):
                 dtype=var.dtype,
                 shape=new_shape)
 
-        # change outputs ParamOut variable
+        # change output's ParamOut variable
         opt_op.outputs["ParamOut"] = new_inputs["Param"]
         program.global_block().append_op(
             type=opt_op.type,
@@ -405,6 +409,7 @@ def _append_pserver_ops(self, program, pserver_program, opt_op, endpoint):
             attrs=opt_op.attrs)
 
     def _append_pserver_non_opt_ops(self, program, pserver_program, opt_op):
+        # Append the ops for parameters that do not need to be optimized/updated
         for _, var in opt_op.inputs.iteritems():
             program.global_block().create_var(
                 name=var.name,
@@ -424,7 +429,7 @@ def _append_pserver_non_opt_ops(self, program, pserver_program, opt_op):
 
     def get_pserver_program(self, endpoint):
         """
-        get pserver side program by endpoint
+        Get pserver side program using the endpoint
 
         NOTE: assume blocks of the same variable is not distributed
         on the same pserver, only change param/grad varnames for
@@ -450,6 +455,7 @@ def get_pserver_program(self, endpoint):
                 shape=v.shape)
         # step6
         optimize_sub_program = Program()
+        # Iterate through the ops and append ops as needed
         for idx, opt_op in enumerate(self.optimize_ops):
             is_op_on_pserver = self._is_op_on_pserver(endpoint,
                                                       self.optimize_ops, idx)
@@ -461,6 +467,7 @@ def get_pserver_program(self, endpoint):
             else:
                 self._append_pserver_non_opt_ops(optimize_sub_program,
                                                  pserver_program, opt_op)
+        # Append the recv op
         pserver_program.global_block().append_op(
             type="recv",
             inputs={"RX": self.param_grad_ep_mapping[endpoint]["grads"]
@@ -486,7 +493,7 @@ def get_startup_program(self, endpoint, pserver_program):
         """
         Get startup program for current parameter server.
         Modify operator input variables if there are variables that
-        was splited to several blocks.
+        were split to several blocks.
         """
         s_prog = Program()
         orig_s_prog = framework.default_startup_program()
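A hedged sketch of the renaming this docstring implies: if this pserver holds only a block of "w", the startup initializer that originally wrote to "w" must now write to that block (with the block's smaller shape). The helper and the output-slot layout are assumptions for illustration.

```python
def remap_startup_outputs(op_outputs, owned_blocks):
    # op_outputs: e.g. {"Out": "w"}; owned_blocks: e.g. {"w": "w.block1"}
    return {slot: owned_blocks.get(name, name)
            for slot, name in op_outputs.items()}

print(remap_startup_outputs({"Out": "w"}, {"w": "w.block1"}))  # {'Out': 'w.block1'}
```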