Skip to content
Open
3 changes: 3 additions & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -37,18 +37,21 @@ dependencies = [
"django-extensions",
"django-filter",
"django-flags",
"django-redis>=6",
"django-regex",
"django-smart-env",
"django-storages[azure]",
"django-svelte-jsoneditor",
"djangorestframework",
"dotenv>=0.9.9",
"drf-nested-routers",
"drf-spectacular[sidecar]",
"flower",
"jsonschema",
"numpy<2",
"psycopg[binary]",
"psycopg-pool",
"pyzstd>=0.17",
"requests",
"sentry-sdk[celery,django]",
"setuptools",
Expand Down
34 changes: 32 additions & 2 deletions src/hope_dedup_engine/apps/api/admin/deduplicationset.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
from django.contrib import messages
from django.contrib.admin import ModelAdmin, register

from admin_extra_buttons.decorators import button, link
Expand All @@ -8,8 +9,8 @@
from django.db.models import QuerySet
from django.http import HttpRequest
from rest_framework.reverse import reverse

from hope_dedup_engine.apps.api.models import DeduplicationSet
from django.utils.translation import gettext as _
from hope_dedup_engine.apps.api.models import DedupJob, DeduplicationSet


@register(DeduplicationSet)
Expand Down Expand Up @@ -56,5 +57,34 @@ def findings(self, button: button) -> str | None:
button.visible = False
return None

# Admin button "Terminate Job": requests termination of the job attached to a
# deduplication set, or marks the set as canceled when no such job is found.
# The `permission` callable allows the button only for staff users, and only when
# an object is present whose state is neither FAILED nor CANCELED.
# NOTE(review): `_` is `gettext` (not the lazy variant) per this module's import, so
# the confirm text is translated once, when this decorator call is evaluated —
# confirm that is intended.
@button(
label="Terminate Job",
permission=lambda request, obj, **kwargs: (
request.user.is_staff
and obj
and obj.state not in [DeduplicationSet.State.FAILED, DeduplicationSet.State.CANCELED]
),
confirm=_("Are you sure you want to terminate the running job for this deduplication set?"),
)
def terminate_job(self, request: HttpRequest, pk: str) -> None:
# Resolve the deduplication set from the primary key passed to the button handler.
ds = self.get_object(request, pk)

# Take the first job of this set whose async result id is not NULL.
# NOTE(review): no explicit ordering is applied before `.first()` — verify which job is expected.
job = DedupJob.objects.filter(deduplication_set=ds).exclude(curr_async_result_id__isnull=True).first()

# The id is re-checked for truthiness, so an empty (but non-NULL) id also takes the fallback branch.
if job and job.curr_async_result_id:
# Ask the job to terminate and show the status value it returns to the admin user.
new_status = job.terminate(job.curr_async_result_id)
self.message_user(
request,
f"Job termination initiated. New job status: {new_status}.",
messages.SUCCESS,
)
else:
# Nothing to terminate: warn the admin user instead.
self.message_user(
request,
_("No active job found. Setting state to Canceled."),
messages.WARNING,
)
# NOTE(review): indentation is lost in this view, so it is unclear whether this call
# belongs to the `else` branch only or runs after both branches — confirm against the file.
ds.set_state(DeduplicationSet.State.CANCELED)

def get_queryset(self, request: HttpRequest) -> QuerySet[DeduplicationSet]:
    """Return deduplication sets with loading limited to the list-display fields.

    The field names come from ``self.get_list_display(request)`` and are
    forwarded to ``QuerySet.only``.
    """
    list_columns = self.get_list_display(request)
    return DeduplicationSet.objects.only(*list_columns)
4 changes: 2 additions & 2 deletions src/hope_dedup_engine/apps/api/deduplication/process.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@
encode_chunk,
get_chunks,
finish_with_error,
ChunkPurpose,
)

HOUR = 60 * 60
Expand Down Expand Up @@ -68,6 +67,7 @@ def find_duplicates(self, dedup_job_id: int, version: int) -> dict[str, Any]:
send_notification(deduplication_set.notification_url)

config = asdict(DeduplicationSetConfig.from_deduplication_set(deduplication_set))
config["dedup_job_id"] = dedup_job_id

# clean results
Finding.objects.filter(deduplication_set=deduplication_set).delete()
Expand All @@ -78,7 +78,7 @@ def find_duplicates(self, dedup_job_id: int, version: int) -> dict[str, Any]:
deduplication_set.finding_set.update(score=F("score") / weight_total)

