
Commit 812f3fc

feat: support uploading images file by file (#753)
* simplify upload API
* fix tests
* sort
* fix tests
* refactor
* wip
* fix tests
* format
* format
* parallel uploading
* fix types
* types
* fix progresses
* ruff
* fix max number of upload workers
1 parent ebad6bf commit 812f3fc

File tree

14 files changed, +556 -426 lines changed


mapillary_tools/api_v4.py

Lines changed: 1 addition & 0 deletions
@@ -25,6 +25,7 @@ class ClusterFileType(enum.Enum):
     ZIP = "zip"
     BLACKVUE = "mly_blackvue_video"
     CAMM = "mly_camm_video"
+    MLY_BUNDLE_MANIFEST = "mly_bundle_manifest"
 
 
 class HTTPSystemCertsAdapter(HTTPAdapter):

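The only change in api_v4.py is the new ClusterFileType member. This diff does not show where MLY_BUNDLE_MANIFEST is consumed, so the snippet below is only an illustrative sketch of referencing the value, modeled on the finish_upload call added in upload.py further down; the actual manifest call site is outside this excerpt.

from mapillary_tools import api_v4

# Assumption: finish_upload accepts any ClusterFileType member, as it does for
# CAMM in upload.py below. Here we only show the new member and its wire value.
filetype = api_v4.ClusterFileType.MLY_BUNDLE_MANIFEST
print(filetype.value)  # "mly_bundle_manifest"
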
mapillary_tools/constants.py

Lines changed: 2 additions & 0 deletions
@@ -89,3 +89,5 @@ def _yes_or_no(val: str) -> bool:
         "upload_history",
     ),
 )
+
+MAX_IMAGE_UPLOAD_WORKERS = int(os.getenv(_ENV_PREFIX + "MAX_IMAGE_UPLOAD_WORKERS", 64))

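The commit message mentions "parallel uploading" and "fix max number of upload workers"; this constant is the knob for that: an environment-configurable cap (default 64) on concurrent image-upload workers. The executor wiring lives in uploader.py, which is not part of this excerpt, so the following is only a sketch of how such a cap is typically applied; _make_upload_executor is a hypothetical helper, and the full environment-variable name depends on _ENV_PREFIX, which is defined earlier in constants.py.

from concurrent.futures import ThreadPoolExecutor

from mapillary_tools import constants

def _make_upload_executor(num_images: int) -> ThreadPoolExecutor:
    # Hypothetical helper: never spawn more workers than there are images,
    # and never exceed the configured cap (default 64).
    max_workers = max(1, min(num_images, constants.MAX_IMAGE_UPLOAD_WORKERS))
    return ThreadPoolExecutor(max_workers=max_workers)
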
mapillary_tools/types.py

Lines changed: 27 additions & 37 deletions
@@ -6,10 +6,16 @@
 import hashlib
 import json
 import os
+import sys
 import typing as T
 import uuid
 from pathlib import Path
-from typing import Literal, TypedDict
+from typing import TypedDict
+
+if sys.version_info >= (3, 11):
+    from typing import Required
+else:
+    from typing_extensions import Required
 
 import jsonschema
 
@@ -144,73 +150,57 @@ class UserItem(TypedDict, total=False):
     # Not in use. Keep here for back-compatibility
     MAPSettingsUsername: str
     MAPSettingsUserKey: str
-    user_upload_token: str
+    user_upload_token: Required[str]
 
 
 class _CompassHeading(TypedDict, total=True):
     TrueHeading: float
     MagneticHeading: float
 
 
-class _ImageRequired(TypedDict, total=True):
-    MAPLatitude: float
-    MAPLongitude: float
-    MAPCaptureTime: str
+class _SharedDescription(TypedDict, total=False):
+    filename: Required[str]
+    filetype: Required[str]
+
+    # if None or absent, it will be calculated
+    md5sum: str | None
+    filesize: int | None
 
 
-class _Image(_ImageRequired, total=False):
+class ImageDescription(_SharedDescription, total=False):
+    MAPLatitude: Required[float]
+    MAPLongitude: Required[float]
     MAPAltitude: float
+    MAPCaptureTime: Required[str]
     MAPCompassHeading: _CompassHeading
 
-
-class _SequenceOnly(TypedDict, total=False):
-    MAPSequenceUUID: str
-
-
-class MetaProperties(TypedDict, total=False):
     MAPDeviceMake: str
     MAPDeviceModel: str
     MAPGPSAccuracyMeters: float
     MAPCameraUUID: str
     MAPOrientation: int
 
-
-class ImageDescription(_SequenceOnly, _Image, MetaProperties, total=True):
-    # filename is required
-    filename: str
-    # if None or absent, it will be calculated
-    md5sum: str | None
-    filetype: Literal["image"]
-    filesize: int | None
-
-
-class _VideoDescriptionRequired(TypedDict, total=True):
-    filename: str
-    md5sum: str | None
-    filetype: str
-    MAPGPSTrack: list[T.Sequence[float | int | None]]
+    # For grouping images in a sequence
+    MAPSequenceUUID: str
 
 
-class VideoDescription(_VideoDescriptionRequired, total=False):
+class VideoDescription(_SharedDescription, total=False):
+    MAPGPSTrack: Required[list[T.Sequence[float | int | None]]]
     MAPDeviceMake: str
     MAPDeviceModel: str
-    filesize: int | None
 
 
 class _ErrorDescription(TypedDict, total=False):
     # type and message are required
-    type: str
+    type: Required[str]
     message: str
     # vars is optional
     vars: dict
 
 
-class _ImageDescriptionErrorRequired(TypedDict, total=True):
-    filename: str
-    error: _ErrorDescription
-
-
-class ImageDescriptionError(_ImageDescriptionErrorRequired, total=False):
+class ImageDescriptionError(TypedDict, total=False):
+    filename: Required[str]
+    error: Required[_ErrorDescription]
     filetype: str
 

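The restructuring collapses the old mixin hierarchy (_ImageRequired, _Image, _SequenceOnly, MetaProperties, _VideoDescriptionRequired) into total=False TypedDicts that share _SharedDescription and mark mandatory keys with Required[...] (from typing on Python 3.11+, typing_extensions otherwise). A hedged usage sketch with illustrative values; a type checker accepts the dict as long as every Required key is present, while the rest may be omitted:

from mapillary_tools.types import ImageDescription

desc: ImageDescription = {
    "filename": "photos/GOPR0001.JPG",  # Required
    "filetype": "image",                # Required; plain str now, no longer Literal["image"]
    "MAPLatitude": 59.3293,             # Required
    "MAPLongitude": 18.0686,            # Required
    "MAPCaptureTime": "2024_01_01_12_00_00_000",  # Required; illustrative timestamp value
    # md5sum / filesize omitted: absent or None means "calculate later"
    # MAPSequenceUUID omitted: only needed to group images into a sequence
}
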
mapillary_tools/upload.py

Lines changed: 22 additions & 17 deletions
@@ -22,7 +22,6 @@
     ipc,
     telemetry,
     types,
-    upload_api_v4,
     uploader,
     utils,
     VERSION,
@@ -192,8 +191,9 @@ def write_history(payload: uploader.Progress):
 def _setup_tdqm(emitter: uploader.EventEmitter) -> None:
     upload_pbar: tqdm | None = None
 
+    @emitter.on("upload_start")
     @emitter.on("upload_fetch_offset")
-    def upload_fetch_offset(payload: uploader.Progress) -> None:
+    def upload_start(payload: uploader.Progress) -> None:
         nonlocal upload_pbar
 
         if upload_pbar is not None:
@@ -204,18 +204,18 @@ def upload_fetch_offset(payload: uploader.Progress) -> None:
         import_path: str | None = payload.get("import_path")
         filetype = payload.get("file_type", "unknown").upper()
         if import_path is None:
-            _desc = f"Uploading {filetype} ({nth}/{total})"
+            desc = f"Uploading {filetype} ({nth}/{total})"
         else:
-            _desc = (
+            desc = (
                 f"Uploading {filetype} {os.path.basename(import_path)} ({nth}/{total})"
             )
         upload_pbar = tqdm(
             total=payload["entity_size"],
-            desc=_desc,
+            desc=desc,
             unit="B",
             unit_scale=True,
             unit_divisor=1024,
-            initial=payload["offset"],
+            initial=payload.get("offset", 0),
             disable=LOG.getEffectiveLevel() <= logging.DEBUG,
         )
 
@@ -295,8 +295,13 @@ def _setup_api_stats(emitter: uploader.EventEmitter):
 
     @emitter.on("upload_start")
     def collect_start_time(payload: _APIStats) -> None:
-        payload["upload_start_time"] = time.time()
+        now = time.time()
+        payload["upload_start_time"] = now
         payload["upload_total_time"] = 0
+        # These fields should be initialized in upload events like "upload_fetch_offset",
+        # but since those events are disabled for image uploads, we initialize them here
+        payload["upload_last_restart_time"] = now
+        payload["upload_first_offset"] = 0
 
     @emitter.on("upload_fetch_offset")
     def collect_restart_time(payload: _APIStats) -> None:
@@ -466,7 +471,7 @@ def _gen_upload_everything(
         (m for m in metadatas if isinstance(m, types.ImageMetadata)),
         utils.find_images(import_paths, skip_subfolders=skip_subfolders),
     )
-    for image_result in uploader.ZipImageSequence.zip_images_and_upload(
+    for image_result in uploader.ZipImageSequence.upload_images(
         mly_uploader,
         image_metadatas,
     ):
@@ -510,13 +515,8 @@ def _gen_upload_videos(
            "file_type": video_metadata.filetype.value,
            "import_path": str(video_metadata.filename),
            "sequence_md5sum": video_metadata.md5sum,
-            "upload_md5sum": video_metadata.md5sum,
        }
 
-        session_key = uploader._session_key(
-            video_metadata.md5sum, upload_api_v4.ClusterFileType.CAMM
-        )
-
        try:
            with video_metadata.filename.open("rb") as src_fp:
                # Build the mp4 stream with the CAMM samples
525525
)
526526

527527
# Upload the mp4 stream
528-
cluster_id = mly_uploader.upload_stream(
528+
file_handle = mly_uploader.upload_stream(
529529
T.cast(T.IO[bytes], camm_fp),
530-
upload_api_v4.ClusterFileType.CAMM,
531-
session_key,
532530
progress=T.cast(T.Dict[str, T.Any], progress),
533531
)
532+
cluster_id = mly_uploader.finish_upload(
533+
file_handle,
534+
api_v4.ClusterFileType.CAMM,
535+
progress=T.cast(T.Dict[str, T.Any], progress),
536+
)
534537
except Exception as ex:
535538
yield video_metadata, uploader.UploadResult(error=ex)
536539
else:
@@ -706,7 +709,9 @@ def upload(
 
    finally:
        # We collected stats after every upload is finished
-        assert upload_successes == len(stats)
+        assert upload_successes == len(stats), (
+            f"Expected {upload_successes} successes but got {stats}"
+        )
        _show_upload_summary(stats, upload_errors)
 

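The video path now follows the simplified upload API: upload_stream only pushes bytes and returns an opaque file handle, and a separate finish_upload call exchanges that handle plus a ClusterFileType for the cluster ID (the session key and MIME bookkeeping that used to be passed in are gone). A minimal sketch of the two-step flow, with the uploader construction assumed and the CAMM stream replaced by a plain binary file object:

import typing as T

from mapillary_tools import api_v4

def upload_one_video(mly_uploader, fp: T.IO[bytes]):
    # Sketch only: mly_uploader is assumed to be the same uploader object used
    # in _gen_upload_videos; its construction is outside this diff.
    progress: dict = {}
    # Step 1: stream the bytes; the upload service hands back a file handle
    file_handle = mly_uploader.upload_stream(fp, progress=progress)
    # Step 2: finish the upload, telling the API how to cluster the file
    cluster_id = mly_uploader.finish_upload(
        file_handle, api_v4.ClusterFileType.CAMM, progress=progress
    )
    return cluster_id
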
mapillary_tools/upload_api_v4.py

Lines changed: 15 additions & 17 deletions
@@ -6,6 +6,7 @@
 import sys
 import typing as T
 import uuid
+from pathlib import Path
 
 if sys.version_info >= (3, 12):
     from typing import override
@@ -14,7 +15,7 @@
 
 import requests
 
-from .api_v4 import ClusterFileType, request_get, request_post, REQUESTS_TIMEOUT
+from .api_v4 import request_get, request_post, REQUESTS_TIMEOUT
 
 MAPILLARY_UPLOAD_ENDPOINT = os.getenv(
     "MAPILLARY_UPLOAD_ENDPOINT", "https://rupload.facebook.com/mapillary_public_uploads"
@@ -31,24 +32,14 @@
 class UploadService:
     user_access_token: str
     session_key: str
-    cluster_filetype: ClusterFileType
-
-    MIME_BY_CLUSTER_TYPE: dict[ClusterFileType, str] = {
-        ClusterFileType.ZIP: "application/zip",
-        ClusterFileType.BLACKVUE: "video/mp4",
-        ClusterFileType.CAMM: "video/mp4",
-    }
 
     def __init__(
         self,
         user_access_token: str,
         session_key: str,
-        cluster_filetype: ClusterFileType,
     ):
         self.user_access_token = user_access_token
         self.session_key = session_key
-        # Validate the input
-        self.cluster_filetype = cluster_filetype
 
     def fetch_offset(self) -> int:
         headers = {
@@ -124,7 +115,6 @@ def upload_shifted_chunks(
            "Authorization": f"OAuth {self.user_access_token}",
            "Offset": f"{offset}",
            "X-Entity-Name": self.session_key,
-            "X-Entity-Type": self.MIME_BY_CLUSTER_TYPE[self.cluster_filetype],
        }
        url = f"{MAPILLARY_UPLOAD_ENDPOINT}/{self.session_key}"
        resp = request_post(
@@ -149,8 +139,8 @@ def upload_shifted_chunks(
 class FakeUploadService(UploadService):
     def __init__(self, *args, **kwargs):
         super().__init__(*args, **kwargs)
-        self._upload_path = os.getenv(
-            "MAPILLARY_UPLOAD_PATH", "mapillary_public_uploads"
+        self._upload_path = Path(
+            os.getenv("MAPILLARY_UPLOAD_PATH", "mapillary_public_uploads")
         )
         self._error_ratio = 0.02
 
@@ -167,8 +157,8 @@ def upload_shifted_chunks(
        )
 
        os.makedirs(self._upload_path, exist_ok=True)
-        filename = os.path.join(self._upload_path, self.session_key)
-        with open(filename, "ab") as fp:
+        filename = self._upload_path.joinpath(self.session_key)
+        with filename.open("ab") as fp:
            for chunk in shifted_chunks:
                if random.random() <= self._error_ratio:
                    raise requests.ConnectionError(
@@ -179,7 +169,15 @@
            raise requests.ConnectionError(
                f"TEST ONLY: Partially uploaded with error ratio {self._error_ratio}"
            )
-        return uuid.uuid4().hex
+
+        file_handle_dir = self._upload_path.joinpath("file_handles")
+        file_handle_path = file_handle_dir.joinpath(self.session_key)
+        if not file_handle_path.exists():
+            os.makedirs(file_handle_dir, exist_ok=True)
+            random_file_handle = uuid.uuid4().hex
+            file_handle_path.write_text(random_file_handle)
+
+        return file_handle_path.read_text()
 
    @override
    def fetch_offset(self) -> int:

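With the ClusterFileType parameter removed, an upload session is keyed only by the access token and the session key, and the X-Entity-Type MIME hint is no longer sent; the file type is supplied later, at finish time. A short sketch of creating a session and resuming from the server-side offset (token and session key are placeholders):

from mapillary_tools.upload_api_v4 import UploadService

service = UploadService(
    user_access_token="<access-token>",
    session_key="<unique-session-key>",
)
offset = service.fetch_offset()  # bytes the server already holds for this session
# upload_shifted_chunks(...) then continues appending from that offset

FakeUploadService mirrors this for tests: it appends uploads to files under MAPILLARY_UPLOAD_PATH and now persists one fake file handle per session key under file_handles/, so retrying the same session returns the same handle.
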
0 commit comments
