Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
303 changes: 182 additions & 121 deletions mapillary_tools/camm/camm_parser.py

Large diffs are not rendered by default.

23 changes: 10 additions & 13 deletions mapillary_tools/geotag/geotag_videos_from_video.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,26 +48,23 @@ def extract(self) -> types.VideoMetadataOrError:
class CAMMVideoExtractor(GenericVideoExtractor):
def extract(self) -> types.VideoMetadataOrError:
with self.video_path.open("rb") as fp:
points = camm_parser.extract_points(fp)
camm_info = camm_parser.extract_camm_info(fp)

if points is None:
raise exceptions.MapillaryVideoGPSNotFoundError(
"No GPS data found from the video"
)

if not points:
raise exceptions.MapillaryGPXEmptyError("Empty GPS data found")
if camm_info is None:
raise exceptions.MapillaryVideoGPSNotFoundError(
"No GPS data found from the video"
)

fp.seek(0, io.SEEK_SET)
make, model = camm_parser.extract_camera_make_and_model(fp)
if not camm_info.gps and not camm_info.mini_gps:
raise exceptions.MapillaryGPXEmptyError("Empty GPS data found")

return types.VideoMetadata(
filename=self.video_path,
filesize=utils.get_file_size(self.video_path),
filetype=FileType.CAMM,
points=points,
make=make,
model=model,
points=T.cast(T.List[geo.Point], camm_info.gps or camm_info.mini_gps),
make=camm_info.make,
model=camm_info.model,
)


Expand Down
23 changes: 0 additions & 23 deletions mapillary_tools/geotag/utils.py

This file was deleted.

6 changes: 6 additions & 0 deletions mapillary_tools/mp4/mp4_sample_parser.py
Original file line number Diff line number Diff line change
Expand Up @@ -291,6 +291,12 @@ def extract_mvhd_boxdata(self) -> T.Dict:
mvhd = cparser.find_box_at_pathx(self.moov_children, [b"mvhd"])
return T.cast(T.Dict, mvhd["data"])

def extract_udta_boxdata(self) -> T.Dict | None:
box = cparser.find_box_at_path(self.moov_children, [b"udta"])
if box is None:
return None
return T.cast(T.Dict, box["data"])

def extract_tracks(self) -> T.Generator[TrackBoxParser, None, None]:
for box in self.moov_children:
if box["type"] == b"trak":
Expand Down
12 changes: 6 additions & 6 deletions mapillary_tools/telemetry.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ class GPSFix(Enum):


