Skip to content

Commit 84f283a

Browse files
author
Peter Amstutz
committed
Improve logging, fix wes-client bugs
1 parent 6437668 commit 84f283a

File tree

3 files changed

+47
-20
lines changed

3 files changed

+47
-20
lines changed

wes_client/wes_client_main.py

Lines changed: 9 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -49,11 +49,13 @@ def main(argv=sys.argv[1:]):
4949
print(u"%s %s" % (sys.argv[0], pkg[0].version))
5050
exit(0)
5151

52-
if ": " in args.auth:
53-
sp = args.auth.split(": ")
54-
auth = {sp[0]: sp[1]}
55-
else:
56-
auth = {"Authorization": auth}
52+
auth = {}
53+
if args.auth:
54+
if ": " in args.auth:
55+
sp = args.auth.split(": ")
56+
auth[sp[0]] = sp[1]
57+
else:
58+
auth["Authorization"] = args.auth
5759

5860
client = WESClient({'auth': auth, 'proto': args.proto, 'host': args.host})
5961

@@ -85,15 +87,15 @@ def main(argv=sys.argv[1:]):
8587
logging.error("Missing json/yaml file.")
8688
return 1
8789

88-
modify_jsonyaml_paths(args.job_order)
90+
job_order = modify_jsonyaml_paths(args.job_order)
8991

9092
if args.quiet:
9193
logging.basicConfig(level=logging.WARNING)
9294
else:
9395
logging.basicConfig(level=logging.INFO)
9496

9597
args.attachments = "" if not args.attachments else args.attachments.split(',')
96-
r = client.run(args.workflow_url, args.job_order, args.attachments)
98+
r = client.run(args.workflow_url, job_order, args.attachments)
9799

98100
if args.wait:
99101
logging.info("Workflow run id is %s", r["run_id"])

wes_service/arvados_wes.py

Lines changed: 36 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -19,12 +19,13 @@ class MissingAuthorization(Exception):
1919
pass
2020

2121

22-
def get_api():
23-
if not connexion.request.headers.get('Authorization'):
24-
raise MissingAuthorization()
25-
authtoken = connexion.request.headers['Authorization']
26-
if authtoken.startswith("Bearer ") or authtoken.startswith("OAuth2 "):
27-
authtoken = authtoken[7:]
22+
def get_api(authtoken=None):
23+
if authtoken is None:
24+
if not connexion.request.headers.get('Authorization'):
25+
raise MissingAuthorization()
26+
authtoken = connexion.request.headers['Authorization']
27+
if authtoken.startswith("Bearer ") or authtoken.startswith("OAuth2 "):
28+
authtoken = authtoken[7:]
2829
return arvados.api_from_config(version="v1", apiconfig={
2930
"ARVADOS_API_HOST": os.environ["ARVADOS_API_HOST"],
3031
"ARVADOS_API_TOKEN": authtoken,
@@ -55,6 +56,10 @@ def catch_exceptions_wrapper(self, *args, **kwargs):
5556
return {"msg": str(e), "status_code": 500}, 500
5657
except MissingAuthorization:
5758
return {"msg": "'Authorization' header is missing or empty, expecting Arvados API token", "status_code": 401}, 401
59+
except ValueError as e:
60+
return {"msg": str(e), "status_code": 400}, 400
61+
except Exception as e:
62+
return {"msg": str(e), "status_code": 500}, 500
5863

5964
return catch_exceptions_wrapper
6065

@@ -108,10 +113,10 @@ def ListRuns(self, page_size=None, page_token=None, state_search=None):
108113
"next_page_token": workflow_list[-1]["run_id"] if workflow_list else ""
109114
}
110115

111-
def log_for_run(self, run_id, message):
112-
get_api().logs().create(body={"log": {"object_uuid": run_id,
116+
def log_for_run(self, run_id, message, authtoken=None):
117+
get_api(authtoken).logs().create(body={"log": {"object_uuid": run_id,
113118
"event_type": "stderr",
114-
"properties": {"text": message}}}).execute()
119+
"properties": {"text": message+"\n"}}}).execute()
115120

116121
def invoke_cwl_runner(self, cr_uuid, workflow_url, workflow_params,
117122
env, project_uuid,
@@ -123,9 +128,18 @@ def invoke_cwl_runner(self, cr_uuid, workflow_url, workflow_params,
123128
})
124129

125130
try:
126-
with tempfile.NamedTemporaryFile() as inputtemp:
131+
with tempfile.NamedTemporaryFile(dir=tempdir, suffix=".json") as inputtemp:
127132
json.dump(workflow_params, inputtemp)
128133
inputtemp.flush()
134+
135+
msg = ""
136+
for dirpath, dirs, files in os.walk(tempdir):
137+
for f in files:
138+
msg += " " + dirpath + "/" + f + "\n"
139+
140+
self.log_for_run(cr_uuid, "Contents of %s:\n%s" % (tempdir, msg),
141+
env['ARVADOS_API_TOKEN'])
142+
129143
# TODO: run submission process in a container to prevent
130144
# a-c-r submission processes from seeing each other.
131145

@@ -138,6 +152,8 @@ def invoke_cwl_runner(self, cr_uuid, workflow_url, workflow_params,
138152
cmd.append(workflow_url)
139153
cmd.append(inputtemp.name)
140154

155+
self.log_for_run(cr_uuid, "Executing %s" % cmd, env['ARVADOS_API_TOKEN'])
156+
141157
proc = subprocess.Popen(cmd, env=env,
142158
cwd=tempdir,
143159
stdout=subprocess.PIPE,
@@ -146,7 +162,7 @@ def invoke_cwl_runner(self, cr_uuid, workflow_url, workflow_params,
146162
if proc.returncode != 0:
147163
api.container_requests().update(uuid=cr_uuid, body={"priority": 0}).execute()
148164

149-
self.log_for_run(cr_uuid, stderrdata)
165+
self.log_for_run(cr_uuid, stderrdata, env['ARVADOS_API_TOKEN'])
150166

151167
if tempdir:
152168
shutil.rmtree(tempdir)
@@ -180,7 +196,15 @@ def RunWorkflow(self, **args):
180196
"output_path": "n/a",
181197
"priority": 500}}).execute()
182198

183-
tempdir, body = self.collect_attachments(cr["uuid"])
199+
try:
200+
tempdir, body = self.collect_attachments(cr["uuid"])
201+
except Exception as e:
202+
self.log_for_run(cr["uuid"], str(e))
203+
cr = api.container_requests().update(uuid=cr["uuid"],
204+
body={"container_request":
205+
{"priority": 0}}).execute()
206+
207+
return {"run_id": cr["uuid"]}
184208

185209
workflow_url = body.get("workflow_url")
186210

wes_service/util.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -58,7 +58,8 @@ def collect_attachments(self, run_id=None):
5858
v.save(dest)
5959
body[k] = "file://%s" % tempdir # Reference to temp working dir.
6060
elif k in ("workflow_params", "tags", "workflow_engine_parameters"):
61-
body[k] = json.loads(v.read())
61+
content = v.read()
62+
body[k] = json.loads(content)
6263
else:
6364
body[k] = v.read()
6465

0 commit comments

Comments
 (0)