diff --git a/mapillary_tools/camm/camm_builder.py b/mapillary_tools/camm/camm_builder.py index f8191f2b9..719d84282 100644 --- a/mapillary_tools/camm/camm_builder.py +++ b/mapillary_tools/camm/camm_builder.py @@ -1,7 +1,9 @@ +from __future__ import annotations + import io import typing as T -from .. import geo, types +from .. import geo from ..mp4 import ( construct_mp4_parser as cparser, mp4_sample_parser as sample_parser, @@ -23,25 +25,23 @@ def _build_camm_sample(measurement: camm_parser.TelemetryMeasurement) -> bytes: def _create_edit_list_from_points( - point_segments: T.Sequence[T.Sequence[geo.Point]], + tracks: T.Sequence[T.Sequence[geo.Point]], movie_timescale: int, media_timescale: int, ) -> builder.BoxDict: - entries: T.List[T.Dict] = [] + entries: list[dict] = [] - non_empty_point_segments = [points for points in point_segments if points] + non_empty_tracks = [track for track in tracks if track] - for idx, points in enumerate(non_empty_point_segments): - assert 0 <= points[0].time, ( - f"expect non-negative point time but got {points[0]}" - ) - assert points[0].time <= points[-1].time, ( - f"expect points to be sorted but got first point {points[0]} and last point {points[-1]}" + for idx, track in enumerate(non_empty_tracks): + assert 0 <= track[0].time, f"expect non-negative point time but got {track[0]}" + assert track[0].time <= track[-1].time, ( + f"expect points to be sorted but got first point {track[0]} and last point {track[-1]}" ) if idx == 0: - if 0 < points[0].time: - segment_duration = int(points[0].time * movie_timescale) + if 0 < track[0].time: + segment_duration = int(track[0].time * movie_timescale) # put an empty edit list entry to skip the initial gap entries.append( { @@ -53,8 +53,8 @@ def _create_edit_list_from_points( } ) else: - media_time = int(points[0].time * media_timescale) - segment_duration = int((points[-1].time - points[0].time) * movie_timescale) + media_time = int(track[0].time * media_timescale) + segment_duration = int((track[-1].time - track[0].time) * movie_timescale) entries.append( { "media_time": media_time, @@ -72,19 +72,6 @@ def _create_edit_list_from_points( } -def _multiplex( - points: T.Sequence[geo.Point], - measurements: T.Optional[T.List[camm_parser.TelemetryMeasurement]] = None, -) -> T.List[camm_parser.TelemetryMeasurement]: - mutiplexed: T.List[camm_parser.TelemetryMeasurement] = [ - *points, - *(measurements or []), - ] - mutiplexed.sort(key=lambda m: m.time) - - return mutiplexed - - def convert_telemetry_to_raw_samples( measurements: T.Sequence[camm_parser.TelemetryMeasurement], timescale: int, @@ -237,29 +224,44 @@ def create_camm_trak( } -def camm_sample_generator2( - video_metadata: types.VideoMetadata, - telemetry_measurements: T.Optional[T.List[camm_parser.TelemetryMeasurement]] = None, -): +def camm_sample_generator2(camm_info: camm_parser.CAMMInfo): def _f( fp: T.BinaryIO, - moov_children: T.List[builder.BoxDict], + moov_children: list[builder.BoxDict], ) -> T.Generator[io.IOBase, None, None]: movie_timescale = builder.find_movie_timescale(moov_children) - # make sure the precision of timedeltas not lower than 0.001 (1ms) + # Make sure the precision of timedeltas not lower than 0.001 (1ms) media_timescale = max(1000, movie_timescale) - # points with negative time are skipped - # TODO: interpolate first point at time == 0 - # TODO: measurements with negative times should be skipped too - points = [point for point in video_metadata.points if point.time >= 0] - - measurements = _multiplex(points, telemetry_measurements) + # Multiplex points for creating elst + track: list[geo.Point] = [ + *(camm_info.gps or []), + *(camm_info.mini_gps or []), + ] + track.sort(key=lambda p: p.time) + if track and track[0].time < 0: + track = [p for p in track if p.time >= 0] + elst = _create_edit_list_from_points([track], movie_timescale, media_timescale) + + # Multiplex telemetry measurements + measurements: list[camm_parser.TelemetryMeasurement] = [ + *(camm_info.gps or []), + *(camm_info.mini_gps or []), + *(camm_info.accl or []), + *(camm_info.gyro or []), + *(camm_info.magn or []), + ] + measurements.sort(key=lambda m: m.time) + if measurements and measurements[0].time < 0: + measurements = [m for m in measurements if m.time >= 0] + + # Serialize the telemetry measurements into MP4 samples camm_samples = list( convert_telemetry_to_raw_samples(measurements, media_timescale) ) + camm_trak = create_camm_trak(camm_samples, media_timescale) - elst = _create_edit_list_from_points([points], movie_timescale, media_timescale) + if T.cast(T.Dict, elst["data"])["entries"]: T.cast(T.List[builder.BoxDict], camm_trak["data"]).append( { @@ -269,19 +271,19 @@ def _f( ) moov_children.append(camm_trak) - udta_data: T.List[builder.BoxDict] = [] - if video_metadata.make: + udta_data: list[builder.BoxDict] = [] + if camm_info.make: udta_data.append( { "type": b"@mak", - "data": video_metadata.make.encode("utf-8"), + "data": camm_info.make.encode("utf-8"), } ) - if video_metadata.model: + if camm_info.model: udta_data.append( { "type": b"@mod", - "data": video_metadata.model.encode("utf-8"), + "data": camm_info.model.encode("utf-8"), } ) if udta_data: diff --git a/mapillary_tools/constants.py b/mapillary_tools/constants.py index a58cce8ee..aa63a9cdb 100644 --- a/mapillary_tools/constants.py +++ b/mapillary_tools/constants.py @@ -70,3 +70,20 @@ def _yes_or_no(val: str) -> bool: _AUTH_VERIFICATION_DISABLED: bool = _yes_or_no( os.getenv(_ENV_PREFIX + "_AUTH_VERIFICATION_DISABLED", "NO") ) + +MAPILLARY_DISABLE_API_LOGGING: bool = _yes_or_no( + os.getenv("MAPILLARY_DISABLE_API_LOGGING", "NO") +) +MAPILLARY__ENABLE_UPLOAD_HISTORY_FOR_DRY_RUN: bool = _yes_or_no( + os.getenv("MAPILLARY__ENABLE_UPLOAD_HISTORY_FOR_DRY_RUN", "NO") +) +MAPILLARY__EXPERIMENTAL_ENABLE_IMU: bool = _yes_or_no( + os.getenv("MAPILLARY__EXPERIMENTAL_ENABLE_IMU", "NO") +) +MAPILLARY_UPLOAD_HISTORY_PATH: str = os.getenv( + "MAPILLARY_UPLOAD_HISTORY_PATH", + os.path.join( + USER_DATA_DIR, + "upload_history", + ), +) diff --git a/mapillary_tools/geotag/geotag_videos_from_video.py b/mapillary_tools/geotag/geotag_videos_from_video.py index 57c8229ac..ec0e6d7e2 100644 --- a/mapillary_tools/geotag/geotag_videos_from_video.py +++ b/mapillary_tools/geotag/geotag_videos_from_video.py @@ -23,13 +23,23 @@ def extract(self) -> types.VideoMetadataOrError: gps_points = gopro_info.gps assert gps_points is not None, "must have GPS data extracted" if not gps_points: - raise exceptions.MapillaryGPXEmptyError("Empty GPS data found") + # Instead of raising an exception, return error metadata to tell the file type + ex: exceptions.MapillaryDescriptionError = ( + exceptions.MapillaryGPXEmptyError("Empty GPS data found") + ) + return types.describe_error_metadata( + ex, self.video_path, filetype=FileType.GOPRO + ) gps_points = T.cast( T.List[telemetry.GPSPoint], gpmf_gps_filter.remove_noisy_points(gps_points) ) if not gps_points: - raise exceptions.MapillaryGPSNoiseError("GPS is too noisy") + # Instead of raising an exception, return error metadata to tell the file type + ex = exceptions.MapillaryGPSNoiseError("GPS is too noisy") + return types.describe_error_metadata( + ex, self.video_path, filetype=FileType.GOPRO + ) video_metadata = types.VideoMetadata( filename=self.video_path, @@ -54,7 +64,13 @@ def extract(self) -> types.VideoMetadataOrError: ) if not camm_info.gps and not camm_info.mini_gps: - raise exceptions.MapillaryGPXEmptyError("Empty GPS data found") + # Instead of raising an exception, return error metadata to tell the file type + ex: exceptions.MapillaryDescriptionError = ( + exceptions.MapillaryGPXEmptyError("Empty GPS data found") + ) + return types.describe_error_metadata( + ex, self.video_path, filetype=FileType.CAMM + ) return types.VideoMetadata( filename=self.video_path, @@ -77,7 +93,13 @@ def extract(self) -> types.VideoMetadataOrError: ) if not blackvue_info.gps: - raise exceptions.MapillaryGPXEmptyError("Empty GPS data found") + # Instead of raising an exception, return error metadata to tell the file type + ex: exceptions.MapillaryDescriptionError = ( + exceptions.MapillaryGPXEmptyError("Empty GPS data found") + ) + return types.describe_error_metadata( + ex, self.video_path, filetype=FileType.BLACKVUE + ) video_metadata = types.VideoMetadata( filename=self.video_path, diff --git a/mapillary_tools/history.py b/mapillary_tools/history.py index c27e2aaa3..49b2ab715 100644 --- a/mapillary_tools/history.py +++ b/mapillary_tools/history.py @@ -1,6 +1,5 @@ import json import logging -import os import string import typing as T from pathlib import Path @@ -10,13 +9,6 @@ JSONDict = T.Dict[str, T.Union[str, int, float, None]] LOG = logging.getLogger(__name__) -MAPILLARY_UPLOAD_HISTORY_PATH = os.getenv( - "MAPILLARY_UPLOAD_HISTORY_PATH", - os.path.join( - constants.USER_DATA_DIR, - "upload_history", - ), -) def _validate_hexdigits(md5sum: str): @@ -35,14 +27,14 @@ def history_desc_path(md5sum: str) -> Path: basename = md5sum[2:] assert basename, f"Invalid md5sum {md5sum}" return ( - Path(MAPILLARY_UPLOAD_HISTORY_PATH) + Path(constants.MAPILLARY_UPLOAD_HISTORY_PATH) .joinpath(subfolder) .joinpath(f"{basename}.json") ) def is_uploaded(md5sum: str) -> bool: - if not MAPILLARY_UPLOAD_HISTORY_PATH: + if not constants.MAPILLARY_UPLOAD_HISTORY_PATH: return False return history_desc_path(md5sum).is_file() @@ -53,7 +45,7 @@ def write_history( summary: JSONDict, metadatas: T.Optional[T.Sequence[types.Metadata]] = None, ) -> None: - if not MAPILLARY_UPLOAD_HISTORY_PATH: + if not constants.MAPILLARY_UPLOAD_HISTORY_PATH: return path = history_desc_path(md5sum) LOG.debug("Writing upload history: %s", path) diff --git a/mapillary_tools/upload.py b/mapillary_tools/upload.py index 8754efc27..2536aa026 100644 --- a/mapillary_tools/upload.py +++ b/mapillary_tools/upload.py @@ -16,8 +16,10 @@ api_v4, constants, exceptions, + geo, history, ipc, + telemetry, types, upload_api_v4, uploader, @@ -32,12 +34,6 @@ JSONDict = T.Dict[str, T.Union[str, int, float, None]] LOG = logging.getLogger(__name__) -MAPILLARY_DISABLE_API_LOGGING = os.getenv("MAPILLARY_DISABLE_API_LOGGING") -MAPILLARY__ENABLE_UPLOAD_HISTORY_FOR_DRY_RUN = os.getenv( - "MAPILLARY__ENABLE_UPLOAD_HISTORY_FOR_DRY_RUN" -) -MAPILLARY__EXPERIMENTAL_ENABLE_IMU = os.getenv("MAPILLARY__EXPERIMENTAL_ENABLE_IMU") -CAMM_CONVERTABLES = {FileType.CAMM, FileType.BLACKVUE, FileType.GOPRO} class UploadError(Exception): @@ -238,25 +234,40 @@ def _setup_ipc(emitter: uploader.EventEmitter): @emitter.on("upload_start") def upload_start(payload: uploader.Progress): type: uploader.EventName = "upload_start" - LOG.debug("Sending %s via IPC: %s", type, payload) + LOG.debug("IPC %s: %s", type.upper(), payload) ipc.send(type, payload) @emitter.on("upload_fetch_offset") def upload_fetch_offset(payload: uploader.Progress) -> None: type: uploader.EventName = "upload_fetch_offset" - LOG.debug("Sending %s via IPC: %s", type, payload) + LOG.debug("IPC %s: %s", type.upper(), payload) ipc.send(type, payload) @emitter.on("upload_progress") def upload_progress(payload: uploader.Progress): type: uploader.EventName = "upload_progress" - LOG.debug("Sending %s via IPC: %s", type, payload) + + if LOG.getEffectiveLevel() <= logging.DEBUG: + # In debug mode, we want to see the progress every 10 seconds + # instead of every chunk (which is too verbose) + INTERVAL_SECONDS = 10 + now = time.time() + last_upload_progress_debug_at: float | None = T.cast(T.Dict, payload).get( + "_last_upload_progress_debug_at" + ) + if ( + last_upload_progress_debug_at is None + or last_upload_progress_debug_at + INTERVAL_SECONDS < now + ): + LOG.debug("IPC %s: %s", type.upper(), payload) + T.cast(T.Dict, payload)["_last_upload_progress_debug_at"] = now + ipc.send(type, payload) @emitter.on("upload_end") def upload_end(payload: uploader.Progress) -> None: type: uploader.EventName = "upload_end" - LOG.debug("Sending %s via IPC: %s", type, payload) + LOG.debug("IPC %s: %s", type.upper(), payload) ipc.send(type, payload) @@ -368,7 +379,7 @@ def _show_upload_summary(stats: T.Sequence[_APIStats]): def _api_logging_finished(summary: dict): - if MAPILLARY_DISABLE_API_LOGGING: + if constants.MAPILLARY_DISABLE_API_LOGGING: return action: api_v4.ActionType = "upload_finished_upload" @@ -386,7 +397,7 @@ def _api_logging_finished(summary: dict): def _api_logging_failed(payload: dict, exc: Exception): - if MAPILLARY_DISABLE_API_LOGGING: + if constants.MAPILLARY_DISABLE_API_LOGGING: return payload_with_reason = {**payload, "reason": exc.__class__.__name__} @@ -448,13 +459,13 @@ def _find_metadata_with_filename_existed_in( def _upload_everything( mly_uploader: uploader.Uploader, - import_paths: T.Sequence[Path], metadatas: T.Sequence[types.Metadata], + import_paths: T.Sequence[Path], skip_subfolders: bool, ): - # upload images + # Upload images image_paths = utils.find_images(import_paths, skip_subfolders=skip_subfolders) - # find descs that match the image paths from the import paths + # Find descs that match the image paths from the import paths image_metadatas = [ metadata for metadata in (metadatas or []) @@ -475,7 +486,7 @@ def _upload_everything( if clusters: LOG.debug("Uploaded to cluster: %s", clusters) - # upload videos + # Upload videos video_paths = utils.find_videos(import_paths, skip_subfolders=skip_subfolders) video_metadatas = [ metadata @@ -485,30 +496,28 @@ def _upload_everything( specified_video_metadatas = _find_metadata_with_filename_existed_in( video_metadatas, video_paths ) - for idx, video_metadata in enumerate(specified_video_metadatas): + _upload_videos(mly_uploader, specified_video_metadatas) + + # Upload zip files + zip_paths = utils.find_zipfiles(import_paths, skip_subfolders=skip_subfolders) + _upload_zipfiles(mly_uploader, zip_paths) + + +def _upload_videos( + mly_uploader: uploader.Uploader, video_metadatas: T.Sequence[types.VideoMetadata] +): + for idx, video_metadata in enumerate(video_metadatas): video_metadata.update_md5sum() assert isinstance(video_metadata.md5sum, str), "md5sum should be updated" - # extract telemetry measurements from GoPro videos - telemetry_measurements: list[camm_parser.TelemetryMeasurement] = [] - if MAPILLARY__EXPERIMENTAL_ENABLE_IMU == "YES": - if video_metadata.filetype is FileType.GOPRO: - with video_metadata.filename.open("rb") as fp: - gopro_info = gpmf_parser.extract_gopro_info(fp, telemetry_only=True) - if gopro_info is not None: - telemetry_measurements.extend(gopro_info.accl or []) - telemetry_measurements.extend(gopro_info.gyro or []) - telemetry_measurements.extend(gopro_info.magn or []) - telemetry_measurements.sort(key=lambda m: m.time) - - generator = camm_builder.camm_sample_generator2( - video_metadata, telemetry_measurements=telemetry_measurements - ) + camm_info = _prepare_camm_info(video_metadata) + + generator = camm_builder.camm_sample_generator2(camm_info) with video_metadata.filename.open("rb") as src_fp: camm_fp = simple_mp4_builder.transform_mp4(src_fp, generator) progress: uploader.SequenceProgress = { - "total_sequence_count": len(specified_video_metadatas), + "total_sequence_count": len(video_metadatas), "sequence_idx": idx, "file_type": video_metadata.filetype.value, "import_path": str(video_metadata.filename), @@ -530,29 +539,53 @@ def _upload_everything( raise UploadError(ex) from ex LOG.debug("Uploaded to cluster: %s", cluster_id) - # upload zip files - zip_paths = utils.find_zipfiles(import_paths, skip_subfolders=skip_subfolders) - _upload_zipfiles(mly_uploader, zip_paths) +def _prepare_camm_info(video_metadata: types.VideoMetadata) -> camm_parser.CAMMInfo: + camm_info = camm_parser.CAMMInfo( + make=video_metadata.make or "", model=video_metadata.model or "" + ) + + for point in video_metadata.points: + if isinstance(point, telemetry.CAMMGPSPoint): + if camm_info.gps is None: + camm_info.gps = [] + camm_info.gps.append(point) + + elif isinstance(point, telemetry.GPSPoint): + # There is no proper CAMM entry for GoPro GPS + if camm_info.mini_gps is None: + camm_info.mini_gps = [] + camm_info.mini_gps.append(point) + + elif isinstance(point, geo.Point): + if camm_info.mini_gps is None: + camm_info.mini_gps = [] + camm_info.mini_gps.append(point) + else: + raise ValueError(f"Unknown point type: {point}") + + if constants.MAPILLARY__EXPERIMENTAL_ENABLE_IMU: + if video_metadata.filetype is FileType.GOPRO: + with video_metadata.filename.open("rb") as fp: + gopro_info = gpmf_parser.extract_gopro_info(fp, telemetry_only=True) + if gopro_info is not None: + camm_info.accl = gopro_info.accl or [] + camm_info.gyro = gopro_info.gyro or [] + camm_info.magn = gopro_info.magn or [] + + return camm_info + + +def _normalize_import_paths(import_path: Path | T.Sequence[Path]) -> list[Path]: + import_paths: list[Path] -def upload( - import_path: Path | T.Sequence[Path], - user_items: types.UserItem, - desc_path: str | None = None, - _metadatas_from_process: T.Sequence[types.MetadataOrError] | None = None, - dry_run=False, - skip_subfolders=False, -) -> None: - import_paths: T.Sequence[Path] if isinstance(import_path, Path): import_paths = [import_path] else: assert isinstance(import_path, list) import_paths = import_path - import_paths = list(utils.deduplicate_paths(import_paths)) - if not import_paths: - return + import_paths = list(utils.deduplicate_paths(import_paths)) # Check and fail early for path in import_paths: @@ -561,14 +594,30 @@ def upload( f"Import file or directory not found: {path}" ) + return import_paths + + +def upload( + import_path: Path | T.Sequence[Path], + user_items: types.UserItem, + desc_path: str | None = None, + _metadatas_from_process: T.Sequence[types.MetadataOrError] | None = None, + dry_run=False, + skip_subfolders=False, +) -> None: + import_paths = _normalize_import_paths(import_path) + metadatas = _load_descs(_metadatas_from_process, desc_path, import_paths) # Setup the emitter -- the order matters here emitter = uploader.EventEmitter() - enable_history = history.MAPILLARY_UPLOAD_HISTORY_PATH and ( - not dry_run or MAPILLARY__ENABLE_UPLOAD_HISTORY_FOR_DRY_RUN == "YES" + # When dry_run mode is on, we disable history by default. + # But we need dry_run for tests, so we added MAPILLARY__ENABLE_UPLOAD_HISTORY_FOR_DRY_RUN + # and when it is on, we enable history regardless of dry_run + enable_history = constants.MAPILLARY_UPLOAD_HISTORY_PATH and ( + not dry_run or constants.MAPILLARY__ENABLE_UPLOAD_HISTORY_FOR_DRY_RUN ) # Put it first one to cancel early @@ -603,7 +652,7 @@ def upload( ) try: - _upload_everything(mly_uploader, import_paths, metadatas, skip_subfolders) + _upload_everything(mly_uploader, metadatas, import_paths, skip_subfolders) except UploadError as ex: inner_ex = ex.inner_ex @@ -647,7 +696,7 @@ def _upload_zipfiles( } try: cluster_id = uploader.ZipImageSequence.prepare_zipfile_and_upload( - zip_path, mly_uploader, progress=progress + zip_path, mly_uploader, progress=T.cast(T.Dict[str, T.Any], progress) ) except Exception as ex: raise UploadError(ex) from ex diff --git a/mapillary_tools/upload_api_v4.py b/mapillary_tools/upload_api_v4.py index b0a7cc8c9..811bae29a 100644 --- a/mapillary_tools/upload_api_v4.py +++ b/mapillary_tools/upload_api_v4.py @@ -32,7 +32,6 @@ class UploadService: user_access_token: str session_key: str cluster_filetype: ClusterFileType - chunk_size: int MIME_BY_CLUSTER_TYPE: dict[ClusterFileType, str] = { ClusterFileType.ZIP: "application/zip", @@ -96,7 +95,7 @@ def upload_byte_stream( self, stream: T.IO[bytes], offset: int | None = None, - chunk_size: int = 2 * 1024 * 1024, + chunk_size: int = 2 * 1024 * 1024, # 2MB ) -> str: if offset is None: offset = self.fetch_offset() diff --git a/mapillary_tools/uploader.py b/mapillary_tools/uploader.py index 98052e750..a94607996 100644 --- a/mapillary_tools/uploader.py +++ b/mapillary_tools/uploader.py @@ -209,10 +209,10 @@ def prepare_zipfile_and_upload( cls, zip_path: Path, uploader: Uploader, - progress: SequenceProgress | None = None, + progress: dict[str, T.Any] | None = None, ) -> str | None: if progress is None: - progress = T.cast(SequenceProgress, {}) + progress = {} with zipfile.ZipFile(zip_path) as ziph: namelist = ziph.namelist() @@ -227,8 +227,7 @@ def prepare_zipfile_and_upload( with zip_path.open("rb") as zip_fp: upload_md5sum = utils.md5sum_fp(zip_fp).hexdigest() - final_progress: SequenceProgress = { - **progress, + sequence_progress: SequenceProgress = { "sequence_image_count": len(namelist), "file_type": types.FileType.ZIP.value, "md5sum": upload_md5sum, @@ -241,7 +240,8 @@ def prepare_zipfile_and_upload( zip_fp, upload_api_v4.ClusterFileType.ZIP, session_key, - progress=T.cast(T.Dict[str, T.Any], final_progress), + # Send the copy of the input progress to each upload session, to avoid modifying the original one + progress=T.cast(T.Dict[str, T.Any], {**progress, **sequence_progress}), ) @classmethod @@ -249,17 +249,16 @@ def prepare_images_and_upload( cls, image_metadatas: T.Sequence[types.ImageMetadata], uploader: Uploader, - progress: SequenceProgress | None = None, + progress: dict[str, T.Any] | None = None, ) -> dict[str, str]: if progress is None: - progress = T.cast(SequenceProgress, {}) + progress = {} _validate_metadatas(image_metadatas) sequences = types.group_and_sort_images(image_metadatas) ret: dict[str, str] = {} for sequence_idx, (sequence_uuid, sequence) in enumerate(sequences.items()): - final_progress: SequenceProgress = { - **progress, + sequence_progress: SequenceProgress = { "sequence_idx": sequence_idx, "total_sequence_count": len(sequences), "sequence_image_count": len(sequence), @@ -270,7 +269,7 @@ def prepare_images_and_upload( with tempfile.NamedTemporaryFile() as fp: upload_md5sum = cls.zip_sequence_fp(sequence, fp) - final_progress["md5sum"] = upload_md5sum + sequence_progress["md5sum"] = upload_md5sum session_key = _session_key( upload_md5sum, upload_api_v4.ClusterFileType.ZIP @@ -280,8 +279,12 @@ def prepare_images_and_upload( fp, upload_api_v4.ClusterFileType.ZIP, session_key, - progress=T.cast(T.Dict[str, T.Any], final_progress), + # Send the copy of the input progress to each upload session, to avoid modifying the original one + progress=T.cast( + T.Dict[str, T.Any], {**progress, **sequence_progress} + ), ) + if cluster_id is not None: ret[sequence_uuid] = cluster_id return ret @@ -419,6 +422,9 @@ def _chunk_with_progress_emitted( progress["offset"] += len(chunk) progress["chunk_size"] = len(chunk) + # Whenever a chunk is uploaded, reset retries + progress["retries"] = 0 + if self.emitter: self.emitter.emit("upload_progress", progress) diff --git a/tests/unit/test_camm_parser.py b/tests/unit/test_camm_parser.py index 1fdc6e6fa..361a173ec 100644 --- a/tests/unit/test_camm_parser.py +++ b/tests/unit/test_camm_parser.py @@ -3,7 +3,7 @@ import typing as T from pathlib import Path -from mapillary_tools import geo, telemetry, types +from mapillary_tools import geo, telemetry, types, upload from mapillary_tools.camm import camm_builder, camm_parser from mapillary_tools.mp4 import construct_mp4_parser as cparser, simple_mp4_builder @@ -38,6 +38,7 @@ def test_filter_points_by_edit_list(): ) +# TODO: use CAMMInfo as input def encode_decode_empty_camm_mp4(metadata: types.VideoMetadata) -> types.VideoMetadata: movie_timescale = 1_000_000 @@ -56,8 +57,9 @@ def encode_decode_empty_camm_mp4(metadata: types.VideoMetadata) -> types.VideoMe {"type": b"moov", "data": [mvhd]}, ] src = cparser.MP4WithoutSTBLBuilderConstruct.build_boxlist(empty_mp4) + input_camm_info = upload._prepare_camm_info(metadata) target_fp = simple_mp4_builder.transform_mp4( - io.BytesIO(src), camm_builder.camm_sample_generator2(metadata) + io.BytesIO(src), camm_builder.camm_sample_generator2(input_camm_info) ) # extract points