Skip to content

Commit bbb6506

Browse files
committed
Add a toil backend.
1 parent 7d77a6c commit bbb6506

File tree

2 files changed

+311
-12
lines changed

2 files changed

+311
-12
lines changed

test/test_integration.py

Lines changed: 7 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,6 @@
1212

1313
class IntegrationTest(unittest.TestCase):
1414
"""A baseclass that's inherited for use with different cwl backends."""
15-
1615
def setUp(self):
1716
"""Start a (local) wes-service server to make requests against."""
1817
raise NotImplementedError
@@ -28,41 +27,39 @@ def tearDown(self):
2827
except OSError as e:
2928
print(e)
3029

30+
shutil.rmtree('workflows')
3131
unittest.TestCase.tearDown(self)
3232

3333
def test_dockstore_md5sum(self):
3434
"""Fetch the md5sum cwl from dockstore, run it on the wes-service server, and check for the correct output."""
3535
cwl_dockstore_url = 'https://dockstore.org:8443/api/ga4gh/v2/tools/quay.io%2Fbriandoconnor%2Fdockstore-tool-md5sum/versions/master/plain-CWL/descriptor/%2FDockstore.cwl'
36-
output_filepath, _ = run_md5sum(cwl_input=cwl_dockstore_url)
36+
output_filepath, _ = run_cwl_md5sum(cwl_input=cwl_dockstore_url)
3737

3838
self.assertTrue(check_for_file(output_filepath), 'Output file was not found: ' + str(output_filepath))
39-
shutil.rmtree('workflows')
4039

4140
def test_local_md5sum(self):
4241
"""Pass a local md5sum cwl to the wes-service server, and check for the correct output."""
4342
cwl_local_path = os.path.abspath('testdata/md5sum.cwl')
44-
output_filepath, _ = run_md5sum(cwl_input='file://' + cwl_local_path)
43+
output_filepath, _ = run_cwl_md5sum(cwl_input='file://' + cwl_local_path)
4544

4645
self.assertTrue(check_for_file(output_filepath), 'Output file was not found: ' + str(output_filepath))
47-
shutil.rmtree('workflows')
4846

4947
def test_multipart_upload(self):
5048
"""Pass a local md5sum cwl to the wes-service server, and check for uploaded file in service."""
5149
cwl_local_path = os.path.abspath('testdata/md5sum.cwl')
52-
_, run_id = run_md5sum(cwl_input='file://' + cwl_local_path)
50+
_, run_id = run_cwl_md5sum(cwl_input='file://' + cwl_local_path)
5351

5452
get_response = get_log_request(run_id)["request"]
5553

5654
self.assertTrue(check_for_file(get_response["workflow_url"][7:]), 'Output file was not found: '
5755
+ get_response["workflow_url"][:7])
58-
shutil.rmtree('workflows')
5956

6057

61-
def run_md5sum(cwl_input):
58+
def run_cwl_md5sum(cwl_input):
6259
"""Pass a local md5sum cwl to the wes-service server, and return the path of the output file that was created."""
6360
endpoint = 'http://localhost:8080/ga4gh/wes/v1/workflows'
6461
params = {'output_file': {'path': '/tmp/md5sum.txt', 'class': 'File'},
65-
'input_file': {'path': '../../testdata/md5sum.input', 'class': 'File'}}
62+
'input_file': {'path': os.path.abspath('testdata/md5sum.input'), 'class': 'File'}}
6663

6764
parts = [("workflow_params", json.dumps(params)), ("workflow_type", "CWL"), ("workflow_type_version", "v1.0")]
6865
if cwl_input.startswith("file://"):
@@ -117,14 +114,12 @@ def setUp(self):
117114

