From e28b37cc53f63e5248e8f0c1935e582f006cf9d8 Mon Sep 17 00:00:00 2001 From: Tao Peng Date: Tue, 1 Apr 2025 16:29:02 -0700 Subject: [PATCH 1/5] git rm -r mapillary_tools/video_data_extraction/ --- .../video_data_extraction/cli_options.py | 22 --- .../extract_video_data.py | 157 ------------------ .../extractors/base_parser.py | 75 --------- .../extractors/blackvue_parser.py | 49 ------ .../extractors/camm_parser.py | 62 ------- .../extractors/exiftool_runtime_parser.py | 74 --------- .../extractors/exiftool_xml_parser.py | 52 ------ .../extractors/generic_video_parser.py | 52 ------ .../extractors/gopro_parser.py | 58 ------- .../extractors/gpx_parser.py | 108 ------------ .../extractors/nmea_parser.py | 24 --- .../video_data_parser_factory.py | 39 ----- setup.py | 2 - 13 files changed, 774 deletions(-) delete mode 100644 mapillary_tools/video_data_extraction/cli_options.py delete mode 100644 mapillary_tools/video_data_extraction/extract_video_data.py delete mode 100644 mapillary_tools/video_data_extraction/extractors/base_parser.py delete mode 100644 mapillary_tools/video_data_extraction/extractors/blackvue_parser.py delete mode 100644 mapillary_tools/video_data_extraction/extractors/camm_parser.py delete mode 100644 mapillary_tools/video_data_extraction/extractors/exiftool_runtime_parser.py delete mode 100644 mapillary_tools/video_data_extraction/extractors/exiftool_xml_parser.py delete mode 100644 mapillary_tools/video_data_extraction/extractors/generic_video_parser.py delete mode 100644 mapillary_tools/video_data_extraction/extractors/gopro_parser.py delete mode 100644 mapillary_tools/video_data_extraction/extractors/gpx_parser.py delete mode 100644 mapillary_tools/video_data_extraction/extractors/nmea_parser.py delete mode 100644 mapillary_tools/video_data_extraction/video_data_parser_factory.py diff --git a/mapillary_tools/video_data_extraction/cli_options.py b/mapillary_tools/video_data_extraction/cli_options.py deleted file mode 100644 index dbcd064fd..000000000 --- a/mapillary_tools/video_data_extraction/cli_options.py +++ /dev/null @@ -1,22 +0,0 @@ -import typing as T -from pathlib import Path - - -known_parser_options = ["source", "pattern", "exiftool_path"] - - -class CliParserOptions(T.TypedDict, total=False): - source: str - pattern: T.Optional[str] - exiftool_path: T.Optional[Path] - - -class CliOptions(T.TypedDict, total=False): - paths: T.Sequence[Path] - recursive: bool - geotag_sources_options: T.Sequence[CliParserOptions] - geotag_source_path: Path - exiftool_path: Path - num_processes: int - device_make: T.Optional[str] - device_model: T.Optional[str] diff --git a/mapillary_tools/video_data_extraction/extract_video_data.py b/mapillary_tools/video_data_extraction/extract_video_data.py deleted file mode 100644 index 6234096e5..000000000 --- a/mapillary_tools/video_data_extraction/extract_video_data.py +++ /dev/null @@ -1,157 +0,0 @@ -from __future__ import annotations - -import logging -import typing as T -from pathlib import Path - -import tqdm - -from .. import exceptions, geo, utils -from ..gpmf import gpmf_gps_filter -from ..telemetry import GPSPoint -from ..types import ErrorMetadata, FileType, VideoMetadata, VideoMetadataOrError -from . import video_data_parser_factory -from .cli_options import CliOptions -from .extractors.base_parser import BaseParser - - -LOG = logging.getLogger(__name__) - - -class VideoDataExtractor: - options: CliOptions - - def __init__(self, options: CliOptions) -> None: - self.options = options - - def process(self) -> T.List[VideoMetadataOrError]: - paths = self.options["paths"] - self._check_paths(paths) - video_files = utils.find_videos(paths) - self._check_sources_cardinality(video_files) - - map_results = utils.mp_map_maybe( - self.process_file, video_files, num_processes=self.options["num_processes"] - ) - - video_metadata_or_errors: list[VideoMetadataOrError] = list( - tqdm.tqdm( - map_results, - desc="Extracting GPS tracks", - unit="videos", - disable=LOG.getEffectiveLevel() <= logging.DEBUG, - total=len(video_files), - ) - ) - - return video_metadata_or_errors - - def process_file(self, file: Path) -> VideoMetadataOrError: - parsers = video_data_parser_factory.make_parsers(file, self.options) - points: T.Sequence[geo.Point] = [] - make = self.options["device_make"] - model = self.options["device_model"] - - ex: T.Optional[Exception] - for parser in parsers: - log_vars = { - "filename": file, - "parser": parser.parser_label, - "source": parser.geotag_source_path, - } - try: - if not points: - points = self._extract_points(parser, log_vars) - if not model: - model = parser.extract_model() - if not make: - make = parser.extract_make() - except Exception as e: - ex = e - LOG.warning( - '%(filename)s: Exception for parser %(parser)s while processing source %(source)s: "%(e)s"', - {**log_vars, "e": e}, - ) - - # After trying all parsers, return the points if we found any, otherwise - # the last exception thrown or a default one. - # Note that if we have points, we return them, regardless of exceptions - # with make or model. - if points: - video_metadata = VideoMetadata( - filename=file, - filetype=FileType.VIDEO, - filesize=utils.get_file_size(file), - points=points, - make=make, - model=model, - ) - return video_metadata - else: - return ErrorMetadata( - filename=file, - error=( - ex - if ex - else exceptions.MapillaryVideoGPSNotFoundError( - "No GPS data found from the video" - ) - ), - filetype=FileType.VIDEO, - ) - - def _extract_points( - self, parser: BaseParser, log_vars: T.Dict - ) -> T.Sequence[geo.Point]: - points = parser.extract_points() - if points: - LOG.debug( - "%(filename)s: %(points)d points extracted by parser %(parser)s from file %(source)s}", - {**log_vars, "points": len(points)}, - ) - - return self._sanitize_points(points) - - @staticmethod - def _check_paths(import_paths: T.Sequence[Path]): - for path in import_paths: - if not path.is_file() and not path.is_dir(): - raise exceptions.MapillaryFileNotFoundError( - f"Import file or directory not found: {path}" - ) - - def _check_sources_cardinality(self, files: T.Sequence[Path]): - if len(files) > 1: - for parser_opts in self.options["geotag_sources_options"]: - pattern = parser_opts.get("pattern") - if pattern and "%" not in pattern: - raise exceptions.MapillaryUserError( - "Multiple video files found: Geotag source pattern for source %s must include filename placeholders", - parser_opts["source"], - ) - - @staticmethod - def _sanitize_points(points: T.Sequence[geo.Point]) -> T.Sequence[geo.Point]: - """ - Deduplicates points, when possible removes noisy ones, and checks - against stationary videos - """ - - if not points: - raise exceptions.MapillaryVideoGPSNotFoundError( - "No GPS data found in the given sources" - ) - - points = geo.extend_deduplicate_points(points) - - if all(isinstance(p, GPSPoint) for p in points): - points = T.cast( - T.Sequence[geo.Point], - gpmf_gps_filter.remove_noisy_points( - T.cast(T.Sequence[GPSPoint], points) - ), - ) - if not points: - raise exceptions.MapillaryGPSNoiseError("GPS is too noisy") - - return points diff --git a/mapillary_tools/video_data_extraction/extractors/base_parser.py b/mapillary_tools/video_data_extraction/extractors/base_parser.py deleted file mode 100644 index bd715f5af..000000000 --- a/mapillary_tools/video_data_extraction/extractors/base_parser.py +++ /dev/null @@ -1,75 +0,0 @@ -import abc -import functools -import logging -import os -import typing as T -from pathlib import Path - -from ... import geo -from ..cli_options import CliOptions, CliParserOptions - -LOG = logging.getLogger(__name__) - - -class BaseParser(metaclass=abc.ABCMeta): - videoPath: Path - options: CliOptions - parserOptions: CliParserOptions - - def __init__( - self, video_path: Path, options: CliOptions, parser_options: CliParserOptions - ) -> None: - self.videoPath = video_path - self.options = options - self.parserOptions = parser_options - - @property - @abc.abstractmethod - def default_source_pattern(self) -> str: - raise NotImplementedError - - @property - @abc.abstractmethod - def parser_label(self) -> str: - raise NotImplementedError - - @abc.abstractmethod - def extract_points(self) -> T.Sequence[geo.Point]: - raise NotImplementedError - - @abc.abstractmethod - def extract_make(self) -> T.Optional[str]: - raise NotImplementedError - - @abc.abstractmethod - def extract_model(self) -> T.Optional[str]: - raise NotImplementedError - - @functools.cached_property - def geotag_source_path(self) -> T.Optional[Path]: - video_dir = self.videoPath.parent.resolve() - video_filename = self.videoPath.name - video_basename, video_ext = os.path.splitext(video_filename) - pattern = self.parserOptions.get("pattern") or self.default_source_pattern - - replaced = Path( - pattern.replace("%f", video_filename) - .replace("%g", video_basename) - .replace("%e", video_ext) - ) - abs_path = ( - replaced if replaced.is_absolute() else Path.joinpath(video_dir, replaced) - ).resolve() - - return abs_path if abs_path.is_file() else None - - @staticmethod - def _rebase_times(points: T.Sequence[geo.Point], offset: float = 0.0): - """ - Make point times start from 0 - """ - if points: - first_timestamp = points[0].time - for p in points: - p.time = (p.time - first_timestamp) + offset - return points diff --git a/mapillary_tools/video_data_extraction/extractors/blackvue_parser.py b/mapillary_tools/video_data_extraction/extractors/blackvue_parser.py deleted file mode 100644 index c1ba1cdb0..000000000 --- a/mapillary_tools/video_data_extraction/extractors/blackvue_parser.py +++ /dev/null @@ -1,49 +0,0 @@ -from __future__ import annotations - -import functools - -import typing as T - -from ... import blackvue_parser, geo -from .base_parser import BaseParser - - -class BlackVueParser(BaseParser): - default_source_pattern = "%f" - must_rebase_times_to_zero = False - parser_label = "blackvue" - - pointsFound: bool = False - - @functools.cached_property - def extract_blackvue_info(self) -> blackvue_parser.BlackVueInfo | None: - source_path = self.geotag_source_path - if not source_path: - return None - - with source_path.open("rb") as fp: - return blackvue_parser.extract_blackvue_info(fp) - - def extract_points(self) -> T.Sequence[geo.Point]: - blackvue_info = self.extract_blackvue_info - - if blackvue_info is None: - return [] - - return blackvue_info.gps or [] - - def extract_make(self) -> str | None: - blackvue_info = self.extract_blackvue_info - - if blackvue_info is None: - return None - - return blackvue_info.make - - def extract_model(self) -> str | None: - blackvue_info = self.extract_blackvue_info - - if blackvue_info is None: - return None - - return blackvue_info.model diff --git a/mapillary_tools/video_data_extraction/extractors/camm_parser.py b/mapillary_tools/video_data_extraction/extractors/camm_parser.py deleted file mode 100644 index ea0a81d93..000000000 --- a/mapillary_tools/video_data_extraction/extractors/camm_parser.py +++ /dev/null @@ -1,62 +0,0 @@ -from __future__ import annotations - -import typing as T - -from ... import geo -from ...camm import camm_parser -from ...mp4 import simple_mp4_parser as sparser -from .base_parser import BaseParser - - -class CammParser(BaseParser): - default_source_pattern = "%f" - must_rebase_times_to_zero = False - parser_label = "camm" - - _extracted: bool = False - _cached_camm_info: camm_parser.CAMMInfo | None = None - - # TODO: use @functools.cached_property - def _extract_camm_info(self) -> camm_parser.CAMMInfo | None: - if self._extracted: - return self._cached_camm_info - - self._extracted = True - - source_path = self.geotag_source_path - - if source_path is None: - # source_path not found - return None - - with source_path.open("rb") as fp: - try: - self._cached_camm_info = camm_parser.extract_camm_info(fp) - except sparser.ParsingError: - self._cached_camm_info = None - - return self._cached_camm_info - - def extract_points(self) -> T.Sequence[geo.Point]: - camm_info = self._extract_camm_info() - - if camm_info is None: - return [] - - return T.cast(T.List[geo.Point], camm_info.gps or camm_info.mini_gps) - - def extract_make(self) -> str | None: - camm_info = self._extract_camm_info() - - if camm_info is None: - return None - - return camm_info.make - - def extract_model(self) -> str | None: - camm_info = self._extract_camm_info() - - if camm_info is None: - return None - - return camm_info.model diff --git a/mapillary_tools/video_data_extraction/extractors/exiftool_runtime_parser.py b/mapillary_tools/video_data_extraction/extractors/exiftool_runtime_parser.py deleted file mode 100644 index cbb3ad50f..000000000 --- a/mapillary_tools/video_data_extraction/extractors/exiftool_runtime_parser.py +++ /dev/null @@ -1,74 +0,0 @@ -import shutil -import subprocess -import typing as T -from pathlib import Path - -from ... import constants, exceptions, geo -from ..cli_options import CliOptions, CliParserOptions -from .base_parser import BaseParser -from .exiftool_xml_parser import ExiftoolXmlParser - - -class ExiftoolRuntimeParser(BaseParser): - """ - Wrapper around ExiftoolRdfParser that executes exiftool - """ - - exiftoolXmlParser: ExiftoolXmlParser - - default_source_pattern = "%f" - must_rebase_times_to_zero = True - parser_label = "exiftool_runtime" - - def __init__( - self, video_path: Path, options: CliOptions, parser_options: CliParserOptions - ): - super().__init__(video_path, options, parser_options) - if constants.EXIFTOOL_PATH is None: - exiftool_path = shutil.which("exiftool") - else: - exiftool_path = shutil.which(constants.EXIFTOOL_PATH) - - if not exiftool_path: - raise exceptions.MapillaryExiftoolNotFoundError( - "Cannot execute exiftool. Please install it from https://exiftool.org/ or you package manager, or set the environment variable MAPILLARY_TOOLS_EXIFTOOL_PATH" - ) - if not self.geotag_source_path: - return - - # To handle non-latin1 filenames under Windows, we pass the path - # via stdin. See https://exiftool.org/faq.html#Q18 - stdin = str(self.geotag_source_path) - args = [ - exiftool_path, - "-q", - "-r", - "-n", - "-ee", - "-api", - "LargeFileSupport=1", - "-X", - "-charset", - "filename=utf8", - "-@", - "-", - ] - - process = subprocess.run( - args, capture_output=True, text=True, input=stdin, encoding="utf-8" - ) - - self.exiftoolXmlParser = ExiftoolXmlParser( - video_path, options, parser_options, process.stdout - ) - - def extract_points(self) -> T.Sequence[geo.Point]: - return self.exiftoolXmlParser.extract_points() if self.exiftoolXmlParser else [] - - def extract_make(self) -> T.Optional[str]: - return self.exiftoolXmlParser.extract_make() if self.exiftoolXmlParser else None - - def extract_model(self) -> T.Optional[str]: - return ( - self.exiftoolXmlParser.extract_model() if self.exiftoolXmlParser else None - ) diff --git a/mapillary_tools/video_data_extraction/extractors/exiftool_xml_parser.py b/mapillary_tools/video_data_extraction/extractors/exiftool_xml_parser.py deleted file mode 100644 index dcfadf5bb..000000000 --- a/mapillary_tools/video_data_extraction/extractors/exiftool_xml_parser.py +++ /dev/null @@ -1,52 +0,0 @@ -import typing as T -import xml.etree.ElementTree as ET - -from pathlib import Path - -from ... import geo -from ...exiftool_read import _DESCRIPTION_TAG, EXIFTOOL_NAMESPACES -from ...exiftool_read_video import ExifToolReadVideo -from ..cli_options import CliOptions, CliParserOptions -from .base_parser import BaseParser - - -class ExiftoolXmlParser(BaseParser): - default_source_pattern = "%g.xml" - parser_label = "exiftool_xml" - - exifToolReadVideo: T.Optional[ExifToolReadVideo] = None - - def __init__( - self, - video_path: Path, - options: CliOptions, - parser_options: CliParserOptions, - xml_content: T.Optional[str] = None, - ) -> None: - super().__init__(video_path, options, parser_options) - - if xml_content: - etree = ET.fromstring(xml_content) - else: - xml_path = self.geotag_source_path - if not xml_path: - return - etree = ET.parse(xml_path).getroot() - - element = next(etree.iterfind(_DESCRIPTION_TAG, namespaces=EXIFTOOL_NAMESPACES)) - self.exifToolReadVideo = ExifToolReadVideo(ET.ElementTree(element)) - - def extract_points(self) -> T.Sequence[geo.Point]: - gps_points = ( - self.exifToolReadVideo.extract_gps_track() if self.exifToolReadVideo else [] - ) - self._rebase_times(gps_points) - return gps_points - - def extract_make(self) -> T.Optional[str]: - return self.exifToolReadVideo.extract_make() if self.exifToolReadVideo else None - - def extract_model(self) -> T.Optional[str]: - return ( - self.exifToolReadVideo.extract_model() if self.exifToolReadVideo else None - ) diff --git a/mapillary_tools/video_data_extraction/extractors/generic_video_parser.py b/mapillary_tools/video_data_extraction/extractors/generic_video_parser.py deleted file mode 100644 index 270558aa0..000000000 --- a/mapillary_tools/video_data_extraction/extractors/generic_video_parser.py +++ /dev/null @@ -1,52 +0,0 @@ -import typing as T -from pathlib import Path - -from ... import geo -from ..cli_options import CliOptions, CliParserOptions -from .base_parser import BaseParser -from .blackvue_parser import BlackVueParser -from .camm_parser import CammParser -from .gopro_parser import GoProParser - - -class GenericVideoParser(BaseParser): - """ - Wrapper around the three native video parsers. It will try to execute them - in the order camm-gopro-blackvue, like the previous implementation - """ - - parsers: T.Sequence[BaseParser] = [] - - default_source_pattern = "%f" - must_rebase_times_to_zero = False - parser_label = "video" - - def __init__( - self, video_path: Path, options: CliOptions, parser_options: CliParserOptions - ) -> None: - super().__init__(video_path, options, parser_options) - camm_parser = CammParser(video_path, options, parser_options) - gopro_parser = GoProParser(video_path, options, parser_options) - blackvue_parser = BlackVueParser(video_path, options, parser_options) - self.parsers = [camm_parser, gopro_parser, blackvue_parser] - - def extract_points(self) -> T.Sequence[geo.Point]: - for parser in self.parsers: - points = parser.extract_points() - if points: - return points - return [] - - def extract_make(self) -> T.Optional[str]: - for parser in self.parsers: - make = parser.extract_make() - if make: - return make - return None - - def extract_model(self) -> T.Optional[str]: - for parser in self.parsers: - model = parser.extract_model() - if model: - return model - return None diff --git a/mapillary_tools/video_data_extraction/extractors/gopro_parser.py b/mapillary_tools/video_data_extraction/extractors/gopro_parser.py deleted file mode 100644 index 9edfe2cd0..000000000 --- a/mapillary_tools/video_data_extraction/extractors/gopro_parser.py +++ /dev/null @@ -1,58 +0,0 @@ -from __future__ import annotations - -import typing as T - -from ... import geo -from ...gpmf import gpmf_parser -from ...mp4 import simple_mp4_parser as sparser -from .base_parser import BaseParser - - -class GoProParser(BaseParser): - default_source_pattern = "%f" - must_rebase_times_to_zero = False - parser_label = "gopro" - - _extracted: bool = False - _cached_gopro_info: gpmf_parser.GoProInfo | None = None - - def _extract_gopro_info(self) -> gpmf_parser.GoProInfo | None: - if self._extracted: - return self._cached_gopro_info - - self._extracted = True - - source_path = self.geotag_source_path - - if source_path is None: - # source_path not found - return None - - with source_path.open("rb") as fp: - try: - self._cached_gopro_info = gpmf_parser.extract_gopro_info(fp) - except sparser.ParsingError: - self._cached_gopro_info = None - - return self._cached_gopro_info - - def extract_points(self) -> T.Sequence[geo.Point]: - gopro_info = self._extract_gopro_info() - if gopro_info is None: - return [] - - return T.cast(T.Sequence[geo.Point], gopro_info.gps) - - def extract_make(self) -> str | None: - gopro_info = self._extract_gopro_info() - if gopro_info is None: - return None - - return gopro_info.make - - def extract_model(self) -> str | None: - gopro_info = self._extract_gopro_info() - if gopro_info is None: - return None - - return gopro_info.model diff --git a/mapillary_tools/video_data_extraction/extractors/gpx_parser.py b/mapillary_tools/video_data_extraction/extractors/gpx_parser.py deleted file mode 100644 index 60e23c1bf..000000000 --- a/mapillary_tools/video_data_extraction/extractors/gpx_parser.py +++ /dev/null @@ -1,108 +0,0 @@ -import datetime -import logging -import typing as T - -from ... import geo, telemetry -from ...geotag import geotag_images_from_gpx_file -from .base_parser import BaseParser -from .generic_video_parser import GenericVideoParser - - -LOG = logging.getLogger(__name__) - - -class GpxParser(BaseParser): - default_source_pattern = "%g.gpx" - parser_label = "gpx" - - def extract_points(self) -> T.Sequence[geo.Point]: - path = self.geotag_source_path - if not path: - return [] - - try: - gpx_tracks = geotag_images_from_gpx_file.parse_gpx(path) - except Exception as ex: - raise RuntimeError( - f"Error parsing GPX {path}: {ex.__class__.__name__}: {ex}" - ) - - if 1 < len(gpx_tracks): - LOG.warning( - "Found %s tracks in the GPX file %s. Will merge points in all the tracks as a single track for interpolation", - len(gpx_tracks), - self.videoPath, - ) - - gpx_points: T.Sequence[geo.Point] = sum(gpx_tracks, []) - if not gpx_points: - return gpx_points - - offset = self._synx_gpx_by_first_gps_timestamp(gpx_points) - - self._rebase_times(gpx_points, offset=offset) - - return gpx_points - - def _synx_gpx_by_first_gps_timestamp( - self, gpx_points: T.Sequence[geo.Point] - ) -> float: - offset: float = 0.0 - - if not gpx_points: - return offset - - first_gpx_dt = datetime.datetime.fromtimestamp( - gpx_points[0].time, tz=datetime.timezone.utc - ) - LOG.info("First GPX timestamp: %s", first_gpx_dt) - - # Extract first GPS timestamp (if found) for synchronization - # Use an empty dictionary to force video parsers to extract make/model from the video metadata itself - parser = GenericVideoParser(self.videoPath, self.options, {}) - gps_points = parser.extract_points() - - if not gps_points: - LOG.warning( - "Skip GPX synchronization because no GPS found in video %s", - self.videoPath, - ) - return offset - - first_gps_point = gps_points[0] - if isinstance(first_gps_point, telemetry.GPSPoint): - if first_gps_point.epoch_time is not None: - first_gps_dt = datetime.datetime.fromtimestamp( - first_gps_point.epoch_time, tz=datetime.timezone.utc - ) - LOG.info("First GPS timestamp: %s", first_gps_dt) - offset = gpx_points[0].time - first_gps_point.epoch_time - if offset: - LOG.warning( - "Found offset between GPX %s and video GPS timestamps %s: %s seconds", - first_gpx_dt, - first_gps_dt, - offset, - ) - else: - LOG.info( - "GPX and GPS are perfectly synchronized (all starts from %s)", - first_gpx_dt, - ) - else: - LOG.warning( - "Skip GPX synchronization because no GPS epoch time found in video %s", - self.videoPath, - ) - - return offset - - def extract_make(self) -> T.Optional[str]: - # Use an empty dictionary to force video parsers to extract make/model from the video metadata itself - parser = GenericVideoParser(self.videoPath, self.options, {}) - return parser.extract_make() - - def extract_model(self) -> T.Optional[str]: - # Use an empty dictionary to force video parsers to extract make/model from the video metadata itself - parser = GenericVideoParser(self.videoPath, self.options, {}) - return parser.extract_model() diff --git a/mapillary_tools/video_data_extraction/extractors/nmea_parser.py b/mapillary_tools/video_data_extraction/extractors/nmea_parser.py deleted file mode 100644 index 490b73ad3..000000000 --- a/mapillary_tools/video_data_extraction/extractors/nmea_parser.py +++ /dev/null @@ -1,24 +0,0 @@ -import typing as T - -from ... import geo -from ...geotag import geotag_images_from_nmea_file -from .base_parser import BaseParser - - -class NmeaParser(BaseParser): - default_source_pattern = "%g.nmea" - must_rebase_times_to_zero = True - parser_label = "nmea" - - def extract_points(self) -> T.Sequence[geo.Point]: - source_path = self.geotag_source_path - if not source_path: - return [] - points = geotag_images_from_nmea_file.get_lat_lon_time_from_nmea(source_path) - return points - - def extract_make(self) -> T.Optional[str]: - return None - - def extract_model(self) -> T.Optional[str]: - return None diff --git a/mapillary_tools/video_data_extraction/video_data_parser_factory.py b/mapillary_tools/video_data_extraction/video_data_parser_factory.py deleted file mode 100644 index 2e5d1dfe9..000000000 --- a/mapillary_tools/video_data_extraction/video_data_parser_factory.py +++ /dev/null @@ -1,39 +0,0 @@ -import typing as T -from pathlib import Path - -from .cli_options import CliOptions - -from .extractors.base_parser import BaseParser - -from .extractors.blackvue_parser import BlackVueParser -from .extractors.camm_parser import CammParser - -from .extractors.exiftool_runtime_parser import ExiftoolRuntimeParser -from .extractors.exiftool_xml_parser import ExiftoolXmlParser -from .extractors.generic_video_parser import GenericVideoParser -from .extractors.gopro_parser import GoProParser -from .extractors.gpx_parser import GpxParser -from .extractors.nmea_parser import NmeaParser - - -known_parsers = { - "gpx": GpxParser, - "nmea": NmeaParser, - "exiftool_xml": ExiftoolXmlParser, - "exiftool_runtime": ExiftoolRuntimeParser, - "camm": CammParser, - "blackvue": BlackVueParser, - "gopro": GoProParser, - "video": GenericVideoParser, -} - - -def make_parsers(file: Path, options: CliOptions) -> T.Sequence[BaseParser]: - src_options = options["geotag_sources_options"] - parsers = [ - known_parsers[s["source"]](file, options, s) - for s in src_options - if s["source"] in known_parsers - ] - - return parsers diff --git a/setup.py b/setup.py index 198adbd09..df3aa5a59 100644 --- a/setup.py +++ b/setup.py @@ -49,8 +49,6 @@ def readme(): "mapillary_tools.geotag", "mapillary_tools.gpmf", "mapillary_tools.mp4", - "mapillary_tools.video_data_extraction", - "mapillary_tools.video_data_extraction.extractors", ], entry_points=""" [console_scripts] From 849cbbe95eba3bc1b3dc37d6ec0b24ade1b53ddc Mon Sep 17 00:00:00 2001 From: Tao Peng Date: Tue, 1 Apr 2025 16:58:22 -0700 Subject: [PATCH 2/5] upgrade types with prompt: Update types with the following rules: 1. Relace T.Optional with | and T.List with list, and so on 2. Keep compatible with Python>=3.8 3. Do no use `from typing import ?`. Instead, always use `import typing as T` --- mapillary_tools/api_v4.py | 8 +- mapillary_tools/camm/camm_parser.py | 10 +- mapillary_tools/commands/__main__.py | 2 +- mapillary_tools/config.py | 12 +- mapillary_tools/constants.py | 3 +- mapillary_tools/exif_read.py | 130 +++++++++--------- mapillary_tools/exif_write.py | 13 +- mapillary_tools/exiftool_read.py | 44 +++--- mapillary_tools/exiftool_read_video.py | 76 +++++----- mapillary_tools/ffmpeg.py | 47 +++---- mapillary_tools/geo.py | 25 +--- mapillary_tools/gpmf/gpmf_parser.py | 32 ++--- mapillary_tools/gpmf/gps_filter.py | 8 +- mapillary_tools/history.py | 6 +- mapillary_tools/mp4/construct_mp4_parser.py | 19 +-- mapillary_tools/mp4/mp4_sample_parser.py | 56 ++++---- mapillary_tools/mp4/simple_mp4_builder.py | 19 +-- mapillary_tools/mp4/simple_mp4_parser.py | 25 ++-- mapillary_tools/process_geotag_properties.py | 12 +- .../process_sequence_properties.py | 78 ++++++----- mapillary_tools/sample_video.py | 12 +- mapillary_tools/telemetry.py | 10 +- mapillary_tools/types.py | 71 +++++----- mapillary_tools/utils.py | 34 +++-- 24 files changed, 371 insertions(+), 381 deletions(-) diff --git a/mapillary_tools/api_v4.py b/mapillary_tools/api_v4.py index 9feee7d55..4511f2088 100644 --- a/mapillary_tools/api_v4.py +++ b/mapillary_tools/api_v4.py @@ -135,7 +135,7 @@ def _log_debug_response(resp: requests.Response): if logging.getLogger().getEffectiveLevel() <= logging.DEBUG: return - data: T.Union[str, bytes] + data: str | bytes try: data = _truncate(dumps(_sanitize(resp.json()))) except Exception: @@ -148,7 +148,7 @@ def readable_http_error(ex: requests.HTTPError) -> str: req = ex.request resp = ex.response - data: T.Union[str, bytes] + data: str | bytes try: data = _truncate(dumps(_sanitize(resp.json()))) except Exception: @@ -284,7 +284,7 @@ def get_upload_token(email: str, password: str) -> requests.Response: def fetch_organization( - user_access_token: str, organization_id: T.Union[int, str] + user_access_token: str, organization_id: int | str ) -> requests.Response: resp = request_get( f"{MAPILLARY_GRAPH_API_ENDPOINT}/{organization_id}", @@ -329,7 +329,7 @@ def fetch_user_or_me( ] -def log_event(action_type: ActionType, properties: T.Dict) -> requests.Response: +def log_event(action_type: ActionType, properties: dict) -> requests.Response: resp = request_post( f"{MAPILLARY_GRAPH_API_ENDPOINT}/logging", json={ diff --git a/mapillary_tools/camm/camm_parser.py b/mapillary_tools/camm/camm_parser.py index 512cc81bd..3cb372b54 100644 --- a/mapillary_tools/camm/camm_parser.py +++ b/mapillary_tools/camm/camm_parser.py @@ -373,7 +373,7 @@ def serialize(cls, data: telemetry.MagnetometerData) -> bytes: assert len(SAMPLE_ENTRY_CLS_BY_CAMM_TYPE) == 5, SAMPLE_ENTRY_CLS_BY_CAMM_TYPE.keys() -_SWITCH: T.Dict[int, C.Struct] = { +_SWITCH: dict[int, C.Struct] = { # Angle_axis CAMMType.ANGLE_AXIS.value: _Float[3], # type: ignore # Exposure time @@ -436,7 +436,7 @@ def _parse_telemetry_from_sample( def _filter_telemetry_by_elst_segments( measurements: T.Iterable[TelemetryMeasurement], - elst: T.Sequence[T.Tuple[float, float]], + elst: T.Sequence[tuple[float, float]], ) -> T.Generator[TelemetryMeasurement, None, None]: empty_elst = [entry for entry in elst if entry[0] == -1] if empty_elst: @@ -466,8 +466,8 @@ def _filter_telemetry_by_elst_segments( def elst_entry_to_seconds( - entry: T.Dict, movie_timescale: int, media_timescale: int -) -> T.Tuple[float, float]: + entry: dict, movie_timescale: int, media_timescale: int +) -> tuple[float, float]: assert movie_timescale > 0, "expected positive movie_timescale" assert media_timescale > 0, "expected positive media_timescale" media_time, duration = entry["media_time"], entry["segment_duration"] @@ -477,7 +477,7 @@ def elst_entry_to_seconds( return (media_time, duration) -def _is_camm_description(description: T.Dict) -> bool: +def _is_camm_description(description: dict) -> bool: return description["format"] == b"camm" diff --git a/mapillary_tools/commands/__main__.py b/mapillary_tools/commands/__main__.py index b158d4258..f45ecb2d9 100644 --- a/mapillary_tools/commands/__main__.py +++ b/mapillary_tools/commands/__main__.py @@ -86,7 +86,7 @@ def configure_logger(logger: logging.Logger, stream=None) -> None: logger.addHandler(handler) -def _log_params(argvars: T.Dict) -> None: +def _log_params(argvars: dict) -> None: MAX_ENTRIES = 5 def _stringify(x) -> str: diff --git a/mapillary_tools/config.py b/mapillary_tools/config.py index 8f100262f..20d477d64 100644 --- a/mapillary_tools/config.py +++ b/mapillary_tools/config.py @@ -1,3 +1,5 @@ +from __future__ import annotations + import configparser import os import typing as T @@ -35,8 +37,8 @@ def _load_config(config_path: str) -> configparser.ConfigParser: def load_user( - profile_name: str, config_path: T.Optional[str] = None -) -> T.Optional[types.UserItem]: + profile_name: str, config_path: str | None = None +) -> types.UserItem | None: if config_path is None: config_path = MAPILLARY_CONFIG_PATH config = _load_config(config_path) @@ -46,7 +48,7 @@ def load_user( return T.cast(types.UserItem, user_items) -def list_all_users(config_path: T.Optional[str] = None) -> T.Dict[str, types.UserItem]: +def list_all_users(config_path: str | None = None) -> dict[str, types.UserItem]: if config_path is None: config_path = MAPILLARY_CONFIG_PATH cp = _load_config(config_path) @@ -58,7 +60,7 @@ def list_all_users(config_path: T.Optional[str] = None) -> T.Dict[str, types.Use def update_config( - profile_name: str, user_items: types.UserItem, config_path: T.Optional[str] = None + profile_name: str, user_items: types.UserItem, config_path: str | None = None ) -> None: if config_path is None: config_path = MAPILLARY_CONFIG_PATH @@ -72,7 +74,7 @@ def update_config( config.write(fp) -def remove_config(profile_name: str, config_path: T.Optional[str] = None) -> None: +def remove_config(profile_name: str, config_path: str | None = None) -> None: if config_path is None: config_path = MAPILLARY_CONFIG_PATH diff --git a/mapillary_tools/constants.py b/mapillary_tools/constants.py index 4c0497cf0..c37e61003 100644 --- a/mapillary_tools/constants.py +++ b/mapillary_tools/constants.py @@ -1,7 +1,6 @@ from __future__ import annotations import os -import typing as T import appdirs @@ -52,7 +51,7 @@ def _yes_or_no(val: str) -> bool: # It is used to filter out noisy points GOPRO_MAX_DOP100 = int(os.getenv(_ENV_PREFIX + "GOPRO_MAX_DOP100", 1000)) # Within the GPS stream: 0 - no lock, 2 or 3 - 2D or 3D Lock -GOPRO_GPS_FIXES: T.Set[int] = set( +GOPRO_GPS_FIXES: set[int] = set( int(fix) for fix in os.getenv(_ENV_PREFIX + "GOPRO_GPS_FIXES", "2,3").split(",") ) MAX_UPLOAD_RETRIES: int = int(os.getenv(_ENV_PREFIX + "MAX_UPLOAD_RETRIES", 200)) diff --git a/mapillary_tools/exif_read.py b/mapillary_tools/exif_read.py index 8a025d255..f0f3894ce 100644 --- a/mapillary_tools/exif_read.py +++ b/mapillary_tools/exif_read.py @@ -36,7 +36,7 @@ def eval_frac(value: Ratio) -> float: return float(value.num) / float(value.den) -def gps_to_decimal(values: T.Tuple[Ratio, Ratio, Ratio]) -> T.Optional[float]: +def gps_to_decimal(values: tuple[Ratio, Ratio, Ratio]) -> float | None: try: deg, min, sec, *_ = values except (TypeError, ValueError): @@ -56,14 +56,14 @@ def gps_to_decimal(values: T.Tuple[Ratio, Ratio, Ratio]) -> T.Optional[float]: return degrees + minutes / 60 + seconds / 3600 -def _parse_coord_numeric(coord: str, ref: T.Optional[str]) -> T.Optional[float]: +def _parse_coord_numeric(coord: str, ref: str | None) -> float | None: try: return float(coord) * SIGN_BY_DIRECTION[ref] except (ValueError, KeyError): return None -def _parse_coord_adobe(coord: str) -> T.Optional[float]: +def _parse_coord_adobe(coord: str) -> float | None: """ Parse Adobe coordinate format: """ @@ -79,7 +79,7 @@ def _parse_coord_adobe(coord: str) -> T.Optional[float]: return None -def _parse_coord(coord: T.Optional[str], ref: T.Optional[str]) -> T.Optional[float]: +def _parse_coord(coord: str | None, ref: str | None) -> float | None: if coord is None: return None parsed = _parse_coord_numeric(coord, ref) @@ -88,7 +88,7 @@ def _parse_coord(coord: T.Optional[str], ref: T.Optional[str]) -> T.Optional[flo return parsed -def _parse_iso(dtstr: str) -> T.Optional[datetime.datetime]: +def _parse_iso(dtstr: str) -> datetime.datetime | None: try: return datetime.datetime.fromisoformat(dtstr) except ValueError: @@ -99,8 +99,8 @@ def _parse_iso(dtstr: str) -> T.Optional[datetime.datetime]: def strptime_alternative_formats( - dtstr: str, formats: T.Sequence[str] -) -> T.Optional[datetime.datetime]: + dtstr: str, formats: list[str] +) -> datetime.datetime | None: for format in formats: if format == "ISO": dt = _parse_iso(dtstr) @@ -114,7 +114,7 @@ def strptime_alternative_formats( return None -def parse_timestr_as_timedelta(timestr: str) -> T.Optional[datetime.timedelta]: +def parse_timestr_as_timedelta(timestr: str) -> datetime.timedelta | None: timestr = timestr.strip() parts = timestr.strip().split(":") try: @@ -133,8 +133,8 @@ def parse_timestr_as_timedelta(timestr: str) -> T.Optional[datetime.timedelta]: def parse_time_ratios_as_timedelta( - time_tuple: T.Sequence[Ratio], -) -> T.Optional[datetime.timedelta]: + time_tuple: list[Ratio], +) -> datetime.timedelta | None: try: hours, minutes, seconds, *_ = time_tuple except (ValueError, TypeError): @@ -156,8 +156,8 @@ def parse_time_ratios_as_timedelta( def parse_gps_datetime( dtstr: str, - default_tz: T.Optional[datetime.timezone] = datetime.timezone.utc, -) -> T.Optional[datetime.datetime]: + default_tz: datetime.timezone | None = datetime.timezone.utc, +) -> datetime.datetime | None: dtstr = dtstr.strip() dt = strptime_alternative_formats(dtstr, ["ISO"]) @@ -176,8 +176,8 @@ def parse_gps_datetime( def parse_gps_datetime_separately( datestr: str, timestr: str, - default_tz: T.Optional[datetime.timezone] = datetime.timezone.utc, -) -> T.Optional[datetime.datetime]: + default_tz: datetime.timezone | None = datetime.timezone.utc, +) -> datetime.datetime | None: """ Parse GPSDateStamp and GPSTimeStamp and return the corresponding datetime object in GMT. @@ -232,8 +232,8 @@ def parse_gps_datetime_separately( def parse_datetimestr_with_subsec_and_offset( - dtstr: str, subsec: T.Optional[str] = None, tz_offset: T.Optional[str] = None -) -> T.Optional[datetime.datetime]: + dtstr: str, subsec: str | None = None, tz_offset: str | None = None +) -> datetime.datetime | None: """ Convert dtstr "YYYY:mm:dd HH:MM:SS[.sss]" to a datetime object. It handles time "24:00:00" as "00:00:00" of the next day. @@ -294,35 +294,35 @@ def make_valid_timezone_offset(delta: datetime.timedelta) -> datetime.timedelta: class ExifReadABC(abc.ABC): @abc.abstractmethod - def extract_altitude(self) -> T.Optional[float]: + def extract_altitude(self) -> float | None: raise NotImplementedError @abc.abstractmethod - def extract_capture_time(self) -> T.Optional[datetime.datetime]: + def extract_capture_time(self) -> datetime.datetime | None: raise NotImplementedError @abc.abstractmethod - def extract_direction(self) -> T.Optional[float]: + def extract_direction(self) -> float | None: raise NotImplementedError @abc.abstractmethod - def extract_lon_lat(self) -> T.Optional[T.Tuple[float, float]]: + def extract_lon_lat(self) -> tuple[float, float] | None: raise NotImplementedError @abc.abstractmethod - def extract_make(self) -> T.Optional[str]: + def extract_make(self) -> str | None: raise NotImplementedError @abc.abstractmethod - def extract_model(self) -> T.Optional[str]: + def extract_model(self) -> str | None: raise NotImplementedError @abc.abstractmethod - def extract_width(self) -> T.Optional[int]: + def extract_width(self) -> int | None: raise NotImplementedError @abc.abstractmethod - def extract_height(self) -> T.Optional[int]: + def extract_height(self) -> int | None: raise NotImplementedError @abc.abstractmethod @@ -333,7 +333,7 @@ def extract_orientation(self) -> int: class ExifReadFromXMP(ExifReadABC): def __init__(self, etree: et.ElementTree): self.etree = etree - self._tags_or_attrs: T.Dict[str, str] = {} + self._tags_or_attrs: dict[str, str] = {} for description in self.etree.iterfind( ".//rdf:Description", namespaces=XMP_NAMESPACES ): @@ -343,12 +343,12 @@ def __init__(self, etree: et.ElementTree): if child.text is not None: self._tags_or_attrs[child.tag] = child.text - def extract_altitude(self) -> T.Optional[float]: + def extract_altitude(self) -> float | None: return self._extract_alternative_fields(["exif:GPSAltitude"], float) def _extract_exif_datetime( self, dt_tag: str, subsec_tag: str, offset_tag: str - ) -> T.Optional[datetime.datetime]: + ) -> datetime.datetime | None: dtstr = self._extract_alternative_fields([dt_tag], str) if dtstr is None: return None @@ -363,7 +363,7 @@ def _extract_exif_datetime( return None return dt - def extract_exif_datetime(self) -> T.Optional[datetime.datetime]: + def extract_exif_datetime(self) -> datetime.datetime | None: dt = self._extract_exif_datetime( "exif:DateTimeOriginal", "exif:SubsecTimeOriginal", @@ -382,7 +382,7 @@ def extract_exif_datetime(self) -> T.Optional[datetime.datetime]: return None - def extract_gps_datetime(self) -> T.Optional[datetime.datetime]: + def extract_gps_datetime(self) -> datetime.datetime | None: """ Extract timestamp from GPS field. """ @@ -402,7 +402,7 @@ def extract_gps_datetime(self) -> T.Optional[datetime.datetime]: # handle: exif:GPSTimeStamp="17:22:05.999000" return parse_gps_datetime_separately(datestr, timestr) - def extract_capture_time(self) -> T.Optional[datetime.datetime]: + def extract_capture_time(self) -> datetime.datetime | None: dt = self.extract_gps_datetime() if dt is not None and dt.date() != datetime.date(1970, 1, 1): return dt @@ -413,22 +413,22 @@ def extract_capture_time(self) -> T.Optional[datetime.datetime]: return None - def extract_direction(self) -> T.Optional[float]: + def extract_direction(self) -> float | None: return self._extract_alternative_fields( ["exif:GPSImgDirection", "exif:GPSTrack"], float ) - def extract_lon_lat(self) -> T.Optional[T.Tuple[float, float]]: + def extract_lon_lat(self) -> tuple[float, float] | None: lat_ref = self._extract_alternative_fields(["exif:GPSLatitudeRef"], str) - lat_str: T.Optional[str] = self._extract_alternative_fields( + lat_str: str | None = self._extract_alternative_fields( ["exif:GPSLatitude"], str ) - lat: T.Optional[float] = _parse_coord(lat_str, lat_ref) + lat: float | None = _parse_coord(lat_str, lat_ref) if lat is None: return None lon_ref = self._extract_alternative_fields(["exif:GPSLongitudeRef"], str) - lon_str: T.Optional[str] = self._extract_alternative_fields( + lon_str: str | None = self._extract_alternative_fields( ["exif:GPSLongitude"], str ) lon = _parse_coord(lon_str, lon_ref) @@ -437,13 +437,13 @@ def extract_lon_lat(self) -> T.Optional[T.Tuple[float, float]]: return lon, lat - def extract_make(self) -> T.Optional[str]: + def extract_make(self) -> str | None: make = self._extract_alternative_fields(["tiff:Make", "exifEX:LensMake"], str) if make is None: return None return make.strip() - def extract_model(self) -> T.Optional[str]: + def extract_model(self) -> str | None: model = self._extract_alternative_fields( ["tiff:Model", "exifEX:LensModel"], str ) @@ -451,7 +451,7 @@ def extract_model(self) -> T.Optional[str]: return None return model.strip() - def extract_width(self) -> T.Optional[int]: + def extract_width(self) -> int | None: return self._extract_alternative_fields( [ "exif:PixelXDimension", @@ -461,7 +461,7 @@ def extract_width(self) -> T.Optional[int]: int, ) - def extract_height(self) -> T.Optional[int]: + def extract_height(self) -> int | None: return self._extract_alternative_fields( [ "exif:PixelYDimension", @@ -513,7 +513,7 @@ def _extract_alternative_fields( return None -def extract_xmp_efficiently(fp) -> T.Optional[str]: +def extract_xmp_efficiently(fp) -> str | None: """ Extract XMP metadata from a JPEG file efficiently by reading only necessary chunks. @@ -598,7 +598,7 @@ class ExifReadFromEXIF(ExifReadABC): EXIF class for reading exif from an image """ - def __init__(self, path_or_stream: T.Union[Path, T.BinaryIO]) -> None: + def __init__(self, path_or_stream: Path | T.BinaryIO) -> None: """ Initialize EXIF object with FILE as filename or fileobj """ @@ -621,7 +621,7 @@ def __init__(self, path_or_stream: T.Union[Path, T.BinaryIO]) -> None: LOG.warning("Error reading EXIF: %s", ex) self.tags = {} - def extract_altitude(self) -> T.Optional[float]: + def extract_altitude(self) -> float | None: """ Extract altitude """ @@ -634,7 +634,7 @@ def extract_altitude(self) -> T.Optional[float]: altitude_ref = {0: 1, 1: -1} return altitude * altitude_ref.get(ref, 1) - def extract_gps_datetime(self) -> T.Optional[datetime.datetime]: + def extract_gps_datetime(self) -> datetime.datetime | None: """ Extract timestamp from GPS field. """ @@ -662,7 +662,7 @@ def extract_gps_datetime(self) -> T.Optional[datetime.datetime]: def _extract_exif_datetime( self, dt_tag: str, subsec_tag: str, offset_tag: str - ) -> T.Optional[datetime.datetime]: + ) -> datetime.datetime | None: dtstr = self._extract_alternative_fields([dt_tag], field_type=str) if dtstr is None: return None @@ -677,7 +677,7 @@ def _extract_exif_datetime( return None return dt - def extract_exif_datetime(self) -> T.Optional[datetime.datetime]: + def extract_exif_datetime(self) -> datetime.datetime | None: # EXIF DateTimeOriginal: 0x9003 (date/time when original image was taken) # EXIF SubSecTimeOriginal: 0x9291 (fractional seconds for DateTimeOriginal) # EXIF OffsetTimeOriginal: 0x9011 (time zone for DateTimeOriginal) @@ -711,7 +711,7 @@ def extract_exif_datetime(self) -> T.Optional[datetime.datetime]: return None - def extract_capture_time(self) -> T.Optional[datetime.datetime]: + def extract_capture_time(self) -> datetime.datetime | None: """ Extract capture time from EXIF DateTime tags """ @@ -730,7 +730,7 @@ def extract_capture_time(self) -> T.Optional[datetime.datetime]: return None - def extract_direction(self) -> T.Optional[float]: + def extract_direction(self) -> float | None: """ Extract image direction (i.e. compass, heading, bearing) """ @@ -740,7 +740,7 @@ def extract_direction(self) -> T.Optional[float]: ] return self._extract_alternative_fields(fields, float) - def extract_lon_lat(self) -> T.Optional[T.Tuple[float, float]]: + def extract_lon_lat(self) -> tuple[float, float] | None: lat_tag = self.tags.get("GPS GPSLatitude") lon_tag = self.tags.get("GPS GPSLongitude") if lat_tag and lon_tag: @@ -762,7 +762,7 @@ def extract_lon_lat(self) -> T.Optional[T.Tuple[float, float]]: return None - def extract_make(self) -> T.Optional[str]: + def extract_make(self) -> str | None: """ Extract camera make """ @@ -773,7 +773,7 @@ def extract_make(self) -> T.Optional[str]: return None return make.strip() - def extract_model(self) -> T.Optional[str]: + def extract_model(self) -> str | None: """ Extract camera model """ @@ -784,7 +784,7 @@ def extract_model(self) -> T.Optional[str]: return None return model.strip() - def extract_width(self) -> T.Optional[int]: + def extract_width(self) -> int | None: """ Extract image width in pixels """ @@ -792,7 +792,7 @@ def extract_width(self) -> T.Optional[int]: ["Image ImageWidth", "EXIF ExifImageWidth"], int ) - def extract_height(self) -> T.Optional[int]: + def extract_height(self) -> int | None: """ Extract image height in pixels """ @@ -813,7 +813,7 @@ def extract_orientation(self) -> int: def _extract_alternative_fields( self, - fields: T.Sequence[str], + fields: T.Iterable[str], field_type: type[_FIELD_TYPE], ) -> _FIELD_TYPE | None: """ @@ -847,7 +847,7 @@ def _extract_alternative_fields( raise ValueError(f"Invalid field type {field_type}") return None - def extract_application_notes(self) -> T.Optional[str]: + def extract_application_notes(self) -> str | None: xmp = self.tags.get("Image ApplicationNotes") if xmp is None: return None @@ -863,13 +863,13 @@ class ExifRead(ExifReadFromEXIF): NOTE: For performance reasons, XMP is only extracted if EXIF does not contain the required fields """ - def __init__(self, path_or_stream: T.Union[Path, T.BinaryIO]) -> None: + def __init__(self, path_or_stream: Path | T.BinaryIO) -> None: super().__init__(path_or_stream) self._path_or_stream = path_or_stream self._xml_extracted: bool = False - self._cached_xml: T.Optional[ExifReadFromXMP] = None + self._cached_xml: ExifReadFromXMP | None = None - def _xmp_with_reason(self, reason: str) -> T.Optional[ExifReadFromXMP]: + def _xmp_with_reason(self, reason: str) -> ExifReadFromXMP | None: if not self._xml_extracted: LOG.debug('Extracting XMP for "%s"', reason) self._cached_xml = self._extract_xmp() @@ -877,7 +877,7 @@ def _xmp_with_reason(self, reason: str) -> T.Optional[ExifReadFromXMP]: return self._cached_xml - def _extract_xmp(self) -> T.Optional[ExifReadFromXMP]: + def _extract_xmp(self) -> ExifReadFromXMP | None: xml_str = self.extract_application_notes() if xml_str is None: if isinstance(self._path_or_stream, Path): @@ -898,7 +898,7 @@ def _extract_xmp(self) -> T.Optional[ExifReadFromXMP]: return ExifReadFromXMP(et.ElementTree(e)) - def extract_altitude(self) -> T.Optional[float]: + def extract_altitude(self) -> float | None: val = super().extract_altitude() if val is not None: return val @@ -910,7 +910,7 @@ def extract_altitude(self) -> T.Optional[float]: return val return None - def extract_capture_time(self) -> T.Optional[datetime.datetime]: + def extract_capture_time(self) -> datetime.datetime | None: val = super().extract_capture_time() if val is not None: return val @@ -922,7 +922,7 @@ def extract_capture_time(self) -> T.Optional[datetime.datetime]: return val return None - def extract_lon_lat(self) -> T.Optional[T.Tuple[float, float]]: + def extract_lon_lat(self) -> tuple[float, float] | None: val = super().extract_lon_lat() if val is not None: return val @@ -934,7 +934,7 @@ def extract_lon_lat(self) -> T.Optional[T.Tuple[float, float]]: return val return None - def extract_make(self) -> T.Optional[str]: + def extract_make(self) -> str | None: val = super().extract_make() if val is not None: return val @@ -946,7 +946,7 @@ def extract_make(self) -> T.Optional[str]: return val return None - def extract_model(self) -> T.Optional[str]: + def extract_model(self) -> str | None: val = super().extract_model() if val is not None: return val @@ -958,7 +958,7 @@ def extract_model(self) -> T.Optional[str]: return val return None - def extract_width(self) -> T.Optional[int]: + def extract_width(self) -> int | None: val = super().extract_width() if val is not None: return val @@ -970,7 +970,7 @@ def extract_width(self) -> T.Optional[int]: return val return None - def extract_height(self) -> T.Optional[int]: + def extract_height(self) -> int | None: val = super().extract_height() if val is not None: return val diff --git a/mapillary_tools/exif_write.py b/mapillary_tools/exif_write.py index c1ad70399..17016de15 100644 --- a/mapillary_tools/exif_write.py +++ b/mapillary_tools/exif_write.py @@ -1,4 +1,5 @@ # pyre-ignore-all-errors[5, 21, 24] +from __future__ import annotations import datetime import io @@ -15,9 +16,9 @@ class ExifEdit: - _filename_or_bytes: T.Union[str, bytes] + _filename_or_bytes: str | bytes - def __init__(self, filename_or_bytes: T.Union[Path, bytes]) -> None: + def __init__(self, filename_or_bytes: Path | bytes) -> None: """Initialize the object""" if isinstance(filename_or_bytes, Path): # make sure filename is resolved to avoid to be interpretted as bytes in piexif @@ -25,12 +26,12 @@ def __init__(self, filename_or_bytes: T.Union[Path, bytes]) -> None: self._filename_or_bytes = str(filename_or_bytes.resolve()) else: self._filename_or_bytes = filename_or_bytes - self._ef: T.Dict = piexif.load(self._filename_or_bytes) + self._ef: dict = piexif.load(self._filename_or_bytes) @staticmethod def decimal_to_dms( value: float, precision: int - ) -> T.Tuple[T.Tuple[float, int], T.Tuple[float, int], T.Tuple[float, int]]: + ) -> tuple[tuple[float, int], tuple[float, int], tuple[float, int]]: """ Convert decimal position to degrees, minutes, seconds in a fromat supported by EXIF """ @@ -40,7 +41,7 @@ def decimal_to_dms( return (deg, 1), (min, 1), (sec, precision) - def add_image_description(self, data: T.Dict) -> None: + def add_image_description(self, data: dict) -> None: """Add a dict to image description.""" self._ef["0th"][piexif.ImageIFD.ImageDescription] = json.dumps(data) @@ -201,7 +202,7 @@ def dump_image_bytes(self) -> bytes: piexif.insert(exif_bytes, self._filename_or_bytes, output) return output.read() - def write(self, filename: T.Optional[Path] = None) -> None: + def write(self, filename: Path | None = None) -> None: """Save exif data to file.""" if filename is None: if not isinstance(self._filename_or_bytes, str): diff --git a/mapillary_tools/exiftool_read.py b/mapillary_tools/exiftool_read.py index 20a51d1b6..3929a0fbb 100644 --- a/mapillary_tools/exiftool_read.py +++ b/mapillary_tools/exiftool_read.py @@ -9,7 +9,7 @@ from . import exif_read, utils -EXIFTOOL_NAMESPACES: T.Dict[str, str] = { +EXIFTOOL_NAMESPACES: dict[str, str] = { "Adobe": "http://ns.exiftool.org/APP14/Adobe/1.0/", "Apple": "http://ns.exiftool.org/MakerNotes/Apple/1.0/", "Composite": "http://ns.exiftool.org/Composite/1.0/", @@ -57,7 +57,7 @@ _DESCRIPTION_TAG = "rdf:Description" -def expand_tag(ns_tag: str, namespaces: T.Dict[str, str]) -> str: +def expand_tag(ns_tag: str, namespaces: dict[str, str]) -> str: try: ns, tag = ns_tag.split(":", maxsplit=2) except ValueError: @@ -72,7 +72,7 @@ def canonical_path(path: Path) -> str: return str(path.resolve().as_posix()) -def find_rdf_description_path(element: ET.Element) -> T.Optional[Path]: +def find_rdf_description_path(element: ET.Element) -> Path | None: about = element.get(_EXPANDED_ABOUT_TAG) if about is None: return None @@ -81,8 +81,8 @@ def find_rdf_description_path(element: ET.Element) -> T.Optional[Path]: def index_rdf_description_by_path( xml_paths: T.Sequence[Path], -) -> T.Dict[str, ET.Element]: - rdf_description_by_path: T.Dict[str, ET.Element] = {} +) -> dict[str, ET.Element]: + rdf_description_by_path: dict[str, ET.Element] = {} for xml_path in utils.find_xml_files(xml_paths): try: @@ -127,7 +127,7 @@ def __init__( ) -> None: self.etree = etree - def extract_altitude(self) -> T.Optional[float]: + def extract_altitude(self) -> float | None: """ Extract altitude """ @@ -143,7 +143,7 @@ def extract_altitude(self) -> T.Optional[float]: def _extract_gps_datetime( self, date_tags: T.Sequence[str], time_tags: T.Sequence[str] - ) -> T.Optional[datetime.datetime]: + ) -> datetime.datetime | None: """ Extract timestamp from GPS field. """ @@ -157,13 +157,13 @@ def _extract_gps_datetime( return exif_read.parse_gps_datetime_separately(gpsdate, gpstimestamp) - def extract_gps_datetime(self) -> T.Optional[datetime.datetime]: + def extract_gps_datetime(self) -> datetime.datetime | None: """ Extract timestamp from GPS field. """ return self._extract_gps_datetime(["GPS:GPSDateStamp"], ["GPS:GPSTimeStamp"]) - def extract_gps_datetime_from_xmp(self) -> T.Optional[datetime.datetime]: + def extract_gps_datetime_from_xmp(self) -> datetime.datetime | None: """ Extract timestamp from XMP GPS field. """ @@ -180,7 +180,7 @@ def _extract_exif_datetime( dt_tags: T.Sequence[str], subsec_tags: T.Sequence[str], offset_tags: T.Sequence[str], - ) -> T.Optional[datetime.datetime]: + ) -> datetime.datetime | None: dtstr = self._extract_alternative_fields(dt_tags, str) if dtstr is None: return None @@ -195,7 +195,7 @@ def _extract_exif_datetime( return None return dt - def extract_exif_datetime_from_xmp(self) -> T.Optional[datetime.datetime]: + def extract_exif_datetime_from_xmp(self) -> datetime.datetime | None: # EXIF DateTimeOriginal: 0x9003 (date/time when original image was taken) # EXIF SubSecTimeOriginal: 0x9291 (fractional seconds for DateTimeOriginal) # EXIF OffsetTimeOriginal: 0x9011 (time zone for DateTimeOriginal) @@ -234,7 +234,7 @@ def extract_exif_datetime_from_xmp(self) -> T.Optional[datetime.datetime]: return None - def extract_exif_datetime(self) -> T.Optional[datetime.datetime]: + def extract_exif_datetime(self) -> datetime.datetime | None: # EXIF DateTimeOriginal: 0x9003 (date/time when original image was taken) # EXIF SubSecTimeOriginal: 0x9291 (fractional seconds for DateTimeOriginal) # EXIF OffsetTimeOriginal: 0x9011 (time zone for DateTimeOriginal) @@ -270,7 +270,7 @@ def extract_exif_datetime(self) -> T.Optional[datetime.datetime]: return None - def extract_capture_time(self) -> T.Optional[datetime.datetime]: + def extract_capture_time(self) -> datetime.datetime | None: """ Extract capture time from EXIF DateTime tags """ @@ -300,7 +300,7 @@ def extract_capture_time(self) -> T.Optional[datetime.datetime]: return None - def extract_direction(self) -> T.Optional[float]: + def extract_direction(self) -> float | None: """ Extract image direction (i.e. compass, heading, bearing) """ @@ -313,7 +313,7 @@ def extract_direction(self) -> T.Optional[float]: float, ) - def extract_lon_lat(self) -> T.Optional[T.Tuple[float, float]]: + def extract_lon_lat(self) -> tuple[float, float] | None: lon_lat = self._extract_lon_lat("GPS:GPSLongitude", "GPS:GPSLatitude") if lon_lat is not None: return lon_lat @@ -332,7 +332,7 @@ def extract_lon_lat(self) -> T.Optional[T.Tuple[float, float]]: def _extract_lon_lat( self, lon_tag: str, lat_tag: str - ) -> T.Optional[T.Tuple[float, float]]: + ) -> tuple[float, float] | None: lon = self._extract_alternative_fields( [lon_tag], float, @@ -355,7 +355,7 @@ def _extract_lon_lat( return lon, lat - def extract_make(self) -> T.Optional[str]: + def extract_make(self) -> str | None: """ Extract camera make """ @@ -374,7 +374,7 @@ def extract_make(self) -> T.Optional[str]: return None return make.strip() - def extract_model(self) -> T.Optional[str]: + def extract_model(self) -> str | None: """ Extract camera model """ @@ -394,7 +394,7 @@ def extract_model(self) -> T.Optional[str]: return None return model.strip() - def extract_width(self) -> T.Optional[int]: + def extract_width(self) -> int | None: """ Extract image width in pixels """ @@ -409,7 +409,7 @@ def extract_width(self) -> T.Optional[int]: int, ) - def extract_height(self) -> T.Optional[int]: + def extract_height(self) -> int | None: """ Extract image height in pixels """ @@ -447,8 +447,8 @@ def extract_orientation(self) -> int: def _extract_alternative_fields( self, fields: T.Sequence[str], - field_type: T.Type[_FIELD_TYPE], - ) -> T.Optional[_FIELD_TYPE]: + field_type: type[_FIELD_TYPE], + ) -> _FIELD_TYPE | None: for field in fields: value = self.etree.findtext(field, namespaces=EXIFTOOL_NAMESPACES) if value is None: diff --git a/mapillary_tools/exiftool_read_video.py b/mapillary_tools/exiftool_read_video.py index 25be9e5b4..7030cfe1e 100644 --- a/mapillary_tools/exiftool_read_video.py +++ b/mapillary_tools/exiftool_read_video.py @@ -1,3 +1,5 @@ +from __future__ import annotations + import dataclasses import functools import logging @@ -9,7 +11,7 @@ MAX_TRACK_ID = 10 -EXIFTOOL_NAMESPACES: T.Dict[str, str] = { +EXIFTOOL_NAMESPACES: dict[str, str] = { "Keys": "http://ns.exiftool.org/QuickTime/Keys/1.0/", "IFD0": "http://ns.exiftool.org/EXIF/IFD0/1.0/", "QuickTime": "http://ns.exiftool.org/QuickTime/QuickTime/1.0/", @@ -28,7 +30,7 @@ expand_tag = functools.partial(exiftool_read.expand_tag, namespaces=EXIFTOOL_NAMESPACES) -def _maybe_float(text: T.Optional[str]) -> T.Optional[float]: +def _maybe_float(text: str | None) -> float | None: if text is None: return None try: @@ -37,8 +39,8 @@ def _maybe_float(text: T.Optional[str]) -> T.Optional[float]: return None -def _index_text_by_tag(elements: T.Iterable[ET.Element]) -> T.Dict[str, T.List[str]]: - texts_by_tag: T.Dict[str, T.List[str]] = {} +def _index_text_by_tag(elements: T.Iterable[ET.Element]) -> dict[str, list[str]]: + texts_by_tag: dict[str, list[str]] = {} for element in elements: tag = element.tag if element.text is not None: @@ -47,10 +49,10 @@ def _index_text_by_tag(elements: T.Iterable[ET.Element]) -> T.Dict[str, T.List[s def _extract_alternative_fields( - texts_by_tag: T.Dict[str, T.List[str]], + texts_by_tag: dict[str, list[str]], fields: T.Sequence[str], field_type: T.Type[_FIELD_TYPE], -) -> T.Optional[_FIELD_TYPE]: +) -> _FIELD_TYPE | None: for field in fields: values = texts_by_tag.get(expand_tag(field)) if values is None: @@ -81,15 +83,15 @@ def _extract_alternative_fields( def _aggregate_gps_track( - texts_by_tag: T.Dict[str, T.List[str]], - time_tag: T.Optional[str], + texts_by_tag: dict[str, list[str]], + time_tag: str | None, lon_tag: str, lat_tag: str, - alt_tag: T.Optional[str] = None, - gps_time_tag: T.Optional[str] = None, - direction_tag: T.Optional[str] = None, - ground_speed_tag: T.Optional[str] = None, -) -> T.List[GPSPoint]: + alt_tag: str | None = None, + gps_time_tag: str | None = None, + direction_tag: str | None = None, + ground_speed_tag: str | None = None, +) -> list[GPSPoint]: """ Aggregate all GPS data by the tags. It requires lat, lon to be present, and their lengths must match. @@ -140,8 +142,8 @@ def _aggregate_gps_track( assert len(timestamps) == expected_length def _aggregate_float_values_same_length( - tag: T.Optional[str], - ) -> T.List[T.Optional[float]]: + tag: str | None, + ) -> list[float | None]: if tag is not None: vals = [ _maybe_float(val) @@ -212,11 +214,11 @@ def _aggregate_samples( elements: T.Iterable[ET.Element], sample_time_tag: str, sample_duration_tag: str, -) -> T.Generator[T.Tuple[float, float, T.List[ET.Element]], None, None]: +) -> T.Generator[tuple[float, float, list[ET.Element]], None, None]: expanded_sample_time_tag = expand_tag(sample_time_tag) expanded_sample_duration_tag = expand_tag(sample_duration_tag) - accumulated_elements: T.List[ET.Element] = [] + accumulated_elements: list[ET.Element] = [] sample_time = None sample_duration = None for element in elements: @@ -234,17 +236,17 @@ def _aggregate_samples( def _aggregate_gps_track_by_sample_time( - sample_iterator: T.Iterable[T.Tuple[float, float, T.List[ET.Element]]], + sample_iterator: T.Iterable[tuple[float, float, list[ET.Element]]], lon_tag: str, lat_tag: str, - alt_tag: T.Optional[str] = None, - gps_time_tag: T.Optional[str] = None, - direction_tag: T.Optional[str] = None, - ground_speed_tag: T.Optional[str] = None, - gps_fix_tag: T.Optional[str] = None, - gps_precision_tag: T.Optional[str] = None, -) -> T.List[GPSPoint]: - track: T.List[GPSPoint] = [] + alt_tag: str | None = None, + gps_time_tag: str | None = None, + direction_tag: str | None = None, + ground_speed_tag: str | None = None, + gps_fix_tag: str | None = None, + gps_precision_tag: str | None = None, +) -> list[GPSPoint]: + track: list[GPSPoint] = [] expanded_gps_fix_tag = None if gps_fix_tag is not None: @@ -311,25 +313,25 @@ def __init__( self._texts_by_tag = _index_text_by_tag(self.etree.getroot()) self._all_tags = set(self._texts_by_tag.keys()) - def extract_gps_track(self) -> T.List[geo.Point]: + def extract_gps_track(self) -> list[geo.Point]: # blackvue and many other cameras track_with_fix = self._extract_gps_track_from_quicktime() if track_with_fix: - return T.cast(T.List[geo.Point], track_with_fix) + return T.cast(list[geo.Point], track_with_fix) # insta360 has its own tag track_with_fix = self._extract_gps_track_from_quicktime(namespace="Insta360") if track_with_fix: - return T.cast(T.List[geo.Point], track_with_fix) + return T.cast(list[geo.Point], track_with_fix) # mostly for gopro track_with_fix = self._extract_gps_track_from_track() if track_with_fix: - return T.cast(T.List[geo.Point], track_with_fix) + return T.cast(list[geo.Point], track_with_fix) return [] - def _extract_make_and_model(self) -> T.Tuple[T.Optional[str], T.Optional[str]]: + def _extract_make_and_model(self) -> tuple[str | None, str | None]: make = self._extract_alternative_fields(["GoPro:Make"], str) model = self._extract_alternative_fields(["GoPro:Model"], str) if model is not None: @@ -360,15 +362,15 @@ def _extract_make_and_model(self) -> T.Tuple[T.Optional[str], T.Optional[str]]: model = model.strip() return make, model - def extract_make(self) -> T.Optional[str]: + def extract_make(self) -> str | None: make, _ = self._extract_make_and_model() return make - def extract_model(self) -> T.Optional[str]: + def extract_model(self) -> str | None: _, model = self._extract_make_and_model() return model - def _extract_gps_track_from_track(self) -> T.List[GPSPoint]: + def _extract_gps_track_from_track(self) -> list[GPSPoint]: for track_id in range(1, MAX_TRACK_ID + 1): track_ns = f"Track{track_id}" if self._all_tags_exists( @@ -402,15 +404,15 @@ def _extract_alternative_fields( self, fields: T.Sequence[str], field_type: T.Type[_FIELD_TYPE], - ) -> T.Optional[_FIELD_TYPE]: + ) -> _FIELD_TYPE | None: return _extract_alternative_fields(self._texts_by_tag, fields, field_type) - def _all_tags_exists(self, tags: T.Set[str]) -> bool: + def _all_tags_exists(self, tags: set[str]) -> bool: return self._all_tags.issuperset(tags) def _extract_gps_track_from_quicktime( self, namespace: str = "QuickTime" - ) -> T.List[GPSPoint]: + ) -> list[GPSPoint]: if not self._all_tags_exists( { expand_tag(f"{namespace}:GPSDateTime"), diff --git a/mapillary_tools/ffmpeg.py b/mapillary_tools/ffmpeg.py index b7215642c..0125150ae 100644 --- a/mapillary_tools/ffmpeg.py +++ b/mapillary_tools/ffmpeg.py @@ -1,4 +1,5 @@ # pyre-ignore-all-errors[5, 24] +from __future__ import annotations import datetime import json @@ -33,7 +34,7 @@ class Stream(T.TypedDict): class ProbeOutput(T.TypedDict): - streams: T.List[Stream] + streams: list[Stream] class FFmpegNotFoundError(Exception): @@ -77,7 +78,7 @@ def __init__( self, ffmpeg_path: str = "ffmpeg", ffprobe_path: str = "ffprobe", - stderr: T.Optional[int] = None, + stderr: int | None = None, ) -> None: """ ffmpeg_path: path to ffmpeg binary @@ -88,8 +89,8 @@ def __init__( self.ffprobe_path = ffprobe_path self.stderr = stderr - def _run_ffprobe_json(self, cmd: T.List[str]) -> T.Dict: - full_cmd: T.List[str] = [self.ffprobe_path, "-print_format", "json", *cmd] + def _run_ffprobe_json(self, cmd: list[str]) -> dict: + full_cmd: list[str] = [self.ffprobe_path, "-print_format", "json", *cmd] LOG.info(f"Extracting video information: {' '.join(full_cmd)}") try: completed = subprocess.run( @@ -132,8 +133,8 @@ def _run_ffprobe_json(self, cmd: T.List[str]) -> T.Dict: return output - def _run_ffmpeg(self, cmd: T.List[str]) -> None: - full_cmd: T.List[str] = [self.ffmpeg_path, *cmd] + def _run_ffmpeg(self, cmd: list[str]) -> None: + full_cmd: list[str] = [self.ffmpeg_path, *cmd] LOG.info(f"Extracting frames: {' '.join(full_cmd)}") try: subprocess.run(full_cmd, check=True, stderr=self.stderr) @@ -145,7 +146,7 @@ def _run_ffmpeg(self, cmd: T.List[str]) -> None: raise FFmpegCalledProcessError(ex) from ex def probe_format_and_streams(self, video_path: Path) -> ProbeOutput: - cmd: T.List[str] = [ + cmd: list[str] = [ "-hide_banner", "-show_format", "-show_streams", @@ -158,7 +159,7 @@ def extract_frames( video_path: Path, sample_dir: Path, sample_interval: float, - stream_idx: T.Optional[int] = None, + stream_idx: int | None = None, ) -> None: """ Extract frames by the sample interval from the specified video stream. @@ -175,7 +176,7 @@ def extract_frames( ouput_template = f"{sample_prefix}_{NA_STREAM_IDX}_%06d{FRAME_EXT}" stream_specifier = "v" - cmd: T.List[str] = [ + cmd: list[str] = [ # global options should be specified first *["-hide_banner", "-nostdin"], # input 0 @@ -195,7 +196,7 @@ def extract_frames( self._run_ffmpeg(cmd) - def generate_binary_search(self, sorted_frame_indices: T.Sequence[int]) -> str: + def generate_binary_search(self, sorted_frame_indices: list[int]) -> str: length = len(sorted_frame_indices) if length == 0: @@ -211,8 +212,8 @@ def extract_specified_frames( self, video_path: Path, sample_dir: Path, - frame_indices: T.Set[int], - stream_idx: T.Optional[int] = None, + frame_indices: set[int], + stream_idx: int | None = None, ) -> None: """ Extract specified frames from the specified video stream. @@ -253,7 +254,7 @@ def extract_specified_frames( # If not close, error "The process cannot access the file because it is being used by another process" if not delete: select_file.close() - cmd: T.List[str] = [ + cmd: list[str] = [ # global options should be specified first *["-hide_banner", "-nostdin"], # input 0 @@ -300,7 +301,7 @@ class Probe: def __init__(self, probe: ProbeOutput) -> None: self.probe = probe - def probe_video_start_time(self) -> T.Optional[datetime.datetime]: + def probe_video_start_time(self) -> datetime.datetime | None: """ Find video start time of the given video. It searches video creation time and duration in video streams first and then the other streams. @@ -327,11 +328,11 @@ def probe_video_start_time(self) -> T.Optional[datetime.datetime]: return None - def probe_video_streams(self) -> T.List[Stream]: + def probe_video_streams(self) -> list[Stream]: streams = self.probe.get("streams", []) return [stream for stream in streams if stream.get("codec_type") == "video"] - def probe_video_with_max_resolution(self) -> T.Optional[Stream]: + def probe_video_with_max_resolution(self) -> Stream | None: video_streams = self.probe_video_streams() video_streams.sort( key=lambda s: s.get("width", 0) * s.get("height", 0), reverse=True @@ -341,7 +342,7 @@ def probe_video_with_max_resolution(self) -> T.Optional[Stream]: return video_streams[0] -def extract_stream_start_time(stream: Stream) -> T.Optional[datetime.datetime]: +def extract_stream_start_time(stream: Stream) -> datetime.datetime | None: """ Find the start time of the given stream. Start time is the creation time of the stream minus the duration of the stream. @@ -368,7 +369,7 @@ def extract_stream_start_time(stream: Stream) -> T.Optional[datetime.datetime]: def _extract_stream_frame_idx( sample_basename: str, sample_basename_pattern: T.Pattern[str], -) -> T.Optional[T.Tuple[T.Optional[int], int]]: +) -> tuple[int | None, int] | None: """ extract stream id and frame index from sample basename e.g. basename GX010001_NA_000000.jpg will extract (None, 0) @@ -408,7 +409,7 @@ def _extract_stream_frame_idx( def iterate_samples( sample_dir: Path, video_path: Path -) -> T.Generator[T.Tuple[T.Optional[int], int, Path], None, None]: +) -> T.Generator[tuple[int | None, int, Path], None, None]: """ Search all samples in the sample_dir, and return a generator of the tuple: (stream ID, frame index, sample path). @@ -428,17 +429,17 @@ def iterate_samples( def sort_selected_samples( - sample_dir: Path, video_path: Path, selected_stream_indices: T.List[T.Optional[int]] -) -> T.List[T.Tuple[int, T.List[T.Optional[Path]]]]: + sample_dir: Path, video_path: Path, selected_stream_indices: list[int | None] +) -> list[tuple[int, list[Path | None]]]: """ Group frames by frame index, so that the Nth group contains all the frames from the selected streams at frame index N. """ - stream_samples: T.Dict[int, T.List[T.Tuple[T.Optional[int], Path]]] = {} + stream_samples: dict[int, list[tuple[int | None, Path]]] = {} for stream_idx, frame_idx, sample_path in iterate_samples(sample_dir, video_path): stream_samples.setdefault(frame_idx, []).append((stream_idx, sample_path)) - selected: T.List[T.Tuple[int, T.List[T.Optional[Path]]]] = [] + selected: list[tuple[int, list[Path | None]]] = [] for frame_idx in sorted(stream_samples.keys()): indexed = { stream_idx: sample_path diff --git a/mapillary_tools/geo.py b/mapillary_tools/geo.py index 3c4110414..5a6bb8d12 100644 --- a/mapillary_tools/geo.py +++ b/mapillary_tools/geo.py @@ -244,14 +244,14 @@ def interpolate(self, t: float) -> Point: return interpolated -_PointAbstract = T.TypeVar("_PointAbstract") +_T = T.TypeVar("_T") def sample_points_by_distance( - samples: T.Iterable[_PointAbstract], + samples: T.Iterable[_T], min_distance: float, - point_func: T.Callable[[_PointAbstract], Point], -) -> T.Generator[_PointAbstract, None, None]: + point_func: T.Callable[[_T], Point], +) -> T.Generator[_T, None, None]: prevp: Point | None = None for sample in samples: if prevp is None: @@ -281,23 +281,6 @@ def interpolate_directions_if_none(sequence: T.Sequence[PointLike]) -> None: sequence[-1].angle = prev_angle -def extend_deduplicate_points( - sequence: T.Iterable[PointLike], - to_extend: list[PointLike] | None = None, -) -> list[PointLike]: - if to_extend is None: - to_extend = [] - for point in sequence: - if to_extend: - prev = to_extend[-1].lon, to_extend[-1].lat - cur = (point.lon, point.lat) - if cur != prev: - to_extend.append(point) - else: - to_extend.append(point) - return to_extend - - def _ecef_from_lla2(lat: float, lon: float) -> tuple[float, float, float]: """ Compute ECEF XYZ from latitude and longitude. diff --git a/mapillary_tools/gpmf/gpmf_parser.py b/mapillary_tools/gpmf/gpmf_parser.py index f3ff4ed4d..497a80f57 100644 --- a/mapillary_tools/gpmf/gpmf_parser.py +++ b/mapillary_tools/gpmf/gpmf_parser.py @@ -39,7 +39,7 @@ class KLVDict(T.TypedDict): type: bytes structure_size: int repeat: int - data: T.List[T.Any] + data: list[T.Any] GPMFSampleData: C.GreedyRange @@ -143,7 +143,7 @@ class GoProInfo: def extract_gopro_info( fp: T.BinaryIO, telemetry_only: bool = False -) -> T.Optional[GoProInfo]: +) -> GoProInfo | None: """ Return the GoProInfo object if found. None indicates it's not a valid GoPro video. """ @@ -276,7 +276,7 @@ def _gps5_timestamp_to_epoch_time(dtstr: str): def _gps5_from_stream( stream: T.Sequence[KLVDict], ) -> T.Generator[telemetry.GPSPoint, None, None]: - indexed: T.Dict[bytes, T.List[T.List[T.Any]]] = { + indexed: dict[bytes, list[list[T.Any]]] = { klv["key"]: klv["data"] for klv in stream } @@ -362,7 +362,7 @@ def _gps9_from_stream( ) -> T.Generator[telemetry.GPSPoint, None, None]: NUM_VALUES = 9 - indexed: T.Dict[bytes, T.List[T.List[T.Any]]] = { + indexed: dict[bytes, list[list[T.Any]]] = { klv["key"]: klv["data"] for klv in stream } @@ -444,8 +444,8 @@ def _find_first_device_id(stream: T.Sequence[KLVDict]) -> int: return device_id -def _find_first_gps_stream(stream: T.Sequence[KLVDict]) -> T.List[telemetry.GPSPoint]: - sample_points: T.List[telemetry.GPSPoint] = [] +def _find_first_gps_stream(stream: T.Sequence[KLVDict]) -> list[telemetry.GPSPoint]: + sample_points: list[telemetry.GPSPoint] = [] for klv in stream: if klv["key"] == b"STRM": @@ -469,7 +469,7 @@ def _is_matrix_calibration(matrix: T.Sequence[float]) -> bool: def _build_matrix( - orin: T.Union[bytes, T.Sequence[int]], orio: T.Union[bytes, T.Sequence[int]] + orin: bytes | T.Sequence[int], orio: bytes | T.Sequence[int] ) -> T.Sequence[float]: matrix = [] @@ -503,14 +503,14 @@ def _apply_matrix( yield sum(matrix[row_start + x] * values[x] for x in range(size)) -def _flatten(nested: T.Sequence[T.Sequence[float]]) -> T.List[float]: - output: T.List[float] = [] +def _flatten(nested: T.Sequence[T.Sequence[float]]) -> list[float]: + output: list[float] = [] for row in nested: output.extend(row) return output -def _get_matrix(klv: T.Dict[bytes, KLVDict]) -> T.Optional[T.Sequence[float]]: +def _get_matrix(klv: dict[bytes, KLVDict]) -> T.Sequence[float] | None: mtrx = klv.get(b"MTRX") if mtrx is not None: matrix: T.Sequence[float] = _flatten(mtrx["data"]) @@ -530,7 +530,7 @@ def _get_matrix(klv: T.Dict[bytes, KLVDict]) -> T.Optional[T.Sequence[float]]: def _scale_and_calibrate( stream: T.Sequence[KLVDict], key: bytes ) -> T.Generator[T.Sequence[float], None, None]: - indexed: T.Dict[bytes, KLVDict] = {klv["key"]: klv for klv in stream} + indexed: dict[bytes, KLVDict] = {klv["key"]: klv for klv in stream} klv = indexed.get(key) if klv is None: @@ -561,7 +561,7 @@ def _scale_and_calibrate( def _find_first_telemetry_stream(stream: T.Sequence[KLVDict], key: bytes): - values: T.List[T.Sequence[float]] = [] + values: list[T.Sequence[float]] = [] for klv in stream: if klv["key"] == b"STRM": @@ -684,7 +684,7 @@ def _load_telemetry_from_samples( return device_found -def _is_gpmd_description(description: T.Dict) -> bool: +def _is_gpmd_description(description: dict) -> bool: return description["format"] == b"gpmd" @@ -699,11 +699,11 @@ def _filter_gpmd_samples(track: TrackBoxParser) -> T.Generator[Sample, None, Non yield sample -def _extract_camera_model_from_devices(device_names: T.Dict[int, bytes]) -> str: +def _extract_camera_model_from_devices(device_names: dict[int, bytes]) -> str: if not device_names: return "" - unicode_names: T.List[str] = [] + unicode_names: list[str] = [] for name in device_names.values(): try: unicode_names.append(name.decode("utf-8")) @@ -730,7 +730,7 @@ def _extract_camera_model_from_devices(device_names: T.Dict[int, bytes]) -> str: def _iterate_read_sample_data( fp: T.BinaryIO, samples: T.Iterable[Sample] -) -> T.Generator[T.Tuple[Sample, bytes], None, None]: +) -> T.Generator[tuple[Sample, bytes], None, None]: for sample in samples: fp.seek(sample.raw_sample.offset, io.SEEK_SET) yield (sample, fp.read(sample.raw_sample.size)) diff --git a/mapillary_tools/gpmf/gps_filter.py b/mapillary_tools/gpmf/gps_filter.py index 3e3213488..12d9abdff 100644 --- a/mapillary_tools/gpmf/gps_filter.py +++ b/mapillary_tools/gpmf/gps_filter.py @@ -1,3 +1,5 @@ +from __future__ import annotations + import statistics import typing as T @@ -96,7 +98,7 @@ def _f(p1, p2): def dbscan( sequences: T.Sequence[PointSequence], merge_or_not: Decider, -) -> T.Dict[int, PointSequence]: +) -> dict[int, PointSequence]: """ One-dimension DBSCAN clustering: https://en.wikipedia.org/wiki/DBSCAN The input is a list of sequences, and it is guaranteed that all sequences are sorted by time. @@ -107,7 +109,7 @@ def dbscan( """ # find which sequences (keys) should be merged to which sequences (values) - mergeto: T.Dict[int, int] = {} + mergeto: dict[int, int] = {} for left in range(len(sequences)): mergeto.setdefault(left, left) # find the first sequence to merge with @@ -119,7 +121,7 @@ def dbscan( break # merge - merged: T.Dict[int, PointSequence] = {} + merged: dict[int, PointSequence] = {} for idx, s in enumerate(sequences): merged.setdefault(mergeto[idx], []).extend(s) diff --git a/mapillary_tools/history.py b/mapillary_tools/history.py index 49b2ab715..90d51b277 100644 --- a/mapillary_tools/history.py +++ b/mapillary_tools/history.py @@ -1,3 +1,5 @@ +from __future__ import annotations + import json import logging import string @@ -43,14 +45,14 @@ def write_history( md5sum: str, params: JSONDict, summary: JSONDict, - metadatas: T.Optional[T.Sequence[types.Metadata]] = None, + metadatas: T.Sequence[types.Metadata] | None = None, ) -> None: if not constants.MAPILLARY_UPLOAD_HISTORY_PATH: return path = history_desc_path(md5sum) LOG.debug("Writing upload history: %s", path) path.resolve().parent.mkdir(parents=True, exist_ok=True) - history: T.Dict[str, T.Any] = { + history: dict[str, T.Any] = { "params": params, "summary": summary, } diff --git a/mapillary_tools/mp4/construct_mp4_parser.py b/mapillary_tools/mp4/construct_mp4_parser.py index 6391e6846..0d7b8d84a 100644 --- a/mapillary_tools/mp4/construct_mp4_parser.py +++ b/mapillary_tools/mp4/construct_mp4_parser.py @@ -1,4 +1,5 @@ # pyre-ignore-all-errors[5, 16, 21, 58] +from __future__ import annotations import typing as T @@ -42,7 +43,7 @@ class BoxDict(T.TypedDict, total=True): type: BoxType - data: T.Union[T.Sequence["BoxDict"], T.Dict[str, T.Any], bytes] + data: T.Sequence["BoxDict"] | dict[str, T.Any] | bytes _UNITY_MATRIX = [0x10000, 0, 0, 0, 0x10000, 0, 0, 0, 0x40000000] @@ -376,7 +377,7 @@ class Box64ConstructBuilder: NOTE: Do not build data with this struct. For building, use Box32StructBuilder instead. """ - _box: T.Optional[C.Construct] + _box: C.Construct | None def __init__( self, @@ -438,8 +439,8 @@ def BoxList(self) -> C.Construct: def parse_box(self, data: bytes) -> BoxDict: return T.cast(BoxDict, self.Box.parse(data)) - def parse_boxlist(self, data: bytes) -> T.List[BoxDict]: - return T.cast(T.List[BoxDict], self.BoxList.parse(data)) + def parse_boxlist(self, data: bytes) -> list[BoxDict]: + return T.cast(list[BoxDict], self.BoxList.parse(data)) class Box32ConstructBuilder(Box64ConstructBuilder): @@ -464,7 +465,7 @@ def Box(self) -> C.Construct: def parse_box(self, data: bytes) -> BoxDict: raise NotImplementedError("Box32ConstructBuilder does not support parsing") - def parse_boxlist(self, data: bytes) -> T.List[BoxDict]: + def parse_boxlist(self, data: bytes) -> list[BoxDict]: raise NotImplementedError("Box32ConstructBuilder does not support parsing") def build_box(self, box: BoxDict) -> bytes: @@ -584,7 +585,7 @@ def _new_cmap_without_boxes( def find_box_at_pathx( - box: T.Union[T.Sequence[BoxDict], BoxDict], path: T.Sequence[bytes] + box: T.Sequence[BoxDict] | BoxDict, path: T.Sequence[bytes] ) -> BoxDict: found = find_box_at_path(box, path) if found is None: @@ -593,8 +594,8 @@ def find_box_at_pathx( def find_box_at_path( - box: T.Union[T.Sequence[BoxDict], BoxDict], path: T.Sequence[bytes] -) -> T.Optional[BoxDict]: + box: T.Sequence[BoxDict] | BoxDict, path: T.Sequence[bytes] +) -> BoxDict | None: if not path: return None @@ -608,7 +609,7 @@ def find_box_at_path( if box["type"] == path[0]: if len(path) == 1: return box - box_data = T.cast(T.Sequence[BoxDict], box["data"]) + box_data = T.cast(list[BoxDict], box["data"]) # ListContainer from construct is not sequence assert isinstance(box_data, T.Sequence), ( f"expect a list of boxes but got {type(box_data)} at path {path}" diff --git a/mapillary_tools/mp4/mp4_sample_parser.py b/mapillary_tools/mp4/mp4_sample_parser.py index 285825aa4..121d46c71 100644 --- a/mapillary_tools/mp4/mp4_sample_parser.py +++ b/mapillary_tools/mp4/mp4_sample_parser.py @@ -44,16 +44,16 @@ class Sample(T.NamedTuple): exact_timedelta: float # reference to the sample description - description: T.Dict + description: dict def _extract_raw_samples( sizes: T.Sequence[int], - chunk_entries: T.Sequence[T.Dict], + chunk_entries: T.Sequence[dict], chunk_offsets: T.Sequence[int], timedeltas: T.Sequence[int], - composition_offsets: T.Optional[T.Sequence[int]], - syncs: T.Optional[T.Set[int]], + composition_offsets: list[int] | None, + syncs: set[int] | None, ) -> T.Generator[RawSample, None, None]: if not sizes: return @@ -130,7 +130,7 @@ def _extract_raw_samples( def _extract_samples( raw_samples: T.Iterator[RawSample], - descriptions: T.List, + descriptions: list, timescale: int, ) -> T.Generator[Sample, None, None]: acc_delta = 0 @@ -154,21 +154,21 @@ def _extract_samples( def extract_raw_samples_from_stbl_data( stbl: bytes, -) -> T.Tuple[T.List[T.Dict], T.Generator[RawSample, None, None]]: - descriptions = [] - sizes = [] - chunk_offsets = [] - chunk_entries = [] - timedeltas: T.List[int] = [] - composition_offsets: T.Optional[T.List[int]] = None - syncs: T.Optional[T.Set[int]] = None +) -> tuple[list[dict], T.Generator[RawSample, None, None]]: + descriptions: list[dict] = [] + sizes: list[int] = [] + chunk_offsets: list[int] = [] + chunk_entries: list[dict] = [] + timedeltas: list[int] = [] + composition_offsets: list[int] | None = None + syncs: set[int] | None = None stbl_children = T.cast( T.Sequence[cparser.BoxDict], STBLBoxlistConstruct.parse(stbl) ) for box in stbl_children: - data: T.Dict = T.cast(T.Dict, box["data"]) + data: dict = T.cast(dict, box["data"]) if box["type"] == b"stsd": descriptions = list(data["entries"]) @@ -227,32 +227,32 @@ def __init__(self, trak_children: T.Sequence[cparser.BoxDict]): ) self.stbl_data = T.cast(bytes, stbl["data"]) - def extract_tkhd_boxdata(self) -> T.Dict: + def extract_tkhd_boxdata(self) -> dict: return T.cast( - T.Dict, cparser.find_box_at_pathx(self.trak_children, [b"tkhd"])["data"] + dict, cparser.find_box_at_pathx(self.trak_children, [b"tkhd"])["data"] ) def is_video_track(self) -> bool: hdlr = cparser.find_box_at_pathx(self.trak_children, [b"mdia", b"hdlr"]) - return T.cast(T.Dict[str, T.Any], hdlr["data"])["handler_type"] == b"vide" + return T.cast(dict[str, T.Any], hdlr["data"])["handler_type"] == b"vide" - def extract_sample_descriptions(self) -> T.List[T.Dict]: + def extract_sample_descriptions(self) -> list[dict]: # TODO: return [] if parsing fail boxes = _STSDBoxListConstruct.parse(self.stbl_data) stsd = cparser.find_box_at_pathx( T.cast(T.Sequence[cparser.BoxDict], boxes), [b"stsd"] ) - return T.cast(T.List[T.Dict], T.cast(T.Dict, stsd["data"])["entries"]) + return T.cast(list[dict], T.cast(dict, stsd["data"])["entries"]) - def extract_elst_boxdata(self) -> T.Optional[T.Dict]: + def extract_elst_boxdata(self) -> dict | None: box = cparser.find_box_at_path(self.trak_children, [b"edts", b"elst"]) if box is None: return None - return T.cast(T.Dict, box["data"]) + return T.cast(dict, box["data"]) - def extract_mdhd_boxdata(self) -> T.Dict: + def extract_mdhd_boxdata(self) -> dict: box = cparser.find_box_at_pathx(self.trak_children, [b"mdia", b"mdhd"]) - return T.cast(T.Dict, box["data"]) + return T.cast(dict, box["data"]) def extract_raw_samples(self) -> T.Generator[RawSample, None, None]: _, raw_samples = extract_raw_samples_from_stbl_data(self.stbl_data) @@ -261,7 +261,7 @@ def extract_raw_samples(self) -> T.Generator[RawSample, None, None]: def extract_samples(self) -> T.Generator[Sample, None, None]: descriptions, raw_samples = extract_raw_samples_from_stbl_data(self.stbl_data) mdhd = T.cast( - T.Dict, + dict, cparser.find_box_at_pathx(self.trak_children, [b"mdia", b"mdhd"])["data"], ) yield from _extract_samples(raw_samples, descriptions, mdhd["timescale"]) @@ -287,15 +287,15 @@ def parse_stream(cls, stream: T.BinaryIO) -> "MovieBoxParser": moov = sparser.parse_box_data_firstx(stream, [b"moov"]) return cls(moov) - def extract_mvhd_boxdata(self) -> T.Dict: + def extract_mvhd_boxdata(self) -> dict: mvhd = cparser.find_box_at_pathx(self.moov_children, [b"mvhd"]) - return T.cast(T.Dict, mvhd["data"]) + return T.cast(dict, mvhd["data"]) - def extract_udta_boxdata(self) -> T.Dict | None: + def extract_udta_boxdata(self) -> dict | None: box = cparser.find_box_at_path(self.moov_children, [b"udta"]) if box is None: return None - return T.cast(T.Dict, box["data"]) + return T.cast(dict, box["data"]) def extract_tracks(self) -> T.Generator[TrackBoxParser, None, None]: for box in self.moov_children: diff --git a/mapillary_tools/mp4/simple_mp4_builder.py b/mapillary_tools/mp4/simple_mp4_builder.py index b93a3ef29..69e07cfbf 100644 --- a/mapillary_tools/mp4/simple_mp4_builder.py +++ b/mapillary_tools/mp4/simple_mp4_builder.py @@ -1,3 +1,5 @@ +from __future__ import annotations + import dataclasses import io import typing as T @@ -64,8 +66,8 @@ class _SampleChunk: offset: int -def _build_chunks(raw_samples: T.Iterable[RawSample]) -> T.List[_SampleChunk]: - chunks: T.List[_SampleChunk] = [] +def _build_chunks(raw_samples: T.Iterable[RawSample]) -> list[_SampleChunk]: + chunks: list[_SampleChunk] = [] prev_raw_sample = None for raw_sample in raw_samples: @@ -120,7 +122,7 @@ class _CompressedSampleDelta: def _build_stts(sample_deltas: T.Iterable[int]) -> BoxDict: # compress deltas - compressed: T.List[_CompressedSampleDelta] = [] + compressed: list[_CompressedSampleDelta] = [] for delta in sample_deltas: if compressed and delta == compressed[-1].sample_delta: compressed[-1].sample_count += 1 @@ -146,7 +148,7 @@ class _CompressedSampleCompositionOffset: def _build_ctts(sample_composition_offsets: T.Iterable[int]) -> BoxDict: # compress offsets - compressed: T.List[_CompressedSampleCompositionOffset] = [] + compressed: list[_CompressedSampleCompositionOffset] = [] for offset in sample_composition_offsets: if compressed and offset == compressed[-1].sample_offset: compressed[-1].sample_count += 1 @@ -182,7 +184,7 @@ def _build_stss(is_syncs: T.Iterable[bool]) -> BoxDict: def build_stbl_from_raw_samples( descriptions: T.Sequence[T.Any], raw_samples: T.Iterable[RawSample] -) -> T.List[BoxDict]: +) -> list[BoxDict]: # raw_samples could be iterator so convert to list raw_samples = list(raw_samples) # It is recommended that the boxes within the Sample Table Box be in the following order: @@ -329,9 +331,8 @@ def _build_moov_typed_data(moov_children: T.Sequence[BoxDict]) -> bytes: def transform_mp4( src_fp: T.BinaryIO, - sample_generator: T.Optional[ - T.Callable[[T.BinaryIO, T.List[BoxDict]], T.Iterator[io.IOBase]] - ] = None, + sample_generator: T.Callable[[T.BinaryIO, list[BoxDict]], T.Iterator[io.IOBase]] + | None = None, ) -> io_utils.ChainedIO: # extract ftyp src_fp.seek(0) @@ -347,7 +348,7 @@ def transform_mp4( # extract video samples source_samples = list(iterate_samples(moov_children)) - sample_readers: T.List[io.IOBase] = [ + sample_readers: list[io.IOBase] = [ io_utils.SlicedIO(src_fp, sample.offset, sample.size) for sample in source_samples ] diff --git a/mapillary_tools/mp4/simple_mp4_parser.py b/mapillary_tools/mp4/simple_mp4_parser.py index 8f1083397..b35358392 100644 --- a/mapillary_tools/mp4/simple_mp4_parser.py +++ b/mapillary_tools/mp4/simple_mp4_parser.py @@ -1,4 +1,5 @@ # pyre-ignore-all-errors[5, 16, 21, 24, 58] +from __future__ import annotations import io import typing as T @@ -130,8 +131,8 @@ def parse_boxes_recursive( stream: T.BinaryIO, maxsize: int = -1, depth: int = 0, - box_list_types: T.Optional[T.Set[bytes]] = None, -) -> T.Generator[T.Tuple[Header, int, T.BinaryIO], None, None]: + box_list_types: set[bytes] | None = None, +) -> T.Generator[tuple[Header, int, T.BinaryIO], None, None]: assert maxsize == -1 or 0 <= maxsize if box_list_types is None: @@ -152,10 +153,10 @@ def parse_boxes_recursive( def parse_path( stream: T.BinaryIO, - path: T.Sequence[T.Union[bytes, T.Sequence[bytes]]], + path: T.Sequence[bytes | T.Sequence[bytes]], maxsize: int = -1, depth: int = 0, -) -> T.Generator[T.Tuple[Header, T.BinaryIO], None, None]: +) -> T.Generator[tuple[Header, T.BinaryIO], None, None]: if not path: return @@ -172,8 +173,8 @@ def parse_path( def _parse_path_first( - stream: T.BinaryIO, path: T.List[bytes], maxsize: int = -1, depth: int = 0 -) -> T.Optional[T.Tuple[Header, T.BinaryIO]]: + stream: T.BinaryIO, path: list[bytes], maxsize: int = -1, depth: int = 0 +) -> tuple[Header, T.BinaryIO] | None: if not path: return None for h, s in parse_boxes(stream, maxsize=maxsize, extend_eof=depth == 0): @@ -188,8 +189,8 @@ def _parse_path_first( def parse_mp4_data_first( - stream: T.BinaryIO, path: T.List[bytes], maxsize: int = -1 -) -> T.Optional[bytes]: + stream: T.BinaryIO, path: list[bytes], maxsize: int = -1 +) -> bytes | None: # depth=0 will enable EoF extension parsed = _parse_path_first(stream, path, maxsize=maxsize, depth=0) if parsed is None: @@ -199,7 +200,7 @@ def parse_mp4_data_first( def parse_mp4_data_firstx( - stream: T.BinaryIO, path: T.List[bytes], maxsize: int = -1 + stream: T.BinaryIO, path: list[bytes], maxsize: int = -1 ) -> bytes: data = parse_mp4_data_first(stream, path, maxsize=maxsize) if data is None: @@ -208,8 +209,8 @@ def parse_mp4_data_firstx( def parse_box_data_first( - stream: T.BinaryIO, path: T.List[bytes], maxsize: int = -1 -) -> T.Optional[bytes]: + stream: T.BinaryIO, path: list[bytes], maxsize: int = -1 +) -> bytes | None: # depth=1 will disable EoF extension parsed = _parse_path_first(stream, path, maxsize=maxsize, depth=1) if parsed is None: @@ -219,7 +220,7 @@ def parse_box_data_first( def parse_box_data_firstx( - stream: T.BinaryIO, path: T.List[bytes], maxsize: int = -1 + stream: T.BinaryIO, path: list[bytes], maxsize: int = -1 ) -> bytes: data = parse_box_data_first(stream, path, maxsize=maxsize) if data is None: diff --git a/mapillary_tools/process_geotag_properties.py b/mapillary_tools/process_geotag_properties.py index 9bf6adc93..673831dcf 100644 --- a/mapillary_tools/process_geotag_properties.py +++ b/mapillary_tools/process_geotag_properties.py @@ -25,9 +25,7 @@ ] -def _normalize_import_paths( - import_path: T.Union[Path, T.Sequence[Path]], -) -> T.Sequence[Path]: +def _normalize_import_paths(import_path: Path | T.Sequence[Path]) -> T.Sequence[Path]: import_paths: T.Sequence[Path] if isinstance(import_path, Path): import_paths = [import_path] @@ -219,7 +217,7 @@ def _is_error_skipped(error_type: str, skipped_process_errors: set[T.Type[Except def _show_stats( metadatas: T.Sequence[types.MetadataOrError], - skipped_process_errors: T.Set[T.Type[Exception]], + skipped_process_errors: set[T.Type[Exception]], ) -> None: metadatas_by_filetype: dict[types.FileType, list[types.MetadataOrError]] = {} for metadata in metadatas: @@ -249,7 +247,7 @@ def _show_stats( def _show_stats_per_filetype( metadatas: T.Collection[types.MetadataOrError], filetype: types.FileType, - skipped_process_errors: T.Set[T.Type[Exception]], + skipped_process_errors: set[T.Type[Exception]], ): good_metadatas: list[types.Metadata] good_metadatas, error_metadatas = types.separate_errors(metadatas) @@ -314,7 +312,7 @@ def _validate_metadatas( def process_finalize( - import_path: T.Union[T.Sequence[Path], Path], + import_path: T.Sequence[Path] | Path, metadatas: list[types.MetadataOrError], skip_process_errors: bool = False, device_make: str | None = None, @@ -406,7 +404,7 @@ def process_finalize( _write_metadatas(metadatas, desc_path) # Show stats - skipped_process_errors: T.Set[T.Type[Exception]] + skipped_process_errors: set[T.Type[Exception]] if skip_process_errors: # Skip all exceptions skipped_process_errors = {Exception} diff --git a/mapillary_tools/process_sequence_properties.py b/mapillary_tools/process_sequence_properties.py index 9c4e5ed8d..e20513527 100644 --- a/mapillary_tools/process_sequence_properties.py +++ b/mapillary_tools/process_sequence_properties.py @@ -1,3 +1,5 @@ +from __future__ import annotations + import itertools import logging import math @@ -14,13 +16,13 @@ def split_sequence_by( - sequence: T.List[SeqItem], + sequence: T.Sequence[SeqItem], should_split: T.Callable[[SeqItem, SeqItem], bool], -) -> T.List[T.List[SeqItem]]: +) -> list[list[SeqItem]]: """ Split a sequence into multiple sequences by should_split(prev, cur) => True """ - output_sequences: T.List[T.List[SeqItem]] = [] + output_sequences: list[list[SeqItem]] = [] seq = iter(sequence) @@ -45,14 +47,14 @@ def split_sequence_by( def split_sequence_by_agg( - sequence: T.List[SeqItem], - should_split_with_sequence_state: T.Callable[[SeqItem, T.Dict], bool], -) -> T.List[T.List[SeqItem]]: + sequence: T.Sequence[SeqItem], + should_split_with_sequence_state: T.Callable[[SeqItem, dict], bool], +) -> list[list[SeqItem]]: """ Split a sequence by should_split_with_sequence_state(cur, sequence_state) => True """ - output_sequences: T.List[T.List[SeqItem]] = [] - sequence_state: T.Dict = {} + output_sequences: list[list[SeqItem]] = [] + sequence_state: dict = {} for cur in sequence: start_new_sequence = should_split_with_sequence_state(cur, sequence_state) @@ -77,9 +79,9 @@ def duplication_check( sequence: PointSequence, max_duplicate_distance: float, max_duplicate_angle: float, -) -> T.Tuple[PointSequence, T.List[types.ErrorMetadata]]: +) -> tuple[PointSequence, list[types.ErrorMetadata]]: dedups: PointSequence = [] - dups: T.List[types.ErrorMetadata] = [] + dups: list[types.ErrorMetadata] = [] it = iter(sequence) prev = next(it) @@ -125,10 +127,10 @@ def duplication_check( def _group_by( - image_metadatas: T.List[types.ImageMetadata], + image_metadatas: T.Iterable[types.ImageMetadata], group_key_func=T.Callable[[types.ImageMetadata], T.Hashable], -) -> T.Dict[T.Hashable, T.List[types.ImageMetadata]]: - grouped: T.Dict[T.Hashable, T.List[types.ImageMetadata]] = {} +) -> dict[T.Hashable, list[types.ImageMetadata]]: + grouped: dict[T.Hashable, list[types.ImageMetadata]] = {} for metadata in image_metadatas: grouped.setdefault(group_key_func(metadata), []).append(metadata) return grouped @@ -245,13 +247,13 @@ def _is_video_stationary( def _check_video_limits( - video_metadatas: T.Sequence[types.VideoMetadata], + video_metadatas: T.Iterable[types.VideoMetadata], max_sequence_filesize_in_bytes: int, max_avg_speed: float, max_radius_for_stationary_check: float, -) -> T.Tuple[T.List[types.VideoMetadata], T.List[types.ErrorMetadata]]: - output_video_metadatas: T.List[types.VideoMetadata] = [] - error_metadatas: T.List[types.ErrorMetadata] = [] +) -> tuple[list[types.VideoMetadata], list[types.ErrorMetadata]]: + output_video_metadatas: list[types.VideoMetadata] = [] + error_metadatas: list[types.ErrorMetadata] = [] for video_metadata in video_metadatas: try: @@ -312,9 +314,9 @@ def _check_sequences_by_limits( input_sequences: T.Sequence[PointSequence], max_sequence_filesize_in_bytes: int, max_avg_speed: float, -) -> T.Tuple[T.List[PointSequence], T.List[types.ErrorMetadata]]: - output_sequences: T.List[PointSequence] = [] - output_errors: T.List[types.ErrorMetadata] = [] +) -> tuple[list[PointSequence], list[types.ErrorMetadata]]: + output_sequences: list[PointSequence] = [] + output_errors: list[types.ErrorMetadata] = [] for sequence in input_sequences: sequence_filesize = sum( @@ -370,8 +372,8 @@ def _check_sequences_by_limits( def _group_by_folder_and_camera( - image_metadatas: T.List[types.ImageMetadata], -) -> T.List[T.List[types.ImageMetadata]]: + image_metadatas: list[types.ImageMetadata], +) -> list[list[types.ImageMetadata]]: grouped = _group_by( image_metadatas, lambda metadata: ( @@ -395,8 +397,8 @@ def _group_by_folder_and_camera( def _split_sequences_by_cutoff_time( - input_sequences: T.List[PointSequence], cutoff_time: float -) -> T.List[PointSequence]: + input_sequences: T.Sequence[PointSequence], cutoff_time: float +) -> list[PointSequence]: def _should_split_by_cutoff_time( prev: types.ImageMetadata, cur: types.ImageMetadata ) -> bool: @@ -432,8 +434,8 @@ def _should_split_by_cutoff_time( def _split_sequences_by_cutoff_distance( - input_sequences: T.List[PointSequence], cutoff_distance: float -) -> T.List[PointSequence]: + input_sequences: T.Sequence[PointSequence], cutoff_distance: float +) -> list[PointSequence]: def _should_split_by_cutoff_distance( prev: types.ImageMetadata, cur: types.ImageMetadata ) -> bool: @@ -471,12 +473,12 @@ def _should_split_by_cutoff_distance( def _check_sequences_duplication( - input_sequences: T.List[PointSequence], + input_sequences: T.Sequence[PointSequence], duplicate_distance: float, duplicate_angle: float, -) -> T.Tuple[T.List[PointSequence], T.List[types.ErrorMetadata]]: - output_sequences: T.List[PointSequence] = [] - output_errors: T.List[types.ErrorMetadata] = [] +) -> tuple[list[PointSequence], list[types.ErrorMetadata]]: + output_sequences: list[PointSequence] = [] + output_errors: list[types.ErrorMetadata] = [] for sequence in input_sequences: output_sequence, errors = duplication_check( @@ -502,14 +504,14 @@ def _check_sequences_duplication( def _split_sequences_by_limits( - input_sequences: T.List[PointSequence], + input_sequences: T.Sequence[PointSequence], max_sequence_filesize_in_bytes: float, max_sequence_pixels: float, -) -> T.List[PointSequence]: +) -> list[PointSequence]: max_sequence_images = constants.MAX_SEQUENCE_LENGTH max_sequence_filesize = max_sequence_filesize_in_bytes - def _should_split(image: types.ImageMetadata, sequence_state: T.Dict) -> bool: + def _should_split(image: types.ImageMetadata, sequence_state: dict) -> bool: last_sequence_images = sequence_state.get("last_sequence_images", 0) last_sequence_file_size = sequence_state.get("last_sequence_file_size", 0) last_sequence_pixels = sequence_state.get("last_sequence_pixels", 0) @@ -586,15 +588,15 @@ def process_sequence_properties( duplicate_distance: float = constants.DUPLICATE_DISTANCE, duplicate_angle: float = constants.DUPLICATE_ANGLE, max_avg_speed: float = constants.MAX_AVG_SPEED, -) -> T.List[types.MetadataOrError]: +) -> list[types.MetadataOrError]: max_sequence_filesize_in_bytes = _parse_filesize_in_bytes( constants.MAX_SEQUENCE_FILESIZE ) max_sequence_pixels = _parse_pixels(constants.MAX_SEQUENCE_PIXELS) - error_metadatas: T.List[types.ErrorMetadata] = [] - image_metadatas: T.List[types.ImageMetadata] = [] - video_metadatas: T.List[types.VideoMetadata] = [] + error_metadatas: list[types.ErrorMetadata] = [] + image_metadatas: list[types.ImageMetadata] = [] + video_metadatas: list[types.VideoMetadata] = [] for metadata in metadatas: if isinstance(metadata, types.ErrorMetadata): @@ -617,7 +619,7 @@ def process_sequence_properties( error_metadatas.extend(video_error_metadatas) if image_metadatas: - sequences: T.List[PointSequence] + sequences: list[PointSequence] # Group by folder and camera sequences = _group_by_folder_and_camera(image_metadatas) diff --git a/mapillary_tools/sample_video.py b/mapillary_tools/sample_video.py index 1c13a48ce..7a0cd5fc2 100644 --- a/mapillary_tools/sample_video.py +++ b/mapillary_tools/sample_video.py @@ -19,7 +19,7 @@ def _normalize_path( video_import_path: Path, skip_subfolders: bool -) -> T.Tuple[Path, T.List[Path]]: +) -> tuple[Path, list[Path]]: if video_import_path.is_dir(): video_list = utils.find_videos( [video_import_path], skip_subfolders=skip_subfolders @@ -51,7 +51,7 @@ def sample_video( video_sample_distance=constants.VIDEO_SAMPLE_DISTANCE, video_sample_interval=constants.VIDEO_SAMPLE_INTERVAL, video_duration_ratio=constants.VIDEO_DURATION_RATIO, - video_start_time: T.Optional[str] = None, + video_start_time: str | None = None, skip_sample_errors: bool = False, rerun: bool = False, ) -> None: @@ -62,7 +62,7 @@ def sample_video( f"Expect either non-negative video_sample_distance or positive video_sample_interval but got {video_sample_distance} and {video_sample_interval} respectively" ) - video_start_time_dt: T.Optional[datetime.datetime] = None + video_start_time_dt: datetime.datetime | None = None if video_start_time is not None: try: video_start_time_dt = types.map_capture_time_to_datetime(video_start_time) @@ -179,7 +179,7 @@ def _sample_single_video_by_interval( sample_dir: Path, sample_interval: float, duration_ratio: float, - start_time: T.Optional[datetime.datetime] = None, + start_time: datetime.datetime | None = None, ) -> None: ffmpeg = ffmpeglib.FFMPEG(constants.FFMPEG_PATH, constants.FFPROBE_PATH) @@ -219,7 +219,7 @@ def _sample_video_stream_by_distance( points: T.Sequence[geo.Point], video_track_parser: mp4_sample_parser.TrackBoxParser, sample_distance: float, -) -> T.Dict[int, T.Tuple[mp4_sample_parser.Sample, geo.Point]]: +) -> dict[int, tuple[mp4_sample_parser.Sample, geo.Point]]: """ Locate video frames along the track (points), then resample them by the minimal sample_distance, and return the sparse frames. """ @@ -275,7 +275,7 @@ def _sample_single_video_by_distance( video_path: Path, sample_dir: Path, sample_distance: float, - start_time: T.Optional[datetime.datetime] = None, + start_time: datetime.datetime | None = None, ) -> None: ffmpeg = ffmpeglib.FFMPEG(constants.FFMPEG_PATH, constants.FFPROBE_PATH) diff --git a/mapillary_tools/telemetry.py b/mapillary_tools/telemetry.py index fb4caa5b0..3dd73a4e9 100644 --- a/mapillary_tools/telemetry.py +++ b/mapillary_tools/telemetry.py @@ -1,3 +1,5 @@ +from __future__ import annotations + import dataclasses import typing as T from enum import Enum, unique @@ -26,10 +28,10 @@ class TimestampedMeasurement: @dataclasses.dataclass class GPSPoint(TimestampedMeasurement, Point): - epoch_time: T.Optional[float] - fix: T.Optional[GPSFix] - precision: T.Optional[float] - ground_speed: T.Optional[float] + epoch_time: float | None + fix: GPSFix | None + precision: float | None + ground_speed: float | None @dataclasses.dataclass diff --git a/mapillary_tools/types.py b/mapillary_tools/types.py index cb0ff817d..747b421b7 100644 --- a/mapillary_tools/types.py +++ b/mapillary_tools/types.py @@ -53,23 +53,20 @@ class FileType(enum.Enum): @dataclasses.dataclass class ImageMetadata(geo.Point): filename: Path - # if None or absent, it will be calculated - md5sum: T.Optional[str] = None - # filetype: is always FileType.IMAGE - width: T.Optional[int] = None - height: T.Optional[int] = None - MAPSequenceUUID: T.Optional[str] = None - MAPDeviceMake: T.Optional[str] = None - MAPDeviceModel: T.Optional[str] = None - MAPGPSAccuracyMeters: T.Optional[float] = None - MAPCameraUUID: T.Optional[str] = None - MAPOrientation: T.Optional[int] = None - # deprecated since v0.10.0; keep here for compatibility - MAPMetaTags: T.Optional[T.Dict] = None - MAPFilename: T.Optional[str] = None - filesize: T.Optional[int] = None - - def update_md5sum(self, image_data: T.Optional[T.BinaryIO] = None) -> None: + md5sum: str | None = None + width: int | None = None + height: int | None = None + MAPSequenceUUID: str | None = None + MAPDeviceMake: str | None = None + MAPDeviceModel: str | None = None + MAPGPSAccuracyMeters: float | None = None + MAPCameraUUID: str | None = None + MAPOrientation: int | None = None + MAPMetaTags: dict | None = None + MAPFilename: str | None = None + filesize: int | None = None + + def update_md5sum(self, image_data: T.BinaryIO | None = None) -> None: if self.md5sum is None: if image_data is None: with self.filename.open("rb") as fp: @@ -87,13 +84,12 @@ def sort_key(self): @dataclasses.dataclass class VideoMetadata: filename: Path - # if None or absent, it will be calculated filetype: FileType points: T.Sequence[geo.Point] - md5sum: T.Optional[str] = None - make: T.Optional[str] = None - model: T.Optional[str] = None - filesize: T.Optional[int] = None + md5sum: str | None = None + make: str | None = None + model: str | None = None + filesize: int | None = None def update_md5sum(self) -> None: if self.md5sum is None: @@ -144,7 +140,7 @@ def combine_filetype_filters( class UserItem(TypedDict, total=False): - MAPOrganizationKey: T.Union[int, str] + MAPOrganizationKey: int | str # Not in use. Keep here for back-compatibility MAPSettingsUsername: str MAPSettingsUserKey: str @@ -183,23 +179,22 @@ class ImageDescription(_SequenceOnly, _Image, MetaProperties, total=True): # filename is required filename: str # if None or absent, it will be calculated - md5sum: T.Optional[str] + md5sum: str | None filetype: Literal["image"] - filesize: T.Optional[int] + filesize: int | None class _VideoDescriptionRequired(TypedDict, total=True): filename: str - # if None or absent, it will be calculated - md5sum: T.Optional[str] + md5sum: str | None filetype: str - MAPGPSTrack: T.List[T.Sequence[T.Union[float, int, None]]] + MAPGPSTrack: list[T.Sequence[float | int | None]] class VideoDescription(_VideoDescriptionRequired, total=False): MAPDeviceMake: str MAPDeviceModel: str - filesize: T.Optional[int] + filesize: int | None class _ErrorDescription(TypedDict, total=False): @@ -207,7 +202,7 @@ class _ErrorDescription(TypedDict, total=False): type: str message: str # vars is optional - vars: T.Dict + vars: dict class _ImageDescriptionErrorRequired(TypedDict, total=True): @@ -238,7 +233,7 @@ def separate_errors( def _describe_error_desc( - exc: Exception, filename: Path, filetype: T.Optional[FileType] + exc: Exception, filename: Path, filetype: FileType | None ) -> ImageDescriptionError: err: _ErrorDescription = { "type": exc.__class__.__name__, @@ -397,7 +392,7 @@ def describe_error_metadata( } -def merge_schema(*schemas: T.Dict) -> T.Dict: +def merge_schema(*schemas: dict) -> dict: for s in schemas: assert s.get("type") == "object", "must be all object schemas" properties = {} @@ -507,7 +502,7 @@ def validate_video_desc(desc: T.Any) -> None: raise exceptions.MapillaryMetadataValidationError(ex.message) from ex -def datetime_to_map_capture_time(time: T.Union[datetime.datetime, int, float]) -> str: +def datetime_to_map_capture_time(time: datetime.datetime | int | float) -> str: if isinstance(time, (float, int)): dt = datetime.datetime.fromtimestamp(time, datetime.timezone.utc) # otherwise it will be assumed to be in local time @@ -608,7 +603,7 @@ def from_desc(desc): def _from_image_desc(desc) -> ImageMetadata: - kwargs: T.Dict = {} + kwargs: dict = {} for k, v in desc.items(): if k not in [ "filename", @@ -638,7 +633,7 @@ def _from_image_desc(desc) -> ImageMetadata: ) -def _encode_point(p: geo.Point) -> T.Sequence[T.Union[float, int, None]]: +def _encode_point(p: geo.Point) -> T.Sequence[float | int | None]: entry = [ int(p.time * 1000), round(p.lon, _COORDINATES_PRECISION), @@ -743,10 +738,10 @@ def desc_file_to_exif( def group_and_sort_images( - metadatas: T.Sequence[ImageMetadata], -) -> T.Dict[str, T.List[ImageMetadata]]: + metadatas: T.Iterable[ImageMetadata], +) -> dict[str, list[ImageMetadata]]: # group metadatas by uuid - sequences_by_uuid: T.Dict[str, T.List[ImageMetadata]] = {} + sequences_by_uuid: dict[str, list[ImageMetadata]] = {} missing_sequence_uuid = str(uuid.uuid4()) for metadata in metadatas: if metadata.MAPSequenceUUID is None: diff --git a/mapillary_tools/utils.py b/mapillary_tools/utils.py index 551f096cd..662e9ff1c 100644 --- a/mapillary_tools/utils.py +++ b/mapillary_tools/utils.py @@ -9,9 +9,7 @@ # Use "hashlib._Hash" instead of hashlib._Hash because: # AttributeError: module 'hashlib' has no attribute '_Hash' -def md5sum_fp( - fp: T.IO[bytes], md5: T.Optional["hashlib._Hash"] = None -) -> "hashlib._Hash": +def md5sum_fp(fp: T.IO[bytes], md5: "hashlib._Hash | None" = None) -> "hashlib._Hash": if md5 is None: md5 = hashlib.md5() while True: @@ -93,12 +91,12 @@ def filter_video_samples( def find_all_image_samples( - image_paths: T.Sequence[Path], video_paths: T.Sequence[Path] -) -> T.Dict[Path, T.List[Path]]: + image_paths: T.Iterable[Path], video_paths: T.Iterable[Path] +) -> dict[Path, list[Path]]: # TODO: not work with the same filenames, e.g. foo/hello.mp4 and bar/hello.mp4 video_basenames = {path.name: path for path in video_paths} - image_samples_by_video_path: T.Dict[Path, T.List[Path]] = {} + image_samples_by_video_path: dict[Path, list[Path]] = {} for image_path in image_paths: # If you want to walk an arbitrary filesystem path upwards, # it is recommended to first call Path.resolve() so as to resolve symlinks and eliminate “..” components. @@ -115,7 +113,7 @@ def find_all_image_samples( def deduplicate_paths(paths: T.Iterable[Path]) -> T.Generator[Path, None, None]: - resolved_paths: T.Set[Path] = set() + resolved_paths: set[Path] = set() for p in paths: resolved = p.resolve() if resolved not in resolved_paths: @@ -124,10 +122,10 @@ def deduplicate_paths(paths: T.Iterable[Path]) -> T.Generator[Path, None, None]: def find_images( - import_paths: T.Sequence[Path], + import_paths: T.Iterable[Path], skip_subfolders: bool = False, -) -> T.List[Path]: - image_paths: T.List[Path] = [] +) -> list[Path]: + image_paths: list[Path] = [] for path in import_paths: if path.is_dir(): image_paths.extend( @@ -142,10 +140,10 @@ def find_images( def find_videos( - import_paths: T.Sequence[Path], + import_paths: T.Iterable[Path], skip_subfolders: bool = False, -) -> T.List[Path]: - video_paths: T.List[Path] = [] +) -> list[Path]: + video_paths: list[Path] = [] for path in import_paths: if path.is_dir(): video_paths.extend( @@ -160,10 +158,10 @@ def find_videos( def find_zipfiles( - import_paths: T.Sequence[Path], + import_paths: T.Iterable[Path], skip_subfolders: bool = False, -) -> T.List[Path]: - zip_paths: T.List[Path] = [] +) -> list[Path]: + zip_paths: list[Path] = [] for path in import_paths: if path.is_dir(): zip_paths.extend( @@ -177,8 +175,8 @@ def find_zipfiles( return list(deduplicate_paths(zip_paths)) -def find_xml_files(import_paths: T.Sequence[Path]) -> T.List[Path]: - xml_paths: T.List[Path] = [] +def find_xml_files(import_paths: T.Iterable[Path]) -> list[Path]: + xml_paths: list[Path] = [] for path in import_paths: if path.is_dir(): # XML could be hidden in hidden folders From d513fb7ec616d00a26c8487fec8d23b498667862 Mon Sep 17 00:00:00 2001 From: Tao Peng Date: Fri, 4 Apr 2025 11:12:21 -0700 Subject: [PATCH 3/5] fix ruff check --- mapillary_tools/commands/__main__.py | 1 - mapillary_tools/exif_write.py | 1 - mapillary_tools/telemetry.py | 1 - 3 files changed, 3 deletions(-) diff --git a/mapillary_tools/commands/__main__.py b/mapillary_tools/commands/__main__.py index f45ecb2d9..e50ac2fc7 100644 --- a/mapillary_tools/commands/__main__.py +++ b/mapillary_tools/commands/__main__.py @@ -2,7 +2,6 @@ import enum import logging import sys -import typing as T from pathlib import Path import requests diff --git a/mapillary_tools/exif_write.py b/mapillary_tools/exif_write.py index 17016de15..c3737d8b9 100644 --- a/mapillary_tools/exif_write.py +++ b/mapillary_tools/exif_write.py @@ -6,7 +6,6 @@ import json import logging import math -import typing as T from pathlib import Path import piexif diff --git a/mapillary_tools/telemetry.py b/mapillary_tools/telemetry.py index 3dd73a4e9..26c41dc2a 100644 --- a/mapillary_tools/telemetry.py +++ b/mapillary_tools/telemetry.py @@ -1,7 +1,6 @@ from __future__ import annotations import dataclasses -import typing as T from enum import Enum, unique from .geo import Point From b0745d66efd69f844c790b3a20c634386e89d67c Mon Sep 17 00:00:00 2001 From: Tao Peng Date: Fri, 4 Apr 2025 11:19:29 -0700 Subject: [PATCH 4/5] fix tests for 3.8 --- mapillary_tools/mp4/construct_mp4_parser.py | 4 ++-- mapillary_tools/mp4/mp4_sample_parser.py | 4 ++-- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/mapillary_tools/mp4/construct_mp4_parser.py b/mapillary_tools/mp4/construct_mp4_parser.py index 0d7b8d84a..6cd0f3d46 100644 --- a/mapillary_tools/mp4/construct_mp4_parser.py +++ b/mapillary_tools/mp4/construct_mp4_parser.py @@ -440,7 +440,7 @@ def parse_box(self, data: bytes) -> BoxDict: return T.cast(BoxDict, self.Box.parse(data)) def parse_boxlist(self, data: bytes) -> list[BoxDict]: - return T.cast(list[BoxDict], self.BoxList.parse(data)) + return T.cast(T.List[BoxDict], self.BoxList.parse(data)) class Box32ConstructBuilder(Box64ConstructBuilder): @@ -609,7 +609,7 @@ def find_box_at_path( if box["type"] == path[0]: if len(path) == 1: return box - box_data = T.cast(list[BoxDict], box["data"]) + box_data = T.cast(T.List[BoxDict], box["data"]) # ListContainer from construct is not sequence assert isinstance(box_data, T.Sequence), ( f"expect a list of boxes but got {type(box_data)} at path {path}" diff --git a/mapillary_tools/mp4/mp4_sample_parser.py b/mapillary_tools/mp4/mp4_sample_parser.py index 121d46c71..1161edfe9 100644 --- a/mapillary_tools/mp4/mp4_sample_parser.py +++ b/mapillary_tools/mp4/mp4_sample_parser.py @@ -234,7 +234,7 @@ def extract_tkhd_boxdata(self) -> dict: def is_video_track(self) -> bool: hdlr = cparser.find_box_at_pathx(self.trak_children, [b"mdia", b"hdlr"]) - return T.cast(dict[str, T.Any], hdlr["data"])["handler_type"] == b"vide" + return T.cast(T.Dict[str, T.Any], hdlr["data"])["handler_type"] == b"vide" def extract_sample_descriptions(self) -> list[dict]: # TODO: return [] if parsing fail @@ -242,7 +242,7 @@ def extract_sample_descriptions(self) -> list[dict]: stsd = cparser.find_box_at_pathx( T.cast(T.Sequence[cparser.BoxDict], boxes), [b"stsd"] ) - return T.cast(list[dict], T.cast(dict, stsd["data"])["entries"]) + return T.cast(T.List[dict], T.cast(dict, stsd["data"])["entries"]) def extract_elst_boxdata(self) -> dict | None: box = cparser.find_box_at_path(self.trak_children, [b"edts", b"elst"]) From 630bf65803bf301d5ceab775dd5100789ca61985 Mon Sep 17 00:00:00 2001 From: Tao Peng Date: Fri, 4 Apr 2025 11:24:35 -0700 Subject: [PATCH 5/5] integration tests for python3.8 --- mapillary_tools/exiftool_read_video.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/mapillary_tools/exiftool_read_video.py b/mapillary_tools/exiftool_read_video.py index 7030cfe1e..6b2d1261f 100644 --- a/mapillary_tools/exiftool_read_video.py +++ b/mapillary_tools/exiftool_read_video.py @@ -317,17 +317,17 @@ def extract_gps_track(self) -> list[geo.Point]: # blackvue and many other cameras track_with_fix = self._extract_gps_track_from_quicktime() if track_with_fix: - return T.cast(list[geo.Point], track_with_fix) + return T.cast(T.List[geo.Point], track_with_fix) # insta360 has its own tag track_with_fix = self._extract_gps_track_from_quicktime(namespace="Insta360") if track_with_fix: - return T.cast(list[geo.Point], track_with_fix) + return T.cast(T.List[geo.Point], track_with_fix) # mostly for gopro track_with_fix = self._extract_gps_track_from_track() if track_with_fix: - return T.cast(list[geo.Point], track_with_fix) + return T.cast(T.List[geo.Point], track_with_fix) return []