Skip to content

Commit eef6acb

Browse files
committed
make_hash does not need to be part of the basic memoizer
and is more reusable when it isn't. This isn't the only way to make a hash, though, and hashing isn't the only way to compare checkpoint entries for equality.
1 parent e3d5ba7 commit eef6acb

File tree

1 file changed

+37
-36
lines changed

1 file changed

+37
-36
lines changed

parsl/dataflow/memoization.py

Lines changed: 37 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -118,6 +118,42 @@ def id_for_memo_function(f: types.FunctionType, output_ref: bool = False) -> byt
118118
return pickle.dumps(["types.FunctionType", f.__name__, f.__module__])
119119

120120

121+
def make_hash(task: TaskRecord) -> str:
    """Create a hash of the task inputs.

    The hash is computed over the task's function, positional arguments and
    keyword arguments, each normalised to bytes with ``id_for_memo``.
    Keyword arguments named in ``task['ignore_for_cache']`` do not
    contribute to the hash.

    Args:
        - task (dict) : Task dictionary from dfk.tasks

    Returns:
        - hash (str) : Hex digest of the MD5 hash of the normalised inputs
    """

    t: List[bytes] = []

    # if kwargs contains an outputs parameter, that parameter is removed
    # and normalised differently - with output_ref set to True.
    # kwargs listed in ignore_for_cache will also be removed

    # Work on a copy so the task record's own kwargs are never mutated.
    filtered_kw = task['kwargs'].copy()

    ignore_list = task['ignore_for_cache']

    logger.debug("Ignoring these kwargs for checkpointing: %s", ignore_list)
    for k in ignore_list:
        logger.debug("Ignoring kwarg %s", k)
        # pop() rather than del: an ignored name may be absent from this
        # particular invocation's kwargs, which must not raise KeyError.
        filtered_kw.pop(k, None)

    # Test against the filtered copy so that an 'outputs' kwarg that was
    # explicitly ignored above is neither hashed nor removed a second time.
    if 'outputs' in filtered_kw:
        outputs = filtered_kw.pop('outputs')
        t.append(id_for_memo(outputs, output_ref=True))

    t.extend(map(id_for_memo, (filtered_kw, task['func'], task['args'])))

    x = b''.join(t)
    return hashlib.md5(x).hexdigest()
155+
156+
121157
class Memoizer:
122158
def update_memo_exception(self, task: TaskRecord, e: BaseException) -> None:
123159
raise NotImplementedError
@@ -241,41 +277,6 @@ def close(self) -> None:
241277
logger.info("Stopping checkpoint timer")
242278
self._checkpoint_timer.close()
243279

244-
def make_hash(self, task: TaskRecord) -> str:
245-
"""Create a hash of the task inputs.
246-
247-
Args:
248-
- task (dict) : Task dictionary from dfk.tasks
249-
250-
Returns:
251-
- hash (str) : A unique hash string
252-
"""
253-
254-
t: List[bytes] = []
255-
256-
# if kwargs contains an outputs parameter, that parameter is removed
257-
# and normalised differently - with output_ref set to True.
258-
# kwargs listed in ignore_for_cache will also be removed
259-
260-
filtered_kw = task['kwargs'].copy()
261-
262-
ignore_list = task['ignore_for_cache']
263-
264-
logger.debug("Ignoring these kwargs for checkpointing: %s", ignore_list)
265-
for k in ignore_list:
266-
logger.debug("Ignoring kwarg %s", k)
267-
del filtered_kw[k]
268-
269-
if 'outputs' in task['kwargs']:
270-
outputs = task['kwargs']['outputs']
271-
del filtered_kw['outputs']
272-
t.append(id_for_memo(outputs, output_ref=True))
273-
274-
t.extend(map(id_for_memo, (filtered_kw, task['func'], task['args'])))
275-
276-
x = b''.join(t)
277-
return hashlib.md5(x).hexdigest()
278-
279280
def check_memo(self, task: TaskRecord) -> Optional[Future[Any]]:
280281
"""Create a hash of the task and its inputs and check the lookup table for this hash.
281282
@@ -297,7 +298,7 @@ def check_memo(self, task: TaskRecord) -> Optional[Future[Any]]:
297298
logger.debug("Task {} will not be memoized".format(task_id))
298299
return None
299300

300-
hashsum = self.make_hash(task)
301+
hashsum = make_hash(task)
301302
logger.debug("Task {} has memoization hash {}".format(task_id, hashsum))
302303
result = None
303304
if hashsum in self.memo_lookup_table:

0 commit comments

Comments
 (0)