diff --git a/mapillary_tools/upload_api_v4.py b/mapillary_tools/upload_api_v4.py index 57afbc7d3..b8624d21a 100644 --- a/mapillary_tools/upload_api_v4.py +++ b/mapillary_tools/upload_api_v4.py @@ -13,6 +13,8 @@ else: from typing_extensions import override +import tempfile + import requests from .api_v4 import request_get, request_post, REQUESTS_TIMEOUT @@ -30,14 +32,14 @@ class UploadService: + """ + Upload byte streams to the Upload Service. + """ + user_access_token: str session_key: str - def __init__( - self, - user_access_token: str, - session_key: str, - ): + def __init__(self, user_access_token: str, session_key: str): self.user_access_token = user_access_token self.session_key = session_key @@ -46,11 +48,7 @@ def fetch_offset(self) -> int: "Authorization": f"OAuth {self.user_access_token}", } url = f"{MAPILLARY_UPLOAD_ENDPOINT}/{self.session_key}" - resp = request_get( - url, - headers=headers, - timeout=REQUESTS_TIMEOUT, - ) + resp = request_get(url, headers=headers, timeout=REQUESTS_TIMEOUT) resp.raise_for_status() data = resp.json() return data["offset"] @@ -59,18 +57,53 @@ def fetch_offset(self) -> int: def chunkize_byte_stream( cls, stream: T.IO[bytes], chunk_size: int ) -> T.Generator[bytes, None, None]: + """ + Chunkize a byte stream into chunks of the specified size. + + >>> list(UploadService.chunkize_byte_stream(io.BytesIO(b"foo"), 1)) + [b'f', b'o', b'o'] + + >>> list(UploadService.chunkize_byte_stream(io.BytesIO(b"foo"), 10)) + [b'foo'] + """ + if chunk_size <= 0: raise ValueError("Expect positive chunk size") + while True: data = stream.read(chunk_size) if not data: break yield data + @classmethod def shift_chunks( - self, chunks: T.Iterable[bytes], offset: int + cls, chunks: T.Iterable[bytes], offset: int ) -> T.Generator[bytes, None, None]: - assert offset >= 0, f"Expect non-negative offset but got {offset}" + """ + Shift the chunks by the offset. + + >>> list(UploadService.shift_chunks([b"foo", b"bar"], 0)) + [b'foo', b'bar'] + + >>> list(UploadService.shift_chunks([b"foo", b"bar"], 1)) + [b'oo', b'bar'] + + >>> list(UploadService.shift_chunks([b"foo", b"bar"], 3)) + [b'bar'] + + >>> list(UploadService.shift_chunks([b"foo", b"bar"], 6)) + [] + + >>> list(UploadService.shift_chunks([b"foo", b"bar"], 7)) + [] + + >>> list(UploadService.shift_chunks([], 0)) + [] + """ + + if offset < 0: + raise ValueError(f"Expect non-negative offset but got {offset}") for chunk in chunks: if offset: @@ -103,12 +136,10 @@ def upload_chunks( return self.upload_shifted_chunks(shifted_chunks, offset) def upload_shifted_chunks( - self, - shifted_chunks: T.Iterable[bytes], - offset: int, + self, shifted_chunks: T.Iterable[bytes], offset: int ) -> str: """ - Upload the chunks that must already be shifted by the offset (e.g. fp.seek(begin_offset, io.SEEK_SET)) + Upload the chunks that must already be shifted by the offset (e.g. fp.seek(offset, io.SEEK_SET)) """ headers = { @@ -118,10 +149,7 @@ def upload_shifted_chunks( } url = f"{MAPILLARY_UPLOAD_ENDPOINT}/{self.session_key}" resp = request_post( - url, - headers=headers, - data=shifted_chunks, - timeout=UPLOAD_REQUESTS_TIMEOUT, + url, headers=headers, data=shifted_chunks, timeout=UPLOAD_REQUESTS_TIMEOUT ) resp.raise_for_status() @@ -137,18 +165,35 @@ def upload_shifted_chunks( # A mock class for testing only class FakeUploadService(UploadService): - def __init__(self, *args, **kwargs): + """ + A mock upload service that simulates the upload process for testing purposes. + It writes the uploaded data to a file in a temporary directory and generates a fake file handle. + """ + + FILE_HANDLE_DIR: str = "file_handles" + + def __init__( + self, + upload_path: Path | None = None, + transient_error_ratio: float = 0.0, + *args, + **kwargs, + ): super().__init__(*args, **kwargs) - self._upload_path = Path( - os.getenv("MAPILLARY_UPLOAD_PATH", "mapillary_public_uploads") - ) - self._error_ratio = 0.02 + if upload_path is None: + upload_path = Path(tempfile.gettempdir()).joinpath( + "mapillary_public_uploads" + ) + self._upload_path = upload_path + self._transient_error_ratio = transient_error_ratio + + @property + def upload_path(self) -> Path: + return self._upload_path @override def upload_shifted_chunks( - self, - shifted_chunks: T.Iterable[bytes], - offset: int, + self, shifted_chunks: T.Iterable[bytes], offset: int ) -> str: expected_offset = self.fetch_offset() if offset != expected_offset: @@ -160,17 +205,17 @@ def upload_shifted_chunks( filename = self._upload_path.joinpath(self.session_key) with filename.open("ab") as fp: for chunk in shifted_chunks: - if random.random() <= self._error_ratio: + if random.random() <= self._transient_error_ratio: raise requests.ConnectionError( - f"TEST ONLY: Failed to upload with error ratio {self._error_ratio}" + f"TEST ONLY: Failed to upload with error ratio {self._transient_error_ratio}" ) fp.write(chunk) - if random.random() <= self._error_ratio: + if random.random() <= self._transient_error_ratio: raise requests.ConnectionError( - f"TEST ONLY: Partially uploaded with error ratio {self._error_ratio}" + f"TEST ONLY: Partially uploaded with error ratio {self._transient_error_ratio}" ) - file_handle_dir = self._upload_path.joinpath("file_handles") + file_handle_dir = self._upload_path.joinpath(self.FILE_HANDLE_DIR) file_handle_path = file_handle_dir.joinpath(self.session_key) if not file_handle_path.exists(): os.makedirs(file_handle_dir, exist_ok=True) @@ -181,12 +226,12 @@ def upload_shifted_chunks( @override def fetch_offset(self) -> int: - if random.random() <= self._error_ratio: + if random.random() <= self._transient_error_ratio: raise requests.ConnectionError( - f"TEST ONLY: Partially uploaded with error ratio {self._error_ratio}" + f"TEST ONLY: Partially uploaded with error ratio {self._transient_error_ratio}" ) - filename = os.path.join(self._upload_path, self.session_key) - if not os.path.exists(filename): + filename = self._upload_path.joinpath(self.session_key) + if not filename.exists(): return 0 with open(filename, "rb") as fp: fp.seek(0, io.SEEK_END) diff --git a/mapillary_tools/uploader.py b/mapillary_tools/uploader.py index 4fba04f20..ca0b941cf 100644 --- a/mapillary_tools/uploader.py +++ b/mapillary_tools/uploader.py @@ -528,9 +528,15 @@ def _create_upload_service(self, session_key: str) -> upload_api_v4.UploadServic upload_service: upload_api_v4.UploadService if self.dry_run: + upload_path = os.getenv("MAPILLARY_UPLOAD_ENDPOINT") upload_service = upload_api_v4.FakeUploadService( user_access_token=self.user_items["user_upload_token"], session_key=session_key, + upload_path=Path(upload_path) if upload_path is not None else None, + ) + LOG.info( + "Dry run mode enabled. Data will be uploaded to %s", + upload_service.upload_path.joinpath(session_key), ) else: upload_service = upload_api_v4.UploadService( diff --git a/tests/integration/fixtures.py b/tests/integration/fixtures.py index 807ab32ce..19e6fc2b2 100644 --- a/tests/integration/fixtures.py +++ b/tests/integration/fixtures.py @@ -14,7 +14,7 @@ import py.path import pytest -from mapillary_tools import utils +from mapillary_tools import upload_api_v4, utils EXECUTABLE = os.getenv( "MAPILLARY_TOOLS__TESTS_EXECUTABLE", "python3 -m mapillary_tools.commands" @@ -58,7 +58,7 @@ def setup_data(tmpdir: py.path.local): @pytest.fixture def setup_upload(tmpdir: py.path.local): upload_dir = tmpdir.mkdir("mapillary_public_uploads") - os.environ["MAPILLARY_UPLOAD_PATH"] = str(upload_dir) + os.environ["MAPILLARY_UPLOAD_ENDPOINT"] = str(upload_dir) os.environ["MAPILLARY_TOOLS__AUTH_VERIFICATION_DISABLED"] = "YES" os.environ["MAPILLARY_TOOLS_PROMPT_DISABLED"] = "YES" os.environ["MAPILLARY__ENABLE_UPLOAD_HISTORY_FOR_DRY_RUN"] = "YES" @@ -67,7 +67,7 @@ def setup_upload(tmpdir: py.path.local): yield upload_dir if tmpdir.check(): tmpdir.remove(ignore_errors=True) - os.environ.pop("MAPILLARY_UPLOAD_PATH", None) + os.environ.pop("MAPILLARY_UPLOAD_ENDPOINT", None) os.environ.pop("MAPILLARY_UPLOAD_HISTORY_PATH", None) os.environ.pop("MAPILLARY_TOOLS__AUTH_VERIFICATION_DISABLED", None) os.environ.pop("MAPILLARY_TOOLS_PROMPT_DISABLED", None) @@ -239,11 +239,11 @@ def load_descs(descs) -> list: def extract_all_uploaded_descs(upload_folder: Path) -> list[list[dict]]: - FILE_HANDLE_DIRNAME = "file_handles" - session_by_file_handle: dict[str, str] = {} - if upload_folder.joinpath(FILE_HANDLE_DIRNAME).exists(): - for session_path in upload_folder.joinpath(FILE_HANDLE_DIRNAME).iterdir(): + if upload_folder.joinpath(upload_api_v4.FakeUploadService.FILE_HANDLE_DIR).exists(): + for session_path in upload_folder.joinpath( + upload_api_v4.FakeUploadService.FILE_HANDLE_DIR + ).iterdir(): file_handle = session_path.read_text() session_by_file_handle[file_handle] = session_path.name @@ -267,7 +267,7 @@ def extract_all_uploaded_descs(upload_folder: Path) -> list[list[dict]]: sequences.append(validate_and_extract_zip(file)) elif file.suffix == ".mp4": sequences.append(validate_and_extract_camm(file)) - elif file.name == FILE_HANDLE_DIRNAME: + elif file.name == upload_api_v4.FakeUploadService.FILE_HANDLE_DIR: # Already processed above pass diff --git a/tests/unit/test_upload_api_v4.py b/tests/unit/test_upload_api_v4.py index 3857f522f..88c2b4156 100644 --- a/tests/unit/test_upload_api_v4.py +++ b/tests/unit/test_upload_api_v4.py @@ -1,50 +1,55 @@ import io +from pathlib import Path import py from mapillary_tools import upload_api_v4 -from ..integration.fixtures import setup_upload - -def test_upload(setup_upload: py.path.local): +def test_upload(tmpdir: py.path.local): upload_service = upload_api_v4.FakeUploadService( user_access_token="TEST", session_key="FOOBAR.txt", + upload_path=Path(tmpdir), + transient_error_ratio=0.02, ) - upload_service._error_ratio = 0 + upload_service._transient_error_ratio = 0 content = b"double_foobar" cluster_id = upload_service.upload_byte_stream(io.BytesIO(content), chunk_size=1) assert isinstance(cluster_id, str), cluster_id - assert (setup_upload.join("FOOBAR.txt").read_binary()) == content + assert (tmpdir.join("FOOBAR.txt").read_binary()) == content # reupload should not affect the file upload_service.upload_byte_stream(io.BytesIO(content), chunk_size=1) - assert (setup_upload.join("FOOBAR.txt").read_binary()) == content + assert (tmpdir.join("FOOBAR.txt").read_binary()) == content -def test_upload_big_chunksize(setup_upload: py.path.local): +def test_upload_big_chunksize(tmpdir: py.path.local): upload_service = upload_api_v4.FakeUploadService( user_access_token="TEST", session_key="FOOBAR.txt", + upload_path=Path(tmpdir), + transient_error_ratio=0.02, ) - upload_service._error_ratio = 0 + upload_service._transient_error_ratio = 0 content = b"double_foobar" cluster_id = upload_service.upload_byte_stream(io.BytesIO(content), chunk_size=1000) assert isinstance(cluster_id, str), cluster_id - assert (setup_upload.join("FOOBAR.txt").read_binary()) == content + assert (tmpdir.join("FOOBAR.txt").read_binary()) == content # reupload should not affect the file upload_service.upload_byte_stream(io.BytesIO(content), chunk_size=1000) - assert (setup_upload.join("FOOBAR.txt").read_binary()) == content + assert (tmpdir.join("FOOBAR.txt").read_binary()) == content -def test_upload_chunks(setup_upload: py.path.local): +def test_upload_chunks(tmpdir: py.path.local): upload_service = upload_api_v4.FakeUploadService( user_access_token="TEST", session_key="FOOBAR2.txt", + upload_path=Path(tmpdir), + transient_error_ratio=0.02, ) - upload_service._error_ratio = 0 + upload_service._transient_error_ratio = 0 def _gen_chunks(): yield b"foo" @@ -55,8 +60,8 @@ def _gen_chunks(): cluster_id = upload_service.upload_chunks(_gen_chunks()) assert isinstance(cluster_id, str), cluster_id - assert (setup_upload.join("FOOBAR2.txt").read_binary()) == b"foobar" + assert (tmpdir.join("FOOBAR2.txt").read_binary()) == b"foobar" # reupload should not affect the file upload_service.upload_chunks(_gen_chunks()) - assert (setup_upload.join("FOOBAR2.txt").read_binary()) == b"foobar" + assert (tmpdir.join("FOOBAR2.txt").read_binary()) == b"foobar"