
Commit d068493

parameter dispatcher. (#12666)
1 parent efc5392 commit d068493

File tree

9 files changed: +162 −31 lines

paddle/fluid/framework/threadpool.cc

Lines changed: 7 additions & 0 deletions
@@ -20,6 +20,9 @@
 DEFINE_int32(io_threadpool_size, 100,
              "number of threads used for doing IO, default 100");
 
+DEFINE_int32(dist_threadpool_size, 0,
+             "number of threads used for distributed executed.");
+
 namespace paddle {
 namespace framework {
 
@@ -35,6 +38,10 @@ void ThreadPool::Init() {
   if (threadpool_.get() == nullptr) {
     // TODO(Yancey1989): specify the max threads number
     int num_threads = std::thread::hardware_concurrency();
+    if (FLAGS_dist_threadpool_size > 0) {
+      num_threads = FLAGS_dist_threadpool_size;
+      VLOG(1) << "set dist_threadpool_size to " << num_threads;
+    }
    PADDLE_ENFORCE_GT(num_threads, 0);
    threadpool_.reset(new ThreadPool(num_threads));
  }
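
The new flag caps the size of the global ThreadPool used for distributed execution. Because it is also added to read_env_flags in python/paddle/fluid/__init__.py further down, it should be settable from the environment before paddle.fluid is imported. A minimal sketch, assuming the usual FLAGS_<name> environment convention; the value 16 is only illustrative:

import os

# Assumption: flags listed in read_env_flags are picked up from FLAGS_*
# environment variables when paddle.fluid bootstraps.
os.environ['FLAGS_dist_threadpool_size'] = '16'

import paddle.fluid as fluid  # ThreadPool::Init() should now create 16 threads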

paddle/fluid/operators/distributed/variable_response.cc

Lines changed: 6 additions & 1 deletion
@@ -190,12 +190,15 @@ bool VariableResponse::ProcSerializedField(
 #endif
   }
 
+  VLOG(7) << "ProcSerializedField:" << meta_.varname()
+          << ", type:" << meta_.type() << std::endl;
   framework::DDim dims = GetDims(meta_.dims());
   if (meta_.type() == sendrecv::LOD_TENSOR) {
     PADDLE_ENFORCE(meta_.lod_size() >= 0, "lod info should be got first!");
     if (!CopyLodTensorData(input, *dev_ctx_, dims, num_bytes)) {
       return false;
     }
+
     return true;
   }
 
@@ -206,7 +209,9 @@ bool VariableResponse::ProcSerializedField(
     return true;
   }
 
-  return true;
+  PADDLE_ENFORCE("not supported var types:", meta_.varname(), meta_.type());
+
+  return false;
 }
 
 };  // namespace distributed

paddle/fluid/operators/listen_and_serv_op.cc

Lines changed: 4 additions & 1 deletion
@@ -123,8 +123,11 @@ void ListenAndServOp::RunSyncLoop(
                optimize_prepared.begin(),
                std::shared_ptr<framework::ExecutorPrepareContext>(nullptr));
 
+  // Trainers will get all parameters from pserver in the
+  // startup program, so we will wait RequestGet first
+  rpc_service_->SetCond(distributed::kRequestGet);
+  rpc_service_->WaitBarrier(distributed::kRequestGet);
   rpc_service_->ResetBarrierCounter();
-
   while (true) {
     rpc_service_->Profiler().OneStep();
     // Get from multiple trainers, we don't care about the order in which
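
The wait on kRequestGet is released from the trainer side: after pulling all parameters in its startup program, the trainer issues a fetch_barrier RPC. A minimal sketch of that client-side op, mirroring the test_dist_train.py change below (the endpoint is illustrative):

import paddle.fluid as fluid
from paddle.fluid import core

RPC_OP_ROLE_ATTR_NAME = core.op_proto_and_checker_maker.kOpRoleAttrName()
RPC_OP_ROLE_ATTR_VALUE = core.op_proto_and_checker_maker.OpRole.RPC

prog = fluid.Program()
# fetch_barrier signals that this trainer has finished its gets, which is
# what releases the kRequestGet barrier the pserver now waits on above.
prog.global_block().append_op(
    type="fetch_barrier",
    inputs={},
    outputs={},
    attrs={
        "endpoints": ["127.0.0.1:6174"],
        RPC_OP_ROLE_ATTR_NAME: RPC_OP_ROLE_ATTR_VALUE
    })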

python/paddle/fluid/__init__.py

Lines changed: 1 addition & 1 deletion
@@ -122,7 +122,7 @@ def __bootstrap__():
         'use_pinned_memory', 'check_nan_inf', 'benchmark', 'warpctc_dir',
         'eager_delete_scope', 'use_mkldnn', 'initial_cpu_memory_in_mb',
         'init_allocated_mem', 'free_idle_memory', 'paddle_num_threads',
-        'cpu_deterministic'
+        "dist_threadpool_size", 'cpu_deterministic'
     ]
     if core.is_compiled_with_dist():
         read_env_flags.append('rpc_deadline')

python/paddle/fluid/initializer.py

Lines changed: 0 additions & 1 deletion
@@ -15,7 +15,6 @@
 from . import framework
 import numpy as np
 import contextlib
-from .framework import convert_np_dtype_to_dtype_
 from .core import VarDesc
 
 __all__ = [

python/paddle/fluid/tests/unittests/CMakeLists.txt

Lines changed: 2 additions & 2 deletions
@@ -59,8 +59,8 @@ py_test_modules(test_warpctc_op MODULES test_warpctc_op ENVS FLAGS_warpctc_dir=$
 if(WITH_DISTRIBUTE)
     py_test_modules(test_dist_train MODULES test_dist_train SERIAL)
     set_tests_properties(test_listen_and_serv_op PROPERTIES TIMEOUT 20)
-    set_tests_properties(test_dist_mnist PROPERTIES TIMEOUT 180)
-    set_tests_properties(test_dist_word2vec PROPERTIES TIMEOUT 180)
+    set_tests_properties(test_dist_mnist PROPERTIES TIMEOUT 200)
+    set_tests_properties(test_dist_word2vec PROPERTIES TIMEOUT 200)
 endif()
 py_test_modules(test_parallel_executor_crf MODULES test_parallel_executor_crf SERIAL)
 py_test_modules(test_parallel_executor_fetch_feed MODULES test_parallel_executor_fetch_feed SERIAL)

python/paddle/fluid/tests/unittests/test_dist_train.py

Lines changed: 17 additions & 0 deletions
@@ -26,6 +26,12 @@
 from paddle.fluid.layers.io import Recv
 from paddle.fluid.layers.io import Send
 
+from paddle.fluid import core
+
+RPC_OP_ROLE_ATTR_NAME = op_role_attr_name = core.op_proto_and_checker_maker.kOpRoleAttrName(
+)
+RPC_OP_ROLE_ATTR_VALUE = core.op_proto_and_checker_maker.OpRole.RPC
+
 
 class TestSendOp(unittest.TestCase):
     def test_send(self):
@@ -89,18 +95,29 @@ def init_serv(self, place):
     def init_client(self, place, port):
         main = fluid.Program()
         with fluid.program_guard(main):
+            main.global_block().append_op(
+                type="fetch_barrier",
+                inputs={},
+                outputs={},
+                attrs={
+                    "endpoints": ["127.0.0.1:{0}".format(port)],
+                    RPC_OP_ROLE_ATTR_NAME: RPC_OP_ROLE_ATTR_VALUE
+                })
+
             x = layers.data(
                 shape=[32, 32],
                 dtype='float32',
                 name='X',
                 append_batch_size=False)
             fluid.initializer.Constant(value=2.3)(x, main.global_block())
+
             get_var = main.global_block().create_var(
                 name="scale_0.tmp_0",  # server side var
                 dtype="float32",
                 persistable=False,
                 shape=[32, 32])
             fluid.initializer.Constant(value=2.3)(get_var, main.global_block())
+
             Send("127.0.0.1:%d" % port, [x])
             o = Recv("127.0.0.1:%d" % port, [get_var])

python/paddle/fluid/tests/unittests/test_dist_transpiler.py

Lines changed: 37 additions & 13 deletions
@@ -18,6 +18,7 @@
 import paddle.fluid as fluid
 from paddle.fluid.transpiler.distribute_transpiler import delete_ops
 import traceback
+import collections
 
 
 class TranspilerTest(unittest.TestCase):
@@ -53,9 +54,18 @@ def get_main_program(self):
         self.origin_prog = main.clone()
         return main
 
-    def get_trainer(self, config=None, sync_mode=True):
-        t = self._transpiler_instance(config, sync_mode)
-        return t.get_trainer_program()
+    def get_trainer(self, config=None):
+        src = fluid.default_startup_program().clone()
+
+        t = self._transpiler_instance(config)
+
+        trainer_main = t.get_trainer_program()
+        trainer_startup = fluid.default_startup_program()
+
+        assert (src.num_blocks == 1)
+        assert (trainer_startup.num_blocks == src.num_blocks)
+
+        return trainer_main, trainer_startup
 
     def get_pserver(self, ep, config=None, sync_mode=True):
         t = self._transpiler_instance(config, sync_mode)
@@ -91,7 +101,21 @@ def transpiler_test_impl(self):
         pserver, startup = self.get_pserver(self.pserver1_ep)
         pserver2, startup2 = self.get_pserver(self.pserver2_ep)
 
-        trainer = self.get_trainer()
+        trainer, trainer_startup = self.get_trainer()
+
+        # splited var blocks should be in startup program
+        self.assertTrue("fc_w.block0" in trainer_startup.global_block().vars)
+        self.assertTrue("fc_w.block1" in trainer_startup.global_block().vars)
+        self.assertTrue("fc_w" in trainer_startup.global_block().vars)
+        self.assertTrue("fc_b" in trainer_startup.global_block().vars)
+        self.assertTrue("fc_w@GRAD" not in trainer_startup.global_block().vars)
+        self.assertTrue("fc_b@GRAD" not in trainer_startup.global_block().vars)
+
+        src = [op.type for op in trainer_startup.global_block().ops]
+        dst = ['fill_constant', 'fill_constant', 'uniform_random', 'recv', 'recv', \
+               'fetch_barrier', 'concat']
+
+        self.assertEqual(src, dst)
 
         self.assertEqual([op.type for op in trainer.global_block().ops], [
             'mul', 'elementwise_add', 'elementwise_sub', 'square', 'mean',
@@ -142,7 +166,7 @@ def transpiler_test_impl(self):
         pserver, startup = self.get_pserver(self.pserver1_ep, config)
         pserver2, startup2 = self.get_pserver(self.pserver2_ep, config)
 
-        trainer = self.get_trainer(config)
+        trainer, _ = self.get_trainer(config)
 
         self.assertEqual([op.type for op in trainer.global_block().ops], [
             'mul', 'elementwise_add', 'elementwise_sub', 'square', 'mean',
@@ -226,7 +250,7 @@ def net_conf(self):
 
     def transpiler_test_impl(self):
         pserver, startup = self.get_pserver(self.pserver1_ep)
-        trainer = self.get_trainer()
+        trainer, _ = self.get_trainer()
 
         self.assertEqual(len(pserver.blocks), 4)
         lr_decay_ops = [op.type for op in pserver.blocks[1].ops]
@@ -256,7 +280,7 @@ def net_conf(self):
 
     def transpiler_test_impl(self):
         pserver, startup = self.get_pserver(self.pserver1_ep)
-        trainer = self.get_trainer()
+        trainer, _ = self.get_trainer()
 
         serv_op = pserver.blocks[0].ops[0]
         sub_blocks = []
@@ -305,7 +329,7 @@ def net_conf(self):
 
     def transpiler_test_impl(self):
         pserver, startup = self.get_pserver(self.pserver1_ep)
-        trainer = self.get_trainer()
+        trainer, _ = self.get_trainer()
 
         self.assertEqual(len(pserver.blocks), 3)
         self.assertEqual([op.type for op in pserver.blocks[1].ops],
@@ -340,7 +364,7 @@ def net_conf(self):
 
     def transpiler_test_impl(self):
         pserver, startup = self.get_pserver(self.pserver1_ep)
-        trainer = self.get_trainer()
+        trainer, _ = self.get_trainer()
 
         self.assertEqual(len(pserver.blocks), 9)
         self.assertEqual([op.type for op in pserver.blocks[1].ops], [
@@ -415,7 +439,7 @@ def transpiler_test_impl(self):
         self.assertEqual([op.type for op in pserver1.blocks[2].ops],
                          ["sum", "adam", "scale", "scale"])
 
-        trainer = self.get_trainer()
+        trainer, _ = self.get_trainer()
         self.assertEqual(len(trainer.blocks), 1)
         ops = [
             'lookup_table', 'sequence_pool', 'lookup_table', 'sequence_pool',
@@ -453,7 +477,7 @@ def transpiler_test_impl(self):
         # 5 save table
         self.assertEqual([op.type for op in pserver1.blocks[5].ops], ["save"])
 
-        trainer = self.get_trainer()
+        trainer, _ = self.get_trainer()
         self.assertEqual(len(trainer.blocks), 1)
         ops = [
            'split_ids', 'prefetch', 'merge_ids', 'sequence_pool', 'split_ids',
@@ -486,7 +510,7 @@ def transpiler_test_impl(self):
         self.assertEqual([op.type for op in pserver1.blocks[2].ops],
                          ["adam", "scale", "scale"])
 
-        trainer = self.get_trainer(config)
+        trainer, _ = self.get_trainer(config)
         self.assertEqual(len(trainer.blocks), 1)
         ops = [
             'lookup_table', 'sequence_pool', 'lookup_table', 'sequence_pool',
@@ -525,7 +549,7 @@ def transpiler_test_impl(self):
         # 5 save table
         self.assertEqual([op.type for op in pserver1.blocks[5].ops], ["save"])
 
-        trainer = self.get_trainer(config)
+        trainer, _ = self.get_trainer(config)
         self.assertEqual(len(trainer.blocks), 1)
         ops = [
             'split_ids', 'prefetch', 'merge_ids', 'sequence_pool', 'split_ids',
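
The updated get_trainer helper exposes the transpiled startup program so the tests can assert on the recv/fetch_barrier ops it now contains. Outside the test harness, a rough equivalent sketch (endpoints, trainer count, and the exact op list are illustrative, and it assumes transpile() rewrites the default startup program in place):

import paddle.fluid as fluid

# ... build the usual fc + optimizer network in the default main program ...

t = fluid.DistributeTranspiler()
t.transpile(
    trainer_id=0,
    program=fluid.default_main_program(),
    pservers="127.0.0.1:6174,127.0.0.1:6175",
    trainers=2)

trainer_main = t.get_trainer_program()
trainer_startup = fluid.default_startup_program()

# Expected to roughly match the ops asserted above, e.g.
# ['fill_constant', 'fill_constant', 'uniform_random', 'recv', 'recv',
#  'fetch_barrier', 'concat']
print([op.type for op in trainer_startup.global_block().ops])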
