129 changes: 68 additions & 61 deletions mapillary_tools/upload_api_v4.py
@@ -5,6 +5,7 @@
 import os
 import random
 import typing as T
+import uuid
 
 import requests
 
@@ -55,31 +56,31 @@ def _truncate_end(s: _S) -> _S:
 
 class UploadService:
     user_access_token: str
-    entity_size: int
     session_key: str
     callbacks: T.List[T.Callable[[bytes, T.Optional[requests.Response]], None]]
     cluster_filetype: ClusterFileType
     organization_id: T.Optional[T.Union[str, int]]
     chunk_size: int
 
+    MIME_BY_CLUSTER_TYPE: T.Dict[ClusterFileType, str] = {
+        ClusterFileType.ZIP: "application/zip",
+        ClusterFileType.BLACKVUE: "video/mp4",
+        ClusterFileType.CAMM: "video/mp4",
+    }
+
     def __init__(
         self,
         user_access_token: str,
         session_key: str,
-        entity_size: int,
         organization_id: T.Optional[T.Union[str, int]] = None,
         cluster_filetype: ClusterFileType = ClusterFileType.ZIP,
         chunk_size: int = DEFAULT_CHUNK_SIZE,
     ):
-        if entity_size <= 0:
-            raise ValueError(f"Expect positive entity size but got {entity_size}")
-
         if chunk_size <= 0:
             raise ValueError("Expect positive chunk size")
 
         self.user_access_token = user_access_token
         self.session_key = session_key
-        self.entity_size = entity_size
         self.organization_id = organization_id
         # validate the input
         self.cluster_filetype = ClusterFileType(cluster_filetype)
@@ -107,55 +108,66 @@ def upload(
         data: T.IO[bytes],
         offset: T.Optional[int] = None,
     ) -> str:
         if offset is None:
             offset = self.fetch_offset()
 
-        entity_type_map: T.Dict[ClusterFileType, str] = {
-            ClusterFileType.ZIP: "application/zip",
-            ClusterFileType.BLACKVUE: "video/mp4",
-            ClusterFileType.CAMM: "video/mp4",
-        }
-
-        entity_type = entity_type_map[self.cluster_filetype]
-
-        data.seek(offset, io.SEEK_CUR)
-
-        while True:
-            chunk = data.read(self.chunk_size)
-            # it is possible to upload an empty chunk here
-            # in order to return the handle
-            headers = {
-                "Authorization": f"OAuth {self.user_access_token}",
-                "Offset": f"{offset}",
-                "X-Entity-Length": str(self.entity_size),
-                "X-Entity-Name": self.session_key,
-                "X-Entity-Type": entity_type,
-            }
-            url = f"{MAPILLARY_UPLOAD_ENDPOINT}/{self.session_key}"
-            LOG.debug("POST %s HEADERS %s", url, json.dumps(_sanitize_headers(headers)))
-            resp = request_post(
-                url,
-                headers=headers,
-                data=chunk,
-                timeout=UPLOAD_REQUESTS_TIMEOUT,
-            )
-            LOG.debug(
-                "HTTP response %s: %s", resp.status_code, _truncate_end(resp.content)
-            )
-            resp.raise_for_status()
-            offset += len(chunk)
-            LOG.debug("The next offset will be: %s", offset)
-            for callback in self.callbacks:
-                callback(chunk, resp)
-            # we can assert that offset == self.fetch_offset(session_key)
-            # otherwise, server will throw
-            if not chunk:
-                break
-
-        assert offset == self.entity_size, (
-            f"Offset ends at {offset} but the entity size is {self.entity_size}"
-        )
+        chunks = self._chunkize_byte_stream(data)
+        return self.upload_chunks(chunks, offset=offset)
+
+    def _chunkize_byte_stream(
+        self, stream: T.IO[bytes]
+    ) -> T.Generator[bytes, None, None]:
+        while True:
+            data = stream.read(self.chunk_size)
+            if not data:
+                break
+            yield data
+
+    def _offset_chunks(
+        self, chunks: T.Iterable[bytes], offset: int
+    ) -> T.Generator[bytes, None, None]:
+        assert offset >= 0, f"Expect non-negative offset but got {offset}"
+
+        for chunk in chunks:
+            if offset:
+                if offset < len(chunk):
+                    yield chunk[offset:]
+                    offset = 0
+                else:
+                    offset -= len(chunk)
+            else:
+                yield chunk
+
+    def _attach_callbacks(
+        self, chunks: T.Iterable[bytes]
+    ) -> T.Generator[bytes, None, None]:
+        for chunk in chunks:
+            yield chunk
+            for callback in self.callbacks:
+                callback(chunk, None)
+
+    def upload_chunks(
+        self,
+        chunks: T.Iterable[bytes],
+        offset: T.Optional[int] = None,
+    ) -> str:
+        if offset is None:
+            offset = self.fetch_offset()
+
+        chunks = self._attach_callbacks(self._offset_chunks(chunks, offset))
+
+        headers = {
+            "Authorization": f"OAuth {self.user_access_token}",
+            "Offset": f"{offset}",
+            "X-Entity-Name": self.session_key,
+            "X-Entity-Type": self.MIME_BY_CLUSTER_TYPE[self.cluster_filetype],
+        }
+        url = f"{MAPILLARY_UPLOAD_ENDPOINT}/{self.session_key}"
+        LOG.debug("POST %s HEADERS %s", url, json.dumps(_sanitize_headers(headers)))
+        resp = request_post(
+            url,
+            headers=headers,
+            data=chunks,
+            timeout=UPLOAD_REQUESTS_TIMEOUT,
+        )
+        LOG.debug("HTTP response %s: %s", resp.status_code, _truncate_end(resp.content))
 
         payload = resp.json()
         try:
@@ -209,35 +221,30 @@ def __init__(self, *args, **kwargs):
         )
         self._error_ratio = 0.1
 
-    def upload(
+    def upload_chunks(
         self,
-        data: T.IO[bytes],
+        chunks: T.Iterable[bytes],
         offset: T.Optional[int] = None,
     ) -> str:
         if offset is None:
             offset = self.fetch_offset()
 
+        chunks = self._attach_callbacks(self._offset_chunks(chunks, offset))
+
         os.makedirs(self._upload_path, exist_ok=True)
         filename = os.path.join(self._upload_path, self.session_key)
         with open(filename, "ab") as fp:
-            data.seek(offset, io.SEEK_CUR)
-            while True:
-                chunk = data.read(self.chunk_size)
-                if not chunk:
-                    break
-                # fail here means nothing uploaded
+            for chunk in chunks:
                 if random.random() <= self._error_ratio:
                     raise requests.ConnectionError(
                         f"TEST ONLY: Failed to upload with error ratio {self._error_ratio}"
                     )
                 fp.write(chunk)
                 # fail here means partially uploaded
                 if random.random() <= self._error_ratio:
                     raise requests.ConnectionError(
                         f"TEST ONLY: Partially uploaded with error ratio {self._error_ratio}"
                     )
-            for callback in self.callbacks:
-                callback(chunk, None)
-        return self.session_key
+        return uuid.uuid4().hex
 
     def finish(self, _: str) -> str:
         return "0"
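Note on the refactor above: the old upload() issued one POST per chunk and tracked the offset by hand, while the new code composes three generators (_chunkize_byte_stream -> _offset_chunks -> _attach_callbacks) and hands the resulting iterable to a single request_post call; requests streams an iterable body with chunked transfer encoding, which is presumably also why the X-Entity-Length header and the entity_size field could be dropped. A minimal sketch of how the two core generators compose to resume from a server-side offset (the standalone driver below is illustrative, not code from the PR):

    import io
    import typing as T

    CHUNK_SIZE = 4  # tiny, for illustration only

    def chunkize_byte_stream(stream: T.IO[bytes]) -> T.Generator[bytes, None, None]:
        # Same shape as UploadService._chunkize_byte_stream: fixed-size reads until EOF
        while True:
            data = stream.read(CHUNK_SIZE)
            if not data:
                break
            yield data

    def offset_chunks(chunks: T.Iterable[bytes], offset: int) -> T.Generator[bytes, None, None]:
        # Same shape as UploadService._offset_chunks: drop the first
        # `offset` bytes, which the server already has
        for chunk in chunks:
            if offset:
                if offset < len(chunk):
                    yield chunk[offset:]
                    offset = 0
                else:
                    offset -= len(chunk)
            else:
                yield chunk

    # Resume an upload of b"hello world" after the first 6 bytes
    stream = io.BytesIO(b"hello world")
    resumed = b"".join(offset_chunks(chunkize_byte_stream(stream), 6))
    assert resumed == b"world"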
2 changes: 0 additions & 2 deletions mapillary_tools/uploader.py
@@ -195,7 +195,6 @@ def upload_stream(
             upload_api_v4.FakeUploadService(
                 user_access_token=self.user_items["user_upload_token"],
                 session_key=session_key,
-                entity_size=entity_size,
                 organization_id=self.user_items.get("MAPOrganizationKey"),
                 cluster_filetype=cluster_filetype,
                 chunk_size=self.chunk_size,
@@ -205,7 +204,6 @@ def upload_stream(
         upload_service = upload_api_v4.UploadService(
             user_access_token=self.user_items["user_upload_token"],
             session_key=session_key,
-            entity_size=entity_size,
             organization_id=self.user_items.get("MAPOrganizationKey"),
             cluster_filetype=cluster_filetype,
             chunk_size=self.chunk_size,
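With entity_size gone from both constructors, callers no longer need to know the full entity size before opening an upload session. A hypothetical caller sketch (the token, session key, and file name are placeholders, not values from the PR):

    from mapillary_tools import upload_api_v4

    service = upload_api_v4.UploadService(
        user_access_token="MLY|example-token",    # placeholder token
        session_key="mly_tools_example.zip",      # placeholder session name
        cluster_filetype=upload_api_v4.ClusterFileType.ZIP,
    )

    with open("example.zip", "rb") as fp:
        # offset defaults to fetch_offset(), so an interrupted upload resumes
        cluster_id = service.upload(fp)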
47 changes: 47 additions & 0 deletions tests/unit/test_upload_api_v4.py
@@ -0,0 +1,47 @@
+import io
+import py
+
+from mapillary_tools import upload_api_v4
+
+from ..integration.fixtures import setup_upload
+
+
+def test_upload(setup_upload: py.path.local):
+    upload_service = upload_api_v4.FakeUploadService(
+        user_access_token="TEST",
+        session_key="FOOBAR.txt",
+        chunk_size=1,
+    )
+    upload_service._error_ratio = 0
+    content = b"double_foobar"
+    cluster_id = upload_service.upload(io.BytesIO(content))
+    assert isinstance(cluster_id, str), cluster_id
+    assert (setup_upload.join("FOOBAR.txt").read_binary()) == content
+
+    # reupload should not affect the file
+    upload_service.upload(io.BytesIO(content))
+    assert (setup_upload.join("FOOBAR.txt").read_binary()) == content
+
+
+def test_upload_chunks(setup_upload: py.path.local):
+    upload_service = upload_api_v4.FakeUploadService(
+        user_access_token="TEST",
+        session_key="FOOBAR2.txt",
+        chunk_size=1,
+    )
+    upload_service._error_ratio = 0
+
+    def _gen_chunks():
+        yield b"foo"
+        yield b""
+        yield b"bar"
+        yield b""
+
+    cluster_id = upload_service.upload_chunks(_gen_chunks())
+
+    assert isinstance(cluster_id, str), cluster_id
+    assert (setup_upload.join("FOOBAR2.txt").read_binary()) == b"foobar"
+
+    # reupload should not affect the file
+    upload_service.upload_chunks(_gen_chunks())
+    assert (setup_upload.join("FOOBAR2.txt").read_binary()) == b"foobar"
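test_upload_chunks pins down an edge case of the new iterable-based API: a zero-length chunk in the middle of the stream is a harmless no-op write, not an end-of-stream marker, so it must not truncate the upload. A minimal illustration of that invariant (standalone sketch, mirroring fp.write in FakeUploadService):

    def _gen_chunks():
        yield b"foo"
        yield b""    # zero-byte chunk: a no-op write, iteration continues
        yield b"bar"

    received = bytearray()
    for chunk in _gen_chunks():
        received += chunk  # mirrors fp.write(chunk) in FakeUploadService
    assert bytes(received) == b"foobar"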