
Commit 21071f7

no create trainer var on listen_and_serv
1 parent b009636 commit 21071f7

File tree

3 files changed: 16 additions and 41 deletions


paddle/fluid/operators/listen_and_serv_op.cc

Lines changed: 3 additions & 1 deletion
@@ -85,7 +85,7 @@ class ListenAndServOp : public framework::OperatorBase {
     rpc_service_->SetScope(&recv_scope);
     rpc_service_->SetDevCtx(&dev_ctx);
     auto ins = Inputs("X");
-    auto fan_in = ins.size();
+    auto fan_in = Attr<int>("Fanin");
 
     auto *block = Attr<framework::BlockDesc *>(kOptimizeBlock);
     auto *program = block->Program();
@@ -163,6 +163,8 @@ from send_op and send back variables to recv_op.
         .AddCustomChecker([](const std::string &ip) { return !ip.empty(); });
     AddAttr<framework::BlockDesc *>(kOptimizeBlock,
                                     "BlockID to run on server side.");
+    AddAttr<int>("Fanin", "How many clients send to this server.")
+        .SetDefault(1);
   }
 };
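With this change the server op reads its fan-in (the number of trainers it must wait for) from the new "Fanin" attribute instead of deriving it from the size of its "X" input list, which is what previously forced the transpiler to create one placeholder variable per trainer. Below is a minimal sketch of how the attribute could be supplied from the Python side; the op type, the other attribute names, and the surrounding variables (pserver_program, optimize_block, endpoint, recv_inputs, trainers) are assumptions for illustration, not copied verbatim from this commit:

# Hedged sketch: pass the trainer count as an op attribute rather than
# encoding it as one dummy input variable per trainer.
pserver_program.global_block().append_op(
    type="listen_and_serv",            # op registered by listen_and_serv_op.cc
    inputs={"X": recv_inputs},         # variables received from trainers (name assumed)
    outputs={},
    attrs={
        "OptimizeBlock": optimize_block,  # attribute name assumed (kOptimizeBlock)
        "endpoint": endpoint,             # attribute name assumed
        "Fanin": trainers,                # matches AddAttr<int>("Fanin", ...) above
    })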

python/paddle/v2/fluid/distribute_transpiler.py

Lines changed: 13 additions & 17 deletions
@@ -14,7 +14,7 @@
 
 from __future__ import print_function
 import framework
-from framework import Program, default_main_program, Parameter, Variable
+from framework import Program, default_main_program, default_startup_program, Parameter, Variable
 import optimizer
 from layer_helper import LayerHelper
 from distributed_spliter import *
@@ -97,7 +97,7 @@ def transpile(self,
             parameter servers.
 
         :param optimize_ops: op list of optimization, should be the
-        return value of Optimizer.minimize
+            return value of Optimizer.minimize
         :type optimize_ops: list
         :param params_grads: list of tuple(weight, gradient)
         :type params_grads: list
@@ -131,6 +131,7 @@ def transpile(self,
         # 4. append concat_op to trainer to update local weights.
         # 5. create new program for parameter server.
         # 6. create parameter server program by split_method generated endpoint->VarBlock
+        # 7. update startup_program, rename variables to variables with trainer_id
 
         pserver_endpoints = pservers.split(",")
 
@@ -175,7 +176,6 @@ def transpile(self,
                 shape=[0])
 
         # create send_op
-        print("send inputs: ", send_inputs)
         send_op = program.global_block().append_op(
             type="send",
             inputs={"X": send_inputs},
@@ -194,6 +194,15 @@ def transpile(self,
                 outputs={"Out": [orig_param]},
                 attrs={"axis": 0})
 
+        # step 7
+        startup_prog = default_startup_program()
+        for varname in startup_prog.global_block().vars.keys():
+            if varname in param_var_mapping and \
+                len(param_var_mapping[varname]) == 1:
+                new_var_name = "%s.trainer_%d" % \
+                    (varname, self.trainer_id)
+                startup_prog.global_block().rename_var(varname, new_var_name)
+
     def _create_vars_from_blocklist(self, program, block_list):
         # Create respective variables using the block_list
         block_map = dict()
@@ -210,7 +219,6 @@ def _create_vars_from_blocklist(self, program, block_list):
                 new_var_name = "%s.trainer_%d" % \
                     (orig_var.name, self.trainer_id)
                 program.global_block().rename_var(varname, new_var_name)
-                print("renaming OK...", varname, new_var_name)
                 var_mapping[varname] = \
                     [program.global_block().var(new_var_name)]
                 continue
@@ -377,10 +385,7 @@ def _append_pserver_ops(self, optimize_block, opt_op, endpoint):
         new_inputs = dict()
         # update param/grad shape first, then other inputs like
        # moment can use the updated shape
-        print("mark1")
         for key in opt_op.input_names:
-            # print("opt type: ", opt_op.type)
-            # print("opt op input: ", key)
             if key == "Grad":
                 grad_block = None
                 for g in self.param_grad_ep_mapping[endpoint]["grads"]:
@@ -427,7 +432,6 @@ def _append_pserver_ops(self, optimize_block, opt_op, endpoint):
 
             new_inputs[key] = tmpvar
 
-        print("mark2")
         for key in opt_op.input_names:
             if key in ["Param", "Grad"]:
                 continue
@@ -451,7 +455,6 @@ def _append_pserver_ops(self, optimize_block, opt_op, endpoint):
             inputs=new_inputs,
             outputs=outputs,
             attrs=opt_op.attrs)
-        print("mark3")
 
     def _append_pserver_non_opt_ops(self, optimize_block, opt_op):
         program = optimize_block.program
@@ -505,8 +508,6 @@ def get_pserver_program(self, endpoint):
                 suff_idx = v.name.find(".trainer_")
                 if suff_idx >= 0:
                     orig_var_name = v.name[:suff_idx]
-                print("create variable for program: %s.trainer_%d" %
-                      (orig_var_name, trainer_id))
                 var = pserver_program.global_block().create_var(
                     name="%s.trainer_%d" % (orig_var_name, trainer_id),
                     persistable=True,
@@ -517,11 +518,6 @@ def get_pserver_program(self, endpoint):
         optimize_block = pserver_program.create_block(0)
         # Iterate through the ops and append ops as needed
         for idx, opt_op in enumerate(self.optimize_ops):
-            print("mark0")
-            print(opt_op.inputs.keys())
-            for v in opt_op.inputs.values():
-                print(v.name)
-                print(v.shape)
             is_op_on_pserver = self._is_op_on_pserver(endpoint,
                                                       self.optimize_ops, idx)
             if not is_op_on_pserver:
@@ -547,7 +543,7 @@ def get_pserver_program(self, endpoint):
                 #     p.name
                 #     for p in self.param_grad_ep_mapping[endpoint]["grads"]
                 # ],
-                # "Fanin": self.trainers
+                "Fanin": self.trainers
             })
         pserver_program.sync_with_cpp()
         return pserver_program
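Taken together with the C++ change, the trainer count now flows from the caller through the transpiler into the op's "Fanin" attribute, while per-trainer variable names are derived from trainer_id (including the new "step 7" renames in the startup program). A hedged usage sketch follows; the exact transpile() signature at this revision is an assumption based on the docstring and attributes shown above:

# Hedged usage sketch of the transpiler API touched by this commit.
from paddle.v2.fluid.distribute_transpiler import DistributeTranspiler

# Assumes the trainer program was already built and that
#   optimize_ops  is the return value of Optimizer.minimize (see docstring above)
#   params_grads  is a list of (weight, gradient) tuples
t = DistributeTranspiler()
t.transpile(
    optimize_ops,
    params_grads,
    trainer_id=0,               # used for the "%s.trainer_%d" renames (step 7)
    pservers="127.0.0.1:6174",  # comma-separated parameter-server endpoints
    trainers=2)                 # forwarded as the "Fanin" attribute

# On a parameter server, build the program for its own endpoint:
pserver_prog = t.get_pserver_program("127.0.0.1:6174")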

python/paddle/v2/fluid/framework.py

Lines changed: 0 additions & 23 deletions
@@ -761,17 +761,6 @@ def rename_var(self, name, new_name):
         else:
             raise ValueError("unsupported var type: %s", type(v))
 
-        def _clear_op_io_for_var(name):
-            for op in self.ops:
-                for k in op.inputs.keys():
-
-                    if op.inputs[k].name == name:
-                        op.inputs[k] = None
-                for k in op.outputs.keys():
-                    if op.outputs[k].name == name:
-                        op.outputs[k] = None
-
-        _clear_op_io_for_var(name)
         self.desc.rename_var(name, new_name)
         d = self.desc.find_var(new_name)
         var = None
@@ -797,17 +786,6 @@ def _clear_op_io_for_var(name):
         # rename the python side, sync_with_cpp will only add
         # new vars/ops to python side.
         self.vars[new_name] = var
-        for op in self.ops:
-            print("### rename op i/o ", name, op.inputs)
-            if op.inputs:
-                for k in op.inputs.keys():
-                    if op.inputs[k] == None:
-                        print("rename input: ", name, var)
-                        op.inputs[k] = var
-            if op.outputs:
-                for k in op.outputs.keys():
-                    if op.outputs[k] == None:
-                        op.outputs[k] = var
         del self.vars[name]
         self.sync_with_cpp()
 
@@ -901,7 +879,6 @@ def copy_param_info_from(self, other):
         for p in other.iter_parameters():
             assert isinstance(p, Parameter)
             v = self.vars.get(p.name, None)
-            print("var shape to copy", v, p)
             if v is None:
                 raise ValueError("copy_param_info_from should be invoked with "
                                  "same topology")
