
Commit 0b429a2

[Cherry-pick] Cherry pick paddle cloud role maker (#20947)
* Fix Paddle Cloud role maker (#20860)
1 parent ad86739 commit 0b429a2

File tree

5 files changed: +47 additions, −21 deletions


paddle/fluid/operators/distributed/rpc_client.cc

Lines changed: 1 addition & 1 deletion
@@ -17,7 +17,7 @@
 
 // default to 3min to avoid temprary network failures.
 DEFINE_int32(rpc_deadline, 180000, "deadline timeouts for rpc");
-DEFINE_int32(rpc_retry_times, 0, "retry times for rpc");
+DEFINE_int32(rpc_retry_times, 3, "retry times for rpc");
 
 namespace paddle {
 namespace operators {
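
Since rpc_retry_times is defined through gflags, the previous no-retry behavior can still be restored at process launch. A minimal sketch, assuming Paddle's convention of picking up FLAGS_-prefixed environment variables at import time (that override mechanism, not this commit, is the assumption here):

import os

# Assumption: Paddle reads gflags from FLAGS_<flag_name> environment
# variables; setting this before importing paddle restores the old
# no-retry default of this flag.
os.environ["FLAGS_rpc_retry_times"] = "0"

import paddle.fluid as fluid  # import after setting the flag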

python/paddle/fluid/incubate/fleet/base/role_maker.py

Lines changed: 7 additions & 12 deletions
@@ -335,18 +335,13 @@ def generate_role(self):
         if not self._role_is_generated:
             if not self._is_collective:
                 try:
-                    port = os.environ["PADDLE_PORT"]
-                    pserver_ips = os.environ["PADDLE_PSERVERS"].split(",")
-                    if "," in port:
-                        ports = port.split(",")
-                    else:
-                        ports = [port] * len(pserver_ips)
-                    eplist = []
+                    # Environment variable PADDLE_PSERVERS_IP_PORT_LIST must be set
+                    # format: string(ip:port), eg. 127.0.0.1:6001
+                    eplist = os.environ["PADDLE_PSERVERS_IP_PORT_LIST"].split(
+                        ",")
                     # note that, we usually assign the same port to different ips
                     # if we run parameter server training in local mode
                     # port should be different in environment variables
-                    for i, ip in enumerate(pserver_ips):
-                        eplist.append(':'.join([ip, ports[i]]))
 
                     trainers_num = int(os.environ["PADDLE_TRAINERS_NUM"])
                     training_role = os.environ["TRAINING_ROLE"]
@@ -361,9 +356,9 @@ def generate_role(self):
                     elif training_role == "PSERVER":
                         role = Role.SERVER
                         cur_ip = os.environ["POD_IP"]
-                        cur_idx = pserver_ips.index(cur_ip)
-                        current_id = eplist.index(":".join(
-                            [cur_ip, ports[cur_idx]]))
+                        curr_port = os.environ["PADDLE_PORT"]
+                        curr_endpoint = ":".join([cur_ip, curr_port])
+                        current_id = eplist.index(curr_endpoint)
                     else:
                         raise ValueError(
                             "TRAINING_ROLE must be PSERVER or TRAINER")

python/paddle/fluid/incubate/fleet/parameter_server/distribute_transpiler/__init__.py

Lines changed: 4 additions & 0 deletions
@@ -152,6 +152,10 @@ def distributed_optimizer(self, optimizer, strategy=None):
 
         if not isinstance(optimizer, Optimizer):
             raise ValueError("optimizer must be an instance of Optimizer")
+        if not fleet._is_initialized:
+            raise ValueError(
+                "use fleet.init(role) to initialize the role before use fleet.distributed_optimizer()"
+            )
         self._optimizer = TranspilerOptimizer(optimizer, strategy)
         return self._optimizer
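
The added check enforces an explicit call order: fleet.init(role) must run before fleet.distributed_optimizer(). A minimal sketch of the required sequence, reusing the role-maker arguments from the test below (endpoint values illustrative):

import paddle.fluid as fluid
import paddle.fluid.incubate.fleet.base.role_maker as role_maker
from paddle.fluid.incubate.fleet.parameter_server.distribute_transpiler import fleet

role = role_maker.UserDefinedRoleMaker(
    current_id=0,
    role=role_maker.Role.SERVER,
    worker_num=2,
    server_endpoints=["127.0.0.1:36011", "127.0.0.1:36012"])
fleet.init(role)  # skipping this now raises ValueError in distributed_optimizer

optimizer = fleet.distributed_optimizer(fluid.optimizer.SGD(0.1))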

python/paddle/fluid/tests/unittests/test_fleet_api_input.py

Lines changed: 32 additions & 6 deletions
@@ -20,9 +20,11 @@
 from paddle.fluid.incubate.fleet.base.role_maker import UserDefinedRoleMaker
 from paddle.fluid.incubate.fleet.base.role_maker import UserDefinedCollectiveRoleMaker
 from paddle.fluid.incubate.fleet.base.role_maker import Role
+import paddle.fluid.incubate.fleet.base.role_maker as role_maker
 from paddle.fluid.incubate.fleet.parameter_server.distribute_transpiler import fleet
 from paddle.fluid.incubate.fleet.parameter_server.distribute_transpiler import TranspilerOptimizer
 from paddle.fluid.incubate.fleet.collective import CollectiveOptimizer
+from dist_simnet_bow import train_network
 
 
 class DistributeTranspilerConfigTest(unittest.TestCase):
@@ -97,6 +99,30 @@ def testInvalidInputs(self):
             main_program=compiled_prog)
         self.assertRaises(Exception, fleet._transpile, "config")
 
+    def set_program(self, avg_cost, strategy):
+        optimizer = fluid.optimizer.SGD(0.1)
+        optimizer = fleet.distributed_optimizer(optimizer, strategy)
+        optimizer.minimize(avg_cost)
+
+    def test_init_role(self):
+        role = role_maker.UserDefinedRoleMaker(
+            current_id=0,
+            role=role_maker.Role.SERVER,
+            worker_num=2,
+            server_endpoints=["127.0.0.1:36011", "127.0.0.1:36012"])
+        # for test optimizer without init(role)
+        # fleet.init(role)
+        batch_size = 128
+        is_sparse = True
+        is_distribute = False
+        strategy = DistributeTranspilerConfig()
+        strategy.sync_mode = False
+        strategy.geo_sgd_mode = True
+        strategy.geo_sgd_need_push_nums = 5
+        avg_cost, _, _ = train_network(batch_size, is_distribute, is_sparse)
+
+        self.assertRaises(Exception, self.set_program, avg_cost, strategy)
+
 
 class TranspilerOptimizerTest(unittest.TestCase):
     def testInvalidInputs(self):
@@ -124,7 +150,7 @@ def createRoleMaker(self,
 
     def testRoleMaker(self):
         self.createRoleMaker()
-        ## test all invalid server_endpoints
+        # test all invalid server_endpoints
         self.assertRaises(
             Exception, self.createRoleMaker,
             server_endpoints=None)  # server_endpoints must be as list
@@ -140,7 +166,7 @@ def testRoleMaker(self):
             self.createRoleMaker,
             server_endpoints=["127.0.0.1:8080", "127.0.0.1:8080"]
         )  # element in server_endpoints can't be duplicate
-        ## test all invalid current_id
+        # test all invalid current_id
         self.assertRaises(
             Exception, self.createRoleMaker,
             current_id="0")  # current_id must be as int
@@ -154,14 +180,14 @@ def testRoleMaker(self):
             role=Role.SERVER,
             server_endpoints=["127.0.0.1:8080"]
         )  # if role is server, current_id must be less than len(server_endpoints)
-        ## test all invalid worker_num
+        # test all invalid worker_num
         self.assertRaises(
             Exception, self.createRoleMaker,
             worker_num="1")  # worker_num must be as int
         self.assertRaises(
             Exception, self.createRoleMaker,
             worker_num=0)  # worker_num must be greater than 0
-        ## test all invalid role
+        # test all invalid role
         self.assertRaises(
             Exception, self.createRoleMaker,
             role=3)  # role must be as Role(Role.WORKER=1, Role.SERVER=2)
@@ -174,7 +200,7 @@ def createRoleMaker(self, current_id=0,
 
     def testRoleMaker(self):
         self.createRoleMaker()
-        ## test all invalid worker_endpoints
+        # test all invalid worker_endpoints
         self.assertRaises(
             Exception, self.createRoleMaker,
             worker_endpoints=None)  # worker_endpoints must be as list
@@ -190,7 +216,7 @@ def testRoleMaker(self):
             self.createRoleMaker,
             worker_endpoints=["127.0.0.1:8080", "127.0.0.1:8080"]
         )  # element in worker_endpoints can't be duplicate
-        ## test all invalid current_id
+        # test all invalid current_id
         self.assertRaises(
             Exception, self.createRoleMaker,
             current_id="0")  # current_id must be as int

python/paddle/fluid/tests/unittests/test_fleet_rolemaker.py

Lines changed: 3 additions & 2 deletions
@@ -21,9 +21,9 @@
 
 class TestCloudRoleMaker(unittest.TestCase):
     def setUp(self):
-        os.environ["PADDLE_PORT"] = "36001"
-        os.environ["PADDLE_PSERVERS"] = "127.0.0.1,127.0.0.2"
         os.environ["PADDLE_TRAINERS_NUM"] = "2"
+        os.environ[
+            "PADDLE_PSERVERS_IP_PORT_LIST"] = "127.0.0.1:36001,127.0.0.2:36001"
 
     def test_tr_rolemaker(self):
         os.environ["TRAINING_ROLE"] = "TRAINER"
@@ -39,6 +39,7 @@ def test_tr_rolemaker(self):
     def test_ps_rolemaker(self):
         os.environ["TRAINING_ROLE"] = "PSERVER"
         os.environ["POD_IP"] = "127.0.0.1"
+        os.environ["PADDLE_PORT"] = "36001"
 
         ro = role_maker.PaddleCloudRoleMaker(is_collective=False)
         ro.generate_role()
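
Note that PADDLE_PORT moved out of setUp: only the pserver test needs it now, since a trainer never has to identify its own endpoint. A minimal sketch of the trainer-side environment, assuming the trainer branch reads PADDLE_TRAINER_ID for its rank (that variable is an assumption; this diff does not show the trainer branch):

import os
import paddle.fluid.incubate.fleet.base.role_maker as role_maker

os.environ["PADDLE_PSERVERS_IP_PORT_LIST"] = "127.0.0.1:36001,127.0.0.2:36001"
os.environ["PADDLE_TRAINERS_NUM"] = "2"
os.environ["TRAINING_ROLE"] = "TRAINER"
os.environ["PADDLE_TRAINER_ID"] = "0"  # assumption: rank variable for trainers

ro = role_maker.PaddleCloudRoleMaker(is_collective=False)
ro.generate_role()  # resolves to the worker role without touching PADDLE_PORT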
