Skip to content

Commit e8d3e63

Browse files
committed
resolving conflicts
2 parents 0c84519 + 18a4803 commit e8d3e63

23 files changed

+1389
-417
lines changed

.github/workflows/testpydra.yml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@ jobs:
2020
steps:
2121
- uses: actions/checkout@v2
2222
- name: Disable etelemetry
23-
run: echo ::set-env name=NO_ET::TRUE
23+
run: echo "NO_ET=TRUE" >> $GITHUB_ENV
2424
- name: Set up Python ${{ matrix.python-version }} on ${{ matrix.os }}
2525
uses: actions/setup-python@v2
2626
with:

.github/workflows/testsingularity.yml

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -14,8 +14,8 @@ jobs:
1414
steps:
1515
- name: Set env
1616
run: |
17-
echo ::set-env name=RELEASE_VERSION::v3.5.0
18-
echo ::set-env name=NO_ET::TRUE
17+
echo "RELEASE_VERSION=v3.5.0" >> $GITHUB_ENV
18+
echo "NO_ET=TRUE" >> $GITHUB_ENV
1919
- name: Setup Singularity
2020
uses: actions/checkout@v2
2121
with:

.github/workflows/testslurm.yml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@ jobs:
1010

1111
steps:
1212
- name: Disable etelemetry
13-
run: echo ::set-env name=NO_ET::TRUE
13+
run: echo "NO_ET=TRUE" >> $GITHUB_ENV
1414
- uses: actions/checkout@v2
1515
- name: Pull docker image
1616
run: |

pydra/engine/core.py

Lines changed: 29 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,8 @@
66
import os
77
from pathlib import Path
88
import typing as ty
9-
from copy import deepcopy, copy
9+
from copy import deepcopy
10+
from uuid import uuid4
1011

1112
import cloudpickle as cp
1213
from filelock import SoftFileLock
@@ -163,7 +164,6 @@ def __init__(
163164

164165
# checking if metadata is set properly
165166
self.inputs.check_metadata()
166-
self.state_inputs = inputs
167167
# dictionary to save the connections with lazy fields
168168
self.inp_lf = {}
169169
self.state = None
@@ -184,6 +184,7 @@ def __init__(
184184
self.cache_locations = cache_locations
185185
self.allow_cache_override = True
186186
self._checksum = None
187+
self._uid = uuid4().hex
187188
# if True the results are not checked (does not propagate to nodes)
188189
self.task_rerun = rerun
189190

@@ -262,6 +263,9 @@ def checksum_states(self, state_index=None):
262263
TODO
263264
264265
"""
266+
if is_workflow(self) and self.inputs._graph_checksums is attr.NOTHING:
267+
self.inputs._graph_checksums = [nd.checksum for nd in self.graph_sorted]
268+
265269
if state_index is not None:
266270
inputs_copy = deepcopy(self.inputs)
267271
for key, ind in self.state.inputs_ind[state_index].items():
@@ -289,6 +293,14 @@ def checksum_states(self, state_index=None):
289293
checksum_list.append(self.checksum_states(state_index=ind))
290294
return checksum_list
291295

296+
@property
297+
def uid(self):
298+
""" the unique id number for the task
299+
It will be used to create unique names for slurm scripts etc.
300+
without a need to run checksum
301+
"""
302+
return self._uid
303+
292304
def set_state(self, splitter, combiner=None):
293305
"""
294306
Set a particular state on this task.
@@ -410,7 +422,10 @@ def _run(self, rerun=False, **kwargs):
410422
lockfile = self.cache_dir / (checksum + ".lock")
411423
# Eagerly retrieve cached - see scenarios in __init__()
412424
self.hooks.pre_run(self)
413-
# TODO add signal handler for processes killed after lock acquisition
425+
# adding info file with the checksum in case the task was cancelled
426+
# and the lockfile has to be removed
427+
with open(self.cache_dir / f"{self.uid}_info.json", "w") as jsonfile:
428+
json.dump({"checksum": self.checksum}, jsonfile)
414429
with SoftFileLock(lockfile):
415430
if not (rerun or self.task_rerun):
416431
result = self.result()
@@ -444,6 +459,8 @@ def _run(self, rerun=False, **kwargs):
444459
self.hooks.post_run_task(self, result)
445460
self.audit.finalize_audit(result)
446461
save(odir, result=result, task=self)
462+
# removing the additional file with the checksum
463+
(self.cache_dir / f"{self.uid}_info.json").unlink()
447464
# # function etc. shouldn't change anyway, so removing
448465
orig_inputs = dict(
449466
(k, v) for (k, v) in orig_inputs.items() if not k.startswith("_")
@@ -481,7 +498,6 @@ def split(self, splitter, overwrite=False, **kwargs):
481498
)
482499
if kwargs:
483500
self.inputs = attr.evolve(self.inputs, **kwargs)
484-
self.state_inputs = kwargs
485501
if not self.state or splitter != self.state.splitter:
486502
self.set_state(splitter)
487503
return self
@@ -548,8 +564,8 @@ def pickle_task(self):
548564
""" Pickling the tasks with full inputs"""
549565
pkl_files = self.cache_dir / "pkl_files"
550566
pkl_files.mkdir(exist_ok=True, parents=True)
551-
task_main_path = pkl_files / f"{self.name}_{self.checksum}_task.pklz"
552-
save(task_path=pkl_files, task=self, name_prefix=f"{self.name}_{self.checksum}")
567+
task_main_path = pkl_files / f"{self.name}_{self.uid}_task.pklz"
568+
save(task_path=pkl_files, task=self, name_prefix=f"{self.name}_{self.uid}")
553569
return task_main_path
554570

555571
@property
@@ -935,7 +951,6 @@ async def _run(self, submitter=None, rerun=False, **kwargs):
935951
"Workflow output cannot be None, use set_output to define output(s)"
936952
)
937953
checksum = self.checksum
938-
lockfile = self.cache_dir / (checksum + ".lock")
939954
# Eagerly retrieve cached
940955
if not (rerun or self.task_rerun):
941956
result = self.result()
@@ -953,6 +968,11 @@ async def _run(self, submitter=None, rerun=False, **kwargs):
953968
task.cache_locations = task._cache_locations + self.cache_locations
954969
self.create_connections(task)
955970
# TODO add signal handler for processes killed after lock acquisition
971+
# adding info file with the checksum in case the task was cancelled
972+
# and the lockfile has to be removed
973+
with open(self.cache_dir / f"{self.uid}_info.json", "w") as jsonfile:
974+
json.dump({"checksum": checksum}, jsonfile)
975+
lockfile = self.cache_dir / (checksum + ".lock")
956976
self.hooks.pre_run(self)
957977
with SoftFileLock(lockfile):
958978
# # Let only one equivalent process run
@@ -977,6 +997,8 @@ async def _run(self, submitter=None, rerun=False, **kwargs):
977997
self.hooks.post_run_task(self, result)
978998
self.audit.finalize_audit(result=result)
979999
save(odir, result=result, task=self)
1000+
# removing the additional file with the checksum
1001+
(self.cache_dir / f"{self.uid}_info.json").unlink()
9801002
os.chdir(cwd)
9811003
self.hooks.post_run(self, result)
9821004
return result

pydra/engine/helpers.py

Lines changed: 21 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88
import os
99
import sys
1010
from hashlib import sha256
11+
from uuid import uuid4
1112
import subprocess as sp
1213
import getpass
1314
import re
@@ -27,6 +28,8 @@
2728
LazyField,
2829
MultiOutputObj,
2930
MultiInputObj,
31+
MultiInputFile,
32+
MultiOutputFile,
3033
)
3134
from .helpers_file import hash_file, hash_dir, copyfile, is_existing_file
3235

