diff --git a/mapillary_tools/upload_api_v4.py b/mapillary_tools/upload_api_v4.py
index fe56e8f8d..3b66d83de 100644
--- a/mapillary_tools/upload_api_v4.py
+++ b/mapillary_tools/upload_api_v4.py
@@ -5,6 +5,7 @@
 import os
 import random
 import typing as T
+import uuid
 
 import requests
 
@@ -55,31 +56,31 @@ def _truncate_end(s: _S) -> _S:
 
 class UploadService:
     user_access_token: str
-    entity_size: int
     session_key: str
     callbacks: T.List[T.Callable[[bytes, T.Optional[requests.Response]], None]]
     cluster_filetype: ClusterFileType
    organization_id: T.Optional[T.Union[str, int]]
     chunk_size: int
 
+    MIME_BY_CLUSTER_TYPE: T.Dict[ClusterFileType, str] = {
+        ClusterFileType.ZIP: "application/zip",
+        ClusterFileType.BLACKVUE: "video/mp4",
+        ClusterFileType.CAMM: "video/mp4",
+    }
+
     def __init__(
         self,
         user_access_token: str,
         session_key: str,
-        entity_size: int,
         organization_id: T.Optional[T.Union[str, int]] = None,
         cluster_filetype: ClusterFileType = ClusterFileType.ZIP,
         chunk_size: int = DEFAULT_CHUNK_SIZE,
     ):
-        if entity_size <= 0:
-            raise ValueError(f"Expect positive entity size but got {entity_size}")
-
         if chunk_size <= 0:
             raise ValueError("Expect positive chunk size")
 
         self.user_access_token = user_access_token
         self.session_key = session_key
-        self.entity_size = entity_size
         self.organization_id = organization_id
         # validate the input
         self.cluster_filetype = ClusterFileType(cluster_filetype)
@@ -107,55 +108,66 @@
     def upload(
         self,
         data: T.IO[bytes],
         offset: T.Optional[int] = None,
     ) -> str:
-        if offset is None:
-            offset = self.fetch_offset()
-
-        entity_type_map: T.Dict[ClusterFileType, str] = {
-            ClusterFileType.ZIP: "application/zip",
-            ClusterFileType.BLACKVUE: "video/mp4",
-            ClusterFileType.CAMM: "video/mp4",
-        }
-
-        entity_type = entity_type_map[self.cluster_filetype]
-
-        data.seek(offset, io.SEEK_CUR)
+        chunks = self._chunkize_byte_stream(data)
+        return self.upload_chunks(chunks, offset=offset)
 
+    def _chunkize_byte_stream(
+        self, stream: T.IO[bytes]
+    ) -> T.Generator[bytes, None, None]:
         while True:
-            chunk = data.read(self.chunk_size)
-            # it is possible to upload an empty chunk here
-            # in order to return the handle
-            headers = {
-                "Authorization": f"OAuth {self.user_access_token}",
-                "Offset": f"{offset}",
-                "X-Entity-Length": str(self.entity_size),
-                "X-Entity-Name": self.session_key,
-                "X-Entity-Type": entity_type,
-            }
-            url = f"{MAPILLARY_UPLOAD_ENDPOINT}/{self.session_key}"
-            LOG.debug("POST %s HEADERS %s", url, json.dumps(_sanitize_headers(headers)))
-            resp = request_post(
-                url,
-                headers=headers,
-                data=chunk,
-                timeout=UPLOAD_REQUESTS_TIMEOUT,
-            )
-            LOG.debug(
-                "HTTP response %s: %s", resp.status_code, _truncate_end(resp.content)
-            )
-            resp.raise_for_status()
-            offset += len(chunk)
-            LOG.debug("The next offset will be: %s", offset)
+            data = stream.read(self.chunk_size)
+            if not data:
+                break
+            yield data
+
+    def _offset_chunks(
+        self, chunks: T.Iterable[bytes], offset: int
+    ) -> T.Generator[bytes, None, None]:
+        assert offset >= 0, f"Expect non-negative offset but got {offset}"
+
+        for chunk in chunks:
+            if offset:
+                if offset < len(chunk):
+                    yield chunk[offset:]
+                    offset = 0
+                else:
+                    offset -= len(chunk)
+            else:
+                yield chunk
+
+    def _attach_callbacks(
+        self, chunks: T.Iterable[bytes]
+    ) -> T.Generator[bytes, None, None]:
+        for chunk in chunks:
+            yield chunk
             for callback in self.callbacks:
-                callback(chunk, resp)
-            # we can assert that offset == self.fetch_offset(session_key)
-            # otherwise, server will throw
+                callback(chunk, None)
 
-            if not chunk:
-                break
+    def upload_chunks(
+        self,
+        chunks: T.Iterable[bytes],
+        offset: T.Optional[int] = None,
+    ) -> str:
+        if offset is None:
+            offset = self.fetch_offset()
 
-        assert offset == self.entity_size, (
-            f"Offset ends at {offset} but the entity size is {self.entity_size}"
+        chunks = self._attach_callbacks(self._offset_chunks(chunks, offset))
+
+        headers = {
+            "Authorization": f"OAuth {self.user_access_token}",
+            "Offset": f"{offset}",
+            "X-Entity-Name": self.session_key,
+            "X-Entity-Type": self.MIME_BY_CLUSTER_TYPE[self.cluster_filetype],
+        }
+        url = f"{MAPILLARY_UPLOAD_ENDPOINT}/{self.session_key}"
+        LOG.debug("POST %s HEADERS %s", url, json.dumps(_sanitize_headers(headers)))
+        resp = request_post(
+            url,
+            headers=headers,
+            data=chunks,
+            timeout=UPLOAD_REQUESTS_TIMEOUT,
         )
+        LOG.debug("HTTP response %s: %s", resp.status_code, _truncate_end(resp.content))
 
         payload = resp.json()
         try:
@@ -209,35 +221,30 @@ def __init__(self, *args, **kwargs):
         )
         self._error_ratio = 0.1
 
-    def upload(
+    def upload_chunks(
         self,
-        data: T.IO[bytes],
+        chunks: T.Iterable[bytes],
         offset: T.Optional[int] = None,
     ) -> str:
         if offset is None:
             offset = self.fetch_offset()
+
+        chunks = self._attach_callbacks(self._offset_chunks(chunks, offset))
+
         os.makedirs(self._upload_path, exist_ok=True)
         filename = os.path.join(self._upload_path, self.session_key)
         with open(filename, "ab") as fp:
-            data.seek(offset, io.SEEK_CUR)
-            while True:
-                chunk = data.read(self.chunk_size)
-                if not chunk:
-                    break
-                # fail here means nothing uploaded
+            for chunk in chunks:
                 if random.random() <= self._error_ratio:
                     raise requests.ConnectionError(
                         f"TEST ONLY: Failed to upload with error ratio {self._error_ratio}"
                     )
                 fp.write(chunk)
-                # fail here means patially uploaded
                 if random.random() <= self._error_ratio:
                     raise requests.ConnectionError(
                         f"TEST ONLY: Partially uploaded with error ratio {self._error_ratio}"
                     )
-                for callback in self.callbacks:
-                    callback(chunk, None)
-        return self.session_key
+
+        return uuid.uuid4().hex
 
     def finish(self, _: str) -> str:
         return "0"
diff --git a/mapillary_tools/uploader.py b/mapillary_tools/uploader.py
index b69c99fba..19ed34992 100644
--- a/mapillary_tools/uploader.py
+++ b/mapillary_tools/uploader.py
@@ -195,7 +195,6 @@ def upload_stream(
             upload_api_v4.FakeUploadService(
                 user_access_token=self.user_items["user_upload_token"],
                 session_key=session_key,
-                entity_size=entity_size,
                 organization_id=self.user_items.get("MAPOrganizationKey"),
                 cluster_filetype=cluster_filetype,
                 chunk_size=self.chunk_size,
@@ -205,7 +204,6 @@
             upload_service = upload_api_v4.UploadService(
                 user_access_token=self.user_items["user_upload_token"],
                 session_key=session_key,
-                entity_size=entity_size,
                 organization_id=self.user_items.get("MAPOrganizationKey"),
                 cluster_filetype=cluster_filetype,
                 chunk_size=self.chunk_size,
diff --git a/tests/unit/test_upload_api_v4.py b/tests/unit/test_upload_api_v4.py
new file mode 100644
index 000000000..6d8ebb3a3
--- /dev/null
+++ b/tests/unit/test_upload_api_v4.py
@@ -0,0 +1,47 @@
+import io
+import py
+
+from mapillary_tools import upload_api_v4
+
+from ..integration.fixtures import setup_upload
+
+
+def test_upload(setup_upload: py.path.local):
+    upload_service = upload_api_v4.FakeUploadService(
+        user_access_token="TEST",
+        session_key="FOOBAR.txt",
+        chunk_size=1,
+    )
+    upload_service._error_ratio = 0
+    content = b"double_foobar"
+    cluster_id = upload_service.upload(io.BytesIO(content))
+    assert isinstance(cluster_id, str), cluster_id
+    assert (setup_upload.join("FOOBAR.txt").read_binary()) == content
+
+    # reupload should not affect the file
+    upload_service.upload(io.BytesIO(content))
+    assert (setup_upload.join("FOOBAR.txt").read_binary()) == content
+
+
+def test_upload_chunks(setup_upload: py.path.local):
+    upload_service = upload_api_v4.FakeUploadService(
+        user_access_token="TEST",
+        session_key="FOOBAR2.txt",
+        chunk_size=1,
+    )
+    upload_service._error_ratio = 0
+
+    def _gen_chunks():
+        yield b"foo"
+        yield b""
+        yield b"bar"
+        yield b""
+
+    cluster_id = upload_service.upload_chunks(_gen_chunks())
+
+    assert isinstance(cluster_id, str), cluster_id
+    assert (setup_upload.join("FOOBAR2.txt").read_binary()) == b"foobar"
+
+    # reupload should not affect the file
+    upload_service.upload_chunks(_gen_chunks())
+    assert (setup_upload.join("FOOBAR2.txt").read_binary()) == b"foobar"
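
The core of this patch is the switch from a seek-and-read loop over a file object to a pipeline of generators (_chunkize_byte_stream -> _offset_chunks -> _attach_callbacks) handed to requests as a streaming (chunked) request body, which is why entity_size and the X-Entity-Length header can be dropped: the total size no longer needs to be known up front. The snippet below is a standalone sketch, not part of the patch; it replays the resumption logic of _offset_chunks as a free function. fetch_offset() reports how many bytes the server already holds, and the generator drops exactly that many bytes no matter how the incoming iterable happens to be chunked.

import typing as T


def offset_chunks(
    chunks: T.Iterable[bytes], offset: int
) -> T.Generator[bytes, None, None]:
    # Logic copied from UploadService._offset_chunks in the patch above,
    # turned into a free function for illustration only.
    assert offset >= 0, f"Expect non-negative offset but got {offset}"
    for chunk in chunks:
        if offset:
            if offset < len(chunk):
                # This chunk straddles the resume point: keep only the tail.
                yield chunk[offset:]
                offset = 0
            else:
                # Chunk lies entirely below the resume point: drop it.
                offset -= len(chunk)
        else:
            # Past the resume point: pass chunks through unchanged.
            yield chunk


# With 4 bytes already on the server, b"foo" (3 bytes) is dropped entirely
# and b"bar" loses its first byte:
assert list(offset_chunks([b"foo", b"bar", b"baz"], 4)) == [b"ar", b"baz"]

Because the skipping happens chunk by chunk, the pipeline never buffers more than one chunk, and chunk boundaries on resume need not match the boundaries of the original attempt.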