Skip to content

Commit eabc46e

Browse files
authored
Improve memory usage on large inputs (WIP) (#1008)
* Improve memory efficiency by avoiding filling in "nameext" and "nameroot" strings redundantly * Add warning recommending changing loadListing when number of input files is very large.
1 parent 70eb049 commit eabc46e

File tree

12 files changed

+117
-23
lines changed

12 files changed

+117
-23
lines changed

MANIFEST.in

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ include *requirements.txt
44
include gittaggers.py Makefile cwltool.py
55
include tests/*
66
include tests/tmp1/tmp2/tmp3/.gitkeep
7+
include tests/tmp4/alpha/*
78
include tests/wf/*
89
include tests/override/*
910
include tests/checker_wf/*

cwltool/command_line_tool.py

Lines changed: 10 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -203,10 +203,17 @@ def check_adjust(builder, file_o):
203203
assert builder.pathmapper is not None
204204
file_o["path"] = docker_windows_path_adjust(
205205
builder.pathmapper.mapper(file_o["location"])[1])
206-
file_o["dirname"], file_o["basename"] = os.path.split(file_o["path"])
206+
dn, bn = os.path.split(file_o["path"])
207+
if file_o.get("dirname") != dn:
208+
file_o["dirname"] = Text(dn)
209+
if file_o.get("basename") != bn:
210+
file_o["basename"] = Text(bn)
207211
if file_o["class"] == "File":
208-
file_o["nameroot"], file_o["nameext"] = os.path.splitext(
209-
file_o["basename"])
212+
nr, ne = os.path.splitext(file_o["basename"])
213+
if file_o.get("nameroot") != nr:
214+
file_o["nameroot"] = Text(nr)
215+
if file_o.get("nameext") != ne:
216+
file_o["nameext"] = Text(ne)
210217
if not ACCEPTLIST_RE.match(file_o["basename"]):
211218
raise WorkflowException(
212219
"Invalid filename: '{}' contains illegal characters".format(

cwltool/pathmapper.py

Lines changed: 16 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -74,11 +74,18 @@ def addLocation(d):
7474
path = path.rstrip("/")
7575
d["location"] = urllib.parse.urlunparse((parse.scheme, parse.netloc, path, parse.params, parse.query, parse.fragment))
7676

77-
if "basename" not in d:
78-
d["basename"] = os.path.basename(urllib.request.url2pathname(path))
77+
if not d.get("basename"):
78+
if path.startswith("_:"):
79+
d["basename"] = Text(path[2:])
80+
else:
81+
d["basename"] = Text(os.path.basename(urllib.request.url2pathname(path)))
7982

8083
if d["class"] == "File":
81-
d["nameroot"], d["nameext"] = os.path.splitext(d["basename"])
84+
nr, ne = os.path.splitext(d["basename"])
85+
if d.get("nameroot") != nr:
86+
d["nameroot"] = Text(nr)
87+
if d.get("nameext") != ne:
88+
d["nameext"] = Text(ne)
8289

8390
visit_class(job, ("File", "Directory"), addLocation)
8491

@@ -106,6 +113,12 @@ def mark(d):
106113

107114
def get_listing(fs_access, rec, recursive=True):
108115
# type: (StdFsAccess, MutableMapping[Text, Any], bool) -> None
116+
if rec.get("class") != "Directory":
117+
finddirs = [] # type: List[MutableMapping]
118+
visit_class(rec, ("Directory",), finddirs.append)
119+
for f in finddirs:
120+
get_listing(fs_access, f, recursive=recursive)
121+
return
109122
if "listing" in rec:
110123
return
111124
listing = []

cwltool/process.py

Lines changed: 47 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@
2828
Text)
2929
from schema_salad import schema, validate
3030
from schema_salad.ref_resolver import Loader, file_uri
31-
from schema_salad.sourceline import SourceLine
31+
from schema_salad.sourceline import SourceLine, strip_dup_lineno
3232

3333
from . import expression
3434
from .builder import Builder, HasReqsHints
@@ -369,7 +369,6 @@ def fill_in_defaults(inputs, # type: List[Dict[Text, Text]]
369369
job[fieldname] = None
370370
else:
371371
raise WorkflowException("Missing required input parameter '%s'" % shortname(inp["id"]))
372-
visit_class(job, ("File",), functools.partial(add_sizes, fsaccess))
373372

374373

375374
def avroize_type(field_type, name_prefix=""):
@@ -434,10 +433,12 @@ def var_spool_cwl_detector(obj, # type: Union[MutableMapping, List, Te
434433

435434
def eval_resource(builder, resource_req): # type: (Builder, Text) -> Any
436435
if expression.needs_parsing(resource_req):
437-
visit_class(builder.job, ("File",), functools.partial(add_sizes, builder.fs_access))
438436
return builder.do_eval(resource_req)
439437
return resource_req
440438

439+
# Threshold where the "too many files" warning kicks in
440+
FILE_COUNT_WARNING = 5000
441+
441442
class Process(with_metaclass(abc.ABCMeta, HasReqsHints)):
442443
def __init__(self,
443444
toolpath_object, # type: MutableMapping[Text, Any]
@@ -581,16 +582,59 @@ def _init_job(self, joborder, runtime_context):
581582
make_fs_access = getdefault(runtime_context.make_fs_access, StdFsAccess)
582583
fs_access = make_fs_access(runtime_context.basedir)
583584

585+
load_listing_req, _ = self.get_requirement(
586+
"http://commonwl.org/cwltool#LoadListingRequirement")
587+
if load_listing_req is not None:
588+
load_listing = load_listing_req.get("loadListing")
589+
else:
590+
load_listing = "deep_listing" # will default to "no_listing" in CWL v1.1
591+
584592
# Validate job order
585593
try:
586594
fill_in_defaults(self.tool[u"inputs"], job, fs_access)
595+
587596
normalizeFilesDirs(job)
588597
schema = self.names.get_name("input_record_schema", "")
589598
if schema is None:
590599
raise WorkflowException("Missing input record schema: "
591600
"{}".format(self.names))
592601
validate.validate_ex(schema, job, strict=False,
593602
logger=_logger_validation_warnings)
603+
604+
if load_listing and load_listing != "no_listing":
605+
get_listing(fs_access, job, recursive=(load_listing == "deep_listing"))
606+
607+
visit_class(job, ("File",), functools.partial(add_sizes, fs_access))
608+
609+
if load_listing == "deep_listing" and load_listing_req is None:
610+
for i, inparm in enumerate(self.tool["inputs"]):
611+
k = shortname(inparm["id"])
612+
if k not in job:
613+
continue
614+
v = job[k]
615+
dircount = [0]
616+
def inc(d): # type: (List[int]) -> None
617+
d[0] += 1
618+
visit_class(v, ("Directory",), lambda x: inc(dircount))
619+
if dircount[0] == 0:
620+
continue
621+
filecount = [0]
622+
visit_class(v, ("File",), lambda x: inc(filecount))
623+
if filecount[0] > FILE_COUNT_WARNING:
624+
# Long lines in this message are okay, will be reflowed based on terminal columns.
625+
_logger.warning(strip_dup_lineno(SourceLine(self.tool["inputs"], i, Text).makeError(
626+
"""Recursive directory listing has resulted in a large number of File objects (%s) passed to the input parameter '%s'. This may negatively affect workflow performance and memory use.
627+
628+
If this is a problem, use the hint 'cwltool:LoadListingRequirement' with "shallow_listing" or "no_listing" to change the directory listing behavior:
629+
630+
$namespaces:
631+
cwltool: "http://commonwl.org/cwltool#"
632+
hints:
633+
cwltool:LoadListingRequirement:
634+
loadListing: shallow_listing
635+
636+
""" % (filecount[0], k))))
637+
594638
except (validate.ValidationException, WorkflowException) as err:
595639
raise WorkflowException("Invalid job input record:\n" + Text(err))
596640

@@ -599,13 +643,6 @@ def _init_job(self, joborder, runtime_context):
599643
tmpdir = u""
600644
stagedir = u""
601645

602-
load_listing_req, _ = self.get_requirement(
603-
"http://commonwl.org/cwltool#LoadListingRequirement")
604-
if load_listing_req is not None:
605-
load_listing = load_listing_req.get("loadListing")
606-
else:
607-
load_listing = "deep_listing" # will default to "no_listing" in CWL v1.1
608-
609646
docker_req, _ = self.get_requirement("DockerRequirement")
610647
default_docker = None
611648

cwltool/workflow.py

Lines changed: 21 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,8 @@
88
import tempfile
99
from collections import namedtuple
1010
from typing import (Any, Callable, Dict, Generator, Iterable, List,
11-
Mapping, MutableMapping, MutableSequence, Optional, Tuple, Union)
11+
Mapping, MutableMapping, MutableSequence,
12+
Optional, Tuple, Union, cast)
1213
from uuid import UUID # pylint: disable=unused-import
1314

1415
from ruamel.yaml.comments import CommentedMap
@@ -290,6 +291,9 @@ def receive_output(self, step, outputparms, final_output_callback, jobout, proce
290291
_logger.info(u"[%s] completed %s", step.name, processStatus)
291292

292293
step.completed = True
294+
# Release the iterable related to this step to
295+
# reclaim memory.
296+
step.iterable = None
293297
self.made_progress = True
294298

295299
completed = sum(1 for s in self.steps if s.completed)
@@ -570,7 +574,7 @@ def job(self,
570574
runtimeContext # type: RuntimeContext
571575
): # type: (...) -> Generator[Any, None, None]
572576
builder = self._init_job(job_order, runtimeContext)
573-
#relativeJob=copy.deepcopy(builder.job)
577+
574578
if runtimeContext.research_obj is not None:
575579
if runtimeContext.toplevel:
576580
# Record primary-job.json
@@ -810,12 +814,18 @@ def __init__(self,
810814
self.processStatus = u"success"
811815
self.total = total
812816
self.output_callback = output_callback
817+
self.steps = [] # type: List[Optional[Generator]]
813818

814819
def receive_scatter_output(self, index, jobout, processStatus):
815820
# type: (int, Dict[Text, Text], Text) -> None
816821
for key, val in jobout.items():
817822
self.dest[key][index] = val
818823

824+
# Release the iterable related to this step to
825+
# reclaim memory.
826+
if self.steps:
827+
self.steps[index] = None
828+
819829
if processStatus != "success":
820830
if self.processStatus != "permanentFail":
821831
self.processStatus = processStatus
@@ -825,8 +835,9 @@ def receive_scatter_output(self, index, jobout, processStatus):
825835
if self.completed == self.total:
826836
self.output_callback(self.dest, self.processStatus)
827837

828-
def setTotal(self, total): # type: (int) -> None
838+
def setTotal(self, total, steps): # type: (int, List[Generator]) -> None
829839
self.total = total
840+
self.steps = cast(List[Optional[Generator]], steps)
830841
if self.completed == self.total:
831842
self.output_callback(self.dest, self.processStatus)
832843

@@ -838,6 +849,8 @@ def parallel_steps(steps, rc, runtimeContext):
838849
for index, step in enumerate(steps):
839850
if getdefault(runtimeContext.on_error, "stop") == "stop" and rc.processStatus != "success":
840851
break
852+
if step is None:
853+
continue
841854
try:
842855
for j in step:
843856
if getdefault(runtimeContext.on_error, "stop") == "stop" and rc.processStatus != "success":
@@ -847,6 +860,8 @@ def parallel_steps(steps, rc, runtimeContext):
847860
yield j
848861
else:
849862
break
863+
if made_progress:
864+
break
850865
except WorkflowException as exc:
851866
_logger.error(u"Cannot make scatter job: %s", exc)
852867
_logger.debug("", exc_info=True)
@@ -890,7 +905,7 @@ def dotproduct_scatter(process, # type: WorkflowJobStep
890905
sjobo, functools.partial(rc.receive_scatter_output, index),
891906
runtimeContext))
892907

893-
rc.setTotal(jobl)
908+
rc.setTotal(jobl, steps)
894909
return parallel_steps(steps, rc, runtimeContext)
895910

896911

@@ -925,7 +940,7 @@ def nested_crossproduct_scatter(process, # type: WorkflowJobStep
925940
functools.partial(rc.receive_scatter_output, index),
926941
runtimeContext))
927942

928-
rc.setTotal(jobl)
943+
rc.setTotal(jobl, steps)
929944
return parallel_steps(steps, rc, runtimeContext)
930945

931946

@@ -952,7 +967,7 @@ def flat_crossproduct_scatter(process, # type: WorkflowJobStep
952967
callback = ReceiveScatterOutput(output_callback, output, 0)
953968
(steps, total) = _flat_crossproduct_scatter(
954969
process, joborder, scatter_keys, callback, 0, runtimeContext)
955-
callback.setTotal(total)
970+
callback.setTotal(total, steps)
956971
return parallel_steps(steps, callback, runtimeContext)
957972

958973
def _flat_crossproduct_scatter(process, # type: WorkflowJobStep

tests/listing2-job.yml

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
d:
2+
class: Directory
3+
location: tmp4

tests/test_ext.py

Lines changed: 19 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,12 +3,17 @@
33
import os
44
import shutil
55
import tempfile
6+
import sys
7+
import re
8+
from io import StringIO
69

710
import pytest
811

912
from cwltool.main import main
13+
import cwltool.process
1014

11-
from .util import get_data, needs_docker, temp_dir, windows_needs_docker
15+
16+
from .util import get_data, needs_docker, temp_dir, windows_needs_docker, get_main_output
1217

1318

1419
@needs_docker
@@ -197,3 +202,16 @@ def test_require_prefix_timelimit():
197202
assert main(["--enable-ext", get_data('tests/wf/timelimit.cwl')]) == 0
198203
assert main([get_data('tests/wf/timelimit.cwl')]) != 0
199204
assert main(["--enable-ext", get_data('tests/wf/timelimit-fail.cwl')]) != 0
205+
206+
def test_warn_large_inputs():
207+
was = cwltool.process.FILE_COUNT_WARNING
208+
try:
209+
stream = StringIO()
210+
211+
cwltool.process.FILE_COUNT_WARNING = 3
212+
main([get_data('tests/wf/listing_v1_0.cwl'), get_data('tests/listing2-job.yml')],
213+
stderr=stream)
214+
215+
assert "Recursive directory listing has resulted in a large number of File objects" in re.sub("\n *", " ", stream.getvalue())
216+
finally:
217+
cwltool.process.FILE_COUNT_WARNING = was

tests/tmp4/alpha/baker

Whitespace-only changes.

tests/tmp4/alpha/charlie

Whitespace-only changes.

tests/tmp4/alpha/delta

Whitespace-only changes.

0 commit comments

Comments
 (0)