@@ -118,6 +118,42 @@ def id_for_memo_function(f: types.FunctionType, output_ref: bool = False) -> byt
118118 return pickle .dumps (["types.FunctionType" , f .__name__ , f .__module__ ])
119119
120120
def make_hash(task: TaskRecord) -> str:
    """Create a hash of the task inputs.

    The hash is the MD5 hex digest of the concatenated ``id_for_memo``
    byte strings of, in this order:

    1. the ``outputs`` keyword argument, if present and not ignored,
       normalised with ``output_ref=True``;
    2. the remaining keyword arguments;
    3. the task function;
    4. the positional arguments.

    Args:
        - task (dict) : Task dictionary from dfk.tasks. The entries
          ``kwargs``, ``ignore_for_cache``, ``func`` and ``args`` are read;
          ``task['kwargs']`` itself is not modified.

    Returns:
        - hash (str) : Hexadecimal MD5 digest of the normalised inputs
    """
    parts: List[bytes] = []

    # Work on a shallow copy so that filtering never alters the kwargs
    # stored in the task record.
    filtered_kw = task['kwargs'].copy()

    ignore_list = task['ignore_for_cache']

    logger.debug("Ignoring these kwargs for checkpointing: %s", ignore_list)
    for k in ignore_list:
        logger.debug("Ignoring kwarg %s", k)
        # pop() with a default rather than del: a name listed in
        # ignore_for_cache may not have been passed as a keyword argument,
        # and that must not abort hashing with a KeyError.
        filtered_kw.pop(k, None)

    # If the (non-ignored) kwargs contain an outputs parameter, it is removed
    # and normalised differently - with output_ref set to True. Checking the
    # filtered copy means an ignored 'outputs' is neither deleted twice nor
    # included in the hash.
    if 'outputs' in filtered_kw:
        outputs = filtered_kw.pop('outputs')
        parts.append(id_for_memo(outputs, output_ref=True))

    parts.extend(map(id_for_memo, (filtered_kw, task['func'], task['args'])))

    return hashlib.md5(b''.join(parts)).hexdigest()
155+
156+
121157class Memoizer :
122158 def update_memo_exception (self , task : TaskRecord , e : BaseException ) -> None :
123159 raise NotImplementedError
@@ -241,41 +277,6 @@ def close(self) -> None:
241277 logger .info ("Stopping checkpoint timer" )
242278 self ._checkpoint_timer .close ()
243279
244- def make_hash (self , task : TaskRecord ) -> str :
245- """Create a hash of the task inputs.
246-
247- Args:
248- - task (dict) : Task dictionary from dfk.tasks
249-
250- Returns:
251- - hash (str) : A unique hash string
252- """
253-
254- t : List [bytes ] = []
255-
256- # if kwargs contains an outputs parameter, that parameter is removed
257- # and normalised differently - with output_ref set to True.
258- # kwargs listed in ignore_for_cache will also be removed
259-
260- filtered_kw = task ['kwargs' ].copy ()
261-
262- ignore_list = task ['ignore_for_cache' ]
263-
264- logger .debug ("Ignoring these kwargs for checkpointing: %s" , ignore_list )
265- for k in ignore_list :
266- logger .debug ("Ignoring kwarg %s" , k )
267- del filtered_kw [k ]
268-
269- if 'outputs' in task ['kwargs' ]:
270- outputs = task ['kwargs' ]['outputs' ]
271- del filtered_kw ['outputs' ]
272- t .append (id_for_memo (outputs , output_ref = True ))
273-
274- t .extend (map (id_for_memo , (filtered_kw , task ['func' ], task ['args' ])))
275-
276- x = b'' .join (t )
277- return hashlib .md5 (x ).hexdigest ()
278-
279280 def check_memo (self , task : TaskRecord ) -> Optional [Future [Any ]]:
280281 """Create a hash of the task and its inputs and check the lookup table for this hash.
281282
@@ -297,7 +298,7 @@ def check_memo(self, task: TaskRecord) -> Optional[Future[Any]]:
297298 logger .debug ("Task {} will not be memoized" .format (task_id ))
298299 return None
299300
300- hashsum = self . make_hash (task )
301+ hashsum = make_hash (task )
301302 logger .debug ("Task {} has memoization hash {}" .format (task_id , hashsum ))
302303 result = None
303304 if hashsum in self .memo_lookup_table :
0 commit comments