Skip to content

Commit ac92077

Browse files
committed
Symlink attachments. Add util to compose post.
1 parent 2982869 commit ac92077

File tree

7 files changed

+117
-63
lines changed

7 files changed

+117
-63
lines changed

test/test_integration.py

Lines changed: 38 additions & 48 deletions
Original file line numberDiff line numberDiff line change
@@ -10,11 +10,21 @@
1010
import shutil
1111
import logging
1212

13+
from wes_client.util import build_wes_request
14+
1315
logging.basicConfig(level=logging.INFO)
1416

1517

1618
class IntegrationTest(unittest.TestCase):
1719
"""A baseclass that's inherited for use with different cwl backends."""
20+
@classmethod
21+
def setUpClass(cls):
22+
23+
cls.cwl_dockstore_url = 'https://dockstore.org:8443/api/ga4gh/v2/tools/quay.io%2Fbriandoconnor%2Fdockstore-tool-md5sum/versions/master/plain-CWL/descriptor/%2FDockstore.cwl'
24+
cls.cwl_local_path = os.path.abspath('testdata/md5sum.cwl')
25+
cls.json_input = "file://" + os.path.abspath('testdata/md5sum.json')
26+
cls.attachments = ['file://' + os.path.abspath('testdata/md5sum.input'),
27+
'file://' + os.path.abspath('testdata/dockstore-tool-md5sum.cwl')]
1828

1929
def setUp(self):
2030
"""Start a (local) wes-service server to make requests against."""
@@ -35,72 +45,54 @@ def tearDown(self):
3545
unittest.TestCase.tearDown(self)
3646

3747
def test_dockstore_md5sum(self):
38-
"""Fetch the md5sum cwl from dockstore, run it on the wes-service server, and check for the correct output."""
39-
cwl_dockstore_url = 'https://dockstore.org:8443/api/ga4gh/v2/tools/quay.io%2Fbriandoconnor%2Fdockstore-tool-md5sum/versions/master/plain-CWL/descriptor/%2FDockstore.cwl'
40-
output_filepath, _ = run_cwl_md5sum(cwl_input=cwl_dockstore_url)
41-
42-
self.assertTrue(check_for_file(output_filepath), 'Output file was not found: ' + str(output_filepath))
48+
"""HTTP md5sum cwl (dockstore), run it on the wes-service server, and check for the correct output."""
49+
outfile_path, _ = run_cwl_md5sum(cwl_input=self.cwl_dockstore_url,
50+
json_input=self.json_input,
51+
workflow_attachment=self.attachments)
52+
self.assertTrue(check_for_file(outfile_path), 'Output file was not found: ' + str(outfile_path))
4353

4454
def test_local_md5sum(self):
45-
"""Pass a local md5sum cwl to the wes-service server, and check for the correct output."""
46-
cwl_local_path = os.path.abspath('testdata/md5sum.cwl')
47-
workflow_attachment_path = os.path.abspath('testdata/dockstore-tool-md5sum.cwl')
48-
output_filepath, _ = run_cwl_md5sum(cwl_input='file://' + cwl_local_path,
49-
workflow_attachment='file://' + workflow_attachment_path)
50-
51-
self.assertTrue(check_for_file(output_filepath), 'Output file was not found: ' + str(output_filepath))
55+
"""LOCAL md5sum cwl to the wes-service server, and check for the correct output."""
56+
outfile_path, run_id = run_cwl_md5sum(cwl_input=self.cwl_local_path,
57+
json_input=self.json_input,
58+
workflow_attachment=self.attachments)
59+
self.assertTrue(check_for_file(outfile_path), 'Output file was not found: ' + str(outfile_path))
5260

