Skip to content

Commit 3cc066d

Browse files
author
Peter Amstutz
committed
Create a placeholder container request and run arvados-cwl-runner in a
separate thread.
1 parent 57ccf82 commit 3cc066d

File tree

1 file changed

+44
-9
lines changed

1 file changed

+44
-9
lines changed

wes_service/arvados_wes.py

Lines changed: 44 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,9 @@
88
import subprocess
99
import tempfile
1010
import functools
11+
import threading
12+
import logging
13+
1114
from wes_service.util import visit, WESBackend
1215

1316

@@ -36,6 +39,7 @@ def catch_exceptions_wrapper(self, *args, **kwargs):
3639
try:
3740
return orig_func(self, *args, **kwargs)
3841
except arvados.errors.ApiError as e:
42+
logging.exception("Failure")
3943
return {"msg": e._get_reason(), "status_code": e.resp.status}, int(e.resp.status)
4044
except subprocess.CalledProcessError as e:
4145
return {"msg": str(e), "status_code": 500}, 500
@@ -77,6 +81,22 @@ def ListWorkflows(self):
7781
"next_page_token": ""
7882
}
7983

84+
def invoke_cwl_runner(self, cr_uuid, workflow_url, workflow_params, env):
85+
try:
86+
with tempfile.NamedTemporaryFile() as inputtemp:
87+
json.dump(workflow_params, inputtemp)
88+
inputtemp.flush()
89+
workflow_id = subprocess.check_output(["arvados-cwl-runner", "--submit-request-uuid="+cr_uuid, # NOQA
90+
"--submit", "--no-wait", "--api=containers", # NOQA
91+
workflow_url, inputtemp.name], env=env).strip() # NOQA
92+
except subprocess.CalledProcessError as e:
93+
api = arvados.api_from_config(version="v1", apiconfig={
94+
"ARVADOS_API_HOST": env["ARVADOS_API_HOST"],
95+
"ARVADOS_API_TOKEN": env['ARVADOS_API_TOKEN'],
96+
"ARVADOS_API_HOST_INSECURE": env["ARVADOS_API_HOST_INSECURE"] # NOQA
97+
})
98+
request = api.container_requests().update(uuid=cr_uuid, body={"priority": 0}).execute() # NOQA
99+
80100
@catch_exceptions
81101
def RunWorkflow(self, body):
82102
if body["workflow_type"] != "CWL" or body["workflow_type_version"] != "v1.0": # NOQA
@@ -88,19 +108,29 @@ def RunWorkflow(self, body):
88108
"ARVADOS_API_TOKEN": connexion.request.headers['Authorization'],
89109
"ARVADOS_API_HOST_INSECURE": os.environ.get("ARVADOS_API_HOST_INSECURE", "false") # NOQA
90110
}
91-
with tempfile.NamedTemporaryFile() as inputtemp:
92-
json.dump(body["workflow_params"], inputtemp)
93-
inputtemp.flush()
94-
workflow_id = subprocess.check_output(["arvados-cwl-runner", "--submit", "--no-wait", "--api=containers", # NOQA
95-
body.get("workflow_url"), inputtemp.name], env=env).strip() # NOQA
96-
return {"workflow_id": workflow_id}
111+
112+
api = get_api()
113+
114+
cr = api.container_requests().create(body={"container_request":
115+
{"command": [""],
116+
"container_image": "n/a",
117+
"state": "Uncommitted",
118+
"output_path": "n/a",
119+
"priority": 500}}).execute()
120+
121+
threading.Thread(target=self.invoke_cwl_runner, args=(cr["uuid"], body.get("workflow_url"), body["workflow_params"], env)).start()
122+
123+
return {"workflow_id": cr["uuid"]}
97124

98125
@catch_exceptions
99126
def GetWorkflowLog(self, workflow_id):
100127
api = get_api()
101128

102129
request = api.container_requests().get(uuid=workflow_id).execute()
103-
container = api.containers().get(uuid=request["container_uuid"]).execute() # NOQA
130+
if request["container_uuid"]:
131+
container = api.containers().get(uuid=request["container_uuid"]).execute() # NOQA
132+
else:
133+
container = {"state": "Queued", "exit_code": None}
104134

105135
outputobj = {}
106136
if request["output_uuid"]:
@@ -142,14 +172,19 @@ def keepref(d):
142172
@catch_exceptions
143173
def CancelJob(self, workflow_id): # NOQA
144174
api = get_api()
145-
request = api.container_requests().update(body={"priority": 0}).execute() # NOQA
175+
request = api.container_requests().update(uuid=workflow_id, body={"priority": 0}).execute() # NOQA
146176
return {"workflow_id": request["uuid"]}
147177

148178
@catch_exceptions
149179
def GetWorkflowStatus(self, workflow_id):
150180
api = get_api()
151181
request = api.container_requests().get(uuid=workflow_id).execute()
152-
container = api.containers().get(uuid=request["container_uuid"]).execute() # NOQA
182+
if request["container_uuid"]:
183+
container = api.containers().get(uuid=request["container_uuid"]).execute() # NOQA
184+
elif request["priority"] == 0:
185+
container = {"state": "Cancelled"}
186+
else:
187+
container = {"state": "Queued"}
153188
return {"workflow_id": request["uuid"],
154189
"state": statemap[container["state"]]}
155190

0 commit comments

Comments
 (0)