Skip to content

Commit 6e960bf

Browse files
authored
Merge pull request #44 from common-workflow-language/wes-update
Update to latest WES (wip)
2 parents 7d77a6c + dd5d6c8 commit 6e960bf

File tree

7 files changed: +182 additions, -169 deletions

setup.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@
1111
long_description = readmeFile.read()
1212

1313
setup(name='wes-service',
14-
version='2.4',
14+
version='2.5',
1515
description='GA4GH Workflow Execution Service reference implementation',
1616
long_description=long_description,
1717
author='GA4GH Containers and Workflows task team',

test/test_integration.py

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -60,7 +60,7 @@ def test_multipart_upload(self):
6060

6161
def run_md5sum(cwl_input):
6262
"""Pass a local md5sum cwl to the wes-service server, and return the path of the output file that was created."""
63-
endpoint = 'http://localhost:8080/ga4gh/wes/v1/workflows'
63+
endpoint = 'http://localhost:8080/ga4gh/wes/v1/runs'
6464
params = {'output_file': {'path': '/tmp/md5sum.txt', 'class': 'File'},
6565
'input_file': {'path': '../../testdata/md5sum.input', 'class': 'File'}}
6666

@@ -71,12 +71,12 @@ def run_md5sum(cwl_input):
7171
else:
7272
parts.append(("workflow_url", cwl_input))
7373
response = requests.post(endpoint, files=parts).json()
74-
output_dir = os.path.abspath(os.path.join('workflows', response['workflow_id'], 'outdir'))
75-
return os.path.join(output_dir, 'md5sum.txt'), response['workflow_id']
74+
output_dir = os.path.abspath(os.path.join('workflows', response['run_id'], 'outdir'))
75+
return os.path.join(output_dir, 'md5sum.txt'), response['run_id']
7676

7777

7878
def get_log_request(run_id):
79-
endpoint = 'http://localhost:8080/ga4gh/wes/v1/workflows/{}'.format(run_id)
79+
endpoint = 'http://localhost:8080/ga4gh/wes/v1/runs/{}'.format(run_id)
8080
return requests.get(endpoint).json()
8181

8282

wes_client/wes_client_main.py

Lines changed: 14 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -62,17 +62,17 @@ def main(argv=sys.argv[1:]):
6262
http_client=http_client, config={"use_models": False})
6363

6464
if args.list:
65-
response = client.WorkflowExecutionService.ListWorkflows(page_token=args.page, page_size=args.page_size)
65+
response = client.WorkflowExecutionService.ListRuns(page_token=args.page, page_size=args.page_size)
6666
json.dump(response.result(), sys.stdout, indent=4)
6767
return 0
6868

6969
if args.log:
70-
response = client.WorkflowExecutionService.GetWorkflowLog(workflow_id=args.log)
70+
response = client.WorkflowExecutionService.GetRunLog(workflow_id=args.log)
7171
sys.stdout.write(response.result()["workflow_log"]["stderr"])
7272
return 0
7373

7474
if args.get:
75-
response = client.WorkflowExecutionService.GetWorkflowLog(workflow_id=args.get)
75+
response = client.WorkflowExecutionService.GetRunLog(workflow_id=args.get)
7676
json.dump(response.result(), sys.stdout, indent=4)
7777
return 0
7878

@@ -81,6 +81,10 @@ def main(argv=sys.argv[1:]):
8181
json.dump(response.result(), sys.stdout, indent=4)
8282
return 0
8383

