Skip to content

Commit ebad6bf

Browse files
authored
fix: ensure session key unique to upload content (#752)
* wip * throw if etree root is None * sort * update params * rename
1 parent 326391b commit ebad6bf

File tree

8 files changed

+120
-91
lines changed

8 files changed

+120
-91
lines changed

mapillary_tools/exiftool_read_video.py

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -310,7 +310,10 @@ def __init__(
310310
etree: ET.ElementTree,
311311
) -> None:
312312
self.etree = etree
313-
self._texts_by_tag = _index_text_by_tag(self.etree.getroot())
313+
root = self.etree.getroot()
314+
if root is None:
315+
raise ValueError("ElementTree root is None")
316+
self._texts_by_tag = _index_text_by_tag(root)
314317
self._all_tags = set(self._texts_by_tag.keys())
315318

316319
def extract_gps_track(self) -> list[geo.Point]:
@@ -371,6 +374,10 @@ def extract_model(self) -> str | None:
371374
return model
372375

373376
def _extract_gps_track_from_track(self) -> list[GPSPoint]:
377+
root = self.etree.getroot()
378+
if root is None:
379+
raise ValueError("ElementTree root is None")
380+
374381
for track_id in range(1, MAX_TRACK_ID + 1):
375382
track_ns = f"Track{track_id}"
376383
if self._all_tags_exists(
@@ -382,7 +389,7 @@ def _extract_gps_track_from_track(self) -> list[GPSPoint]:
382389
}
383390
):
384391
sample_iterator = _aggregate_samples(
385-
self.etree.getroot(),
392+
root,
386393
f"{track_ns}:SampleTime",
387394
f"{track_ns}:SampleDuration",
388395
)

mapillary_tools/upload.py

Lines changed: 8 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -141,7 +141,7 @@ def _setup_history(
141141
) -> None:
142142
@emitter.on("upload_start")
143143
def check_duplication(payload: uploader.Progress):
144-
md5sum = payload.get("md5sum")
144+
md5sum = payload.get("sequence_md5sum")
145145
assert md5sum is not None, f"md5sum has to be set for {payload}"
146146

147147
if history.is_uploaded(md5sum):
@@ -164,7 +164,7 @@ def check_duplication(payload: uploader.Progress):
164164
@emitter.on("upload_finished")
165165
def write_history(payload: uploader.Progress):
166166
sequence_uuid = payload.get("sequence_uuid")
167-
md5sum = payload.get("md5sum")
167+
md5sum = payload.get("sequence_md5sum")
168168
assert md5sum is not None, f"md5sum has to be set for {payload}"
169169