5361
def test_multipart_upload(self):
54-
"""Pass a local md5sum cwl to the wes-service server, and check for uploaded file in service."""
55-
cwl_local_path = os.path.abspath('testdata/md5sum.cwl')
56-
workflow_attachment_path = os.path.abspath('testdata/dockstore-tool-md5sum.cwl')
57-
out_file_path, run_id = run_cwl_md5sum(cwl_input='file://' + cwl_local_path,
58-
workflow_attachment='file://' + workflow_attachment_path)
59-
62+
"""LOCAL md5sum cwl to the wes-service server, and check for uploaded file in service."""
63+
outfile_path, run_id = run_cwl_md5sum(cwl_input=self.cwl_local_path,
64+
json_input=self.json_input,
65+
workflow_attachment=self.attachments)
6066
get_response = get_log_request(run_id)["request"]
61-
62-
self.assertTrue(check_for_file(out_file_path), 'Output file was not found: '
63-
+ get_response["workflow_attachment"])
64-
self.assertTrue(check_for_file(get_response["workflow_url"][7:]), 'Output file was not found: '
65-
+ get_response["workflow_url"][:7])
67+
self.assertTrue(check_for_file(outfile_path), 'Output file was not found: ' + get_response["workflow_attachment"])
68+
self.assertTrue(check_for_file(get_response["workflow_url"][7:]), 'Output file was not found: ' + get_response["workflow_url"][:7])
6669

6770
def test_run_attachments(self):
68-
"""Pass a local md5sum cwl to the wes-service server, check for attachments."""
69-
cwl_local_path = os.path.abspath('testdata/md5sum.cwl')
70-
workflow_attachment_path = os.path.abspath('testdata/dockstore-tool-md5sum.cwl')
71-
out_file_path, run_id = run_cwl_md5sum(cwl_input='file://' + cwl_local_path,
72-
workflow_attachment='file://' + workflow_attachment_path)
73-
71+
"""LOCAL md5sum cwl to the wes-service server, check for attachments."""
72+
outfile_path, run_id = run_cwl_md5sum(cwl_input=self.cwl_local_path,
73+
json_input=self.json_input,
74+
workflow_attachment=self.attachments)
7475
get_response = get_log_request(run_id)["request"]
7576
attachment_tool_path = get_response["workflow_attachment"][7:] + "/dockstore-tool-md5sum.cwl"
76-
self.assertTrue(check_for_file(out_file_path), 'Output file was not found: '
77-
+ get_response["workflow_attachment"])
78-
self.assertTrue(check_for_file(attachment_tool_path), 'Attachment file was not found: '
79-
+ get_response["workflow_attachment"])
77+
self.assertTrue(check_for_file(outfile_path), 'Output file was not found: ' + get_response["workflow_attachment"])
78+
self.assertTrue(check_for_file(attachment_tool_path), 'Attachment file was not found: ' + get_response["workflow_attachment"])
8079

8180

82-
def run_cwl_md5sum(cwl_input, workflow_attachment=None):
81+
def run_cwl_md5sum(cwl_input, json_input, workflow_attachment=None):
8382
"""Pass a local md5sum cwl to the wes-service server, and return the path of the output file that was created."""
8483
endpoint = 'http://localhost:8080/ga4gh/wes/v1/runs'
85-
params = {'output_file': {'path': '/tmp/md5sum.txt', 'class': 'File'},
86-
'input_file': {'path': os.path.abspath('testdata/md5sum.input'), 'class': 'File'}}
87-
88-
parts = [("workflow_params", json.dumps(params)), ("workflow_type", "CWL"), ("workflow_type_version", "v1.0")]
89-
if cwl_input.startswith("file://"):
90-
parts.append(("workflow_attachment", ("md5sum.cwl", open(cwl_input[7:], "rb"))))
91-
parts.append(("workflow_url", os.path.basename(cwl_input[7:])))
92-
if workflow_attachment:
93-
parts.append(("workflow_attachment", ("dockstore-tool-md5sum.cwl", open(workflow_attachment[7:], "rb"))))
94-
else:
95-
parts.append(("workflow_url", cwl_input))
84+
parts = build_wes_request(cwl_input,
85+
json_input,
86+
attachments=workflow_attachment)
9687
response = requests.post(endpoint, files=parts).json()
88+
9789
output_dir = os.path.abspath(os.path.join('workflows', response['run_id'], 'outdir'))
9890
return os.path.join(output_dir, 'md5sum.txt'), response['run_id']
9991

