
Commit 9d30808

Merge branch 'deepmodeling:master' into master
2 parents: 17cee1b + be02afa

File tree: 7 files changed (+106 −58 lines)

dpdispatcher/dp_cloud_server.py

Lines changed: 3 additions & 3 deletions
@@ -90,7 +90,7 @@ def do_submit(self, job):
         input_data['command'] = f"bash {job.script_file_name}"
         # input_data['backward_files'] = self._gen_backward_files_list(job)
         if self.context.remote_profile.get('program_id') is None:
-            warnings.warn('program_id will be compulsory in the future.')
+            warnings.warn('program_id is compulsory.')
         job_id, group_id = self.api.job_create(
             job_type=input_data['job_type'],
             oss_path=input_data['job_resources'],
@@ -124,13 +124,13 @@ def check_status(self, job):
         try:
             dp_job_status = check_return["status"]
         except IndexError as e:
-            dlog.error(f"cannot find job information in check_return. job {job.job_id}. check_return:{check_return}; retry one more time after 60 seconds")
+            dlog.error(f"cannot find job information in bohrium for job {job.job_id}. check_return:{check_return}; retry one more time after 60 seconds")
             time.sleep(60)
             retry_return = self.api.get_tasks(job_id, group_id)
             try:
                 dp_job_status = retry_return["status"]
             except IndexError as e:
-                raise RuntimeError(f"cannot find job information in dpcloudserver's database for job {job.job_id} {check_return} {retry_return}")
+                raise RuntimeError(f"cannot find job information in bohrium for job {job.job_id} {check_return} {retry_return}")

         job_state = self.map_dp_job_state(dp_job_status)
         if job_state == JobStatus.finished:
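The `check_status` change above keeps the existing recovery pattern: when the job record is missing from the first response, wait 60 seconds, query once more, and only raise if the retry also fails. A minimal standalone sketch of that query–retry–raise pattern (the `fetch_status` callable and its dict-shaped return are illustrative assumptions, not dpdispatcher's actual API):

```python
import time

def status_with_one_retry(fetch_status, job_id, delay=60):
    """Return a job's status; if the record is missing, retry once after `delay` seconds."""
    first = fetch_status(job_id)
    try:
        return first["status"]
    except (KeyError, IndexError):
        # The server may not have registered the job yet; give it time and retry once.
        time.sleep(delay)
        second = fetch_status(job_id)
        try:
            return second["status"]
        except (KeyError, IndexError):
            raise RuntimeError(
                f"cannot find job information for job {job_id}: {first!r} then {second!r}"
            )
```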

dpdispatcher/dp_cloud_server_context.py

Lines changed: 56 additions & 34 deletions
@@ -1,6 +1,7 @@
 #!/usr/bin/env python
 # coding: utf-8
 # %%
+import time
 import uuid

 from dargs.dargs import Argument
@@ -14,7 +15,10 @@
 from .dpcloudserver import zip_file
 import shutil
 import tqdm
+
 # from zip_file import zip_files
+from .dpcloudserver.config import ALI_OSS_BUCKET_URL
+
 DP_CLOUD_SERVER_HOME_DIR = os.path.join(
     os.path.expanduser('~'),
     '.dpdispatcher/',
@@ -23,14 +27,15 @@
 ENDPOINT = 'http://oss-cn-shenzhen.aliyuncs.com'
 BUCKET_NAME = 'dpcloudserver'

+
 class DpCloudServerContext(BaseContext):
-    def __init__ (self,
-        local_root,
-        remote_root=None,
-        remote_profile={},
-        *args,
-        **kwargs,
-    ):
+    def __init__(self,
+                 local_root,
+                 remote_root=None,
+                 remote_profile={},
+                 *args,
+                 **kwargs,
+                 ):
         self.init_local_root = local_root
         self.init_remote_root = remote_root
         self.temp_local_root = os.path.abspath(local_root)
@@ -83,6 +88,43 @@ def _gen_oss_path(self, job, zip_filename):
         setattr(job, 'upload_path', path)
         return path

+    def upload_job(self, job, common_files=None):
+        MAX_RETRY = 3
+        if common_files is None:
+            common_files = []
+        self.machine.gen_local_script(job)
+        zip_filename = job.job_hash + '.zip'
+        oss_task_zip = self._gen_oss_path(job, zip_filename)
+        zip_task_file = os.path.join(self.local_root, zip_filename)
+
+        upload_file_list = [job.script_file_name, ]
+        upload_file_list.extend(common_files)
+
+        for task in job.job_task_list:
+            for file in task.forward_files:
+                upload_file_list.append(
+                    os.path.join(
+                        task.task_work_path, file
+                    )
+                )
+
+        upload_zip = zip_file.zip_file_list(
+            self.local_root,
+            zip_task_file,
+            file_list=upload_file_list
+        )
+        result = self.api.upload(oss_task_zip, upload_zip, ENDPOINT, BUCKET_NAME)
+        retry_count = 0
+        while True:
+            if self.api.check_file_has_uploaded(ALI_OSS_BUCKET_URL + oss_task_zip):
+                self._backup(self.local_root, upload_zip)
+                break
+            elif retry_count < MAX_RETRY:
+                time.sleep(1 + retry_count)
+                retry_count += 1
+            else:
+                raise ValueError(f"upload retried more than {MAX_RETRY} times; terminating.")
+
     def upload(self, submission):
         # oss_task_dir = os.path.join('%s/%s/%s.zip' % ('indicate', file_uuid, file_uuid))
         # zip_filename = submission.submission_hash + '.zip'
@@ -100,30 +142,8 @@ def upload(self, submission):
         if len(job_to_be_uploaded) == 0:
             dlog.info("all job has been uploaded, continue")
             return result
-        for job in tqdm.tqdm(job_to_be_uploaded, desc="Uploading to Lebesgue", bar_format=bar_format):
-            self.machine.gen_local_script(job)
-            zip_filename = job.job_hash + '.zip'
-            oss_task_zip = self._gen_oss_path(job, zip_filename)
-            zip_task_file = os.path.join(self.local_root, zip_filename)
-
-            upload_file_list = [job.script_file_name, ]
-            upload_file_list.extend(submission.forward_common_files)
-
-            for task in job.job_task_list:
-                for file in task.forward_files:
-                    upload_file_list.append(
-                        os.path.join(
-                            task.task_work_path, file
-                        )
-                    )
-
-            upload_zip = zip_file.zip_file_list(
-                self.local_root,
-                zip_task_file,
-                file_list=upload_file_list
-            )
-            result = self.api.upload(oss_task_zip, upload_zip, ENDPOINT, BUCKET_NAME)
-            self._backup(self.local_root, upload_zip)
+        for job in tqdm.tqdm(job_to_be_uploaded, desc="Uploading to Lebesgue", bar_format=bar_format, leave=False):
+            self.upload_job(job, submission.forward_common_files)
         return result
         # return oss_task_zip
         # api.upload(self.oss_task_dir, zip_task_file)
@@ -151,7 +171,8 @@ def download(self, submission):
             job_hash = job_hashs[each['task_id']]
             job_infos[job_hash] = each
         bar_format = "{l_bar}{bar}| {n:.02f}/{total:.02f} % [{elapsed}<{remaining}, {rate_fmt}{postfix}]"
-        for job_hash, info in tqdm.tqdm(job_infos.items(), desc="Validating download file from Lebesgue", bar_format=bar_format):
+        for job_hash, info in tqdm.tqdm(job_infos.items(), desc="Validating download file from Lebesgue",
+                                        bar_format=bar_format, leave=False):
             result_filename = job_hash + '_back.zip'
             target_result_zip = os.path.join(self.local_root, result_filename)
             if self._check_if_job_has_already_downloaded(target_result_zip, self.local_root):
@@ -234,7 +255,7 @@ def clean(self):
         # retcode = cmd_pipes['stdout'].channel.recv_exit_status()
         # return retcode, cmd_pipes['stdout'], cmd_pipes['stderr']

-    def kill(self, cmd_pipes) :
+    def kill(self, cmd_pipes):
         pass

     @classmethod
@@ -251,11 +272,12 @@ def machine_subfields(cls) -> List[Argument]:
             Argument("email", str, optional=False, doc="Email"),
             Argument("password", str, optional=False, doc="Password"),
             Argument("program_id", int, optional=False, doc="Program ID"),
+            Argument("keep_backup", bool, optional=True, doc="keep download and upload zip"),
             Argument("input_data", dict, optional=False, doc="Configuration of job"),
         ], doc=doc_remote_profile)]


 class LebesgueContext(DpCloudServerContext):
     pass

-#%%
+# %%
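The new `upload_job` method does not trust a single upload call: after uploading, it polls `check_file_has_uploaded` with a linearly growing sleep (1 s, 2 s, 3 s) and raises once `MAX_RETRY` checks have failed. A generic sketch of that verify-with-backoff loop, assuming only a boolean `verify` callable (a hypothetical helper, not dpdispatcher's API):

```python
import time

def confirm_upload(verify, target_url, max_retry=3):
    """Poll `verify(target_url)` until it returns True; raise after `max_retry` failed checks."""
    retry_count = 0
    while True:
        if verify(target_url):
            return
        if retry_count >= max_retry:
            raise ValueError(f"verification of {target_url} failed after {max_retry} retries")
        time.sleep(1 + retry_count)  # linear backoff: 1 s, 2 s, 3 s, ...
        retry_count += 1
```

In the diff, `verify` corresponds to `self.api.check_file_has_uploaded` and `target_url` to `ALI_OSS_BUCKET_URL + oss_task_zip`; the local zip is only backed up once the remote copy is confirmed.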

dpdispatcher/dpcloudserver/api.py

Lines changed: 23 additions & 10 deletions
@@ -13,7 +13,9 @@
 from dpdispatcher import dlog

 from .retcode import RETCODE
-from .config import HTTP_TIME_OUT, API_HOST
+from .config import HTTP_TIME_OUT, API_HOST, API_LOGGER_STACK_INFO
+
+ENABLE_STACK = True if API_LOGGER_STACK_INFO else False


 class API:
@@ -35,12 +37,13 @@ def get(self, url, params, retry=0):
                 headers=headers
             )
         except Exception as e:
-            dlog.error(f"request error {e}")
+            dlog.error(f"request error {e}", stack_info=ENABLE_STACK)
             continue
         if ret.ok:
             break
         else:
-            dlog.error(f"request error status_code:{ret.status_code} reason: {ret.reason} body: \n{ret.text}")
+            dlog.error(f"request error status_code:{ret.status_code} reason: {ret.reason} body: \n{ret.text}",
+                       stack_info=ENABLE_STACK)
         time.sleep(retry_count * 10)
     if ret is None:
         raise ConnectionError("request fail")
@@ -69,7 +72,7 @@ def post(self, url, params, retry=0):
                 headers=headers
             )
         except Exception as e:
-            dlog.error(f"request error {e}")
+            dlog.error(f"request error {e}", stack_info=ENABLE_STACK)
             continue
         if ret.ok:
             break
@@ -132,7 +135,7 @@ def download_from_url(self, url, save_file):
                 stream=True
             )
         except Exception as e:
-            dlog.error(f"request error {e}")
+            dlog.error(f"request error {e}", stack_info=ENABLE_STACK)
             continue
         if ret.ok:
             break
@@ -147,7 +150,6 @@ def download_from_url(self, url, save_file):
                 f.write(chunk)
         ret.close()

-
     def upload(self, oss_task_zip, zip_task_file, endpoint, bucket_name):
         dlog.debug(f"debug: upload: oss_task_zip:{oss_task_zip}; zip_task_file:{zip_task_file}")
         bucket = self._get_oss_bucket(endpoint, bucket_name)
@@ -170,7 +172,6 @@ def upload(self, oss_task_zip, zip_task_file, endpoint, bucket_name):
         # print('debug:upload_result:', result, dir())
         return result

-
     def job_create(self, job_type, oss_path, input_data, program_id=None, group_id=None):
         post_data = {
             'job_type': job_type,
@@ -244,11 +245,23 @@ def check_job_has_uploaded(self, job_id):
             if len(ret) == 0:
                 return False
             if ret.get('input_data'):
-                return True
+                return self.check_file_has_uploaded(ret.get('input_data'))
             else:
                 return False
         except ValueError as e:
-            dlog.error(e)
+            dlog.error(e, stack_info=ENABLE_STACK)
+            return False
+
+    def check_file_has_uploaded(self, file_url):
+        try:
+            if not file_url:
+                return False
+            resp = requests.head(file_url)
+            if resp.ok:
+                return True
+            return False
+        except Exception as e:
+            dlog.error(e, stack_info=ENABLE_STACK)
             return False

     def get_job_result_url(self, job_id):
@@ -264,7 +277,7 @@ def get_job_result_url(self, job_id):
             else:
                 return None
         except ValueError as e:
-            dlog.error(e)
+            dlog.error(e, stack_info=ENABLE_STACK)
             return None

 # %%
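The new `check_file_has_uploaded` probes the OSS object with an HTTP HEAD request, which transfers headers only, so existence can be verified without downloading the zip. A self-contained sketch of the same check (the URL below is a placeholder, and the `timeout` parameter is an added assumption, not in the committed code):

```python
import requests

def file_exists(file_url, timeout=30):
    """Return True if `file_url` answers a HEAD request with a 2xx status."""
    if not file_url:
        return False
    try:
        return requests.head(file_url, timeout=timeout).ok
    except requests.RequestException:
        return False

# Hypothetical usage against an OSS object key:
# file_exists("https://dpcloudserver.oss-cn-shenzhen.aliyuncs.com/" + "dpdispatcher/abc123.zip")
```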

dpdispatcher/dpcloudserver/config.py

Lines changed: 2 additions & 2 deletions
@@ -1,8 +1,8 @@
 import os
 HTTP_TIME_OUT = 30

-API_HOST = os.environ.get('DPDISPATCHER_LEBESGUE_API_HOST', "https://lebesgue.dp.tech")
-
+API_HOST = os.environ.get('DPDISPATCHER_LEBESGUE_API_HOST', "https://bohrium.dp.tech/")
+API_LOGGER_STACK_INFO = os.environ.get('API_LOGGER_STACK_INFO', "")
 ALI_STS_ENDPOINT = os.environ.get('DPDISPATCHER_LEBESGUE_ALI_STS_ENDPOINT', 'http://oss-cn-shenzhen.aliyuncs.com')
 ALI_STS_BUCKET_NAME = os.environ.get('DPDISPATCHER_LEBESGUE_ALI_STS_BUCKET_NAME', "dpcloudserver")
 ALI_OSS_BUCKET_URL = os.environ.get('DPDISPATCHER_LEBESGUE_ALI_OSS_BUCKET_URL', "https://dpcloudserver.oss-cn-shenzhen.aliyuncs.com/")
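`API_LOGGER_STACK_INFO` is read as a plain string, and `ENABLE_STACK = True if API_LOGGER_STACK_INFO else False` in api.py tests only its truthiness, so any non-empty value (for example `1`) turns stack traces on, while an unset or empty variable leaves them off. A minimal sketch of the toggle:

```python
import logging
import os

# Any non-empty value enables stack traces in error logs; unset or "" disables them.
ENABLE_STACK = bool(os.environ.get('API_LOGGER_STACK_INFO', ""))

logging.basicConfig(level=logging.ERROR)
logging.error("request error", stack_info=ENABLE_STACK)
```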

dpdispatcher/machine.py

Lines changed: 1 addition & 1 deletion
@@ -334,7 +334,7 @@ def arginfo(cls):
         Argument("batch_type", str, optional=False, doc=doc_batch_type),
         # TODO: add default to local_root and remote_root after refactor the code
         Argument("local_root", [str, None], optional=False, doc=doc_local_root),
-        Argument("remote_root", str, optional=True, doc=doc_remote_root),
+        Argument("remote_root", [str, None], optional=True, doc=doc_remote_root),
         Argument("clean_asynchronously", bool, optional=True, default=False, doc=doc_clean_asynchronously),
     ]

dpdispatcher/pbs.py

Lines changed: 13 additions & 0 deletions
@@ -114,3 +114,16 @@ def check_status(self, job):
             return JobStatus.terminated
         else :
             return JobStatus.unknown
+
+    def gen_script_header(self, job):
+        # ref: https://support.adaptivecomputing.com/wp-content/uploads/2021/02/torque/torque.htm#topics/torque/2-jobs/requestingRes.htm
+        resources = job.resources
+        pbs_script_header_dict = {}
+        pbs_script_header_dict['select_node_line'] = "#PBS -l nodes={number_node}:ppn={cpu_per_node}".format(
+            number_node=resources.number_node, cpu_per_node=resources.cpu_per_node
+        )
+        if resources.gpu_per_node != 0:
+            pbs_script_header_dict['select_node_line'] += ":gpus={gpu_per_node}".format(gpu_per_node=resources.gpu_per_node)
+        pbs_script_header_dict['queue_name_line'] = "#PBS -q {queue_name}".format(queue_name=resources.queue_name)
+        pbs_script_header = pbs_script_header_template.format(**pbs_script_header_dict)
+        return pbs_script_header
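For concreteness, the new `gen_script_header` builds Torque-style directives from the resources fields. For 1 node, 4 CPUs, 1 GPU, and queue `V100` (sample values, not from the commit), the string construction reduces to:

```python
number_node, cpu_per_node, gpu_per_node, queue_name = 1, 4, 1, "V100"

select_node_line = f"#PBS -l nodes={number_node}:ppn={cpu_per_node}"
if gpu_per_node != 0:  # only request GPUs when the resource set asks for them
    select_node_line += f":gpus={gpu_per_node}"
queue_name_line = f"#PBS -q {queue_name}"

print(select_node_line)  # #PBS -l nodes=1:ppn=4:gpus=1
print(queue_name_line)   # #PBS -q V100
```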

dpdispatcher/submission.py

Lines changed: 8 additions & 8 deletions
@@ -130,13 +130,13 @@ def serialize(self, if_static=False):

     def register_task(self, task):
         if self.belonging_jobs:
-            raise RuntimeError("Not allowed to register tasks after generating jobs."
+            raise RuntimeError("Not allowed to register tasks after generating jobs. "
                                "submission hash error {self}".format(self=self))
         self.belonging_tasks.append(task)

     def register_task_list(self, task_list):
         if self.belonging_jobs:
-            raise RuntimeError("Not allowed to register tasks after generating jobs."
+            raise RuntimeError("Not allowed to register tasks after generating jobs. "
                                "submission hash error {self}".format(self=self))
         self.belonging_tasks.extend(task_list)

@@ -245,8 +245,8 @@ def handle_unexpected_submission_state(self):
             f"Meet errors will handle unexpected submission state.\n"
             f"Debug information: remote_root=={self.machine.context.remote_root}.\n"
             f"Debug information: submission_hash=={self.submission_hash}.\n"
-            f"Please check the dirs and scripts in remote_root"
-            f"The job information mentioned above may help"
+            f"Please check the dirs and scripts in remote_root. "
+            f"The job information mentioned above may help."
         ) from e

     # not used here, submitting job is in handle_unexpected_submission_state.
@@ -840,10 +840,10 @@ def arginfo(detail_kwargs=True):
     strategy_format = Argument("strategy", dict, strategy_args, optional=True, doc=doc_strategy)

     resources_args = [
-        Argument("number_node", int, optional=False, doc=doc_number_node),
-        Argument("cpu_per_node", int, optional=False, doc=doc_cpu_per_node),
-        Argument("gpu_per_node", int, optional=False, doc=doc_gpu_per_node),
-        Argument("queue_name", str, optional=False, doc=doc_queue_name),
+        Argument("number_node", int, optional=True, doc=doc_number_node, default=1),
+        Argument("cpu_per_node", int, optional=True, doc=doc_cpu_per_node, default=1),
+        Argument("gpu_per_node", int, optional=True, doc=doc_gpu_per_node, default=0),
+        Argument("queue_name", str, optional=True, doc=doc_queue_name, default=""),
         Argument("group_size", int, optional=False, doc=doc_group_size),

         Argument("custom_flags", list, optional=True, doc=doc_custom_flags),