Skip to content

Commit 24e0ffe

Browse files
author
Peter Amstutz
committed
Working on Arvados support.
1 parent 4c941e0 commit 24e0ffe

File tree

3 files changed

+108
-13
lines changed

3 files changed

+108
-13
lines changed

setup.py

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,5 +30,10 @@
3030
'console_scripts': [ "wes-server=wes_service:main",
3131
"wes-client=wes_client:main"]
3232
},
33+
extras_require={
34+
"arvados": [
35+
"arvados-cwl-runner"
36+
]
37+
},
3338
zip_safe=True
3439
)

wes_client.py

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -9,27 +9,31 @@
99
import os
1010
import argparse
1111
import logging
12+
import urlparse
1213

1314
def main(argv=sys.argv[1:]):
1415

1516
parser = argparse.ArgumentParser(description='Workflow Execution Service')
1617
parser.add_argument("--host", type=str, default=os.environ.get("WES_API_HOST"))
1718
parser.add_argument("--auth", type=str, default=os.environ.get("WES_API_TOKEN"))
18-
parser.add_argument("--proto", type=str, default="https")
19+
parser.add_argument("--proto", type=str, default=os.environ.get("WES_API_PROTO", "https"))
1920
parser.add_argument("--quiet", action="store_true", default=False)
2021

2122
exgroup = parser.add_mutually_exclusive_group()
2223
exgroup.add_argument("--run", action="store_true", default=False)
2324
exgroup.add_argument("--get", type=str, default=None)
25+
exgroup.add_argument("--log", type=str, default=None)
2426
exgroup.add_argument("--list", action="store_true", default=False)
2527

2628
parser.add_argument("workflow_url", type=str, nargs="?", default=None)
2729
parser.add_argument("job_order", type=str, nargs="?", default=None)
2830
args = parser.parse_args(argv)
2931

3032
http_client = RequestsClient()
33+
split = urlparse.urlsplit("%s://%s/" % (args.proto, args.host))
34+
3135
http_client.set_api_key(
32-
args.host, args.auth,
36+
split.hostname, args.auth,
3337
param_name='Authorization', param_in='header')
3438
client = SwaggerClient.from_url("%s://%s/swagger.json" % (args.proto, args.host),
3539
http_client=http_client, config={'use_models': False})
@@ -39,6 +43,11 @@ def main(argv=sys.argv[1:]):
3943
json.dump(l.result(), sys.stdout, indent=4)
4044
return 0
4145

46+
if args.log:
47+
l = client.WorkflowExecutionService.GetWorkflowLog(workflow_id=args.log)
48+
sys.stdout.write(l.result()["workflow_log"]["stderr"])
49+
return 0
50+
4251
if args.get:
4352
l = client.WorkflowExecutionService.GetWorkflowLog(workflow_id=args.get)
4453
json.dump(l.result(), sys.stdout, indent=4)

wes_service/arvados_wes.py

Lines changed: 92 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,27 @@
11
import arvados
2+
import arvados.collection
3+
import os
4+
import connexion
5+
import json
6+
import subprocess
7+
import tempfile
8+
9+
def get_api():
10+
return arvados.api_from_config(version="v1", apiconfig={
11+
"ARVADOS_API_HOST": os.environ["ARVADOS_API_HOST"],
12+
"ARVADOS_API_TOKEN": connexion.request.headers['Authorization'],
13+
"ARVADOS_API_HOST_INSECURE": os.environ.get("ARVADOS_API_HOST_INSECURE", "false"),
14+
})
15+
16+
17+
statemap = {
18+
"Queued": "Queued",
19+
"Locked": "Initializing",
20+
"Running": "Running",
21+
"Complete": "Complete",
22+
"Cancelled": "Canceled"
23+
}
24+
225

326
def GetServiceInfo():
427
return {
@@ -12,31 +35,89 @@ def GetServiceInfo():
1235
"key_values": {}
1336
}
1437

