Skip to content

Commit d92225b

Browse files
author
Peter Amstutz
committed
Rework plugin API to support simple option passing.
1 parent 3cd484e commit d92225b

File tree

4 files changed

+190
-156
lines changed

4 files changed

+190
-156
lines changed

wes_service/__init__.py

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,11 +18,13 @@ def main(argv=sys.argv[1:]):
1818
parser = argparse.ArgumentParser(description='Workflow Execution Service')
1919
parser.add_argument("--backend", type=str, default="wes_service.cwl_runner")
2020
parser.add_argument("--port", type=int, default=8080)
21+
parser.add_argument("--opt", type=str, action="append")
2122
args = parser.parse_args(argv)
2223

2324
app = connexion.App(__name__)
25+
backend = utils.get_function_from_name(args.backend + ".create_backend")(args.opt)
2426
def rs(x):
25-
return utils.get_function_from_name(args.backend + "." + x)
27+
return getattr(backend, x)
2628

2729
res = resource_stream(__name__, 'swagger/proto/workflow_execution.swagger.json')
2830
app.add_api(json.load(res), resolver=Resolver(rs))

wes_service/arvados_wes.py

Lines changed: 110 additions & 106 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
import json
66
import subprocess
77
import tempfile
8+
from wes_service.util import visit, WESBackend
89