10092

10193
def run_wdl_md5sum(wdl_input):
10294
"""Pass a local md5sum wdl to the wes-service server, and return the path of the output file that was created."""
103-
endpoint = 'http://localhost:8080/ga4gh/wes/v1/workflows'
95+
endpoint = 'http://localhost:8080/ga4gh/wes/v1/runs'
10496
params = '{"ga4ghMd5.inputFile": "' + os.path.abspath('testdata/md5sum.input') + '"}'
10597
parts = [("workflow_params", params),
10698
("workflow_type", "WDL"),
@@ -136,8 +128,6 @@ def check_for_file(filepath, seconds=40):
136128
while not os.path.exists(filepath):
137129
time.sleep(1)
138130
wait_counter += 1
139-
if os.path.exists(filepath):
140-
return True
141131
if wait_counter > seconds:
142132
return False
143133
return True

testdata/md5sum.cwl

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,3 +15,4 @@ steps:
1515
in:
1616
input_file: input_file
1717
out: [output_file]
18+

testdata/md5sum.json

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,2 @@
1+
{"output_file": {"path": "/tmp/md5sum.txt", "class": "File"},
2+
"input_file": {"path": "md5sum.input", "class": "File"}}

wes_client/util.py

Lines changed: 54 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,54 @@
1+
import os
2+
import json
3+
4+
5+
def wf_type(workflow_file):
6+
if workflow_file.lower().endswith('wdl'):
7+
return 'WDL'
8+
elif workflow_file.lower().endswith('cwl'):
9+
return 'CWL'
10+
elif workflow_file.lower().endswith('py'):
11+
return 'PY'
12+
else:
13+
raise ValueError('Unrecognized/unsupported workflow file extension: %s' % workflow_file.lower().split('.')[-1])
14+
15+
16+
def wf_version(workflow_file):
17+
# TODO: Check inside of the file, handling local/http/etc.
18+
if wf_type(workflow_file) == 'PY':
19+
return '2.7'
20+
# elif wf_type(workflow_file) == 'CWL':
21+
# # only works locally
22+
# return yaml.load(open(workflow_file))['cwlVersion']
23+
else:
24+
# TODO: actually check the wdl file
25+
return "v1.0"
26+
27+
28+
def build_wes_request(workflow_file, json_path, attachments=None):
29+
"""
30+
:param str workflow_file: Path to cwl/wdl file. Can be http/https/file.
31+
:param json_path: Path to accompanying json file. Currently must be local.
32+
:param attachments: Any other files needing to be uploaded to the server.
33+
34+
:return: A list of tuples formatted to be sent in a post to the wes-server (Swagger API).
35+
"""
36+
workflow_file = "file://" + workflow_file if "://" not in workflow_file else workflow_file
37+
json_path = json_path[7:] if json_path.startswith("file://") else json_path
38+
39+
parts = [("workflow_params", json.dumps(json.load(open(json_path)))),
40+
("workflow_type", wf_type(workflow_file)),
41+
("workflow_type_version", wf_version(workflow_file))]
42+
43+
if workflow_file.startswith("file://") or '://' not in workflow_file:
44+
parts.append(("workflow_attachment", (os.path.basename(workflow_file[7:]), open(workflow_file[7:], "rb"))))
45+
parts.append(("workflow_url", os.path.basename(workflow_file[7:])))
46+
else:
47+
parts.append(("workflow_url", workflow_file))
48+
49+
if attachments:
50+
for attachment in attachments:
51+
attachment = attachment[7:] if attachment.startswith("file://") else attachment
52+
parts.append(("workflow_attachment", (os.path.basename(attachment), open(attachment, "rb"))))
53+
54+
return parts

wes_service/cwl_runner.py

Lines changed: 12 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -8,10 +8,19 @@
88

99

1010
class Workflow(object):
11-
def __init__(self, run_id):
11+
def __init__(self, run_id, tempdir=None):
1212
super(Workflow, self).__init__()
1313
self.run_id = run_id
1414
self.workdir = os.path.join(os.getcwd(), "workflows", self.run_id)
15+
self.outdir = os.path.join(self.workdir, 'outdir')
16+
if not os.path.exists(self.outdir):
17+
os.makedirs(self.outdir)
18+
19+
if tempdir:
20+
# tempdir is the folder where attachments were downloaded, if there were any
21+
# symlink everything inside into self.workdir
22+
for attachment in os.listdir(tempdir):
23+
os.symlink(os.path.join(tempdir, attachment), os.path.join(self.workdir, attachment))
1524

1625
def run(self, request, opts):
1726
"""
@@ -34,10 +43,6 @@ def run(self, request, opts):
3443
specifically the runner and runner options
3544
:return: {"run_id": self.run_id, "state": state}
3645
"""
37-
os.makedirs(self.workdir)
38-
outdir = os.path.join(self.workdir, "outdir")
39-
os.mkdir(outdir)
40-
4146
with open(os.path.join(self.workdir, "request.json"), "w") as f:
4247
json.dump(request, f)
4348

@@ -57,7 +62,7 @@ def run(self, request, opts):
5762
stdout=output,
5863
stderr=stderr,
5964
close_fds=True,
60-
cwd=outdir)
65+
cwd=self.outdir)
6166
output.close()
6267
stderr.close()
6368
with open(os.path.join(self.workdir, "pid"), "w") as pid:
@@ -175,7 +180,7 @@ def RunWorkflow(self, **args):
175180
tempdir, body = self.collect_attachments()
176181

