Skip to content

Commit 40518a3

Browse files
committed
Update.
1 parent 9b90803 commit 40518a3

File tree

1 file changed

+48
-64
lines changed

1 file changed

+48
-64
lines changed

wes_service/toil_wes.py

Lines changed: 48 additions & 64 deletions
Original file line numberDiff line numberDiff line change
@@ -2,26 +2,23 @@
22
import json
33
import os
44
import subprocess
5-
import tempfile
65
import time
76
import logging
87
import urllib
98
import uuid
109

11-
import connexion
1210
from multiprocessing import Process
13-
from werkzeug.utils import secure_filename
1411
from wes_service.util import WESBackend
1512

1613
logging.basicConfig(level=logging.INFO)
1714

1815

1916
class ToilWorkflow(object):
20-
def __init__(self, workflow_id):
17+
def __init__(self, run_id):
2118
super(ToilWorkflow, self).__init__()
22-
self.workflow_id = workflow_id
19+
self.run_id = run_id
2320

24-
self.workdir = os.path.join(os.getcwd(), 'workflows', self.workflow_id)
21+
self.workdir = os.path.join(os.getcwd(), 'runs', self.run_id)
2522
self.outdir = os.path.join(self.workdir, 'outdir')
2623
if not os.path.exists(self.outdir):
2724
os.makedirs(self.outdir)
@@ -124,7 +121,7 @@ def getlog(self):
124121
outputobj = json.load(outputtemp)
125122

