50 changes: 43 additions & 7 deletions mapillary_tools/api_v4.py
@@ -1,3 +1,6 @@
from __future__ import annotations

import enum
import logging
import os
import ssl
@@ -18,6 +21,12 @@
USE_SYSTEM_CERTS: bool = False


class ClusterFileType(enum.Enum):
ZIP = "zip"
BLACKVUE = "mly_blackvue_video"
CAMM = "mly_camm_video"


class HTTPSystemCertsAdapter(HTTPAdapter):
"""
This adapter uses the system's certificate store instead of the certifi module.
@@ -93,9 +102,9 @@ def _sanitize(headers: T.Mapping[T.Any, T.Any]) -> T.Mapping[T.Any, T.Any]:
def _log_debug_request(
method: str,
url: str,
json: T.Optional[T.Dict] = None,
params: T.Optional[T.Dict] = None,
headers: T.Optional[T.Dict] = None,
json: dict | None = None,
params: dict | None = None,
headers: dict | None = None,
timeout: T.Any = None,
):
if logging.getLogger().getEffectiveLevel() <= logging.DEBUG:
Expand Down Expand Up @@ -150,8 +159,8 @@ def readable_http_error(ex: requests.HTTPError) -> str:

def request_post(
url: str,
data: T.Optional[T.Any] = None,
json: T.Optional[dict] = None,
data: T.Any | None = None,
json: dict | None = None,
**kwargs,
) -> requests.Response:
global USE_SYSTEM_CERTS
@@ -190,7 +199,7 @@ def request_post(

def request_get(
url: str,
params: T.Optional[dict] = None,
params: dict | None = None,
**kwargs,
) -> requests.Response:
global USE_SYSTEM_CERTS
@@ -293,7 +302,7 @@ def fetch_organization(

def fetch_user_or_me(
user_access_token: str,
user_id: T.Optional[T.Union[int, str]] = None,
user_id: int | str | None = None,
) -> requests.Response:
if user_id is None:
url = f"{MAPILLARY_GRAPH_API_ENDPOINT}/me"
@@ -334,3 +343,30 @@ def log_event(action_type: ActionType, properties: T.Dict) -> requests.Response:
)
resp.raise_for_status()
return resp


def finish_upload(
user_access_token: str,
file_handle: str,
cluster_filetype: ClusterFileType,
organization_id: int | str | None = None,
) -> requests.Response:
data: dict[str, str | int] = {
"file_handle": file_handle,
"file_type": cluster_filetype.value,
}
if organization_id is not None:
data["organization_id"] = organization_id

resp = request_post(
f"{MAPILLARY_GRAPH_API_ENDPOINT}/finish_upload",
headers={
"Authorization": f"OAuth {user_access_token}",
},
json=data,
timeout=REQUESTS_TIMEOUT,
)

resp.raise_for_status()

return resp
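
As a reading aid, a minimal sketch of how the new `finish_upload` helper and `ClusterFileType` enum added above might be called. The access token and file handle values are placeholders, and the response body shape is not specified by this diff; only the function signature and enum members come from the hunks above.

```python
from mapillary_tools import api_v4

# Placeholder values for illustration; real values come from authentication
# and from the chunked upload service, neither of which is part of this diff.
USER_ACCESS_TOKEN = "MLY|example-token"
FILE_HANDLE = "example-file-handle"

resp = api_v4.finish_upload(
    USER_ACCESS_TOKEN,
    FILE_HANDLE,
    api_v4.ClusterFileType.CAMM,  # or ZIP / BLACKVUE, matching the uploaded container
    organization_id=None,         # optional; pass an org ID to attribute the upload
)
# finish_upload() already calls raise_for_status(), so reaching this point means HTTP success.
print(resp.json())  # response body shape is an assumption, not defined by this diff
```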
3 changes: 2 additions & 1 deletion mapillary_tools/authenticate.py
@@ -7,9 +7,10 @@
import sys
import typing as T

import requests
import jsonschema

import requests

from . import api_v4, config, constants, exceptions, types


67 changes: 37 additions & 30 deletions mapillary_tools/upload.py
@@ -1,3 +1,5 @@
from __future__ import annotations

import json
import logging
import os
@@ -45,8 +47,8 @@ def __init__(self, inner_ex) -> None:


def _load_validate_metadatas_from_desc_path(
desc_path: T.Optional[str], import_paths: T.Sequence[Path]
) -> T.List[types.Metadata]:
desc_path: str | None, import_paths: T.Sequence[Path]
) -> list[types.Metadata]:
is_default_desc_path = False
if desc_path is None:
is_default_desc_path = True
@@ -64,7 +66,7 @@ def _load_validate_metadatas_from_desc_path(
"The description path must be specified (with --desc_path) when uploading a single file",
)

descs: T.List[types.DescriptionOrError] = []
descs: list[types.DescriptionOrError] = []

if desc_path == "-":
try:
@@ -117,7 +119,7 @@ def _load_validate_metadatas_from_desc_path(
def zip_images(
import_path: Path,
zip_dir: Path,
desc_path: T.Optional[str] = None,
desc_path: str | None = None,
):
if not import_path.is_dir():
raise exceptions.MapillaryFileNotFoundError(
@@ -162,7 +164,7 @@ def upload_start(payload: uploader.Progress):
def _setup_write_upload_history(
emitter: uploader.EventEmitter,
params: JSONDict,
metadatas: T.Optional[T.List[types.Metadata]] = None,
metadatas: list[types.Metadata] | None = None,
) -> None:
@emitter.on("upload_finished")
def upload_finished(payload: uploader.Progress):
@@ -190,7 +192,7 @@ def upload_finished(payload: uploader.Progress):


def _setup_tdqm(emitter: uploader.EventEmitter) -> None:
upload_pbar: T.Optional[tqdm] = None
upload_pbar: tqdm | None = None

@emitter.on("upload_fetch_offset")
def upload_fetch_offset(payload: uploader.Progress) -> None:
@@ -201,7 +203,7 @@ def upload_fetch_offset(payload: uploader.Progress) -> None:

nth = payload["sequence_idx"] + 1
total = payload["total_sequence_count"]
import_path: T.Optional[str] = payload.get("import_path")
import_path: str | None = payload.get("import_path")
filetype = payload.get("file_type", "unknown").upper()
if import_path is None:
_desc = f"Uploading {filetype} ({nth}/{total})"
@@ -276,7 +278,7 @@ class _APIStats(uploader.Progress, total=False):


def _setup_api_stats(emitter: uploader.EventEmitter):
all_stats: T.List[_APIStats] = []
all_stats: list[_APIStats] = []

@emitter.on("upload_start")
def collect_start_time(payload: _APIStats) -> None:
@@ -309,7 +311,7 @@ def collect_end_time(payload: _APIStats) -> None:
return all_stats


def _summarize(stats: T.Sequence[_APIStats]) -> T.Dict:
def _summarize(stats: T.Sequence[_APIStats]) -> dict:
total_image_count = sum(s.get("sequence_image_count", 0) for s in stats)
total_uploaded_sequence_count = len(stats)
# note that stats[0]["total_sequence_count"] not always same as total_uploaded_sequence_count
@@ -341,7 +343,7 @@ def _summarize(stats: T.Sequence[_APIStats]) -> T.Dict:


def _show_upload_summary(stats: T.Sequence[_APIStats]):
grouped: T.Dict[str, T.List[_APIStats]] = {}
grouped: dict[str, list[_APIStats]] = {}
for stat in stats:
grouped.setdefault(stat.get("file_type", "unknown"), []).append(stat)

@@ -365,7 +367,7 @@ def _show_upload_summary(stats: T.Sequence[_APIStats]):
LOG.info("%8.1fs upload time", summary["time"])


def _api_logging_finished(summary: T.Dict):
def _api_logging_finished(summary: dict):
if MAPILLARY_DISABLE_API_LOGGING:
return

@@ -383,7 +385,7 @@ def _api_logging_finished(summary: T.Dict):
LOG.warning("Error from API Logging for action %s", action, exc_info=True)


def _api_logging_failed(payload: T.Dict, exc: Exception):
def _api_logging_failed(payload: dict, exc: Exception):
if MAPILLARY_DISABLE_API_LOGGING:
return

@@ -403,11 +405,11 @@ def _api_logging_failed(payload: T.Dict, exc: Exception):


def _load_descs(
_metadatas_from_process: T.Optional[T.Sequence[types.MetadataOrError]],
desc_path: T.Optional[str],
_metadatas_from_process: T.Sequence[types.MetadataOrError] | None,
desc_path: str | None,
import_paths: T.Sequence[Path],
) -> T.List[types.Metadata]:
metadatas: T.List[types.Metadata]
) -> list[types.Metadata]:
metadatas: list[types.Metadata]

if _metadatas_from_process is not None:
metadatas = [
@@ -439,7 +441,7 @@ def _load_descs(

def _find_metadata_with_filename_existed_in(
metadatas: T.Sequence[_M], paths: T.Sequence[Path]
) -> T.List[_M]:
) -> list[_M]:
resolved_image_paths = set(p.resolve() for p in paths)
return [d for d in metadatas if d.filename.resolve() in resolved_image_paths]

@@ -463,9 +465,9 @@ def _upload_everything(
)
if specified_image_metadatas:
try:
clusters = mly_uploader.upload_images(
clusters = uploader.ZipImageSequence.prepare_images_and_upload(
specified_image_metadatas,
event_payload={"file_type": FileType.IMAGE.value},
mly_uploader,
)
except Exception as ex:
raise UploadError(ex) from ex
@@ -488,7 +490,7 @@ def _upload_everything(
assert isinstance(video_metadata.md5sum, str), "md5sum should be updated"

# extract telemetry measurements from GoPro videos
telemetry_measurements: T.List[camm_parser.TelemetryMeasurement] = []
telemetry_measurements: list[camm_parser.TelemetryMeasurement] = []
if MAPILLARY__EXPERIMENTAL_ENABLE_IMU == "YES":
if video_metadata.filetype is FileType.GOPRO:
with video_metadata.filename.open("rb") as fp:
@@ -505,18 +507,24 @@

with video_metadata.filename.open("rb") as src_fp:
camm_fp = simple_mp4_builder.transform_mp4(src_fp, generator)
event_payload: uploader.Progress = {
progress: uploader.SequenceProgress = {
"total_sequence_count": len(specified_video_metadatas),
"sequence_idx": idx,
"file_type": video_metadata.filetype.value,
"import_path": str(video_metadata.filename),
"md5sum": video_metadata.md5sum,
}

session_key = uploader._session_key(
video_metadata.md5sum, upload_api_v4.ClusterFileType.CAMM
)

try:
cluster_id = mly_uploader.upload_stream(
T.cast(T.BinaryIO, camm_fp),
upload_api_v4.ClusterFileType.CAMM,
video_metadata.md5sum,
event_payload=event_payload,
session_key,
progress=T.cast(T.Dict[str, T.Any], progress),
)
except Exception as ex:
raise UploadError(ex) from ex
@@ -528,10 +536,10 @@


def upload(
import_path: T.Union[Path, T.Sequence[Path]],
import_path: Path | T.Sequence[Path],
user_items: types.UserItem,
desc_path: T.Optional[str] = None,
_metadatas_from_process: T.Optional[T.Sequence[types.MetadataOrError]] = None,
desc_path: str | None = None,
_metadatas_from_process: T.Sequence[types.MetadataOrError] | None = None,
dry_run=False,
skip_subfolders=False,
) -> None:
@@ -632,15 +640,14 @@ def _upload_zipfiles(
zip_paths: T.Sequence[Path],
) -> None:
for idx, zip_path in enumerate(zip_paths):
event_payload: uploader.Progress = {
progress: uploader.SequenceProgress = {
"total_sequence_count": len(zip_paths),
"sequence_idx": idx,
"file_type": FileType.ZIP.value,
"import_path": str(zip_path),
}
try:
cluster_id = mly_uploader.upload_zipfile(
zip_path, event_payload=event_payload
cluster_id = uploader.ZipImageSequence.prepare_zipfile_and_upload(
zip_path, mly_uploader, progress=progress
)
except Exception as ex:
raise UploadError(ex) from ex
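
For reference, a condensed sketch of the call pattern introduced above: zipped image sequences now go through `uploader.ZipImageSequence.prepare_zipfile_and_upload` instead of `Uploader.upload_zipfile`, and CAMM video streams go through `Uploader.upload_stream` keyed by an explicit session key. The `Uploader` construction, the `upload_api_v4` module path, and the cluster-id return type are assumptions not confirmed by these hunks.

```python
import typing as T
from pathlib import Path

from mapillary_tools import uploader, upload_api_v4  # module path assumed


def upload_zip(mly_uploader, zip_path: Path, idx: int, total: int):
    # Zipped image sequences no longer go through Uploader.upload_zipfile
    progress: uploader.SequenceProgress = {
        "total_sequence_count": total,
        "sequence_idx": idx,
        "file_type": "zip",
        "import_path": str(zip_path),
    }
    return uploader.ZipImageSequence.prepare_zipfile_and_upload(
        zip_path, mly_uploader, progress=progress
    )


def upload_camm(mly_uploader, camm_fp: T.BinaryIO, md5sum: str):
    # Streams are keyed by a session key derived from the md5sum and the cluster file type
    session_key = uploader._session_key(md5sum, upload_api_v4.ClusterFileType.CAMM)
    return mly_uploader.upload_stream(
        camm_fp,
        upload_api_v4.ClusterFileType.CAMM,
        session_key,
        progress={"md5sum": md5sum},  # SequenceProgress-style dict, as in the hunk above
    )
```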