@@ -121,6 +121,42 @@ def id_for_memo_function(f: types.FunctionType, output_ref: bool = False) -> byt
121121 return pickle .dumps (["types.FunctionType" , f .__name__ , f .__module__ ])
122122
123123
def make_hash(task: TaskRecord) -> str:
    """Create a hash of the task inputs.

    The digest is computed over the concatenation of the ``id_for_memo``
    normalisations of, in this order: the ``outputs`` keyword argument
    (only when present and not ignored), the remaining keyword arguments,
    the task function, and the positional arguments.

    Args:
        - task (dict) : Task dictionary from dfk.tasks

    Returns:
        - hash (str) : Hex MD5 digest of the normalised task inputs
    """

    t: List[bytes] = []

    # Work on a copy so the task record's own kwargs are never mutated.
    filtered_kw = task['kwargs'].copy()

    ignore_list = task['ignore_for_cache']

    logger.debug("Ignoring these kwargs for checkpointing: %s", ignore_list)
    for k in ignore_list:
        logger.debug("Ignoring kwarg %s", k)
        # An ignored name may not have been supplied as a keyword argument
        # for this invocation; pop() tolerates that where `del` would raise
        # KeyError.
        filtered_kw.pop(k, None)

    # If kwargs contains an outputs parameter, that parameter is removed
    # and normalised differently - with output_ref set to True.
    # Testing against the filtered copy (rather than the original kwargs)
    # means an 'outputs' entry listed in ignore_for_cache is simply left
    # out of the hash instead of triggering a second, failing, deletion.
    if 'outputs' in filtered_kw:
        outputs = filtered_kw.pop('outputs')
        t.append(id_for_memo(outputs, output_ref=True))

    t.extend(map(id_for_memo, (filtered_kw, task['func'], task['args'])))

    x = b''.join(t)
    return hashlib.md5(x).hexdigest()
158+
159+
124160class Memoizer :
125161 def start (self , * , dfk : DataFlowKernel , memoize : bool = True , checkpoint_files : Sequence [str ], run_dir : str ) -> None :
126162 raise NotImplementedError
@@ -203,41 +239,6 @@ def start(self, *, dfk: DataFlowKernel, memoize: bool = True, checkpoint_files:
203239 logger .info ("App caching disabled for all apps" )
204240 self .memo_lookup_table = {}
205241
def make_hash(self, task: TaskRecord) -> str:
    """Create a hash of the task inputs.

    The digest is computed over the concatenation of the ``id_for_memo``
    normalisations of, in this order: the ``outputs`` keyword argument
    (only when present and not ignored), the remaining keyword arguments,
    the task function, and the positional arguments. No instance state
    is read.

    Args:
        - task (dict) : Task dictionary from dfk.tasks

    Returns:
        - hash (str) : Hex MD5 digest of the normalised task inputs
    """

    t: List[bytes] = []

    # Work on a copy so the task record's own kwargs are never mutated.
    filtered_kw = task['kwargs'].copy()

    ignore_list = task['ignore_for_cache']

    logger.debug("Ignoring these kwargs for checkpointing: %s", ignore_list)
    for k in ignore_list:
        logger.debug("Ignoring kwarg %s", k)
        # pop() rather than `del`: an ignored name that was not passed as
        # a keyword argument must not abort hashing with KeyError.
        filtered_kw.pop(k, None)

    # If kwargs contains an outputs parameter, that parameter is removed
    # and normalised differently - with output_ref set to True.
    # The membership test uses the filtered copy so that an ignored
    # 'outputs' entry is not deleted twice.
    if 'outputs' in filtered_kw:
        outputs = filtered_kw.pop('outputs')
        t.append(id_for_memo(outputs, output_ref=True))

    t.extend(map(id_for_memo, (filtered_kw, task['func'], task['args'])))

    x = b''.join(t)
    return hashlib.md5(x).hexdigest()
240-
241242 def check_memo (self , task : TaskRecord ) -> Optional [Future [Any ]]:
242243 """Create a hash of the task and its inputs and check the lookup table for this hash.
243244
@@ -259,7 +260,7 @@ def check_memo(self, task: TaskRecord) -> Optional[Future[Any]]:
259260 logger .debug ("Task {} will not be memoized" .format (task_id ))
260261 return None
261262
262- hashsum = self . make_hash (task )
263+ hashsum = make_hash (task )
263264 logger .debug ("Task {} has memoization hash {}" .format (task_id , hashsum ))
264265 result = None
265266 if hashsum in self .memo_lookup_table :
0 commit comments