Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,12 +1,16 @@
from __future__ import annotations

import dataclasses

import json
import logging
import re
import typing as T

import pynmea2

from .. import geo
from ..mp4 import simple_mp4_parser as sparser
from . import geo
from .mp4 import simple_mp4_parser as sparser


LOG = logging.getLogger(__name__)
Expand All @@ -25,31 +29,45 @@
)


def _parse_gps_box(gps_data: bytes) -> T.Generator[geo.Point, None, None]:
for line_bytes in gps_data.splitlines():
match = NMEA_LINE_REGEX.match(line_bytes)
if match is None:
continue
nmea_line_bytes = match.group(2)
if nmea_line_bytes.startswith(b"$GPGGA"):
try:
nmea_line = nmea_line_bytes.decode("utf8")
except UnicodeDecodeError:
continue
try:
nmea = pynmea2.parse(nmea_line)
except pynmea2.nmea.ParseError:
continue
if not nmea.is_valid:
continue
epoch_ms = int(match.group(1))
yield geo.Point(
time=epoch_ms,
lat=nmea.latitude,
lon=nmea.longitude,
alt=nmea.altitude,
angle=None,
)
@dataclasses.dataclass
class BlackVueInfo:
# None and [] are equivalent here. Use None as default because:
# ValueError: mutable default <class 'list'> for field gps is not allowed: use default_factory
gps: list[geo.Point] | None = None
make: str = "BlackVue"
model: str = ""


def extract_blackvue_info(fp: T.BinaryIO) -> BlackVueInfo | None:
try:
gps_data = sparser.parse_mp4_data_first(fp, [b"free", b"gps "])
except sparser.ParsingError:
gps_data = None

if gps_data is None:
return None

points = list(_parse_gps_box(gps_data))
points.sort(key=lambda p: p.time)

if points:
first_point_time = points[0].time
for p in points:
p.time = (p.time - first_point_time) / 1000

# Camera model
try:
cprt_bytes = sparser.parse_mp4_data_first(fp, [b"free", b"cprt"])
except sparser.ParsingError:
cprt_bytes = None
model = ""

if cprt_bytes is None:
model = ""
else:
model = _extract_camera_model_from_cprt(cprt_bytes)

return BlackVueInfo(model=model, gps=points)


def extract_camera_model(fp: T.BinaryIO) -> str:
Expand All @@ -61,6 +79,10 @@ def extract_camera_model(fp: T.BinaryIO) -> str:
if cprt_bytes is None:
return ""

return _extract_camera_model_from_cprt(cprt_bytes)


def _extract_camera_model_from_cprt(cprt_bytes: bytes) -> str:
# examples: b' {"model":"DR900X Plus","ver":0.918,"lang":"English","direct":1,"psn":"","temp":34,"GPS":1}\x00'
# b' Pittasoft Co., Ltd.;DR900S-1CH;1.008;English;1;D90SS1HAE00661;T69;\x00'
cprt_bytes = cprt_bytes.strip().strip(b"\x00")
Expand Down Expand Up @@ -89,19 +111,28 @@ def extract_camera_model(fp: T.BinaryIO) -> str:
return ""


def extract_points(fp: T.BinaryIO) -> T.Optional[T.List[geo.Point]]:
gps_data = sparser.parse_mp4_data_first(fp, [b"free", b"gps "])
if gps_data is None:
return None

points = list(_parse_gps_box(gps_data))
if not points:
return points

points.sort(key=lambda p: p.time)

first_point_time = points[0].time
for p in points:
p.time = (p.time - first_point_time) / 1000

return points
def _parse_gps_box(gps_data: bytes) -> T.Generator[geo.Point, None, None]:
for line_bytes in gps_data.splitlines():
match = NMEA_LINE_REGEX.match(line_bytes)
if match is None:
continue
nmea_line_bytes = match.group(2)
if nmea_line_bytes.startswith(b"$GPGGA"):
try:
nmea_line = nmea_line_bytes.decode("utf8")
except UnicodeDecodeError:
continue
try:
nmea = pynmea2.parse(nmea_line)
except pynmea2.nmea.ParseError:
continue
if not nmea.is_valid:
continue
epoch_ms = int(match.group(1))
yield geo.Point(
time=epoch_ms,
lat=nmea.latitude,
lon=nmea.longitude,
alt=nmea.altitude,
angle=None,
)
27 changes: 11 additions & 16 deletions mapillary_tools/geotag/geotag_videos_from_video.py
Original file line number Diff line number Diff line change
@@ -1,14 +1,12 @@
from __future__ import annotations

import io
import typing as T
from pathlib import Path

from .. import exceptions, geo, telemetry, types, utils
from .. import blackvue_parser, exceptions, geo, telemetry, types, utils
from ..camm import camm_parser
from ..gpmf import gpmf_gps_filter, gpmf_parser
from ..types import FileType
from . import blackvue_parser
from .geotag_from_generic import GenericVideoExtractor, GeotagVideosFromGeneric


Expand Down Expand Up @@ -71,26 +69,23 @@ def extract(self) -> types.VideoMetadataOrError:
class BlackVueVideoExtractor(GenericVideoExtractor):
def extract(self) -> types.VideoMetadataOrError:
with self.video_path.open("rb") as fp:
points = blackvue_parser.extract_points(fp)
blackvue_info = blackvue_parser.extract_blackvue_info(fp)