910
def get_api():
1011
return arvados.api_from_config(version="v1", apiconfig={
@@ -22,109 +23,112 @@ def get_api():
2223
"Cancelled": "Canceled"
2324
}
2425

25-
26-
def GetServiceInfo():
27-
return {
28-
"workflow_type_versions": {
29-
"CWL": ["v1.0"]
30-
},
31-
"supported_wes_versions": "0.1.0",
32-
"supported_filesystem_protocols": ["file"],
33-
"engine_versions": "cwl-runner",
34-
"system_state_counts": {},
35-
"key_values": {}
36-
}
37-
38-
def ListWorkflows(body=None):
39-
# body["page_size"]
40-
# body["page_token"]
41-
# body["key_value_search"]
42-
43-
api = get_api()
44-
45-
requests = api.container_requests().list(filters=[["requesting_container_uuid", "=", None]],
46-
select=["uuid", "command", "container_uuid"]).execute()
47-
containers = api.containers().list(filters=[["uuid", "in", [w["container_uuid"] for w in requests["items"]]]],
48-
select=["uuid", "state"]).execute()
49-
50-
uuidmap = {c["uuid"]: statemap[c["state"]] for c in containers["items"]}
51-
52-
return {
53-
"workflows": [{"workflow_id": cr["uuid"],
54-
"state": uuidmap[cr["container_uuid"]]}
55-
for cr in requests["items"]
56-
if cr["command"][0] == "arvados-cwl-runner"],
57-
"next_page_token": ""
58-
}
59-
60-
def RunWorkflow(body):
61-
if body["workflow_type"] != "CWL" or body["workflow_type_version"] != "v1.0":
62-
return
63-
64-
env = {
65-
"PATH": os.environ["PATH"],
66-
"ARVADOS_API_HOST": os.environ["ARVADOS_API_HOST"],
67-
"ARVADOS_API_TOKEN": connexion.request.headers['Authorization'],
68-
"ARVADOS_API_HOST_INSECURE": os.environ.get("ARVADOS_API_HOST_INSECURE", "false")
69-
}
70-
with tempfile.NamedTemporaryFile() as inputtemp:
71-
json.dump(body["workflow_params"], inputtemp)
72-
inputtemp.flush()
73-
workflow_id = subprocess.check_output(["arvados-cwl-runner", "--submit", "--no-wait", "--api=containers",
74-
body.get("workflow_url"), inputtemp.name], env=env).strip()
75-
return {"workflow_id": workflow_id}
76-
77-
78-
def GetWorkflowLog(workflow_id):
79-
api = get_api()
80-
81-
request = api.container_requests().get(uuid=workflow_id).execute()
82-
container = api.containers().get(uuid=request["container_uuid"]).execute()
83-
84-
outputobj = {}
85-
if request["output_uuid"]:
86-
c = arvados.collection.CollectionReader(request["output_uuid"])
87-
with c.open("cwl.output.json") as f:
88-
outputobj = json.load(f)
89-
def keepref(d):
90-
if isinstance(d, dict) and "location" in d:
91-
d["location"] = "keep:%s/%s" % (c.portable_data_hash(), d["location"])
92-
visit(outputobj, keepref)
93-
94-
stderr = ""
95-
if request["log_uuid"]:
96-
c = arvados.collection.CollectionReader(request["log_uuid"])
97-
if "stderr.txt" in c:
98-
with c.open("stderr.txt") as f:
99-
stderr = f.read()
100-
101-
r = {
102-
"workflow_id": request["uuid"],
103-
"request": {},
104-
"state": statemap[container["state"]],
105-
"workflow_log": {
106-
"cmd": [""],
107-
"startTime": "",
108-
"endTime": "",
109-
"stdout": "",
110-
"stderr": stderr
111-
},
112-
"task_logs": [],
113-
"outputs": outputobj
114-
}
115-
if container["exit_code"] is not None:
116-
r["workflow_log"]["exitCode"] = container["exit_code"]
117-
return r
118-
119-
120-
def CancelJob(workflow_id):
121-
api = get_api()
122-
request = api.container_requests().update(body={"priority": 0}).execute()
123-
return {"workflow_id": request["uuid"]}
124-
125-
def GetWorkflowStatus(workflow_id):
126-
api = get_api()
127-
request = api.container_requests().get(uuid=workflow_id).execute()
128-
container = api.containers().get(uuid=request["container_uuid"]).execute()
129-
return {"workflow_id": request["uuid"],
130-
"state": statemap[container["state"]]}
26+
class ArvadosBackend(WESBackend):
27+
def GetServiceInfo(self):
28+
return {
29+
"workflow_type_versions": {
30+
"CWL": ["v1.0"]
31+
},
32+
"supported_wes_versions": "0.1.0",
33+
"supported_filesystem_protocols": ["file"],
34+
"engine_versions": "cwl-runner",
35+
"system_state_counts": {},
36+
"key_values": {}
37+
}
38+
39+
def ListWorkflows(self, body=None):
40+
# body["page_size"]
41+
# body["page_token"]
42+
# body["key_value_search"]
43+
44+
api = get_api()
45+
46+
requests = api.container_requests().list(filters=[["requesting_container_uuid", "=", None]],
47+
select=["uuid", "command", "container_uuid"]).execute()
48+
containers = api.containers().list(filters=[["uuid", "in", [w["container_uuid"] for w in requests["items"]]]],
49+
select=["uuid", "state"]).execute()
50+
51+
uuidmap = {c["uuid"]: statemap[c["state"]] for c in containers["items"]}
52+
53+
return {
54+
"workflows": [{"workflow_id": cr["uuid"],
55+
"state": uuidmap[cr["container_uuid"]]}
56+
for cr in requests["items"]
57+
if cr["command"][0] == "arvados-cwl-runner"],
58+
"next_page_token": ""
59+
}
60+
61+
def RunWorkflow(self, body):
62+
if body["workflow_type"] != "CWL" or body["workflow_type_version"] != "v1.0":
63+
return
64+
65+
env = {
66+
"PATH": os.environ["PATH"],
67+
"ARVADOS_API_HOST": os.environ["ARVADOS_API_HOST"],
68+
"ARVADOS_API_TOKEN": connexion.request.headers['Authorization'],
69+
"ARVADOS_API_HOST_INSECURE": os.environ.get("ARVADOS_API_HOST_INSECURE", "false")
70+
}
71+
with tempfile.NamedTemporaryFile() as inputtemp:
72+
json.dump(body["workflow_params"], inputtemp)
73+
inputtemp.flush()
74+
workflow_id = subprocess.check_output(["arvados-cwl-runner", "--submit", "--no-wait", "--api=containers",
75+
body.get("workflow_url"), inputtemp.name], env=env).strip()
76+
return {"workflow_id": workflow_id}
77+
78+
79+
def GetWorkflowLog(self, workflow_id):
80+
api = get_api()
81+
82+
request = api.container_requests().get(uuid=workflow_id).execute()
83+
container = api.containers().get(uuid=request["container_uuid"]).execute()
84+
85+
outputobj = {}
86+
if request["output_uuid"]:
87+
c = arvados.collection.CollectionReader(request["output_uuid"])
88+
with c.open("cwl.output.json") as f:
89+
outputobj = json.load(f)
90+
def keepref(d):
91+
if isinstance(d, dict) and "location" in d:
92+
d["location"] = "keep:%s/%s" % (c.portable_data_hash(), d["location"])
93+
visit(outputobj, keepref)
94+
95+
stderr = ""
96+
if request["log_uuid"]:
97+
c = arvados.collection.CollectionReader(request["log_uuid"])
98+
if "stderr.txt" in c:
99+
with c.open("stderr.txt") as f:
100+
stderr = f.read()
101+
102+
r = {
103+
"workflow_id": request["uuid"],
104+
"request": {},
105+
"state": statemap[container["state"]],
106+
"workflow_log": {
107+
"cmd": [""],
108+
"startTime": "",
109+
"endTime": "",
110+
"stdout": "",
111+
"stderr": stderr
112+
},
113+
"task_logs": [],
114+
"outputs": outputobj
115+
}
116+
if container["exit_code"] is not None:
117+
r["workflow_log"]["exitCode"] = container["exit_code"]
118+
return r
119+
120+
121+
def CancelJob(self, workflow_id):
122+
api = get_api()
123+
request = api.container_requests().update(body={"priority": 0}).execute()
124+
return {"workflow_id": request["uuid"]}
125+
126+
def GetWorkflowStatus(self, workflow_id):
127+
api = get_api()
128+
request = api.container_requests().get(uuid=workflow_id).execute()
129+
container = api.containers().get(uuid=request["container_uuid"]).execute()
130+
return {"workflow_id": request["uuid"],
131+
"state": statemap[container["state"]]}
132+
133+
def create_backend(opts):
134+
return ArvadosBackend(optdict)

wes_service/cwl_runner.py

Lines changed: 57 additions & 49 deletions
Original file line numberDiff line numberDiff line change
@@ -6,15 +6,15 @@
66
import json
77
import urllib
88
import sys
9-
from wes_service.util import visit
9+
from wes_service.util import visit, WESBackend
1010

1111
class Workflow(object):
1212
def __init__(self, workflow_id):
1313
super(Workflow, self).__init__()
1414
self.workflow_id = workflow_id
1515
self.workdir = os.path.join(os.getcwd(), "workflows", self.workflow_id)
1616

17-
def run(self, request):
17+
def run(self, request, opts):
1818
os.makedirs(self.workdir)
1919
outdir = os.path.join(self.workdir, "outdir")
2020
os.mkdir(outdir)
@@ -35,7 +35,10 @@ def run(self, request):
3535
output = open(os.path.join(self.workdir, "cwl.output.json"), "w")
3636
stderr = open(os.path.join(self.workdir, "stderr"), "w")
3737

38-
proc = subprocess.Popen(["cwl-runner", workflow_url, inputtemp.name],
38+
runner = opts.getopt("runner", "cwl-runner")
39+
extra = opts.getoptlist("extra")
40+
41+
proc = subprocess.Popen([runner]+extra+[workflow_url, inputtemp.name],
3942
stdout=output,
4043
stderr=stderr,
4144
close_fds=True,
@@ -117,49 +120,54 @@ def getlog(self):
117120
def cancel(self):
118121
pass
119122

120-
def GetServiceInfo():
121-
return {
122-
"workflow_type_versions": {
123-
"CWL": ["v1.0"]
124-
},
125-
"supported_wes_versions": "0.1.0",
126-
"supported_filesystem_protocols": ["file"],
127-
"engine_versions": "cwl-runner",
128-
"system_state_counts": {},
129-
"key_values": {}
130-
}
131-
132-
def ListWorkflows(body=None):
133-
# body["page_size"]
134-
# body["page_token"]
135-
# body["key_value_search"]
136-
137-
wf = []
138-
for l in os.listdir(os.path.join(os.getcwd(), "workflows")):
139-
if os.path.isdir(os.path.join(os.getcwd(), "workflows", l)):
140-
wf.append(Workflow(l))
141-
return {
142-
"workflows": [{"workflow_id": w.workflow_id, "state": w.getstate()[0]} for w in wf],
143-
"next_page_token": ""
144-
}
145-
146-
def RunWorkflow(body):
147-
if body["workflow_type"] != "CWL" or body["workflow_type_version"] != "v1.0":
148-
return
149-
workflow_id = uuid.uuid4().hex
150-
job = Workflow(workflow_id)
151-
job.run(body)
152-
return {"workflow_id": workflow_id}
153-
154-
def GetWorkflowLog(workflow_id):
155-
job = Workflow(workflow_id)
156-
return job.getlog()
157-
158-
def CancelJob(workflow_id):
159-
job = Workflow(workflow_id)
160-
job.cancel()
161-
return {"workflow_id": workflow_id}
162-
163-
def GetWorkflowStatus(workflow_id):
164-
job = Workflow(workflow_id)
165-
return job.getstatus()
123+
124+
class CWLRunnerBackend(WESBackend):
125+
def GetServiceInfo(self):
126+
return {
127+
"workflow_type_versions": {
128+
"CWL": ["v1.0"]
129+
},
130+
"supported_wes_versions": "0.1.0",
131+
"supported_filesystem_protocols": ["file"],
132+
"engine_versions": "cwl-runner",
133+
"system_state_counts": {},
134+
"key_values": {}
135+
}
136+
137+
def ListWorkflows(self ,body=None):
138+
# body["page_size"]
139+
# body["page_token"]
140+
# body["key_value_search"]
141+
142+
wf = []
143+
for l in os.listdir(os.path.join(os.getcwd(), "workflows")):
144+
if os.path.isdir(os.path.join(os.getcwd(), "workflows", l)):
145+
wf.append(Workflow(l))
146+
return {
147+
"workflows": [{"workflow_id": w.workflow_id, "state": w.getstate()[0]} for w in wf],
148+
"next_page_token": ""
149+
}
150+
151+
def RunWorkflow(self, body):
152+
if body["workflow_type"] != "CWL" or body["workflow_type_version"] != "v1.0":
153+
return
154+
workflow_id = uuid.uuid4().hex
155+
job = Workflow(workflow_id)
156+
job.run(body, self)
157+
return {"workflow_id": workflow_id}
158+
159+
def GetWorkflowLog(self, workflow_id):
160+
job = Workflow(workflow_id)
161+
return job.getlog()
162+
163+
def CancelJob(self, workflow_id):
164+
job = Workflow(workflow_id)
165+
job.cancel()
166+
return {"workflow_id": workflow_id}
167+
168+
def GetWorkflowStatus(self, workflow_id):
169+
job = Workflow(workflow_id)
170+
return job.getstatus()
171+
172+
def create_backend(opts):
173+
return CWLRunnerBackend(opts)

wes_service/util.py

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,3 +6,23 @@ def visit(d, op):
66
elif isinstance(d, dict):
77
for i in d.itervalues():
88
visit(i, op)
9+
10+
class WESBackend(object):
11+
def __init__(self, opts):
12+
self.pairs = []
13+
for o in opts:
14+
k, v = o.split("=", 1)
15+
self.pairs.append((k, v))
16+
17+
def getopt(self, p, default=None):
18+
for k,v in self.pairs:
19+
if k == p:
20+
return v
21+
return default
22+
23+
def getoptlist(self, p):
24+
l = []
25+
for k, v in self.pairs:
26+
if k == p:
27+
l.append(v)
28+
return l

0 commit comments

Comments
 (0)