126123
return {
127-
"workflow_id": self.workflow_id,
124+
"run_id": self.run_id,
128125
"request": request,
129126
"state": state,
130127
"workflow_log": {
@@ -158,7 +155,7 @@ def run(self, request, opts):
158155
:param dict request: A dictionary containing the cwl/json information.
159156
:param wes_service.util.WESBackend opts: contains the user's arguments;
160157
specifically the runner and runner options
161-
:return: {"workflow_id": self.workflow_id, "state": state}
158+
:return: {"run_id": self.run_id, "state": state}
162159
"""
163160
wftype = request['workflow_type'].lower().strip()
164161
version = request['workflow_type_version']
@@ -170,7 +167,7 @@ def run(self, request, opts):
170167
raise RuntimeError('workflow_type "py" requires '
171168
'"workflow_type_version" to be "2.7": ' + str(version))
172169

173-
logging.info('Beginning Toil Workflow ID: ' + str(self.workflow_id))
170+
logging.info('Beginning Toil Workflow ID: ' + str(self.run_id))
174171

175172
with open(self.starttime, 'w') as f:
176173
f.write(str(time.time()))
@@ -199,37 +196,39 @@ def getstate(self):
199196
state = "RUNNING"
200197
exit_code = -1
201198

202-
# exitcode_file = os.path.join(self.workdir, "exit_code")
203-
#
204-
# if os.path.exists(exitcode_file):
205-
# with open(exitcode_file) as f:
206-
# exit_code = int(f.read())
207-
# elif os.path.exists(self.pidfile):
208-
# with open(self.pidfile, "r") as pid:
209-
# pid = int(pid.read())
210-
# try:
211-
# (_pid, exit_status) = os.waitpid(pid, os.WNOHANG)
212-
# if _pid != 0:
213-
# exit_code = exit_status >> 8
214-
# with open(exitcode_file, "w") as f:
215-
# f.write(str(exit_code))
216-
# os.unlink(self.pidfile)
217-
# except OSError:
218-
# os.unlink(self.pidfile)
219-
# exit_code = 255
220-
#
221-
# if exit_code == 0:
222-
# state = "COMPLETE"
223-
# elif exit_code != -1:
224-
# state = "EXECUTOR_ERROR"
199+
# TODO: This sections gets a pid that finishes before the workflow exits unless it is
200+
# very quick, like md5sum
201+
exitcode_file = os.path.join(self.workdir, "exit_code")
202+
203+
if os.path.exists(exitcode_file):
204+
with open(exitcode_file) as f:
205+
exit_code = int(f.read())
206+
elif os.path.exists(self.pidfile):
207+
with open(self.pidfile, "r") as pid:
208+
pid = int(pid.read())
209+
try:
210+
(_pid, exit_status) = os.waitpid(pid, os.WNOHANG)
211+
if _pid != 0:
212+
exit_code = exit_status >> 8
213+
with open(exitcode_file, "w") as f:
214+
f.write(str(exit_code))
215+
os.unlink(self.pidfile)
216+
except OSError:
217+
os.unlink(self.pidfile)
218+
exit_code = 255
219+
220+
if exit_code == 0:
221+
state = "COMPLETE"
222+
elif exit_code != -1:
223+
state = "EXECUTOR_ERROR"
225224

226225
return state, exit_code
227226

228227
def getstatus(self):
229228
state, exit_code = self.getstate()
230229

231230
return {
232-
"workflow_id": self.workflow_id,
231+
"run_id": self.run_id,
233232
"state": state
234233
}
235234

@@ -251,56 +250,41 @@ def GetServiceInfo(self):
251250
'key_values': {}
252251
}
253252

254-
def ListWorkflows(self):
253+
def ListRuns(self):
255254
# FIXME #15 results don't page
256255
wf = []
257256
for l in os.listdir(os.path.join(os.getcwd(), "workflows")):
258257
if os.path.isdir(os.path.join(os.getcwd(), "workflows", l)):
259258
wf.append(ToilWorkflow(l))
260259

261-
workflows = [{"workflow_id": w.workflow_id, "state": w.getstate()[0]} for w in wf] # NOQA
260+
workflows = [{"run_id": w.run_id, "state": w.getstate()[0]} for w in wf] # NOQA
262261
return {
263262
"workflows": workflows,
264263
"next_page_token": ""
265264
}
266265

267266
def RunWorkflow(self):
268-
tempdir = tempfile.mkdtemp()
269-
body = {}
270-
for k, ls in connexion.request.files.iterlists():
271-
for v in ls:
272-
if k == "workflow_descriptor":
273-
filename = secure_filename(os.path.basename(v.filename))
274-
v.save(os.path.join(tempdir, filename))
275-
body["workflow_url"] = "file:///%s/%s" % (tempdir, filename)
276-
elif k in ("workflow_params", "tags", "workflow_engine_parameters"):
277-
body[k] = json.loads(v.read())
278-
else:
279-
body[k] = v.read()
280-
281-
index = body["workflow_url"].find("http")
282-
if index > 0:
283-
body["workflow_url"] = body["workflow_url"][index:]
284-
285-
workflow_id = uuid.uuid4().hex
286-
job = ToilWorkflow(workflow_id)
267+
tempdir, body = self.collect_attachments()
268+
269+
run_id = uuid.uuid4().hex
270+
job = ToilWorkflow(run_id)
287271
p = Process(target=job.run, args=(body, self))
288272
p.start()
289-
self.processes[workflow_id] = p
290-
return {"workflow_id": workflow_id}
273+
self.processes[run_id] = p
274+
return {"run_id": run_id}
291275

292-
def GetWorkflowLog(self, workflow_id):
293-
job = ToilWorkflow(workflow_id)
276+
def GetRunLog(self, run_id):
277+
job = ToilWorkflow(run_id)
294278
return job.getlog()
295279

296-
def CancelJob(self, workflow_id):
280+
def CancelRun(self, run_id):
297281
# should this block with `p.is_alive()`?
298-
if workflow_id in self.processes:
299-
self.processes[workflow_id].terminate()
300-
return {'workflow_id': workflow_id}
282+
if run_id in self.processes:
283+
self.processes[run_id].terminate()
284+
return {'run_id': run_id}
301285

302-
def GetWorkflowStatus(self, workflow_id):
303-
job = ToilWorkflow(workflow_id)
286+
def GetRunStatus(self, run_id):
287+
job = ToilWorkflow(run_id)
304288
return job.getstatus()
305289

306290

0 commit comments

Comments
 (0)