diff --git a/mapillary_tools/geotag/geotag_from_generic.py b/mapillary_tools/geotag/geotag_from_generic.py index 0b90aa64f..90930a2c2 100644 --- a/mapillary_tools/geotag/geotag_from_generic.py +++ b/mapillary_tools/geotag/geotag_from_generic.py @@ -1,22 +1,163 @@ +from __future__ import annotations + import abc +import logging import typing as T +from pathlib import Path + +from tqdm import tqdm + +from .. import exceptions, types, utils + + +LOG = logging.getLogger(__name__) + + +class GenericImageExtractor(abc.ABC): + """ + Extracts metadata from an image file. + """ + + def __init__(self, image_path: Path): + self.image_path = image_path + + def extract(self) -> types.ImageMetadataOrError: + raise NotImplementedError + + +TImageExtractor = T.TypeVar("TImageExtractor", bound=GenericImageExtractor) + + +class GeotagImagesFromGeneric(abc.ABC, T.Generic[TImageExtractor]): + """ + Extracts metadata from a list of image files with multiprocessing. + """ + + def __init__( + self, image_paths: T.Sequence[Path], num_processes: int | None + ) -> None: + self.image_paths = image_paths + self.num_processes = num_processes + + def to_description(self) -> list[types.ImageMetadataOrError]: + extractor_or_errors = self._generate_image_extractors() + + assert len(extractor_or_errors) == len(self.image_paths) + + extractors, error_metadatas = types.separate_errors(extractor_or_errors) -from .. import types + map_results = utils.mp_map_maybe( + self.run_extraction, + extractors, + num_processes=self.num_processes, + ) + results = list( + tqdm( + map_results, + desc="Extracting images", + unit="images", + disable=LOG.getEffectiveLevel() <= logging.DEBUG, + total=len(extractors), + ) + ) -class GeotagImagesFromGeneric(abc.ABC): - def __init__(self) -> None: - pass + return results + error_metadatas - @abc.abstractmethod - def to_description(self) -> T.List[types.ImageMetadataOrError]: + def _generate_image_extractors( + self, + ) -> T.Sequence[TImageExtractor | types.ErrorMetadata]: raise NotImplementedError + # This method is passed to multiprocessing + # so it has to be classmethod or staticmethod to avoid pickling the instance + @classmethod + def run_extraction(cls, extractor: TImageExtractor) -> types.ImageMetadataOrError: + image_path = extractor.image_path -class GeotagVideosFromGeneric(abc.ABC): - def __init__(self) -> None: - pass + try: + return extractor.extract() + except exceptions.MapillaryDescriptionError as ex: + return types.describe_error_metadata( + ex, image_path, filetype=types.FileType.IMAGE + ) + except Exception as ex: + LOG.exception("Unexpected error extracting metadata from %s", image_path) + return types.describe_error_metadata( + ex, image_path, filetype=types.FileType.IMAGE + ) - @abc.abstractmethod - def to_description(self) -> T.List[types.VideoMetadataOrError]: + +class GenericVideoExtractor(abc.ABC): + """ + Extracts metadata from a video file. + """ + + def __init__(self, video_path: Path): + self.video_path = video_path + + def extract(self) -> types.VideoMetadataOrError: + raise NotImplementedError + + +TVideoExtractor = T.TypeVar("TVideoExtractor", bound=GenericVideoExtractor) + + +class GeotagVideosFromGeneric(abc.ABC, T.Generic[TVideoExtractor]): + """ + Extracts metadata from a list of video files with multiprocessing. + """ + + def __init__( + self, video_paths: T.Sequence[Path], num_processes: int | None + ) -> None: + self.video_paths = video_paths + self.num_processes = num_processes + + def to_description(self) -> list[types.VideoMetadataOrError]: + extractor_or_errors = self._generate_video_extractors() + + assert len(extractor_or_errors) == len(self.video_paths) + + extractors, error_metadatas = types.separate_errors(extractor_or_errors) + + map_results = utils.mp_map_maybe( + self.run_extraction, + extractors, + num_processes=self.num_processes, + ) + + results = list( + tqdm( + map_results, + desc="Extracting videos", + unit="videos", + disable=LOG.getEffectiveLevel() <= logging.DEBUG, + total=len(extractors), + ) + ) + + return results + error_metadatas + + def _generate_video_extractors( + self, + ) -> T.Sequence[TVideoExtractor | types.ErrorMetadata]: raise NotImplementedError + + # This method is passed to multiprocessing + # so it has to be classmethod or staticmethod to avoid pickling the instance + @classmethod + def run_extraction(cls, extractor: TVideoExtractor) -> types.VideoMetadataOrError: + video_path = extractor.video_path + + try: + return extractor.extract() + except exceptions.MapillaryDescriptionError as ex: + return types.describe_error_metadata( + ex, video_path, filetype=types.FileType.VIDEO + ) + except Exception as ex: + LOG.exception("Unexpected error extracting metadata from %s", video_path) + return types.describe_error_metadata( + ex, video_path, filetype=types.FileType.VIDEO + ) diff --git a/mapillary_tools/geotag/geotag_images_from_exif.py b/mapillary_tools/geotag/geotag_images_from_exif.py index 52f51cf71..7fd0c7c93 100644 --- a/mapillary_tools/geotag/geotag_images_from_exif.py +++ b/mapillary_tools/geotag/geotag_images_from_exif.py @@ -1,104 +1,60 @@ +import contextlib import logging import typing as T -from multiprocessing import Pool from pathlib import Path -from tqdm import tqdm - from .. import exceptions, geo, types, utils from ..exif_read import ExifRead, ExifReadABC -from .geotag_from_generic import GeotagImagesFromGeneric +from .geotag_from_generic import GenericImageExtractor, GeotagImagesFromGeneric LOG = logging.getLogger(__name__) -class GeotagImagesFromEXIF(GeotagImagesFromGeneric): - def __init__( - self, image_paths: T.Sequence[Path], num_processes: T.Optional[int] = None - ): - self.image_paths = image_paths - self.num_processes = num_processes - super().__init__() - - @staticmethod - def build_image_metadata( - image_path: Path, exif: ExifReadABC, skip_lonlat_error: bool = False - ) -> types.ImageMetadata: - lonlat = exif.extract_lon_lat() - if lonlat is None: - if not skip_lonlat_error: +class ImageEXIFExtractor(GenericImageExtractor): + def __init__(self, image_path: Path, skip_lonlat_error: bool = False): + super().__init__(image_path) + self.skip_lonlat_error = skip_lonlat_error + + @contextlib.contextmanager + def _exif_context(self) -> T.Generator[ExifReadABC, None, None]: + with self.image_path.open("rb") as fp: + yield ExifRead(fp) + + def extract(self) -> types.ImageMetadata: + with self._exif_context() as exif: + lonlat = exif.extract_lon_lat() + if lonlat is None: + if not self.skip_lonlat_error: + raise exceptions.MapillaryGeoTaggingError( + "Unable to extract GPS Longitude or GPS Latitude from the image" + ) + lonlat = (0.0, 0.0) + lon, lat = lonlat + + capture_time = exif.extract_capture_time() + if capture_time is None: raise exceptions.MapillaryGeoTaggingError( - "Unable to extract GPS Longitude or GPS Latitude from the image" + "Unable to extract timestamp from the image" ) - lonlat = (0.0, 0.0) - lon, lat = lonlat - - capture_time = exif.extract_capture_time() - if capture_time is None: - raise exceptions.MapillaryGeoTaggingError( - "Unable to extract timestamp from the image" - ) - - image_metadata = types.ImageMetadata( - filename=image_path, - filesize=utils.get_file_size(image_path), - time=geo.as_unix_time(capture_time), - lat=lat, - lon=lon, - alt=exif.extract_altitude(), - angle=exif.extract_direction(), - width=exif.extract_width(), - height=exif.extract_height(), - MAPOrientation=exif.extract_orientation(), - MAPDeviceMake=exif.extract_make(), - MAPDeviceModel=exif.extract_model(), - ) - - return image_metadata - @staticmethod - def geotag_image( - image_path: Path, skip_lonlat_error: bool = False - ) -> types.ImageMetadataOrError: - try: - with image_path.open("rb") as fp: - exif = ExifRead(fp) - image_metadata = GeotagImagesFromEXIF.build_image_metadata( - image_path, exif, skip_lonlat_error=skip_lonlat_error - ) - except Exception as ex: - return types.describe_error_metadata( - ex, image_path, filetype=types.FileType.IMAGE + image_metadata = types.ImageMetadata( + filename=self.image_path, + filesize=utils.get_file_size(self.image_path), + time=geo.as_unix_time(capture_time), + lat=lat, + lon=lon, + alt=exif.extract_altitude(), + angle=exif.extract_direction(), + width=exif.extract_width(), + height=exif.extract_height(), + MAPOrientation=exif.extract_orientation(), + MAPDeviceMake=exif.extract_make(), + MAPDeviceModel=exif.extract_model(), ) return image_metadata - def to_description(self) -> T.List[types.ImageMetadataOrError]: - if self.num_processes is None: - num_processes = self.num_processes - disable_multiprocessing = False - else: - num_processes = max(self.num_processes, 1) - disable_multiprocessing = self.num_processes <= 0 - with Pool(processes=num_processes) as pool: - image_metadatas_iter: T.Iterator[types.ImageMetadataOrError] - if disable_multiprocessing: - image_metadatas_iter = map( - GeotagImagesFromEXIF.geotag_image, - self.image_paths, - ) - else: - image_metadatas_iter = pool.imap( - GeotagImagesFromEXIF.geotag_image, - self.image_paths, - ) - return list( - tqdm( - image_metadatas_iter, - desc="Extracting geotags from images", - unit="images", - disable=LOG.getEffectiveLevel() <= logging.DEBUG, - total=len(self.image_paths), - ) - ) +class GeotagImagesFromEXIF(GeotagImagesFromGeneric): + def _generate_image_extractors(self) -> T.Sequence[ImageEXIFExtractor]: + return [ImageEXIFExtractor(path) for path in self.image_paths] diff --git a/mapillary_tools/geotag/geotag_images_from_exiftool.py b/mapillary_tools/geotag/geotag_images_from_exiftool.py index 95061e17c..a9bbb9915 100644 --- a/mapillary_tools/geotag/geotag_images_from_exiftool.py +++ b/mapillary_tools/geotag/geotag_images_from_exiftool.py @@ -1,54 +1,47 @@ +from __future__ import annotations + +import contextlib import logging import typing as T import xml.etree.ElementTree as ET -from multiprocessing import Pool from pathlib import Path -from tqdm import tqdm - from .. import exceptions, exiftool_read, types from .geotag_from_generic import GeotagImagesFromGeneric -from .geotag_images_from_exif import GeotagImagesFromEXIF +from .geotag_images_from_exif import ImageEXIFExtractor LOG = logging.getLogger(__name__) +class ImageExifToolExtractor(ImageEXIFExtractor): + def __init__(self, image_path: Path, element: ET.Element): + super().__init__(image_path) + self.element = element + + @contextlib.contextmanager + def _exif_context(self): + yield exiftool_read.ExifToolRead(ET.ElementTree(self.element)) + + class GeotagImagesFromExifTool(GeotagImagesFromGeneric): def __init__( self, image_paths: T.Sequence[Path], xml_path: Path, - num_processes: T.Optional[int] = None, + num_processes: int | None = None, ): - self.image_paths = image_paths self.xml_path = xml_path - self.num_processes = num_processes - super().__init__() - - @staticmethod - def geotag_image(element: ET.Element) -> types.ImageMetadataOrError: - image_path = exiftool_read.find_rdf_description_path(element) - assert image_path is not None, "must find the path from the element" - - try: - exif = exiftool_read.ExifToolRead(ET.ElementTree(element)) - image_metadata = GeotagImagesFromEXIF.build_image_metadata( - image_path, exif, skip_lonlat_error=False - ) - except Exception as ex: - return types.describe_error_metadata( - ex, image_path, filetype=types.FileType.IMAGE - ) - - return image_metadata + super().__init__(image_paths=image_paths, num_processes=num_processes) - def to_description(self) -> T.List[types.ImageMetadataOrError]: + def _generate_image_extractors( + self, + ) -> T.Sequence[ImageExifToolExtractor | types.ErrorMetadata]: rdf_description_by_path = exiftool_read.index_rdf_description_by_path( [self.xml_path] ) - error_metadatas: T.List[types.ErrorMetadata] = [] - rdf_descriptions: T.List[ET.Element] = [] + results: T.List[ImageExifToolExtractor | types.ErrorMetadata] = [] + for path in self.image_paths: rdf_description = rdf_description_by_path.get( exiftool_read.canonical_path(path) @@ -57,41 +50,12 @@ def to_description(self) -> T.List[types.ImageMetadataOrError]: exc = exceptions.MapillaryEXIFNotFoundError( f"The {exiftool_read._DESCRIPTION_TAG} XML element for the image not found" ) - error_metadatas.append( + results.append( types.describe_error_metadata( exc, path, filetype=types.FileType.IMAGE ) ) else: - rdf_descriptions.append(rdf_description) - - if self.num_processes is None: - num_processes = self.num_processes - disable_multiprocessing = False - else: - num_processes = max(self.num_processes, 1) - disable_multiprocessing = self.num_processes <= 0 - - with Pool(processes=num_processes) as pool: - image_metadatas_iter: T.Iterator[types.ImageMetadataOrError] - if disable_multiprocessing: - image_metadatas_iter = map( - GeotagImagesFromExifTool.geotag_image, - rdf_descriptions, - ) - else: - image_metadatas_iter = pool.imap( - GeotagImagesFromExifTool.geotag_image, - rdf_descriptions, - ) - image_metadata_or_errors = list( - tqdm( - image_metadatas_iter, - desc="Extracting geotags from ExifTool XML", - unit="images", - disable=LOG.getEffectiveLevel() <= logging.DEBUG, - total=len(self.image_paths), - ) - ) + results.append(ImageExifToolExtractor(path, rdf_description)) - return error_metadatas + image_metadata_or_errors + return results diff --git a/mapillary_tools/geotag/geotag_images_from_exiftool_both_image_and_video.py b/mapillary_tools/geotag/geotag_images_from_exiftool_both_image_and_video.py index c0837ade0..6c901d253 100644 --- a/mapillary_tools/geotag/geotag_images_from_exiftool_both_image_and_video.py +++ b/mapillary_tools/geotag/geotag_images_from_exiftool_both_image_and_video.py @@ -1,3 +1,5 @@ +from __future__ import annotations + import logging import typing as T from pathlib import Path @@ -22,26 +24,12 @@ def __init__( offset_time: float = 0.0, num_processes: T.Optional[int] = None, ): - self.image_paths = image_paths + super().__init__(image_paths, num_processes=num_processes) self.xml_path = xml_path self.offset_time = offset_time - self.num_processes = num_processes - super().__init__() - - def to_description(self) -> T.List[types.ImageMetadataOrError]: - # will return this list - final_image_metadatas: T.List[types.ImageMetadataOrError] = [] - # find the images that can be geotagged from EXIF - image_metadatas_from_exiftool = ( - geotag_images_from_exiftool.GeotagImagesFromExifTool( - self.image_paths, - self.xml_path, - num_processes=self.num_processes, - ).to_description() - ) - - # find all video paths in self.xml_path + def geotag_samples(self) -> list[types.ImageMetadataOrError]: + # Find all video paths in self.xml_path rdf_description_by_path = exiftool_read.index_rdf_description_by_path( [self.xml_path] ) @@ -49,45 +37,41 @@ def to_description(self) -> T.List[types.ImageMetadataOrError]: [Path(pathstr) for pathstr in rdf_description_by_path.keys()], skip_subfolders=True, ) + # Find all video paths that have sample images + samples_by_video = utils.find_all_image_samples(self.image_paths, video_paths) - # will try to geotag these error metadatas from video later - error_metadata_by_image_path = {} - for image_metadata in image_metadatas_from_exiftool: - if isinstance(image_metadata, types.ErrorMetadata): - error_metadata_by_image_path[image_metadata.filename] = image_metadata - else: - final_image_metadatas.append(image_metadata) - - maybe_image_samples = list(error_metadata_by_image_path.keys()) - - # find all video paths that have sample images - video_paths_with_image_samples = list( - utils.find_all_image_samples(maybe_image_samples, video_paths).keys() - ) - - video_metadatas = ( + video_metadata_or_errors = ( geotag_videos_from_exiftool_video.GeotagVideosFromExifToolVideo( - video_paths_with_image_samples, + list(samples_by_video.keys()), self.xml_path, num_processes=self.num_processes, ).to_description() ) - - image_metadatas_from_video = geotag_images_from_video.GeotagImagesFromVideo( - maybe_image_samples, - video_metadatas, + sample_paths = sum(samples_by_video.values(), []) + sample_metadata_or_errors = geotag_images_from_video.GeotagImagesFromVideo( + sample_paths, + video_metadata_or_errors, offset_time=self.offset_time, num_processes=self.num_processes, ).to_description() - final_image_metadatas.extend(image_metadatas_from_video) - # add back error metadatas that can not be geotagged at all - actual_image_sample_paths = set( - image_metadata.filename for image_metadata in image_metadatas_from_video + return sample_metadata_or_errors + + def to_description(self) -> list[types.ImageMetadataOrError]: + sample_metadata_or_errors = self.geotag_samples() + + sample_paths = set(metadata.filename for metadata in sample_metadata_or_errors) + + non_sample_paths = [ + path for path in self.image_paths if path not in sample_paths + ] + + non_sample_metadata_or_errors = ( + geotag_images_from_exiftool.GeotagImagesFromExifTool( + non_sample_paths, + self.xml_path, + num_processes=self.num_processes, + ).to_description() ) - for path, error_metadata in error_metadata_by_image_path.items(): - if path not in actual_image_sample_paths: - final_image_metadatas.append(error_metadata) - assert len(final_image_metadatas) <= len(self.image_paths) - return final_image_metadatas + return sample_metadata_or_errors + non_sample_metadata_or_errors diff --git a/mapillary_tools/geotag/geotag_images_from_gpx.py b/mapillary_tools/geotag/geotag_images_from_gpx.py index 594a8bc1e..ac06d90b3 100644 --- a/mapillary_tools/geotag/geotag_images_from_gpx.py +++ b/mapillary_tools/geotag/geotag_images_from_gpx.py @@ -1,12 +1,13 @@ +from __future__ import annotations + import dataclasses import logging import typing as T -from multiprocessing import Pool from pathlib import Path from .. import exceptions, geo, types from .geotag_from_generic import GeotagImagesFromGeneric -from .geotag_images_from_exif import GeotagImagesFromEXIF +from .geotag_images_from_exif import ImageEXIFExtractor LOG = logging.getLogger(__name__) @@ -20,41 +21,19 @@ def __init__( use_gpx_start_time: bool = False, use_image_start_time: bool = False, offset_time: float = 0.0, - num_processes: T.Optional[int] = None, + num_processes: int | None = None, ): - super().__init__() - self.image_paths = image_paths + super().__init__(image_paths, num_processes=num_processes) self.points = points self.use_gpx_start_time = use_gpx_start_time self.use_image_start_time = use_image_start_time self.offset_time = offset_time - self.num_processes = num_processes - - @staticmethod - def geotag_image(image_path: Path) -> types.ImageMetadataOrError: - return GeotagImagesFromEXIF.geotag_image(image_path, skip_lonlat_error=True) - - def geotag_multiple_images( - self, image_paths: T.Sequence[Path] - ) -> T.List[types.ImageMetadataOrError]: - if self.num_processes is None: - num_processes = self.num_processes - disable_multiprocessing = False - else: - num_processes = max(self.num_processes, 1) - disable_multiprocessing = self.num_processes <= 0 - - if disable_multiprocessing: - return list(map(GeotagImagesFromGPX.geotag_image, image_paths)) - else: - with Pool(processes=num_processes) as pool: - return pool.map(GeotagImagesFromGPX.geotag_image, image_paths) def _interpolate_image_metadata_along( self, image_metadata: types.ImageMetadata, sorted_points: T.Sequence[geo.Point], - ) -> types.ImageMetadataOrError: + ) -> types.ImageMetadata: assert sorted_points, "must have at least one point" if image_metadata.time < sorted_points[0].time: @@ -63,15 +42,12 @@ def _interpolate_image_metadata_along( gpx_end_time = types.datetime_to_map_capture_time(sorted_points[-1].time) # with the tolerance of 1ms if 0.001 < delta: - exc = exceptions.MapillaryOutsideGPXTrackError( + raise exceptions.MapillaryOutsideGPXTrackError( f"The image date time is {round(delta, 3)} seconds behind the GPX start point", image_time=types.datetime_to_map_capture_time(image_metadata.time), gpx_start_time=gpx_start_time, gpx_end_time=gpx_end_time, ) - return types.describe_error_metadata( - exc, image_metadata.filename, filetype=types.FileType.IMAGE - ) if sorted_points[-1].time < image_metadata.time: delta = image_metadata.time - sorted_points[-1].time @@ -79,15 +55,12 @@ def _interpolate_image_metadata_along( gpx_end_time = types.datetime_to_map_capture_time(sorted_points[-1].time) # with the tolerance of 1ms if 0.001 < delta: - exc = exceptions.MapillaryOutsideGPXTrackError( + raise exceptions.MapillaryOutsideGPXTrackError( f"The image time is {round(delta, 3)} seconds beyond the GPX end point", image_time=types.datetime_to_map_capture_time(image_metadata.time), gpx_start_time=gpx_start_time, gpx_end_time=gpx_end_time, ) - return types.describe_error_metadata( - exc, image_metadata.filename, filetype=types.FileType.IMAGE - ) interpolated = geo.interpolate(sorted_points, image_metadata.time) @@ -100,34 +73,25 @@ def _interpolate_image_metadata_along( time=interpolated.time, ) - def to_description(self) -> T.List[types.ImageMetadataOrError]: - metadatas: T.List[types.ImageMetadataOrError] = [] - - if not self.points: - exc = exceptions.MapillaryGPXEmptyError( - "Empty GPS extracted from the geotag source" - ) - for image_path in self.image_paths: - metadatas.append( - types.describe_error_metadata( - exc, image_path, filetype=types.FileType.IMAGE - ), - ) - assert len(self.image_paths) == len(metadatas) - return metadatas + def _generate_image_extractors(self) -> T.Sequence[ImageEXIFExtractor]: + return [ + ImageEXIFExtractor(path, skip_lonlat_error=True) + for path in self.image_paths + ] + + def to_description(self) -> list[types.ImageMetadataOrError]: + final_metadatas: list[types.ImageMetadataOrError] = [] - image_metadata_or_errors = self.geotag_multiple_images(self.image_paths) + image_metadata_or_errors = super().to_description() - image_metadatas = [] - for metadata_or_error in image_metadata_or_errors: - if isinstance(metadata_or_error, types.ErrorMetadata): - metadatas.append(metadata_or_error) - else: - image_metadatas.append(metadata_or_error) + image_metadatas, error_metadatas = types.separate_errors( + image_metadata_or_errors + ) + final_metadatas.extend(error_metadatas) if not image_metadatas: - assert len(self.image_paths) == len(metadatas) - return metadatas + assert len(self.image_paths) == len(final_metadatas) + return final_metadatas # Do not use point itself for comparison because point.angle or point.alt could be None # when you compare nonnull value with None, it will throw @@ -162,64 +126,25 @@ def to_description(self) -> T.List[types.ImageMetadataOrError]: LOG.debug("GPX start time delta: %s", time_delta) image_time_offset += time_delta - LOG.debug("Final time offset for interpolation: %s", image_time_offset) + if image_time_offset: + LOG.debug("Final time offset for interpolation: %s", image_time_offset) + for image_metadata in sorted_image_metadatas: + # TODO: this time modification seems to affect final capture times + image_metadata.time += image_time_offset for image_metadata in sorted_image_metadatas: - image_metadata.time += image_time_offset - metadatas.append( - self._interpolate_image_metadata_along(image_metadata, sorted_points) - ) - - assert len(self.image_paths) == len(metadatas) - return metadatas - - -class GeotagImagesFromGPXWithProgress(GeotagImagesFromGPX): - def __init__( - self, - image_paths: T.Sequence[Path], - points: T.Sequence[geo.Point], - use_gpx_start_time: bool = False, - use_image_start_time: bool = False, - offset_time: float = 0.0, - num_processes: T.Optional[int] = None, - progress_bar=None, - ) -> None: - super().__init__( - image_paths, - points, - use_gpx_start_time=use_gpx_start_time, - use_image_start_time=use_image_start_time, - offset_time=offset_time, - num_processes=num_processes, - ) - self._progress_bar = progress_bar - - def geotag_multiple_images( - self, image_paths: T.Sequence[Path] - ) -> T.List[types.ImageMetadataOrError]: - if self._progress_bar is None: - return super().geotag_multiple_images(image_paths) - - if self.num_processes is None: - num_processes = self.num_processes - disable_multiprocessing = False - else: - num_processes = max(self.num_processes, 1) - disable_multiprocessing = self.num_processes <= 0 - - output = [] - with Pool(processes=num_processes) as pool: - image_metadatas_iter: T.Iterator[types.ImageMetadataOrError] - if disable_multiprocessing: - image_metadatas_iter = map( - GeotagImagesFromGPX.geotag_image, image_paths + try: + final_metadatas.append( + self._interpolate_image_metadata_along( + image_metadata, sorted_points + ) ) - else: - image_metadatas_iter = pool.imap( - GeotagImagesFromGPX.geotag_image, image_paths + except exceptions.MapillaryOutsideGPXTrackError as ex: + error_metadata = types.describe_error_metadata( + ex, image_metadata.filename, filetype=types.FileType.IMAGE ) - for image_metadata_or_error in image_metadatas_iter: - self._progress_bar.update(1) - output.append(image_metadata_or_error) - return output + final_metadatas.append(error_metadata) + + assert len(self.image_paths) == len(final_metadatas) + + return final_metadatas diff --git a/mapillary_tools/geotag/geotag_images_from_gpx_file.py b/mapillary_tools/geotag/geotag_images_from_gpx_file.py index f563a6d3e..286f45de0 100644 --- a/mapillary_tools/geotag/geotag_images_from_gpx_file.py +++ b/mapillary_tools/geotag/geotag_images_from_gpx_file.py @@ -1,21 +1,17 @@ -import dataclasses import logging import typing as T -from multiprocessing import Pool from pathlib import Path import gpxpy -from tqdm import tqdm -from .. import exif_read, geo, types -from .geotag_from_generic import GeotagImagesFromGeneric -from .geotag_images_from_gpx import GeotagImagesFromGPXWithProgress +from .. import geo +from .geotag_images_from_gpx import GeotagImagesFromGPX LOG = logging.getLogger(__name__) -class GeotagImagesFromGPXFile(GeotagImagesFromGeneric): +class GeotagImagesFromGPXFile(GeotagImagesFromGPX): def __init__( self, image_paths: T.Sequence[Path], @@ -24,7 +20,6 @@ def __init__( offset_time: float = 0.0, num_processes: T.Optional[int] = None, ): - super().__init__() try: tracks = parse_gpx(source_path) except Exception as ex: @@ -38,91 +33,13 @@ def __init__( len(tracks), source_path, ) - self.points: T.List[geo.Point] = sum(tracks, []) - self.image_paths = image_paths - self.source_path = source_path - self.use_gpx_start_time = use_gpx_start_time - self.offset_time = offset_time - self.num_processes = num_processes - - @staticmethod - def _extract_image_metadata( - image_metadata: types.ImageMetadata, - ) -> types.ImageMetadataOrError: - try: - exif = exif_read.ExifRead(image_metadata.filename) - orientation = exif.extract_orientation() - make = exif.extract_make() - model = exif.extract_model() - except Exception as ex: - return types.describe_error_metadata( - ex, image_metadata.filename, filetype=types.FileType.IMAGE - ) - - return dataclasses.replace( - image_metadata, - MAPOrientation=orientation, - MAPDeviceMake=make, - MAPDeviceModel=model, - ) - - def to_description(self) -> T.List[types.ImageMetadataOrError]: - with tqdm( - total=len(self.image_paths), - desc="Interpolating", - unit="images", - disable=LOG.getEffectiveLevel() <= logging.DEBUG, - ) as pbar: - geotag = GeotagImagesFromGPXWithProgress( - self.image_paths, - self.points, - use_gpx_start_time=self.use_gpx_start_time, - offset_time=self.offset_time, - progress_bar=pbar, - ) - image_metadata_or_errors = geotag.to_description() - - image_metadatas: T.List[types.ImageMetadata] = [] - error_metadatas: T.List[types.ErrorMetadata] = [] - for metadata in image_metadata_or_errors: - if isinstance(metadata, types.ErrorMetadata): - error_metadatas.append(metadata) - else: - image_metadatas.append(metadata) - - if self.num_processes is None: - num_processes = self.num_processes - disable_multiprocessing = False - else: - num_processes = max(self.num_processes, 1) - disable_multiprocessing = self.num_processes <= 0 - - with Pool(processes=num_processes) as pool: - image_metadatas_iter: T.Iterator[types.ImageMetadataOrError] - if disable_multiprocessing: - image_metadatas_iter = map( - GeotagImagesFromGPXFile._extract_image_metadata, image_metadatas - ) - else: - # Do not pass error metadatas where the error object can not be pickled for multiprocessing to work - # Otherwise we get: - # TypeError: __init__() missing 3 required positional arguments: 'image_time', 'gpx_start_time', and 'gpx_end_time' - # See https://stackoverflow.com/a/61432070 - image_metadatas_iter = pool.imap( - GeotagImagesFromGPXFile._extract_image_metadata, image_metadatas - ) - image_metadata_or_errors = list( - tqdm( - image_metadatas_iter, - desc="Processing", - unit="images", - disable=LOG.getEffectiveLevel() <= logging.DEBUG, - ) - ) - - return ( - T.cast(T.List[types.ImageMetadataOrError], error_metadatas) - + image_metadata_or_errors + points = sum(tracks, []) + super().__init__( + image_paths, + points, + use_gpx_start_time=use_gpx_start_time, + offset_time=offset_time, + num_processes=num_processes, ) diff --git a/mapillary_tools/geotag/geotag_images_from_video.py b/mapillary_tools/geotag/geotag_images_from_video.py index aa2efc65c..6332b3eee 100644 --- a/mapillary_tools/geotag/geotag_images_from_video.py +++ b/mapillary_tools/geotag/geotag_images_from_video.py @@ -1,13 +1,13 @@ +from __future__ import annotations + import logging import typing as T from pathlib import Path -from tqdm import tqdm - from .. import types, utils from .geotag_from_generic import GeotagImagesFromGeneric -from .geotag_images_from_gpx import GeotagImagesFromGPXWithProgress +from .geotag_images_from_gpx import GeotagImagesFromGPX LOG = logging.getLogger(__name__) @@ -19,69 +19,69 @@ def __init__( image_paths: T.Sequence[Path], video_metadatas: T.Sequence[types.VideoMetadataOrError], offset_time: float = 0.0, - num_processes: T.Optional[int] = None, + num_processes: int | None = None, ): - self.image_paths = image_paths + super().__init__(image_paths, num_processes=num_processes) self.video_metadatas = video_metadatas self.offset_time = offset_time - self.num_processes = num_processes - super().__init__() - def to_description(self) -> T.List[types.ImageMetadataOrError]: - # will return this list - final_image_metadatas: T.List[types.ImageMetadataOrError] = [] + def to_description(self) -> list[types.ImageMetadataOrError]: + # Will return this list + final_image_metadatas: list[types.ImageMetadataOrError] = [] - for video_metadata in self.video_metadatas: + video_metadatas, video_error_metadatas = types.separate_errors( + self.video_metadatas + ) + + for video_error_metadata in video_error_metadatas: + video_path = video_error_metadata.filename + sample_paths = list( + utils.filter_video_samples(self.image_paths, video_path) + ) + LOG.debug( + "Found %d sample images from video %s with error: %s", + len(sample_paths), + video_path, + video_error_metadata.error, + ) + for sample_path in sample_paths: + image_error_metadata = types.describe_error_metadata( + video_error_metadata.error, + sample_path, + filetype=types.FileType.IMAGE, + ) + final_image_metadatas.append(image_error_metadata) + + for video_metadata in video_metadatas: video_path = video_metadata.filename - LOG.debug("Processing video: %s", video_path) - sample_image_paths = list( + sample_paths = list( utils.filter_video_samples(self.image_paths, video_path) ) LOG.debug( "Found %d sample images from video %s", - len(sample_image_paths), + len(sample_paths), video_path, ) - if isinstance(video_metadata, types.ErrorMetadata): - for sample_image_path in sample_image_paths: - error_metadata = types.describe_error_metadata( - video_metadata.error, - sample_image_path, - filetype=types.FileType.IMAGE, - ) - final_image_metadatas.append(error_metadata) - continue - - with tqdm( - total=len(sample_image_paths), - desc=f"Interpolating {video_path.name}", - unit="images", - disable=LOG.getEffectiveLevel() <= logging.DEBUG, - ) as pbar: - image_metadatas = GeotagImagesFromGPXWithProgress( - sample_image_paths, - video_metadata.points, - use_gpx_start_time=False, - use_image_start_time=True, - offset_time=self.offset_time, - num_processes=self.num_processes, - progress_bar=pbar, - ).to_description() - final_image_metadatas.extend(image_metadatas) - - # update make and model - LOG.debug( - 'Found camera make "%s" and model "%s"', - video_metadata.make, - video_metadata.model, + geotag = GeotagImagesFromGPX( + sample_paths, + video_metadata.points, + use_gpx_start_time=False, + use_image_start_time=True, + offset_time=self.offset_time, + num_processes=self.num_processes, ) + + image_metadatas = geotag.to_description() + for metadata in image_metadatas: if isinstance(metadata, types.ImageMetadata): metadata.MAPDeviceMake = video_metadata.make metadata.MAPDeviceModel = video_metadata.model + final_image_metadatas.extend(image_metadatas) + # NOTE: this method only geotags images that have a corresponding video, # so the number of image metadata objects returned might be less than # the number of the input image_paths diff --git a/mapillary_tools/geotag/geotag_videos_from_exiftool_video.py b/mapillary_tools/geotag/geotag_videos_from_exiftool_video.py index 99a405808..bb26cb567 100644 --- a/mapillary_tools/geotag/geotag_videos_from_exiftool_video.py +++ b/mapillary_tools/geotag/geotag_videos_from_exiftool_video.py @@ -1,52 +1,45 @@ +from __future__ import annotations + import logging import typing as T import xml.etree.ElementTree as ET -from multiprocessing import Pool from pathlib import Path -from tqdm import tqdm - from .. import exceptions, exiftool_read, geo, types, utils from ..exiftool_read_video import ExifToolReadVideo from ..gpmf import gpmf_gps_filter from ..telemetry import GPSPoint -from .geotag_from_generic import GeotagVideosFromGeneric +from .geotag_from_generic import GenericVideoExtractor, GeotagVideosFromGeneric LOG = logging.getLogger(__name__) -_DESCRIPTION_TAG = "rdf:Description" -class GeotagVideosFromExifToolVideo(GeotagVideosFromGeneric): - def __init__( - self, - video_paths: T.Sequence[Path], - xml_path: Path, - num_processes: T.Optional[int] = None, - ): - self.video_paths = video_paths - self.xml_path = xml_path - self.num_processes = num_processes - super().__init__() +class VideoExifToolExtractor(GenericVideoExtractor): + def __init__(self, video_path: Path, element: ET.Element): + super().__init__(video_path) + self.element = element + + def extract(self) -> types.VideoMetadataOrError: + exif = ExifToolReadVideo(ET.ElementTree(self.element)) - @staticmethod - def geotag_video(element: ET.Element) -> types.VideoMetadataOrError: - video_path = exiftool_read.find_rdf_description_path(element) - assert video_path is not None, "must find the path from the element" + make = exif.extract_make() + model = exif.extract_model() - try: - exif = ExifToolReadVideo(ET.ElementTree(element)) + is_gopro = make is not None and make.upper() in ["GOPRO"] - points = exif.extract_gps_track() + points = exif.extract_gps_track() + # ExifTool has no idea if GPS is not found or found but empty + if is_gopro: if not points: - raise exceptions.MapillaryVideoGPSNotFoundError( - "No GPS data found from the video" - ) + raise exceptions.MapillaryGPXEmptyError("Empty GPS data found") - points = geo.extend_deduplicate_points(points) - assert points, "must have at least one point" + # ExifTool (since 13.04) converts GPSSpeed for GoPro to km/h, so here we convert it back to m/s + for p in points: + if isinstance(p, GPSPoint) and p.ground_speed is not None: + p.ground_speed = p.ground_speed / 3.6 - if all(isinstance(p, GPSPoint) for p in points): + if isinstance(points[0], GPSPoint): points = T.cast( T.List[geo.Point], gpmf_gps_filter.remove_noisy_points( @@ -56,78 +49,56 @@ def geotag_video(element: ET.Element) -> types.VideoMetadataOrError: if not points: raise exceptions.MapillaryGPSNoiseError("GPS is too noisy") - video_metadata = types.VideoMetadata( - video_path, - filesize=utils.get_file_size(video_path), - filetype=types.FileType.VIDEO, - points=points, - make=exif.extract_make(), - model=exif.extract_model(), + if not points: + raise exceptions.MapillaryVideoGPSNotFoundError( + "No GPS data found from the video" ) - except Exception as ex: - if not isinstance(ex, exceptions.MapillaryDescriptionError): - LOG.warning( - "Failed to geotag video %s: %s", - video_path, - str(ex), - exc_info=LOG.getEffectiveLevel() <= logging.DEBUG, - ) - return types.describe_error_metadata( - ex, video_path, filetype=types.FileType.VIDEO - ) + video_metadata = types.VideoMetadata( + self.video_path, + filesize=utils.get_file_size(self.video_path), + filetype=types.FileType.VIDEO, + points=points, + make=make, + model=model, + ) return video_metadata - def to_description(self) -> T.List[types.VideoMetadataOrError]: + +class GeotagVideosFromExifToolVideo(GeotagVideosFromGeneric): + def __init__( + self, + video_paths: T.Sequence[Path], + xml_path: Path, + num_processes: int | None = None, + ): + super().__init__(video_paths, num_processes=num_processes) + self.xml_path = xml_path + + def _generate_video_extractors( + self, + ) -> T.Sequence[GenericVideoExtractor | types.ErrorMetadata]: rdf_description_by_path = exiftool_read.index_rdf_description_by_path( [self.xml_path] ) - error_metadatas: T.List[types.ErrorMetadata] = [] - rdf_descriptions: T.List[ET.Element] = [] + results: list[VideoExifToolExtractor | types.ErrorMetadata] = [] + for path in self.video_paths: rdf_description = rdf_description_by_path.get( exiftool_read.canonical_path(path) ) if rdf_description is None: exc = exceptions.MapillaryEXIFNotFoundError( - f"The {_DESCRIPTION_TAG} XML element for the video not found" + f"The {exiftool_read._DESCRIPTION_TAG} XML element for the image not found" ) - error_metadatas.append( + results.append( types.describe_error_metadata( - exc, path, filetype=types.FileType.VIDEO + exc, path, filetype=types.FileType.IMAGE ) ) else: - rdf_descriptions.append(rdf_description) - - if self.num_processes is None: - num_processes = self.num_processes - disable_multiprocessing = False - else: - num_processes = max(self.num_processes, 1) - disable_multiprocessing = self.num_processes <= 0 - - with Pool(processes=num_processes) as pool: - video_metadatas_iter: T.Iterator[types.VideoMetadataOrError] - if disable_multiprocessing: - video_metadatas_iter = map( - GeotagVideosFromExifToolVideo.geotag_video, rdf_descriptions - ) - else: - video_metadatas_iter = pool.imap( - GeotagVideosFromExifToolVideo.geotag_video, - rdf_descriptions, - ) - video_metadata_or_errors = list( - tqdm( - video_metadatas_iter, - desc="Extracting GPS tracks from ExifTool XML", - unit="videos", - disable=LOG.getEffectiveLevel() <= logging.DEBUG, - total=len(self.video_paths), - ) - ) + results.append(VideoExifToolExtractor(path, rdf_description)) - return error_metadatas + video_metadata_or_errors + return results diff --git a/mapillary_tools/geotag/geotag_videos_from_video.py b/mapillary_tools/geotag/geotag_videos_from_video.py index 4f7092055..ac05ce979 100644 --- a/mapillary_tools/geotag/geotag_videos_from_video.py +++ b/mapillary_tools/geotag/geotag_videos_from_video.py @@ -1,182 +1,151 @@ +from __future__ import annotations + import io -import logging import typing as T -from multiprocessing import Pool from pathlib import Path -from tqdm import tqdm - -from .. import exceptions, geo, types, utils +from .. import exceptions, geo, telemetry, types, utils from ..camm import camm_parser from ..gpmf import gpmf_gps_filter, gpmf_parser -from ..mp4 import simple_mp4_parser as sparser -from ..telemetry import GPSPoint +from ..types import FileType from . import blackvue_parser -from .geotag_from_generic import GeotagVideosFromGeneric +from .geotag_from_generic import GenericVideoExtractor, GeotagVideosFromGeneric -LOG = logging.getLogger(__name__) +class GoProVideoExtractor(GenericVideoExtractor): + def extract(self) -> types.VideoMetadataOrError: + with self.video_path.open("rb") as fp: + gopro_info = gpmf_parser.extract_gopro_info(fp) -class GeotagVideosFromVideo(GeotagVideosFromGeneric): - def __init__( - self, - video_paths: T.Sequence[Path], - filetypes: T.Optional[T.Set[types.FileType]] = None, - num_processes: T.Optional[int] = None, - ): - self.video_paths = video_paths - self.filetypes = filetypes - self.num_processes = num_processes - - def to_description(self) -> T.List[types.VideoMetadataOrError]: - if self.num_processes is None: - num_processes = self.num_processes - disable_multiprocessing = False - else: - num_processes = max(self.num_processes, 1) - disable_multiprocessing = self.num_processes <= 0 - - with Pool(processes=num_processes) as pool: - video_metadatas_iter: T.Iterator[types.VideoMetadataOrError] - if disable_multiprocessing: - video_metadatas_iter = map(self._geotag_video, self.video_paths) - else: - video_metadatas_iter = pool.imap( - self._geotag_video, - self.video_paths, - ) - return list( - tqdm( - video_metadatas_iter, - desc="Extracting GPS tracks from videos", - unit="videos", - disable=LOG.getEffectiveLevel() <= logging.DEBUG, - total=len(self.video_paths), - ) + if gopro_info is None: + raise exceptions.MapillaryVideoGPSNotFoundError( + "No GPS data found from the video" ) - def _geotag_video( - self, - video_path: Path, - ) -> types.VideoMetadataOrError: - return GeotagVideosFromVideo.geotag_video(video_path, self.filetypes) - - @staticmethod - def _extract_video_metadata( - video_path: Path, - filetypes: T.Optional[T.Set[types.FileType]] = None, - ) -> T.Optional[types.VideoMetadata]: - if ( - filetypes is None - or types.FileType.VIDEO in filetypes - or types.FileType.GOPRO in filetypes - ): - with video_path.open("rb") as fp: - try: - gopro_info = gpmf_parser.extract_gopro_info(fp) - except sparser.ParsingError: - gopro_info = None - - if gopro_info is not None: - return types.VideoMetadata( - filename=video_path, - filesize=utils.get_file_size(video_path), - filetype=types.FileType.GOPRO, - points=T.cast(T.List[geo.Point], gopro_info.gps), - make=gopro_info.make, - model=gopro_info.model, - ) + gps_points = gopro_info.gps + assert gps_points is not None, "must have GPS data extracted" + if not gps_points: + raise exceptions.MapillaryGPXEmptyError("Empty GPS data found") + + gps_points = T.cast( + T.List[telemetry.GPSPoint], gpmf_gps_filter.remove_noisy_points(gps_points) + ) + if not gps_points: + raise exceptions.MapillaryGPSNoiseError("GPS is too noisy") + + video_metadata = types.VideoMetadata( + filename=self.video_path, + filesize=utils.get_file_size(self.video_path), + filetype=FileType.GOPRO, + points=T.cast(T.List[geo.Point], gps_points), + make=gopro_info.make, + model=gopro_info.model, + ) - if ( - filetypes is None - or types.FileType.VIDEO in filetypes - or types.FileType.CAMM in filetypes - ): - with video_path.open("rb") as fp: - try: - points = camm_parser.extract_points(fp) - except sparser.ParsingError: - points = None - - if points is not None: - fp.seek(0, io.SEEK_SET) - make, model = camm_parser.extract_camera_make_and_model(fp) - return types.VideoMetadata( - filename=video_path, - filesize=utils.get_file_size(video_path), - filetype=types.FileType.CAMM, - points=points, - make=make, - model=model, - ) - - if ( - filetypes is None - or types.FileType.VIDEO in filetypes - or types.FileType.BLACKVUE in filetypes - ): - with video_path.open("rb") as fp: - try: - points = blackvue_parser.extract_points(fp) - except sparser.ParsingError: - points = None - - if points is not None: - fp.seek(0, io.SEEK_SET) - make, model = "BlackVue", blackvue_parser.extract_camera_model(fp) - return types.VideoMetadata( - filename=video_path, - filesize=utils.get_file_size(video_path), - filetype=types.FileType.BLACKVUE, - points=points, - make=make, - model=model, - ) - - return None - - @staticmethod - def geotag_video( - video_path: Path, - filetypes: T.Optional[T.Set[types.FileType]] = None, - ) -> types.VideoMetadataOrError: - video_metadata = None - try: - video_metadata = GeotagVideosFromVideo._extract_video_metadata( - video_path, filetypes - ) + return video_metadata - if video_metadata is None: - raise exceptions.MapillaryVideoError("No GPS data found from the video") - if not video_metadata.points: +class CAMMVideoExtractor(GenericVideoExtractor): + def extract(self) -> types.VideoMetadataOrError: + with self.video_path.open("rb") as fp: + points = camm_parser.extract_points(fp) + + if points is None: + raise exceptions.MapillaryVideoGPSNotFoundError( + "No GPS data found from the video" + ) + + if not points: raise exceptions.MapillaryGPXEmptyError("Empty GPS data found") - video_metadata.points = geo.extend_deduplicate_points(video_metadata.points) - assert video_metadata.points, "must have at least one point" + fp.seek(0, io.SEEK_SET) + make, model = camm_parser.extract_camera_make_and_model(fp) - if all(isinstance(p, GPSPoint) for p in video_metadata.points): - video_metadata.points = T.cast( - T.List[geo.Point], - gpmf_gps_filter.remove_noisy_points( - T.cast(T.List[GPSPoint], video_metadata.points) - ), - ) - if not video_metadata.points: - raise exceptions.MapillaryGPSNoiseError("GPS is too noisy") - except Exception as ex: - if not isinstance(ex, exceptions.MapillaryDescriptionError): - LOG.warning( - "Failed to geotag video %s: %s", - video_path, - str(ex), - exc_info=LOG.getEffectiveLevel() <= logging.DEBUG, + return types.VideoMetadata( + filename=self.video_path, + filesize=utils.get_file_size(self.video_path), + filetype=FileType.CAMM, + points=points, + make=make, + model=model, + ) + + +class BlackVueVideoExtractor(GenericVideoExtractor): + def extract(self) -> types.VideoMetadataOrError: + with self.video_path.open("rb") as fp: + points = blackvue_parser.extract_points(fp) + + if points is None: + raise exceptions.MapillaryVideoGPSNotFoundError( + "No GPS data found from the video" ) - filetype = None if video_metadata is None else video_metadata.filetype - return types.describe_error_metadata( - ex, - video_path, - filetype=filetype, - ) + + if not points: + raise exceptions.MapillaryGPXEmptyError("Empty GPS data found") + + fp.seek(0, io.SEEK_SET) + make, model = "BlackVue", blackvue_parser.extract_camera_model(fp) + + video_metadata = types.VideoMetadata( + filename=self.video_path, + filesize=utils.get_file_size(self.video_path), + filetype=FileType.BLACKVUE, + points=points, + make=make, + model=model, + ) return video_metadata + + +class NativeVideoExtractor(GenericVideoExtractor): + def __init__(self, video_path: Path, filetypes: set[FileType] | None = None): + super().__init__(video_path) + self.filetypes = filetypes + + def extract(self) -> types.VideoMetadataOrError: + ft = self.filetypes + extractor: GenericVideoExtractor + + if ft is None or FileType.VIDEO in ft or FileType.GOPRO in ft: + extractor = GoProVideoExtractor(self.video_path) + try: + return extractor.extract() + except exceptions.MapillaryVideoGPSNotFoundError: + pass + + if ft is None or FileType.VIDEO in ft or FileType.CAMM in ft: + extractor = CAMMVideoExtractor(self.video_path) + try: + return extractor.extract() + except exceptions.MapillaryVideoGPSNotFoundError: + pass + + if ft is None or FileType.VIDEO in ft or FileType.BLACKVUE in ft: + extractor = BlackVueVideoExtractor(self.video_path) + try: + return extractor.extract() + except exceptions.MapillaryVideoGPSNotFoundError: + pass + + raise exceptions.MapillaryVideoGPSNotFoundError( + "No GPS data found from the video" + ) + + +class GeotagVideosFromVideo(GeotagVideosFromGeneric): + def __init__( + self, + video_paths: T.Sequence[Path], + filetypes: set[FileType] | None = None, + num_processes: int | None = None, + ): + super().__init__(video_paths, num_processes=num_processes) + self.filetypes = filetypes + + def _generate_video_extractors(self) -> T.Sequence[GenericVideoExtractor]: + return [ + NativeVideoExtractor(path, filetypes=self.filetypes) + for path in self.video_paths + ] diff --git a/mapillary_tools/process_geotag_properties.py b/mapillary_tools/process_geotag_properties.py index 4029eadaf..58436d1ca 100644 --- a/mapillary_tools/process_geotag_properties.py +++ b/mapillary_tools/process_geotag_properties.py @@ -1,10 +1,10 @@ +from __future__ import annotations + import collections import datetime -import itertools import json import logging import typing as T -from multiprocessing import Pool from pathlib import Path from tqdm import tqdm @@ -446,55 +446,33 @@ def _show_stats_per_filetype( ) -_IT = T.TypeVar("_IT") - - -def split_if( - it: T.Iterable[_IT], sep: T.Callable[[_IT], bool] -) -> T.Tuple[T.List[_IT], T.List[_IT]]: - yes, no = [], [] - for e in it: - if sep(e): - yes.append(e) - else: - no.append(e) - return yes, no - - def _validate_metadatas( - metadatas: T.Sequence[types.MetadataOrError], num_processes: T.Optional[int] + metadatas: T.Sequence[types.MetadataOrError], num_processes: int | None ) -> T.List[types.MetadataOrError]: # validating metadatas is slow, hence multiprocessing - if num_processes is None: - pool_num_processes = None - disable_multiprocessing = False - else: - pool_num_processes = max(num_processes, 1) - disable_multiprocessing = num_processes <= 0 - with Pool(processes=pool_num_processes) as pool: - validated_metadatas_iter: T.Iterator[types.MetadataOrError] - if disable_multiprocessing: - validated_metadatas_iter = map(types.validate_and_fail_metadata, metadatas) - else: - # Do not pass error metadatas where the error object can not be pickled for multiprocessing to work - # Otherwise we get: - # TypeError: __init__() missing 3 required positional arguments: 'image_time', 'gpx_start_time', and 'gpx_end_time' - # See https://stackoverflow.com/a/61432070 - yes, no = split_if(metadatas, lambda m: isinstance(m, types.ErrorMetadata)) - no_iter = pool.imap( - types.validate_and_fail_metadata, - no, - ) - validated_metadatas_iter = itertools.chain(yes, no_iter) - return list( - tqdm( - validated_metadatas_iter, - desc="Validating metadatas", - unit="metadata", - disable=LOG.getEffectiveLevel() <= logging.DEBUG, - total=len(metadatas), - ) + + # Do not pass error metadatas where the error object can not be pickled for multiprocessing to work + # Otherwise we get: + # TypeError: __init__() missing 3 required positional arguments: 'image_time', 'gpx_start_time', and 'gpx_end_time' + # See https://stackoverflow.com/a/61432070 + good_metadatas, error_metadatas = types.separate_errors(metadatas) + map_results = utils.mp_map_maybe( + types.validate_and_fail_metadata, + T.cast(T.Iterable[types.Metadata], good_metadatas), + num_processes=num_processes, + ) + + validated_metadatas = list( + tqdm( + map_results, + desc="Validating metadatas", + unit="metadata", + disable=LOG.getEffectiveLevel() <= logging.DEBUG, + total=len(good_metadatas), ) + ) + + return validated_metadatas + error_metadatas def process_finalize( diff --git a/mapillary_tools/sample_video.py b/mapillary_tools/sample_video.py index 89b9de9ba..3efb94744 100644 --- a/mapillary_tools/sample_video.py +++ b/mapillary_tools/sample_video.py @@ -299,9 +299,12 @@ def _sample_single_video_by_distance( ) LOG.info("Extracting video metdata") - video_metadata = geotag_videos_from_video.GeotagVideosFromVideo.geotag_video( - video_path - ) + + video_metadatas = geotag_videos_from_video.GeotagVideosFromVideo( + [video_path] + ).to_description() + assert video_metadatas, "expect non-empty video metadatas" + video_metadata = video_metadatas[0] if isinstance(video_metadata, types.ErrorMetadata): LOG.warning(str(video_metadata.error)) return diff --git a/mapillary_tools/types.py b/mapillary_tools/types.py index 17182a243..b886bf3ec 100644 --- a/mapillary_tools/types.py +++ b/mapillary_tools/types.py @@ -1,3 +1,5 @@ +from __future__ import annotations + import dataclasses import datetime import enum @@ -180,6 +182,24 @@ class ImageDescriptionError(_ImageDescriptionErrorRequired, total=False): filetype: str +M = T.TypeVar("M") + + +def separate_errors( + metadatas: T.Iterable[M | ErrorMetadata], +) -> tuple[list[M], list[ErrorMetadata]]: + good: list[M] = [] + bad: list[ErrorMetadata] = [] + + for metadata in metadatas: + if isinstance(metadata, ErrorMetadata): + bad.append(metadata) + else: + good.append(metadata) + + return good, bad + + def _describe_error_desc( exc: Exception, filename: Path, filetype: T.Optional[FileType] ) -> ImageDescriptionError: diff --git a/mapillary_tools/utils.py b/mapillary_tools/utils.py index 5d166563c..551f096cd 100644 --- a/mapillary_tools/utils.py +++ b/mapillary_tools/utils.py @@ -1,6 +1,9 @@ +from __future__ import annotations + import hashlib import os import typing as T +from multiprocessing import Pool from pathlib import Path @@ -194,3 +197,26 @@ def find_xml_files(import_paths: T.Sequence[Path]) -> T.List[Path]: def get_file_size(path: Path) -> int: return os.path.getsize(path) + + +TMapIn = T.TypeVar("TMapIn") +TMapOut = T.TypeVar("TMapOut") + + +def mp_map_maybe( + func: T.Callable[[TMapIn], TMapOut], + iterable: T.Iterable[TMapIn], + num_processes: int | None = None, +) -> T.Generator[TMapOut, None, None]: + if num_processes is None: + pool_num_processes = None + disable_multiprocessing = False + else: + pool_num_processes = max(num_processes, 1) + disable_multiprocessing = num_processes <= 0 + + if disable_multiprocessing: + yield from map(func, iterable) + else: + with Pool(processes=pool_num_processes) as pool: + yield from pool.imap(func, iterable) diff --git a/mapillary_tools/video_data_extraction/extract_video_data.py b/mapillary_tools/video_data_extraction/extract_video_data.py index b0b334e78..6234096e5 100644 --- a/mapillary_tools/video_data_extraction/extract_video_data.py +++ b/mapillary_tools/video_data_extraction/extract_video_data.py @@ -1,6 +1,7 @@ +from __future__ import annotations + import logging import typing as T -from multiprocessing import Pool from pathlib import Path import tqdm @@ -8,13 +9,7 @@ from .. import exceptions, geo, utils from ..gpmf import gpmf_gps_filter from ..telemetry import GPSPoint -from ..types import ( - ErrorMetadata, - FileType, - MetadataOrError, - VideoMetadata, - VideoMetadataOrError, -) +from ..types import ErrorMetadata, FileType, VideoMetadata, VideoMetadataOrError from . import video_data_parser_factory from .cli_options import CliOptions from .extractors.base_parser import BaseParser @@ -29,30 +24,25 @@ class VideoDataExtractor: def __init__(self, options: CliOptions) -> None: self.options = options - def process(self) -> T.List[MetadataOrError]: + def process(self) -> T.List[VideoMetadataOrError]: paths = self.options["paths"] self._check_paths(paths) video_files = utils.find_videos(paths) self._check_sources_cardinality(video_files) - num_processes = self.options["num_processes"] or None - with Pool(processes=num_processes) as pool: - if num_processes == 1: - iter: T.Iterator[VideoMetadataOrError] = map( - self.process_file, video_files - ) - else: - iter = pool.imap(self.process_file, video_files) - - video_metadata_or_errors = list( - tqdm.tqdm( - iter, - desc="Extracting GPS tracks", - unit="videos", - disable=LOG.getEffectiveLevel() <= logging.DEBUG, - total=len(video_files), - ) + map_results = utils.mp_map_maybe( + self.process_file, video_files, num_processes=self.options["num_processes"] + ) + + video_metadata_or_errors: list[VideoMetadataOrError] = list( + tqdm.tqdm( + map_results, + desc="Extracting GPS tracks", + unit="videos", + disable=LOG.getEffectiveLevel() <= logging.DEBUG, + total=len(video_files), ) + ) return video_metadata_or_errors diff --git a/mapillary_tools/video_data_extraction/extractors/exiftool_xml_parser.py b/mapillary_tools/video_data_extraction/extractors/exiftool_xml_parser.py index 4cf91ea1b..dcfadf5bb 100644 --- a/mapillary_tools/video_data_extraction/extractors/exiftool_xml_parser.py +++ b/mapillary_tools/video_data_extraction/extractors/exiftool_xml_parser.py @@ -4,9 +4,8 @@ from pathlib import Path from ... import geo -from ...exiftool_read import EXIFTOOL_NAMESPACES +from ...exiftool_read import _DESCRIPTION_TAG, EXIFTOOL_NAMESPACES from ...exiftool_read_video import ExifToolReadVideo -from ...geotag.geotag_videos_from_exiftool_video import _DESCRIPTION_TAG from ..cli_options import CliOptions, CliParserOptions from .base_parser import BaseParser