Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/python-package.yml
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ jobs:

- name: Type check with mypy
run: |
mypy mapillary_tools
mypy mapillary_tools tests/cli

- name: Test with pytest
run: |
Expand Down
15 changes: 13 additions & 2 deletions tests/cli/blackvue_parser.py
Original file line number Diff line number Diff line change
@@ -1,16 +1,27 @@
from __future__ import annotations

import argparse
import pathlib

import gpxpy
import gpxpy.gpx

from mapillary_tools import utils
from mapillary_tools import geo, utils
from mapillary_tools.geotag import blackvue_parser, utils as geotag_utils


def _parse_gpx(path: pathlib.Path) -> list[geo.Point] | None:
with path.open("rb") as fp:
points = blackvue_parser.extract_points(fp)
return points


def _convert_to_track(path: pathlib.Path):
track = gpxpy.gpx.GPXTrack()
points = blackvue_parser.parse_gps_points(path)
points = _parse_gpx(path)
if points is None:
raise RuntimeError(f"Invalid BlackVue video {path}")

segment = geotag_utils.convert_points_to_gpx_segment(points)
track.segments.append(segment)
with path.open("rb") as fp:
Expand Down
10 changes: 9 additions & 1 deletion tests/cli/camm_parser.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,16 @@
from mapillary_tools.geotag import utils as geotag_utils


def _parse_gpx(path: pathlib.Path):
with path.open("rb") as fp:
return camm_parser.extract_points(fp)


def _convert(path: pathlib.Path):
points = camm_parser.parse_gpx(path)
points = _parse_gpx(path)
if points is None:
raise RuntimeError(f"Invalid CAMM video {path}")

track = gpxpy.gpx.GPXTrack()
track.name = path.name
track.segments.append(geotag_utils.convert_points_to_gpx_segment(points))
Expand Down
54 changes: 33 additions & 21 deletions tests/cli/exif_read.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,12 +25,19 @@ def extract_and_show_exif(image_path):


def as_dict(exif: ExifReadABC):
if isinstance(exif, (ExifToolRead, ExifRead)):
gps_datetime = exif.extract_gps_datetime()
exif_time = exif.extract_exif_datetime()
else:
gps_datetime = None
exif_time = None

return {
"altitude": exif.extract_altitude(),
"capture_time": exif.extract_capture_time(),
"direction": exif.extract_direction(),
"exif_time": exif.extract_exif_datetime(),
"gps_time": exif.extract_gps_datetime(),
"exif_time": exif_time,
"gps_time": gps_datetime,
"lon_lat": exif.extract_lon_lat(),
"make": exif.extract_make(),
"model": exif.extract_model(),
Expand All @@ -40,32 +47,28 @@ def as_dict(exif: ExifReadABC):
}


def _round(v):
if isinstance(v, float):
return round(v, 6)
elif isinstance(v, tuple):
return tuple(round(x, 6) for x in v)
else:
return v
def _approximate(left, right):
if isinstance(left, float) and isinstance(right, float):
return abs(left - right) < 0.000001
if isinstance(left, tuple) and isinstance(right, tuple):
return all(abs(l - r) < 0.000001 for l, r in zip(left, right))
return left == right


def compare_exif(left: dict, right: dict) -> bool:
def compare_exif(left: dict, right: dict) -> str:
RED_COLOR = "\x1b[31;20m"
RESET_COLOR = "\x1b[0m"
diff = False
diff = []
for key in left:
if key in ["width", "height"]:
continue
if key in ["lon_lat", "altitude", "direction"]:
left_value = _round(left[key])
right_value = _round(right[key])
same = _approximate(left[key], right[key])
else:
left_value = left[key]
right_value = right[key]
if left_value != right_value:
print(f"{RED_COLOR}{key}: {left_value} != {right_value}{RESET_COLOR}")
diff = True
return diff
same = left[key] == right[key]
if not same:
diff.append(f"{RED_COLOR}{key}: {left[key]} != {right[key]}{RESET_COLOR}")
return "\n".join(diff)


def extract_and_show_from_exiftool(fp, compare: bool = False):
Expand All @@ -82,10 +85,19 @@ def extract_and_show_from_exiftool(fp, compare: bool = False):
native_exif = ExifRead(image_path)
diff = compare_exif(as_dict(exif), as_dict(native_exif))
if diff:
print(f"======== ExifTool Outuput {image_path} ========")
print(f"======== {image_path} ========")

