Skip to content

Commit 6152055

Browse files
authored
feat: cache upload handles (#766)
* sort files for uploading
* rename retryable checks
* refactor
* handle KeyboardInterrupt
* add upload_failed
* log if upload is resumed or not
* improve http error message
* improve logging
* log multiple lines
* handle keyboard interruptions
* remove read timeout
* handle json errors
* update upload_api_v4
* refactor: add SingleImageUploader
* fix logging
* add PersistentCache and tests
* use the persistent cache
* fix AttributeError: '_dbm.dbm' object has no attribute 'items'
1 parent 008bdc1 commit 6152055

File tree

11 files changed: 791 additions (+791) and 202 deletions (-202).

mapillary_tools/api_v4.py

Lines changed: 66 additions & 36 deletions
Original file line number | Diff line number | Diff line change
@@ -21,6 +21,17 @@
2121
USE_SYSTEM_CERTS: bool = False
2222

2323

24+
class HTTPContentError(Exception):
25+
"""
26+
Raised when the HTTP response is ok (200) but the content is not as expected
27+
e.g. not JSON or not a valid response.
28+
"""
29+
30+
def __init__(self, message: str, response: requests.Response):
31+
self.response = response
32+
super().__init__(message)
33+
34+
2435
class ClusterFileType(enum.Enum):
2536
ZIP = "zip"
2637
BLACKVUE = "mly_blackvue_video"
@@ -58,24 +69,25 @@ def cert_verify(self, *args, **kwargs):
5869

5970

6071
@T.overload
61-
def _truncate(s: bytes, limit: int = 512) -> bytes: ...
72+
def _truncate(s: bytes, limit: int = 256) -> bytes | str: ...
6273

6374

6475
@T.overload
65-
def _truncate(s: str, limit: int = 512) -> str: ...
76+
def _truncate(s: str, limit: int = 256) -> str: ...
6677

6778

68-
def _truncate(s, limit=512):
79+
def _truncate(s, limit=256):
6980
if limit < len(s):
81+
if isinstance(s, bytes):
82+
try:
83+
s = s.decode("utf-8")
84+
except UnicodeDecodeError:
85+
pass
7086
remaining = len(s) - limit
7187
if isinstance(s, bytes):
72-
return (
73-
s[:limit]
74-
+ b"..."
75-
+ f"({remaining} more bytes truncated)".encode("utf-8")
76-
)
88+
return s[:limit] + f"...({remaining} bytes truncated)".encode("utf-8")
7789
else:
78-
return str(s[:limit]) + f"...({remaining} more chars truncated)"
90+
return str(s[:limit]) + f"...({remaining} chars truncated)"
7991
else:
8092
return s
8193

@@ -95,7 +107,10 @@ def _sanitize(headers: T.Mapping[T.Any, T.Any]) -> T.Mapping[T.Any, T.Any]:
95107
]:
96108
new_headers[k] = "[REDACTED]"
97109
else:
98-
new_headers[k] = _truncate(v)
110+
if isinstance(v, (str, bytes)):
111+
new_headers[k] = T.cast(T.Any, _truncate(v))
112+
else:
113+
new_headers[k] = v
99114

100115
return new_headers
101116

