Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 9 additions & 2 deletions mapillary_tools/exiftool_read_video.py
Original file line number Diff line number Diff line change
Expand Up @@ -310,7 +310,10 @@ def __init__(
etree: ET.ElementTree,
) -> None:
self.etree = etree
self._texts_by_tag = _index_text_by_tag(self.etree.getroot())
root = self.etree.getroot()
if root is None:
raise ValueError("ElementTree root is None")
self._texts_by_tag = _index_text_by_tag(root)
self._all_tags = set(self._texts_by_tag.keys())

def extract_gps_track(self) -> list[geo.Point]:
Expand Down Expand Up @@ -371,6 +374,10 @@ def extract_model(self) -> str | None:
return model

def _extract_gps_track_from_track(self) -> list[GPSPoint]:
root = self.etree.getroot()
if root is None:
raise ValueError("ElementTree root is None")

for track_id in range(1, MAX_TRACK_ID + 1):
track_ns = f"Track{track_id}"
if self._all_tags_exists(
Expand All @@ -382,7 +389,7 @@ def _extract_gps_track_from_track(self) -> list[GPSPoint]:
}
):
sample_iterator = _aggregate_samples(
self.etree.getroot(),
root,
f"{track_ns}:SampleTime",
f"{track_ns}:SampleDuration",
)
Expand Down
15 changes: 8 additions & 7 deletions mapillary_tools/upload.py
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,7 @@ def _setup_history(
) -> None:
@emitter.on("upload_start")
def check_duplication(payload: uploader.Progress):
md5sum = payload.get("md5sum")
md5sum = payload.get("sequence_md5sum")
assert md5sum is not None, f"md5sum has to be set for {payload}"

if history.is_uploaded(md5sum):
Expand All @@ -164,7 +164,7 @@ def check_duplication(payload: uploader.Progress):
@emitter.on("upload_finished")
def write_history(payload: uploader.Progress):
sequence_uuid = payload.get("sequence_uuid")
md5sum = payload.get("md5sum")
md5sum = payload.get("sequence_md5sum")
assert md5sum is not None, f"md5sum has to be set for {payload}"

