Skip to content

Commit 29c5ccf

Browse files
author
Peter Amstutz
committed
* Rename workflow_log to run_log to conform to latest WES draft.
* Log attachment staging and workflow_url to assist in debugging. * Fix bug in wes-client to provide correct type for 'auth' parameter of WESClient object. * Generalize --auth parameter of wes-client to support specifying alternate header.
1 parent 56c4abf commit 29c5ccf

File tree

5 files changed

+38
-19
lines changed

5 files changed

+38
-19
lines changed

wes_client/wes_client_main.py

Lines changed: 13 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@ def main(argv=sys.argv[1:]):
1515
parser = argparse.ArgumentParser(description="Workflow Execution Service")
1616
parser.add_argument("--host", type=str, default=os.environ.get("WES_API_HOST"),
1717
help="Example: '--host=localhost:8080'. Defaults to WES_API_HOST.")
18-
parser.add_argument("--auth", type=str, default=os.environ.get("WES_API_AUTH"), help="Defaults to WES_API_AUTH.")
18+
parser.add_argument("--auth", type=str, default=os.environ.get("WES_API_AUTH"), help="Format is 'Header: value' or just 'value'. If header name is not provided, value goes in the 'Authorization'. Defaults to WES_API_AUTH.")
1919
parser.add_argument("--proto", type=str, default=os.environ.get("WES_API_PROTO", "https"),
2020
help="Options: [http, https]. Defaults to WES_API_PROTO (https).")
2121
parser.add_argument("--quiet", action="store_true", default=False)
@@ -49,7 +49,13 @@ def main(argv=sys.argv[1:]):
4949
print(u"%s %s" % (sys.argv[0], pkg[0].version))
5050
exit(0)
5151

52-
client = WESClient({'auth': args.auth, 'proto': args.proto, 'host': args.host})
52+
if ": " in args.auth:
53+
sp = args.auth.split(": ")
54+
auth = {sp[0]: sp[1]}
55+
else:
56+
auth = {"Authorization": auth}
57+
58+
client = WESClient({'auth': auth, 'proto': args.proto, 'host': args.host})
5359

5460
if args.list:
5561
response = client.list_runs() # how to include: page_token=args.page, page_size=args.page_size ?
@@ -106,13 +112,13 @@ def main(argv=sys.argv[1:]):
106112

107113
try:
108114
# TODO: Only works with Arvados atm
109-
logging.info(str(s["workflow_log"]["stderr"]))
110-
logs = requests.get(s["workflow_log"]["stderr"], headers={"Authorization": args.auth}).text
111-
logging.info("Workflow log:\n" + logs)
115+
logging.info(str(s["run_log"]["stderr"]))
116+
logs = requests.get(s["run_log"]["stderr"], headers=auth).text
117+
logging.info("Run log:\n" + logs)
112118
except InvalidSchema:
113-
logging.info("Workflow log:\n" + str(s["workflow_log"]["stderr"]))
119+
logging.info("Run log:\n" + str(s["run_log"]["stderr"]))
114120
except MissingSchema:
115-
logging.info("Workflow log:\n" + str(s["workflow_log"]["stderr"]))
121+
logging.info("Run log:\n" + str(s["run_log"]["stderr"]))
116122

117123
# print the output json
118124
if "fields" in s["outputs"] and s["outputs"]["fields"] is None:

wes_service/arvados_wes.py

Lines changed: 12 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -66,7 +66,7 @@ def GetServiceInfo(self):
6666
"workflow_type_versions": {
6767
"CWL": {"workflow_type_version": ["v1.0"]}
6868
},
69-
"supported_wes_versions": ["0.2.1"],
69+
"supported_wes_versions": ["0.3.0"],
7070
"supported_filesystem_protocols": ["http", "https", "keep"],
7171
"workflow_engine_versions": {
7272
"arvados-cwl-runner": stderr
@@ -108,6 +108,12 @@ def ListRuns(self, page_size=None, page_token=None, state_search=None):
108108
"next_page_token": workflow_list[-1]["run_id"] if workflow_list else ""
109109
}
110110

