Commit 060d7e4

Merge branch 'master' of https://github.com/PaddlePaddle/PaddleRec into model_fix

2 parents: 4076049 + 8fd5aed
File tree: 15 files changed (+1090 lines, -137 lines)


core/engine/cluster/cluster.py

Lines changed: 38 additions & 2 deletions
@@ -19,10 +19,16 @@
 import os
 import subprocess
 import warnings
+import sys
+import logging
 
 from paddlerec.core.engine.engine import Engine
 from paddlerec.core.factory import TrainerFactory
 from paddlerec.core.utils import envs
+import paddlerec.core.engine.cluster_utils as cluster_utils
+
+logger = logging.getLogger("root")
+logger.propagate = False
 
 
 class ClusterEngine(Engine):
@@ -47,8 +53,38 @@ def __init_impl__(self):
                     self.backend))
 
     def start_worker_procs(self):
-        trainer = TrainerFactory.create(self.trainer)
-        trainer.run()
+        if (envs.get_runtime_environ("fleet_mode") == "COLLECTIVE"):
+            #trainer_ports = os.getenv("TRAINER_PORTS", None).split(",")
+            cuda_visible_devices = os.getenv("CUDA_VISIBLE_DEVICES")
+            if cuda_visible_devices is None or cuda_visible_devices == "":
+                selected_gpus = range(int(os.getenv("TRAINER_GPU_CARD_COUNT")))
+            else:
+                # change selected_gpus into relative values
+                # e.g. CUDA_VISIBLE_DEVICES=4,5,6,7; args.selected_gpus=4,5,6,7;
+                # therefore selected_gpus=0,1,2,3
+                cuda_visible_devices_list = cuda_visible_devices.split(',')
+                for x in range(int(os.getenv("TRAINER_GPU_CARD_COUNT"))):
+                    assert x in cuda_visible_devices_list, "Can't find "\
+                        "your selected_gpus %s in CUDA_VISIBLE_DEVICES[%s]."\
+                        % (x, cuda_visible_devices)
+                selected_gpus = [cuda_visible_devices_list.index(x)]
+            print("selected_gpus:{}".format(selected_gpus))
+
+            factory = "paddlerec.core.factory"
+            cmd = [sys.executable, "-u", "-m", factory, self.trainer]
+            logs_dir = envs.get_runtime_environ("log_dir")
+            print("use_paddlecloud_flag:{}".format(
+                cluster_utils.use_paddlecloud()))
+            if cluster_utils.use_paddlecloud():
+                cluster, pod = cluster_utils.get_cloud_cluster(selected_gpus)
+                logger.info("get cluster from cloud:{}".format(cluster))
+                procs = cluster_utils.start_local_trainers(
+                    cluster, pod, cmd, log_dir=logs_dir)
+                print("cluster:{}".format(cluster))
+                print("pod:{}".format(pod))
+        else:
+            trainer = TrainerFactory.create(self.trainer)
+            trainer.run()
 
     def start_master_procs(self):
         if self.backend == "PADDLECLOUD":

core/engine/cluster_utils.py

Lines changed: 324 additions & 0 deletions
@@ -0,0 +1,324 @@ (new file)
# Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import functools
import logging
import socket
import time
import os
import signal
import copy
import sys
import subprocess
from contextlib import closing
import socket

logger = logging.getLogger("root")
logger.propagate = False


class Cluster(object):
    def __init__(self, hdfs):
        self.job_server = None
        self.pods = []
        self.hdfs = None
        self.job_stage_flag = None

    def __str__(self):
        return "job_server:{} pods:{} job_stage_flag:{} hdfs:{}".format(
            self.job_server, [str(pod) for pod in self.pods],
            self.job_stage_flag, self.hdfs)

    def __eq__(self, cluster):
        if len(self.pods) != len(cluster.pods):
            return False

        for a, b in zip(self.pods, cluster.pods):
            if a != b:
                return False

        if self.job_stage_flag != cluster.job_stage_flag:
            return False

        return True

    def __ne__(self, cluster):
        return not self.__eq__(cluster)

    def update_pods(cluster):
        self.pods = copy.copy(cluster.pods)

    def trainers_nranks(self):
        return len(self.trainers_endpoints())

    def pods_nranks(self):
        return len(self.pods)

    def trainers_endpoints(self):
        r = []
        for pod in self.pods:
            for t in pod.trainers:
                r.append(t.endpoint)
        return r

    def pods_endpoints(self):
        r = []
        for pod in self.pods:
            ep = "{}:{}".format(pod.addr, pod.port)
            assert pod.port != None and pod.addr != None, "{} not a valid endpoint".format(
                ep)
            r.append(ep)

        return r

    def get_pod_by_id(self, pod_id):
        for pod in self.pods:
            if str(pod_id) == str(pod.id):
                return pod

        return None


class JobServer(object):
    def __init__(self):
        self.endpoint = None

    def __str__(self):
        return "{}".format(self.endpoint)

    def __eq__(self, j):
        return self.endpint == j.endpoint

    def __ne__(self, j):
        return not self == j


class Trainer(object):
    def __init__(self):
        self.gpus = []
        self.endpoint = None
        self.rank = None

    def __str__(self):
        return "gpu:{} endpoint:{} rank:{}".format(self.gpus, self.endpoint,
                                                   self.rank)

    def __eq__(self, t):
        if len(self.gpus) != len(t.gpus):
            return False

        if self.endpoint != t.endpoint or \
                self.rank != t.rank:
            return False

        for a, b in zip(self.gpus, t.gpus):
            if a != b:
                return False

        return True

    def __ne__(self, t):
        return not self == t

    def rank(self):
        return self.rank


class Pod(object):
    def __init__(self):
        self.rank = None
        self.id = None
        self.addr = None
        self.port = None
        self.trainers = []
        self.gpus = []

    def __str__(self):
        return "rank:{} id:{} addr:{} port:{} visible_gpu:{} trainers:{}".format(
            self.rank, self.id, self.addr, self.port, self.gpus,
            [str(t) for t in self.trainers])

    def __eq__(self, pod):
        if self.rank != pod.rank or \
                self.id != pod.id or \
                self.addr != pod.addr or \
                self.port != pod.port:
            logger.debug("pod {} != pod".format(self, pod))
            return False

        if len(self.trainers) != len(pod.trainers):
            logger.debug("trainers {} != {}".format(self.trainers,
                                                    pod.trainers))
            return False

        for i in range(len(self.trainers)):
            if self.trainers[i] != pod.trainers[i]:
                logger.debug("trainer {} != {}".format(self.trainers[i],
                                                       pod.trainers[i]))
                return False

        return True

    def __ne__(self, pod):
        return not self == pod

    def parse_response(self, res_pods):
        pass

    def rank(self):
        return self.rank

    def get_visible_gpus(self):
        r = ""
        for g in self.gpus:
            r += "{},".format(g)

        assert r != "", "this pod {} can't see any gpus".format(self)

        r = r[:-1]
        return r


def get_cluster(node_ips, node_ip, paddle_ports, selected_gpus):
    assert type(paddle_ports) is list, "paddle_ports must be list"
    cluster = Cluster(hdfs=None)
    trainer_rank = 0
    for node_rank, ip in enumerate(node_ips):
        pod = Pod()
        pod.rank = node_rank
        pod.addr = ip
        for i in range(len(selected_gpus)):
            trainer = Trainer()
            trainer.gpus.append(selected_gpus[i])
            trainer.endpoint = "%s:%d" % (ip, paddle_ports[i])
            trainer.rank = trainer_rank
            trainer_rank += 1

            pod.trainers.append(trainer)
        cluster.pods.append(pod)

    pod_rank = node_ips.index(node_ip)
    return cluster, cluster.pods[pod_rank]


def get_cloud_cluster(selected_gpus, args_port=None):
    #you can automatically get ip info while using paddlecloud multi nodes mode.
    node_ips = os.getenv("PADDLE_TRAINERS")
    assert node_ips is not None, "PADDLE_TRAINERS should not be None"
    print("node_ips:{}".format(node_ips))
    node_ip = os.getenv("POD_IP")
    assert node_ip is not None, "POD_IP should not be None"
    print("node_ip:{}".format(node_ip))
    node_rank = os.getenv("PADDLE_TRAINER_ID")
    assert node_rank is not None, "PADDLE_TRAINER_ID should not be None"
    print("node_rank:{}".format(node_rank))
    node_ips = node_ips.split(",")
    num_nodes = len(node_ips)
    node_rank = int(node_rank)

    started_port = args_port
    print("num_nodes:", num_nodes)
    if num_nodes > 1:
        try:
            paddle_port = int(os.getenv("PADDLE_PORT", ""))
            paddle_port_num = int(os.getenv("TRAINER_PORTS_NUM", ""))

            if paddle_port_num >= len(
                    selected_gpus) and paddle_port != args_port:
                logger.warning("Use Cloud specified port:{}.".format(
                    paddle_port))
                started_port = paddle_port

        except Exception as e:
            print(e)
            pass

    if started_port is None:
        started_port = 6170

    logger.debug("parsed from args:node_ips:{} \
        node_ip:{} node_rank:{} started_port:{}"
                 .format(node_ips, node_ip, node_rank, started_port))

    ports = [x for x in range(started_port, started_port + len(selected_gpus))]
    cluster, pod = get_cluster(node_ips, node_ip, ports, selected_gpus)
    return cluster, cluster.pods[node_rank]


def use_paddlecloud():
    node_ips = os.getenv("PADDLE_TRAINERS", None)
    node_ip = os.getenv("POD_IP", None)
    node_rank = os.getenv("PADDLE_TRAINER_ID", None)
    if node_ips is None or node_ip is None or node_rank is None:
        return False
    else:
        return True


class TrainerProc(object):
    def __init__(self):
        self.proc = None
        self.log_fn = None
        self.log_offset = None
        self.rank = None
        self.local_rank = None
        self.cmd = None


def start_local_trainers(cluster, pod, cmd, log_dir=None):
    current_env = copy.copy(os.environ.copy())
    #paddle broadcast ncclUniqueId use socket, and
    #proxy maybe make trainers unreachable, so delete them.
    #if we set them to "", grpc will log error message "bad uri"
    #so just delete them.
    current_env.pop("http_proxy", None)
    current_env.pop("https_proxy", None)

    procs = []
    for idx, t in enumerate(pod.trainers):
        proc_env = {
            "FLAGS_selected_gpus": "%s" % ",".join([str(g) for g in t.gpus]),
            "PADDLE_TRAINER_ID": "%d" % t.rank,
            "PADDLE_CURRENT_ENDPOINT": "%s" % t.endpoint,
            "PADDLE_TRAINERS_NUM": "%d" % cluster.trainers_nranks(),
            "PADDLE_TRAINER_ENDPOINTS": ",".join(cluster.trainers_endpoints())
        }

        current_env.update(proc_env)

        logger.debug("trainer proc env:{}".format(current_env))

        # cmd = [sys.executable, "-u", training_script]

        logger.info("start trainer proc:{} env:{}".format(cmd, proc_env))

        fn = None
        if log_dir is not None:
            os.system("mkdir -p {}".format(log_dir))
            fn = open("%s/workerlog.%d" % (log_dir, idx), "a")
            proc = subprocess.Popen(cmd, env=current_env, stdout=fn, stderr=fn)
        else:
            proc = subprocess.Popen(cmd, env=current_env)

        tp = TrainerProc()
        tp.proc = proc
        tp.rank = t.rank
        tp.local_rank = idx
        tp.log_fn = fn
        tp.log_offset = fn.tell() if fn else None
        tp.cmd = cmd

        procs.append(proc)

    return procs
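As a rough usage sketch of the new module (assuming a PaddleRec checkout containing this commit is importable as paddlerec; the single-node address and ports below are made up for illustration), get_cluster can be exercised directly, without any PaddleCloud environment variables:

import paddlerec.core.engine.cluster_utils as cluster_utils

# Build a one-node Cluster with two GPU trainers on consecutive ports.
cluster, pod = cluster_utils.get_cluster(
    node_ips=["127.0.0.1"],
    node_ip="127.0.0.1",
    paddle_ports=[6170, 6171],
    selected_gpus=[0, 1])

print(cluster.trainers_nranks())     # 2
print(cluster.trainers_endpoints())  # ['127.0.0.1:6170', '127.0.0.1:6171']

On PaddleCloud, get_cloud_cluster derives the same structure from the PADDLE_TRAINERS, POD_IP and PADDLE_TRAINER_ID environment variables, and start_local_trainers then spawns one trainer process per entry in pod.trainers, exporting FLAGS_selected_gpus, PADDLE_TRAINER_ID, PADDLE_CURRENT_ENDPOINT, PADDLE_TRAINERS_NUM and PADDLE_TRAINER_ENDPOINTS for each.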
