diff --git a/mapillary_tools/api_v4.py b/mapillary_tools/api_v4.py index 6e5088a7c..9feee7d55 100644 --- a/mapillary_tools/api_v4.py +++ b/mapillary_tools/api_v4.py @@ -1,3 +1,6 @@ +from __future__ import annotations + +import enum import logging import os import ssl @@ -18,6 +21,12 @@ USE_SYSTEM_CERTS: bool = False +class ClusterFileType(enum.Enum): + ZIP = "zip" + BLACKVUE = "mly_blackvue_video" + CAMM = "mly_camm_video" + + class HTTPSystemCertsAdapter(HTTPAdapter): """ This adapter uses the system's certificate store instead of the certifi module. @@ -93,9 +102,9 @@ def _sanitize(headers: T.Mapping[T.Any, T.Any]) -> T.Mapping[T.Any, T.Any]: def _log_debug_request( method: str, url: str, - json: T.Optional[T.Dict] = None, - params: T.Optional[T.Dict] = None, - headers: T.Optional[T.Dict] = None, + json: dict | None = None, + params: dict | None = None, + headers: dict | None = None, timeout: T.Any = None, ): if logging.getLogger().getEffectiveLevel() <= logging.DEBUG: @@ -150,8 +159,8 @@ def readable_http_error(ex: requests.HTTPError) -> str: def request_post( url: str, - data: T.Optional[T.Any] = None, - json: T.Optional[dict] = None, + data: T.Any | None = None, + json: dict | None = None, **kwargs, ) -> requests.Response: global USE_SYSTEM_CERTS @@ -190,7 +199,7 @@ def request_post( def request_get( url: str, - params: T.Optional[dict] = None, + params: dict | None = None, **kwargs, ) -> requests.Response: global USE_SYSTEM_CERTS @@ -293,7 +302,7 @@ def fetch_organization( def fetch_user_or_me( user_access_token: str, - user_id: T.Optional[T.Union[int, str]] = None, + user_id: int | str | None = None, ) -> requests.Response: if user_id is None: url = f"{MAPILLARY_GRAPH_API_ENDPOINT}/me" @@ -334,3 +343,30 @@ def log_event(action_type: ActionType, properties: T.Dict) -> requests.Response: ) resp.raise_for_status() return resp + + +def finish_upload( + user_access_token: str, + file_handle: str, + cluster_filetype: ClusterFileType, + organization_id: int | str | None = None, +) -> requests.Response: + data: dict[str, str | int] = { + "file_handle": file_handle, + "file_type": cluster_filetype.value, + } + if organization_id is not None: + data["organization_id"] = organization_id + + resp = request_post( + f"{MAPILLARY_GRAPH_API_ENDPOINT}/finish_upload", + headers={ + "Authorization": f"OAuth {user_access_token}", + }, + json=data, + timeout=REQUESTS_TIMEOUT, + ) + + resp.raise_for_status() + + return resp diff --git a/mapillary_tools/authenticate.py b/mapillary_tools/authenticate.py index af956f5e2..f58a1f220 100644 --- a/mapillary_tools/authenticate.py +++ b/mapillary_tools/authenticate.py @@ -7,9 +7,10 @@ import sys import typing as T -import requests import jsonschema +import requests + from . 
import api_v4, config, constants, exceptions, types diff --git a/mapillary_tools/upload.py b/mapillary_tools/upload.py index 055b64a9b..8754efc27 100644 --- a/mapillary_tools/upload.py +++ b/mapillary_tools/upload.py @@ -1,3 +1,5 @@ +from __future__ import annotations + import json import logging import os @@ -45,8 +47,8 @@ def __init__(self, inner_ex) -> None: def _load_validate_metadatas_from_desc_path( - desc_path: T.Optional[str], import_paths: T.Sequence[Path] -) -> T.List[types.Metadata]: + desc_path: str | None, import_paths: T.Sequence[Path] +) -> list[types.Metadata]: is_default_desc_path = False if desc_path is None: is_default_desc_path = True @@ -64,7 +66,7 @@ def _load_validate_metadatas_from_desc_path( "The description path must be specified (with --desc_path) when uploading a single file", ) - descs: T.List[types.DescriptionOrError] = [] + descs: list[types.DescriptionOrError] = [] if desc_path == "-": try: @@ -117,7 +119,7 @@ def _load_validate_metadatas_from_desc_path( def zip_images( import_path: Path, zip_dir: Path, - desc_path: T.Optional[str] = None, + desc_path: str | None = None, ): if not import_path.is_dir(): raise exceptions.MapillaryFileNotFoundError( @@ -162,7 +164,7 @@ def upload_start(payload: uploader.Progress): def _setup_write_upload_history( emitter: uploader.EventEmitter, params: JSONDict, - metadatas: T.Optional[T.List[types.Metadata]] = None, + metadatas: list[types.Metadata] | None = None, ) -> None: @emitter.on("upload_finished") def upload_finished(payload: uploader.Progress): @@ -190,7 +192,7 @@ def upload_finished(payload: uploader.Progress): def _setup_tdqm(emitter: uploader.EventEmitter) -> None: - upload_pbar: T.Optional[tqdm] = None + upload_pbar: tqdm | None = None @emitter.on("upload_fetch_offset") def upload_fetch_offset(payload: uploader.Progress) -> None: @@ -201,7 +203,7 @@ def upload_fetch_offset(payload: uploader.Progress) -> None: nth = payload["sequence_idx"] + 1 total = payload["total_sequence_count"] - import_path: T.Optional[str] = payload.get("import_path") + import_path: str | None = payload.get("import_path") filetype = payload.get("file_type", "unknown").upper() if import_path is None: _desc = f"Uploading {filetype} ({nth}/{total})" @@ -276,7 +278,7 @@ class _APIStats(uploader.Progress, total=False): def _setup_api_stats(emitter: uploader.EventEmitter): - all_stats: T.List[_APIStats] = [] + all_stats: list[_APIStats] = [] @emitter.on("upload_start") def collect_start_time(payload: _APIStats) -> None: @@ -309,7 +311,7 @@ def collect_end_time(payload: _APIStats) -> None: return all_stats -def _summarize(stats: T.Sequence[_APIStats]) -> T.Dict: +def _summarize(stats: T.Sequence[_APIStats]) -> dict: total_image_count = sum(s.get("sequence_image_count", 0) for s in stats) total_uploaded_sequence_count = len(stats) # note that stats[0]["total_sequence_count"] not always same as total_uploaded_sequence_count @@ -341,7 +343,7 @@ def _summarize(stats: T.Sequence[_APIStats]) -> T.Dict: def _show_upload_summary(stats: T.Sequence[_APIStats]): - grouped: T.Dict[str, T.List[_APIStats]] = {} + grouped: dict[str, list[_APIStats]] = {} for stat in stats: grouped.setdefault(stat.get("file_type", "unknown"), []).append(stat) @@ -365,7 +367,7 @@ def _show_upload_summary(stats: T.Sequence[_APIStats]): LOG.info("%8.1fs upload time", summary["time"]) -def _api_logging_finished(summary: T.Dict): +def _api_logging_finished(summary: dict): if MAPILLARY_DISABLE_API_LOGGING: return @@ -383,7 +385,7 @@ def _api_logging_finished(summary: T.Dict): 
LOG.warning("Error from API Logging for action %s", action, exc_info=True) -def _api_logging_failed(payload: T.Dict, exc: Exception): +def _api_logging_failed(payload: dict, exc: Exception): if MAPILLARY_DISABLE_API_LOGGING: return @@ -403,11 +405,11 @@ def _api_logging_failed(payload: T.Dict, exc: Exception): def _load_descs( - _metadatas_from_process: T.Optional[T.Sequence[types.MetadataOrError]], - desc_path: T.Optional[str], + _metadatas_from_process: T.Sequence[types.MetadataOrError] | None, + desc_path: str | None, import_paths: T.Sequence[Path], -) -> T.List[types.Metadata]: - metadatas: T.List[types.Metadata] +) -> list[types.Metadata]: + metadatas: list[types.Metadata] if _metadatas_from_process is not None: metadatas = [ @@ -439,7 +441,7 @@ def _load_descs( def _find_metadata_with_filename_existed_in( metadatas: T.Sequence[_M], paths: T.Sequence[Path] -) -> T.List[_M]: +) -> list[_M]: resolved_image_paths = set(p.resolve() for p in paths) return [d for d in metadatas if d.filename.resolve() in resolved_image_paths] @@ -463,9 +465,9 @@ def _upload_everything( ) if specified_image_metadatas: try: - clusters = mly_uploader.upload_images( + clusters = uploader.ZipImageSequence.prepare_images_and_upload( specified_image_metadatas, - event_payload={"file_type": FileType.IMAGE.value}, + mly_uploader, ) except Exception as ex: raise UploadError(ex) from ex @@ -488,7 +490,7 @@ def _upload_everything( assert isinstance(video_metadata.md5sum, str), "md5sum should be updated" # extract telemetry measurements from GoPro videos - telemetry_measurements: T.List[camm_parser.TelemetryMeasurement] = [] + telemetry_measurements: list[camm_parser.TelemetryMeasurement] = [] if MAPILLARY__EXPERIMENTAL_ENABLE_IMU == "YES": if video_metadata.filetype is FileType.GOPRO: with video_metadata.filename.open("rb") as fp: @@ -505,18 +507,24 @@ def _upload_everything( with video_metadata.filename.open("rb") as src_fp: camm_fp = simple_mp4_builder.transform_mp4(src_fp, generator) - event_payload: uploader.Progress = { + progress: uploader.SequenceProgress = { "total_sequence_count": len(specified_video_metadatas), "sequence_idx": idx, "file_type": video_metadata.filetype.value, "import_path": str(video_metadata.filename), + "md5sum": video_metadata.md5sum, } + + session_key = uploader._session_key( + video_metadata.md5sum, upload_api_v4.ClusterFileType.CAMM + ) + try: cluster_id = mly_uploader.upload_stream( T.cast(T.BinaryIO, camm_fp), upload_api_v4.ClusterFileType.CAMM, - video_metadata.md5sum, - event_payload=event_payload, + session_key, + progress=T.cast(T.Dict[str, T.Any], progress), ) except Exception as ex: raise UploadError(ex) from ex @@ -528,10 +536,10 @@ def _upload_everything( def upload( - import_path: T.Union[Path, T.Sequence[Path]], + import_path: Path | T.Sequence[Path], user_items: types.UserItem, - desc_path: T.Optional[str] = None, - _metadatas_from_process: T.Optional[T.Sequence[types.MetadataOrError]] = None, + desc_path: str | None = None, + _metadatas_from_process: T.Sequence[types.MetadataOrError] | None = None, dry_run=False, skip_subfolders=False, ) -> None: @@ -632,15 +640,14 @@ def _upload_zipfiles( zip_paths: T.Sequence[Path], ) -> None: for idx, zip_path in enumerate(zip_paths): - event_payload: uploader.Progress = { + progress: uploader.SequenceProgress = { "total_sequence_count": len(zip_paths), "sequence_idx": idx, - "file_type": FileType.ZIP.value, "import_path": str(zip_path), } try: - cluster_id = mly_uploader.upload_zipfile( - zip_path, event_payload=event_payload + 
cluster_id = uploader.ZipImageSequence.prepare_zipfile_and_upload( + zip_path, mly_uploader, progress=progress ) except Exception as ex: raise UploadError(ex) from ex diff --git a/mapillary_tools/upload_api_v4.py b/mapillary_tools/upload_api_v4.py index 6a1148722..b0a7cc8c9 100644 --- a/mapillary_tools/upload_api_v4.py +++ b/mapillary_tools/upload_api_v4.py @@ -1,23 +1,24 @@ -import enum +from __future__ import annotations + import io import os import random +import sys import typing as T import uuid +if sys.version_info >= (3, 12): + from typing import override +else: + from typing_extensions import override + import requests -from .api_v4 import ( - MAPILLARY_GRAPH_API_ENDPOINT, - request_get, - request_post, - REQUESTS_TIMEOUT, -) +from .api_v4 import ClusterFileType, request_get, request_post, REQUESTS_TIMEOUT MAPILLARY_UPLOAD_ENDPOINT = os.getenv( "MAPILLARY_UPLOAD_ENDPOINT", "https://rupload.facebook.com/mapillary_public_uploads" ) -DEFAULT_CHUNK_SIZE = 1024 * 1024 * 16 # 16MB # According to the docs, UPLOAD_REQUESTS_TIMEOUT can be a tuple of # (connection_timeout, read_timeout): https://requests.readthedocs.io/en/latest/user/advanced/#timeouts # In my test, however, the connection_timeout rules both connection timeout and read timeout. @@ -27,21 +28,13 @@ UPLOAD_REQUESTS_TIMEOUT = (30 * 60, 30 * 60) # 30 minutes -class ClusterFileType(enum.Enum): - ZIP = "zip" - BLACKVUE = "mly_blackvue_video" - CAMM = "mly_camm_video" - - class UploadService: user_access_token: str session_key: str - callbacks: T.List[T.Callable[[bytes, T.Optional[requests.Response]], None]] cluster_filetype: ClusterFileType - organization_id: T.Optional[T.Union[str, int]] chunk_size: int - MIME_BY_CLUSTER_TYPE: T.Dict[ClusterFileType, str] = { + MIME_BY_CLUSTER_TYPE: dict[ClusterFileType, str] = { ClusterFileType.ZIP: "application/zip", ClusterFileType.BLACKVUE: "video/mp4", ClusterFileType.CAMM: "video/mp4", @@ -51,20 +44,12 @@ def __init__( self, user_access_token: str, session_key: str, - organization_id: T.Optional[T.Union[str, int]] = None, cluster_filetype: ClusterFileType = ClusterFileType.ZIP, - chunk_size: int = DEFAULT_CHUNK_SIZE, ): - if chunk_size <= 0: - raise ValueError("Expect positive chunk size") - self.user_access_token = user_access_token self.session_key = session_key - self.organization_id = organization_id # validate the input self.cluster_filetype = ClusterFileType(cluster_filetype) - self.callbacks = [] - self.chunk_size = chunk_size def fetch_offset(self) -> int: headers = { @@ -80,24 +65,19 @@ def fetch_offset(self) -> int: data = resp.json() return data["offset"] - def upload( - self, - data: T.IO[bytes], - offset: T.Optional[int] = None, - ) -> str: - chunks = self._chunkize_byte_stream(data) - return self.upload_chunks(chunks, offset=offset) - - def _chunkize_byte_stream( - self, stream: T.IO[bytes] + @classmethod + def chunkize_byte_stream( + cls, stream: T.IO[bytes], chunk_size: int ) -> T.Generator[bytes, None, None]: + if chunk_size <= 0: + raise ValueError("Expect positive chunk size") while True: - data = stream.read(self.chunk_size) + data = stream.read(chunk_size) if not data: break yield data - def _offset_chunks( + def shift_chunks( self, chunks: T.Iterable[bytes], offset: int ) -> T.Generator[bytes, None, None]: assert offset >= 0, f"Expect non-negative offset but got {offset}" @@ -112,23 +92,34 @@ def _offset_chunks( else: yield chunk - def _attach_callbacks( - self, chunks: T.Iterable[bytes] - ) -> T.Generator[bytes, None, None]: - for chunk in chunks: - yield chunk - for 
callback in self.callbacks: - callback(chunk, None) + def upload_byte_stream( + self, + stream: T.IO[bytes], + offset: int | None = None, + chunk_size: int = 2 * 1024 * 1024, + ) -> str: + if offset is None: + offset = self.fetch_offset() + return self.upload_chunks(self.chunkize_byte_stream(stream, chunk_size), offset) def upload_chunks( self, chunks: T.Iterable[bytes], - offset: T.Optional[int] = None, + offset: int | None = None, ) -> str: if offset is None: offset = self.fetch_offset() + shifted_chunks = self.shift_chunks(chunks, offset) + return self.upload_shifted_chunks(shifted_chunks, offset) - chunks = self._attach_callbacks(self._offset_chunks(chunks, offset)) + def upload_shifted_chunks( + self, + shifted_chunks: T.Iterable[bytes], + offset: int, + ) -> str: + """ + Upload the chunks that must already be shifted by the offset (e.g. fp.seek(begin_offset, io.SEEK_SET)) + """ headers = { "Authorization": f"OAuth {self.user_access_token}", @@ -140,7 +131,7 @@ def upload_chunks( resp = request_post( url, headers=headers, - data=chunks, + data=shifted_chunks, timeout=UPLOAD_REQUESTS_TIMEOUT, ) @@ -154,38 +145,6 @@ def upload_chunks( f"Upload server error: File handle not found in the upload response {resp.text}" ) - def finish(self, file_handle: str) -> str: - headers = { - "Authorization": f"OAuth {self.user_access_token}", - } - data: T.Dict[str, T.Union[str, int]] = { - "file_handle": file_handle, - "file_type": self.cluster_filetype.value, - } - if self.organization_id is not None: - data["organization_id"] = self.organization_id - - url = f"{MAPILLARY_GRAPH_API_ENDPOINT}/finish_upload" - - resp = request_post( - url, - headers=headers, - json=data, - timeout=REQUESTS_TIMEOUT, - ) - - resp.raise_for_status() - - data = resp.json() - - cluster_id = data.get("cluster_id") - if cluster_id is None: - raise RuntimeError( - f"Upload server error: failed to create the cluster {resp.text}" - ) - - return T.cast(str, cluster_id) - # A mock class for testing only class FakeUploadService(UploadService): @@ -194,22 +153,24 @@ def __init__(self, *args, **kwargs): self._upload_path = os.getenv( "MAPILLARY_UPLOAD_PATH", "mapillary_public_uploads" ) - self._error_ratio = 0.1 + self._error_ratio = 0.02 - def upload_chunks( + @override + def upload_shifted_chunks( self, - chunks: T.Iterable[bytes], - offset: T.Optional[int] = None, + shifted_chunks: T.Iterable[bytes], + offset: int, ) -> str: - if offset is None: - offset = self.fetch_offset() - - chunks = self._attach_callbacks(self._offset_chunks(chunks, offset)) + expected_offset = self.fetch_offset() + if offset != expected_offset: + raise ValueError( + f"Expect offset {expected_offset} but got {offset} for session {self.session_key}" + ) os.makedirs(self._upload_path, exist_ok=True) filename = os.path.join(self._upload_path, self.session_key) with open(filename, "ab") as fp: - for chunk in chunks: + for chunk in shifted_chunks: if random.random() <= self._error_ratio: raise requests.ConnectionError( f"TEST ONLY: Failed to upload with error ratio {self._error_ratio}" @@ -221,9 +182,7 @@ def upload_chunks( ) return uuid.uuid4().hex - def finish(self, _: str) -> str: - return "0" - + @override def fetch_offset(self) -> int: if random.random() <= self._error_ratio: raise requests.ConnectionError( diff --git a/mapillary_tools/uploader.py b/mapillary_tools/uploader.py index c9ae220d6..98052e750 100644 --- a/mapillary_tools/uploader.py +++ b/mapillary_tools/uploader.py @@ -15,18 +15,21 @@ import jsonschema import requests -from . 
import constants, exif_write, types, upload_api_v4, utils +from . import api_v4, constants, exif_write, types, upload_api_v4, utils LOG = logging.getLogger(__name__) -class Progress(T.TypedDict, total=False): - # The size of the chunk, in bytes, that has been uploaded in the last request +class UploaderProgress(T.TypedDict, total=True): + """ + Progress data that Uploader cares about. + """ + + # The size of the chunk, in bytes, that has been read and upload chunk_size: int - # File type - file_type: str + begin_offset: int | None # How many bytes has been uploaded so far since "upload_start" offset: int @@ -34,6 +37,22 @@ class Progress(T.TypedDict, total=False): # Size in bytes of the zipfile/BlackVue/CAMM entity_size: int + # An "upload_interrupted" will increase it. Reset to 0 if the chunk is uploaded + retries: int + + # Cluster ID after finishing the upload + cluster_id: str + + +class SequenceProgress(T.TypedDict, total=False): + """Progress data at sequence level""" + + # md5sum of the zipfile/BlackVue/CAMM in uploading + md5sum: str + + # File type + file_type: str + # How many sequences in total. It's always 1 when uploading Zipfile/BlackVue/CAMM total_sequence_count: int @@ -46,17 +65,12 @@ class Progress(T.TypedDict, total=False): # MAPSequenceUUID. It is only available for directory uploading sequence_uuid: str - # An "upload_interrupted" will increase it. Reset to 0 if the chunk is uploaded - retries: int - - # md5sum of the zipfile/BlackVue/CAMM in uploading - md5sum: str - # Path to the Zipfile/BlackVue/CAMM import_path: str - # Cluster ID after finishing the upload - cluster_id: str + +class Progress(SequenceProgress, UploaderProgress): + pass class UploadCancelled(Exception): @@ -74,7 +88,7 @@ class UploadCancelled(Exception): class EventEmitter: - events: dict[EventName, T.List] + events: dict[EventName, list] def __init__(self): self.events = {} @@ -108,11 +122,12 @@ def zip_images( f".mly_zip_{uuid.uuid4()}_{sequence_uuid}_{os.getpid()}_{int(time.time())}" ) upload_md5sum = types.update_sequence_md5sum(sequence) - zip_filename = zip_dir.joinpath(f"mly_tools_{upload_md5sum}.zip") + filename = _session_key(upload_md5sum, upload_api_v4.ClusterFileType.ZIP) + zip_filename = zip_dir.joinpath(filename) with wip_file_context(wip_zip_filename, zip_filename) as wip_path: with wip_path.open("wb") as wip_fp: actual_md5sum = cls.zip_sequence_fp(sequence, wip_fp) - assert actual_md5sum == upload_md5sum + assert actual_md5sum == upload_md5sum, "md5sum mismatch" @classmethod def zip_sequence_fp( @@ -189,28 +204,15 @@ def _write_imagebytes_in_zip( zipinfo = zipfile.ZipInfo(arcname, date_time=(1980, 1, 1, 0, 0, 0)) zipf.writestr(zipinfo, image_bytes) - -class Uploader: - def __init__( - self, - user_items: types.UserItem, - emitter: EventEmitter | None = None, - chunk_size: int = upload_api_v4.DEFAULT_CHUNK_SIZE, - dry_run=False, - ): - jsonschema.validate(instance=user_items, schema=types.UserItemSchema) - self.user_items = user_items - self.emitter = emitter - self.chunk_size = chunk_size - self.dry_run = dry_run - - def upload_zipfile( - self, + @classmethod + def prepare_zipfile_and_upload( + cls, zip_path: Path, - event_payload: Progress | None = None, + uploader: Uploader, + progress: SequenceProgress | None = None, ) -> str | None: - if event_payload is None: - event_payload = {} + if progress is None: + progress = T.cast(SequenceProgress, {}) with zipfile.ZipFile(zip_path) as ziph: namelist = ziph.namelist() @@ -218,111 +220,251 @@ def upload_zipfile( LOG.warning("Skipping 
empty zipfile: %s", zip_path) return None - final_event_payload: Progress = { - **event_payload, # type: ignore - "sequence_image_count": len(namelist), - } - with zip_path.open("rb") as zip_fp: - upload_md5sum = ZipImageSequence.extract_upload_md5sum(zip_fp) + upload_md5sum = cls.extract_upload_md5sum(zip_fp) if upload_md5sum is None: with zip_path.open("rb") as zip_fp: upload_md5sum = utils.md5sum_fp(zip_fp).hexdigest() + final_progress: SequenceProgress = { + **progress, + "sequence_image_count": len(namelist), + "file_type": types.FileType.ZIP.value, + "md5sum": upload_md5sum, + } + + session_key = _session_key(upload_md5sum, upload_api_v4.ClusterFileType.ZIP) + with zip_path.open("rb") as zip_fp: - return self.upload_stream( + return uploader.upload_stream( zip_fp, upload_api_v4.ClusterFileType.ZIP, - upload_md5sum, - event_payload=final_event_payload, + session_key, + progress=T.cast(T.Dict[str, T.Any], final_progress), ) - def upload_images( - self, + @classmethod + def prepare_images_and_upload( + cls, image_metadatas: T.Sequence[types.ImageMetadata], - event_payload: Progress | None = None, + uploader: Uploader, + progress: SequenceProgress | None = None, ) -> dict[str, str]: - if event_payload is None: - event_payload = {} + if progress is None: + progress = T.cast(SequenceProgress, {}) _validate_metadatas(image_metadatas) sequences = types.group_and_sort_images(image_metadatas) ret: dict[str, str] = {} for sequence_idx, (sequence_uuid, sequence) in enumerate(sequences.items()): - final_event_payload: Progress = { - **event_payload, # type: ignore + final_progress: SequenceProgress = { + **progress, "sequence_idx": sequence_idx, "total_sequence_count": len(sequences), "sequence_image_count": len(sequence), "sequence_uuid": sequence_uuid, + "file_type": types.FileType.IMAGE.value, } + with tempfile.NamedTemporaryFile() as fp: - upload_md5sum = ZipImageSequence.zip_sequence_fp(sequence, fp) - cluster_id = self.upload_stream( + upload_md5sum = cls.zip_sequence_fp(sequence, fp) + + final_progress["md5sum"] = upload_md5sum + + session_key = _session_key( + upload_md5sum, upload_api_v4.ClusterFileType.ZIP + ) + + cluster_id = uploader.upload_stream( fp, upload_api_v4.ClusterFileType.ZIP, - upload_md5sum, - final_event_payload, + session_key, + progress=T.cast(T.Dict[str, T.Any], final_progress), ) if cluster_id is not None: ret[sequence_uuid] = cluster_id return ret + +class Uploader: + def __init__( + self, + user_items: types.UserItem, + emitter: EventEmitter | None = None, + chunk_size: int = 2 * 1024 * 1024, # 2MB + dry_run=False, + ): + jsonschema.validate(instance=user_items, schema=types.UserItemSchema) + self.user_items = user_items + self.emitter = emitter + self.chunk_size = chunk_size + self.dry_run = dry_run + def upload_stream( self, fp: T.IO[bytes], cluster_filetype: upload_api_v4.ClusterFileType, - upload_md5sum: str, - event_payload: Progress | None = None, + session_key: str, + progress: dict[str, T.Any] | None = None, ) -> str | None: - if event_payload is None: - event_payload = {} + if progress is None: + progress = {} fp.seek(0, io.SEEK_END) entity_size = fp.tell() - SUFFIX_MAP: dict[upload_api_v4.ClusterFileType, str] = { - upload_api_v4.ClusterFileType.ZIP: ".zip", - upload_api_v4.ClusterFileType.CAMM: ".mp4", - upload_api_v4.ClusterFileType.BLACKVUE: ".mp4", - } - session_key = f"mly_tools_{upload_md5sum}{SUFFIX_MAP[cluster_filetype]}" + upload_service = self._create_upload_service(session_key, cluster_filetype) - if self.dry_run: - upload_service: 
upload_api_v4.UploadService = ( - upload_api_v4.FakeUploadService( - user_access_token=self.user_items["user_upload_token"], - session_key=session_key, - organization_id=self.user_items.get("MAPOrganizationKey"), - cluster_filetype=cluster_filetype, - chunk_size=self.chunk_size, + progress["entity_size"] = entity_size + progress["chunk_size"] = self.chunk_size + progress["retries"] = 0 + progress["begin_offset"] = None + + if self.emitter: + try: + self.emitter.emit("upload_start", progress) + except UploadCancelled: + # TODO: Right now it is thrown in upload_start only + return None + + while True: + try: + file_handle = self._upload_stream_retryable( + upload_service, fp, T.cast(UploaderProgress, progress) ) + except Exception as ex: + self._handle_upload_exception(ex, T.cast(UploaderProgress, progress)) + else: + break + + progress["retries"] += 1 + + if self.emitter: + self.emitter.emit("upload_end", progress) + + # TODO: retry here + cluster_id = self._finish_upload_retryable(upload_service, file_handle) + progress["cluster_id"] = cluster_id + + if self.emitter: + self.emitter.emit("upload_finished", progress) + + return cluster_id + + def _create_upload_service( + self, session_key: str, cluster_filetype: upload_api_v4.ClusterFileType + ) -> upload_api_v4.UploadService: + upload_service: upload_api_v4.UploadService + + if self.dry_run: + upload_service = upload_api_v4.FakeUploadService( + user_access_token=self.user_items["user_upload_token"], + session_key=session_key, + cluster_filetype=cluster_filetype, ) else: upload_service = upload_api_v4.UploadService( user_access_token=self.user_items["user_upload_token"], session_key=session_key, - organization_id=self.user_items.get("MAPOrganizationKey"), cluster_filetype=cluster_filetype, - chunk_size=self.chunk_size, ) - final_event_payload: Progress = { - **event_payload, # type: ignore - "entity_size": entity_size, - "md5sum": upload_md5sum, - } + return upload_service - try: - return _upload_stream_with_retries( - upload_service, - fp, - event_payload=final_event_payload, - emitter=self.emitter, + def _handle_upload_exception( + self, ex: Exception, progress: UploaderProgress + ) -> None: + retries = progress["retries"] + begin_offset = progress.get("begin_offset") + chunk_size = progress["chunk_size"] + + if retries <= constants.MAX_UPLOAD_RETRIES and _is_retriable_exception(ex): + if self.emitter: + self.emitter.emit("upload_interrupted", progress) + LOG.warning( + # use %s instead of %d because offset could be None + "Error uploading chunk_size %d at begin_offset %s: %s: %s", + chunk_size, + begin_offset, + ex.__class__.__name__, + str(ex), ) - except UploadCancelled: - return None + # Keep things immutable here. 
Will increment retries in the caller
+            retries += 1
+            if _is_immediate_retry(ex):
+                sleep_for = 0
+            else:
+                sleep_for = min(2**retries, 16)
+            LOG.info(
+                "Retrying in %d seconds (%d/%d)",
+                sleep_for,
+                retries,
+                constants.MAX_UPLOAD_RETRIES,
+            )
+            if sleep_for:
+                time.sleep(sleep_for)
+        else:
+            raise ex
+
+    def _chunk_with_progress_emitted(
+        self,
+        stream: T.IO[bytes],
+        progress: UploaderProgress,
+    ) -> T.Generator[bytes, None, None]:
+        for chunk in upload_api_v4.UploadService.chunkize_byte_stream(
+            stream, self.chunk_size
+        ):
+            yield chunk
+
+            progress["offset"] += len(chunk)
+            progress["chunk_size"] = len(chunk)
+            if self.emitter:
+                self.emitter.emit("upload_progress", progress)
+
+    def _upload_stream_retryable(
+        self,
+        upload_service: upload_api_v4.UploadService,
+        fp: T.IO[bytes],
+        progress: UploaderProgress,
+    ) -> str:
+        """Upload the stream with safe retries guaranteed"""
+
+        begin_offset = upload_service.fetch_offset()
+
+        progress["begin_offset"] = begin_offset
+        progress["offset"] = begin_offset
+
+        if self.emitter:
+            self.emitter.emit("upload_fetch_offset", progress)
+
+        fp.seek(begin_offset, io.SEEK_SET)
+
+        shifted_chunks = self._chunk_with_progress_emitted(fp, progress)
+
+        return upload_service.upload_shifted_chunks(shifted_chunks, begin_offset)
+
+    def _finish_upload_retryable(
+        self, upload_service: upload_api_v4.UploadService, file_handle: str
+    ) -> str:
+        """Finish the upload with safe retries guaranteed"""
+
+        if self.dry_run:
+            cluster_id = "0"
+        else:
+            resp = api_v4.finish_upload(
+                self.user_items["user_upload_token"],
+                file_handle,
+                upload_service.cluster_filetype,
+                organization_id=self.user_items.get("MAPOrganizationKey"),
+            )
+
+            data = resp.json()
+            cluster_id = data.get("cluster_id")
+
+        # TODO: validate cluster_id
+
+        return cluster_id


 def _validate_metadatas(metadatas: T.Sequence[types.ImageMetadata]):
@@ -386,89 +528,13 @@ def _is_retriable_exception(ex: Exception):
     return False


-def _setup_callback(emitter: EventEmitter, mutable_payload: Progress):
-    def _callback(chunk: bytes, _):
-        assert isinstance(emitter, EventEmitter)
-        mutable_payload["offset"] += len(chunk)
-        mutable_payload["chunk_size"] = len(chunk)
-        emitter.emit("upload_progress", mutable_payload)
-
-    return _callback
-
-
-def _upload_stream_with_retries(
-    upload_service: upload_api_v4.UploadService,
-    fp: T.IO[bytes],
-    event_payload: Progress | None = None,
-    emitter: EventEmitter | None = None,
+def _session_key(
+    upload_md5sum: str, cluster_filetype: upload_api_v4.ClusterFileType
 ) -> str:
-    retries = 0
-
-    if event_payload is None:
-        event_payload = {}
-
-    mutable_payload = T.cast(Progress, {**event_payload})
-
-    # when it progresses, we reset retries
-    def _reset_retries(_, __):
-        nonlocal retries
-        retries = 0
-
-    if emitter:
-        emitter.emit("upload_start", mutable_payload)
-
-    while True:
-        fp.seek(0, io.SEEK_SET)
-        begin_offset: int | None = None
-        try:
-            begin_offset = upload_service.fetch_offset()
-            upload_service.callbacks = [_reset_retries]
-            if emitter:
-                mutable_payload["offset"] = begin_offset
-                mutable_payload["retries"] = retries
-                emitter.emit("upload_fetch_offset", mutable_payload)
-                upload_service.callbacks.append(
-                    _setup_callback(emitter, mutable_payload)
-                )
-            file_handle = upload_service.upload(fp, offset=begin_offset)
-        except Exception as ex:
-            if retries < constants.MAX_UPLOAD_RETRIES and _is_retriable_exception(ex):
-                if emitter:
-                    emitter.emit("upload_interrupted", mutable_payload)
-                LOG.warning(
-                    # use %s instead of %d because offset could be None
- "Error uploading chunk_size %d at begin_offset %s: %s: %s", - upload_service.chunk_size, - begin_offset, - ex.__class__.__name__, - str(ex), - ) - retries += 1 - if _is_immediate_retry(ex): - sleep_for = 0 - else: - sleep_for = min(2**retries, 16) - LOG.info( - "Retrying in %d seconds (%d/%d)", - sleep_for, - retries, - constants.MAX_UPLOAD_RETRIES, - ) - if sleep_for: - time.sleep(sleep_for) - else: - raise ex - else: - break - - if emitter: - emitter.emit("upload_end", mutable_payload) - - # TODO: retry here - cluster_id = upload_service.finish(file_handle) - - if emitter: - mutable_payload["cluster_id"] = cluster_id - emitter.emit("upload_finished", mutable_payload) + SUFFIX_MAP: dict[upload_api_v4.ClusterFileType, str] = { + upload_api_v4.ClusterFileType.ZIP: ".zip", + upload_api_v4.ClusterFileType.CAMM: ".mp4", + upload_api_v4.ClusterFileType.BLACKVUE: ".mp4", + } - return cluster_id + return f"mly_tools_{upload_md5sum}{SUFFIX_MAP[cluster_filetype]}" diff --git a/tests/cli/upload_api_v4.py b/tests/cli/upload_api_v4.py index 1d1ceaf51..24d33c88f 100644 --- a/tests/cli/upload_api_v4.py +++ b/tests/cli/upload_api_v4.py @@ -8,7 +8,7 @@ import tqdm from mapillary_tools import api_v4, authenticate -from mapillary_tools.upload_api_v4 import DEFAULT_CHUNK_SIZE, UploadService +from mapillary_tools.upload_api_v4 import FakeUploadService, UploadService LOG = logging.getLogger("mapillary_tools") @@ -39,9 +39,10 @@ def _parse_args(): parser.add_argument( "--chunk_size", type=float, - default=DEFAULT_CHUNK_SIZE / (1024 * 1024), + default=2, help="chunk size in megabytes", ) + parser.add_argument("--dry_run", action="store_true", default=False) parser.add_argument("filename") parser.add_argument("session_key") return parser.parse_args() @@ -60,17 +61,13 @@ def main(): user_items = authenticate.fetch_user_items(parsed.user_name) session_key = parsed.session_key + chunk_size = int(parsed.chunk_size * 1024 * 1024) user_access_token = user_items.get("user_upload_token", "") - service = UploadService( - user_access_token, - session_key, - entity_size, - chunk_size=( - int(parsed.chunk_size * 1024 * 1024) - if parsed.chunk_size is not None - else DEFAULT_CHUNK_SIZE - ), - ) + + if parsed.dry_run: + service = FakeUploadService(user_access_token, session_key) + else: + service = UploadService(user_access_token, session_key) try: initial_offset = service.fetch_offset() @@ -80,9 +77,18 @@ def main(): LOG.info("Session key: %s", session_key) LOG.info("Initial offset: %s", initial_offset) LOG.info("Entity size: %d", entity_size) - LOG.info("Chunk size: %s MB", service.chunk_size / (1024 * 1024)) + LOG.info("Chunk size: %s MB", chunk_size / (1024 * 1024)) + + def _update_pbar(chunks, pbar): + for chunk in chunks: + yield chunk + pbar.update(len(chunk)) with open(parsed.filename, "rb") as fp: + fp.seek(initial_offset, io.SEEK_SET) + + shifted_chunks = service.chunkize_byte_stream(fp, chunk_size) + with tqdm.tqdm( total=entity_size, initial=initial_offset, @@ -91,9 +97,10 @@ def main(): unit_divisor=1024, disable=LOG.getEffectiveLevel() <= logging.DEBUG, ) as pbar: - service.callbacks.append(lambda chunk, resp: pbar.update(len(chunk))) try: - file_handle = service.upload(fp, initial_offset) + file_handle = service.upload_shifted_chunks( + _update_pbar(shifted_chunks, pbar), initial_offset + ) except requests.HTTPError as ex: raise RuntimeError(api_v4.readable_http_error(ex)) except KeyboardInterrupt: @@ -107,7 +114,6 @@ def main(): LOG.info("Final offset: %s", final_offset) LOG.info("Entity size: %d", 
entity_size) - LOG.info("File handle: %s", file_handle) diff --git a/tests/unit/test_upload_api_v4.py b/tests/unit/test_upload_api_v4.py index 8d9bc884c..3857f522f 100644 --- a/tests/unit/test_upload_api_v4.py +++ b/tests/unit/test_upload_api_v4.py @@ -11,16 +11,15 @@ def test_upload(setup_upload: py.path.local): upload_service = upload_api_v4.FakeUploadService( user_access_token="TEST", session_key="FOOBAR.txt", - chunk_size=1, ) upload_service._error_ratio = 0 content = b"double_foobar" - cluster_id = upload_service.upload(io.BytesIO(content)) + cluster_id = upload_service.upload_byte_stream(io.BytesIO(content), chunk_size=1) assert isinstance(cluster_id, str), cluster_id assert (setup_upload.join("FOOBAR.txt").read_binary()) == content # reupload should not affect the file - upload_service.upload(io.BytesIO(content)) + upload_service.upload_byte_stream(io.BytesIO(content), chunk_size=1) assert (setup_upload.join("FOOBAR.txt").read_binary()) == content @@ -28,16 +27,15 @@ def test_upload_big_chunksize(setup_upload: py.path.local): upload_service = upload_api_v4.FakeUploadService( user_access_token="TEST", session_key="FOOBAR.txt", - chunk_size=1000, ) upload_service._error_ratio = 0 content = b"double_foobar" - cluster_id = upload_service.upload(io.BytesIO(content)) + cluster_id = upload_service.upload_byte_stream(io.BytesIO(content), chunk_size=1000) assert isinstance(cluster_id, str), cluster_id assert (setup_upload.join("FOOBAR.txt").read_binary()) == content # reupload should not affect the file - upload_service.upload(io.BytesIO(content)) + upload_service.upload_byte_stream(io.BytesIO(content), chunk_size=1000) assert (setup_upload.join("FOOBAR.txt").read_binary()) == content diff --git a/tests/unit/test_uploader.py b/tests/unit/test_uploader.py index da1aed0e8..fca750fdc 100644 --- a/tests/unit/test_uploader.py +++ b/tests/unit/test_uploader.py @@ -64,8 +64,9 @@ def test_upload_images(setup_unittest_data: py.path.local, setup_upload: py.path "filetype": "image", }, ] - resp = mly_uploader.upload_images( - [types.from_desc(T.cast(T.Any, desc)) for desc in descs] + resp = uploader.ZipImageSequence.prepare_images_and_upload( + [types.from_desc(T.cast(T.Any, desc)) for desc in descs], + mly_uploader, ) assert len(resp) == 1 assert len(setup_upload.listdir()) == 1 @@ -115,8 +116,9 @@ def test_upload_images_multiple_sequences( }, dry_run=True, ) - resp = mly_uploader.upload_images( - [types.from_desc(T.cast(T.Any, desc)) for desc in descs] + resp = uploader.ZipImageSequence.prepare_images_and_upload( + [types.from_desc(T.cast(T.Any, desc)) for desc in descs], + mly_uploader, ) assert len(resp) == 2 assert len(setup_upload.listdir()) == 2 @@ -180,8 +182,10 @@ def test_upload_zip( emitter=emitter, ) for zip_path in zip_dir.listdir(): - resp = mly_uploader.upload_zipfile(Path(zip_path)) - assert resp == "0" + cluster = uploader.ZipImageSequence.prepare_zipfile_and_upload( + Path(zip_path), mly_uploader + ) + assert cluster == "0" descs = _validate_zip_dir(setup_upload) assert 3 == len(descs) @@ -205,11 +209,11 @@ def test_upload_blackvue( resp = mly_uploader.upload_stream( fp, upload_api_v4.ClusterFileType.BLACKVUE, - "this_is_a_blackvue_checksum", + "this_is_a_blackvue.mp4", ) assert resp == "0" for mp4_path in setup_upload.listdir(): - assert os.path.basename(mp4_path) == "mly_tools_this_is_a_blackvue_checksum.mp4" + assert os.path.basename(mp4_path) == "this_is_a_blackvue.mp4" with open(mp4_path, "rb") as fp: assert fp.read() == b"this is a fake video"
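
Reviewer note (not part of the patch): after this refactor, UploadService only handles the resumable byte transfer, while cluster creation moves to api_v4.finish_upload. A minimal usage sketch of the low-level flow, based on the API in this diff; the access token, session key, and video.mp4 filename are placeholders:

    import io

    from mapillary_tools import api_v4, upload_api_v4

    USER_ACCESS_TOKEN = "MLY|..."           # placeholder token
    SESSION_KEY = "mly_tools_<md5sum>.mp4"  # placeholder; see uploader._session_key()

    service = upload_api_v4.UploadService(
        USER_ACCESS_TOKEN,
        SESSION_KEY,
        cluster_filetype=api_v4.ClusterFileType.CAMM,
    )

    with open("video.mp4", "rb") as fp:
        # Resume from whatever the server already has for this session key
        begin_offset = service.fetch_offset()
        fp.seek(begin_offset, io.SEEK_SET)
        chunks = service.chunkize_byte_stream(fp, chunk_size=2 * 1024 * 1024)
        file_handle = service.upload_shifted_chunks(chunks, begin_offset)

    # finish_upload now lives in api_v4 instead of UploadService.finish
    resp = api_v4.finish_upload(
        USER_ACCESS_TOKEN,
        file_handle,
        api_v4.ClusterFileType.CAMM,
    )
    cluster_id = resp.json().get("cluster_id")

For dry-run testing, FakeUploadService can be substituted for UploadService, as the updated tests/cli/upload_api_v4.py does with --dry_run.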