Skip to content

Commit 5920cb2

Browse files
authored
Merge pull request #251 from hydroserver2/299-celery-etl
299 celery etl
2 parents 513186d + 3c23007 commit 5920cb2

File tree

72 files changed

+5657
-3884
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

72 files changed

+5657
-3884
lines changed

api/urls.py

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -19,9 +19,10 @@
1919
datastream_router,
2020
)
2121
from etl.views import (
22+
data_connection_router,
2223
orchestration_system_router,
23-
data_source_router,
24-
data_archive_router,
24+
task_router,
25+
task_run_router
2526
)
2627

2728

@@ -48,9 +49,10 @@
4849
api.add_router("processing-levels", processing_level_router)
4950
api.add_router("result-qualifiers", result_qualifier_router)
5051

51-
api.add_router("orchestration-systems", orchestration_system_router)
52-
api.add_router("data-sources", data_source_router)
53-
api.add_router("data-archives", data_archive_router)
52+
api.add_router("etl-data-connections", data_connection_router)
53+
api.add_router("etl-tasks", task_router)
54+
api.add_router("etl-tasks", task_run_router)
55+
api.add_router("etl-orchestration-systems", orchestration_system_router)
5456

5557
st_api_1_1 = SensorThingsAPI(
5658
title="HydroServer SensorThings API",

etl/admin.py

Lines changed: 44 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -1,31 +1,58 @@
1+
from django.urls import path
12
from django.contrib import admin
2-
from etl.models import OrchestrationSystem, DataSource, DataArchive
3+
from etl.models import OrchestrationSystem, DataConnection, Task, TaskMapping, TaskMappingPath, TaskRun
4+
from hydroserver.admin import VocabularyAdmin
35

46

5-
class OrchestrationSystemAdmin(admin.ModelAdmin):
7+
class OrchestrationSystemAdmin(admin.ModelAdmin, VocabularyAdmin):
68
list_display = ("id", "name", "orchestration_system_type", "workspace__name")
9+
change_list_template = "admin/etl/orchestrationsystem/change_list.html"
710

8-
def delete_queryset(self, request, queryset):
9-
OrchestrationSystem.delete_contents(filter_arg=queryset, filter_suffix="in")
10-
queryset.delete()
11+
def get_urls(self):
12+
urls = super().get_urls()
1113

14+
return [
15+
path(
16+
"load-default-orchestration-system-data/",
17+
self.admin_site.admin_view(self.load_default_data),
18+
name="orchestration_system_load_default_data",
19+
),
20+
] + urls
1221

13-
class DataSourceAdmin(admin.ModelAdmin):
14-
list_display = ("id", "name", "orchestration_system__name", "workspace__name")
22+
def load_default_data(self, request):
23+
return self.load_fixtures(
24+
request,
25+
"admin:etl_orchestrationsystem_changelist",
26+
["etl/fixtures/default_orchestration_systems.yaml"],
27+
)
1528

16-
def delete_queryset(self, request, queryset):
17-
DataSource.delete_contents(filter_arg=queryset, filter_suffix="in")
18-
queryset.delete()
1929

30+
class DataConnectionAdmin(admin.ModelAdmin):
31+
list_display = ("id", "name", "data_connection_type", "workspace__name")
2032

21-
class DataArchiveAdmin(admin.ModelAdmin):
22-
list_display = ("id", "name", "orchestration_system__name", "workspace__name")
2333

24-
def delete_queryset(self, request, queryset):
25-
DataArchive.delete_contents(filter_arg=queryset, filter_suffix="in")
26-
queryset.delete()
34+
class TaskAdmin(admin.ModelAdmin):
35+
list_display = ("id", "name", "data_connection__name", "orchestration_system__name",
36+
"data_connection__workspace__name")
37+
38+
39+
class TaskMappingAdmin(admin.ModelAdmin):
40+
list_display = ("id", "task__name", "source_identifier", "task__data_connection__name",
41+
"task__data_connection__workspace__name")
42+
43+
44+
class TaskMappingPathAdmin(admin.ModelAdmin):
45+
list_display = ("id", "task_mapping__task__name", "target_identifier", "task_mapping__task__data_connection__name",
46+
"task_mapping__task__data_connection__workspace__name")
47+
48+
49+
class TaskRunAdmin(admin.ModelAdmin):
50+
list_display = ("id", "status", "started_at", "finished_at", "result")
2751

2852

2953
admin.site.register(OrchestrationSystem, OrchestrationSystemAdmin)
30-
admin.site.register(DataSource, DataSourceAdmin)
31-
admin.site.register(DataArchive, DataArchiveAdmin)
54+
admin.site.register(DataConnection, DataConnectionAdmin)
55+
admin.site.register(Task, TaskAdmin)
56+
admin.site.register(TaskMapping, TaskMappingAdmin)
57+
admin.site.register(TaskMappingPath, TaskMappingPathAdmin)
58+
admin.site.register(TaskRun, TaskRunAdmin)
Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,7 @@
1+
---
2+
- model: etl.orchestrationsystem
3+
pk: 019aeaa7-2310-7121-a627-e04427ff8bc9
4+
fields:
5+
name: HydroServer
6+
orchestration_system_type: INTERNAL
7+
workspace_id:

etl/fixtures/hydroshare_archival.yaml

Lines changed: 0 additions & 5 deletions
This file was deleted.
Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,43 @@
1+
from datetime import timedelta
2+
from django.core.management.base import BaseCommand
3+
from django.utils import timezone
4+
from etl.models import Task, TaskRun
5+
6+
7+
class Command(BaseCommand):
8+
help = "Removes old TaskRun records, keeping the most recent per Task."
9+
10+
def add_arguments(self, parser):
11+
parser.add_argument(
12+
"--days",
13+
type=int,
14+
default=14,
15+
help="Number of days to keep TaskRun records. Older runs will be deleted. Default is 14.",
16+
)
17+
18+
def handle(self, *args, **options):
19+
days = options["days"]
20+
now = timezone.now()
21+
cutoff = now - timedelta(days=days)
22+
23+
total_deleted = 0
24+
25+
tasks = Task.objects.filter(
26+
taskrun__started_at__lt=cutoff
27+
).distinct()
28+
29+
for task in tasks:
30+
most_recent_run = TaskRun.objects.filter(task=task).order_by("-started_at").first()
31+
32+
deleted, _ = TaskRun.objects.filter(
33+
task=task,
34+
started_at__lt=cutoff
35+
).exclude(pk=most_recent_run.pk).delete()
36+
37+
total_deleted += deleted
38+
39+
self.stdout.write(
40+
self.style.SUCCESS(
41+
f"Cleanup complete. Deleted {total_deleted} old TaskRun records older than {days} days."
42+
)
43+
)

etl/management/commands/load_etl_test_data.py

Lines changed: 3 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -7,21 +7,9 @@ class Command(BaseCommand):
77

88
def handle(self, *args, **kwargs):
99
fixtures = [
10-
"tests/fixtures/test_users.yaml",
11-
"tests/fixtures/test_workspaces.yaml",
12-
"tests/fixtures/test_roles.yaml",
13-
"tests/fixtures/test_collaborators.yaml",
14-
"tests/fixtures/test_things.yaml",
15-
"tests/fixtures/test_observed_properties.yaml",
16-
"tests/fixtures/test_processing_levels.yaml",
17-
"tests/fixtures/test_result_qualifiers.yaml",
18-
"tests/fixtures/test_sensors.yaml",
19-
"tests/fixtures/test_units.yaml",
20-
"tests/fixtures/test_datastreams.yaml",
21-
"tests/fixtures/test_orchestration_systems.yaml",
22-
"tests/fixtures/test_data_sources.yaml",
23-
"tests/fixtures/test_data_archives.yaml",
24-
"tests/fixtures/test_etl_datastreams.yaml",
10+
"tests/fixtures/test_etl_data_connections.yaml",
11+
"tests/fixtures/test_etl_orchestration_systems.yaml",
12+
"tests/fixtures/test_etl_tasks.yaml",
2513
]
2614

2715
for fixture in fixtures:

0 commit comments

Comments
 (0)