diff --git a/mapillary_tools/geotag/gpmf_parser.py b/mapillary_tools/geotag/gpmf_parser.py index 7feaf7134..d86deed72 100644 --- a/mapillary_tools/geotag/gpmf_parser.py +++ b/mapillary_tools/geotag/gpmf_parser.py @@ -1,11 +1,13 @@ +import dataclasses import io +import itertools import pathlib import typing as T import construct as C -from .. import geo -from ..mp4 import mp4_sample_parser as sample_parser +from .. import geo, imu +from ..mp4.mp4_sample_parser import TrackBoxParser, MovieBoxParser, Sample """ Parsing GPS from GPMF data format stored in GoPros. See the GPMF spec: https://github.com/gopro/gpmf-parser @@ -125,6 +127,14 @@ class KLVDict(T.TypedDict): GPMFSampleData = C.GreedyRange(KLV) +@dataclasses.dataclass +class TelemetryData: + gps: T.List[geo.PointWithFix] + accl: T.List[imu.AccelerationData] + gyro: T.List[imu.GyroscopeData] + magn: T.List[imu.MagnetometerData] + + # A GPS5 stream example: # key = b'STRM' type = b'\x00' structure_size = 1 repeat = 400 # data = ListContainer: @@ -298,8 +308,120 @@ def _find_first_gps_stream(stream: T.Sequence[KLVDict]) -> T.List[geo.PointWithF return sample_points +# a sensor matrix with only [1,0,0, 0,-1,0, 0,0,1], is just a form of non-calibrated sensor orientation +def _is_matrix_calibration(matrix: T.Sequence[float]) -> bool: + for v in matrix: + if v not in [0, -1, 1]: + return True + return False + + +def _build_matrix( + orin: T.Union[bytes, T.Sequence[int]], orio: T.Union[bytes, T.Sequence[int]] +) -> T.Sequence[float]: + matrix = [] + + # list(b'aA') == [97, 65] + lower_a, upper_A = 97, 65 + + for out_char in orin: + for in_char in orio: + if in_char == out_char: + matrix.append(1.0) + elif (in_char - lower_a) == (out_char - upper_A): + matrix.append(-1.0) + elif (in_char - upper_A) == (out_char - lower_a): + matrix.append(-1.0) + else: + matrix.append(0.0) + + return matrix + + +def _apply_matrix( + matrix: T.Sequence[float], values: T.Sequence[float] +) -> T.Generator[float, None, None]: + size = len(values) + assert ( + len(matrix) == size * size + ), f"expecting a square matrix of size {size} x {size} but got {len(matrix)}" + + for y in range(size): + row_start = y * size + yield sum(matrix[row_start + x] * values[x] for x in range(size)) + + +def _flatten(nested: T.Sequence[T.Sequence[float]]) -> T.List[float]: + output: T.List[float] = [] + for row in nested: + output.extend(row) + return output + + +def _get_matrix(klv: T.Dict[bytes, KLVDict]) -> T.Optional[T.Sequence[float]]: + mtrx = klv.get(b"MTRX") + if mtrx is not None: + matrix: T.Sequence[float] = _flatten(mtrx["data"]) + if _is_matrix_calibration(matrix): + return matrix + + orin = klv.get(b"ORIN") + orio = klv.get(b"ORIO") + + if orin is not None and orio is not None: + matrix = _build_matrix(b"".join(orin["data"]), b"".join(orio["data"])) + return matrix + + return None + + +def _scale_and_calibrate( + stream: T.Sequence[KLVDict], key: bytes +) -> T.Generator[T.Sequence[float], None, None]: + indexed: T.Dict[bytes, KLVDict] = {klv["key"]: klv for klv in stream} + + klv = indexed.get(key) + if klv is None: + return + + scal_klv = indexed.get(b"SCAL") + + if scal_klv is not None: + # replace 0s with 1s to avoid division by zero + scals = [s or 1 for s in _flatten(scal_klv["data"])] + + if not scals: + scals = [1] + + if len(scals) == 1: + # infinite repeat + scales: T.Iterable[float] = itertools.repeat(scals[0]) + else: + scales = scals + + matrix = _get_matrix(indexed) + + for values in klv["data"]: + if matrix is None: + yield tuple(v / s for v, s in zip(values, scales)) + else: + yield tuple(v / s for v, s in zip(_apply_matrix(matrix, values), scales)) + + +def _find_first_telemetry_stream(stream: T.Sequence[KLVDict], key: bytes): + values: T.List[T.Sequence[float]] = [] + + for klv in stream: + if klv["key"] == b"STRM": + values = list(_scale_and_calibrate(klv["data"], key)) + if values: + break + + return values + + def _extract_dvnm_from_samples( - fp: T.BinaryIO, samples: T.Iterable[sample_parser.Sample] + fp: T.BinaryIO, samples: T.Iterable[Sample] ) -> T.Dict[int, bytes]: dvnm_by_dvid: T.Dict[int, bytes] = {} @@ -322,10 +444,13 @@ def _extract_dvnm_from_samples( def _extract_points_from_samples( - fp: T.BinaryIO, samples: T.Iterable[sample_parser.Sample] -) -> T.List[geo.PointWithFix]: + fp: T.BinaryIO, samples: T.Iterable[Sample] +) -> TelemetryData: # To keep GPS points from different devices separated points_by_dvid: T.Dict[int, T.List[geo.PointWithFix]] = {} + accls_by_dvid: T.Dict[int, T.List[imu.AccelerationData]] = {} + gyros_by_dvid: T.Dict[int, T.List[imu.GyroscopeData]] = {} + magns_by_dvid: T.Dict[int, T.List[imu.MagnetometerData]] = {} for sample in samples: fp.seek(sample.raw_sample.offset, io.SEEK_SET) @@ -335,6 +460,8 @@ def _extract_points_from_samples( # iterate devices devices = (klv for klv in gpmf_sample_data if klv["key"] == b"DEVC") for device in devices: + device_id = _find_first_device_id(device["data"]) + sample_points = _find_first_gps_stream(device["data"]) if sample_points: # interpolate timestamps in between @@ -342,51 +469,115 @@ def _extract_points_from_samples( for idx, point in enumerate(sample_points): point.time = sample.exact_time + avg_timedelta * idx - device_id = _find_first_device_id(device["data"]) device_points = points_by_dvid.setdefault(device_id, []) device_points.extend(sample_points) - values = list(points_by_dvid.values()) - return values[0] if values else [] + sample_accls = _find_first_telemetry_stream(device["data"], b"ACCL") + if sample_accls: + # interpolate timestamps in between + avg_delta = sample.exact_timedelta / len(sample_accls) + accls_by_dvid.setdefault(device_id, []).extend( + imu.AccelerationData( + time=sample.exact_time + avg_delta * idx, + x=x, + y=y, + z=z, + ) + for idx, (z, x, y, *_) in enumerate(sample_accls) + ) + + sample_gyros = _find_first_telemetry_stream(device["data"], b"GYRO") + if sample_gyros: + # interpolate timestamps in between + avg_delta = sample.exact_timedelta / len(sample_gyros) + gyros_by_dvid.setdefault(device_id, []).extend( + imu.GyroscopeData( + time=sample.exact_time + avg_delta * idx, + x=x, + y=y, + z=z, + ) + for idx, (z, x, y, *_) in enumerate(sample_gyros) + ) + + sample_magns = _find_first_telemetry_stream(device["data"], b"MAGN") + if sample_magns: + # interpolate timestamps in between + avg_delta = sample.exact_timedelta / len(sample_magns) + magns_by_dvid.setdefault(device_id, []).extend( + imu.MagnetometerData( + time=sample.exact_time + avg_delta * idx, + x=x, + y=y, + z=z, + ) + for idx, (z, x, y, *_) in enumerate(sample_magns) + ) + + return TelemetryData( + gps=list(points_by_dvid.values())[0] if points_by_dvid else [], + accl=list(accls_by_dvid.values())[0] if accls_by_dvid else [], + gyro=list(gyros_by_dvid.values())[0] if gyros_by_dvid else [], + magn=list(magns_by_dvid.values())[0] if magns_by_dvid else [], + ) def _is_gpmd_description(description: T.Dict) -> bool: return description["format"] == b"gpmd" -def extract_points(fp: T.BinaryIO) -> T.Optional[T.List[geo.PointWithFix]]: +def _contains_gpmd_description(track: TrackBoxParser) -> bool: + descriptions = track.extract_sample_descriptions() + return any(_is_gpmd_description(d) for d in descriptions) + + +def _filter_gpmd_samples(track: TrackBoxParser) -> T.Generator[Sample, None, None]: + for sample in track.extract_samples(): + if _is_gpmd_description(sample.description): + yield sample + + +def extract_points(fp: T.BinaryIO) -> T.List[geo.PointWithFix]: """ Return a list of points (could be empty) if it is a valid GoPro video, otherwise None """ - points = None - moov = sample_parser.MovieBoxParser.parse_stream(fp) + moov = MovieBoxParser.parse_stream(fp) for track in moov.extract_tracks(): - descriptions = track.extract_sample_descriptions() - if any(_is_gpmd_description(d) for d in descriptions): - gpmd_samples = ( - sample - for sample in track.extract_samples() - if _is_gpmd_description(sample.description) - ) - points = list(_extract_points_from_samples(fp, gpmd_samples)) + if _contains_gpmd_description(track): + gpmd_samples = _filter_gpmd_samples(track) + telemetry = _extract_points_from_samples(fp, gpmd_samples) # return the firstly found non-empty points - if points: - return points + if telemetry.gps: + return telemetry.gps + # points could be empty list or None here - return points + return [] + + +def extract_telemetry_data(fp: T.BinaryIO) -> T.Optional[TelemetryData]: + """ + Return the telemetry data from the first found GoPro GPMF track + """ + moov = MovieBoxParser.parse_stream(fp) + + for track in moov.extract_tracks(): + if _contains_gpmd_description(track): + gpmd_samples = _filter_gpmd_samples(track) + telemetry = _extract_points_from_samples(fp, gpmd_samples) + # return the firstly found non-empty points + if telemetry.gps: + return telemetry + + # points could be empty list or None here + return None def extract_all_device_names(fp: T.BinaryIO) -> T.Dict[int, bytes]: - moov = sample_parser.MovieBoxParser.parse_stream(fp) + moov = MovieBoxParser.parse_stream(fp) for track in moov.extract_tracks(): - descriptions = track.extract_sample_descriptions() - if any(_is_gpmd_description(d) for d in descriptions): - gpmd_samples = ( - sample - for sample in track.extract_samples() - if _is_gpmd_description(sample.description) - ) + if _contains_gpmd_description(track): + gpmd_samples = _filter_gpmd_samples(track) device_names = _extract_dvnm_from_samples(fp, gpmd_samples) if device_names: return device_names diff --git a/mapillary_tools/imu.py b/mapillary_tools/imu.py new file mode 100644 index 000000000..d49c4172b --- /dev/null +++ b/mapillary_tools/imu.py @@ -0,0 +1,25 @@ +import typing as T + + +# Gyroscope signal in radians/seconds around XYZ axes of the camera. Rotation is positive in the counterclockwise direction. +class GyroscopeData(T.NamedTuple): + time: float + x: float + y: float + z: float + + +# Accelerometer reading in meters/second^2 along XYZ axes of the camera. +class AccelerationData(T.NamedTuple): + time: float + x: float + y: float + z: float + + +# Ambient magnetic field. +class MagnetometerData(T.NamedTuple): + time: float + x: float + y: float + z: float diff --git a/tests/cli/gpmf_parser.py b/tests/cli/gpmf_parser.py index 0ad1ce390..713388fb5 100644 --- a/tests/cli/gpmf_parser.py +++ b/tests/cli/gpmf_parser.py @@ -1,5 +1,6 @@ import argparse import datetime +import io import json import pathlib import typing as T @@ -10,6 +11,7 @@ import mapillary_tools.geotag.gpmf_parser as gpmf_parser import mapillary_tools.geotag.gps_filter as gps_filter +from mapillary_tools.mp4 import mp4_sample_parser import mapillary_tools.utils as utils @@ -99,6 +101,17 @@ def _convert_geojson(path: pathlib.Path): return features +def _parse_samples(path: pathlib.Path) -> T.Generator[T.Dict, None, None]: + with path.open("rb") as fp: + parser = mp4_sample_parser.MovieBoxParser.parse_stream(fp) + for t in parser.extract_tracks(): + for sample in t.extract_samples(): + if gpmf_parser._is_gpmd_description(sample.description): + fp.seek(sample.raw_sample.offset, io.SEEK_SET) + data = fp.read(sample.raw_sample.size) + yield T.cast(T.Dict, gpmf_parser.GPMFSampleData.parse(data)) + + def _parse_args(): parser = argparse.ArgumentParser() parser.add_argument("path", nargs="+", help="Path to video file or directory") @@ -113,13 +126,12 @@ def main(): parsed_args = _parse_args() features = [] - samples = [] + parsed_samples = [] gpx = gpxpy.gpx.GPX() def _process(path: pathlib.Path): if parsed_args.dump: - with path.open("rb") as fp: - samples.extend(gpmf_parser.iterate_gpmd_sample_data(fp)) + parsed_samples.extend(_parse_samples(path)) elif parsed_args.geojson: features.extend(_convert_geojson(path)) else: @@ -129,7 +141,7 @@ def _process(path: pathlib.Path): _process(path) if parsed_args.dump: - for sample in samples: + for sample in parsed_samples: print(sample) else: if features: