-
Notifications
You must be signed in to change notification settings - Fork 11
Framework for exporting data #725
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
45c1d77
f6871ea
b3e448d
bb745f6
518b8df
b3b4369
8d98759
21470b9
a8673af
523d177
ac7cfbc
04ab2cf
e4599b9
94cc7a3
ed3960a
c4c9820
5dbc002
a86a348
57c5905
e0df304
8be00cd
b297a84
d8d3b5d
1270fd1
44c3ca8
95e6e86
8a02b3d
c8f5d3e
349925a
e0321bd
45485b2
4105177
3c9aca2
543a142
95745d7
25896b7
f14653f
43e8835
f836bfa
3d0514a
1e879e4
4d48622
747708c
2478789
cd2f57c
4bae6c7
26181d0
eded961
02dd4b7
058f93e
b20a851
a518a74
0b06579
ee34d2c
0900bb0
a1eb605
faeb081
6a50eed
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,64 @@ | ||
| from django.contrib import admin | ||
| from django.http import HttpRequest | ||
|
|
||
| from .models import DataExport | ||
|
|
||
|
|
||
| @admin.register(DataExport) | ||
| class DataExportAdmin(admin.ModelAdmin): | ||
| """ | ||
| Admin panel for managing DataExport objects. | ||
| """ | ||
|
|
||
| list_display = ("id", "user", "format", "status_display", "project", "created_at", "get_job") | ||
| list_filter = ("format", "project") | ||
| search_fields = ("user__username", "format", "project__name") | ||
| readonly_fields = ("status_display", "file_url_display") | ||
|
|
||
| fieldsets = ( | ||
| ( | ||
| None, | ||
| { | ||
| "fields": ("user", "format", "project", "filters"), | ||
| }, | ||
| ), | ||
| ( | ||
| "Job Info", | ||
| { | ||
| "fields": ("status_display", "file_url_display"), | ||
| "classes": ("collapse",), # This makes job-related fields collapsible in the admin panel | ||
| }, | ||
| ), | ||
| ) | ||
|
|
||
| def get_queryset(self, request: HttpRequest): | ||
| """ | ||
| Optimize queryset by selecting related project and job data. | ||
| """ | ||
| return super().get_queryset(request).select_related("project", "job") | ||
|
|
||
| @admin.display(description="Status") | ||
| def status_display(self, obj): | ||
| return obj.status # Calls the @property from the model | ||
|
|
||
| @admin.display(description="File URL") | ||
| def file_url_display(self, obj): | ||
| return obj.file_url # Calls the @property from the model | ||
|
|
||
| @admin.display(description="Job ID") | ||
| def get_job(self, obj): | ||
| """Displays the related job ID or 'No Job' if none exists.""" | ||
| return obj.job.id if obj.job else "No Job" | ||
|
|
||
| @admin.action(description="Run export job") | ||
| def run_export_job(self, request: HttpRequest, queryset): | ||
| """ | ||
| Admin action to trigger the export job manually. | ||
| """ | ||
| for export in queryset: | ||
| if export.job: | ||
| export.job.enqueue() | ||
|
|
||
| self.message_user(request, f"Started export job for {queryset.count()} export(s).") | ||
|
|
||
| actions = [run_export_job] |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,9 @@ | ||
| from django.apps import AppConfig | ||
|
|
||
|
|
||
| class ExportsConfig(AppConfig): | ||
| default_auto_field = "django.db.models.BigAutoField" | ||
| name = "ami.exports" | ||
|
|
||
| def ready(self): | ||
| import ami.exports.signals # noqa: F401 | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,73 @@ | ||
| import logging | ||
| import os | ||
| from abc import ABC, abstractmethod | ||
|
|
||
| from ami.exports.utils import apply_filters | ||
|
|
||
| logger = logging.getLogger(__name__) | ||
|
|
||
|
|
||
| class BaseExporter(ABC): | ||
| """Base class for all data export handlers.""" | ||
|
|
||
| file_format = "" # To be defined in child classes | ||
| serializer_class = None | ||
| filter_backends = [] | ||
|
|
||
| def __init__(self, data_export): | ||
| self.data_export = data_export | ||
| self.job = data_export.job if hasattr(data_export, "job") else None | ||
| self.project = data_export.project | ||
| self.queryset = apply_filters( | ||
| queryset=self.get_queryset(), filters=data_export.filters, filter_backends=self.get_filter_backends() | ||
| ) | ||
| self.total_records = self.queryset.count() | ||
| if self.job: | ||
| self.job.progress.add_stage_param(self.job.job_type_key, "Number of records exported", 0) | ||
| self.job.progress.add_stage_param(self.job.job_type_key, "Total records to export", self.total_records) | ||
| self.job.save() | ||
|
|
||
| @abstractmethod | ||
| def export(self): | ||
| """Perform the export process.""" | ||
|
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. For abstract base class methods, I like to |
||
| raise NotImplementedError() | ||
|
|
||
| @abstractmethod | ||
| def get_queryset(self): | ||
| raise NotImplementedError() | ||
|
|
||
| def get_serializer_class(self): | ||
| return self.serializer_class | ||
|
|
||
| def get_filter_backends(self): | ||
| from ami.main.api.views import OccurrenceCollectionFilter | ||
|
|
||
| return [OccurrenceCollectionFilter] | ||
|
|
||
| def update_export_stats(self, file_temp_path=None): | ||
| """ | ||
| Updates record_count based on queryset and file size after export. | ||
| """ | ||
| # Set record count from queryset | ||
| self.data_export.record_count = self.queryset.count() | ||
|
|
||
| # Check if temp file path is provided and update file size | ||
|
|
||
| if file_temp_path and os.path.exists(file_temp_path): | ||
| self.data_export.file_size = os.path.getsize(file_temp_path) | ||
|
|
||
| # Save the updated values | ||
| self.data_export.save() | ||
|
|
||
| def update_job_progress(self, records_exported): | ||
| """ | ||
| Updates job progress and record count. | ||
| """ | ||
| if self.job: | ||
| self.job.progress.update_stage( | ||
| self.job.job_type_key, progress=round(records_exported / self.total_records, 2) | ||
| ) | ||
| self.job.progress.add_or_update_stage_param( | ||
| self.job.job_type_key, "Number of records exported", records_exported | ||
| ) | ||
| self.job.save() | ||
| Original file line number | Diff line number | Diff line change | ||||
|---|---|---|---|---|---|---|
| @@ -0,0 +1,159 @@ | ||||||
| import csv | ||||||
| import json | ||||||
| import logging | ||||||
| import tempfile | ||||||
|
|
||||||
| from django.core.serializers.json import DjangoJSONEncoder | ||||||
| from rest_framework import serializers | ||||||
|
|
||||||
| from ami.exports.base import BaseExporter | ||||||
| from ami.exports.utils import get_data_in_batches | ||||||
| from ami.main.models import Occurrence | ||||||
|
|
||||||
| logger = logging.getLogger(__name__) | ||||||
|
|
||||||
|
|
||||||
| def get_export_serializer(): | ||||||
| from ami.main.api.serializers import OccurrenceSerializer | ||||||
|
|
||||||
| class OccurrenceExportSerializer(OccurrenceSerializer): | ||||||
| detection_images = serializers.SerializerMethodField() | ||||||
|
|
||||||
| def get_detection_images(self, obj: Occurrence): | ||||||
| """Convert the generator field to a list before serialization""" | ||||||
| if hasattr(obj, "detection_images") and callable(obj.detection_images): | ||||||
| return list(obj.detection_images()) # Convert generator to list | ||||||
| return [] | ||||||
|
|
||||||
| def get_permissions(self, instance_data): | ||||||
| return instance_data | ||||||
|
|
||||||
| def to_representation(self, instance): | ||||||
| return serializers.HyperlinkedModelSerializer.to_representation(self, instance) | ||||||
|
|
||||||
| return OccurrenceExportSerializer | ||||||
|
|
||||||
|
|
||||||
| class JSONExporter(BaseExporter): | ||||||
| """Handles JSON export of occurrences.""" | ||||||
|
|
||||||
| file_format = "json" | ||||||
|
|
||||||
| def get_serializer_class(self): | ||||||
| return get_export_serializer() | ||||||
|
|
||||||
| def get_queryset(self): | ||||||
| return ( | ||||||
| Occurrence.objects.filter(project=self.project) | ||||||
| .select_related( | ||||||
| "determination", | ||||||
| "deployment", | ||||||
| "event", | ||||||
| ) | ||||||
| .with_timestamps() # type: ignore[union-attr] Custom queryset method | ||||||
| .with_detections_count() | ||||||
| .with_identifications() | ||||||
| ) | ||||||
|
|
||||||
| def export(self): | ||||||
| """Exports occurrences to JSON format.""" | ||||||
| temp_file = tempfile.NamedTemporaryFile(delete=False, suffix=".json", mode="w", encoding="utf-8") | ||||||
| with open(temp_file.name, "w", encoding="utf-8") as f: | ||||||
| first = True | ||||||
| f.write("[") | ||||||
| records_exported = 0 | ||||||
| for i, batch in enumerate(get_data_in_batches(self.queryset, self.get_serializer_class())): | ||||||
| json_data = json.dumps(batch, cls=DjangoJSONEncoder) | ||||||
| json_data = json_data[1:-1] # remove [ and ] from json string | ||||||
| f.write(",\n" if not first else "") | ||||||
| f.write(json_data) | ||||||
| first = False | ||||||
| records_exported += len(batch) | ||||||
| self.update_job_progress(records_exported) | ||||||
| f.write("]") | ||||||
|
|
||||||
| self.update_export_stats(file_temp_path=temp_file.name) | ||||||
| return temp_file.name # Return file path | ||||||
|
|
||||||
|
|
||||||
| class OccurrenceTabularSerializer(serializers.ModelSerializer): | ||||||
| """Serializer to format occurrences for tabular data export.""" | ||||||
|
|
||||||
| event_id = serializers.IntegerField(source="event.id", allow_null=True) | ||||||
| event_name = serializers.CharField(source="event.name", allow_null=True) | ||||||
| deployment_id = serializers.IntegerField(source="deployment.id", allow_null=True) | ||||||
| deployment_name = serializers.CharField(source="deployment.name", allow_null=True) | ||||||
| project_id = serializers.IntegerField(source="project.id", allow_null=True) | ||||||
| project_name = serializers.CharField(source="project.name", allow_null=True) | ||||||
|
|
||||||
| determination_id = serializers.IntegerField(source="determination.id", allow_null=True) | ||||||
| determination_name = serializers.CharField(source="determination.name", allow_null=True) | ||||||
| determination_score = serializers.FloatField(allow_null=True) | ||||||
| verification_status = serializers.SerializerMethodField() | ||||||
|
|
||||||
| class Meta: | ||||||
| model = Occurrence | ||||||
| fields = [ | ||||||
| "id", | ||||||
| "event_id", | ||||||
| "event_name", | ||||||
| "deployment_id", | ||||||
| "deployment_name", | ||||||
| "project_id", | ||||||
| "project_name", | ||||||
| "determination_id", | ||||||
| "determination_name", | ||||||
| "determination_score", | ||||||
| "verification_status", | ||||||
| "detections_count", | ||||||
| "first_appearance_timestamp", | ||||||
| "last_appearance_timestamp", | ||||||
| "duration", | ||||||
| ] | ||||||
|
|
||||||
| def get_verification_status(self, obj): | ||||||
| """ | ||||||
| Returns 'Verified' if the occurrence has identifications, otherwise 'Not verified'. | ||||||
| """ | ||||||
| return "Verified" if obj.identifications.exists() else "Not verified" | ||||||
|
|
||||||
|
|
||||||
| class CSVExporter(BaseExporter): | ||||||
| """Handles CSV export of occurrences.""" | ||||||
|
|
||||||
| file_format = "csv" | ||||||
|
|
||||||
| serializer_class = OccurrenceTabularSerializer | ||||||
|
|
||||||
| def get_queryset(self): | ||||||
| return ( | ||||||
| Occurrence.objects.filter(project=self.project) | ||||||
| .select_related( | ||||||
| "determination", | ||||||
| "deployment", | ||||||
| "event", | ||||||
| ) | ||||||
| .with_timestamps() # type: ignore[union-attr] Custom queryset method | ||||||
| .with_detections_count() | ||||||
| .with_identifications() | ||||||
| ) | ||||||
|
|
||||||
| def export(self): | ||||||
| """Exports occurrences to CSV format.""" | ||||||
|
|
||||||
| temp_file = tempfile.NamedTemporaryFile(delete=False, suffix=".csv", mode="w", newline="", encoding="utf-8") | ||||||
|
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The named temporary file may only exist for a short time (even with delete=False). Use the Lines 73 to 74 in a362fe4
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I see now that you are writing to a tempfile first, and then transferring to the object store. Sorry I missed that! |
||||||
|
|
||||||
| # Extract field names dynamically from the serializer | ||||||
| serializer = self.serializer_class() | ||||||
| field_names = list(serializer.fields.keys()) | ||||||
| records_exported = 0 | ||||||
| with open(temp_file.name, "w", newline="", encoding="utf-8") as csvfile: | ||||||
| writer = csv.DictWriter(csvfile, fieldnames=field_names) | ||||||
| writer.writeheader() | ||||||
|
|
||||||
| for i, batch in enumerate(get_data_in_batches(self.queryset, self.serializer_class)): | ||||||
| writer.writerows(batch) | ||||||
| records_exported += len(batch) | ||||||
| self.update_job_progress(records_exported) | ||||||
| self.update_export_stats(file_temp_path=temp_file.name) | ||||||
| return temp_file.name # Return the file path | ||||||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,57 @@ | ||
| # Generated by Django 4.2.10 on 2025-04-02 20:12 | ||
|
|
||
| from django.conf import settings | ||
| from django.db import migrations, models | ||
| import django.db.models.deletion | ||
|
|
||
|
|
||
| class Migration(migrations.Migration): | ||
| initial = True | ||
|
|
||
| dependencies = [ | ||
| ("main", "0058_alter_project_options"), | ||
| migrations.swappable_dependency(settings.AUTH_USER_MODEL), | ||
| ] | ||
|
|
||
| operations = [ | ||
| migrations.CreateModel( | ||
| name="DataExport", | ||
| fields=[ | ||
| ("id", models.BigAutoField(auto_created=True, primary_key=True, serialize=False, verbose_name="ID")), | ||
| ("created_at", models.DateTimeField(auto_now_add=True)), | ||
| ("updated_at", models.DateTimeField(auto_now=True)), | ||
| ( | ||
| "format", | ||
| models.CharField( | ||
| choices=[ | ||
| ("occurrences_simple_json", "occurrences_simple_json"), | ||
| ("occurrences_simple_csv", "occurrences_simple_csv"), | ||
| ], | ||
| max_length=255, | ||
| ), | ||
| ), | ||
| ("filters", models.JSONField(blank=True, null=True)), | ||
| ("filters_display", models.JSONField(blank=True, null=True)), | ||
| ("file_url", models.URLField(blank=True, null=True)), | ||
| ("record_count", models.PositiveIntegerField(default=0)), | ||
| ("file_size", models.PositiveBigIntegerField(default=0)), | ||
| ( | ||
| "project", | ||
| models.ForeignKey( | ||
| on_delete=django.db.models.deletion.CASCADE, related_name="exports", to="main.project" | ||
| ), | ||
| ), | ||
| ( | ||
| "user", | ||
| models.ForeignKey( | ||
| on_delete=django.db.models.deletion.CASCADE, | ||
| related_name="exports", | ||
| to=settings.AUTH_USER_MODEL, | ||
| ), | ||
| ), | ||
| ], | ||
| options={ | ||
| "abstract": False, | ||
| }, | ||
| ), | ||
| ] |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is actually already the default, set by DEFAULT_AUTO_FIELD in settings/base.py