Skip to content

Commit 06edd51

Browse files
authored
Merge pull request #1 from common-workflow-language/ga4gh-wes
GA4GH workflow execution service
2 parents 6d1f04a + d92225b commit 06edd51

File tree

7 files changed

+997
-11
lines changed

7 files changed

+997
-11
lines changed

setup.py

Lines changed: 17 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -8,27 +8,33 @@
88
from setuptools import setup, find_packages
99

1010
SETUP_DIR = os.path.dirname(__file__)
11-
README = os.path.join(SETUP_DIR, 'README')
11+
README = os.path.join(SETUP_DIR, 'README.md')
1212

13-
setup(name='cwltool_service',
13+
setup(name='wes_service',
1414
version='2.0',
15-
description='Common workflow language runner service',
15+
description='GA4GH Workflow Execution Service reference implementation',
1616
long_description=open(README).read(),
17-
author='Common workflow language working group',
17+
author='GA4GH Containers and Workflows task team',
1818
author_email='[email protected]',
1919
url="https://github.com/common-workflow-language/cwltool-service",
2020
download_url="https://github.com/common-workflow-language/cwltool-service",
2121
license='Apache 2.0',
22-
py_modules=["cwltool_stream", "cwl_flask", "cwltool_client"],
22+
packages=["wes_service", "wes_client"],
23+
package_data={'wes_service': ['swagger/proto/workflow_execution.swagger.json']},
24+
include_package_data=True,
2325
install_requires=[
24-
'Flask',
25-
'requests',
26-
'PyYAML'
26+
'connexion',
27+
'bravado',
28+
'ruamel.yaml >= 0.12.4, < 0.15',
2729
],
2830
entry_points={
29-
'console_scripts': [ "cwltool-stream=cwltool_stream:main",
30-
"cwl-server=cwl_flask:main",
31-
"cwl-client=cwl_client:main"]
31+
'console_scripts': [ "wes-server=wes_service:main",
32+
"wes-client=wes_client:main"]
33+
},
34+
extras_require={
35+
"arvados": [
36+
"arvados-cwl-runner"
37+
]
3238
},
3339
zip_safe=True
3440
)

wes_client/__init__.py

Lines changed: 111 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,111 @@
1+
#!/usr/bin/env python
2+
3+
from bravado.client import SwaggerClient
4+
from bravado.requests_client import RequestsClient
5+
import json
6+
import time
7+
import pprint
8+
import sys
9+
import os
10+
import argparse
11+
import logging
12+
import urlparse
13+
import pkg_resources # part of setuptools
14+
from wes_service.util import visit
15+
import urllib
16+
import ruamel.yaml as yaml
17+
18+
def main(argv=sys.argv[1:]):
    """Command-line client for a Workflow Execution Service (WES) endpoint.

    Depending on the flags this either prints the package version, lists
    workflows, prints the stderr log or the full log record of one workflow,
    or (the default) submits a CWL v1.0 workflow, polls once per second until
    it leaves the Queued/Initializing/Running states, and dumps its outputs
    as JSON on stdout.

    argv: command-line arguments, without the program name.

    Returns the process exit code: 0 on success (for a submitted workflow,
    only when its final state is "Complete"), 1 otherwise.  Invalid or
    missing arguments terminate through ``parser.error`` (SystemExit, code 2).
    """
    parser = argparse.ArgumentParser(description='Workflow Execution Service')
    parser.add_argument("--host", type=str, default=os.environ.get("WES_API_HOST"))
    parser.add_argument("--auth", type=str, default=os.environ.get("WES_API_AUTH"))
    parser.add_argument("--proto", type=str, default=os.environ.get("WES_API_PROTO", "https"))
    parser.add_argument("--quiet", action="store_true", default=False)
    parser.add_argument("--outdir", type=str)

    exgroup = parser.add_mutually_exclusive_group()
    exgroup.add_argument("--run", action="store_true", default=False)
    exgroup.add_argument("--get", type=str, default=None)
    exgroup.add_argument("--log", type=str, default=None)
    exgroup.add_argument("--list", action="store_true", default=False)
    exgroup.add_argument("--version", action="store_true", default=False)

    parser.add_argument("workflow_url", type=str, nargs="?", default=None)
    parser.add_argument("job_order", type=str, nargs="?", default=None)
    args = parser.parse_args(argv)

    if args.version:
        pkg = pkg_resources.require("wes_service")
        # Parenthesised single argument prints identically on Python 2 and 3.
        print(u"%s %s" % (sys.argv[0], pkg[0].version))
        # Return instead of calling the site-provided exit(): callers wrap
        # main() in sys.exit(), and exit() is not guaranteed to exist.
        return 0

    if not args.host:
        # Without this the client would try to talk to "<proto>://None/".
        parser.error("no service host given; use --host or set WES_API_HOST")

    http_client = RequestsClient()
    split = urlparse.urlsplit("%s://%s/" % (args.proto, args.host))

    # Send the auth token as an Authorization header on requests to this host.
    http_client.set_api_key(
        split.hostname, args.auth,
        param_name='Authorization', param_in='header')
    client = SwaggerClient.from_url("%s://%s/swagger.json" % (args.proto, args.host),
                                    http_client=http_client, config={'use_models': False})
    service = client.WorkflowExecutionService

    if args.list:
        response = service.ListWorkflows()
        json.dump(response.result(), sys.stdout, indent=4)
        return 0

    if args.log:
        response = service.GetWorkflowLog(workflow_id=args.log)
        sys.stdout.write(response.result()["workflow_log"]["stderr"])
        return 0

    if args.get:
        response = service.GetWorkflowLog(workflow_id=args.get)
        json.dump(response.result(), sys.stdout, indent=4)
        return 0

    # Everything below submits a workflow, which needs both positionals.
    if args.workflow_url is None or args.job_order is None:
        parser.error("workflow_url and job_order are required to run a workflow")

    with open(args.job_order) as f:
        job_params = yaml.safe_load(f)
    basedir = os.path.dirname(args.job_order)

    def fixpaths(d):
        # Rewrite scheme-less "location" values, resolving them against the
        # directory that contains the job order file.
        if isinstance(d, dict) and "location" in d:
            if ":" not in d["location"]:
                d["location"] = urllib.pathname2url(
                    os.path.normpath(os.path.join(os.getcwd(), basedir, d["location"])))
    visit(job_params, fixpaths)

    workflow_url = args.workflow_url
    if not workflow_url.startswith("/") and ":" not in workflow_url:
        # A bare relative path: make it absolute before sending it on.
        workflow_url = os.path.abspath(workflow_url)

    if args.quiet:
        logging.basicConfig(level=logging.WARNING)
    else:
        logging.basicConfig(level=logging.INFO)

    submitted = service.RunWorkflow(body={
        "workflow_url": workflow_url,
        "workflow_params": job_params,
        "workflow_type": "CWL",
        "workflow_type_version": "v1.0"}).result()
    workflow_id = submitted["workflow_id"]

    logging.info("Workflow id is %s", workflow_id)

    # Poll until the workflow reaches a state other than the in-progress ones.
    status = service.GetWorkflowStatus(workflow_id=workflow_id).result()
    while status["state"] in ("Queued", "Initializing", "Running"):
        time.sleep(1)
        status = service.GetWorkflowStatus(workflow_id=workflow_id).result()

    logging.info("State is %s", status["state"])

    log_record = service.GetWorkflowLog(workflow_id=workflow_id).result()
    logging.info(log_record["workflow_log"]["stderr"])

    json.dump(log_record["outputs"], sys.stdout, indent=4)

    if status["state"] == "Complete":
        return 0
    return 1


if __name__ == "__main__":
    sys.exit(main(sys.argv[1:]))

wes_service/__init__.py

Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,35 @@
1+
import connexion
2+
from connexion.resolver import Resolver
3+
import connexion.utils as utils
4+
5+
import threading
6+
import tempfile
7+
import subprocess
8+
import uuid
9+
import os
10+
import json
11+
import urllib
12+
import argparse
13+
import sys
14+
15+
from pkg_resources import resource_stream
16+
17+
def main(argv=sys.argv[1:]):
    """Start the Workflow Execution Service HTTP server.

    argv: command-line arguments, without the program name.  Recognised
        options are ``--backend`` (dotted module name that provides a
        ``create_backend`` callable, default ``wes_service.cwl_runner``),
        ``--port`` (default 8080) and ``--opt`` (repeatable; the collected
        list, or None when never given, is passed to ``create_backend``).

    Does not return a value; ``app.run`` is the last call made.
    """
    parser = argparse.ArgumentParser(description='Workflow Execution Service')
    parser.add_argument("--backend", type=str, default="wes_service.cwl_runner")
    parser.add_argument("--port", type=int, default=8080)
    parser.add_argument("--opt", type=str, action="append")
    args = parser.parse_args(argv)

    app = connexion.App(__name__)
    create_backend = utils.get_function_from_name(args.backend + ".create_backend")
    backend = create_backend(args.opt)

    def rs(x):
        # Resolve an API operation name to the backend attribute of that name.
        return getattr(backend, x)

    res = resource_stream(__name__, 'swagger/proto/workflow_execution.swagger.json')
    try:
        spec = json.load(res)
    finally:
        # The stream is only needed for parsing; close it rather than leak it.
        res.close()
    app.add_api(spec, resolver=Resolver(rs))

    app.run(port=args.port)


if __name__ == "__main__":
    main(sys.argv[1:])

wes_service/arvados_wes.py

Lines changed: 134 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,134 @@
1+
import arvados
2+
import arvados.collection
3+
import os
4+
import connexion
5+
import json
6+
import subprocess
7+
import tempfile
8+
from wes_service.util import visit, WESBackend
9+
10+
def get_api():
    """Return an Arvados "v1" API client configured for the current request.

    The API host (and the optional insecure flag, default "false") come from
    the process environment; the token is the Authorization header of the
    connexion request being handled.
    """
    config = {
        "ARVADOS_API_HOST": os.environ["ARVADOS_API_HOST"],
        "ARVADOS_API_TOKEN": connexion.request.headers['Authorization'],
        "ARVADOS_API_HOST_INSECURE": os.environ.get("ARVADOS_API_HOST_INSECURE", "false"),
    }
    return arvados.api_from_config(version="v1", apiconfig=config)
16+
17+
18+
# Lookup table keyed by the "state" field of an Arvados container record;
# the value is the state name reported back in this service's responses.
statemap = dict(
    Queued="Queued",
    Locked="Initializing",
    Running="Running",
    Complete="Complete",
    Cancelled="Canceled",
)
25+
26+
class ArvadosBackend(WESBackend):
    """Workflow Execution Service backend that delegates to Arvados.

    Workflows are submitted by invoking the ``arvados-cwl-runner`` command;
    state, logs and outputs are read back through the Arvados API using the
    caller's Authorization header as the API token (see ``get_api``).
    """

    def GetServiceInfo(self):
        """Return the static description of what this backend supports."""
        return {
            "workflow_type_versions": {
                "CWL": ["v1.0"]
            },
            "supported_wes_versions": "0.1.0",
            "supported_filesystem_protocols": ["file"],
            "engine_versions": "cwl-runner",
            "system_state_counts": {},
            "key_values": {}
        }

    def ListWorkflows(self, body=None):
        """List top-level container requests that run arvados-cwl-runner.

        body: accepted but currently ignored; paging and search fields
            (page_size, page_token, key_value_search) are not honoured.

        Returns a dict with "workflows" (a list of workflow_id/state pairs)
        and an empty "next_page_token".
        """
        api = get_api()

        requests = api.container_requests().list(
            filters=[["requesting_container_uuid", "=", None]],
            select=["uuid", "command", "container_uuid"]).execute()

        # Requests without a container yet carry no uuid to look up.
        container_uuids = [cr["container_uuid"] for cr in requests["items"]
                           if cr["container_uuid"]]
        containers = api.containers().list(
            filters=[["uuid", "in", container_uuids]],
            select=["uuid", "state"]).execute()

        uuidmap = {c["uuid"]: statemap[c["state"]] for c in containers["items"]}

        workflows = []
        for cr in requests["items"]:
            command = cr["command"]
            # Guard against an empty command instead of raising IndexError.
            if not command or command[0] != "arvados-cwl-runner":
                continue
            workflows.append({
                "workflow_id": cr["uuid"],
                # A request whose container is unassigned or absent from the
                # listing is reported as "Unknown" rather than failing the
                # whole call with a KeyError.
                "state": uuidmap.get(cr["container_uuid"], "Unknown"),
            })

        return {
            "workflows": workflows,
            "next_page_token": ""
        }

    def RunWorkflow(self, body):
        """Submit a CWL v1.0 workflow through ``arvados-cwl-runner``.

        body: request dict; reads "workflow_type", "workflow_type_version",
            "workflow_params" and "workflow_url".

        Returns {"workflow_id": ...} holding the stripped output of the
        submit command, or None when the workflow type/version is not
        CWL v1.0.  Raises subprocess.CalledProcessError when the command
        exits with a non-zero status.
        """
        if body["workflow_type"] != "CWL" or body["workflow_type_version"] != "v1.0":
            # NOTE(review): unsupported types are answered with None rather
            # than an explicit error -- confirm this is what clients expect.
            return

        env = {
            "PATH": os.environ["PATH"],
            "ARVADOS_API_HOST": os.environ["ARVADOS_API_HOST"],
            "ARVADOS_API_TOKEN": connexion.request.headers['Authorization'],
            "ARVADOS_API_HOST_INSECURE": os.environ.get("ARVADOS_API_HOST_INSECURE", "false")
        }
        with tempfile.NamedTemporaryFile() as inputtemp:
            json.dump(body["workflow_params"], inputtemp)
            # Flush so the child process sees the complete input document.
            inputtemp.flush()
            workflow_id = subprocess.check_output(
                ["arvados-cwl-runner", "--submit", "--no-wait", "--api=containers",
                 body.get("workflow_url"), inputtemp.name], env=env).strip()
        return {"workflow_id": workflow_id}

    def GetWorkflowLog(self, workflow_id):
        """Return state, stderr log and outputs for one workflow.

        workflow_id: uuid of the container request to describe.

        The outputs come from "cwl.output.json" in the request's output
        collection, with every "location" rewritten to a
        "keep:<portable data hash>/<location>" reference; stderr comes from
        "stderr.txt" in the log collection when that file exists.
        "exitCode" is only included once the container has an exit code.
        """
        api = get_api()

        request = api.container_requests().get(uuid=workflow_id).execute()
        container = api.containers().get(uuid=request["container_uuid"]).execute()

        outputobj = {}
        if request["output_uuid"]:
            output_collection = arvados.collection.CollectionReader(request["output_uuid"])
            with output_collection.open("cwl.output.json") as f:
                outputobj = json.load(f)

            def keepref(d):
                if isinstance(d, dict) and "location" in d:
                    d["location"] = "keep:%s/%s" % (
                        output_collection.portable_data_hash(), d["location"])
            visit(outputobj, keepref)

        stderr = ""
        if request["log_uuid"]:
            log_collection = arvados.collection.CollectionReader(request["log_uuid"])
            if "stderr.txt" in log_collection:
                with log_collection.open("stderr.txt") as f:
                    stderr = f.read()

        r = {
            "workflow_id": request["uuid"],
            "request": {},
            "state": statemap[container["state"]],
            "workflow_log": {
                "cmd": [""],
                "startTime": "",
                "endTime": "",
                "stdout": "",
                "stderr": stderr
            },
            "task_logs": [],
            "outputs": outputobj
        }
        if container["exit_code"] is not None:
            r["workflow_log"]["exitCode"] = container["exit_code"]
        return r

    def CancelJob(self, workflow_id):
        """Cancel a workflow by setting its container request priority to 0.

        workflow_id: uuid of the container request to cancel.

        Returns {"workflow_id": ...} with the uuid of the updated request.
        """
        api = get_api()
        # The uuid must be supplied so the update targets this workflow.
        request = api.container_requests().update(
            uuid=workflow_id, body={"priority": 0}).execute()
        return {"workflow_id": request["uuid"]}

    def GetWorkflowStatus(self, workflow_id):
        """Return the workflow id and mapped container state of one workflow.

        workflow_id: uuid of the container request to look up.
        """
        api = get_api()
        request = api.container_requests().get(uuid=workflow_id).execute()
        container = api.containers().get(uuid=request["container_uuid"]).execute()
        return {"workflow_id": request["uuid"],
                "state": statemap[container["state"]]}
132+
133+
def create_backend(opts):
    """Build the Arvados backend for the service.

    opts: backend options supplied by the caller; passed unchanged to the
        ArvadosBackend constructor.

    Returns the new ArvadosBackend instance.
    """
    # The original referenced an undefined name here, so every call raised
    # NameError; the parameter is what has to be forwarded.
    return ArvadosBackend(opts)

0 commit comments

Comments
 (0)