111+
def log_for_run(self, run_id, message):
112+
api.logs().create(body={"log": {"object_uuid": run_id,
113+
"event_type": "stderr",
114+
"properties": {"text": message}}}).execute()
115+
116+
111117
def invoke_cwl_runner(self, cr_uuid, workflow_url, workflow_params,
112118
env, project_uuid,
113119
tempdir):
@@ -141,9 +147,8 @@ def invoke_cwl_runner(self, cr_uuid, workflow_url, workflow_params,
141147
if proc.returncode != 0:
142148
api.container_requests().update(uuid=cr_uuid, body={"priority": 0}).execute()
143149

144-
api.logs().create(body={"log": {"object_uuid": cr_uuid,
145-
"event_type": "stderr",
146-
"properties": {"text": stderrdata}}}).execute()
150+
self.log_for_run(cr_uuid, stderrdata)
151+
147152
if tempdir:
148153
shutil.rmtree(tempdir)
149154

@@ -153,8 +158,6 @@ def invoke_cwl_runner(self, cr_uuid, workflow_url, workflow_params,
153158

154159
@catch_exceptions
155160
def RunWorkflow(self, **args):
156-
tempdir, body = self.collect_attachments()
157-
158161
if not connexion.request.headers.get('Authorization'):
159162
raise MissingAuthorization()
160163

@@ -178,6 +181,8 @@ def RunWorkflow(self, **args):
178181
"output_path": "n/a",
179182
"priority": 500}}).execute()
180183

184+
tempdir, body = self.collect_attachments(cr["uuid"])
185+
181186
workflow_url = body.get("workflow_url")
182187

183188
project_uuid = body.get("workflow_engine_parameters", {}).get("project_uuid")
@@ -256,7 +261,7 @@ def log_object(cr):
256261
"workflow_params": request["mounts"].get("/var/lib/cwl/cwl.input.json", {}).get("content", {})
257262
},
258263
"state": statemap[container["state"]],
259-
"workflow_log": log_object(request),
264+
"run_log": log_object(request),
260265
"task_logs": [log_object(t) for t in task_reqs],
261266
"outputs": outputobj
262267
}

wes_service/cwl_runner.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -142,7 +142,7 @@ def getlog(self):
142142
"run_id": self.run_id,
143143
"request": request,
144144
"state": state,
145-
"workflow_log": {
145+
"run_log": {
146146
"cmd": [""],
147147
"start_time": "",
148148
"end_time": "",

wes_service/toil_wes.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -158,7 +158,7 @@ def getlog(self):
158158
"run_id": self.run_id,
159159
"request": request,
160160
"state": state,
161-
"workflow_log": {
161+
"run_log": {
162162
"cmd": cmd,
163163
"start_time": starttime,
164164
"end_time": endtime,

wes_service/util.py

Lines changed: 11 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
import tempfile
22
import json
33
import os
4+
import logging
45

56
from six import itervalues, iterlists
67
import connexion
@@ -42,15 +43,20 @@ def getoptlist(self, p):
4243
optlist.append(v)
4344
return optlist
4445

45-
def collect_attachments(self):
46+
def log_for_run(self, run_id, message):
47+
logging.info("Workflow %s: %s", run_id, message)
48+
49+
def collect_attachments(self, run_id=None):
4650
tempdir = tempfile.mkdtemp()
4751
body = {}
4852
for k, ls in iterlists(connexion.request.files):
4953
for v in ls:
5054
if k == "workflow_attachment":
5155
filename = secure_filename(v.filename)
52-
v.save(os.path.join(tempdir, filename))
53-
body[k] = "file://%s" % tempdir # Reference to tem working dir.
56+
dest = os.path.join(tempdir, filename)
57+
self.log_for_run(run_id, "Staging attachment '%s' to '%s'" % (v.filename, dest))
58+
v.save(dest)
59+
body[k] = "file://%s" % tempdir # Reference to temp working dir.
5460
elif k in ("workflow_params", "tags", "workflow_engine_parameters"):
5561
body[k] = json.loads(v.read())
5662
else:
@@ -59,4 +65,6 @@ def collect_attachments(self):
5965
if ":" not in body["workflow_url"]:
6066
body["workflow_url"] = "file://%s" % os.path.join(tempdir, secure_filename(body["workflow_url"]))
6167

68+
self.log_for_run(run_id, "Using workflow_url '%s'" % body.get("workflow_url"))
69+
6270
return tempdir, body

0 commit comments

Comments
 (0)