diff --git a/mapillary_tools/geotag/blackvue_parser.py b/mapillary_tools/blackvue_parser.py similarity index 66% rename from mapillary_tools/geotag/blackvue_parser.py rename to mapillary_tools/blackvue_parser.py index cddd75e60..e7f30e79c 100644 --- a/mapillary_tools/geotag/blackvue_parser.py +++ b/mapillary_tools/blackvue_parser.py @@ -1,3 +1,7 @@ +from __future__ import annotations + +import dataclasses + import json import logging import re @@ -5,8 +9,8 @@ import pynmea2 -from .. import geo -from ..mp4 import simple_mp4_parser as sparser +from . import geo +from .mp4 import simple_mp4_parser as sparser LOG = logging.getLogger(__name__) @@ -25,31 +29,45 @@ ) -def _parse_gps_box(gps_data: bytes) -> T.Generator[geo.Point, None, None]: - for line_bytes in gps_data.splitlines(): - match = NMEA_LINE_REGEX.match(line_bytes) - if match is None: - continue - nmea_line_bytes = match.group(2) - if nmea_line_bytes.startswith(b"$GPGGA"): - try: - nmea_line = nmea_line_bytes.decode("utf8") - except UnicodeDecodeError: - continue - try: - nmea = pynmea2.parse(nmea_line) - except pynmea2.nmea.ParseError: - continue - if not nmea.is_valid: - continue - epoch_ms = int(match.group(1)) - yield geo.Point( - time=epoch_ms, - lat=nmea.latitude, - lon=nmea.longitude, - alt=nmea.altitude, - angle=None, - ) +@dataclasses.dataclass +class BlackVueInfo: + # None and [] are equivalent here. Use None as default because: + # ValueError: mutable default for field gps is not allowed: use default_factory + gps: list[geo.Point] | None = None + make: str = "BlackVue" + model: str = "" + + +def extract_blackvue_info(fp: T.BinaryIO) -> BlackVueInfo | None: + try: + gps_data = sparser.parse_mp4_data_first(fp, [b"free", b"gps "]) + except sparser.ParsingError: + gps_data = None + + if gps_data is None: + return None + + points = list(_parse_gps_box(gps_data)) + points.sort(key=lambda p: p.time) + + if points: + first_point_time = points[0].time + for p in points: + p.time = (p.time - first_point_time) / 1000 + + # Camera model + try: + cprt_bytes = sparser.parse_mp4_data_first(fp, [b"free", b"cprt"]) + except sparser.ParsingError: + cprt_bytes = None + model = "" + + if cprt_bytes is None: + model = "" + else: + model = _extract_camera_model_from_cprt(cprt_bytes) + + return BlackVueInfo(model=model, gps=points) def extract_camera_model(fp: T.BinaryIO) -> str: @@ -61,6 +79,10 @@ def extract_camera_model(fp: T.BinaryIO) -> str: if cprt_bytes is None: return "" + return _extract_camera_model_from_cprt(cprt_bytes) + + +def _extract_camera_model_from_cprt(cprt_bytes: bytes) -> str: # examples: b' {"model":"DR900X Plus","ver":0.918,"lang":"English","direct":1,"psn":"","temp":34,"GPS":1}\x00' # b' Pittasoft Co., Ltd.;DR900S-1CH;1.008;English;1;D90SS1HAE00661;T69;\x00' cprt_bytes = cprt_bytes.strip().strip(b"\x00") @@ -89,19 +111,28 @@ def extract_camera_model(fp: T.BinaryIO) -> str: return "" -def extract_points(fp: T.BinaryIO) -> T.Optional[T.List[geo.Point]]: - gps_data = sparser.parse_mp4_data_first(fp, [b"free", b"gps "]) - if gps_data is None: - return None - - points = list(_parse_gps_box(gps_data)) - if not points: - return points - - points.sort(key=lambda p: p.time) - - first_point_time = points[0].time - for p in points: - p.time = (p.time - first_point_time) / 1000 - - return points +def _parse_gps_box(gps_data: bytes) -> T.Generator[geo.Point, None, None]: + for line_bytes in gps_data.splitlines(): + match = NMEA_LINE_REGEX.match(line_bytes) + if match is None: + continue + nmea_line_bytes = match.group(2) + if nmea_line_bytes.startswith(b"$GPGGA"): + try: + nmea_line = nmea_line_bytes.decode("utf8") + except UnicodeDecodeError: + continue + try: + nmea = pynmea2.parse(nmea_line) + except pynmea2.nmea.ParseError: + continue + if not nmea.is_valid: + continue + epoch_ms = int(match.group(1)) + yield geo.Point( + time=epoch_ms, + lat=nmea.latitude, + lon=nmea.longitude, + alt=nmea.altitude, + angle=None, + ) diff --git a/mapillary_tools/geotag/geotag_videos_from_video.py b/mapillary_tools/geotag/geotag_videos_from_video.py index 77d239f4f..57c8229ac 100644 --- a/mapillary_tools/geotag/geotag_videos_from_video.py +++ b/mapillary_tools/geotag/geotag_videos_from_video.py @@ -1,14 +1,12 @@ from __future__ import annotations -import io import typing as T from pathlib import Path -from .. import exceptions, geo, telemetry, types, utils +from .. import blackvue_parser, exceptions, geo, telemetry, types, utils from ..camm import camm_parser from ..gpmf import gpmf_gps_filter, gpmf_parser from ..types import FileType -from . import blackvue_parser from .geotag_from_generic import GenericVideoExtractor, GeotagVideosFromGeneric @@ -71,26 +69,23 @@ def extract(self) -> types.VideoMetadataOrError: class BlackVueVideoExtractor(GenericVideoExtractor): def extract(self) -> types.VideoMetadataOrError: with self.video_path.open("rb") as fp: - points = blackvue_parser.extract_points(fp) + blackvue_info = blackvue_parser.extract_blackvue_info(fp) - if points is None: - raise exceptions.MapillaryVideoGPSNotFoundError( - "No GPS data found from the video" - ) - - if not points: - raise exceptions.MapillaryGPXEmptyError("Empty GPS data found") + if blackvue_info is None: + raise exceptions.MapillaryVideoGPSNotFoundError( + "No GPS data found from the video" + ) - fp.seek(0, io.SEEK_SET) - make, model = "BlackVue", blackvue_parser.extract_camera_model(fp) + if not blackvue_info.gps: + raise exceptions.MapillaryGPXEmptyError("Empty GPS data found") video_metadata = types.VideoMetadata( filename=self.video_path, filesize=utils.get_file_size(self.video_path), filetype=FileType.BLACKVUE, - points=points, - make=make, - model=model, + points=blackvue_info.gps or [], + make=blackvue_info.make, + model=blackvue_info.model, ) return video_metadata diff --git a/mapillary_tools/video_data_extraction/extractors/blackvue_parser.py b/mapillary_tools/video_data_extraction/extractors/blackvue_parser.py index 9aef060f4..c1ba1cdb0 100644 --- a/mapillary_tools/video_data_extraction/extractors/blackvue_parser.py +++ b/mapillary_tools/video_data_extraction/extractors/blackvue_parser.py @@ -1,8 +1,10 @@ +from __future__ import annotations + +import functools + import typing as T -from ... import geo -from ...geotag import blackvue_parser -from ...mp4 import simple_mp4_parser as sparser +from ... import blackvue_parser, geo from .base_parser import BaseParser @@ -13,22 +15,35 @@ class BlackVueParser(BaseParser): pointsFound: bool = False - def extract_points(self) -> T.Sequence[geo.Point]: + @functools.cached_property + def extract_blackvue_info(self) -> blackvue_parser.BlackVueInfo | None: source_path = self.geotag_source_path if not source_path: - return [] + return None + with source_path.open("rb") as fp: - try: - points = blackvue_parser.extract_points(fp) or [] - self.pointsFound = len(points) > 0 - return points - except sparser.ParsingError: - return [] - - def extract_make(self) -> T.Optional[str]: - # If no points were found, assume this is not a BlackVue - return "Blackvue" if self.pointsFound else None - - def extract_model(self) -> T.Optional[str]: - with self.videoPath.open("rb") as fp: - return blackvue_parser.extract_camera_model(fp) or None + return blackvue_parser.extract_blackvue_info(fp) + + def extract_points(self) -> T.Sequence[geo.Point]: + blackvue_info = self.extract_blackvue_info + + if blackvue_info is None: + return [] + + return blackvue_info.gps or [] + + def extract_make(self) -> str | None: + blackvue_info = self.extract_blackvue_info + + if blackvue_info is None: + return None + + return blackvue_info.make + + def extract_model(self) -> str | None: + blackvue_info = self.extract_blackvue_info + + if blackvue_info is None: + return None + + return blackvue_info.model diff --git a/tests/cli/blackvue_parser.py b/tests/cli/blackvue_parser.py index 7f368dc9c..1bb01a54a 100644 --- a/tests/cli/blackvue_parser.py +++ b/tests/cli/blackvue_parser.py @@ -8,8 +8,7 @@ import gpxpy import gpxpy.gpx -from mapillary_tools import geo, utils -from mapillary_tools.geotag import blackvue_parser +from mapillary_tools import blackvue_parser, geo, utils def _convert_points_to_gpx_segment( @@ -28,24 +27,23 @@ def _convert_points_to_gpx_segment( return gpx_segment -def _parse_gpx(path: pathlib.Path) -> list[geo.Point] | None: - with path.open("rb") as fp: - points = blackvue_parser.extract_points(fp) - return points - - def _convert_to_track(path: pathlib.Path): track = gpxpy.gpx.GPXTrack() - points = _parse_gpx(path) - if points is None: - raise RuntimeError(f"Invalid BlackVue video {path}") + track.name = str(path) - segment = _convert_points_to_gpx_segment(points) + with path.open("rb") as fp: + blackvue_info = blackvue_parser.extract_blackvue_info(fp) + + if blackvue_info is None: + track.description = "Invalid BlackVue video" + return track + + segment = _convert_points_to_gpx_segment(blackvue_info.gps or []) track.segments.append(segment) with path.open("rb") as fp: model = blackvue_parser.extract_camera_model(fp) track.description = f"Extracted from {model}" - track.name = path.name + return track diff --git a/tests/unit/test_blackvue_parser.py b/tests/unit/test_blackvue_parser.py index 0832a739f..2a430ce9b 100644 --- a/tests/unit/test_blackvue_parser.py +++ b/tests/unit/test_blackvue_parser.py @@ -1,8 +1,7 @@ import io import mapillary_tools.geo as geo - -from mapillary_tools.geotag import blackvue_parser +from mapillary_tools import blackvue_parser from mapillary_tools.mp4 import construct_mp4_parser as cparser @@ -42,8 +41,8 @@ def test_parse_points(): box = {"type": b"free", "data": [{"type": b"gps ", "data": gps_data}]} data = cparser.Box32ConstructBuilder({b"free": {}}).Box.build(box) - x = blackvue_parser.extract_points(io.BytesIO(data)) - assert x is not None + info = blackvue_parser.extract_blackvue_info(io.BytesIO(data)) + assert info is not None assert [ geo.Point( time=0.0, lat=38.8861575, lon=-76.99239516666667, alt=10.2, angle=None @@ -54,4 +53,4 @@ def test_parse_points(): geo.Point( time=0.968, lat=38.88615816666667, lon=-76.992434, alt=7.7, angle=None ), - ] == list(x) + ] == list(info.gps or [])