
Commit a619ea0

improve: use chunked transfer encoding to stream large files (#714)
* improve: use data generator (chunks) to stream large files
* add unit tests
1 parent ec5a49b commit a619ea0
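
For context, the mechanism this change relies on: when `requests` is given an iterator or generator as the request body, it cannot compute a `Content-Length` up front, so it streams the body with `Transfer-Encoding: chunked`. A minimal sketch of that behavior (hypothetical endpoint and file name, not code from this commit):

import requests


def read_in_chunks(path: str, chunk_size: int = 1024 * 1024):
    # Yield the file in fixed-size chunks instead of loading it into memory.
    with open(path, "rb") as fp:
        while True:
            chunk = fp.read(chunk_size)
            if not chunk:
                break
            yield chunk


# Passing a generator as `data` makes requests send the body chunked.
resp = requests.post(
    "https://example.com/upload",  # hypothetical endpoint
    data=read_in_chunks("large_file.zip"),  # hypothetical file
)
resp.raise_for_status()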

File tree

3 files changed (+115, −63 lines)


mapillary_tools/upload_api_v4.py

Lines changed: 68 additions & 61 deletions
@@ -5,6 +5,7 @@
 import os
 import random
 import typing as T
+import uuid
 
 import requests
 
@@ -55,31 +56,31 @@ def _truncate_end(s: _S) -> _S:
 
 class UploadService:
     user_access_token: str
-    entity_size: int
     session_key: str
     callbacks: T.List[T.Callable[[bytes, T.Optional[requests.Response]], None]]
     cluster_filetype: ClusterFileType
     organization_id: T.Optional[T.Union[str, int]]
     chunk_size: int
 
+    MIME_BY_CLUSTER_TYPE: T.Dict[ClusterFileType, str] = {
+        ClusterFileType.ZIP: "application/zip",
+        ClusterFileType.BLACKVUE: "video/mp4",
+        ClusterFileType.CAMM: "video/mp4",
+    }
+
     def __init__(
         self,
         user_access_token: str,
         session_key: str,
-        entity_size: int,
         organization_id: T.Optional[T.Union[str, int]] = None,
         cluster_filetype: ClusterFileType = ClusterFileType.ZIP,
         chunk_size: int = DEFAULT_CHUNK_SIZE,
     ):
-        if entity_size <= 0:
-            raise ValueError(f"Expect positive entity size but got {entity_size}")
-
         if chunk_size <= 0:
             raise ValueError("Expect positive chunk size")
 
         self.user_access_token = user_access_token
         self.session_key = session_key
-        self.entity_size = entity_size
         self.organization_id = organization_id
         # validate the input
         self.cluster_filetype = ClusterFileType(cluster_filetype)
@@ -107,55 +108,66 @@ def upload(
         data: T.IO[bytes],
         offset: T.Optional[int] = None,
     ) -> str:
-        if offset is None:
-            offset = self.fetch_offset()
-
-        entity_type_map: T.Dict[ClusterFileType, str] = {
-            ClusterFileType.ZIP: "application/zip",
-            ClusterFileType.BLACKVUE: "video/mp4",
-            ClusterFileType.CAMM: "video/mp4",
-        }
-
-        entity_type = entity_type_map[self.cluster_filetype]
-
-        data.seek(offset, io.SEEK_CUR)
+        chunks = self._chunkize_byte_stream(data)
+        return self.upload_chunks(chunks, offset=offset)
 
+    def _chunkize_byte_stream(
+        self, stream: T.IO[bytes]
+    ) -> T.Generator[bytes, None, None]:
         while True:
-            chunk = data.read(self.chunk_size)
-            # it is possible to upload an empty chunk here
-            # in order to return the handle
-            headers = {
-                "Authorization": f"OAuth {self.user_access_token}",
-                "Offset": f"{offset}",
-                "X-Entity-Length": str(self.entity_size),
-                "X-Entity-Name": self.session_key,
-                "X-Entity-Type": entity_type,
-            }
-            url = f"{MAPILLARY_UPLOAD_ENDPOINT}/{self.session_key}"
-            LOG.debug("POST %s HEADERS %s", url, json.dumps(_sanitize_headers(headers)))
-            resp = request_post(
-                url,
-                headers=headers,
-                data=chunk,
-                timeout=UPLOAD_REQUESTS_TIMEOUT,
-            )
-            LOG.debug(
-                "HTTP response %s: %s", resp.status_code, _truncate_end(resp.content)
-            )
-            resp.raise_for_status()
-            offset += len(chunk)
-            LOG.debug("The next offset will be: %s", offset)
+            data = stream.read(self.chunk_size)
+            if not data:
+                break
+            yield data
+
+    def _offset_chunks(
+        self, chunks: T.Iterable[bytes], offset: int
+    ) -> T.Generator[bytes, None, None]:
+        assert offset >= 0, f"Expect non-negative offset but got {offset}"
+
+        for chunk in chunks:
+            if offset:
+                if offset < len(chunk):
+                    yield chunk[offset:]
+                    offset = 0
+                else:
+                    offset -= len(chunk)
+            else:
+                yield chunk
+
+    def _attach_callbacks(
+        self, chunks: T.Iterable[bytes]
+    ) -> T.Generator[bytes, None, None]:
+        for chunk in chunks:
+            yield chunk
             for callback in self.callbacks:
-                callback(chunk, resp)
-            # we can assert that offset == self.fetch_offset(session_key)
-            # otherwise, server will throw
+                callback(chunk, None)
 
-            if not chunk:
-                break
+    def upload_chunks(
+        self,
+        chunks: T.Iterable[bytes],
+        offset: T.Optional[int] = None,
+    ) -> str:
+        if offset is None:
+            offset = self.fetch_offset()
 
-        assert offset == self.entity_size, (
-            f"Offset ends at {offset} but the entity size is {self.entity_size}"
+        chunks = self._attach_callbacks(self._offset_chunks(chunks, offset))
+
+        headers = {
+            "Authorization": f"OAuth {self.user_access_token}",
+            "Offset": f"{offset}",
+            "X-Entity-Name": self.session_key,
+            "X-Entity-Type": self.MIME_BY_CLUSTER_TYPE[self.cluster_filetype],
+        }
+        url = f"{MAPILLARY_UPLOAD_ENDPOINT}/{self.session_key}"
+        LOG.debug("POST %s HEADERS %s", url, json.dumps(_sanitize_headers(headers)))
+        resp = request_post(
+            url,
+            headers=headers,
+            data=chunks,
+            timeout=UPLOAD_REQUESTS_TIMEOUT,
         )
+        LOG.debug("HTTP response %s: %s", resp.status_code, _truncate_end(resp.content))
 
         payload = resp.json()
         try:
@@ -209,35 +221,30 @@ def __init__(self, *args, **kwargs):
         )
         self._error_ratio = 0.1
 
-    def upload(
+    def upload_chunks(
         self,
-        data: T.IO[bytes],
+        chunks: T.Iterable[bytes],
         offset: T.Optional[int] = None,
     ) -> str:
         if offset is None:
             offset = self.fetch_offset()
+
+        chunks = self._attach_callbacks(self._offset_chunks(chunks, offset))
+
         os.makedirs(self._upload_path, exist_ok=True)
         filename = os.path.join(self._upload_path, self.session_key)
         with open(filename, "ab") as fp:
-            data.seek(offset, io.SEEK_CUR)
-            while True:
-                chunk = data.read(self.chunk_size)
-                if not chunk:
-                    break
-                # fail here means nothing uploaded
+            for chunk in chunks:
                 if random.random() <= self._error_ratio:
                     raise requests.ConnectionError(
                         f"TEST ONLY: Failed to upload with error ratio {self._error_ratio}"
                     )
                 fp.write(chunk)
-                # fail here means partially uploaded
                 if random.random() <= self._error_ratio:
                     raise requests.ConnectionError(
                         f"TEST ONLY: Partially uploaded with error ratio {self._error_ratio}"
                     )
-            for callback in self.callbacks:
-                callback(chunk, None)
-        return self.session_key
+        return uuid.uuid4().hex
 
     def finish(self, _: str) -> str:
         return "0"

mapillary_tools/uploader.py

Lines changed: 0 additions & 2 deletions
@@ -195,7 +195,6 @@ def upload_stream(
             upload_api_v4.FakeUploadService(
                 user_access_token=self.user_items["user_upload_token"],
                 session_key=session_key,
-                entity_size=entity_size,
                 organization_id=self.user_items.get("MAPOrganizationKey"),
                 cluster_filetype=cluster_filetype,
                 chunk_size=self.chunk_size,
@@ -205,7 +204,6 @@ def upload_stream(
         upload_service = upload_api_v4.UploadService(
             user_access_token=self.user_items["user_upload_token"],
             session_key=session_key,
-            entity_size=entity_size,
             organization_id=self.user_items.get("MAPOrganizationKey"),
             cluster_filetype=cluster_filetype,
             chunk_size=self.chunk_size,

tests/unit/test_upload_api_v4.py

Lines changed: 47 additions & 0 deletions
@@ -0,0 +1,47 @@
+import io
+import py
+
+from mapillary_tools import upload_api_v4
+
+from ..integration.fixtures import setup_upload
+
+
+def test_upload(setup_upload: py.path.local):
+    upload_service = upload_api_v4.FakeUploadService(
+        user_access_token="TEST",
+        session_key="FOOBAR.txt",
+        chunk_size=1,
+    )
+    upload_service._error_ratio = 0
+    content = b"double_foobar"
+    cluster_id = upload_service.upload(io.BytesIO(content))
+    assert isinstance(cluster_id, str), cluster_id
+    assert (setup_upload.join("FOOBAR.txt").read_binary()) == content
+
+    # reupload should not affect the file
+    upload_service.upload(io.BytesIO(content))
+    assert (setup_upload.join("FOOBAR.txt").read_binary()) == content
+
+
+def test_upload_chunks(setup_upload: py.path.local):
+    upload_service = upload_api_v4.FakeUploadService(
+        user_access_token="TEST",
+        session_key="FOOBAR2.txt",
+        chunk_size=1,
+    )
+    upload_service._error_ratio = 0
+
+    def _gen_chunks():
+        yield b"foo"
+        yield b""
+        yield b"bar"
+        yield b""
+
+    cluster_id = upload_service.upload_chunks(_gen_chunks())
+
+    assert isinstance(cluster_id, str), cluster_id
+    assert (setup_upload.join("FOOBAR2.txt").read_binary()) == b"foobar"
+
+    # reupload should not affect the file
+    upload_service.upload_chunks(_gen_chunks())
+    assert (setup_upload.join("FOOBAR2.txt").read_binary()) == b"foobar"