@@ -311,7 +314,15 @@ def custom_validator(instance, attribute, value):
311314
or value is None
312315
or attribute.name.startswith("_") # e.g. _func
313316
or isinstance(value, LazyField)
314-
or tp_attr in [ty.Any, inspect._empty, MultiOutputObj, MultiInputObj]
317+
or tp_attr
318+
in [
319+
ty.Any,
320+
inspect._empty,
321+
MultiOutputObj,
322+
MultiInputObj,
323+
MultiOutputFile,
324+
MultiInputFile,
325+
]
315326
):
316327
check_type = False # no checking of the type
317328
elif isinstance(tp_attr, type) or tp_attr in [File, Directory]:
@@ -651,14 +662,16 @@ def hash_function(obj):
651662
return sha256(str(obj).encode()).hexdigest()
652663

653664

654-
def hash_value(value, tp=None, metadata=None):
665+
def hash_value(value, tp=None, metadata=None, precalculated=None):
655666
"""calculating hash or returning values recursively"""
656667
if metadata is None:
657668
metadata = {}
658669
if isinstance(value, (tuple, list)):
659-
return [hash_value(el, tp, metadata) for el in value]
670+
return [hash_value(el, tp, metadata, precalculated) for el in value]
660671
elif isinstance(value, dict):
661-
dict_hash = {k: hash_value(v, tp, metadata) for (k, v) in value.items()}
672+
dict_hash = {
673+
k: hash_value(v, tp, metadata, precalculated) for (k, v) in value.items()
674+
}
662675
# returning a sorted object
663676
return [list(el) for el in sorted(dict_hash.items(), key=lambda x: x[0])]
664677
else: # not a container
@@ -667,13 +680,13 @@ def hash_value(value, tp=None, metadata=None):
667680
and is_existing_file(value)
668681
and "container_path" not in metadata
669682
):
670-
return hash_file(value)
683+
return hash_file(value, precalculated=precalculated)
671684
elif (
672685
(tp is File or "pydra.engine.specs.Directory" in str(tp))
673686
and is_existing_file(value)
674687
and "container_path" not in metadata
675688
):
676-
return hash_dir(value)
689+
return hash_dir(value, precalculated=precalculated)
677690
else:
678691
return value
679692

@@ -793,6 +806,8 @@ def load_task(task_pkl, ind=None):
793806
_, inputs_dict = task.get_input_el(ind)
794807
task.inputs = attr.evolve(task.inputs, **inputs_dict)
795808
task.state = None
809+
# resetting uid for task
810+
task._uid = uuid4().hex
796811
return task
797812

798813

0 commit comments

Comments
 (0)