118115
class ToilTest(IntegrationTest):
119116
"""Test using Toil."""
120-
121117
def setUp(self):
122118
"""
123119
Start a (local) wes-service server to make requests against.
124120
Use toil as the wes-service server 'backend'.
125121
"""
126-
self.wes_server_process = subprocess.Popen('python {} '
127-
'--opt runner=cwltoil --opt extra=--logLevel=CRITICAL'
122+
self.wes_server_process = subprocess.Popen('python {} --backend=wes_service.toil_wes --opt="extra=--logLevel=CRITICAL"'
128123
''.format(os.path.abspath('wes_service/wes_service_main.py')),
129124
shell=True)
130125
time.sleep(5)

wes_service/toil_wes.py

Lines changed: 304 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,304 @@
1+
from __future__ import print_function
2+
import json
3+
import os
4+
import subprocess
5+
import tempfile
6+
import time
7+
import logging
8+
import urllib
9+
import uuid
10+
11+
import connexion
12+
from multiprocessing import Process
13+
from werkzeug.utils import secure_filename
14+
from wes_service.util import WESBackend
15+
16+
# Configure the root logger when this module is imported so that the
# INFO-level messages emitted below (e.g. the command being launched) are shown.
logging.basicConfig(level=logging.INFO)
17+
18+
19+
class ToilWorkflow(object):
    """Filesystem-backed handle on a single Toil workflow run.

    All state for a run lives in ``<cwd>/workflows/<workflow_id>/`` as small
    files (``pid``, ``exit_code``, ``starttime``, ``request.json`` ...), so a
    fresh instance created with the same ``workflow_id`` can report on a run
    that was started by another instance.
    """

    def __init__(self, workflow_id):
        """Compute the paths used by the run and create its directories.

        :param str workflow_id: Identifier of the run; used as directory name.
        """
        super(ToilWorkflow, self).__init__()
        self.workflow_id = workflow_id

        self.workdir = os.path.join(os.getcwd(), 'workflows', self.workflow_id)
        self.outdir = os.path.join(self.workdir, 'outdir')
        # Creating outdir also creates workdir (its parent).
        if not os.path.exists(self.outdir):
            os.makedirs(self.outdir)

        self.outfile = os.path.join(self.workdir, 'stdout')
        self.errfile = os.path.join(self.workdir, 'stderr')
        self.starttime = os.path.join(self.workdir, 'starttime')
        self.endtime = os.path.join(self.workdir, 'endtime')
        self.pidfile = os.path.join(self.workdir, 'pid')
        self.cmdfile = os.path.join(self.workdir, 'cmd')
        self.request_json = os.path.join(self.workdir, 'request.json')
        self.output_json = os.path.join(self.workdir, "output.json")
        self.input_wf_filename = os.path.join(self.workdir, "workflow.cwl")
        self.input_json = os.path.join(self.workdir, "input.json")

    def write_workflow(self, request, opts, wftype='cwl'):
        """Write the workflow file if needed and build the runner command line.

        If the request carries a ``workflow_descriptor`` its text is written to
        ``workflow.<wftype>`` in the run directory and that file is used as the
        workflow; otherwise ``request["workflow_url"]`` is passed through.

        :param dict request: The run request.
        :param opts: Object exposing ``getoptlist("extra")``, a list of extra
            command line arguments inserted after the program name.
        :param str wftype: One of ``'cwl'``, ``'wdl'`` or ``'py'``.
        :return: The command as a list of strings.
        :raises RuntimeError: If ``wftype`` is not a supported type.
        """
        # urllib.pathname2url only exists on Python 2; on Python 3 it lives in
        # urllib.request. Import locally so the module works on both.
        try:
            from urllib import pathname2url
        except ImportError:
            from urllib.request import pathname2url

        self.input_wf_filename = os.path.join(self.workdir, 'workflow.' + wftype)

        if request.get("workflow_descriptor"):
            workflow_descriptor = request.get('workflow_descriptor')
            with open(self.input_wf_filename, "w") as f:
                f.write(workflow_descriptor)
            workflow_url = pathname2url(self.input_wf_filename)
        else:
            workflow_url = request.get("workflow_url")

        extra = opts.getoptlist("extra")
        if wftype == 'cwl':
            command_args = ['toil-cwl-runner'] + extra + [workflow_url, self.input_json]
        elif wftype == 'wdl':
            command_args = ['toil-wdl-runner'] + extra + [workflow_url, self.input_json]
        elif wftype == 'py':
            command_args = ['python'] + extra + [workflow_url]
        else:
            raise RuntimeError('workflow_type is not "cwl", "wdl", or "py": ' + str(wftype))

        return command_args

    def write_json(self, request_dict):
        """Dump ``request_dict['workflow_params']`` to ``input.json``.

        :param dict request_dict: The run request.
        :return: Path of the file that was written.
        """
        input_json = os.path.join(self.workdir, 'input.json')
        with open(input_json, 'w') as inputtemp:
            json.dump(request_dict['workflow_params'], inputtemp)
        return input_json

    def call_cmd(self, cmd):
        """
        Calls a command with Popen.
        Writes stdout, stderr, and the command to separate files.

        The command is started with ``outdir`` as its working directory and is
        not waited for.

        :param cmd: A string or array of strings.
        :return: The pid of the command.
        """
        with open(self.cmdfile, 'w') as f:
            f.write(str(cmd))

        # Joining a plain string would interleave spaces between its letters.
        if isinstance(cmd, (list, tuple)):
            printable = ' '.join(cmd)
        else:
            printable = str(cmd)

        # The context manager closes our copies of the handles even when Popen
        # raises (e.g. the runner executable is missing); the child keeps its own.
        with open(self.outfile, 'w') as stdout, open(self.errfile, 'w') as stderr:
            logging.info('Calling: ' + printable)
            process = subprocess.Popen(cmd,
                                       stdout=stdout,
                                       stderr=stderr,
                                       close_fds=True,
                                       cwd=self.outdir)
        return process.pid

    def cancel(self):
        """Placeholder; cancelling a run is not implemented here."""
        pass

    def fetch(self, filename):
        """Return the text content of ``filename``, or ``''`` if it is missing."""
        if os.path.exists(filename):
            with open(filename, "r") as f:
                return f.read()
        return ''

    def getlog(self):
        """Assemble the log document for this run from its state files.

        :return: dict with ``workflow_id``, ``request``, ``state``,
            ``workflow_log``, ``task_logs`` and ``outputs``.
        :raises IOError: If ``request.json`` does not exist for this id.
        """
        state, exit_code = self.getstate()

        with open(self.request_json, "r") as f:
            request = json.load(f)

        stderr = self.fetch(self.errfile)
        starttime = self.fetch(self.starttime)
        endtime = self.fetch(self.endtime)
        cmd = self.fetch(self.cmdfile)

        outputobj = {}
        # Nothing in this class writes output.json, so a finished run may not
        # have one; report empty outputs instead of failing the whole request.
        if state == "COMPLETE" and os.path.exists(self.output_json):
            with open(self.output_json, "r") as outputtemp:
                outputobj = json.load(outputtemp)

        return {
            "workflow_id": self.workflow_id,
            "request": request,
            "state": state,
            "workflow_log": {
                "cmd": cmd,  # array?
                "start_time": starttime,
                "end_time": endtime,
                "stdout": "",
                "stderr": stderr,
                "exit_code": exit_code
            },
            "task_logs": [],
            "outputs": outputobj
        }

    def run(self, request, opts):
        """
        Constructs a command to run a workflow from request and opts,
        launches it with ``outdir`` as its working directory, and records
        bookkeeping files for the run.

        Workflow (url):
            request["workflow_url"] == a url to a workflow file
        or
            request["workflow_descriptor"] == workflow text (written to a file and a url constructed for that file)

        JSON File:
            request["workflow_params"] == input json text (to be written to a file)

        :param dict request: A dictionary containing the workflow/json information.
        :param wes_service.util.WESBackend opts: contains the user's arguments;
                                                 only the "extra" option list is read.
        :return: {"workflow_id": self.workflow_id, "state": state}
        :raises RuntimeError: If the workflow type/version pair is unsupported.
        """
        wftype = request['workflow_type'].lower().strip()
        version = request['workflow_type_version']

        if version != 'v1.0' and wftype in ('cwl', 'wdl'):
            raise RuntimeError('workflow_type "cwl", "wdl" requires '
                               '"workflow_type_version" to be "v1.0": ' + str(version))
        if version != '2.7' and wftype == 'py':
            raise RuntimeError('workflow_type "py" requires '
                               '"workflow_type_version" to be "2.7": ' + str(version))

        logging.info('Beginning Toil Workflow ID: ' + str(self.workflow_id))

        with open(self.starttime, 'w') as f:
            f.write(str(time.time()))
        with open(self.request_json, 'w') as f:
            json.dump(request, f)
        # Same file as self.input_json; reuse the helper instead of duplicating it.
        self.write_json(request)

        command_args = self.write_workflow(request, opts, wftype=wftype)
        pid = self.call_cmd(command_args)

        # NOTE(review): call_cmd does not wait for the runner, so this records
        # the launch time rather than the completion time -- confirm intent.
        with open(self.endtime, 'w') as f:
            f.write(str(time.time()))
        with open(self.pidfile, 'w') as f:
            f.write(str(pid))

        return self.getstatus()

    def getstate(self):
        """
        Returns RUNNING, -1
                COMPLETE, 0
                or
                EXECUTOR_ERROR, <non-zero exit code>

        A runner that was terminated by a signal, or whose pid cannot be
        waited on, is reported as EXECUTOR_ERROR, 255.
        """
        state = "RUNNING"
        exit_code = -1

        exitcode_file = os.path.join(self.workdir, "exit_code")

        if os.path.exists(exitcode_file):
            # A previous call already reaped the process and cached the result.
            with open(exitcode_file) as f:
                exit_code = int(f.read())
        elif os.path.exists(self.pidfile):
            with open(self.pidfile, "r") as f:
                pid = int(f.read())
            try:
                # NOTE(review): waitpid only works on children of the calling
                # process; RunWorkflow launches run() in a separate Process, so
                # status queries from the server likely land in the except
                # branch below -- verify.
                (_pid, exit_status) = os.waitpid(pid, os.WNOHANG)
                if _pid != 0:
                    # Decode the wait status properly: "status >> 8" is 0 for a
                    # process killed by a signal, which would look like success.
                    if os.WIFEXITED(exit_status):
                        exit_code = os.WEXITSTATUS(exit_status)
                    else:
                        exit_code = 255
                    with open(exitcode_file, "w") as f:
                        f.write(str(exit_code))
                    os.unlink(self.pidfile)
            except OSError:
                os.unlink(self.pidfile)
                exit_code = 255

        if exit_code == 0:
            state = "COMPLETE"
        elif exit_code != -1:
            state = "EXECUTOR_ERROR"

        return state, exit_code

    def getstatus(self):
        """Return ``{"workflow_id": ..., "state": ...}`` for this run."""
        state, exit_code = self.getstate()

        return {
            "workflow_id": self.workflow_id,
            "state": state
        }
227+
228+
229+
class ToilBackend(WESBackend):
    """WES backend whose runs are executed through ``ToilWorkflow``."""

    # Maps workflow_id -> multiprocessing.Process that launched the run.
    # Class-level, so it is shared by every instance of the backend.
    processes = {}

    def GetServiceInfo(self):
        """Return the static service-info document for this backend."""
        return {
            'workflow_type_versions': {
                'CWL': {'workflow_type_version': ['v1.0']},
                'WDL': {'workflow_type_version': ['v1.0']},
                'py': {'workflow_type_version': ['2.7']}
            },
            'supported_wes_versions': '0.3.0',
            'supported_filesystem_protocols': ['file', 'http', 'https'],
            'engine_versions': ['3.16.0'],
            'system_state_counts': {},
            'key_values': {}
        }

    def ListWorkflows(self):
        """List every run directory under ``<cwd>/workflows`` with its state.

        :return: {"workflows": [{"workflow_id", "state"}, ...], "next_page_token": ""}
        """
        # FIXME #15 results don't page
        root = os.path.join(os.getcwd(), "workflows")
        workflows = []
        # The directory only exists once a run has been created; a fresh
        # server must report an empty list rather than raise from listdir.
        if os.path.isdir(root):
            for name in os.listdir(root):
                if os.path.isdir(os.path.join(root, name)):
                    workflows.append({"workflow_id": name,
                                      "state": ToilWorkflow(name).getstate()[0]})
        return {
            "workflows": workflows,
            "next_page_token": ""
        }

    def RunWorkflow(self):
        """Start a run from the multipart form of the current request.

        Uploaded ``workflow_descriptor`` parts are saved into a fresh temporary
        directory; ``workflow_params``, ``tags`` and
        ``workflow_engine_parameters`` are parsed as JSON; every other part is
        stored as read.

        :return: {"workflow_id": <new id>}, or None if the request is not
            CWL v1.0.
        """
        tempdir = tempfile.mkdtemp()
        body = {}
        for k, ls in connexion.request.files.iterlists():
            for v in ls:
                if k == "workflow_descriptor":
                    filename = secure_filename(v.filename)
                    v.save(os.path.join(tempdir, filename))
                elif k in ("workflow_params", "tags", "workflow_engine_parameters"):
                    body[k] = json.loads(v.read())
                else:
                    body[k] = v.read()

        # NOTE(review): GetServiceInfo also advertises WDL and py, and
        # ToilWorkflow.run accepts them, but only CWL v1.0 gets past this
        # gate -- confirm which is intended.
        if body['workflow_type'] != "CWL" or \
                body['workflow_type_version'] != "v1.0":
            logging.warning('Rejecting unsupported workflow type/version: %s %s',
                            body['workflow_type'], body['workflow_type_version'])
            return

        # Treat workflow_url as a file name inside tempdir first ...
        body["workflow_url"] = "file:///%s/%s" % (tempdir, body["workflow_url"])
        # ... but if the client actually sent an http(s) URL, strip the prefix
        # that was just prepended and use the remote URL as given.
        index = body["workflow_url"].find("http")
        if index > 0:
            body["workflow_url"] = body["workflow_url"][index:]

        workflow_id = uuid.uuid4().hex
        job = ToilWorkflow(workflow_id)
        p = Process(target=job.run, args=(body, self))
        p.start()
        self.processes[workflow_id] = p
        return {"workflow_id": workflow_id}

    def GetWorkflowLog(self, workflow_id):
        """Return the log document of the run identified by ``workflow_id``."""
        job = ToilWorkflow(workflow_id)
        return job.getlog()

    def CancelJob(self, workflow_id):
        """Terminate the launcher process recorded for ``workflow_id``, if any.

        :return: {"workflow_id": workflow_id}
        """
        # should this block with `p.is_alive()`?
        if workflow_id in self.processes:
            self.processes[workflow_id].terminate()
        return {'workflow_id': workflow_id}

    def GetWorkflowStatus(self, workflow_id):
        """Return the status document of the run identified by ``workflow_id``."""
        job = ToilWorkflow(workflow_id)
        return job.getstatus()
301+
302+
303+
def create_backend(app, opts):
    """Build the Toil backend for the service.

    :param app: Accepted as part of the factory signature; not used here.
    :param opts: Passed unchanged to the ``ToilBackend`` constructor.
    :return: A new ``ToilBackend`` instance.
    """
    backend = ToilBackend(opts)
    return backend

0 commit comments

Comments
 (0)