filenames = deduplication_set.filenames_without_encodings()
chunks = get_chunks(filenames, purpose=ChunkPurpose.ENCODE)
chunks = get_chunks(filenames)
tasks = [encode_chunk.s(chunk, config) for chunk in chunks]
chord_id = chord(tasks)(callback_encodings.s(config=config))

Expand Down
1 change: 1 addition & 0 deletions src/hope_dedup_engine/apps/api/models/deduplication.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ class State(models.IntegerChoices):
) # Images are added to deduplication set, but not yet processed
PROCESSING = 2, "Processing" # deduplication set is being processed
FAILED = 3, "Failed" # an error occurred
CANCELED = 4, "Canceled" # Process was canceled by a user

id = models.UUIDField(primary_key=True, default=uuid4)
name = models.CharField(max_length=128, unique=True, null=True, blank=True, db_index=True)
Expand Down
6 changes: 3 additions & 3 deletions src/hope_dedup_engine/apps/api/urls.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,14 +43,14 @@
urlpatterns = [
path("", include(router.urls)),
path("", include(deduplication_sets_router.urls)),
path("api/rest/", SpectacularAPIView.as_view(), name="schema"),
path("rest/", SpectacularAPIView.as_view(), name="schema"),
path(
"api/rest/swagger/",
"rest/swagger/",
SpectacularSwaggerView.as_view(url_name="schema"),
name="swagger-ui",
),
path(
"api/rest/redoc/",
"rest/redoc/",
SpectacularRedocView.as_view(url_name="schema"),
name="redoc",
),
Expand Down
67 changes: 36 additions & 31 deletions src/hope_dedup_engine/apps/api/views.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
from dataclasses import dataclass
from http import HTTPMethod
from typing import Any
from uuid import UUID

from django.db.models import QuerySet, Model
from django.http import HttpRequest
from drf_spectacular.utils import extend_schema
from rest_framework import mixins, status, viewsets
from rest_framework.decorators import action
Expand Down Expand Up @@ -61,6 +61,7 @@ class DeduplicationSetViewSet(
HasAccessToDeduplicationSet,
)
serializer_class = DeduplicationSetSerializer
queryset = DeduplicationSet.objects.all()

def get_queryset(self) -> QuerySet:
    """Return non-deleted deduplication sets whose system matches ``request.auth.system``."""
    caller_system = self.request.auth.system
    return DeduplicationSet.objects.filter(system=caller_system, deleted=False)
Expand Down Expand Up @@ -160,35 +161,8 @@ def destroy(self, request: Request, *args: Any, **kwargs: Any) -> Response:
return super().destroy(request, *args, **kwargs)


@dataclass
class ListDataWrapper:
    """Wrap a list of dicts so that one item assignment is applied to every dict.

    ``wrapper[key] = value`` stores ``value`` under ``key`` in each element of
    ``data``; the wrapped list itself is kept by reference, not copied.
    """

    # The wrapped payload: one dict per object in the request body.
    data: list[dict[str, Any]]

    def __setitem__(self, key: str, value: Any) -> None:
        """Set ``key`` to ``value`` on every dict in ``data``."""
        for entry in self.data:
            entry[key] = value


class WrapRequestDataMixin:
    """Mixin that replaces the request's ``_full_data`` with a ``ListDataWrapper``."""

    def initialize_request(self, request: Request, *args: Any, **kwargs: Any) -> Request:
        """Build the request via the next class in the MRO, then wrap its data.

        ``request.data`` is read first and the resulting value is stored back on
        ``_full_data`` inside a ``ListDataWrapper``.
        """
        wrapped_request = super().initialize_request(request, *args, **kwargs)
        payload = wrapped_request.data
        wrapped_request._full_data = ListDataWrapper(payload)
        return wrapped_request


class UnwrapRequestDataMixin:
    """Mixin that restores ``_full_data`` to the object held in its ``.data`` attribute."""

    def initialize_request(self, request: Request, *args: Any, **kwargs: Any) -> Request:
        """Build the request via the next class in the MRO, then unwrap its data.

        Expects ``_full_data`` to expose a ``data`` attribute (as a
        ``ListDataWrapper`` does) and replaces it with that attribute's value.
        """
        unwrapped_request = super().initialize_request(request, *args, **kwargs)
        wrapper = unwrapped_request._full_data
        unwrapped_request._full_data = wrapper.data
        return unwrapped_request


# drf-nested-routers doesn't work correctly when request data is a list, so we use WrapRequestDataMixin,
# UnwrapRequestDataMixin, and ListDataWrapper to make it work with a list of objects.
class BulkImageViewSet(
UnwrapRequestDataMixin,
nested_viewsets.NestedViewSetMixin[Image],
WrapRequestDataMixin,
mixins.CreateModelMixin,
viewsets.GenericViewSet,
):
Expand All @@ -204,14 +178,45 @@ class BulkImageViewSet(
DEDUPLICATION_SET_PARAM: DEDUPLICATION_SET_FILTER,
}

def _inject_parent_lookup_kwargs(self, request_data: Any, view_kwargs: dict) -> None:
    """Copy parent primary keys from the URL kwargs into the request payload.

    For every ``url_kwarg -> fk_filter`` pair returned by
    ``self._get_parent_lookup_kwargs()``, the part of ``fk_filter`` before the
    first ``"__"`` is used as the key and ``view_kwargs[url_kwarg]`` as the
    value. A dict payload is updated directly; for a list payload every dict
    element is updated and non-dict elements are left alone. Any other payload
    type is ignored. Nothing happens when ``swagger_fake_view`` is truthy.

    Raises:
        KeyError: if a configured URL kwarg is missing from ``view_kwargs``.
    """
    if getattr(self, "swagger_fake_view", False):
        return

    for url_kwarg, fk_filter in self._get_parent_lookup_kwargs().items():
        parent_arg = fk_filter.split("__", 1)[0]
        parent_pk = view_kwargs[url_kwarg]

        # Normalise both payload shapes to "a list of dicts to update".
        if isinstance(request_data, dict):
            targets = [request_data]
        elif isinstance(request_data, list):
            targets = [entry for entry in request_data if isinstance(entry, dict)]
        else:
            targets = []

        for target in targets:
            target[parent_arg] = parent_pk

def initialize_request(self, request: HttpRequest, *args: Any, **kwargs: Any) -> Request:
    """Create the DRF request while skipping ``NestedViewSetMixin``'s own override.

    The request is built by the class that follows ``NestedViewSetMixin`` in the
    MRO, after which the parent lookup kwargs are injected into its data by
    ``_inject_parent_lookup_kwargs``.
    """
    # Two-argument super(): start the method lookup *after* NestedViewSetMixin.
    next_in_mro = super(nested_viewsets.NestedViewSetMixin, self)
    drf_request: Request = next_in_mro.initialize_request(request, *args, **kwargs)
    self._inject_parent_lookup_kwargs(drf_request.data, kwargs)
    return drf_request

def initial(self, request: Request, *args: Any, **kwargs: Any) -> None:
    """Run the pre-handler setup while skipping ``NestedViewSetMixin``'s own override.

    Delegates to the class that follows ``NestedViewSetMixin`` in the MRO and
    then injects the parent lookup kwargs into ``request.data``.
    """
    # Two-argument super(): start the method lookup *after* NestedViewSetMixin.
    next_in_mro = super(nested_viewsets.NestedViewSetMixin, self)
    next_in_mro.initial(request, *args, **kwargs)
    self._inject_parent_lookup_kwargs(request.data, kwargs)

def get_serializer(self, *args: Any, **kwargs: Any) -> Serializer:
    """Return the serializer for the current action.

    For the ``create`` action a ``CreateImageSerializer`` is returned with
    ``many`` defaulting to ``True``; a ``many`` value supplied by the caller is
    respected. Every other action is delegated unchanged to the parent class.
    """
    if self.action == "create":
        # setdefault (rather than passing many=True next to **kwargs) avoids a
        # "multiple values for keyword argument 'many'" TypeError when the
        # caller already passed ``many``.
        kwargs.setdefault("many", True)
        return CreateImageSerializer(*args, **kwargs)
    return super().get_serializer(*args, **kwargs)

def perform_create(self, serializer: Serializer) -> None:
    """Save the new objects, then record the requesting user on their deduplication set.

    After the parent ``perform_create`` runs, the deduplication set of the first
    created instance gets ``updated_by`` set to ``self.request.user`` and is
    saved. Nothing further happens when ``serializer.instance`` is empty.
    """
    super().perform_create(serializer)

    created = serializer.instance
    if not created:
        return

    # NOTE(review): only the first instance is consulted — presumably all objects
    # in one request share a deduplication set; verify against the route.
    deduplication_set = created[0].deduplication_set
    deduplication_set.updated_by = self.request.user
    deduplication_set.save()

Expand Down
Loading
Loading