Skip to content

Commit d2efc74

Browse files
authored
refactor: upload zips (#732)
* group functions in ZipImageSequence
* remove generate_zip_chunks
* update types
* fix
* move around
* comments
1 parent d4fb843 commit d2efc74

File tree

5 files changed

+149
-95
lines changed

5 files changed

+149
-95
lines changed

mapillary_tools/types.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -729,9 +729,10 @@ def group_and_sort_images(
729729
return sorted_sequences_by_uuid
730730

731731

732-
def sequence_md5sum(sequence: T.Iterable[ImageMetadata]) -> str:
732+
def update_sequence_md5sum(sequence: T.Iterable[ImageMetadata]) -> str:
    """Refresh each image's md5sum and return the combined digest of the sequence.

    Side effect: ``update_md5sum()`` is called on every metadata item, so the
    items are modified in place (hence the ``update_`` prefix).

    The result is the MD5 hex digest of the per-image ``md5sum`` strings
    (UTF-8 encoded) fed in iteration order, so it depends on the order of
    ``sequence``. An empty sequence yields the MD5 digest of empty input.

    Raises:
        AssertionError: if an item's ``md5sum`` is not a ``str`` after
            ``update_md5sum()`` has been called on it.
    """
    md5 = hashlib.md5()
    for metadata in sequence:
        # Recompute first so the digest never mixes in a missing per-image md5sum.
        metadata.update_md5sum()
        assert isinstance(metadata.md5sum, str), "md5sum should be calculated"
        md5.update(metadata.md5sum.encode("utf-8"))
    return md5.hexdigest()

mapillary_tools/upload.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -136,7 +136,7 @@ def zip_images(
136136
metadata for metadata in metadatas if isinstance(metadata, types.ImageMetadata)
137137
]
138138

139-
uploader.zip_images(image_metadatas, zip_dir)
139+
uploader.ZipImageSequence.zip_images(image_metadatas, zip_dir)
140140

141141

142142
def fetch_user_items(

mapillary_tools/uploader.py

Lines changed: 124 additions & 89 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,5 @@
1+
from __future__ import annotations
2+
13
import io
24
import json
35
import logging
@@ -72,7 +74,7 @@ class UploadCancelled(Exception):
7274

7375

7476
class EventEmitter:
75-
events: T.Dict[EventName, T.List]
77+
events: dict[EventName, T.List]
7678

7779
def __init__(self):
7880
self.events = {}
@@ -88,11 +90,111 @@ def emit(self, event: EventName, *args, **kwargs):
8890
callback(*args, **kwargs)
8991

9092

93+
class ZipImageSequence:
    """Namespace of helpers that pack image sequences into zip archives.

    Every member is a classmethod; nothing here keeps per-instance state.
    """

    @classmethod
    def zip_images(
        cls, metadatas: T.Sequence[types.ImageMetadata], zip_dir: Path
    ) -> None:
        """
        Group images into sequences and zip each sequence into a zipfile.

        ``zip_dir`` is created if missing. Each sequence is written to
        ``mly_tools_{upload_md5sum}.zip`` inside it, where ``upload_md5sum``
        is the value returned by ``types.update_sequence_md5sum``.
        """
        _validate_metadatas(metadatas)
        sequences = types.group_and_sort_images(metadatas)
        os.makedirs(zip_dir, exist_ok=True)

        for sequence_uuid, sequence in sequences.items():
            # For atomicity we write into a WIP file and then rename to the final file
            wip_zip_filename = zip_dir.joinpath(
                f".mly_zip_{uuid.uuid4()}_{sequence_uuid}_{os.getpid()}_{int(time.time())}"
            )
            # The final filename embeds the md5sum, so it must be known before writing.
            upload_md5sum = types.update_sequence_md5sum(sequence)
            zip_filename = zip_dir.joinpath(f"mly_tools_{upload_md5sum}.zip")
            with wip_file_context(wip_zip_filename, zip_filename) as wip_path:
                with wip_path.open("wb") as wip_fp:
                    actual_md5sum = cls.zip_sequence_fp(sequence, wip_fp)
                    # Sanity check: the md5sum stored in the archive must match its filename.
                    assert actual_md5sum == upload_md5sum

    @classmethod
    def zip_sequence_fp(
        cls,
        sequence: T.Sequence[types.ImageMetadata],
        zip_fp: T.IO[bytes],
    ) -> str:
        """
        Write a sequence of ImageMetadata into the zipfile handle.
        The sequence has to be one sequence and sorted.

        The sequence md5sum is stored in the archive comment as the JSON
        object ``{"upload_md5sum": ...}`` and is also returned.

        Raises:
            AssertionError: if ``sequence`` groups into more than one
                sequence, or if the number of distinct archive members
                differs from ``len(sequence)``.
        """
        sequence_groups = types.group_and_sort_images(sequence)
        assert len(sequence_groups) == 1, (
            f"Only one sequence is allowed but got {len(sequence_groups)}: {list(sequence_groups.keys())}"
        )

        upload_md5sum = types.update_sequence_md5sum(sequence)

        with zipfile.ZipFile(zip_fp, "w", zipfile.ZIP_DEFLATED) as zipf:
            # Shared across the loop so duplicate basenames get distinct member names.
            arcnames: set[str] = set()
            for metadata in sequence:
                cls._write_imagebytes_in_zip(zipf, metadata, arcnames=arcnames)
            assert len(sequence) == len(set(zipf.namelist()))
            zipf.comment = json.dumps({"upload_md5sum": upload_md5sum}).encode("utf-8")

        return upload_md5sum

    @classmethod
    def extract_upload_md5sum(cls, zip_fp: T.IO[bytes]) -> str | None:
        """
        Read the ``upload_md5sum`` stored in a zip archive's comment.

        Returns ``None`` when the comment is empty, is not valid UTF-8 JSON,
        is not a JSON object, or has no truthy ``upload_md5sum`` entry.
        Otherwise returns the entry converted with ``str()``.

        Errors raised by ``zipfile.ZipFile`` while opening ``zip_fp`` are
        not caught here.
        """
        # No compression argument: it only applies when writing an archive.
        with zipfile.ZipFile(zip_fp, "r") as ziph:
            comment = ziph.comment
        if not comment:
            return None

        try:
            payload = json.loads(comment.decode("utf-8"))
        except (ValueError, RecursionError):
            # UnicodeDecodeError and json.JSONDecodeError are both ValueError;
            # RecursionError covers pathologically nested JSON. The comment is
            # best-effort metadata, so treat all of these as "not present".
            return None

        if not isinstance(payload, dict):
            return None

        upload_md5sum = payload.get("upload_md5sum")
        if not upload_md5sum:
            return None
        return str(upload_md5sum)

    @classmethod
    def _uniq_arcname(cls, filename: Path, arcnames: set[str]) -> str:
        """
        Return an archive member name for ``filename`` that is not in ``arcnames``.

        The basename is used as-is when free; otherwise ``_1``, ``_2``, ...
        is appended to the stem until an unused name is found. ``arcnames``
        is not modified; the caller records the returned name.
        """
        arcname: str = filename.name

        # make sure the arcname is unique, otherwise zipfile.extractAll will eliminate duplicated ones
        arcname_idx = 0
        while arcname in arcnames:
            arcname_idx += 1
            arcname = f"{filename.stem}_{arcname_idx}{filename.suffix}"

        return arcname

    @classmethod
    def _write_imagebytes_in_zip(
        cls,
        zipf: zipfile.ZipFile,
        metadata: types.ImageMetadata,
        arcnames: set[str] | None = None,
    ) -> None:
        """
        Add one image to ``zipf`` with its description written into the EXIF.

        The chosen member name is added to ``arcnames``. Pass the same set
        for every image of an archive so the names stay unique; when
        omitted, a fresh empty set is used for this call only.
        """
        if arcnames is None:
            arcnames = set()

        edit = exif_write.ExifEdit(metadata.filename)
        # The cast is to fix the type checker error
        edit.add_image_description(
            T.cast(T.Dict, types.desc_file_to_exif(types.as_desc(metadata)))
        )
        image_bytes = edit.dump_image_bytes()

        arcname = cls._uniq_arcname(metadata.filename, arcnames)
        arcnames.add(arcname)

        # Fixed member timestamp; presumably so archive bytes do not depend
        # on when the zip was created (TODO confirm intent).
        zipinfo = zipfile.ZipInfo(arcname, date_time=(1980, 1, 1, 0, 0, 0))
        zipf.writestr(zipinfo, image_bytes)
191+
192+
91193
class Uploader:
92194
def __init__(
93195
self,
94196
user_items: types.UserItem,
95-
emitter: T.Optional[EventEmitter] = None,
197+
emitter: EventEmitter | None = None,
96198
chunk_size: int = upload_api_v4.DEFAULT_CHUNK_SIZE,
97199
dry_run=False,
98200
):
@@ -105,8 +207,8 @@ def __init__(
105207
def upload_zipfile(
106208
self,
107209
zip_path: Path,
108-
event_payload: T.Optional[Progress] = None,
109-
) -> T.Optional[str]:
210+
event_payload: Progress | None = None,
211+
) -> str | None:
110212
if event_payload is None:
111213
event_payload = {}
112214

@@ -121,16 +223,16 @@ def upload_zipfile(
121223
"sequence_image_count": len(namelist),
122224
}
123225

124-
with zip_path.open("rb") as fp:
125-
upload_md5sum = _extract_upload_md5sum(fp)
226+
with zip_path.open("rb") as zip_fp:
227+
upload_md5sum = ZipImageSequence.extract_upload_md5sum(zip_fp)
126228

127229
if upload_md5sum is None:
128-
with zip_path.open("rb") as fp:
129-
upload_md5sum = utils.md5sum_fp(fp).hexdigest()
230+
with zip_path.open("rb") as zip_fp:
231+
upload_md5sum = utils.md5sum_fp(zip_fp).hexdigest()
130232

131-
with zip_path.open("rb") as fp:
233+
with zip_path.open("rb") as zip_fp:
132234
return self.upload_stream(
133-
fp,
235+
zip_fp,
134236
upload_api_v4.ClusterFileType.ZIP,
135237
upload_md5sum,
136238
event_payload=final_event_payload,
@@ -139,14 +241,14 @@ def upload_zipfile(
139241
def upload_images(
140242
self,
141243
image_metadatas: T.Sequence[types.ImageMetadata],
142-
event_payload: T.Optional[Progress] = None,
143-
) -> T.Dict[str, str]:
244+
event_payload: Progress | None = None,
245+
) -> dict[str, str]:
144246
if event_payload is None:
145247
event_payload = {}
146248

147249
_validate_metadatas(image_metadatas)
148250
sequences = types.group_and_sort_images(image_metadatas)
149-
ret: T.Dict[str, str] = {}
251+
ret: dict[str, str] = {}
150252
for sequence_idx, (sequence_uuid, sequence) in enumerate(sequences.items()):
151253
final_event_payload: Progress = {
152254
**event_payload, # type: ignore
@@ -155,11 +257,8 @@ def upload_images(
155257
"sequence_image_count": len(sequence),
156258
"sequence_uuid": sequence_uuid,
157259
}
158-
for metadata in sequence:
159-
metadata.update_md5sum()
160-
upload_md5sum = types.sequence_md5sum(sequence)
161260
with tempfile.NamedTemporaryFile() as fp:
162-
_zip_sequence_fp(sequence, fp, upload_md5sum)
261+
upload_md5sum = ZipImageSequence.zip_sequence_fp(sequence, fp)
163262
cluster_id = self.upload_stream(
164263
fp,
165264
upload_api_v4.ClusterFileType.ZIP,
@@ -175,15 +274,15 @@ def upload_stream(
175274
fp: T.IO[bytes],
176275
cluster_filetype: upload_api_v4.ClusterFileType,
177276
upload_md5sum: str,
178-
event_payload: T.Optional[Progress] = None,
179-
) -> T.Optional[str]:
277+
event_payload: Progress | None = None,
278+
) -> str | None:
180279
if event_payload is None:
181280
event_payload = {}
182281

183282
fp.seek(0, io.SEEK_END)
184283
entity_size = fp.tell()
185284

186-
SUFFIX_MAP: T.Dict[upload_api_v4.ClusterFileType, str] = {
285+
SUFFIX_MAP: dict[upload_api_v4.ClusterFileType, str] = {
187286
upload_api_v4.ClusterFileType.ZIP: ".zip",
188287
upload_api_v4.ClusterFileType.CAMM: ".mp4",
189288
upload_api_v4.ClusterFileType.BLACKVUE: ".mp4",
@@ -216,7 +315,7 @@ def upload_stream(
216315
}
217316

218317
try:
219-
return _upload_stream(
318+
return _upload_stream_with_retries(
220319
upload_service,
221320
fp,
222321
event_payload=final_event_payload,
@@ -254,70 +353,6 @@ def wip_file_context(wip_path: Path, done_path: Path):
254353
pass
255354

256355

257-
def zip_images(
258-
metadatas: T.List[types.ImageMetadata],
259-
zip_dir: Path,
260-
) -> None:
261-
_validate_metadatas(metadatas)
262-
sequences = types.group_and_sort_images(metadatas)
263-
os.makedirs(zip_dir, exist_ok=True)
264-
for sequence_uuid, sequence in sequences.items():
265-
for metadata in sequence:
266-
metadata.update_md5sum()
267-
upload_md5sum = types.sequence_md5sum(sequence)
268-
timestamp = int(time.time())
269-
wip_zip_filename = zip_dir.joinpath(
270-
f".mly_zip_{uuid.uuid4()}_{sequence_uuid}_{os.getpid()}_{timestamp}"
271-
)
272-
zip_filename = zip_dir.joinpath(f"mly_tools_{upload_md5sum}.zip")
273-
with wip_file_context(wip_zip_filename, zip_filename) as wip_dir:
274-
with wip_dir.open("wb") as fp:
275-
_zip_sequence_fp(sequence, fp, upload_md5sum)
276-
277-
278-
def _zip_sequence_fp(
279-
sequence: T.Sequence[types.ImageMetadata],
280-
fp: T.IO[bytes],
281-
upload_md5sum: str,
282-
) -> None:
283-
arcname_idx = 0
284-
arcnames = set()
285-
with zipfile.ZipFile(fp, "w", zipfile.ZIP_DEFLATED) as ziph:
286-
for metadata in sequence:
287-
edit = exif_write.ExifEdit(metadata.filename)
288-
# The cast is to fix the type checker error
289-
edit.add_image_description(
290-
T.cast(T.Dict, types.desc_file_to_exif(types.as_desc(metadata)))
291-
)
292-
image_bytes = edit.dump_image_bytes()
293-
arcname: str = metadata.filename.name
294-
# make sure the arcname is unique, otherwise zipfile.extractAll will eliminate duplicated ones
295-
while arcname in arcnames:
296-
arcname_idx += 1
297-
arcname = (
298-
f"{metadata.filename.stem}_{arcname_idx}{metadata.filename.suffix}"
299-
)
300-
arcnames.add(arcname)
301-
zipinfo = zipfile.ZipInfo(arcname, date_time=(1980, 1, 1, 0, 0, 0))
302-
ziph.writestr(zipinfo, image_bytes)
303-
ziph.comment = json.dumps({"upload_md5sum": upload_md5sum}).encode("utf-8")
304-
assert len(sequence) == len(set(ziph.namelist()))
305-
306-
307-
def _extract_upload_md5sum(fp: T.IO[bytes]) -> T.Optional[str]:
308-
with zipfile.ZipFile(fp, "r", zipfile.ZIP_DEFLATED) as ziph:
309-
comment = ziph.comment
310-
if not comment:
311-
return None
312-
try:
313-
upload_md5sum = json.loads(comment.decode("utf-8")).get("upload_md5sum")
314-
except Exception:
315-
return None
316-
if not upload_md5sum:
317-
return None
318-
return str(upload_md5sum)
319-
320-
321356
def _is_immediate_retry(ex: Exception):
322357
if (
323358
isinstance(ex, requests.HTTPError)
@@ -361,11 +396,11 @@ def _callback(chunk: bytes, _):
361396
return _callback
362397

363398

364-
def _upload_stream(
399+
def _upload_stream_with_retries(
365400
upload_service: upload_api_v4.UploadService,
366401
fp: T.IO[bytes],
367-
event_payload: T.Optional[Progress] = None,
368-
emitter: T.Optional[EventEmitter] = None,
402+
event_payload: Progress | None = None,
403+
emitter: EventEmitter | None = None,
369404
) -> str:
370405
retries = 0
371406

@@ -384,7 +419,7 @@ def _reset_retries(_, __):
384419

385420
while True:
386421
fp.seek(0, io.SEEK_SET)
387-
begin_offset: T.Optional[int] = None
422+
begin_offset: int | None = None
388423
try:
389424
begin_offset = upload_service.fetch_offset()
390425
upload_service.callbacks = [_reset_retries]

tests/unit/test_upload_api_v4.py

Lines changed: 17 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,11 +24,27 @@ def test_upload(setup_upload: py.path.local):
2424
assert (setup_upload.join("FOOBAR.txt").read_binary()) == content
2525

2626

27+
def test_upload_big_chunksize(setup_upload: py.path.local):
    """Upload with a chunk size (1000) larger than the 13-byte payload.

    Checks that the stored file equals the payload after the first upload
    and is still equal after uploading the same payload a second time.
    """
    upload_service = upload_api_v4.FakeUploadService(
        user_access_token="TEST",
        session_key="FOOBAR.txt",
        chunk_size=1000,
    )
    # presumably disables the fake service's simulated failures; verify against FakeUploadService
    upload_service._error_ratio = 0
    content = b"double_foobar"
    cluster_id = upload_service.upload(io.BytesIO(content))
    assert isinstance(cluster_id, str), cluster_id
    assert (setup_upload.join("FOOBAR.txt").read_binary()) == content

    # reupload should not affect the file
    upload_service.upload(io.BytesIO(content))
    assert (setup_upload.join("FOOBAR.txt").read_binary()) == content
42+
43+
2744
def test_upload_chunks(setup_upload: py.path.local):
2845
upload_service = upload_api_v4.FakeUploadService(
2946
user_access_token="TEST",
3047
session_key="FOOBAR2.txt",
31-
chunk_size=1,
3248
)
3349
upload_service._error_ratio = 0
3450

tests/unit/test_uploader.py

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,9 @@ def _validate_zip_dir(zip_dir: py.path.local):
3030
descs = []
3131
for zip_path in zip_dir.listdir():
3232
with zipfile.ZipFile(zip_path) as ziph:
33+
filename = ziph.testzip()
34+
assert filename is None, f"Corrupted zip {zip_path}: {filename}"
35+
3336
upload_md5sum = json.loads(ziph.comment).get("upload_md5sum")
3437
assert str(os.path.basename(zip_path)) == f"mly_tools_{upload_md5sum}.zip", (
3538
zip_path
@@ -161,9 +164,8 @@ def test_upload_zip(
161164
},
162165
]
163166
zip_dir = setup_unittest_data.mkdir("zip_dir")
164-
uploader.zip_images(
165-
[types.from_desc(T.cast(T.Any, desc)) for desc in descs], Path(zip_dir)
166-
)
167+
sequence = [types.from_desc(T.cast(T.Any, desc)) for desc in descs]
168+
uploader.ZipImageSequence.zip_images(sequence, Path(zip_dir))
167169
assert len(zip_dir.listdir()) == 2, list(zip_dir.listdir())
168170
descs = _validate_zip_dir(zip_dir)
169171
assert 3 == len(descs)

0 commit comments

Comments
 (0)