170170
if sequence_uuid is None:
@@ -466,9 +466,9 @@ def _gen_upload_everything(
466466
(m for m in metadatas if isinstance(m, types.ImageMetadata)),
467467
utils.find_images(import_paths, skip_subfolders=skip_subfolders),
468468
)
469-
for image_result in uploader.ZipImageSequence.prepare_images_and_upload(
470-
image_metadatas,
469+
for image_result in uploader.ZipImageSequence.zip_images_and_upload(
471470
mly_uploader,
471+
image_metadatas,
472472
):
473473
yield image_result
474474

@@ -509,7 +509,8 @@ def _gen_upload_videos(
509509
"sequence_idx": idx,
510510
"file_type": video_metadata.filetype.value,
511511
"import_path": str(video_metadata.filename),
512-
"md5sum": video_metadata.md5sum,
512+
"sequence_md5sum": video_metadata.md5sum,
513+
"upload_md5sum": video_metadata.md5sum,
513514
}
514515

515516
session_key = uploader._session_key(
@@ -720,8 +721,8 @@ def _gen_upload_zipfiles(
720721
"import_path": str(zip_path),
721722
}
722723
try:
723-
cluster_id = uploader.ZipImageSequence.prepare_zipfile_and_upload(
724-
zip_path, mly_uploader, progress=T.cast(T.Dict[str, T.Any], progress)
724+
cluster_id = uploader.ZipImageSequence.upload_zipfile(
725+
mly_uploader, zip_path, progress=T.cast(T.Dict[str, T.Any], progress)
725726
)
726727
except Exception as ex:
727728
yield zip_path, uploader.UploadResult(error=ex)

mapillary_tools/uploader.py

Lines changed: 71 additions & 59 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@
1616

1717
import requests
1818

19-
from . import api_v4, constants, exif_write, types, upload_api_v4
19+
from . import api_v4, constants, exif_write, types, upload_api_v4, utils
2020

2121

2222
LOG = logging.getLogger(__name__)
@@ -56,8 +56,12 @@ class UploaderProgress(T.TypedDict, total=True):
5656
class SequenceProgress(T.TypedDict, total=False):
5757
"""Progress data at sequence level"""
5858

59-
# md5sum of the zipfile/BlackVue/CAMM in uploading
60-
md5sum: str
59+
# Used to check if it is uploaded or not
60+
sequence_md5sum: str
61+
62+
# Used to resume from the previous upload,
63+
# so it has to an unique identifier (hash) of the upload content
64+
upload_md5sum: str
6165

6266
# File type
6367
file_type: str
@@ -148,30 +152,50 @@ def zip_images(
148152

149153
for sequence_uuid, sequence in sequences.items():
150154
_validate_metadatas(sequence)
151-
upload_md5sum = types.update_sequence_md5sum(sequence)
152-
153155
# For atomicity we write into a WIP file and then rename to the final file
154156
wip_zip_filename = zip_dir.joinpath(
155157
f".mly_zip_{uuid.uuid4()}_{sequence_uuid}_{os.getpid()}_{int(time.time())}"
156158
)
157-
filename = _session_key(upload_md5sum, upload_api_v4.ClusterFileType.ZIP)
158-
zip_filename = zip_dir.joinpath(filename)
159-
with wip_file_context(wip_zip_filename, zip_filename) as wip_path:
159+
with cls._wip_file_context(wip_zip_filename) as wip_path:
160160
with wip_path.open("wb") as wip_fp:
161-
actual_md5sum = cls.zip_sequence_deterministically(sequence, wip_fp)
162-
assert actual_md5sum == upload_md5sum, "md5sum mismatch"
161+
cls.zip_sequence_fp(sequence, wip_fp)
163162

164163
@classmethod
165-
def zip_sequence_deterministically(
164+
@contextmanager
165+
def _wip_file_context(cls, wip_path: Path):
166+
try:
167+
os.remove(wip_path)
168+
except FileNotFoundError:
169+
pass
170+
try:
171+
yield wip_path
172+
173+
with wip_path.open("rb") as fp:
174+
upload_md5sum = utils.md5sum_fp(fp).hexdigest()
175+
176+
done_path = wip_path.parent.joinpath(
177+
_session_key(upload_md5sum, upload_api_v4.ClusterFileType.ZIP)
178+
)
179+
180+
try:
181+
os.remove(done_path)
182+
except FileNotFoundError:
183+
pass
184+
wip_path.rename(done_path)
185+
finally:
186+
try:
187+
os.remove(wip_path)
188+
except FileNotFoundError:
189+
pass
190+
191+
@classmethod
192+
def zip_sequence_fp(
166193
cls,
167194
sequence: T.Sequence[types.ImageMetadata],
168195
zip_fp: T.IO[bytes],
169196
) -> str:
170197
"""
171-
Write a sequence of ImageMetadata into the zipfile handle. It should guarantee
172-
that the same sequence always produces the same zipfile, because the
173-
sequence md5sum will be used to upload the zipfile or resume the upload.
174-
198+
Write a sequence of ImageMetadata into the zipfile handle.
175199
The sequence has to be one sequence and sorted.
176200
"""
177201

@@ -180,21 +204,21 @@ def zip_sequence_deterministically(
180204
f"Only one sequence is allowed but got {len(sequence_groups)}: {list(sequence_groups.keys())}"
181205
)
182206

183-
upload_md5sum = types.update_sequence_md5sum(sequence)
207+
sequence_md5sum = types.update_sequence_md5sum(sequence)
184208

185209
with zipfile.ZipFile(zip_fp, "w", zipfile.ZIP_DEFLATED) as zipf:
186210
for idx, metadata in enumerate(sequence):
187-
# Use {idx}.jpg (suffix does not matter) as the archive name to ensure the
188-
# resulting zipfile is deterministic. This determinism is based on the upload_md5sum,
189-
# which is derived from a list of image md5sums
211+
# Arcname does not matter, but it should be unique
190212
cls._write_imagebytes_in_zip(zipf, metadata, arcname=f"{idx}.jpg")
191213
assert len(sequence) == len(set(zipf.namelist()))
192-
zipf.comment = json.dumps({"upload_md5sum": upload_md5sum}).encode("utf-8")
214+
zipf.comment = json.dumps({"sequence_md5sum": sequence_md5sum}).encode(
215+
"utf-8"
216+
)
193217

194-
return upload_md5sum
218+
return sequence_md5sum
195219

196220
@classmethod
197-
def extract_upload_md5sum(cls, zip_fp: T.IO[bytes]) -> str:
221+
def extract_sequence_md5sum(cls, zip_fp: T.IO[bytes]) -> str:
198222
with zipfile.ZipFile(zip_fp, "r", zipfile.ZIP_DEFLATED) as ziph:
199223
comment = ziph.comment
200224

@@ -209,12 +233,12 @@ def extract_upload_md5sum(cls, zip_fp: T.IO[bytes]) -> str:
209233
except json.JSONDecodeError as ex:
210234
raise InvalidMapillaryZipFileError(str(ex)) from ex
211235

212-
upload_md5sum = zip_metadata.get("upload_md5sum")
236+
sequence_md5sum = zip_metadata.get("sequence_md5sum")
213237

214-
if not upload_md5sum and not isinstance(upload_md5sum, str):
215-
raise InvalidMapillaryZipFileError("No upload_md5sum found")
238+
if not sequence_md5sum and not isinstance(sequence_md5sum, str):
239+
raise InvalidMapillaryZipFileError("No sequence_md5sum found")
216240

217-
return upload_md5sum
241+
return sequence_md5sum
218242

219243
@classmethod
220244
def _write_imagebytes_in_zip(
@@ -241,10 +265,10 @@ def _write_imagebytes_in_zip(
241265
zipf.writestr(zipinfo, image_bytes)
242266

243267
@classmethod
244-
def prepare_zipfile_and_upload(
268+
def upload_zipfile(
245269
cls,
246-
zip_path: Path,
247270
uploader: Uploader,
271+
zip_path: Path,
248272
progress: dict[str, T.Any] | None = None,
249273
) -> str:
250274
if progress is None:
@@ -256,30 +280,36 @@ def prepare_zipfile_and_upload(
256280
raise InvalidMapillaryZipFileError("Zipfile has no files")
257281

258282
with zip_path.open("rb") as zip_fp:
259-
upload_md5sum = cls.extract_upload_md5sum(zip_fp)
283+
sequence_md5sum = cls.extract_sequence_md5sum(zip_fp)
284+
285+
with zip_path.open("rb") as zip_fp:
286+
upload_md5sum = utils.md5sum_fp(zip_fp).hexdigest()
260287

261288
sequence_progress: SequenceProgress = {
262289
"sequence_image_count": len(namelist),
263290
"file_type": types.FileType.ZIP.value,
264-
"md5sum": upload_md5sum,
291+
"sequence_md5sum": sequence_md5sum,
292+
"upload_md5sum": upload_md5sum,
265293
}
266294

267-
session_key = _session_key(upload_md5sum, upload_api_v4.ClusterFileType.ZIP)
295+
upload_session_key = _session_key(
296+
upload_md5sum, upload_api_v4.ClusterFileType.ZIP
297+
)
268298

269299
with zip_path.open("rb") as zip_fp:
270300
return uploader.upload_stream(
271301
zip_fp,
272302
upload_api_v4.ClusterFileType.ZIP,
273-
session_key,
303+
upload_session_key,
274304
# Send the copy of the input progress to each upload session, to avoid modifying the original one
275305
progress=T.cast(T.Dict[str, T.Any], {**progress, **sequence_progress}),
276306
)
277307

278308
@classmethod
279-
def prepare_images_and_upload(
309+
def zip_images_and_upload(
280310
cls,
281-
image_metadatas: T.Sequence[types.ImageMetadata],
282311
uploader: Uploader,
312+
image_metadatas: T.Sequence[types.ImageMetadata],
283313
progress: dict[str, T.Any] | None = None,
284314
) -> T.Generator[tuple[str, UploadResult], None, None]:
285315
if progress is None:
@@ -304,22 +334,25 @@ def prepare_images_and_upload(
304334

305335
with tempfile.NamedTemporaryFile() as fp:
306336
try:
307-
upload_md5sum = cls.zip_sequence_deterministically(sequence, fp)
337+
sequence_md5sum = cls.zip_sequence_fp(sequence, fp)
308338
except Exception as ex:
309339
yield sequence_uuid, UploadResult(error=ex)
310340
continue
311341

312-
sequence_progress["md5sum"] = upload_md5sum
342+
sequence_progress["sequence_md5sum"] = sequence_md5sum
313343

314-
session_key = _session_key(
344+
fp.seek(0, io.SEEK_SET)
345+
upload_md5sum = utils.md5sum_fp(fp).hexdigest()
346+
347+
upload_session_key = _session_key(
315348
upload_md5sum, upload_api_v4.ClusterFileType.ZIP
316349
)
317350

318351
try:
319352
cluster_id = uploader.upload_stream(
320353
fp,
321354
upload_api_v4.ClusterFileType.ZIP,
322-
session_key,
355+
upload_session_key,
323356
progress=T.cast(
324357
T.Dict[str, T.Any], {**progress, **sequence_progress}
325358
),
@@ -514,27 +547,6 @@ def _validate_metadatas(metadatas: T.Sequence[types.ImageMetadata]):
514547
raise FileNotFoundError(f"No such file {metadata.filename}")
515548

516549

517-
@contextmanager
518-
def wip_file_context(wip_path: Path, done_path: Path):
519-
assert wip_path != done_path, "should not be the same file"
520-
try:
521-
os.remove(wip_path)
522-
except FileNotFoundError:
523-
pass
524-
try:
525-
yield wip_path
526-
try:
527-
os.remove(done_path)
528-
except FileNotFoundError:
529-
pass
530-
wip_path.rename(done_path)
531-
finally:
532-
try:
533-
os.remove(wip_path)
534-
except FileNotFoundError:
535-
pass
536-
537-
538550
def _is_immediate_retry(ex: Exception):
539551
if (
540552
isinstance(ex, requests.HTTPError)

tests/integration/fixtures.py

Lines changed: 10 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313
import py.path
1414
import pytest
1515

16+
from mapillary_tools import utils
1617

1718
EXECUTABLE = os.getenv(
1819
"MAPILLARY_TOOLS__TESTS_EXECUTABLE", "python3 -m mapillary_tools.commands"
@@ -174,20 +175,25 @@ def validate_and_extract_image(image_path: str):
174175
return desc
175176

176177

177-
def validate_and_extract_zip(zip_path: str) -> T.List[T.Dict]:
178+
def validate_and_extract_zip(zip_path: Path) -> T.List[T.Dict]:
178179
descs = []
179180

180181
with zipfile.ZipFile(zip_path) as zipf:
181-
upload_md5sum = json.loads(zipf.comment)["upload_md5sum"]
182+
_sequence_md5sum = json.loads(zipf.comment)["sequence_md5sum"]
182183
with tempfile.TemporaryDirectory() as tempdir:
183184
zipf.extractall(path=tempdir)
184185
for name in os.listdir(tempdir):
185186
filename = os.path.join(tempdir, name)
186187
desc = validate_and_extract_image(filename)
187188
descs.append(desc)
188189

189-
basename = os.path.basename(zip_path)
190-
assert f"mly_tools_{upload_md5sum}.zip" == basename, (basename, upload_md5sum)
190+
with zip_path.open("rb") as fp:
191+
upload_md5sum = utils.md5sum_fp(fp).hexdigest()
192+
193+
assert f"mly_tools_{upload_md5sum}.zip" == zip_path.name, (
194+
zip_path.name,
195+
upload_md5sum,
196+
)
191197

192198
return descs
193199

tests/integration/test_process.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -303,7 +303,7 @@ def test_zip(tmpdir: py.path.local, setup_data: py.path.local):
303303
assert x.returncode == 0, x.stderr
304304
assert 0 < len(zip_dir.listdir())
305305
for file in zip_dir.listdir():
306-
validate_and_extract_zip(str(file))
306+
validate_and_extract_zip(Path(file))
307307

308308

309309
@pytest.mark.usefixtures("setup_config")

tests/integration/test_process_and_upload.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
import datetime
22
import os
33
import subprocess
4+
from pathlib import Path
45

56
import py.path
67
import pytest
@@ -135,7 +136,7 @@ def _validate_uploads(upload_dir: py.path.local, expected):
135136
if str(file).endswith(".mp4"):
136137
descs.extend(validate_and_extract_camm(str(file)))
137138
elif str(file).endswith(".zip"):
138-
descs.extend(validate_and_extract_zip(str(file)))
139+
descs.extend(validate_and_extract_zip(Path(file)))
139140
else:
140141
raise Exception(f"invalid file {file}")
141142

0 commit comments

Comments
 (0)