@@ -118,6 +118,42 @@ def id_for_memo_function(f: types.FunctionType, output_ref: bool = False) -> byt
118118 return pickle .dumps (["types.FunctionType" , f .__name__ , f .__module__ ])
119119
120120
121+ def make_hash (task : TaskRecord ) -> str :
122+ """Create a hash of the task inputs.
123+
124+ Args:
125+ - task (dict) : Task dictionary from dfk.tasks
126+
127+ Returns:
128+ - hash (str) : A unique hash string
129+ """
130+
131+ t : List [bytes ] = []
132+
133+ # if kwargs contains an outputs parameter, that parameter is removed
134+ # and normalised differently - with output_ref set to True.
135+ # kwargs listed in ignore_for_cache will also be removed
136+
137+ filtered_kw = task ['kwargs' ].copy ()
138+
139+ ignore_list = task ['ignore_for_cache' ]
140+
141+ logger .debug ("Ignoring these kwargs for checkpointing: %s" , ignore_list )
142+ for k in ignore_list :
143+ logger .debug ("Ignoring kwarg %s" , k )
144+ del filtered_kw [k ]
145+
146+ if 'outputs' in task ['kwargs' ]:
147+ outputs = task ['kwargs' ]['outputs' ]
148+ del filtered_kw ['outputs' ]
149+ t .append (id_for_memo (outputs , output_ref = True ))
150+
151+ t .extend (map (id_for_memo , (filtered_kw , task ['func' ], task ['args' ])))
152+
153+ x = b'' .join (t )
154+ return hashlib .md5 (x ).hexdigest ()
155+
156+
121157class Memoizer :
122158 def update_memo_exception (self , task : TaskRecord , e : BaseException ) -> None :
123159 raise NotImplementedError
@@ -236,41 +272,6 @@ def close(self) -> None:
236272 logger .info ("Stopping checkpoint timer" )
237273 self ._checkpoint_timer .close ()
238274
239- def make_hash (self , task : TaskRecord ) -> str :
240- """Create a hash of the task inputs.
241-
242- Args:
243- - task (dict) : Task dictionary from dfk.tasks
244-
245- Returns:
246- - hash (str) : A unique hash string
247- """
248-
249- t : List [bytes ] = []
250-
251- # if kwargs contains an outputs parameter, that parameter is removed
252- # and normalised differently - with output_ref set to True.
253- # kwargs listed in ignore_for_cache will also be removed
254-
255- filtered_kw = task ['kwargs' ].copy ()
256-
257- ignore_list = task ['ignore_for_cache' ]
258-
259- logger .debug ("Ignoring these kwargs for checkpointing: %s" , ignore_list )
260- for k in ignore_list :
261- logger .debug ("Ignoring kwarg %s" , k )
262- del filtered_kw [k ]
263-
264- if 'outputs' in task ['kwargs' ]:
265- outputs = task ['kwargs' ]['outputs' ]
266- del filtered_kw ['outputs' ]
267- t .append (id_for_memo (outputs , output_ref = True ))
268-
269- t .extend (map (id_for_memo , (filtered_kw , task ['func' ], task ['args' ])))
270-
271- x = b'' .join (t )
272- return hashlib .md5 (x ).hexdigest ()
273-
274275 def check_memo (self , task : TaskRecord ) -> Optional [Future [Any ]]:
275276 """Create a hash of the task and its inputs and check the lookup table for this hash.
276277
@@ -292,7 +293,7 @@ def check_memo(self, task: TaskRecord) -> Optional[Future[Any]]:
292293 logger .debug ("Task {} will not be memoized" .format (task_id ))
293294 return None
294295
295- hashsum = self . make_hash (task )
296+ hashsum = make_hash (task )
296297 logger .debug ("Task {} has memoization hash {}" .format (task_id , hashsum ))
297298 result = None
298299 if hashsum in self .memo_lookup_table :
0 commit comments