Skip to content

Commit 4c941e0

Browse files
author
Peter Amstutz
committed
Add wes-client --list and --get subcommands.
1 parent 41740a2 commit 4c941e0

File tree

4 files changed

+251
-169
lines changed

4 files changed

+251
-169
lines changed

wes_client.py

Lines changed: 29 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -17,15 +17,32 @@ def main(argv=sys.argv[1:]):
1717
parser.add_argument("--auth", type=str, default=os.environ.get("WES_API_TOKEN"))
1818
parser.add_argument("--proto", type=str, default="https")
1919
parser.add_argument("--quiet", action="store_true", default=False)
20-
parser.add_argument("workflow_url", type=str)
21-
parser.add_argument("job_order", type=str)
20+
21+
exgroup = parser.add_mutually_exclusive_group()
22+
exgroup.add_argument("--run", action="store_true", default=False)
23+
exgroup.add_argument("--get", type=str, default=None)
24+
exgroup.add_argument("--list", action="store_true", default=False)
25+
26+
parser.add_argument("workflow_url", type=str, nargs="?", default=None)
27+
parser.add_argument("job_order", type=str, nargs="?", default=None)
2228
args = parser.parse_args(argv)
2329

2430
http_client = RequestsClient()
2531
http_client.set_api_key(
2632
args.host, args.auth,
2733
param_name='Authorization', param_in='header')
28-
client = SwaggerClient.from_url("%s://%s/swagger.json" % (args.proto, args.host), http_client=http_client)
34+
client = SwaggerClient.from_url("%s://%s/swagger.json" % (args.proto, args.host),
35+
http_client=http_client, config={'use_models': False})
36+
37+
if args.list:
38+
l = client.WorkflowExecutionService.ListWorkflows()
39+
json.dump(l.result(), sys.stdout, indent=4)
40+
return 0
41+
42+
if args.get:
43+
l = client.WorkflowExecutionService.GetWorkflowLog(workflow_id=args.get)
44+
json.dump(l.result(), sys.stdout, indent=4)
45+
return 0
2946

3047
with open(args.job_order) as f:
3148
input = json.load(f)
@@ -45,22 +62,21 @@ def main(argv=sys.argv[1:]):
4562
"workflow_type": "CWL",
4663
"workflow_type_version": "v1.0"}).result()
4764

48-
logging.info("Workflow id is %s", r.workflow_id)
65+
logging.info("Workflow id is %s", r["workflow_id"])
4966

50-
r = client.WorkflowExecutionService.GetWorkflowStatus(workflow_id=r.workflow_id).result()
51-
while r.state == "Running":
67+
r = client.WorkflowExecutionService.GetWorkflowStatus(workflow_id=r["workflow_id"]).result()
68+
while r["state"] == "Running":
5269
time.sleep(1)
53-
r = client.WorkflowExecutionService.GetWorkflowStatus(workflow_id=r.workflow_id).result()
70+
r = client.WorkflowExecutionService.GetWorkflowStatus(workflow_id=r["workflow_id"]).result()
5471

55-
logging.info("State is %s", r.state)
72+
logging.info("State is %s", r["state"])
5673

57-
s = client.WorkflowExecutionService.GetWorkflowLog(workflow_id=r.workflow_id).result()
58-
logging.info(s.workflow_log.stderr)
74+
s = client.WorkflowExecutionService.GetWorkflowLog(workflow_id=r["workflow_id"]).result()
75+
logging.info(s["workflow_log"]["stderr"])
5976

60-
d = {k: s.outputs[k] for k in s.outputs if k != "fields"}
61-
json.dump(d, sys.stdout, indent=4)
77+
json.dump(s["outputs"], sys.stdout, indent=4)
6278

63-
if r.state == "Complete":
79+
if r["state"] == "Complete":
6480
return 0
6581
else:
6682
return 1

wes_service/__init__.py

Lines changed: 10 additions & 156 deletions
Original file line numberDiff line numberDiff line change
@@ -9,171 +9,25 @@
99
import os
1010
import json
1111
import urllib
12+
import argparse
13+
import sys
1214