15-
def ListWorkflows(body):
38+
def ListWorkflows(body=None):
1639
# body["page_size"]
1740
# body["page_token"]
1841
# body["key_value_search"]
1942

20-
wf = []
21-
for l in os.listdir(os.path.join(os.getcwd(), "workflows")):
22-
if os.path.isdir(os.path.join(os.getcwd(), "workflows", l)):
23-
wf.append(Workflow(l))
43+
api = get_api()
44+
45+
requests = api.container_requests().list(filters=[["requesting_container_uuid", "=", None]]).execute()
46+
containers = api.containers().list(filters=[["uuid", "in", [w["container_uuid"] for w in requests["items"]]]]).execute()
47+
48+
uuidmap = {c["uuid"]: statemap[c["state"]] for c in containers["items"]}
49+
2450
return {
25-
"workflows": [{"workflow_id": w.workflow_id, "state": w.getstate()} for w in wf],
51+
"workflows": [{"workflow_id": cr["uuid"],
52+
"state": uuidmap[cr["container_uuid"]]}
53+
for cr in requests["items"]
54+
if cr["command"][0] == "arvados-cwl-runner"],
2655
"next_page_token": ""
2756
}
2857

2958
def RunWorkflow(body):
3059
if body["workflow_type"] != "CWL" or body["workflow_type_version"] != "v1.0":
3160
return
32-
workflow_id = uuid.uuid4().hex
33-
job = Workflow(workflow_id)
34-
job.run(body)
61+
62+
env = {
63+
"ARVADOS_API_HOST": os.environ["ARVADOS_API_HOST"],
64+
"ARVADOS_API_TOKEN": connexion.request.headers['Authorization'],
65+
"ARVADOS_API_HOST_INSECURE": os.environ.get("ARVADOS_API_HOST_INSECURE", "false")
66+
}
67+
with tempfile.NamedTemporaryFile() as inputtemp:
68+
json.dump(request["workflow_params"], inputtemp)
69+
workflow_id = subprocess.check_output(["arvados-cwl-runner", "--submit", "--no-wait",
70+
request.get("workflow_url"), inputtemp.name], env=env)
3571
return {"workflow_id": workflow_id}
3672

73+
def visit(d, op):
74+
op(d)
75+
if isinstance(d, list):
76+
for i in d:
77+
visit(i, op)
78+
elif isinstance(d, dict):
79+
for i in d.itervalues():
80+
visit(i, op)
81+
3782
def GetWorkflowLog(workflow_id):
38-
job = Workflow(workflow_id)
39-
return job.getlog()
83+
api = get_api()
84+
85+
request = api.container_requests().get(uuid=workflow_id).execute()
86+
container = api.containers().get(uuid=request["container_uuid"]).execute()
87+
88+
outputobj = {}
89+
if request["output_uuid"]:
90+
c = arvados.collection.CollectionReader(request["output_uuid"])
91+
with c.open("cwl.output.json") as f:
92+
outputobj = json.load(f)
93+
def keepref(d):
94+
if isinstance(d, dict) and "location" in d:
95+
d["location"] = "keep:%s/%s" % (c.portable_data_hash(), d["location"])
96+
visit(outputobj, keepref)
97+
98+
stderr = ""
99+
if request["log_uuid"]:
100+
c = arvados.collection.CollectionReader(request["log_uuid"])
101+
if "stderr.txt" in c:
102+
with c.open("stderr.txt") as f:
103+
stderr = f.read()
104+
105+
return {
106+
"workflow_id": request["uuid"],
107+
"request": {},
108+
"state": statemap[container["state"]],
109+
"workflow_log": {
110+
"cmd": [""],
111+
"startTime": "",
112+
"endTime": "",
113+
"stdout": "",
114+
"stderr": stderr,
115+
"exitCode": container["exit_code"]
116+
},
117+
"task_logs": [],
118+
"outputs": outputobj
119+
}
120+
40121

41122
def CancelJob(workflow_id):
42123
job = Workflow(workflow_id)

0 commit comments

Comments
 (0)