diff --git a/cvat/apps/engine/admin.py b/cvat/apps/engine/admin.py index 4c2dc4d56c25..773afe622fc3 100644 --- a/cvat/apps/engine/admin.py +++ b/cvat/apps/engine/admin.py @@ -84,6 +84,7 @@ class DataAdmin(admin.ModelAdmin): "frame_filter", "compressed_chunk_type", "original_chunk_type", + "local_storage_backing_cs", ) readonly_fields = fields autocomplete_fields = ("cloud_storage",) diff --git a/cvat/apps/engine/backup.py b/cvat/apps/engine/backup.py index c85d6ed4133e..65ce4fae763a 100644 --- a/cvat/apps/engine/backup.py +++ b/cvat/apps/engine/backup.py @@ -48,7 +48,6 @@ ) from cvat.apps.engine import models from cvat.apps.engine.cache import MediaCache -from cvat.apps.engine.cloud_provider import db_storage_to_storage_instance from cvat.apps.engine.log import ServerLogManager from cvat.apps.engine.models import DataChoice, StorageChoice from cvat.apps.engine.serializers import ( @@ -477,17 +476,95 @@ def _write_filtered_media_manifest(self, zip_object: ZipFile, target_dir: str) - self._manifest_was_filtered = True + def _write_data_from_cloud_storage(self, zip_object: ZipFile, target_dir: str) -> None: + assert not hasattr(self._db_data, "video"), "Only images can be stored in cloud storage" + + target_data_dir = os.path.join(target_dir, self.DATA_DIRNAME) + data_dir = self._db_data.get_upload_dirname() + + self._write_filtered_media_manifest(zip_object=zip_object, target_dir=target_dir) + + files_for_local_copy = [] + + media_files_to_download: list[PurePath] = [] + for media_file in self._db_data.related_files.all(): + media_path = PurePath(media_file.path) + + local_path = os.path.join(data_dir, media_path) + if os.path.exists(local_path): + files_for_local_copy.append(local_path) + else: + media_files_to_download.append(media_path) + + frame_ids_to_download = [] + frame_names_to_download = [] + for media_file in self._db_data.images.all(): + media_path = media_file.path + + local_path = os.path.join(data_dir, media_path) + if os.path.exists(local_path): + 
files_for_local_copy.append(local_path) + else: + frame_ids_to_download.append(media_file.frame) + frame_names_to_download.append(media_file.path) + + if media_files_to_download: + storage_client = self._db_data.get_cloud_storage_instance() + with tempfile.TemporaryDirectory() as tmp_dir: + storage_client.bulk_download_to_dir( + files=media_files_to_download, upload_dir=Path(tmp_dir) + ) + + self._write_files( + source_dir=tmp_dir, + zip_object=zip_object, + files=[os.path.join(tmp_dir, file) for file in media_files_to_download], + target_dir=target_data_dir, + ) + + if frame_ids_to_download: + media_cache = MediaCache() + with closing( + media_cache.read_raw_images( + self._db_task, frame_ids=frame_ids_to_download, decode=False + ) + ) as frame_iter: + # Avoid closing the frame iter before the files are copied + downloaded_paths = [] + for _ in frame_ids_to_download: + downloaded_paths.append(next(frame_iter)[1]) + + tmp_dir = downloaded_paths[0].removesuffix(frame_names_to_download[0]) + + self._write_files( + source_dir=tmp_dir, + zip_object=zip_object, + files=downloaded_paths, + target_dir=target_data_dir, + ) + + self._write_files( + source_dir=data_dir, + zip_object=zip_object, + files=files_for_local_copy, + target_dir=target_data_dir, + ) + def _write_data(self, zip_object: ZipFile, target_dir: str) -> None: target_data_dir = os.path.join(target_dir, self.DATA_DIRNAME) if self._db_data.storage == StorageChoice.LOCAL: data_dir = self._db_data.get_upload_dirname() - self._write_directory( - source_dir=data_dir, - zip_object=zip_object, - target_dir=target_data_dir, - exclude_files=[self.MEDIA_MANIFEST_INDEX_FILENAME], - ) + if self._db_data.local_storage_backing_cs: + self._write_data_from_cloud_storage(zip_object, target_dir) + else: + self._write_directory( + source_dir=data_dir, + zip_object=zip_object, + target_dir=target_data_dir, + exclude_files=[self.MEDIA_MANIFEST_INDEX_FILENAME], + ) + elif self._db_data.storage == StorageChoice.SHARE: data_dir = 
settings.SHARE_ROOT if hasattr(self._db_data, "video"): @@ -505,8 +582,6 @@ def _write_data(self, zip_object: ZipFile, target_dir: str) -> None: self._write_filtered_media_manifest(zip_object=zip_object, target_dir=target_dir) elif self._db_data.storage == StorageChoice.CLOUD_STORAGE: - assert not hasattr(self._db_data, "video"), "Only images can be stored in cloud storage" - data_dir = self._db_data.get_upload_dirname() if self._lightweight: @@ -517,75 +592,7 @@ def _write_data(self, zip_object: ZipFile, target_dir: str) -> None: target_dir=target_data_dir, ) else: - self._write_filtered_media_manifest(zip_object=zip_object, target_dir=target_dir) - - files_for_local_copy = [] - - media_files_to_download: list[PurePath] = [] - for media_file in self._db_data.related_files.all(): - media_path = PurePath(media_file.path) - - local_path = os.path.join(data_dir, media_path) - if os.path.exists(local_path): - files_for_local_copy.append(local_path) - else: - media_files_to_download.append(media_path) - - frame_ids_to_download = [] - frame_names_to_download = [] - for media_file in self._db_data.images.all(): - media_path = media_file.path - - local_path = os.path.join(data_dir, media_path) - if os.path.exists(local_path): - files_for_local_copy.append(local_path) - else: - frame_ids_to_download.append(media_file.frame) - frame_names_to_download.append(media_file.path) - - if media_files_to_download: - storage_client = db_storage_to_storage_instance(self._db_data.cloud_storage) - with tempfile.TemporaryDirectory() as tmp_dir: - storage_client.bulk_download_to_dir( - files=media_files_to_download, upload_dir=Path(tmp_dir) - ) - - self._write_files( - source_dir=tmp_dir, - zip_object=zip_object, - files=[os.path.join(tmp_dir, file) for file in media_files_to_download], - target_dir=target_data_dir, - ) - - if frame_ids_to_download: - media_cache = MediaCache() - with closing( - iter( - media_cache.read_raw_images( - self._db_task, frame_ids=frame_ids_to_download, 
decode=False - ) - ) - ) as frame_iter: - # Avoid closing the frame iter before the files are copied - downloaded_paths = [] - for _ in frame_ids_to_download: - downloaded_paths.append(next(frame_iter)[1]) - - tmp_dir = downloaded_paths[0].removesuffix(frame_names_to_download[0]) - - self._write_files( - source_dir=tmp_dir, - zip_object=zip_object, - files=downloaded_paths, - target_dir=target_data_dir, - ) - - self._write_files( - source_dir=data_dir, - zip_object=zip_object, - files=files_for_local_copy, - target_dir=target_data_dir, - ) + self._write_data_from_cloud_storage(zip_object, target_dir) else: raise NotImplementedError diff --git a/cvat/apps/engine/cache.py b/cvat/apps/engine/cache.py index 66830a36b37b..8e52643f2e92 100644 --- a/cvat/apps/engine/cache.py +++ b/cvat/apps/engine/cache.py @@ -593,14 +593,10 @@ def read_raw_images( db_data = db_task.require_data() manifest_path = db_data.get_manifest_path() - if os.path.isfile(manifest_path) and db_data.storage == models.StorageChoice.CLOUD_STORAGE: + if storage_client := db_data.get_cloud_storage_instance(): + assert manifest_path.is_file() reader = ImageReaderWithManifest(manifest_path) with ExitStack() as es: - db_cloud_storage = db_data.cloud_storage - if not db_cloud_storage: - raise CloudStorageMissingError("Task is no longer connected to cloud storage") - storage_client = db_storage_to_storage_instance(db_cloud_storage) - tmp_dir = Path(es.enter_context(tempfile.TemporaryDirectory(prefix="cvat"))) # (storage filename, output filename) files_to_download: list[tuple[str, PurePath]] = [] @@ -621,7 +617,7 @@ def read_raw_images( files_to_download, checksums, media ): if checksum and not md5_hash(media_item[1]) == checksum: - slogger.cloud_storage[db_cloud_storage.id].warning( + slogger.task[db_task.id].warning( "Hash sums of files {} do not match".format(media_item[1]) ) @@ -710,12 +706,7 @@ def _validate_ri_path(path: str) -> PurePath: for frame_id, frame_ris in groupby(db_related_files, key=lambda v: 
v[0]) ] - if db_data.storage == models.StorageChoice.CLOUD_STORAGE: - db_cloud_storage = db_data.cloud_storage - if not db_cloud_storage: - raise CloudStorageMissingError("Task is no longer connected to cloud storage") - storage_client = db_storage_to_storage_instance(db_cloud_storage) - + if storage_client := db_data.get_cloud_storage_instance(): tmp_dir = Path(es.enter_context(tempfile.TemporaryDirectory(prefix="cvat"))) files_to_download: list[PurePath] = [] for _, frame_media in media: diff --git a/cvat/apps/engine/cloud_provider.py b/cvat/apps/engine/cloud_provider.py index 5c80d8cfd4ec..18b37827c8eb 100644 --- a/cvat/apps/engine/cloud_provider.py +++ b/cvat/apps/engine/cloud_provider.py @@ -10,7 +10,7 @@ import os from abc import ABC, abstractmethod from collections.abc import Callable, Iterator, Sequence -from concurrent.futures import FIRST_EXCEPTION, Future, ThreadPoolExecutor, wait +from concurrent.futures import Future, ThreadPoolExecutor from datetime import datetime from enum import Enum from io import BytesIO @@ -43,7 +43,7 @@ DimensionType, ) from cvat.apps.engine.rq import ExportRQMeta -from cvat.apps.engine.utils import get_cpu_number +from cvat.apps.engine.utils import get_cpu_number, take_by from cvat.utils.http import PROXIES_FOR_UNTRUSTED_URLS from utils.dataset_manifest.utils import ( InvalidPcdError, @@ -241,6 +241,12 @@ def bulk_download_to_memory( top_job = queue.get() yield top_job.result() + def _in_parallel(self, callable: Callable[[T], object], args: Sequence[T]) -> None: + threads_number = get_max_threads_number(len(args)) + + with ThreadPoolExecutor(max_workers=threads_number) as executor: + list(executor.map(callable, args)) + def bulk_download_to_dir( self, files: Sequence[PurePath | tuple[str, PurePath]], @@ -251,24 +257,26 @@ def bulk_download_to_dir( :param upload_dir: the output directory """ - threads_number = get_max_threads_number(len(files)) + def download_one(f: PurePath | tuple[str, PurePath]) -> None: + if isinstance(f, 
tuple): + key, output_path = f + else: + key = f.as_posix() + output_path = f - with ThreadPoolExecutor(max_workers=threads_number) as executor: - futures = [] - for f in files: - if isinstance(f, tuple): - key, output_path = f - else: - key = f.as_posix() - output_path = f + self.download_file(key, upload_dir / output_path) - output_path = upload_dir / output_path - futures.append(executor.submit(self.download_file, key, output_path)) + self._in_parallel(download_one, files) - done, _ = wait(futures, return_when=FIRST_EXCEPTION) - for future in done: - if ex := future.exception(): - raise ex + def bulk_upload_from_dir( + self, + files: Sequence[PurePath], + upload_dir: Path, + ): + def upload_one(f: PurePath): + self.upload_file(upload_dir / f, f.as_posix()) + + self._in_parallel(upload_one, files) @abstractmethod def upload_fileobj(self, file_obj: BinaryIO, key: str, /) -> None: @@ -278,6 +286,10 @@ def upload_fileobj(self, file_obj: BinaryIO, key: str, /) -> None: def upload_file(self, file_path: Path, key: str | None = None, /) -> None: pass + @abstractmethod + def bulk_delete(self, files: Sequence[str]) -> None: + pass + @abstractmethod def _list_raw_content_on_one_page( self, @@ -354,7 +366,7 @@ def list_files( prefix: str = "", *, _use_flat_listing: bool = False, - ) -> list[str]: + ) -> list[dict]: all_files = [] next_token = None while True: @@ -743,13 +755,12 @@ def _download_range_of_bytes(self, key: str, /, *, stop_byte: int, start_byte: i slogger.glob.error(f"{str(ex)}. 
Key: {key}, bucket: {self.name}") raise - def delete_file(self, file_name: str, /): - try: - self._client.delete_object(Bucket=self.name, Key=file_name) - except Exception as ex: - msg = str(ex) - slogger.glob.info(msg) - raise + def bulk_delete(self, files: Sequence[str]) -> None: + def delete_batch(batch: Sequence[str]): + delete_request = {"Objects": [{"Key": f} for f in batch], "Quiet": True} + self._client.delete_objects(Bucket=self.name, Delete=delete_request) + + self._in_parallel(delete_batch, list(take_by(files, 1000))) @property def supported_actions(self): @@ -870,6 +881,12 @@ def upload_file(self, file_path: Path, key: str | None = None, /): with open(file_path, "rb") as f: self.upload_fileobj(f, key or file_path.name) + def bulk_delete(self, files: Sequence[str]) -> None: + def delete_batch(batch: Sequence[str]) -> None: + self._client.delete_blobs(*batch) + + self._in_parallel(delete_batch, list(take_by(files, 256))) + def _list_raw_content_on_one_page( self, prefix: str = "", @@ -1032,6 +1049,14 @@ def upload_fileobj(self, file_obj: BinaryIO, key: str, /): def upload_file(self, file_path: Path, key: str | None = None, /): self.bucket.blob(key or file_path.name).upload_from_filename(os.fspath(file_path)) + def bulk_delete(self, files: Sequence[str]) -> None: + def delete_batch(batch: Sequence[str]): + with self._client.batch(): + for key in batch: + self.bucket.delete_blob(key) + + self._in_parallel(delete_batch, list(take_by(files, 100))) + @validate_file_status @validate_bucket_status def get_file_last_modified(self, key: str, /): @@ -1044,6 +1069,73 @@ def supported_actions(self): pass +class SubdirectoryCloudStorage(AbstractCloudStorage): + def __init__(self, underlying: AbstractCloudStorage, subdirectory: str) -> None: + super().__init__() + + self.underlying = underlying + self.subdirectory = subdirectory + if not self.subdirectory.endswith("/"): + self.subdirectory += "/" + + def _map_key(self, key: str) -> str: + return self.subdirectory + key 
+ + def _unmap_key(self, key: str) -> str: + assert key.startswith(self.subdirectory) + return key[len(self.subdirectory) :] + + @property + def name(self) -> str: + return self.underlying.name + "/" + self.subdirectory + + def get_status(self) -> Status: + return self.underlying.get_status() + + def get_file_status(self, key: str, /) -> Status: + return self.underlying.get_file_status(self._map_key(key)) + + def get_file_last_modified(self, key: str, /) -> datetime: + return self.underlying.get_file_last_modified(self._map_key(key)) + + def _download_fileobj_to_stream(self, key: str, stream: BinaryIO, /) -> None: + return self.underlying._download_fileobj_to_stream(self._map_key(key), stream) + + def _download_range_of_bytes(self, key: str, /, *, stop_byte: int, start_byte: int) -> bytes: + return self.underlying._download_range_of_bytes( + self._map_key(key), start_byte=start_byte, stop_byte=stop_byte + ) + + def upload_fileobj(self, file_obj: BinaryIO, key: str, /) -> None: + return self.underlying.upload_fileobj(file_obj, self._map_key(key)) + + def upload_file(self, file_path: Path, key: str | None = None, /) -> None: + assert key is not None + return self.underlying.upload_file(file_path, self._map_key(key)) + + def bulk_delete(self, files: Sequence[str]) -> None: + self.underlying.bulk_delete(list(map(self._map_key, files))) + + def _list_raw_content_on_one_page( + self, + prefix: str = "", + *, + next_token: str | None = None, + page_size: int = settings.BUCKET_CONTENT_MAX_PAGE_SIZE, + ) -> dict: + result = self.underlying._list_raw_content_on_one_page( + self._map_key(prefix), next_token=next_token, page_size=page_size + ) + + for key in ("files", "directories"): + result[key] = list(map(self._unmap_key, result[key])) + + return result + + @property + def supported_actions(self): + return self.underlying.supported_actions + + class Credentials: __slots__ = ( "key", diff --git a/cvat/apps/engine/management/commands/movetaskfrombackingcs.py 
b/cvat/apps/engine/management/commands/movetaskfrombackingcs.py new file mode 100644 index 000000000000..307cf469f8f7 --- /dev/null +++ b/cvat/apps/engine/management/commands/movetaskfrombackingcs.py @@ -0,0 +1,24 @@ +from django.core.management.base import BaseCommand, CommandError + +from cvat.apps.engine.models import Task + + +class Command(BaseCommand): + help = "Moves data of a given task from backing cloud storage back to the filesystem" + + def add_arguments(self, parser): + parser.add_argument("task_id", type=int, help="ID of the task to move data of") + + def handle(self, *args, **options): + task_id: int = options["task_id"] + + task = Task.objects.get(id=task_id) + data = task.require_data() + + if not data.supports_backing_cs(): + raise CommandError(f"Task #{task_id} does not support backing cloud storage") + + if not data.local_storage_backing_cs_id: + raise CommandError(f"Task #{task_id} has no backing cloud storage") + + data.move_from_backing_cs() diff --git a/cvat/apps/engine/management/commands/movetasktobackingcs.py b/cvat/apps/engine/management/commands/movetasktobackingcs.py new file mode 100644 index 000000000000..9338d6d353cd --- /dev/null +++ b/cvat/apps/engine/management/commands/movetasktobackingcs.py @@ -0,0 +1,33 @@ +from django.core.management.base import BaseCommand, CommandError + +from cvat.apps.engine.models import Task + + +class Command(BaseCommand): + help = "Moves data of a given task to a backing cloud storage" + + def add_arguments(self, parser): + parser.add_argument("task_id", type=int, help="ID of the task to move data of") + parser.add_argument( + "backing_cs_id", + type=int, + help="ID of the backing cloud storage to move data to", + ) + + def handle(self, *args, **options): + task_id: int = options["task_id"] + backing_cs_id: int = options["backing_cs_id"] + + task = Task.objects.get(id=task_id) + data = task.require_data() + + if not data.supports_backing_cs(): + raise CommandError(f"Task #{task_id} does not support 
backing cloud storage") + + if data.local_storage_backing_cs_id: + raise CommandError( + f"Task #{task_id} already has a backing cloud storage" + f" (#{data.local_storage_backing_cs_id})" + ) + + data.move_to_backing_cs(backing_cs_id) diff --git a/cvat/apps/engine/migrations/0098_data_local_storage_backing_cs.py b/cvat/apps/engine/migrations/0098_data_local_storage_backing_cs.py new file mode 100644 index 000000000000..4468ec95cdc5 --- /dev/null +++ b/cvat/apps/engine/migrations/0098_data_local_storage_backing_cs.py @@ -0,0 +1,24 @@ +# Generated by Django 4.2.26 on 2026-02-05 17:10 + +import django.db.models.deletion +from django.db import migrations, models + + +class Migration(migrations.Migration): + + dependencies = [ + ("engine", "0097_alter_relatedfile_path"), + ] + + operations = [ + migrations.AddField( + model_name="data", + name="local_storage_backing_cs", + field=models.ForeignKey( + null=True, + on_delete=django.db.models.deletion.PROTECT, + related_name="+", + to="engine.cloudstorage", + ), + ), + ] diff --git a/cvat/apps/engine/models.py b/cvat/apps/engine/models.py index b5bdecdba63b..a90435eccd60 100644 --- a/cvat/apps/engine/models.py +++ b/cvat/apps/engine/models.py @@ -13,7 +13,7 @@ from collections.abc import Collection, Iterable, Sequence from enum import Enum, IntEnum from functools import cached_property -from pathlib import Path +from pathlib import Path, PurePath from typing import TYPE_CHECKING, Any, ClassVar from django.conf import settings @@ -29,12 +29,14 @@ from drf_spectacular.types import OpenApiTypes from drf_spectacular.utils import extend_schema_field +from cvat.apps.engine.exceptions import CloudStorageMissingError from cvat.apps.engine.lazy_list import LazyList from cvat.apps.engine.model_utils import MaybeUndefined from cvat.apps.engine.utils import parse_specific_attributes, take_by from cvat.apps.events.utils import cache_deleted if TYPE_CHECKING: + from cvat.apps.engine.cloud_provider import AbstractCloudStorage from 
cvat.apps.organizations.models import Organization @@ -445,6 +447,7 @@ class Data(models.Model): # Storage descriptors storage_method = models.CharField(max_length=15, choices=StorageMethodChoice.choices(), default=StorageMethodChoice.FILE_SYSTEM) storage = models.CharField(max_length=15, choices=StorageChoice.choices(), default=StorageChoice.LOCAL) + local_storage_backing_cs = models.ForeignKey(CloudStorage, on_delete=models.PROTECT, null=True, related_name="+") cloud_storage = models.ForeignKey(CloudStorage, on_delete=models.SET_NULL, null=True, related_name='data') # Task creation parameters @@ -540,6 +543,96 @@ def update_validation_layout( def validation_mode(self) -> ValidationMode | None: return getattr(getattr(self, 'validation_layout', None), 'mode', None) + def get_all_media_rel_paths(self) -> list[PurePath]: + if video := getattr(self, "video", None): + return [PurePath(video.path)] + else: + return ( + [PurePath(image.path) for image in self.images.all()] + + [PurePath(f.path) for f in self.related_files.all()] + ) + + def get_cloud_storage_instance(self) -> AbstractCloudStorage | None: + from .cloud_provider import SubdirectoryCloudStorage, db_storage_to_storage_instance + + if self.storage == StorageChoice.CLOUD_STORAGE: + if self.cloud_storage_id is None: + raise CloudStorageMissingError("Task is not connected to cloud storage") + + return db_storage_to_storage_instance(self.cloud_storage) + + if self.storage == StorageChoice.LOCAL and self.local_storage_backing_cs: + return SubdirectoryCloudStorage( + db_storage_to_storage_instance(self.local_storage_backing_cs), f"data/{self.id}/raw" + ) + + return None + + def supports_backing_cs(self) -> bool: + return ( + self.storage == StorageChoice.LOCAL + and self.storage_method == StorageMethodChoice.CACHE + and not hasattr(self, "video") + ) + + def move_to_backing_cs(self, backing_cs_id: int) -> None: + assert self.supports_backing_cs() + assert not self.local_storage_backing_cs_id + + 
self.local_storage_backing_cs_id = backing_cs_id + + cloud_storage_instance = self.get_cloud_storage_instance() + assert cloud_storage_instance + + upload_dir = self.get_upload_dirname() + assert (upload_dir / self.MANIFEST_FILENAME).is_file() + + rel_paths_to_move = self.get_all_media_rel_paths() + + cloud_storage_instance.bulk_upload_from_dir(rel_paths_to_move, upload_dir) + + self.save(update_fields=["local_storage_backing_cs"]) + + def clear_original_files() -> None: + parents = set() + + for path in rel_paths_to_move: + (upload_dir / path).unlink() + parents.update(path.parents) + + # Delete all empty parent directories, except for upload_dir itself. + parents.remove(PurePath()) + + for dir in sorted(parents, key=lambda path: -len(path.parts)): + try: + (upload_dir / dir).rmdir() + except OSError: + pass + + transaction.on_commit(clear_original_files, robust=True) + + def move_from_backing_cs(self) -> None: + assert self.supports_backing_cs() + assert self.local_storage_backing_cs_id + + cloud_storage_instance = self.get_cloud_storage_instance() + assert cloud_storage_instance + + upload_dir = self.get_upload_dirname() + + rel_paths_to_move = self.get_all_media_rel_paths() + + cloud_storage_instance.bulk_download_to_dir(rel_paths_to_move, upload_dir) + + self.local_storage_backing_cs = None + self.save(update_fields=["local_storage_backing_cs"]) + + def clear_original_files() -> None: + cloud_storage_instance.bulk_delete([p.as_posix() for p in rel_paths_to_move]) + + transaction.on_commit(clear_original_files, robust=True) + + class Video(models.Model): data = models.OneToOneField(Data, on_delete=models.CASCADE, related_name="video", null=True) diff --git a/cvat/apps/engine/signals.py b/cvat/apps/engine/signals.py index 4e18c3edfc6d..7277a320f2ae 100644 --- a/cvat/apps/engine/signals.py +++ b/cvat/apps/engine/signals.py @@ -9,7 +9,7 @@ from django.conf import settings from django.contrib.auth.models import User from django.db import transaction -from 
django.db.models.signals import m2m_changed, post_delete, post_save, pre_save +from django.db.models.signals import m2m_changed, post_delete, post_save, pre_delete, pre_save from django.dispatch import Signal, receiver from rest_framework.exceptions import ValidationError @@ -126,12 +126,30 @@ def __delete_job_handler(instance, **kwargs): ) +@receiver(pre_delete, sender=Data) +def __pre_delete_data_handler(instance: Data, **kwargs): + # Image/video objects are no longer available in the post_delete handler, so gather them here. + instance._saved_media_rel_paths = instance.get_all_media_rel_paths() + + @receiver(post_delete, sender=Data) -def __delete_data_handler(instance, **kwargs): +def __delete_data_handler(instance: Data, **kwargs): transaction.on_commit( functools.partial(shutil.rmtree, instance.get_data_dirname(), ignore_errors=True) ) + if instance.local_storage_backing_cs: + storage_instance = instance.get_cloud_storage_instance() + assert storage_instance + + transaction.on_commit( + functools.partial( + storage_instance.bulk_delete, + [p.as_posix() for p in instance._saved_media_rel_paths], + ), + robust=True, + ) + @receiver(post_delete, sender=CloudStorage) def __delete_cloudstorage_handler(instance, **kwargs): diff --git a/cvat/apps/engine/tests/test_rest_api.py b/cvat/apps/engine/tests/test_rest_api.py index 401c66af3101..194da52a88a0 100644 --- a/cvat/apps/engine/tests/test_rest_api.py +++ b/cvat/apps/engine/tests/test_rest_api.py @@ -16,13 +16,14 @@ import xml.etree.ElementTree as ET import zipfile from collections import defaultdict +from collections.abc import Sequence from contextlib import ExitStack from datetime import timedelta from enum import Enum from glob import glob from io import BytesIO, IOBase from itertools import product -from pathlib import Path +from pathlib import Path, PurePath from pprint import pformat from time import sleep from typing import BinaryIO @@ -36,7 +37,7 @@ from django.conf import settings from 
django.contrib.auth.models import Group, User from django.http import FileResponse, HttpResponse -from django.test import override_settings +from django.test import SimpleTestCase, override_settings from pdf2image import convert_from_bytes from PIL import Image from pycocotools import coco as coco_loader @@ -1842,6 +1843,16 @@ def test_api_v2_projects_id_export_no_auth(self): class _CloudStorageTestBase(ApiTestBase): + @classmethod + def setUpClass(cls) -> None: + cls.mock_aws = cls._start_aws_patch() + super().setUpClass() + + @classmethod + def tearDownClass(cls): + super().tearDownClass() + cls._stop_aws_patch() + @classmethod def _start_aws_patch(cls): class MockS3(S3CloudStorage): @@ -1851,9 +1862,17 @@ def get_status(self): return Status.AVAILABLE @classmethod - def create_file(cls, key, _bytes): + def create_file(cls, key: str, _bytes: bytes) -> None: cls._files[key] = _bytes + @classmethod + def retrieve_file(cls, key: str) -> bytes: + return cls._files[key] + + @classmethod + def file_exists(cls, key: str) -> bool: + return key in cls._files + def get_file_status(self, key: str, /): return Status.AVAILABLE if key in self._files else Status.NOT_FOUND @@ -1863,6 +1882,13 @@ def _download_range_of_bytes(self, key: str, /, *, stop_byte: int, start_byte: i def _download_fileobj_to_stream(self, key: str, stream: BinaryIO, /): stream.write(self._files[key]) + def upload_file(self, file_path: Path, key: str | None = None, /) -> None: + self._files[key] = file_path.read_bytes() + + def bulk_delete(self, files: Sequence[str]) -> None: + for key in files: + del self._files[key] + cls._aws_patch = mock.patch("cvat.apps.engine.cloud_provider.S3CloudStorage", MockS3) cls._aws_patch.start() @@ -1894,6 +1920,20 @@ def _create_cloud_storage(cls): ) return response.json()["id"] + def _create_task(self, data, image_data): + with ForceLogin(self.owner, self.client): + response = self.client.post("/api/tasks", data=data, format="json") + self.assertEqual(response.status_code, 
status.HTTP_201_CREATED) + tid = response.data["id"] + + response = self.client.post("/api/tasks/%s/data" % tid, data=image_data) + self.assertEqual(response.status_code, status.HTTP_202_ACCEPTED) + + response = self.client.get("/api/tasks/%s" % tid) + task = response.data + + return task + @override_settings(MEDIA_CACHE_ALLOW_STATIC_CACHE=False) class ProjectCloudBackupAPINoStaticChunksTestCase(ProjectBackupAPITestCase, _CloudStorageTestBase): @@ -1903,7 +1943,6 @@ class ProjectCloudBackupAPINoStaticChunksTestCase(ProjectBackupAPITestCase, _Clo def setUpTestData(cls): create_db_users(cls) cls.client = APIClient() - cls.mock_aws = cls._start_aws_patch() cls.cloud_storage_id = cls._create_cloud_storage() cls._create_media() cls._create_projects() @@ -1916,11 +1955,6 @@ def disabled(*args): cls.mock_aws._download_fileobj_to_stream = disabled - @classmethod - def tearDownClass(cls): - cls._stop_aws_patch() - super().tearDownClass() - def _compare_tasks(self, original_task, imported_task): super()._compare_tasks(original_task, imported_task) @@ -7784,15 +7818,9 @@ class TaskChangeCloudStorageTestCase(_CloudStorageTestBase): def setUpTestData(cls): create_db_users(cls) cls.client = APIClient() - cls.mock_aws = cls._start_aws_patch() cls.cloud_storage_id_1 = cls._create_cloud_storage() cls.cloud_storage_id_2 = cls._create_cloud_storage() - @classmethod - def tearDownClass(cls): - cls._stop_aws_patch() - super().tearDownClass() - def _create_cloud_task(self): data = { "name": "my cloud task #1", @@ -7817,37 +7845,6 @@ def _create_cloud_task(self): } return self._create_task(data, image_data) - def _create_local_task(self): - data = { - "name": "my local task #1", - "owner_id": self.owner.id, - "overlap": 0, - "segment_size": 100, - "labels": [{"name": "person"}], - } - - image_data = { - "client_files[0]": generate_random_image_file("test_1.jpg")[1], - "client_files[1]": generate_random_image_file("test_2.jpg")[1], - "client_files[2]": 
generate_random_image_file("test_3.jpg")[1], - "image_quality": 75, - } - return self._create_task(data, image_data) - - def _create_task(self, data, image_data): - with ForceLogin(self.owner, self.client): - response = self.client.post("/api/tasks", data=data, format="json") - self.assertEqual(response.status_code, status.HTTP_201_CREATED) - tid = response.data["id"] - - response = self.client.post("/api/tasks/%s/data" % tid, data=image_data) - self.assertEqual(response.status_code, status.HTTP_202_ACCEPTED) - - response = self.client.get("/api/tasks/%s" % tid) - task = response.data - - return task - def test_can_change_cloud_storage(self): def get_cache_keys(): return MediaCache._cache()._cache.get_client().keys("*") @@ -7920,6 +7917,105 @@ def test_can_not_change_to_not_available_cloud_storage(self): ) +class TaskBackingCloudStorageTestCase(_CloudStorageTestBase): + _IMAGE_PATHS = ["test_1.jpg", "test_2.jpg", "related_images/test_1_jpg/context_1.jpg"] + + @classmethod + def setUpTestData(cls): + create_db_users(cls) + cls.client = APIClient() + cls.cloud_storage_id = cls._create_cloud_storage() + + def _create_local_task(self): + data = { + "name": "my local task #1", + "owner_id": self.owner.id, + "overlap": 0, + "segment_size": 100, + "labels": [{"name": "person"}], + } + + f = io.BytesIO() + with zipfile.ZipFile(f, "w") as zip: + for p in self._IMAGE_PATHS: + zip.writestr(p, generate_random_image_file(p)[1].getbuffer()) + + f.seek(0) + f.name = "test.zip" + + image_data = {"client_files[0]": f, "image_quality": 75} + return self._create_task(data, image_data) + + def test_can_move_to_backing_cs(self): + # Set up task. 
+ task = self._create_local_task() + task_id = task["id"] + + data = Data.objects.get(task__id=task_id) + upload_dir = data.get_upload_dirname() + + self.assertTrue(data.images.exists()) + self.assertTrue(data.related_files.exists()) + + def local_path(rel_path): + return upload_dir / rel_path + + def cloud_key(rel_path): + return PurePath(f"data/{data.id}/raw", rel_path).as_posix() + + images = [(p, local_path(p).read_bytes()) for p in self._IMAGE_PATHS] + self.assertTrue(local_path(Data.MANIFEST_FILENAME).exists()) + + # Move the task to backing cloud storage. + with self.captureOnCommitCallbacks(execute=True): + data.move_to_backing_cs(self.cloud_storage_id) + + self.assertEqual(data.local_storage_backing_cs_id, self.cloud_storage_id) + + for image_rel_path, image_bytes in images: + self.assertFalse(local_path(image_rel_path).exists()) + self.assertEqual(self.mock_aws.retrieve_file(cloud_key(image_rel_path)), image_bytes) + self.assertFalse(local_path("related_images").exists()) + + # The manifest should still be in the local FS. + self.assertTrue(local_path(Data.MANIFEST_FILENAME).exists()) + self.assertFalse(self.mock_aws.file_exists(cloud_key(Data.MANIFEST_FILENAME))) + + # Move the task back. 
+ with self.captureOnCommitCallbacks(execute=True): + data.move_from_backing_cs() + + self.assertEqual(data.local_storage_backing_cs_id, None) + + for image_rel_path, image_bytes in images: + self.assertEqual(local_path(image_rel_path).read_bytes(), image_bytes) + self.assertFalse(self.mock_aws.file_exists(cloud_key(image_rel_path))) + + def test_deletion_with_backing_cs(self): + task = self._create_local_task() + task_id = task["id"] + + data = Data.objects.get(task__id=task_id) + upload_dir = data.get_upload_dirname() + + def cloud_key(rel_path): + return PurePath(f"data/{data.id}/raw", rel_path).as_posix() + + with self.captureOnCommitCallbacks(execute=True): + data.move_to_backing_cs(self.cloud_storage_id) + + for p in self._IMAGE_PATHS: + self.assertTrue(self.mock_aws.file_exists(cloud_key(p))) + + with self.captureOnCommitCallbacks(execute=True): + self._delete_request(f"/api/tasks/{task['id']}", self.owner) + + self.assertFalse(upload_dir.exists()) + + for p in self._IMAGE_PATHS: + self.assertFalse(self.mock_aws.file_exists(cloud_key(p))) + + class TaskJobLimitAPITestCase(ApiTestBase): """ Tests for MAX_JOBS_PER_TASK validation at the REST API level @@ -8049,7 +8145,7 @@ def test_create_task_with_consensus_exceeds_job_limit(self): self.assertEqual(job_count, 0) -class TestCloudStorageS3Status(_CloudStorageTestBase): +class TestCloudStorageS3Status(SimpleTestCase): def setUp(self): self.storage = S3CloudStorage( bucket="test-bucket", @@ -8088,7 +8184,7 @@ def fake_head(): self.assertEqual(self.storage.get_status(), Status.NOT_FOUND) -class TestCloudStorageAzureStatus(_CloudStorageTestBase): +class TestCloudStorageAzureStatus(SimpleTestCase): def setUp(self): self.storage = AzureBlobCloudStorage( container="test-container", diff --git a/tests/docker-compose.minio.yml b/tests/docker-compose.minio.yml index 5e2ab74f2807..2833612b98b3 100644 --- a/tests/docker-compose.minio.yml +++ b/tests/docker-compose.minio.yml @@ -43,32 +43,35 @@ services: MINIO_ACCESS_KEY: 
"minio_access_key" MINIO_SECRET_KEY: "minio_secret_key" MINIO_ALIAS: "local_minio" - PRIVATE_BUCKET: "private" - PUBLIC_BUCKET: "public" - TEST_BUCKET: "test" - IMPORT_EXPORT_BUCKET: "importexportbucket" volumes: - ./tests/cypress/e2e/actions_tasks/assets/case_65_manifest/:/mnt/images_with_manifest:ro - ./tests/mounted_file_share/:/mnt/mounted_file_share:ro networks: - cvat - entrypoint: > - /bin/sh -c " - $${MC_PATH} config host add --quiet --api s3v4 $${MINIO_ALIAS} $${MINIO_HOST} $${MINIO_ACCESS_KEY} $${MINIO_SECRET_KEY}; - $${MC_PATH} mb $${MINIO_ALIAS}/$${PRIVATE_BUCKET} $${MINIO_ALIAS}/$${PUBLIC_BUCKET} $${MINIO_ALIAS}/$${TEST_BUCKET} $${MINIO_ALIAS}/$${IMPORT_EXPORT_BUCKET}; - for BUCKET in $${MINIO_ALIAS}/$${PRIVATE_BUCKET} $${MINIO_ALIAS}/$${PUBLIC_BUCKET} $${MINIO_ALIAS}/$${TEST_BUCKET} $${MINIO_ALIAS}/$${IMPORT_EXPORT_BUCKET}; - do - if [ $${BUCKET} == $${MINIO_ALIAS}/$${PRIVATE_BUCKET} ] - then - FULL_PATH=$${BUCKET}/sub; - else - FULL_PATH=$${BUCKET}; - fi - $${MC_PATH} cp --recursive /mnt/mounted_file_share/ /mnt/images_with_manifest $${FULL_PATH}/ - for i in 1 2; - do - $${MC_PATH} cp /mnt/images_with_manifest/manifest.jsonl $${FULL_PATH}/images_with_manifest/manifest_$${i}.jsonl; - done; - done; - $${MC_PATH} policy set public $${MINIO_ALIAS}/$${PUBLIC_BUCKET}; - " \ No newline at end of file + entrypoint: [] + command: + - bash + - -c + - | + $${MC_PATH} config host add --quiet --api s3v4 $${MINIO_ALIAS} $${MINIO_HOST} $${MINIO_ACCESS_KEY} $${MINIO_SECRET_KEY} + + BUCKETS=(private public test importexportbucket) + $${MC_PATH} mb "$${BUCKETS[@]/#/$${MINIO_ALIAS}/}" + + for BUCKET in "$${BUCKETS[@]}" + do + if [ $${BUCKET} == $${MINIO_ALIAS}/private ] + then + FULL_PATH=$${MINIO_ALIAS}/$${BUCKET}/sub + else + FULL_PATH=$${MINIO_ALIAS}/$${BUCKET} + fi + $${MC_PATH} cp --recursive /mnt/mounted_file_share/ /mnt/images_with_manifest $${FULL_PATH}/ + for i in 1 2 + do + $${MC_PATH} cp /mnt/images_with_manifest/manifest.jsonl 
$${FULL_PATH}/images_with_manifest/manifest_$${i}.jsonl + done + done + $${MC_PATH} policy set public $${MINIO_ALIAS}/public + + $${MC_PATH} mb "$${MINIO_ALIAS}/backingcs" diff --git a/tests/python/rest_api/_test_base.py b/tests/python/rest_api/_test_base.py index c9f1093b51ed..d0fd260062c4 100644 --- a/tests/python/rest_api/_test_base.py +++ b/tests/python/rest_api/_test_base.py @@ -1,8 +1,7 @@ import io import math import os -from collections.abc import Generator, Mapping, Sequence -from contextlib import closing +from collections.abc import Mapping, Sequence from functools import partial from pathlib import Path from typing import IO @@ -15,6 +14,7 @@ import shared.utils.s3 as s3 from rest_api.utils import calc_end_frame, create_task, iter_exclude, unique +from shared.fixtures.init import container_exec_cvat from shared.tasks.enums import SourceDataType from shared.tasks.interface import ITaskSpec from shared.tasks.types import ImagesTaskSpec, VideoTaskSpec @@ -49,7 +49,7 @@ def _image_task_fxt_base( cloud_storage_id: int | None = None, job_replication: int | None = None, **data_kwargs, - ) -> Generator[tuple[ImagesTaskSpec, int], None, None]: + ) -> tuple[ImagesTaskSpec, int]: task_params = { "name": f"{request.node.name}[{request.fixturename}]", "labels": [{"name": "a"}], @@ -113,25 +113,26 @@ def get_related_files(i: int) -> Mapping[str, bytes]: return {os.path.relpath(f.name, common_prefix): f.getvalue() for f in frame_ri} task_id, _ = create_task(self._USERNAME, spec=task_params, data=data_params) - yield ImagesTaskSpec( - models.TaskWriteRequest._from_openapi_data(**task_params), - models.DataRequest._from_openapi_data(**data_params), - get_frame=get_frame, - get_related_files=get_related_files if related_files else None, - size=resulting_task_size, - ), task_id + return ( + ImagesTaskSpec( + models.TaskWriteRequest._from_openapi_data(**task_params), + models.DataRequest._from_openapi_data(**data_params), + get_frame=get_frame, + 
get_related_files=get_related_files if related_files else None, + size=resulting_task_size, + ), + task_id, + ) @pytest.fixture(scope="class") - def fxt_uploaded_images_task( - self, request: pytest.FixtureRequest - ) -> Generator[tuple[ITaskSpec, int], None, None]: - yield from self._image_task_fxt_base(request=request) + def fxt_uploaded_images_task(self, request: pytest.FixtureRequest) -> tuple[ITaskSpec, int]: + return self._image_task_fxt_base(request=request) @pytest.fixture(scope="class") def fxt_uploaded_images_task_with_segments( self, request: pytest.FixtureRequest - ) -> Generator[tuple[ITaskSpec, int], None, None]: - yield from self._image_task_fxt_base(request=request, segment_size=4) + ) -> tuple[ITaskSpec, int]: + return self._image_task_fxt_base(request=request, segment_size=4) @fixture(scope="class") @parametrize("step", [2, 5]) @@ -139,8 +140,8 @@ def fxt_uploaded_images_task_with_segments( @parametrize("start_frame", [3, 7]) def fxt_uploaded_images_task_with_segments_start_stop_step( self, request: pytest.FixtureRequest, start_frame: int, stop_frame: int | None, step: int - ) -> Generator[tuple[ITaskSpec, int], None, None]: - yield from self._image_task_fxt_base( + ) -> tuple[ITaskSpec, int]: + return self._image_task_fxt_base( request=request, frame_count=30, segment_size=4, @@ -152,8 +153,8 @@ def fxt_uploaded_images_task_with_segments_start_stop_step( @pytest.fixture(scope="class") def fxt_uploaded_images_task_with_segments_and_consensus( self, request: pytest.FixtureRequest - ) -> Generator[tuple[ITaskSpec, int], None, None]: - yield from self._image_task_fxt_base(request=request, segment_size=4, job_replication=2) + ) -> tuple[ITaskSpec, int]: + return self._image_task_fxt_base(request=request, segment_size=4, job_replication=2) def _image_task_with_honeypots_and_segments_base( self, @@ -166,7 +167,7 @@ def _image_task_with_honeypots_and_segments_base( server_files: Sequence[str] | None = None, cloud_storage_id: int | None = None, **kwargs, 
- ) -> Generator[tuple[ITaskSpec, int], None, None]: + ) -> tuple[ITaskSpec, int]: validation_params = models.DataRequestValidationParams._from_openapi_data( mode="gt_pool", frame_selection_method="random_uniform", @@ -195,102 +196,97 @@ def _image_task_with_honeypots_and_segments_base( else: image_files = generate_image_files(total_frame_count) - with closing( - self._image_task_fxt_base( - request=request, - frame_count=None, - image_files=image_files, - segment_size=base_segment_size, - sorting_method="random", - start_frame=start_frame, - step=step, - validation_params=validation_params, - server_files=server_files, - cloud_storage_id=cloud_storage_id, - **kwargs, - ) - ) as task_gen: - for task_spec, task_id in task_gen: - # Get the actual frame order after the task is created - with make_api_client(self._USERNAME) as api_client: - task_meta, _ = api_client.tasks_api.retrieve_data_meta(task_id) - frame_map = [ - next(i for i, f in enumerate(image_files) if f.name == frame_info.name) - for frame_info in task_meta.frames - ] - - _get_frame = task_spec._get_frame - task_spec._get_frame = lambda i: _get_frame(frame_map[i]) - - task_spec.size = final_task_size - task_spec._params.segment_size = final_segment_size - - # These parameters are not applicable to the resulting task, - # they are only effective during task creation - if start_frame or step: - task_spec._data_params.start_frame = 0 - task_spec._data_params.stop_frame = task_spec.size - task_spec._data_params.frame_filter = "" - - yield task_spec, task_id + task_spec, task_id = self._image_task_fxt_base( + request=request, + frame_count=None, + image_files=image_files, + segment_size=base_segment_size, + sorting_method="random", + start_frame=start_frame, + step=step, + validation_params=validation_params, + server_files=server_files, + cloud_storage_id=cloud_storage_id, + **kwargs, + ) + + # Get the actual frame order after the task is created + with make_api_client(self._USERNAME) as api_client: + 
task_meta, _ = api_client.tasks_api.retrieve_data_meta(task_id) + frame_map = [ + next(i for i, f in enumerate(image_files) if f.name == frame_info.name) + for frame_info in task_meta.frames + ] + + _get_frame = task_spec._get_frame + task_spec._get_frame = lambda i: _get_frame(frame_map[i]) + + task_spec.size = final_task_size + task_spec._params.segment_size = final_segment_size + + # These parameters are not applicable to the resulting task, + # they are only effective during task creation + if start_frame or step: + task_spec._data_params.start_frame = 0 + task_spec._data_params.stop_frame = task_spec.size + task_spec._data_params.frame_filter = "" + + return task_spec, task_id @fixture(scope="class") def fxt_uploaded_images_task_with_honeypots_and_segments( self, request: pytest.FixtureRequest - ) -> Generator[tuple[ITaskSpec, int], None, None]: - yield from self._image_task_with_honeypots_and_segments_base(request) + ) -> tuple[ITaskSpec, int]: + return self._image_task_with_honeypots_and_segments_base(request) @fixture(scope="class") @parametrize("start_frame, step", [(2, 3)]) def fxt_uploaded_images_task_with_honeypots_and_segments_start_step( self, request: pytest.FixtureRequest, start_frame: int | None, step: int | None - ) -> Generator[tuple[ITaskSpec, int], None, None]: - yield from self._image_task_with_honeypots_and_segments_base( + ) -> tuple[ITaskSpec, int]: + return self._image_task_with_honeypots_and_segments_base( request, start_frame=start_frame, step=step ) def _images_task_with_honeypots_and_changed_real_frames_base( self, request: pytest.FixtureRequest, **kwargs ): - with closing( - self._image_task_with_honeypots_and_segments_base( - request, start_frame=2, step=3, **kwargs + task_spec, task_id = self._image_task_with_honeypots_and_segments_base( + request, start_frame=2, step=3, **kwargs + ) + + with make_api_client(self._USERNAME) as api_client: + validation_layout, _ = api_client.tasks_api.retrieve_validation_layout(task_id) + 
validation_frames = validation_layout.validation_frames + + new_honeypot_real_frames = [ + validation_frames[(validation_frames.index(f) + 1) % len(validation_frames)] + for f in validation_layout.honeypot_real_frames + ] + api_client.tasks_api.partial_update_validation_layout( + task_id, + patched_task_validation_layout_write_request=( + models.PatchedTaskValidationLayoutWriteRequest( + frame_selection_method="manual", + honeypot_real_frames=new_honeypot_real_frames, + ) + ), ) - ) as gen_iter: - task_spec, task_id = next(gen_iter) - - with make_api_client(self._USERNAME) as api_client: - validation_layout, _ = api_client.tasks_api.retrieve_validation_layout(task_id) - validation_frames = validation_layout.validation_frames - - new_honeypot_real_frames = [ - validation_frames[(validation_frames.index(f) + 1) % len(validation_frames)] - for f in validation_layout.honeypot_real_frames - ] - api_client.tasks_api.partial_update_validation_layout( - task_id, - patched_task_validation_layout_write_request=( - models.PatchedTaskValidationLayoutWriteRequest( - frame_selection_method="manual", - honeypot_real_frames=new_honeypot_real_frames, - ) - ), - ) - # Get the new frame order - frame_map = dict(zip(validation_layout.honeypot_frames, new_honeypot_real_frames)) + # Get the new frame order + frame_map = dict(zip(validation_layout.honeypot_frames, new_honeypot_real_frames)) - _get_frame = task_spec._get_frame - task_spec._get_frame = lambda i: _get_frame(frame_map.get(i, i)) + _get_frame = task_spec._get_frame + task_spec._get_frame = lambda i: _get_frame(frame_map.get(i, i)) - yield task_spec, task_id + return task_spec, task_id @fixture(scope="class") @parametrize("random_seed", [1, 2, 5]) def fxt_uploaded_images_task_with_honeypots_and_changed_real_frames( self, request: pytest.FixtureRequest, random_seed: int - ) -> Generator[tuple[ITaskSpec, int], None, None]: - yield from self._images_task_with_honeypots_and_changed_real_frames_base( + ) -> tuple[ITaskSpec, int]: + 
return self._images_task_with_honeypots_and_changed_real_frames_base( request, random_seed=random_seed ) @@ -301,7 +297,7 @@ def fxt_uploaded_images_task_with_honeypots_and_changed_real_frames( ) def fxt_cloud_images_task_with_honeypots_and_changed_real_frames( self, request: pytest.FixtureRequest, cloud_storages, cloud_storage_id: int - ) -> Generator[tuple[ITaskSpec, int], None, None]: + ) -> tuple[ITaskSpec, int]: cloud_storage = cloud_storages[cloud_storage_id] s3_client = s3.make_client(bucket=cloud_storage["resource"]) @@ -319,7 +315,7 @@ def fxt_cloud_images_task_with_honeypots_and_changed_real_frames( for image in image_files: image.seek(0) - yield from self._images_task_with_honeypots_and_changed_real_frames_base( + return self._images_task_with_honeypots_and_changed_real_frames_base( request, image_files=image_files, server_files=server_files, @@ -338,7 +334,7 @@ def _uploaded_images_task_with_gt_and_segments_base( step: int | None = None, frame_selection_method: str = "random_uniform", job_replication: int | None = None, - ) -> Generator[tuple[ITaskSpec, int], None, None]: + ) -> tuple[ITaskSpec, int]: used_frames_count = 16 total_frame_count = (start_frame or 0) + used_frames_count * (step or 1) segment_size = 5 @@ -378,7 +374,7 @@ def _uploaded_images_task_with_gt_and_segments_base( **validation_params_kwargs, ) - yield from self._image_task_fxt_base( + return self._image_task_fxt_base( request=request, frame_count=None, image_files=image_files, @@ -393,8 +389,8 @@ def _uploaded_images_task_with_gt_and_segments_base( @pytest.fixture(scope="class") def fxt_uploaded_images_task_with_gt_and_segments_and_consensus( self, request: pytest.FixtureRequest - ) -> Generator[tuple[ITaskSpec, int], None, None]: - yield from self._uploaded_images_task_with_gt_and_segments_base( + ) -> tuple[ITaskSpec, int]: + return self._uploaded_images_task_with_gt_and_segments_base( request=request, job_replication=2 ) @@ -405,7 +401,7 @@ def 
fxt_uploaded_images_task_with_gt_and_segments_and_consensus( ) def fxt_cloud_images_task_with_related_images( self, request: pytest.FixtureRequest, cloud_storages, cloud_storage_id: int - ) -> Generator[tuple[ITaskSpec, int], None, None]: + ) -> tuple[ITaskSpec, int]: cloud_storage = cloud_storages[cloud_storage_id] s3_client = s3.make_client(bucket=cloud_storage["resource"]) @@ -442,7 +438,7 @@ def _upload_file(file: io.RawIOBase): for image in image_files: image.seek(0) - yield from self._image_task_fxt_base( + return self._image_task_fxt_base( request, image_files=image_files, related_files=dict(enumerate(related_files)), @@ -457,7 +453,7 @@ def _upload_file(file: io.RawIOBase): ) def fxt_cloud_pcd_task_with_related_images( self, request: pytest.FixtureRequest, cloud_storages, cloud_storage_id: int - ) -> Generator[tuple[ITaskSpec, int], None, None]: + ) -> tuple[ITaskSpec, int]: cloud_storage = cloud_storages[cloud_storage_id] s3_client = s3.make_client(bucket=cloud_storage["resource"]) @@ -490,7 +486,7 @@ def fxt_cloud_pcd_task_with_related_images( ri_file.name = os.path.basename(filename) related_files.append([ri_file]) - yield from self._image_task_fxt_base( + return self._image_task_fxt_base( request, image_files=pcd_files, related_files=dict(enumerate(related_files)), @@ -498,11 +494,11 @@ def fxt_cloud_pcd_task_with_related_images( cloud_storage_id=cloud_storage_id, ) - @fixture(scope="class") - def fxt_share_images_task_with_related_images( + def _share_images_task_with_related_images( self, request: pytest.FixtureRequest, - ) -> Generator[tuple[ITaskSpec, int], None, None]: + **data_kwargs, + ) -> tuple[ITaskSpec, int]: image_files = [ read_share_file(fn) for fn in [ @@ -537,18 +533,41 @@ def fxt_share_images_task_with_related_images( f.name for fs in related_files.values() for f in fs ] - yield from self._image_task_fxt_base( + return self._image_task_fxt_base( request, image_files=image_files, related_files=related_files, server_files=server_files, + 
**data_kwargs, ) + @fixture(scope="class") + def fxt_share_images_task_with_related_images( + self, request: pytest.FixtureRequest + ) -> tuple[ITaskSpec, int]: + return self._share_images_task_with_related_images(request) + + @fixture(scope="class") + @parametrize( + "cloud_storage_id", + [pytest.param(5, marks=[pytest.mark.with_external_services])], + ) + def fxt_backing_cs_images_task_with_related_images( + self, request: pytest.FixtureRequest, cloud_storage_id: int + ) -> tuple[ITaskSpec, int]: + # The simplest way to get a task with local storage and subdirectories is to use the share + # with copy_data. + task_spec, task_id = self._share_images_task_with_related_images(request, copy_data=True) + container_exec_cvat( + request, ["./manage.py", "movetasktobackingcs", str(task_id), str(cloud_storage_id)] + ) + return task_spec, task_id + @fixture(scope="class") def fxt_share_pcd_task_with_related_images( self, request: pytest.FixtureRequest, - ) -> Generator[tuple[ITaskSpec, int], None, None]: + ) -> tuple[ITaskSpec, int]: pcd_files = [ read_share_file(fn) for fn in [ @@ -578,7 +597,7 @@ def fxt_share_pcd_task_with_related_images( f.name for fs in related_files.values() for f in fs ] - yield from self._image_task_fxt_base( + return self._image_task_fxt_base( request, image_files=pcd_files, related_files=related_files, @@ -594,8 +613,8 @@ def fxt_uploaded_images_task_with_gt_and_segments_start_step( start_frame: int | None, step: int | None, frame_selection_method: str, - ) -> Generator[tuple[ITaskSpec, int], None, None]: - yield from self._uploaded_images_task_with_gt_and_segments_base( + ) -> tuple[ITaskSpec, int]: + return self._uploaded_images_task_with_gt_and_segments_base( request, start_frame=start_frame, step=step, @@ -613,7 +632,7 @@ def _uploaded_video_task_fxt_base( step: int | None = None, video_file: IO[bytes] | None = None, chapters: Sequence[dict] | None = None, - ) -> Generator[tuple[VideoTaskSpec, int], None, None]: + ) -> tuple[VideoTaskSpec, 
int]: task_params = { "name": f"{request.node.name}[{request.fixturename}]", "labels": [{"name": "a"}], @@ -663,36 +682,39 @@ def get_video_file() -> io.BytesIO: return io.BytesIO(video_data) task_id, _ = create_task(self._USERNAME, spec=task_params, data=data_params) - yield VideoTaskSpec( - models.TaskWriteRequest._from_openapi_data(**task_params), - models.DataRequest._from_openapi_data(**data_params), - get_video_file=get_video_file, - size=resulting_task_size, - chapters=chapters, - ), task_id + return ( + VideoTaskSpec( + models.TaskWriteRequest._from_openapi_data(**task_params), + models.DataRequest._from_openapi_data(**data_params), + get_video_file=get_video_file, + size=resulting_task_size, + chapters=chapters, + ), + task_id, + ) @pytest.fixture(scope="class") def fxt_uploaded_video_task( self, request: pytest.FixtureRequest, - ) -> Generator[tuple[ITaskSpec, int], None, None]: - yield from self._uploaded_video_task_fxt_base(request=request) + ) -> tuple[ITaskSpec, int]: + return self._uploaded_video_task_fxt_base(request=request) @pytest.fixture(scope="class") def fxt_uploaded_video_task_without_manifest( self, request: pytest.FixtureRequest, - ) -> Generator[tuple[ITaskSpec, int], None, None]: + ) -> tuple[ITaskSpec, int]: video_file = generate_video_file(num_frames=10, invalid_keyframes=True) - yield from self._uploaded_video_task_fxt_base( + return self._uploaded_video_task_fxt_base( request=request, video_file=video_file, chapters=[] ) @pytest.fixture(scope="class") def fxt_uploaded_video_task_with_segments( self, request: pytest.FixtureRequest - ) -> Generator[tuple[ITaskSpec, int], None, None]: - yield from self._uploaded_video_task_fxt_base(request=request, segment_size=4) + ) -> tuple[ITaskSpec, int]: + return self._uploaded_video_task_fxt_base(request=request, segment_size=4) @fixture(scope="class") @parametrize("step", [2, 5]) @@ -700,8 +722,8 @@ def fxt_uploaded_video_task_with_segments( @parametrize("start_frame", [3, 7]) def 
fxt_uploaded_video_task_with_segments_start_stop_step( self, request: pytest.FixtureRequest, start_frame: int, stop_frame: int | None, step: int - ) -> Generator[tuple[ITaskSpec, int], None, None]: - yield from self._uploaded_video_task_fxt_base( + ) -> tuple[ITaskSpec, int]: + return self._uploaded_video_task_fxt_base( request=request, frame_count=30, segment_size=4, @@ -804,6 +826,7 @@ def _get_job_abs_frame_set(self, job_meta: models.DataMetaRead) -> Sequence[int] fixture_ref("fxt_cloud_pcd_task_with_related_images"), fixture_ref("fxt_share_images_task_with_related_images"), fixture_ref("fxt_share_pcd_task_with_related_images"), + fixture_ref("fxt_backing_cs_images_task_with_related_images"), ] _3d_task_cases = [ diff --git a/tests/python/rest_api/test_tasks.py b/tests/python/rest_api/test_tasks.py index 2103b50db317..dd4fc93a61af 100644 --- a/tests/python/rest_api/test_tasks.py +++ b/tests/python/rest_api/test_tasks.py @@ -40,7 +40,7 @@ from pytest_cases import fixture, fixture_ref, parametrize import shared.utils.s3 as s3 -from rest_api._test_base import TestTasksBase +from rest_api._test_base import SHARE_DIR, TestTasksBase from rest_api.utils import ( DATUMARO_FORMAT_FOR_DIMENSION, CollectionSimpleFilterTestBase, @@ -1588,6 +1588,50 @@ def _test_can_restore_task_from_backup(self, task_id: int, lightweight_backup: b ) +@pytest.mark.with_external_services +@pytest.mark.usefixtures("restore_db_per_function") +@pytest.mark.usefixtures("restore_cvat_data_per_function") +@pytest.mark.usefixtures("restore_redis_inmem_per_function") +class TestTaskBackupsWithBackingCs: + @pytest.fixture(autouse=True) + def setup(self, tmp_path: Path, admin_user: str) -> None: + self.tmp_dir = tmp_path + self.user = admin_user + + def test_can_export_and_import_backup(self, request, tasks, cloud_storages): + # task_id = next(task["id"] for task in tasks if task["dimension"] == "3d") + cloud_storage_id = next(cs["id"] for cs in cloud_storages if cs["resource"] == "backingcs") + + with 
make_sdk_client(self.user) as client: + task = client.tasks.create_from_data( + models.TaskWriteRequest(name="Canvas3D"), [SHARE_DIR / "test_canvas3d.zip"] + ) + + container_exec_cvat( + request, ["./manage.py", "movetasktobackingcs", str(task.id), str(cloud_storage_id)] + ) + + backup_path = self.tmp_dir / "backup.zip" + task.download_backup(backup_path) + + with zipfile.ZipFile(backup_path) as zip: + names = zip.namelist() + + assert any(name.endswith(".pcd") for name in names) + assert any(name.endswith(".png") for name in names) + + new_task = client.tasks.create_from_backup(backup_path) + + assert ( + DeepDiff( + json.loads(task.api.retrieve_data_meta(task.id)[1].data), + json.loads(new_task.api.retrieve_data_meta(new_task.id)[1].data), + exclude_regex_paths=[r"root\['chunks_updated_date'\]"], + ) + == {} + ) + + @pytest.mark.usefixtures("restore_db_per_function") class TestWorkWithSimpleGtJobTasks: @fixture diff --git a/tests/python/shared/assets/cloudstorages.json b/tests/python/shared/assets/cloudstorages.json index 661b2cd0622c..e1972964d508 100644 --- a/tests/python/shared/assets/cloudstorages.json +++ b/tests/python/shared/assets/cloudstorages.json @@ -3,6 +3,26 @@ "next": null, "previous": null, "results": [ + { + "created_date": "2026-02-11T16:29:48.000000Z", + "credentials_type": "KEY_SECRET_KEY_PAIR", + "description": "Bucket to be used as backing cloud storage for tasks", + "display_name": "Backing CS", + "id": 5, + "manifests": [], + "organization": null, + "owner": { + "first_name": "Admin", + "id": 1, + "last_name": "First", + "url": "http://localhost:8080/api/users/1", + "username": "admin1" + }, + "provider_type": "AWS_S3_BUCKET", + "resource": "backingcs", + "specific_attributes": "endpoint_url=http%3A%2F%2Fminio%3A9000", + "updated_date": "2026-02-11T16:29:48.000000Z" + }, { "created_date": "2022-06-29T12:56:18.257000Z", "credentials_type": "KEY_SECRET_KEY_PAIR", diff --git a/tests/python/shared/assets/cvat_db/data.json 
b/tests/python/shared/assets/cvat_db/data.json index c90268e49c5c..32c270d884ab 100644 --- a/tests/python/shared/assets/cvat_db/data.json +++ b/tests/python/shared/assets/cvat_db/data.json @@ -16872,6 +16872,25 @@ "organization": 2 } }, +{ + "model": "engine.cloudstorage", + "pk": 5, + "fields": { + "created_date": "2026-02-11T16:29:48.000Z", + "updated_date": "2026-02-11T16:29:48.000Z", + "provider_type": "AWS_S3_BUCKET", + "resource": "backingcs", + "display_name": "Backing CS", + "owner": [ + "admin1" + ], + "credentials": "minio_access_key minio_secret_key", + "credentials_type": "KEY_SECRET_KEY_PAIR", + "specific_attributes": "endpoint_url=http%3A%2F%2Fminio%3A9000", + "description": "Bucket to be used as backing cloud storage for tasks", + "organization": null + } +}, { "model": "engine.storage", "pk": 1,