Skip to content

Commit 0bd64a6

Browse files
tetrondavid4096
authored and committed
Multipart upload (#22)
* Improvements to arvados_wes around logs and paging.
* Add page-size to wes-client
* Proof of concept using multipart upload.
* Set cwd to tempdir, clean up afterwards.
* Update to latest multipart proposal.
* Fix interface
1 parent 3f1ed26 commit 0bd64a6

File tree

5 files changed

+264
-104
lines changed

5 files changed

+264
-104
lines changed

wes_client/__init__.py

Lines changed: 39 additions & 17 deletions
Original file line number | Diff line number | Diff line change
@@ -9,18 +9,20 @@
99
import argparse
1010
import logging
1111
import schema_salad.ref_resolver
12+
import requests
1213
from wes_service.util import visit
1314
from bravado.client import SwaggerClient
1415
from bravado.requests_client import RequestsClient
1516

16-
1717
def main(argv=sys.argv[1:]):
1818
parser = argparse.ArgumentParser(description='Workflow Execution Service')
1919
parser.add_argument("--host", type=str, default=os.environ.get("WES_API_HOST"))
2020
parser.add_argument("--auth", type=str, default=os.environ.get("WES_API_AUTH"))
2121
parser.add_argument("--proto", type=str, default=os.environ.get("WES_API_PROTO", "https"))
2222
parser.add_argument("--quiet", action="store_true", default=False)
2323
parser.add_argument("--outdir", type=str)
24+
parser.add_argument("--page", type=str, default=None)
25+
parser.add_argument("--page-size", type=int, default=None)
2426

2527
exgroup = parser.add_mutually_exclusive_group()
2628
exgroup.add_argument("--run", action="store_true", default=False)
@@ -54,7 +56,7 @@ def main(argv=sys.argv[1:]):
5456
http_client=http_client, config={'use_models': False})
5557

5658
if args.list:
57-
response = client.WorkflowExecutionService.ListWorkflows()
59+
response = client.WorkflowExecutionService.ListWorkflows(page_token=args.page, page_size=args.page_size)
5860
json.dump(response.result(), sys.stdout, indent=4)
5961
return 0
6062

@@ -98,10 +100,10 @@ def fixpaths(d):
98100
if loc.startswith("http:") or loc.startswith("https:"):
99101
logging.error("Directory inputs not supported with http references")
100102
exit(33)
101-
if not (loc.startswith("http:") or loc.startswith("https:")
102-
or args.job_order.startswith("http:") or args.job_order.startswith("https:")):
103-
logging.error("Upload local files not supported, must use http: or https: references.")
104-
exit(33)
103+
# if not (loc.startswith("http:") or loc.startswith("https:")
104+
# or args.job_order.startswith("http:") or args.job_order.startswith("https:")):
105+
# logging.error("Upload local files not supported, must use http: or https: references.")
106+
# exit(33)
105107

106108
visit(input, fixpaths)
107109

@@ -114,19 +116,37 @@ def fixpaths(d):
114116
else:
115117
logging.basicConfig(level=logging.INFO)
116118

117-
body = {
118-
"workflow_params": input,
119-
"workflow_type": "CWL",
120-
"workflow_type_version": "v1.0"
121-
}
119+
parts = [
120+
("workflow_params", json.dumps(input)),
121+
("workflow_type", "CWL"),
122+
("workflow_type_version", "v1.0")
123+
]
122124

123125
if workflow_url.startswith("file://"):
124-
with open(workflow_url[7:], "r") as f:
125-
body["workflow_descriptor"] = f.read()
126+
# with open(workflow_url[7:], "rb") as f:
127+
# body["workflow_descriptor"] = f.read()
128+
rootdir = os.path.dirname(workflow_url[7:])
129+
dirpath = rootdir
130+
#for dirpath, dirnames, filenames in os.walk(rootdir):
131+
for f in os.listdir(rootdir):
132+
if f.startswith("."):
133+
continue
134+
fn = os.path.join(dirpath, f)
135+
if os.path.isfile(fn):
136+
parts.append(('workflow_descriptor', (fn[len(rootdir)+1:], open(fn, "rb"))))
137+
parts.append(("workflow_url", os.path.basename(workflow_url[7:])))
126138
else:
127-
body["workflow_url"] = workflow_url
139+
parts.append(("workflow_url", workflow_url))
140+
141+
postresult = http_client.session.post("%s://%s/ga4gh/wes/v1/workflows" % (args.proto, args.host),
142+
files=parts,
143+
headers={"Authorization": args.auth})
144+
145+
r = json.loads(postresult.text)
128146

129-
r = client.WorkflowExecutionService.RunWorkflow(body=body).result()
147+
if postresult.status_code != 200:
148+
logging.error("%s", r)
149+
exit(1)
130150

131151
if args.wait:
132152
logging.info("Workflow id is %s", r["workflow_id"])
@@ -137,15 +157,17 @@ def fixpaths(d):
137157
r = client.WorkflowExecutionService.GetWorkflowStatus(
138158
workflow_id=r["workflow_id"]).result()
139159
while r["state"] in ("QUEUED", "INITIALIZING", "RUNNING"):
140-
time.sleep(1)
160+
time.sleep(8)
141161
r = client.WorkflowExecutionService.GetWorkflowStatus(
142162
workflow_id=r["workflow_id"]).result()
143163

144164
logging.info("State is %s", r["state"])
145165

146166
s = client.WorkflowExecutionService.GetWorkflowLog(
147167
workflow_id=r["workflow_id"]).result()
148-
logging.info("Workflow log:\n"+s["workflow_log"]["stderr"])
168+
logging.info("%s", s["workflow_log"]["stderr"])
169+
logs = requests.get(s["workflow_log"]["stderr"], headers={"Authorization": args.auth}).text
170+
logging.info("Workflow log:\n"+logs)
149171

150172
if "fields" in s["outputs"] and s["outputs"]["fields"] is None:
151173
del s["outputs"]["fields"]

wes_service/__init__.py

Lines changed: 3 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -28,10 +28,11 @@ def setup(args=None):
2828
logging.info(" %s: %s", n, getattr(args, n))
2929

3030
app = connexion.App(__name__)
31-
backend = utils.get_function_from_name(args.backend + ".create_backend")(args.opt)
31+
backend = utils.get_function_from_name(
32+
args.backend + ".create_backend")(app, args.opt)
3233

3334
def rs(x):
34-
return getattr(backend, x)
35+
return getattr(backend, x.split('.')[-1])
3536

3637
app.add_api(
3738
'openapi/workflow_execution_service.swagger.yaml',

0 commit comments

Comments
 (0)