1315
from pkg_resources import resource_stream
1416

15-
class Workflow(object):
16-
def __init__(self, workflow_id):
17-
super(Workflow, self).__init__()
18-
self.workflow_id = workflow_id
19-
self.workdir = os.path.join(os.getcwd(), "workflows", self.workflow_id)
17+
def main(argv=sys.argv[1:]):
18+
parser = argparse.ArgumentParser(description='Workflow Execution Service')
19+
parser.add_argument("--backend", type=str, default="wes_service.cwl_runner")
20+
parser.add_argument("--port", type=int, default=8080)
21+
args = parser.parse_args(argv)
2022

21-
def run(self, request):
22-
os.makedirs(self.workdir)
23-
outdir = os.path.join(self.workdir, "outdir")
24-
os.mkdir(outdir)
25-
26-
with open(os.path.join(self.workdir, "request.json"), "w") as f:
27-
json.dump(request, f)
28-
29-
with open(os.path.join(self.workdir, "cwl.input.json"), "w") as inputtemp:
30-
json.dump(request["workflow_params"], inputtemp)
31-
32-
if request.get("workflow_descriptor"):
33-
with open(os.path.join(self.workdir, "workflow.cwl"), "w") as f:
34-
f.write(workflow_descriptor)
35-
workflow_url = urllib.pathname2url(os.path.join(self.workdir, "workflow.cwl"))
36-
else:
37-
workflow_url = request.get("workflow_url")
38-
39-
output = open(os.path.join(self.workdir, "cwl.output.json"), "w")
40-
stderr = open(os.path.join(self.workdir, "stderr"), "w")
41-
42-
proc = subprocess.Popen(["cwl-runner", workflow_url, inputtemp.name],
43-
stdout=output,
44-
stderr=stderr,
45-
close_fds=True,
46-
cwd=outdir)
47-
output.close()
48-
stderr.close()
49-
with open(os.path.join(self.workdir, "pid"), "w") as pid:
50-
pid.write(str(proc.pid))
51-
52-
return self.getstatus()
53-
54-
def getstate(self):
55-
state = "Running"
56-
exit_code = -1
57-
58-
exc = os.path.join(self.workdir, "exit_code")
59-
if os.path.exists(exc):
60-
with open(exc) as f:
61-
exit_code = int(f.read())
62-
else:
63-
with open(os.path.join(self.workdir, "pid"), "r") as pid:
64-
pid = int(pid.read())
65-
(_pid, exit_status) = os.waitpid(pid, os.WNOHANG)
66-
if _pid != 0:
67-
exit_code = exit_status >> 8
68-
with open(exc, "w") as f:
69-
f.write(str(exit_code))
70-
os.unlink(os.path.join(self.workdir, "pid"))
71-
72-
if exit_code == 0:
73-
state = "Complete"
74-
elif exit_code != -1:
75-
state = "Error"
76-
77-
return (state, exit_code)
78-
79-
def getstatus(self):
80-
state, exit_code = self.getstate()
81-
82-
return {
83-
"workflow_id": self.workflow_id,
84-
"state": state
85-
}
86-
87-
88-
def getlog(self):
89-
state, exit_code = self.getstate()
90-
91-
with open(os.path.join(self.workdir, "request.json"), "r") as f:
92-
request = json.load(f)
93-
94-
with open(os.path.join(self.workdir, "stderr"), "r") as f:
95-
stderr = f.read()
96-
97-
outputobj = {}
98-
if state == "Complete":
99-
with open(os.path.join(self.workdir, "cwl.output.json"), "r") as outputtemp:
100-
outputobj = json.load(outputtemp)
101-
102-
return {
103-
"workflow_id": self.workflow_id,
104-
"request": request,
105-
"state": state,
106-
"workflow_log": {
107-
"cmd": [""],
108-
"startTime": "",
109-
"endTime": "",
110-
"stdout": "",
111-
"stderr": stderr,
112-
"exitCode": exit_code
113-
},
114-
"task_logs": [],
115-
"outputs": outputobj
116-
}
117-
118-
def cancel(self):
119-
pass
120-
121-
def GetServiceInfo():
122-
return {
123-
"workflow_type_versions": {
124-
"CWL": ["v1.0"]
125-
},
126-
"supported_wes_versions": "0.1.0",
127-
"supported_filesystem_protocols": ["file"],
128-
"engine_versions": "cwl-runner",
129-
"system_state_counts": {},
130-
"key_values": {}
131-
}
132-
133-
def ListWorkflows(body):
134-
# body["page_size"]
135-
# body["page_token"]
136-
# body["key_value_search"]
137-
138-
wf = []
139-
for l in os.listdir(os.path.join(os.getcwd(), "workflows")):
140-
if os.path.isdir(os.path.join(os.getcwd(), "workflows", l)):
141-
wf.append(Workflow(l))
142-
return {
143-
"workflows": [{"workflow_id": w.workflow_id, "state": w.getstate()} for w in wf],
144-
"next_page_token": ""
145-
}
146-
147-
def RunWorkflow(body):
148-
if body["workflow_type"] != "CWL" or body["workflow_type_version"] != "v1.0":
149-
return
150-
workflow_id = uuid.uuid4().hex
151-
job = Workflow(workflow_id)
152-
job.run(body)
153-
return {"workflow_id": workflow_id}
154-
155-
def GetWorkflowLog(workflow_id):
156-
job = Workflow(workflow_id)
157-
return job.getlog()
158-
159-
def CancelJob(workflow_id):
160-
job = Workflow(workflow_id)
161-
job.cancel()
162-
return {"workflow_id": workflow_id}
163-
164-
def GetWorkflowStatus(workflow_id):
165-
job = Workflow(workflow_id)
166-
return job.getstatus()
167-
168-
def main():
16923
app = connexion.App(__name__)
17024
def rs(x):
171-
return utils.get_function_from_name("wes_service." + x)
25+
return utils.get_function_from_name(args.backend + "." + x)
17226

