Skip to content

Commit 0d6df34

Browse files
author
Peter Amstutz
committed
Update to latest WES (wip)
1 parent 7d77a6c commit 0d6df34

File tree

4 files changed

+134
-116
lines changed

4 files changed

+134
-116
lines changed

wes_client/wes_client_main.py

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -62,17 +62,17 @@ def main(argv=sys.argv[1:]):
6262
http_client=http_client, config={"use_models": False})
6363

6464
if args.list:
65-
response = client.WorkflowExecutionService.ListWorkflows(page_token=args.page, page_size=args.page_size)
65+
response = client.WorkflowExecutionService.ListRuns(page_token=args.page, page_size=args.page_size)
6666
json.dump(response.result(), sys.stdout, indent=4)
6767
return 0
6868

6969
if args.log:
70-
response = client.WorkflowExecutionService.GetWorkflowLog(workflow_id=args.log)
70+
response = client.WorkflowExecutionService.GetRunLog(workflow_id=args.log)
7171
sys.stdout.write(response.result()["workflow_log"]["stderr"])
7272
return 0
7373

7474
if args.get:
75-
response = client.WorkflowExecutionService.GetWorkflowLog(workflow_id=args.get)
75+
response = client.WorkflowExecutionService.GetRunLog(workflow_id=args.get)
7676
json.dump(response.result(), sys.stdout, indent=4)
7777
return 0
7878

@@ -147,14 +147,14 @@ def fixpaths(d):
147147
sys.stdout.write(r["workflow_id"] + "\n")
148148
exit(0)
149149

150-
r = client.WorkflowExecutionService.GetWorkflowStatus(workflow_id=r["workflow_id"]).result()
150+
r = client.WorkflowExecutionService.GetRunStatus(workflow_id=r["workflow_id"]).result()
151151
while r["state"] in ("QUEUED", "INITIALIZING", "RUNNING"):
152152
time.sleep(8)
153-
r = client.WorkflowExecutionService.GetWorkflowStatus(workflow_id=r["workflow_id"]).result()
153+
r = client.WorkflowExecutionService.GetRunStatus(workflow_id=r["workflow_id"]).result()
154154

155155
logging.info("State is %s", r["state"])
156156

157-
s = client.WorkflowExecutionService.GetWorkflowLog(workflow_id=r["workflow_id"]).result()
157+
s = client.WorkflowExecutionService.GetRunLog(workflow_id=r["workflow_id"]).result()
158158

159159
try:
160160
# TODO: Only works with Arvados atm

wes_service/arvados_wes.py

Lines changed: 17 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -81,7 +81,7 @@ def GetServiceInfo(self):
8181
}
8282

8383
@catch_exceptions
84-
def ListWorkflows(self, page_size=None, page_token=None, tag_search=None, state_search=None):
84+
def ListRuns(self, page_size=None, page_token=None, state_search=None):
8585
api = get_api()
8686

