|
| 1 | +import logging |
| 2 | + |
| 3 | +from shared.api_archive.archive import ArchiveService |
| 4 | +from shared.celery_config import flare_cleanup_task_name |
| 5 | +from shared.django_apps.core.models import Pull, PullStates |
| 6 | + |
| 7 | +from app import celery_app |
| 8 | +from tasks.crontasks import CodecovCronTask |
| 9 | + |
| 10 | +log = logging.getLogger(__name__) |
| 11 | + |
| 12 | + |
| 13 | +class FlareCleanupTask(CodecovCronTask, name=flare_cleanup_task_name): |
| 14 | + """ |
| 15 | + Flare is a field on a Pull object. |
| 16 | + Flare is used to draw static graphs (see GraphHandler view in api) and can be large. |
| 17 | + The majority of flare graphs are used in pr comments, so we keep the (maybe large) flare "available" |
| 18 | + in either the db or Archive storage while the pull is OPEN. |
| 19 | + If the pull is not OPEN, we dump the flare to save space. |
| 20 | + If we need to generate a flare graph for a non-OPEN pull, we build_report_from_commit |
| 21 | + and generate fresh flare from that report (see GraphHandler view in api). |
| 22 | + """ |
| 23 | + |
| 24 | + @classmethod |
| 25 | + def get_min_seconds_interval_between_executions(cls): |
| 26 | + return 72000 # 20h |
| 27 | + |
| 28 | + def run_cron_task(self, db_session, batch_size=1000, limit=10000, *args, **kwargs): |
| 29 | + # for any Pull that is not OPEN, clear the flare field(s), targeting older data |
| 30 | + non_open_pulls = Pull.objects.exclude(state=PullStates.OPEN.value).order_by( |
| 31 | + "updatestamp" |
| 32 | + ) |
| 33 | + |
| 34 | + log.info("Starting FlareCleanupTask") |
| 35 | + |
| 36 | + # clear in db |
| 37 | + non_open_pulls_with_flare_in_db = non_open_pulls.filter( |
| 38 | + _flare__isnull=False |
| 39 | + ).exclude(_flare={}) |
| 40 | + |
| 41 | + # Process in batches |
| 42 | + total_updated = 0 |
| 43 | + start = 0 |
| 44 | + while start < limit: |
| 45 | + stop = start + batch_size if start + batch_size < limit else limit |
| 46 | + batch = non_open_pulls_with_flare_in_db.values_list("id", flat=True)[ |
| 47 | + start:stop |
| 48 | + ] |
| 49 | + if not batch: |
| 50 | + break |
| 51 | + n_updated = non_open_pulls_with_flare_in_db.filter(id__in=batch).update( |
| 52 | + _flare=None |
| 53 | + ) |
| 54 | + total_updated += n_updated |
| 55 | + start = stop |
| 56 | + |
| 57 | + log.info(f"FlareCleanupTask cleared {total_updated} database flares") |
| 58 | + |
| 59 | + # clear in Archive |
| 60 | + non_open_pulls_with_flare_in_archive = non_open_pulls.filter( |
| 61 | + _flare_storage_path__isnull=False |
| 62 | + ) |
| 63 | + |
| 64 | + # Process archive deletions in batches |
| 65 | + total_updated = 0 |
| 66 | + start = 0 |
| 67 | + while start < limit: |
| 68 | + stop = start + batch_size if start + batch_size < limit else limit |
| 69 | + batch = non_open_pulls_with_flare_in_archive.values_list("id", flat=True)[ |
| 70 | + start:stop |
| 71 | + ] |
| 72 | + if not batch: |
| 73 | + break |
| 74 | + flare_paths_from_batch = Pull.objects.filter(id__in=batch).values_list( |
| 75 | + "_flare_storage_path", flat=True |
| 76 | + ) |
| 77 | + try: |
| 78 | + archive_service = ArchiveService(repository=None) |
| 79 | + archive_service.delete_files(flare_paths_from_batch) |
| 80 | + except Exception as e: |
| 81 | + # if something fails with deleting from archive, leave the _flare_storage_path on the pull object. |
| 82 | + # only delete _flare_storage_path if the deletion from archive was successful. |
| 83 | + log.error(f"FlareCleanupTask failed to delete archive files: {e}") |
| 84 | + continue |
| 85 | + |
| 86 | + # Update the _flare_storage_path field for successfully processed pulls |
| 87 | + n_updated = Pull.objects.filter(id__in=batch).update( |
| 88 | + _flare_storage_path=None |
| 89 | + ) |
| 90 | + total_updated += n_updated |
| 91 | + start = stop |
| 92 | + |
| 93 | + log.info(f"FlareCleanupTask cleared {total_updated} Archive flares") |
| 94 | + |
| 95 | + def manual_run(self, db_session=None, limit=1000, *args, **kwargs): |
| 96 | + self.run_cron_task(db_session, limit=limit, *args, **kwargs) |
| 97 | + |
| 98 | + |
| 99 | +RegisteredFlareCleanupTask = celery_app.register_task(FlareCleanupTask()) |
| 100 | +flare_cleanup_task = celery_app.tasks[RegisteredFlareCleanupTask.name] |
0 commit comments