Skip to content

Commit 2033372

Browse files
author
Peter Amstutz
committed
Update to 0.1.0 spec (WIP)
1 parent ed75de8 commit 2033372

File tree

2 files changed

+361
-142
lines changed

2 files changed

+361
-142
lines changed

cwl_runner_wes.py

Lines changed: 76 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -8,26 +8,36 @@
88
import uuid
99
import os
1010
import json
11+
import urllib
1112

1213
class Workflow(object):
13-
def __init__(self, workflow_ID):
14+
def __init__(self, workflow_id):
1415
super(Workflow, self).__init__()
15-
self.workflow_ID = workflow_ID
16-
self.workdir = os.path.abspath(self.workflow_ID)
16+
self.workflow_id = workflow_id
17+
self.workdir = os.path.join(os.getcwd(), "workflows", self.workflow_id)
1718

18-
def run(self, path, inputobj):
19-
path = os.path.abspath(path)
20-
os.mkdir(self.workdir)
19+
def run(self, request):
20+
os.makedirs(self.workdir)
2121
outdir = os.path.join(self.workdir, "outdir")
2222
os.mkdir(outdir)
23+
24+
with open(os.path.join(self.workdir, "request.json"), "w") as f:
25+
json.dump(request, f)
26+
2327
with open(os.path.join(self.workdir, "cwl.input.json"), "w") as inputtemp:
24-
json.dump(inputobj, inputtemp)
25-
with open(os.path.join(self.workdir, "workflow_url"), "w") as f:
26-
f.write(path)
28+
inputtemp.write(request["workflow_params"])
29+
30+
if request.get("workflow_descriptor"):
31+
with open(os.path.join(self.workdir, "workflow.cwl"), "w") as f:
32+
f.write(workflow_descriptor)
33+
workflow_url = urllib.pathname2url(os.path.join(self.workdir, "workflow.cwl"))
34+
else:
35+
workflow_url = request.get("workflow_url")
36+
2737
output = open(os.path.join(self.workdir, "cwl.output.json"), "w")
2838
stderr = open(os.path.join(self.workdir, "stderr"), "w")
2939

30-
proc = subprocess.Popen(["cwl-runner", path, inputtemp.name],
40+
proc = subprocess.Popen(["cwl-runner", workflow_url, inputtemp.name],
3141
stdout=output,
3242
stderr=stderr,
3343
close_fds=True,
@@ -60,70 +70,96 @@ def getstate(self):
6070
if exit_code == 0:
6171
state = "Complete"
6272
elif exit_code != -1:
63-
state = "Failed"
73+
state = "Error"
6474

6575
return (state, exit_code)
6676

6777
def getstatus(self):
6878
state, exit_code = self.getstate()
6979

70-
with open(os.path.join(self.workdir, "cwl.input.json"), "r") as inputtemp:
71-
inputobj = json.load(inputtemp)
72-
with open(os.path.join(self.workdir, "workflow_url"), "r") as f:
73-
workflow_url = f.read()
74-
75-
outputobj = {}
76-
if state == "Complete":
77-
with open(os.path.join(self.workdir, "cwl.output.json"), "r") as outputtemp:
78-
outputobj = json.load(outputtemp)
79-
8080
return {
81-
"workflow_ID": self.workflow_ID,
82-
"workflow_url": workflow_url,
83-
"input": inputobj,
84-
"output": outputobj,
81+
"workflow_id": self.workflow_id,
8582
"state": state
8683
}
8784

8885

8986
def getlog(self):
9087
state, exit_code = self.getstate()
9188

89+
with open(os.path.join(self.workdir, "request.json"), "r") as f:
90+
request = json.load(f)
91+
9292
with open(os.path.join(self.workdir, "stderr"), "r") as f:
9393
stderr = f.read()
9494

95+
if state == "Complete":
96+
with open(os.path.join(self.workdir, "cwl.output.json"), "r") as outputtemp:
97+
outputobj = json.load(outputtemp)
98+
9599
return {
96-
"workflow_ID": self.workflow_ID,
97-
"log": {
100+
"workflow_id": self.workflow_id,
101+
"request": request,
102+
"state": state,
103+
"workflow_log": {
98104
"cmd": [""],
99105
"startTime": "",
100106
"endTime": "",
101107
"stdout": "",
102108
"stderr": stderr,
103109
"exitCode": exit_code
104-
}
110+
},
111+
"task_logs": [],
112+
"outputs": []
105113
}
106114

107115
def cancel(self):
108116
pass
109117

110-
def GetWorkflowStatus(workflow_ID):
111-
job = Workflow(workflow_ID)
112-
return job.getstatus()
118+
def GetServiceInfo():
119+
return {
120+
"workflow_type_versions": {
121+
"CWL": ["v1.0"]
122+
},
123+
"supported_wes_versions": "0.1.0",
124+
"supported_filesystem_protocols": ["file"],
125+
"engine_versions": "cwl-runner",
126+
"system_state_counts": {},
127+
"key_values": {}
128+
}
129+
130+
def ListWorkflows(body):
131+
# body["page_size"]
132+
# body["page_token"]
133+
# body["key_value_search"]
134+
135+
wf = []
136+
for l in os.listdir(os.path.join(os.getcwd(), "workflows")):
137+
if os.path.isdir(os.path.join(os.getcwd(), "workflows", l)):
138+
wf.append(Workflow(l))
139+
return {
140+
"workflows": [{"workflow_id": w.workflow_id, "state": w.getstate()} for w in wf],
141+
"next_page_token": ""
142+
}
113143

114-
def GetWorkflowLog(workflow_ID):
115-
job = Workflow(workflow_ID)
144+
def RunWorkflow(body):
145+
if body["workflow_type"] != "CWL" or body["workflow_type_version"] != "v1.0":
146+
return
147+
workflow_id = uuid.uuid4().hex
148+
job = Workflow(workflow_id)
149+
job.run(body)
150+
return {"workflow_id": workflow_id}
151+
152+
def GetWorkflowLog(workflow_id):
153+
job = Workflow(workflow_id)
116154
return job.getlog()
117155

118-
def CancelWorkflow(workflow_ID):
119-
job = Workflow(workflow_ID)
156+
def CancelJob(workflow_id):
157+
job = Workflow(workflow_id)
120158
job.cancel()
121-
return job.getstatus()
159+
return {"workflow_id": workflow_id}
122160

123-
def RunWorkflow(body):
124-
workflow_ID = uuid.uuid4().hex
125-
job = Workflow(workflow_ID)
126-
job.run(body["workflow_url"], body["input"])
161+
def GetWorkflowStatus(workflow_id):
162+
job = Workflow(workflow_id)
127163
return job.getstatus()
128164

129165
def main():

0 commit comments

Comments
 (0)