Framework for exporting data #725
Changes from 5 commits
New file: `ami/main/api/tasks.py` (51 lines; the path is inferred from the `from ami.main.api.tasks import export_occurrences_task` import in the views change below).

```python
import logging

from celery import shared_task
from django.core.files.storage import default_storage
from django.core.mail import send_mail

from ami.main.models import ExportHistory, Occurrence
from ami.utils.exports import create_dwc_archive
from config.settings.local import DEFAULT_FROM_EMAIL

logger = logging.getLogger(__name__)


@shared_task(bind=True)
def export_occurrences_task(self, occurrence_ids=None, user_email=None, base_url=None):
    """
    Celery task for exporting occurrences asynchronously to MinIO.
    """
    try:
        occurrences = Occurrence.objects.filter(id__in=occurrence_ids)
        file_path = create_dwc_archive(occurrences)
        task_id = self.request.id
        # Generate a unique filename for MinIO storage
        file_name = f"{task_id}.zip"
        minio_path = f"exports/{file_name}"  # Save under 'exports/' folder in MinIO

        # Upload file to MinIO storage
        with open(file_path, "rb") as f:
            default_storage.save(minio_path, f)

        # Get public URL of the stored file
        file_url = f"{base_url}{default_storage.url(minio_path)}"
        logger.info(f"Export completed: {file_url}")
        # Update export history
        ExportHistory.objects.filter(task_id=task_id).update(status="completed", file_url=file_url)
        send_mail(
            subject="Your Occurrence Export is Ready!",
            message=f"""Hello,\n\nYour occurrence data export is complete!
You can download the file here:\n{file_url}\n\nThank you!""",
            from_email=DEFAULT_FROM_EMAIL,
            recipient_list=[user_email],
            fail_silently=False,
        )
        logger.info(f"Email sent to {user_email} with download link.")
        return {"status": "completed", "file_url": file_url}

    except Exception as e:
        logger.error(f"Export failed: {str(e)}")
        ExportHistory.objects.filter(task_id=self.request.id).update(status="failed")
        self.retry(exc=e, countdown=60, max_retries=3)  # Retry up to 3 times
```
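One portability note on the new task: `DEFAULT_FROM_EMAIL` is imported from `config.settings.local`, which typically exists only in local development. A minimal hedged alternative (not part of this PR) reads it from the active settings object instead:

```python
from django.conf import settings
from django.core.mail import send_mail

file_url = "https://example.org/media/exports/abc123.zip"  # placeholder for the URL built in the task
user_email = "user@example.org"  # placeholder for the requesting user's address

# settings.DEFAULT_FROM_EMAIL is a standard Django setting, so this resolves
# correctly under local, staging, and production settings modules alike.
send_mail(
    subject="Your Occurrence Export is Ready!",
    message=f"You can download the file here:\n{file_url}",
    from_email=settings.DEFAULT_FROM_EMAIL,
    recipient_list=[user_email],
    fail_silently=False,
)
```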
Changes to the occurrences API views module (likely `ami/main/api/views.py`, alongside the new tasks module):

```diff
@@ -2,8 +2,11 @@
 import logging
 from statistics import mode
 
+from celery.result import AsyncResult
 from django.contrib.postgres.search import TrigramSimilarity
 from django.core import exceptions
+from django.core.files.base import ContentFile
+from django.core.files.storage import default_storage
 from django.db import models
 from django.db.models import Prefetch
 from django.db.models.query import QuerySet
@@ -26,6 +29,8 @@
 from ami.base.pagination import LimitOffsetPaginationWithPermissions
 from ami.base.permissions import IsActiveStaffOrReadOnly
 from ami.base.serializers import FilterParamsSerializer, SingleParamSerializer
+from ami.main.api.tasks import export_occurrences_task
+from ami.utils.exports import create_dwc_archive
 from ami.utils.requests import get_active_classification_threshold, get_active_project, project_id_doc_param
 from ami.utils.storages import ConnectionTestResult
 
@@ -35,6 +40,7 @@
     Detection,
     Device,
     Event,
+    ExportHistory,
     Identification,
     Occurrence,
     Page,
@@ -998,6 +1004,7 @@ def get_serializer_class(self):
         return OccurrenceSerializer
 
     def get_queryset(self) -> QuerySet:
+        logger.info(f"OccurrenceViewset action : {self.action}")
         project = get_active_project(self.request)
         qs = super().get_queryset()
         if project:
@@ -1010,7 +1017,7 @@ def get_queryset(self) -> QuerySet:
         qs = qs.with_detections_count().with_timestamps()  # type: ignore
         qs = qs.with_identifications()  # type: ignore
 
-        if self.action == "list":
+        if self.action == "list" or self.action == "export":
             qs = (
                 qs.all()
                 .exclude(detections=None)
@@ -1033,6 +1040,85 @@ def list(self, request, *args, **kwargs):
         return super().list(request, *args, **kwargs)
 
+    def paginate_queryset(self, queryset):
+        """
+        Override pagination to skip pagination for 'export' action.
+        """
+        if self.action == "export":
+            return None  # Disable pagination, return full queryset
+        return super().paginate_queryset(queryset)  # Apply normal pagination
+
+    @action(detail=False, methods=["post"])
+    def export(self, request):
+        """
+        Trigger occurrence export via Celery, passing only filtered occurrence IDs.
+        """
+        query_set = self.get_queryset()
+        occurrence_ids = list(query_set.values_list("id", flat=True))  # Extract IDs only
+
+        logger.info(f"OccurrenceViewSet.export - Exporting {len(occurrence_ids)} occurrences")
+        base_url = request.build_absolute_uri("/").rstrip("/")  # Get the full domain name
+        # Trigger Celery task with occurrence IDs
+        task = export_occurrences_task.apply_async(
+            kwargs={"occurrence_ids": occurrence_ids, "user_email": request.user.email, "base_url": base_url}
+        )
+        # Save export history
+        ExportHistory.objects.create(user=request.user, task_id=task.id, status="pending")
+
+        return Response({"task_id": task.id})
+
+    @action(detail=False, methods=["get"])
+    def export_status(self, request):
+        """
+        Check export task status.
+        """
+        task_id = request.query_params.get("task_id")
+        if not task_id:
+            return Response({"error": "task_id is required"}, status=400)
+
+        task = AsyncResult(task_id)
+        # Handle case where task ID does not exist in Celery
+        if task.state is None or task.result is None:
+            return Response({"error": "Invalid or unknown task ID"}, status=404)
+        if task.state == "PENDING":
+            return Response({"status": "pending"})
+        elif task.state == "SUCCESS":
+            return Response({"status": "completed", "file_url": task.result.get("file_url")})
+        elif task.state == "FAILURE":
+            return Response({"status": "failed", "error": str(task.result)})
+        else:
+            return Response({"status": task.state})
+
+    @action(detail=False, methods=["post"])
+    def export_test(self, request):
+        """
+        Synchronous test endpoint to generate a DwC-A archive instantly.
+        """
+        query_set = self.get_queryset()
+
+        if not query_set.exists():
+            return Response({"error": "No occurrences found to export."}, status=status.HTTP_400_BAD_REQUEST)
+
+        archive_path = create_dwc_archive(query_set)
+        logger.info(f"Test export created: {archive_path}")
+        # Generate a unique filename for MinIO (use task ID or timestamp)
+        import datetime
+
+        now = datetime.datetime.now()
+        now = str(now)
+        file_name = f"exports/dwca_{now}.zip"
+
+        # Upload to MinIO storage
+        with open(archive_path, "rb") as archive_file:
+            default_storage.save(file_name, ContentFile(archive_file.read()))
+
+        # Get MinIO file URL
+        file_url = default_storage.url(file_name)
+
+        return Response({"message": "Export completed successfully", "file_url": file_url})
+
 
 class TaxonViewSet(DefaultViewSet):
     """
```
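For reference, a usage sketch of the two new endpoints using DRF's test client. The `/api/v2/occurrences/` URL prefix and the authenticated user are assumptions, since the router registration is not part of this diff:

```python
from django.contrib.auth import get_user_model
from rest_framework.test import APIClient

user = get_user_model().objects.first()  # any existing user with an email address
client = APIClient()
client.force_authenticate(user=user)

# Kick off the asynchronous export of the currently filtered occurrences
response = client.post("/api/v2/occurrences/export/")
task_id = response.json()["task_id"]

# Poll until the Celery task finishes; the final payload includes the file URL
status_response = client.get(f"/api/v2/occurrences/export_status/?task_id={task_id}")
print(status_response.json())  # e.g. {"status": "pending"} or {"status": "completed", ...}
```

One caveat worth checking in `export_status`: `AsyncResult.state` is never `None`, and `AsyncResult.result` is `None` for any task that has not yet finished, so the "Invalid or unknown task ID" guard will return 404 for legitimately pending tasks before the `PENDING` branch is reached. Celery also reports unknown task IDs as `PENDING`, so the two cases are hard to distinguish without consulting `ExportHistory`.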
New migration in the `main` app (the migration file name is not shown in this view):

```python
# Generated by Django 4.2.10 on 2025-02-17 01:16

from django.conf import settings
from django.db import migrations, models
import django.db.models.deletion


class Migration(migrations.Migration):
    dependencies = [
        migrations.swappable_dependency(settings.AUTH_USER_MODEL),
        ("main", "0044_merge_20250124_2333"),
    ]

    operations = [
        migrations.AlterField(
            model_name="classification",
            name="algorithm",
            field=models.ForeignKey(
                null=True,
                on_delete=django.db.models.deletion.SET_NULL,
                related_name="classifications",
                to="ml.algorithm",
            ),
        ),
        migrations.CreateModel(
            name="ExportHistory",
            fields=[
                ("id", models.BigAutoField(auto_created=True, primary_key=True, serialize=False, verbose_name="ID")),
                ("created_at", models.DateTimeField(auto_now_add=True)),
                ("updated_at", models.DateTimeField(auto_now=True)),
                ("task_id", models.CharField(max_length=255, unique=True)),
                (
                    "status",
                    models.CharField(
                        choices=[("pending", "Pending"), ("completed", "Completed"), ("failed", "Failed")],
                        default="pending",
                        max_length=10,
                    ),
                ),
                ("file_url", models.URLField(blank=True, null=True)),
                (
                    "user",
                    models.ForeignKey(
                        on_delete=django.db.models.deletion.CASCADE,
                        related_name="exports",
                        to=settings.AUTH_USER_MODEL,
                    ),
                ),
            ],
            options={
                "abstract": False,
            },
        ),
    ]
```
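For context, the `CreateModel` operation implies a model roughly like the following. This is a sketch, not the model class from the PR; the `BaseModel` base class supplying `created_at`/`updated_at` is an assumption based on the timestamp fields and the `"abstract": False` option:

```python
from django.conf import settings
from django.db import models

from ami.base.models import BaseModel  # assumed base class providing created_at/updated_at


class ExportHistory(BaseModel):
    """One row per export request, keyed by the Celery task ID."""

    task_id = models.CharField(max_length=255, unique=True)
    status = models.CharField(
        max_length=10,
        default="pending",
        choices=[("pending", "Pending"), ("completed", "Completed"), ("failed", "Failed")],
    )
    file_url = models.URLField(blank=True, null=True)
    user = models.ForeignKey(
        settings.AUTH_USER_MODEL,
        on_delete=models.CASCADE,
        related_name="exports",
    )
```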
New migration in the `ml` app:

```python
# Generated by Django 4.2.10 on 2025-02-17 03:18

from django.db import migrations


class Migration(migrations.Migration):
    dependencies = [
        ("ml", "0016_merge_20250117_2101"),
    ]

    operations = [
        migrations.AlterUniqueTogether(
            name="algorithm",
            unique_together={("name", "version")},
        ),
    ]
```
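With `unique_together = {("name", "version")}` in place, the database rejects duplicate algorithm rows. A small hedged sketch (the field values and the `ami.ml.models` import path are assumptions based on the app label):

```python
from django.db import IntegrityError

from ami.ml.models import Algorithm  # import path assumed from the "ml" app label

Algorithm.objects.create(name="moth-classifier", version="1.0")
try:
    Algorithm.objects.create(name="moth-classifier", version="1.0")  # same (name, version)
except IntegrityError:
    print("Duplicate (name, version) pairs are now rejected at the database level")
```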
It may be more scalable to keep pagination but automatically loop through all the pages, rather than triggering a single huge database query. Or is there another way to break it apart? I can give you a large DB snapshot to test on.
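A minimal sketch of the suggestion above: keep the work batched inside the Celery task instead of materializing one huge query. The helper name and page size are illustrative, not part of the PR:

```python
from django.core.paginator import Paginator

from ami.main.models import Occurrence


def iter_occurrences_in_pages(occurrence_ids, per_page=1000):
    """Yield occurrences page by page so no single query loads everything."""
    queryset = Occurrence.objects.filter(id__in=occurrence_ids).order_by("pk")
    paginator = Paginator(queryset, per_page)
    for page_number in paginator.page_range:
        yield from paginator.page(page_number).object_list
```

The export task could then stream each page into the archive writer, keeping memory bounded by `per_page` rather than by the size of the filtered result set.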
I would very much appreciate access to the DB snapshot.