diff --git a/mapillary_tools/commands/process.py b/mapillary_tools/commands/process.py index 0d7c52fa3..2513e7b1b 100644 --- a/mapillary_tools/commands/process.py +++ b/mapillary_tools/commands/process.py @@ -1,14 +1,15 @@ +from __future__ import annotations + import argparse import inspect -import typing as T from pathlib import Path -from .. import constants +from .. import constants, types from ..process_geotag_properties import ( - FileType, - GeotagSource, + DEFAULT_GEOTAG_SOURCE_OPTIONS, process_finalize, process_geotag_properties, + SourceType, ) from ..process_sequence_properties import process_sequence_properties @@ -24,24 +25,13 @@ class Command: help = "process images and videos" def add_basic_arguments(self, parser: argparse.ArgumentParser): - geotag_sources: T.List[GeotagSource] = [ - "blackvue_videos", - "camm", - "exif", - "exiftool", - "gopro_videos", - "gpx", - "nmea", - ] - geotag_gpx_based_sources: T.List[GeotagSource] = [ - "gpx", - "gopro_videos", - "nmea", - "blackvue_videos", - "camm", + geotag_gpx_based_sources: list[str] = [ + SourceType.GPX.value, + SourceType.NMEA.value, + SourceType.GOPRO.value, + SourceType.BLACKVUE.value, + SourceType.CAMM.value, ] - for source in geotag_gpx_based_sources: - assert source in geotag_sources parser.add_argument( "--skip_process_errors", @@ -53,9 +43,9 @@ def add_basic_arguments(self, parser: argparse.ArgumentParser): parser.add_argument( "--filetypes", "--file_types", - help=f"Process files of the specified types only. Supported file types: {','.join(sorted(t.value for t in FileType))} [default: %(default)s]", - type=lambda option: set(FileType(t) for t in option.split(",")), - default=",".join(sorted(t.value for t in FileType)), + help=f"Process files of the specified types only. Supported file types: {','.join(sorted(t.value for t in types.FileType))} [default: %(default)s]", + type=lambda option: set(types.FileType(t) for t in option.split(",")), + default=None, required=False, ) group = parser.add_argument_group(bold_text("PROCESS EXIF OPTIONS")) @@ -122,10 +112,9 @@ def add_basic_arguments(self, parser: argparse.ArgumentParser): ) group_geotagging.add_argument( "--geotag_source", - help="Provide the source of date/time and GPS information needed for geotagging. [default: %(default)s]", - action="store", - choices=geotag_sources, - default="exif", + help=f"Provide the source of date/time and GPS information needed for geotagging. Supported source types: {', '.join(g.value for g in SourceType)} [default: {','.join(DEFAULT_GEOTAG_SOURCE_OPTIONS)}]", + action="append", + default=[], required=False, ) group_geotagging.add_argument( @@ -216,24 +205,7 @@ def add_basic_arguments(self, parser: argparse.ArgumentParser): ) def run(self, vars_args: dict): - if ( - "geotag_source" in vars_args - and vars_args["geotag_source"] == "blackvue_videos" - and ( - "device_make" not in vars_args - or ("device_make" in vars_args and not vars_args["device_make"]) - ) - ): - vars_args["device_make"] = "Blackvue" - if ( - "device_make" in vars_args - and vars_args["device_make"] - and vars_args["device_make"].lower() == "blackvue" - ): - vars_args["duplicate_angle"] = 360 - metadatas = process_geotag_properties( - vars_args=vars_args, **( { k: v diff --git a/mapillary_tools/constants.py b/mapillary_tools/constants.py index aa63a9cdb..4c0497cf0 100644 --- a/mapillary_tools/constants.py +++ b/mapillary_tools/constants.py @@ -1,3 +1,5 @@ +from __future__ import annotations + import os import typing as T @@ -30,7 +32,8 @@ def _yes_or_no(val: str) -> bool: VIDEO_DURATION_RATIO = float(os.getenv(_ENV_PREFIX + "VIDEO_DURATION_RATIO", 1)) FFPROBE_PATH: str = os.getenv(_ENV_PREFIX + "FFPROBE_PATH", "ffprobe") FFMPEG_PATH: str = os.getenv(_ENV_PREFIX + "FFMPEG_PATH", "ffmpeg") -EXIFTOOL_PATH: str = os.getenv(_ENV_PREFIX + "EXIFTOOL_PATH", "exiftool") +# When not set, MT will try to check both "exiftool" and "exiftool.exe" from $PATH +EXIFTOOL_PATH: str | None = os.getenv(_ENV_PREFIX + "EXIFTOOL_PATH") IMAGE_DESCRIPTION_FILENAME = os.getenv( _ENV_PREFIX + "IMAGE_DESCRIPTION_FILENAME", "mapillary_image_description.json" ) diff --git a/mapillary_tools/exiftool_read.py b/mapillary_tools/exiftool_read.py index 90c03870b..20a51d1b6 100644 --- a/mapillary_tools/exiftool_read.py +++ b/mapillary_tools/exiftool_read.py @@ -1,3 +1,5 @@ +from __future__ import annotations + import datetime import logging import typing as T @@ -93,11 +95,23 @@ def index_rdf_description_by_path( LOG.warning(f"Failed to parse {xml_path}: {ex}", exc_info=verbose) continue - elements = etree.iterfind(_DESCRIPTION_TAG, namespaces=EXIFTOOL_NAMESPACES) - for element in elements: - path = find_rdf_description_path(element) - if path is not None: - rdf_description_by_path[canonical_path(path)] = element + rdf_description_by_path.update( + index_rdf_description_by_path_from_xml_element(etree.getroot()) + ) + + return rdf_description_by_path + + +def index_rdf_description_by_path_from_xml_element( + element: ET.Element, +) -> dict[str, ET.Element]: + rdf_description_by_path: dict[str, ET.Element] = {} + + elements = element.iterfind(_DESCRIPTION_TAG, namespaces=EXIFTOOL_NAMESPACES) + for element in elements: + path = find_rdf_description_path(element) + if path is not None: + rdf_description_by_path[canonical_path(path)] = element return rdf_description_by_path diff --git a/mapillary_tools/exiftool_runner.py b/mapillary_tools/exiftool_runner.py new file mode 100644 index 000000000..ae124828d --- /dev/null +++ b/mapillary_tools/exiftool_runner.py @@ -0,0 +1,77 @@ +from __future__ import annotations + +import platform +import shutil +import subprocess +import typing as T +from pathlib import Path + + +class ExiftoolRunner: + """ + Wrapper around ExifTool to run it in a subprocess + """ + + def __init__(self, exiftool_path: str | None = None, recursive: bool = False): + if exiftool_path is None: + exiftool_path = self._search_preferred_exiftool_path() + self.exiftool_path = exiftool_path + self.recursive = recursive + + def _search_preferred_exiftool_path(self) -> str: + system = platform.system() + + if system and system.lower() == "windows": + exiftool_paths = ["exiftool.exe", "exiftool"] + else: + exiftool_paths = ["exiftool", "exiftool.exe"] + + for path in exiftool_paths: + full_path = shutil.which(path) + if full_path: + return path + + # Always return the prefered one, even if it is not found, + # and let the subprocess.run figure out the error later + return exiftool_paths[0] + + def _build_args_read_stdin(self) -> list[str]: + args: list[str] = [ + self.exiftool_path, + "-q", + "-n", # Disable print conversion + "-X", # XML output + "-ee", + *["-api", "LargeFileSupport=1"], + *["-charset", "filename=utf8"], + *["-@", "-"], + ] + + if self.recursive: + args.append("-r") + + return args + + def extract_xml(self, paths: T.Sequence[Path]) -> str: + if not paths: + # ExifTool will show its full manual if no files are provided + raise ValueError("No files provided to exiftool") + + # To handle non-latin1 filenames under Windows, we pass the path + # via stdin. See https://exiftool.org/faq.html#Q18 + stdin = "\n".join([str(p.resolve()) for p in paths]) + + args = self._build_args_read_stdin() + + # Raise FileNotFoundError here if self.exiftool_path not found + process = subprocess.run( + args, + capture_output=True, + text=True, + input=stdin, + encoding="utf-8", + # Do not check exit status to allow some files not found + # check=True, + ) + + return process.stdout diff --git a/mapillary_tools/geotag/factory.py b/mapillary_tools/geotag/factory.py new file mode 100644 index 000000000..9a3b09b6e --- /dev/null +++ b/mapillary_tools/geotag/factory.py @@ -0,0 +1,298 @@ +from __future__ import annotations + +import json +import logging +import typing as T +from pathlib import Path + +from .. import exceptions, types, utils +from ..types import FileType +from . import ( + geotag_from_generic, + geotag_images_from_exif, + geotag_images_from_exiftool, + geotag_images_from_exiftool_both_image_and_video, + geotag_images_from_gpx_file, + geotag_images_from_nmea_file, + geotag_images_from_video, + geotag_videos_from_exiftool_video, + geotag_videos_from_gpx, + geotag_videos_from_video, +) +from .options import InterpolationOption, SOURCE_TYPE_ALIAS, SourceOption, SourceType + + +LOG = logging.getLogger(__name__) + + +def parse_source_option(source: str) -> list[SourceOption]: + """ + Given a source string, parse it into a list of GeotagOptions objects. + + Examples: + "native" -> [SourceOption(SourceType.NATIVE)] + "gpx,exif" -> [SourceOption(SourceType.GPX), SourceOption(SourceType.EXIF)] + "exif,gpx" -> [SourceOption(SourceType.EXIF), SourceOption(SourceType.GPX)] + '{"source": "gpx"}' -> [SourceOption(SourceType.GPX)] + """ + + try: + source_type = SourceType(SOURCE_TYPE_ALIAS.get(source, source)) + except ValueError: + pass + else: + return [SourceOption(source_type)] + + try: + payload = json.loads(source) + except json.JSONDecodeError: + pass + else: + return [SourceOption.from_dict(payload)] + + sources = source.split(",") + + return [SourceOption(SourceType(SOURCE_TYPE_ALIAS.get(s, s))) for s in sources] + + +def process( + # Collection: ABC for sized iterable container classes + paths: T.Iterable[Path], + options: T.Collection[SourceOption], +) -> list[types.MetadataOrError]: + if not options: + raise ValueError("No geotag options provided") + + final_metadatas: list[types.MetadataOrError] = [] + + # Paths (image path or video path) that will be sent to the next geotag process + reprocessable_paths = set(paths) + + for idx, option in enumerate(options): + LOG.debug("Processing %d files with %s", len(reprocessable_paths), option) + + image_metadata_or_errors = _geotag_images(reprocessable_paths, option) + video_metadata_or_errors = _geotag_videos(reprocessable_paths, option) + + more_option = idx < len(options) - 1 + + for metadata in image_metadata_or_errors + video_metadata_or_errors: + if more_option and _is_reprocessable(metadata): + # Leave what it is for the next geotag process + pass + else: + final_metadatas.append(metadata) + reprocessable_paths.remove(metadata.filename) + + # Quit if no more paths to process + if not reprocessable_paths: + break + + return final_metadatas + + +def _is_reprocessable(metadata: types.MetadataOrError) -> bool: + if isinstance(metadata, types.ErrorMetadata): + if isinstance( + metadata.error, + ( + exceptions.MapillaryGeoTaggingError, + exceptions.MapillaryVideoGPSNotFoundError, + ), + ): + return True + + return False + + +def _filter_images_and_videos( + file_paths: T.Iterable[Path], + filetypes: set[types.FileType] | None = None, +) -> tuple[list[Path], list[Path]]: + image_paths = [] + video_paths = [] + + ALL_VIDEO_TYPES = {types.FileType.VIDEO, *types.NATIVE_VIDEO_FILETYPES} + + if filetypes is None: + include_images = True + include_videos = True + else: + include_images = types.FileType.IMAGE in filetypes + include_videos = bool(filetypes & ALL_VIDEO_TYPES) + + for path in file_paths: + if utils.is_image_file(path): + if include_images: + image_paths.append(path) + + elif utils.is_video_file(path): + if include_videos: + video_paths.append(path) + + return image_paths, video_paths + + +def _ensure_source_path(option: SourceOption) -> Path: + if option.source_path is None or option.source_path.source_path is None: + raise exceptions.MapillaryBadParameterError( + f"source_path must be provided for {option.source}" + ) + return option.source_path.source_path + + +def _geotag_images( + paths: T.Iterable[Path], option: SourceOption +) -> list[types.ImageMetadataOrError]: + image_paths, _ = _filter_images_and_videos(paths, option.filetypes) + + if not image_paths: + return [] + + if option.interpolation is None: + interpolation = InterpolationOption() + else: + interpolation = option.interpolation + + geotag: geotag_from_generic.GeotagImagesFromGeneric + + if option.source is SourceType.NATIVE: + geotag = geotag_images_from_exif.GeotagImagesFromEXIF( + image_paths, num_processes=option.num_processes + ) + return geotag.to_description() + + if option.source is SourceType.EXIFTOOL_RUNTIME: + geotag = geotag_images_from_exiftool.GeotagImagesFromExifToolRunner( + image_paths, num_processes=option.num_processes + ) + try: + return geotag.to_description() + except exceptions.MapillaryExiftoolNotFoundError as ex: + LOG.warning('Skip "%s" because: %s', option.source.value, ex) + return [] + + elif option.source is SourceType.EXIFTOOL_XML: + # This is to ensure 'video_process --geotag={"source": "exiftool_xml", "source_path": "/tmp/xml_path"}' + # to work + geotag = geotag_images_from_exiftool_both_image_and_video.GeotagImagesFromExifToolBothImageAndVideo( + image_paths, + xml_path=_ensure_source_path(option), + num_processes=option.num_processes, + ) + return geotag.to_description() + + elif option.source is SourceType.GPX: + geotag = geotag_images_from_gpx_file.GeotagImagesFromGPXFile( + image_paths, + source_path=_ensure_source_path(option), + use_gpx_start_time=interpolation.use_gpx_start_time, + offset_time=interpolation.offset_time, + num_processes=option.num_processes, + ) + return geotag.to_description() + + elif option.source is SourceType.NMEA: + geotag = geotag_images_from_nmea_file.GeotagImagesFromNMEAFile( + image_paths, + source_path=_ensure_source_path(option), + use_gpx_start_time=interpolation.use_gpx_start_time, + offset_time=interpolation.offset_time, + num_processes=option.num_processes, + ) + + return geotag.to_description() + + elif option.source is SourceType.EXIF: + geotag = geotag_images_from_exif.GeotagImagesFromEXIF( + image_paths, num_processes=option.num_processes + ) + return geotag.to_description() + + elif option.source in [ + SourceType.GOPRO, + SourceType.BLACKVUE, + SourceType.CAMM, + ]: + map_geotag_source_to_filetype: dict[SourceType, FileType] = { + SourceType.GOPRO: FileType.GOPRO, + SourceType.BLACKVUE: FileType.BLACKVUE, + SourceType.CAMM: FileType.CAMM, + } + video_paths = utils.find_videos([_ensure_source_path(option)]) + image_samples_by_video_path = utils.find_all_image_samples( + image_paths, video_paths + ) + video_paths_with_image_samples = list(image_samples_by_video_path.keys()) + video_metadatas = geotag_videos_from_video.GeotagVideosFromVideo( + video_paths_with_image_samples, + filetypes={map_geotag_source_to_filetype[option.source]}, + num_processes=option.num_processes, + ).to_description() + geotag = geotag_images_from_video.GeotagImagesFromVideo( + image_paths, + video_metadatas, + offset_time=interpolation.offset_time, + num_processes=option.num_processes, + ) + return geotag.to_description() + + else: + raise ValueError(f"Invalid geotag source {option.source}") + + +def _geotag_videos( + paths: T.Iterable[Path], option: SourceOption +) -> list[types.VideoMetadataOrError]: + _, video_paths = _filter_images_and_videos(paths, option.filetypes) + + if not video_paths: + return [] + + geotag: geotag_from_generic.GeotagVideosFromGeneric + + if option.source is SourceType.NATIVE: + geotag = geotag_videos_from_video.GeotagVideosFromVideo( + video_paths, num_processes=option.num_processes, filetypes=option.filetypes + ) + return geotag.to_description() + + if option.source is SourceType.EXIFTOOL_RUNTIME: + geotag = geotag_videos_from_exiftool_video.GeotagVideosFromExifToolRunner( + video_paths, num_processes=option.num_processes + ) + try: + return geotag.to_description() + except exceptions.MapillaryExiftoolNotFoundError as ex: + LOG.warning('Skip "%s" because: %s', option.source.value, ex) + return [] + + elif option.source is SourceType.EXIFTOOL_XML: + geotag = geotag_videos_from_exiftool_video.GeotagVideosFromExifToolVideo( + video_paths, + xml_path=_ensure_source_path(option), + ) + return geotag.to_description() + + elif option.source is SourceType.GPX: + geotag = geotag_videos_from_gpx.GeotagVideosFromGPX(video_paths) + return geotag.to_description() + + elif option.source is SourceType.NMEA: + # TODO: geotag videos from NMEA + return [] + + elif option.source is SourceType.EXIF: + # Legacy image-specific geotag types + return [] + + elif option.source in [ + SourceType.GOPRO, + SourceType.BLACKVUE, + SourceType.CAMM, + ]: + # Legacy image-specific geotag types + return [] + + else: + raise ValueError(f"Invalid geotag source {option.source}") diff --git a/mapillary_tools/geotag/geotag_images_from_exiftool.py b/mapillary_tools/geotag/geotag_images_from_exiftool.py index a9bbb9915..b187a2a41 100644 --- a/mapillary_tools/geotag/geotag_images_from_exiftool.py +++ b/mapillary_tools/geotag/geotag_images_from_exiftool.py @@ -6,7 +6,8 @@ import xml.etree.ElementTree as ET from pathlib import Path -from .. import exceptions, exiftool_read, types +from .. import constants, exceptions, exiftool_read, types +from ..exiftool_runner import ExiftoolRunner from .geotag_from_generic import GeotagImagesFromGeneric from .geotag_images_from_exif import ImageEXIFExtractor @@ -40,7 +41,50 @@ def _generate_image_extractors( [self.xml_path] ) - results: T.List[ImageExifToolExtractor | types.ErrorMetadata] = [] + results: list[ImageExifToolExtractor | types.ErrorMetadata] = [] + + for path in self.image_paths: + rdf_description = rdf_description_by_path.get( + exiftool_read.canonical_path(path) + ) + if rdf_description is None: + exc = exceptions.MapillaryEXIFNotFoundError( + f"The {exiftool_read._DESCRIPTION_TAG} XML element for the image not found" + ) + results.append( + types.describe_error_metadata( + exc, path, filetype=types.FileType.IMAGE + ) + ) + else: + results.append(ImageExifToolExtractor(path, rdf_description)) + + return results + + +class GeotagImagesFromExifToolRunner(GeotagImagesFromGeneric): + def _generate_image_extractors( + self, + ) -> T.Sequence[ImageExifToolExtractor | types.ErrorMetadata]: + runner = ExiftoolRunner(constants.EXIFTOOL_PATH) + + LOG.debug( + "Extracting XML from %d images with exiftool command: %s", + len(self.image_paths), + " ".join(runner._build_args_read_stdin()), + ) + try: + xml = runner.extract_xml(self.image_paths) + except FileNotFoundError as ex: + raise exceptions.MapillaryExiftoolNotFoundError(ex) from ex + + rdf_description_by_path = ( + exiftool_read.index_rdf_description_by_path_from_xml_element( + ET.fromstring(xml) + ) + ) + + results: list[ImageExifToolExtractor | types.ErrorMetadata] = [] for path in self.image_paths: rdf_description = rdf_description_by_path.get( diff --git a/mapillary_tools/geotag/geotag_images_from_exiftool_both_image_and_video.py b/mapillary_tools/geotag/geotag_images_from_exiftool_both_image_and_video.py index 6c901d253..4b9e19d49 100644 --- a/mapillary_tools/geotag/geotag_images_from_exiftool_both_image_and_video.py +++ b/mapillary_tools/geotag/geotag_images_from_exiftool_both_image_and_video.py @@ -22,7 +22,7 @@ def __init__( image_paths: T.Sequence[Path], xml_path: Path, offset_time: float = 0.0, - num_processes: T.Optional[int] = None, + num_processes: int | None = None, ): super().__init__(image_paths, num_processes=num_processes) self.xml_path = xml_path diff --git a/mapillary_tools/geotag/geotag_images_from_gpx_file.py b/mapillary_tools/geotag/geotag_images_from_gpx_file.py index 286f45de0..347ea5294 100644 --- a/mapillary_tools/geotag/geotag_images_from_gpx_file.py +++ b/mapillary_tools/geotag/geotag_images_from_gpx_file.py @@ -1,3 +1,5 @@ +from __future__ import annotations + import logging import typing as T from pathlib import Path @@ -18,7 +20,7 @@ def __init__( source_path: Path, use_gpx_start_time: bool = False, offset_time: float = 0.0, - num_processes: T.Optional[int] = None, + num_processes: int | None = None, ): try: tracks = parse_gpx(source_path) @@ -46,11 +48,11 @@ def __init__( Track = T.List[geo.Point] -def parse_gpx(gpx_file: Path) -> T.List[Track]: +def parse_gpx(gpx_file: Path) -> list[Track]: with gpx_file.open("r") as f: gpx = gpxpy.parse(f) - tracks: T.List[Track] = [] + tracks: list[Track] = [] for track in gpx.tracks: for segment in track.segments: diff --git a/mapillary_tools/geotag/geotag_images_from_nmea_file.py b/mapillary_tools/geotag/geotag_images_from_nmea_file.py index 7be409d11..0a90a8487 100644 --- a/mapillary_tools/geotag/geotag_images_from_nmea_file.py +++ b/mapillary_tools/geotag/geotag_images_from_nmea_file.py @@ -1,3 +1,5 @@ +from __future__ import annotations + import datetime import typing as T from pathlib import Path @@ -15,7 +17,7 @@ def __init__( source_path: Path, use_gpx_start_time: bool = False, offset_time: float = 0.0, - num_processes: T.Optional[int] = None, + num_processes: int | None = None, ): points = get_lat_lon_time_from_nmea(source_path) super().__init__( @@ -27,7 +29,7 @@ def __init__( ) -def get_lat_lon_time_from_nmea(nmea_file: Path) -> T.List[geo.Point]: +def get_lat_lon_time_from_nmea(nmea_file: Path) -> list[geo.Point]: with nmea_file.open("r") as f: lines = f.readlines() lines = [line.rstrip("\n\r") for line in lines] diff --git a/mapillary_tools/geotag/geotag_videos_from_exiftool_video.py b/mapillary_tools/geotag/geotag_videos_from_exiftool_video.py index bb26cb567..2411f8a03 100644 --- a/mapillary_tools/geotag/geotag_videos_from_exiftool_video.py +++ b/mapillary_tools/geotag/geotag_videos_from_exiftool_video.py @@ -5,8 +5,9 @@ import xml.etree.ElementTree as ET from pathlib import Path -from .. import exceptions, exiftool_read, geo, types, utils +from .. import constants, exceptions, exiftool_read, geo, types, utils from ..exiftool_read_video import ExifToolReadVideo +from ..exiftool_runner import ExiftoolRunner from ..gpmf import gpmf_gps_filter from ..telemetry import GPSPoint from .geotag_from_generic import GenericVideoExtractor, GeotagVideosFromGeneric @@ -102,3 +103,47 @@ def _generate_video_extractors( results.append(VideoExifToolExtractor(path, rdf_description)) return results + + +class GeotagVideosFromExifToolRunner(GeotagVideosFromGeneric): + def _generate_video_extractors( + self, + ) -> T.Sequence[GenericVideoExtractor | types.ErrorMetadata]: + runner = ExiftoolRunner(constants.EXIFTOOL_PATH) + + LOG.debug( + "Extracting XML from %d videos with exiftool command: %s", + len(self.video_paths), + " ".join(runner._build_args_read_stdin()), + ) + + try: + xml = runner.extract_xml(self.video_paths) + except FileNotFoundError as ex: + raise exceptions.MapillaryExiftoolNotFoundError(ex) from ex + + rdf_description_by_path = ( + exiftool_read.index_rdf_description_by_path_from_xml_element( + ET.fromstring(xml) + ) + ) + + results: list[VideoExifToolExtractor | types.ErrorMetadata] = [] + + for path in self.video_paths: + rdf_description = rdf_description_by_path.get( + exiftool_read.canonical_path(path) + ) + if rdf_description is None: + exc = exceptions.MapillaryEXIFNotFoundError( + f"The {exiftool_read._DESCRIPTION_TAG} XML element for the video not found" + ) + results.append( + types.describe_error_metadata( + exc, path, filetype=types.FileType.IMAGE + ) + ) + else: + results.append(VideoExifToolExtractor(path, rdf_description)) + + return results diff --git a/mapillary_tools/geotag/geotag_videos_from_gpx.py b/mapillary_tools/geotag/geotag_videos_from_gpx.py new file mode 100644 index 000000000..809a25c7d --- /dev/null +++ b/mapillary_tools/geotag/geotag_videos_from_gpx.py @@ -0,0 +1,140 @@ +from __future__ import annotations + +import dataclasses +import datetime +import logging + +import typing as T +from pathlib import Path + +from .. import geo, telemetry, types +from . import options +from .geotag_from_generic import GenericVideoExtractor, GeotagVideosFromGeneric +from .geotag_images_from_gpx_file import parse_gpx +from .geotag_videos_from_video import NativeVideoExtractor + + +LOG = logging.getLogger(__name__) + + +class GPXVideoExtractor(GenericVideoExtractor): + def __init__(self, video_path: Path, gpx_path: Path): + self.video_path = video_path + self.gpx_path = gpx_path + + def extract(self) -> types.VideoMetadataOrError: + try: + gpx_tracks = parse_gpx(self.gpx_path) + except Exception as ex: + raise RuntimeError( + f"Error parsing GPX {self.gpx_path}: {ex.__class__.__name__}: {ex}" + ) + + if 1 < len(gpx_tracks): + LOG.warning( + "Found %s tracks in the GPX file %s. Will merge points in all the tracks as a single track for interpolation", + len(gpx_tracks), + self.gpx_path, + ) + + gpx_points: T.Sequence[geo.Point] = sum(gpx_tracks, []) + + native_extractor = NativeVideoExtractor(self.video_path) + + video_metadata_or_error = native_extractor.extract() + + if isinstance(video_metadata_or_error, types.ErrorMetadata): + self._rebase_times(gpx_points) + return types.VideoMetadata( + filename=video_metadata_or_error.filename, + filetype=video_metadata_or_error.filetype or types.FileType.VIDEO, + points=gpx_points, + ) + + video_metadata = video_metadata_or_error + + offset = self._synx_gpx_by_first_gps_timestamp( + gpx_points, video_metadata.points + ) + + self._rebase_times(gpx_points, offset=offset) + + return dataclasses.replace(video_metadata_or_error, points=gpx_points) + + @staticmethod + def _rebase_times(points: T.Sequence[geo.Point], offset: float = 0.0): + """ + Make point times start from 0 + """ + if points: + first_timestamp = points[0].time + for p in points: + p.time = (p.time - first_timestamp) + offset + return points + + def _synx_gpx_by_first_gps_timestamp( + self, gpx_points: T.Sequence[geo.Point], video_gps_points: T.Sequence[geo.Point] + ) -> float: + offset: float = 0.0 + + if not gpx_points: + return offset + + first_gpx_dt = datetime.datetime.fromtimestamp( + gpx_points[0].time, tz=datetime.timezone.utc + ) + LOG.info("First GPX timestamp: %s", first_gpx_dt) + + if not video_gps_points: + LOG.warning( + "Skip GPX synchronization because no GPS found in video %s", + self.video_path, + ) + return offset + + first_gps_point = video_gps_points[0] + if isinstance(first_gps_point, telemetry.GPSPoint): + if first_gps_point.epoch_time is not None: + first_gps_dt = datetime.datetime.fromtimestamp( + first_gps_point.epoch_time, tz=datetime.timezone.utc + ) + LOG.info("First GPS timestamp: %s", first_gps_dt) + offset = gpx_points[0].time - first_gps_point.epoch_time + if offset: + LOG.warning( + "Found offset between GPX %s and video GPS timestamps %s: %s seconds", + first_gpx_dt, + first_gps_dt, + offset, + ) + else: + LOG.info( + "GPX and GPS are perfectly synchronized (all starts from %s)", + first_gpx_dt, + ) + else: + LOG.warning( + "Skip GPX synchronization because no GPS epoch time found in video %s", + self.video_path, + ) + + return offset + + +class GeotagVideosFromGPX(GeotagVideosFromGeneric): + def __init__( + self, + video_paths: T.Sequence[Path], + option: options.SourcePathOption | None = None, + num_processes: int | None = None, + ): + super().__init__(video_paths, num_processes=num_processes) + if option is None: + option = options.SourcePathOption(pattern="%f.gpx") + self.option = option + + def _generate_image_extractors(self) -> T.Sequence[GPXVideoExtractor]: + return [ + GPXVideoExtractor(video_path, self.option.resolve(video_path)) + for video_path in self.video_paths + ] diff --git a/mapillary_tools/geotag/options.py b/mapillary_tools/geotag/options.py new file mode 100644 index 000000000..44b2cd652 --- /dev/null +++ b/mapillary_tools/geotag/options.py @@ -0,0 +1,159 @@ +from __future__ import annotations + +import dataclasses +import enum +import json +import typing as T +from pathlib import Path + +import jsonschema + +from .. import types + + +class SourceType(enum.Enum): + NATIVE = "native" + GPX = "gpx" + NMEA = "nmea" + EXIFTOOL_XML = "exiftool_xml" + EXIFTOOL_RUNTIME = "exiftool_runtime" + + # Legacy source types for images + GOPRO = "gopro" + BLACKVUE = "blackvue" + CAMM = "camm" + EXIF = "exif" + + +SOURCE_TYPE_ALIAS: dict[str, SourceType] = { + "blackvue_videos": SourceType.BLACKVUE, + "gopro_videos": SourceType.GOPRO, +} + + +@dataclasses.dataclass +class SourceOption: + # Type of the source + source: SourceType + + # Filter by these filetypes + filetypes: set[types.FileType] | None = None + + num_processes: int | None = None + + source_path: SourcePathOption | None = None + + interpolation: InterpolationOption | None = None + + @classmethod + def from_dict(cls, data: dict[str, T.Any]) -> SourceOption: + validate_option(data) + + kwargs: dict[str, T.Any] = {} + for k, v in data.items(): + # None values are considered as absent and should be ignored + if v is None: + continue + if k == "source": + kwargs[k] = SourceType(SOURCE_TYPE_ALIAS.get(v, v)) + elif k == "filetypes": + kwargs[k] = {types.FileType(t) for t in v} + elif k == "source_path": + kwargs.setdefault("source_path", SourcePathOption()).source_path = v + elif k == "pattern": + kwargs.setdefault("source_path", SourcePathOption()).pattern = v + elif k == "interpolation_offset_time": + kwargs.setdefault( + "interpolation", InterpolationOption() + ).offset_time = v + elif k == "interpolation_use_gpx_start_time": + kwargs.setdefault( + "interpolation", InterpolationOption() + ).use_gpx_start_time = v + + return cls(**kwargs) + + +@dataclasses.dataclass +class SourcePathOption: + pattern: str | None = None + source_path: Path | None = None + + def __post_init__(self): + if self.source_path is None and self.pattern is None: + raise ValueError("Either pattern or source_path must be provided") + + def resolve(self, path: Path) -> Path: + if self.source_path is not None: + return self.source_path + + assert self.pattern is not None, ( + "either pattern or source_path must be provided" + ) + + # %f: the full video filename (foo.mp4) + # %g: the video filename without extension (foo) + # %e: the video filename extension (.mp4) + replaced = Path( + self.pattern.replace("%f", path.name) + .replace("%g", path.stem) + .replace("%e", path.suffix) + ) + + abs_path = ( + replaced + if replaced.is_absolute() + else Path.joinpath(path.parent.resolve(), replaced) + ).resolve() + + return abs_path + + +@dataclasses.dataclass +class InterpolationOption: + offset_time: float = 0.0 + use_gpx_start_time: bool = False + + +SourceOptionSchema = { + "type": "object", + "properties": { + "source": { + "type": "string", + "enum": [s.value for s in SourceType] + list(SOURCE_TYPE_ALIAS.keys()), + }, + "filetypes": { + "type": "array", + "items": { + "type": "string", + "enum": [t.value for t in types.FileType], + }, + }, + "source_path": { + "type": "string", + }, + "pattern": { + "type": "string", + }, + "num_processes": { + "type": "integer", + }, + "interpolation_offset_time": { + "type": "float", + }, + "interpolation_use_gpx_start_time": { + "type": "boolean", + }, + }, + "required": ["source"], + "additionalProperties": False, +} + + +def validate_option(instance): + jsonschema.validate(instance=instance, schema=SourceOptionSchema) + + +if __name__ == "__main__": + # python -m mapillary_tools.geotag.options > schema/geotag_source_option.json + print(json.dumps(SourceOptionSchema, indent=4)) diff --git a/mapillary_tools/process_geotag_properties.py b/mapillary_tools/process_geotag_properties.py index a89f8097a..c5ffd06e2 100644 --- a/mapillary_tools/process_geotag_properties.py +++ b/mapillary_tools/process_geotag_properties.py @@ -9,192 +9,110 @@ from tqdm import tqdm -from . import constants, exceptions, exif_write, types, utils -from .geotag import ( - geotag_from_generic, - geotag_images_from_exif, - geotag_images_from_exiftool_both_image_and_video, - geotag_images_from_gpx_file, - geotag_images_from_nmea_file, - geotag_images_from_video, - geotag_videos_from_exiftool_video, - geotag_videos_from_video, -) -from .types import FileType, VideoMetadataOrError - -from .video_data_extraction.cli_options import CliOptions, CliParserOptions -from .video_data_extraction.extract_video_data import VideoDataExtractor +from mapillary_tools.geotag.options import InterpolationOption, SourcePathOption +from . import constants, exceptions, exif_write, types, utils +from .geotag.factory import parse_source_option, process, SourceOption, SourceType LOG = logging.getLogger(__name__) - - -GeotagSource = T.Literal[ - "gopro_videos", "blackvue_videos", "camm", "exif", "gpx", "nmea", "exiftool" +DEFAULT_GEOTAG_SOURCE_OPTIONS = [ + SourceType.NATIVE.value, + SourceType.EXIFTOOL_RUNTIME.value, ] -VideoGeotagSource = T.Literal[ - "video", - "camm", - "gopro", - "blackvue", - "gpx", - "nmea", - "exiftool_xml", - "exiftool_runtime", -] +def _normalize_import_paths( + import_path: T.Union[Path, T.Sequence[Path]], +) -> T.Sequence[Path]: + import_paths: T.Sequence[Path] + if isinstance(import_path, Path): + import_paths = [import_path] + else: + import_paths = import_path + import_paths = list(utils.deduplicate_paths(import_paths)) + return import_paths -def _process_images( - image_paths: T.Sequence[Path], - geotag_source: GeotagSource, - geotag_source_path: T.Optional[Path] = None, - video_import_path: T.Optional[Path] = None, - interpolation_use_gpx_start_time: bool = False, - interpolation_offset_time: float = 0.0, - num_processes: T.Optional[int] = None, - skip_subfolders=False, -) -> T.Sequence[types.ImageMetadataOrError]: - geotag: geotag_from_generic.GeotagImagesFromGeneric - - if video_import_path is not None: - # commands that trigger this branch: - # video_process video_import_path image_paths --geotag_source gpx --geotag_source_path --skip_subfolders - image_paths = list( - utils.filter_video_samples( - image_paths, video_import_path, skip_subfolders=skip_subfolders - ) - ) - if geotag_source == "exif": - geotag = geotag_images_from_exif.GeotagImagesFromEXIF( - image_paths, num_processes=num_processes - ) +def _parse_source_options( + geotag_source: list[str], + video_geotag_source: list[str], + geotag_source_path: Path | None, +) -> list[SourceOption]: + parsed_options: list[SourceOption] = [] - else: - if geotag_source_path is None: - geotag_source_path = video_import_path - if geotag_source_path is None: - raise exceptions.MapillaryFileNotFoundError( - "Geotag source path (--geotag_source_path) is required" + for s in geotag_source: + parsed_options.extend(parse_source_option(s)) + + for s in video_geotag_source: + for video_option in parse_source_option(s): + video_option.filetypes = _combine_filetypes( + video_option.filetypes, {types.FileType.VIDEO} ) - if geotag_source == "exiftool": - if not geotag_source_path.exists(): - raise exceptions.MapillaryFileNotFoundError( - f"Geotag source file not found: {geotag_source_path}" - ) - else: - if not geotag_source_path.is_file(): - raise exceptions.MapillaryFileNotFoundError( - f"Geotag source file not found: {geotag_source_path}" + parsed_options.append(video_option) + + if geotag_source_path is not None: + for parsed_option in parsed_options: + if parsed_option.source_path is None: + parsed_option.source_path = SourcePathOption( + source_path=Path(geotag_source_path) ) + else: + source_path_option = parsed_option.source_path + if source_path_option.source_path is None: + source_path_option.source_path = Path(geotag_source_path) + else: + LOG.warning( + "The option --geotag_source_path is ignored for source %s", + parsed_option, + ) - if geotag_source == "gpx": - geotag = geotag_images_from_gpx_file.GeotagImagesFromGPXFile( - image_paths, - geotag_source_path, - use_gpx_start_time=interpolation_use_gpx_start_time, - offset_time=interpolation_offset_time, - num_processes=num_processes, - ) - elif geotag_source == "nmea": - geotag = geotag_images_from_nmea_file.GeotagImagesFromNMEAFile( - image_paths, - geotag_source_path, - use_gpx_start_time=interpolation_use_gpx_start_time, - offset_time=interpolation_offset_time, - num_processes=num_processes, - ) - elif geotag_source in ["gopro_videos", "blackvue_videos", "camm"]: - map_geotag_source_to_filetype: T.Dict[GeotagSource, FileType] = { - "gopro_videos": FileType.GOPRO, - "blackvue_videos": FileType.BLACKVUE, - "camm": FileType.CAMM, - } - video_paths = utils.find_videos([geotag_source_path]) - image_samples_by_video_path = utils.find_all_image_samples( - image_paths, video_paths - ) - video_paths_with_image_samples = list(image_samples_by_video_path.keys()) - video_metadatas = geotag_videos_from_video.GeotagVideosFromVideo( - video_paths_with_image_samples, - filetypes={map_geotag_source_to_filetype[geotag_source]}, - num_processes=num_processes, - ).to_description() - geotag = geotag_images_from_video.GeotagImagesFromVideo( - image_paths, - video_metadatas, - offset_time=interpolation_offset_time, - num_processes=num_processes, - ) - elif geotag_source == "exiftool": - geotag = geotag_images_from_exiftool_both_image_and_video.GeotagImagesFromExifToolBothImageAndVideo( - image_paths, - geotag_source_path, - ) - else: - raise RuntimeError(f"Invalid geotag source {geotag_source}") + return parsed_options - return geotag.to_description() +# Assume {GOPRO, VIDEO} are the NATIVE_VIDEO_FILETYPES: +# a | b = result +# {CAMM} | {GOPRO} = {} +# {CAMM} | {GOPRO, VIDEO} = {CAMM} +# {GOPRO} | {GOPRO, VIDEO} = {GOPRO} +# {GOPRO} | {VIDEO} = {GOPRO} +# {CAMM, GOPRO} | {VIDEO} = {CAMM, GOPRO} +# {VIDEO} | {VIDEO} = {CAMM, GOPRO, VIDEO} +def _combine_filetypes( + a: set[types.FileType] | None, b: set[types.FileType] | None +) -> set[types.FileType] | None: + if a is None: + return b -def _process_videos( - geotag_source: str, - geotag_source_path: T.Optional[Path], - video_paths: T.Sequence[Path], - num_processes: T.Optional[int], - filetypes: T.Optional[T.Set[FileType]], -) -> T.Sequence[VideoMetadataOrError]: - geotag: geotag_from_generic.GeotagVideosFromGeneric - if geotag_source == "exiftool": - if geotag_source_path is None: - raise exceptions.MapillaryFileNotFoundError( - "Geotag source path (--geotag_source_path) is required" - ) - if not geotag_source_path.exists(): - raise exceptions.MapillaryFileNotFoundError( - f"Geotag source file not found: {geotag_source_path}" - ) - geotag = geotag_videos_from_exiftool_video.GeotagVideosFromExifToolVideo( - video_paths, - geotag_source_path, - num_processes=num_processes, - ) - else: - geotag = geotag_videos_from_video.GeotagVideosFromVideo( - video_paths, - filetypes=filetypes, - num_processes=num_processes, - ) - return geotag.to_description() + if b is None: + return a + # VIDEO is a superset of NATIVE_VIDEO_FILETYPES, + # so we add NATIVE_VIDEO_FILETYPES to each set for intersection later -def _normalize_import_paths( - import_path: T.Union[Path, T.Sequence[Path]], -) -> T.Sequence[Path]: - import_paths: T.Sequence[Path] - if isinstance(import_path, Path): - import_paths = [import_path] - else: - import_paths = import_path - import_paths = list(utils.deduplicate_paths(import_paths)) - return import_paths + if types.FileType.VIDEO in a: + a = a | types.NATIVE_VIDEO_FILETYPES + + if types.FileType.VIDEO in b: + b = b | types.NATIVE_VIDEO_FILETYPES + + return a.intersection(b) def process_geotag_properties( - vars_args: T.Dict, # Hello, I'm a hack - import_path: T.Union[Path, T.Sequence[Path]], - filetypes: T.Set[FileType], - geotag_source: GeotagSource, - geotag_source_path: T.Optional[Path] = None, + import_path: Path | T.Sequence[Path], + filetypes: set[types.FileType] | None, + # Geotag options + geotag_source: list[str], + geotag_source_path: Path | None, + video_geotag_source: list[str], + # Global options # video_import_path comes from the command video_process - video_import_path: T.Optional[Path] = None, + video_import_path: Path | None = None, interpolation_use_gpx_start_time: bool = False, interpolation_offset_time: float = 0.0, + num_processes: int | None = None, skip_subfolders=False, - num_processes: T.Optional[int] = None, -) -> T.List[types.MetadataOrError]: - filetypes = set(FileType(f) for f in filetypes) +) -> list[types.MetadataOrError]: import_paths = _normalize_import_paths(import_path) # Check and fail early @@ -204,84 +122,34 @@ def process_geotag_properties( f"Import file or directory not found: {path}" ) - metadatas: T.List[types.MetadataOrError] = [] - - if FileType.IMAGE in filetypes: - image_paths = utils.find_images(import_paths, skip_subfolders=skip_subfolders) - if image_paths: - image_metadatas = _process_images( - image_paths, - geotag_source=geotag_source, - geotag_source_path=geotag_source_path, - video_import_path=video_import_path, - interpolation_use_gpx_start_time=interpolation_use_gpx_start_time, - interpolation_offset_time=interpolation_offset_time, - num_processes=num_processes, - skip_subfolders=skip_subfolders, - ) - metadatas.extend(image_metadatas) + if geotag_source_path is None: + geotag_source_path = video_import_path - # --video_geotag_source is still experimental, for videos execute it XOR the legacy code - if vars_args["video_geotag_source"]: - metadatas.extend(_process_videos_beta(vars_args)) - else: - if ( - FileType.CAMM in filetypes - or FileType.GOPRO in filetypes - or FileType.BLACKVUE in filetypes - or FileType.VIDEO in filetypes - ): - video_paths = utils.find_videos( - import_paths, skip_subfolders=skip_subfolders - ) - if video_paths: - video_metadata = _process_videos( - geotag_source, - geotag_source_path, - video_paths, - num_processes, - filetypes, - ) - metadatas.extend(video_metadata) + if not geotag_source and not video_geotag_source: + geotag_source = [*DEFAULT_GEOTAG_SOURCE_OPTIONS] - # filenames should be deduplicated in utils.find_images/utils.find_videos - assert len(metadatas) == len(set(metadata.filename for metadata in metadatas)), ( - "duplicate filenames found" + options = _parse_source_options( + geotag_source=geotag_source or [], + video_geotag_source=video_geotag_source or [], + geotag_source_path=geotag_source_path, ) - return metadatas - - -def _process_videos_beta(vars_args: T.Dict): - geotag_sources = vars_args["video_geotag_source"] - geotag_sources_opts: T.List[CliParserOptions] = [] - for source in geotag_sources: - parsed_opts: CliParserOptions = {} - try: - parsed_opts = json.loads(source) - except ValueError: - if source not in T.get_args(VideoGeotagSource): - raise exceptions.MapillaryBadParameterError( - "Unknown beta source %s or invalid JSON", source - ) - parsed_opts = {"source": source} + for option in options: + option.filetypes = _combine_filetypes(option.filetypes, filetypes) + option.num_processes = num_processes + if option.interpolation is None: + option.interpolation = InterpolationOption( + offset_time=interpolation_offset_time, + use_gpx_start_time=interpolation_use_gpx_start_time, + ) - if "source" not in parsed_opts: - raise exceptions.MapillaryBadParameterError("Missing beta source name") + # TODO: can find both in one pass + image_paths = utils.find_images(import_paths, skip_subfolders=skip_subfolders) + video_paths = utils.find_videos(import_paths, skip_subfolders=skip_subfolders) - geotag_sources_opts.append(parsed_opts) + metadata_or_errors = process(image_paths + video_paths, options) - options: CliOptions = { - "paths": vars_args["import_path"], - "recursive": vars_args["skip_subfolders"] is False, - "geotag_sources_options": geotag_sources_opts, - "geotag_source_path": vars_args["geotag_source_path"], - "num_processes": vars_args["num_processes"], - "device_make": vars_args["device_make"], - "device_model": vars_args["device_model"], - } - extractor = VideoDataExtractor(options) - return extractor.process() + return metadata_or_errors def _apply_offsets( @@ -380,15 +248,17 @@ def _show_stats( metadatas: T.Sequence[types.MetadataOrError], skipped_process_errors: T.Set[T.Type[Exception]], ) -> None: - metadatas_by_filetype: T.Dict[FileType, T.List[types.MetadataOrError]] = {} + metadatas_by_filetype: T.Dict[types.FileType, list[types.MetadataOrError]] = {} for metadata in metadatas: - filetype: T.Optional[FileType] + filetype: types.FileType | None if isinstance(metadata, types.ImageMetadata): - filetype = FileType.IMAGE + filetype = types.FileType.IMAGE else: filetype = metadata.filetype if filetype: - metadatas_by_filetype.setdefault(FileType(filetype), []).append(metadata) + metadatas_by_filetype.setdefault(types.FileType(filetype), []).append( + metadata + ) for filetype, group in metadatas_by_filetype.items(): _show_stats_per_filetype(group, filetype, skipped_process_errors) @@ -409,12 +279,12 @@ def _show_stats( def _show_stats_per_filetype( metadatas: T.Sequence[types.MetadataOrError], - filetype: FileType, + filetype: types.FileType, skipped_process_errors: T.Set[T.Type[Exception]], ): - good_metadatas: T.List[T.Union[types.VideoMetadata, types.ImageMetadata]] = [] + good_metadatas: list[T.Union[types.VideoMetadata, types.ImageMetadata]] = [] filesize_to_upload = 0 - error_metadatas: T.List[types.ErrorMetadata] = [] + error_metadatas: list[types.ErrorMetadata] = [] for metadata in metadatas: if isinstance(metadata, types.ErrorMetadata): error_metadatas.append(metadata) @@ -448,7 +318,7 @@ def _show_stats_per_filetype( def _validate_metadatas( metadatas: T.Sequence[types.MetadataOrError], num_processes: int | None -) -> T.List[types.MetadataOrError]: +) -> list[types.MetadataOrError]: # validating metadatas is slow, hence multiprocessing # Do not pass error metadatas where the error object can not be pickled for multiprocessing to work @@ -477,10 +347,10 @@ def _validate_metadatas( def process_finalize( import_path: T.Union[T.Sequence[Path], Path], - metadatas: T.List[types.MetadataOrError], + metadatas: list[types.MetadataOrError], skip_process_errors: bool = False, - device_make: T.Optional[str] = None, - device_model: T.Optional[str] = None, + device_make: str | None = None, + device_model: str | None = None, overwrite_all_EXIF_tags: bool = False, overwrite_EXIF_time_tag: bool = False, overwrite_EXIF_gps_tag: bool = False, @@ -488,9 +358,9 @@ def process_finalize( overwrite_EXIF_orientation_tag: bool = False, offset_time: float = 0.0, offset_angle: float = 0.0, - desc_path: T.Optional[str] = None, - num_processes: T.Optional[int] = None, -) -> T.List[types.MetadataOrError]: + desc_path: str | None = None, + num_processes: int | None = None, +) -> list[types.MetadataOrError]: for metadata in metadatas: if isinstance(metadata, types.VideoMetadata): if device_make is not None: diff --git a/mapillary_tools/sample_video.py b/mapillary_tools/sample_video.py index 3efb94744..1c13a48ce 100644 --- a/mapillary_tools/sample_video.py +++ b/mapillary_tools/sample_video.py @@ -1,3 +1,5 @@ +from __future__ import annotations + import datetime import logging import os @@ -11,7 +13,6 @@ from .exif_write import ExifEdit from .geotag import geotag_videos_from_video from .mp4 import mp4_sample_parser -from .process_geotag_properties import GeotagSource LOG = logging.getLogger(__name__) @@ -46,7 +47,6 @@ def sample_video( video_import_path: Path, import_path: Path, # None if called from the sample_video command - geotag_source: T.Optional[GeotagSource] = None, skip_subfolders=False, video_sample_distance=constants.VIDEO_SAMPLE_DISTANCE, video_sample_interval=constants.VIDEO_SAMPLE_INTERVAL, @@ -86,16 +86,6 @@ def sample_video( elif sample_dir.is_file(): os.remove(sample_dir) - if geotag_source is None: - geotag_source = "exif" - - # If it is not exif, then we use the legacy interval-based sample and geotag them in "process" for backward compatibility - if geotag_source not in ["exif"]: - if 0 <= video_sample_distance: - raise exceptions.MapillaryBadParameterError( - f'Geotagging from "{geotag_source}" works with the legacy interval-based sampling only. To switch back, rerun the command with "--video_sample_distance -1 --video_sample_interval 2"' - ) - for video_path in video_list: # need to resolve video_path because video_dir might be absolute sample_dir = Path(import_path).joinpath( @@ -303,7 +293,7 @@ def _sample_single_video_by_distance( video_metadatas = geotag_videos_from_video.GeotagVideosFromVideo( [video_path] ).to_description() - assert video_metadatas, "expect non-empty video metadatas" + assert len(video_metadatas) == 1, "expect 1 video metadata" video_metadata = video_metadatas[0] if isinstance(video_metadata, types.ErrorMetadata): LOG.warning(str(video_metadata.error)) diff --git a/mapillary_tools/types.py b/mapillary_tools/types.py index dc19f50a2..a3c96717c 100644 --- a/mapillary_tools/types.py +++ b/mapillary_tools/types.py @@ -33,12 +33,21 @@ class FileType(enum.Enum): + IMAGE = "image" + ZIP = "zip" + # VIDEO is a superset of all NATIVE_VIDEO_FILETYPES below. + # It also contains the videos that external geotag source (e.g. exiftool) supports + VIDEO = "video" BLACKVUE = "blackvue" CAMM = "camm" GOPRO = "gopro" - IMAGE = "image" - VIDEO = "video" - ZIP = "zip" + + +NATIVE_VIDEO_FILETYPES = { + FileType.BLACKVUE, + FileType.CAMM, + FileType.GOPRO, +} @dataclasses.dataclass diff --git a/mapillary_tools/video_data_extraction/extractors/exiftool_runtime_parser.py b/mapillary_tools/video_data_extraction/extractors/exiftool_runtime_parser.py index 68c014518..cbb3ad50f 100644 --- a/mapillary_tools/video_data_extraction/extractors/exiftool_runtime_parser.py +++ b/mapillary_tools/video_data_extraction/extractors/exiftool_runtime_parser.py @@ -24,7 +24,10 @@ def __init__( self, video_path: Path, options: CliOptions, parser_options: CliParserOptions ): super().__init__(video_path, options, parser_options) - exiftool_path = shutil.which(constants.EXIFTOOL_PATH) + if constants.EXIFTOOL_PATH is None: + exiftool_path = shutil.which("exiftool") + else: + exiftool_path = shutil.which(constants.EXIFTOOL_PATH) if not exiftool_path: raise exceptions.MapillaryExiftoolNotFoundError( diff --git a/tests/cli/exiftool_runner.py b/tests/cli/exiftool_runner.py new file mode 100644 index 000000000..8e0a45830 --- /dev/null +++ b/tests/cli/exiftool_runner.py @@ -0,0 +1,47 @@ +from __future__ import annotations + +import argparse +import logging +import sys +from pathlib import Path + +from mapillary_tools import exiftool_runner, utils + + +LOG = logging.getLogger("mapillary_tools") + + +def configure_logger(logger: logging.Logger, stream=None) -> None: + formatter = logging.Formatter("%(asctime)s - %(levelname)-7s - %(message)s") + handler = logging.StreamHandler(stream) + handler.setFormatter(formatter) + logger.addHandler(handler) + + +def parse_args(): + parser = argparse.ArgumentParser() + parser.add_argument("path", nargs="+", help="Paths to files or directories") + return parser.parse_args() + + +def main(): + configure_logger(LOG, sys.stdout) + LOG.setLevel(logging.INFO) + + parsed = parse_args() + + video_paths = utils.find_videos([Path(p) for p in parsed.path]) + image_paths = utils.find_images([Path(p) for p in parsed.path]) + + LOG.info( + "Found %d video files and %d image files", len(video_paths), len(image_paths) + ) + + runner = exiftool_runner.ExiftoolRunner("exiftool") + xml = runner.extract_xml(image_paths + video_paths) + + print(xml) + + +if __name__ == "__main__": + main() diff --git a/tests/integration/fixtures.py b/tests/integration/fixtures.py index 02335309d..918e97fa3 100644 --- a/tests/integration/fixtures.py +++ b/tests/integration/fixtures.py @@ -152,7 +152,7 @@ def run_exiftool_and_generate_geotag_args( pytest.skip("exiftool not installed") exiftool_outuput_dir = run_exiftool(test_data_dir) exiftool_params = ( - f"--geotag_source exiftool --geotag_source_path {exiftool_outuput_dir}" + f"--geotag_source exiftool_xml --geotag_source_path {exiftool_outuput_dir}" ) return f"{run_args} {exiftool_params}" diff --git a/tests/integration/test_gopro.py b/tests/integration/test_gopro.py index 5ebe90698..b02587dd6 100644 --- a/tests/integration/test_gopro.py +++ b/tests/integration/test_gopro.py @@ -125,9 +125,11 @@ def test_process_gopro_hero8( if not IS_FFMPEG_INSTALLED: pytest.skip("skip because ffmpeg not installed") video_path = setup_data.join("hero8.mp4") - args = f"{EXECUTABLE} video_process --video_sample_interval=2 --video_sample_distance=-1 --geotag_source=gopro_videos {str(video_path)}" if use_exiftool: + args = f"{EXECUTABLE} --verbose video_process --video_sample_interval=2 --video_sample_distance=-1 {str(video_path)}" args = run_exiftool_and_generate_geotag_args(setup_data, args) + else: + args = f"{EXECUTABLE} --verbose video_process --video_sample_interval=2 --video_sample_distance=-1 --geotag_source=gopro_videos {str(video_path)}" env = os.environ.copy() env.update(TEST_ENVS) x = subprocess.run(args, shell=True, env=env) diff --git a/tests/integration/test_process.py b/tests/integration/test_process.py index d829f1de7..4fceeed9e 100644 --- a/tests/integration/test_process.py +++ b/tests/integration/test_process.py @@ -420,7 +420,7 @@ def test_geotagging_images_from_gpx_with_offset(setup_data: py.path.local): fp.write(GPX_CONTENT) x = subprocess.run( - f"{EXECUTABLE} process --file_types=image {PROCESS_FLAGS} {setup_data} --geotag_source gpx --geotag_source_path {gpx_file} --interpolation_offset_time=-20 --skip_process_errors", + f"{EXECUTABLE} --verbose process --file_types=image {PROCESS_FLAGS} {setup_data} --geotag_source gpx --geotag_source_path {gpx_file} --interpolation_offset_time=-20 --skip_process_errors", shell=True, ) assert x.returncode == 0, x.stderr @@ -556,7 +556,7 @@ def test_process_unsupported_filetypes(setup_data: py.path.local): video_dir = setup_data.join("gopro_data") for filetypes in ["blackvue"]: x = subprocess.run( - f"{EXECUTABLE} --verbose process --filetypes={filetypes} {PROCESS_FLAGS} --skip_process_errors {video_dir}", + f"{EXECUTABLE} --verbose process --filetypes={filetypes} --geotag_source=native {PROCESS_FLAGS} --skip_process_errors {video_dir}", shell=True, ) assert x.returncode == 0, x.stderr @@ -568,7 +568,7 @@ def test_process_unsupported_filetypes(setup_data: py.path.local): for filetypes in ["image"]: x = subprocess.run( - f"{EXECUTABLE} --verbose process --filetypes={filetypes} {PROCESS_FLAGS} --skip_process_errors {video_dir}", + f"{EXECUTABLE} --verbose process --filetypes={filetypes} --geotag_source=native {PROCESS_FLAGS} --skip_process_errors {video_dir}", shell=True, ) assert x.returncode == 0, x.stderr