Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
163 changes: 152 additions & 11 deletions mapillary_tools/geotag/geotag_from_generic.py
Original file line number Diff line number Diff line change
@@ -1,22 +1,163 @@
from __future__ import annotations

import abc
import logging
import typing as T
from pathlib import Path

from tqdm import tqdm

from .. import exceptions, types, utils


LOG = logging.getLogger(__name__)


class GenericImageExtractor(abc.ABC):
    """
    Base class for objects that extract metadata from one image file.

    The base class only stores the path; ``extract`` has no implementation
    here and is expected to be overridden.
    """

    def __init__(self, image_path: Path):
        # Path of the image this extractor is bound to
        self.image_path = image_path

    def extract(self) -> types.ImageMetadataOrError:
        """
        Extract the metadata of ``self.image_path``.

        The base implementation always raises ``NotImplementedError``.
        """
        raise NotImplementedError


# Type variable for image extractor classes; bound to GenericImageExtractor,
# so only that class or its subclasses can be substituted for it.
TImageExtractor = T.TypeVar("TImageExtractor", bound=GenericImageExtractor)


class GeotagImagesFromGeneric(abc.ABC, T.Generic[TImageExtractor]):
    """
    Extracts metadata from a list of image files with multiprocessing.

    Subclasses provide the per-image extractors by overriding
    ``_generate_image_extractors``; this class runs them and collects the
    results.
    """

    def __init__(
        self, image_paths: T.Sequence[Path], num_processes: int | None
    ) -> None:
        self.image_paths = image_paths
        # Forwarded as-is to utils.mp_map_maybe
        self.num_processes = num_processes

    def to_description(self) -> list[types.ImageMetadataOrError]:
        """
        Run every generated extractor and return the collected results.

        The returned list holds the extraction results first, followed by
        the error metadata separated out before extraction, so the order
        does not necessarily follow ``self.image_paths``.
        """
        extractor_or_errors = self._generate_image_extractors()

        # Exactly one extractor or error is expected per input image
        assert len(extractor_or_errors) == len(self.image_paths)

        extractors, error_metadatas = types.separate_errors(extractor_or_errors)

        map_results = utils.mp_map_maybe(
            self.run_extraction,
            extractors,
            num_processes=self.num_processes,
        )

        results = list(
            tqdm(
                map_results,
                desc="Extracting images",
                unit="images",
                # No progress bar when debug logging is enabled
                disable=LOG.getEffectiveLevel() <= logging.DEBUG,
                total=len(extractors),
            )
        )

        return results + error_metadatas

    def _generate_image_extractors(
        self,
    ) -> T.Sequence[TImageExtractor | types.ErrorMetadata]:
        """
        Return one extractor (or one error metadata) per image path.

        The base implementation always raises ``NotImplementedError``.
        """
        raise NotImplementedError

    # This method is passed to multiprocessing
    # so it has to be classmethod or staticmethod to avoid pickling the instance
    @classmethod
    def run_extraction(cls, extractor: TImageExtractor) -> types.ImageMetadataOrError:
        """
        Run ``extractor.extract()`` and turn any exception into error metadata.

        Exceptions other than ``MapillaryDescriptionError`` are also logged
        with their traceback.
        """
        image_path = extractor.image_path

        try:
            return extractor.extract()
        except exceptions.MapillaryDescriptionError as ex:
            return types.describe_error_metadata(
                ex, image_path, filetype=types.FileType.IMAGE
            )
        except Exception as ex:
            LOG.exception("Unexpected error extracting metadata from %s", image_path)
            return types.describe_error_metadata(
                ex, image_path, filetype=types.FileType.IMAGE
            )

@abc.abstractmethod
def to_description(self) -> T.List[types.VideoMetadataOrError]:

class GenericVideoExtractor(abc.ABC):
    """
    Base class for objects that extract metadata from one video file.

    The base class only stores the path; ``extract`` has no implementation
    here and is expected to be overridden.
    """

    def __init__(self, video_path: Path):
        # Path of the video this extractor is bound to
        self.video_path = video_path

    def extract(self) -> types.VideoMetadataOrError:
        """
        Extract the metadata of ``self.video_path``.

        The base implementation always raises ``NotImplementedError``.
        """
        raise NotImplementedError


# Type variable for video extractor classes; bound to GenericVideoExtractor,
# so only that class or its subclasses can be substituted for it.
TVideoExtractor = T.TypeVar("TVideoExtractor", bound=GenericVideoExtractor)


