
Commit cbc6e6e

Merge pull request #12247 from seiriosPlus/dis_ckpt_fix: add load slice_vars in io.py

2 parents: 7296522 + 0815291

8 files changed: +229 additions, -15 deletions

paddle/fluid/API.spec

Lines changed: 1 addition & 1 deletion
@@ -78,7 +78,7 @@ paddle.fluid.io.load_vars ArgSpec(args=['executor', 'dirname', 'main_program', '
 paddle.fluid.io.load_params ArgSpec(args=['executor', 'dirname', 'main_program', 'filename'], varargs=None, keywords=None, defaults=(None, None))
 paddle.fluid.io.load_persistables ArgSpec(args=['executor', 'dirname', 'main_program', 'filename'], varargs=None, keywords=None, defaults=(None, None))
 paddle.fluid.io.save_inference_model ArgSpec(args=['dirname', 'feeded_var_names', 'target_vars', 'executor', 'main_program', 'model_filename', 'params_filename', 'export_for_deployment'], varargs=None, keywords=None, defaults=(None, None, None, True))
-paddle.fluid.io.load_inference_model ArgSpec(args=['dirname', 'executor', 'model_filename', 'params_filename'], varargs=None, keywords=None, defaults=(None, None))
+paddle.fluid.io.load_inference_model ArgSpec(args=['dirname', 'executor', 'model_filename', 'params_filename', 'pserver_endpoints'], varargs=None, keywords=None, defaults=(None, None, None))
 paddle.fluid.io.get_inference_program ArgSpec(args=['target_vars', 'main_program'], varargs=None, keywords=None, defaults=(None,))
 paddle.fluid.initializer.ConstantInitializer.__init__ ArgSpec(args=['self', 'value', 'force_cpu'], varargs=None, keywords=None, defaults=(0.0, False))
 paddle.fluid.initializer.UniformInitializer.__init__ ArgSpec(args=['self', 'low', 'high', 'seed'], varargs=None, keywords=None, defaults=(-1.0, 1.0, 0))
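
For reference, the updated ArgSpec maps onto the Python signature below; this matches the io.py change later in this diff. Since the defaults tuple binds to the last three arguments, existing call sites keep working unchanged.

```python
# Signature encoded by the new API.spec entry: pserver_endpoints is a new
# optional trailing keyword argument, defaulting to None like the two before it.
def load_inference_model(dirname, executor, model_filename=None,
                         params_filename=None, pserver_endpoints=None):
    ...
```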

paddle/fluid/operators/distributed/request_handler_impl.cc

Lines changed: 3 additions & 2 deletions
@@ -130,12 +130,13 @@ bool RequestCheckpointHandler::Handle(const std::string& varname,
       checkpoint_notify_id != -1,
       "when checkpoint_notify_id = -1, there should be no RPC invoke.");

-  auto* lt_var = scope->FindVar(LOOKUP_TABLE_PATH)->GetMutable<std::string>();
+  // TODO(tangwei12): find out why scope will be error.
+  auto* lt_var = scope_->FindVar(LOOKUP_TABLE_PATH)->GetMutable<std::string>();
   lt_var->clear();
   lt_var->append(out_var_name);
   VLOG(4) << "RequestCheckpointHandler update var kLookupTablePath to: "
           << out_var_name;
-  executor_->RunPreparedContext(checkpoint_prepared_ctx_.get(), scope);
+  executor_->RunPreparedContext(checkpoint_prepared_ctx_.get(), scope_);
   return true;
 }
paddle/fluid/operators/load_op.cc

Lines changed: 1 addition & 0 deletions
@@ -92,6 +92,7 @@ class LoadOp : public framework::OperatorBase {
     platform::DeviceContextPool &pool = platform::DeviceContextPool::Instance();
     auto &dev_ctx = *pool.Get(place);
     framework::DeserializeFromStream(fin, selectedRows, dev_ctx);
+    selectedRows->SyncIndex();
   }
 };
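
`SyncIndex()` re-synchronizes the row-id index of the freshly deserialized SelectedRows so that later lookups by row id resolve against the loaded rows. A rough Python analogy of the idea follows; `SelectedRowsLike` is illustrative, not a PaddlePaddle type, and the real index layout is a C++ implementation detail.

```python
# Analogy only: after a load, the row-id -> slot map is stale and must be
# rebuilt from the rows that were just read in; that is what SyncIndex() does.
class SelectedRowsLike(object):
    def __init__(self, rows, values):
        self.rows = rows            # global row ids, e.g. [3, 7, 42]
        self.values = values        # one value slot per entry in rows
        self.id_to_index = {}       # empty/stale right after deserialization

    def sync_index(self):
        # rebuild the lookup map so get(row_id) works again
        self.id_to_index = {rid: i for i, rid in enumerate(self.rows)}

    def get(self, row_id):
        return self.values[self.id_to_index[row_id]]

table = SelectedRowsLike([3, 7, 42], ['a', 'b', 'c'])
table.sync_index()
assert table.get(42) == 'c'
```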

paddle/fluid/operators/save_op.cc

Lines changed: 2 additions & 0 deletions
@@ -142,6 +142,8 @@ class SaveOp : public framework::OperatorBase {
     std::string filename = lt_var->data();
     VLOG(4) << "SaveSelectedRows get File name: " << filename;

+    MkDirRecursively(DirName(filename).c_str());
+
     auto &selectedRows = var->Get<framework::SelectedRows>();

     // get device context from pool
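
The new `MkDirRecursively(DirName(filename).c_str())` call creates any missing ancestors of the checkpoint path before the SelectedRows are written, so saving a lookup table into a not-yet-existing directory no longer fails. A rough Python equivalent of the guard, with an illustrative helper name and path:

```python
import os

def ensure_parent_dir(filename):
    # rough equivalent of MkDirRecursively(DirName(filename).c_str()):
    # recursively create the target file's directory if it is missing
    parent = os.path.dirname(filename)
    if parent and not os.path.isdir(parent):
        os.makedirs(parent)

ensure_parent_dir('./checkpoints/lookup_table/table.part0')
```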

python/paddle/fluid/framework.py

Lines changed: 7 additions & 0 deletions
@@ -1363,6 +1363,13 @@ def __init__(self):
         self._current_role = core.op_proto_and_checker_maker.OpRole.Forward
         self._op_role_var = []

+        # for distribute
+        self._is_distributed = False
+        self._is_chief = False
+        self._slice_vars_and_attrs = []
+        self._endpoints = []
+        self._distributed_lookup_table = None
+
     @property
     def op_role(self):
         """

python/paddle/fluid/io.py

Lines changed: 123 additions & 2 deletions
@@ -372,6 +372,7 @@ def name_has_fc(var):
         load_vars(
             executor,
             dirname=dirname,
+            main_program=main_program,
             vars=list(filter(predicate, main_program.list_vars())),
             filename=filename)
     else:
@@ -403,9 +404,12 @@ def name_has_fc(var):
             inputs={},
             outputs={"Out": load_var_list},
             attrs={'file_path': os.path.join(dirname, filename)})
-
     executor.run(load_prog)

+    # load slice vars on pserver, if there are any.
+    _load_slice_up_vars(executor, dirname,
+                        main_program._slice_vars_and_attrs)
+

 def load_params(executor, dirname, main_program=None, filename=None):
     """
@@ -659,11 +663,19 @@ def save_inference_model(dirname,

     save_persistables(executor, dirname, inference_program, params_filename)

+    # if there is a lookup table, trainer 0 will notify all pservers to save it.
+    if main_program._is_distributed and main_program._is_chief and main_program._distributed_lookup_table:
+        lookup_table_filename = os.path.join(dirname, "__lookup_table__")
+        _save_lookup_tables_by_notify(executor, lookup_table_filename,
+                                      main_program._distributed_lookup_table,
+                                      main_program._endpoints)
+

 def load_inference_model(dirname,
                          executor,
                          model_filename=None,
-                         params_filename=None):
+                         params_filename=None,
+                         pserver_endpoints=None):
     """
     Load inference model from a directory
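
With this gate in place, the one save call on the chief trainer also persists the distributed table; a hedged usage sketch (the feed names, target vars, and program variable are illustrative):

```python
# On trainer 0 of a transpiled job, save_inference_model() now additionally
# notifies every pserver to dump its lookup table shard under
# "<dirname>/__lookup_table__"; non-chief trainers skip the notify step.
fluid.io.save_inference_model(
    dirname="./infer_model",
    feeded_var_names=["x"],          # illustrative
    target_vars=[prediction],        # illustrative
    executor=exe,
    main_program=trainer_program)    # transpiled; _is_chief is set by the transpiler
```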
@@ -679,6 +691,10 @@ def load_inference_model(dirname,
                                  parameters were saved in a single binary
                                  file. If parameters were saved in separate
                                  files, set it as 'None'.
+        pserver_endpoints(list|None): Only needed for distributed inference.
+                                      When a distributed lookup table is used
+                                      during training, it is also needed at
+                                      inference time; pass the pserver endpoint list.

     Returns:
         tuple: The return of this function is a tuple with three elements:
@@ -697,12 +713,16 @@ def load_inference_model(dirname,

         exe = fluid.Executor(fluid.CPUPlace())
         path = "./infer_model"
+        endpoints = ["127.0.0.1:2023","127.0.0.1:2024"]
         [inference_program, feed_target_names, fetch_targets] =
             fluid.io.load_inference_model(dirname=path, executor=exe)
         results = exe.run(inference_program,
                           feed={feed_target_names[0]: tensor_img},
                           fetch_list=fetch_targets)

+        # if a distributed lookup table is needed, use:
+        fluid.io.load_inference_model(dirname=path, executor=exe, pserver_endpoints=endpoints)
+
         # In this example, the inference program was saved in
         # "./infer_model/__model__" and the parameters were saved in
         # separate files under "./infer_model".
@@ -729,6 +749,9 @@ def load_inference_model(dirname,
     program = Program.parse_from_string(program_desc_str)
     load_persistables(executor, dirname, program, params_filename)

+    if pserver_endpoints:
+        program = _endpoints_replacement(program, pserver_endpoints)
+
     feed_target_names = program.desc.get_feed_target_names()
     fetch_target_names = program.desc.get_fetch_target_names()
     fetch_targets = [
@@ -738,6 +761,61 @@ def load_inference_model(dirname,
     return [program, feed_target_names, fetch_targets]


+def _save_lookup_tables_by_notify(executor, dirname, lookup_table,
+                                  pserver_endpoints):
+    """
+    This function sends a checkpoint-notify message from trainer 0 to all
+    the pservers. The message contains the lookup table name and the
+    absolute path on each pserver where the lookup table should be saved.
+
+    Args:
+        executor(Executor): The executor used to run the checkpoint notify.
+        dirname(str): The folder where the lookup table is saved.
+        lookup_table(string): The lookup table name. When a distributed
+            lookup table is used, it can be obtained from
+            DistributeTranspiler.table_name.
+        pserver_endpoints(list): The parameter server ip:port list. When a
+            distributed lookup table is used, it can be obtained from the
+            distribute arguments.
+    Return:
+        None
+
+    Examples:
+        .. code-block:: python
+
+            exe = fluid.Executor(fluid.CPUPlace())
+            param_path = "./my_paddle_model"
+            table_name = "share_w"
+            ps_endpoints = ["127.0.0.1:6000","127.0.0.1:6001"]
+
+            _save_lookup_tables_by_notify(executor=exe,
+                    dirname=param_path, lookup_table=table_name,
+                    pserver_endpoints=ps_endpoints)
+    """
+
+    pserver_notify_program = Program()
+    pserver_notify_block = pserver_notify_program.global_block()
+
+    attrs = {}
+    attrs['epmap'] = pserver_endpoints
+    attrs['dir'] = dirname
+    attrs['lookup_table'] = lookup_table
+
+    pserver_notify_block.append_op(
+        type='checkpoint_notify', inputs={}, outputs={}, attrs=attrs)
+    executor.run(pserver_notify_program)
+
+
+def _endpoints_replacement(program, endpoints):
+    ENDPOINT_MAP = "epmap"
+    for op in program.global_block().ops:
+        if op.has_attr(ENDPOINT_MAP):
+            op.set_attr(ENDPOINT_MAP, endpoints)
+    program._sync_with_cpp()
+    return program
+
+
 def get_parameter_value(para, executor):
     """
     Get the LoDTensor value of the given parameter.
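
The effect of `_endpoints_replacement` can be observed after loading: every op carrying an `epmap` attribute now points at the serving pservers. A small hedged check, reusing the `op.has_attr` accessor the function itself relies on (executor and path are illustrative):

```python
endpoints = ["127.0.0.1:2023", "127.0.0.1:2024"]
program, feed_names, fetch_targets = fluid.io.load_inference_model(
    dirname="./infer_model", executor=exe, pserver_endpoints=endpoints)

# list the RPC-facing ops that were re-pointed at the serving pservers
for op in program.global_block().ops:
    if op.has_attr("epmap"):
        print(op.type)
```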
@@ -799,3 +877,46 @@ def get_parameter_value_by_name(name, executor, program=None):
         program = default_main_program()
     var = program.global_block().var(name)
     return get_parameter_value(var, executor)
+
+
+def _load_slice_up_vars(executor, dirname, slice_vars_and_attrs):
+    if not slice_vars_and_attrs:
+        return
+
+    load_prog = Program()
+    load_block = load_prog.global_block()
+
+    for var_tuple in slice_vars_and_attrs:
+        orig_var = var_tuple[0]
+        start = var_tuple[1]
+        slice_var = var_tuple[2]
+        end = start + reduce(lambda x, y: x * y, slice_var.shape)
+
+        clone_orig_var = load_block.create_var(
+            name=orig_var.name,
+            type=orig_var.type,
+            shape=orig_var.shape,
+            dtype=orig_var.dtype,
+            persistable=True)
+
+        clone_slice_var = load_block.create_var(
+            name=slice_var.name,
+            type=slice_var.type,
+            shape=slice_var.shape,
+            dtype=slice_var.dtype,
+            persistable=True)
+
+        load_block.append_op(
+            type='load',
+            inputs={},
+            outputs={'Out': [clone_orig_var]},
+            attrs={'file_path': os.path.join(dirname, clone_orig_var.name)})
+        load_block.append_op(
+            type="slice",
+            inputs={'Input': clone_orig_var},
+            outputs={'Out': clone_slice_var},
+            attrs={'axes': [0],
+                   'starts': [start],
+                   'ends': [end]})
+
+    executor.run(load_prog)
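
On the pserver side this function is driven through the existing entry points: `load_vars` (and therefore `load_persistables`) now consults `main_program._slice_vars_and_attrs` after the regular load, loading each full saved tensor and slicing this pserver's shard out along axis 0. A hedged end-to-end sketch; the directory and transpiler variables are illustrative:

```python
# Restoring a pserver after a save: the regular persistables load runs first,
# then _load_slice_up_vars() resolves each (orig_var, start, slice_var) tuple.
exe = fluid.Executor(fluid.CPUPlace())
pserver_prog = t.get_pserver_program(current_endpoint)  # t: DistributeTranspiler
fluid.io.load_persistables(exe, "./model_dir", main_program=pserver_prog)
```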

python/paddle/fluid/tests/unittests/test_dist_transpiler.py

Lines changed: 56 additions & 10 deletions
@@ -47,7 +47,6 @@ def net_conf(self):
         avg_cost = fluid.layers.mean(cost)
         sgd_optimizer = fluid.optimizer.SGD(learning_rate=0.1)
         sgd_optimizer.minimize(avg_cost)
-        return

     def get_main_program(self):
         main = fluid.Program()
main = fluid.Program()
@@ -95,8 +94,9 @@ def transpiler_test_impl(self):
     def test_transpiler(self):
         main = fluid.Program()
         startup = fluid.Program()
-        with fluid.program_guard(main, startup):
-            self.transpiler_test_impl()
+        with fluid.unique_name.guard():
+            with fluid.program_guard(main, startup):
+                self.transpiler_test_impl()


 class TestBasicModel(TranspilerTest):
@@ -249,7 +249,6 @@ def net_conf(self):
                 decay_rate=0.1,
                 staircase=True))
         sgd_optimizer.minimize(avg_cost)
-        return

     def transpiler_test_impl(self):
         pserver, startup = self.get_pserver(self.pserver1_ep)
@@ -279,7 +278,6 @@ def net_conf(self):
             learning_rate=fluid.layers.piecewise_decay([10000, 20000],
                                                        [1.0, 0.5, 1.0]))
         sgd_optimizer.minimize(avg_cost)
-        return

     def transpiler_test_impl(self):
         pserver, startup = self.get_pserver(self.pserver1_ep)
@@ -328,7 +326,6 @@ def net_conf(self):
         avg_cost = fluid.layers.mean(cost)
         sgd_optimizer = fluid.optimizer.SGD(learning_rate=0.1)
         sgd_optimizer.minimize(avg_cost)
-        return

     def transpiler_test_impl(self):
         pserver, startup = self.get_pserver(self.pserver1_ep)
@@ -363,7 +360,6 @@ def net_conf(self):
             momentum=0.9,
             regularization=fluid.regularizer.L2Decay(1e-4))
         sgd_optimizer.minimize(avg_cost)
-        return

     def transpiler_test_impl(self):
         pserver, startup = self.get_pserver(self.pserver1_ep)
@@ -393,13 +389,14 @@ class TestDistLookupTableBase(TranspilerTest):
     def network_with_table(self, is_sparse, is_distributed):
         self.table_size = 1000
         self.emb_size = 64
+        self.lookup_table_name = 'shared_w'

         def emb_pool(ids):
             emb = fluid.layers.embedding(
                 input=ids,
                 size=[self.table_size, self.emb_size],
                 dtype='float32',
-                param_attr='shared_w',  # share parameter
+                param_attr=self.lookup_table_name,  # share parameter
                 is_sparse=is_sparse,
                 is_distributed=is_distributed)
             pool = fluid.layers.sequence_pool(input=emb, pool_type='average')
@@ -572,7 +569,7 @@ def net_conf(self):

     def transpiler_test_impl(self):
         config = fluid.DistributeTranspilerConfig()
-        pserver1, startup1 = self.get_pserver(self.pserver1_ep, config)
+        pserver1, _ = self.get_pserver(self.pserver1_ep, config)

         self.assertTrue(self.transpiler.has_distributed_lookup_table)
         lookup_table_var = pserver1.global_block().vars[
@@ -582,6 +579,21 @@ def transpiler_test_impl(self):
         self.assertEqual(row_size, calc_row_size)


+class TestDistArgsInProgram(TestDistLookupTableBase):
+    def net_conf(self):
+        self.network_with_table(is_sparse=True, is_distributed=True)
+
+    def transpiler_test_impl(self):
+        trainer, _ = self.get_trainer()
+
+        self.assertTrue(trainer._is_distributed)
+        self.assertTrue(trainer._is_chief)
+        self.assertEqual(trainer._distributed_lookup_table,
+                         self.lookup_table_name)
+        self.assertEqual(trainer._endpoints,
+                         [self.pserver1_ep, self.pserver2_ep])
+
+
 class TestRMSPropOptimizer(TranspilerTest):
     def net_conf(self):
         x = fluid.layers.data(name='x', shape=[1000], dtype='float32')
@@ -595,7 +607,6 @@ def net_conf(self):
         avg_cost = fluid.layers.mean(cost)
         optimizer = fluid.optimizer.RMSProp(learning_rate=0.1)
         optimizer.minimize(avg_cost)
-        return

     def transpiler_test_impl(self):
         pserver, startup = self.get_pserver(self.pserver1_ep)
@@ -612,5 +623,40 @@ def transpiler_test_impl(self):
         self.assertEqual(moment_var.shape, (500, 1000))


+class TestLoadSliceVar(TranspilerTest):
+    def net_conf(self):
+        x = fluid.layers.data(name='x', shape=[1000], dtype='float32')
+        y_predict = fluid.layers.fc(input=x,
+                                    size=1000,
+                                    act=None,
+                                    param_attr=fluid.ParamAttr(name='fc_w'),
+                                    bias_attr=fluid.ParamAttr(name='fc_b'))
+        y = fluid.layers.data(name='y', shape=[1], dtype='float32')
+        cost = fluid.layers.square_error_cost(input=y_predict, label=y)
+        avg_cost = fluid.layers.mean(cost)
+        optimizer = fluid.optimizer.RMSProp(learning_rate=0.1)
+        optimizer.minimize(avg_cost)
+
+    def transpiler_test_impl(self):
+        pserver, _ = self.get_pserver(self.pserver1_ep)
+        pserver2, _ = self.get_pserver(self.pserver2_ep)
+
+        self.assertTrue(pserver._slice_vars_and_attrs)
+        self.assertTrue(pserver2._slice_vars_and_attrs)
+
+        for idx in xrange(len(pserver._slice_vars_and_attrs)):
+            self.assertEqual(pserver._slice_vars_and_attrs[idx][0],
+                             pserver2._slice_vars_and_attrs[idx][0])
+
+            total_numel = reduce(lambda x, y: x * y,
+                                 pserver._slice_vars_and_attrs[idx][0].shape)
+            self.assertEqual(
+                total_numel,
+                reduce(lambda x, y: x * y,
+                       pserver._slice_vars_and_attrs[idx][2].shape) + reduce(
+                           lambda x, y: x * y,
+                           pserver2._slice_vars_and_attrs[idx][2].shape))
+
+
 if __name__ == "__main__":
     unittest.main()
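
The shape arithmetic TestLoadSliceVar asserts, worked out in isolation: the per-pserver slices of `fc_w` must jointly account for every element of the full parameter. With two pservers and an even split (the (500, 1000) shard shape mirrors the RMSProp moment assertion above and is illustrative), the check reduces to:

```python
from functools import reduce  # the test relies on Python 2's builtin reduce

numel = lambda shape: reduce(lambda x, y: x * y, shape)

# full fc_w is 1000 x 1000; an even two-way split gives two 500 x 1000 shards
assert numel((1000, 1000)) == numel((500, 1000)) + numel((500, 1000))
```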
