
Commit f37bd03

Merge pull request #14153 from jacquesqiao/fix-pserver-crash-when-no-parameter
set an empty optimize block if pserver has no optimize block
2 parents 7825ae9 + d78e8f2 commit f37bd03
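
The crash being fixed shows up when a parameter server ends up with nothing to optimize, e.g. a single un-sliced parameter distributed over two pservers. The following is a minimal sketch of that scenario, assuming the Fluid 1.x transpiler API used by the tests in this PR (the endpoint addresses are made-up examples); it mirrors run_pserver_with_empty_block below.

import paddle.fluid as fluid

# A tiny network with exactly one parameter (the fc weight; bias disabled).
x = fluid.layers.data(name='x', shape=[1], dtype='float32')
y_predict = fluid.layers.fc(input=x, size=1, act=None, bias_attr=False)
y = fluid.layers.data(name='y', shape=[1], dtype='float32')
cost = fluid.layers.square_error_cost(input=y_predict, label=y)
avg_cost = fluid.layers.mean(cost)
fluid.optimizer.SGD(learning_rate=0.001).minimize(avg_cost)

# slice_var_up = False keeps the parameter whole, so only one of the two
# endpoints receives it.
config = fluid.DistributeTranspilerConfig()
config.slice_var_up = False
t = fluid.DistributeTranspiler(config=config)
t.transpile(0, pservers="127.0.0.1:6174,127.0.0.1:6175", trainers=1)

# The endpoint with no parameter used to get a program without any optimize
# block, which crashed the pserver; with this patch it gets one empty block.
pserver_prog = t.get_pserver_program("127.0.0.1:6175")
print(len(pserver_prog.blocks))         # expected: 2
print(len(pserver_prog.blocks[1].ops))  # expected: 0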

File tree

3 files changed: +96 -5 lines changed

python/paddle/fluid/tests/unittests/test_dist_transpiler.py

Lines changed: 25 additions & 0 deletions
@@ -405,6 +405,31 @@ def transpiler_test_impl(self):
             ["sum", "scale", "scale", "elementwise_add", "momentum"])


+class TestEmptyPserverOptimizeBlocks(TranspilerTest):
+    def net_conf(self):
+        x = fluid.layers.data(name='x', shape=[1000], dtype='float32')
+        # only one parameter
+        y_predict = fluid.layers.fc(input=x,
+                                    size=1000,
+                                    act=None,
+                                    param_attr=fluid.ParamAttr(name='fc_w'),
+                                    bias_attr=False)
+        y = fluid.layers.data(name='y', shape=[1], dtype='float32')
+        cost = fluid.layers.square_error_cost(input=y_predict, label=y)
+        avg_cost = fluid.layers.mean(cost)
+        sgd_optimizer = fluid.optimizer.SGD(learning_rate=1.0)
+        sgd_optimizer.minimize(avg_cost)
+
+    def transpiler_test_impl(self):
+        config = fluid.DistributeTranspilerConfig()
+        config.slice_var_up = False
+
+        pserver, startup = self.get_pserver(ep=self.pserver2_ep, config=config)
+
+        self.assertEqual(len(pserver.blocks), 2)
+        self.assertEqual(len(pserver.blocks[1].ops), 0)
+
+
 class TestDistLookupTableBase(TranspilerTest):
     def network_with_table(self, is_sparse, is_distributed):
         self.table_size = 1000
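
To exercise only the new transpiler case locally, a hypothetical invocation (assuming PaddlePaddle is installed and this runs from python/paddle/fluid/tests/unittests) could look like:

import unittest

from test_dist_transpiler import TestEmptyPserverOptimizeBlocks

# Collect whatever test_* entry points TranspilerTest exposes for this case and run them.
suite = unittest.defaultTestLoader.loadTestsFromTestCase(TestEmptyPserverOptimizeBlocks)
unittest.TextTestRunner(verbosity=2).run(suite)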

python/paddle/fluid/tests/unittests/test_listen_and_serv_op.py

Lines changed: 61 additions & 4 deletions
@@ -55,6 +55,46 @@ def run_pserver(use_cuda, sync_mode, ip, port, trainers, trainer_id):
     exe.run(pserver_prog)


+def run_pserver_with_empty_block(use_cuda, sync_mode, ip, port, trainers,
+                                 trainer_id):
+    x = fluid.layers.data(name='x', shape=[1], dtype='float32')
+    y_predict = fluid.layers.fc(input=x, size=1, act=None, bias_attr=False)
+    y = fluid.layers.data(name='y', shape=[1], dtype='float32')
+
+    # loss function
+    cost = fluid.layers.square_error_cost(input=y_predict, label=y)
+    avg_cost = fluid.layers.mean(cost)
+
+    # optimizer
+    sgd_optimizer = fluid.optimizer.SGD(learning_rate=0.001)
+    sgd_optimizer.minimize(avg_cost)
+
+    place = fluid.CUDAPlace(0) if use_cuda else fluid.CPUPlace()
+    exe = fluid.Executor(place)
+
+    ps1 = ip + ":" + str(int(port) + 1)
+    ps2 = ip + ":" + port
+    pserver_endpoints = ps1 + "," + ps2
+
+    config = fluid.DistributeTranspilerConfig()
+    config.slice_var_up = False
+    t = fluid.DistributeTranspiler(config=config)
+    t.transpile(
+        trainer_id,
+        pservers=pserver_endpoints,
+        trainers=trainers,
+        sync_mode=sync_mode)
+    pserver_prog = t.get_pserver_program(ps2)
+
+    # pserver2 have no parameter
+    assert (len(pserver_prog.blocks) == 2)
+    assert (len(pserver_prog.blocks[1].ops) == 0)
+
+    pserver_startup = t.get_startup_program(ps2, pserver_prog)
+    exe.run(pserver_startup)
+    exe.run(pserver_prog)
+
+
 class TestListenAndServOp(OpTest):
     def setUp(self):
         self.ps_timeout = 5
@@ -63,9 +103,9 @@ def setUp(self):
         self.trainers = 1
         self.trainer_id = 0

-    def _start_pserver(self, use_cuda, sync_mode):
+    def _start_pserver(self, use_cuda, sync_mode, pserver_func):
         p = Process(
-            target=run_pserver,
+            target=pserver_func,
             args=(use_cuda, sync_mode, self.ip, self.port, self.trainers,
                   self.trainer_id))
         p.daemon = True
@@ -92,15 +132,32 @@ def test_rpc_interfaces(self):

     def test_handle_signal_in_serv_op(self):
         # run pserver on CPU in sync mode
-        p1 = self._start_pserver(False, True)
+        p1 = self._start_pserver(False, True, run_pserver)
+        self._wait_ps_ready(p1.pid)
+
+        # raise SIGTERM to pserver
+        os.kill(p1.pid, signal.SIGINT)
+        p1.join()
+
+        # run pserver on CPU in async mode
+        p2 = self._start_pserver(False, False, run_pserver)
+        self._wait_ps_ready(p2.pid)
+
+        # raise SIGTERM to pserver
+        os.kill(p2.pid, signal.SIGTERM)
+        p2.join()
+
+    def test_list_and_serv_run_empty_optimize_block(self):
+        # run pserver on CPU in sync mode
+        p1 = self._start_pserver(False, True, run_pserver_with_empty_block)
         self._wait_ps_ready(p1.pid)

         # raise SIGTERM to pserver
         os.kill(p1.pid, signal.SIGINT)
         p1.join()

         # run pserver on CPU in async mode
-        p2 = self._start_pserver(False, False)
+        p2 = self._start_pserver(False, False, run_pserver_with_empty_block)
         self._wait_ps_ready(p2.pid)

         # raise SIGTERM to pserver
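
Note that test_list_and_serv_run_empty_optimize_block reuses the signal-handling flow of the existing test but starts the pserver via run_pserver_with_empty_block, so it verifies at runtime that a server whose program contains only an empty optimize block can start, serve, and shut down cleanly, complementing the transpiler-level assertions in test_dist_transpiler.py.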

python/paddle/fluid/transpiler/distribute_transpiler.py

Lines changed: 10 additions & 1 deletion
@@ -35,6 +35,7 @@
 import numpy as np
 import collections
 import six
+import logging

 from .ps_dispatcher import RoundRobin, HashName, PSDispatcher
 from .. import core, framework
@@ -767,6 +768,15 @@ def __clone_lr_op_sub_block__(op, program, lr_block):
         prefetch_var_name_to_block_id.extend(
             lookup_table_var_name_to_block_id)

+        if len(optimize_blocks) == 0:
+            logging.warn("pserver [" + str(endpoint) +
+                         "] has no optimize block!!")
+            pre_block_idx = pserver_program.num_blocks - 1
+            empty_block = pserver_program._create_block(pre_block_idx)
+            optimize_blocks.append(empty_block)
+
+        # In some case, some parameter server will have no parameter to optimize
+        # So we give an empty optimize block to parameter server.
         attrs = {
             "optimize_blocks": optimize_blocks,
             "endpoint": endpoint,
@@ -1280,7 +1290,6 @@ def _create_table_optimize_block(self, pserver_index, pserver_program,
         }
         outputs = {"ParamOut": [param_var]}
         # only support sgd now
-        import logging
         logging.warn(
             "distribute lookup table only support sgd optimizer, change it's optimizer to sgd instead of "
             + table_opt_op.type)
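
With this guard, get_pserver_program always hands listen_and_serv at least one optimize block: if an endpoint received no parameters, the transpiler logs a warning and appends a freshly created empty block before assembling the op attributes. Hoisting import logging to the module level also lets the existing lookup-table warning in _create_table_optimize_block reuse it instead of importing inside the method.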
