diff --git a/mapillary_tools/camm/camm_parser.py b/mapillary_tools/camm/camm_parser.py index c2c602f89..512cc81bd 100644 --- a/mapillary_tools/camm/camm_parser.py +++ b/mapillary_tools/camm/camm_parser.py @@ -12,16 +12,18 @@ from typing_extensions import TypeIs from .. import geo, telemetry -from ..mp4 import simple_mp4_parser as sparser from ..mp4.mp4_sample_parser import MovieBoxParser, Sample, TrackBoxParser LOG = logging.getLogger(__name__) +# All fields are little-endian +_Float = C.Float32l +_Double = C.Float64l TelemetryMeasurement = T.Union[ geo.Point, - telemetry.TelemetryMeasurement, + telemetry.TimestampedMeasurement, ] @@ -37,12 +39,94 @@ class CAMMType(Enum): MAGNETIC_FIELD = 7 -# All fields are little-endian -Float = C.Float32l -Double = C.Float64l +TTelemetry = T.TypeVar("TTelemetry", bound=TelemetryMeasurement) -TTelemetry = T.TypeVar("TTelemetry", bound=TelemetryMeasurement) +@dataclasses.dataclass +class CAMMInfo: + # None indicates the data has been extracted, + # while [] indicates extracetd but no data point found + mini_gps: list[geo.Point] | None = None + gps: list[telemetry.CAMMGPSPoint] | None = None + accl: list[telemetry.AccelerationData] | None = None + gyro: list[telemetry.GyroscopeData] | None = None + magn: list[telemetry.MagnetometerData] | None = None + make: str = "" + model: str = "" + + +def extract_camm_info(fp: T.BinaryIO, telemetry_only: bool = False) -> CAMMInfo | None: + moov = MovieBoxParser.parse_stream(fp) + + make, model = "", "" + if not telemetry_only: + udta_boxdata = moov.extract_udta_boxdata() + if udta_boxdata is not None: + make, model = _extract_camera_make_and_model_from_utda_boxdata(udta_boxdata) + + gps_only_construct = _construct_with_selected_camm_types( + [CAMMType.MIN_GPS, CAMMType.GPS] + ) + # Optimization: skip parsing sample data smaller than 16 bytes + # because we are only interested in MIN_GPS and GPS which are larger than 16 bytes + MIN_GPS_SAMPLE_SIZE = 17 + + for track in moov.extract_tracks(): + if _contains_camm_description(track): + if telemetry_only: + maybe_measurements = ( + _parse_telemetry_from_sample(fp, sample) + for sample in track.extract_samples() + if _is_camm_description(sample.description) + ) + measurements = _filter_telemetry_by_track_elst( + moov, track, (m for m in maybe_measurements if m is not None) + ) + + accl: list[telemetry.AccelerationData] = [] + gyro: list[telemetry.GyroscopeData] = [] + magn: list[telemetry.MagnetometerData] = [] + + for measurement in measurements: + if isinstance(measurement, telemetry.AccelerationData): + accl.append(measurement) + elif isinstance(measurement, telemetry.GyroscopeData): + gyro.append(measurement) + elif isinstance(measurement, telemetry.MagnetometerData): + magn.append(measurement) + + return CAMMInfo(accl=accl, gyro=gyro, magn=magn) + else: + maybe_measurements = ( + _parse_telemetry_from_sample(fp, sample, gps_only_construct) + for sample in track.extract_samples() + if _is_camm_description(sample.description) + and sample.raw_sample.size >= MIN_GPS_SAMPLE_SIZE + ) + measurements = _filter_telemetry_by_track_elst( + moov, track, (m for m in maybe_measurements if m is not None) + ) + + mini_gps: list[geo.Point] = [] + gps: list[telemetry.CAMMGPSPoint] = [] + + for measurement in measurements: + if isinstance(measurement, geo.Point): + mini_gps.append(measurement) + elif isinstance(measurement, telemetry.CAMMGPSPoint): + gps.append(measurement) + + return CAMMInfo(mini_gps=mini_gps, gps=gps, make=make, model=model) + + return None + + +def extract_camera_make_and_model(fp: T.BinaryIO) -> tuple[str, str]: + moov = MovieBoxParser.parse_stream(fp) + udta_boxdata = moov.extract_udta_boxdata() + if udta_boxdata is None: + return "", "" + return _extract_camera_make_and_model_from_utda_boxdata(udta_boxdata) class CAMMSampleEntry(abc.ABC, T.Generic[TTelemetry]): @@ -81,7 +165,7 @@ class MinGPSSampleEntry(CAMMSampleEntry): telemetry_cls_type = geo.Point - construct = Double[3] # type: ignore + construct = _Double[3] # type: ignore @classmethod def deserialize(cls, sample: Sample, data: T.Any) -> geo.Point: @@ -115,17 +199,17 @@ class GPSSampleEntry(CAMMSampleEntry): telemetry_cls_type = telemetry.CAMMGPSPoint construct = C.Struct( - "time_gps_epoch" / Double, # type: ignore + "time_gps_epoch" / _Double, # type: ignore "gps_fix_type" / C.Int32sl, # type: ignore - "latitude" / Double, # type: ignore - "longitude" / Double, # type: ignore - "altitude" / Float, # type: ignore - "horizontal_accuracy" / Float, # type: ignore - "vertical_accuracy" / Float, # type: ignore - "velocity_east" / Float, # type: ignore - "velocity_north" / Float, # type: ignore - "velocity_up" / Float, # type: ignore - "speed_accuracy" / Float, # type: ignore + "latitude" / _Double, # type: ignore + "longitude" / _Double, # type: ignore + "altitude" / _Float, # type: ignore + "horizontal_accuracy" / _Float, # type: ignore + "vertical_accuracy" / _Float, # type: ignore + "velocity_east" / _Float, # type: ignore + "velocity_north" / _Float, # type: ignore + "velocity_up" / _Float, # type: ignore + "speed_accuracy" / _Float, # type: ignore ) @classmethod @@ -175,7 +259,7 @@ class GoProGPSSampleEntry(CAMMSampleEntry): telemetry_cls_type = telemetry.GPSPoint - construct = Double[3] # type: ignore + construct = _Double[3] # type: ignore @classmethod def deserialize(cls, sample: Sample, data: T.Any) -> telemetry.GPSPoint: @@ -202,7 +286,7 @@ class AccelerationSampleEntry(CAMMSampleEntry): telemetry_cls_type = telemetry.AccelerationData - construct: C.Struct = Float[3] # type: ignore + construct: C.Struct = _Float[3] # type: ignore @classmethod def deserialize(cls, sample: Sample, data: T.Any) -> telemetry.AccelerationData: @@ -230,7 +314,7 @@ class GyroscopeSampleEntry(CAMMSampleEntry): telemetry_cls_type = telemetry.GyroscopeData - construct: C.Struct = Float[3] # type: ignore + construct: C.Struct = _Float[3] # type: ignore @classmethod def deserialize(cls, sample: Sample, data: T.Any) -> telemetry.GyroscopeData: @@ -258,7 +342,7 @@ class MagnetometerSampleEntry(CAMMSampleEntry): telemetry_cls_type = telemetry.MagnetometerData - construct: C.Struct = Float[3] # type: ignore + construct: C.Struct = _Float[3] # type: ignore @classmethod def deserialize(cls, sample: Sample, data: T.Any) -> telemetry.MagnetometerData: @@ -290,35 +374,63 @@ def serialize(cls, data: telemetry.MagnetometerData) -> bytes: _SWITCH: T.Dict[int, C.Struct] = { - # angle_axis - CAMMType.ANGLE_AXIS.value: Float[3], # type: ignore + # Angle_axis + CAMMType.ANGLE_AXIS.value: _Float[3], # type: ignore + # Exposure time CAMMType.EXPOSURE_TIME.value: C.Struct( "pixel_exposure_time" / C.Int32sl, # type: ignore "rolling_shutter_skew_time" / C.Int32sl, # type: ignore ), - # position - CAMMType.POSITION.value: Float[3], # type: ignore + # Position + CAMMType.POSITION.value: _Float[3], # type: ignore + # Serializable types **{t.value: cls.construct for t, cls in SAMPLE_ENTRY_CLS_BY_CAMM_TYPE.items()}, } -CAMMSampleData = C.Struct( - C.Padding(2), - "type" / C.Int16ul, - "data" / C.Switch(C.this.type, _SWITCH), -) + +def _construct_with_selected_camm_types( + selected_camm_types: T.Container[CAMMType] | None = None, +) -> C.Struct: + if selected_camm_types is None: + switch = _SWITCH + else: + switch = { + k: v for k, v in _SWITCH.items() if CAMMType(k) in selected_camm_types + } + + return C.Struct( + C.Padding(2), + "type" / C.Int16ul, + "data" / C.Switch(C.this.type, switch), + ) + + +CAMMSampleData = _construct_with_selected_camm_types() def _parse_telemetry_from_sample( - fp: T.BinaryIO, sample: Sample -) -> T.Optional[TelemetryMeasurement]: + fp: T.BinaryIO, + sample: Sample, + construct: C.Struct | None = None, +) -> TelemetryMeasurement | None: + if construct is None: + construct = CAMMSampleData + fp.seek(sample.raw_sample.offset, io.SEEK_SET) data = fp.read(sample.raw_sample.size) - box = CAMMSampleData.parse(data) + + box = construct.parse(data) + + # boxdata=None when the construct is unable to parse the data + # (CAMM type not in the switch) + if box.data is None: + return None camm_type = CAMMType(box.type) # type: ignore SampleKlass = SAMPLE_ENTRY_CLS_BY_CAMM_TYPE.get(camm_type) if SampleKlass is None: return None + return SampleKlass.deserialize(sample, box.data) @@ -378,7 +490,7 @@ def _filter_telemetry_by_track_elst( moov: MovieBoxParser, track: TrackBoxParser, measurements: T.Iterable[TelemetryMeasurement], -) -> T.List[TelemetryMeasurement]: +) -> list[TelemetryMeasurement]: elst_boxdata = track.extract_elst_boxdata() if elst_boxdata is not None: @@ -406,69 +518,26 @@ def _filter_telemetry_by_track_elst( return list(measurements) -def extract_points(fp: T.BinaryIO) -> T.Optional[T.List[geo.Point]]: - """ - Return a list of points (could be empty) if it is a valid CAMM video, - otherwise None - """ - - moov = MovieBoxParser.parse_stream(fp) - - for track in moov.extract_tracks(): - if _contains_camm_description(track): - maybe_measurements = ( - _parse_telemetry_from_sample(fp, sample) - for sample in track.extract_samples() - if _is_camm_description(sample.description) - ) - points = [m for m in maybe_measurements if isinstance(m, geo.Point)] - - return T.cast( - T.List[geo.Point], _filter_telemetry_by_track_elst(moov, track, points) - ) - - return None - - -def extract_telemetry_data(fp: T.BinaryIO) -> T.Optional[T.List[TelemetryMeasurement]]: - moov = MovieBoxParser.parse_stream(fp) - - for track in moov.extract_tracks(): - if _contains_camm_description(track): - maybe_measurements = ( - _parse_telemetry_from_sample(fp, sample) - for sample in track.extract_samples() - if _is_camm_description(sample.description) - ) - measurements = [m for m in maybe_measurements if m is not None] - - measurements = _filter_telemetry_by_track_elst(moov, track, measurements) - - return measurements - - return None - - -MakeOrModel = C.Struct( +_MakeOrModel = C.Struct( "size" / C.Int16ub, C.Padding(2), "data" / C.FixedSized(C.this.size, C.GreedyBytes), ) -def _decode_quietly(data: bytes, h: sparser.Header) -> str: +def _decode_quietly(data: bytes, type: bytes) -> str: try: return data.decode("utf-8") except UnicodeDecodeError: - LOG.warning("Failed to decode %s: %s", h, data[:512]) + LOG.warning("Failed to decode %s: %s", type, data[:512]) return "" -def _parse_quietly(data: bytes, h: sparser.Header) -> bytes: +def _parse_quietly(data: bytes, type: bytes) -> bytes: try: - parsed = MakeOrModel.parse(data) + parsed = _MakeOrModel.parse(data) except C.ConstructError: - LOG.warning("Failed to parse %s: %s", h, data[:512]) + LOG.warning("Failed to parse %s: %s", type, data[:512]) return b"" if parsed is None: @@ -477,52 +546,44 @@ def _parse_quietly(data: bytes, h: sparser.Header) -> bytes: return parsed["data"] -def extract_camera_make_and_model(fp: T.BinaryIO) -> tuple[str, str]: - header_and_stream = sparser.parse_path( - fp, - [ - b"moov", - b"udta", - [ - # Insta360 Titan - b"\xa9mak", - b"\xa9mod", - # RICHO THETA V - b"@mod", - b"@mak", - # RICHO THETA V - b"manu", - b"modl", - ], - ], - ) - +def _extract_camera_make_and_model_from_utda_boxdata( + utda_boxdata: dict, +) -> tuple[str, str]: make: str = "" model: str = "" - try: - for h, s in header_and_stream: - data = s.read(h.maxsize) - if h.type == b"\xa9mak": - make_data = _parse_quietly(data, h) + for box in utda_boxdata: + # Insta360 Titan + if box.type == b"\xa9mak": + if not make: + make_data = _parse_quietly(box.data, box.type) make_data = make_data.rstrip(b"\x00") - make = _decode_quietly(make_data, h) - elif h.type == b"\xa9mod": - model_data = _parse_quietly(data, h) + make = _decode_quietly(make_data, box.type) + + # Insta360 Titan + elif box.type == b"\xa9mod": + if not model: + model_data = _parse_quietly(box.data, box.type) model_data = model_data.rstrip(b"\x00") - model = _decode_quietly(model_data, h) - elif h.type in [b"@mak", b"manu"]: - make = _decode_quietly(data, h) - elif h.type in [b"@mod", b"modl"]: - model = _decode_quietly(data, h) - # quit when both found - if make and model: - break - except sparser.ParsingError: - pass + model = _decode_quietly(model_data, box.type) + + # RICHO THETA V + elif box.type in [b"@mak", b"manu"]: + if not make: + make = _decode_quietly(box.data, box.type) + + # RICHO THETA V + elif box.type in [b"@mod", b"modl"]: + if not model: + model = _decode_quietly(box.data, box.type) + + # quit when both found + if make and model: + break if make: make = make.strip() + if model: model = model.strip() diff --git a/mapillary_tools/geotag/geotag_videos_from_video.py b/mapillary_tools/geotag/geotag_videos_from_video.py index ac05ce979..77d239f4f 100644 --- a/mapillary_tools/geotag/geotag_videos_from_video.py +++ b/mapillary_tools/geotag/geotag_videos_from_video.py @@ -48,26 +48,23 @@ def extract(self) -> types.VideoMetadataOrError: class CAMMVideoExtractor(GenericVideoExtractor): def extract(self) -> types.VideoMetadataOrError: with self.video_path.open("rb") as fp: - points = camm_parser.extract_points(fp) + camm_info = camm_parser.extract_camm_info(fp) - if points is None: - raise exceptions.MapillaryVideoGPSNotFoundError( - "No GPS data found from the video" - ) - - if not points: - raise exceptions.MapillaryGPXEmptyError("Empty GPS data found") + if camm_info is None: + raise exceptions.MapillaryVideoGPSNotFoundError( + "No GPS data found from the video" + ) - fp.seek(0, io.SEEK_SET) - make, model = camm_parser.extract_camera_make_and_model(fp) + if not camm_info.gps and not camm_info.mini_gps: + raise exceptions.MapillaryGPXEmptyError("Empty GPS data found") return types.VideoMetadata( filename=self.video_path, filesize=utils.get_file_size(self.video_path), filetype=FileType.CAMM, - points=points, - make=make, - model=model, + points=T.cast(T.List[geo.Point], camm_info.gps or camm_info.mini_gps), + make=camm_info.make, + model=camm_info.model, ) diff --git a/mapillary_tools/geotag/utils.py b/mapillary_tools/geotag/utils.py deleted file mode 100644 index 807820dff..000000000 --- a/mapillary_tools/geotag/utils.py +++ /dev/null @@ -1,23 +0,0 @@ -from __future__ import annotations - -import datetime -import typing as T - -import gpxpy -import gpxpy.gpx - -from .. import geo - - -def convert_points_to_gpx_segment(points: T.Sequence[geo.Point]): - gpx_segment = gpxpy.gpx.GPXTrackSegment() - for point in points: - gpx_segment.points.append( - gpxpy.gpx.GPXTrackPoint( - point.lat, - point.lon, - elevation=point.alt, - time=datetime.datetime.fromtimestamp(point.time, datetime.timezone.utc), - ) - ) - return gpx_segment diff --git a/mapillary_tools/mp4/mp4_sample_parser.py b/mapillary_tools/mp4/mp4_sample_parser.py index bad5e6ae5..285825aa4 100644 --- a/mapillary_tools/mp4/mp4_sample_parser.py +++ b/mapillary_tools/mp4/mp4_sample_parser.py @@ -291,6 +291,12 @@ def extract_mvhd_boxdata(self) -> T.Dict: mvhd = cparser.find_box_at_pathx(self.moov_children, [b"mvhd"]) return T.cast(T.Dict, mvhd["data"]) + def extract_udta_boxdata(self) -> T.Dict | None: + box = cparser.find_box_at_path(self.moov_children, [b"udta"]) + if box is None: + return None + return T.cast(T.Dict, box["data"]) + def extract_tracks(self) -> T.Generator[TrackBoxParser, None, None]: for box in self.moov_children: if box["type"] == b"trak": diff --git a/mapillary_tools/telemetry.py b/mapillary_tools/telemetry.py index 1ebdd6f1a..fb4caa5b0 100644 --- a/mapillary_tools/telemetry.py +++ b/mapillary_tools/telemetry.py @@ -13,7 +13,7 @@ class GPSFix(Enum): @dataclasses.dataclass(order=True) -class TelemetryMeasurement: +class TimestampedMeasurement: """Base class for all telemetry measurements. All telemetry measurements must have a timestamp in seconds. @@ -25,7 +25,7 @@ class TelemetryMeasurement: @dataclasses.dataclass -class GPSPoint(TelemetryMeasurement, Point): +class GPSPoint(TimestampedMeasurement, Point): epoch_time: T.Optional[float] fix: T.Optional[GPSFix] precision: T.Optional[float] @@ -33,7 +33,7 @@ class GPSPoint(TelemetryMeasurement, Point): @dataclasses.dataclass -class CAMMGPSPoint(TelemetryMeasurement, Point): +class CAMMGPSPoint(TimestampedMeasurement, Point): time_gps_epoch: float gps_fix_type: int horizontal_accuracy: float @@ -45,7 +45,7 @@ class CAMMGPSPoint(TelemetryMeasurement, Point): @dataclasses.dataclass(order=True) -class GyroscopeData(TelemetryMeasurement): +class GyroscopeData(TimestampedMeasurement): """Gyroscope signal in radians/seconds around XYZ axes of the camera.""" x: float @@ -54,7 +54,7 @@ class GyroscopeData(TelemetryMeasurement): @dataclasses.dataclass(order=True) -class AccelerationData(TelemetryMeasurement): +class AccelerationData(TimestampedMeasurement): """Accelerometer reading in meters/second^2 along XYZ axes of the camera.""" x: float @@ -63,7 +63,7 @@ class AccelerationData(TelemetryMeasurement): @dataclasses.dataclass(order=True) -class MagnetometerData(TelemetryMeasurement): +class MagnetometerData(TimestampedMeasurement): """Ambient magnetic field.""" x: float diff --git a/mapillary_tools/video_data_extraction/extractors/camm_parser.py b/mapillary_tools/video_data_extraction/extractors/camm_parser.py index 4faa2c266..ea0a81d93 100644 --- a/mapillary_tools/video_data_extraction/extractors/camm_parser.py +++ b/mapillary_tools/video_data_extraction/extractors/camm_parser.py @@ -1,4 +1,5 @@ -import functools +from __future__ import annotations + import typing as T from ... import geo @@ -12,27 +13,50 @@ class CammParser(BaseParser): must_rebase_times_to_zero = False parser_label = "camm" - @functools.cached_property - def _camera_info(self) -> T.Tuple[str, str]: - source_path = self.geotag_source_path - if not source_path: - return "", "" + _extracted: bool = False + _cached_camm_info: camm_parser.CAMMInfo | None = None - with source_path.open("rb") as fp: - return camm_parser.extract_camera_make_and_model(fp) + # TODO: use @functools.cached_property + def _extract_camm_info(self) -> camm_parser.CAMMInfo | None: + if self._extracted: + return self._cached_camm_info + + self._extracted = True - def extract_points(self) -> T.Sequence[geo.Point]: source_path = self.geotag_source_path - if not source_path: - return [] + + if source_path is None: + # source_path not found + return None + with source_path.open("rb") as fp: try: - return camm_parser.extract_points(fp) or [] + self._cached_camm_info = camm_parser.extract_camm_info(fp) except sparser.ParsingError: - return [] + self._cached_camm_info = None + + return self._cached_camm_info + + def extract_points(self) -> T.Sequence[geo.Point]: + camm_info = self._extract_camm_info() + + if camm_info is None: + return [] + + return T.cast(T.List[geo.Point], camm_info.gps or camm_info.mini_gps) + + def extract_make(self) -> str | None: + camm_info = self._extract_camm_info() + + if camm_info is None: + return None + + return camm_info.make + + def extract_model(self) -> str | None: + camm_info = self._extract_camm_info() - def extract_make(self) -> T.Optional[str]: - return self._camera_info[0] or None + if camm_info is None: + return None - def extract_model(self) -> T.Optional[str]: - return self._camera_info[1] or None + return camm_info.model diff --git a/tests/cli/blackvue_parser.py b/tests/cli/blackvue_parser.py index e1e1c8380..7f368dc9c 100644 --- a/tests/cli/blackvue_parser.py +++ b/tests/cli/blackvue_parser.py @@ -1,13 +1,31 @@ from __future__ import annotations import argparse +import datetime import pathlib +import typing as T import gpxpy import gpxpy.gpx from mapillary_tools import geo, utils -from mapillary_tools.geotag import blackvue_parser, utils as geotag_utils +from mapillary_tools.geotag import blackvue_parser + + +def _convert_points_to_gpx_segment( + points: T.Sequence[geo.Point], +) -> gpxpy.gpx.GPXTrackSegment: + gpx_segment = gpxpy.gpx.GPXTrackSegment() + for point in points: + gpx_segment.points.append( + gpxpy.gpx.GPXTrackPoint( + point.lat, + point.lon, + elevation=point.alt, + time=datetime.datetime.fromtimestamp(point.time, datetime.timezone.utc), + ) + ) + return gpx_segment def _parse_gpx(path: pathlib.Path) -> list[geo.Point] | None: @@ -22,7 +40,7 @@ def _convert_to_track(path: pathlib.Path): if points is None: raise RuntimeError(f"Invalid BlackVue video {path}") - segment = geotag_utils.convert_points_to_gpx_segment(points) + segment = _convert_points_to_gpx_segment(points) track.segments.append(segment) with path.open("rb") as fp: model = blackvue_parser.extract_camera_model(fp) diff --git a/tests/cli/camm_parser.py b/tests/cli/camm_parser.py index d9fd0a5fe..d44700f51 100644 --- a/tests/cli/camm_parser.py +++ b/tests/cli/camm_parser.py @@ -1,33 +1,51 @@ import argparse import dataclasses +import datetime import json import pathlib +import typing as T import gpxpy import gpxpy.gpx -from mapillary_tools import telemetry, utils +from mapillary_tools import geo, utils from mapillary_tools.camm import camm_parser -from mapillary_tools.geotag import utils as geotag_utils -def _parse_gpx(path: pathlib.Path): - with path.open("rb") as fp: - return camm_parser.extract_points(fp) +def _convert_points_to_gpx_segment( + points: T.Sequence[geo.Point], +) -> gpxpy.gpx.GPXTrackSegment: + gpx_segment = gpxpy.gpx.GPXTrackSegment() + for point in points: + gpx_segment.points.append( + gpxpy.gpx.GPXTrackPoint( + point.lat, + point.lon, + elevation=point.alt, + time=datetime.datetime.fromtimestamp(point.time, datetime.timezone.utc), + ) + ) + return gpx_segment def _convert(path: pathlib.Path): - points = _parse_gpx(path) - if points is None: - raise RuntimeError(f"Invalid CAMM video {path}") - track = gpxpy.gpx.GPXTrack() - track.name = path.name - track.segments.append(geotag_utils.convert_points_to_gpx_segment(points)) - with open(path, "rb") as fp: - make, model = camm_parser.extract_camera_make_and_model(fp) - make_model = json.dumps({"make": make, "model": model}) + + track.name = str(path) + + with path.open("rb") as fp: + camm_info = camm_parser.extract_camm_info(fp) + + if camm_info is None: + track.description = "Invalid CAMM video" + return track + + points = T.cast(T.List[geo.Point], camm_info.gps or camm_info.mini_gps) + track.segments.append(_convert_points_to_gpx_segment(points)) + + make_model = json.dumps({"make": camm_info.make, "model": camm_info.model}) track.description = f"Extracted from {make_model}" + return track @@ -50,26 +68,27 @@ def main(): for path in video_paths: with path.open("rb") as fp: - telemetry_measurements = camm_parser.extract_telemetry_data(fp) - - accls = [] - gyros = [] - magns = [] - if telemetry_measurements: - for m in telemetry_measurements: - if isinstance(m, telemetry.AccelerationData): - accls.append(m) - elif isinstance(m, telemetry.GyroscopeData): - gyros.append(m) - elif isinstance(m, telemetry.MagnetometerData): - magns.append(m) - - if "accl" in imu_option: - print(json.dumps([dataclasses.asdict(accl) for accl in accls])) - if "gyro" in imu_option: - print(json.dumps([dataclasses.asdict(gyro) for gyro in gyros])) - if "magn" in imu_option: - print(json.dumps([dataclasses.asdict(magn) for magn in magns])) + camm_info = camm_parser.extract_camm_info(fp, telemetry_only=True) + + if camm_info: + if "accl" in imu_option: + print( + json.dumps( + [dataclasses.asdict(accl) for accl in camm_info.accl or []] + ) + ) + if "gyro" in imu_option: + print( + json.dumps( + [dataclasses.asdict(gyro) for gyro in camm_info.gyro or []] + ) + ) + if "magn" in imu_option: + print( + json.dumps( + [dataclasses.asdict(magn) for magn in camm_info.magn or []] + ) + ) else: gpx = gpxpy.gpx.GPX() for path in video_paths: diff --git a/tests/unit/test_camm_parser.py b/tests/unit/test_camm_parser.py index dd86fa6af..1fdc6e6fa 100644 --- a/tests/unit/test_camm_parser.py +++ b/tests/unit/test_camm_parser.py @@ -61,21 +61,17 @@ def encode_decode_empty_camm_mp4(metadata: types.VideoMetadata) -> types.VideoMe ) # extract points - points = camm_parser.extract_points(T.cast(T.BinaryIO, target_fp)) + camm_info = camm_parser.extract_camm_info(T.cast(T.BinaryIO, target_fp)) - # extract make/model - target_fp.seek(0, io.SEEK_SET) - make, model = camm_parser.extract_camera_make_and_model( - T.cast(T.BinaryIO, target_fp) - ) + assert camm_info is not None # return metadata return types.VideoMetadata( Path(""), filetype=types.FileType.CAMM, - points=points or [], - make=make, - model=model, + points=T.cast(T.List[geo.Point], camm_info.gps or camm_info.mini_gps), + make=camm_info.make, + model=camm_info.model, )