Skip to content

Commit 674596a

Browse files
committed
More flake fixes
1 parent 6812c9c commit 674596a

File tree

4 files changed

+62
-38
lines changed

4 files changed

+62
-38
lines changed

wes_client/__init__.py

Lines changed: 30 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,6 @@
44
from bravado.requests_client import RequestsClient
55
import json
66
import time
7-
import pprint
87
import sys
98
import os
109
import argparse
@@ -15,12 +14,15 @@
1514
import urllib
1615
import ruamel.yaml as yaml
1716

18-
def main(argv=sys.argv[1:]):
1917

18+
def main(argv=sys.argv[1:]):
2019
parser = argparse.ArgumentParser(description='Workflow Execution Service')
21-
parser.add_argument("--host", type=str, default=os.environ.get("WES_API_HOST"))
22-
parser.add_argument("--auth", type=str, default=os.environ.get("WES_API_AUTH"))
23-
parser.add_argument("--proto", type=str, default=os.environ.get("WES_API_PROTO", "https"))
20+
parser.add_argument(
21+
"--host", type=str, default=os.environ.get("WES_API_HOST"))
22+
parser.add_argument(
23+
"--auth", type=str, default=os.environ.get("WES_API_AUTH"))
24+
parser.add_argument(
25+
"--proto", type=str, default=os.environ.get("WES_API_PROTO", "https"))
2426
parser.add_argument("--quiet", action="store_true", default=False)
2527
parser.add_argument("--outdir", type=str)
2628

@@ -32,8 +34,10 @@ def main(argv=sys.argv[1:]):
3234
exgroup.add_argument("--version", action="store_true", default=False)
3335

3436
exgroup = parser.add_mutually_exclusive_group()
35-
exgroup.add_argument("--wait", action="store_true", default=True, dest="wait")
36-
exgroup.add_argument("--no-wait", action="store_false", default=True, dest="wait")
37+
exgroup.add_argument(
38+
"--wait", action="store_true", default=True, dest="wait")
39+
exgroup.add_argument(
40+
"--no-wait", action="store_false", default=True, dest="wait")
3741

3842
parser.add_argument("workflow_url", type=str, nargs="?", default=None)
3943
parser.add_argument("job_order", type=str, nargs="?", default=None)
@@ -50,31 +54,37 @@ def main(argv=sys.argv[1:]):
5054
http_client.set_api_key(
5155
split.hostname, args.auth,
5256
param_name='Authorization', param_in='header')
53-
client = SwaggerClient.from_url("%s://%s/ga4gh/wes/v1/swagger.json" % (args.proto, args.host),
54-
http_client=http_client, config={'use_models': False})
57+
client = SwaggerClient.from_url(
58+
"%s://%s/ga4gh/wes/v1/swagger.json" % (args.proto, args.host),
59+
http_client=http_client, config={'use_models': False})
5560

5661
if args.list:
5762
l = client.WorkflowExecutionService.ListWorkflows()
5863
json.dump(l.result(), sys.stdout, indent=4)
5964
return 0
6065

6166
if args.log:
62-
l = client.WorkflowExecutionService.GetWorkflowLog(workflow_id=args.log)
67+
l = client.WorkflowExecutionService.GetWorkflowLog(
68+
workflow_id=args.log)
6369
sys.stdout.write(l.result()["workflow_log"]["stderr"])
6470
return 0
6571

6672
if args.get:
67-
l = client.WorkflowExecutionService.GetWorkflowLog(workflow_id=args.get)
73+
l = client.WorkflowExecutionService.GetWorkflowLog(
74+
workflow_id=args.get)
6875
json.dump(l.result(), sys.stdout, indent=4)
6976
return 0
7077

7178
with open(args.job_order) as f:
7279
input = yaml.safe_load(f)
7380
basedir = os.path.dirname(args.job_order)
81+
7482
def fixpaths(d):
7583
if isinstance(d, dict) and "location" in d:
76-
if not ":" in d["location"]:
77-
d["location"] = urllib.pathname2url(os.path.normpath(os.path.join(os.getcwd(), basedir, d["location"])))
84+
if ":" not in d["location"]:
85+
local_path = os.path.normpath(
86+
os.path.join(os.getcwd(), basedir, d["location"]))
87+
d["location"] = urllib.pathname2url(local_path)
7888
visit(input, fixpaths)
7989

8090
workflow_url = args.workflow_url
@@ -98,14 +108,17 @@ def fixpaths(d):
98108
sys.stdout.write(r["workflow_id"]+"\n")
99109
exit(0)
100110

