diff --git a/mapillary_tools/exceptions.py b/mapillary_tools/exceptions.py index 45d1491b9..10c8b7b48 100644 --- a/mapillary_tools/exceptions.py +++ b/mapillary_tools/exceptions.py @@ -1,3 +1,5 @@ +from __future__ import annotations + import typing as T @@ -5,6 +7,18 @@ class MapillaryUserError(Exception): exit_code: int +class MapillaryProcessError(MapillaryUserError): + """ + Base exception for process specific errors + """ + + exit_code = 6 + + +class MapillaryDescriptionError(Exception): + pass + + class MapillaryBadParameterError(MapillaryUserError): exit_code = 2 @@ -17,44 +31,35 @@ class MapillaryInvalidDescriptionFile(MapillaryUserError): exit_code = 4 -class MapillaryUnknownFileTypeError(MapillaryUserError): - exit_code = 5 - - -class MapillaryProcessError(MapillaryUserError): - exit_code = 6 - - class MapillaryVideoError(MapillaryUserError): exit_code = 7 class MapillaryFFmpegNotFoundError(MapillaryUserError): exit_code = 8 - help = "https://github.com/mapillary/mapillary_tools#video-support" class MapillaryExiftoolNotFoundError(MapillaryUserError): exit_code = 8 -class MapillaryDescriptionError(Exception): +class MapillaryGeoTaggingError(MapillaryDescriptionError): pass -class MapillaryGeoTaggingError(MapillaryDescriptionError): +class MapillaryVideoGPSNotFoundError(MapillaryDescriptionError): pass -class MapillaryGPXEmptyError(MapillaryDescriptionError, MapillaryUserError): - exit_code = 9 +class MapillaryGPXEmptyError(MapillaryDescriptionError): + pass -class MapillaryVideoGPSNotFoundError(MapillaryDescriptionError, MapillaryUserError): - exit_code = 9 +class MapillaryGPSNoiseError(MapillaryDescriptionError): + pass -class MapillaryGPSNoiseError(MapillaryDescriptionError): +class MapillaryStationaryVideoError(MapillaryDescriptionError): pass @@ -68,21 +73,13 @@ def __init__( self.gpx_end_time = gpx_end_time -class MapillaryStationaryVideoError(MapillaryDescriptionError, MapillaryUserError): - exit_code = 10 - - -class MapillaryInvalidBlackVueVideoError(MapillaryDescriptionError, MapillaryUserError): - exit_code = 11 - - class MapillaryDuplicationError(MapillaryDescriptionError): def __init__( self, message: str, desc: T.Mapping[str, T.Any], distance: float, - angle_diff: T.Optional[float], + angle_diff: float | None, ) -> None: super().__init__(message) self.desc = desc @@ -90,17 +87,19 @@ def __init__( self.angle_diff = angle_diff -class MapillaryUploadedAlreadyError(MapillaryDescriptionError): - def __init__( - self, - message: str, - desc: T.Mapping[str, T.Any], - ) -> None: - super().__init__(message) - self.desc = desc +class MapillaryEXIFNotFoundError(MapillaryDescriptionError): + pass -class MapillaryEXIFNotFoundError(MapillaryDescriptionError): +class MapillaryFileTooLargeError(MapillaryDescriptionError): + pass + + +class MapillaryCaptureSpeedTooFastError(MapillaryDescriptionError): + pass + + +class MapillaryNullIslandError(MapillaryDescriptionError): pass @@ -116,17 +115,5 @@ class MapillaryUploadUnauthorizedError(MapillaryUserError): exit_code = 14 -class MapillaryMetadataValidationError(MapillaryUserError, MapillaryDescriptionError): +class MapillaryMetadataValidationError(MapillaryUserError): exit_code = 15 - - -class MapillaryFileTooLargeError(MapillaryDescriptionError): - pass - - -class MapillaryCaptureSpeedTooFastError(MapillaryDescriptionError): - pass - - -class MapillaryNullIslandError(MapillaryDescriptionError): - pass diff --git a/mapillary_tools/mp4/io_utils.py b/mapillary_tools/mp4/io_utils.py index 23edc50d4..618e26096 100644 --- 
a/mapillary_tools/mp4/io_utils.py +++ b/mapillary_tools/mp4/io_utils.py @@ -3,7 +3,6 @@ class ChainedIO(io.IOBase): - # is the chained stream seekable? _streams: T.Sequence[io.IOBase] # the beginning offset of the current stream _begin_offset: int diff --git a/mapillary_tools/process_geotag_properties.py b/mapillary_tools/process_geotag_properties.py index 58436d1ca..a89f8097a 100644 --- a/mapillary_tools/process_geotag_properties.py +++ b/mapillary_tools/process_geotag_properties.py @@ -562,10 +562,7 @@ def process_finalize( # skip all exceptions skipped_process_errors = {Exception} else: - skipped_process_errors = { - exceptions.MapillaryDuplicationError, - exceptions.MapillaryUploadedAlreadyError, - } + skipped_process_errors = {exceptions.MapillaryDuplicationError} _show_stats(metadatas, skipped_process_errors=skipped_process_errors) return metadatas diff --git a/mapillary_tools/types.py b/mapillary_tools/types.py index 8b1a56b5d..dc19f50a2 100644 --- a/mapillary_tools/types.py +++ b/mapillary_tools/types.py @@ -456,11 +456,11 @@ def validate_image_desc(desc: T.Any) -> None: jsonschema.validate(instance=desc, schema=ImageDescriptionFileSchema) except jsonschema.ValidationError as ex: # do not use str(ex) which is more verbose - raise exceptions.MapillaryMetadataValidationError(ex.message) + raise exceptions.MapillaryMetadataValidationError(ex.message) from ex try: map_capture_time_to_datetime(desc["MAPCaptureTime"]) except ValueError as ex: - raise exceptions.MapillaryMetadataValidationError(str(ex)) + raise exceptions.MapillaryMetadataValidationError(str(ex)) from ex def validate_video_desc(desc: T.Any) -> None: @@ -468,7 +468,7 @@ def validate_video_desc(desc: T.Any) -> None: jsonschema.validate(instance=desc, schema=VideoDescriptionFileSchema) except jsonschema.ValidationError as ex: # do not use str(ex) which is more verbose - raise exceptions.MapillaryMetadataValidationError(ex.message) + raise exceptions.MapillaryMetadataValidationError(ex.message) from ex def datetime_to_map_capture_time(time: T.Union[datetime.datetime, int, float]) -> str: diff --git a/mapillary_tools/upload.py b/mapillary_tools/upload.py index 2536aa026..961022295 100644 --- a/mapillary_tools/upload.py +++ b/mapillary_tools/upload.py @@ -9,6 +9,7 @@ import uuid from pathlib import Path +import jsonschema import requests from tqdm import tqdm @@ -36,10 +37,8 @@ LOG = logging.getLogger(__name__) -class UploadError(Exception): - def __init__(self, inner_ex) -> None: - self.inner_ex = inner_ex - super().__init__(str(inner_ex)) +class UploadedAlreadyError(uploader.SequenceError): + pass def _load_validate_metadatas_from_desc_path( @@ -70,7 +69,7 @@ def _load_validate_metadatas_from_desc_path( except json.JSONDecodeError as ex: raise exceptions.MapillaryInvalidDescriptionFile( f"Invalid JSON stream from stdin: {ex}" - ) + ) from ex else: if not os.path.isfile(desc_path): if is_default_desc_path: @@ -87,7 +86,7 @@ def _load_validate_metadatas_from_desc_path( except json.JSONDecodeError as ex: raise exceptions.MapillaryInvalidDescriptionFile( f"Invalid JSON file {desc_path}: {ex}" - ) + ) from ex # the descs load from stdin or json file may contain invalid entries validated_descs = [ @@ -135,10 +134,16 @@ def zip_images( uploader.ZipImageSequence.zip_images(image_metadatas, zip_dir) -def _setup_cancel_due_to_duplication(emitter: uploader.EventEmitter) -> None: +def _setup_history( + emitter: uploader.EventEmitter, + upload_run_params: JSONDict, + metadatas: list[types.Metadata], +) -> None: @emitter.on("upload_start") 
- def upload_start(payload: uploader.Progress): - md5sum = payload["md5sum"] + def check_duplication(payload: uploader.Progress): + md5sum = payload.get("md5sum") + assert md5sum is not None, f"md5sum has to be set for {payload}" + if history.is_uploaded(md5sum): sequence_uuid = payload.get("sequence_uuid") if sequence_uuid is None: @@ -154,19 +159,15 @@ def upload_start(payload: uploader.Progress): sequence_uuid, history.history_desc_path(md5sum), ) - raise uploader.UploadCancelled() - + raise UploadedAlreadyError() -def _setup_write_upload_history( - emitter: uploader.EventEmitter, - params: JSONDict, - metadatas: list[types.Metadata] | None = None, -) -> None: @emitter.on("upload_finished") - def upload_finished(payload: uploader.Progress): + def write_history(payload: uploader.Progress): sequence_uuid = payload.get("sequence_uuid") - md5sum = payload["md5sum"] - if sequence_uuid is None or metadatas is None: + md5sum = payload.get("md5sum") + assert md5sum is not None, f"md5sum has to be set for {payload}" + + if sequence_uuid is None: sequence = None else: sequence = [ @@ -176,10 +177,11 @@ def upload_finished(payload: uploader.Progress): and metadata.MAPSequenceUUID == sequence_uuid ] sequence.sort(key=lambda metadata: metadata.sort_key()) + try: history.write_history( md5sum, - params, + upload_run_params, T.cast(JSONDict, payload), sequence, ) @@ -317,6 +319,9 @@ def collect_end_time(payload: _APIStats) -> None: now = time.time() payload["upload_end_time"] = now payload["upload_total_time"] += now - payload["upload_last_restart_time"] + + @emitter.on("upload_finished") + def append_stats(payload: _APIStats) -> None: all_stats.append(payload) return all_stats @@ -325,7 +330,7 @@ def collect_end_time(payload: _APIStats) -> None: def _summarize(stats: T.Sequence[_APIStats]) -> dict: total_image_count = sum(s.get("sequence_image_count", 0) for s in stats) total_uploaded_sequence_count = len(stats) - # note that stats[0]["total_sequence_count"] not always same as total_uploaded_sequence_count + # Note that stats[0]["total_sequence_count"] not always same as total_uploaded_sequence_count total_uploaded_size = sum( s["entity_size"] - s.get("upload_first_offset", 0) for s in stats @@ -343,6 +348,7 @@ def _summarize(stats: T.Sequence[_APIStats]) -> dict: upload_summary = { "images": total_image_count, + # TODO: rename sequences to total uploads "sequences": total_uploaded_sequence_count, "size": round(total_entity_size_mb, 4), "uploaded_size": round(total_uploaded_size_mb, 4), @@ -353,29 +359,27 @@ def _summarize(stats: T.Sequence[_APIStats]) -> dict: return upload_summary -def _show_upload_summary(stats: T.Sequence[_APIStats]): - grouped: dict[str, list[_APIStats]] = {} - for stat in stats: - grouped.setdefault(stat.get("file_type", "unknown"), []).append(stat) +def _show_upload_summary(stats: T.Sequence[_APIStats], errors: T.Sequence[Exception]): + if not stats: + LOG.info("Nothing uploaded. 
Bye.") + else: + grouped: dict[str, list[_APIStats]] = {} + for stat in stats: + grouped.setdefault(stat.get("file_type", "unknown"), []).append(stat) - for file_type, typed_stats in grouped.items(): - if file_type == FileType.IMAGE.value: - LOG.info( - "%8d %s sequences uploaded", - len(typed_stats), - file_type.upper(), - ) - else: - LOG.info( - "%8d %s files uploaded", - len(typed_stats), - file_type.upper(), - ) + for file_type, typed_stats in grouped.items(): + if file_type == FileType.IMAGE.value: + LOG.info("%8d image sequences uploaded", len(typed_stats)) + else: + LOG.info("%8d %s videos uploaded", len(typed_stats), file_type.upper()) + + summary = _summarize(stats) + LOG.info("%8.1fM data in total", summary["size"]) + LOG.info("%8.1fM data uploaded", summary["uploaded_size"]) + LOG.info("%8.1fs upload time", summary["time"]) - summary = _summarize(stats) - LOG.info("%8.1fM data in total", summary["size"]) - LOG.info("%8.1fM data uploaded", summary["uploaded_size"]) - LOG.info("%8.1fs upload time", summary["time"]) + for error in errors: + LOG.error("Upload error: %s: %s", error.__class__.__name__, error) def _api_logging_finished(summary: dict): @@ -383,7 +387,6 @@ def _api_logging_finished(summary: dict): return action: api_v4.ActionType = "upload_finished_upload" - LOG.debug("API Logging for action %s: %s", action, summary) try: api_v4.log_event(action, summary) except requests.HTTPError as exc: @@ -402,7 +405,6 @@ def _api_logging_failed(payload: dict, exc: Exception): payload_with_reason = {**payload, "reason": exc.__class__.__name__} action: api_v4.ActionType = "upload_failed_upload" - LOG.debug("API Logging for action %s: %s", action, payload) try: api_v4.log_event(action, payload_with_reason) except requests.HTTPError as exc: @@ -451,93 +453,91 @@ def _load_descs( def _find_metadata_with_filename_existed_in( - metadatas: T.Sequence[_M], paths: T.Sequence[Path] + metadatas: T.Iterable[_M], paths: T.Iterable[Path] ) -> list[_M]: resolved_image_paths = set(p.resolve() for p in paths) return [d for d in metadatas if d.filename.resolve() in resolved_image_paths] -def _upload_everything( +def _gen_upload_everything( mly_uploader: uploader.Uploader, metadatas: T.Sequence[types.Metadata], import_paths: T.Sequence[Path], skip_subfolders: bool, ): # Upload images - image_paths = utils.find_images(import_paths, skip_subfolders=skip_subfolders) - # Find descs that match the image paths from the import paths - image_metadatas = [ - metadata - for metadata in (metadatas or []) - if isinstance(metadata, types.ImageMetadata) - ] - specified_image_metadatas = _find_metadata_with_filename_existed_in( - image_metadatas, image_paths + image_metadatas = _find_metadata_with_filename_existed_in( + (m for m in metadatas if isinstance(m, types.ImageMetadata)), + utils.find_images(import_paths, skip_subfolders=skip_subfolders), ) - if specified_image_metadatas: - try: - clusters = uploader.ZipImageSequence.prepare_images_and_upload( - specified_image_metadatas, - mly_uploader, - ) - except Exception as ex: - raise UploadError(ex) from ex - - if clusters: - LOG.debug("Uploaded to cluster: %s", clusters) + for image_result in uploader.ZipImageSequence.prepare_images_and_upload( + image_metadatas, + mly_uploader, + ): + yield image_result # Upload videos - video_paths = utils.find_videos(import_paths, skip_subfolders=skip_subfolders) - video_metadatas = [ - metadata - for metadata in (metadatas or []) - if isinstance(metadata, types.VideoMetadata) - ] - specified_video_metadatas = 
_find_metadata_with_filename_existed_in( - video_metadatas, video_paths + video_metadatas = _find_metadata_with_filename_existed_in( + (m for m in metadatas if isinstance(m, types.VideoMetadata)), + utils.find_videos(import_paths, skip_subfolders=skip_subfolders), ) - _upload_videos(mly_uploader, specified_video_metadatas) + for video_result in _gen_upload_videos(mly_uploader, video_metadatas): + yield video_result # Upload zip files zip_paths = utils.find_zipfiles(import_paths, skip_subfolders=skip_subfolders) - _upload_zipfiles(mly_uploader, zip_paths) + for zip_result in _gen_upload_zipfiles(mly_uploader, zip_paths): + yield zip_result -def _upload_videos( +def _gen_upload_videos( mly_uploader: uploader.Uploader, video_metadatas: T.Sequence[types.VideoMetadata] -): +) -> T.Generator[tuple[types.VideoMetadata, uploader.UploadResult], None, None]: for idx, video_metadata in enumerate(video_metadatas): - video_metadata.update_md5sum() + try: + video_metadata.update_md5sum() + except Exception as ex: + yield video_metadata, uploader.UploadResult(error=ex) + continue + assert isinstance(video_metadata.md5sum, str), "md5sum should be updated" + # Convert video metadata to CAMMInfo camm_info = _prepare_camm_info(video_metadata) - generator = camm_builder.camm_sample_generator2(camm_info) + # Create the CAMM sample generator + camm_sample_generator = camm_builder.camm_sample_generator2(camm_info) - with video_metadata.filename.open("rb") as src_fp: - camm_fp = simple_mp4_builder.transform_mp4(src_fp, generator) - progress: uploader.SequenceProgress = { - "total_sequence_count": len(video_metadatas), - "sequence_idx": idx, - "file_type": video_metadata.filetype.value, - "import_path": str(video_metadata.filename), - "md5sum": video_metadata.md5sum, - } + progress: uploader.SequenceProgress = { + "total_sequence_count": len(video_metadatas), + "sequence_idx": idx, + "file_type": video_metadata.filetype.value, + "import_path": str(video_metadata.filename), + "md5sum": video_metadata.md5sum, + } - session_key = uploader._session_key( - video_metadata.md5sum, upload_api_v4.ClusterFileType.CAMM - ) + session_key = uploader._session_key( + video_metadata.md5sum, upload_api_v4.ClusterFileType.CAMM + ) - try: + try: + with video_metadata.filename.open("rb") as src_fp: + # Build the mp4 stream with the CAMM samples + camm_fp = simple_mp4_builder.transform_mp4( + src_fp, camm_sample_generator + ) + + # Upload the mp4 stream cluster_id = mly_uploader.upload_stream( - T.cast(T.BinaryIO, camm_fp), + T.cast(T.IO[bytes], camm_fp), upload_api_v4.ClusterFileType.CAMM, session_key, progress=T.cast(T.Dict[str, T.Any], progress), ) - except Exception as ex: - raise UploadError(ex) from ex - LOG.debug("Uploaded to cluster: %s", cluster_id) + except Exception as ex: + yield video_metadata, uploader.UploadResult(error=ex) + else: + yield video_metadata, uploader.UploadResult(result=cluster_id) def _prepare_camm_info(video_metadata: types.VideoMetadata) -> camm_parser.CAMMInfo: @@ -597,6 +597,43 @@ def _normalize_import_paths(import_path: Path | T.Sequence[Path]) -> list[Path]: return import_paths +def _continue_or_fail(ex: Exception) -> Exception: + """ + Wrap the exception, or re-raise if it is a fatal error (i.e. 
there is no point in continuing)
+    """
+
+    if isinstance(ex, uploader.SequenceError):
+        return ex
+
+    # Certain files not found or no permission
+    if isinstance(ex, OSError):
+        return ex
+
+    # Certain metadatas are not valid
+    if isinstance(ex, exceptions.MapillaryMetadataValidationError):
+        return ex
+
+    # Fatal error: this is thrown after all retries
+    if isinstance(ex, requests.ConnectionError):
+        raise exceptions.MapillaryUploadConnectionError(str(ex)) from ex
+
+    # Fatal error: this is thrown after all retries
+    if isinstance(ex, requests.Timeout):
+        raise exceptions.MapillaryUploadTimeoutError(str(ex)) from ex
+
+    # Fatal error:
+    if isinstance(ex, requests.HTTPError) and isinstance(
+        ex.response, requests.Response
+    ):
+        if api_v4.is_auth_error(ex.response):
+            raise exceptions.MapillaryUploadUnauthorizedError(
+                api_v4.extract_auth_error_message(ex.response)
+            ) from ex
+        raise ex
+
+    raise ex
+
+
 def upload(
     import_path: Path | T.Sequence[Path],
     user_items: types.UserItem,
@@ -609,6 +646,8 @@ def upload(
     metadatas = _load_descs(_metadatas_from_process, desc_path, import_paths)
+    jsonschema.validate(instance=user_items, schema=types.UserItemSchema)
+
     # Setup the emitter -- the order matters here
     emitter = uploader.EventEmitter()
@@ -620,74 +659,64 @@ def upload(
         not dry_run or constants.MAPILLARY__ENABLE_UPLOAD_HISTORY_FOR_DRY_RUN
     )
-    # Put it first one to cancel early
+    # Register this first so duplication checks run before uploading
     if enable_history:
-        _setup_cancel_due_to_duplication(emitter)
+        upload_run_params: JSONDict = {
+            # Null if multiple paths provided
+            "import_path": str(import_path) if isinstance(import_path, Path) else None,
+            "organization_key": user_items.get("MAPOrganizationKey"),
+            "user_key": user_items.get("MAPSettingsUserKey"),
+            "version": VERSION,
+            "run_at": time.time(),
+        }
+        _setup_history(emitter, upload_run_params, metadatas)
-    # This one set up tdqm
+    # Set up tdqm
     _setup_tdqm(emitter)
-    # Now stats is empty but it will collect during upload
+    # Now stats is empty but it will be populated during ALL uploads
     stats = _setup_api_stats(emitter)
-    # Send the progress as well as the log stats collected above
+    # Send the progress via IPC, and log the progress in debug mode
     _setup_ipc(emitter)
-    params: JSONDict = {
-        # null if multiple paths provided
-        "import_path": str(import_path) if isinstance(import_path, Path) else None,
-        "organization_key": user_items.get("MAPOrganizationKey"),
-        "user_key": user_items.get("MAPSettingsUserKey"),
-        "version": VERSION,
-    }
+    mly_uploader = uploader.Uploader(user_items, emitter=emitter, dry_run=dry_run)
-    if enable_history:
-        _setup_write_upload_history(emitter, params, metadatas)
-
-    mly_uploader = uploader.Uploader(
-        user_items,
-        emitter=emitter,
-        dry_run=dry_run,
-        chunk_size=int(constants.UPLOAD_CHUNK_SIZE_MB * 1024 * 1024),
+    results = _gen_upload_everything(
+        mly_uploader, metadatas, import_paths, skip_subfolders
     )
+    upload_successes = 0
+    upload_errors: list[Exception] = []
+
+    # The real upload happens sequentially here
     try:
-        _upload_everything(mly_uploader, metadatas, import_paths, skip_subfolders)
-    except UploadError as ex:
-        inner_ex = ex.inner_ex
+        for _, result in results:
+            if result.error is not None:
+                upload_errors.append(_continue_or_fail(result.error))
+            else:
+                upload_successes += 1
+    except Exception as ex:
+        # Fatal error: log and raise
         if not dry_run:
-            _api_logging_failed(_summarize(stats), inner_ex)
-
-        if isinstance(inner_ex, requests.ConnectionError):
-            raise exceptions.MapillaryUploadConnectionError(str(inner_ex)) from inner_ex
-
-        if isinstance(inner_ex, requests.Timeout):
-            raise exceptions.MapillaryUploadTimeoutError(str(inner_ex)) from inner_ex
-
-        if isinstance(inner_ex, requests.HTTPError) and isinstance(
-            inner_ex.response, requests.Response
-        ):
-            if api_v4.is_auth_error(inner_ex.response):
-                raise exceptions.MapillaryUploadUnauthorizedError(
-                    api_v4.extract_auth_error_message(inner_ex.response)
-                ) from inner_ex
-            raise inner_ex
+            _api_logging_failed(_summarize(stats), ex)
+        raise ex
-        raise inner_ex
-
-    if stats:
+    else:
         if not dry_run:
             _api_logging_finished(_summarize(stats))
-        _show_upload_summary(stats)
-    else:
-        LOG.info("Nothing uploaded. Bye.")
+    finally:
+        # Stats are collected after each finished upload
+        assert upload_successes == len(stats)
+        _show_upload_summary(stats, upload_errors)
-def _upload_zipfiles(
+
+def _gen_upload_zipfiles(
     mly_uploader: uploader.Uploader,
     zip_paths: T.Sequence[Path],
-) -> None:
+) -> T.Generator[tuple[Path, uploader.UploadResult], None, None]:
     for idx, zip_path in enumerate(zip_paths):
         progress: uploader.SequenceProgress = {
             "total_sequence_count": len(zip_paths),
@@ -699,6 +728,6 @@ def _upload_zipfiles(
                 zip_path, mly_uploader, progress=T.cast(T.Dict[str, T.Any], progress)
             )
         except Exception as ex:
-            raise UploadError(ex) from ex
-
-        LOG.debug("Uploaded to cluster: %s", cluster_id)
+            yield zip_path, uploader.UploadResult(error=ex)
+        else:
+            yield zip_path, uploader.UploadResult(result=cluster_id)
diff --git a/mapillary_tools/upload_api_v4.py b/mapillary_tools/upload_api_v4.py
index 811bae29a..3c3b5bb20 100644
--- a/mapillary_tools/upload_api_v4.py
+++ b/mapillary_tools/upload_api_v4.py
@@ -43,12 +43,12 @@ def __init__(
         self,
         user_access_token: str,
         session_key: str,
-        cluster_filetype: ClusterFileType = ClusterFileType.ZIP,
+        cluster_filetype: ClusterFileType,
     ):
         self.user_access_token = user_access_token
         self.session_key = session_key
-        # validate the input
-        self.cluster_filetype = ClusterFileType(cluster_filetype)
+        # The caller is expected to pass a valid ClusterFileType
+        self.cluster_filetype = cluster_filetype
     def fetch_offset(self) -> int:
         headers = {
diff --git a/mapillary_tools/uploader.py b/mapillary_tools/uploader.py
index a94607996..ede0ae713 100644
--- a/mapillary_tools/uploader.py
+++ b/mapillary_tools/uploader.py
@@ -1,9 +1,11 @@
 from __future__ import annotations
+import dataclasses
 import io
 import json
 import logging
 import os
+import struct
 import tempfile
 import time
 import typing as T
@@ -12,10 +14,9 @@
 from contextlib import contextmanager
 from pathlib import Path
-import jsonschema
 import requests
-from . import api_v4, constants, exif_write, types, upload_api_v4, utils
+from . import api_v4, constants, exif_write, types, upload_api_v4
 LOG = logging.getLogger(__name__)
@@ -26,18 +27,26 @@ class UploaderProgress(T.TypedDict, total=True):
     Progress data that Uploader cares about.
     """
-    # The size of the chunk, in bytes, that has been read and upload
+    # The size, in bytes, of the last chunk that has been read and uploaded
     chunk_size: int
+    # The initial offset returned by the upload service, which is also the offset
+    # the uploader starts uploading from.
+    # Assert:
+    # - 0 <= begin_offset <= offset <= entity_size
+    # - Non-None after at least one successful "upload_fetch_offset"
     begin_offset: int | None
-    # How many bytes has been uploaded so far since "upload_start"
+    # How many bytes of the file have been uploaded so far
     offset: int
-    # Size in bytes of the zipfile/BlackVue/CAMM
+    # Size in bytes of the file (i.e. fp.tell() after seeking to the end)
+    # NOTE: It may differ from the file size in the file system
+    # Assert:
+    # - offset == entity_size when "upload_end" or "upload_finished"
     entity_size: int
-    # An "upload_interrupted" will increase it. Reset to 0 if the chunk is uploaded
+    # An "upload_interrupted" will increase it. Reset to 0 if a chunk is uploaded
     retries: int
     # Cluster ID after finishing the upload
@@ -73,7 +82,23 @@ class Progress(SequenceProgress, UploaderProgress):
     pass
-class UploadCancelled(Exception):
+class SequenceError(Exception):
+    """
+    Base class for sequence-specific errors. These errors will cause the
+    current sequence upload to fail but will not interrupt the overall upload
+    process for other sequences.
+    """
+
+    pass
+
+
+class ExifError(SequenceError):
+    def __init__(self, message: str, image_path: Path):
+        super().__init__(message)
+        self.image_path = image_path
+
+
+class InvalidMapillaryZipFileError(SequenceError):
     pass
@@ -104,6 +129,12 @@ def emit(self, event: EventName, *args, **kwargs):
             callback(*args, **kwargs)
+@dataclasses.dataclass
+class UploadResult:
+    result: str | None = None
+    error: Exception | None = None
+
+
 class ZipImageSequence:
     @classmethod
     def zip_images(
@@ -112,16 +143,17 @@ def zip_images(
         """
         Group images into sequences and zip each sequence into a zipfile.
         """
-        _validate_metadatas(metadatas)
         sequences = types.group_and_sort_images(metadatas)
         os.makedirs(zip_dir, exist_ok=True)
         for sequence_uuid, sequence in sequences.items():
+            _validate_metadatas(sequence)
+            upload_md5sum = types.update_sequence_md5sum(sequence)
+
             # For atomicity we write into a WIP file and then rename to the final file
             wip_zip_filename = zip_dir.joinpath(
                 f".mly_zip_{uuid.uuid4()}_{sequence_uuid}_{os.getpid()}_{int(time.time())}"
             )
-            upload_md5sum = types.update_sequence_md5sum(sequence)
             filename = _session_key(upload_md5sum, upload_api_v4.ClusterFileType.ZIP)
             zip_filename = zip_dir.joinpath(filename)
             with wip_file_context(wip_zip_filename, zip_filename) as wip_path:
@@ -156,18 +188,27 @@ def zip_sequence_fp(
         return upload_md5sum
     @classmethod
-    def extract_upload_md5sum(cls, zip_fp: T.IO[bytes]) -> str | None:
+    def extract_upload_md5sum(cls, zip_fp: T.IO[bytes]) -> str:
         with zipfile.ZipFile(zip_fp, "r", zipfile.ZIP_DEFLATED) as ziph:
             comment = ziph.comment
+
         if not comment:
-            return None
+            raise InvalidMapillaryZipFileError("No comment found in the zipfile")
+
         try:
-            upload_md5sum = json.loads(comment.decode("utf-8")).get("upload_md5sum")
-        except Exception:
-            return None
-        if not upload_md5sum:
-            return None
-        return str(upload_md5sum)
+            decoded = comment.decode("utf-8")
+            zip_metadata = json.loads(decoded)
+        except UnicodeDecodeError as ex:
+            raise InvalidMapillaryZipFileError(str(ex)) from ex
+        except json.JSONDecodeError as ex:
+            raise InvalidMapillaryZipFileError(str(ex)) from ex
+
+        upload_md5sum = zip_metadata.get("upload_md5sum")
+
+        if not upload_md5sum or not isinstance(upload_md5sum, str):
+            raise InvalidMapillaryZipFileError("No upload_md5sum found")
+
+        return upload_md5sum
     @classmethod
     def _uniq_arcname(cls, filename: Path, arcnames: set[str]):
@@ -191,12 +232,22 @@ def _write_imagebytes_in_zip(
if arcnames is None: arcnames = set() - edit = exif_write.ExifEdit(metadata.filename) + try: + edit = exif_write.ExifEdit(metadata.filename) + except struct.error as ex: + raise ExifError(f"Failed to load EXIF: {ex}", metadata.filename) from ex + # The cast is to fix the type checker error edit.add_image_description( T.cast(T.Dict, types.desc_file_to_exif(types.as_desc(metadata))) ) - image_bytes = edit.dump_image_bytes() + + try: + image_bytes = edit.dump_image_bytes() + except struct.error as ex: + raise ExifError( + f"Failed to dump EXIF bytes: {ex}", metadata.filename + ) from ex arcname = cls._uniq_arcname(metadata.filename, arcnames) arcnames.add(arcname) @@ -210,23 +261,18 @@ def prepare_zipfile_and_upload( zip_path: Path, uploader: Uploader, progress: dict[str, T.Any] | None = None, - ) -> str | None: + ) -> str: if progress is None: progress = {} with zipfile.ZipFile(zip_path) as ziph: namelist = ziph.namelist() if not namelist: - LOG.warning("Skipping empty zipfile: %s", zip_path) - return None + raise InvalidMapillaryZipFileError("Zipfile has no files") with zip_path.open("rb") as zip_fp: upload_md5sum = cls.extract_upload_md5sum(zip_fp) - if upload_md5sum is None: - with zip_path.open("rb") as zip_fp: - upload_md5sum = utils.md5sum_fp(zip_fp).hexdigest() - sequence_progress: SequenceProgress = { "sequence_image_count": len(namelist), "file_type": types.FileType.ZIP.value, @@ -250,13 +296,12 @@ def prepare_images_and_upload( image_metadatas: T.Sequence[types.ImageMetadata], uploader: Uploader, progress: dict[str, T.Any] | None = None, - ) -> dict[str, str]: + ) -> T.Generator[tuple[str, UploadResult], None, None]: if progress is None: progress = {} - _validate_metadatas(image_metadatas) sequences = types.group_and_sort_images(image_metadatas) - ret: dict[str, str] = {} + for sequence_idx, (sequence_uuid, sequence) in enumerate(sequences.items()): sequence_progress: SequenceProgress = { "sequence_idx": sequence_idx, @@ -266,8 +311,18 @@ def prepare_images_and_upload( "file_type": types.FileType.IMAGE.value, } + try: + _validate_metadatas(sequence) + except Exception as ex: + yield sequence_uuid, UploadResult(error=ex) + continue + with tempfile.NamedTemporaryFile() as fp: - upload_md5sum = cls.zip_sequence_fp(sequence, fp) + try: + upload_md5sum = cls.zip_sequence_fp(sequence, fp) + except Exception as ex: + yield sequence_uuid, UploadResult(error=ex) + continue sequence_progress["md5sum"] = upload_md5sum @@ -275,19 +330,20 @@ def prepare_images_and_upload( upload_md5sum, upload_api_v4.ClusterFileType.ZIP ) - cluster_id = uploader.upload_stream( - fp, - upload_api_v4.ClusterFileType.ZIP, - session_key, - # Send the copy of the input progress to each upload session, to avoid modifying the original one - progress=T.cast( - T.Dict[str, T.Any], {**progress, **sequence_progress} - ), - ) + try: + cluster_id = uploader.upload_stream( + fp, + upload_api_v4.ClusterFileType.ZIP, + session_key, + progress=T.cast( + T.Dict[str, T.Any], {**progress, **sequence_progress} + ), + ) + except Exception as ex: + yield sequence_uuid, UploadResult(error=ex) + continue - if cluster_id is not None: - ret[sequence_uuid] = cluster_id - return ret + yield sequence_uuid, UploadResult(result=cluster_id) class Uploader: @@ -295,12 +351,15 @@ def __init__( self, user_items: types.UserItem, emitter: EventEmitter | None = None, - chunk_size: int = 2 * 1024 * 1024, # 2MB + chunk_size: int = int(constants.UPLOAD_CHUNK_SIZE_MB * 1024 * 1024), dry_run=False, ): - jsonschema.validate(instance=user_items, 
schema=types.UserItemSchema) self.user_items = user_items - self.emitter = emitter + if emitter is None: + # An empty event emitter that does nothing + self.emitter = EventEmitter() + else: + self.emitter = emitter self.chunk_size = chunk_size self.dry_run = dry_run @@ -310,7 +369,7 @@ def upload_stream( cluster_filetype: upload_api_v4.ClusterFileType, session_key: str, progress: dict[str, T.Any] | None = None, - ) -> str | None: + ) -> str: if progress is None: progress = {} @@ -324,12 +383,7 @@ def upload_stream( progress["retries"] = 0 progress["begin_offset"] = None - if self.emitter: - try: - self.emitter.emit("upload_start", progress) - except UploadCancelled: - # TODO: Right now it is thrown in upload_start only - return None + self.emitter.emit("upload_start", progress) while True: try: @@ -343,15 +397,13 @@ def upload_stream( progress["retries"] += 1 - if self.emitter: - self.emitter.emit("upload_end", progress) + self.emitter.emit("upload_end", progress) # TODO: retry here cluster_id = self._finish_upload_retryable(upload_service, file_handle) progress["cluster_id"] = cluster_id - if self.emitter: - self.emitter.emit("upload_finished", progress) + self.emitter.emit("upload_finished", progress) return cluster_id @@ -383,8 +435,7 @@ def _handle_upload_exception( chunk_size = progress["chunk_size"] if retries <= constants.MAX_UPLOAD_RETRIES and _is_retriable_exception(ex): - if self.emitter: - self.emitter.emit("upload_interrupted", progress) + self.emitter.emit("upload_interrupted", progress) LOG.warning( # use %s instead of %d because offset could be None "Error uploading chunk_size %d at begin_offset %s: %s: %s", @@ -425,8 +476,7 @@ def _chunk_with_progress_emitted( # Whenever a chunk is uploaded, reset retries progress["retries"] = 0 - if self.emitter: - self.emitter.emit("upload_progress", progress) + self.emitter.emit("upload_progress", progress) def _upload_stream_retryable( self, @@ -441,8 +491,7 @@ def _upload_stream_retryable( progress["begin_offset"] = begin_offset progress["offset"] = begin_offset - if self.emitter: - self.emitter.emit("upload_fetch_offset", progress) + self.emitter.emit("upload_fetch_offset", progress) fp.seek(begin_offset, io.SEEK_SET) @@ -534,13 +583,14 @@ def _is_retriable_exception(ex: Exception): return False +_SUFFIX_MAP: dict[upload_api_v4.ClusterFileType, str] = { + upload_api_v4.ClusterFileType.ZIP: ".zip", + upload_api_v4.ClusterFileType.CAMM: ".mp4", + upload_api_v4.ClusterFileType.BLACKVUE: ".mp4", +} + + def _session_key( upload_md5sum: str, cluster_filetype: upload_api_v4.ClusterFileType ) -> str: - SUFFIX_MAP: dict[upload_api_v4.ClusterFileType, str] = { - upload_api_v4.ClusterFileType.ZIP: ".zip", - upload_api_v4.ClusterFileType.CAMM: ".mp4", - upload_api_v4.ClusterFileType.BLACKVUE: ".mp4", - } - - return f"mly_tools_{upload_md5sum}{SUFFIX_MAP[cluster_filetype]}" + return f"mly_tools_{upload_md5sum}{_SUFFIX_MAP[cluster_filetype]}" diff --git a/tests/unit/test_exceptions.py b/tests/unit/test_exceptions.py index e4d4cb064..4b2e5220d 100644 --- a/tests/unit/test_exceptions.py +++ b/tests/unit/test_exceptions.py @@ -20,8 +20,6 @@ def test_all(): e = exc("hello", "world", "hey", "aa") elif exc is exceptions.MapillaryDuplicationError: e = exc("hello", {}, 1, float("inf")) - elif exc is exceptions.MapillaryUploadedAlreadyError: - e = exc("world", {}) else: e = exc("hello") # should not raise diff --git a/tests/unit/test_upload_api_v4.py b/tests/unit/test_upload_api_v4.py index 3857f522f..d8d8e0ac3 100644 --- a/tests/unit/test_upload_api_v4.py 
+++ b/tests/unit/test_upload_api_v4.py @@ -11,6 +11,7 @@ def test_upload(setup_upload: py.path.local): upload_service = upload_api_v4.FakeUploadService( user_access_token="TEST", session_key="FOOBAR.txt", + cluster_filetype=upload_api_v4.ClusterFileType.ZIP, ) upload_service._error_ratio = 0 content = b"double_foobar" @@ -27,6 +28,7 @@ def test_upload_big_chunksize(setup_upload: py.path.local): upload_service = upload_api_v4.FakeUploadService( user_access_token="TEST", session_key="FOOBAR.txt", + cluster_filetype=upload_api_v4.ClusterFileType.ZIP, ) upload_service._error_ratio = 0 content = b"double_foobar" @@ -43,6 +45,7 @@ def test_upload_chunks(setup_upload: py.path.local): upload_service = upload_api_v4.FakeUploadService( user_access_token="TEST", session_key="FOOBAR2.txt", + cluster_filetype=upload_api_v4.ClusterFileType.ZIP, ) upload_service._error_ratio = 0 diff --git a/tests/unit/test_uploader.py b/tests/unit/test_uploader.py index fca750fdc..ae860d902 100644 --- a/tests/unit/test_uploader.py +++ b/tests/unit/test_uploader.py @@ -64,11 +64,13 @@ def test_upload_images(setup_unittest_data: py.path.local, setup_upload: py.path "filetype": "image", }, ] - resp = uploader.ZipImageSequence.prepare_images_and_upload( - [types.from_desc(T.cast(T.Any, desc)) for desc in descs], - mly_uploader, + results = list( + uploader.ZipImageSequence.prepare_images_and_upload( + [types.from_desc(T.cast(T.Any, desc)) for desc in descs], + mly_uploader, + ) ) - assert len(resp) == 1 + assert len(results) == 1 assert len(setup_upload.listdir()) == 1 actual_descs = _validate_zip_dir(setup_upload) assert 1 == len(actual_descs), "should return 1 desc because of the unique filename" @@ -116,11 +118,13 @@ def test_upload_images_multiple_sequences( }, dry_run=True, ) - resp = uploader.ZipImageSequence.prepare_images_and_upload( - [types.from_desc(T.cast(T.Any, desc)) for desc in descs], - mly_uploader, + results = list( + uploader.ZipImageSequence.prepare_images_and_upload( + [types.from_desc(T.cast(T.Any, desc)) for desc in descs], + mly_uploader, + ) ) - assert len(resp) == 2 + assert len(results) == 2 assert len(setup_upload.listdir()) == 2 actual_descs = _validate_zip_dir(setup_upload) assert 2 == len(actual_descs)
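
A few notes on the main changes follow, each with a minimal, self-contained Python sketch. The sketches are illustrative only: any helper names, file names, and values not defined by the patch are stand-ins, not the project's API. First, the exceptions.py reshuffle separates MapillaryDescriptionError subclasses (per-file problems such as geotagging, GPS noise, stationary video, EXIF, or file-size issues, which are recorded and summarized) from MapillaryUserError subclasses, which keep an exit_code that a CLI entry point can map to the process exit status. Roughly (the main() wrapper below is hypothetical, not the tools' actual entry point):

```python
import sys


class MapillaryUserError(Exception):
    exit_code: int = 1


class MapillaryBadParameterError(MapillaryUserError):
    # Exit code 2, as declared in exceptions.py
    exit_code = 2


class MapillaryDescriptionError(Exception):
    """Per-file problem: recorded against the file, never terminates the run."""


def main() -> int:
    try:
        raise MapillaryBadParameterError("unrecognized parameter value")
    except MapillaryUserError as ex:
        print(f"Error: {ex}", file=sys.stderr)
        return ex.exit_code
    return 0


if __name__ == "__main__":
    sys.exit(main())
```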
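
The zip, video, and image upload paths now yield (item, UploadResult) pairs instead of raising UploadError, so one failed sequence no longer aborts the others. The pattern, reduced to its core (the fake upload call below stands in for Uploader.upload_stream):

```python
from __future__ import annotations

import dataclasses
import typing as T


@dataclasses.dataclass
class UploadResult:
    result: str | None = None  # cluster ID on success
    error: Exception | None = None  # the exception that failed this item


def gen_upload(paths: T.Sequence[str]) -> T.Generator[tuple[str, UploadResult], None, None]:
    """Yield one result per path; a failure is reported, not raised."""
    for path in paths:
        try:
            if not path.endswith(".zip"):
                raise ValueError(f"not a zipfile: {path}")
            cluster_id = f"cluster_{path}"  # stand-in for the real upload call
        except Exception as ex:
            yield path, UploadResult(error=ex)
        else:
            yield path, UploadResult(result=cluster_id)


successes, errors = 0, []
for path, result in gen_upload(["a.zip", "broken.txt", "b.zip"]):
    if result.error is not None:
        errors.append(result.error)
    else:
        successes += 1
print(successes, [str(e) for e in errors])  # 2 ['not a zipfile: broken.txt']
```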
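
_continue_or_fail is the policy point that decides which of those per-item errors are merely recorded (SequenceError, OSError, metadata validation) and which abort the whole run (connection, timeout, and auth failures, re-raised as the corresponding Mapillary*Error). A stripped-down version of the same decision, using stand-in exception types instead of requests and the project's exceptions module:

```python
class SequenceError(Exception):
    """Fails only the current sequence (mirrors uploader.SequenceError)."""


class FatalUploadError(Exception):
    """Stand-in for the Mapillary*Error exceptions that abort the run."""


def continue_or_fail(ex: Exception) -> Exception:
    # Per-sequence problems (bad EXIF, empty zip, missing files) are returned
    # so the caller can record them and move on to the next item.
    if isinstance(ex, (SequenceError, OSError)):
        return ex
    # Anything else (connection, timeout, auth failures in the real code)
    # is wrapped and re-raised, which stops the whole upload run.
    raise FatalUploadError(str(ex)) from ex


recorded = [
    continue_or_fail(SequenceError("empty zipfile")),
    continue_or_fail(FileNotFoundError("missing.jpg")),
]
print([type(e).__name__ for e in recorded])  # ['SequenceError', 'FileNotFoundError']
```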
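
_setup_history replaces the two previous setup helpers by registering both hooks on the same emitter: an upload_start handler that raises the new UploadedAlreadyError (a SequenceError) when the payload's md5sum is already in the upload history, and an upload_finished handler that records it. A toy emitter with the same on/emit shape; the in-memory set below stands in for the on-disk history module:

```python
from __future__ import annotations

import typing as T


class EventEmitter:
    def __init__(self) -> None:
        self.events: dict[str, list[T.Callable]] = {}

    def on(self, event: str):
        def _wrap(callback):
            self.events.setdefault(event, []).append(callback)
            return callback
        return _wrap

    def emit(self, event: str, *args, **kwargs) -> None:
        for callback in self.events.get(event, []):
            callback(*args, **kwargs)


class UploadedAlreadyError(Exception):
    pass


uploaded_history: set[str] = set()  # stand-in for history.is_uploaded()/write_history()
emitter = EventEmitter()


@emitter.on("upload_start")
def check_duplication(progress: dict) -> None:
    md5sum = progress.get("md5sum")
    assert md5sum is not None, f"md5sum has to be set for {progress}"
    if md5sum in uploaded_history:
        raise UploadedAlreadyError(md5sum)


@emitter.on("upload_finished")
def write_history(progress: dict) -> None:
    uploaded_history.add(progress["md5sum"])


emitter.emit("upload_start", {"md5sum": "abc"})
emitter.emit("upload_finished", {"md5sum": "abc"})
try:
    emitter.emit("upload_start", {"md5sum": "abc"})
except UploadedAlreadyError as ex:
    print("skipping already-uploaded", ex)
```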
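
The expanded UploaderProgress comments pin down the invariants for resumable uploads: 0 <= begin_offset <= offset <= entity_size, with offset == entity_size by the time "upload_end"/"upload_finished" fire, and entity_size measured from the stream itself rather than the file system. A compact sketch of that bookkeeping over an in-memory stream (no network; the fixed chunk size and the already_received value are assumptions of the example):

```python
import io


def resumable_upload(fp: io.BytesIO, already_received: int, chunk_size: int) -> dict:
    fp.seek(0, io.SEEK_END)
    entity_size = fp.tell()  # size of the stream, not the on-disk file size

    begin_offset = already_received  # what the upload service says it already has
    assert 0 <= begin_offset <= entity_size

    progress = {"begin_offset": begin_offset, "offset": begin_offset, "entity_size": entity_size}

    fp.seek(begin_offset, io.SEEK_SET)
    while True:
        chunk = fp.read(chunk_size)
        if not chunk:
            break
        progress["offset"] += len(chunk)  # offset only ever moves forward
        assert progress["begin_offset"] <= progress["offset"] <= entity_size

    assert progress["offset"] == entity_size  # holds at "upload_end"/"upload_finished"
    return progress


print(resumable_upload(io.BytesIO(b"x" * 10), already_received=4, chunk_size=3))
```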
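
Finally, extract_upload_md5sum now fails loudly with InvalidMapillaryZipFileError instead of returning None when the zip comment is missing or malformed. The comment is plain JSON stored in the archive's comment field; a self-contained round trip (the file name and checksum value below are made up for the example):

```python
import io
import json
import zipfile


class InvalidMapillaryZipFileError(Exception):
    pass


def extract_upload_md5sum(zip_fp) -> str:
    with zipfile.ZipFile(zip_fp, "r") as ziph:
        comment = ziph.comment
    if not comment:
        raise InvalidMapillaryZipFileError("No comment found in the zipfile")
    try:
        zip_metadata = json.loads(comment.decode("utf-8"))
    except (UnicodeDecodeError, json.JSONDecodeError) as ex:
        raise InvalidMapillaryZipFileError(str(ex)) from ex
    upload_md5sum = zip_metadata.get("upload_md5sum")
    if not upload_md5sum or not isinstance(upload_md5sum, str):
        raise InvalidMapillaryZipFileError("No upload_md5sum found")
    return upload_md5sum


# Build an in-memory zip whose comment carries the sequence checksum
buf = io.BytesIO()
with zipfile.ZipFile(buf, "w", zipfile.ZIP_DEFLATED) as ziph:
    ziph.writestr("000001.jpg", b"fake image bytes")
    ziph.comment = json.dumps({"upload_md5sum": "d41d8cd98f00b204e9800998ecf8427e"}).encode("utf-8")

buf.seek(0)
print(extract_upload_md5sum(buf))
```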