Skip to content

Commit c27774b

Browse files
authored
Merge pull request #850 from psafont/oswalk
Make data relocation at the end of workflow runs faster
2 parents ffbe75d + 086b7ea commit c27774b

File tree

8 files changed

+278
-241
lines changed

8 files changed

+278
-241
lines changed

.gitignore

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ eggs/
99
*.egg-info/
1010
*.egg
1111
.tox/
12+
.pytest_cache
1213

1314
# Editor Temps
1415
.*.sw?
@@ -44,4 +45,5 @@ output.txt
4445
pydocstyle_report.txt
4546
response.txt
4647
test.txt
48+
time.txt
4749
value

cwltool/process.py

Lines changed: 83 additions & 71 deletions
Original file line numberDiff line numberDiff line change
@@ -9,15 +9,19 @@
99
import json
1010
import logging
1111
import os
12+
try:
13+
from os import scandir # type: ignore
14+
except ImportError:
15+
from scandir import scandir # type: ignore
1216
import shutil
1317
import stat
1418
import tempfile
1519
import textwrap
1620
import uuid
1721
from collections import Iterable # pylint: disable=unused-import
1822
from io import open
19-
from typing import (Any, Callable, Dict, Generator, List, Optional, Set, Tuple,
20-
Union, cast)
23+
from typing import (Any, Callable, Dict, Generator, Iterator, List, Optional,
24+
Set, Tuple, Union, cast)
2125
from typing_extensions import Text, TYPE_CHECKING # pylint: disable=unused-import
2226
# move to a regular typing import when Python 3.3-3.6 is no longer supported
2327