177182
run_id = uuid.uuid4().hex
178-
job = Workflow(run_id)
183+
job = Workflow(run_id, tempdir)
179184

180185
job.run(body, self)
181186
return {"run_id": run_id}

wes_service/toil_wes.py

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@
1313

1414

1515
class ToilWorkflow(object):
16-
def __init__(self, run_id):
16+
def __init__(self, run_id, tempdir=None):
1717
super(ToilWorkflow, self).__init__()
1818
self.run_id = run_id
1919

@@ -22,6 +22,12 @@ def __init__(self, run_id):
2222
if not os.path.exists(self.outdir):
2323
os.makedirs(self.outdir)
2424

25+
if tempdir:
26+
# tempdir is where attachments were downloaded, if any
27+
# symlink everything inside into self.workdir
28+
for attachment in os.listdir(tempdir):
29+
os.symlink(os.path.join(tempdir, attachment), os.path.join(self.workdir, attachment))
30+
2531
self.outfile = os.path.join(self.workdir, 'stdout')
2632
self.errfile = os.path.join(self.workdir, 'stderr')
2733
self.starttime = os.path.join(self.workdir, 'starttime')
@@ -266,7 +272,7 @@ def RunWorkflow(self):
266272
tempdir, body = self.collect_attachments()
267273

268274
run_id = uuid.uuid4().hex
269-
job = ToilWorkflow(run_id)
275+
job = ToilWorkflow(run_id, tempdir)
270276
p = Process(target=job.run, args=(body, self))
271277
p.start()
272278
self.processes[run_id] = p

wes_service/util.py

Lines changed: 2 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -50,17 +50,13 @@ def collect_attachments(self):
5050
if k == "workflow_attachment":
5151
filename = secure_filename(v.filename)
5252
v.save(os.path.join(tempdir, filename))
53-
body[k] = "file://%s" % os.path.join(tempdir) # Reference to tem working dir.
53+
body[k] = "file://%s" % tempdir # Reference to tem working dir.
5454
elif k in ("workflow_params", "tags", "workflow_engine_parameters"):
5555
body[k] = json.loads(v.read())
5656
else:
5757
body[k] = v.read()
5858

59-
if body['workflow_type'] != "CWL" or \
60-
body['workflow_type_version'] != "v1.0":
61-
return
62-
6359
if ":" not in body["workflow_url"]:
6460
body["workflow_url"] = "file://%s" % os.path.join(tempdir, secure_filename(body["workflow_url"]))
6561

66-
return (tempdir, body)
62+
return tempdir, body

0 commit comments

Comments
 (0)