Commit d8e1e50

Authored by lilong12
[Cherry-pick] Fix bug in gloo that gloo initialization hangs (#29449)
* update, test=develop (#29331)
1 parent 4926587 commit d8e1e50

4 files changed: 21 additions & 18 deletions

python/paddle/distributed/fleet/base/role_maker.py

Lines changed: 6 additions & 6 deletions
@@ -171,6 +171,7 @@ def init(rank, nodes, role):
 
     def _init_http(self, ip, port, prefix, start_http_server, http_server_d):
         def __start_kv_server(http_server_d, size_d):
+            print("start http_server: {}, {}".format(port, size_d))
             from paddle.distributed.fleet.utils.http_server import KVServer
             http_server = KVServer(port, size_d)
             http_server.start()
@@ -181,11 +182,9 @@ def __start_kv_server(http_server_d, size_d):
             http_server.stop()
 
         def init_kv_server(http_server_d):
-            size_d = {
-                "trainer": self._worker_num,
-                "pserver": self._server_num,
-                "all": self._worker_num + self._server_num
-            }
+            worker_key = prefix + '_' + 'worker'
+            size_d = {worker_key: self._worker_num, }
+            print("worker_key:{}, size: {}".format(worker_key, size_d))
 
             http_server_d["running"] = True
             # child process for http server
@@ -205,7 +204,7 @@ def init(rank, nodes, role):
             gloo.set_iface(self._iface)
             gloo.set_timeout_seconds(self._init_timeout_seconds,
                                      self._run_timeout_seconds)
-            gloo.set_http_store(ip, port, role)
+            gloo.set_http_store(ip, port, 'worker')
             ep = ":".join([ip, str(port)])
             wait_server_ready([ep])
             gloo.init()
@@ -214,6 +213,7 @@ def init(rank, nodes, role):
         port = int(port)
 
         if start_http_server:
+            print("to start http_server")
             http_server = init_kv_server(http_server_d)
 
         if self._role == Role.WORKER:
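
These hunks make the KV-server size dictionary use a single worker scope (prefix + '_worker') and register the gloo HTTP store under the fixed 'worker' role for every participant, presumably so all processes wait on one consistent counter instead of separate trainer/pserver/all counts. A minimal plain-Python sketch of the key construction; the prefix and worker count are illustrative assumptions, not values taken from this commit:

    # Sketch of the worker-scoped key built in init_kv_server above.
    prefix = "barrier_prefix"                # assumed gloo prefix
    worker_num = 2                           # assumed number of workers
    worker_key = prefix + '_' + 'worker'     # -> "barrier_prefix_worker"
    size_d = {worker_key: worker_num, }
    print("worker_key:{}, size: {}".format(worker_key, size_d))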

python/paddle/distributed/fleet/utils/http_server.py

Lines changed: 4 additions & 4 deletions
@@ -112,8 +112,8 @@ def do_DELETE(self):
         _, scope, key = paths
         with self.server.delete_kv_lock:
             if self.server.delete_kv.get(scope) is None:
-                self.server.delete_kv[scope] = []
-            self.server.delete_kv[scope].append(key)
+                self.server.delete_kv[scope] = set()
+            self.server.delete_kv[scope].add(key)
         self.send_status_code(200)
         _http_server_logger.info(log_str)
 
@@ -151,7 +151,7 @@ def get_deleted_size(self, key):
         """
         ret = 0
         with self.delete_kv_lock:
-            ret = self.delete_kv.get(key, 0)
+            ret = len(self.delete_kv.get(key, set()))
         return ret
 
 
@@ -164,7 +164,7 @@ def __init__(self, port, size={}):
         """Init."""
         self.http_server = KVHTTPServer(port, KVHandler)
         self.listen_thread = None
-        self.size = {}
+        self.size = size
 
     def start(self):
         """

python/paddle/distributed/parallel.py

Lines changed: 7 additions & 4 deletions
@@ -44,11 +44,11 @@ def _get_global_parallel_env():
     return _global_parallel_env
 
 
-def _start_kv_server(port, http_server_d):
+def _start_kv_server(port, http_server_d, size):
     from paddle.distributed.fleet.utils.http_server import KVServer
-    http_server = KVServer(int(port))
+    http_server = KVServer(int(port), size=size)
     http_server.start()
-    wait_seconds = 5
+    wait_seconds = 3
     while http_server_d.get("running", False) or not http_server.should_stop():
         time.sleep(wait_seconds)
     http_server.stop()
@@ -149,8 +149,11 @@ def _check_var_exists(var_name):
     http_server_d = manager.dict()
     http_server_d["running"] = False
     if parallel_env.rank == 0:
+        # The scope for worker used by http server is '_worker'
+        size = {'_worker': parallel_env.world_size}
         http_server = Process(
-            target=_start_kv_server, args=(int(ep_rank_0[1]), http_server_d))
+            target=_start_kv_server,
+            args=(int(ep_rank_0[1]), http_server_d, size))
         http_server.daemon = True
         http_server_d["running"] = True
         http_server.start()
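
With this change, rank 0 tells the KV server how many '_worker' entries to expect by passing a size dict built from parallel_env.world_size. A hedged sketch of that launch sequence, using only the signatures visible in this diff; the endpoint and world size are assumed example values, and Paddle must be installed for the import to resolve:

    # Sketch of the rank-0 launch path shown in the parallel.py hunk above.
    # _start_kv_server is the private helper whose new (port, http_server_d, size)
    # signature appears in this diff; endpoint and world size are assumptions.
    from multiprocessing import Manager, Process

    from paddle.distributed.parallel import _start_kv_server

    ep_rank_0 = "127.0.0.1:36001".split(":")   # assumed rank-0 endpoint
    world_size = 2                             # assumed number of workers

    manager = Manager()
    http_server_d = manager.dict()
    http_server_d["running"] = False

    # The scope used by the http server for workers is '_worker'
    size = {'_worker': world_size}
    http_server = Process(
        target=_start_kv_server,
        args=(int(ep_rank_0[1]), http_server_d, size))
    http_server.daemon = True
    http_server_d["running"] = True
    http_server.start()

    # ... once initialization completes, the parent flips the flag so the
    # server's polling loop can exit:
    # http_server_d["running"] = False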

python/paddle/fluid/tests/unittests/test_fleet_rolemaker_new.py

Lines changed: 4 additions & 4 deletions
@@ -274,7 +274,7 @@ def test_fs_gloo4(self):
             print("skip gloo UT on MacOS/Win")
             return
 
-        os.environ["TRAINING_ROLE"] = "PSERVER"
+        os.environ["TRAINING_ROLE"] = "WORKER"
         os.environ["PADDLE_PSERVERS_IP_PORT_LIST"] = "127.0.0.1:36001"
         os.environ["POD_IP"] = "127.0.0.1"
         os.environ["PADDLE_PORT"] = "36001"
@@ -284,7 +284,7 @@ def test_fs_gloo4(self):
         os.environ["PADDLE_GLOO_RENDEZVOUS"] = "3"
         os.environ["PADDLE_GLOO_HTTP_ENDPOINT"] = "127.0.0.1:30019"
 
-        role = role_maker.PaddleCloudRoleMaker()
+        role = role_maker.PaddleCloudRoleMaker(is_collective=True)
         role._generate_role()
         import time
         time.sleep(3)
@@ -532,7 +532,7 @@ def test_fs_gloo4(self):
             print("skip gloo UT on MacOS/Win")
             return
 
-        os.environ["TRAINING_ROLE"] = "PSERVER"
+        os.environ["TRAINING_ROLE"] = "WORKER"
         os.environ["PADDLE_PSERVERS_IP_PORT_LIST"] = "127.0.0.1:36001"
         os.environ["POD_IP"] = "127.0.0.1"
         os.environ["PADDLE_PORT"] = "36001"
@@ -542,7 +542,7 @@ def test_fs_gloo4(self):
         os.environ["PADDLE_GLOO_RENDEZVOUS"] = "3"
         os.environ["PADDLE_GLOO_HTTP_ENDPOINT"] = "127.0.0.1:30019"
 
-        role = role_maker.PaddleCloudRoleMaker()
+        role = role_maker.PaddleCloudRoleMaker(is_collective=True)
         role._generate_role()
         import time
         time.sleep(3)