101-
r = client.WorkflowExecutionService.GetWorkflowStatus(workflow_id=r["workflow_id"]).result()
111+
r = client.WorkflowExecutionService.GetWorkflowStatus(
112+
workflow_id=r["workflow_id"]).result()
102113
while r["state"] in ("QUEUED", "INITIALIZING", "RUNNING"):
103114
time.sleep(1)
104-
r = client.WorkflowExecutionService.GetWorkflowStatus(workflow_id=r["workflow_id"]).result()
115+
r = client.WorkflowExecutionService.GetWorkflowStatus(
116+
workflow_id=r["workflow_id"]).result()
105117

106118
logging.info("State is %s", r["state"])
107119

108-
s = client.WorkflowExecutionService.GetWorkflowLog(workflow_id=r["workflow_id"]).result()
120+
s = client.WorkflowExecutionService.GetWorkflowLog(
121+
workflow_id=r["workflow_id"]).result()
109122
logging.info(s["workflow_log"]["stderr"])
110123

111124
if "fields" in s["outputs"] and s["outputs"]["fields"] is None:
@@ -117,5 +130,6 @@ def fixpaths(d):
117130
else:
118131
return 1
119132

133+
120134
if __name__ == "__main__":
121135
sys.exit(main(sys.argv[1:]))

wes_service/__init__.py

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -8,19 +8,23 @@
88

99
def main(argv=sys.argv[1:]):
1010
parser = argparse.ArgumentParser(description='Workflow Execution Service')
11-
parser.add_argument("--backend", type=str, default="wes_service.cwl_runner")
11+
parser.add_argument(
12+
"--backend", type=str, default="wes_service.cwl_runner")
1213
parser.add_argument("--port", type=int, default=8080)
1314
parser.add_argument("--opt", type=str, action="append")
1415
parser.add_argument("--debug", action="store_true", default=False)
1516
args = parser.parse_args(argv)
1617

1718
app = connexion.App(__name__)
18-
backend = utils.get_function_from_name(args.backend + ".create_backend")(args.opt)
19+
backend = utils.get_function_from_name(
20+
args.backend + ".create_backend")(args.opt)
1921

2022
def rs(x):
2123
return getattr(backend, x)
2224

23-
app.add_api('openapi/workflow_execution_service.swagger.yaml', resolver=Resolver(rs))
25+
app.add_api(
26+
'openapi/workflow_execution_service.swagger.yaml',
27+
resolver=Resolver(rs))
2428

2529
app.run(port=args.port, debug=args.debug)
2630

wes_service/arvados_wes.py

Lines changed: 15 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@ def get_api():
1212
return arvados.api_from_config(version="v1", apiconfig={
1313
"ARVADOS_API_HOST": os.environ["ARVADOS_API_HOST"],
1414
"ARVADOS_API_TOKEN": connexion.request.headers['Authorization'],
15-
"ARVADOS_API_HOST_INSECURE": os.environ.get("ARVADOS_API_HOST_INSECURE", "false"),
15+
"ARVADOS_API_HOST_INSECURE": os.environ.get("ARVADOS_API_HOST_INSECURE", "false"), # NOQA
1616
})
1717

1818

@@ -41,12 +41,13 @@ def GetServiceInfo(self):
4141
def ListWorkflows(self):
4242
api = get_api()
4343