84+
if not args.job_order:
85+
logging.error("Missing job order")
86+
return 1
87+
8488
loader = schema_salad.ref_resolver.Loader({
8589
"location": {"@type": "@id"},
8690
"path": {"@type": "@id"}
@@ -102,7 +106,7 @@ def fixpaths(d):
102106
visit(input_dict, fixpaths)
103107

104108
workflow_url = args.workflow_url
105-
if not workflow_url.startswith("/") and ":" not in workflow_url:
109+
if ":" not in workflow_url:
106110
workflow_url = "file://" + os.path.abspath(workflow_url)
107111

108112
if args.quiet:
@@ -131,7 +135,7 @@ def fixpaths(d):
131135
else:
132136
parts.append(("workflow_url", workflow_url))
133137

134-
postresult = http_client.session.post("%s://%s/ga4gh/wes/v1/workflows" % (args.proto, args.host),
138+
postresult = http_client.session.post("%s://%s/ga4gh/wes/v1/runs" % (args.proto, args.host),
135139
files=parts,
136140
headers={"Authorization": args.auth})
137141

@@ -142,19 +146,19 @@ def fixpaths(d):
142146
exit(1)
143147

144148
if args.wait:
145-
logging.info("Workflow id is %s", r["workflow_id"])
149+
logging.info("Workflow run id is %s", r["run_id"])
146150
else:
147-
sys.stdout.write(r["workflow_id"] + "\n")
151+
sys.stdout.write(r["run_id"] + "\n")
148152
exit(0)
149153

150-
r = client.WorkflowExecutionService.GetWorkflowStatus(workflow_id=r["workflow_id"]).result()
154+
r = client.WorkflowExecutionService.GetRunStatus(run_id=r["run_id"]).result()
151155
while r["state"] in ("QUEUED", "INITIALIZING", "RUNNING"):
152156
time.sleep(8)
153-
r = client.WorkflowExecutionService.GetWorkflowStatus(workflow_id=r["workflow_id"]).result()
157+
r = client.WorkflowExecutionService.GetRunStatus(run_id=r["run_id"]).result()
154158

155159
logging.info("State is %s", r["state"])
156160

157-
s = client.WorkflowExecutionService.GetWorkflowLog(workflow_id=r["workflow_id"]).result()
161+
s = client.WorkflowExecutionService.GetRunLog(run_id=r["run_id"]).result()
158162

159163
try:
160164
# TODO: Only works with Arvados atm

wes_service/arvados_wes.py

Lines changed: 21 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,6 @@
1313
import shutil
1414

1515
from wes_service.util import visit, WESBackend
16-
from werkzeug.utils import secure_filename
1716

1817

1918
class MissingAuthorization(Exception):
@@ -81,7 +80,7 @@ def GetServiceInfo(self):
8180
}
8281

8382
@catch_exceptions
84-
def ListWorkflows(self, page_size=None, page_token=None, tag_search=None, state_search=None):
83+
def ListRuns(self, page_size=None, page_token=None, state_search=None):
8584
api = get_api()
8685

8786
paging = []
@@ -100,13 +99,13 @@ def ListWorkflows(self, page_size=None, page_token=None, tag_search=None, state_
10099

101100
uuidmap = {c["uuid"]: statemap[c["state"]] for c in containers}
102101

103-
workflow_list = [{"workflow_id": cr["uuid"],
102+
workflow_list = [{"run_id": cr["uuid"],
104103
"state": uuidmap.get(cr["container_uuid"])}
105104
for cr in requests
106105
if cr["command"] and cr["command"][0] == "arvados-cwl-runner"]
107106
return {
108107
"workflows": workflow_list,
109-
"next_page_token": workflow_list[-1]["workflow_id"] if workflow_list else ""
108+
"next_page_token": workflow_list[-1]["run_id"] if workflow_list else ""
110109
}
111110

112111
def invoke_cwl_runner(self, cr_uuid, workflow_url, workflow_params,
@@ -156,23 +155,10 @@ def invoke_cwl_runner(self, cr_uuid, workflow_url, workflow_params,
156155
workflow_descriptor_file.close()
157156

158157
@catch_exceptions
159-
def RunWorkflow(self, workflow_params, workflow_type, workflow_type_version,
160-
workflow_url, workflow_descriptor, workflow_engine_parameters=None, tags=None):
161-
tempdir = tempfile.mkdtemp()
162-
body = {}
163-
for k, ls in connexion.request.files.iterlists():
164-
for v in ls:
165-
if k == "workflow_descriptor":
166-
filename = secure_filename(v.filename)
167-
v.save(os.path.join(tempdir, filename))
168-
elif k in ("workflow_params", "tags", "workflow_engine_parameters"):
169-
body[k] = json.loads(v.read())
170-
else:
171-
body[k] = v.read()
172-
body["workflow_url"] = "file:///%s/%s" % (tempdir, body["workflow_url"])
173-
174-
if body["workflow_type"] != "CWL" or body["workflow_type_version"] != "v1.0": # NOQA
175-
return
158+
def RunWorkflow(self, **args):
159+
tempdir, body = self.collect_attachments()
160+
161+
print(body)
176162

177163
if not connexion.request.headers.get('Authorization'):
178164
raise MissingAuthorization()
@@ -215,13 +201,13 @@ def RunWorkflow(self, workflow_params, workflow_type, workflow_type_version,
215201
project_uuid,
216202
tempdir)).start()
217203

218-
return {"workflow_id": cr["uuid"]}
204+
return {"run_id": cr["uuid"]}
219205

220206
@catch_exceptions
221-
def GetWorkflowLog(self, workflow_id):
207+
def GetRunLog(self, run_id):
222208
api = get_api()
223209

224-
request = api.container_requests().get(uuid=workflow_id).execute()
210+
request = api.container_requests().get(uuid=run_id).execute()
225211
if request["container_uuid"]:
226212
container = api.containers().get(uuid=request["container_uuid"]).execute() # NOQA
227213
task_reqs = arvados.util.list_all(api.container_requests().list, filters=[["requesting_container_uuid", "=", container["uuid"]]])
@@ -273,7 +259,7 @@ def log_object(cr):
273259
return r
274260

275261
r = {
276-
"workflow_id": request["uuid"],
262+
"run_id": request["uuid"],
277263
"request": {
278264
"workflow_url": "",
279265
"workflow_params": request["mounts"].get("/var/lib/cwl/cwl.input.json", {}).get("content", {})
@@ -287,30 +273,30 @@ def log_object(cr):
287273
return r
288274

289275
@catch_exceptions
290-
def CancelJob(self, workflow_id): # NOQA
276+
def CancelRun(self, run_id): # NOQA
291277
api = get_api()
292-
request = api.container_requests().update(uuid=workflow_id, body={"priority": 0}).execute() # NOQA
293-
return {"workflow_id": request["uuid"]}
278+
request = api.container_requests().update(uuid=run_id, body={"priority": 0}).execute() # NOQA
279+
return {"run_id": request["uuid"]}
294280

295281
@catch_exceptions
296-
def GetWorkflowStatus(self, workflow_id):
282+
def GetRunStatus(self, run_id):
297283
api = get_api()
298-
request = api.container_requests().get(uuid=workflow_id).execute()
284+
request = api.container_requests().get(uuid=run_id).execute()
299285
if request["container_uuid"]:
300286
container = api.containers().get(uuid=request["container_uuid"]).execute() # NOQA
301287
elif request["priority"] == 0:
302288
container = {"state": "Cancelled"}
303289
else:
304290
container = {"state": "Queued"}
305-
return {"workflow_id": request["uuid"],
291+
return {"run_id": request["uuid"],
306292
"state": statemap[container["state"]]}
307293

308294

309-
def dynamic_logs(workflow_id, logstream):
295+
def dynamic_logs(run_id, logstream):
310296
api = get_api()
311-
cr = api.container_requests().get(uuid=workflow_id).execute()
297+
cr = api.container_requests().get(uuid=run_id).execute()
312298
l1 = [t["properties"]["text"]
313-
for t in api.logs().list(filters=[["object_uuid", "=", workflow_id],
299+
for t in api.logs().list(filters=[["object_uuid", "=", run_id],
314300
["event_type", "=", logstream]],
315301
order="created_at desc",
316302
limit=100).execute()["items"]]
@@ -327,5 +313,5 @@ def dynamic_logs(workflow_id, logstream):
327313

328314
def create_backend(app, opts):
329315
ab = ArvadosBackend(opts)
330-
app.app.route('/ga4gh/wes/v1/workflows/<workflow_id>/x-dynamic-logs/<logstream>')(dynamic_logs)
316+
app.app.route('/ga4gh/wes/v1/runs/<run_id>/x-dynamic-logs/<logstream>')(dynamic_logs)
331317
return ab

wes_service/cwl_runner.py

Lines changed: 21 additions & 44 deletions
Original file line numberDiff line numberDiff line change
@@ -2,21 +2,17 @@
22
import json
33
import os
44
import subprocess
5-
import tempfile
65
import urllib
76
import uuid
87

9-
import connexion
10-
from werkzeug.utils import secure_filename
11-
128
from wes_service.util import WESBackend
139

1410

1511
class Workflow(object):
16-
def __init__(self, workflow_id):
12+
def __init__(self, run_id):
1713
super(Workflow, self).__init__()
18-
self.workflow_id = workflow_id
19-
self.workdir = os.path.join(os.getcwd(), "workflows", self.workflow_id)
14+
self.run_id = run_id
15+
self.workdir = os.path.join(os.getcwd(), "workflows", self.run_id)
2016

2117
def run(self, request, opts):
2218
"""
@@ -37,7 +33,7 @@ def run(self, request, opts):
3733
:param dict request: A dictionary containing the cwl/json information.
3834
:param wes_service.util.WESBackend opts: contains the user's arguments;
3935
specifically the runner and runner options
40-
:return: {"workflow_id": self.workflow_id, "state": state}
36+
:return: {"run_id": self.run_id, "state": state}
4137
"""
4238
os.makedirs(self.workdir)
4339
outdir = os.path.join(self.workdir, "outdir")
@@ -117,7 +113,7 @@ def getstatus(self):
117113
state, exit_code = self.getstate()
118114

119115
return {
120-
"workflow_id": self.workflow_id,
116+
"run_id": self.run_id,
121117
"state": state
122118
}
123119

@@ -137,7 +133,7 @@ def getlog(self):
137133
outputobj = json.load(outputtemp)
138134

139135
return {
140-
"workflow_id": self.workflow_id,
136+
"run_id": self.run_id,
141137
"request": request,
142138
"state": state,
143139
"workflow_log": {
@@ -169,58 +165,39 @@ def GetServiceInfo(self):
169165
"key_values": {}
170166
}
171167

172-
def ListWorkflows(self):
168+
def ListRuns(self, page_size=None, page_token=None, state_search=None):
173169
# FIXME #15 results don't page
174170
wf = []
175171
for l in os.listdir(os.path.join(os.getcwd(), "workflows")):
176172
if os.path.isdir(os.path.join(os.getcwd(), "workflows", l)):
177173
wf.append(Workflow(l))
178174

179-
workflows = [{"workflow_id": w.workflow_id, "state": w.getstate()[0]} for w in wf] # NOQA
175+
workflows = [{"run_id": w.run_id, "state": w.getstate()[0]} for w in wf] # NOQA
180176
return {
181177
"workflows": workflows,
182178
"next_page_token": ""
183179
}
184180

185-
def RunWorkflow(self):
186-
tempdir = tempfile.mkdtemp()
187-
body = {}
188-
for k, ls in connexion.request.files.iterlists():
189-
for v in ls:
190-
if k == "workflow_descriptor":
191-
filename = secure_filename(v.filename)
192-
v.save(os.path.join(tempdir, filename))
193-
elif k in ("workflow_params", "tags", "workflow_engine_parameters"):
194-
body[k] = json.loads(v.read())
195-
else:
196-
body[k] = v.read()
197-
198-
if body['workflow_type'] != "CWL" or \
199-
body['workflow_type_version'] != "v1.0":
200-
return
201-
202-
body["workflow_url"] = "file:///%s/%s" % (tempdir, body["workflow_url"])
203-
index = body["workflow_url"].find("http")
204-
if index > 0:
205-
body["workflow_url"] = body["workflow_url"][index:]
206-
207-
workflow_id = uuid.uuid4().hex
208-
job = Workflow(workflow_id)
181+
def RunWorkflow(self, **args):
182+
tempdir, body = self.collect_attachments()
183+
184+
run_id = uuid.uuid4().hex
185+
job = Workflow(run_id)
209186

210187
job.run(body, self)
211-
return {"workflow_id": workflow_id}
188+
return {"run_id": run_id}
212189

213-
def GetWorkflowLog(self, workflow_id):
214-
job = Workflow(workflow_id)
190+
def GetRunLog(self, run_id):
191+
job = Workflow(run_id)
215192
return job.getlog()
216193

217-
def CancelJob(self, workflow_id):
218-
job = Workflow(workflow_id)
194+
def CancelRun(self, run_id):
195+
job = Workflow(run_id)
219196
job.cancel()
220-
return {"workflow_id": workflow_id}
197+
return {"run_id": run_id}
221198

222-
def GetWorkflowStatus(self, workflow_id):
223-
job = Workflow(workflow_id)
199+
def GetRunStatus(self, run_id):
200+
job = Workflow(run_id)
224201
return job.getstatus()
225202

226203

0 commit comments

Comments
 (0)