
Commit b233a4a

job submit interface
1 parent 322e6d8 commit b233a4a

File tree

7 files changed: +497 -88 lines changed


src/bohrium/_client.py

Lines changed: 4 additions & 1 deletion

@@ -54,7 +54,7 @@ def __init__(
 
         if base_url is None:
            base_url = "https://openapi.dp.tech"
-
+
         super().__init__(
            _version=__version__,
            base_url=base_url,
@@ -65,6 +65,7 @@ def __init__(
        )
 
        self.job = resources.Job(self)
+
 
    @property
    @override
@@ -119,3 +120,5 @@ def _make_status_error(
 
 class AsyncBohrium(AsyncAPIClient):
    pass
+
+
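With this change the sync client wires up the Job resource at construction time, so job operations are reachable as client.job. A minimal usage sketch follows; it assumes the sync client class defined in _client.py is exported from the package as Bohrium (only AsyncBohrium is visible in this hunk) and that any authentication arguments are handled outside this commit:

from bohrium import Bohrium   # assumed export; only AsyncBohrium appears in this diff

client = Bohrium()            # hypothetical construction; auth kwargs are not part of this diff
client.job.detail(12345)      # placeholder job ID; wraps GET /openapi/v1/job/{job_id}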

src/bohrium/resources/job/job.py

Lines changed: 104 additions & 23 deletions

@@ -1,63 +1,144 @@
 import logging
 from ..._resource import AsyncAPIResource, SyncAPIResource
+from ..tiefblue.tiefblue import Tiefblue
+from ...types.job.job import JobAddRequest
 # from ..._resource import BaseClient
 from pprint import pprint
-
-log: logging.Logger = logging.getLogger(__name__)
+from typing import Optional
+import os
+from pathlib import Path
+import uuid
+# log: logging.Logger = logging.getLogger(__name__)
 
 
 class Job(SyncAPIResource):
 
-    def submit(self, project_id, name):
-        log.info(f"submit job {name},project_id:{project_id}")
-
+    def submit(
+        self,
+        project_id: int,
+        job_name: str,
+        machine_type: str,
+        cmd: str,
+        image_address: str,
+        job_group_id: int = 0,
+        work_dir: str = '',
+        result: str = '',
+        dataset_path: list = [],
+        log_files: list = [],
+        out_files: list = [],
+    ):
+        # log.info(f"submit job {name},project_id:{project_id}")
+        data = self.create_job(project_id, job_name, job_group_id)
+        print(data)
+        if work_dir != '':
+            if not os.path.exists(work_dir):
+                raise FileNotFoundError
+            if os.path.isdir(work_dir):
+                self.uploadr(work_dir, data["storePath"], data["token"])
+            else:
+                file_name = os.path.basename(work_dir)
+                object_key = os.path.join(data["storePath"], file_name)
+                self.upload(work_dir, object_key, data["token"])
+
+        ep = os.path.expanduser(result)
+        p = Path(ep).absolute().resolve()
+        p = p.joinpath(str(uuid.uuid4()) + "_temp.zip")
+
+        job_add_request = JobAddRequest(
+            download_path=str(p.absolute().resolve()),
+            dataset_path=dataset_path,
+            job_name=job_name,
+            project_id=project_id,
+            job_id=data["jobId"],
+            oss_path=data["storePath"],
+            image_name=image_address,
+            scass_type=machine_type,
+            cmd=cmd,
+            log_files=log_files,
+            out_files=out_files
+        )
+        return self.insert(job_add_request.to_dict())
+
+    def insert(self, data):
+        # log.info(f"insert job {data}")
+        response = self._client.post(f"/openapi/v2/job/add", json=data)
+        pprint(response.request)
+        print(response.json())
+
     def delete(self, job_id):
-        log.info(f"delete job {job_id}")
+        # log.info(f"delete job {job_id}")
         response = self._client.post(f"/openapi/v1/job/del/{job_id}")
-
         pprint(response.request)
-
         print(response.json())
 
     def terminate(self, job_id):
-        log.info(f"terminate job {job_id}")
+        # log.info(f"terminate job {job_id}")
         response = self._client.post(f"/openapi/v1/job/terminate/{job_id}")
-
         pprint(response.request)
-
         print(response.json())
 
     def kill(self, job_id):
-        log.info(f"kill job {job_id}")
+        # log.info(f"kill job {job_id}")
         response = self._client.post(f"/openapi/v1/job/kill/{job_id}")
-
         pprint(response.request)
-
         print(response.json())
 
     def log(self, job_id, log_file="STDOUTERR", page=-1, page_size=8192):
-        log.info(f"log job {job_id}")
+        # log.info(f"log job {job_id}")
         response = self._client.get(f"/openapi/v1/job/{job_id}/log", params={"logFile": log_file, "page": page, "pageSize": page_size})
-
         pprint(response.request)
-
         print(response.json())
 
     def detail(self, job_id):
-        log.info(f"detail job {job_id}")
+        # log.info(f"detail job {job_id}")
         response = self._client.get(f"/openapi/v1/job/{job_id}")
-
         pprint(response.request)
-
         print(response.json())
 
+    def create_job(
+        self,
+        project_id: int,
+        name: Optional[str] = None,
+        group_id: Optional[int] = 0,
+    ):
+        # log.info(f"create job {name}")
+        data = {
+            "projectId": project_id,
+            "name": name,
+            "bohrGroupId": group_id,
+        }
+        response = self._client.post(f"/openapi/v1/job/create", json=data)
+        pprint(response.request)
+        print(response.json())
+        return response.json().get("data")
+
     def create_job_group(self, project_id, job_group_name):
-        log.info(f"create job group {job_group_name}")
+        # log.info(f"create job group {job_group_name}")
         response = self._client.post(f"/openapi/v1/job_group/add", json={"name": job_group_name, "projectId": project_id})
-
         pprint(response.request)
-
         print(response.json())
+
+    def upload(
+        self,
+        file_path: str,
+        object_key: str,
+        token: str,
+    ):
+        tiefblue = Tiefblue()
+        tiefblue.upload_From_file_multi_part(
+            object_key=object_key,
+            file_path=file_path,
+            progress_bar=True)
+
+    def uploadr(self, work_dir, store_path, token):
+        if not work_dir.endswith('/'):
+            work_dir = work_dir + '/'
+        for root, _, files in os.walk(work_dir):
+            for file in files:
+                full_path = os.path.join(root, file)
+                object_key = full_path.replace(work_dir, store_path)
+                self.upload(full_path, object_key, token)
+
 
 class AsyncJob(AsyncAPIResource):
     pass
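For orientation, here is a minimal sketch of calling the new submit interface added above. All argument values are placeholders, and the client construction assumes the same Bohrium export as in the earlier sketch. The flow exercised is the one in the diff: create_job obtains jobId, storePath, and token via POST /openapi/v1/job/create, the local work_dir is uploaded to storePath through Tiefblue, and the job is then registered via POST /openapi/v2/job/add.

from bohrium import Bohrium   # assumed export

client = Bohrium()            # hypothetical construction; auth arguments are outside this diff

client.job.submit(
    project_id=12345,                          # placeholder project ID
    job_name="demo-job",
    machine_type="c2_m4_cpu",                  # placeholder scass_type / machine type
    cmd="python main.py",
    image_address="registry.example.com/demo-image:latest",  # placeholder image address
    work_dir="./inputs",                       # uploaded to the job's storePath before submission
    result="./results",                        # root directory for the generated <uuid>_temp.zip download_path
    log_files=[],
    out_files=[],
)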

src/bohrium/resources/tiefblue/__init__.py

Whitespace-only changes.
