3 changes: 2 additions & 1 deletion mapillary_tools/types.py
@@ -729,9 +729,10 @@ def group_and_sort_images(
     return sorted_sequences_by_uuid


-def sequence_md5sum(sequence: T.Iterable[ImageMetadata]) -> str:
+def update_sequence_md5sum(sequence: T.Iterable[ImageMetadata]) -> str:
     md5 = hashlib.md5()
     for metadata in sequence:
+        metadata.update_md5sum()
         assert isinstance(metadata.md5sum, str), "md5sum should be calculated"
         md5.update(metadata.md5sum.encode("utf-8"))
     return md5.hexdigest()
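The rename captures the behavioral change: the helper now computes each image's checksum on demand before folding it into the sequence digest, so callers no longer need an explicit update loop. A minimal sketch of the resulting contract (the `Img` class below is a hypothetical stand-in for `ImageMetadata`, which is defined elsewhere in types.py):

```python
import hashlib

class Img:
    # Hypothetical stand-in for ImageMetadata, just enough for the sketch.
    def __init__(self, payload: bytes):
        self._payload = payload
        self.md5sum = None

    def update_md5sum(self) -> None:
        # Compute and cache the per-image checksum on first use.
        if self.md5sum is None:
            self.md5sum = hashlib.md5(self._payload).hexdigest()

def update_sequence_md5sum(sequence) -> str:
    # Digest of the concatenated per-image digests: deterministic for the
    # same images in the same order, different if content or order changes.
    md5 = hashlib.md5()
    for metadata in sequence:
        metadata.update_md5sum()
        md5.update(metadata.md5sum.encode("utf-8"))
    return md5.hexdigest()

a, b = Img(b"frame-1"), Img(b"frame-2")
assert update_sequence_md5sum([a, b]) == update_sequence_md5sum([a, b])
assert update_sequence_md5sum([a, b]) != update_sequence_md5sum([b, a])
```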
2 changes: 1 addition & 1 deletion mapillary_tools/upload.py
@@ -136,7 +136,7 @@ def zip_images(
         metadata for metadata in metadatas if isinstance(metadata, types.ImageMetadata)
     ]

-    uploader.zip_images(image_metadatas, zip_dir)
+    uploader.ZipImageSequence.zip_images(image_metadatas, zip_dir)


 def fetch_user_items(
213 changes: 124 additions & 89 deletions mapillary_tools/uploader.py
@@ -1,3 +1,5 @@
+from __future__ import annotations
+
 import io
 import json
 import logging
@@ -72,7 +74,7 @@ class UploadCancelled(Exception):


 class EventEmitter:
-    events: T.Dict[EventName, T.List]
+    events: dict[EventName, T.List]

     def __init__(self):
         self.events = {}
@@ -88,11 +90,111 @@ def emit(self, event: EventName, *args, **kwargs):
             callback(*args, **kwargs)


+class ZipImageSequence:
+    @classmethod
+    def zip_images(
+        cls, metadatas: T.Sequence[types.ImageMetadata], zip_dir: Path
+    ) -> None:
+        """
+        Group images into sequences and zip each sequence into a zipfile.
+        """
+        _validate_metadatas(metadatas)
+        sequences = types.group_and_sort_images(metadatas)
+        os.makedirs(zip_dir, exist_ok=True)
+
+        for sequence_uuid, sequence in sequences.items():
+            # For atomicity we write into a WIP file and then rename to the final file
+            wip_zip_filename = zip_dir.joinpath(
+                f".mly_zip_{uuid.uuid4()}_{sequence_uuid}_{os.getpid()}_{int(time.time())}"
+            )
+            upload_md5sum = types.update_sequence_md5sum(sequence)
+            zip_filename = zip_dir.joinpath(f"mly_tools_{upload_md5sum}.zip")
+            with wip_file_context(wip_zip_filename, zip_filename) as wip_path:
+                with wip_path.open("wb") as wip_fp:
+                    actual_md5sum = cls.zip_sequence_fp(sequence, wip_fp)
+                    assert actual_md5sum == upload_md5sum
+
+    @classmethod
+    def zip_sequence_fp(
+        cls,
+        sequence: T.Sequence[types.ImageMetadata],
+        zip_fp: T.IO[bytes],
+    ) -> str:
+        """
+        Write a sequence of ImageMetadata into the zipfile handle.
+        The sequence must be a single, sorted sequence.
+        """
+        sequence_groups = types.group_and_sort_images(sequence)
+        assert len(sequence_groups) == 1, (
+            f"Only one sequence is allowed but got {len(sequence_groups)}: {list(sequence_groups.keys())}"
+        )
+
+        upload_md5sum = types.update_sequence_md5sum(sequence)
+
+        with zipfile.ZipFile(zip_fp, "w", zipfile.ZIP_DEFLATED) as zipf:
+            arcnames: set[str] = set()
+            for metadata in sequence:
+                cls._write_imagebytes_in_zip(zipf, metadata, arcnames=arcnames)
+            assert len(sequence) == len(set(zipf.namelist()))
+            zipf.comment = json.dumps({"upload_md5sum": upload_md5sum}).encode("utf-8")
+
+        return upload_md5sum
+
+    @classmethod
+    def extract_upload_md5sum(cls, zip_fp: T.IO[bytes]) -> str | None:
+        with zipfile.ZipFile(zip_fp, "r", zipfile.ZIP_DEFLATED) as ziph:
+            comment = ziph.comment
+        if not comment:
+            return None
+        try:
+            upload_md5sum = json.loads(comment.decode("utf-8")).get("upload_md5sum")
+        except Exception:
+            return None
+        if not upload_md5sum:
+            return None
+        return str(upload_md5sum)
+
+    @classmethod
+    def _uniq_arcname(cls, filename: Path, arcnames: set[str]):
+        arcname: str = filename.name
+
+        # make sure the arcname is unique, otherwise zipfile's extractall would collapse duplicated names
+        arcname_idx = 0
+        while arcname in arcnames:
+            arcname_idx += 1
+            arcname = f"{filename.stem}_{arcname_idx}{filename.suffix}"
+
+        return arcname
+
+    @classmethod
+    def _write_imagebytes_in_zip(
+        cls,
+        zipf: zipfile.ZipFile,
+        metadata: types.ImageMetadata,
+        arcnames: set[str] | None = None,
+    ):
+        if arcnames is None:
+            arcnames = set()
+
+        edit = exif_write.ExifEdit(metadata.filename)
+        # The cast is to fix the type checker error
+        edit.add_image_description(
+            T.cast(T.Dict, types.desc_file_to_exif(types.as_desc(metadata)))
+        )
+        image_bytes = edit.dump_image_bytes()
+
+        arcname = cls._uniq_arcname(metadata.filename, arcnames)
+        arcnames.add(arcname)
+
+        zipinfo = zipfile.ZipInfo(arcname, date_time=(1980, 1, 1, 0, 0, 0))
+        zipf.writestr(zipinfo, image_bytes)
+
+
 class Uploader:
     def __init__(
         self,
         user_items: types.UserItem,
-        emitter: T.Optional[EventEmitter] = None,
+        emitter: EventEmitter | None = None,
         chunk_size: int = upload_api_v4.DEFAULT_CHUNK_SIZE,
         dry_run=False,
     ):
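The new class pins down two invariants that were previously scattered across module-level helpers: archive member names are de-duplicated, and the sequence checksum travels inside the zip comment. A standalone sketch of both, using only the stdlib (no `ImageMetadata` needed, so the EXIF-writing step is elided):

```python
import io
import json
import zipfile
from pathlib import Path

# 1. Same de-duplication rule as _uniq_arcname: append _1, _2, ... to the stem.
def uniq_arcname(filename: Path, arcnames: set) -> str:
    arcname = filename.name
    idx = 0
    while arcname in arcnames:
        idx += 1
        arcname = f"{filename.stem}_{idx}{filename.suffix}"
    return arcname

names = set()
for p in [Path("a/IMG_001.jpg"), Path("b/IMG_001.jpg")]:
    names.add(uniq_arcname(p, names))
assert names == {"IMG_001.jpg", "IMG_001_1.jpg"}

# 2. The checksum rides in the archive comment, as in zip_sequence_fp,
#    and is read back the way extract_upload_md5sum does.
buf = io.BytesIO()
with zipfile.ZipFile(buf, "w", zipfile.ZIP_DEFLATED) as zipf:
    # A fixed timestamp (as in the diff) keeps the zip bytes deterministic.
    zipf.writestr(zipfile.ZipInfo("IMG_001.jpg", date_time=(1980, 1, 1, 0, 0, 0)), b"...")
    zipf.comment = json.dumps({"upload_md5sum": "deadbeef"}).encode("utf-8")

with zipfile.ZipFile(buf, "r") as ziph:
    assert json.loads(ziph.comment.decode("utf-8"))["upload_md5sum"] == "deadbeef"
```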
@@ -105,8 +207,8 @@ def __init__(
     def upload_zipfile(
         self,
         zip_path: Path,
-        event_payload: T.Optional[Progress] = None,
-    ) -> T.Optional[str]:
+        event_payload: Progress | None = None,
+    ) -> str | None:
         if event_payload is None:
             event_payload = {}

@@ -121,16 +223,16 @@ def upload_zipfile(
             "sequence_image_count": len(namelist),
         }

-        with zip_path.open("rb") as fp:
-            upload_md5sum = _extract_upload_md5sum(fp)
+        with zip_path.open("rb") as zip_fp:
+            upload_md5sum = ZipImageSequence.extract_upload_md5sum(zip_fp)

         if upload_md5sum is None:
-            with zip_path.open("rb") as fp:
-                upload_md5sum = utils.md5sum_fp(fp).hexdigest()
+            with zip_path.open("rb") as zip_fp:
+                upload_md5sum = utils.md5sum_fp(zip_fp).hexdigest()

-        with zip_path.open("rb") as fp:
+        with zip_path.open("rb") as zip_fp:
             return self.upload_stream(
-                fp,
+                zip_fp,
                 upload_api_v4.ClusterFileType.ZIP,
                 upload_md5sum,
                 event_payload=final_event_payload,
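upload_zipfile now resolves the upload checksum in two steps: prefer the value embedded in the zip comment, and fall back to hashing the whole file only when the comment is absent (for example, zips written by older releases). A condensed sketch of that selection order; the real code reopens the file handle for each step and hashes via utils.md5sum_fp:

```python
import hashlib
import io
import json
import zipfile

def resolve_upload_md5sum(zip_bytes: bytes) -> str:
    # Step 1: try the checksum recorded in the archive comment.
    with zipfile.ZipFile(io.BytesIO(zip_bytes)) as ziph:
        try:
            embedded = json.loads(ziph.comment.decode("utf-8")).get("upload_md5sum")
        except Exception:
            embedded = None
    # Step 2: fall back to hashing the file content itself.
    return embedded or hashlib.md5(zip_bytes).hexdigest()
```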
@@ -139,14 +241,14 @@ def upload_images(
     def upload_images(
         self,
         image_metadatas: T.Sequence[types.ImageMetadata],
-        event_payload: T.Optional[Progress] = None,
-    ) -> T.Dict[str, str]:
+        event_payload: Progress | None = None,
+    ) -> dict[str, str]:
         if event_payload is None:
             event_payload = {}

         _validate_metadatas(image_metadatas)
         sequences = types.group_and_sort_images(image_metadatas)
-        ret: T.Dict[str, str] = {}
+        ret: dict[str, str] = {}
         for sequence_idx, (sequence_uuid, sequence) in enumerate(sequences.items()):
             final_event_payload: Progress = {
                 **event_payload,  # type: ignore
@@ -155,11 +257,8 @@ def upload_images(
                 "sequence_image_count": len(sequence),
                 "sequence_uuid": sequence_uuid,
             }
-            for metadata in sequence:
-                metadata.update_md5sum()
-            upload_md5sum = types.sequence_md5sum(sequence)
             with tempfile.NamedTemporaryFile() as fp:
-                _zip_sequence_fp(sequence, fp, upload_md5sum)
+                upload_md5sum = ZipImageSequence.zip_sequence_fp(sequence, fp)
                 cluster_id = self.upload_stream(
                     fp,
                     upload_api_v4.ClusterFileType.ZIP,
@@ -175,15 +274,15 @@ def upload_stream(
         fp: T.IO[bytes],
         cluster_filetype: upload_api_v4.ClusterFileType,
         upload_md5sum: str,
-        event_payload: T.Optional[Progress] = None,
-    ) -> T.Optional[str]:
+        event_payload: Progress | None = None,
+    ) -> str | None:
         if event_payload is None:
             event_payload = {}

         fp.seek(0, io.SEEK_END)
         entity_size = fp.tell()

-        SUFFIX_MAP: T.Dict[upload_api_v4.ClusterFileType, str] = {
+        SUFFIX_MAP: dict[upload_api_v4.ClusterFileType, str] = {
             upload_api_v4.ClusterFileType.ZIP: ".zip",
             upload_api_v4.ClusterFileType.CAMM: ".mp4",
             upload_api_v4.ClusterFileType.BLACKVUE: ".mp4",
@@ -216,7 +315,7 @@ def upload_stream(
         }

         try:
-            return _upload_stream(
+            return _upload_stream_with_retries(
                 upload_service,
                 fp,
                 event_payload=final_event_payload,
@@ -254,70 +353,6 @@ def wip_file_context(wip_path: Path, done_path: Path):
         pass


-def zip_images(
-    metadatas: T.List[types.ImageMetadata],
-    zip_dir: Path,
-) -> None:
-    _validate_metadatas(metadatas)
-    sequences = types.group_and_sort_images(metadatas)
-    os.makedirs(zip_dir, exist_ok=True)
-    for sequence_uuid, sequence in sequences.items():
-        for metadata in sequence:
-            metadata.update_md5sum()
-        upload_md5sum = types.sequence_md5sum(sequence)
-        timestamp = int(time.time())
-        wip_zip_filename = zip_dir.joinpath(
-            f".mly_zip_{uuid.uuid4()}_{sequence_uuid}_{os.getpid()}_{timestamp}"
-        )
-        zip_filename = zip_dir.joinpath(f"mly_tools_{upload_md5sum}.zip")
-        with wip_file_context(wip_zip_filename, zip_filename) as wip_dir:
-            with wip_dir.open("wb") as fp:
-                _zip_sequence_fp(sequence, fp, upload_md5sum)
-
-
-def _zip_sequence_fp(
-    sequence: T.Sequence[types.ImageMetadata],
-    fp: T.IO[bytes],
-    upload_md5sum: str,
-) -> None:
-    arcname_idx = 0
-    arcnames = set()
-    with zipfile.ZipFile(fp, "w", zipfile.ZIP_DEFLATED) as ziph:
-        for metadata in sequence:
-            edit = exif_write.ExifEdit(metadata.filename)
-            # The cast is to fix the type checker error
-            edit.add_image_description(
-                T.cast(T.Dict, types.desc_file_to_exif(types.as_desc(metadata)))
-            )
-            image_bytes = edit.dump_image_bytes()
-            arcname: str = metadata.filename.name
-            # make sure the arcname is unique, otherwise zipfile.extractAll will eliminate duplicated ones
-            while arcname in arcnames:
-                arcname_idx += 1
-                arcname = (
-                    f"{metadata.filename.stem}_{arcname_idx}{metadata.filename.suffix}"
-                )
-            arcnames.add(arcname)
-            zipinfo = zipfile.ZipInfo(arcname, date_time=(1980, 1, 1, 0, 0, 0))
-            ziph.writestr(zipinfo, image_bytes)
-        ziph.comment = json.dumps({"upload_md5sum": upload_md5sum}).encode("utf-8")
-        assert len(sequence) == len(set(ziph.namelist()))
-
-
-def _extract_upload_md5sum(fp: T.IO[bytes]) -> T.Optional[str]:
-    with zipfile.ZipFile(fp, "r", zipfile.ZIP_DEFLATED) as ziph:
-        comment = ziph.comment
-    if not comment:
-        return None
-    try:
-        upload_md5sum = json.loads(comment.decode("utf-8")).get("upload_md5sum")
-    except Exception:
-        return None
-    if not upload_md5sum:
-        return None
-    return str(upload_md5sum)


 def _is_immediate_retry(ex: Exception):
     if (
         isinstance(ex, requests.HTTPError)
@@ -361,11 +396,11 @@ def _callback(chunk: bytes, _):
     return _callback


-def _upload_stream(
+def _upload_stream_with_retries(
     upload_service: upload_api_v4.UploadService,
     fp: T.IO[bytes],
-    event_payload: T.Optional[Progress] = None,
-    emitter: T.Optional[EventEmitter] = None,
+    event_payload: Progress | None = None,
+    emitter: EventEmitter | None = None,
 ) -> str:
     retries = 0

@@ -384,7 +419,7 @@ def _reset_retries(_, __):

     while True:
         fp.seek(0, io.SEEK_SET)
-        begin_offset: T.Optional[int] = None
+        begin_offset: int | None = None
         try:
             begin_offset = upload_service.fetch_offset()
             upload_service.callbacks = [_reset_retries]
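The new name _upload_stream_with_retries describes what the loop actually does: on every attempt it re-fetches the server-side offset so the upload resumes instead of restarting, and the _reset_retries callback clears the retry counter whenever a chunk actually lands. A hedged sketch of that shape (the service object, its methods, and the retry limit here are assumptions for illustration, not the real upload_api_v4 surface):

```python
import io
import time

MAX_RETRIES = 10  # assumed limit; the actual constant lives in uploader.py

def upload_with_retries(service, fp) -> str:
    retries = 0
    while True:
        try:
            # Resume from wherever the server already is.
            begin_offset = service.fetch_offset()
            fp.seek(begin_offset, io.SEEK_SET)
            # In the real code a callback resets `retries` once bytes land,
            # so only consecutive failures count against the limit.
            return service.upload(fp)
        except Exception:
            retries += 1
            if retries > MAX_RETRIES:
                raise
            time.sleep(min(2**retries, 60))  # backoff choice is an assumption
```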
18 changes: 17 additions & 1 deletion tests/unit/test_upload_api_v4.py
@@ -24,11 +24,27 @@ def test_upload(setup_upload: py.path.local):
     assert (setup_upload.join("FOOBAR.txt").read_binary()) == content


+def test_upload_big_chunksize(setup_upload: py.path.local):
+    upload_service = upload_api_v4.FakeUploadService(
+        user_access_token="TEST",
+        session_key="FOOBAR.txt",
+        chunk_size=1000,
+    )
+    upload_service._error_ratio = 0
+    content = b"double_foobar"
+    cluster_id = upload_service.upload(io.BytesIO(content))
+    assert isinstance(cluster_id, str), cluster_id
+    assert (setup_upload.join("FOOBAR.txt").read_binary()) == content
+
+    # reupload should not affect the file
+    upload_service.upload(io.BytesIO(content))
+    assert (setup_upload.join("FOOBAR.txt").read_binary()) == content
+
+
 def test_upload_chunks(setup_upload: py.path.local):
     upload_service = upload_api_v4.FakeUploadService(
         user_access_token="TEST",
         session_key="FOOBAR2.txt",
         chunk_size=1,
     )
     upload_service._error_ratio = 0
8 changes: 5 additions & 3 deletions tests/unit/test_uploader.py
@@ -30,6 +30,9 @@ def _validate_zip_dir(zip_dir: py.path.local):
     descs = []
     for zip_path in zip_dir.listdir():
         with zipfile.ZipFile(zip_path) as ziph:
+            filename = ziph.testzip()
+            assert filename is None, f"Corrupted zip {zip_path}: {filename}"
+
             upload_md5sum = json.loads(ziph.comment).get("upload_md5sum")
             assert str(os.path.basename(zip_path)) == f"mly_tools_{upload_md5sum}.zip", (
                 zip_path
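The added testzip() call is a cheap integrity gate: ZipFile.testzip() reads every member, verifying headers and CRCs, and returns the name of the first corrupt member, or None if the archive is intact. The same check works as a standalone guard:

```python
import zipfile

def assert_zip_intact(path: str) -> None:
    with zipfile.ZipFile(path) as ziph:
        # testzip() returns the first bad member's name, or None if all
        # members pass their CRC and header checks.
        bad = ziph.testzip()
        assert bad is None, f"Corrupted zip {path}: {bad}"
```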
@@ -161,9 +164,8 @@ def test_upload_zip(
         },
     ]
     zip_dir = setup_unittest_data.mkdir("zip_dir")
-    uploader.zip_images(
-        [types.from_desc(T.cast(T.Any, desc)) for desc in descs], Path(zip_dir)
-    )
+    sequence = [types.from_desc(T.cast(T.Any, desc)) for desc in descs]
+    uploader.ZipImageSequence.zip_images(sequence, Path(zip_dir))
     assert len(zip_dir.listdir()) == 2, list(zip_dir.listdir())
     descs = _validate_zip_dir(zip_dir)
     assert 3 == len(descs)