diff --git a/mapillary_tools/api_v4.py b/mapillary_tools/api_v4.py index 4511f2088..317aef850 100644 --- a/mapillary_tools/api_v4.py +++ b/mapillary_tools/api_v4.py @@ -25,6 +25,7 @@ class ClusterFileType(enum.Enum): ZIP = "zip" BLACKVUE = "mly_blackvue_video" CAMM = "mly_camm_video" + MLY_BUNDLE_MANIFEST = "mly_bundle_manifest" class HTTPSystemCertsAdapter(HTTPAdapter): diff --git a/mapillary_tools/constants.py b/mapillary_tools/constants.py index c37e61003..fc1fc4007 100644 --- a/mapillary_tools/constants.py +++ b/mapillary_tools/constants.py @@ -89,3 +89,5 @@ def _yes_or_no(val: str) -> bool: "upload_history", ), ) + +MAX_IMAGE_UPLOAD_WORKERS = int(os.getenv(_ENV_PREFIX + "MAX_IMAGE_UPLOAD_WORKERS", 64)) diff --git a/mapillary_tools/types.py b/mapillary_tools/types.py index 747b421b7..b4fe31615 100644 --- a/mapillary_tools/types.py +++ b/mapillary_tools/types.py @@ -6,10 +6,16 @@ import hashlib import json import os +import sys import typing as T import uuid from pathlib import Path -from typing import Literal, TypedDict +from typing import TypedDict + +if sys.version_info >= (3, 11): + from typing import Required +else: + from typing_extensions import Required import jsonschema @@ -144,7 +150,7 @@ class UserItem(TypedDict, total=False): # Not in use. Keep here for backward compatibility MAPSettingsUsername: str MAPSettingsUserKey: str - user_upload_token: str + user_upload_token: Required[str] class _CompassHeading(TypedDict, total=True): MagneticHeading: float -class _ImageRequired(TypedDict, total=True): - MAPLatitude: float - MAPLongitude: float - MAPCaptureTime: str +class _SharedDescription(TypedDict, total=False): + filename: Required[str] + filetype: Required[str] + + # if None or absent, it will be calculated + md5sum: str | None + filesize: int | None -class _Image(_ImageRequired, total=False): +class ImageDescription(_SharedDescription, total=False): + MAPLatitude: Required[float] + MAPLongitude: Required[float] MAPAltitude: float + MAPCaptureTime: Required[str] MAPCompassHeading: _CompassHeading -class _SequenceOnly(TypedDict, total=False): - MAPSequenceUUID: str - - -class MetaProperties(TypedDict, total=False): MAPDeviceMake: str MAPDeviceModel: str MAPGPSAccuracyMeters: float MAPCameraUUID: str MAPOrientation: int -class ImageDescription(_SequenceOnly, _Image, MetaProperties, total=True): - # filename is required - filename: str - # if None or absent, it will be calculated - md5sum: str | None - filetype: Literal["image"] - filesize: int | None - - -class _VideoDescriptionRequired(TypedDict, total=True): - filename: str - md5sum: str | None - filetype: str - MAPGPSTrack: list[T.Sequence[float | int | None]] + # For grouping images in a sequence + MAPSequenceUUID: str -class VideoDescription(_VideoDescriptionRequired, total=False): +class VideoDescription(_SharedDescription, total=False): + MAPGPSTrack: Required[list[T.Sequence[float | int | None]]] MAPDeviceMake: str MAPDeviceModel: str - filesize: int | None class _ErrorDescription(TypedDict, total=False): # type and message are required - type: str + type: Required[str] message: str # vars is optional vars: dict -class _ImageDescriptionErrorRequired(TypedDict, total=True): - filename: str - error: _ErrorDescription - - -class ImageDescriptionError(_ImageDescriptionErrorRequired, total=False): +class ImageDescriptionError(TypedDict, total=False): + filename: Required[str] + error: Required[_ErrorDescription] filetype: str
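The types.py hunk above collapses the old web of `total=True`/`total=False` helper classes into a single `total=False` base plus per-key `Required[...]` markers (PEP 655; in `typing` since Python 3.11, otherwise imported from `typing_extensions`). A minimal sketch of the pattern, using a hypothetical `_Example` type rather than the real ones:

```python
from __future__ import annotations

import sys

if sys.version_info >= (3, 11):
    from typing import Required, TypedDict
else:
    from typing_extensions import Required, TypedDict


class _Example(TypedDict, total=False):
    # Required even though the class is total=False
    filename: Required[str]
    # Optional: omit it entirely, or set None to mean "compute lazily"
    md5sum: str | None


ok: _Example = {"filename": "a.jpg"}  # md5sum may be omitted
# A type checker rejects {"md5sum": None} alone, because filename is Required
```

Requiredness now sits next to each field instead of being encoded in the inheritance chain, which is what makes the merged `ImageDescription`/`VideoDescription` definitions above possible.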
diff --git a/mapillary_tools/upload.py b/mapillary_tools/upload.py index 3b1f8bef0..6387d77db 100644 --- a/mapillary_tools/upload.py +++ b/mapillary_tools/upload.py @@ -22,7 +22,6 @@ ipc, telemetry, types, - upload_api_v4, uploader, utils, VERSION, @@ -192,8 +191,9 @@ def write_history(payload: uploader.Progress): def _setup_tdqm(emitter: uploader.EventEmitter) -> None: upload_pbar: tqdm | None = None + @emitter.on("upload_start") @emitter.on("upload_fetch_offset") - def upload_fetch_offset(payload: uploader.Progress) -> None: + def upload_start(payload: uploader.Progress) -> None: nonlocal upload_pbar if upload_pbar is not None: @@ -204,18 +204,18 @@ def upload_fetch_offset(payload: uploader.Progress) -> None: import_path: str | None = payload.get("import_path") filetype = payload.get("file_type", "unknown").upper() if import_path is None: - _desc = f"Uploading {filetype} ({nth}/{total})" + desc = f"Uploading {filetype} ({nth}/{total})" else: - _desc = ( + desc = ( f"Uploading {filetype} {os.path.basename(import_path)} ({nth}/{total})" ) upload_pbar = tqdm( total=payload["entity_size"], - desc=_desc, + desc=desc, unit="B", unit_scale=True, unit_divisor=1024, - initial=payload["offset"], + initial=payload.get("offset", 0), disable=LOG.getEffectiveLevel() <= logging.DEBUG, ) @@ -295,8 +295,13 @@ def _setup_api_stats(emitter: uploader.EventEmitter): @emitter.on("upload_start") def collect_start_time(payload: _APIStats) -> None: - payload["upload_start_time"] = time.time() + now = time.time() + payload["upload_start_time"] = now payload["upload_total_time"] = 0 + # These fields should be initialized in upload events like "upload_fetch_offset", + # but since those events are disabled for image uploads, we initialize them here + payload["upload_last_restart_time"] = now + payload["upload_first_offset"] = 0 @emitter.on("upload_fetch_offset") def collect_restart_time(payload: _APIStats) -> None: @@ -466,7 +471,7 @@ def _gen_upload_everything( (m for m in metadatas if isinstance(m, types.ImageMetadata)), utils.find_images(import_paths, skip_subfolders=skip_subfolders), ) - for image_result in uploader.ZipImageSequence.zip_images_and_upload( + for image_result in uploader.ZipImageSequence.upload_images( mly_uploader, image_metadatas, ): @@ -510,13 +515,8 @@ def _gen_upload_videos( "file_type": video_metadata.filetype.value, "import_path": str(video_metadata.filename), "sequence_md5sum": video_metadata.md5sum, - "upload_md5sum": video_metadata.md5sum, } - session_key = uploader._session_key( - video_metadata.md5sum, upload_api_v4.ClusterFileType.CAMM - ) - try: with video_metadata.filename.open("rb") as src_fp: # Build the mp4 stream with the CAMM samples @@ -525,12 +525,15 @@ def _gen_upload_videos( ) # Upload the mp4 stream - cluster_id = mly_uploader.upload_stream( + file_handle = mly_uploader.upload_stream( T.cast(T.IO[bytes], camm_fp), - upload_api_v4.ClusterFileType.CAMM, - session_key, progress=T.cast(T.Dict[str, T.Any], progress), ) + cluster_id = mly_uploader.finish_upload( + file_handle, + api_v4.ClusterFileType.CAMM, + progress=T.cast(T.Dict[str, T.Any], progress), + ) except Exception as ex: yield video_metadata, uploader.UploadResult(error=ex) else: @@ -706,7 +709,9 @@ def upload( finally: # We collect stats after every upload finishes - assert upload_successes == len(stats) + assert upload_successes == len(stats), ( + f"Expected {upload_successes} successes but got {stats}" + ) _show_upload_summary(stats, upload_errors)
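The `_setup_tdqm` hunk above stacks `@emitter.on("upload_start")` on top of `@emitter.on("upload_fetch_offset")`; this only works because `EventEmitter.on` now returns the wrapped callback (the one-line `+ return callback` change in the uploader.py diff further down). A toy sketch of why that return value matters:

```python
from typing import Callable


class EventEmitter:
    def __init__(self) -> None:
        self.events: dict[str, list[Callable]] = {}

    def on(self, event: str):
        def _wrap(callback: Callable) -> Callable:
            self.events.setdefault(event, []).append(callback)
            # Without this return, a stacked decorator would receive None
            return callback

        return _wrap

    def emit(self, event: str, payload: dict) -> None:
        for callback in self.events.get(event, []):
            callback(payload)


emitter = EventEmitter()


@emitter.on("upload_start")         # registers what the inner decorator returned
@emitter.on("upload_fetch_offset")  # registers the original function, then returns it
def upload_start(payload: dict) -> None:
    print("progress bar setup:", payload)


emitter.emit("upload_start", {"entity_size": 1024})
emitter.emit("upload_fetch_offset", {"entity_size": 1024})
```

Before this change the outer decorator would have registered (and rebound the handler name to) `None`, so a handler could only ever listen to a single event.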
diff --git a/mapillary_tools/upload_api_v4.py b/mapillary_tools/upload_api_v4.py index 3c3b5bb20..57afbc7d3 100644 --- a/mapillary_tools/upload_api_v4.py +++ b/mapillary_tools/upload_api_v4.py @@ -6,6 +6,7 @@ import sys import typing as T import uuid +from pathlib import Path if sys.version_info >= (3, 12): from typing import override @@ -14,7 +15,7 @@ import requests -from .api_v4 import ClusterFileType, request_get, request_post, REQUESTS_TIMEOUT +from .api_v4 import request_get, request_post, REQUESTS_TIMEOUT MAPILLARY_UPLOAD_ENDPOINT = os.getenv( "MAPILLARY_UPLOAD_ENDPOINT", "https://rupload.facebook.com/mapillary_public_uploads" ) @@ -31,24 +32,14 @@ class UploadService: user_access_token: str session_key: str - cluster_filetype: ClusterFileType - - MIME_BY_CLUSTER_TYPE: dict[ ClusterFileType, str] = { - ClusterFileType.ZIP: "application/zip", - ClusterFileType.BLACKVUE: "video/mp4", - ClusterFileType.CAMM: "video/mp4", - } def __init__( self, user_access_token: str, session_key: str, - cluster_filetype: ClusterFileType, ): self.user_access_token = user_access_token self.session_key = session_key - # Validate the input - self.cluster_filetype = cluster_filetype def fetch_offset(self) -> int: headers = { @@ -124,7 +115,6 @@ def upload_shifted_chunks( "Authorization": f"OAuth {self.user_access_token}", "Offset": f"{offset}", "X-Entity-Name": self.session_key, - "X-Entity-Type": self.MIME_BY_CLUSTER_TYPE[self.cluster_filetype], } url = f"{MAPILLARY_UPLOAD_ENDPOINT}/{self.session_key}" resp = request_post( @@ -149,8 +139,8 @@ class FakeUploadService(UploadService): def __init__(self, *args, **kwargs): super().__init__(*args, **kwargs) - self._upload_path = os.getenv( - "MAPILLARY_UPLOAD_PATH", "mapillary_public_uploads" + self._upload_path = Path( + os.getenv("MAPILLARY_UPLOAD_PATH", "mapillary_public_uploads") ) self._error_ratio = 0.02 @@ -167,8 +157,8 @@ def upload_shifted_chunks( ) os.makedirs(self._upload_path, exist_ok=True) - filename = os.path.join(self._upload_path, self.session_key) - with open(filename, "ab") as fp: + filename = self._upload_path.joinpath(self.session_key) + with filename.open("ab") as fp: for chunk in shifted_chunks: if random.random() <= self._error_ratio: raise requests.ConnectionError( @@ -179,7 +169,15 @@ raise requests.ConnectionError( f"TEST ONLY: Partially uploaded with error ratio {self._error_ratio}" ) - return uuid.uuid4().hex + + file_handle_dir = self._upload_path.joinpath("file_handles") + file_handle_path = file_handle_dir.joinpath(self.session_key) + if not file_handle_path.exists(): + os.makedirs(file_handle_dir, exist_ok=True) + random_file_handle = uuid.uuid4().hex + file_handle_path.write_text(random_file_handle) + + return file_handle_path.read_text() @override def fetch_offset(self) -> int:
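`FakeUploadService` now persists one random handle per session key under `file_handles/`, so retries and re-runs of a dry-run upload observe the same file handle, mimicking the server's idempotency. A condensed sketch of just that bookkeeping (a standalone helper for illustration, not the class itself):

```python
import tempfile
import uuid
from pathlib import Path


def get_or_create_file_handle(upload_path: Path, session_key: str) -> str:
    """Return a stable fake file handle for a session, minting it on first use."""
    file_handle_path = upload_path / "file_handles" / session_key
    if not file_handle_path.exists():
        file_handle_path.parent.mkdir(parents=True, exist_ok=True)
        # First completed upload of this session: mint and persist a handle
        file_handle_path.write_text(uuid.uuid4().hex)
    # Subsequent uploads re-read the same handle
    return file_handle_path.read_text()


with tempfile.TemporaryDirectory() as tmp:
    first = get_or_create_file_handle(Path(tmp), "mly_tools_abc.zip")
    second = get_or_create_file_handle(Path(tmp), "mly_tools_abc.zip")
    assert first == second  # same session key -> same handle on every retry
```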
diff --git a/mapillary_tools/uploader.py b/mapillary_tools/uploader.py index 13a631b0a..4fba04f20 100644 --- a/mapillary_tools/uploader.py +++ b/mapillary_tools/uploader.py @@ -1,5 +1,7 @@ from __future__ import annotations +import concurrent.futures + import dataclasses import io import json @@ -7,6 +9,7 @@ import os import struct import tempfile +import threading import time import typing as T import uuid @@ -125,6 +128,7 @@ def __init__(self): def on(self, event: EventName): def _wrap(callback): self.events.setdefault(event, []).append(callback) + return callback return _wrap @@ -174,7 +178,7 @@ def _wip_file_context(cls, wip_path: Path): upload_md5sum = utils.md5sum_fp(fp).hexdigest() done_path = wip_path.parent.joinpath( - _session_key(upload_md5sum, upload_api_v4.ClusterFileType.ZIP) + _session_key(upload_md5sum, api_v4.ClusterFileType.ZIP) ) try: @@ -208,8 +212,10 @@ def zip_sequence_fp( with zipfile.ZipFile(zip_fp, "w", zipfile.ZIP_DEFLATED) as zipf: for idx, metadata in enumerate(sequence): - # Arcname does not matter, but it should be unique - cls._write_imagebytes_in_zip(zipf, metadata, arcname=f"{idx}.jpg") + # Arcnames should be unique; the actual names do not matter + arcname = f"{idx}.jpg" + zipinfo = zipfile.ZipInfo(arcname, date_time=(1980, 1, 1, 0, 0, 0)) + zipf.writestr(zipinfo, cls._dump_image_bytes(metadata)) assert len(sequence) == len(set(zipf.namelist())) zipf.comment = json.dumps({"sequence_md5sum": sequence_md5sum}).encode( "utf-8" ) @@ -241,9 +247,7 @@ def extract_sequence_md5sum(cls, zip_fp: T.IO[bytes]) -> str: return sequence_md5sum @classmethod - def _write_imagebytes_in_zip( - cls, zipf: zipfile.ZipFile, metadata: types.ImageMetadata, arcname: str - ): + def _dump_image_bytes(cls, metadata: types.ImageMetadata) -> bytes: try: edit = exif_write.ExifEdit(metadata.filename) except struct.error as ex: @@ -255,15 +259,12 @@ ) try: - image_bytes = edit.dump_image_bytes() + return edit.dump_image_bytes() except struct.error as ex: raise ExifError( f"Failed to dump EXIF bytes: {ex}", metadata.filename ) from ex - zipinfo = zipfile.ZipInfo(arcname, date_time=(1980, 1, 1, 0, 0, 0)) - zipf.writestr(zipinfo, image_bytes) - @classmethod def upload_zipfile( cls, @@ -282,28 +283,23 @@ with zip_path.open("rb") as zip_fp: sequence_md5sum = cls.extract_sequence_md5sum(zip_fp) - with zip_path.open("rb") as zip_fp: - upload_md5sum = utils.md5sum_fp(zip_fp).hexdigest() - sequence_progress: SequenceProgress = { "sequence_image_count": len(namelist), "file_type": types.FileType.ZIP.value, "sequence_md5sum": sequence_md5sum, - "upload_md5sum": upload_md5sum, } - upload_session_key = _session_key( - upload_md5sum, upload_api_v4.ClusterFileType.ZIP - ) + # Send a copy of the input progress to each upload session to avoid modifying the original + mutable_progress: dict[str, T.Any] = {**progress, **sequence_progress} with zip_path.open("rb") as zip_fp: - return uploader.upload_stream( - zip_fp, - upload_api_v4.ClusterFileType.ZIP, - upload_session_key, - # Send the copy of the input progress to each upload session, to avoid modifying the original one - progress=T.cast(T.Dict[str, T.Any], {**progress, **sequence_progress}), - ) + file_handle = uploader.upload_stream(zip_fp, progress=mutable_progress) + + cluster_id = uploader.finish_upload( + file_handle, api_v4.ClusterFileType.ZIP, progress=mutable_progress + ) + + return cluster_id @classmethod def zip_images_and_upload( @@ -323,7 +319,7 @@ "total_sequence_count": len(sequences), "sequence_image_count": len(sequence), "sequence_uuid": sequence_uuid, - "file_type": types.FileType.IMAGE.value, + "file_type": types.FileType.ZIP.value, } try:
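`zip_sequence_fp` above now builds each entry from an explicit `zipfile.ZipInfo` pinned to the DOS epoch rather than passing an arcname straight to `writestr`, which would stamp entries with the current time and make the archive bytes (and therefore the md5-derived `mly_tools_<md5>.zip` session key) differ between runs. A standalone sketch of the idea:

```python
import hashlib
import io
import zipfile


def deterministic_zip(blobs: list[bytes]) -> bytes:
    buf = io.BytesIO()
    with zipfile.ZipFile(buf, "w", zipfile.ZIP_DEFLATED) as zipf:
        for idx, blob in enumerate(blobs):
            # Fixed timestamp: identical inputs always produce identical archives
            info = zipfile.ZipInfo(f"{idx}.jpg", date_time=(1980, 1, 1, 0, 0, 0))
            zipf.writestr(info, blob)
    return buf.getvalue()


blobs = [b"image-one", b"image-two"]
digests = {hashlib.md5(deterministic_zip(blobs)).hexdigest() for _ in range(3)}
assert len(digests) == 1  # reproducible across runs
```

Reproducible archive bytes are what let the upload history and the resumable-upload session key recognize an already-uploaded sequence.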
@@ -341,21 +337,14 @@ sequence_progress["sequence_md5sum"] = sequence_md5sum - fp.seek(0, io.SEEK_SET) - upload_md5sum = utils.md5sum_fp(fp).hexdigest() - - upload_session_key = _session_key( - upload_md5sum, upload_api_v4.ClusterFileType.ZIP - ) + mutable_progress: dict[str, T.Any] = {**progress, **sequence_progress} try: - cluster_id = uploader.upload_stream( - fp, - upload_api_v4.ClusterFileType.ZIP, - upload_session_key, - progress=T.cast( - T.Dict[str, T.Any], {**progress, **sequence_progress} - ), + file_handle = uploader.upload_stream(fp, progress=mutable_progress) + cluster_id = uploader.finish_upload( + file_handle, + api_v4.ClusterFileType.ZIP, + progress=mutable_progress, ) except Exception as ex: yield sequence_uuid, UploadResult(error=ex) @@ -363,6 +352,113 @@ yield sequence_uuid, UploadResult(result=cluster_id) + @classmethod + def _upload_sequence( + cls, + uploader: Uploader, + sequence: T.Sequence[types.ImageMetadata], + progress: dict[str, T.Any] | None = None, + ) -> str: + if progress is None: + progress = {} + + lock = threading.Lock() + + def _upload_image(image_metadata: types.ImageMetadata) -> str: + mutable_progress = { + **(progress or {}), + "filename": str(image_metadata.filename), + } + + image_bytes = cls._dump_image_bytes(image_metadata) + file_handle = uploader.upload_stream( + io.BytesIO(image_bytes), progress=mutable_progress + ) + + mutable_progress["chunk_size"] = image_metadata.filesize + + with lock: + uploader.emitter.emit("upload_progress", mutable_progress) + + return file_handle + + _validate_metadatas(sequence) + + progress["entity_size"] = sum(m.filesize or 0 for m in sequence) + + # TODO: assert sequence is sorted + + # FIXME: This is a hack to disable the event emitter inside the uploader + uploader.emission_disabled = True + + uploader.emitter.emit("upload_start", progress) + + with concurrent.futures.ThreadPoolExecutor( + max_workers=constants.MAX_IMAGE_UPLOAD_WORKERS + ) as executor: + image_file_handles = list(executor.map(_upload_image, sequence)) + + manifest = { + "version": "1", + "upload_type": "images", + "image_handles": image_file_handles, + } + + with io.BytesIO() as manifest_fp: + manifest_fp.write(json.dumps(manifest).encode("utf-8")) + manifest_fp.seek(0, io.SEEK_SET) + manifest_file_handle = uploader.upload_stream( + manifest_fp, session_key=f"{uuid.uuid4().hex}.json" + ) + + uploader.emitter.emit("upload_end", progress) + + # FIXME: This is a hack to re-enable the event emitter inside the uploader + uploader.emission_disabled = False + + cluster_id = uploader.finish_upload( + manifest_file_handle, + api_v4.ClusterFileType.MLY_BUNDLE_MANIFEST, + progress=progress, + ) + + return cluster_id + + @classmethod + def upload_images( + cls, + uploader: Uploader, + image_metadatas: T.Sequence[types.ImageMetadata], + progress: dict[str, T.Any] | None = None, + ) -> T.Generator[tuple[str, UploadResult], None, None]: + if progress is None: + progress = {} + + sequences = types.group_and_sort_images(image_metadatas) + + for sequence_idx, (sequence_uuid, sequence) in enumerate(sequences.items()): + sequence_md5sum = types.update_sequence_md5sum(sequence) + + sequence_progress: SequenceProgress = { + "sequence_idx": sequence_idx, + "total_sequence_count": len(sequences), + "sequence_image_count": len(sequence), + "sequence_uuid": sequence_uuid, + "file_type": types.FileType.IMAGE.value, + "sequence_md5sum": sequence_md5sum, + } + + mutable_progress: dict[str, T.Any] = {**progress, **sequence_progress} + + try: + cluster_id = cls._upload_sequence( + uploader, sequence, progress=mutable_progress + ) + except Exception as ex: + yield sequence_uuid, UploadResult(error=ex) + else: + yield sequence_uuid, UploadResult(result=cluster_id) class Uploader: def __init__( @@ -373,6 +469,7 @@ def __init__( dry_run=False, ): self.user_items = user_items + self.emission_disabled = False if emitter is None: # An empty event emitter that does nothing self.emitter = EventEmitter()
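The new `_upload_sequence` above replaces zip-then-upload for images: every image in a sequence is uploaded concurrently for its own file handle, and a small JSON manifest referencing those handles is uploaded last and finished as `MLY_BUNDLE_MANIFEST`. A stripped-down sketch of the fan-out and manifest shape, where `upload` stands in for `Uploader.upload_stream`:

```python
import concurrent.futures
import hashlib
import io
import json
from typing import Callable, Sequence


def upload_sequence(
    images: Sequence[bytes],
    upload: Callable[[io.BytesIO], str],
    max_workers: int = 64,  # cf. MAX_IMAGE_UPLOAD_WORKERS in constants.py
) -> str:
    with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as executor:
        # executor.map preserves input order, so handles line up with images
        handles = list(executor.map(lambda blob: upload(io.BytesIO(blob)), images))

    manifest = {"version": "1", "upload_type": "images", "image_handles": handles}
    return upload(io.BytesIO(json.dumps(manifest).encode("utf-8")))


def fake_upload(fp: io.BytesIO) -> str:
    # Content-addressed stand-in for the real rupload endpoint
    return hashlib.md5(fp.read()).hexdigest()


manifest_handle = upload_sequence([b"img-1", b"img-2"], fake_upload)
print(manifest_handle)
```

Only the manifest handle reaches `finish_upload`, so the cluster is still created in a single API call no matter how many images were transported.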
@@ -384,24 +481,32 @@ def upload_stream( self, fp: T.IO[bytes], - cluster_filetype: upload_api_v4.ClusterFileType, - session_key: str, + session_key: str | None = None, progress: dict[str, T.Any] | None = None, ) -> str: if progress is None: progress = {} + if session_key is None: + fp.seek(0, io.SEEK_SET) + md5sum = utils.md5sum_fp(fp).hexdigest() + filetype = progress.get("file_type") + if filetype is not None: + session_key = _session_key(md5sum, types.FileType(filetype)) + else: + session_key = md5sum + fp.seek(0, io.SEEK_END) entity_size = fp.tell() - upload_service = self._create_upload_service(session_key, cluster_filetype) + upload_service = self._create_upload_service(session_key) progress["entity_size"] = entity_size progress["chunk_size"] = self.chunk_size progress["retries"] = 0 progress["begin_offset"] = None - self.emitter.emit("upload_start", progress) + self._maybe_emit("upload_start", progress) while True: try: @@ -415,32 +520,22 @@ progress["retries"] += 1 - self.emitter.emit("upload_end", progress) + self._maybe_emit("upload_end", progress) - # TODO: retry here - cluster_id = self._finish_upload_retryable(upload_service, file_handle) - progress["cluster_id"] = cluster_id - - self.emitter.emit("upload_finished", progress) + return file_handle - return cluster_id - - def _create_upload_service( - self, session_key: str, cluster_filetype: upload_api_v4.ClusterFileType - ) -> upload_api_v4.UploadService: + def _create_upload_service(self, session_key: str) -> upload_api_v4.UploadService: upload_service: upload_api_v4.UploadService if self.dry_run: upload_service = upload_api_v4.FakeUploadService( user_access_token=self.user_items["user_upload_token"], session_key=session_key, - cluster_filetype=cluster_filetype, ) else: upload_service = upload_api_v4.UploadService( user_access_token=self.user_items["user_upload_token"], session_key=session_key, - cluster_filetype=cluster_filetype, ) return upload_service @@ -453,7 +548,7 @@ def _handle_upload_exception( chunk_size = progress["chunk_size"] if retries <= constants.MAX_UPLOAD_RETRIES and _is_retriable_exception(ex): - self.emitter.emit("upload_interrupted", progress) + self._maybe_emit("upload_interrupted", progress) LOG.warning( # use %s instead of %d because offset could be None "Error uploading chunk_size %d at begin_offset %s: %s: %s", @@ -494,7 +589,7 @@ def _chunk_with_progress_emitted( # Whenever a chunk is uploaded, reset retries progress["retries"] = 0 - self.emitter.emit("upload_progress", progress) + self._maybe_emit("upload_progress", progress) def _upload_stream_retryable( self, @@ -509,7 +604,7 @@ progress["begin_offset"] = begin_offset progress["offset"] = begin_offset - self.emitter.emit("upload_fetch_offset", progress) + self._maybe_emit("upload_fetch_offset", progress) fp.seek(begin_offset, io.SEEK_SET) @@ -517,10 +612,15 @@ return upload_service.upload_shifted_chunks(shifted_chunks, begin_offset) - def _finish_upload_retryable( - self, upload_service: upload_api_v4.UploadService, file_handle: str + def finish_upload( + self, + file_handle: str, + cluster_filetype: api_v4.ClusterFileType, + progress: dict[str, T.Any] | None = None, ) -> str: """Finish upload with safe retries guaranteed""" + if progress is None: + progress = {} if self.dry_run: cluster_id = "0" resp = api_v4.finish_upload( self.user_items["user_upload_token"], file_handle, - upload_service.cluster_filetype, + cluster_filetype, organization_id=self.user_items.get("MAPOrganizationKey"), )
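With this hunk, `upload_stream` is pure transport: it derives a session key from the content md5sum when none is given, pushes bytes with resume-on-retry, and returns the rupload file handle; creating a cluster is the separate, explicit `finish_upload` step. A usage sketch mirroring the call sites in this diff (dry-run, so no real API is hit; the fake service writes under `MAPILLARY_UPLOAD_PATH`, default `mapillary_public_uploads/`):

```python
import io

from mapillary_tools import api_v4, uploader

mly_uploader = uploader.Uploader(
    {"user_upload_token": "YOUR_USER_ACCESS_TOKEN"}, dry_run=True
)

fp = io.BytesIO(b"fake mp4 bytes")
# Phase 1: resumable byte transport; returns a file handle, not a cluster
file_handle = mly_uploader.upload_stream(fp, session_key="example.mp4")
# Phase 2: declare what the bytes are and create the cluster from the handle
cluster_id = mly_uploader.finish_upload(file_handle, api_v4.ClusterFileType.BLACKVUE)
print(cluster_id)  # "0" in dry-run mode
```

Splitting the phases is what allows the image path to transport many handles first and finish them all through one manifest.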
@@ -537,8 +637,17 @@ # TODO: validate cluster_id + progress["cluster_id"] = cluster_id + self._maybe_emit("upload_finished", progress) + return cluster_id + def _maybe_emit( + self, event: EventName, progress: dict[str, T.Any] | UploaderProgress + ): + if not self.emission_disabled: + return self.emitter.emit(event, progress) + def _validate_metadatas(metadatas: T.Sequence[types.ImageMetadata]): for metadata in metadatas: @@ -580,14 +689,19 @@ def _is_retriable_exception(ex: Exception): return False -_SUFFIX_MAP: dict[upload_api_v4.ClusterFileType, str] = { - upload_api_v4.ClusterFileType.ZIP: ".zip", - upload_api_v4.ClusterFileType.CAMM: ".mp4", - upload_api_v4.ClusterFileType.BLACKVUE: ".mp4", -} - - def _session_key( - upload_md5sum: str, cluster_filetype: upload_api_v4.ClusterFileType + upload_md5sum: str, filetype: api_v4.ClusterFileType | types.FileType ) -> str: - return f"mly_tools_{upload_md5sum}{_SUFFIX_MAP[cluster_filetype]}" + _SUFFIX_MAP: dict[api_v4.ClusterFileType | types.FileType, str] = { + api_v4.ClusterFileType.ZIP: ".zip", + api_v4.ClusterFileType.CAMM: ".mp4", + api_v4.ClusterFileType.BLACKVUE: ".mp4", + types.FileType.IMAGE: ".jpg", + types.FileType.ZIP: ".zip", + types.FileType.BLACKVUE: ".mp4", + types.FileType.CAMM: ".mp4", + types.FileType.GOPRO: ".mp4", + types.FileType.VIDEO: ".mp4", + } + + return f"mly_tools_{upload_md5sum}{_SUFFIX_MAP[filetype]}" diff --git a/tests/integration/fixtures.py b/tests/integration/fixtures.py index 71ccb8117..807ab32ce 100644 --- a/tests/integration/fixtures.py +++ b/tests/integration/fixtures.py @@ -1,9 +1,10 @@ +from __future__ import annotations + import json import os import shutil import subprocess import tempfile -import typing as T import zipfile from pathlib import Path @@ -159,52 +160,59 @@ def run_exiftool_and_generate_geotag_args( with open("schema/image_description_schema.json") as fp: - image_description_schema = json.load(fp) + IMAGE_DESCRIPTION_SCHEMA = json.load(fp) -def validate_and_extract_image(image_path: str): - with open(image_path, "rb") as fp: +def validate_and_extract_image(image_path: Path): + with image_path.open("rb") as fp: tags = exifread.process_file(fp) desc_tag = tags.get("Image ImageDescription") assert desc_tag is not None, (tags, image_path) desc = json.loads(str(desc_tag.values)) - desc["filename"] = image_path + desc["filename"] = str(image_path) desc["filetype"] = "image" - jsonschema.validate(desc, image_description_schema) + jsonschema.validate(desc, IMAGE_DESCRIPTION_SCHEMA) return desc -def validate_and_extract_zip(zip_path: Path) -> T.List[T.Dict]: +def validate_and_extract_zip(zip_path: Path) -> list[dict]: + with zip_path.open("rb") as fp: + upload_md5sum = utils.md5sum_fp(fp).hexdigest() + + assert f"mly_tools_{upload_md5sum}.zip" == zip_path.name, ( + zip_path.name, + upload_md5sum, + ) + descs = [] with zipfile.ZipFile(zip_path) as zipf: - _sequence_md5sum = json.loads(zipf.comment)["sequence_md5sum"] with tempfile.TemporaryDirectory() as tempdir: zipf.extractall(path=tempdir) for name in os.listdir(tempdir): filename = os.path.join(tempdir, name) - desc = validate_and_extract_image(filename) + desc = validate_and_extract_image(Path(filename)) descs.append(desc) - with zip_path.open("rb") as fp: + return descs + + +def validate_and_extract_camm(video_path: Path) -> list[dict]: + with video_path.open("rb") as fp: upload_md5sum = utils.md5sum_fp(fp).hexdigest() - assert f"mly_tools_{upload_md5sum}.zip" == zip_path.name, ( - zip_path.name, + assert
f"mly_tools_{upload_md5sum}.mp4" == video_path.name, ( + video_path.name, upload_md5sum, ) - return descs - - -def validate_and_extract_camm(filename: str) -> T.List[T.Dict]: if not IS_FFMPEG_INSTALLED: return [] with tempfile.TemporaryDirectory() as tempdir: x = subprocess.run( - f"{EXECUTABLE} --verbose video_process --video_sample_interval=2 --video_sample_distance=-1 --geotag_source=camm {filename} {tempdir}", + f"{EXECUTABLE} --verbose video_process --video_sample_interval=2 --video_sample_distance=-1 --geotag_source=camm {str(video_path)} {tempdir}", shell=True, ) assert x.returncode == 0, x.stderr @@ -222,72 +230,135 @@ def validate_and_extract_camm(filename: str) -> T.List[T.Dict]: return json.load(fp) -def verify_descs(expected: T.List[T.Dict], actual: T.Union[Path, T.List[T.Dict]]): - if isinstance(actual, Path): - with actual.open("r") as fp: - actual = json.load(fp) - assert isinstance(actual, list), f"expect a list of descs but got: {actual}" - - expected_map = {desc["filename"]: desc for desc in expected} - assert len(expected) == len(expected_map), expected - - actual_map = {desc["filename"]: desc for desc in actual} - assert len(actual) == len(actual_map), actual - - for filename, expected_desc in expected_map.items(): - actual_desc = actual_map.get(filename) - assert actual_desc is not None, expected_desc - if "error" in expected_desc: - assert expected_desc["error"]["type"] == actual_desc.get("error", {}).get( - "type" - ), f"{expected_desc=} != {actual_desc=}" - if "message" in expected_desc["error"]: - assert ( - expected_desc["error"]["message"] == actual_desc["error"]["message"] - ) - if "filetype" in expected_desc: - assert expected_desc["filetype"] == actual_desc.get("filetype"), actual_desc - - if "MAPCompassHeading" in expected_desc: - e = expected_desc["MAPCompassHeading"] - assert "MAPCompassHeading" in actual_desc, actual_desc - a = actual_desc["MAPCompassHeading"] - assert abs(e["TrueHeading"] - a["TrueHeading"]) < 0.001, ( - f"got {a['TrueHeading']} but expect {e['TrueHeading']} in {filename}" - ) - assert abs(e["MagneticHeading"] - a["MagneticHeading"]) < 0.001, ( - f"got {a['MagneticHeading']} but expect {e['MagneticHeading']} in {filename}" - ) - - if "MAPCaptureTime" in expected_desc: - assert expected_desc["MAPCaptureTime"] == actual_desc["MAPCaptureTime"], ( - f"expect {expected_desc['MAPCaptureTime']} but got {actual_desc['MAPCaptureTime']} in {filename}" - ) - - if "MAPLongitude" in expected_desc: - assert ( - abs(expected_desc["MAPLongitude"] - actual_desc["MAPLongitude"]) - < 0.00001 - ), ( - f"expect {expected_desc['MAPLongitude']} but got {actual_desc['MAPLongitude']} in {filename}" - ) - - if "MAPLatitude" in expected_desc: - assert ( - abs(expected_desc["MAPLatitude"] - actual_desc["MAPLatitude"]) < 0.00001 - ), ( - f"expect {expected_desc['MAPLatitude']} but got {actual_desc['MAPLatitude']} in {filename}" - ) - - if "MAPAltitude" in expected_desc: - assert ( - abs(expected_desc["MAPAltitude"] - actual_desc["MAPAltitude"]) < 0.001 - ), ( - f"expect {expected_desc['MAPAltitude']} but got {actual_desc['MAPAltitude']} in {filename}" - ) - - if "MAPDeviceMake" in expected_desc: - assert expected_desc["MAPDeviceMake"] == actual_desc["MAPDeviceMake"] - - if "MAPDeviceModel" in expected_desc: - assert expected_desc["MAPDeviceModel"] == actual_desc["MAPDeviceModel"] +def load_descs(descs) -> list: + if isinstance(descs, Path): + with descs.open("r") as fp: + descs = json.load(fp) + assert isinstance(descs, list), f"expect a list of descs but got: 
{descs}" + return descs + + +def extract_all_uploaded_descs(upload_folder: Path) -> list[list[dict]]: + FILE_HANDLE_DIRNAME = "file_handles" + + session_by_file_handle: dict[str, str] = {} + if upload_folder.joinpath(FILE_HANDLE_DIRNAME).exists(): + for session_path in upload_folder.joinpath(FILE_HANDLE_DIRNAME).iterdir(): + file_handle = session_path.read_text() + session_by_file_handle[file_handle] = session_path.name + + sequences = [] + + for file in upload_folder.iterdir(): + if file.suffix == ".json": + with file.open() as fp: + manifest = json.load(fp) + image_file_handles = manifest["image_handles"] + assert len(image_file_handles) > 0, manifest + image_sequence = [] + for file_handle in image_file_handles: + image_path = upload_folder.joinpath(session_by_file_handle[file_handle]) + with image_path.open("rb") as fp: + upload_md5sum = utils.md5sum_fp(fp).hexdigest() + assert upload_md5sum in image_path.stem, (upload_md5sum, image_path) + image_sequence.append(validate_and_extract_image(image_path)) + sequences.append(image_sequence) + elif file.suffix == ".zip": + sequences.append(validate_and_extract_zip(file)) + elif file.suffix == ".mp4": + sequences.append(validate_and_extract_camm(file)) + elif file.name == FILE_HANDLE_DIRNAME: + # Already processed above + pass + + return sequences + + +def approximate(left: float, right: float, threshold=0.00001): + return abs(left - right) < threshold + + +def assert_compare_image_descs(expected: dict, actual: dict): + jsonschema.validate(expected, IMAGE_DESCRIPTION_SCHEMA) + jsonschema.validate(actual, IMAGE_DESCRIPTION_SCHEMA) + + assert expected.get("MAPFilename"), expected + assert actual.get("MAPFilename"), actual + assert expected.get("MAPFilename") == actual.get("MAPFilename") + + filename = actual.get("MAPFilename") + + if "error" in expected: + assert expected["error"]["type"] == actual.get("error", {}).get("type"), ( + f"{expected=} != {actual=}" + ) + if "message" in expected["error"]: + assert expected["error"]["message"] == actual["error"]["message"] + + if "filetype" in expected: + assert expected["filetype"] == actual.get("filetype"), actual + + if "MAPCompassHeading" in expected: + e = expected["MAPCompassHeading"] + assert "MAPCompassHeading" in actual, actual + a = actual["MAPCompassHeading"] + assert approximate(e["TrueHeading"], a["TrueHeading"], 0.001), ( + f"got {a['TrueHeading']} but expect {e['TrueHeading']} in {filename}" + ) + assert approximate(e["MagneticHeading"], a["MagneticHeading"], 0.001), ( + f"got {a['MagneticHeading']} but expect {e['MagneticHeading']} in {filename}" + ) + + if "MAPCaptureTime" in expected: + assert expected["MAPCaptureTime"] == actual["MAPCaptureTime"], ( + f"expect {expected['MAPCaptureTime']} but got {actual['MAPCaptureTime']} in {filename}" + ) + + if "MAPLongitude" in expected: + assert approximate(expected["MAPLongitude"], actual["MAPLongitude"], 0.00001), ( + f"expect {expected['MAPLongitude']} but got {actual['MAPLongitude']} in {filename}" + ) + + if "MAPLatitude" in expected: + assert approximate(expected["MAPLatitude"], actual["MAPLatitude"], 0.00001), ( + f"expect {expected['MAPLatitude']} but got {actual['MAPLatitude']} in {filename}" + ) + + if "MAPAltitude" in expected: + assert approximate(expected["MAPAltitude"], actual["MAPAltitude"], 0.001), ( + f"expect {expected['MAPAltitude']} but got {actual['MAPAltitude']} in {filename}" + ) + + if "MAPDeviceMake" in expected: + assert expected["MAPDeviceMake"] == actual["MAPDeviceMake"] + + if "MAPDeviceModel" in expected: + assert 
expected["MAPDeviceModel"] == actual["MAPDeviceModel"] + + +def assert_contains_image_descs(haystack: Path | list[dict], needle: Path | list[dict]): + """ + Check if the haystack contains all the descriptions in needle. + """ + + haystack = load_descs(haystack) + needle = load_descs(needle) + + haystack_by_filename = { + desc["MAPFilename"]: desc for desc in haystack if "MAPFilename" in desc + } + + needle_by_filename = { + desc["MAPFilename"]: desc for desc in needle if "MAPFilename" in desc + } + + assert haystack_by_filename.keys() >= needle_by_filename.keys(), ( + f"haystack {list(haystack_by_filename.keys())} does not contain all the keys in needle {list(needle_by_filename.keys())}" + ) + for filename, desc in needle_by_filename.items(): + assert_compare_image_descs(desc, haystack_by_filename[filename]) + + +def assert_same_image_descs(left: Path | list[dict], right: Path | list[dict]): + assert_contains_image_descs(left, right) + assert_contains_image_descs(right, left) diff --git a/tests/integration/test_gopro.py b/tests/integration/test_gopro.py index b02587dd6..8fcbbf121 100644 --- a/tests/integration/test_gopro.py +++ b/tests/integration/test_gopro.py @@ -8,12 +8,12 @@ import pytest from .fixtures import ( + assert_same_image_descs, EXECUTABLE, IS_FFMPEG_INSTALLED, run_exiftool_and_generate_geotag_args, setup_config, setup_upload, - verify_descs, ) @@ -26,6 +26,8 @@ } EXPECTED_DESCS: T.List[T.Any] = [ { + "filename": "hero8.mp4/hero8_NA_000001.jpg", + "filetype": "image", "MAPAltitude": 9540.24, "MAPCaptureTime": "2019_11_18_15_41_12_354", "MAPCompassHeading": { @@ -36,9 +38,11 @@ "MAPLongitude": -129.2943386, "MAPDeviceMake": "GoPro", "MAPDeviceModel": "HERO8 Black", - "filename": "hero8.mp4/hero8_NA_000001.jpg", + "MAPFilename": "hero8_NA_000001.jpg", }, { + "filename": "hero8.mp4/hero8_NA_000002.jpg", + "filetype": "image", "MAPAltitude": 7112.573717404068, "MAPCaptureTime": "2019_11_18_15_41_14_354", "MAPCompassHeading": { @@ -49,9 +53,11 @@ "MAPLongitude": -126.85929159704702, "MAPDeviceMake": "GoPro", "MAPDeviceModel": "HERO8 Black", - "filename": "hero8.mp4/hero8_NA_000002.jpg", + "MAPFilename": "hero8_NA_000002.jpg", }, { + "filename": "hero8.mp4/hero8_NA_000003.jpg", + "filetype": "image", "MAPAltitude": 7463.642846094319, "MAPCaptureTime": "2019_11_18_15_41_16_354", "MAPCompassHeading": { @@ -62,9 +68,11 @@ "MAPLongitude": -127.18475264566939, "MAPDeviceMake": "GoPro", "MAPDeviceModel": "HERO8 Black", - "filename": "hero8.mp4/hero8_NA_000003.jpg", + "MAPFilename": "hero8_NA_000003.jpg", }, { + "filename": "hero8.mp4/hero8_NA_000004.jpg", + "filetype": "image", "MAPAltitude": 6909.8168472111465, "MAPCaptureTime": "2019_11_18_15_41_18_354", "MAPCompassHeading": { @@ -75,9 +83,11 @@ "MAPLongitude": -126.65905680405231, "MAPDeviceMake": "GoPro", "MAPDeviceModel": "HERO8 Black", - "filename": "hero8.mp4/hero8_NA_000004.jpg", + "MAPFilename": "hero8_NA_000004.jpg", }, { + "filename": "hero8.mp4/hero8_NA_000005.jpg", + "filetype": "image", "MAPAltitude": 7212.594480737465, "MAPCaptureTime": "2019_11_18_15_41_20_354", "MAPCompassHeading": { @@ -88,9 +98,11 @@ "MAPLongitude": -126.93688762007304, "MAPDeviceMake": "GoPro", "MAPDeviceModel": "HERO8 Black", - "filename": "hero8.mp4/hero8_NA_000005.jpg", + "MAPFilename": "hero8_NA_000005.jpg", }, { + "filename": "hero8.mp4/hero8_NA_000006.jpg", + "filetype": "image", "MAPAltitude": 7274.361994963208, "MAPCaptureTime": "2019_11_18_15_41_22_354", "MAPCompassHeading": { @@ -101,7 +113,7 @@ "MAPLongitude": -126.98833423074615, 
"MAPDeviceMake": "GoPro", "MAPDeviceModel": "HERO8 Black", - "filename": "hero8.mp4/hero8_NA_000006.jpg", + "MAPFilename": "hero8_NA_000006.jpg", }, ] @@ -140,7 +152,7 @@ def test_process_gopro_hero8( for expected_desc in expected_descs: expected_desc["filename"] = str(sample_dir.join(expected_desc["filename"])) - verify_descs(expected_descs, Path(desc_path)) + assert_same_image_descs(Path(desc_path), expected_descs) @pytest.mark.usefixtures("setup_config") diff --git a/tests/integration/test_history.py b/tests/integration/test_history.py index 0b7060fab..62cd8d6ff 100644 --- a/tests/integration/test_history.py +++ b/tests/integration/test_history.py @@ -47,18 +47,19 @@ def test_upload_gopro( shell=True, ) assert x.returncode == 0, x.stderr - assert len(setup_upload.listdir()) == 1, ( + assert len(setup_upload.listdir()) == 2, ( f"should be uploaded for the first time but got {setup_upload.listdir()}" ) for upload in setup_upload.listdir(): - upload.remove() - assert len(setup_upload.listdir()) == 0 + if upload.basename != "file_handles": + upload.remove() + assert len(setup_upload.listdir()) == 1 x = subprocess.run( f"{EXECUTABLE} process_and_upload --skip_process_errors {UPLOAD_FLAGS} {str(video_dir)}", shell=True, ) assert x.returncode == 0, x.stderr - assert len(setup_upload.listdir()) == 0, ( + assert len(setup_upload.listdir()) == 1, ( "should NOT upload because it is uploaded already" ) diff --git a/tests/integration/test_process.py b/tests/integration/test_process.py index 82b042e2d..a3afe06f2 100644 --- a/tests/integration/test_process.py +++ b/tests/integration/test_process.py @@ -9,13 +9,13 @@ import pytest from .fixtures import ( + assert_contains_image_descs, EXECUTABLE, IS_FFMPEG_INSTALLED, run_exiftool_and_generate_geotag_args, setup_config, setup_data, validate_and_extract_zip, - verify_descs, ) @@ -25,6 +25,7 @@ "DSC00001.JPG": { "filename": "DSC00001.JPG", "filetype": "image", + "MAPFilename": "DSC00001.JPG", "MAPLatitude": 45.5169031, "MAPLongitude": -122.572765, "MAPCaptureTime": "2018_06_08_20_24_11_000", @@ -37,6 +38,7 @@ "DSC00497.JPG": { "filename": "DSC00497.JPG", "filetype": "image", + "MAPFilename": "DSC00497.JPG", "MAPLatitude": 45.5107231, "MAPLongitude": -122.5760514, "MAPCaptureTime": "2018_06_08_20_32_28_000", @@ -49,6 +51,7 @@ "V0370574.JPG": { "filename": "V0370574.JPG", "filetype": "image", + "MAPFilename": "V0370574.JPG", "MAPLatitude": -1.0169444, "MAPLongitude": -1.0169444, "MAPCaptureTime": "2018_07_27_11_32_14_000", @@ -58,7 +61,9 @@ "MAPOrientation": 1, }, "adobe_coords.jpg": { + "filename": "adobe_coords.jpg", "filetype": "image", + "MAPFilename": "adobe_coords.jpg", "MAPLatitude": -0.0702668, "MAPLongitude": 34.3819352, "MAPCaptureTime": "2019_07_16_10_26_11_000", @@ -94,23 +99,20 @@ def test_process_images_with_defaults( x = subprocess.run(args, shell=True) assert x.returncode == 0, x.stderr - verify_descs( + assert_contains_image_descs( + Path(setup_data, "mapillary_image_description.json"), [ { **_DEFAULT_EXPECTED_DESCS["DSC00001.JPG"], - "filename": str(Path(setup_data, "images", "DSC00001.JPG")), }, { **_DEFAULT_EXPECTED_DESCS["DSC00497.JPG"], - "filename": str(Path(setup_data, "images", "DSC00497.JPG")), }, { **_DEFAULT_EXPECTED_DESCS["V0370574.JPG"], - "filename": str(Path(setup_data, "images", "V0370574.JPG")), "MAPCaptureTime": _local_to_utc("2018-07-27T11:32:14"), }, ], - Path(setup_data, "mapillary_image_description.json"), ) @@ -124,7 +126,8 @@ def test_time_with_offset(setup_data: py.path.local, use_exiftool: bool = False) args = 
run_exiftool_and_generate_geotag_args(setup_data, args) x = subprocess.run(args, shell=True) assert x.returncode == 0, x.stderr - verify_descs( + assert_contains_image_descs( + Path(setup_data, "mapillary_image_description.json"), [ { **_DEFAULT_EXPECTED_DESCS["DSC00001.JPG"], @@ -142,7 +145,6 @@ def test_time_with_offset(setup_data: py.path.local, use_exiftool: bool = False) "MAPCaptureTime": _local_to_utc("2018-07-27T11:32:16.500"), }, ], - Path(setup_data, "mapillary_image_description.json"), ) args = f"{EXECUTABLE} process --file_types=image {PROCESS_FLAGS} {setup_data} --offset_time=-1.0" @@ -150,7 +152,8 @@ def test_time_with_offset(setup_data: py.path.local, use_exiftool: bool = False) args = run_exiftool_and_generate_geotag_args(setup_data, args) x = subprocess.run(args, shell=True) assert x.returncode == 0, x.stderr - verify_descs( + assert_contains_image_descs( + Path(setup_data, "mapillary_image_description.json"), [ { **_DEFAULT_EXPECTED_DESCS["DSC00001.JPG"], @@ -168,7 +171,6 @@ def test_time_with_offset(setup_data: py.path.local, use_exiftool: bool = False) "MAPCaptureTime": _local_to_utc("2018-07-27T11:32:13.000"), }, ], - Path(setup_data, "mapillary_image_description.json"), ) @@ -201,9 +203,9 @@ def test_process_images_with_overwrite_all_EXIF_tags( "MAPCaptureTime": _local_to_utc("2018-07-27T11:32:16.500"), }, ] - verify_descs( - expected_descs, + assert_contains_image_descs( Path(setup_data, "mapillary_image_description.json"), + expected_descs, ) args = f"{EXECUTABLE} process --file_types=image {PROCESS_FLAGS} {setup_data}" @@ -211,9 +213,9 @@ def test_process_images_with_overwrite_all_EXIF_tags( args = run_exiftool_and_generate_geotag_args(setup_data, args) x = subprocess.run(args, shell=True) assert x.returncode == 0, x.stderr - verify_descs( - expected_descs, + assert_contains_image_descs( Path(setup_data, "mapillary_image_description.json"), + expected_descs, ) @@ -232,7 +234,8 @@ def test_angle_with_offset(setup_data: py.path.local, use_exiftool: bool = False x = subprocess.run(args, shell=True) assert x.returncode == 0, x.stderr - verify_descs( + assert_contains_image_descs( + Path(setup_data, "mapillary_image_description.json"), [ { **_DEFAULT_EXPECTED_DESCS["DSC00001.JPG"], @@ -260,7 +263,6 @@ def test_angle_with_offset(setup_data: py.path.local, use_exiftool: bool = False }, }, ], - Path(setup_data, "mapillary_image_description.json"), ) @@ -271,7 +273,8 @@ def test_angle_with_offset_with_exiftool(setup_data: py.path.local): def test_parse_adobe_coordinates(setup_data: py.path.local): args = f"{EXECUTABLE} process --file_types=image {PROCESS_FLAGS} {setup_data}/adobe_coords" x = subprocess.run(args, shell=True) - verify_descs( + assert_contains_image_descs( + Path(setup_data, "adobe_coords/mapillary_image_description.json"), [ { "filename": str(Path(setup_data, "adobe_coords", "adobe_coords.jpg")), @@ -285,7 +288,6 @@ def test_parse_adobe_coordinates(setup_data: py.path.local): "MAPOrientation": 1, } ], - Path(setup_data, "adobe_coords/mapillary_image_description.json"), ) @@ -385,7 +387,8 @@ def test_geotagging_images_from_gpx(setup_data: py.path.local): """, shell=True, ) - verify_descs( + assert_contains_image_descs( + Path(images, "mapillary_image_description.json"), [ { **_DEFAULT_EXPECTED_DESCS["DSC00001.JPG"], @@ -410,7 +413,6 @@ def test_geotagging_images_from_gpx(setup_data: py.path.local): }, }, ], - Path(images, "mapillary_image_description.json"), ) @@ -424,7 +426,8 @@ def test_geotagging_images_from_gpx_with_offset(setup_data: py.path.local): 
shell=True, ) assert x.returncode == 0, x.stderr - verify_descs( + assert_contains_image_descs( + Path(setup_data, "mapillary_image_description.json"), [ { **_DEFAULT_EXPECTED_DESCS["DSC00001.JPG"], @@ -451,7 +454,6 @@ def test_geotagging_images_from_gpx_with_offset(setup_data: py.path.local): }, }, ], - Path(setup_data, "mapillary_image_description.json"), ) @@ -464,7 +466,8 @@ def test_geotagging_images_from_gpx_use_gpx_start_time(setup_data: py.path.local shell=True, ) assert x.returncode == 0, x.stderr - verify_descs( + assert_contains_image_descs( + Path(setup_data, "mapillary_image_description.json"), [ { **_DEFAULT_EXPECTED_DESCS["DSC00001.JPG"], @@ -491,7 +494,6 @@ def test_geotagging_images_from_gpx_use_gpx_start_time(setup_data: py.path.local }, }, ], - Path(setup_data, "mapillary_image_description.json"), ) @@ -506,7 +508,8 @@ def test_geotagging_images_from_gpx_use_gpx_start_time_with_offset( shell=True, ) assert x.returncode == 0, x.stderr - verify_descs( + assert_contains_image_descs( + Path(setup_data, "mapillary_image_description.json"), [ { **_DEFAULT_EXPECTED_DESCS["DSC00001.JPG"], @@ -533,7 +536,6 @@ def test_geotagging_images_from_gpx_use_gpx_start_time_with_offset( }, }, ], - Path(setup_data, "mapillary_image_description.json"), ) @@ -711,7 +713,8 @@ def test_video_process_sample_with_distance(setup_data: py.path.local): shell=True, ) assert x.returncode == 0, x.stderr - verify_descs( + assert_contains_image_descs( + desc_path, [ { "filename": str( @@ -780,7 +783,6 @@ def test_video_process_sample_with_distance(setup_data: py.path.local): "MAPOrientation": 1, }, ], - desc_path, ) diff --git a/tests/integration/test_process_and_upload.py b/tests/integration/test_process_and_upload.py index a75fe502d..8b244eb04 100644 --- a/tests/integration/test_process_and_upload.py +++ b/tests/integration/test_process_and_upload.py @@ -1,5 +1,4 @@ import datetime -import os import subprocess from pathlib import Path @@ -7,14 +6,15 @@ import pytest from .fixtures import ( + assert_contains_image_descs, + assert_same_image_descs, EXECUTABLE, + extract_all_uploaded_descs, IS_FFMPEG_INSTALLED, setup_config, setup_data, setup_upload, USERNAME, - validate_and_extract_camm, - validate_and_extract_zip, ) PROCESS_FLAGS = "" @@ -130,34 +130,6 @@ } -def _validate_uploads(upload_dir: py.path.local, expected): - descs = [] - for file in upload_dir.listdir(): - if str(file).endswith(".mp4"): - descs.extend(validate_and_extract_camm(str(file))) - elif str(file).endswith(".zip"): - descs.extend(validate_and_extract_zip(Path(file))) - else: - raise Exception(f"invalid file {file}") - - excludes = [ - "filename", - "filesize", - "md5sum", - "MAPMetaTags", - "MAPSequenceUUID", - "MAPFilename", - ] - - actual = {} - for desc in descs: - actual[os.path.basename(desc["MAPFilename"])] = { - k: v for k, v in desc.items() if k not in excludes - } - - assert expected == actual - - @pytest.mark.usefixtures("setup_config") def test_process_and_upload(setup_data: py.path.local, setup_upload: py.path.local): input_paths = [ @@ -168,17 +140,17 @@ def test_process_and_upload(setup_data: py.path.local, setup_upload: py.path.loc setup_data.join("images"), setup_data.join("images").join("DSC00001.JPG"), ] - x = subprocess.run( + subprocess.run( f"{EXECUTABLE} --verbose process_and_upload {UPLOAD_FLAGS} {' '.join(map(str, input_paths))} --skip_process_errors", shell=True, + check=True, + ) + + descs = sum(extract_all_uploaded_descs(Path(setup_upload)), []) + assert_contains_image_descs( + descs, + 
[*EXPECTED_DESCS["gopro"].values(), *EXPECTED_DESCS["image"].values()], ) - assert x.returncode == 0, x.stderr - if IS_FFMPEG_INSTALLED: - _validate_uploads( - setup_upload, {**EXPECTED_DESCS["gopro"], **EXPECTED_DESCS["image"]} - ) - else: - _validate_uploads(setup_upload, {**EXPECTED_DESCS["image"]}) @pytest.mark.usefixtures("setup_config") @@ -186,7 +158,7 @@ def test_process_and_upload_images_only( setup_data: py.path.local, setup_upload: py.path.local, ): - x = subprocess.run( + subprocess.run( f"""{EXECUTABLE} --verbose process_and_upload \ {UPLOAD_FLAGS} {PROCESS_FLAGS} \ --filetypes=image \ @@ -194,9 +166,10 @@ def test_process_and_upload_images_only( {setup_data}/images {setup_data}/images {setup_data}/images/DSC00001.JPG """, shell=True, + check=True, ) - assert x.returncode == 0, x.stderr - _validate_uploads(setup_upload, EXPECTED_DESCS["image"]) + descs = sum(extract_all_uploaded_descs(Path(setup_upload)), []) + assert_contains_image_descs(descs, [*EXPECTED_DESCS["image"].values()]) @pytest.mark.usefixtures("setup_config") @@ -210,7 +183,7 @@ def test_video_process_and_upload( gpx_start_time = "2025_03_14_07_00_00_000" gpx_end_time = "2025_03_14_07_01_33_624" gpx_file = setup_data.join("gpx").join("sf_30km_h.gpx") - x = subprocess.run( + subprocess.run( f"""{EXECUTABLE} video_process_and_upload \ {PROCESS_FLAGS} {UPLOAD_FLAGS} \ --video_sample_interval=2 \ @@ -222,11 +195,12 @@ def test_video_process_and_upload( {video_dir} {video_dir.join("my_samples")} """, shell=True, + check=True, ) - assert x.returncode == 0, x.stderr - assert 1 == len(setup_upload.listdir()) expected = { "sample-5s_NA_000001.jpg": { + "filename": "sample-5s_NA_000001.jpg", + "MAPFilename": "sample-5s_NA_000001.jpg", "MAPAltitude": 94.75, "MAPCaptureTime": "2025_03_14_07_00_00_000", "MAPCompassHeading": { @@ -239,6 +213,8 @@ def test_video_process_and_upload( "filetype": "image", }, "sample-5s_NA_000002.jpg": { + "filename": "sample-5s_NA_000002.jpg", + "MAPFilename": "sample-5s_NA_000002.jpg", "MAPAltitude": 93.347, "MAPCaptureTime": "2025_03_14_07_00_02_000", "MAPCompassHeading": { @@ -251,6 +227,8 @@ def test_video_process_and_upload( "filetype": "image", }, "sample-5s_NA_000003.jpg": { + "filename": "sample-5s_NA_000003.jpg", + "MAPFilename": "sample-5s_NA_000003.jpg", "MAPAltitude": 92.492, "MAPCaptureTime": "2025_03_14_07_00_04_000", "MAPCompassHeading": { @@ -263,7 +241,8 @@ def test_video_process_and_upload( "filetype": "image", }, } - _validate_uploads(setup_upload, expected) + descs = sum(extract_all_uploaded_descs(Path(setup_upload)), []) + assert_same_image_descs(descs, list(expected.values())) @pytest.mark.usefixtures("setup_config") @@ -278,7 +257,7 @@ def test_video_process_and_upload_after_gpx( gpx_end_time = "2025_03_14_07_01_33_624" video_start_time = "2025_03_14_07_01_34_624" gpx_file = setup_data.join("gpx").join("sf_30km_h.gpx") - x = subprocess.run( + subprocess.run( f"""{EXECUTABLE} video_process_and_upload \ {PROCESS_FLAGS} {UPLOAD_FLAGS} \ --video_sample_interval=2 \ @@ -291,7 +270,7 @@ def test_video_process_and_upload_after_gpx( {video_dir} {video_dir.join("my_samples")} """, shell=True, + check=True, ) - assert x.returncode == 0, x.stderr - assert 0 == len(setup_upload.listdir()) - _validate_uploads(setup_upload, {}) + descs = sum(extract_all_uploaded_descs(Path(setup_upload)), []) + assert_same_image_descs(descs, []) diff --git a/tests/integration/test_upload.py b/tests/integration/test_upload.py index 97cb40f74..04a258b29 100644 --- a/tests/integration/test_upload.py +++ 
b/tests/integration/test_upload.py @@ -1,6 +1,4 @@ -import hashlib import json -import os import subprocess from pathlib import Path @@ -8,12 +6,13 @@ import pytest from .fixtures import ( + assert_contains_image_descs, EXECUTABLE, + extract_all_uploaded_descs, setup_config, setup_data, setup_upload, USERNAME, - validate_and_extract_zip, ) @@ -21,78 +20,65 @@ UPLOAD_FLAGS = f"--dry_run --user_name={USERNAME}" -def file_md5sum(path) -> str: - with open(path, "rb") as fp: - md5 = hashlib.md5() - while True: - buf = fp.read(1024 * 1024 * 32) - if not buf: - break - md5.update(buf) - return md5.hexdigest() - - @pytest.mark.usefixtures("setup_config") -def test_upload_image_dir( - setup_data: py.path.local, - setup_upload: py.path.local, -): - x = subprocess.run( - f"{EXECUTABLE} process --file_types=image {PROCESS_FLAGS} {setup_data}", +def test_upload_image_dir(setup_data: py.path.local, setup_upload: py.path.local): + subprocess.run( + f"{EXECUTABLE} process {PROCESS_FLAGS} --file_types=image {setup_data}", shell=True, + check=True, ) - assert x.returncode == 0, x.stderr - x = subprocess.run( + + subprocess.run( f"{EXECUTABLE} process_and_upload {UPLOAD_FLAGS} --file_types=image {setup_data}", shell=True, + check=True, + ) + + uploaded_descs: list[dict] = sum(extract_all_uploaded_descs(Path(setup_upload)), []) + assert len(uploaded_descs) > 0, "No images were uploaded" + + assert_contains_image_descs( + Path(setup_data.join("mapillary_image_description.json")), + uploaded_descs, ) - for file in setup_upload.listdir(): - validate_and_extract_zip(Path(file)) - assert x.returncode == 0, x.stderr @pytest.mark.usefixtures("setup_config") -def test_upload_image_dir_twice( - setup_data: py.path.local, - setup_upload: py.path.local, -): - x = subprocess.run( +def test_upload_image_dir_twice(setup_data: py.path.local, setup_upload: py.path.local): + subprocess.run( f"{EXECUTABLE} process --skip_process_errors {PROCESS_FLAGS} {setup_data}", shell=True, + check=True, ) - assert x.returncode == 0, x.stderr desc_path = setup_data.join("mapillary_image_description.json") - md5sum_map = {} - # first upload - x = subprocess.run( + subprocess.run( f"{EXECUTABLE} process_and_upload {UPLOAD_FLAGS} --file_types=image {setup_data}", shell=True, + check=True, + ) + first_descs = extract_all_uploaded_descs(Path(setup_upload)) + assert_contains_image_descs( + Path(desc_path), + sum(first_descs, []), ) - assert x.returncode == 0, x.stderr - for file in setup_upload.listdir(): - validate_and_extract_zip(Path(file)) - md5sum_map[os.path.basename(file)] = file_md5sum(file) # expect the second upload to not produce new uploads - x = subprocess.run( + subprocess.run( f"{EXECUTABLE} process_and_upload {UPLOAD_FLAGS} --desc_path={desc_path} --file_types=image {setup_data} {setup_data} {setup_data}/images/DSC00001.JPG", shell=True, + check=True, + ) + second_descs = extract_all_uploaded_descs(Path(setup_upload)) + assert_contains_image_descs( + Path(desc_path), + sum(second_descs, []), ) - assert x.returncode == 0, x.stderr - for file in setup_upload.listdir(): - validate_and_extract_zip(Path(file)) - new_md5sum = file_md5sum(file) - assert md5sum_map[os.path.basename(file)] == new_md5sum - assert len(md5sum_map) == len(setup_upload.listdir()) @pytest.mark.usefixtures("setup_config") -def test_upload_wrong_descs( - setup_data: py.path.local, - setup_upload: py.path.local, -): +def test_upload_wrong_descs(setup_data: py.path.local, setup_upload: py.path.local): x = subprocess.run( f"{EXECUTABLE} process --skip_process_errors 
{PROCESS_FLAGS} {setup_data}", shell=True, diff --git a/tests/unit/test_upload_api_v4.py b/tests/unit/test_upload_api_v4.py index d8d8e0ac3..3857f522f 100644 --- a/tests/unit/test_upload_api_v4.py +++ b/tests/unit/test_upload_api_v4.py @@ -11,7 +11,6 @@ def test_upload(setup_upload: py.path.local): upload_service = upload_api_v4.FakeUploadService( user_access_token="TEST", session_key="FOOBAR.txt", - cluster_filetype=upload_api_v4.ClusterFileType.ZIP, ) upload_service._error_ratio = 0 content = b"double_foobar" @@ -28,7 +27,6 @@ def test_upload_big_chunksize(setup_upload: py.path.local): upload_service = upload_api_v4.FakeUploadService( user_access_token="TEST", session_key="FOOBAR.txt", - cluster_filetype=upload_api_v4.ClusterFileType.ZIP, ) upload_service._error_ratio = 0 content = b"double_foobar" @@ -45,7 +43,6 @@ def test_upload_chunks(setup_upload: py.path.local): upload_service = upload_api_v4.FakeUploadService( user_access_token="TEST", session_key="FOOBAR2.txt", - cluster_filetype=upload_api_v4.ClusterFileType.ZIP, ) upload_service._error_ratio = 0 diff --git a/tests/unit/test_uploader.py b/tests/unit/test_uploader.py index 4e633019d..f9367d900 100644 --- a/tests/unit/test_uploader.py +++ b/tests/unit/test_uploader.py @@ -1,16 +1,13 @@ -import json -import os import typing as T -import zipfile from pathlib import Path import py.path import pytest -from mapillary_tools import types, upload_api_v4, uploader, utils +from mapillary_tools import api_v4, types, uploader -from ..integration.fixtures import setup_upload, validate_and_extract_zip +from ..integration.fixtures import extract_all_uploaded_descs, setup_upload IMPORT_PATH = "tests/unit/data" @@ -26,24 +23,6 @@ def setup_unittest_data(tmpdir: py.path.local): tmpdir.remove(ignore_errors=True) -def _validate_zip_dir(zip_dir: py.path.local): - descs = [] - for zip_path in zip_dir.listdir(): - with zipfile.ZipFile(zip_path) as ziph: - filename = ziph.testzip() - assert filename is None, f"Corrupted zip {zip_path}: {filename}" - sequence_md5sum = json.loads(ziph.comment).get("sequence_md5sum") - - with open(zip_path, "rb") as fp: - upload_md5sum = utils.md5sum_fp(fp).hexdigest() - - assert str(os.path.basename(zip_path)) == f"mly_tools_{upload_md5sum}.zip", ( - zip_path - ) - descs.extend(validate_and_extract_zip(Path(zip_path))) - return descs - - def test_upload_images(setup_unittest_data: py.path.local, setup_upload: py.path.local): mly_uploader = uploader.Uploader( {"user_upload_token": "YOUR_USER_ACCESS_TOKEN"}, dry_run=True @@ -74,9 +53,10 @@ def test_upload_images(setup_unittest_data: py.path.local, setup_upload: py.path ) ) assert len(results) == 1 - assert len(setup_upload.listdir()) == 1 - actual_descs = _validate_zip_dir(setup_upload) - assert 1 == len(actual_descs), "should return 1 desc because of the unique filename" + actual_descs = sum(extract_all_uploaded_descs(Path(setup_upload)), []) + assert 1 == len(actual_descs), ( + f"should return 1 desc because of the unique filename but got {actual_descs}" + ) def test_upload_images_multiple_sequences( @@ -128,15 +108,12 @@ def test_upload_images_multiple_sequences( ) ) assert len(results) == 2 - assert len(setup_upload.listdir()) == 2 - actual_descs = _validate_zip_dir(setup_upload) + actual_descs = sum(extract_all_uploaded_descs(Path(setup_upload)), []) assert 2 == len(actual_descs) def test_upload_zip( - setup_unittest_data: py.path.local, - setup_upload: py.path.local, - emitter=None, + setup_unittest_data: py.path.local, setup_upload: py.path.local, emitter=None ): 
test_exif = setup_unittest_data.join("test_exif.jpg") setup_unittest_data.join("another_directory").mkdir() @@ -173,11 +150,10 @@ def test_upload_zip( }, ] zip_dir = setup_unittest_data.mkdir("zip_dir") - sequence = [types.from_desc(T.cast(T.Any, desc)) for desc in descs] - uploader.ZipImageSequence.zip_images(sequence, Path(zip_dir)) + uploader.ZipImageSequence.zip_images( + [types.from_desc(T.cast(T.Any, desc)) for desc in descs], Path(zip_dir) + ) assert len(zip_dir.listdir()) == 2, list(zip_dir.listdir()) - descs = _validate_zip_dir(zip_dir) - assert 3 == len(descs) mly_uploader = uploader.Uploader( { @@ -191,14 +167,11 @@ def test_upload_zip( for zip_path in zip_dir.listdir(): cluster = uploader.ZipImageSequence.upload_zipfile(mly_uploader, Path(zip_path)) assert cluster == "0" - descs = _validate_zip_dir(setup_upload) - assert 3 == len(descs) + actual_descs = sum(extract_all_uploaded_descs(Path(setup_upload)), []) + assert 3 == len(actual_descs) -def test_upload_blackvue( - tmpdir: py.path.local, - setup_upload: py.path.local, -): +def test_upload_blackvue(tmpdir: py.path.local, setup_upload: py.path.local): mly_uploader = uploader.Uploader( { "user_upload_token": "YOUR_USER_ACCESS_TOKEN", @@ -211,22 +184,21 @@ def test_upload_blackvue( with open(blackvue_path, "wb") as fp: fp.write(b"this is a fake video") with Path(blackvue_path).open("rb") as fp: - resp = mly_uploader.upload_stream( + file_handle = mly_uploader.upload_stream( fp, - upload_api_v4.ClusterFileType.BLACKVUE, - "this_is_a_blackvue.mp4", + session_key="this_is_a_blackvue.mp4", ) - assert resp == "0" - for mp4_path in setup_upload.listdir(): - assert os.path.basename(mp4_path) == "this_is_a_blackvue.mp4" - with open(mp4_path, "rb") as fp: - assert fp.read() == b"this is a fake video" + cluster_id = mly_uploader.finish_upload( + file_handle, api_v4.ClusterFileType.BLACKVUE + ) + assert cluster_id == "0" + assert setup_upload.join("this_is_a_blackvue.mp4").exists() + with open(setup_upload.join("this_is_a_blackvue.mp4"), "rb") as fp: + assert fp.read() == b"this is a fake video" def test_upload_zip_with_emitter( - setup_unittest_data: py.path.local, - tmpdir: py.path.local, - setup_upload: py.path.local, + setup_unittest_data: py.path.local, setup_upload: py.path.local ): emitter = uploader.EventEmitter() @@ -239,8 +211,8 @@ def _upload_start(payload): assert "test_started" not in payload payload["test_started"] = True - assert payload["upload_md5sum"] not in stats - stats[payload["upload_md5sum"]] = {**payload} + assert payload["sequence_md5sum"] not in stats + stats[payload["sequence_md5sum"]] = {**payload} @emitter.on("upload_fetch_offset") def _fetch_offset(payload): @@ -248,7 +220,7 @@ def _fetch_offset(payload): assert payload["test_started"] payload["test_fetch_offset"] = True - assert payload["upload_md5sum"] in stats + assert payload["sequence_md5sum"] in stats @emitter.on("upload_end") def _upload_end(payload): @@ -256,8 +228,8 @@ def _upload_end(payload): assert payload["test_started"] assert payload["test_fetch_offset"] - assert payload["upload_md5sum"] in stats + assert payload["sequence_md5sum"] in stats test_upload_zip(setup_unittest_data, setup_upload, emitter=emitter) - assert len(stats) == 2 + assert len(stats) == 2, stats
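One consequence of the `_maybe_emit` indirection worth keeping in mind when reading the test changes above: for zip and video sessions the emitter still fires `upload_start` → `upload_fetch_offset` → `upload_progress` (repeated) → `upload_end` → `upload_finished`, with `upload_interrupted` on retriable errors, while per-image sessions suppress the inner events via `emission_disabled` and synthesize sequence-level ones. A minimal listener in the style of `test_upload_zip_with_emitter`, recording event order (the exact sequence is inferred from this diff, not from documentation):

```python
from mapillary_tools import uploader

emitter = uploader.EventEmitter()
observed: list[str] = []

for name in [
    "upload_start",
    "upload_fetch_offset",
    "upload_progress",
    "upload_end",
    "upload_finished",
]:
    # EventEmitter.on returns a decorator; calling it directly also registers
    emitter.on(name)(lambda payload, name=name: observed.append(name))

# Pass this emitter to Uploader(user_items, emitter=emitter, dry_run=True);
# after an upload, `observed` shows the per-session event sequence.
```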