class GeotagVideosFromGeneric(abc.ABC, T.Generic[TVideoExtractor]):
    """
    Extracts metadata from a list of video files with multiprocessing.

    Subclasses provide the per-video extractors by overriding
    ``_generate_video_extractors``; this class runs them and collects the
    results.
    """

    def __init__(
        self, video_paths: T.Sequence[Path], num_processes: int | None
    ) -> None:
        self.video_paths = video_paths
        # Forwarded as-is to utils.mp_map_maybe
        self.num_processes = num_processes

    def to_description(self) -> list[types.VideoMetadataOrError]:
        """
        Run every generated extractor and return the collected results.

        Extraction results come first in the returned list, followed by the
        error metadata that was separated out before extraction.
        """
        candidates = self._generate_video_extractors()

        # Exactly one extractor or error is expected per input video
        assert len(candidates) == len(self.video_paths)

        runnable, failed = types.separate_errors(candidates)

        progress = tqdm(
            utils.mp_map_maybe(
                self.run_extraction,
                runnable,
                num_processes=self.num_processes,
            ),
            desc="Extracting videos",
            unit="videos",
            # No progress bar when debug logging is enabled
            disable=LOG.getEffectiveLevel() <= logging.DEBUG,
            total=len(runnable),
        )

        return list(progress) + failed

    def _generate_video_extractors(
        self,
    ) -> T.Sequence[TVideoExtractor | types.ErrorMetadata]:
        """
        Return one extractor (or one error metadata) per video path.

        The base implementation always raises ``NotImplementedError``.
        """
        raise NotImplementedError

    # This method is passed to multiprocessing
    # so it has to be classmethod or staticmethod to avoid pickling the instance
    @classmethod
    def run_extraction(cls, extractor: TVideoExtractor) -> types.VideoMetadataOrError:
        """
        Run ``extractor.extract()`` and turn any exception into error metadata.

        Exceptions other than ``MapillaryDescriptionError`` are also logged
        with their traceback.
        """
        video_path = extractor.video_path

        try:
            metadata = extractor.extract()
        except exceptions.MapillaryDescriptionError as ex:
            return types.describe_error_metadata(
                ex, video_path, filetype=types.FileType.VIDEO
            )
        except Exception as ex:
            LOG.exception("Unexpected error extracting metadata from %s", video_path)
            return types.describe_error_metadata(
                ex, video_path, filetype=types.FileType.VIDEO
            )
        else:
            return metadata
128 changes: 42 additions & 86 deletions mapillary_tools/geotag/geotag_images_from_exif.py
Original file line number Diff line number Diff line change
@@ -1,104 +1,60 @@
import contextlib
import logging
import typing as T
from multiprocessing import Pool
from pathlib import Path

from tqdm import tqdm

from .. import exceptions, geo, types, utils
from ..exif_read import ExifRead, ExifReadABC
from .geotag_from_generic import GeotagImagesFromGeneric
from .geotag_from_generic import GenericImageExtractor, GeotagImagesFromGeneric

LOG = logging.getLogger(__name__)


class ImageEXIFExtractor(GenericImageExtractor):
    """
    Builds image metadata for a single image from its EXIF data.
    """

    def __init__(self, image_path: Path, skip_lonlat_error: bool = False):
        super().__init__(image_path)
        # When True, a missing GPS position falls back to (0.0, 0.0)
        # instead of raising
        self.skip_lonlat_error = skip_lonlat_error

    @contextlib.contextmanager
    def _exif_context(self) -> T.Generator[ExifReadABC, None, None]:
        """
        Open the image in binary mode and yield an EXIF reader for it.

        The file stays open only for the duration of the ``with`` block.
        """
        with self.image_path.open("rb") as fp:
            yield ExifRead(fp)

    def extract(self) -> types.ImageMetadata:
        """
        Read the EXIF data and return the image metadata.

        Raises ``exceptions.MapillaryGeoTaggingError`` when the capture time
        is missing, or when longitude/latitude is missing and
        ``skip_lonlat_error`` is False.
        """
        with self._exif_context() as exif:
            lonlat = exif.extract_lon_lat()
            if lonlat is None:
                if not self.skip_lonlat_error:
                    raise exceptions.MapillaryGeoTaggingError(
                        "Unable to extract GPS Longitude or GPS Latitude from the image"
                    )
                lonlat = (0.0, 0.0)
            lon, lat = lonlat

            capture_time = exif.extract_capture_time()
            if capture_time is None:
                raise exceptions.MapillaryGeoTaggingError(
                    "Unable to extract timestamp from the image"
                )

            # Built inside the context so the EXIF reader is used while the
            # file is still open
            image_metadata = types.ImageMetadata(
                filename=self.image_path,
                filesize=utils.get_file_size(self.image_path),
                time=geo.as_unix_time(capture_time),
                lat=lat,
                lon=lon,
                alt=exif.extract_altitude(),
                angle=exif.extract_direction(),
                width=exif.extract_width(),
                height=exif.extract_height(),
                MAPOrientation=exif.extract_orientation(),
                MAPDeviceMake=exif.extract_make(),
                MAPDeviceModel=exif.extract_model(),
            )

        return image_metadata
class GeotagImagesFromEXIF(GeotagImagesFromGeneric):
    """
    Geotags images using one ``ImageEXIFExtractor`` per image path.
    """

    def _generate_image_extractors(self) -> T.Sequence[ImageEXIFExtractor]:
        """Create an EXIF extractor for each path in ``self.image_paths``."""
        extractors = []
        for image_path in self.image_paths:
            extractors.append(ImageEXIFExtractor(image_path))
        return extractors
Loading
Loading