@@ -106,7 +121,6 @@ def _log_debug_request(
106121
json: dict | None = None,
107122
params: dict | None = None,
108123
headers: dict | None = None,
109-
timeout: T.Any = None,
110124
):
111125
if logging.getLogger().getEffectiveLevel() <= logging.DEBUG:
112126
return
@@ -126,8 +140,7 @@ def _log_debug_request(
126140
if headers:
127141
msg += f" HEADERS={_sanitize(headers)}"
128142

129-
if timeout is not None:
130-
msg += f" TIMEOUT={timeout}"
143+
msg = msg.replace("\n", "\\n")
131144

132145
LOG.debug(msg)
133146

@@ -136,26 +149,41 @@ def _log_debug_response(resp: requests.Response):
136149
if logging.getLogger().getEffectiveLevel() <= logging.DEBUG:
137150
return
138151

139-
data: str | bytes
152+
elapsed = resp.elapsed.total_seconds() * 1000 # Convert to milliseconds
153+
msg = f"HTTP {resp.status_code} {resp.reason} ({elapsed:.0f} ms): {str(_truncate_response_content(resp))}"
154+
155+
LOG.debug(msg)
156+
157+
158+
def _truncate_response_content(resp: requests.Response) -> str | bytes:
140159
try:
141-
data = _truncate(dumps(_sanitize(resp.json())))
142-
except Exception:
143-
data = _truncate(resp.content)
160+
json_data = resp.json()
161+
except requests.JSONDecodeError:
162+
if resp.content is not None:
163+
data = _truncate(resp.content)
164+
else:
165+
data = ""
166+
else:
167+
if isinstance(json_data, dict):
168+
data = _truncate(dumps(_sanitize(json_data)))
169+
else:
170+
data = _truncate(str(json_data))
171+
172+
if isinstance(data, bytes):
173+
return data.replace(b"\n", b"\\n")
144174

145-
LOG.debug(f"HTTP {resp.status_code} ({resp.reason}): %s", data)
175+
elif isinstance(data, str):
176+
return data.replace("\n", "\\n")
177+
178+
return data
146179

147180

148181
def readable_http_error(ex: requests.HTTPError) -> str:
149-
req = ex.request
150-
resp = ex.response
182+
return readable_http_response(ex.response)
151183

152-
data: str | bytes
153-
try:
154-
data = _truncate(dumps(_sanitize(resp.json())))
155-
except Exception:
156-
data = _truncate(resp.content)
157184

158-
return f"{req.method} {resp.url} => {resp.status_code} ({resp.reason}): {str(data)}"
185+
def readable_http_response(resp: requests.Response) -> str:
186+
return f"{resp.request.method} {resp.url} => {resp.status_code} {resp.reason}: {str(_truncate_response_content(resp))}"
159187

160188

161189
def request_post(
@@ -174,7 +202,6 @@ def request_post(
174202
json=json,
175203
params=kwargs.get("params"),
176204
headers=kwargs.get("headers"),
177-
timeout=kwargs.get("timeout"),
178205
)
179206

180207
if USE_SYSTEM_CERTS:
@@ -208,11 +235,7 @@ def request_get(
208235

209236
if not disable_debug:
210237
_log_debug_request(
211-
"GET",
212-
url,
213-
params=kwargs.get("params"),
214-
headers=kwargs.get("headers"),
215-
timeout=kwargs.get("timeout"),
238+
"GET", url, params=kwargs.get("params"), headers=kwargs.get("headers")
216239
)
217240

218241
if USE_SYSTEM_CERTS:
@@ -335,10 +358,7 @@ def fetch_user_or_me(
335358
def log_event(action_type: ActionType, properties: dict) -> requests.Response:
336359
resp = request_post(
337360
f"{MAPILLARY_GRAPH_API_ENDPOINT}/logging",
338-
json={
339-
"action_type": action_type,
340-
"properties": properties,
341-
},
361+
json={"action_type": action_type, "properties": properties},
342362
headers={
343363
"Authorization": f"OAuth {MAPILLARY_CLIENT_TOKEN}",
344364
},
@@ -374,3 +394,13 @@ def finish_upload(
374394
resp.raise_for_status()
375395

376396
return resp
397+
398+
399+
def jsonify_response(resp: requests.Response) -> T.Any:
400+
"""
401+
Convert the response to JSON, raising HTTPContentError if the response is not JSON.
402+
"""
403+
try:
404+
return resp.json()
405+
except requests.JSONDecodeError as ex:
406+
raise HTTPContentError("Invalid JSON response", resp) from ex

mapillary_tools/authenticate.py

Lines changed: 10 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,6 @@
11
from __future__ import annotations
22

33
import getpass
4-
import json
54
import logging
65
import re
76
import sys
@@ -131,15 +130,18 @@ def fetch_user_items(
131130
user_items = _verify_user_auth(_validate_profile(user_items))
132131

133132
LOG.info(
134-
'Uploading to profile "%s": %s', profile_name, api_v4._sanitize(user_items)
133+
f'Uploading to profile "{profile_name}": {user_items.get("MAPSettingsUsername")} (ID: {user_items.get("MAPSettingsUserKey")})'
135134
)
136135

137136
if organization_key is not None:
138137
resp = api_v4.fetch_organization(
139138
user_items["user_upload_token"], organization_key
140139
)
141-
LOG.info("Uploading to Mapillary organization: %s", json.dumps(resp.json()))
142-
user_items["MAPOrganizationKey"] = organization_key
140+
data = api_v4.jsonify_response(resp)
141+
LOG.info(
142+
f"Uploading to organization: {data.get('name')} (ID: {data.get('id')})"
143+
)
144+
user_items["MAPOrganizationKey"] = data.get("id")
143145

144146
return user_items
145147

@@ -182,12 +184,12 @@ def _verify_user_auth(user_items: config.UserItem) -> config.UserItem:
182184
else:
183185
raise ex
184186

185-
user_json = resp.json()
187+
data = api_v4.jsonify_response(resp)
186188

187189
return {
188190
**user_items,
189-
"MAPSettingsUsername": user_json.get("username"),
190-
"MAPSettingsUserKey": user_json.get("id"),
191+
"MAPSettingsUsername": data.get("username"),
192+
"MAPSettingsUserKey": data.get("id"),
191193
}
192194

193195

@@ -285,7 +287,7 @@ def _prompt_login(
285287

286288
raise ex
287289

288-
data = resp.json()
290+
data = api_v4.jsonify_response(resp)
289291

290292
user_items: config.UserItem = {
291293
"user_upload_token": str(data["access_token"]),

mapillary_tools/commands/__main__.py

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77
import requests
88

99
from .. import api_v4, constants, exceptions, VERSION
10+
from ..upload import log_exception
1011
from . import (
1112
authenticate,
1213
process,
@@ -162,14 +163,16 @@ def main():
162163
try:
163164
args.func(argvars)
164165
except requests.HTTPError as ex:
165-
LOG.error("%s: %s", ex.__class__.__name__, api_v4.readable_http_error(ex))
166+
log_exception(ex)
166167
# TODO: standardize exit codes as exceptions.MapillaryUserError
167168
sys.exit(16)
168169

170+
except api_v4.HTTPContentError as ex:
171+
log_exception(ex)
172+
sys.exit(17)
173+
169174
except exceptions.MapillaryUserError as ex:
170-
LOG.error(
171-
"%s: %s", ex.__class__.__name__, ex, exc_info=log_level == logging.DEBUG
172-
)
175+
log_exception(ex)
173176
sys.exit(ex.exit_code)
174177

175178

mapillary_tools/constants.py

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22

33
import functools
44
import os
5+
import tempfile
56

67
import appdirs
78

@@ -146,6 +147,10 @@ def _parse_scaled_integers(
146147
MAPILLARY_UPLOAD_HISTORY_PATH: str = os.getenv(
147148
"MAPILLARY_UPLOAD_HISTORY_PATH", os.path.join(USER_DATA_DIR, "upload_history")
148149
)
150+
UPLOAD_CACHE_DIR: str = os.getenv(
151+
_ENV_PREFIX + "UPLOAD_CACHE_DIR",
152+
os.path.join(tempfile.gettempdir(), "mapillary_tools", "upload_cache"),
153+
)
149154
MAX_IMAGE_UPLOAD_WORKERS: int = int(
150155
os.getenv(_ENV_PREFIX + "MAX_IMAGE_UPLOAD_WORKERS", 64)
151156
)

0 commit comments

Comments
 (0)