diff --git a/mapillary_tools/exceptions.py b/mapillary_tools/exceptions.py index 10c8b7b48..cd730d9fc 100644 --- a/mapillary_tools/exceptions.py +++ b/mapillary_tools/exceptions.py @@ -87,7 +87,7 @@ def __init__( self.angle_diff = angle_diff -class MapillaryEXIFNotFoundError(MapillaryDescriptionError): +class MapillaryExifToolXMLNotFoundError(MapillaryDescriptionError): pass diff --git a/mapillary_tools/exiftool_read.py b/mapillary_tools/exiftool_read.py index 3929a0fbb..e10da07cb 100644 --- a/mapillary_tools/exiftool_read.py +++ b/mapillary_tools/exiftool_read.py @@ -6,7 +6,7 @@ import xml.etree.ElementTree as ET from pathlib import Path -from . import exif_read, utils +from . import exif_read EXIFTOOL_NAMESPACES: dict[str, str] = { @@ -53,8 +53,8 @@ LOG = logging.getLogger(__name__) +DESCRIPTION_TAG = "rdf:Description" _FIELD_TYPE = T.TypeVar("_FIELD_TYPE", int, float, str) -_DESCRIPTION_TAG = "rdf:Description" def expand_tag(ns_tag: str, namespaces: dict[str, str]) -> str: @@ -79,35 +79,12 @@ def find_rdf_description_path(element: ET.Element) -> Path | None: return Path(about) -def index_rdf_description_by_path( - xml_paths: T.Sequence[Path], -) -> dict[str, ET.Element]: - rdf_description_by_path: dict[str, ET.Element] = {} - - for xml_path in utils.find_xml_files(xml_paths): - try: - etree = ET.parse(xml_path) - except ET.ParseError as ex: - verbose = LOG.getEffectiveLevel() <= logging.DEBUG - if verbose: - LOG.warning(f"Failed to parse {xml_path}", exc_info=verbose) - else: - LOG.warning(f"Failed to parse {xml_path}: {ex}", exc_info=verbose) - continue - - rdf_description_by_path.update( - index_rdf_description_by_path_from_xml_element(etree.getroot()) - ) - - return rdf_description_by_path - - def index_rdf_description_by_path_from_xml_element( element: ET.Element, ) -> dict[str, ET.Element]: rdf_description_by_path: dict[str, ET.Element] = {} - elements = element.iterfind(_DESCRIPTION_TAG, namespaces=EXIFTOOL_NAMESPACES) + elements = element.iterfind(DESCRIPTION_TAG, namespaces=EXIFTOOL_NAMESPACES) for element in elements: path = find_rdf_description_path(element) if path is not None: diff --git a/mapillary_tools/geotag/__init__.py b/mapillary_tools/geotag/__init__.py deleted file mode 100644 index 75e64f68f..000000000 --- a/mapillary_tools/geotag/__init__.py +++ /dev/null @@ -1 +0,0 @@ -from .. import geo # noqa: F401 diff --git a/mapillary_tools/geotag/geotag_from_generic.py b/mapillary_tools/geotag/base.py similarity index 71% rename from mapillary_tools/geotag/geotag_from_generic.py rename to mapillary_tools/geotag/base.py index 6d9f7e89b..b0ed1beb9 100644 --- a/mapillary_tools/geotag/geotag_from_generic.py +++ b/mapillary_tools/geotag/base.py @@ -8,24 +8,14 @@ from tqdm import tqdm from .. import exceptions, types, utils +from .image_extractors.base import BaseImageExtractor +from .video_extractors.base import BaseVideoExtractor LOG = logging.getLogger(__name__) -class GenericImageExtractor(abc.ABC): - """ - Extracts metadata from an image file. - """ - - def __init__(self, image_path: Path): - self.image_path = image_path - - def extract(self) -> types.ImageMetadataOrError: - raise NotImplementedError - - -TImageExtractor = T.TypeVar("TImageExtractor", bound=GenericImageExtractor) +TImageExtractor = T.TypeVar("TImageExtractor", bound=BaseImageExtractor) class GeotagImagesFromGeneric(abc.ABC, T.Generic[TImageExtractor]): @@ -33,16 +23,15 @@ class GeotagImagesFromGeneric(abc.ABC, T.Generic[TImageExtractor]): Extracts metadata from a list of image files with multiprocessing. """ - def __init__( - self, image_paths: T.Sequence[Path], num_processes: int | None = None - ) -> None: - self.image_paths = image_paths + def __init__(self, num_processes: int | None = None) -> None: self.num_processes = num_processes - def to_description(self) -> list[types.ImageMetadataOrError]: - extractor_or_errors = self._generate_image_extractors() + def to_description( + self, image_paths: T.Sequence[Path] + ) -> list[types.ImageMetadataOrError]: + extractor_or_errors = self._generate_image_extractors(image_paths) - assert len(extractor_or_errors) == len(self.image_paths) + assert len(extractor_or_errors) == len(image_paths) extractors, error_metadatas = types.separate_errors(extractor_or_errors) @@ -64,11 +53,6 @@ def to_description(self) -> list[types.ImageMetadataOrError]: return results + error_metadatas - def _generate_image_extractors( - self, - ) -> T.Sequence[TImageExtractor | types.ErrorMetadata]: - raise NotImplementedError - # This method is passed to multiprocessing # so it has to be classmethod or staticmethod to avoid pickling the instance @classmethod @@ -81,26 +65,23 @@ def run_extraction(cls, extractor: TImageExtractor) -> types.ImageMetadataOrErro return types.describe_error_metadata( ex, image_path, filetype=types.FileType.IMAGE ) + except exceptions.MapillaryUserError as ex: + # Considered as fatal error if not MapillaryDescriptionError + raise ex except Exception as ex: + # TODO: hide details if not verbose mode LOG.exception("Unexpected error extracting metadata from %s", image_path) return types.describe_error_metadata( ex, image_path, filetype=types.FileType.IMAGE ) - -class GenericVideoExtractor(abc.ABC): - """ - Extracts metadata from a video file. - """ - - def __init__(self, video_path: Path): - self.video_path = video_path - - def extract(self) -> types.VideoMetadataOrError: + def _generate_image_extractors( + self, image_paths: T.Sequence[Path] + ) -> T.Sequence[TImageExtractor | types.ErrorMetadata]: raise NotImplementedError -TVideoExtractor = T.TypeVar("TVideoExtractor", bound=GenericVideoExtractor) +TVideoExtractor = T.TypeVar("TVideoExtractor", bound=BaseVideoExtractor) class GeotagVideosFromGeneric(abc.ABC, T.Generic[TVideoExtractor]): @@ -108,16 +89,15 @@ class GeotagVideosFromGeneric(abc.ABC, T.Generic[TVideoExtractor]): Extracts metadata from a list of video files with multiprocessing. """ - def __init__( - self, video_paths: T.Sequence[Path], num_processes: int | None = None - ) -> None: - self.video_paths = video_paths + def __init__(self, num_processes: int | None = None) -> None: self.num_processes = num_processes - def to_description(self) -> list[types.VideoMetadataOrError]: - extractor_or_errors = self._generate_video_extractors() + def to_description( + self, video_paths: T.Sequence[Path] + ) -> list[types.VideoMetadataOrError]: + extractor_or_errors = self._generate_video_extractors(video_paths) - assert len(extractor_or_errors) == len(self.video_paths) + assert len(extractor_or_errors) == len(video_paths) extractors, error_metadatas = types.separate_errors(extractor_or_errors) @@ -139,11 +119,6 @@ def to_description(self) -> list[types.VideoMetadataOrError]: return results + error_metadatas - def _generate_video_extractors( - self, - ) -> T.Sequence[TVideoExtractor | types.ErrorMetadata]: - raise NotImplementedError - # This method is passed to multiprocessing # so it has to be classmethod or staticmethod to avoid pickling the instance @classmethod @@ -156,8 +131,17 @@ def run_extraction(cls, extractor: TVideoExtractor) -> types.VideoMetadataOrErro return types.describe_error_metadata( ex, video_path, filetype=types.FileType.VIDEO ) + except exceptions.MapillaryUserError as ex: + # Considered as fatal error if not MapillaryDescriptionError + raise ex except Exception as ex: + # TODO: hide details if not verbose mode LOG.exception("Unexpected error extracting metadata from %s", video_path) return types.describe_error_metadata( ex, video_path, filetype=types.FileType.VIDEO ) + + def _generate_video_extractors( + self, video_paths: T.Sequence[Path] + ) -> T.Sequence[TVideoExtractor | types.ErrorMetadata]: + raise NotImplementedError diff --git a/mapillary_tools/geotag/factory.py b/mapillary_tools/geotag/factory.py index 9a3b09b6e..def91c7a7 100644 --- a/mapillary_tools/geotag/factory.py +++ b/mapillary_tools/geotag/factory.py @@ -8,14 +8,13 @@ from .. import exceptions, types, utils from ..types import FileType from . import ( - geotag_from_generic, + base, geotag_images_from_exif, geotag_images_from_exiftool, - geotag_images_from_exiftool_both_image_and_video, geotag_images_from_gpx_file, geotag_images_from_nmea_file, geotag_images_from_video, - geotag_videos_from_exiftool_video, + geotag_videos_from_exiftool, geotag_videos_from_gpx, geotag_videos_from_video, ) @@ -106,7 +105,7 @@ def _is_reprocessable(metadata: types.MetadataOrError) -> bool: def _filter_images_and_videos( - file_paths: T.Iterable[Path], + paths: T.Iterable[Path], filetypes: set[types.FileType] | None = None, ) -> tuple[list[Path], list[Path]]: image_paths = [] @@ -121,7 +120,7 @@ def _filter_images_and_videos( include_images = types.FileType.IMAGE in filetypes include_videos = bool(filetypes & ALL_VIDEO_TYPES) - for path in file_paths: + for path in paths: if utils.is_image_file(path): if include_images: image_paths.append(path) @@ -154,20 +153,20 @@ def _geotag_images( else: interpolation = option.interpolation - geotag: geotag_from_generic.GeotagImagesFromGeneric + geotag: base.GeotagImagesFromGeneric if option.source is SourceType.NATIVE: geotag = geotag_images_from_exif.GeotagImagesFromEXIF( - image_paths, num_processes=option.num_processes + num_processes=option.num_processes ) - return geotag.to_description() + return geotag.to_description(image_paths) if option.source is SourceType.EXIFTOOL_RUNTIME: geotag = geotag_images_from_exiftool.GeotagImagesFromExifToolRunner( - image_paths, num_processes=option.num_processes + num_processes=option.num_processes ) try: - return geotag.to_description() + return geotag.to_description(image_paths) except exceptions.MapillaryExiftoolNotFoundError as ex: LOG.warning('Skip "%s" because: %s', option.source.value, ex) return [] @@ -175,39 +174,36 @@ def _geotag_images( elif option.source is SourceType.EXIFTOOL_XML: # This is to ensure 'video_process --geotag={"source": "exiftool_xml", "source_path": "/tmp/xml_path"}' # to work - geotag = geotag_images_from_exiftool_both_image_and_video.GeotagImagesFromExifToolBothImageAndVideo( - image_paths, + geotag = geotag_images_from_exiftool.GeotagImagesFromExifToolWithSamples( xml_path=_ensure_source_path(option), num_processes=option.num_processes, ) - return geotag.to_description() + return geotag.to_description(image_paths) elif option.source is SourceType.GPX: geotag = geotag_images_from_gpx_file.GeotagImagesFromGPXFile( - image_paths, source_path=_ensure_source_path(option), use_gpx_start_time=interpolation.use_gpx_start_time, offset_time=interpolation.offset_time, num_processes=option.num_processes, ) - return geotag.to_description() + return geotag.to_description(image_paths) elif option.source is SourceType.NMEA: geotag = geotag_images_from_nmea_file.GeotagImagesFromNMEAFile( - image_paths, source_path=_ensure_source_path(option), use_gpx_start_time=interpolation.use_gpx_start_time, offset_time=interpolation.offset_time, num_processes=option.num_processes, ) - return geotag.to_description() + return geotag.to_description(image_paths) elif option.source is SourceType.EXIF: geotag = geotag_images_from_exif.GeotagImagesFromEXIF( - image_paths, num_processes=option.num_processes + num_processes=option.num_processes ) - return geotag.to_description() + return geotag.to_description(image_paths) elif option.source in [ SourceType.GOPRO, @@ -225,17 +221,15 @@ def _geotag_images( ) video_paths_with_image_samples = list(image_samples_by_video_path.keys()) video_metadatas = geotag_videos_from_video.GeotagVideosFromVideo( - video_paths_with_image_samples, filetypes={map_geotag_source_to_filetype[option.source]}, num_processes=option.num_processes, - ).to_description() + ).to_description(video_paths_with_image_samples) geotag = geotag_images_from_video.GeotagImagesFromVideo( - image_paths, video_metadatas, offset_time=interpolation.offset_time, num_processes=option.num_processes, ) - return geotag.to_description() + return geotag.to_description(image_paths) else: raise ValueError(f"Invalid geotag source {option.source}") @@ -249,34 +243,33 @@ def _geotag_videos( if not video_paths: return [] - geotag: geotag_from_generic.GeotagVideosFromGeneric + geotag: base.GeotagVideosFromGeneric if option.source is SourceType.NATIVE: geotag = geotag_videos_from_video.GeotagVideosFromVideo( - video_paths, num_processes=option.num_processes, filetypes=option.filetypes + num_processes=option.num_processes, filetypes=option.filetypes ) - return geotag.to_description() + return geotag.to_description(video_paths) if option.source is SourceType.EXIFTOOL_RUNTIME: - geotag = geotag_videos_from_exiftool_video.GeotagVideosFromExifToolRunner( - video_paths, num_processes=option.num_processes + geotag = geotag_videos_from_exiftool.GeotagVideosFromExifToolRunner( + num_processes=option.num_processes ) try: - return geotag.to_description() + return geotag.to_description(video_paths) except exceptions.MapillaryExiftoolNotFoundError as ex: LOG.warning('Skip "%s" because: %s', option.source.value, ex) return [] elif option.source is SourceType.EXIFTOOL_XML: - geotag = geotag_videos_from_exiftool_video.GeotagVideosFromExifToolVideo( - video_paths, + geotag = geotag_videos_from_exiftool.GeotagVideosFromExifToolXML( xml_path=_ensure_source_path(option), ) - return geotag.to_description() + return geotag.to_description(video_paths) elif option.source is SourceType.GPX: - geotag = geotag_videos_from_gpx.GeotagVideosFromGPX(video_paths) - return geotag.to_description() + geotag = geotag_videos_from_gpx.GeotagVideosFromGPX() + return geotag.to_description(video_paths) elif option.source is SourceType.NMEA: # TODO: geotag videos from NMEA diff --git a/mapillary_tools/geotag/geotag_images_from_exif.py b/mapillary_tools/geotag/geotag_images_from_exif.py index 7fd0c7c93..7a5ec1698 100644 --- a/mapillary_tools/geotag/geotag_images_from_exif.py +++ b/mapillary_tools/geotag/geotag_images_from_exif.py @@ -1,60 +1,24 @@ -import contextlib +from __future__ import annotations + import logging +import sys import typing as T from pathlib import Path -from .. import exceptions, geo, types, utils -from ..exif_read import ExifRead, ExifReadABC -from .geotag_from_generic import GenericImageExtractor, GeotagImagesFromGeneric - -LOG = logging.getLogger(__name__) - - -class ImageEXIFExtractor(GenericImageExtractor): - def __init__(self, image_path: Path, skip_lonlat_error: bool = False): - super().__init__(image_path) - self.skip_lonlat_error = skip_lonlat_error +if sys.version_info >= (3, 12): + from typing import override +else: + from typing_extensions import override - @contextlib.contextmanager - def _exif_context(self) -> T.Generator[ExifReadABC, None, None]: - with self.image_path.open("rb") as fp: - yield ExifRead(fp) +from .base import GeotagImagesFromGeneric +from .image_extractors.exif import ImageEXIFExtractor - def extract(self) -> types.ImageMetadata: - with self._exif_context() as exif: - lonlat = exif.extract_lon_lat() - if lonlat is None: - if not self.skip_lonlat_error: - raise exceptions.MapillaryGeoTaggingError( - "Unable to extract GPS Longitude or GPS Latitude from the image" - ) - lonlat = (0.0, 0.0) - lon, lat = lonlat - - capture_time = exif.extract_capture_time() - if capture_time is None: - raise exceptions.MapillaryGeoTaggingError( - "Unable to extract timestamp from the image" - ) - - image_metadata = types.ImageMetadata( - filename=self.image_path, - filesize=utils.get_file_size(self.image_path), - time=geo.as_unix_time(capture_time), - lat=lat, - lon=lon, - alt=exif.extract_altitude(), - angle=exif.extract_direction(), - width=exif.extract_width(), - height=exif.extract_height(), - MAPOrientation=exif.extract_orientation(), - MAPDeviceMake=exif.extract_make(), - MAPDeviceModel=exif.extract_model(), - ) - - return image_metadata +LOG = logging.getLogger(__name__) class GeotagImagesFromEXIF(GeotagImagesFromGeneric): - def _generate_image_extractors(self) -> T.Sequence[ImageEXIFExtractor]: - return [ImageEXIFExtractor(path) for path in self.image_paths] + @override + def _generate_image_extractors( + self, image_paths: T.Sequence[Path] + ) -> T.Sequence[ImageEXIFExtractor]: + return [ImageEXIFExtractor(path) for path in image_paths] diff --git a/mapillary_tools/geotag/geotag_images_from_exiftool.py b/mapillary_tools/geotag/geotag_images_from_exiftool.py index b187a2a41..e5c42ac76 100644 --- a/mapillary_tools/geotag/geotag_images_from_exiftool.py +++ b/mapillary_tools/geotag/geotag_images_from_exiftool.py @@ -1,105 +1,153 @@ from __future__ import annotations -import contextlib import logging +import sys import typing as T import xml.etree.ElementTree as ET from pathlib import Path -from .. import constants, exceptions, exiftool_read, types +if sys.version_info >= (3, 12): + from typing import override +else: + from typing_extensions import override + +from .. import constants, exceptions, exiftool_read, types, utils from ..exiftool_runner import ExiftoolRunner -from .geotag_from_generic import GeotagImagesFromGeneric -from .geotag_images_from_exif import ImageEXIFExtractor +from .base import GeotagImagesFromGeneric +from .geotag_images_from_video import GeotagImagesFromVideo +from .geotag_videos_from_exiftool import GeotagVideosFromExifToolXML +from .image_extractors.exiftool import ImageExifToolExtractor +from .utils import index_rdf_description_by_path LOG = logging.getLogger(__name__) -class ImageExifToolExtractor(ImageEXIFExtractor): - def __init__(self, image_path: Path, element: ET.Element): - super().__init__(image_path) - self.element = element - - @contextlib.contextmanager - def _exif_context(self): - yield exiftool_read.ExifToolRead(ET.ElementTree(self.element)) - - -class GeotagImagesFromExifTool(GeotagImagesFromGeneric): +class GeotagImagesFromExifToolXML(GeotagImagesFromGeneric): def __init__( self, - image_paths: T.Sequence[Path], xml_path: Path, num_processes: int | None = None, ): self.xml_path = xml_path - super().__init__(image_paths=image_paths, num_processes=num_processes) - - def _generate_image_extractors( - self, - ) -> T.Sequence[ImageExifToolExtractor | types.ErrorMetadata]: - rdf_description_by_path = exiftool_read.index_rdf_description_by_path( - [self.xml_path] - ) - + super().__init__(num_processes=num_processes) + + @classmethod + def build_image_extractors( + cls, + rdf_by_path: dict[str, ET.Element], + image_paths: T.Iterable[Path], + ) -> list[ImageExifToolExtractor | types.ErrorMetadata]: results: list[ImageExifToolExtractor | types.ErrorMetadata] = [] - for path in self.image_paths: - rdf_description = rdf_description_by_path.get( - exiftool_read.canonical_path(path) - ) - if rdf_description is None: - exc = exceptions.MapillaryEXIFNotFoundError( - f"The {exiftool_read._DESCRIPTION_TAG} XML element for the image not found" + for path in image_paths: + rdf = rdf_by_path.get(exiftool_read.canonical_path(path)) + if rdf is None: + ex = exceptions.MapillaryExifToolXMLNotFoundError( + "Cannot find the image in the ExifTool XML" ) results.append( types.describe_error_metadata( - exc, path, filetype=types.FileType.IMAGE + ex, path, filetype=types.FileType.IMAGE ) ) else: - results.append(ImageExifToolExtractor(path, rdf_description)) + results.append(ImageExifToolExtractor(path, rdf)) return results + @override + def _generate_image_extractors( + self, image_paths: T.Sequence[Path] + ) -> T.Sequence[ImageExifToolExtractor | types.ErrorMetadata]: + rdf_by_path = index_rdf_description_by_path([self.xml_path]) + return self.build_image_extractors(rdf_by_path, image_paths) + class GeotagImagesFromExifToolRunner(GeotagImagesFromGeneric): + @override def _generate_image_extractors( - self, + self, image_paths: T.Sequence[Path] ) -> T.Sequence[ImageExifToolExtractor | types.ErrorMetadata]: runner = ExiftoolRunner(constants.EXIFTOOL_PATH) LOG.debug( - "Extracting XML from %d images with exiftool command: %s", - len(self.image_paths), + "Extracting XML from %d images with ExifTool command: %s", + len(image_paths), " ".join(runner._build_args_read_stdin()), ) try: - xml = runner.extract_xml(self.image_paths) + xml = runner.extract_xml(image_paths) except FileNotFoundError as ex: raise exceptions.MapillaryExiftoolNotFoundError(ex) from ex - rdf_description_by_path = ( - exiftool_read.index_rdf_description_by_path_from_xml_element( - ET.fromstring(xml) + try: + xml_element = ET.fromstring(xml) + except ET.ParseError as ex: + LOG.warning( + "Failed to parse ExifTool XML: %s", + str(ex), + exc_info=LOG.getEffectiveLevel() <= logging.DEBUG, + ) + rdf_by_path = {} + else: + rdf_by_path = exiftool_read.index_rdf_description_by_path_from_xml_element( + xml_element ) - ) - results: list[ImageExifToolExtractor | types.ErrorMetadata] = [] + return GeotagImagesFromExifToolXML.build_image_extractors( + rdf_by_path, image_paths + ) - for path in self.image_paths: - rdf_description = rdf_description_by_path.get( - exiftool_read.canonical_path(path) - ) - if rdf_description is None: - exc = exceptions.MapillaryEXIFNotFoundError( - f"The {exiftool_read._DESCRIPTION_TAG} XML element for the image not found" - ) - results.append( - types.describe_error_metadata( - exc, path, filetype=types.FileType.IMAGE - ) - ) - else: - results.append(ImageExifToolExtractor(path, rdf_description)) - return results +class GeotagImagesFromExifToolWithSamples(GeotagImagesFromGeneric): + def __init__( + self, + xml_path: Path, + offset_time: float = 0.0, + num_processes: int | None = None, + ): + super().__init__(num_processes=num_processes) + self.xml_path = xml_path + self.offset_time = offset_time + + def geotag_samples( + self, image_paths: T.Sequence[Path] + ) -> list[types.ImageMetadataOrError]: + # Find all video paths in self.xml_path + rdf_by_path = index_rdf_description_by_path([self.xml_path]) + video_paths = utils.find_videos( + [Path(pathstr) for pathstr in rdf_by_path.keys()], + skip_subfolders=True, + ) + # Find all video paths that have sample images + samples_by_video = utils.find_all_image_samples(image_paths, video_paths) + + video_metadata_or_errors = GeotagVideosFromExifToolXML( + self.xml_path, + num_processes=self.num_processes, + ).to_description(list(samples_by_video.keys())) + sample_paths = sum(samples_by_video.values(), []) + sample_metadata_or_errors = GeotagImagesFromVideo( + video_metadata_or_errors, + offset_time=self.offset_time, + num_processes=self.num_processes, + ).to_description(sample_paths) + + return sample_metadata_or_errors + + @override + def to_description( + self, image_paths: T.Sequence[Path] + ) -> list[types.ImageMetadataOrError]: + sample_metadata_or_errors = self.geotag_samples(image_paths) + + sample_paths = set(metadata.filename for metadata in sample_metadata_or_errors) + + non_sample_paths = [path for path in image_paths if path not in sample_paths] + + non_sample_metadata_or_errors = GeotagImagesFromExifToolXML( + self.xml_path, + num_processes=self.num_processes, + ).to_description(non_sample_paths) + + return sample_metadata_or_errors + non_sample_metadata_or_errors diff --git a/mapillary_tools/geotag/geotag_images_from_exiftool_both_image_and_video.py b/mapillary_tools/geotag/geotag_images_from_exiftool_both_image_and_video.py deleted file mode 100644 index 4b9e19d49..000000000 --- a/mapillary_tools/geotag/geotag_images_from_exiftool_both_image_and_video.py +++ /dev/null @@ -1,77 +0,0 @@ -from __future__ import annotations - -import logging -import typing as T -from pathlib import Path - -from .. import exiftool_read, types, utils -from . import ( - geotag_images_from_exiftool, - geotag_images_from_video, - geotag_videos_from_exiftool_video, -) -from .geotag_from_generic import GeotagImagesFromGeneric - - -LOG = logging.getLogger(__name__) - - -class GeotagImagesFromExifToolBothImageAndVideo(GeotagImagesFromGeneric): - def __init__( - self, - image_paths: T.Sequence[Path], - xml_path: Path, - offset_time: float = 0.0, - num_processes: int | None = None, - ): - super().__init__(image_paths, num_processes=num_processes) - self.xml_path = xml_path - self.offset_time = offset_time - - def geotag_samples(self) -> list[types.ImageMetadataOrError]: - # Find all video paths in self.xml_path - rdf_description_by_path = exiftool_read.index_rdf_description_by_path( - [self.xml_path] - ) - video_paths = utils.find_videos( - [Path(pathstr) for pathstr in rdf_description_by_path.keys()], - skip_subfolders=True, - ) - # Find all video paths that have sample images - samples_by_video = utils.find_all_image_samples(self.image_paths, video_paths) - - video_metadata_or_errors = ( - geotag_videos_from_exiftool_video.GeotagVideosFromExifToolVideo( - list(samples_by_video.keys()), - self.xml_path, - num_processes=self.num_processes, - ).to_description() - ) - sample_paths = sum(samples_by_video.values(), []) - sample_metadata_or_errors = geotag_images_from_video.GeotagImagesFromVideo( - sample_paths, - video_metadata_or_errors, - offset_time=self.offset_time, - num_processes=self.num_processes, - ).to_description() - - return sample_metadata_or_errors - - def to_description(self) -> list[types.ImageMetadataOrError]: - sample_metadata_or_errors = self.geotag_samples() - - sample_paths = set(metadata.filename for metadata in sample_metadata_or_errors) - - non_sample_paths = [ - path for path in self.image_paths if path not in sample_paths - ] - - non_sample_metadata_or_errors = ( - geotag_images_from_exiftool.GeotagImagesFromExifTool( - non_sample_paths, - self.xml_path, - num_processes=self.num_processes, - ).to_description() - ) - - return sample_metadata_or_errors + non_sample_metadata_or_errors diff --git a/mapillary_tools/geotag/geotag_images_from_gpx.py b/mapillary_tools/geotag/geotag_images_from_gpx.py index ac06d90b3..56f4779ec 100644 --- a/mapillary_tools/geotag/geotag_images_from_gpx.py +++ b/mapillary_tools/geotag/geotag_images_from_gpx.py @@ -2,11 +2,17 @@ import dataclasses import logging +import sys import typing as T from pathlib import Path +if sys.version_info >= (3, 12): + from typing import override +else: + from typing_extensions import override + from .. import exceptions, geo, types -from .geotag_from_generic import GeotagImagesFromGeneric +from .base import GeotagImagesFromGeneric from .geotag_images_from_exif import ImageEXIFExtractor @@ -16,14 +22,13 @@ class GeotagImagesFromGPX(GeotagImagesFromGeneric): def __init__( self, - image_paths: T.Sequence[Path], points: T.Sequence[geo.Point], use_gpx_start_time: bool = False, use_image_start_time: bool = False, offset_time: float = 0.0, num_processes: int | None = None, ): - super().__init__(image_paths, num_processes=num_processes) + super().__init__(num_processes=num_processes) self.points = points self.use_gpx_start_time = use_gpx_start_time self.use_image_start_time = use_image_start_time @@ -73,16 +78,21 @@ def _interpolate_image_metadata_along( time=interpolated.time, ) - def _generate_image_extractors(self) -> T.Sequence[ImageEXIFExtractor]: + @override + def _generate_image_extractors( + self, image_paths: T.Sequence[Path] + ) -> T.Sequence[ImageEXIFExtractor]: return [ - ImageEXIFExtractor(path, skip_lonlat_error=True) - for path in self.image_paths + ImageEXIFExtractor(path, skip_lonlat_error=True) for path in image_paths ] - def to_description(self) -> list[types.ImageMetadataOrError]: + @override + def to_description( + self, image_paths: T.Sequence[Path] + ) -> list[types.ImageMetadataOrError]: final_metadatas: list[types.ImageMetadataOrError] = [] - image_metadata_or_errors = super().to_description() + image_metadata_or_errors = super().to_description(image_paths) image_metadatas, error_metadatas = types.separate_errors( image_metadata_or_errors @@ -90,7 +100,7 @@ def to_description(self) -> list[types.ImageMetadataOrError]: final_metadatas.extend(error_metadatas) if not image_metadatas: - assert len(self.image_paths) == len(final_metadatas) + assert len(image_paths) == len(final_metadatas) return final_metadatas # Do not use point itself for comparison because point.angle or point.alt could be None @@ -145,6 +155,6 @@ def to_description(self) -> list[types.ImageMetadataOrError]: ) final_metadatas.append(error_metadata) - assert len(self.image_paths) == len(final_metadatas) + assert len(image_paths) == len(final_metadatas) return final_metadatas diff --git a/mapillary_tools/geotag/geotag_images_from_gpx_file.py b/mapillary_tools/geotag/geotag_images_from_gpx_file.py index 347ea5294..9c0532711 100644 --- a/mapillary_tools/geotag/geotag_images_from_gpx_file.py +++ b/mapillary_tools/geotag/geotag_images_from_gpx_file.py @@ -1,12 +1,9 @@ from __future__ import annotations import logging -import typing as T from pathlib import Path -import gpxpy - -from .. import geo +from . import utils from .geotag_images_from_gpx import GeotagImagesFromGPX @@ -16,14 +13,13 @@ class GeotagImagesFromGPXFile(GeotagImagesFromGPX): def __init__( self, - image_paths: T.Sequence[Path], source_path: Path, use_gpx_start_time: bool = False, offset_time: float = 0.0, num_processes: int | None = None, ): try: - tracks = parse_gpx(source_path) + tracks = utils.parse_gpx(source_path) except Exception as ex: raise RuntimeError( f"Error parsing GPX {source_path}: {ex.__class__.__name__}: {ex}" @@ -37,36 +33,8 @@ def __init__( ) points = sum(tracks, []) super().__init__( - image_paths, points, use_gpx_start_time=use_gpx_start_time, offset_time=offset_time, num_processes=num_processes, ) - - -Track = T.List[geo.Point] - - -def parse_gpx(gpx_file: Path) -> list[Track]: - with gpx_file.open("r") as f: - gpx = gpxpy.parse(f) - - tracks: list[Track] = [] - - for track in gpx.tracks: - for segment in track.segments: - tracks.append([]) - for point in segment.points: - if point.time is not None: - tracks[-1].append( - geo.Point( - time=geo.as_unix_time(point.time), - lat=point.latitude, - lon=point.longitude, - alt=point.elevation, - angle=None, - ) - ) - - return tracks diff --git a/mapillary_tools/geotag/geotag_images_from_nmea_file.py b/mapillary_tools/geotag/geotag_images_from_nmea_file.py index 0a90a8487..ffb67e5f4 100644 --- a/mapillary_tools/geotag/geotag_images_from_nmea_file.py +++ b/mapillary_tools/geotag/geotag_images_from_nmea_file.py @@ -1,7 +1,6 @@ from __future__ import annotations import datetime -import typing as T from pathlib import Path import pynmea2 @@ -13,7 +12,6 @@ class GeotagImagesFromNMEAFile(GeotagImagesFromGPX): def __init__( self, - image_paths: T.Sequence[Path], source_path: Path, use_gpx_start_time: bool = False, offset_time: float = 0.0, @@ -21,7 +19,6 @@ def __init__( ): points = get_lat_lon_time_from_nmea(source_path) super().__init__( - image_paths, points, use_gpx_start_time=use_gpx_start_time, offset_time=offset_time, diff --git a/mapillary_tools/geotag/geotag_images_from_video.py b/mapillary_tools/geotag/geotag_images_from_video.py index 6332b3eee..6d032b21b 100644 --- a/mapillary_tools/geotag/geotag_images_from_video.py +++ b/mapillary_tools/geotag/geotag_images_from_video.py @@ -1,12 +1,17 @@ from __future__ import annotations import logging +import sys import typing as T from pathlib import Path -from .. import types, utils +if sys.version_info >= (3, 12): + from typing import override +else: + from typing_extensions import override -from .geotag_from_generic import GeotagImagesFromGeneric +from .. import types, utils +from .base import GeotagImagesFromGeneric from .geotag_images_from_gpx import GeotagImagesFromGPX @@ -16,16 +21,18 @@ class GeotagImagesFromVideo(GeotagImagesFromGeneric): def __init__( self, - image_paths: T.Sequence[Path], video_metadatas: T.Sequence[types.VideoMetadataOrError], offset_time: float = 0.0, num_processes: int | None = None, ): - super().__init__(image_paths, num_processes=num_processes) + super().__init__(num_processes=num_processes) self.video_metadatas = video_metadatas self.offset_time = offset_time - def to_description(self) -> list[types.ImageMetadataOrError]: + @override + def to_description( + self, image_paths: T.Sequence[Path] + ) -> list[types.ImageMetadataOrError]: # Will return this list final_image_metadatas: list[types.ImageMetadataOrError] = [] @@ -35,9 +42,7 @@ def to_description(self) -> list[types.ImageMetadataOrError]: for video_error_metadata in video_error_metadatas: video_path = video_error_metadata.filename - sample_paths = list( - utils.filter_video_samples(self.image_paths, video_path) - ) + sample_paths = list(utils.filter_video_samples(image_paths, video_path)) LOG.debug( "Found %d sample images from video %s with error: %s", len(sample_paths), @@ -55,9 +60,7 @@ def to_description(self) -> list[types.ImageMetadataOrError]: for video_metadata in video_metadatas: video_path = video_metadata.filename - sample_paths = list( - utils.filter_video_samples(self.image_paths, video_path) - ) + sample_paths = list(utils.filter_video_samples(image_paths, video_path)) LOG.debug( "Found %d sample images from video %s", len(sample_paths), @@ -65,7 +68,6 @@ def to_description(self) -> list[types.ImageMetadataOrError]: ) geotag = GeotagImagesFromGPX( - sample_paths, video_metadata.points, use_gpx_start_time=False, use_image_start_time=True, @@ -73,7 +75,7 @@ def to_description(self) -> list[types.ImageMetadataOrError]: num_processes=self.num_processes, ) - image_metadatas = geotag.to_description() + image_metadatas = geotag.to_description(image_paths) for metadata in image_metadatas: if isinstance(metadata, types.ImageMetadata): @@ -85,6 +87,6 @@ def to_description(self) -> list[types.ImageMetadataOrError]: # NOTE: this method only geotags images that have a corresponding video, # so the number of image metadata objects returned might be less than # the number of the input image_paths - assert len(final_image_metadatas) <= len(self.image_paths) + assert len(final_image_metadatas) <= len(image_paths) return final_image_metadatas diff --git a/mapillary_tools/geotag/geotag_videos_from_exiftool.py b/mapillary_tools/geotag/geotag_videos_from_exiftool.py new file mode 100644 index 000000000..5b7de6839 --- /dev/null +++ b/mapillary_tools/geotag/geotag_videos_from_exiftool.py @@ -0,0 +1,97 @@ +from __future__ import annotations + +import logging +import sys +import typing as T +import xml.etree.ElementTree as ET +from pathlib import Path + +if sys.version_info >= (3, 12): + from typing import override +else: + from typing_extensions import override + +from .. import constants, exceptions, exiftool_read, types +from ..exiftool_runner import ExiftoolRunner +from .base import GeotagVideosFromGeneric +from .utils import index_rdf_description_by_path +from .video_extractors.exiftool import VideoExifToolExtractor + +LOG = logging.getLogger(__name__) + + +class GeotagVideosFromExifToolXML(GeotagVideosFromGeneric): + def __init__( + self, + xml_path: Path, + num_processes: int | None = None, + ): + super().__init__(num_processes=num_processes) + self.xml_path = xml_path + + @classmethod + def build_image_extractors( + cls, + rdf_by_path: dict[str, ET.Element], + video_paths: T.Iterable[Path], + ) -> list[VideoExifToolExtractor | types.ErrorMetadata]: + results: list[VideoExifToolExtractor | types.ErrorMetadata] = [] + + for path in video_paths: + rdf = rdf_by_path.get(exiftool_read.canonical_path(path)) + if rdf is None: + ex = exceptions.MapillaryExifToolXMLNotFoundError( + "Cannot find the video in the ExifTool XML" + ) + results.append( + types.describe_error_metadata( + ex, path, filetype=types.FileType.VIDEO + ) + ) + else: + results.append(VideoExifToolExtractor(path, rdf)) + + return results + + @override + def _generate_video_extractors( + self, video_paths: T.Sequence[Path] + ) -> T.Sequence[VideoExifToolExtractor | types.ErrorMetadata]: + rdf_by_path = index_rdf_description_by_path([self.xml_path]) + return self.build_image_extractors(rdf_by_path, video_paths) + + +class GeotagVideosFromExifToolRunner(GeotagVideosFromGeneric): + @override + def _generate_video_extractors( + self, video_paths: T.Sequence[Path] + ) -> T.Sequence[VideoExifToolExtractor | types.ErrorMetadata]: + runner = ExiftoolRunner(constants.EXIFTOOL_PATH) + + LOG.debug( + "Extracting XML from %d videos with ExifTool command: %s", + len(video_paths), + " ".join(runner._build_args_read_stdin()), + ) + try: + xml = runner.extract_xml(video_paths) + except FileNotFoundError as ex: + raise exceptions.MapillaryExiftoolNotFoundError(ex) from ex + + try: + xml_element = ET.fromstring(xml) + except ET.ParseError as ex: + LOG.warning( + "Failed to parse ExifTool XML: %s", + str(ex), + exc_info=LOG.getEffectiveLevel() <= logging.DEBUG, + ) + rdf_by_path = {} + else: + rdf_by_path = exiftool_read.index_rdf_description_by_path_from_xml_element( + xml_element + ) + + return GeotagVideosFromExifToolXML.build_image_extractors( + rdf_by_path, video_paths + ) diff --git a/mapillary_tools/geotag/geotag_videos_from_exiftool_video.py b/mapillary_tools/geotag/geotag_videos_from_exiftool_video.py deleted file mode 100644 index 3f95fc5ed..000000000 --- a/mapillary_tools/geotag/geotag_videos_from_exiftool_video.py +++ /dev/null @@ -1,151 +0,0 @@ -from __future__ import annotations - -import logging -import typing as T -import xml.etree.ElementTree as ET -from pathlib import Path - -from .. import constants, exceptions, exiftool_read, geo, types, utils -from ..exiftool_read_video import ExifToolReadVideo -from ..exiftool_runner import ExiftoolRunner -from ..gpmf import gpmf_gps_filter -from ..telemetry import GPSPoint -from .geotag_from_generic import GenericVideoExtractor, GeotagVideosFromGeneric - -LOG = logging.getLogger(__name__) - - -class VideoExifToolExtractor(GenericVideoExtractor): - def __init__(self, video_path: Path, element: ET.Element): - super().__init__(video_path) - self.element = element - - def extract(self) -> types.VideoMetadataOrError: - exif = ExifToolReadVideo(ET.ElementTree(self.element)) - - make = exif.extract_make() - model = exif.extract_model() - - is_gopro = make is not None and make.upper() in ["GOPRO"] - - points = exif.extract_gps_track() - - # ExifTool has no idea if GPS is not found or found but empty - if is_gopro: - if not points: - raise exceptions.MapillaryGPXEmptyError("Empty GPS data found") - - # ExifTool (since 13.04) converts GPSSpeed for GoPro to km/h, so here we convert it back to m/s - for p in points: - if isinstance(p, GPSPoint) and p.ground_speed is not None: - p.ground_speed = p.ground_speed / 3.6 - - if isinstance(points[0], GPSPoint): - points = T.cast( - T.List[geo.Point], - gpmf_gps_filter.remove_noisy_points( - T.cast(T.List[GPSPoint], points) - ), - ) - if not points: - raise exceptions.MapillaryGPSNoiseError("GPS is too noisy") - - if not points: - raise exceptions.MapillaryVideoGPSNotFoundError( - "No GPS data found from the video" - ) - - filetype = types.FileType.GOPRO if is_gopro else types.FileType.VIDEO - - video_metadata = types.VideoMetadata( - self.video_path, - filesize=utils.get_file_size(self.video_path), - filetype=filetype, - points=points, - make=make, - model=model, - ) - - return video_metadata - - -class GeotagVideosFromExifToolVideo(GeotagVideosFromGeneric): - def __init__( - self, - video_paths: T.Sequence[Path], - xml_path: Path, - num_processes: int | None = None, - ): - super().__init__(video_paths, num_processes=num_processes) - self.xml_path = xml_path - - def _generate_video_extractors( - self, - ) -> T.Sequence[GenericVideoExtractor | types.ErrorMetadata]: - rdf_description_by_path = exiftool_read.index_rdf_description_by_path( - [self.xml_path] - ) - - results: list[VideoExifToolExtractor | types.ErrorMetadata] = [] - - for path in self.video_paths: - rdf_description = rdf_description_by_path.get( - exiftool_read.canonical_path(path) - ) - if rdf_description is None: - exc = exceptions.MapillaryEXIFNotFoundError( - f"The {exiftool_read._DESCRIPTION_TAG} XML element for the video not found" - ) - results.append( - types.describe_error_metadata( - exc, path, filetype=types.FileType.VIDEO - ) - ) - else: - results.append(VideoExifToolExtractor(path, rdf_description)) - - return results - - -class GeotagVideosFromExifToolRunner(GeotagVideosFromGeneric): - def _generate_video_extractors( - self, - ) -> T.Sequence[GenericVideoExtractor | types.ErrorMetadata]: - runner = ExiftoolRunner(constants.EXIFTOOL_PATH) - - LOG.debug( - "Extracting XML from %d videos with exiftool command: %s", - len(self.video_paths), - " ".join(runner._build_args_read_stdin()), - ) - - try: - xml = runner.extract_xml(self.video_paths) - except FileNotFoundError as ex: - raise exceptions.MapillaryExiftoolNotFoundError(ex) from ex - - rdf_description_by_path = ( - exiftool_read.index_rdf_description_by_path_from_xml_element( - ET.fromstring(xml) - ) - ) - - results: list[VideoExifToolExtractor | types.ErrorMetadata] = [] - - for path in self.video_paths: - rdf_description = rdf_description_by_path.get( - exiftool_read.canonical_path(path) - ) - if rdf_description is None: - exc = exceptions.MapillaryEXIFNotFoundError( - f"The {exiftool_read._DESCRIPTION_TAG} XML element for the video not found" - ) - results.append( - types.describe_error_metadata( - exc, path, filetype=types.FileType.VIDEO - ) - ) - else: - results.append(VideoExifToolExtractor(path, rdf_description)) - - return results diff --git a/mapillary_tools/geotag/geotag_videos_from_gpx.py b/mapillary_tools/geotag/geotag_videos_from_gpx.py index 809a25c7d..a5e8afd85 100644 --- a/mapillary_tools/geotag/geotag_videos_from_gpx.py +++ b/mapillary_tools/geotag/geotag_videos_from_gpx.py @@ -1,140 +1,39 @@ from __future__ import annotations -import dataclasses -import datetime import logging - +import sys import typing as T from pathlib import Path -from .. import geo, telemetry, types +if sys.version_info >= (3, 12): + from typing import override +else: + from typing_extensions import override + from . import options -from .geotag_from_generic import GenericVideoExtractor, GeotagVideosFromGeneric -from .geotag_images_from_gpx_file import parse_gpx -from .geotag_videos_from_video import NativeVideoExtractor +from .base import GeotagVideosFromGeneric +from .video_extractors.gpx import GPXVideoExtractor LOG = logging.getLogger(__name__) -class GPXVideoExtractor(GenericVideoExtractor): - def __init__(self, video_path: Path, gpx_path: Path): - self.video_path = video_path - self.gpx_path = gpx_path - - def extract(self) -> types.VideoMetadataOrError: - try: - gpx_tracks = parse_gpx(self.gpx_path) - except Exception as ex: - raise RuntimeError( - f"Error parsing GPX {self.gpx_path}: {ex.__class__.__name__}: {ex}" - ) - - if 1 < len(gpx_tracks): - LOG.warning( - "Found %s tracks in the GPX file %s. Will merge points in all the tracks as a single track for interpolation", - len(gpx_tracks), - self.gpx_path, - ) - - gpx_points: T.Sequence[geo.Point] = sum(gpx_tracks, []) - - native_extractor = NativeVideoExtractor(self.video_path) - - video_metadata_or_error = native_extractor.extract() - - if isinstance(video_metadata_or_error, types.ErrorMetadata): - self._rebase_times(gpx_points) - return types.VideoMetadata( - filename=video_metadata_or_error.filename, - filetype=video_metadata_or_error.filetype or types.FileType.VIDEO, - points=gpx_points, - ) - - video_metadata = video_metadata_or_error - - offset = self._synx_gpx_by_first_gps_timestamp( - gpx_points, video_metadata.points - ) - - self._rebase_times(gpx_points, offset=offset) - - return dataclasses.replace(video_metadata_or_error, points=gpx_points) - - @staticmethod - def _rebase_times(points: T.Sequence[geo.Point], offset: float = 0.0): - """ - Make point times start from 0 - """ - if points: - first_timestamp = points[0].time - for p in points: - p.time = (p.time - first_timestamp) + offset - return points - - def _synx_gpx_by_first_gps_timestamp( - self, gpx_points: T.Sequence[geo.Point], video_gps_points: T.Sequence[geo.Point] - ) -> float: - offset: float = 0.0 - - if not gpx_points: - return offset - - first_gpx_dt = datetime.datetime.fromtimestamp( - gpx_points[0].time, tz=datetime.timezone.utc - ) - LOG.info("First GPX timestamp: %s", first_gpx_dt) - - if not video_gps_points: - LOG.warning( - "Skip GPX synchronization because no GPS found in video %s", - self.video_path, - ) - return offset - - first_gps_point = video_gps_points[0] - if isinstance(first_gps_point, telemetry.GPSPoint): - if first_gps_point.epoch_time is not None: - first_gps_dt = datetime.datetime.fromtimestamp( - first_gps_point.epoch_time, tz=datetime.timezone.utc - ) - LOG.info("First GPS timestamp: %s", first_gps_dt) - offset = gpx_points[0].time - first_gps_point.epoch_time - if offset: - LOG.warning( - "Found offset between GPX %s and video GPS timestamps %s: %s seconds", - first_gpx_dt, - first_gps_dt, - offset, - ) - else: - LOG.info( - "GPX and GPS are perfectly synchronized (all starts from %s)", - first_gpx_dt, - ) - else: - LOG.warning( - "Skip GPX synchronization because no GPS epoch time found in video %s", - self.video_path, - ) - - return offset - - class GeotagVideosFromGPX(GeotagVideosFromGeneric): def __init__( self, - video_paths: T.Sequence[Path], option: options.SourcePathOption | None = None, num_processes: int | None = None, ): - super().__init__(video_paths, num_processes=num_processes) + super().__init__(num_processes=num_processes) if option is None: option = options.SourcePathOption(pattern="%f.gpx") self.option = option - def _generate_image_extractors(self) -> T.Sequence[GPXVideoExtractor]: + @override + def _generate_video_extractors( + self, video_paths: T.Sequence[Path] + ) -> T.Sequence[GPXVideoExtractor]: return [ GPXVideoExtractor(video_path, self.option.resolve(video_path)) - for video_path in self.video_paths + for video_path in video_paths ] diff --git a/mapillary_tools/geotag/geotag_videos_from_video.py b/mapillary_tools/geotag/geotag_videos_from_video.py index ec0e6d7e2..f718656eb 100644 --- a/mapillary_tools/geotag/geotag_videos_from_video.py +++ b/mapillary_tools/geotag/geotag_videos_from_video.py @@ -1,165 +1,32 @@ from __future__ import annotations +import sys import typing as T from pathlib import Path -from .. import blackvue_parser, exceptions, geo, telemetry, types, utils -from ..camm import camm_parser -from ..gpmf import gpmf_gps_filter, gpmf_parser -from ..types import FileType -from .geotag_from_generic import GenericVideoExtractor, GeotagVideosFromGeneric - - -class GoProVideoExtractor(GenericVideoExtractor): - def extract(self) -> types.VideoMetadataOrError: - with self.video_path.open("rb") as fp: - gopro_info = gpmf_parser.extract_gopro_info(fp) - - if gopro_info is None: - raise exceptions.MapillaryVideoGPSNotFoundError( - "No GPS data found from the video" - ) - - gps_points = gopro_info.gps - assert gps_points is not None, "must have GPS data extracted" - if not gps_points: - # Instead of raising an exception, return error metadata to tell the file type - ex: exceptions.MapillaryDescriptionError = ( - exceptions.MapillaryGPXEmptyError("Empty GPS data found") - ) - return types.describe_error_metadata( - ex, self.video_path, filetype=FileType.GOPRO - ) - - gps_points = T.cast( - T.List[telemetry.GPSPoint], gpmf_gps_filter.remove_noisy_points(gps_points) - ) - if not gps_points: - # Instead of raising an exception, return error metadata to tell the file type - ex = exceptions.MapillaryGPSNoiseError("GPS is too noisy") - return types.describe_error_metadata( - ex, self.video_path, filetype=FileType.GOPRO - ) - - video_metadata = types.VideoMetadata( - filename=self.video_path, - filesize=utils.get_file_size(self.video_path), - filetype=FileType.GOPRO, - points=T.cast(T.List[geo.Point], gps_points), - make=gopro_info.make, - model=gopro_info.model, - ) - - return video_metadata - - -class CAMMVideoExtractor(GenericVideoExtractor): - def extract(self) -> types.VideoMetadataOrError: - with self.video_path.open("rb") as fp: - camm_info = camm_parser.extract_camm_info(fp) - - if camm_info is None: - raise exceptions.MapillaryVideoGPSNotFoundError( - "No GPS data found from the video" - ) - - if not camm_info.gps and not camm_info.mini_gps: - # Instead of raising an exception, return error metadata to tell the file type - ex: exceptions.MapillaryDescriptionError = ( - exceptions.MapillaryGPXEmptyError("Empty GPS data found") - ) - return types.describe_error_metadata( - ex, self.video_path, filetype=FileType.CAMM - ) - - return types.VideoMetadata( - filename=self.video_path, - filesize=utils.get_file_size(self.video_path), - filetype=FileType.CAMM, - points=T.cast(T.List[geo.Point], camm_info.gps or camm_info.mini_gps), - make=camm_info.make, - model=camm_info.model, - ) - +if sys.version_info >= (3, 12): + from typing import override +else: + from typing_extensions import override -class BlackVueVideoExtractor(GenericVideoExtractor): - def extract(self) -> types.VideoMetadataOrError: - with self.video_path.open("rb") as fp: - blackvue_info = blackvue_parser.extract_blackvue_info(fp) - - if blackvue_info is None: - raise exceptions.MapillaryVideoGPSNotFoundError( - "No GPS data found from the video" - ) - - if not blackvue_info.gps: - # Instead of raising an exception, return error metadata to tell the file type - ex: exceptions.MapillaryDescriptionError = ( - exceptions.MapillaryGPXEmptyError("Empty GPS data found") - ) - return types.describe_error_metadata( - ex, self.video_path, filetype=FileType.BLACKVUE - ) - - video_metadata = types.VideoMetadata( - filename=self.video_path, - filesize=utils.get_file_size(self.video_path), - filetype=FileType.BLACKVUE, - points=blackvue_info.gps or [], - make=blackvue_info.make, - model=blackvue_info.model, - ) - - return video_metadata - - -class NativeVideoExtractor(GenericVideoExtractor): - def __init__(self, video_path: Path, filetypes: set[FileType] | None = None): - super().__init__(video_path) - self.filetypes = filetypes - - def extract(self) -> types.VideoMetadataOrError: - ft = self.filetypes - extractor: GenericVideoExtractor - - if ft is None or FileType.VIDEO in ft or FileType.GOPRO in ft: - extractor = GoProVideoExtractor(self.video_path) - try: - return extractor.extract() - except exceptions.MapillaryVideoGPSNotFoundError: - pass - - if ft is None or FileType.VIDEO in ft or FileType.CAMM in ft: - extractor = CAMMVideoExtractor(self.video_path) - try: - return extractor.extract() - except exceptions.MapillaryVideoGPSNotFoundError: - pass - - if ft is None or FileType.VIDEO in ft or FileType.BLACKVUE in ft: - extractor = BlackVueVideoExtractor(self.video_path) - try: - return extractor.extract() - except exceptions.MapillaryVideoGPSNotFoundError: - pass - - raise exceptions.MapillaryVideoGPSNotFoundError( - "No GPS data found from the video" - ) +from ..types import FileType +from .base import GeotagVideosFromGeneric +from .video_extractors.native import NativeVideoExtractor class GeotagVideosFromVideo(GeotagVideosFromGeneric): def __init__( self, - video_paths: T.Sequence[Path], filetypes: set[FileType] | None = None, num_processes: int | None = None, ): - super().__init__(video_paths, num_processes=num_processes) + super().__init__(num_processes=num_processes) self.filetypes = filetypes - def _generate_video_extractors(self) -> T.Sequence[GenericVideoExtractor]: + @override + def _generate_video_extractors( + self, video_paths: T.Sequence[Path] + ) -> T.Sequence[NativeVideoExtractor]: return [ - NativeVideoExtractor(path, filetypes=self.filetypes) - for path in self.video_paths + NativeVideoExtractor(path, filetypes=self.filetypes) for path in video_paths ] diff --git a/mapillary_tools/geotag/image_extractors/base.py b/mapillary_tools/geotag/image_extractors/base.py new file mode 100644 index 000000000..5d65d6cd2 --- /dev/null +++ b/mapillary_tools/geotag/image_extractors/base.py @@ -0,0 +1,18 @@ +from __future__ import annotations + +import abc +from pathlib import Path + +from ... import types + + +class BaseImageExtractor(abc.ABC): + """ + Extracts metadata from an image file. + """ + + def __init__(self, image_path: Path): + self.image_path = image_path + + def extract(self) -> types.ImageMetadataOrError: + raise NotImplementedError diff --git a/mapillary_tools/geotag/image_extractors/exif.py b/mapillary_tools/geotag/image_extractors/exif.py new file mode 100644 index 000000000..f78b0bdaf --- /dev/null +++ b/mapillary_tools/geotag/image_extractors/exif.py @@ -0,0 +1,60 @@ +from __future__ import annotations + +import contextlib +import sys +import typing as T +from pathlib import Path + +if sys.version_info >= (3, 12): + from typing import override +else: + from typing_extensions import override + +from ... import exceptions, exif_read, geo, types, utils +from .base import BaseImageExtractor + + +class ImageEXIFExtractor(BaseImageExtractor): + def __init__(self, image_path: Path, skip_lonlat_error: bool = False): + super().__init__(image_path) + self.skip_lonlat_error = skip_lonlat_error + + @contextlib.contextmanager + def _exif_context(self) -> T.Generator[exif_read.ExifReadABC, None, None]: + with self.image_path.open("rb") as fp: + yield exif_read.ExifRead(fp) + + @override + def extract(self) -> types.ImageMetadata: + with self._exif_context() as exif: + lonlat = exif.extract_lon_lat() + if lonlat is None: + if not self.skip_lonlat_error: + raise exceptions.MapillaryGeoTaggingError( + "Unable to extract GPS Longitude or GPS Latitude from the image" + ) + lonlat = (0.0, 0.0) + lon, lat = lonlat + + capture_time = exif.extract_capture_time() + if capture_time is None: + raise exceptions.MapillaryGeoTaggingError( + "Unable to extract timestamp from the image" + ) + + image_metadata = types.ImageMetadata( + filename=self.image_path, + filesize=utils.get_file_size(self.image_path), + time=geo.as_unix_time(capture_time), + lat=lat, + lon=lon, + alt=exif.extract_altitude(), + angle=exif.extract_direction(), + width=exif.extract_width(), + height=exif.extract_height(), + MAPOrientation=exif.extract_orientation(), + MAPDeviceMake=exif.extract_make(), + MAPDeviceModel=exif.extract_model(), + ) + + return image_metadata diff --git a/mapillary_tools/geotag/image_extractors/exiftool.py b/mapillary_tools/geotag/image_extractors/exiftool.py new file mode 100644 index 000000000..a0fbb5c82 --- /dev/null +++ b/mapillary_tools/geotag/image_extractors/exiftool.py @@ -0,0 +1,18 @@ +from __future__ import annotations + +import contextlib +import xml.etree.ElementTree as ET +from pathlib import Path + +from ... import exiftool_read +from .exif import ImageEXIFExtractor + + +class ImageExifToolExtractor(ImageEXIFExtractor): + def __init__(self, image_path: Path, element: ET.Element): + super().__init__(image_path) + self.element = element + + @contextlib.contextmanager + def _exif_context(self): + yield exiftool_read.ExifToolRead(ET.ElementTree(self.element)) diff --git a/mapillary_tools/geotag/options.py b/mapillary_tools/geotag/options.py index 44b2cd652..c3c243e61 100644 --- a/mapillary_tools/geotag/options.py +++ b/mapillary_tools/geotag/options.py @@ -28,6 +28,7 @@ class SourceType(enum.Enum): SOURCE_TYPE_ALIAS: dict[str, SourceType] = { "blackvue_videos": SourceType.BLACKVUE, "gopro_videos": SourceType.GOPRO, + "exiftool": SourceType.EXIFTOOL_RUNTIME, } diff --git a/mapillary_tools/geotag/utils.py b/mapillary_tools/geotag/utils.py new file mode 100644 index 000000000..ef311e437 --- /dev/null +++ b/mapillary_tools/geotag/utils.py @@ -0,0 +1,62 @@ +from __future__ import annotations + +import logging +import typing as T +import xml.etree.ElementTree as ET +from pathlib import Path + +import gpxpy + +from .. import exiftool_read, geo, utils + +Track = T.List[geo.Point] +LOG = logging.getLogger(__name__) + + +def parse_gpx(gpx_file: Path) -> list[Track]: + with gpx_file.open("r") as f: + gpx = gpxpy.parse(f) + + tracks: list[Track] = [] + + for track in gpx.tracks: + for segment in track.segments: + tracks.append([]) + for point in segment.points: + if point.time is not None: + tracks[-1].append( + geo.Point( + time=geo.as_unix_time(point.time), + lat=point.latitude, + lon=point.longitude, + alt=point.elevation, + angle=None, + ) + ) + + return tracks + + +def index_rdf_description_by_path( + xml_paths: T.Sequence[Path], +) -> dict[str, ET.Element]: + rdf_description_by_path: dict[str, ET.Element] = {} + + for xml_path in utils.find_xml_files(xml_paths): + try: + etree = ET.parse(xml_path) + except ET.ParseError as ex: + verbose = LOG.getEffectiveLevel() <= logging.DEBUG + if verbose: + LOG.warning("Failed to parse %s", xml_path, exc_info=True) + else: + LOG.warning("Failed to parse %s: %s", xml_path, ex) + continue + + rdf_description_by_path.update( + exiftool_read.index_rdf_description_by_path_from_xml_element( + etree.getroot() + ) + ) + + return rdf_description_by_path diff --git a/mapillary_tools/geotag/video_extractors/base.py b/mapillary_tools/geotag/video_extractors/base.py new file mode 100644 index 000000000..1a2e03b3c --- /dev/null +++ b/mapillary_tools/geotag/video_extractors/base.py @@ -0,0 +1,18 @@ +from __future__ import annotations + +import abc +from pathlib import Path + +from ... import types + + +class BaseVideoExtractor(abc.ABC): + """ + Extracts metadata from a video file. + """ + + def __init__(self, video_path: Path): + self.video_path = video_path + + def extract(self) -> types.VideoMetadataOrError: + raise NotImplementedError diff --git a/mapillary_tools/geotag/video_extractors/exiftool.py b/mapillary_tools/geotag/video_extractors/exiftool.py new file mode 100644 index 000000000..bb51863a5 --- /dev/null +++ b/mapillary_tools/geotag/video_extractors/exiftool.py @@ -0,0 +1,70 @@ +from __future__ import annotations + +import sys +import typing as T +from pathlib import Path +from xml.etree import ElementTree as ET + +if sys.version_info >= (3, 12): + from typing import override +else: + from typing_extensions import override + +from ... import exceptions, exiftool_read_video, geo, telemetry, types, utils +from ...gpmf import gpmf_gps_filter +from .base import BaseVideoExtractor + + +class VideoExifToolExtractor(BaseVideoExtractor): + def __init__(self, video_path: Path, element: ET.Element): + super().__init__(video_path) + self.element = element + + @override + def extract(self) -> types.VideoMetadataOrError: + exif = exiftool_read_video.ExifToolReadVideo(ET.ElementTree(self.element)) + + make = exif.extract_make() + model = exif.extract_model() + + is_gopro = make is not None and make.upper() in ["GOPRO"] + + points = exif.extract_gps_track() + + # ExifTool has no idea if GPS is not found or found but empty + if is_gopro: + if not points: + raise exceptions.MapillaryGPXEmptyError("Empty GPS data found") + + # ExifTool (since 13.04) converts GPSSpeed for GoPro to km/h, so here we convert it back to m/s + for p in points: + if isinstance(p, telemetry.GPSPoint) and p.ground_speed is not None: + p.ground_speed = p.ground_speed / 3.6 + + if isinstance(points[0], telemetry.GPSPoint): + points = T.cast( + T.List[geo.Point], + gpmf_gps_filter.remove_noisy_points( + T.cast(T.List[telemetry.GPSPoint], points) + ), + ) + if not points: + raise exceptions.MapillaryGPSNoiseError("GPS is too noisy") + + if not points: + raise exceptions.MapillaryVideoGPSNotFoundError( + "No GPS data found from the video" + ) + + filetype = types.FileType.GOPRO if is_gopro else types.FileType.VIDEO + + video_metadata = types.VideoMetadata( + self.video_path, + filesize=utils.get_file_size(self.video_path), + filetype=filetype, + points=points, + make=make, + model=model, + ) + + return video_metadata diff --git a/mapillary_tools/geotag/video_extractors/gpx.py b/mapillary_tools/geotag/video_extractors/gpx.py new file mode 100644 index 000000000..560fa4294 --- /dev/null +++ b/mapillary_tools/geotag/video_extractors/gpx.py @@ -0,0 +1,126 @@ +from __future__ import annotations + +import dataclasses +import datetime +import logging +import sys +import typing as T +from pathlib import Path + +if sys.version_info >= (3, 12): + from typing import override +else: + from typing_extensions import override + +from ... import geo, telemetry, types +from ..utils import parse_gpx +from .base import BaseVideoExtractor +from .native import NativeVideoExtractor + + +LOG = logging.getLogger(__name__) + + +class GPXVideoExtractor(BaseVideoExtractor): + def __init__(self, video_path: Path, gpx_path: Path): + self.video_path = video_path + self.gpx_path = gpx_path + + @override + def extract(self) -> types.VideoMetadataOrError: + try: + gpx_tracks = parse_gpx(self.gpx_path) + except Exception as ex: + raise RuntimeError( + f"Error parsing GPX {self.gpx_path}: {ex.__class__.__name__}: {ex}" + ) + + if 1 < len(gpx_tracks): + LOG.warning( + "Found %s tracks in the GPX file %s. Will merge points in all the tracks as a single track for interpolation", + len(gpx_tracks), + self.gpx_path, + ) + + gpx_points: T.Sequence[geo.Point] = sum(gpx_tracks, []) + + native_extractor = NativeVideoExtractor(self.video_path) + + video_metadata_or_error = native_extractor.extract() + + if isinstance(video_metadata_or_error, types.ErrorMetadata): + self._rebase_times(gpx_points) + return types.VideoMetadata( + filename=video_metadata_or_error.filename, + filetype=video_metadata_or_error.filetype or types.FileType.VIDEO, + points=gpx_points, + ) + + video_metadata = video_metadata_or_error + + offset = self._synx_gpx_by_first_gps_timestamp( + gpx_points, video_metadata.points + ) + + self._rebase_times(gpx_points, offset=offset) + + return dataclasses.replace(video_metadata_or_error, points=gpx_points) + + @staticmethod + def _rebase_times(points: T.Sequence[geo.Point], offset: float = 0.0): + """ + Make point times start from 0 + """ + if points: + first_timestamp = points[0].time + for p in points: + p.time = (p.time - first_timestamp) + offset + return points + + def _synx_gpx_by_first_gps_timestamp( + self, gpx_points: T.Sequence[geo.Point], video_gps_points: T.Sequence[geo.Point] + ) -> float: + offset: float = 0.0 + + if not gpx_points: + return offset + + first_gpx_dt = datetime.datetime.fromtimestamp( + gpx_points[0].time, tz=datetime.timezone.utc + ) + LOG.info("First GPX timestamp: %s", first_gpx_dt) + + if not video_gps_points: + LOG.warning( + "Skip GPX synchronization because no GPS found in video %s", + self.video_path, + ) + return offset + + first_gps_point = video_gps_points[0] + if isinstance(first_gps_point, telemetry.GPSPoint): + if first_gps_point.epoch_time is not None: + first_gps_dt = datetime.datetime.fromtimestamp( + first_gps_point.epoch_time, tz=datetime.timezone.utc + ) + LOG.info("First GPS timestamp: %s", first_gps_dt) + offset = gpx_points[0].time - first_gps_point.epoch_time + if offset: + LOG.warning( + "Found offset between GPX %s and video GPS timestamps %s: %s seconds", + first_gpx_dt, + first_gps_dt, + offset, + ) + else: + LOG.info( + "GPX and GPS are perfectly synchronized (all starts from %s)", + first_gpx_dt, + ) + else: + LOG.warning( + "Skip GPX synchronization because no GPS epoch time found in video %s", + self.video_path, + ) + + return offset diff --git a/mapillary_tools/geotag/video_extractors/native.py b/mapillary_tools/geotag/video_extractors/native.py new file mode 100644 index 000000000..b30d3160e --- /dev/null +++ b/mapillary_tools/geotag/video_extractors/native.py @@ -0,0 +1,157 @@ +from __future__ import annotations + +import sys +import typing as T +from pathlib import Path + +if sys.version_info >= (3, 12): + from typing import override +else: + from typing_extensions import override + +from ... import blackvue_parser, exceptions, geo, telemetry, types, utils +from ...camm import camm_parser +from ...gpmf import gpmf_gps_filter, gpmf_parser +from .base import BaseVideoExtractor + + +class GoProVideoExtractor(BaseVideoExtractor): + @override + def extract(self) -> types.VideoMetadataOrError: + with self.video_path.open("rb") as fp: + gopro_info = gpmf_parser.extract_gopro_info(fp) + + if gopro_info is None: + raise exceptions.MapillaryVideoGPSNotFoundError( + "No GPS data found from the video" + ) + + gps_points = gopro_info.gps + assert gps_points is not None, "must have GPS data extracted" + if not gps_points: + # Instead of raising an exception, return error metadata to tell the file type + ex: exceptions.MapillaryDescriptionError = ( + exceptions.MapillaryGPXEmptyError("Empty GPS data found") + ) + return types.describe_error_metadata( + ex, self.video_path, filetype=types.FileType.GOPRO + ) + + gps_points = T.cast( + T.List[telemetry.GPSPoint], gpmf_gps_filter.remove_noisy_points(gps_points) + ) + if not gps_points: + # Instead of raising an exception, return error metadata to tell the file type + ex = exceptions.MapillaryGPSNoiseError("GPS is too noisy") + return types.describe_error_metadata( + ex, self.video_path, filetype=types.FileType.GOPRO + ) + + video_metadata = types.VideoMetadata( + filename=self.video_path, + filesize=utils.get_file_size(self.video_path), + filetype=types.FileType.GOPRO, + points=T.cast(T.List[geo.Point], gps_points), + make=gopro_info.make, + model=gopro_info.model, + ) + + return video_metadata + + +class CAMMVideoExtractor(BaseVideoExtractor): + @override + def extract(self) -> types.VideoMetadataOrError: + with self.video_path.open("rb") as fp: + camm_info = camm_parser.extract_camm_info(fp) + + if camm_info is None: + raise exceptions.MapillaryVideoGPSNotFoundError( + "No GPS data found from the video" + ) + + if not camm_info.gps and not camm_info.mini_gps: + # Instead of raising an exception, return error metadata to tell the file type + ex: exceptions.MapillaryDescriptionError = ( + exceptions.MapillaryGPXEmptyError("Empty GPS data found") + ) + return types.describe_error_metadata( + ex, self.video_path, filetype=types.FileType.CAMM + ) + + return types.VideoMetadata( + filename=self.video_path, + filesize=utils.get_file_size(self.video_path), + filetype=types.FileType.CAMM, + points=T.cast(T.List[geo.Point], camm_info.gps or camm_info.mini_gps), + make=camm_info.make, + model=camm_info.model, + ) + + +class BlackVueVideoExtractor(BaseVideoExtractor): + @override + def extract(self) -> types.VideoMetadataOrError: + with self.video_path.open("rb") as fp: + blackvue_info = blackvue_parser.extract_blackvue_info(fp) + + if blackvue_info is None: + raise exceptions.MapillaryVideoGPSNotFoundError( + "No GPS data found from the video" + ) + + if not blackvue_info.gps: + # Instead of raising an exception, return error metadata to tell the file type + ex: exceptions.MapillaryDescriptionError = ( + exceptions.MapillaryGPXEmptyError("Empty GPS data found") + ) + return types.describe_error_metadata( + ex, self.video_path, filetype=types.FileType.BLACKVUE + ) + + video_metadata = types.VideoMetadata( + filename=self.video_path, + filesize=utils.get_file_size(self.video_path), + filetype=types.FileType.BLACKVUE, + points=blackvue_info.gps or [], + make=blackvue_info.make, + model=blackvue_info.model, + ) + + return video_metadata + + +class NativeVideoExtractor(BaseVideoExtractor): + def __init__(self, video_path: Path, filetypes: set[types.FileType] | None = None): + super().__init__(video_path) + self.filetypes = filetypes + + @override + def extract(self) -> types.VideoMetadataOrError: + ft = self.filetypes + extractor: BaseVideoExtractor + + if ft is None or types.FileType.VIDEO in ft or types.FileType.GOPRO in ft: + extractor = GoProVideoExtractor(self.video_path) + try: + return extractor.extract() + except exceptions.MapillaryVideoGPSNotFoundError: + pass + + if ft is None or types.FileType.VIDEO in ft or types.FileType.CAMM in ft: + extractor = CAMMVideoExtractor(self.video_path) + try: + return extractor.extract() + except exceptions.MapillaryVideoGPSNotFoundError: + pass + + if ft is None or types.FileType.VIDEO in ft or types.FileType.BLACKVUE in ft: + extractor = BlackVueVideoExtractor(self.video_path) + try: + return extractor.extract() + except exceptions.MapillaryVideoGPSNotFoundError: + pass + + raise exceptions.MapillaryVideoGPSNotFoundError( + "No GPS data found from the video" + ) diff --git a/mapillary_tools/sample_video.py b/mapillary_tools/sample_video.py index 7a0cd5fc2..527d1f581 100644 --- a/mapillary_tools/sample_video.py +++ b/mapillary_tools/sample_video.py @@ -290,9 +290,9 @@ def _sample_single_video_by_distance( LOG.info("Extracting video metdata") - video_metadatas = geotag_videos_from_video.GeotagVideosFromVideo( + video_metadatas = geotag_videos_from_video.GeotagVideosFromVideo().to_description( [video_path] - ).to_description() + ) assert len(video_metadatas) == 1, "expect 1 video metadata" video_metadata = video_metadatas[0] if isinstance(video_metadata, types.ErrorMetadata): diff --git a/setup.py b/setup.py index df3aa5a59..c67fd8830 100644 --- a/setup.py +++ b/setup.py @@ -47,6 +47,8 @@ def readme(): "mapillary_tools.camm", "mapillary_tools.commands", "mapillary_tools.geotag", + "mapillary_tools.geotag.image_extractors", + "mapillary_tools.geotag.video_extractors", "mapillary_tools.gpmf", "mapillary_tools.mp4", ],