@dataclasses.dataclass(order=True)
class TelemetryMeasurement:
class TimestampedMeasurement:
"""Base class for all telemetry measurements.

All telemetry measurements must have a timestamp in seconds.
Expand All @@ -25,15 +25,15 @@ class TelemetryMeasurement:


@dataclasses.dataclass
class GPSPoint(TelemetryMeasurement, Point):
class GPSPoint(TimestampedMeasurement, Point):
epoch_time: T.Optional[float]
fix: T.Optional[GPSFix]
precision: T.Optional[float]
ground_speed: T.Optional[float]


@dataclasses.dataclass
class CAMMGPSPoint(TelemetryMeasurement, Point):
class CAMMGPSPoint(TimestampedMeasurement, Point):
time_gps_epoch: float
gps_fix_type: int
horizontal_accuracy: float
Expand All @@ -45,7 +45,7 @@ class CAMMGPSPoint(TelemetryMeasurement, Point):


@dataclasses.dataclass(order=True)
class GyroscopeData(TelemetryMeasurement):
class GyroscopeData(TimestampedMeasurement):
"""Gyroscope signal in radians/seconds around XYZ axes of the camera."""

x: float
Expand All @@ -54,7 +54,7 @@ class GyroscopeData(TelemetryMeasurement):


@dataclasses.dataclass(order=True)
class AccelerationData(TelemetryMeasurement):
class AccelerationData(TimestampedMeasurement):
"""Accelerometer reading in meters/second^2 along XYZ axes of the camera."""

x: float
Expand All @@ -63,7 +63,7 @@ class AccelerationData(TelemetryMeasurement):


@dataclasses.dataclass(order=True)
class MagnetometerData(TelemetryMeasurement):
class MagnetometerData(TimestampedMeasurement):
"""Ambient magnetic field."""

x: float
Expand Down
58 changes: 41 additions & 17 deletions mapillary_tools/video_data_extraction/extractors/camm_parser.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import functools
from __future__ import annotations

import typing as T

from ... import geo
Expand All @@ -12,27 +13,50 @@ class CammParser(BaseParser):
must_rebase_times_to_zero = False
parser_label = "camm"

@functools.cached_property
def _camera_info(self) -> T.Tuple[str, str]:
source_path = self.geotag_source_path
if not source_path:
return "", ""
_extracted: bool = False
_cached_camm_info: camm_parser.CAMMInfo | None = None

with source_path.open("rb") as fp:
return camm_parser.extract_camera_make_and_model(fp)
# TODO: use @functools.cached_property
def _extract_camm_info(self) -> camm_parser.CAMMInfo | None:
if self._extracted:
return self._cached_camm_info

self._extracted = True

def extract_points(self) -> T.Sequence[geo.Point]:
source_path = self.geotag_source_path
if not source_path:
return []

if source_path is None:
# source_path not found
return None

with source_path.open("rb") as fp:
try:
return camm_parser.extract_points(fp) or []
self._cached_camm_info = camm_parser.extract_camm_info(fp)
except sparser.ParsingError:
return []
self._cached_camm_info = None

return self._cached_camm_info

def extract_points(self) -> T.Sequence[geo.Point]:
camm_info = self._extract_camm_info()

if camm_info is None:
return []

return T.cast(T.List[geo.Point], camm_info.gps or camm_info.mini_gps)

def extract_make(self) -> str | None:
camm_info = self._extract_camm_info()

if camm_info is None:
return None

return camm_info.make

def extract_model(self) -> str | None:
camm_info = self._extract_camm_info()

def extract_make(self) -> T.Optional[str]:
return self._camera_info[0] or None
if camm_info is None:
return None

def extract_model(self) -> T.Optional[str]:
return self._camera_info[1] or None
return camm_info.model
22 changes: 20 additions & 2 deletions tests/cli/blackvue_parser.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,31 @@
from __future__ import annotations

import argparse
import datetime
import pathlib
import typing as T

import gpxpy
import gpxpy.gpx

from mapillary_tools import geo, utils
from mapillary_tools.geotag import blackvue_parser, utils as geotag_utils
from mapillary_tools.geotag import blackvue_parser


def _convert_points_to_gpx_segment(
points: T.Sequence[geo.Point],
) -> gpxpy.gpx.GPXTrackSegment:
gpx_segment = gpxpy.gpx.GPXTrackSegment()
for point in points:
gpx_segment.points.append(
gpxpy.gpx.GPXTrackPoint(
point.lat,
point.lon,
elevation=point.alt,
time=datetime.datetime.fromtimestamp(point.time, datetime.timezone.utc),
)
)
return gpx_segment


def _parse_gpx(path: pathlib.Path) -> list[geo.Point] | None:
Expand All @@ -22,7 +40,7 @@ def _convert_to_track(path: pathlib.Path):
if points is None:
raise RuntimeError(f"Invalid BlackVue video {path}")

segment = geotag_utils.convert_points_to_gpx_segment(points)
segment = _convert_points_to_gpx_segment(points)
track.segments.append(segment)
with path.open("rb") as fp:
model = blackvue_parser.extract_camera_model(fp)
Expand Down
87 changes: 53 additions & 34 deletions tests/cli/camm_parser.py
Original file line number Diff line number Diff line change
@@ -1,33 +1,51 @@
import argparse
import dataclasses
import datetime
import json
import pathlib
import typing as T

import gpxpy
import gpxpy.gpx

from mapillary_tools import telemetry, utils
from mapillary_tools import geo, utils
from mapillary_tools.camm import camm_parser
from mapillary_tools.geotag import utils as geotag_utils


def _parse_gpx(path: pathlib.Path):
with path.open("rb") as fp:
return camm_parser.extract_points(fp)
def _convert_points_to_gpx_segment(
points: T.Sequence[geo.Point],
) -> gpxpy.gpx.GPXTrackSegment:
gpx_segment = gpxpy.gpx.GPXTrackSegment()
for point in points:
gpx_segment.points.append(
gpxpy.gpx.GPXTrackPoint(
point.lat,
point.lon,
elevation=point.alt,
time=datetime.datetime.fromtimestamp(point.time, datetime.timezone.utc),
)
)
return gpx_segment


def _convert(path: pathlib.Path):
points = _parse_gpx(path)
if points is None:
raise RuntimeError(f"Invalid CAMM video {path}")

track = gpxpy.gpx.GPXTrack()
track.name = path.name
track.segments.append(geotag_utils.convert_points_to_gpx_segment(points))
with open(path, "rb") as fp:
make, model = camm_parser.extract_camera_make_and_model(fp)
make_model = json.dumps({"make": make, "model": model})

track.name = str(path)

with path.open("rb") as fp:
camm_info = camm_parser.extract_camm_info(fp)

if camm_info is None:
track.description = "Invalid CAMM video"
return track

points = T.cast(T.List[geo.Point], camm_info.gps or camm_info.mini_gps)
track.segments.append(_convert_points_to_gpx_segment(points))

make_model = json.dumps({"make": camm_info.make, "model": camm_info.model})
track.description = f"Extracted from {make_model}"

return track


Expand All @@ -50,26 +68,27 @@ def main():

for path in video_paths:
with path.open("rb") as fp:
telemetry_measurements = camm_parser.extract_telemetry_data(fp)

accls = []
gyros = []
magns = []
if telemetry_measurements:
for m in telemetry_measurements:
if isinstance(m, telemetry.AccelerationData):
accls.append(m)
elif isinstance(m, telemetry.GyroscopeData):
gyros.append(m)
elif isinstance(m, telemetry.MagnetometerData):
magns.append(m)

if "accl" in imu_option:
print(json.dumps([dataclasses.asdict(accl) for accl in accls]))
if "gyro" in imu_option:
print(json.dumps([dataclasses.asdict(gyro) for gyro in gyros]))
if "magn" in imu_option:
print(json.dumps([dataclasses.asdict(magn) for magn in magns]))
camm_info = camm_parser.extract_camm_info(fp, telemetry_only=True)

if camm_info:
if "accl" in imu_option:
print(
json.dumps(
[dataclasses.asdict(accl) for accl in camm_info.accl or []]
)
)
if "gyro" in imu_option:
print(
json.dumps(
[dataclasses.asdict(gyro) for gyro in camm_info.gyro or []]
)
)
if "magn" in imu_option:
print(
json.dumps(
[dataclasses.asdict(magn) for magn in camm_info.magn or []]
)
)
else:
gpx = gpxpy.gpx.GPX()
for path in video_paths:
Expand Down
14 changes: 5 additions & 9 deletions tests/unit/test_camm_parser.py
Original file line number Diff line number Diff line change
Expand Up @@ -61,21 +61,17 @@ def encode_decode_empty_camm_mp4(metadata: types.VideoMetadata) -> types.VideoMe
)

# extract points
points = camm_parser.extract_points(T.cast(T.BinaryIO, target_fp))
camm_info = camm_parser.extract_camm_info(T.cast(T.BinaryIO, target_fp))

# extract make/model
target_fp.seek(0, io.SEEK_SET)
make, model = camm_parser.extract_camera_make_and_model(
T.cast(T.BinaryIO, target_fp)
)
assert camm_info is not None

# return metadata
return types.VideoMetadata(
Path(""),
filetype=types.FileType.CAMM,
points=points or [],
make=make,
model=model,
points=T.cast(T.List[geo.Point], camm_info.gps or camm_info.mini_gps),
make=camm_info.make,
model=camm_info.model,
)


Expand Down
Loading