if sequence_uuid is None:
Expand Down Expand Up @@ -466,9 +466,9 @@ def _gen_upload_everything(
(m for m in metadatas if isinstance(m, types.ImageMetadata)),
utils.find_images(import_paths, skip_subfolders=skip_subfolders),
)
for image_result in uploader.ZipImageSequence.prepare_images_and_upload(
image_metadatas,
for image_result in uploader.ZipImageSequence.zip_images_and_upload(
mly_uploader,
image_metadatas,
):
yield image_result

Expand Down Expand Up @@ -509,7 +509,8 @@ def _gen_upload_videos(
"sequence_idx": idx,
"file_type": video_metadata.filetype.value,
"import_path": str(video_metadata.filename),
"md5sum": video_metadata.md5sum,
"sequence_md5sum": video_metadata.md5sum,
"upload_md5sum": video_metadata.md5sum,
}

session_key = uploader._session_key(
Expand Down Expand Up @@ -720,8 +721,8 @@ def _gen_upload_zipfiles(
"import_path": str(zip_path),
}
try:
cluster_id = uploader.ZipImageSequence.prepare_zipfile_and_upload(
zip_path, mly_uploader, progress=T.cast(T.Dict[str, T.Any], progress)
cluster_id = uploader.ZipImageSequence.upload_zipfile(
mly_uploader, zip_path, progress=T.cast(T.Dict[str, T.Any], progress)
)
except Exception as ex:
yield zip_path, uploader.UploadResult(error=ex)
Expand Down
130 changes: 71 additions & 59 deletions mapillary_tools/uploader.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@

import requests

from . import api_v4, constants, exif_write, types, upload_api_v4
from . import api_v4, constants, exif_write, types, upload_api_v4, utils


LOG = logging.getLogger(__name__)
Expand Down Expand Up @@ -56,8 +56,12 @@ class UploaderProgress(T.TypedDict, total=True):
class SequenceProgress(T.TypedDict, total=False):
"""Progress data at sequence level"""

# md5sum of the zipfile/BlackVue/CAMM in uploading
md5sum: str
# Used to check if it is uploaded or not
sequence_md5sum: str

# Used to resume from the previous upload,
# so it has to be a unique identifier (hash) of the upload content
upload_md5sum: str

# File type
file_type: str
Expand Down Expand Up @@ -148,30 +152,50 @@ def zip_images(

for sequence_uuid, sequence in sequences.items():
_validate_metadatas(sequence)
upload_md5sum = types.update_sequence_md5sum(sequence)

# For atomicity we write into a WIP file and then rename to the final file
wip_zip_filename = zip_dir.joinpath(
f".mly_zip_{uuid.uuid4()}_{sequence_uuid}_{os.getpid()}_{int(time.time())}"
)
filename = _session_key(upload_md5sum, upload_api_v4.ClusterFileType.ZIP)
zip_filename = zip_dir.joinpath(filename)
with wip_file_context(wip_zip_filename, zip_filename) as wip_path:
with cls._wip_file_context(wip_zip_filename) as wip_path:
with wip_path.open("wb") as wip_fp:
actual_md5sum = cls.zip_sequence_deterministically(sequence, wip_fp)
assert actual_md5sum == upload_md5sum, "md5sum mismatch"
cls.zip_sequence_fp(sequence, wip_fp)

@classmethod
def zip_sequence_deterministically(
@contextmanager
def _wip_file_context(cls, wip_path: Path):
try:
os.remove(wip_path)
except FileNotFoundError:
pass
try:
yield wip_path

with wip_path.open("rb") as fp:
upload_md5sum = utils.md5sum_fp(fp).hexdigest()

done_path = wip_path.parent.joinpath(
_session_key(upload_md5sum, upload_api_v4.ClusterFileType.ZIP)
)

try:
os.remove(done_path)
except FileNotFoundError:
pass
wip_path.rename(done_path)
finally:
try:
os.remove(wip_path)
except FileNotFoundError:
pass

@classmethod
def zip_sequence_fp(
cls,
sequence: T.Sequence[types.ImageMetadata],
zip_fp: T.IO[bytes],
) -> str:
"""
Write a sequence of ImageMetadata into the zipfile handle. It should guarantee
that the same sequence always produces the same zipfile, because the
sequence md5sum will be used to upload the zipfile or resume the upload.

Write a sequence of ImageMetadata into the zipfile handle.
The sequence has to be one sequence and sorted.
"""

Expand All @@ -180,21 +204,21 @@ def zip_sequence_deterministically(
f"Only one sequence is allowed but got {len(sequence_groups)}: {list(sequence_groups.keys())}"
)

upload_md5sum = types.update_sequence_md5sum(sequence)
sequence_md5sum = types.update_sequence_md5sum(sequence)

with zipfile.ZipFile(zip_fp, "w", zipfile.ZIP_DEFLATED) as zipf:
for idx, metadata in enumerate(sequence):
# Use {idx}.jpg (suffix does not matter) as the archive name to ensure the
# resulting zipfile is deterministic. This determinism is based on the upload_md5sum,
# which is derived from a list of image md5sums
# Arcname does not matter, but it should be unique
cls._write_imagebytes_in_zip(zipf, metadata, arcname=f"{idx}.jpg")
assert len(sequence) == len(set(zipf.namelist()))
zipf.comment = json.dumps({"upload_md5sum": upload_md5sum}).encode("utf-8")
zipf.comment = json.dumps({"sequence_md5sum": sequence_md5sum}).encode(
"utf-8"
)

return upload_md5sum
return sequence_md5sum

@classmethod
def extract_upload_md5sum(cls, zip_fp: T.IO[bytes]) -> str:
def extract_sequence_md5sum(cls, zip_fp: T.IO[bytes]) -> str:
with zipfile.ZipFile(zip_fp, "r", zipfile.ZIP_DEFLATED) as ziph:
comment = ziph.comment

Expand All @@ -209,12 +233,12 @@ def extract_upload_md5sum(cls, zip_fp: T.IO[bytes]) -> str:
except json.JSONDecodeError as ex:
raise InvalidMapillaryZipFileError(str(ex)) from ex

upload_md5sum = zip_metadata.get("upload_md5sum")
sequence_md5sum = zip_metadata.get("sequence_md5sum")

if not upload_md5sum and not isinstance(upload_md5sum, str):
raise InvalidMapillaryZipFileError("No upload_md5sum found")
if not sequence_md5sum and not isinstance(sequence_md5sum, str):
raise InvalidMapillaryZipFileError("No sequence_md5sum found")

return upload_md5sum
return sequence_md5sum

@classmethod
def _write_imagebytes_in_zip(
Expand All @@ -241,10 +265,10 @@ def _write_imagebytes_in_zip(
zipf.writestr(zipinfo, image_bytes)

@classmethod
def prepare_zipfile_and_upload(
def upload_zipfile(
cls,
zip_path: Path,
uploader: Uploader,
zip_path: Path,
progress: dict[str, T.Any] | None = None,
) -> str:
if progress is None:
Expand All @@ -256,30 +280,36 @@ def prepare_zipfile_and_upload(
raise InvalidMapillaryZipFileError("Zipfile has no files")

with zip_path.open("rb") as zip_fp:
upload_md5sum = cls.extract_upload_md5sum(zip_fp)
sequence_md5sum = cls.extract_sequence_md5sum(zip_fp)

with zip_path.open("rb") as zip_fp:
upload_md5sum = utils.md5sum_fp(zip_fp).hexdigest()

sequence_progress: SequenceProgress = {
"sequence_image_count": len(namelist),
"file_type": types.FileType.ZIP.value,
"md5sum": upload_md5sum,
"sequence_md5sum": sequence_md5sum,
"upload_md5sum": upload_md5sum,
}

session_key = _session_key(upload_md5sum, upload_api_v4.ClusterFileType.ZIP)
upload_session_key = _session_key(
upload_md5sum, upload_api_v4.ClusterFileType.ZIP
)

with zip_path.open("rb") as zip_fp:
return uploader.upload_stream(
zip_fp,
upload_api_v4.ClusterFileType.ZIP,
session_key,
upload_session_key,
# Send the copy of the input progress to each upload session, to avoid modifying the original one
progress=T.cast(T.Dict[str, T.Any], {**progress, **sequence_progress}),
)

@classmethod
def prepare_images_and_upload(
def zip_images_and_upload(
cls,
image_metadatas: T.Sequence[types.ImageMetadata],
uploader: Uploader,
image_metadatas: T.Sequence[types.ImageMetadata],
progress: dict[str, T.Any] | None = None,
) -> T.Generator[tuple[str, UploadResult], None, None]:
if progress is None:
Expand All @@ -304,22 +334,25 @@ def prepare_images_and_upload(

with tempfile.NamedTemporaryFile() as fp:
try:
upload_md5sum = cls.zip_sequence_deterministically(sequence, fp)
sequence_md5sum = cls.zip_sequence_fp(sequence, fp)
except Exception as ex:
yield sequence_uuid, UploadResult(error=ex)
continue

sequence_progress["md5sum"] = upload_md5sum
sequence_progress["sequence_md5sum"] = sequence_md5sum

session_key = _session_key(
fp.seek(0, io.SEEK_SET)
upload_md5sum = utils.md5sum_fp(fp).hexdigest()

upload_session_key = _session_key(
upload_md5sum, upload_api_v4.ClusterFileType.ZIP
)

try:
cluster_id = uploader.upload_stream(
fp,
upload_api_v4.ClusterFileType.ZIP,
session_key,
upload_session_key,
progress=T.cast(
T.Dict[str, T.Any], {**progress, **sequence_progress}
),
Expand Down Expand Up @@ -514,27 +547,6 @@ def _validate_metadatas(metadatas: T.Sequence[types.ImageMetadata]):
raise FileNotFoundError(f"No such file {metadata.filename}")


@contextmanager
def wip_file_context(wip_path: Path, done_path: Path):
assert wip_path != done_path, "should not be the same file"
try:
os.remove(wip_path)
except FileNotFoundError:
pass
try:
yield wip_path
try:
os.remove(done_path)
except FileNotFoundError:
pass
wip_path.rename(done_path)
finally:
try:
os.remove(wip_path)
except FileNotFoundError:
pass


def _is_immediate_retry(ex: Exception):
if (
isinstance(ex, requests.HTTPError)
Expand Down
14 changes: 10 additions & 4 deletions tests/integration/fixtures.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
import py.path
import pytest

from mapillary_tools import utils

EXECUTABLE = os.getenv(
"MAPILLARY_TOOLS__TESTS_EXECUTABLE", "python3 -m mapillary_tools.commands"
Expand Down Expand Up @@ -174,20 +175,25 @@ def validate_and_extract_image(image_path: str):
return desc


def validate_and_extract_zip(zip_path: str) -> T.List[T.Dict]:
def validate_and_extract_zip(zip_path: Path) -> T.List[T.Dict]:
descs = []

with zipfile.ZipFile(zip_path) as zipf:
upload_md5sum = json.loads(zipf.comment)["upload_md5sum"]
_sequence_md5sum = json.loads(zipf.comment)["sequence_md5sum"]
with tempfile.TemporaryDirectory() as tempdir:
zipf.extractall(path=tempdir)
for name in os.listdir(tempdir):
filename = os.path.join(tempdir, name)
desc = validate_and_extract_image(filename)
descs.append(desc)

basename = os.path.basename(zip_path)
assert f"mly_tools_{upload_md5sum}.zip" == basename, (basename, upload_md5sum)
with zip_path.open("rb") as fp:
upload_md5sum = utils.md5sum_fp(fp).hexdigest()

assert f"mly_tools_{upload_md5sum}.zip" == zip_path.name, (
zip_path.name,
upload_md5sum,
)

return descs

Expand Down
2 changes: 1 addition & 1 deletion tests/integration/test_process.py
Original file line number Diff line number Diff line change
Expand Up @@ -303,7 +303,7 @@ def test_zip(tmpdir: py.path.local, setup_data: py.path.local):
assert x.returncode == 0, x.stderr
assert 0 < len(zip_dir.listdir())
for file in zip_dir.listdir():
validate_and_extract_zip(str(file))
validate_and_extract_zip(Path(file))


@pytest.mark.usefixtures("setup_config")
Expand Down
3 changes: 2 additions & 1 deletion tests/integration/test_process_and_upload.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import datetime
import os
import subprocess
from pathlib import Path

import py.path
import pytest
Expand Down Expand Up @@ -135,7 +136,7 @@ def _validate_uploads(upload_dir: py.path.local, expected):
if str(file).endswith(".mp4"):
descs.extend(validate_and_extract_camm(str(file)))
elif str(file).endswith(".zip"):
descs.extend(validate_and_extract_zip(str(file)))
descs.extend(validate_and_extract_zip(Path(file)))
else:
raise Exception(f"invalid file {file}")

Expand Down
Loading
Loading