Skip to content
1 change: 1 addition & 0 deletions cvat/apps/engine/admin.py
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,7 @@ class DataAdmin(admin.ModelAdmin):
"frame_filter",
"compressed_chunk_type",
"original_chunk_type",
"local_storage_backing_cs",
)
readonly_fields = fields
autocomplete_fields = ("cloud_storage",)
Expand Down
163 changes: 85 additions & 78 deletions cvat/apps/engine/backup.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,6 @@
)
from cvat.apps.engine import models
from cvat.apps.engine.cache import MediaCache
from cvat.apps.engine.cloud_provider import db_storage_to_storage_instance
from cvat.apps.engine.log import ServerLogManager
from cvat.apps.engine.models import DataChoice, StorageChoice
from cvat.apps.engine.serializers import (
Expand Down Expand Up @@ -477,17 +476,95 @@ def _write_filtered_media_manifest(self, zip_object: ZipFile, target_dir: str) -

self._manifest_was_filtered = True

def _write_data_from_cloud_storage(self, zip_object: ZipFile, target_dir: str) -> None:
    """Archive this task's media into *zip_object* when the data is cloud-backed.

    Files already present in the local upload directory are copied from disk;
    anything missing locally is fetched first — related (context) files via a
    bulk cloud-storage download, regular frames through MediaCache — and then
    written under the archive's data directory.
    """
    # Cloud-backed data is image-based only; a video task would have a `video` relation.
    assert not hasattr(self._db_data, "video"), "Only images can be stored in cloud storage"

    target_data_dir = os.path.join(target_dir, self.DATA_DIRNAME)
    data_dir = self._db_data.get_upload_dirname()

    # Write a filtered copy of the media manifest before any media files.
    self._write_filtered_media_manifest(zip_object=zip_object, target_dir=target_dir)

    # Absolute paths of files that already exist on local disk.
    files_for_local_copy = []

    # Partition related (context) files: locally cached vs. needing download.
    media_files_to_download: list[PurePath] = []
    for media_file in self._db_data.related_files.all():
        media_path = PurePath(media_file.path)

        local_path = os.path.join(data_dir, media_path)
        if os.path.exists(local_path):
            files_for_local_copy.append(local_path)
        else:
            media_files_to_download.append(media_path)

    # Partition regular frames the same way; keep frame ids (for the cache API)
    # and frame names (used below to recover the cache's output directory).
    frame_ids_to_download = []
    frame_names_to_download = []
    for media_file in self._db_data.images.all():
        media_path = media_file.path

        local_path = os.path.join(data_dir, media_path)
        if os.path.exists(local_path):
            files_for_local_copy.append(local_path)
        else:
            frame_ids_to_download.append(media_file.frame)
            frame_names_to_download.append(media_file.path)

    if media_files_to_download:
        storage_client = self._db_data.get_cloud_storage_instance()
        # Download into a temp dir that lives only long enough to copy into the zip.
        with tempfile.TemporaryDirectory() as tmp_dir:
            storage_client.bulk_download_to_dir(
                files=media_files_to_download, upload_dir=Path(tmp_dir)
            )

            self._write_files(
                source_dir=tmp_dir,
                zip_object=zip_object,
                files=[os.path.join(tmp_dir, file) for file in media_files_to_download],
                target_dir=target_data_dir,
            )

    if frame_ids_to_download:
        media_cache = MediaCache()
        with closing(
            media_cache.read_raw_images(
                self._db_task, frame_ids=frame_ids_to_download, decode=False
            )
        ) as frame_iter:
            # Avoid closing the frame iter before the files are copied
            downloaded_paths = []
            for _ in frame_ids_to_download:
                # Each item appears to be a (frame, path) pair; we keep the path.
                # NOTE(review): assumes the iterator yields frames in the same
                # order as frame_ids_to_download — confirm against MediaCache.
                downloaded_paths.append(next(frame_iter)[1])

            # Recover the cache's temp directory by stripping the first frame's
            # relative name from its absolute path.
            # NOTE(review): assumes downloaded_paths holds str (removesuffix),
            # and that every path ends with the corresponding frame name — verify.
            tmp_dir = downloaded_paths[0].removesuffix(frame_names_to_download[0])

            self._write_files(
                source_dir=tmp_dir,
                zip_object=zip_object,
                files=downloaded_paths,
                target_dir=target_data_dir,
            )

    # Finally, archive everything that was already available on local disk.
    self._write_files(
        source_dir=data_dir,
        zip_object=zip_object,
        files=files_for_local_copy,
        target_dir=target_data_dir,
    )

def _write_data(self, zip_object: ZipFile, target_dir: str) -> None:
target_data_dir = os.path.join(target_dir, self.DATA_DIRNAME)

if self._db_data.storage == StorageChoice.LOCAL:
data_dir = self._db_data.get_upload_dirname()
self._write_directory(
source_dir=data_dir,
zip_object=zip_object,
target_dir=target_data_dir,
exclude_files=[self.MEDIA_MANIFEST_INDEX_FILENAME],
)
if self._db_data.local_storage_backing_cs:
self._write_data_from_cloud_storage(zip_object, target_dir)
else:
self._write_directory(
source_dir=data_dir,
zip_object=zip_object,
target_dir=target_data_dir,
exclude_files=[self.MEDIA_MANIFEST_INDEX_FILENAME],
)

elif self._db_data.storage == StorageChoice.SHARE:
data_dir = settings.SHARE_ROOT
if hasattr(self._db_data, "video"):
Expand All @@ -505,8 +582,6 @@ def _write_data(self, zip_object: ZipFile, target_dir: str) -> None:
self._write_filtered_media_manifest(zip_object=zip_object, target_dir=target_dir)

elif self._db_data.storage == StorageChoice.CLOUD_STORAGE:
assert not hasattr(self._db_data, "video"), "Only images can be stored in cloud storage"

data_dir = self._db_data.get_upload_dirname()

if self._lightweight:
Expand All @@ -517,75 +592,7 @@ def _write_data(self, zip_object: ZipFile, target_dir: str) -> None:
target_dir=target_data_dir,
)
else:
self._write_filtered_media_manifest(zip_object=zip_object, target_dir=target_dir)

