diff --git a/mapillary_tools/types.py b/mapillary_tools/types.py index b886bf3ec..8b1a56b5d 100644 --- a/mapillary_tools/types.py +++ b/mapillary_tools/types.py @@ -729,9 +729,10 @@ def group_and_sort_images( return sorted_sequences_by_uuid -def sequence_md5sum(sequence: T.Iterable[ImageMetadata]) -> str: +def update_sequence_md5sum(sequence: T.Iterable[ImageMetadata]) -> str: md5 = hashlib.md5() for metadata in sequence: + metadata.update_md5sum() assert isinstance(metadata.md5sum, str), "md5sum should be calculated" md5.update(metadata.md5sum.encode("utf-8")) return md5.hexdigest() diff --git a/mapillary_tools/upload.py b/mapillary_tools/upload.py index 0bb19ce17..a3969918e 100644 --- a/mapillary_tools/upload.py +++ b/mapillary_tools/upload.py @@ -136,7 +136,7 @@ def zip_images( metadata for metadata in metadatas if isinstance(metadata, types.ImageMetadata) ] - uploader.zip_images(image_metadatas, zip_dir) + uploader.ZipImageSequence.zip_images(image_metadatas, zip_dir) def fetch_user_items( diff --git a/mapillary_tools/uploader.py b/mapillary_tools/uploader.py index 19ed34992..c9ae220d6 100644 --- a/mapillary_tools/uploader.py +++ b/mapillary_tools/uploader.py @@ -1,3 +1,5 @@ +from __future__ import annotations + import io import json import logging @@ -72,7 +74,7 @@ class UploadCancelled(Exception): class EventEmitter: - events: T.Dict[EventName, T.List] + events: dict[EventName, T.List] def __init__(self): self.events = {} @@ -88,11 +90,111 @@ def emit(self, event: EventName, *args, **kwargs): callback(*args, **kwargs) +class ZipImageSequence: + @classmethod + def zip_images( + cls, metadatas: T.Sequence[types.ImageMetadata], zip_dir: Path + ) -> None: + """ + Group images into sequences and zip each sequence into a zipfile. 
+ """ + _validate_metadatas(metadatas) + sequences = types.group_and_sort_images(metadatas) + os.makedirs(zip_dir, exist_ok=True) + + for sequence_uuid, sequence in sequences.items(): + # For atomicity we write into a WIP file and then rename to the final file + wip_zip_filename = zip_dir.joinpath( + f".mly_zip_{uuid.uuid4()}_{sequence_uuid}_{os.getpid()}_{int(time.time())}" + ) + upload_md5sum = types.update_sequence_md5sum(sequence) + zip_filename = zip_dir.joinpath(f"mly_tools_{upload_md5sum}.zip") + with wip_file_context(wip_zip_filename, zip_filename) as wip_path: + with wip_path.open("wb") as wip_fp: + actual_md5sum = cls.zip_sequence_fp(sequence, wip_fp) + assert actual_md5sum == upload_md5sum + + @classmethod + def zip_sequence_fp( + cls, + sequence: T.Sequence[types.ImageMetadata], + zip_fp: T.IO[bytes], + ) -> str: + """ + Write a sequence of ImageMetadata into the zipfile handle. + The sequence has to be one sequence and sorted. + """ + sequence_groups = types.group_and_sort_images(sequence) + assert len(sequence_groups) == 1, ( + f"Only one sequence is allowed but got {len(sequence_groups)}: {list(sequence_groups.keys())}" + ) + + upload_md5sum = types.update_sequence_md5sum(sequence) + + with zipfile.ZipFile(zip_fp, "w", zipfile.ZIP_DEFLATED) as zipf: + arcnames: set[str] = set() + for metadata in sequence: + cls._write_imagebytes_in_zip(zipf, metadata, arcnames=arcnames) + assert len(sequence) == len(set(zipf.namelist())) + zipf.comment = json.dumps({"upload_md5sum": upload_md5sum}).encode("utf-8") + + return upload_md5sum + + @classmethod + def extract_upload_md5sum(cls, zip_fp: T.IO[bytes]) -> str | None: + with zipfile.ZipFile(zip_fp, "r", zipfile.ZIP_DEFLATED) as ziph: + comment = ziph.comment + if not comment: + return None + try: + upload_md5sum = json.loads(comment.decode("utf-8")).get("upload_md5sum") + except Exception: + return None + if not upload_md5sum: + return None + return str(upload_md5sum) + + @classmethod + def _uniq_arcname(cls, 
filename: Path, arcnames: set[str]): + arcname: str = filename.name + + # make sure the arcname is unique, otherwise zipfile.extractAll will eliminate duplicated ones + arcname_idx = 0 + while arcname in arcnames: + arcname_idx += 1 + arcname = f"{filename.stem}_{arcname_idx}{filename.suffix}" + + return arcname + + @classmethod + def _write_imagebytes_in_zip( + cls, + zipf: zipfile.ZipFile, + metadata: types.ImageMetadata, + arcnames: set[str] | None = None, + ): + if arcnames is None: + arcnames = set() + + edit = exif_write.ExifEdit(metadata.filename) + # The cast is to fix the type checker error + edit.add_image_description( + T.cast(T.Dict, types.desc_file_to_exif(types.as_desc(metadata))) + ) + image_bytes = edit.dump_image_bytes() + + arcname = cls._uniq_arcname(metadata.filename, arcnames) + arcnames.add(arcname) + + zipinfo = zipfile.ZipInfo(arcname, date_time=(1980, 1, 1, 0, 0, 0)) + zipf.writestr(zipinfo, image_bytes) + + class Uploader: def __init__( self, user_items: types.UserItem, - emitter: T.Optional[EventEmitter] = None, + emitter: EventEmitter | None = None, chunk_size: int = upload_api_v4.DEFAULT_CHUNK_SIZE, dry_run=False, ): @@ -105,8 +207,8 @@ def __init__( def upload_zipfile( self, zip_path: Path, - event_payload: T.Optional[Progress] = None, - ) -> T.Optional[str]: + event_payload: Progress | None = None, + ) -> str | None: if event_payload is None: event_payload = {} @@ -121,16 +223,16 @@ def upload_zipfile( "sequence_image_count": len(namelist), } - with zip_path.open("rb") as fp: - upload_md5sum = _extract_upload_md5sum(fp) + with zip_path.open("rb") as zip_fp: + upload_md5sum = ZipImageSequence.extract_upload_md5sum(zip_fp) if upload_md5sum is None: - with zip_path.open("rb") as fp: - upload_md5sum = utils.md5sum_fp(fp).hexdigest() + with zip_path.open("rb") as zip_fp: + upload_md5sum = utils.md5sum_fp(zip_fp).hexdigest() - with zip_path.open("rb") as fp: + with zip_path.open("rb") as zip_fp: return self.upload_stream( - fp, + zip_fp, 
upload_api_v4.ClusterFileType.ZIP, upload_md5sum, event_payload=final_event_payload, @@ -139,14 +241,14 @@ def upload_zipfile( def upload_images( self, image_metadatas: T.Sequence[types.ImageMetadata], - event_payload: T.Optional[Progress] = None, - ) -> T.Dict[str, str]: + event_payload: Progress | None = None, + ) -> dict[str, str]: if event_payload is None: event_payload = {} _validate_metadatas(image_metadatas) sequences = types.group_and_sort_images(image_metadatas) - ret: T.Dict[str, str] = {} + ret: dict[str, str] = {} for sequence_idx, (sequence_uuid, sequence) in enumerate(sequences.items()): final_event_payload: Progress = { **event_payload, # type: ignore @@ -155,11 +257,8 @@ def upload_images( "sequence_image_count": len(sequence), "sequence_uuid": sequence_uuid, } - for metadata in sequence: - metadata.update_md5sum() - upload_md5sum = types.sequence_md5sum(sequence) with tempfile.NamedTemporaryFile() as fp: - _zip_sequence_fp(sequence, fp, upload_md5sum) + upload_md5sum = ZipImageSequence.zip_sequence_fp(sequence, fp) cluster_id = self.upload_stream( fp, upload_api_v4.ClusterFileType.ZIP, @@ -175,15 +274,15 @@ def upload_stream( fp: T.IO[bytes], cluster_filetype: upload_api_v4.ClusterFileType, upload_md5sum: str, - event_payload: T.Optional[Progress] = None, - ) -> T.Optional[str]: + event_payload: Progress | None = None, + ) -> str | None: if event_payload is None: event_payload = {} fp.seek(0, io.SEEK_END) entity_size = fp.tell() - SUFFIX_MAP: T.Dict[upload_api_v4.ClusterFileType, str] = { + SUFFIX_MAP: dict[upload_api_v4.ClusterFileType, str] = { upload_api_v4.ClusterFileType.ZIP: ".zip", upload_api_v4.ClusterFileType.CAMM: ".mp4", upload_api_v4.ClusterFileType.BLACKVUE: ".mp4", @@ -216,7 +315,7 @@ def upload_stream( } try: - return _upload_stream( + return _upload_stream_with_retries( upload_service, fp, event_payload=final_event_payload, @@ -254,70 +353,6 @@ def wip_file_context(wip_path: Path, done_path: Path): pass -def zip_images( - metadatas: 
T.List[types.ImageMetadata], - zip_dir: Path, -) -> None: - _validate_metadatas(metadatas) - sequences = types.group_and_sort_images(metadatas) - os.makedirs(zip_dir, exist_ok=True) - for sequence_uuid, sequence in sequences.items(): - for metadata in sequence: - metadata.update_md5sum() - upload_md5sum = types.sequence_md5sum(sequence) - timestamp = int(time.time()) - wip_zip_filename = zip_dir.joinpath( - f".mly_zip_{uuid.uuid4()}_{sequence_uuid}_{os.getpid()}_{timestamp}" - ) - zip_filename = zip_dir.joinpath(f"mly_tools_{upload_md5sum}.zip") - with wip_file_context(wip_zip_filename, zip_filename) as wip_dir: - with wip_dir.open("wb") as fp: - _zip_sequence_fp(sequence, fp, upload_md5sum) - - -def _zip_sequence_fp( - sequence: T.Sequence[types.ImageMetadata], - fp: T.IO[bytes], - upload_md5sum: str, -) -> None: - arcname_idx = 0 - arcnames = set() - with zipfile.ZipFile(fp, "w", zipfile.ZIP_DEFLATED) as ziph: - for metadata in sequence: - edit = exif_write.ExifEdit(metadata.filename) - # The cast is to fix the type checker error - edit.add_image_description( - T.cast(T.Dict, types.desc_file_to_exif(types.as_desc(metadata))) - ) - image_bytes = edit.dump_image_bytes() - arcname: str = metadata.filename.name - # make sure the arcname is unique, otherwise zipfile.extractAll will eliminate duplicated ones - while arcname in arcnames: - arcname_idx += 1 - arcname = ( - f"{metadata.filename.stem}_{arcname_idx}{metadata.filename.suffix}" - ) - arcnames.add(arcname) - zipinfo = zipfile.ZipInfo(arcname, date_time=(1980, 1, 1, 0, 0, 0)) - ziph.writestr(zipinfo, image_bytes) - ziph.comment = json.dumps({"upload_md5sum": upload_md5sum}).encode("utf-8") - assert len(sequence) == len(set(ziph.namelist())) - - -def _extract_upload_md5sum(fp: T.IO[bytes]) -> T.Optional[str]: - with zipfile.ZipFile(fp, "r", zipfile.ZIP_DEFLATED) as ziph: - comment = ziph.comment - if not comment: - return None - try: - upload_md5sum = json.loads(comment.decode("utf-8")).get("upload_md5sum") - 
except Exception: - return None - if not upload_md5sum: - return None - return str(upload_md5sum) - - def _is_immediate_retry(ex: Exception): if ( isinstance(ex, requests.HTTPError) @@ -361,11 +396,11 @@ def _callback(chunk: bytes, _): return _callback -def _upload_stream( +def _upload_stream_with_retries( upload_service: upload_api_v4.UploadService, fp: T.IO[bytes], - event_payload: T.Optional[Progress] = None, - emitter: T.Optional[EventEmitter] = None, + event_payload: Progress | None = None, + emitter: EventEmitter | None = None, ) -> str: retries = 0 @@ -384,7 +419,7 @@ def _reset_retries(_, __): while True: fp.seek(0, io.SEEK_SET) - begin_offset: T.Optional[int] = None + begin_offset: int | None = None try: begin_offset = upload_service.fetch_offset() upload_service.callbacks = [_reset_retries] diff --git a/tests/unit/test_upload_api_v4.py b/tests/unit/test_upload_api_v4.py index 83e47ad0f..8d9bc884c 100644 --- a/tests/unit/test_upload_api_v4.py +++ b/tests/unit/test_upload_api_v4.py @@ -24,11 +24,27 @@ def test_upload(setup_upload: py.path.local): assert (setup_upload.join("FOOBAR.txt").read_binary()) == content +def test_upload_big_chunksize(setup_upload: py.path.local): + upload_service = upload_api_v4.FakeUploadService( + user_access_token="TEST", + session_key="FOOBAR.txt", + chunk_size=1000, + ) + upload_service._error_ratio = 0 + content = b"double_foobar" + cluster_id = upload_service.upload(io.BytesIO(content)) + assert isinstance(cluster_id, str), cluster_id + assert (setup_upload.join("FOOBAR.txt").read_binary()) == content + + # reupload should not affect the file + upload_service.upload(io.BytesIO(content)) + assert (setup_upload.join("FOOBAR.txt").read_binary()) == content + + def test_upload_chunks(setup_upload: py.path.local): upload_service = upload_api_v4.FakeUploadService( user_access_token="TEST", session_key="FOOBAR2.txt", - chunk_size=1, ) upload_service._error_ratio = 0 diff --git a/tests/unit/test_uploader.py 
b/tests/unit/test_uploader.py index fd6cc304a..da1aed0e8 100644 --- a/tests/unit/test_uploader.py +++ b/tests/unit/test_uploader.py @@ -30,6 +30,9 @@ def _validate_zip_dir(zip_dir: py.path.local): descs = [] for zip_path in zip_dir.listdir(): with zipfile.ZipFile(zip_path) as ziph: + filename = ziph.testzip() + assert filename is None, f"Corrupted zip {zip_path}: {filename}" + upload_md5sum = json.loads(ziph.comment).get("upload_md5sum") assert str(os.path.basename(zip_path)) == f"mly_tools_{upload_md5sum}.zip", ( zip_path @@ -161,9 +164,8 @@ def test_upload_zip( }, ] zip_dir = setup_unittest_data.mkdir("zip_dir") - uploader.zip_images( - [types.from_desc(T.cast(T.Any, desc)) for desc in descs], Path(zip_dir) - ) + sequence = [types.from_desc(T.cast(T.Any, desc)) for desc in descs] + uploader.ZipImageSequence.zip_images(sequence, Path(zip_dir)) assert len(zip_dir.listdir()) == 2, list(zip_dir.listdir()) descs = _validate_zip_dir(zip_dir) assert 3 == len(descs)