Skip to content

Commit a7e6f95

Browse files
authored
Master will not add any worker if the current rendezvous hosts become empty after starting training. (#2508)
* Master will not add any worker if the rendezvous hosts become empty * Don't add a new worker host if the next group is empty * Fix the error message * Split a long function * Fix a typo
1 parent 94029ca commit a7e6f95

File tree

2 files changed

+23
-14
lines changed

2 files changed

+23
-14
lines changed

elasticai_api/common/base_controller.py

Lines changed: 19 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -63,6 +63,8 @@ def init_horovod_if_needed(self):
6363
time.sleep(RETRY_ALLREDUCE_INTERVAL_SECS)
6464
else:
6565
break
66+
if rank_response.rank_id < 0:
67+
raise ValueError("Invalid rank {}".format(rank_response.rank_id))
6668

6769
# If the rendezvous from master is unequal to self._rendezvous_id,
6870
# the worker should rebuild the communication because the master
@@ -73,19 +75,22 @@ def init_horovod_if_needed(self):
7375
rank_response.rank_id, rank_response.world_size
7476
)
7577
)
76-
os.environ[HorovodEnv.RENDEZVOUS_PORT] = str(
77-
rank_response.rendezvous_port
78-
)
79-
os.environ[HorovodEnv.RANK] = str(rank_response.rank_id)
80-
os.environ[HorovodEnv.SIZE] = str(rank_response.world_size)
81-
# Not using Horovod elastic feature in init, but need it for
82-
# allreduce to call allreduce op when size=1.
83-
os.environ[HorovodEnv.ELASTIC] = str(0)
84-
hvd.shutdown()
85-
hvd.init()
86-
os.environ[HorovodEnv.ELASTIC] = str(1)
87-
self._rendezvous_id = rank_response.rendezvous_id
88-
self.need_broadcast = True
78+
self._restart_hvd(rank_response)
79+
80+
def _restart_hvd(self, rank_response):
81+
os.environ[HorovodEnv.RENDEZVOUS_PORT] = str(
82+
rank_response.rendezvous_port
83+
)
84+
os.environ[HorovodEnv.RANK] = str(rank_response.rank_id)
85+
os.environ[HorovodEnv.SIZE] = str(rank_response.world_size)
86+
# Not using Horovod elastic feature in init, but need it for
87+
# allreduce to call allreduce op when size=1.
88+
os.environ[HorovodEnv.ELASTIC] = str(0)
89+
hvd.shutdown()
90+
hvd.init()
91+
os.environ[HorovodEnv.ELASTIC] = str(1)
92+
self._rendezvous_id = rank_response.rendezvous_id
93+
self.need_broadcast = True
8994

9095
def _set_horovod_env(self):
9196
master_addr_port = os.getenv(WorkerEnv.MASTER_ADDR, None)
@@ -103,7 +108,7 @@ def notify_training_loop_status(self, status):
103108

104109
class AllReduceController(object):
105110
"""The controller initializes Horovod and calls the function with forward
106-
and backward computation using a mini-batch of data. If Horovod raise an
111+
and backward computation using a mini-batch of data. If Horovod raises an
107112
exception about AllReduce, Allgather and Broadcast, the controller will
108113
catch the exception and re-initialize Horovod. Then, it will broadcast
109114
the variables and retry to call those functions.

elasticdl/python/master/rendezvous_server.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -145,6 +145,10 @@ def add_worker(self, worker_host):
145145
self._next_rendezvous_hosts = copy.deepcopy(
146146
self._cur_rendezvous_hosts
147147
)
148+
# Master will not add any worker if the current rendezvous
149+
# hosts become empty after starting training.
150+
if self._rendezvous_id > 0 and not self._next_rendezvous_hosts:
151+
return
148152
if worker_host not in self._next_rendezvous_hosts:
149153
self._next_rendezvous_hosts.append(worker_host)
150154

0 commit comments

Comments
 (0)