Skip to content

Commit acba50e

Browse files
committed
Attachment modifications in cwltool and toil.
1 parent b43443d commit acba50e

File tree

5 files changed

+97
-98
lines changed

5 files changed

+97
-98
lines changed

test/test_integration.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -40,8 +40,8 @@ def tearDown(self):
4040
time.sleep(3)
4141
except OSError as e:
4242
print(e)
43-
if os.path.exists('workflows'):
44-
shutil.rmtree('workflows')
43+
# if os.path.exists('workflows'):
44+
# shutil.rmtree('workflows')
4545
unittest.TestCase.tearDown(self)
4646

4747
def test_dockstore_md5sum(self):
@@ -85,7 +85,7 @@ def run_cwl_md5sum(cwl_input, json_input, workflow_attachment=None):
8585
json_input,
8686
attachments=workflow_attachment)
8787
response = requests.post(endpoint, files=parts).json()
88-
88+
assert 'run_id' in response, str(response.json())
8989
output_dir = os.path.abspath(os.path.join('workflows', response['run_id'], 'outdir'))
9090
return os.path.join(output_dir, 'md5sum.txt'), response['run_id']
9191

wes_client/util.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,7 @@ def build_wes_request(workflow_file, json_path, attachments=None):
3333
3434
:return: A list of tuples formatted to be sent in a post to the wes-server (Swagger API).
3535
"""
36-
workflow_file = "file://" + workflow_file if "://" not in workflow_file else workflow_file
36+
workflow_file = "file://" + workflow_file if ":" not in workflow_file else workflow_file
3737
json_path = json_path[7:] if json_path.startswith("file://") else json_path
3838

3939
parts = [("workflow_params", json.dumps(json.load(open(json_path)))),

wes_client/wes_client_main.py

Lines changed: 4 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
import requests
1313
from requests.exceptions import InvalidSchema, MissingSchema
1414
from wes_service.util import visit
15+
from wes_client.util import build_wes_request
1516
from bravado.client import SwaggerClient
1617
from bravado.requests_client import RequestsClient
1718

@@ -25,6 +26,7 @@ def main(argv=sys.argv[1:]):
2526
help="Options: [http, https]. Defaults to WES_API_PROTO (https).")
2627
parser.add_argument("--quiet", action="store_true", default=False)
2728
parser.add_argument("--outdir", type=str)
29+
parser.add_argument("--attachments", type=list, default=None)
2830
parser.add_argument("--page", type=str, default=None)
2931
parser.add_argument("--page-size", type=int, default=None)
3032

@@ -81,15 +83,8 @@ def main(argv=sys.argv[1:]):
8183
json.dump(response.result(), sys.stdout, indent=4)
8284
return 0
8385

84-
if args.workflow_url.lower().endswith('wdl'):
85-
wf_type = 'WDL'
86-
elif args.workflow_url.lower().endswith('cwl'):
87-
wf_type = 'CWL'
88-
elif args.workflow_url.lower().endswith('py'):
89-
wf_type = 'PY'
90-
9186
if not args.job_order:
92-
logging.error("Missing job order")
87+
logging.error("Missing json/yaml file.")
9388
return 1
9489

9590
loader = schema_salad.ref_resolver.Loader({
@@ -112,35 +107,12 @@ def fixpaths(d):
112107
del d["path"]
113108
visit(input_dict, fixpaths)
114109

115-
workflow_url = args.workflow_url
116-
if ":" not in workflow_url:
117-
workflow_url = "file://" + os.path.abspath(workflow_url)
118-
119110
if args.quiet:
120111
logging.basicConfig(level=logging.WARNING)
121112
else:
122113
logging.basicConfig(level=logging.INFO)
123114

124-
parts = [
125-
("workflow_params", json.dumps(input_dict)),
126-
("workflow_type", wf_type),
127-
("workflow_type_version", "v1.0")
128-
]
129-
if workflow_url.startswith("file://"):
130-
# with open(workflow_url[7:], "rb") as f:
131-
# body["workflow_attachment"] = f.read()
132-
rootdir = os.path.dirname(workflow_url[7:])
133-
dirpath = rootdir
134-
# for dirpath, dirnames, filenames in os.walk(rootdir):
135-
for f in os.listdir(rootdir):
136-
if f.startswith("."):
137-
continue
138-
fn = os.path.join(dirpath, f)
139-
if os.path.isfile(fn):
140-
parts.append(('workflow_attachment', (fn[len(rootdir)+1:], open(fn, "rb"))))
141-
parts.append(("workflow_url", os.path.basename(workflow_url[7:])))
142-
else:
143-
parts.append(("workflow_url", workflow_url))
115+
parts = build_wes_request(args.workflow_url, args.job_order, attachments=args.attachments)
144116

145117
postresult = http_client.session.post("%s://%s/ga4gh/wes/v1/runs" % (args.proto, args.host),
146118
files=parts,

wes_service/cwl_runner.py

Lines changed: 22 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -8,21 +8,15 @@
88

99

1010
class Workflow(object):
11-
def __init__(self, run_id, tempdir=None):
11+
def __init__(self, run_id):
1212
super(Workflow, self).__init__()
1313
self.run_id = run_id
1414
self.workdir = os.path.join(os.getcwd(), "workflows", self.run_id)
1515
self.outdir = os.path.join(self.workdir, 'outdir')
1616
if not os.path.exists(self.outdir):
1717
os.makedirs(self.outdir)
1818

19-
if tempdir:
20-
# tempdir is the folder where attachments were downloaded, if there were any
21-
# symlink everything inside into self.workdir
22-
for attachment in os.listdir(tempdir):
23-
os.symlink(os.path.join(tempdir, attachment), os.path.join(self.workdir, attachment))
24-
25-
def run(self, request, opts):
19+
def run(self, request, tempdir, opts):
2620
"""
2721
Constructs a command to run a cwl/json from requests and opts,
2822
runs it, and deposits the outputs in outdir.
@@ -46,8 +40,7 @@ def run(self, request, opts):
4640
with open(os.path.join(self.workdir, "request.json"), "w") as f:
4741
json.dump(request, f)
4842