8787
paging = []
@@ -100,13 +100,13 @@ def ListWorkflows(self, page_size=None, page_token=None, tag_search=None, state_
100100

101101
uuidmap = {c["uuid"]: statemap[c["state"]] for c in containers}
102102

103-
workflow_list = [{"workflow_id": cr["uuid"],
103+
workflow_list = [{"run_id": cr["uuid"],
104104
"state": uuidmap.get(cr["container_uuid"])}
105105
for cr in requests
106106
if cr["command"] and cr["command"][0] == "arvados-cwl-runner"]
107107
return {
108108
"workflows": workflow_list,
109-
"next_page_token": workflow_list[-1]["workflow_id"] if workflow_list else ""
109+
"next_page_token": workflow_list[-1]["run_id"] if workflow_list else ""
110110
}
111111

112112
def invoke_cwl_runner(self, cr_uuid, workflow_url, workflow_params,
@@ -215,13 +215,13 @@ def RunWorkflow(self, workflow_params, workflow_type, workflow_type_version,
215215
project_uuid,
216216
tempdir)).start()
217217

218-
return {"workflow_id": cr["uuid"]}
218+
return {"run_id": cr["uuid"]}
219219

220220
@catch_exceptions
221-
def GetWorkflowLog(self, workflow_id):
221+
def GetRunLog(self, run_id):
222222
api = get_api()
223223

224-
request = api.container_requests().get(uuid=workflow_id).execute()
224+
request = api.container_requests().get(uuid=run_id).execute()
225225
if request["container_uuid"]:
226226
container = api.containers().get(uuid=request["container_uuid"]).execute() # NOQA
227227
task_reqs = arvados.util.list_all(api.container_requests().list, filters=[["requesting_container_uuid", "=", container["uuid"]]])
@@ -273,7 +273,7 @@ def log_object(cr):
273273
return r
274274

275275
r = {
276-
"workflow_id": request["uuid"],
276+
"run_id": request["uuid"],
277277
"request": {
278278
"workflow_url": "",
279279
"workflow_params": request["mounts"].get("/var/lib/cwl/cwl.input.json", {}).get("content", {})
@@ -287,30 +287,30 @@ def log_object(cr):
287287
return r
288288

289289
@catch_exceptions
290-
def CancelJob(self, workflow_id): # NOQA
290+
def CancelRun(self, run_id): # NOQA
291291
api = get_api()
292-
request = api.container_requests().update(uuid=workflow_id, body={"priority": 0}).execute() # NOQA
293-
return {"workflow_id": request["uuid"]}
292+
request = api.container_requests().update(uuid=run_id, body={"priority": 0}).execute() # NOQA
293+
return {"run_id": request["uuid"]}
294294

295295
@catch_exceptions
296-
def GetWorkflowStatus(self, workflow_id):
296+
def GetRunStatus(self, run_id):
297297
api = get_api()
298-
request = api.container_requests().get(uuid=workflow_id).execute()
298+
request = api.container_requests().get(uuid=run_id).execute()
299299
if request["container_uuid"]:
300300
container = api.containers().get(uuid=request["container_uuid"]).execute() # NOQA
301301
elif request["priority"] == 0:
302302
container = {"state": "Cancelled"}
303303
else:
304304
container = {"state": "Queued"}
305-
return {"workflow_id": request["uuid"],
305+
return {"run_id": request["uuid"],
306306
"state": statemap[container["state"]]}
307307

308308

309-
def dynamic_logs(workflow_id, logstream):
309+
def dynamic_logs(run_id, logstream):
310310
api = get_api()
311-
cr = api.container_requests().get(uuid=workflow_id).execute()
311+
cr = api.container_requests().get(uuid=run_id).execute()
312312
l1 = [t["properties"]["text"]
313-
for t in api.logs().list(filters=[["object_uuid", "=", workflow_id],
313+
for t in api.logs().list(filters=[["object_uuid", "=", run_id],
314314
["event_type", "=", logstream]],
315315
order="created_at desc",
316316
limit=100).execute()["items"]]
@@ -327,5 +327,5 @@ def dynamic_logs(workflow_id, logstream):
327327

328328
def create_backend(app, opts):
329329
ab = ArvadosBackend(opts)
330-
app.app.route('/ga4gh/wes/v1/workflows/<workflow_id>/x-dynamic-logs/<logstream>')(dynamic_logs)
330+
app.app.route('/ga4gh/wes/v1/runs/<run_id>/x-dynamic-logs/<logstream>')(dynamic_logs)
331331
return ab

wes_service/cwl_runner.py

Lines changed: 18 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -13,10 +13,10 @@
1313

1414

1515
class Workflow(object):
16-
def __init__(self, workflow_id):
16+
def __init__(self, run_id):
1717
super(Workflow, self).__init__()
18-
self.workflow_id = workflow_id
19-
self.workdir = os.path.join(os.getcwd(), "workflows", self.workflow_id)
18+
self.run_id = run_id
19+
self.workdir = os.path.join(os.getcwd(), "workflows", self.run_id)
2020

2121
def run(self, request, opts):
2222
"""
@@ -37,7 +37,7 @@ def run(self, request, opts):
3737
:param dict request: A dictionary containing the cwl/json information.
3838
:param wes_service.util.WESBackend opts: contains the user's arguments;
3939
specifically the runner and runner options
40-
:return: {"workflow_id": self.workflow_id, "state": state}
40+
:return: {"run_id": self.run_id, "state": state}
4141
"""
4242
os.makedirs(self.workdir)
4343
outdir = os.path.join(self.workdir, "outdir")
@@ -117,7 +117,7 @@ def getstatus(self):
117117
state, exit_code = self.getstate()
118118

119119
return {
120-
"workflow_id": self.workflow_id,
120+
"run_id": self.run_id,
121121
"state": state
122122
}
123123

@@ -137,7 +137,7 @@ def getlog(self):
137137
outputobj = json.load(outputtemp)
138138

139139
return {
140-
"workflow_id": self.workflow_id,
140+
"run_id": self.run_id,
141141
"request": request,
142142
"state": state,
143143
"workflow_log": {
@@ -169,14 +169,14 @@ def GetServiceInfo(self):
169169
"key_values": {}
170170
}
171171

172-
def ListWorkflows(self):
172+
def ListRuns(self, page_size=None, page_token=None, state_search=None):
173173
# FIXME #15 results don't page
174174
wf = []
175175
for l in os.listdir(os.path.join(os.getcwd(), "workflows")):
176176
if os.path.isdir(os.path.join(os.getcwd(), "workflows", l)):
177177
wf.append(Workflow(l))
178178

179-
workflows = [{"workflow_id": w.workflow_id, "state": w.getstate()[0]} for w in wf] # NOQA
179+
workflows = [{"run_id": w.run_id, "state": w.getstate()[0]} for w in wf] # NOQA
180180
return {
181181
"workflows": workflows,
182182
"next_page_token": ""
@@ -204,23 +204,23 @@ def RunWorkflow(self):
204204
if index > 0:
205205
body["workflow_url"] = body["workflow_url"][index:]
206206

207-
workflow_id = uuid.uuid4().hex
208-
job = Workflow(workflow_id)
207+
run_id = uuid.uuid4().hex
208+
job = Workflow(run_id)
209209

210210
job.run(body, self)
211-
return {"workflow_id": workflow_id}
211+
return {"run_id": run_id}
212212

213-
def GetWorkflowLog(self, workflow_id):
214-
job = Workflow(workflow_id)
213+
def GetRunLog(self, run_id):
214+
job = Workflow(run_id)
215215
return job.getlog()
216216

217-
def CancelJob(self, workflow_id):
218-
job = Workflow(workflow_id)
217+
def CancelRun(self, run_id):
218+
job = Workflow(run_id)
219219
job.cancel()
220-
return {"workflow_id": workflow_id}
220+
return {"run_id": run_id}
221221

222-
def GetWorkflowStatus(self, workflow_id):
223-
job = Workflow(workflow_id)
222+
def GetRunStatus(self, run_id):
223+
job = Workflow(run_id)
224224
return job.getstatus()
225225

226226

0 commit comments

Comments
 (0)