@@ -263,22 +267,10 @@ def stageFiles(pm, stageFunc=None, ignoreWritable=False, symLink=True, secret_st
263267
n.write(p.resolved.encode("utf-8"))
264268
ensure_writable(p.target)
265269

266-
def collectFilesAndDirs(obj, out):
267-
# type: (Union[Dict[Text, Any], List[Dict[Text, Any]]], List[Dict[Text, Any]]) -> None
268-
if isinstance(obj, dict):
269-
if obj.get("class") in ("File", "Directory"):
270-
out.append(obj)
271-
else:
272-
for v in obj.values():
273-
collectFilesAndDirs(v, out)
274-
if isinstance(obj, list):
275-
for l in obj:
276-
collectFilesAndDirs(l, out)
277-
278270

279271
def relocateOutputs(outputObj, # type: Union[Dict[Text, Any],List[Dict[Text, Any]]]
280-
outdir, # type: Text
281-
output_dirs, # type: Set[Text]
272+
destination_path, # type: Text
273+
source_directories, # type: Set[Text]
282274
action, # type: Text
283275
fs_access, # type: StdFsAccess
284276
compute_checksum=True # type: bool
@@ -289,40 +281,56 @@ def relocateOutputs(outputObj, # type: Union[Dict[Text, Any],List[Di
289281
if action not in ("move", "copy"):
290282
return outputObj
291283

292-
def moveIt(src, dst):
284+
def _collectDirEntries(obj):
285+
# type: (Union[Dict[Text, Any], List[Dict[Text, Any]]]) -> Iterator[Dict[Text, Any]]
286+
if isinstance(obj, dict):
287+
if obj.get("class") in ("File", "Directory"):
288+
yield obj
289+
else:
290+
for sub_obj in obj.values():
291+
for dir_entry in _collectDirEntries(sub_obj):
292+
yield dir_entry
293+
elif isinstance(obj, list):
294+
for sub_obj in obj:
295+
for dir_entry in _collectDirEntries(sub_obj):
296+
yield dir_entry
297+
298+
def _relocate(src, dst):
299+
if src == dst:
300+
return
301+
293302
if action == "move":
294-
for a in output_dirs:
295-
if src.startswith(a+"/"):
296-
_logger.debug("Moving %s to %s", src, dst)
297-
if os.path.isdir(src) and os.path.isdir(dst):
298-
# merge directories
299-
for root, dirs, files in os.walk(src):
300-
for f in dirs+files:
301-
moveIt(os.path.join(root, f), os.path.join(dst, f))
302-
else:
303-
shutil.move(src, dst)
303+
# do not move anything if we are trying to move an entity from
304+
# outside of the source directories
305+
if any(src.startswith(path + "/") for path in source_directories):
306+
_logger.debug("Moving %s to %s", src, dst)
307+
if os.path.isdir(src) and os.path.isdir(dst):
308+
# merge directories
309+
for dir_entry in scandir(src):
310+
_relocate(dir_entry, os.path.join(dst, dir_entry.name))
311+
else:
312+
shutil.move(src, dst)
304313
return
305-
if src != dst:
306-
_logger.debug("Copying %s to %s", src, dst)
307-
if os.path.isdir(src):
308-
if os.path.isdir(dst):
309-
shutil.rmtree(dst)
310-
elif os.path.isfile(dst):
311-
os.unlink(dst)
312-
shutil.copytree(src, dst)
313-
else:
314-
shutil.copy2(src, dst)
315314

316-
outfiles = [] # type: List[Dict[Text, Any]]
317-
collectFilesAndDirs(outputObj, outfiles)
318-
pm = PathMapper(outfiles, "", outdir, separateDirs=False)
319-
stageFiles(pm, stageFunc=moveIt, symLink=False)
315+
_logger.debug("Copying %s to %s", src, dst)
316+
if os.path.isdir(src):
317+
if os.path.isdir(dst):
318+
shutil.rmtree(dst)
319+
elif os.path.isfile(dst):
320+
os.unlink(dst)
321+
shutil.copytree(src, dst)
322+
else:
323+
shutil.copy2(src, dst)
324+
325+
outfiles = list(_collectDirEntries(outputObj))
326+
pm = PathMapper(outfiles, "", destination_path, separateDirs=False)
327+
stageFiles(pm, stageFunc=_relocate, symLink=False)
320328

321-
def _check_adjust(f):
322-
f["location"] = file_uri(pm.mapper(f["location"])[1])
323-
if "contents" in f:
324-
del f["contents"]
325-
return f
329+
def _check_adjust(file):
330+
file["location"] = file_uri(pm.mapper(file["location"])[1])
331+
if "contents" in file:
332+
del file["contents"]
333+
return file
326334

327335
visit_class(outputObj, ("File", "Directory"), _check_adjust)
328336
if compute_checksum:
@@ -331,30 +339,35 @@ def _check_adjust(f):
331339
# If there are symlinks to intermediate output directories, we want to move
332340
# the real files into the final output location. If a file is linked more than once,
333341
# make an internal relative symlink.
342+
def relink(relinked, # type: Dict[Text, Text]
343+
root_path # type: Text
344+
):
345+
for dir_entry in scandir(root_path):
346+
path = dir_entry.path
347+
if os.path.islink(path):
348+
real_path = os.path.realpath(path)
349+
if real_path in relinked:
350+
link_name = relinked[real_path]
351+
if onWindows():
352+
if os.path.isfile(path):
353+
shutil.copy(os.path.relpath(link_name, path), path)
354+
elif os.path.exists(path) and os.path.isdir(path):
355+
shutil.rmtree(path)
356+
copytree_with_merge(os.path.relpath(link_name, path), path)
357+
else:
358+
os.unlink(path)
359+
os.symlink(os.path.relpath(link_name, path), path)
360+
else:
361+
if any(real_path.startswith(path + "/") for path in source_directories):
362+
os.unlink(path)
363+
os.rename(real_path, path)
364+
relinked[real_path] = path
365+
if os.path.isdir(path):
366+
relink(relinked, path)
367+
334368
if action == "move":
335369
relinked = {} # type: Dict[Text, Text]
336-
for root, dirs, files in os.walk(outdir):
337-
for f in dirs+files:
338-
path = os.path.join(root, f)
339-
rp = os.path.realpath(path)
340-
if path != rp:
341-
if rp in relinked:
342-
if onWindows():
343-
if os.path.isfile(path):
344-
shutil.copy(os.path.relpath(relinked[rp], path), path)
345-
elif os.path.exists(path) and os.path.isdir(path):
346-
shutil.rmtree(path)
347-
copytree_with_merge(os.path.relpath(relinked[rp], path), path)
348-
else:
349-
os.unlink(path)
350-
os.symlink(os.path.relpath(relinked[rp], path), path)
351-
else:
352-
for od in output_dirs:
353-
if rp.startswith(od+"/"):
354-
os.unlink(path)
355-
os.rename(rp, path)
356-
relinked[rp] = path
357-
break
370+
relink(relinked, destination_path)
358371

359372
return outputObj
360373

@@ -457,7 +470,6 @@ def eval_resource(builder, resource_req): # type: (Builder, Text) -> Any
457470
return builder.do_eval(resource_req)
458471
return resource_req
459472

460-
461473
class Process(six.with_metaclass(abc.ABCMeta, HasReqsHints)):
462474
def __init__(self,
463475
toolpath_object, # type: Dict[Text, Any]
@@ -573,8 +585,8 @@ def __init__(self,
573585
if dockerReq and dockerReq.get("dockerOutputDirectory") and not is_req:
574586
_logger.warning(SourceLine(
575587
item=dockerReq, raise_type=Text).makeError(
576-
"When 'dockerOutputDirectory' is declared, DockerRequirement "
577-
"should go in the 'requirements' section, not 'hints'."""))
588+
"When 'dockerOutputDirectory' is declared, DockerRequirement "
589+
"should go in the 'requirements' section, not 'hints'."""))
578590

579591
if dockerReq and dockerReq.get("dockerOutputDirectory") == "/var/spool/cwl":
580592
if is_req:

cwltool/utils.py

Lines changed: 3 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -52,30 +52,16 @@ def aslist(l): # type: (Any) -> List[Any]
5252
return l
5353
return [l]
5454

55-
def copytree_with_merge(src, dst, symlinks=False, ignore=None):
56-
# type: (Text, Text, bool, Callable[..., Any]) -> None
55+
def copytree_with_merge(src, dst): # type: (Text, Text) -> None
5756
if not os.path.exists(dst):
5857
os.makedirs(dst)
5958
shutil.copystat(src, dst)
6059
lst = os.listdir(src)
61-
if ignore:
62-
excl = ignore(src, lst)
63-
lst = [x for x in lst if x not in excl]
6460
for item in lst:
6561
spath = os.path.join(src, item)
6662
dpath = os.path.join(dst, item)
67-
if symlinks and os.path.islink(spath):
68-
if os.path.lexists(dpath):
69-
os.remove(dpath)
70-
os.symlink(os.readlink(spath), dpath)
71-
try:
72-
s_stat = os.lstat(spath)
73-
mode = stat.S_IMODE(s_stat.st_mode)
74-
os.lchmod(dpath, mode)
75-
except:
76-
pass # lchmod not available, only available on unix
77-
elif os.path.isdir(spath):
78-
copytree_with_merge(spath, dpath, symlinks, ignore)
63+
if os.path.isdir(spath):
64+
copytree_with_merge(spath, dpath)
7965
else:
8066
shutil.copy2(spath, dpath)
8167

@@ -144,7 +130,6 @@ def onWindows():
144130
return os.name == 'nt'
145131

146132

147-
148133
def convert_pathsep_to_unix(path): # type: (Text) -> (Text)
149134
"""
150135
On windows os.path.join would use backslash to join path, since we would

requirements.txt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,5 +9,6 @@ prov==1.5.1
99
bagit==1.6.4
1010
mypy-extensions
1111
psutil
12+
scandir
1213
subprocess32 >= 3.5.0; os.name=="posix"
1314
typing-extensions

setup.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,7 @@
5959
'mypy-extensions',
6060
'six >= 1.9.0', # >= 1.9.0 required by prov
6161
'psutil',
62+
'scandir',
6263
'prov == 1.5.1',
6364
'bagit >= 1.6.4',
6465
'typing-extensions',

0 commit comments

Comments
 (0)