From 40685fa2dc116043db30fd461993df2dcc776e19 Mon Sep 17 00:00:00 2001 From: Tao Peng Date: Fri, 11 Apr 2025 00:51:47 -0700 Subject: [PATCH 1/5] wip --- mapillary_tools/upload.py | 7 +- mapillary_tools/uploader.py | 121 ++++++++++--------- tests/integration/fixtures.py | 14 ++- tests/integration/test_process.py | 2 +- tests/integration/test_process_and_upload.py | 3 +- tests/integration/test_upload.py | 7 +- tests/unit/test_uploader.py | 17 +-- 7 files changed, 97 insertions(+), 74 deletions(-) diff --git a/mapillary_tools/upload.py b/mapillary_tools/upload.py index 48f758ee4..7482caf44 100644 --- a/mapillary_tools/upload.py +++ b/mapillary_tools/upload.py @@ -141,7 +141,7 @@ def _setup_history( ) -> None: @emitter.on("upload_start") def check_duplication(payload: uploader.Progress): - md5sum = payload.get("md5sum") + md5sum = payload.get("sequence_md5sum") assert md5sum is not None, f"md5sum has to be set for {payload}" if history.is_uploaded(md5sum): @@ -164,7 +164,7 @@ def check_duplication(payload: uploader.Progress): @emitter.on("upload_finished") def write_history(payload: uploader.Progress): sequence_uuid = payload.get("sequence_uuid") - md5sum = payload.get("md5sum") + md5sum = payload.get("sequence_md5sum") assert md5sum is not None, f"md5sum has to be set for {payload}" if sequence_uuid is None: @@ -509,7 +509,8 @@ def _gen_upload_videos( "sequence_idx": idx, "file_type": video_metadata.filetype.value, "import_path": str(video_metadata.filename), - "md5sum": video_metadata.md5sum, + "sequence_md5sum": video_metadata.md5sum, + "upload_md5sum": video_metadata.md5sum, } session_key = uploader._session_key( diff --git a/mapillary_tools/uploader.py b/mapillary_tools/uploader.py index 76146673f..0e138cd43 100644 --- a/mapillary_tools/uploader.py +++ b/mapillary_tools/uploader.py @@ -16,7 +16,7 @@ import requests -from . import api_v4, constants, exif_write, types, upload_api_v4 +from . import api_v4, constants, exif_write, types, upload_api_v4, utils LOG = logging.getLogger(__name__) @@ -56,8 +56,11 @@ class UploaderProgress(T.TypedDict, total=True): class SequenceProgress(T.TypedDict, total=False): """Progress data at sequence level""" - # md5sum of the zipfile/BlackVue/CAMM in uploading - md5sum: str + # To check if it is uploaded or not + sequence_md5sum: str + + # To resume from the previous upload + upload_md5sum: str # File type file_type: str @@ -148,30 +151,50 @@ def zip_images( for sequence_uuid, sequence in sequences.items(): _validate_metadatas(sequence) - upload_md5sum = types.update_sequence_md5sum(sequence) - # For atomicity we write into a WIP file and then rename to the final file wip_zip_filename = zip_dir.joinpath( f".mly_zip_{uuid.uuid4()}_{sequence_uuid}_{os.getpid()}_{int(time.time())}" ) - filename = _session_key(upload_md5sum, upload_api_v4.ClusterFileType.ZIP) - zip_filename = zip_dir.joinpath(filename) - with wip_file_context(wip_zip_filename, zip_filename) as wip_path: + with cls._wip_file_context(wip_zip_filename) as wip_path: with wip_path.open("wb") as wip_fp: - actual_md5sum = cls.zip_sequence_deterministically(sequence, wip_fp) - assert actual_md5sum == upload_md5sum, "md5sum mismatch" + cls.zip_sequence_fp(sequence, wip_fp) + + @classmethod + @contextmanager + def _wip_file_context(cls, wip_path: Path): + try: + os.remove(wip_path) + except FileNotFoundError: + pass + try: + yield wip_path + + with wip_path.open("rb") as fp: + upload_md5sum = utils.md5sum_fp(fp).hexdigest() + + done_path = wip_path.parent.joinpath( + _session_key(upload_md5sum, upload_api_v4.ClusterFileType.ZIP) + ) + + try: + os.remove(done_path) + except FileNotFoundError: + pass + wip_path.rename(done_path) + finally: + try: + os.remove(wip_path) + except FileNotFoundError: + pass @classmethod - def zip_sequence_deterministically( + def zip_sequence_fp( cls, sequence: T.Sequence[types.ImageMetadata], zip_fp: T.IO[bytes], ) -> str: """ - Write a sequence of ImageMetadata into the zipfile handle. It should guarantee - that the same sequence always produces the same zipfile, because the - sequence md5sum will be used to upload the zipfile or resume the upload. - + Write a sequence of ImageMetadata into the zipfile handle. The sequence has to be one sequence and sorted. """ @@ -180,21 +203,21 @@ def zip_sequence_deterministically( f"Only one sequence is allowed but got {len(sequence_groups)}: {list(sequence_groups.keys())}" ) - upload_md5sum = types.update_sequence_md5sum(sequence) + sequence_md5sum = types.update_sequence_md5sum(sequence) with zipfile.ZipFile(zip_fp, "w", zipfile.ZIP_DEFLATED) as zipf: for idx, metadata in enumerate(sequence): - # Use {idx}.jpg (suffix does not matter) as the archive name to ensure the - # resulting zipfile is deterministic. This determinism is based on the upload_md5sum, - # which is derived from a list of image md5sums + # Arcname does not matter, but it should be unique cls._write_imagebytes_in_zip(zipf, metadata, arcname=f"{idx}.jpg") assert len(sequence) == len(set(zipf.namelist())) - zipf.comment = json.dumps({"upload_md5sum": upload_md5sum}).encode("utf-8") + zipf.comment = json.dumps({"sequence_md5sum": sequence_md5sum}).encode( + "utf-8" + ) - return upload_md5sum + return sequence_md5sum @classmethod - def extract_upload_md5sum(cls, zip_fp: T.IO[bytes]) -> str: + def extract_sequence_md5sum(cls, zip_fp: T.IO[bytes]) -> str: with zipfile.ZipFile(zip_fp, "r", zipfile.ZIP_DEFLATED) as ziph: comment = ziph.comment @@ -209,12 +232,12 @@ def extract_upload_md5sum(cls, zip_fp: T.IO[bytes]) -> str: except json.JSONDecodeError as ex: raise InvalidMapillaryZipFileError(str(ex)) from ex - upload_md5sum = zip_metadata.get("upload_md5sum") + sequence_md5sum = zip_metadata.get("sequence_md5sum") - if not upload_md5sum and not isinstance(upload_md5sum, str): - raise InvalidMapillaryZipFileError("No upload_md5sum found") + if not sequence_md5sum and not isinstance(sequence_md5sum, str): + raise InvalidMapillaryZipFileError("No sequence_md5sum found") - return upload_md5sum + return sequence_md5sum @classmethod def _write_imagebytes_in_zip( @@ -256,21 +279,27 @@ def prepare_zipfile_and_upload( raise InvalidMapillaryZipFileError("Zipfile has no files") with zip_path.open("rb") as zip_fp: - upload_md5sum = cls.extract_upload_md5sum(zip_fp) + sequence_md5sum = cls.extract_sequence_md5sum(zip_fp) + + with zip_path.open("rb") as zip_fp: + upload_md5sum = utils.md5sum_fp(zip_fp).hexdigest() sequence_progress: SequenceProgress = { "sequence_image_count": len(namelist), "file_type": types.FileType.ZIP.value, - "md5sum": upload_md5sum, + "sequence_md5sum": sequence_md5sum, + "upload_md5sum": upload_md5sum, } - session_key = _session_key(upload_md5sum, upload_api_v4.ClusterFileType.ZIP) + upload_session_key = _session_key( + upload_md5sum, upload_api_v4.ClusterFileType.ZIP + ) with zip_path.open("rb") as zip_fp: return uploader.upload_stream( zip_fp, upload_api_v4.ClusterFileType.ZIP, - session_key, + upload_session_key, # Send the copy of the input progress to each upload session, to avoid modifying the original one progress=T.cast(T.Dict[str, T.Any], {**progress, **sequence_progress}), ) @@ -304,14 +333,17 @@ def prepare_images_and_upload( with tempfile.NamedTemporaryFile() as fp: try: - upload_md5sum = cls.zip_sequence_deterministically(sequence, fp) + sequence_md5sum = cls.zip_sequence_fp(sequence, fp) except Exception as ex: yield sequence_uuid, UploadResult(error=ex) continue - sequence_progress["md5sum"] = upload_md5sum + sequence_progress["sequence_md5sum"] = sequence_md5sum + + fp.seek(0, io.SEEK_SET) + upload_md5sum = utils.md5sum_fp(fp).hexdigest() - session_key = _session_key( + upload_session_key = _session_key( upload_md5sum, upload_api_v4.ClusterFileType.ZIP ) @@ -319,7 +351,7 @@ def prepare_images_and_upload( cluster_id = uploader.upload_stream( fp, upload_api_v4.ClusterFileType.ZIP, - session_key, + upload_session_key, progress=T.cast( T.Dict[str, T.Any], {**progress, **sequence_progress} ), @@ -514,27 +546,6 @@ def _validate_metadatas(metadatas: T.Sequence[types.ImageMetadata]): raise FileNotFoundError(f"No such file {metadata.filename}") -@contextmanager -def wip_file_context(wip_path: Path, done_path: Path): - assert wip_path != done_path, "should not be the same file" - try: - os.remove(wip_path) - except FileNotFoundError: - pass - try: - yield wip_path - try: - os.remove(done_path) - except FileNotFoundError: - pass - wip_path.rename(done_path) - finally: - try: - os.remove(wip_path) - except FileNotFoundError: - pass - - def _is_immediate_retry(ex: Exception): if ( isinstance(ex, requests.HTTPError) diff --git a/tests/integration/fixtures.py b/tests/integration/fixtures.py index 918e97fa3..71ccb8117 100644 --- a/tests/integration/fixtures.py +++ b/tests/integration/fixtures.py @@ -13,6 +13,7 @@ import py.path import pytest +from mapillary_tools import utils EXECUTABLE = os.getenv( "MAPILLARY_TOOLS__TESTS_EXECUTABLE", "python3 -m mapillary_tools.commands" @@ -174,11 +175,11 @@ def validate_and_extract_image(image_path: str): return desc -def validate_and_extract_zip(zip_path: str) -> T.List[T.Dict]: +def validate_and_extract_zip(zip_path: Path) -> T.List[T.Dict]: descs = [] with zipfile.ZipFile(zip_path) as zipf: - upload_md5sum = json.loads(zipf.comment)["upload_md5sum"] + _sequence_md5sum = json.loads(zipf.comment)["sequence_md5sum"] with tempfile.TemporaryDirectory() as tempdir: zipf.extractall(path=tempdir) for name in os.listdir(tempdir): @@ -186,8 +187,13 @@ def validate_and_extract_zip(zip_path: str) -> T.List[T.Dict]: desc = validate_and_extract_image(filename) descs.append(desc) - basename = os.path.basename(zip_path) - assert f"mly_tools_{upload_md5sum}.zip" == basename, (basename, upload_md5sum) + with zip_path.open("rb") as fp: + upload_md5sum = utils.md5sum_fp(fp).hexdigest() + + assert f"mly_tools_{upload_md5sum}.zip" == zip_path.name, ( + zip_path.name, + upload_md5sum, + ) return descs diff --git a/tests/integration/test_process.py b/tests/integration/test_process.py index 4fceeed9e..82b042e2d 100644 --- a/tests/integration/test_process.py +++ b/tests/integration/test_process.py @@ -303,7 +303,7 @@ def test_zip(tmpdir: py.path.local, setup_data: py.path.local): assert x.returncode == 0, x.stderr assert 0 < len(zip_dir.listdir()) for file in zip_dir.listdir(): - validate_and_extract_zip(str(file)) + validate_and_extract_zip(Path(file)) @pytest.mark.usefixtures("setup_config") diff --git a/tests/integration/test_process_and_upload.py b/tests/integration/test_process_and_upload.py index dd4f4c03e..95e4d5b30 100644 --- a/tests/integration/test_process_and_upload.py +++ b/tests/integration/test_process_and_upload.py @@ -1,5 +1,6 @@ import datetime import os +from pathlib import Path import subprocess import py.path @@ -135,7 +136,7 @@ def _validate_uploads(upload_dir: py.path.local, expected): if str(file).endswith(".mp4"): descs.extend(validate_and_extract_camm(str(file))) elif str(file).endswith(".zip"): - descs.extend(validate_and_extract_zip(str(file))) + descs.extend(validate_and_extract_zip(Path(file))) else: raise Exception(f"invalid file {file}") diff --git a/tests/integration/test_upload.py b/tests/integration/test_upload.py index 1d0131a01..97cb40f74 100644 --- a/tests/integration/test_upload.py +++ b/tests/integration/test_upload.py @@ -2,6 +2,7 @@ import json import os import subprocess +from pathlib import Path import py.path import pytest @@ -46,7 +47,7 @@ def test_upload_image_dir( shell=True, ) for file in setup_upload.listdir(): - validate_and_extract_zip(str(file)) + validate_and_extract_zip(Path(file)) assert x.returncode == 0, x.stderr @@ -71,7 +72,7 @@ def test_upload_image_dir_twice( ) assert x.returncode == 0, x.stderr for file in setup_upload.listdir(): - validate_and_extract_zip(str(file)) + validate_and_extract_zip(Path(file)) md5sum_map[os.path.basename(file)] = file_md5sum(file) # expect the second upload to not produce new uploads @@ -81,7 +82,7 @@ def test_upload_image_dir_twice( ) assert x.returncode == 0, x.stderr for file in setup_upload.listdir(): - validate_and_extract_zip(str(file)) + validate_and_extract_zip(Path(file)) new_md5sum = file_md5sum(file) assert md5sum_map[os.path.basename(file)] == new_md5sum assert len(md5sum_map) == len(setup_upload.listdir()) diff --git a/tests/unit/test_uploader.py b/tests/unit/test_uploader.py index 84a0ec91c..020719080 100644 --- a/tests/unit/test_uploader.py +++ b/tests/unit/test_uploader.py @@ -8,7 +8,7 @@ import pytest -from mapillary_tools import types, upload_api_v4, uploader +from mapillary_tools import types, upload_api_v4, uploader, utils from ..integration.fixtures import setup_upload, validate_and_extract_zip @@ -32,12 +32,15 @@ def _validate_zip_dir(zip_dir: py.path.local): with zipfile.ZipFile(zip_path) as ziph: filename = ziph.testzip() assert filename is None, f"Corrupted zip {zip_path}: {filename}" + sequence_md5sum = json.loads(ziph.comment).get("sequence_md5sum") + + with open(zip_path, "rb") as fp: + upload_md5sum = utils.md5sum_fp(fp).hexdigest() - upload_md5sum = json.loads(ziph.comment).get("upload_md5sum") assert str(os.path.basename(zip_path)) == f"mly_tools_{upload_md5sum}.zip", ( zip_path ) - descs.extend(validate_and_extract_zip(str(zip_path))) + descs.extend(validate_and_extract_zip(Path(zip_path))) return descs @@ -238,8 +241,8 @@ def _upload_start(payload): assert "test_started" not in payload payload["test_started"] = True - assert payload["md5sum"] not in stats - stats[payload["md5sum"]] = {**payload} + assert payload["upload_md5sum"] not in stats + stats[payload["upload_md5sum"]] = {**payload} @emitter.on("upload_fetch_offset") def _fetch_offset(payload): @@ -247,7 +250,7 @@ def _fetch_offset(payload): assert payload["test_started"] payload["test_fetch_offset"] = True - assert payload["md5sum"] in stats + assert payload["upload_md5sum"] in stats @emitter.on("upload_end") def _upload_end(payload): @@ -255,7 +258,7 @@ def _upload_end(payload): assert payload["test_started"] assert payload["test_fetch_offset"] - assert payload["md5sum"] in stats + assert payload["upload_md5sum"] in stats test_upload_zip(setup_unittest_data, setup_upload, emitter=emitter) From 4857fed57ad877c3a3959955d47e87822794964c Mon Sep 17 00:00:00 2001 From: Tao Peng Date: Sun, 8 Jun 2025 16:20:57 -0700 Subject: [PATCH 2/5] throw if etree root is None --- mapillary_tools/exiftool_read_video.py | 11 +++++++++-- 1 file changed, 9 insertions(+), 2 deletions(-) diff --git a/mapillary_tools/exiftool_read_video.py b/mapillary_tools/exiftool_read_video.py index 6b2d1261f..9973097de 100644 --- a/mapillary_tools/exiftool_read_video.py +++ b/mapillary_tools/exiftool_read_video.py @@ -310,7 +310,10 @@ def __init__( etree: ET.ElementTree, ) -> None: self.etree = etree - self._texts_by_tag = _index_text_by_tag(self.etree.getroot()) + root = self.etree.getroot() + if root is None: + raise ValueError("ElementTree root is None") + self._texts_by_tag = _index_text_by_tag(root) self._all_tags = set(self._texts_by_tag.keys()) def extract_gps_track(self) -> list[geo.Point]: @@ -371,6 +374,10 @@ def extract_model(self) -> str | None: return model def _extract_gps_track_from_track(self) -> list[GPSPoint]: + root = self.etree.getroot() + if root is None: + raise ValueError("ElementTree root is None") + for track_id in range(1, MAX_TRACK_ID + 1): track_ns = f"Track{track_id}" if self._all_tags_exists( @@ -382,7 +389,7 @@ def _extract_gps_track_from_track(self) -> list[GPSPoint]: } ): sample_iterator = _aggregate_samples( - self.etree.getroot(), + root, f"{track_ns}:SampleTime", f"{track_ns}:SampleDuration", ) From 30036f3b3d6804306ec1894fb9a5821704e27cc9 Mon Sep 17 00:00:00 2001 From: Tao Peng Date: Sun, 8 Jun 2025 16:21:25 -0700 Subject: [PATCH 3/5] sort --- tests/integration/test_process_and_upload.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/integration/test_process_and_upload.py b/tests/integration/test_process_and_upload.py index 95e4d5b30..a75fe502d 100644 --- a/tests/integration/test_process_and_upload.py +++ b/tests/integration/test_process_and_upload.py @@ -1,7 +1,7 @@ import datetime import os -from pathlib import Path import subprocess +from pathlib import Path import py.path import pytest From f28326fd48ecf5415d0084fa665d6739b58b2feb Mon Sep 17 00:00:00 2001 From: Tao Peng Date: Sun, 8 Jun 2025 16:38:11 -0700 Subject: [PATCH 4/5] update params --- mapillary_tools/upload.py | 4 ++-- mapillary_tools/uploader.py | 4 ++-- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/mapillary_tools/upload.py b/mapillary_tools/upload.py index 7482caf44..4701e51b6 100644 --- a/mapillary_tools/upload.py +++ b/mapillary_tools/upload.py @@ -467,8 +467,8 @@ def _gen_upload_everything( utils.find_images(import_paths, skip_subfolders=skip_subfolders), ) for image_result in uploader.ZipImageSequence.prepare_images_and_upload( - image_metadatas, mly_uploader, + image_metadatas, ): yield image_result @@ -722,7 +722,7 @@ def _gen_upload_zipfiles( } try: cluster_id = uploader.ZipImageSequence.prepare_zipfile_and_upload( - zip_path, mly_uploader, progress=T.cast(T.Dict[str, T.Any], progress) + mly_uploader, zip_path, progress=T.cast(T.Dict[str, T.Any], progress) ) except Exception as ex: yield zip_path, uploader.UploadResult(error=ex) diff --git a/mapillary_tools/uploader.py b/mapillary_tools/uploader.py index 7735d9579..1a2434720 100644 --- a/mapillary_tools/uploader.py +++ b/mapillary_tools/uploader.py @@ -267,8 +267,8 @@ def _write_imagebytes_in_zip( @classmethod def prepare_zipfile_and_upload( cls, - zip_path: Path, uploader: Uploader, + zip_path: Path, progress: dict[str, T.Any] | None = None, ) -> str: if progress is None: @@ -308,8 +308,8 @@ def prepare_zipfile_and_upload( @classmethod def prepare_images_and_upload( cls, - image_metadatas: T.Sequence[types.ImageMetadata], uploader: Uploader, + image_metadatas: T.Sequence[types.ImageMetadata], progress: dict[str, T.Any] | None = None, ) -> T.Generator[tuple[str, UploadResult], None, None]: if progress is None: From 9d1356cd32e96aa7747323f6cfcbc58a0f02022a Mon Sep 17 00:00:00 2001 From: Tao Peng Date: Sun, 8 Jun 2025 16:48:00 -0700 Subject: [PATCH 5/5] rename --- mapillary_tools/upload.py | 4 ++-- mapillary_tools/uploader.py | 4 ++-- tests/unit/test_uploader.py | 12 +++++------- 3 files changed, 9 insertions(+), 11 deletions(-) diff --git a/mapillary_tools/upload.py b/mapillary_tools/upload.py index 4701e51b6..3b1f8bef0 100644 --- a/mapillary_tools/upload.py +++ b/mapillary_tools/upload.py @@ -466,7 +466,7 @@ def _gen_upload_everything( (m for m in metadatas if isinstance(m, types.ImageMetadata)), utils.find_images(import_paths, skip_subfolders=skip_subfolders), ) - for image_result in uploader.ZipImageSequence.prepare_images_and_upload( + for image_result in uploader.ZipImageSequence.zip_images_and_upload( mly_uploader, image_metadatas, ): @@ -721,7 +721,7 @@ def _gen_upload_zipfiles( "import_path": str(zip_path), } try: - cluster_id = uploader.ZipImageSequence.prepare_zipfile_and_upload( + cluster_id = uploader.ZipImageSequence.upload_zipfile( mly_uploader, zip_path, progress=T.cast(T.Dict[str, T.Any], progress) ) except Exception as ex: diff --git a/mapillary_tools/uploader.py b/mapillary_tools/uploader.py index 1a2434720..13a631b0a 100644 --- a/mapillary_tools/uploader.py +++ b/mapillary_tools/uploader.py @@ -265,7 +265,7 @@ def _write_imagebytes_in_zip( zipf.writestr(zipinfo, image_bytes) @classmethod - def prepare_zipfile_and_upload( + def upload_zipfile( cls, uploader: Uploader, zip_path: Path, @@ -306,7 +306,7 @@ def prepare_zipfile_and_upload( ) @classmethod - def prepare_images_and_upload( + def zip_images_and_upload( cls, uploader: Uploader, image_metadatas: T.Sequence[types.ImageMetadata], diff --git a/tests/unit/test_uploader.py b/tests/unit/test_uploader.py index 020719080..4e633019d 100644 --- a/tests/unit/test_uploader.py +++ b/tests/unit/test_uploader.py @@ -68,9 +68,9 @@ def test_upload_images(setup_unittest_data: py.path.local, setup_upload: py.path }, ] results = list( - uploader.ZipImageSequence.prepare_images_and_upload( - [types.from_desc(T.cast(T.Any, desc)) for desc in descs], + uploader.ZipImageSequence.zip_images_and_upload( mly_uploader, + [types.from_desc(T.cast(T.Any, desc)) for desc in descs], ) ) assert len(results) == 1 @@ -122,9 +122,9 @@ def test_upload_images_multiple_sequences( dry_run=True, ) results = list( - uploader.ZipImageSequence.prepare_images_and_upload( - [types.from_desc(T.cast(T.Any, desc)) for desc in descs], + uploader.ZipImageSequence.zip_images_and_upload( mly_uploader, + [types.from_desc(T.cast(T.Any, desc)) for desc in descs], ) ) assert len(results) == 2 @@ -189,9 +189,7 @@ def test_upload_zip( emitter=emitter, ) for zip_path in zip_dir.listdir(): - cluster = uploader.ZipImageSequence.prepare_zipfile_and_upload( - Path(zip_path), mly_uploader - ) + cluster = uploader.ZipImageSequence.upload_zipfile(mly_uploader, Path(zip_path)) assert cluster == "0" descs = _validate_zip_dir(setup_upload) assert 3 == len(descs)