44-
requests = api.container_requests().list(filters=[["requesting_container_uuid", "=", None]],
45-
select=["uuid", "command", "container_uuid"]).execute()
46-
containers = api.containers().list(filters=[["uuid", "in", [w["container_uuid"] for w in requests["items"]]]],
44+
requests = api.container_requests().list(
45+
filters=[["requesting_container_uuid", "=", None]],
46+
select=["uuid", "command", "container_uuid"]).execute()
47+
containers = api.containers().list(filters=[["uuid", "in", [w["container_uuid"] for w in requests["items"]]]], # NOQA
4748
select=["uuid", "state"]).execute()
4849

49-
uuidmap = {c["uuid"]: statemap[c["state"]] for c in containers["items"]}
50+
uuidmap = {c["uuid"]: statemap[c["state"]] for c in containers["items"]} # NOQA
5051

5152
return {
5253
"workflows": [{"workflow_id": cr["uuid"],
@@ -57,27 +58,27 @@ def ListWorkflows(self):
5758
}
5859

5960
def RunWorkflow(self, body):
60-
if body["workflow_type"] != "CWL" or body["workflow_type_version"] != "v1.0":
61+
if body["workflow_type"] != "CWL" or body["workflow_type_version"] != "v1.0": # NOQA
6162
return
6263

6364
env = {
6465
"PATH": os.environ["PATH"],
6566
"ARVADOS_API_HOST": os.environ["ARVADOS_API_HOST"],
6667
"ARVADOS_API_TOKEN": connexion.request.headers['Authorization'],
67-
"ARVADOS_API_HOST_INSECURE": os.environ.get("ARVADOS_API_HOST_INSECURE", "false")
68+
"ARVADOS_API_HOST_INSECURE": os.environ.get("ARVADOS_API_HOST_INSECURE", "false") # NOQA
6869
}
6970
with tempfile.NamedTemporaryFile() as inputtemp:
7071
json.dump(body["workflow_params"], inputtemp)
7172
inputtemp.flush()
72-
workflow_id = subprocess.check_output(["arvados-cwl-runner", "--submit", "--no-wait", "--api=containers",
73-
body.get("workflow_url"), inputtemp.name], env=env).strip()
73+
workflow_id = subprocess.check_output(["arvados-cwl-runner", "--submit", "--no-wait", "--api=containers", # NOQA
74+
body.get("workflow_url"), inputtemp.name], env=env).strip() # NOQA
7475
return {"workflow_id": workflow_id}
7576

7677
def GetWorkflowLog(self, workflow_id):
7778
api = get_api()
7879

7980
request = api.container_requests().get(uuid=workflow_id).execute()
80-
container = api.containers().get(uuid=request["container_uuid"]).execute()
81+
container = api.containers().get(uuid=request["container_uuid"]).execute() # NOQA
8182

8283
outputobj = {}
8384
if request["output_uuid"]:
@@ -87,7 +88,7 @@ def GetWorkflowLog(self, workflow_id):
8788

8889
def keepref(d):
8990
if isinstance(d, dict) and "location" in d:
90-
d["location"] = "keep:%s/%s" % (c.portable_data_hash(), d["location"])
91+
d["location"] = "keep:%s/%s" % (c.portable_data_hash(), d["location"]) # NOQA
9192

9293
visit(outputobj, keepref)
9394

@@ -116,15 +117,15 @@ def keepref(d):
116117
r["workflow_log"]["exitCode"] = container["exit_code"]
117118
return r
118119

119-
def CancelJob(self, workflow_id): # NOQA
120+
def CancelJob(self, workflow_id): # NOQA
120121
api = get_api()
121-
request = api.container_requests().update(body={"priority": 0}).execute()
122+
request = api.container_requests().update(body={"priority": 0}).execute() # NOQA
122123
return {"workflow_id": request["uuid"]}
123124

124125
def GetWorkflowStatus(self, workflow_id):
125126
api = get_api()
126127
request = api.container_requests().get(uuid=workflow_id).execute()
127-
container = api.containers().get(uuid=request["container_uuid"]).execute()
128+
container = api.containers().get(uuid=request["container_uuid"]).execute() # NOQA
128129
return {"workflow_id": request["uuid"],
129130
"state": statemap[container["state"]]}
130131

wes_service/cwl_runner.py

Lines changed: 10 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -41,8 +41,8 @@ def run(self, request, opts):
4141

4242
runner = opts.getopt("runner", "cwl-runner")
4343
extra = opts.getoptlist("extra")
44-
45-
proc = subprocess.Popen([runner] + extra + [workflow_url, inputtemp.name],
44+
command_args = [runner] + extra + [workflow_url, inputtemp.name]
45+
proc = subprocess.Popen(command_args,
4646
stdout=output,
4747
stderr=stderr,
4848
close_fds=True,
@@ -102,7 +102,8 @@ def getlog(self):
102102

103103
outputobj = {}
104104
if state == "COMPLETE":
105-
with open(os.path.join(self.workdir, "cwl.output.json"), "r") as outputtemp:
105+
output_path = os.path.join(self.workdir, "cwl.output.json")
106+
with open(output_path, "r") as outputtemp:
106107
outputobj = json.load(outputtemp)
107108

108109
return {
@@ -144,13 +145,17 @@ def ListWorkflows(self):
144145
for l in os.listdir(os.path.join(os.getcwd(), "workflows")):
145146
if os.path.isdir(os.path.join(os.getcwd(), "workflows", l)):
146147
wf.append(Workflow(l))
148+
149+
workflows = [{"workflow_id": w.workflow_id, "state": w.getstate()[0]} for w in wf] # NOQA
147150
return {
148-
"workflows": [{"workflow_id": w.workflow_id, "state": w.getstate()[0]} for w in wf],
151+
"workflows": workflows,
149152
"next_page_token": ""
150153
}
151154

152155
def RunWorkflow(self, body):
153-
if body["workflow_type"] != "CWL" or body["workflow_type_version"] != "v1.0":
156+
# FIXME Add error responses #16
157+
if body["workflow_type"] != "CWL" or \
158+
body["workflow_type_version"] != "v1.0":
154159
return
155160
workflow_id = uuid.uuid4().hex
156161
job = Workflow(workflow_id)

0 commit comments

Comments
 (0)