diff --git a/mapillary_tools/camm/camm_builder.py b/mapillary_tools/camm/camm_builder.py index 83e4507cf..f8191f2b9 100644 --- a/mapillary_tools/camm/camm_builder.py +++ b/mapillary_tools/camm/camm_builder.py @@ -1,7 +1,7 @@ import io import typing as T -from .. import geo, telemetry, types +from .. import geo, types from ..mp4 import ( construct_mp4_parser as cparser, mp4_sample_parser as sample_parser, @@ -11,62 +11,15 @@ from . import camm_parser -TelemetryMeasurement = T.Union[ - geo.Point, - telemetry.TelemetryMeasurement, -] +def _build_camm_sample(measurement: camm_parser.TelemetryMeasurement) -> bytes: + if camm_parser.GoProGPSSampleEntry.serializable(measurement): + return camm_parser.GoProGPSSampleEntry.serialize(measurement) + for sample_entry_cls in camm_parser.SAMPLE_ENTRY_CLS_BY_CAMM_TYPE.values(): + if sample_entry_cls.serializable(measurement): + return sample_entry_cls.serialize(measurement) -def _build_camm_sample(measurement: TelemetryMeasurement) -> bytes: - if isinstance(measurement, geo.Point): - return camm_parser.CAMMSampleData.build( - { - "type": camm_parser.CAMMType.MIN_GPS.value, - "data": [ - measurement.lat, - measurement.lon, - -1.0 if measurement.alt is None else measurement.alt, - ], - } - ) - elif isinstance(measurement, telemetry.AccelerationData): - # Accelerometer reading in meters/second^2 along XYZ axes of the camera. - return camm_parser.CAMMSampleData.build( - { - "type": camm_parser.CAMMType.ACCELERATION.value, - "data": [ - measurement.x, - measurement.y, - measurement.z, - ], - } - ) - elif isinstance(measurement, telemetry.GyroscopeData): - # Gyroscope signal in radians/seconds around XYZ axes of the camera. Rotation is positive in the counterclockwise direction. - return camm_parser.CAMMSampleData.build( - { - "type": camm_parser.CAMMType.GYRO.value, - "data": [ - measurement.x, - measurement.y, - measurement.z, - ], - } - ) - elif isinstance(measurement, telemetry.MagnetometerData): - # Ambient magnetic field. - return camm_parser.CAMMSampleData.build( - { - "type": camm_parser.CAMMType.MAGNETIC_FIELD.value, - "data": [ - measurement.x, - measurement.y, - measurement.z, - ], - } - ) - else: - raise ValueError(f"unexpected measurement type {type(measurement)}") + raise ValueError(f"Unsupported measurement type {type(measurement)}") def _create_edit_list_from_points( @@ -121,16 +74,19 @@ def _create_edit_list_from_points( def _multiplex( points: T.Sequence[geo.Point], - measurements: T.Optional[T.List[telemetry.TelemetryMeasurement]] = None, -) -> T.List[TelemetryMeasurement]: - mutiplexed: T.List[TelemetryMeasurement] = [*points, *(measurements or [])] + measurements: T.Optional[T.List[camm_parser.TelemetryMeasurement]] = None, +) -> T.List[camm_parser.TelemetryMeasurement]: + mutiplexed: T.List[camm_parser.TelemetryMeasurement] = [ + *points, + *(measurements or []), + ] mutiplexed.sort(key=lambda m: m.time) return mutiplexed def convert_telemetry_to_raw_samples( - measurements: T.Sequence[TelemetryMeasurement], + measurements: T.Sequence[camm_parser.TelemetryMeasurement], timescale: int, ) -> T.Generator[sample_parser.RawSample, None, None]: for idx, measurement in enumerate(measurements): @@ -283,7 +239,7 @@ def create_camm_trak( def camm_sample_generator2( video_metadata: types.VideoMetadata, - telemetry_measurements: T.Optional[T.List[telemetry.TelemetryMeasurement]] = None, + telemetry_measurements: T.Optional[T.List[camm_parser.TelemetryMeasurement]] = None, ): def _f( fp: T.BinaryIO, diff --git a/mapillary_tools/camm/camm_parser.py b/mapillary_tools/camm/camm_parser.py index 508d73b02..e35fe283f 100644 --- a/mapillary_tools/camm/camm_parser.py +++ b/mapillary_tools/camm/camm_parser.py @@ -1,11 +1,13 @@ # pyre-ignore-all-errors[5, 11, 16, 21, 24, 58] +from __future__ import annotations +import abc import dataclasses import io import logging -import pathlib import typing as T from enum import Enum +from typing_extensions import TypeIs import construct as C @@ -19,9 +21,7 @@ TelemetryMeasurement = T.Union[ geo.Point, - telemetry.AccelerationData, - telemetry.GyroscopeData, - telemetry.MagnetometerData, + telemetry.TelemetryMeasurement, ] @@ -41,95 +41,285 @@ class CAMMType(Enum): Float = C.Float32l Double = C.Float64l -_SWITCH: T.Dict[int, C.Struct] = { - # angle_axis - CAMMType.ANGLE_AXIS.value: Float[3], - CAMMType.EXPOSURE_TIME.value: C.Struct( - "pixel_exposure_time" / C.Int32sl, - "rolling_shutter_skew_time" / C.Int32sl, - ), - # gyro - CAMMType.GYRO.value: Float[3], - # acceleration - CAMMType.ACCELERATION.value: Float[3], - # position - CAMMType.POSITION.value: Float[3], - # lat, lon, alt - CAMMType.MIN_GPS.value: Double[3], - CAMMType.GPS.value: C.Struct( - "time_gps_epoch" / Double, - "gps_fix_type" / C.Int32sl, - "latitude" / Double, - "longitude" / Double, - "altitude" / Float, - "horizontal_accuracy" / Float, - "vertical_accuracy" / Float, - "velocity_east" / Float, - "velocity_north" / Float, - "velocity_up" / Float, - "speed_accuracy" / Float, - ), - # magnetic_field - CAMMType.MAGNETIC_FIELD.value: Float[3], -} -CAMMSampleData = C.Struct( - C.Padding(2), - "type" / C.Int16ul, - "data" - / C.Switch( - C.this.type, - _SWITCH, - ), -) +TTelemetry = T.TypeVar("TTelemetry", bound=TelemetryMeasurement) -def _parse_telemetry_from_sample( - fp: T.BinaryIO, sample: Sample -) -> T.Optional[TelemetryMeasurement]: - fp.seek(sample.raw_sample.offset, io.SEEK_SET) - data = fp.read(sample.raw_sample.size) - box = CAMMSampleData.parse(data) - if box.type == CAMMType.MIN_GPS.value: +class CAMMSampleEntry(abc.ABC, T.Generic[TTelemetry]): + serialized_camm_type: CAMMType + + telemetry_cls_type: T.Type[TTelemetry] + + construct: C.Struct + + @classmethod + def serializable(cls, data: T.Any, throw: bool = False) -> TypeIs[TTelemetry]: + # Use "is" for exact type match, instead of isinstance + if type(data) is cls.telemetry_cls_type: + return True + + if throw: + raise TypeError( + f"{cls} can not serialize {type(data)}: expect {cls.telemetry_cls_type}" + ) + + return False + + @classmethod + @abc.abstractmethod + def serialize(cls, data: TTelemetry) -> bytes: + raise NotImplementedError + + @classmethod + @abc.abstractmethod + def deserialize(cls, sample: Sample, data: T.Any) -> TTelemetry: + raise NotImplementedError + + +class MinGPSSampleEntry(CAMMSampleEntry): + serialized_camm_type = CAMMType.MIN_GPS + + telemetry_cls_type = geo.Point + + construct = Double[3] # type: ignore + + @classmethod + def deserialize(cls, sample: Sample, data: T.Any) -> geo.Point: return geo.Point( time=sample.exact_time, - lat=box.data[0], - lon=box.data[1], - alt=box.data[2], + lat=data[0], + lon=data[1], + alt=data[2], angle=None, ) - elif box.type == CAMMType.GPS.value: - # Not using box.data.time_gps_epoch as the point timestamp - # because it is from another clock - return geo.Point( + + @classmethod + def serialize(cls, data: geo.Point) -> bytes: + cls.serializable(data, throw=True) + + return CAMMSampleData.build( + { + "type": cls.serialized_camm_type.value, + "data": [ + data.lat, + data.lon, + -1.0 if data.alt is None else data.alt, + ], + } + ) + + +class GPSSampleEntry(CAMMSampleEntry): + serialized_camm_type: CAMMType = CAMMType.GPS + + telemetry_cls_type = telemetry.CAMMGPSPoint + + construct = C.Struct( + "time_gps_epoch" / Double, # type: ignore + "gps_fix_type" / C.Int32sl, # type: ignore + "latitude" / Double, # type: ignore + "longitude" / Double, # type: ignore + "altitude" / Float, # type: ignore + "horizontal_accuracy" / Float, # type: ignore + "vertical_accuracy" / Float, # type: ignore + "velocity_east" / Float, # type: ignore + "velocity_north" / Float, # type: ignore + "velocity_up" / Float, # type: ignore + "speed_accuracy" / Float, # type: ignore + ) + + @classmethod + def deserialize(cls, sample: Sample, data: T.Any) -> telemetry.CAMMGPSPoint: + return telemetry.CAMMGPSPoint( time=sample.exact_time, - lat=box.data.latitude, - lon=box.data.longitude, - alt=box.data.altitude, + lat=data.latitude, + lon=data.longitude, + alt=data.altitude, angle=None, + time_gps_epoch=data.time_gps_epoch, + gps_fix_type=data.gps_fix_type, + horizontal_accuracy=data.horizontal_accuracy, + vertical_accuracy=data.vertical_accuracy, + velocity_east=data.velocity_east, + velocity_north=data.velocity_north, + velocity_up=data.velocity_up, + speed_accuracy=data.speed_accuracy, ) - elif box.type == CAMMType.ACCELERATION.value: + + @classmethod + def serialize(cls, data: telemetry.CAMMGPSPoint) -> bytes: + cls.serializable(data, throw=True) + + return CAMMSampleData.build( + { + "type": cls.serialized_camm_type.value, + "data": { + "time_gps_epoch": data.time_gps_epoch, + "gps_fix_type": data.gps_fix_type, + "latitude": data.lat, + "longitude": data.lon, + "altitude": -1.0 if data.alt is None else data.alt, + "horizontal_accuracy": data.horizontal_accuracy, + "vertical_accuracy": data.vertical_accuracy, + "velocity_east": data.velocity_east, + "velocity_north": data.velocity_north, + "velocity_up": data.velocity_up, + "speed_accuracy": data.speed_accuracy, + }, + } + ) + + +class GoProGPSSampleEntry(CAMMSampleEntry): + serialized_camm_type: CAMMType = CAMMType.MIN_GPS + + telemetry_cls_type = telemetry.GPSPoint + + construct = Double[3] # type: ignore + + @classmethod + def deserialize(cls, sample: Sample, data: T.Any) -> telemetry.GPSPoint: + raise NotImplementedError("Deserializing GoPro GPS Point is not supported") + + @classmethod + def serialize(cls, data: telemetry.GPSPoint) -> bytes: + cls.serializable(data, throw=True) + + return CAMMSampleData.build( + { + "type": cls.serialized_camm_type.value, + "data": [ + data.lat, + data.lon, + -1.0 if data.alt is None else data.alt, + ], + } + ) + + +class AccelerationSampleEntry(CAMMSampleEntry): + serialized_camm_type: CAMMType = CAMMType.ACCELERATION + + telemetry_cls_type = telemetry.AccelerationData + + construct: C.Struct = Float[3] # type: ignore + + @classmethod + def deserialize(cls, sample: Sample, data: T.Any) -> telemetry.AccelerationData: return telemetry.AccelerationData( time=sample.exact_time, - x=box.data[0], - y=box.data[1], - z=box.data[2], + x=data[0], + y=data[1], + z=data[2], ) - elif box.type == CAMMType.GYRO.value: + + @classmethod + def serialize(cls, data: telemetry.AccelerationData) -> bytes: + cls.serializable(data, throw=True) + + return CAMMSampleData.build( + { + "type": cls.serialized_camm_type.value, + "data": [data.x, data.y, data.z], + } + ) + + +class GyroscopeSampleEntry(CAMMSampleEntry): + serialized_camm_type: CAMMType = CAMMType.GYRO + + telemetry_cls_type = telemetry.GyroscopeData + + construct: C.Struct = Float[3] # type: ignore + + @classmethod + def deserialize(cls, sample: Sample, data: T.Any) -> telemetry.GyroscopeData: return telemetry.GyroscopeData( time=sample.exact_time, - x=box.data[0], - y=box.data[1], - z=box.data[2], + x=data[0], + y=data[1], + z=data[2], ) - elif box.type == CAMMType.MAGNETIC_FIELD.value: + + @classmethod + def serialize(cls, data: telemetry.GyroscopeData) -> bytes: + cls.serializable(data) + + return CAMMSampleData.build( + { + "type": cls.serialized_camm_type.value, + "data": [data.x, data.y, data.z], + } + ) + + +class MagnetometerSampleEntry(CAMMSampleEntry): + serialized_camm_type: CAMMType = CAMMType.MAGNETIC_FIELD + + telemetry_cls_type = telemetry.MagnetometerData + + construct: C.Struct = Float[3] # type: ignore + + @classmethod + def deserialize(cls, sample: Sample, data: T.Any) -> telemetry.MagnetometerData: return telemetry.MagnetometerData( time=sample.exact_time, - x=box.data[0], - y=box.data[1], - z=box.data[2], + x=data[0], + y=data[1], + z=data[2], ) - return None + + @classmethod + def serialize(cls, data: telemetry.MagnetometerData) -> bytes: + cls.serializable(data) + + return CAMMSampleData.build( + { + "type": cls.serialized_camm_type.value, + "data": [data.x, data.y, data.z], + } + ) + + +SAMPLE_ENTRY_CLS_BY_CAMM_TYPE = { + sample_entry_cls.serialized_camm_type: sample_entry_cls + for sample_entry_cls in CAMMSampleEntry.__subclasses__() + if sample_entry_cls not in [GoProGPSSampleEntry] +} +assert len(SAMPLE_ENTRY_CLS_BY_CAMM_TYPE) == 5, SAMPLE_ENTRY_CLS_BY_CAMM_TYPE.keys() + + +_SWITCH: T.Dict[int, C.Struct] = { + # angle_axis + CAMMType.ANGLE_AXIS.value: Float[3], # type: ignore + CAMMType.EXPOSURE_TIME.value: C.Struct( + "pixel_exposure_time" / C.Int32sl, # type: ignore + "rolling_shutter_skew_time" / C.Int32sl, # type: ignore + ), + # position + CAMMType.POSITION.value: Float[3], # type: ignore + **{t.value: cls.construct for t, cls in SAMPLE_ENTRY_CLS_BY_CAMM_TYPE.items()}, +} + +CAMMSampleData = C.Struct( + C.Padding(2), + "type" / C.Int16ul, + "data" / C.Switch(C.this.type, _SWITCH), +) + + +def _parse_telemetry_from_sample( + fp: T.BinaryIO, sample: Sample +) -> T.Optional[TelemetryMeasurement]: + fp.seek(sample.raw_sample.offset, io.SEEK_SET) + data = fp.read(sample.raw_sample.size) + box = CAMMSampleData.parse(data) + + camm_type = CAMMType(box.type) # type: ignore + SampleKlass = SAMPLE_ENTRY_CLS_BY_CAMM_TYPE.get(camm_type) + if SampleKlass is None: + return None + return SampleKlass.deserialize(sample, box.data) def _filter_telemetry_by_elst_segments( @@ -259,14 +449,6 @@ def extract_telemetry_data(fp: T.BinaryIO) -> T.Optional[T.List[TelemetryMeasure return None -def parse_gpx(path: pathlib.Path) -> T.List[geo.Point]: - with path.open("rb") as fp: - points = extract_points(fp) - if points is None: - return [] - return points - - MakeOrModel = C.Struct( "size" / C.Int16ub, C.Padding(2), @@ -288,10 +470,14 @@ def _parse_quietly(data: bytes, h: sparser.Header) -> bytes: except C.ConstructError: LOG.warning("Failed to parse %s: %s", h, data[:512]) return b"" + + if parsed is None: + return b"" + return parsed["data"] -def extract_camera_make_and_model(fp: T.BinaryIO) -> T.Tuple[str, str]: +def extract_camera_make_and_model(fp: T.BinaryIO) -> tuple[str, str]: header_and_stream = sparser.parse_path( fp, [ @@ -311,8 +497,8 @@ def extract_camera_make_and_model(fp: T.BinaryIO) -> T.Tuple[str, str]: ], ) - make: T.Optional[str] = None - model: T.Optional[str] = None + make: str = "" + model: str = "" try: for h, s in header_and_stream: @@ -339,4 +525,5 @@ def extract_camera_make_and_model(fp: T.BinaryIO) -> T.Tuple[str, str]: make = make.strip() if model: model = model.strip() - return make or "", model or "" + + return make, model diff --git a/mapillary_tools/telemetry.py b/mapillary_tools/telemetry.py index 81ce58882..1ebdd6f1a 100644 --- a/mapillary_tools/telemetry.py +++ b/mapillary_tools/telemetry.py @@ -12,14 +12,6 @@ class GPSFix(Enum): FIX_3D = 3 -@dataclasses.dataclass -class GPSPoint(Point): - epoch_time: T.Optional[float] - fix: T.Optional[GPSFix] - precision: T.Optional[float] - ground_speed: T.Optional[float] - - @dataclasses.dataclass(order=True) class TelemetryMeasurement: """Base class for all telemetry measurements. @@ -32,6 +24,26 @@ class TelemetryMeasurement: time: float +@dataclasses.dataclass +class GPSPoint(TelemetryMeasurement, Point): + epoch_time: T.Optional[float] + fix: T.Optional[GPSFix] + precision: T.Optional[float] + ground_speed: T.Optional[float] + + +@dataclasses.dataclass +class CAMMGPSPoint(TelemetryMeasurement, Point): + time_gps_epoch: float + gps_fix_type: int + horizontal_accuracy: float + vertical_accuracy: float + velocity_east: float + velocity_north: float + velocity_up: float + speed_accuracy: float + + @dataclasses.dataclass(order=True) class GyroscopeData(TelemetryMeasurement): """Gyroscope signal in radians/seconds around XYZ axes of the camera.""" diff --git a/mapillary_tools/upload.py b/mapillary_tools/upload.py index 918fecdb5..0bb19ce17 100644 --- a/mapillary_tools/upload.py +++ b/mapillary_tools/upload.py @@ -18,14 +18,13 @@ exceptions, history, ipc, - telemetry, types, upload_api_v4, uploader, utils, VERSION, ) -from .camm import camm_builder +from .camm import camm_builder, camm_parser from .gpmf import gpmf_parser from .mp4 import simple_mp4_builder from .types import FileType @@ -584,7 +583,7 @@ def upload( assert isinstance(video_metadata.md5sum, str), "md5sum should be updated" # extract telemetry measurements from GoPro videos - telemetry_measurements: T.List[telemetry.TelemetryMeasurement] = [] + telemetry_measurements: T.List[camm_parser.TelemetryMeasurement] = [] if MAPILLARY__EXPERIMENTAL_ENABLE_IMU == "YES": if video_metadata.filetype is FileType.GOPRO: with video_metadata.filename.open("rb") as fp: diff --git a/tests/unit/test_camm_parser.py b/tests/unit/test_camm_parser.py index 8eb48c07b..d96ebb089 100644 --- a/tests/unit/test_camm_parser.py +++ b/tests/unit/test_camm_parser.py @@ -3,7 +3,7 @@ import typing as T from pathlib import Path -from mapillary_tools import geo, types +from mapillary_tools import geo, telemetry, types from mapillary_tools.camm import camm_builder, camm_parser from mapillary_tools.mp4 import construct_mp4_parser as cparser, simple_mp4_builder @@ -38,8 +38,8 @@ def test_filter_points_by_edit_list(): ) -def build_mp4(metadata: types.VideoMetadata) -> types.VideoMetadata: - movie_timescale = simple_mp4_builder.UINT32_MAX +def encode_decode_empty_camm_mp4(metadata: types.VideoMetadata) -> types.VideoMetadata: + movie_timescale = 1_000_000 mvhd: cparser.BoxDict = { "type": b"mvhd", @@ -60,11 +60,16 @@ def build_mp4(metadata: types.VideoMetadata) -> types.VideoMetadata: io.BytesIO(src), camm_builder.camm_sample_generator2(metadata) ) + # extract points points = camm_parser.extract_points(T.cast(T.BinaryIO, target_fp)) + + # extract make/model target_fp.seek(0, io.SEEK_SET) make, model = camm_parser.extract_camera_make_and_model( T.cast(T.BinaryIO, target_fp) ) + + # return metadata return types.VideoMetadata( Path(""), None, @@ -76,16 +81,23 @@ def build_mp4(metadata: types.VideoMetadata) -> types.VideoMetadata: def approximate(expected, actual): - points_equal = all(abs(x.time - y.time) < 0.00001 for x, y in zip(expected, actual)) - the_others_equal = all( - dataclasses.replace(x, time=0) == dataclasses.replace(y, time=0) - for x, y in zip(expected, actual) - ) - return points_equal and the_others_equal + assert len(expected) == len(actual) + + for x, y in zip(expected, actual): + x = dataclasses.asdict(x) + y = dataclasses.asdict(y) + + keys = set([*x.keys(), *y.keys()]) + for k in keys: + if isinstance(x[k], float): + assert abs(x[k] - y[k]) < 10e-6 + else: + assert x[k] == y[k] -def test_build_and_parse(): +def test_build_and_parse_points(): points = [ + geo.Point(time=-0.1, lat=0.01, lon=0.2, alt=None, angle=None), geo.Point(time=0.1, lat=0.01, lon=0.2, alt=None, angle=None), geo.Point(time=0.23, lat=0.001, lon=0.21, alt=None, angle=None), geo.Point(time=0.29, lat=0.002, lon=0.203, alt=None, angle=None), @@ -96,31 +108,185 @@ def test_build_and_parse(): None, filetype=types.FileType.CAMM, points=points, - make="test_make", - model=" test_model ", ) - x = build_mp4(metadata) - assert x.make == "test_make" - assert x.model == "test_model" - assert approximate( + x = encode_decode_empty_camm_mp4(metadata) + approximate( [ - geo.Point( - time=0.09999999988358467, lat=0.01, lon=0.2, alt=-1.0, angle=None - ), - geo.Point( - time=0.22999999580209396, lat=0.001, lon=0.21, alt=-1.0, angle=None - ), - geo.Point( - time=0.2899999996391125, lat=0.002, lon=0.203, alt=-1.0, angle=None + geo.Point(time=0.1, lat=0.01, lon=0.2, alt=-1.0, angle=None), + geo.Point(time=0.23, lat=0.001, lon=0.21, alt=-1.0, angle=None), + geo.Point(time=0.29, lat=0.002, lon=0.203, alt=-1.0, angle=None), + geo.Point(time=0.31, lat=0.0025, lon=0.2004, alt=-1.0, angle=None), + ], + x.points, + ) + + +def test_build_and_parse_camm_gps_points(): + points = [ + telemetry.CAMMGPSPoint( + time=-0.1, + lat=0.01, + lon=0.2, + alt=None, + angle=None, + time_gps_epoch=1.1, + gps_fix_type=1, + horizontal_accuracy=3.3, + vertical_accuracy=4.4, + velocity_east=5.5, + velocity_north=6.6, + velocity_up=7.7, + speed_accuracy=8.0, + ), + telemetry.CAMMGPSPoint( + time=0.1, + lat=0.01, + lon=0.2, + alt=None, + angle=None, + time_gps_epoch=1.2, + gps_fix_type=1, + horizontal_accuracy=3.3, + vertical_accuracy=4.4, + velocity_east=5.5, + velocity_north=6.6, + velocity_up=7.7, + speed_accuracy=8.0, + ), + telemetry.CAMMGPSPoint( + time=0.23, + lat=0.001, + lon=0.21, + alt=None, + angle=None, + time_gps_epoch=1.3, + gps_fix_type=1, + horizontal_accuracy=3.3, + vertical_accuracy=4.4, + velocity_east=5.5, + velocity_north=6.6, + velocity_up=7.7, + speed_accuracy=8.0, + ), + ] + metadata = types.VideoMetadata( + Path(""), + None, + filetype=types.FileType.CAMM, + points=points, + ) + x = encode_decode_empty_camm_mp4(metadata) + approximate( + [ + telemetry.CAMMGPSPoint( + time=0.1, + lat=0.01, + lon=0.2, + alt=-1, + angle=None, + time_gps_epoch=1.2, + gps_fix_type=1, + horizontal_accuracy=3.3, + vertical_accuracy=4.4, + velocity_east=5.5, + velocity_north=6.6, + velocity_up=7.7, + speed_accuracy=8.0, ), - geo.Point( - time=0.3099999994295649, lat=0.0025, lon=0.2004, alt=-1.0, angle=None + telemetry.CAMMGPSPoint( + time=0.23, + lat=0.001, + lon=0.21, + alt=-1, + angle=None, + time_gps_epoch=1.3, + gps_fix_type=1, + horizontal_accuracy=3.3, + vertical_accuracy=4.4, + velocity_east=5.5, + velocity_north=6.6, + velocity_up=7.7, + speed_accuracy=8.0, ), ], x.points, ) +def test_build_and_parse_single_points(): + points = [ + geo.Point(time=1.2, lat=0.01, lon=0.2, alt=None, angle=None), + ] + metadata = types.VideoMetadata( + Path(""), + None, + filetype=types.FileType.CAMM, + points=points, + ) + x = encode_decode_empty_camm_mp4(metadata) + approximate( + [ + geo.Point(time=1.2, lat=0.01, lon=0.2, alt=-1.0, angle=None), + ], + x.points, + ) + + +def test_build_and_parse_single_point_0(): + points = [ + geo.Point(time=0, lat=0.01, lon=0.2, alt=None, angle=None), + ] + metadata = types.VideoMetadata( + Path(""), + None, + filetype=types.FileType.CAMM, + points=points, + ) + x = encode_decode_empty_camm_mp4(metadata) + approximate( + [ + geo.Point(time=0, lat=0.01, lon=0.2, alt=-1.0, angle=None), + ], + x.points, + ) + + +def test_build_and_parse_single_point_neg(): + points = [ + geo.Point(time=-1.2, lat=0.01, lon=0.2, alt=None, angle=None), + ] + metadata = types.VideoMetadata( + Path(""), + None, + filetype=types.FileType.CAMM, + points=points, + ) + x = encode_decode_empty_camm_mp4(metadata) + approximate([], x.points) + + +def test_build_and_parse_start_early(): + points = [ + geo.Point(time=-1, lat=0.01, lon=0.2, alt=None, angle=None), + geo.Point(time=1.2, lat=0.01, lon=0.2, alt=None, angle=None), + geo.Point(time=1.4, lat=0.01, lon=0.2, alt=None, angle=None), + ] + metadata = types.VideoMetadata( + Path(""), + None, + filetype=types.FileType.CAMM, + points=points, + ) + x = encode_decode_empty_camm_mp4(metadata) + approximate( + [ + geo.Point(time=1.2, lat=0.01, lon=0.2, alt=-1, angle=None), + geo.Point(time=1.4, lat=0.01, lon=0.2, alt=-1, angle=None), + ], + x.points, + ) + + def test_build_and_parse2(): points = [ geo.Point(time=0.1, lat=0.01, lon=0.2, alt=None, angle=None), @@ -133,10 +299,10 @@ def test_build_and_parse2(): make="test_make汉字", model="test_model汉字", ) - x = build_mp4(metadata) + x = encode_decode_empty_camm_mp4(metadata) assert x.make == "test_make汉字" assert x.model == "test_model汉字" - assert approximate( + approximate( [geo.Point(time=0.09999999988358468, lat=0.01, lon=0.2, alt=-1.0, angle=None)], x.points, ) @@ -154,7 +320,7 @@ def test_build_and_parse9(): make="test_make汉字", model="test_model汉字", ) - x = build_mp4(metadata) + x = encode_decode_empty_camm_mp4(metadata) assert [geo.Point(time=0.0, lat=0.01, lon=0.2, alt=-1.0, angle=None)] == x.points @@ -171,8 +337,8 @@ def test_build_and_parse10(): make="test_make汉字", model="test_model汉字", ) - x = build_mp4(metadata) - assert approximate( + x = encode_decode_empty_camm_mp4(metadata) + approximate( [ geo.Point(time=0.0, lat=0.01, lon=0.2, alt=-1.0, angle=None), geo.Point(time=0.1, lat=0.03, lon=0.2, alt=-1.0, angle=None), @@ -191,5 +357,5 @@ def test_build_and_parse3(): make="test_make汉字", model="test_model汉字", ) - x = build_mp4(metadata) + x = encode_decode_empty_camm_mp4(metadata) assert [] == x.points