Skip to content

Commit c5e8721

Browse files
author
Peter Amstutz
committed
GA4GH workflow execution service prototype using bravado and connexion.
1 parent f871fd5 commit c5e8721

File tree

3 files changed

+87
-13
lines changed

3 files changed

+87
-13
lines changed

cwl_runner_wes.py

Lines changed: 86 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,86 @@
1+
import threading
2+
import tempfile
3+
import subprocess
4+
5+
jobs_lock = threading.Lock()
6+
jobs = []
7+
8+
class Job(threading.Thread):
9+
def __init__(self, jobid, path, inputobj):
10+
super(Job, self).__init__()
11+
self.jobid = jobid
12+
self.path = path
13+
self.inputobj = inputobj
14+
self.updatelock = threading.Lock()
15+
self.begin()
16+
17+
def begin(self):
18+
loghandle, self.logname = tempfile.mkstemp()
19+
with self.updatelock:
20+
self.outdir = tempfile.mkdtemp()
21+
self.inputtemp = tempfile.NamedTemporaryFile()
22+
json.dump(self.inputtemp, self.inputobj)
23+
self.proc = subprocess.Popen(["cwl-runner", self.path, self.inputtemp.name],
24+
stdin=subprocess.PIPE,
25+
stdout=subprocess.PIPE,
26+
stderr=loghandle,
27+
close_fds=True,
28+
cwd=self.outdir)
29+
self.status = {
30+
"id": "%sjobs/%i" % (request.url_root, self.jobid),
31+
"log": "%sjobs/%i/log" % (request.url_root, self.jobid),
32+
"run": self.path,
33+
"state": "Running",
34+
"input": json.loads(self.inputobj),
35+
"output": None}
36+
37+
def run(self):
38+
self.stdoutdata, self.stderrdata = self.proc.communicate(self.inputobj)
39+
if self.proc.returncode == 0:
40+
outobj = yaml.load(self.stdoutdata)
41+
with self.updatelock:
42+
self.status["state"] = "Success"
43+
self.status["output"] = outobj
44+
else:
45+
with self.updatelock:
46+
self.status["state"] = "Failed"
47+
48+
def getstatus(self):
49+
with self.updatelock:
50+
return self.status.copy()
51+
52+
def cancel(self):
53+
if self.status["state"] == "Running":
54+
self.proc.send_signal(signal.SIGQUIT)
55+
with self.updatelock:
56+
self.status["state"] = "Canceled"
57+
58+
def pause(self):
59+
if self.status["state"] == "Running":
60+
self.proc.send_signal(signal.SIGTSTP)
61+
with self.updatelock:
62+
self.status["state"] = "Paused"
63+
64+
def resume(self):
65+
if self.status["state"] == "Paused":
66+
self.proc.send_signal(signal.SIGCONT)
67+
with self.updatelock:
68+
self.status["state"] = "Running"
69+
70+
71+
def GetWorkflowStatus(workflow_ID):
72+
return {"workflow_ID": workflow_ID}
73+
74+
def GetWorkflowLog():
75+
pass
76+
77+
def CancelJob():
78+
pass
79+
80+
def RunWorkflow(body):
81+
with jobs_lock:
82+
jobid = len(jobs)
83+
job = Job(jobid, body["workflow_url"], body["inputs"])
84+
job.start()
85+
jobs.append(job)
86+
return {"workflow_ID": str(jobid)}

myapp.py

Lines changed: 0 additions & 12 deletions
This file was deleted.

server.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@
55

66
app = connexion.App(__name__, specification_dir='swagger/')
77
def rs(x):
8-
return utils.get_function_from_name("myapp." + x)
8+
return utils.get_function_from_name("cwl_runner_wes." + x)
99

1010
app.add_api('proto/workflow_execution.swagger.json', resolver=Resolver(rs))
1111

0 commit comments

Comments
 (0)