@@ -25,38 +25,73 @@ class FlareCleanupTask(CodecovCronTask, name=flare_cleanup_task_name):
def get_min_seconds_interval_between_executions(cls):
    """Return the minimum number of seconds between two executions (20 hours)."""
    hours_between_runs = 20
    seconds_per_hour = 60 * 60
    return hours_between_runs * seconds_per_hour
2727
def run_cron_task(self, db_session, batch_size=1000, limit=10000, *args, **kwargs):
    """Clear flare data from Pulls that are not OPEN, oldest ``updatestamp`` first.

    Two passes are made, each handling at most ``limit`` pulls in batches of
    at most ``batch_size``:

    1. Set ``_flare`` to ``None`` for pulls that still hold a non-empty flare.
    2. Delete the archived flare files, then set ``_flare_storage_path`` to
       ``None`` for the pulls whose files were deleted successfully.

    Args:
        db_session: Not used by this method.
        batch_size: Maximum number of pulls handled per batch.
        limit: Maximum number of pulls attempted per pass.
    """
    # for any Pull that is not OPEN, clear the flare field(s), targeting older data.
    # "id" breaks updatestamp ties so the slices taken below come from a stable order.
    non_open_pulls = Pull.objects.exclude(state=PullStates.OPEN.value).order_by(
        "updatestamp", "id"
    )

    log.info("Starting FlareCleanupTask")

    # clear in db
    non_open_pulls_with_flare_in_db = non_open_pulls.filter(
        _flare__isnull=False
    ).exclude(_flare={})

    total_updated = 0
    attempted = 0
    while attempted < limit:
        # Never go past `limit`, even when batch_size does not divide it evenly.
        size = min(batch_size, limit - attempted)
        # Cleared rows drop out of this queryset, so the next batch is always at
        # the front. Advancing an offset here would skip rows that still hold a
        # flare. Materialize the ids so the update targets exactly these rows.
        batch_ids = list(
            non_open_pulls_with_flare_in_db.values_list("id", flat=True)[:size]
        )
        if not batch_ids:
            break
        # single query, does not call .save(), does not refresh updatestamp
        total_updated += non_open_pulls_with_flare_in_db.filter(
            id__in=batch_ids
        ).update(_flare=None)
        attempted += len(batch_ids)

    log.info(f"FlareCleanupTask cleared {total_updated} database flares")

    # clear in Archive
    non_open_pulls_with_flare_in_archive = non_open_pulls.filter(
        _flare_storage_path__isnull=False
    )

    total_updated = 0
    attempted = 0
    # Pulls whose archive deletion failed keep their _flare_storage_path and so
    # stay at the front of the queryset; skip past them instead of retrying the
    # same batch forever.
    failed = 0
    while attempted < limit:
        size = min(batch_size, limit - attempted)
        batch_ids = list(
            non_open_pulls_with_flare_in_archive.values_list("id", flat=True)[
                failed : failed + size
            ]
        )
        if not batch_ids:
            break
        attempted += len(batch_ids)

        flare_paths_from_batch = list(
            Pull.objects.filter(id__in=batch_ids).values_list(
                "_flare_storage_path", flat=True
            )
        )
        try:
            # NOTE(review): ArchiveService is built without a repository and
            # delete_files receives the raw stored paths; confirm both against
            # the ArchiveService implementation.
            archive_service = ArchiveService()
            archive_service.delete_files(flare_paths_from_batch)
        except Exception as e:
            # if something fails with deleting from archive, leave the _flare_storage_path on the pull object.
            # only delete _flare_storage_path if the deletion from archive was successful.
            log.error(f"FlareCleanupTask failed to delete archive files: {e}")
            failed += len(batch_ids)
            continue

        # Update the _flare_storage_path field for successfully processed pulls
        total_updated += Pull.objects.filter(id__in=batch_ids).update(
            _flare_storage_path=None
        )

    log.info(f"FlareCleanupTask cleared {total_updated} Archive flares")
92+
def manual_run(self, db_session=None, limit=1000, *args, **kwargs):
    """Run the cleanup on demand by delegating to ``run_cron_task``.

    ``limit`` defaults to 1000 here and is always forwarded by keyword; any
    extra positional and keyword arguments are passed through unchanged.
    """
    forwarded_kwargs = dict(kwargs, limit=limit)
    self.run_cron_task(db_session, *args, **forwarded_kwargs)
6095
6196
# Register a FlareCleanupTask instance with celery_app and keep the value
# returned by register_task under a module-level name.
6297RegisteredFlareCleanupTask = celery_app .register_task (FlareCleanupTask ())
0 commit comments