17327
res = resource_stream(__name__, 'swagger/proto/workflow_execution.swagger.json')
17428
app.add_api(json.load(res), resolver=Resolver(rs))
17529

176-
app.run(port=8080)
30+
app.run(port=args.port)
17731

17832
if __name__ == "__main__":
179-
main()
33+
main(sys.argv[1:])

wes_service/arvados_wes.py

Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,48 @@
1+
import arvados
2+
3+
def GetServiceInfo():
4+
return {
5+
"workflow_type_versions": {
6+
"CWL": ["v1.0"]
7+
},
8+
"supported_wes_versions": "0.1.0",
9+
"supported_filesystem_protocols": ["file"],
10+
"engine_versions": "cwl-runner",
11+
"system_state_counts": {},
12+
"key_values": {}
13+
}
14+
15+
def ListWorkflows(body):
16+
# body["page_size"]
17+
# body["page_token"]
18+
# body["key_value_search"]
19+
20+
wf = []
21+
for l in os.listdir(os.path.join(os.getcwd(), "workflows")):
22+
if os.path.isdir(os.path.join(os.getcwd(), "workflows", l)):
23+
wf.append(Workflow(l))
24+
return {
25+
"workflows": [{"workflow_id": w.workflow_id, "state": w.getstate()} for w in wf],
26+
"next_page_token": ""
27+
}
28+
29+
def RunWorkflow(body):
30+
if body["workflow_type"] != "CWL" or body["workflow_type_version"] != "v1.0":
31+
return
32+
workflow_id = uuid.uuid4().hex
33+
job = Workflow(workflow_id)
34+
job.run(body)
35+
return {"workflow_id": workflow_id}
36+
37+
def GetWorkflowLog(workflow_id):
38+
job = Workflow(workflow_id)
39+
return job.getlog()
40+
41+
def CancelJob(workflow_id):
42+
job = Workflow(workflow_id)
43+
job.cancel()
44+
return {"workflow_id": workflow_id}
45+
46+
def GetWorkflowStatus(workflow_id):
47+
job = Workflow(workflow_id)
48+
return job.getstatus()

0 commit comments

Comments
 (0)