Skip to content

Commit e642e77

Browse files
author
Peter Amstutz
committed
Crazy!
1 parent 15b7004 commit e642e77

File tree

2 files changed

+165
-129
lines changed

2 files changed

+165
-129
lines changed

cwl_runner_wes.py

Lines changed: 97 additions & 72 deletions
Original file line numberDiff line numberDiff line change
@@ -5,89 +5,114 @@
55
import threading
66
import tempfile
77
import subprocess
8-
9-
jobs_lock = threading.Lock()
10-
jobs = []
11-
12-
class Job(threading.Thread):
13-
def __init__(self, jobid, path, inputobj):
14-
super(Job, self).__init__()
15-
self.jobid = jobid
16-
self.path = path
17-
self.inputobj = inputobj
18-
self.updatelock = threading.Lock()
19-
self.begin()
20-
21-
def begin(self):
22-
loghandle, self.logname = tempfile.mkstemp()
23-
with self.updatelock:
24-
self.outdir = tempfile.mkdtemp()
25-
self.inputtemp = tempfile.NamedTemporaryFile()
26-
json.dump(self.inputtemp, self.inputobj)
27-
self.proc = subprocess.Popen(["cwl-runner", self.path, self.inputtemp.name],
28-
stdin=subprocess.PIPE,
29-
stdout=subprocess.PIPE,
30-
stderr=loghandle,
31-
close_fds=True,
32-
cwd=self.outdir)
33-
self.status = {
34-
"id": "%sjobs/%i" % (request.url_root, self.jobid),
35-
"log": "%sjobs/%i/log" % (request.url_root, self.jobid),
36-
"run": self.path,
37-
"state": "Running",
38-
"input": json.loads(self.inputobj),
39-
"output": None}
40-
41-
def run(self):
42-
self.stdoutdata, self.stderrdata = self.proc.communicate(self.inputobj)
43-
if self.proc.returncode == 0:
44-
outobj = yaml.load(self.stdoutdata)
45-
with self.updatelock:
46-
self.status["state"] = "Success"
47-
self.status["output"] = outobj
8+
import uuid
9+
import os
10+
import json
11+
12+
class Workflow(object):
13+
def __init__(self, workflow_ID):
14+
super(Workflow, self).__init__()
15+
self.workflow_ID = workflow_ID
16+
self.workdir = os.path.abspath(self.workflow_ID)
17+
18+
def run(self, path, inputobj):
19+
outdir = os.path.join(self.workdir, "outdir")
20+
with open(os.path.join(self.workdir, "cwl.input.json"), "w") as inputtemp:
21+
json.dump(inputtemp, inputobj)
22+
with open(os.path.join(self.workdir, "workflow_url"), "w") as f:
23+
f.write(path)
24+
output = open(os.path.join(self.workdir, "cwl.output.json"), "w")
25+
stderr = open(os.path.join(self.workdir, "stderr"), "w")
26+
27+
proc = subprocess.Popen(["cwl-runner", path, inputtemp.name],
28+
stdout=output,
29+
stderr=stderr,
30+
close_fds=True,
31+
cwd=outdir)
32+
stdout.close()
33+
stderr.close()
34+
with open(os.path.join(self.workdir, "pid"), "w") as pid:
35+
pid.write(str(proc.pid))
36+
37+
return self.getstatus()
38+
39+
def getstate(self):
40+
state = "Running"
41+
exit_code = -1
42+
43+
exc = os.path.join(self.workdir, "exit_code")
44+
if os.path.exists(exc):
45+
with open(exc) as f:
46+
exit_code = int(f.read())
47+
if exit_code == 0:
48+
state = "Complete"
49+
else:
50+
state = "Failed"
4851
else:
49-
with self.updatelock:
50-
self.status["state"] = "Failed"
52+
with open(os.path.join(self.workdir, "pid"), "r") as pid:
53+
pid = int(pid.read())
54+
(_pid, exit_status) = os.waitpid(pid, os.WNOHANG)
55+
# record exit code
56+
57+
return (state, exit_code)
5158

5259
def getstatus(self):
53-
with self.updatelock:
54-
return self.status.copy()
60+
state, exit_code = self.getstate()
61+
62+
with open(os.path.join(self.workdir, "cwl.input.json"), "r") as inputtemp:
63+
inputobj = json.load(inputtemp)
64+
with open(os.path.join(self.workdir, "workflow_url"), "r") as f:
65+
workflow_url = f.read()
66+
67+
outputobj = None
68+
if state == "Complete":
69+
with open(os.path.join(self.workdir, "cwl.output.json"), "r") as outputtemp:
70+
outputtobj = json.load(outputtemp)
71+
72+
return {
73+
"workflow_ID": self.workflow_ID,
74+
"workflow_url": workflow_url,
75+
"input": inputobj,
76+
"output": outputobj,
77+
"state": state
78+
}
79+
80+
81+
def getlog(self):
82+
state, exit_code = self.getstate()
83+
84+
return {
85+
"workflow_ID": self.workflow_ID,
86+
"log": {
87+
"cmd": "",
88+
"startTime": "",
89+
"endTime": "",
90+
"stdout": "",
91+
"stderr": "",
92+
"exitCode": exit_code
93+
}
94+
}
5595

5696
def cancel(self):
57-
if self.status["state"] == "Running":
58-
self.proc.send_signal(signal.SIGQUIT)
59-
with self.updatelock:
60-
self.status["state"] = "Canceled"
61-
62-
def pause(self):
63-
if self.status["state"] == "Running":
64-
self.proc.send_signal(signal.SIGTSTP)
65-
with self.updatelock:
66-
self.status["state"] = "Paused"
67-
68-
def resume(self):
69-
if self.status["state"] == "Paused":
70-
self.proc.send_signal(signal.SIGCONT)
71-
with self.updatelock:
72-
self.status["state"] = "Running"
73-
97+
pass
7498

7599
def GetWorkflowStatus(workflow_ID):
76-
return {"workflow_ID": workflow_ID}
100+
job = Workflow(workflow_ID)
101+
job.getstatus()
77102

78-
def GetWorkflowLog():
79-
pass
103+
def GetWorkflowLog(workflow_ID):
104+
job = Workflow(workflow_ID)
105+
job.getlog()
80106

81-
def CancelJob():
82-
pass
107+
def CancelWorkflow(workflow_ID):
108+
job = Workflow(workflow_ID)
109+
job.cancel()
83110

84111
def RunWorkflow(body):
85-
with jobs_lock:
86-
jobid = len(jobs)
87-
job = Job(jobid, body["workflow_url"], body["inputs"])
88-
job.start()
89-
jobs.append(job)
90-
return {"workflow_ID": str(jobid)}
112+
workflow_ID = uuid.uuid4().hex
113+
job = Workflow(workflow_ID)
114+
job.run(body["workflow_url"], body["input"])
115+
return job.getstatus()
91116

92117
def main():
93118
app = connexion.App(__name__, specification_dir='swagger/')

0 commit comments

Comments
 (0)