diff --git a/kobo/settings/base.py b/kobo/settings/base.py
index 2ba8a355d1..ca78a36456 100644
--- a/kobo/settings/base.py
+++ b/kobo/settings/base.py
@@ -475,6 +475,10 @@
         'Enable automatic deletion of attachments for users who have exceeded '
         'their storage limits.'
     ),
+    'ANON_EXPORTS_CLEANUP_AGE': (
+        30,
+        'Number of minutes after which anonymous export tasks are cleaned up.'
+    ),
     'LIMIT_ATTACHMENT_REMOVAL_GRACE_PERIOD': (
         90,
         'Number of days to keep attachments after the user has exceeded their '
@@ -729,6 +733,7 @@
         'MASS_EMAIL_ENQUEUED_RECORD_EXPIRY',
         'MASS_EMAIL_TEST_EMAILS',
         'USAGE_LIMIT_ENFORCEMENT',
+        'ANON_EXPORTS_CLEANUP_AGE',
     ),
     'Rest Services': (
         'ALLOW_UNSECURED_HOOK_ENDPOINTS',
@@ -1445,6 +1450,12 @@ def dj_stripe_request_callback_method():
         'options': {'queue': 'kpi_low_priority_queue'}
     },
     # Schedule every 15 minutes
+    'cleanup-anonymous-exports': {
+        'task': 'kpi.tasks.cleanup_anonymous_exports',
+        'schedule': crontab(minute='*/15'),
+        'options': {'queue': 'kpi_low_priority_queue'}
+    },
+    # Schedule every 15 minutes
     'refresh-user-report-snapshot': {
         'task': 'kobo.apps.user_reports.tasks.refresh_user_report_snapshots',
         'schedule': crontab(minute='*/15'),
diff --git a/kpi/tasks.py b/kpi/tasks.py
index cc1406636c..4bc7d409ac 100644
--- a/kpi/tasks.py
+++ b/kpi/tasks.py
@@ -1,11 +1,15 @@
 import time
+from datetime import timedelta
 
 import requests
+from constance import config
 from django.apps import apps
 from django.conf import settings
 from django.core import mail
+from django.core.cache import cache
 from django.core.exceptions import ObjectDoesNotExist
 from django.core.management import call_command
+from django.utils import timezone
 
 from kobo.apps.kobo_auth.shortcuts import User
 from kobo.apps.markdownx_uploader.tasks import remove_unused_markdown_files
@@ -13,7 +17,13 @@
 from kpi.constants import LIMIT_HOURS_23
 from kpi.maintenance_tasks import remove_old_asset_snapshots, remove_old_import_tasks
 from kpi.models.asset import Asset
-from kpi.models.import_export_task import ImportTask, SubmissionExportTask
+from kpi.models.import_export_task import (
+    ImportExportStatusChoices,
+    ImportTask,
+    SubmissionExportTask,
+)
+from kpi.utils.log import logging
+from kpi.utils.object_permission import get_anonymous_user
 
 
 @celery_app.task(
@@ -68,6 +78,56 @@ def export_task_in_background(
         )
 
 
+@celery_app.task
+def cleanup_anonymous_exports(**kwargs):
+    """
+    Task to clean up export tasks created by the AnonymousUser that are older
+    than `ANON_EXPORTS_CLEANUP_AGE`, excluding those that are still processing.
+    """
+    BATCH_SIZE = 50
+    lock_timeout = 15 * 60
+    cache_key = 'cleanup_anonymous_exports:lock'
+    lock = cache.lock(cache_key, timeout=lock_timeout + 60)
+    if not lock.acquire(blocking=False, blocking_timeout=0):
+        logging.info('Nothing to do, task is already running!')
+        return
+
+    try:
+        cutoff_time = timezone.now() - timedelta(
+            minutes=config.ANON_EXPORTS_CLEANUP_AGE
+        )
+
+        old_exports = SubmissionExportTask.objects.filter(
+            user=get_anonymous_user(),
+            date_created__lt=cutoff_time,
+        ).exclude(
+            status=ImportExportStatusChoices.PROCESSING
+        ).order_by('date_created')[:BATCH_SIZE]
+
+        if not old_exports.exists():
+            logging.info('No old anonymous exports to clean up.')
+            return
+
+        deleted_count = 0
+        for export in old_exports:
+            try:
+                if export.result:
+                    try:
+                        export.result.delete(save=False)
+                    except Exception as e:
+                        logging.error(
+                            f'Error deleting file for export {export.uid}: {e}'
+                        )
+                export.delete()
+                deleted_count += 1
+            except Exception as e:
+                logging.error(f'Error deleting export {export.uid}: {e}')
+
+        logging.info(f'Cleaned up {deleted_count} old anonymous exports.')
+    finally:
+        lock.release()
+
+
 @celery_app.task
 def sync_kobocat_xforms(
     username=None,
diff --git a/kpi/tests/test_cleanup_anonymous_exports.py b/kpi/tests/test_cleanup_anonymous_exports.py
new file mode 100644
index 0000000000..23b30e6586
--- /dev/null
+++ b/kpi/tests/test_cleanup_anonymous_exports.py
@@ -0,0 +1,116 @@
+import os
+from datetime import timedelta
+
+from django.core.cache import cache
+from django.core.files.base import ContentFile
+from django.test import TestCase
+from django.utils import timezone
+
+from kpi.models.import_export_task import (
+    ImportExportStatusChoices,
+    SubmissionExportTask,
+)
+from kpi.tasks import cleanup_anonymous_exports
+from kpi.utils.object_permission import get_anonymous_user
+
+
+class AnonymousExportCleanupTestCase(TestCase):
+    def _create_export_task(
+        self, status=ImportExportStatusChoices.COMPLETE, minutes_old=60
+    ):
+        export = SubmissionExportTask()
+        export.user = get_anonymous_user()
+        export.status = status
+        export.data = {'type': 'xls', 'source': 'test'}
+        export.save()
+
+        if minutes_old > 0:
+            past_time = timezone.now() - timedelta(minutes=minutes_old)
+            SubmissionExportTask.objects.filter(uid=export.uid).update(
+                date_created=past_time
+            )
+            export.refresh_from_db()
+        return export
+
+    def test_exports_older_than_30_minutes_are_deleted(self):
+        # Export older than 30 min - should be deleted
+        old_export = self._create_export_task(minutes_old=31)
+
+        # Export newer than 30 min - should be kept
+        recent_export = self._create_export_task(minutes_old=29)
+
+        cleanup_anonymous_exports()
+        self.assertFalse(
+            SubmissionExportTask.objects.filter(uid=old_export.uid).exists()
+        )
+        self.assertTrue(
+            SubmissionExportTask.objects.filter(uid=recent_export.uid).exists()
+        )
+
+    def test_export_result_file_is_deleted_from_storage(self):
+        """
+        Test that export files are deleted from storage
+        """
+        export = self._create_export_task(minutes_old=60)
+
+        # Create actual file in storage
+        file_content = ContentFile(
+            b'PK\x03\x04' +
+            b'{"data": "export"}' * 100,
+            name='test_export.xlsx'
+        )
+        export.result.save(f'test_export_{export.uid}.xlsx', file_content, save=True)
+        export.refresh_from_db()
+
+        storage = export.result.storage
+        file_path = storage.path(export.result.name)
+        self.assertTrue(os.path.exists(file_path))
+        self.assertTrue(SubmissionExportTask.objects.filter(uid=export.uid).exists())
+
+        cleanup_anonymous_exports()
+
+        self.assertFalse(os.path.exists(file_path))
+        self.assertFalse(SubmissionExportTask.objects.filter(uid=export.uid).exists())
+
+    def test_processing_exports_are_not_deleted(self):
+        """
+        Test that exports with PROCESSING status are never deleted
+        """
+        processing_export = self._create_export_task(
+            status=ImportExportStatusChoices.PROCESSING,
+            minutes_old=100
+        )
+
+        cleanup_anonymous_exports()
+        self.assertTrue(
+            SubmissionExportTask.objects.filter(
+                uid=processing_export.uid
+            ).exists()
+        )
+
+    def test_cache_lock_prevents_concurrent_execution(self):
+        """
+        Test that cache lock prevents concurrent task execution
+        """
+        for _ in range(5):
+            self._create_export_task(minutes_old=60)
+
+        cache_key = 'cleanup_anonymous_exports:lock'
+        lock_timeout = 15 * 60
+
+        # Acquire lock manually (simulate first task running)
+        lock = cache.lock(cache_key, timeout=lock_timeout + 60)
+        lock.acquire(blocking=False)
+
+        try:
+            # Task should return early without deleting
+            cleanup_anonymous_exports()
+
+            # Verify no exports were deleted
+            remaining = SubmissionExportTask.objects.filter(
+                user__username='AnonymousUser'
+            ).count()
+            self.assertEqual(remaining, 5)
+
+        finally:
+            lock.release()