49-
with open(os.path.join(
50-
self.workdir, "cwl.input.json"), "w") as inputtemp:
43+
with open(os.path.join(self.workdir, "cwl.input.json"), "w") as inputtemp:
5144
json.dump(request["workflow_params"], inputtemp)
5245

5346
workflow_url = request.get("workflow_url") # Will always be local path to descriptor cwl, or url.
@@ -57,12 +50,27 @@ def run(self, request, opts):
5750

5851
runner = opts.getopt("runner", default="cwl-runner")
5952
extra = opts.getoptlist("extra")
60-
command_args = [runner] + extra + [workflow_url, inputtemp.name]
53+
54+
# replace any locally specified outdir with the default
55+
for e in extra:
56+
if e.startswith('--outdir='):
57+
extra.remove(e)
58+
extra.append('--outdir=' + self.outdir)
59+
60+
# link the cwl and json into the tempdir/cwd
61+
if workflow_url.startswith('file://'):
62+
os.link(workflow_url[7:], os.path.join(tempdir, "wes_workflow.cwl"))
63+
workflow_url = os.path.join(tempdir, "wes_workflow.cwl")
64+
os.link(inputtemp.name, os.path.join(tempdir, "cwl.input.json"))
65+
jsonpath = os.path.join(tempdir, "cwl.input.json")
66+
67+
# build args and run
68+
command_args = [runner] + extra + [workflow_url, jsonpath]
6169
proc = subprocess.Popen(command_args,
6270
stdout=output,
6371
stderr=stderr,
6472
close_fds=True,
65-
cwd=self.outdir)
73+
cwd=tempdir)
6674
output.close()
6775
stderr.close()
6876
with open(os.path.join(self.workdir, "pid"), "w") as pid:
@@ -180,9 +188,9 @@ def RunWorkflow(self, **args):
180188
tempdir, body = self.collect_attachments()
181189

182190
run_id = uuid.uuid4().hex
183-
job = Workflow(run_id, tempdir)
191+
job = Workflow(run_id)
184192

185-
job.run(body, self)
193+
job.run(body, tempdir, self)
186194
return {"run_id": run_id}
187195

188196
def GetRunLog(self, run_id):

0 commit comments

Comments
 (0)