@@ -600,20 +600,24 @@ def purge_jobs(self):
600600 """
601601 invalid_removed = 0
602602
603- invalid_success = len (self .jobs & "status = 'success'" ) - len (self .target )
604- if invalid_success > 0 :
605- for key , job_key in zip (* (self .jobs & "status = 'success'" ).fetch ("KEY" , "key" )):
606- if not (self .target & job_key ):
607- (self .jobs & key ).delete ()
608- invalid_removed += 1
609-
610- keys2do = self ._jobs_to_do ({}).fetch ("KEY" )
611- invalid_incomplete = len (self .jobs & "status != 'success'" ) - len (keys2do )
612- if invalid_incomplete > 0 :
613- for key , job_key in zip (* (self .jobs & "status != 'success'" ).fetch ("KEY" , "key" )):
614- if job_key not in keys2do :
615- (self .jobs & key ).delete ()
616- invalid_removed += 1
603+ success_query = self .jobs & {"table_name" : self .target .table_name } & "status = 'success'"
604+ if success_query :
605+ invalid_success = len (success_query ) - len (self .target )
606+ if invalid_success > 0 :
607+ for key , job_key in zip (* success_query .fetch ("KEY" , "key" )):
608+ if not (self .target & job_key ):
609+ (self .jobs & key ).delete ()
610+ invalid_removed += 1
611+
612+ incomplete_query = self .jobs & {"table_name" : self .target .table_name } & "status != 'success'"
613+ if incomplete_query :
614+ keys2do = self ._jobs_to_do ({}).fetch ("KEY" )
615+ invalid_incomplete = len (incomplete_query ) - len (keys2do )
616+ if invalid_incomplete > 0 :
617+ for key , job_key in zip (* incomplete_query .fetch ("KEY" , "key" )):
618+ if job_key not in keys2do :
619+ (self .jobs & key ).delete ()
620+ invalid_removed += 1
617621
618622 logger .info (
619623 f"{ invalid_removed } invalid jobs removed for `{ to_camel_case (self .target .table_name )} `"
0 commit comments