diff --git a/mapillary_tools/geotag/geotag_videos_from_video.py b/mapillary_tools/geotag/geotag_videos_from_video.py index 833b5bc5d..328ddc021 100644 --- a/mapillary_tools/geotag/geotag_videos_from_video.py +++ b/mapillary_tools/geotag/geotag_videos_from_video.py @@ -68,47 +68,45 @@ def _extract_video_metadata( if ( filetypes is None or types.FileType.VIDEO in filetypes - or types.FileType.CAMM in filetypes + or types.FileType.GOPRO in filetypes ): with video_path.open("rb") as fp: try: - points = camm_parser.extract_points(fp) + gopro_info = gpmf_parser.extract_gopro_info(fp) except sparser.ParsingError: - points = None - - if points is not None: - fp.seek(0, io.SEEK_SET) - make, model = camm_parser.extract_camera_make_and_model(fp) - return types.VideoMetadata( - filename=video_path, - md5sum=None, - filesize=utils.get_file_size(video_path), - filetype=types.FileType.CAMM, - points=points, - make=make, - model=model, - ) + gopro_info = None + + if gopro_info is not None: + return types.VideoMetadata( + filename=video_path, + md5sum=None, + filesize=utils.get_file_size(video_path), + filetype=types.FileType.GOPRO, + points=T.cast(T.List[geo.Point], gopro_info.gps), + make=gopro_info.make, + model=gopro_info.model, + ) if ( filetypes is None or types.FileType.VIDEO in filetypes - or types.FileType.GOPRO in filetypes + or types.FileType.CAMM in filetypes ): with video_path.open("rb") as fp: try: - points_with_fix = gpmf_parser.extract_points(fp) + points = camm_parser.extract_points(fp) except sparser.ParsingError: - points_with_fix = None + points = None - if points_with_fix is not None: + if points is not None: fp.seek(0, io.SEEK_SET) - make, model = "GoPro", gpmf_parser.extract_camera_model(fp) + make, model = camm_parser.extract_camera_make_and_model(fp) return types.VideoMetadata( filename=video_path, md5sum=None, filesize=utils.get_file_size(video_path), - filetype=types.FileType.GOPRO, - points=T.cast(T.List[geo.Point], points_with_fix), + 
filetype=types.FileType.CAMM, + points=points, make=make, model=model, ) @@ -176,9 +174,6 @@ def geotag_video( ) if stationary: raise exceptions.MapillaryStationaryVideoError("Stationary video") - - LOG.debug("Calculating MD5 checksum for %s", str(video_metadata.filename)) - video_metadata.update_md5sum() except Exception as ex: if not isinstance(ex, exceptions.MapillaryDescriptionError): LOG.warning( diff --git a/mapillary_tools/geotag/gpmf_parser.py b/mapillary_tools/geotag/gpmf_parser.py index 9e1360660..6b1bdd7d1 100644 --- a/mapillary_tools/geotag/gpmf_parser.py +++ b/mapillary_tools/geotag/gpmf_parser.py @@ -1,8 +1,8 @@ +from __future__ import annotations import dataclasses import datetime import io import itertools -import pathlib import typing as T import construct as C @@ -130,11 +130,95 @@ class KLVDict(T.TypedDict): @dataclasses.dataclass -class TelemetryData: - gps: T.List[GPSPoint] - accl: T.List[telemetry.AccelerationData] - gyro: T.List[telemetry.GyroscopeData] - magn: T.List[telemetry.MagnetometerData] +class GoProInfo: + # None indicates the data has not been extracted, + # while [] indicates extracted but no data point found + gps: list[GPSPoint] | None = None + accl: list[telemetry.AccelerationData] | None = None + gyro: list[telemetry.GyroscopeData] | None = None + magn: list[telemetry.MagnetometerData] | None = None + make: str = "GoPro" + model: str = "" + + +def extract_gopro_info( + fp: T.BinaryIO, telemetry_only: bool = False +) -> T.Optional[GoProInfo]: + """ + Return the GoProInfo object if found. None indicates it's not a valid GoPro video. 
+ """ + + moov = MovieBoxParser.parse_stream(fp) + for track in moov.extract_tracks(): + if _contains_gpmd_description(track): + gpmd_samples = _filter_gpmd_samples(track) + + if telemetry_only: + points_by_dvid: dict[int, list[GPSPoint]] | None = None + dvnm_by_dvid: dict[int, bytes] | None = None + accls_by_dvid: dict[int, list[telemetry.AccelerationData]] | None = {} + gyros_by_dvid: dict[int, list[telemetry.GyroscopeData]] | None = {} + magns_by_dvid: dict[int, list[telemetry.MagnetometerData]] | None = {} + else: + points_by_dvid = {} + dvnm_by_dvid = {} + accls_by_dvid = None + gyros_by_dvid = None + magns_by_dvid = None + + _load_telemetry_from_samples( + fp, + gpmd_samples, + points_by_dvid=points_by_dvid, + accls_by_dvid=accls_by_dvid, + gyros_by_dvid=gyros_by_dvid, + magns_by_dvid=magns_by_dvid, + dvnm_by_dvid=dvnm_by_dvid, + ) + + gopro_info = GoProInfo() + + if points_by_dvid is not None: + gps_points = list(points_by_dvid.values())[0] if points_by_dvid else [] + # backfill forward from the first point with epoch time + _backfill_gps_timestamps(gps_points) + # backfill backward from the first point with epoch time in reversed order + _backfill_gps_timestamps(reversed(gps_points)) + gopro_info.gps = gps_points + + if accls_by_dvid is not None: + gopro_info.accl = ( + list(accls_by_dvid.values())[0] if accls_by_dvid else [] + ) + + if gyros_by_dvid is not None: + gopro_info.gyro = ( + list(gyros_by_dvid.values())[0] if gyros_by_dvid else [] + ) + + if magns_by_dvid is not None: + gopro_info.magn = ( + list(magns_by_dvid.values())[0] if magns_by_dvid else [] + ) + + if dvnm_by_dvid is not None: + gopro_info.model = _extract_camera_model_from_devices(dvnm_by_dvid) + + return gopro_info + + return None + + +def extract_camera_model(fp: T.BinaryIO) -> str: + moov = MovieBoxParser.parse_stream(fp) + for track in moov.extract_tracks(): + if _contains_gpmd_description(track): + gpmd_samples = _filter_gpmd_samples(track) + dvnm_by_dvid: dict[int, bytes] = {} + 
_load_telemetry_from_samples(fp, gpmd_samples, dvnm_by_dvid=dvnm_by_dvid) + return _extract_camera_model_from_devices(dvnm_by_dvid) + + return "" def _gps5_timestamp_to_epoch_time(dtstr: str): @@ -181,7 +265,7 @@ def _gps5_timestamp_to_epoch_time(dtstr: str): # [378081666, -1224280064, 9621, 1492, 138], # [378081662, -1224280049, 9592, 1476, 150], # ] -def gps5_from_stream( +def _gps5_from_stream( stream: T.Sequence[KLVDict], ) -> T.Generator[GPSPoint, None, None]: indexed: T.Dict[bytes, T.List[T.List[T.Any]]] = { @@ -265,7 +349,7 @@ def _get_gps_type(input) -> bytes: return final -def gps9_from_stream( +def _gps9_from_stream( stream: T.Sequence[KLVDict], ) -> T.Generator[GPSPoint, None, None]: NUM_VALUES = 9 @@ -357,11 +441,11 @@ def _find_first_gps_stream(stream: T.Sequence[KLVDict]) -> T.List[GPSPoint]: for klv in stream: if klv["key"] == b"STRM": - sample_points = list(gps9_from_stream(klv["data"])) + sample_points = list(_gps9_from_stream(klv["data"])) if sample_points: break - sample_points = list(gps5_from_stream(klv["data"])) + sample_points = list(_gps5_from_stream(klv["data"])) if sample_points: break @@ -480,29 +564,6 @@ def _find_first_telemetry_stream(stream: T.Sequence[KLVDict], key: bytes): return values -def _extract_dvnm_from_samples( - fp: T.BinaryIO, samples: T.Iterable[Sample] -) -> T.Dict[int, bytes]: - dvnm_by_dvid: T.Dict[int, bytes] = {} - - for sample in samples: - fp.seek(sample.raw_sample.offset, io.SEEK_SET) - data = fp.read(sample.raw_sample.size) - gpmf_sample_data = T.cast(T.Dict, GPMFSampleData.parse(data)) - - # iterate devices - devices = (klv for klv in gpmf_sample_data if klv["key"] == b"DEVC") - for device in devices: - device_id = _find_first_device_id(device["data"]) - for klv in device["data"]: - if klv["key"] == b"DVNM" and klv["data"]: - # klv["data"] could be [b"H", b"e", b"r", b"o", b"8", b" ", b"B", b"l", b"a", b"c", b"k"] - # or [b"Hero8 Black"] - dvnm_by_dvid[device_id] = b"".join(klv["data"]) - - return dvnm_by_dvid - 
- def _backfill_gps_timestamps(gps_points: T.Iterable[GPSPoint]) -> None: it = iter(gps_points) @@ -525,91 +586,86 @@ def _backfill_gps_timestamps(gps_points: T.Iterable[GPSPoint]) -> None: last = point -def _extract_points_from_samples( - fp: T.BinaryIO, samples: T.Iterable[Sample] -) -> TelemetryData: - # To keep GPS points from different devices separated - points_by_dvid: T.Dict[int, T.List[GPSPoint]] = {} - accls_by_dvid: T.Dict[int, T.List[telemetry.AccelerationData]] = {} - gyros_by_dvid: T.Dict[int, T.List[telemetry.GyroscopeData]] = {} - magns_by_dvid: T.Dict[int, T.List[telemetry.MagnetometerData]] = {} - - for sample in samples: - fp.seek(sample.raw_sample.offset, io.SEEK_SET) - data = fp.read(sample.raw_sample.size) - gpmf_sample_data = T.cast(T.Dict, GPMFSampleData.parse(data)) +# This API is designed for performance +def _load_telemetry_from_samples( + fp: T.BinaryIO, + samples: T.Iterable[Sample], + points_by_dvid: dict[int, list[GPSPoint]] | None = None, + accls_by_dvid: dict[int, list[telemetry.AccelerationData]] | None = None, + gyros_by_dvid: dict[int, list[telemetry.GyroscopeData]] | None = None, + magns_by_dvid: dict[int, list[telemetry.MagnetometerData]] | None = None, + dvnm_by_dvid: dict[int, bytes] | None = None, +) -> None: + for sample, sample_data in _iterate_read_sample_data(fp, samples): + gpmf_sample_data = T.cast(T.Dict, GPMFSampleData.parse(sample_data)) # iterate devices devices = (klv for klv in gpmf_sample_data if klv["key"] == b"DEVC") for device in devices: device_id = _find_first_device_id(device["data"]) - sample_points = _find_first_gps_stream(device["data"]) - if sample_points: - # interpolate timestamps in between - avg_timedelta = sample.exact_timedelta / len(sample_points) - for idx, point in enumerate(sample_points): - point.time = sample.exact_time + avg_timedelta * idx - - device_points = points_by_dvid.setdefault(device_id, []) - device_points.extend(sample_points) - - sample_accls = 
_find_first_telemetry_stream(device["data"], b"ACCL") - if sample_accls: - # interpolate timestamps in between - avg_delta = sample.exact_timedelta / len(sample_accls) - accls_by_dvid.setdefault(device_id, []).extend( - telemetry.AccelerationData( - time=sample.exact_time + avg_delta * idx, - x=x, - y=y, - z=z, + if dvnm_by_dvid is not None: + for klv in device["data"]: + if klv["key"] == b"DVNM" and klv["data"]: + # klv["data"] could be [b"H", b"e", b"r", b"o", b"8", b" ", b"B", b"l", b"a", b"c", b"k"] + # or [b"Hero8 Black"] + dvnm_by_dvid[device_id] = b"".join(klv["data"]) + + if points_by_dvid is not None: + sample_points = _find_first_gps_stream(device["data"]) + if sample_points: + # interpolate timestamps in between + avg_timedelta = sample.exact_timedelta / len(sample_points) + for idx, point in enumerate(sample_points): + point.time = sample.exact_time + avg_timedelta * idx + + device_points = points_by_dvid.setdefault(device_id, []) + device_points.extend(sample_points) + + if accls_by_dvid is not None: + sample_accls = _find_first_telemetry_stream(device["data"], b"ACCL") + if sample_accls: + # interpolate timestamps in between + avg_delta = sample.exact_timedelta / len(sample_accls) + accls_by_dvid.setdefault(device_id, []).extend( + telemetry.AccelerationData( + time=sample.exact_time + avg_delta * idx, + x=x, + y=y, + z=z, + ) + for idx, (z, x, y, *_) in enumerate(sample_accls) ) - for idx, (z, x, y, *_) in enumerate(sample_accls) - ) - sample_gyros = _find_first_telemetry_stream(device["data"], b"GYRO") - if sample_gyros: - # interpolate timestamps in between - avg_delta = sample.exact_timedelta / len(sample_gyros) - gyros_by_dvid.setdefault(device_id, []).extend( - telemetry.GyroscopeData( - time=sample.exact_time + avg_delta * idx, - x=x, - y=y, - z=z, + if gyros_by_dvid is not None: + sample_gyros = _find_first_telemetry_stream(device["data"], b"GYRO") + if sample_gyros: + # interpolate timestamps in between + avg_delta = sample.exact_timedelta / 
len(sample_gyros) + gyros_by_dvid.setdefault(device_id, []).extend( + telemetry.GyroscopeData( + time=sample.exact_time + avg_delta * idx, + x=x, + y=y, + z=z, + ) + for idx, (z, x, y, *_) in enumerate(sample_gyros) ) - for idx, (z, x, y, *_) in enumerate(sample_gyros) - ) - sample_magns = _find_first_telemetry_stream(device["data"], b"MAGN") - if sample_magns: - # interpolate timestamps in between - avg_delta = sample.exact_timedelta / len(sample_magns) - magns_by_dvid.setdefault(device_id, []).extend( - telemetry.MagnetometerData( - time=sample.exact_time + avg_delta * idx, - x=x, - y=y, - z=z, + if magns_by_dvid is not None: + sample_magns = _find_first_telemetry_stream(device["data"], b"MAGN") + if sample_magns: + # interpolate timestamps in between + avg_delta = sample.exact_timedelta / len(sample_magns) + magns_by_dvid.setdefault(device_id, []).extend( + telemetry.MagnetometerData( + time=sample.exact_time + avg_delta * idx, + x=x, + y=y, + z=z, + ) + for idx, (z, x, y, *_) in enumerate(sample_magns) ) - for idx, (z, x, y, *_) in enumerate(sample_magns) - ) - - gps_points = list(points_by_dvid.values())[0] if points_by_dvid else [] - - # backfill forward from the first point with epoch time - _backfill_gps_timestamps(gps_points) - - # backfill backward from the first point with epoch time in reversed order - _backfill_gps_timestamps(reversed(gps_points)) - - return TelemetryData( - gps=gps_points, - accl=list(accls_by_dvid.values())[0] if accls_by_dvid else [], - gyro=list(gyros_by_dvid.values())[0] if gyros_by_dvid else [], - magn=list(magns_by_dvid.values())[0] if magns_by_dvid else [], - ) def _is_gpmd_description(description: T.Dict) -> bool: @@ -627,56 +683,7 @@ def _filter_gpmd_samples(track: TrackBoxParser) -> T.Generator[Sample, None, Non yield sample -def extract_points(fp: T.BinaryIO) -> T.List[GPSPoint]: - """ - Return a list of points (could be empty) if it is a valid GoPro video, - otherwise None - """ - moov = MovieBoxParser.parse_stream(fp) - 
for track in moov.extract_tracks(): - if _contains_gpmd_description(track): - gpmd_samples = _filter_gpmd_samples(track) - telemetry = _extract_points_from_samples(fp, gpmd_samples) - # return the firstly found non-empty points - if telemetry.gps: - return telemetry.gps - - # points could be empty list or None here - return [] - - -def extract_telemetry_data(fp: T.BinaryIO) -> T.Optional[TelemetryData]: - """ - Return the telemetry data from the first found GoPro GPMF track - """ - moov = MovieBoxParser.parse_stream(fp) - - for track in moov.extract_tracks(): - if _contains_gpmd_description(track): - gpmd_samples = _filter_gpmd_samples(track) - telemetry = _extract_points_from_samples(fp, gpmd_samples) - # return the firstly found non-empty points - if telemetry.gps: - return telemetry - - # points could be empty list or None here - return None - - -def extract_all_device_names(fp: T.BinaryIO) -> T.Dict[int, bytes]: - moov = MovieBoxParser.parse_stream(fp) - for track in moov.extract_tracks(): - if _contains_gpmd_description(track): - gpmd_samples = _filter_gpmd_samples(track) - device_names = _extract_dvnm_from_samples(fp, gpmd_samples) - if device_names: - return device_names - return {} - - -def extract_camera_model(fp: T.BinaryIO) -> str: - device_names = extract_all_device_names(fp) - +def _extract_camera_model_from_devices(device_names: T.Dict[int, bytes]) -> str: if not device_names: return "" @@ -705,9 +712,9 @@ def extract_camera_model(fp: T.BinaryIO) -> str: return unicode_names[0].strip() -def parse_gpx(path: pathlib.Path) -> T.List[GPSPoint]: - with path.open("rb") as fp: - points = extract_points(fp) - if points is None: - return [] - return points +def _iterate_read_sample_data( + fp: T.BinaryIO, samples: T.Iterable[Sample] +) -> T.Generator[T.Tuple[Sample, bytes], None, None]: + for sample in samples: + fp.seek(sample.raw_sample.offset, io.SEEK_SET) + yield (sample, fp.read(sample.raw_sample.size)) diff --git a/mapillary_tools/upload.py 
b/mapillary_tools/upload.py index 6f6d39f3d..b83d1db7e 100644 --- a/mapillary_tools/upload.py +++ b/mapillary_tools/upload.py @@ -588,11 +588,13 @@ def upload( if MAPILLARY__EXPERIMENTAL_ENABLE_IMU == "YES": if video_metadata.filetype is FileType.GOPRO: with video_metadata.filename.open("rb") as fp: - telemetry_data = gpmf_parser.extract_telemetry_data(fp) - if telemetry_data: - telemetry_measurements.extend(telemetry_data.accl) - telemetry_measurements.extend(telemetry_data.gyro) - telemetry_measurements.extend(telemetry_data.magn) + gopro_info = gpmf_parser.extract_gopro_info( + fp, telemetry_only=True + ) + if gopro_info is not None: + telemetry_measurements.extend(gopro_info.accl or []) + telemetry_measurements.extend(gopro_info.gyro or []) + telemetry_measurements.extend(gopro_info.magn or []) telemetry_measurements.sort(key=lambda m: m.time) generator = camm_builder.camm_sample_generator2( diff --git a/mapillary_tools/video_data_extraction/extractors/gopro_parser.py b/mapillary_tools/video_data_extraction/extractors/gopro_parser.py index 6fb115d5c..b69e7a111 100644 --- a/mapillary_tools/video_data_extraction/extractors/gopro_parser.py +++ b/mapillary_tools/video_data_extraction/extractors/gopro_parser.py @@ -1,3 +1,4 @@ +from __future__ import annotations import typing as T from ... 
import geo @@ -11,33 +12,46 @@ class GoProParser(BaseParser): must_rebase_times_to_zero = False parser_label = "gopro" - pointsFound: bool = False + _extracted: bool = False + _cached_gopro_info: gpmf_parser.GoProInfo | None = None + + def _extract_gopro_info(self) -> gpmf_parser.GoProInfo | None: + if self._extracted: + return self._cached_gopro_info + + self._extracted = True - def extract_points(self) -> T.Sequence[geo.Point]: source_path = self.geotag_source_path - if not source_path: - return [] + + if source_path is None: + # source_path not found + return None + with source_path.open("rb") as fp: try: - points = gpmf_parser.extract_points(fp) or [] - self.pointsFound = len(points) > 0 - return points + self._cached_gopro_info = gpmf_parser.extract_gopro_info(fp) except sparser.ParsingError: - return [] + self._cached_gopro_info = None + + return self._cached_gopro_info - def extract_make(self) -> T.Optional[str]: - model = self.extract_model() - if model: - return "GoPro" + def extract_points(self) -> T.Sequence[geo.Point]: + gopro_info = self._extract_gopro_info() + if gopro_info is None: + return [] - # make sure self.pointsFound is updated - _ = self.extract_points() - # If no points were found, assume this is not a GoPro - return "GoPro" if self.pointsFound else None + return T.cast(T.Sequence[geo.Point], gopro_info.gps) - def extract_model(self) -> T.Optional[str]: - source_path = self.geotag_source_path - if not source_path: + def extract_make(self) -> str | None: + gopro_info = self._extract_gopro_info() + if gopro_info is None: return None - with source_path.open("rb") as fp: - return gpmf_parser.extract_camera_model(fp) or None + + return gopro_info.make + + def extract_model(self) -> str | None: + gopro_info = self._extract_gopro_info() + if gopro_info is None: + return None + + return gopro_info.model