Skip to content

Commit 96603d2

Browse files
committed
make_hash does not need to be part of the basic memoizer,
and it is more reusable when it isn't. This isn't the only way to make a hash, though, and hashing isn't the only way to compare checkpoint entries for equality.
1 parent bb5e882 commit 96603d2

File tree

1 file changed

+37
-36
lines changed

1 file changed

+37
-36
lines changed

parsl/dataflow/memoization.py

Lines changed: 37 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -118,6 +118,42 @@ def id_for_memo_function(f: types.FunctionType, output_ref: bool = False) -> byt
118118
return pickle.dumps(["types.FunctionType", f.__name__, f.__module__])
119119

120120

121+
def make_hash(task: TaskRecord) -> str:
    """Compute a memoization hash string from a task's inputs.

    Args:
        - task (dict) : Task dictionary from dfk.tasks

    Returns:
        - hash (str) : A unique hash string
    """
    # kwargs listed in ignore_for_cache are dropped from the hash entirely;
    # an 'outputs' kwarg is hashed separately, with output_ref set to True.
    remaining_kwargs = task['kwargs'].copy()

    ignored = task['ignore_for_cache']
    logger.debug("Ignoring these kwargs for checkpointing: %s", ignored)
    for name in ignored:
        logger.debug("Ignoring kwarg %s", name)
        del remaining_kwargs[name]

    pieces: List[bytes] = []

    if 'outputs' in task['kwargs']:
        outputs = task['kwargs']['outputs']
        del remaining_kwargs['outputs']
        # Output files are normalised as references, not by content.
        pieces.append(id_for_memo(outputs, output_ref=True))

    for component in (remaining_kwargs, task['func'], task['args']):
        pieces.append(id_for_memo(component))

    return hashlib.md5(b''.join(pieces)).hexdigest()
155+
156+
121157
class Memoizer:
122158
def update_memo_exception(self, task: TaskRecord, e: BaseException) -> None:
123159
raise NotImplementedError
@@ -239,41 +275,6 @@ def close(self) -> None:
239275
logger.info("Stopping checkpoint timer")
240276
self._checkpoint_timer.close()
241277

242-
def make_hash(self, task: TaskRecord) -> str:
243-
"""Create a hash of the task inputs.
244-
245-
Args:
246-
- task (dict) : Task dictionary from dfk.tasks
247-
248-
Returns:
249-
- hash (str) : A unique hash string
250-
"""
251-
252-
t: List[bytes] = []
253-
254-
# if kwargs contains an outputs parameter, that parameter is removed
255-
# and normalised differently - with output_ref set to True.
256-
# kwargs listed in ignore_for_cache will also be removed
257-
258-
filtered_kw = task['kwargs'].copy()
259-
260-
ignore_list = task['ignore_for_cache']
261-
262-
logger.debug("Ignoring these kwargs for checkpointing: %s", ignore_list)
263-
for k in ignore_list:
264-
logger.debug("Ignoring kwarg %s", k)
265-
del filtered_kw[k]
266-
267-
if 'outputs' in task['kwargs']:
268-
outputs = task['kwargs']['outputs']
269-
del filtered_kw['outputs']
270-
t.append(id_for_memo(outputs, output_ref=True))
271-
272-
t.extend(map(id_for_memo, (filtered_kw, task['func'], task['args'])))
273-
274-
x = b''.join(t)
275-
return hashlib.md5(x).hexdigest()
276-
277278
def check_memo(self, task: TaskRecord) -> Optional[Future[Any]]:
278279
"""Create a hash of the task and its inputs and check the lookup table for this hash.
279280
@@ -295,7 +296,7 @@ def check_memo(self, task: TaskRecord) -> Optional[Future[Any]]:
295296
logger.debug("Task {} will not be memoized".format(task_id))
296297
return None
297298

298-
hashsum = self.make_hash(task)
299+
hashsum = make_hash(task)
299300
logger.debug("Task {} has memoization hash {}".format(task_id, hashsum))
300301
result = None
301302
if hashsum in self.memo_lookup_table:

0 commit comments

Comments
 (0)