33import itertools
44import json
55import logging
6+ import os
67import sys
78import warnings
89from abc import ABC , abstractmethod
@@ -64,13 +65,9 @@ def __init__(
6465 skip_upload : bool = False ,
6566 skip_validation : bool = False ,
6667 ):
67- # set up logger
6868 self .logger = logging .getLogger (self .name )
6969
70- # set this to avoid confusion in destructor if called during validation
71- self ._save_workdir = True
72-
73- # validate input payload...or not
70+ # validate input payload... or not
7471 if not skip_validation :
7572 if not self .validate (payload ):
7673 raise FailedValidation ()
@@ -90,12 +87,6 @@ def __init__(
9087 # if a workdir was specified we don't want to rm by default
9188 self ._save_workdir = save_workdir if save_workdir is not None else True
9289
93- def __del__ (self ) -> None :
94- # remove work directory if not running locally
95- if not self ._save_workdir :
96- self .logger .debug ("Removing work directory %s" , self ._workdir )
97- rmtree (self ._workdir )
98-
9990 @property
10091 def process_definition (self ) -> Dict [str , Any ]:
10192 process = self ._payload .get ("process" , {})
@@ -198,6 +189,21 @@ def add_software_version_to_item(cls, item: Dict[str, Any]) -> Dict[str, Any]:
198189 item ["properties" ]["processing:software" ] = {cls .name : cls .version }
199190 return item
200191
192+ def cleanup_workdir (self ) -> None :
193+ """Remove work directory if configured not to save it"""
194+ try :
195+ if (
196+ not self ._save_workdir
197+ and self ._workdir
198+ and os .path .exists (self ._workdir )
199+ ):
200+ self .logger .debug ("Removing work directory %s" , self ._workdir )
201+ rmtree (self ._workdir )
202+ except Exception as e :
203+ self .logger .warning (
204+ "Failed removing work directory %s: %s" , self ._workdir , e
205+ )
206+
201207 def assign_collections (self ) -> None :
202208 """Assigns new collection names based on"""
203209 for i , (coll , expr ) in itertools .product (
@@ -305,24 +311,27 @@ def post_process_item(self, item: Dict[str, Any]) -> Dict[str, Any]:
305311
306312 @classmethod
307313 def handler (cls , payload : Dict [str , Any ], ** kwargs : Any ) -> Dict [str , Any ]:
308- if "href" in payload or "url" in payload :
309- # read input
310- with fsspec .open (payload .get ("href" , payload .get ("url" ))) as f :
311- payload = json .loads (f .read ())
312-
313- task = cls (payload , ** kwargs )
314314 try :
315- items = list ()
316- for item in task .process (** task .parameters ):
317- items .append (task .post_process_item (item ))
318-
319- task ._payload ["features" ] = items
320- task .assign_collections ()
321-
322- return task ._payload
323- except Exception as err :
324- task .logger .error (err , exc_info = True )
325- raise err
315+ if "href" in payload or "url" in payload :
316+ # read input
317+ with fsspec .open (payload .get ("href" , payload .get ("url" ))) as f :
318+ payload = json .loads (f .read ())
319+
320+ task = cls (payload , ** kwargs )
321+ try :
322+ items = list ()
323+ for item in task .process (** task .parameters ):
324+ items .append (task .post_process_item (item ))
325+
326+ task ._payload ["features" ] = items
327+ task .assign_collections ()
328+
329+ return task ._payload
330+ except Exception as err :
331+ task .logger .error (err , exc_info = True )
332+ raise err
333+ finally :
334+ task .cleanup_workdir ()
326335
327336 @classmethod
328337 def parse_args (cls , args : List [str ]) -> Dict [str , Any ]:
0 commit comments