print("ExifTool Outuput:")
pprint.pprint(as_dict(exif))
print(f"======== ExifRead Output {image_path} ========")
print()

print("ExifRead Output:")
pprint.pprint(as_dict(native_exif))
print()

print("DIFF:")
print(diff)
print()
else:
print(f"======== ExifTool Outuput {image_path} ========")
pprint.pprint(as_dict(exif))
Expand Down
45 changes: 34 additions & 11 deletions tests/cli/gpmf_parser.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
from __future__ import annotations

import argparse
import dataclasses
import datetime
Expand Down Expand Up @@ -66,27 +68,39 @@ def _convert_points_to_gpx_track_segment(
return gpx_segment


def _parse_gpx(path: pathlib.Path) -> list[telemetry.GPSPoint] | None:
with path.open("rb") as fp:
info = gpmf_parser.extract_gopro_info(fp)
if info is None:
return None
return info.gps or []


def _convert_gpx(gpx: gpxpy.gpx.GPX, path: pathlib.Path):
points = gpmf_parser.parse_gpx(path)
points = _parse_gpx(path)
if points is None:
raise RuntimeError(f"Invalid GoPro video {path}")
gpx_track = gpxpy.gpx.GPXTrack()
gpx_track_segment = _convert_points_to_gpx_track_segment(points)
gpx_track.segments.append(gpx_track_segment)
gpx_track.name = path.name
gpx_track.comment = f"#points: {len(points)}"
with path.open("rb") as fp:
device_names = gpmf_parser.extract_all_device_names(fp)
with path.open("rb") as fp:
model = gpmf_parser.extract_camera_model(fp)
info = gpmf_parser.extract_gopro_info(fp)
if info is None:
return
gpx_track.description = (
f'Extracted from model "{model}" among these devices {device_names}'
f'Extracted from model "{info.model}" and make "{info.make}"'
)
gpx.tracks.append(gpx_track)


def _convert_geojson(path: pathlib.Path):
features = []
points = gpmf_parser.parse_gpx(path)
points = _parse_gpx(path)
if points is None:
raise RuntimeError(f"Invalid GoPro video {path}")

features = []
for idx, p in enumerate(points):
geomtry = {"type": "Point", "coordinates": [p.lon, p.lat]}
properties = {
Expand Down Expand Up @@ -138,24 +152,33 @@ def main():
imu_option = parsed_args.imu.split(",")
for path in video_paths:
with path.open("rb") as fp:
telemetry_data = gpmf_parser.extract_telemetry_data(fp)
telemetry_data = gpmf_parser.extract_gopro_info(fp, telemetry_only=True)
if telemetry_data:
if "accl" in imu_option:
print(
json.dumps(
[dataclasses.asdict(accl) for accl in telemetry_data.accl]
[
dataclasses.asdict(accl)
for accl in telemetry_data.accl or []
]
)
)
if "gyro" in imu_option:
print(
json.dumps(
[dataclasses.asdict(gyro) for gyro in telemetry_data.gyro]
[
dataclasses.asdict(gyro)
for gyro in telemetry_data.gyro or []
]
)
)
if "magn" in imu_option:
print(
json.dumps(
[dataclasses.asdict(magn) for magn in telemetry_data.magn]
[
dataclasses.asdict(magn)
for magn in telemetry_data.magn or []
]
)
)

Expand Down
83 changes: 35 additions & 48 deletions tests/cli/simple_mp4_parser.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
from __future__ import annotations

import argparse
import io
import logging
Expand Down Expand Up @@ -31,40 +33,29 @@
}


def _validate_samples(
path: pathlib.Path, filters: T.Optional[T.Container[bytes]] = None
):
samples: T.List[sample_parser.RawSample] = []

with open(path, "rb") as fp:
for h, s in sparser.parse_path(
fp, [b"moov", b"trak", b"mdia", b"minf", b"stbl"]
):
(
descriptions,
raw_samples,
) = sample_parser.parse_raw_samples_from_stbl_DEPRECATED(
s, maxsize=h.maxsize
)
samples.extend(
sample
for sample in raw_samples
if filters is None
or descriptions[sample.description_idx]["format"] in filters
)
samples.sort(key=lambda s: s.offset)
if not samples:
def _validate_samples(path: pathlib.Path, filters: T.Container[bytes] | None = None):
raw_samples: list[sample_parser.RawSample] = []

parser = sample_parser.MovieBoxParser.parse_file(path)
for track in parser.extract_tracks():
for sample in track.extract_samples():
if filters is None or sample.description["format"] in filters:
raw_samples.append(sample.raw_sample)

raw_samples.sort(key=lambda s: s.offset)
if not raw_samples:
return

last_sample = None
last_read = samples[0].offset
for sample in samples:
if sample.offset < last_read:
last_read = raw_samples[0].offset
for raw_sample in raw_samples:
if raw_sample.offset < last_read:
LOG.warning(f"overlap found:\n{last_sample}\n{sample}")
elif sample.offset == last_read:
elif raw_sample.offset == last_read:
pass
else:
LOG.warning(f"gap found:\n{last_sample}\n{sample}")
last_read = sample.offset + sample.size
last_read = raw_sample.offset + raw_sample.size
last_sample = sample


Expand All @@ -87,7 +78,7 @@ def _parse_structs(fp: T.BinaryIO):
print(margin, header, data)


def _dump_box_data_at(fp: T.BinaryIO, box_type_path: T.List[bytes]):
def _dump_box_data_at(fp: T.BinaryIO, box_type_path: list[bytes]):
for h, s in sparser.parse_path(fp, box_type_path):
max_chunk_size = 1024
read = 0
Expand All @@ -104,30 +95,26 @@ def _dump_box_data_at(fp: T.BinaryIO, box_type_path: T.List[bytes]):
break


def _parse_samples(fp: T.BinaryIO, filters: T.Optional[T.Container[bytes]] = None):
for h, s in sparser.parse_path(fp, [b"moov", b"trak"]):
offset = s.tell()
for h1, s1 in sparser.parse_path(s, [b"mdia", b"mdhd"], maxsize=h.maxsize):
box = cparser.MediaHeaderBox.parse(s1.read(h.maxsize))
LOG.info(box)
LOG.info(sample_parser.to_datetime(box.creation_time))
LOG.info(box.duration / box.timescale)
s.seek(offset, io.SEEK_SET)
for sample in sample_parser.parse_samples_from_trak_DEPRECATED(
s, maxsize=h.maxsize
):
def _parse_samples(fp: T.BinaryIO, filters: T.Container[bytes] | None = None):
parser = sample_parser.MovieBoxParser.parse_stream(fp)
for track in parser.extract_tracks():
box = track.extract_mdhd_boxdata()
LOG.info(box)
LOG.info(sample_parser.to_datetime(box["creation_time"]))
LOG.info(box["duration"] / box["timescale"])

for sample in track.extract_samples():
if filters is None or sample.description["format"] in filters:
print(sample)


def _dump_samples(fp: T.BinaryIO, filters: T.Optional[T.Container[bytes]] = None):
for h, s in sparser.parse_path(fp, [b"moov", b"trak"]):
for sample in sample_parser.parse_samples_from_trak_DEPRECATED(
s, maxsize=h.maxsize
):
def _dump_samples(fp: T.BinaryIO, filters: T.Container[bytes] | None = None):
parser = sample_parser.MovieBoxParser.parse_stream(fp)
for track in parser.extract_tracks():
for sample in track.extract_samples():
if filters is None or sample.description["format"] in filters:
fp.seek(sample.offset, io.SEEK_SET)
data = fp.read(sample.size)
fp.seek(sample.raw_sample.offset, io.SEEK_SET)
data = fp.read(sample.raw_sample.size)
sys.stdout.buffer.write(data)


Expand Down
8 changes: 4 additions & 4 deletions tests/cli/upload_api_v4.py
Original file line number Diff line number Diff line change
Expand Up @@ -83,10 +83,10 @@ def main():
)
initial_offset = service.fetch_offset()

LOG.info(f"Session key: %s", session_key)
LOG.info(f"Entity size: %d", entity_size)
LOG.info(f"Initial offset: %s", initial_offset)
LOG.info(f"Chunk size: %s MB", service.chunk_size / (1024 * 1024))
LOG.info("Session key: %s", session_key)
LOG.info("Entity size: %d", entity_size)
LOG.info("Initial offset: %s", initial_offset)
LOG.info("Chunk size: %s MB", service.chunk_size / (1024 * 1024))

with open(parsed.filename, "rb") as fp:
with tqdm.tqdm(
Expand Down
Loading