files_for_local_copy = []

media_files_to_download: list[PurePath] = []
for media_file in self._db_data.related_files.all():
media_path = PurePath(media_file.path)

local_path = os.path.join(data_dir, media_path)
if os.path.exists(local_path):
files_for_local_copy.append(local_path)
else:
media_files_to_download.append(media_path)

frame_ids_to_download = []
frame_names_to_download = []
for media_file in self._db_data.images.all():
media_path = media_file.path

local_path = os.path.join(data_dir, media_path)
if os.path.exists(local_path):
files_for_local_copy.append(local_path)
else:
frame_ids_to_download.append(media_file.frame)
frame_names_to_download.append(media_file.path)

if media_files_to_download:
storage_client = db_storage_to_storage_instance(self._db_data.cloud_storage)
with tempfile.TemporaryDirectory() as tmp_dir:
storage_client.bulk_download_to_dir(
files=media_files_to_download, upload_dir=Path(tmp_dir)
)

self._write_files(
source_dir=tmp_dir,
zip_object=zip_object,
files=[os.path.join(tmp_dir, file) for file in media_files_to_download],
target_dir=target_data_dir,
)

if frame_ids_to_download:
media_cache = MediaCache()
with closing(
iter(
media_cache.read_raw_images(
self._db_task, frame_ids=frame_ids_to_download, decode=False
)
)
) as frame_iter:
# Avoid closing the frame iter before the files are copied
downloaded_paths = []
for _ in frame_ids_to_download:
downloaded_paths.append(next(frame_iter)[1])

tmp_dir = downloaded_paths[0].removesuffix(frame_names_to_download[0])

self._write_files(
source_dir=tmp_dir,
zip_object=zip_object,
files=downloaded_paths,
target_dir=target_data_dir,
)

self._write_files(
source_dir=data_dir,
zip_object=zip_object,
files=files_for_local_copy,
target_dir=target_data_dir,
)
self._write_data_from_cloud_storage(zip_object, target_dir)
else:
raise NotImplementedError

Expand Down
17 changes: 4 additions & 13 deletions cvat/apps/engine/cache.py
Original file line number Diff line number Diff line change
Expand Up @@ -593,14 +593,10 @@ def read_raw_images(
db_data = db_task.require_data()
manifest_path = db_data.get_manifest_path()

if os.path.isfile(manifest_path) and db_data.storage == models.StorageChoice.CLOUD_STORAGE:
if storage_client := db_data.get_cloud_storage_instance():
assert manifest_path.is_file()
reader = ImageReaderWithManifest(manifest_path)
with ExitStack() as es:
db_cloud_storage = db_data.cloud_storage
if not db_cloud_storage:
raise CloudStorageMissingError("Task is no longer connected to cloud storage")
storage_client = db_storage_to_storage_instance(db_cloud_storage)

tmp_dir = Path(es.enter_context(tempfile.TemporaryDirectory(prefix="cvat")))
# (storage filename, output filename)
files_to_download: list[tuple[str, PurePath]] = []
Expand All @@ -621,7 +617,7 @@ def read_raw_images(
files_to_download, checksums, media
):
if checksum and not md5_hash(media_item[1]) == checksum:
slogger.cloud_storage[db_cloud_storage.id].warning(
slogger.task[db_task.id].warning(
"Hash sums of files {} do not match".format(media_item[1])
)

Expand Down Expand Up @@ -710,12 +706,7 @@ def _validate_ri_path(path: str) -> PurePath:
for frame_id, frame_ris in groupby(db_related_files, key=lambda v: v[0])
]

if db_data.storage == models.StorageChoice.CLOUD_STORAGE:
db_cloud_storage = db_data.cloud_storage
if not db_cloud_storage:
raise CloudStorageMissingError("Task is no longer connected to cloud storage")
storage_client = db_storage_to_storage_instance(db_cloud_storage)

if storage_client := db_data.get_cloud_storage_instance():
tmp_dir = Path(es.enter_context(tempfile.TemporaryDirectory(prefix="cvat")))
files_to_download: list[PurePath] = []
for _, frame_media in media:
Expand Down
Loading
Loading