
Commit 984359c

Merge branch 'deepmodeling:master' into master
2 parents a06a725 + 2e260eb commit 984359c

16 files changed: +221 additions, -96 deletions

doc/examples/expanse.md (3 additions, 3 deletions)

@@ -2,21 +2,21 @@

 [Expanse](https://www.sdsc.edu/support/user_guides/expanse.html) is a cluster operated by the San Diego Supercomputer Center. Here we provide an example to run jobs on the expanse.

-The machine parameters are provided below. Expanse uses the SLURM workload manager for job scheduling. `remote_root` has been created in advance. It's worth metioned that we do not recommend to use the password, so [SSH keys](https://www.ssh.com/academy/ssh/key) are used instead to improve security.
+The machine parameters are provided below. Expanse uses the SLURM workload manager for job scheduling. {ref}`remote_root <machine/remote_root>` has been created in advance. It's worth metioned that we do not recommend to use the password, so [SSH keys](https://www.ssh.com/academy/ssh/key) are used instead to improve security.

 ```{literalinclude} ../../examples/machine/expanse.json
 :language: json
 :linenos:
 ```

-Expanse's standard compute nodes are each powered by two 64-core AMD EPYC 7742 processors and contain 256 GB of DDR4 memory. Here, we request one node with 32 cores and 16 GB memory from the `shared` partition. Expanse does not support `--gres=gpu:0` command, so we use `custom_gpu_line` to customize the statement.
+Expanse's standard compute nodes are each powered by two 64-core AMD EPYC 7742 processors and contain 256 GB of DDR4 memory. Here, we request one node with 32 cores and 16 GB memory from the `shared` partition. Expanse does not support `--gres=gpu:0` command, so we use {ref}`custom_gpu_line <resources[Slurm]/kwargs/custom_gpu_line>` to customize the statement.

 ```{literalinclude} ../../examples/resources/expanse_cpu.json
 :language: json
 :linenos:
 ```

-The following task parameter runs a DeePMD-kit task, forwarding an input file and backwarding graph files. Here, the data set will be used among all the tasks, so it is not included in the `forward_files`. Instead, it should be included in the submission's `forward_common_files`.
+The following task parameter runs a DeePMD-kit task, forwarding an input file and backwarding graph files. Here, the data set will be used among all the tasks, so it is not included in the {ref}`forward_files <task/forward_files>`. Instead, it should be included in the submission's {ref}`forward_common_files <task/forward_common_files>`.

 ```{literalinclude} ../../examples/task/deepmd-kit.json
 :language: json
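
For readers following along, the resources file referenced above might look roughly like the sketch below; the concrete values (memory request, group size) are assumptions, not the actual contents of `examples/resources/expanse_cpu.json`.

```python
# Hypothetical sketch of a Slurm resources dict for one shared Expanse node.
# The exact values are assumptions; see examples/resources/expanse_cpu.json
# in the repository for the real file.
expanse_cpu_resources = {
    "number_node": 1,
    "cpu_per_node": 32,
    "gpu_per_node": 0,
    "queue_name": "shared",
    "group_size": 1,  # assumed grouping of tasks per job
    "kwargs": {
        # Expanse rejects --gres=gpu:0, so custom_gpu_line replaces that
        # directive; the memory request shown here is an assumed example.
        "custom_gpu_line": "#SBATCH --mem=16G",
    },
}
```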

doc/examples/shell.md (3 additions, 3 deletions)

@@ -1,17 +1,17 @@
 # Running multiple MD tasks on a GPU workstation

-In this example, we are going to show how to run multiple MD tasks on a GPU workstation. This workstation does not install any job scheduling packages installed, so we will use `Shell` as `batch_type`.
+In this example, we are going to show how to run multiple MD tasks on a GPU workstation. This workstation does not install any job scheduling packages installed, so we will use `Shell` as {ref}`batch_type <machine/batch_type>`.

 ```{literalinclude} ../../examples/machine/mandu.json
 :language: json
 :linenos:
 ```

-The workstation has 48 cores of CPUs and 8 RTX3090 cards. Here we hope each card runs 6 tasks at the same time, as each task does not consume too many GPU resources. Thus, `strategy/if_cuda_multi_devices` is set to `true` and `para_deg` is set to 6.
+The workstation has 48 cores of CPUs and 8 RTX3090 cards. Here we hope each card runs 6 tasks at the same time, as each task does not consume too many GPU resources. Thus, {ref}`strategy/if_cuda_multi_devices <resources/strategy/if_cuda_multi_devices>` is set to `true` and {ref}`para_deg <resources/para_deg>` is set to 6.

 ```{literalinclude} ../../examples/resources/mandu.json
 :language: json
 :linenos:
 ```

-Note that `group_size` should be set to `0` (means infinity) to ensure there is only one job and avoid running multiple jobs at the same time.
+Note that {ref}`group_size <resources/group_size>` should be set to `0` (means infinity) to ensure there is only one job and avoid running multiple jobs at the same time.
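
As a rough sketch of the settings discussed in this file (the real `examples/resources/mandu.json` may differ in detail), a `Shell` resources dict could look like this:

```python
# Hypothetical resources dict for the 48-core, 8x RTX3090 workstation.
# Key names follow the documented options; the exact values are assumptions.
mandu_resources = {
    "number_node": 1,
    "cpu_per_node": 48,
    "gpu_per_node": 8,
    "queue_name": "",   # no scheduler queue when batch_type is Shell
    "group_size": 0,    # 0 means infinity: all tasks go into a single job
    "para_deg": 6,      # six tasks share one GPU at the same time
    "strategy": {
        "if_cuda_multi_devices": True,  # spread tasks over the 8 cards
    },
}
```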

doc/getting-started.md (6 additions, 6 deletions)

@@ -2,14 +2,14 @@

 DPDispatcher provides the following classes:

-- `Task` class, which represents a command to be run on batch job system, as well as the essential files need by the command.
-- `Submission` class, which represents a collection of jobs defined by the HPC system.
+- {class}`Task <dpdispatcher.submission.Task>` class, which represents a command to be run on batch job system, as well as the essential files need by the command.
+- {class}`Submission <dpdispatcher.submission.Submission>` class, which represents a collection of jobs defined by the HPC system.
   And there may be common files to be uploaded by them.
-  DPDispatcher will create and submit these jobs when a `submission` instance execute `run_submission` method.
+  DPDispatcher will create and submit these jobs when a `submission` instance execute {meth}`run_submission <dpdispatcher.submission.Submission.run_submission>` method.
   This method will poke until the jobs finish and return.
-- `Job` class, a class used by `Submission` class, which represents a job on the HPC system.
-  `Submission` will generate `job`s' submitting scripts used by HPC systems automatically with the `Task` and `Resources`
-- `Resources` class, which represents the computing resources for each job within a `submission`.
+- {class}`Job <dpdispatcher.submission.Job>` class, a class used by {class}`Submission <dpdispatcher.submission.Submission>` class, which represents a job on the HPC system.
+  {class}`Submission <dpdispatcher.submission.Submission>` will generate `job`s' submitting scripts used by HPC systems automatically with the {class}`Task <dpdispatcher.submission.Task>` and {class}`Resources <dpdispatcher.submission.Resources>`
+- {class}`Resources <dpdispatcher.submission.Resources>` class, which represents the computing resources for each job within a `submission`.

 You can use DPDispatcher in a Python script to submit five tasks:
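
A minimal sketch of such a script is shown below. The machine and resources dicts, paths, and the command are placeholders; they illustrate how the classes fit together rather than reproduce any example shipped with this commit.

```python
from dpdispatcher import Machine, Resources, Task, Submission

# Placeholder machine/resources; a real setup would point at an actual
# scheduler, remote_root, and credentials.
machine = Machine.load_from_dict({
    "batch_type": "Slurm",
    "context_type": "LocalContext",
    "local_root": "./",
    "remote_root": "/tmp/dpdispatcher_work",
})
resources = Resources.load_from_dict({
    "number_node": 1,
    "cpu_per_node": 4,
    "gpu_per_node": 0,
    "queue_name": "cpu",
    "group_size": 5,
})

# Five tasks, each running a placeholder command in its own work directory.
task_list = [
    Task(
        command="echo hello",
        task_work_path=f"task_{i}/",
        forward_files=[],
        backward_files=[],
    )
    for i in range(5)
]

submission = Submission(
    work_base="./",
    machine=machine,
    resources=resources,
    task_list=task_list,
    forward_common_files=[],
    backward_common_files=[],
)
submission.run_submission()  # blocks ("pokes") until all jobs finish
```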

dpdispatcher/base_context.py (7 additions, 1 deletion)

@@ -1,9 +1,10 @@
+from abc import ABCMeta, abstractmethod
 from dargs import Argument
 from typing import List

 from dpdispatcher import dlog

-class BaseContext(object):
+class BaseContext(metaclass=ABCMeta):
     subclasses_dict = {}
     options = set()
     def __new__(cls, *args, **kwargs):
@@ -37,22 +38,27 @@ def load_from_dict(cls, context_dict):
     def bind_submission(self, submission):
         self.submission = submission

+    @abstractmethod
     def upload(self, submission):
         raise NotImplementedError('abstract method')

+    @abstractmethod
     def download(self,
                  submission,
                  check_exists = False,
                  mark_failure = True,
                  back_error=False):
         raise NotImplementedError('abstract method')

+    @abstractmethod
     def clean(self):
         raise NotImplementedError('abstract method')

+    @abstractmethod
     def write_file(self, fname, write_str):
         raise NotImplementedError('abstract method')

+    @abstractmethod
     def read_file(self, fname):
         raise NotImplementedError('abstract method')

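With `BaseContext` now carrying `ABCMeta` and `@abstractmethod` markers, a concrete context must implement all five methods before it can be instantiated. A hypothetical minimal subclass might look like the following sketch (not part of this commit):

```python
from dpdispatcher.base_context import BaseContext


class DummyLocalContext(BaseContext):
    """Hypothetical context that keeps everything on the local filesystem."""

    def __init__(self, local_root, remote_root=None, *args, **kwargs):
        self.local_root = local_root
        self.remote_root = remote_root or local_root

    def upload(self, submission):
        pass  # nothing to transfer; local_root doubles as remote_root

    def download(self, submission, check_exists=False, mark_failure=True, back_error=False):
        pass

    def clean(self):
        pass

    def write_file(self, fname, write_str):
        with open(fname, "w") as f:
            f.write(write_str)

    def read_file(self, fname):
        with open(fname) as f:
            return f.read()
```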

dpdispatcher/dp_cloud_server.py (3 additions, 3 deletions)

@@ -90,7 +90,7 @@ def do_submit(self, job):
         input_data['command'] = f"bash {job.script_file_name}"
         # input_data['backward_files'] = self._gen_backward_files_list(job)
         if self.context.remote_profile.get('program_id') is None:
-            warnings.warn('program_id will be compulsory in the future.')
+            warnings.warn('program_id is compulsory.')
         job_id, group_id = self.api.job_create(
             job_type=input_data['job_type'],
             oss_path=input_data['job_resources'],
@@ -124,13 +124,13 @@ def check_status(self, job):
         try:
             dp_job_status = check_return["status"]
         except IndexError as e:
-            dlog.error(f"cannot find job information in check_return. job {job.job_id}. check_return:{check_return}; retry one more time after 60 seconds")
+            dlog.error(f"cannot find job information in bohrium for job {job.job_id}. check_return:{check_return}; retry one more time after 60 seconds")
             time.sleep(60)
             retry_return = self.api.get_tasks(job_id, group_id)
             try:
                 dp_job_status = retry_return["status"]
             except IndexError as e:
-                raise RuntimeError(f"cannot find job information in dpcloudserver's database for job {job.job_id} {check_return} {retry_return}")
+                raise RuntimeError(f"cannot find job information in bohrium for job {job.job_id} {check_return} {retry_return}")

         job_state = self.map_dp_job_state(dp_job_status)
         if job_state == JobStatus.finished:

dpdispatcher/dp_cloud_server_context.py (56 additions, 34 deletions)

@@ -1,6 +1,7 @@
 #!/usr/bin/env python
 # coding: utf-8
 # %%
+import time
 import uuid

 from dargs.dargs import Argument
@@ -14,7 +15,10 @@
 from .dpcloudserver import zip_file
 import shutil
 import tqdm
+
 # from zip_file import zip_files
+from .dpcloudserver.config import ALI_OSS_BUCKET_URL
+
 DP_CLOUD_SERVER_HOME_DIR = os.path.join(
     os.path.expanduser('~'),
     '.dpdispatcher/',
@@ -23,14 +27,15 @@
 ENDPOINT = 'http://oss-cn-shenzhen.aliyuncs.com'
 BUCKET_NAME = 'dpcloudserver'

+
 class DpCloudServerContext(BaseContext):
-    def __init__ (self,
-        local_root,
-        remote_root=None,
-        remote_profile={},
-        *args,
-        **kwargs,
-        ):
+    def __init__(self,
+                 local_root,
+                 remote_root=None,
+                 remote_profile={},
+                 *args,
+                 **kwargs,
+                 ):
         self.init_local_root = local_root
         self.init_remote_root = remote_root
         self.temp_local_root = os.path.abspath(local_root)
@@ -83,6 +88,43 @@ def _gen_oss_path(self, job, zip_filename):
         setattr(job, 'upload_path', path)
         return path

+    def upload_job(self, job, common_files=None):
+        MAX_RETRY = 3
+        if common_files is None:
+            common_files = []
+        self.machine.gen_local_script(job)
+        zip_filename = job.job_hash + '.zip'
+        oss_task_zip = self._gen_oss_path(job, zip_filename)
+        zip_task_file = os.path.join(self.local_root, zip_filename)
+
+        upload_file_list = [job.script_file_name, ]
+        upload_file_list.extend(common_files)
+
+        for task in job.job_task_list:
+            for file in task.forward_files:
+                upload_file_list.append(
+                    os.path.join(
+                        task.task_work_path, file
+                    )
+                )
+
+        upload_zip = zip_file.zip_file_list(
+            self.local_root,
+            zip_task_file,
+            file_list=upload_file_list
+        )
+        result = self.api.upload(oss_task_zip, upload_zip, ENDPOINT, BUCKET_NAME)
+        retry_count = 0
+        while True:
+            if self.api.check_file_has_uploaded(ALI_OSS_BUCKET_URL + oss_task_zip):
+                self._backup(self.local_root, upload_zip)
+                break
+            elif retry_count < MAX_RETRY:
+                time.sleep(1 + retry_count)
+                retry_count += 1
+            else:
+                raise ValueError(f"upload retried excess {MAX_RETRY} terminate.")
+
     def upload(self, submission):
         # oss_task_dir = os.path.join('%s/%s/%s.zip' % ('indicate', file_uuid, file_uuid))
         # zip_filename = submission.submission_hash + '.zip'
@@ -100,30 +142,8 @@ def upload(self, submission):
         if len(job_to_be_uploaded) == 0:
             dlog.info("all job has been uploaded, continue")
             return result
-        for job in tqdm.tqdm(job_to_be_uploaded, desc="Uploading to Lebesgue", bar_format=bar_format):
-            self.machine.gen_local_script(job)
-            zip_filename = job.job_hash + '.zip'
-            oss_task_zip = self._gen_oss_path(job, zip_filename)
-            zip_task_file = os.path.join(self.local_root, zip_filename)
-
-            upload_file_list = [job.script_file_name, ]
-            upload_file_list.extend(submission.forward_common_files)
-
-            for task in job.job_task_list:
-                for file in task.forward_files:
-                    upload_file_list.append(
-                        os.path.join(
-                            task.task_work_path, file
-                        )
-                    )
-
-            upload_zip = zip_file.zip_file_list(
-                self.local_root,
-                zip_task_file,
-                file_list=upload_file_list
-            )
-            result = self.api.upload(oss_task_zip, upload_zip, ENDPOINT, BUCKET_NAME)
-            self._backup(self.local_root, upload_zip)
+        for job in tqdm.tqdm(job_to_be_uploaded, desc="Uploading to Lebesgue", bar_format=bar_format, leave=False):
+            self.upload_job(job, submission.forward_common_files)
         return result
         # return oss_task_zip
         # api.upload(self.oss_task_dir, zip_task_file)
@@ -151,7 +171,8 @@ def download(self, submission):
             job_hash = job_hashs[each['task_id']]
             job_infos[job_hash] = each
         bar_format = "{l_bar}{bar}| {n:.02f}/{total:.02f} % [{elapsed}<{remaining}, {rate_fmt}{postfix}]"
-        for job_hash, info in tqdm.tqdm(job_infos.items(), desc="Validating download file from Lebesgue", bar_format=bar_format):
+        for job_hash, info in tqdm.tqdm(job_infos.items(), desc="Validating download file from Lebesgue",
+                                        bar_format=bar_format, leave=False):
             result_filename = job_hash + '_back.zip'
             target_result_zip = os.path.join(self.local_root, result_filename)
             if self._check_if_job_has_already_downloaded(target_result_zip, self.local_root):
@@ -234,7 +255,7 @@ def clean(self):
         # retcode = cmd_pipes['stdout'].channel.recv_exit_status()
         # return retcode, cmd_pipes['stdout'], cmd_pipes['stderr']

-    def kill(self, cmd_pipes) :
+    def kill(self, cmd_pipes):
         pass

     @classmethod
@@ -251,11 +272,12 @@ def machine_subfields(cls) -> List[Argument]:
             Argument("email", str, optional=False, doc="Email"),
             Argument("password", str, optional=False, doc="Password"),
             Argument("program_id", int, optional=False, doc="Program ID"),
+            Argument("keep_backup", bool, optional=True, doc="keep download and upload zip"),
             Argument("input_data", dict, optional=False, doc="Configuration of job"),
         ], doc=doc_remote_profile)]


 class LebesgueContext(DpCloudServerContext):
     pass

-#%%
+# %%
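
The core of the new `upload_job` method is the verification loop at the end: after the OSS upload it polls `check_file_has_uploaded` with a growing delay and gives up after `MAX_RETRY` attempts. Stripped of the OSS details, the pattern is roughly this sketch:

```python
import time


def wait_until_visible(check, max_retry=3):
    """Poll check() with an increasing delay until it returns True.

    check is any zero-argument callable; in upload_job it wraps
    api.check_file_has_uploaded(ALI_OSS_BUCKET_URL + oss_task_zip).
    """
    retry_count = 0
    while True:
        if check():
            return  # upload confirmed, the caller can back up the zip
        if retry_count < max_retry:
            time.sleep(1 + retry_count)  # 1 s, 2 s, 3 s, ...
            retry_count += 1
        else:
            raise ValueError(f"upload still not visible after {max_retry} retries")
```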

dpdispatcher/dpcloudserver/api.py (23 additions, 10 deletions)

@@ -13,7 +13,9 @@
 from dpdispatcher import dlog

 from .retcode import RETCODE
-from .config import HTTP_TIME_OUT, API_HOST
+from .config import HTTP_TIME_OUT, API_HOST, API_LOGGER_STACK_INFO
+
+ENABLE_STACK = True if API_LOGGER_STACK_INFO else False


 class API:
@@ -35,12 +37,13 @@ def get(self, url, params, retry=0):
                     headers=headers
                 )
             except Exception as e:
-                dlog.error(f"request error {e}")
+                dlog.error(f"request error {e}", stack_info=ENABLE_STACK)
                 continue
             if ret.ok:
                 break
             else:
-                dlog.error(f"request error status_code:{ret.status_code} reason: {ret.reason} body: \n{ret.text}")
+                dlog.error(f"request error status_code:{ret.status_code} reason: {ret.reason} body: \n{ret.text}",
+                           stack_info=ENABLE_STACK)
             time.sleep(retry_count * 10)
         if ret is None:
             raise ConnectionError("request fail")
@@ -69,7 +72,7 @@ def post(self, url, params, retry=0):
                     headers=headers
                 )
             except Exception as e:
-                dlog.error(f"request error {e}")
+                dlog.error(f"request error {e}", stack_info=ENABLE_STACK)
                 continue
             if ret.ok:
                 break
@@ -132,7 +135,7 @@ def download_from_url(self, url, save_file):
                     stream=True
                 )
             except Exception as e:
-                dlog.error(f"request error {e}")
+                dlog.error(f"request error {e}", stack_info=ENABLE_STACK)
                 continue
             if ret.ok:
                 break
@@ -147,7 +150,6 @@ def download_from_url(self, url, save_file):
                     f.write(chunk)
         ret.close()

-
     def upload(self, oss_task_zip, zip_task_file, endpoint, bucket_name):
         dlog.debug(f"debug: upload: oss_task_zip:{oss_task_zip}; zip_task_file:{zip_task_file}")
         bucket = self._get_oss_bucket(endpoint, bucket_name)
@@ -170,7 +172,6 @@ def upload(self, oss_task_zip, zip_task_file, endpoint, bucket_name):
         # print('debug:upload_result:', result, dir())
         return result

-
     def job_create(self, job_type, oss_path, input_data, program_id=None, group_id=None):
         post_data = {
             'job_type': job_type,
@@ -244,11 +245,23 @@ def check_job_has_uploaded(self, job_id):
             if len(ret) == 0:
                 return False
             if ret.get('input_data'):
-                return True
+                return self.check_file_has_uploaded(ret.get('input_data'))
             else:
                 return False
         except ValueError as e:
-            dlog.error(e)
+            dlog.error(e, stack_info=ENABLE_STACK)
+            return False
+
+    def check_file_has_uploaded(self, file_url):
+        try:
+            if not file_url:
+                return False
+            resp = requests.head(file_url)
+            if resp.ok:
+                return True
+            return False
+        except Exception as e:
+            dlog.error(e, stack_info=ENABLE_STACK)
             return False

     def get_job_result_url(self, job_id):
@@ -264,7 +277,7 @@ def get_job_result_url(self, job_id):
             else:
                 return None
         except ValueError as e:
-            dlog.error(e)
+            dlog.error(e, stack_info=ENABLE_STACK)
             return None

 # %%
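
The repeated `stack_info=ENABLE_STACK` arguments use the standard `logging` keyword: when `API_LOGGER_STACK_INFO` is set to a non-empty value, each API error is logged together with the current call stack, which makes intermittent request failures easier to trace. A short illustration with a plain logger:

```python
import logging

logging.basicConfig(level=logging.ERROR)
logger = logging.getLogger("demo")

ENABLE_STACK = True  # in dpdispatcher this is derived from API_LOGGER_STACK_INFO

try:
    raise TimeoutError("simulated request failure")
except Exception as e:
    # With stack_info=True the record ends with "Stack (most recent call last): ..."
    logger.error(f"request error {e}", stack_info=ENABLE_STACK)
```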

dpdispatcher/dpcloudserver/config.py (2 additions, 2 deletions)

@@ -1,8 +1,8 @@
 import os
 HTTP_TIME_OUT = 30

-API_HOST = os.environ.get('DPDISPATCHER_LEBESGUE_API_HOST', "https://lebesgue.dp.tech")
-
+API_HOST = os.environ.get('DPDISPATCHER_LEBESGUE_API_HOST', "https://bohrium.dp.tech/")
+API_LOGGER_STACK_INFO = os.environ.get('API_LOGGER_STACK_INFO', "")
 ALI_STS_ENDPOINT = os.environ.get('DPDISPATCHER_LEBESGUE_ALI_STS_ENDPOINT', 'http://oss-cn-shenzhen.aliyuncs.com')
 ALI_STS_BUCKET_NAME = os.environ.get('DPDISPATCHER_LEBESGUE_ALI_STS_BUCKET_NAME', "dpcloudserver")
 ALI_OSS_BUCKET_URL = os.environ.get('DPDISPATCHER_LEBESGUE_ALI_OSS_BUCKET_URL', "https://dpcloudserver.oss-cn-shenzhen.aliyuncs.com/")
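
All of these values are read from environment variables when the module is imported, so overrides have to be in place before `dpdispatcher.dpcloudserver.config` is first loaded, for example:

```python
import os

# Override the API host and enable stack traces in API error logs; both
# variables must be set before dpdispatcher's config module is imported.
os.environ["DPDISPATCHER_LEBESGUE_API_HOST"] = "https://bohrium.dp.tech/"
os.environ["API_LOGGER_STACK_INFO"] = "1"  # any non-empty value enables stack traces

from dpdispatcher.dpcloudserver.config import API_HOST, API_LOGGER_STACK_INFO

print(API_HOST)                     # https://bohrium.dp.tech/
print(bool(API_LOGGER_STACK_INFO))  # True
```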
