diff --git a/celery_config.py b/celery_config.py
index b39c03075..c3d547a27 100644
--- a/celery_config.py
+++ b/celery_config.py
@@ -10,6 +10,7 @@
 from shared.celery_config import (
     BaseCeleryConfig,
     brolly_stats_rollup_task_name,
+    # flare_cleanup_task_name,
     gh_app_webhook_check_task_name,
     health_check_task_name,
     profiling_finding_task_name,
@@ -88,12 +89,18 @@ def _beat_schedule():
         },
         "trial_expiration_cron": {
             "task": trial_expiration_cron_task_name,
-            # 4 UTC is 12am EDT
-            "schedule": crontab(minute="0", hour="4"),
+            "schedule": crontab(minute="0", hour="4"),  # 4 UTC is 12am EDT
             "kwargs": {
                 "cron_task_generation_time_iso": BeatLazyFunc(get_utc_now_as_iso_format)
             },
         },
+        # "flare_cleanup": {
+        #     "task": flare_cleanup_task_name,
+        #     "schedule": crontab(minute="0", hour="5"),  # every day, 5am UTC (10pm PDT)
+        #     "kwargs": {
+        #         "cron_task_generation_time_iso": BeatLazyFunc(get_utc_now_as_iso_format)
+        #     },
+        # },
     }
 
     if get_config("setup", "find_uncollected_profilings", "enabled", default=True):
diff --git a/conftest.py b/conftest.py
index 81dee94bf..16153bd38 100644
--- a/conftest.py
+++ b/conftest.py
@@ -276,6 +276,18 @@ def mock_storage(mocker):
     return storage_server
 
 
+@pytest.fixture
+def mock_archive_storage(mocker):
+    m = mocker.patch("shared.api_archive.archive.StorageService")
+    use_archive = mocker.patch(
+        "shared.django_apps.core.models.should_write_data_to_storage_config_check"
+    )
+    use_archive.return_value = True
+    storage_server = MemoryStorageService({})
+    m.return_value = storage_server
+    return storage_server
+
+
 @pytest.fixture
 def mock_smtp(mocker):
     m = mocker.patch("services.smtp.SMTPService")
diff --git a/database/models/core.py b/database/models/core.py
index ba410d68b..5e2e85e59 100644
--- a/database/models/core.py
+++ b/database/models/core.py
@@ -380,7 +380,7 @@ def get_repository(self):
     def get_commitid(self):
         return self.commitid
 
-    def should_write_to_storage(self) -> bool:
+    def should_write_to_storage(self: object) -> bool:
         if self.repository is None or self.repository.owner is None:
             return False
         is_codecov_repo = self.repository.owner.username == "codecov"
@@ -447,7 +447,6 @@ class Pull(CodecovBaseModel):
     commentid = Column(types.Text)
     bundle_analysis_commentid = Column(types.Text)
     diff = Column(postgresql.JSON)
-    flare = Column(postgresql.JSON)
     author_id = Column("author", types.Integer, ForeignKey("owners.ownerid"))
     behind_by = Column(types.Integer)
     behind_by_commit = Column(types.Text)
@@ -457,6 +456,22 @@ class Pull(CodecovBaseModel):
         Repository, backref=backref("pulls", cascade="delete", lazy="dynamic")
     )
 
+    def should_write_to_storage(self: object) -> bool:
+        if self.repository is None or self.repository.owner is None:
+            return False
+        is_codecov_repo = self.repository.owner.username == "codecov"
+        return should_write_data_to_storage_config_check(
+            master_switch_key="pull_flare",
+            is_codecov_repo=is_codecov_repo,
+            repoid=self.repository.repoid,
+        )
+
+    _flare = Column("flare", postgresql.JSON)
+    _flare_storage_path = Column("flare_storage_path", types.Text, nullable=True)
+    flare = ArchiveField(
+        should_write_to_storage_fn=should_write_to_storage, default_value_class=dict
+    )
+
     __table_args__ = (Index("pulls_repoid_pullid", "repoid", "pullid", unique=True),)
 
     def __repr__(self):
@@ -503,16 +518,6 @@ def external_id(self):
     def id(self):
         return self.id_
 
-    def should_write_to_storage(self) -> bool:
-        if self.repository is None or self.repository.owner is None:
-            return False
-        is_codecov_repo = self.repository.owner.username == "codecov"
-        return should_write_data_to_storage_config_check(
-            master_switch_key="pull_flare",
-            is_codecov_repo=is_codecov_repo,
-            repoid=self.repository.repoid,
-        )
-
     @cached_property
     def is_first_coverage_pull(self):
         """
@@ -536,12 +541,6 @@ def is_first_coverage_pull(self):
             return first_pull_with_coverage.id_ == self.id_
         return True
 
-    _flare = Column("flare", postgresql.JSON)
-    _flare_storage_path = Column("flare_storage_path", types.Text, nullable=True)
-    flare = ArchiveField(
-        should_write_to_storage_fn=should_write_to_storage, default_value_class=dict
-    )
-
 
 class CommitNotification(CodecovBaseModel):
     __tablename__ = "commit_notifications"
diff --git a/requirements.in b/requirements.in
index 742663e58..94c83567b 100644
--- a/requirements.in
+++ b/requirements.in
@@ -1,5 +1,5 @@
 https://github.com/codecov/test-results-parser/archive/c840502d1b4dd7d05b2efc2c1328affaf2acd27c.tar.gz#egg=test-results-parser
-https://github.com/codecov/shared/archive/2674ae99811767e63151590906691aed4c5ce1f9.tar.gz#egg=shared
+https://github.com/codecov/shared/archive/efe48352e172f658c21465371453dcefc98f6793.tar.gz#egg=shared
 https://github.com/codecov/timestring/archive/d37ceacc5954dff3b5bd2f887936a98a668dda42.tar.gz#egg=timestring
 asgiref>=3.7.2
 analytics-python==1.3.0b1
diff --git a/requirements.txt b/requirements.txt
index d45fb95b7..6f3ce1f06 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -336,7 +336,7 @@ sentry-sdk==2.13.0
     # shared
 setuptools==75.6.0
     # via nodeenv
-shared @ https://github.com/codecov/shared/archive/2674ae99811767e63151590906691aed4c5ce1f9.tar.gz#egg=shared
+shared @ https://github.com/codecov/shared/archive/efe48352e172f658c21465371453dcefc98f6793.tar.gz#egg=shared
     # via -r requirements.in
 six==1.16.0
     # via
diff --git a/tasks/flare_cleanup.py b/tasks/flare_cleanup.py
new file mode 100644
index 000000000..207731d8c
--- /dev/null
+++ b/tasks/flare_cleanup.py
@@ -0,0 +1,119 @@
+import logging
+
+from shared.api_archive.archive import ArchiveService
+from shared.celery_config import flare_cleanup_task_name
+from shared.django_apps.core.models import Pull, PullStates
+
+from app import celery_app
+from tasks.crontasks import CodecovCronTask
+
+log = logging.getLogger(__name__)
+
+
+class FlareCleanupTask(CodecovCronTask, name=flare_cleanup_task_name):
+    """
+    Flare is a field on a Pull object.
+    Flare is used to draw static graphs (see GraphHandler view in api) and can be large.
+    The majority of flare graphs are used in pr comments, so we keep the (maybe large) flare "available"
+    in either the db or Archive storage while the pull is OPEN.
+    If the pull is not OPEN, we dump the flare to save space.
+    If we need to generate a flare graph for a non-OPEN pull, we build_report_from_commit
+    and generate fresh flare from that report (see GraphHandler view in api).
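+
+    Implementation note: flare is an ArchiveField, so a pull's flare lives in
+    either the _flare JSON column (db) or a file at _flare_storage_path
+    (Archive storage), depending on should_write_to_storage. This task nulls
+    whichever one is populated for non-OPEN pulls.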
+ """ + + @classmethod + def get_min_seconds_interval_between_executions(cls): + return 72000 # 20h + + def run_cron_task(self, db_session, batch_size=1000, limit=10000, *args, **kwargs): + # for any Pull that is not OPEN, clear the flare field(s), targeting older data + non_open_pulls = Pull.objects.exclude(state=PullStates.OPEN.value).order_by( + "updatestamp" + ) + + log.info("Starting FlareCleanupTask") + + # clear in db + non_open_pulls_with_flare_in_db = non_open_pulls.filter( + _flare__isnull=False + ).exclude(_flare={}) + + # Process in batches + total_updated = 0 + start = 0 + while start < limit: + stop = start + batch_size if start + batch_size < limit else limit + batch = non_open_pulls_with_flare_in_db.values_list("id", flat=True)[ + start:stop + ] + if not batch: + break + n_updated = non_open_pulls_with_flare_in_db.filter(id__in=batch).update( + _flare=None + ) + total_updated += n_updated + start = stop + + log.info(f"FlareCleanupTask cleared {total_updated} database flares") + + # clear in Archive + non_open_pulls_with_flare_in_archive = non_open_pulls.filter( + _flare_storage_path__isnull=False + ) + + # Process archive deletions in batches + total_updated = 0 + start = 0 + while start < limit: + stop = start + batch_size if start + batch_size < limit else limit + batch = non_open_pulls_with_flare_in_archive.values_list("id", flat=True)[ + start:stop + ] + if not batch: + break + flare_paths_from_batch = Pull.objects.filter(id__in=batch).values_list( + "_flare_storage_path", flat=True + ) + try: + archive_service = ArchiveService(repository=None) + archive_service.delete_files(flare_paths_from_batch) + except Exception as e: + # if something fails with deleting from archive, leave the _flare_storage_path on the pull object. + # only delete _flare_storage_path if the deletion from archive was successful. 
+ log.error(f"FlareCleanupTask failed to delete archive files: {e}") + continue + + # Update the _flare_storage_path field for successfully processed pulls + n_updated = Pull.objects.filter(id__in=batch).update( + _flare_storage_path=None + ) + total_updated += n_updated + start = stop + + log.info(f"FlareCleanupTask cleared {total_updated} Archive flares") + + def manual_run(self, db_session=None, limit=1000, *args, **kwargs): + self.run_cron_task(db_session, limit=limit, *args, **kwargs) + + +RegisteredFlareCleanupTask = celery_app.register_task(FlareCleanupTask()) +flare_cleanup_task = celery_app.tasks[RegisteredFlareCleanupTask.name] diff --git a/tasks/tests/unit/test_flare_cleanup.py b/tasks/tests/unit/test_flare_cleanup.py new file mode 100644 index 000000000..06d3968ab --- /dev/null +++ b/tasks/tests/unit/test_flare_cleanup.py @@ -0,0 +1,204 @@ +from unittest.mock import call + +from shared.django_apps.core.models import Pull, PullStates +from shared.django_apps.core.tests.factories import PullFactory, RepositoryFactory + +from tasks.flare_cleanup import FlareCleanupTask + + +class TestFlareCleanupTask(object): + def test_get_min_seconds_interval_between_executions(self): + assert isinstance( + FlareCleanupTask.get_min_seconds_interval_between_executions(), + int, + ) + assert FlareCleanupTask.get_min_seconds_interval_between_executions() > 17000 + + def test_successful_run(self, transactional_db, mocker, mock_archive_storage): + mock_logs = mocker.patch("logging.Logger.info") + archive_value_for_flare = {"some": "data"} + local_value_for_flare = {"test": "test"} + + open_pull_with_local_flare = PullFactory( + state=PullStates.OPEN.value, + _flare=local_value_for_flare, + repository=RepositoryFactory(), + ) + assert open_pull_with_local_flare.flare == local_value_for_flare + assert open_pull_with_local_flare._flare == local_value_for_flare + assert open_pull_with_local_flare._flare_storage_path is None + + closed_pull_with_local_flare = PullFactory( + state=PullStates.CLOSED.value, + _flare=local_value_for_flare, + repository=RepositoryFactory(), + ) + assert closed_pull_with_local_flare.flare == local_value_for_flare + assert closed_pull_with_local_flare._flare == local_value_for_flare + assert closed_pull_with_local_flare._flare_storage_path is None + + open_pull_with_archive_flare = PullFactory( + state=PullStates.OPEN.value, + _flare=None, + repository=RepositoryFactory(), + ) + open_pull_with_archive_flare.flare = archive_value_for_flare + open_pull_with_archive_flare.save() + open_pull_with_archive_flare.refresh_from_db() + assert open_pull_with_archive_flare.flare == archive_value_for_flare + assert open_pull_with_archive_flare._flare is None + assert open_pull_with_archive_flare._flare_storage_path is not None + + merged_pull_with_archive_flare = PullFactory( + state=PullStates.MERGED.value, + _flare=None, + repository=RepositoryFactory(), + ) + merged_pull_with_archive_flare.flare = archive_value_for_flare + merged_pull_with_archive_flare.save() + merged_pull_with_archive_flare.refresh_from_db() + assert merged_pull_with_archive_flare.flare == archive_value_for_flare + assert merged_pull_with_archive_flare._flare is None + assert merged_pull_with_archive_flare._flare_storage_path is not None + + task = FlareCleanupTask() + task.manual_run() + + mock_logs.assert_has_calls( + [ + call("Starting FlareCleanupTask"), + call("FlareCleanupTask cleared 1 database flares"), + call("FlareCleanupTask cleared 1 Archive flares"), + ] + ) + + # there is a cache for flare on the object 
(all ArchiveFields have this), + # so get a fresh copy of each object without the cached value + open_pull_with_local_flare = Pull.objects.get(id=open_pull_with_local_flare.id) + assert open_pull_with_local_flare.flare == local_value_for_flare + assert open_pull_with_local_flare._flare == local_value_for_flare + assert open_pull_with_local_flare._flare_storage_path is None + + closed_pull_with_local_flare = Pull.objects.get( + id=closed_pull_with_local_flare.id + ) + assert closed_pull_with_local_flare.flare == {} + assert closed_pull_with_local_flare._flare is None + assert closed_pull_with_local_flare._flare_storage_path is None + + open_pull_with_archive_flare = Pull.objects.get( + id=open_pull_with_archive_flare.id + ) + assert open_pull_with_archive_flare.flare == archive_value_for_flare + assert open_pull_with_archive_flare._flare is None + assert open_pull_with_archive_flare._flare_storage_path is not None + + merged_pull_with_archive_flare = Pull.objects.get( + id=merged_pull_with_archive_flare.id + ) + assert merged_pull_with_archive_flare.flare == {} + assert merged_pull_with_archive_flare._flare is None + assert merged_pull_with_archive_flare._flare_storage_path is None + + mock_logs.reset_mock() + # check that once these pulls are corrected they are not corrected again + task = FlareCleanupTask() + task.manual_run() + + mock_logs.assert_has_calls( + [ + call("Starting FlareCleanupTask"), + call("FlareCleanupTask cleared 0 database flares"), + call("FlareCleanupTask cleared 0 Archive flares"), + ] + ) + + def test_limits_on_manual_run(self, transactional_db, mocker, mock_archive_storage): + mock_logs = mocker.patch("logging.Logger.info") + local_value_for_flare = {"test": "test"} + archive_value_for_flare = {"some": "data"} + + oldest_to_newest_pulls_with_local_flare = [] + for i in range(5): + merged_pull_with_local_flare = PullFactory( + state=PullStates.MERGED.value, + _flare=local_value_for_flare, + repository=RepositoryFactory(), + ) + assert merged_pull_with_local_flare.flare == local_value_for_flare + assert merged_pull_with_local_flare._flare == local_value_for_flare + assert merged_pull_with_local_flare._flare_storage_path is None + oldest_to_newest_pulls_with_local_flare.append( + merged_pull_with_local_flare.id + ) + + oldest_to_newest_pulls_with_archive_flare = [] + for i in range(5): + merged_pull_with_archive_flare = PullFactory( + state=PullStates.MERGED.value, + _flare=None, + repository=RepositoryFactory(), + ) + merged_pull_with_archive_flare.flare = archive_value_for_flare + merged_pull_with_archive_flare.save() + assert merged_pull_with_archive_flare.flare == archive_value_for_flare + assert merged_pull_with_archive_flare._flare is None + assert merged_pull_with_archive_flare._flare_storage_path is not None + oldest_to_newest_pulls_with_archive_flare.append( + merged_pull_with_archive_flare.id + ) + + everything_in_archive_storage = mock_archive_storage.list_folder_contents( + bucket_name="archive" + ) + assert len(everything_in_archive_storage) == 5 + + task = FlareCleanupTask() + task.manual_run(limit=3) + + mock_logs.assert_has_calls( + [ + call("Starting FlareCleanupTask"), + call("FlareCleanupTask cleared 3 database flares"), + call("FlareCleanupTask cleared 3 Archive flares"), + ] + ) + + # there is a cache for flare on the object (all ArchiveFields have this), + # so get a fresh copy of each object without the cached value + should_be_cleared = oldest_to_newest_pulls_with_local_flare[:3] + should_not_be_cleared = 
oldest_to_newest_pulls_with_local_flare[3:] + for pull_id in should_be_cleared: + pull = Pull.objects.get(id=pull_id) + assert pull.flare == {} + assert pull._flare is None + assert pull._flare_storage_path is None + + for pull_id in should_not_be_cleared: + pull = Pull.objects.get(id=pull_id) + assert pull.flare == local_value_for_flare + assert pull._flare == local_value_for_flare + assert pull._flare_storage_path is None + + everything_in_archive_storage = mock_archive_storage.list_folder_contents( + bucket_name="archive" + ) + assert len(everything_in_archive_storage) == 2 + file_names_in_archive_storage = [ + file["name"] for file in everything_in_archive_storage + ] + + should_be_cleared = oldest_to_newest_pulls_with_archive_flare[:3] + should_not_be_cleared = oldest_to_newest_pulls_with_archive_flare[3:] + for pull_id in should_be_cleared: + pull = Pull.objects.get(id=pull_id) + assert pull.flare == {} + assert pull._flare is None + assert pull._flare_storage_path is None + + for pull_id in should_not_be_cleared: + pull = Pull.objects.get(id=pull_id) + assert pull.flare == archive_value_for_flare + assert pull._flare is None + assert pull._flare_storage_path is not None + assert pull._flare_storage_path in file_names_in_archive_storage
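
Until the beat schedule entry above is uncommented, the task runs only when invoked by hand. A minimal sketch of a manual invocation, assuming a worker shell with Django and Celery configured (manual_run and its limit parameter are defined in tasks/flare_cleanup.py above):

    from tasks.flare_cleanup import FlareCleanupTask

    # clears at most `limit` database flares and at most `limit` Archive flares per call
    FlareCleanupTask().manual_run(limit=1000)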