if points is None:
raise exceptions.MapillaryVideoGPSNotFoundError(
"No GPS data found from the video"
)

if not points:
raise exceptions.MapillaryGPXEmptyError("Empty GPS data found")
if blackvue_info is None:
raise exceptions.MapillaryVideoGPSNotFoundError(
"No GPS data found from the video"
)

fp.seek(0, io.SEEK_SET)
make, model = "BlackVue", blackvue_parser.extract_camera_model(fp)
if not blackvue_info.gps:
raise exceptions.MapillaryGPXEmptyError("Empty GPS data found")

video_metadata = types.VideoMetadata(
filename=self.video_path,
filesize=utils.get_file_size(self.video_path),
filetype=FileType.BLACKVUE,
points=points,
make=make,
model=model,
points=blackvue_info.gps or [],
make=blackvue_info.make,
model=blackvue_info.model,
)

return video_metadata
Expand Down
53 changes: 34 additions & 19 deletions mapillary_tools/video_data_extraction/extractors/blackvue_parser.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
from __future__ import annotations

import functools

import typing as T

from ... import geo
from ...geotag import blackvue_parser
from ...mp4 import simple_mp4_parser as sparser
from ... import blackvue_parser, geo
from .base_parser import BaseParser


Expand All @@ -13,22 +15,35 @@ class BlackVueParser(BaseParser):

pointsFound: bool = False

def extract_points(self) -> T.Sequence[geo.Point]:
@functools.cached_property
def extract_blackvue_info(self) -> blackvue_parser.BlackVueInfo | None:
source_path = self.geotag_source_path
if not source_path:
return []
return None

with source_path.open("rb") as fp:
try:
points = blackvue_parser.extract_points(fp) or []
self.pointsFound = len(points) > 0
return points
except sparser.ParsingError:
return []

def extract_make(self) -> T.Optional[str]:
# If no points were found, assume this is not a BlackVue
return "Blackvue" if self.pointsFound else None

def extract_model(self) -> T.Optional[str]:
with self.videoPath.open("rb") as fp:
return blackvue_parser.extract_camera_model(fp) or None
return blackvue_parser.extract_blackvue_info(fp)

def extract_points(self) -> T.Sequence[geo.Point]:
blackvue_info = self.extract_blackvue_info

if blackvue_info is None:
return []

return blackvue_info.gps or []

def extract_make(self) -> str | None:
blackvue_info = self.extract_blackvue_info

if blackvue_info is None:
return None

return blackvue_info.make

def extract_model(self) -> str | None:
blackvue_info = self.extract_blackvue_info

if blackvue_info is None:
return None

return blackvue_info.model
24 changes: 11 additions & 13 deletions tests/cli/blackvue_parser.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,7 @@
import gpxpy
import gpxpy.gpx

from mapillary_tools import geo, utils
from mapillary_tools.geotag import blackvue_parser
from mapillary_tools import blackvue_parser, geo, utils


def _convert_points_to_gpx_segment(
Expand All @@ -28,24 +27,23 @@ def _convert_points_to_gpx_segment(
return gpx_segment


def _parse_gpx(path: pathlib.Path) -> list[geo.Point] | None:
with path.open("rb") as fp:
points = blackvue_parser.extract_points(fp)
return points


def _convert_to_track(path: pathlib.Path):
track = gpxpy.gpx.GPXTrack()
points = _parse_gpx(path)
if points is None:
raise RuntimeError(f"Invalid BlackVue video {path}")
track.name = str(path)

segment = _convert_points_to_gpx_segment(points)
with path.open("rb") as fp:
blackvue_info = blackvue_parser.extract_blackvue_info(fp)

if blackvue_info is None:
track.description = "Invalid BlackVue video"
return track

segment = _convert_points_to_gpx_segment(blackvue_info.gps or [])
track.segments.append(segment)
with path.open("rb") as fp:
model = blackvue_parser.extract_camera_model(fp)
track.description = f"Extracted from {model}"
track.name = path.name

return track


Expand Down
9 changes: 4 additions & 5 deletions tests/unit/test_blackvue_parser.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,7 @@
import io

import mapillary_tools.geo as geo

from mapillary_tools.geotag import blackvue_parser
from mapillary_tools import blackvue_parser
from mapillary_tools.mp4 import construct_mp4_parser as cparser


Expand Down Expand Up @@ -42,8 +41,8 @@ def test_parse_points():

box = {"type": b"free", "data": [{"type": b"gps ", "data": gps_data}]}
data = cparser.Box32ConstructBuilder({b"free": {}}).Box.build(box)
x = blackvue_parser.extract_points(io.BytesIO(data))
assert x is not None
info = blackvue_parser.extract_blackvue_info(io.BytesIO(data))
assert info is not None
assert [
geo.Point(
time=0.0, lat=38.8861575, lon=-76.99239516666667, alt=10.2, angle=None
Expand All @@ -54,4 +53,4 @@ def test_parse_points():
geo.Point(
time=0.968, lat=38.88615816666667, lon=-76.992434, alt=7.7, angle=None
),
] == list(x)
] == list(info.gps or [])
Loading