diff --git a/mapillary_tools/commands/process.py b/mapillary_tools/commands/process.py index 30275fc06..0d7c52fa3 100644 --- a/mapillary_tools/commands/process.py +++ b/mapillary_tools/commands/process.py @@ -13,6 +13,12 @@ from ..process_sequence_properties import process_sequence_properties +def bold_text(text: str) -> str: + ANSI_BOLD = "\033[1m" + ANSI_RESET_ALL = "\033[0m" + return f"{ANSI_BOLD}{text}{ANSI_RESET_ALL}" + + class Command: name = "process" help = "process images and videos" @@ -52,9 +58,7 @@ def add_basic_arguments(self, parser: argparse.ArgumentParser): default=",".join(sorted(t.value for t in FileType)), required=False, ) - group = parser.add_argument_group( - f"{constants.ANSI_BOLD}PROCESS EXIF OPTIONS{constants.ANSI_RESET_ALL}" - ) + group = parser.add_argument_group(bold_text("PROCESS EXIF OPTIONS")) group.add_argument( "--overwrite_all_EXIF_tags", help="Overwrite all of the relevant EXIF tags with the values obtained in process. It is equivalent to supplying all the --overwrite_EXIF_*_tag flags.", @@ -92,7 +96,7 @@ def add_basic_arguments(self, parser: argparse.ArgumentParser): ) group_metadata = parser.add_argument_group( - f"{constants.ANSI_BOLD}PROCESS METADATA OPTIONS{constants.ANSI_RESET_ALL}" + bold_text("PROCESS METADATA OPTIONS") ) group_metadata.add_argument( "--device_make", @@ -108,7 +112,7 @@ def add_basic_arguments(self, parser: argparse.ArgumentParser): ) group_geotagging = parser.add_argument_group( - f"{constants.ANSI_BOLD}PROCESS GEOTAGGING OPTIONS{constants.ANSI_RESET_ALL}" + bold_text("PROCESS GEOTAGGING OPTIONS") ) group_geotagging.add_argument( "--desc_path", @@ -174,7 +178,7 @@ def add_basic_arguments(self, parser: argparse.ArgumentParser): ) group_sequence = parser.add_argument_group( - f"{constants.ANSI_BOLD}PROCESS SEQUENCE OPTIONS{constants.ANSI_RESET_ALL}" + bold_text("PROCESS SEQUENCE OPTIONS") ) group_sequence.add_argument( "--cutoff_distance", diff --git a/mapillary_tools/commands/sample_video.py b/mapillary_tools/commands/sample_video.py index f3bc2b7a7..1d4e58bf4 100644 --- a/mapillary_tools/commands/sample_video.py +++ b/mapillary_tools/commands/sample_video.py @@ -4,6 +4,7 @@ from .. import constants from ..sample_video import sample_video +from .process import bold_text class Command: @@ -11,9 +12,7 @@ class Command: help = "sample video into images" def add_basic_arguments(self, parser: argparse.ArgumentParser): - group = parser.add_argument_group( - f"{constants.ANSI_BOLD}VIDEO PROCESS OPTIONS{constants.ANSI_RESET_ALL}" - ) + group = parser.add_argument_group(bold_text("VIDEO PROCESS OPTIONS")) group.add_argument( "--video_sample_distance", help="The minimal distance interval, in meters, for sampling video frames. [default: %(default)s]", diff --git a/mapillary_tools/commands/upload.py b/mapillary_tools/commands/upload.py index 34cfc1d3f..f8c6748cc 100644 --- a/mapillary_tools/commands/upload.py +++ b/mapillary_tools/commands/upload.py @@ -2,6 +2,7 @@ from .. import constants from ..upload import upload +from .process import bold_text class Command: @@ -30,9 +31,7 @@ def add_common_upload_options(group): ) def add_basic_arguments(self, parser): - group = parser.add_argument_group( - f"{constants.ANSI_BOLD}UPLOAD OPTIONS{constants.ANSI_RESET_ALL}" - ) + group = parser.add_argument_group(bold_text("UPLOAD OPTIONS")) group.add_argument( "--desc_path", help=f'Path to the description file generated by the process command. The hyphen "-" indicates STDIN. [default: {{IMPORT_PATH}}/{constants.IMAGE_DESCRIPTION_FILENAME}]', diff --git a/mapillary_tools/constants.py b/mapillary_tools/constants.py index 8f88fb781..60c13023b 100644 --- a/mapillary_tools/constants.py +++ b/mapillary_tools/constants.py @@ -5,8 +5,6 @@ _ENV_PREFIX = "MAPILLARY_TOOLS_" -ANSI_BOLD = "\033[1m" -ANSI_RESET_ALL = "\033[0m" # In meters CUTOFF_DISTANCE = float(os.getenv(_ENV_PREFIX + "CUTOFF_DISTANCE", 600)) # In seconds diff --git a/mapillary_tools/geo.py b/mapillary_tools/geo.py index bf5691cdf..3c4110414 100644 --- a/mapillary_tools/geo.py +++ b/mapillary_tools/geo.py @@ -1,10 +1,12 @@ # pyre-ignore-all-errors[4] +from __future__ import annotations import bisect import dataclasses import datetime import itertools import math +import sys import typing as T WGS84_a = 6378137.0 @@ -27,34 +29,14 @@ class Point: time: float lat: float lon: float - alt: T.Optional[float] - angle: T.Optional[float] + alt: float | None + angle: float | None -def _ecef_from_lla2(lat: float, lon: float) -> T.Tuple[float, float, float]: - """ - Compute ECEF XYZ from latitude, longitude and altitude. +PointLike = T.TypeVar("PointLike", bound=Point) - All using the WGS94 model. - Altitude is the distance to the WGS94 ellipsoid. - Check results here http://www.oc.nps.edu/oc2902w/coord/llhxyz.htm - """ - lat = math.radians(lat) - lon = math.radians(lon) - cos_lat = math.cos(lat) - sin_lat = math.sin(lat) - L = 1.0 / math.sqrt(WGS84_a_SQ * cos_lat**2 + WGS84_b_SQ * sin_lat**2) - K = WGS84_a_SQ * L * cos_lat - x = K * math.cos(lon) - y = K * math.sin(lon) - z = WGS84_b_SQ * L * sin_lat - return x, y, z - - -def gps_distance( - latlon_1: T.Tuple[float, float], latlon_2: T.Tuple[float, float] -) -> float: +def gps_distance(latlon_1: tuple[float, float], latlon_2: tuple[float, float]) -> float: """ Distance between two (lat,lon) pairs. @@ -69,19 +51,9 @@ def gps_distance( return math.sqrt((x1 - x2) ** 2 + (y1 - y2) ** 2 + (z1 - z2) ** 2) -def get_max_distance_from_start(latlons: T.List[T.Tuple[float, float]]) -> float: - """ - Returns the radius of an entire GPS track. Used to calculate whether or not the entire sequence was just stationary video - Takes a sequence of points as input - """ - if not latlons: - return 0 - start = latlons[0] - return max(gps_distance(start, latlon) for latlon in latlons) - - def compute_bearing( - start_lat: float, start_lon: float, end_lat: float, end_lon: float + latlon_1: tuple[float, float], + latlon_2: tuple[float, float], ) -> float: """ Get the compass bearing from start to end. @@ -89,7 +61,10 @@ def compute_bearing( Formula from http://www.movable-type.co.uk/scripts/latlong.html """ - # make sure everything is in radians + start_lat, start_lon = latlon_1 + end_lat, end_lon = latlon_2 + + # Make sure everything is in radians start_lat = math.radians(start_lat) start_lon = math.radians(start_lon) end_lat = math.radians(end_lat) @@ -125,14 +100,14 @@ def diff_bearing(b1: float, b2: float) -> float: # http://stackoverflow.com/a/5434936 -def pairwise(iterable: T.Iterable[_IT]) -> T.Iterable[T.Tuple[_IT, _IT]]: +def pairwise(iterable: T.Iterable[_IT]) -> T.Iterable[tuple[_IT, _IT]]: """s -> (s0,s1), (s1,s2), (s2, s3), ...""" a, b = itertools.tee(iterable) next(b, None) return zip(a, b) -def as_unix_time(dt: T.Union[datetime.datetime, int, float]) -> float: +def as_unix_time(dt: datetime.datetime | int | float) -> float: if isinstance(dt, (int, float)): return dt else: @@ -148,59 +123,37 @@ def as_unix_time(dt: T.Union[datetime.datetime, int, float]) -> float: return 0.0 -def _interpolate_segment(start: Point, end: Point, t: float) -> Point: - if start.time == end.time: - weight = 0.0 - else: - weight = (t - start.time) / (end.time - start.time) - - lat = start.lat + (end.lat - start.lat) * weight - lon = start.lon + (end.lon - start.lon) * weight - angle = compute_bearing(start.lat, start.lon, end.lat, end.lon) - alt: T.Optional[float] - if start.alt is not None and end.alt is not None: - alt = start.alt + (end.alt - start.alt) * weight - else: - alt = None - - return Point(time=t, lat=lat, lon=lon, alt=alt, angle=angle) - +if sys.version_info < (3, 10): -def _interpolate_at_index(points: T.Sequence[Point], t: float, idx: int): - assert points, "expect non-empty points" - - # find the segment (start point, end point) - if len(points) == 1: - start, end = points[0], points[0] - else: - if 0 < idx < len(points): - # interpolating within the range - start, end = points[idx - 1], points[idx] - elif idx <= 0: - # extrapolating behind the range - start, end = points[0], points[1] - else: - # extrapolating beyond the range - assert len(points) <= idx - start, end = points[-2], points[-1] + def interpolate(points: T.Sequence[Point], t: float, lo: int = 0) -> Point: + """ + Interpolate or extrapolate the point at time t along the sequence of points (sorted by time). + """ + if not points: + raise ValueError("Expect non-empty points") - return _interpolate_segment(start, end, t) + # Make sure that points are sorted (disabled because the check costs O(N)): + # for cur, nex in pairwise(points): + # assert cur.time <= nex.time, "Points not sorted" + p = Point(time=t, lat=float("-inf"), lon=float("-inf"), alt=None, angle=None) + idx = bisect.bisect_left(points, p, lo=lo) + return _interpolate_at_segment_idx(points, t, idx) +else: -def interpolate(points: T.Sequence[Point], t: float, lo: int = 0) -> Point: - """ - Interpolate or extrapolate the point at time t along the sequence of points (sorted by time). - """ - if not points: - raise ValueError("Expect non-empty points") + def interpolate(points: T.Sequence[Point], t: float, lo: int = 0) -> Point: + """ + Interpolate or extrapolate the point at time t along the sequence of points (sorted by time). + """ + if not points: + raise ValueError("Expect non-empty points") - # Make sure that points are sorted (disabled because the check costs O(N)): - # for cur, nex in pairwise(points): - # assert cur.time <= nex.time, "Points not sorted" + # Make sure that points are sorted (disabled because the check costs O(N)): + # for cur, nex in pairwise(points): + # assert cur.time <= nex.time, "Points not sorted" - p = Point(time=t, lat=float("-inf"), lon=float("-inf"), alt=None, angle=None) - idx = bisect.bisect_left(points, p, lo=lo) - return _interpolate_at_index(points, t, idx) + idx = bisect.bisect_left(points, t, lo=lo, key=lambda x: x.time) + return _interpolate_at_segment_idx(points, t, idx) class Interpolator: @@ -212,12 +165,22 @@ class Interpolator: track_idx: int # interpolation starts from the lower bound point index in the current track lo: int - prev_time: T.Optional[float] + prev_time: float | None def __init__(self, tracks: T.Sequence[T.Sequence[Point]]): + # Remove empty tracks self.tracks = [track for track in tracks if track] + if not self.tracks: - raise ValueError("Expect non-empty tracks") + raise ValueError("Expect at least one non-empty track") + + for track in self.tracks: + for left, right in pairwise(track): + if not (left.time <= right.time): + raise ValueError( + "Expect points to be sorted by time, but got {left.time} then {right.time}" + ) + self.tracks.sort(key=lambda track: track[0].time) self.track_idx = 0 self.lo = 0 @@ -225,7 +188,7 @@ def __init__(self, tracks: T.Sequence[T.Sequence[Point]]): @staticmethod def _lsearch_left( - track: T.Sequence[Point], t: float, lo: int = 0, hi: T.Optional[int] = None + track: T.Sequence[Point], t: float, lo: int = 0, hi: int | None = None ) -> int: """ similar to bisect.bisect_left, but faster in the incremental search case @@ -244,24 +207,37 @@ def _lsearch_left( def interpolate(self, t: float) -> Point: if self.prev_time is not None: - assert self.prev_time <= t, "requires time to be monotonically increasing" + if not (self.prev_time <= t): + raise ValueError( + f"Require times to be monotonically increasing, but got {self.prev_time} then {t}" + ) + + interpolated: Point | None = None while self.track_idx < len(self.tracks): track = self.tracks[self.track_idx] + assert track, "expect non-empty track" + if t < track[0].time: - return _interpolate_at_index(track, t, 0) + interpolated = _interpolate_at_segment_idx(track, t, 0) + break + elif track[0].time <= t <= track[-1].time: - # similar to bisect.bisect_left(points, p, lo=lo) but faster in this case + # Similar to bisect.bisect_left(points, p, lo=lo) but faster in this case idx = Interpolator._lsearch_left(track, t, lo=self.lo) - # t must sit between (track[idx - 1], track[idx]] - # set the lower bound to idx - 1 - # because the next t can still be interpolated anywhere between (track[idx - 1], track[idx]] + # Time t must be between (track[idx - 1], track[idx]], so set the lower bound to idx - 1 + # Because the next t can still be interpolated anywhere between (track[idx - 1], track[idx]] self.lo = max(idx - 1, 0) - return _interpolate_at_index(track, t, idx) + interpolated = _interpolate_at_segment_idx(track, t, idx) + break + self.track_idx += 1 self.lo = 0 - interpolated = _interpolate_at_index(self.tracks[-1], t, len(self.tracks[-1])) + if interpolated is None: + interpolated = _interpolate_at_segment_idx( + self.tracks[-1], t, len(self.tracks[-1]) + ) self.prev_time = t @@ -276,7 +252,7 @@ def sample_points_by_distance( min_distance: float, point_func: T.Callable[[_PointAbstract], Point], ) -> T.Generator[_PointAbstract, None, None]: - prevp: T.Optional[Point] = None + prevp: Point | None = None for sample in samples: if prevp is None: yield sample @@ -288,26 +264,27 @@ def sample_points_by_distance( prevp = p -def interpolate_directions_if_none(sequence: T.Sequence[Point]) -> None: +def interpolate_directions_if_none(sequence: T.Sequence[PointLike]) -> None: for cur, nex in pairwise(sequence): if cur.angle is None: - cur.angle = compute_bearing(cur.lat, cur.lon, nex.lat, nex.lon) + cur.angle = compute_bearing((cur.lat, cur.lon), (nex.lat, nex.lon)) if len(sequence) == 1: if sequence[-1].angle is None: sequence[-1].angle = 0 elif 2 <= len(sequence): if sequence[-1].angle is None: - sequence[-1].angle = sequence[-2].angle - - -_PointLike = T.TypeVar("_PointLike", bound=Point) + prev_angle = sequence[-2].angle + assert prev_angle is not None, ( + "expect the last second point to have an interpolated angle" + ) + sequence[-1].angle = prev_angle def extend_deduplicate_points( - sequence: T.Iterable[_PointLike], - to_extend: T.Optional[T.List[_PointLike]] = None, -) -> T.List[_PointLike]: + sequence: T.Iterable[PointLike], + to_extend: list[PointLike] | None = None, +) -> list[PointLike]: if to_extend is None: to_extend = [] for point in sequence: @@ -319,3 +296,67 @@ def extend_deduplicate_points( else: to_extend.append(point) return to_extend + + +def _ecef_from_lla2(lat: float, lon: float) -> tuple[float, float, float]: + """ + Compute ECEF XYZ from latitude and longitude. + + All using the WGS94 model. + Altitude is the distance to the WGS94 ellipsoid. + Check results here http://www.oc.nps.edu/oc2902w/coord/llhxyz.htm + + """ + lat = math.radians(lat) + lon = math.radians(lon) + cos_lat = math.cos(lat) + sin_lat = math.sin(lat) + L = 1.0 / math.sqrt(WGS84_a_SQ * cos_lat**2 + WGS84_b_SQ * sin_lat**2) + K = WGS84_a_SQ * L * cos_lat + x = K * math.cos(lon) + y = K * math.sin(lon) + z = WGS84_b_SQ * L * sin_lat + return x, y, z + + +def _interpolate_segment(start: Point, end: Point, t: float) -> Point: + try: + weight = (t - start.time) / (end.time - start.time) + except ZeroDivisionError: + weight = 0.0 + + lat = start.lat + (end.lat - start.lat) * weight + lon = start.lon + (end.lon - start.lon) * weight + angle = compute_bearing((start.lat, start.lon), (end.lat, end.lon)) + alt: float | None + if start.alt is not None and end.alt is not None: + alt = start.alt + (end.alt - start.alt) * weight + else: + alt = None + + return Point(time=t, lat=lat, lon=lon, alt=alt, angle=angle) + + +def _interpolate_at_segment_idx(points: T.Sequence[Point], t: float, idx: int) -> Point: + """ + Interpolate time t along the segment between idx - 1 and idx. + If idx is out of range, extrapolate it to the nearest segment (first or last). + """ + + if len(points) == 1: + start, end = points[0], points[0] + elif 2 <= len(points): + if 0 < idx < len(points): + # Normal interpolation within the range + start, end = points[idx - 1], points[idx] + elif idx <= 0: + # Extrapolating before the first point + start, end = points[0], points[1] + else: + # Extrapolating after the last point + assert len(points) <= idx + start, end = points[-2], points[-1] + else: + assert False, "expect non-empty points" + + return _interpolate_segment(start, end, t) diff --git a/mapillary_tools/geotag/blackvue_parser.py b/mapillary_tools/geotag/blackvue_parser.py index 99fc92ba3..cddd75e60 100644 --- a/mapillary_tools/geotag/blackvue_parser.py +++ b/mapillary_tools/geotag/blackvue_parser.py @@ -1,6 +1,5 @@ import json import logging -import pathlib import re import typing as T @@ -106,13 +105,3 @@ def extract_points(fp: T.BinaryIO) -> T.Optional[T.List[geo.Point]]: p.time = (p.time - first_point_time) / 1000 return points - - -def parse_gps_points(path: pathlib.Path) -> T.List[geo.Point]: - with path.open("rb") as fp: - points = extract_points(fp) - - if points is None: - return [] - - return points diff --git a/mapillary_tools/geotag/geotag_videos_from_exiftool_video.py b/mapillary_tools/geotag/geotag_videos_from_exiftool_video.py index c03f0c07e..f89b16255 100644 --- a/mapillary_tools/geotag/geotag_videos_from_exiftool_video.py +++ b/mapillary_tools/geotag/geotag_videos_from_exiftool_video.py @@ -8,8 +8,9 @@ from .. import exceptions, exiftool_read, geo, types, utils from ..exiftool_read_video import ExifToolReadVideo +from ..gpmf import gpmf_gps_filter from ..telemetry import GPSPoint -from . import gpmf_gps_filter, utils as video_utils +from . import utils as video_utils from .geotag_from_generic import GeotagVideosFromGeneric LOG = logging.getLogger(__name__) @@ -57,7 +58,9 @@ def geotag_video(element: ET.Element) -> types.VideoMetadataOrError: raise exceptions.MapillaryGPSNoiseError("GPS is too noisy") stationary = video_utils.is_video_stationary( - geo.get_max_distance_from_start([(p.lat, p.lon) for p in points]) + video_utils.get_max_distance_from_start( + [(p.lat, p.lon) for p in points] + ) ) if stationary: diff --git a/mapillary_tools/geotag/geotag_videos_from_video.py b/mapillary_tools/geotag/geotag_videos_from_video.py index 328ddc021..7d3a78df3 100644 --- a/mapillary_tools/geotag/geotag_videos_from_video.py +++ b/mapillary_tools/geotag/geotag_videos_from_video.py @@ -8,9 +8,10 @@ from .. import exceptions, geo, types, utils from ..camm import camm_parser +from ..gpmf import gpmf_gps_filter, gpmf_parser from ..mp4 import simple_mp4_parser as sparser from ..telemetry import GPSPoint -from . import blackvue_parser, gpmf_gps_filter, gpmf_parser, utils as video_utils +from . import blackvue_parser, utils as video_utils from .geotag_from_generic import GeotagVideosFromGeneric LOG = logging.getLogger(__name__) @@ -168,7 +169,7 @@ def geotag_video( raise exceptions.MapillaryGPSNoiseError("GPS is too noisy") stationary = video_utils.is_video_stationary( - geo.get_max_distance_from_start( + video_utils.get_max_distance_from_start( [(p.lat, p.lon) for p in video_metadata.points] ) ) diff --git a/mapillary_tools/geotag/utils.py b/mapillary_tools/geotag/utils.py index e70f44f05..7f2abdeb5 100644 --- a/mapillary_tools/geotag/utils.py +++ b/mapillary_tools/geotag/utils.py @@ -1,3 +1,5 @@ +from __future__ import annotations + import datetime import typing as T @@ -12,6 +14,17 @@ def is_video_stationary(max_distance_from_start: float) -> bool: return max_distance_from_start < radius_threshold +def get_max_distance_from_start(latlons: T.Sequence[tuple[float, float]]) -> float: + """ + Returns the radius of an entire GPS track. Used to calculate whether or not the entire sequence was just stationary video + Takes a sequence of points as input + """ + if not latlons: + return 0 + start = latlons[0] + return max(geo.gps_distance(start, latlon) for latlon in latlons) + + def convert_points_to_gpx_segment(points: T.Sequence[geo.Point]): gpx_segment = gpxpy.gpx.GPXTrackSegment() for point in points: @@ -20,7 +33,7 @@ def convert_points_to_gpx_segment(points: T.Sequence[geo.Point]): point.lat, point.lon, elevation=point.alt, - time=datetime.datetime.utcfromtimestamp(point.time), + time=datetime.datetime.fromtimestamp(point.time, datetime.timezone.utc), ) ) return gpx_segment diff --git a/mapillary_tools/geotag/gpmf_gps_filter.py b/mapillary_tools/gpmf/gpmf_gps_filter.py similarity index 100% rename from mapillary_tools/geotag/gpmf_gps_filter.py rename to mapillary_tools/gpmf/gpmf_gps_filter.py diff --git a/mapillary_tools/geotag/gpmf_parser.py b/mapillary_tools/gpmf/gpmf_parser.py similarity index 97% rename from mapillary_tools/geotag/gpmf_parser.py rename to mapillary_tools/gpmf/gpmf_parser.py index 6b1bdd7d1..21abd5f98 100644 --- a/mapillary_tools/geotag/gpmf_parser.py +++ b/mapillary_tools/gpmf/gpmf_parser.py @@ -1,4 +1,5 @@ from __future__ import annotations + import dataclasses import datetime import io @@ -9,7 +10,6 @@ from .. import telemetry from ..mp4.mp4_sample_parser import MovieBoxParser, Sample, TrackBoxParser -from ..telemetry import GPSFix, GPSPoint """ Parsing GPS from GPMF data format stored in GoPros. See the GPMF spec: https://github.com/gopro/gpmf-parser @@ -133,7 +133,7 @@ class KLVDict(T.TypedDict): class GoProInfo: # None indicates the data has been extracted, # while [] indicates extracetd but no data point found - gps: list[GPSPoint] | None = None + gps: list[telemetry.GPSPoint] | None = None accl: list[telemetry.AccelerationData] | None = None gyro: list[telemetry.GyroscopeData] | None = None magn: list[telemetry.MagnetometerData] | None = None @@ -154,7 +154,7 @@ def extract_gopro_info( gpmd_samples = _filter_gpmd_samples(track) if telemetry_only: - points_by_dvid: dict[int, list[GPSPoint]] | None = None + points_by_dvid: dict[int, list[telemetry.GPSPoint]] | None = None dvnm_by_dvid: dict[int, bytes] | None = None accls_by_dvid: dict[int, list[telemetry.AccelerationData]] | None = {} gyros_by_dvid: dict[int, list[telemetry.GyroscopeData]] | None = {} @@ -267,7 +267,7 @@ def _gps5_timestamp_to_epoch_time(dtstr: str): # ] def _gps5_from_stream( stream: T.Sequence[KLVDict], -) -> T.Generator[GPSPoint, None, None]: +) -> T.Generator[telemetry.GPSPoint, None, None]: indexed: T.Dict[bytes, T.List[T.List[T.Any]]] = { klv["key"]: klv["data"] for klv in stream } @@ -285,7 +285,7 @@ def _gps5_from_stream( gpsf = indexed.get(b"GPSF") if gpsf is not None: - gpsf_value = GPSFix(gpsf[0][0]) + gpsf_value = telemetry.GPSFix(gpsf[0][0]) else: gpsf_value = None @@ -309,7 +309,7 @@ def _gps5_from_stream( lat, lon, alt, ground_speed, _speed_3d = [ v / s for v, s in zip(point, scal_values) ] - yield GPSPoint( + yield telemetry.GPSPoint( # will figure out the actual timestamp later time=0, lat=lat, @@ -351,7 +351,7 @@ def _get_gps_type(input) -> bytes: def _gps9_from_stream( stream: T.Sequence[KLVDict], -) -> T.Generator[GPSPoint, None, None]: +) -> T.Generator[telemetry.GPSPoint, None, None]: NUM_VALUES = 9 indexed: T.Dict[bytes, T.List[T.List[T.Any]]] = { @@ -406,14 +406,14 @@ def _gps9_from_stream( epoch_time = _gps9_timestamp_to_epoch_time(days_since_2000, secs_since_midnight) - yield GPSPoint( + yield telemetry.GPSPoint( # will figure out the actual timestamp later time=0, lat=lat, lon=lon, alt=alt, epoch_time=epoch_time, - fix=GPSFix(gps_fix), + fix=telemetry.GPSFix(gps_fix), precision=dop * 100, ground_speed=speed_2d, angle=None, @@ -436,8 +436,8 @@ def _find_first_device_id(stream: T.Sequence[KLVDict]) -> int: return device_id -def _find_first_gps_stream(stream: T.Sequence[KLVDict]) -> T.List[GPSPoint]: - sample_points: T.List[GPSPoint] = [] +def _find_first_gps_stream(stream: T.Sequence[KLVDict]) -> T.List[telemetry.GPSPoint]: + sample_points: T.List[telemetry.GPSPoint] = [] for klv in stream: if klv["key"] == b"STRM": @@ -564,7 +564,7 @@ def _find_first_telemetry_stream(stream: T.Sequence[KLVDict], key: bytes): return values -def _backfill_gps_timestamps(gps_points: T.Iterable[GPSPoint]) -> None: +def _backfill_gps_timestamps(gps_points: T.Iterable[telemetry.GPSPoint]) -> None: it = iter(gps_points) # find the first point with epoch time @@ -590,7 +590,7 @@ def _backfill_gps_timestamps(gps_points: T.Iterable[GPSPoint]) -> None: def _load_telemetry_from_samples( fp: T.BinaryIO, samples: T.Iterable[Sample], - points_by_dvid: dict[int, list[GPSPoint]] | None = None, + points_by_dvid: dict[int, list[telemetry.GPSPoint]] | None = None, accls_by_dvid: dict[int, list[telemetry.AccelerationData]] | None = None, gyros_by_dvid: dict[int, list[telemetry.GyroscopeData]] | None = None, magns_by_dvid: dict[int, list[telemetry.MagnetometerData]] | None = None, diff --git a/mapillary_tools/geotag/gps_filter.py b/mapillary_tools/gpmf/gps_filter.py similarity index 100% rename from mapillary_tools/geotag/gps_filter.py rename to mapillary_tools/gpmf/gps_filter.py diff --git a/mapillary_tools/mp4/mp4_sample_parser.py b/mapillary_tools/mp4/mp4_sample_parser.py index 6f5afdd46..7cff77899 100644 --- a/mapillary_tools/mp4/mp4_sample_parser.py +++ b/mapillary_tools/mp4/mp4_sample_parser.py @@ -278,12 +278,12 @@ def __init__(self, moov_data: bytes): def parse_file(cls, video_path: Path) -> "MovieBoxParser": with video_path.open("rb") as fp: moov = sparser.parse_box_data_firstx(fp, [b"moov"]) - return MovieBoxParser(moov) + return cls(moov) @classmethod def parse_stream(cls, stream: T.BinaryIO) -> "MovieBoxParser": moov = sparser.parse_box_data_firstx(stream, [b"moov"]) - return MovieBoxParser(moov) + return cls(moov) def extract_mvhd_boxdata(self) -> T.Dict: mvhd = cparser.find_box_at_pathx(self.moov_children, [b"mvhd"]) @@ -312,7 +312,7 @@ def extract_track_at(self, stream_idx: int) -> TrackBoxParser: return TrackBoxParser(trak_children) -_DT_1904 = datetime.datetime.utcfromtimestamp(0).replace(year=1904) +_DT_1904 = datetime.datetime.fromtimestamp(0, datetime.timezone.utc).replace(year=1904) def to_datetime(seconds_since_1904: int) -> datetime.datetime: diff --git a/mapillary_tools/mp4/simple_mp4_parser.py b/mapillary_tools/mp4/simple_mp4_parser.py index 6575eb88c..8f1083397 100644 --- a/mapillary_tools/mp4/simple_mp4_parser.py +++ b/mapillary_tools/mp4/simple_mp4_parser.py @@ -187,16 +187,6 @@ def _parse_path_first( return None -def parse_box_path_firstx( - stream: T.BinaryIO, path: T.List[bytes], maxsize: int = -1 -) -> T.Tuple[Header, T.BinaryIO]: - # depth=1 will disable EoF extension - parsed = _parse_path_first(stream, path, maxsize=maxsize, depth=1) - if parsed is None: - raise BoxNotFoundError(f"unable find box at path {path}") - return parsed - - def parse_mp4_data_first( stream: T.BinaryIO, path: T.List[bytes], maxsize: int = -1 ) -> T.Optional[bytes]: diff --git a/mapillary_tools/process_geotag_properties.py b/mapillary_tools/process_geotag_properties.py index 698e8889e..4029eadaf 100644 --- a/mapillary_tools/process_geotag_properties.py +++ b/mapillary_tools/process_geotag_properties.py @@ -324,7 +324,7 @@ def _overwrite_exif_tags( unit="images", disable=LOG.getEffectiveLevel() <= logging.DEBUG, ): - dt = datetime.datetime.utcfromtimestamp(metadata.time) + dt = datetime.datetime.fromtimestamp(metadata.time, datetime.timezone.utc) dt = dt.replace(tzinfo=datetime.timezone.utc) try: diff --git a/mapillary_tools/process_sequence_properties.py b/mapillary_tools/process_sequence_properties.py index 0b560805e..1faac48df 100644 --- a/mapillary_tools/process_sequence_properties.py +++ b/mapillary_tools/process_sequence_properties.py @@ -10,8 +10,7 @@ SeqItem = T.TypeVar("SeqItem") -PointLike = T.TypeVar("PointLike", bound=geo.Point) -PointSequence = T.List[PointLike] +PointSequence = T.List[geo.PointLike] def split_sequence_by( @@ -211,7 +210,7 @@ def _parse_pixels(pixels_str: str) -> int: ) -def _avg_speed(sequence: T.Sequence[PointLike]) -> float: +def _avg_speed(sequence: T.Sequence[geo.PointLike]) -> float: total_distance = 0.0 for cur, nxt in geo.pairwise(sequence): total_distance += geo.gps_distance( diff --git a/mapillary_tools/types.py b/mapillary_tools/types.py index 738ee556a..6447f6470 100644 --- a/mapillary_tools/types.py +++ b/mapillary_tools/types.py @@ -453,7 +453,7 @@ def validate_video_desc(desc: T.Any) -> None: def datetime_to_map_capture_time(time: T.Union[datetime.datetime, int, float]) -> str: if isinstance(time, (float, int)): - dt = datetime.datetime.utcfromtimestamp(time) + dt = datetime.datetime.fromtimestamp(time, datetime.timezone.utc) # otherwise it will be assumed to be in local time dt = dt.replace(tzinfo=datetime.timezone.utc) else: diff --git a/mapillary_tools/upload.py b/mapillary_tools/upload.py index b83d1db7e..918fecdb5 100644 --- a/mapillary_tools/upload.py +++ b/mapillary_tools/upload.py @@ -26,7 +26,7 @@ VERSION, ) from .camm import camm_builder -from .geotag import gpmf_parser +from .gpmf import gpmf_parser from .mp4 import simple_mp4_builder from .types import FileType diff --git a/mapillary_tools/video_data_extraction/extract_video_data.py b/mapillary_tools/video_data_extraction/extract_video_data.py index 1d90c400c..97f1f16ab 100644 --- a/mapillary_tools/video_data_extraction/extract_video_data.py +++ b/mapillary_tools/video_data_extraction/extract_video_data.py @@ -6,7 +6,8 @@ import tqdm from .. import exceptions, geo, utils -from ..geotag import gpmf_gps_filter, utils as video_utils +from ..geotag import utils as video_utils +from ..gpmf import gpmf_gps_filter from ..telemetry import GPSPoint from ..types import ( ErrorMetadata, @@ -167,7 +168,7 @@ def _sanitize_points(points: T.Sequence[geo.Point]) -> T.Sequence[geo.Point]: raise exceptions.MapillaryGPSNoiseError("GPS is too noisy") stationary = video_utils.is_video_stationary( - geo.get_max_distance_from_start([(p.lat, p.lon) for p in points]) + video_utils.get_max_distance_from_start([(p.lat, p.lon) for p in points]) ) if stationary: diff --git a/mapillary_tools/video_data_extraction/extractors/gopro_parser.py b/mapillary_tools/video_data_extraction/extractors/gopro_parser.py index b69e7a111..9edfe2cd0 100644 --- a/mapillary_tools/video_data_extraction/extractors/gopro_parser.py +++ b/mapillary_tools/video_data_extraction/extractors/gopro_parser.py @@ -1,8 +1,9 @@ from __future__ import annotations + import typing as T from ... import geo -from ...geotag import gpmf_parser +from ...gpmf import gpmf_parser from ...mp4 import simple_mp4_parser as sparser from .base_parser import BaseParser diff --git a/setup.py b/setup.py index c2c200cdc..198adbd09 100644 --- a/setup.py +++ b/setup.py @@ -47,6 +47,7 @@ def readme(): "mapillary_tools.camm", "mapillary_tools.commands", "mapillary_tools.geotag", + "mapillary_tools.gpmf", "mapillary_tools.mp4", "mapillary_tools.video_data_extraction", "mapillary_tools.video_data_extraction.extractors", diff --git a/tests/cli/gpmf_parser.py b/tests/cli/gpmf_parser.py index f621fc76e..1cbd11b16 100644 --- a/tests/cli/gpmf_parser.py +++ b/tests/cli/gpmf_parser.py @@ -10,8 +10,8 @@ import gpxpy.gpx import mapillary_tools.geo as geo -import mapillary_tools.geotag.gpmf_parser as gpmf_parser -import mapillary_tools.geotag.gps_filter as gps_filter +import mapillary_tools.gpmf.gpmf_parser as gpmf_parser +import mapillary_tools.gpmf.gps_filter as gps_filter import mapillary_tools.telemetry as telemetry import mapillary_tools.utils as utils from mapillary_tools.mp4 import mp4_sample_parser @@ -55,7 +55,7 @@ def _convert_points_to_gpx_track_segment( point.lat, point.lon, elevation=point.alt, - time=datetime.datetime.utcfromtimestamp(epoch_time), + time=datetime.datetime.fromtimestamp(epoch_time, datetime.timezone.utc), position_dilution=point.precision, comment=comment, ) diff --git a/tests/cli/gps_filter.py b/tests/cli/gps_filter.py index 96d0baddd..47c132e48 100644 --- a/tests/cli/gps_filter.py +++ b/tests/cli/gps_filter.py @@ -7,7 +7,7 @@ import gpxpy from mapillary_tools import constants, geo, telemetry -from mapillary_tools.geotag import gps_filter +from mapillary_tools.gpmf import gps_filter from .gpmf_parser import _convert_points_to_gpx_track_segment diff --git a/tests/cli/upload_api_v4.py b/tests/cli/upload_api_v4.py index beeaf5250..b1ec80d75 100644 --- a/tests/cli/upload_api_v4.py +++ b/tests/cli/upload_api_v4.py @@ -19,7 +19,7 @@ def wrap_http_exception(ex: requests.HTTPError): lines = [ f"{ex.request.method} {resp.url}", f"> HTTP Status: {ex.response.status_code}", - f"{resp.content}", + f"{resp.content!r}", ] return Exception("\n".join(lines)) diff --git a/tests/integration/test_history.py b/tests/integration/test_history.py index 0dbf0e4f8..0b7060fab 100644 --- a/tests/integration/test_history.py +++ b/tests/integration/test_history.py @@ -3,13 +3,7 @@ import py.path import pytest -from .fixtures import ( - EXECUTABLE, - setup_config, - setup_data, - setup_upload, - USERNAME, -) +from .fixtures import EXECUTABLE, setup_config, setup_data, setup_upload, USERNAME UPLOAD_FLAGS = f"--dry_run --user_name={USERNAME}" diff --git a/tests/integration/test_process_and_upload.py b/tests/integration/test_process_and_upload.py index edbea8685..1e8b97bd0 100644 --- a/tests/integration/test_process_and_upload.py +++ b/tests/integration/test_process_and_upload.py @@ -259,7 +259,7 @@ def test_video_process_and_upload( @pytest.mark.usefixtures("setup_config") -def xtest_video_process_and_upload_after_gpx( +def test_video_process_and_upload_after_gpx( setup_upload: py.path.local, setup_data: py.path.local ): if not IS_FFMPEG_INSTALLED: diff --git a/tests/unit/test_geo.py b/tests/unit/test_geo.py index 66c61bba2..f3f1978f6 100644 --- a/tests/unit/test_geo.py +++ b/tests/unit/test_geo.py @@ -2,6 +2,7 @@ import datetime import random import typing as T +import unittest from mapillary_tools import geo from mapillary_tools.geo import Point @@ -29,6 +30,15 @@ def test_interpolate_compare(): ) +def test_interpolate_empty_list(): + try: + geo.interpolate([], 1.5) + except ValueError: + pass + else: + assert False, "should raise ValueError" + + def test_interpolate(): points = [ Point(lon=1, lat=1, time=1, alt=1, angle=None), @@ -201,7 +211,7 @@ class _Point(Point): def test_timestamp(): - t = datetime.datetime.utcfromtimestamp(123) + t = datetime.datetime.fromtimestamp(123, datetime.timezone.utc) t = t.replace(tzinfo=datetime.timezone.utc) assert geo.as_unix_time(t) == 123 @@ -263,11 +273,12 @@ def test_distance(): def test_compute_bearing(): - assert 0 == geo.compute_bearing(0, 0, 0, 0) - assert 0 == geo.compute_bearing(0, 0, 1, 0) - assert 90 == geo.compute_bearing(0, 0, 0, 1) - assert 180 == geo.compute_bearing(0, 0, -1, 0) - assert 270 == geo.compute_bearing(0, 0, 0, -1) + assert 0 == geo.compute_bearing((1, 1), (1, 1)) + assert 0 == geo.compute_bearing((0, 0), (0, 0)) + assert 0 == geo.compute_bearing((0, 0), (1, 0)) + assert 90 == geo.compute_bearing((0, 0), (0, 1)) + assert 180 == geo.compute_bearing((0, 0), (-1, 0)) + assert 270 == geo.compute_bearing((0, 0), (0, -1)) def test_interpolate_directions_if_none(): @@ -280,3 +291,466 @@ def test_interpolate_directions_if_none(): ] geo.interpolate_directions_if_none(points) assert [90, 1, 2, 180, 180] == [p.angle for p in points] + + +class TestInterpolator(unittest.TestCase): + """Test cases for the Interpolator class, focusing on tricky edge cases.""" + + def setUp(self): + """Set up test data for interpolation tests.""" + + # Helper function to create a series of points + def create_track( + start_time, + points_count, + time_step, + start_lat, + start_lon, + lat_step, + lon_step, + alt=None, + angle=None, + ): + track = [] + for i in range(points_count): + time = start_time + i * time_step + lat = start_lat + i * lat_step + lon = start_lon + i * lon_step + alt_val = None if alt is None else alt + i * 10 + angle_val = None if angle is None else (angle + i * 5) % 360 + track.append( + Point(time=time, lat=lat, lon=lon, alt=alt_val, angle=angle_val) + ) + return track + + # Regular track + self.regular_track = create_track( + start_time=1000.0, + points_count=5, + time_step=100.0, + start_lat=10.0, + start_lon=20.0, + lat_step=0.1, + lon_step=0.1, + alt=100, + angle=45, + ) + + # Track with very close timestamps (nearly identical) + self.close_timestamps_track = [ + Point(time=2000.0, lat=30.0, lon=40.0, alt=200, angle=90), + Point(time=2000.000001, lat=30.1, lon=40.1, alt=210, angle=95), + Point(time=2000.000002, lat=30.2, lon=40.2, alt=220, angle=100), + ] + + # Track with identical timestamps + self.identical_timestamps_track = [ + Point(time=3000.0, lat=50.0, lon=60.0, alt=300, angle=180), + Point(time=3000.0, lat=50.1, lon=60.1, alt=310, angle=185), + Point(time=3100.0, lat=50.2, lon=60.2, alt=320, angle=190), + ] + + # Track with large time gaps + self.large_gaps_track = [ + Point(time=4000.0, lat=70.0, lon=80.0, alt=400, angle=270), + Point(time=14000.0, lat=70.1, lon=80.1, alt=410, angle=275), + Point(time=24000.0, lat=70.2, lon=80.2, alt=420, angle=280), + ] + + # Track crossing the antimeridian (longitude wrapping around 180/-180) + self.antimeridian_track = [ + Point(time=5000.0, lat=0.0, lon=179.9, alt=500, angle=0), + Point(time=5100.0, lat=0.1, lon=-179.9, alt=510, angle=5), + Point(time=5200.0, lat=0.2, lon=-179.8, alt=520, angle=10), + ] + + # Track crossing the poles (extreme latitudes) + self.polar_track = [ + Point(time=6000.0, lat=89.0, lon=0.0, alt=600, angle=0), + Point(time=6100.0, lat=90.0, lon=0.0, alt=610, angle=0), + Point(time=6200.0, lat=89.0, lon=180.0, alt=620, angle=180), + ] + + # Multiple tracks with time gaps between them + self.track_1 = create_track( + start_time=7000.0, + points_count=3, + time_step=100.0, + start_lat=10.0, + start_lon=20.0, + lat_step=0.1, + lon_step=0.1, + ) + + self.track_2 = create_track( + start_time=7500.0, + points_count=3, + time_step=100.0, + start_lat=11.0, + start_lon=21.0, + lat_step=0.1, + lon_step=0.1, + ) + + self.track_3 = create_track( + start_time=8000.0, + points_count=3, + time_step=100.0, + start_lat=12.0, + start_lon=22.0, + lat_step=0.1, + lon_step=0.1, + ) + + # Track with None values for alt and angle + self.none_values_track = [ + Point(time=9000.0, lat=80.0, lon=90.0, alt=None, angle=None), + Point(time=9100.0, lat=80.1, lon=90.1, alt=None, angle=None), + Point(time=9200.0, lat=80.2, lon=90.2, alt=None, angle=None), + ] + + # Mixed track: some points with alt/angle, some without + self.mixed_values_track = [ + Point(time=10000.0, lat=85.0, lon=95.0, alt=None, angle=45), + Point(time=10100.0, lat=85.1, lon=95.1, alt=800, angle=None), + Point(time=10200.0, lat=85.2, lon=95.2, alt=810, angle=50), + ] + + def test_basic_interpolation(self): + """Test basic interpolation within a track.""" + interpolator = geo.Interpolator([self.regular_track]) + + # Interpolate in the middle of a segment + point = interpolator.interpolate(1050.0) + self.assertEqual(point.time, 1050.0) + self.assertAlmostEqual(point.lat, 10.05) + self.assertAlmostEqual(point.lon, 20.05) + self.assertAlmostEqual(point.alt, 105) + + # Interpolate exactly at a point + point = interpolator.interpolate(1100.0) + self.assertEqual(point.time, 1100.0) + self.assertAlmostEqual(point.lat, 10.1) + self.assertAlmostEqual(point.lon, 20.1) + self.assertAlmostEqual(point.alt, 110) + + def test_extrapolation_before_track(self): + """Test extrapolation before the start of a track.""" + interpolator = geo.Interpolator([self.regular_track]) + + # Extrapolate before first point + point = interpolator.interpolate(900.0) + self.assertEqual(point.time, 900.0) + self.assertAlmostEqual(point.lat, 9.9) + self.assertAlmostEqual(point.lon, 19.9) + self.assertAlmostEqual(point.alt, 90) + + def test_extrapolation_after_track(self): + """Test extrapolation after the end of a track.""" + interpolator = geo.Interpolator([self.regular_track]) + + # Extrapolate after last point + point = interpolator.interpolate(1500.0) + self.assertEqual(point.time, 1500.0) + self.assertAlmostEqual(point.lat, 10.5) + self.assertAlmostEqual(point.lon, 20.5) + self.assertAlmostEqual(point.alt, 150) + + def test_close_timestamps(self): + """Test interpolation between points with very close timestamps.""" + interpolator = geo.Interpolator([self.close_timestamps_track]) + + # Interpolate between very close timestamps + point = interpolator.interpolate(2000.0000015) + self.assertEqual(point.time, 2000.0000015) + # Should be halfway between point 1 and 2 + self.assertAlmostEqual(point.lat, 30.15) + self.assertAlmostEqual(point.lon, 40.15) + self.assertAlmostEqual(point.alt, 215) + + def test_identical_timestamps(self): + """Test interpolation with points having identical timestamps.""" + interpolator = geo.Interpolator([self.identical_timestamps_track]) + + # Interpolate at time matching multiple points + point = interpolator.interpolate(3000.0) + self.assertEqual(point.time, 3000.0) + # Should pick the first point with that time + self.assertAlmostEqual(point.lat, 50.0) + self.assertAlmostEqual(point.lon, 60.0) + self.assertAlmostEqual(point.alt, 300) + + # Interpolate between identical and unique timestamps + point = interpolator.interpolate(3050.0) + self.assertEqual(point.time, 3050.0) + # Should interpolate between the last point with identical timestamp and the next point + self.assertAlmostEqual(point.lat, 50.15) + self.assertAlmostEqual(point.lon, 60.15) + self.assertAlmostEqual(point.alt, 315) + + def test_large_time_gaps(self): + """Test interpolation across large time gaps.""" + interpolator = geo.Interpolator([self.large_gaps_track]) + + # Interpolate in a large time gap + point = interpolator.interpolate(9000.0) + self.assertEqual(point.time, 9000.0) + # Should interpolate linearly despite the large gap + self.assertAlmostEqual(point.lat, 70.05) + self.assertAlmostEqual(point.lon, 80.05) + self.assertAlmostEqual(point.alt, 405) + + def test_antimeridian_crossing(self): + """Test interpolation across the antimeridian (180/-180 longitude).""" + interpolator = geo.Interpolator([self.antimeridian_track]) + + # Interpolate across the antimeridian + point = interpolator.interpolate(5050.0) + self.assertEqual(point.time, 5050.0) + self.assertAlmostEqual(point.lat, 0.05) + # This is tricky - we need to check if the angle calculation is correct + # The bearing should adjust correctly for the antimeridian crossing + + def test_polar_region(self): + """Test interpolation near the poles.""" + interpolator = geo.Interpolator([self.polar_track]) + + # Interpolate near the poles + point = interpolator.interpolate(6150.0) + self.assertEqual(point.time, 6150.0) + self.assertAlmostEqual(point.lat, 89.5) + # Near the poles, longitude values can change rapidly for small movements + + def test_multiple_tracks(self): + """Test interpolation across multiple tracks.""" + interpolator = geo.Interpolator([self.track_1, self.track_2, self.track_3]) + + # Interpolate within first track + point = interpolator.interpolate(7050.0) + self.assertEqual(point.time, 7050.0) + self.assertAlmostEqual(point.lat, 10.05) + self.assertAlmostEqual(point.lon, 20.05) + + # Interpolate in gap between tracks (should use the appropriate tracks) + point = interpolator.interpolate(7400.0) + self.assertEqual(point.time, 7400.0) + # Should extrapolate from track_1 + + # Interpolate within second track + point = interpolator.interpolate(7550.0) + self.assertEqual(point.time, 7550.0) + self.assertAlmostEqual(point.lat, 11.05) + self.assertAlmostEqual(point.lon, 21.05) + + # Interpolate in gap between tracks again + point = interpolator.interpolate(7900.0) + self.assertEqual(point.time, 7900.0) + # Should extrapolate from track_2 + + # Interpolate within third track + point = interpolator.interpolate(8050.0) + self.assertEqual(point.time, 8050.0) + self.assertAlmostEqual(point.lat, 12.05) + self.assertAlmostEqual(point.lon, 22.05) + + # Interpolate after all tracks + point = interpolator.interpolate(8500.0) + self.assertEqual(point.time, 8500.0) + # Should extrapolate from track_3 + + def test_sequence_of_calls(self): + """Test a sequence of interpolation calls in different orders.""" + interpolator = geo.Interpolator([self.track_1, self.track_2, self.track_3]) + + # Sequential calls with increasing time + point1 = interpolator.interpolate(7100.0) + point2 = interpolator.interpolate(7200.0) + point3 = interpolator.interpolate(7600.0) + point4 = interpolator.interpolate(8100.0) + + # All points should be correctly interpolated + self.assertAlmostEqual(point1.lat, 10.1) + self.assertAlmostEqual(point2.lat, 10.2) + self.assertAlmostEqual(point3.lat, 11.1) + self.assertAlmostEqual(point4.lat, 12.1) + + def test_non_monotonic_times(self): + """Test that the interpolator raises on non-monotonic times.""" + interpolator = geo.Interpolator([self.regular_track]) + + # First call should work + interpolator.interpolate(1100.0) + + # Second call with earlier time should fail + with self.assertRaises(ValueError): + interpolator.interpolate(1050.0) + + def test_none_values(self): + """Test interpolation with None values for alt and angle.""" + interpolator = geo.Interpolator([self.none_values_track]) + + # Interpolate with None values + point = interpolator.interpolate(9050.0) + self.assertEqual(point.time, 9050.0) + self.assertAlmostEqual(point.lat, 80.05) + self.assertAlmostEqual(point.lon, 90.05) + self.assertIsNone(point.alt) + # Angle should be calculated even if the original points have None angles + + def test_mixed_none_values(self): + """Test interpolation with mixed None and non-None values.""" + interpolator = geo.Interpolator([self.mixed_values_track]) + + # Interpolate between None and non-None values + point = interpolator.interpolate(10050.0) + self.assertEqual(point.time, 10050.0) + self.assertAlmostEqual(point.lat, 85.05) + self.assertAlmostEqual(point.lon, 95.05) + # Alt should be None because one of the endpoints has None + self.assertIsNone(point.alt) + + def test_empty_tracks(self): + """Test with empty track list (should raise).""" + with self.assertRaises(ValueError): + geo.Interpolator([]) + + def test_single_point_track(self): + """Test interpolation with a track containing only one point.""" + single_point_track = [ + Point(time=11000.0, lat=90.0, lon=100.0, alt=900, angle=0) + ] + interpolator = geo.Interpolator([single_point_track]) + + # Interpolate before the point (should use the only point) + point = interpolator.interpolate(10900.0) + self.assertEqual(point.time, 10900.0) + self.assertAlmostEqual(point.lat, 90.0) + self.assertAlmostEqual(point.lon, 100.0) + self.assertAlmostEqual(point.alt, 900) + + # Interpolate at the exact time + point = interpolator.interpolate(11000.0) + self.assertEqual(point.time, 11000.0) + self.assertAlmostEqual(point.lat, 90.0) + self.assertAlmostEqual(point.lon, 100.0) + self.assertAlmostEqual(point.alt, 900) + + # Interpolate after the point (should use the only point) + point = interpolator.interpolate(11100.0) + self.assertEqual(point.time, 11100.0) + self.assertAlmostEqual(point.lat, 90.0) + self.assertAlmostEqual(point.lon, 100.0) + self.assertAlmostEqual(point.alt, 900) + + def test_out_of_order_tracks(self): + """Test with tracks provided in non-chronological order.""" + # Create tracks in the wrong order + interpolator = geo.Interpolator([self.track_3, self.track_1, self.track_2]) + + # Should interpolate correctly despite the initial order + point = interpolator.interpolate(7050.0) + self.assertEqual(point.time, 7050.0) + self.assertAlmostEqual(point.lat, 10.05) + self.assertAlmostEqual(point.lon, 20.05) + + def test_bisect_optimization(self): + """Test that the bisect optimization works correctly.""" + # Create a long track to test bisect optimization + long_track = [] + for i in range(1000): + long_track.append( + Point( + time=12000.0 + i, + lat=0.0 + i * 0.001, + lon=0.0 + i * 0.001, + alt=0.0 + i, + angle=0.0, + ) + ) + + interpolator = geo.Interpolator([long_track]) + + # Interpolate at various points and ensure accuracy + point1 = interpolator.interpolate(12100.5) + self.assertAlmostEqual(point1.lat, 0.1005) + + point2 = interpolator.interpolate(12500.5) + self.assertAlmostEqual(point2.lat, 0.5005) + + point3 = interpolator.interpolate(12900.5) + self.assertAlmostEqual(point3.lat, 0.9005) + + def test_overlapping_tracks(self): + """Test with overlapping tracks in time.""" + # Create overlapping tracks + track_overlap_1 = [ + Point(time=13000.0, lat=10.0, lon=20.0, alt=100, angle=0), + Point(time=13100.0, lat=10.1, lon=20.1, alt=110, angle=10), + Point(time=13200.0, lat=10.2, lon=20.2, alt=120, angle=20), + ] + + track_overlap_2 = [ + Point(time=13150.0, lat=11.0, lon=21.0, alt=150, angle=30), + Point(time=13250.0, lat=11.1, lon=21.1, alt=160, angle=40), + Point(time=13350.0, lat=11.2, lon=21.2, alt=170, angle=50), + ] + + interpolator = geo.Interpolator([track_overlap_1, track_overlap_2]) + + # Test point in first track before overlap + point = interpolator.interpolate(13050.0) + self.assertEqual(point.time, 13050.0) + self.assertAlmostEqual(point.lat, 10.05) + + # Test point in overlap region (should use first track) + point = interpolator.interpolate(13175.0) + self.assertEqual(point.time, 13175.0) + self.assertAlmostEqual(point.lat, 10.175) # From track_overlap_1 + + # Test point in second track after overlap + point = interpolator.interpolate(13300.0) + self.assertEqual(point.time, 13300.0) + self.assertAlmostEqual(point.lat, 11.15) # From track_overlap_2 + + def test_extreme_value_tracks(self): + """Test with extreme timestamp values.""" + # Create track with very large timestamps + large_time_track = [ + Point(time=1e12, lat=1.0, lon=1.0, alt=100, angle=0), + Point(time=1e12 + 100, lat=1.1, lon=1.1, alt=110, angle=10), + ] + + # Create track with very small timestamps + small_time_track = [ + Point(time=1e-12, lat=2.0, lon=2.0, alt=200, angle=20), + Point(time=2e-12, lat=2.1, lon=2.1, alt=210, angle=30), + ] + + # Test large timestamps + interpolator = geo.Interpolator([large_time_track]) + point = interpolator.interpolate(1e12 + 50) + self.assertEqual(point.time, 1e12 + 50) + self.assertAlmostEqual(point.lat, 1.05) + + # Test small timestamps + interpolator = geo.Interpolator([small_time_track]) + point = interpolator.interpolate(1.5e-12) + self.assertEqual(point.time, 1.5e-12) + self.assertAlmostEqual(point.lat, 2.05) + + def test_negative_timestamps(self): + """Test with negative timestamp values.""" + negative_time_track = [ + Point(time=-1000.0, lat=3.0, lon=3.0, alt=300, angle=0), + Point(time=-900.0, lat=3.1, lon=3.1, alt=310, angle=10), + Point(time=-800.0, lat=3.2, lon=3.2, alt=320, angle=20), + ] + + interpolator = geo.Interpolator([negative_time_track]) + + # Interpolate at negative time + point = interpolator.interpolate(-950.0) + self.assertEqual(point.time, -950.0) + self.assertAlmostEqual(point.lat, 3.05) + self.assertAlmostEqual(point.lon, 3.05) + self.assertAlmostEqual(point.alt, 305) diff --git a/tests/unit/test_gpmf_parser.py b/tests/unit/test_gpmf_parser.py index 5d9511d36..93b3b6112 100644 --- a/tests/unit/test_gpmf_parser.py +++ b/tests/unit/test_gpmf_parser.py @@ -1,4 +1,4 @@ -from mapillary_tools.geotag import gpmf_parser +from mapillary_tools.gpmf import gpmf_parser def test_simple(): diff --git a/tests/unit/test_gps_filter.py b/tests/unit/test_gps_filter.py index e73dbadf9..bd7ea7483 100644 --- a/tests/unit/test_gps_filter.py +++ b/tests/unit/test_gps_filter.py @@ -1,5 +1,5 @@ import mapillary_tools.geo as geo -import mapillary_tools.geotag.gps_filter as gps_filter +import mapillary_tools.gpmf.gps_filter as gps_filter def test_upper_whisker():