Skip to content

Commit 37fefac

Browse files
committed
make_hash does not need to be part of the basic memoizer,
and it is more reusable when it isn't. This isn't the only way to make a hash, though, and hashing isn't the only way to compare checkpoint entries for equality.
1 parent 3e323e6 commit 37fefac

File tree

1 file changed

+37
-36
lines changed

1 file changed

+37
-36
lines changed

parsl/dataflow/memoization.py

Lines changed: 37 additions & 36 deletions
Original file line number | Diff line number | Diff line change
@@ -118,6 +118,42 @@ def id_for_memo_function(f: types.FunctionType, output_ref: bool = False) -> byt
118118
return pickle.dumps(["types.FunctionType", f.__name__, f.__module__])
119119

120120

121+
def make_hash(task: TaskRecord) -> str:
    """Create a hash of the task inputs.

    Args:
        - task (dict) : Task dictionary from dfk.tasks

    Returns:
        - hash (str) : A unique hash string
    """
    t: List[bytes] = []

    # If kwargs contains an outputs parameter, that parameter is removed
    # and normalised differently - with output_ref set to True.
    # kwargs listed in ignore_for_cache will also be removed.

    filtered_kw = task['kwargs'].copy()

    ignore_list = task['ignore_for_cache']

    logger.debug("Ignoring these kwargs for checkpointing: %s", ignore_list)
    for k in ignore_list:
        logger.debug("Ignoring kwarg %s", k)
        # pop() rather than del: an ignored kwarg may be absent from this
        # particular invocation (e.g. a defaulted parameter), and that
        # should not raise KeyError.
        filtered_kw.pop(k, None)

    if 'outputs' in task['kwargs']:
        outputs = task['kwargs']['outputs']
        del filtered_kw['outputs']
        # Output files are normalised with output_ref=True so that the hash
        # reflects them as output references, not as input content.
        t.append(id_for_memo(outputs, output_ref=True))

    t.extend(map(id_for_memo, (filtered_kw, task['func'], task['args'])))

    x = b''.join(t)
    # md5 here is used as a cache key, not for security purposes.
    return hashlib.md5(x).hexdigest()
121157
class Memoizer:
122158
def update_memo_exception(self, task: TaskRecord, e: BaseException) -> None:
123159
raise NotImplementedError
@@ -236,41 +272,6 @@ def close(self) -> None:
236272
logger.info("Stopping checkpoint timer")
237273
self._checkpoint_timer.close()
238274

239-
def make_hash(self, task: TaskRecord) -> str:
240-
"""Create a hash of the task inputs.
241-
242-
Args:
243-
- task (dict) : Task dictionary from dfk.tasks
244-
245-
Returns:
246-
- hash (str) : A unique hash string
247-
"""
248-
249-
t: List[bytes] = []
250-
251-
# if kwargs contains an outputs parameter, that parameter is removed
252-
# and normalised differently - with output_ref set to True.
253-
# kwargs listed in ignore_for_cache will also be removed
254-
255-
filtered_kw = task['kwargs'].copy()
256-
257-
ignore_list = task['ignore_for_cache']
258-
259-
logger.debug("Ignoring these kwargs for checkpointing: %s", ignore_list)
260-
for k in ignore_list:
261-
logger.debug("Ignoring kwarg %s", k)
262-
del filtered_kw[k]
263-
264-
if 'outputs' in task['kwargs']:
265-
outputs = task['kwargs']['outputs']
266-
del filtered_kw['outputs']
267-
t.append(id_for_memo(outputs, output_ref=True))
268-
269-
t.extend(map(id_for_memo, (filtered_kw, task['func'], task['args'])))
270-
271-
x = b''.join(t)
272-
return hashlib.md5(x).hexdigest()
273-
274275
def check_memo(self, task: TaskRecord) -> Optional[Future[Any]]:
275276
"""Create a hash of the task and its inputs and check the lookup table for this hash.
276277
@@ -292,7 +293,7 @@ def check_memo(self, task: TaskRecord) -> Optional[Future[Any]]:
292293
logger.debug("Task {} will not be memoized".format(task_id))
293294
return None
294295

295-
hashsum = self.make_hash(task)
296+
hashsum = make_hash(task)
296297
logger.debug("Task {} has memoization hash {}".format(task_id, hashsum))
297298
result = None
298299
if hashsum in self.memo_lookup_table:

0 commit comments

Comments
 (0)