Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion mapillary_tools/exceptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ def __init__(
self.angle_diff = angle_diff


class MapillaryEXIFNotFoundError(MapillaryDescriptionError):
class MapillaryExifToolXMLNotFoundError(MapillaryDescriptionError):
pass


Expand Down
29 changes: 3 additions & 26 deletions mapillary_tools/exiftool_read.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
import xml.etree.ElementTree as ET
from pathlib import Path

from . import exif_read, utils
from . import exif_read


EXIFTOOL_NAMESPACES: dict[str, str] = {
Expand Down Expand Up @@ -53,8 +53,8 @@


LOG = logging.getLogger(__name__)
DESCRIPTION_TAG = "rdf:Description"
_FIELD_TYPE = T.TypeVar("_FIELD_TYPE", int, float, str)
_DESCRIPTION_TAG = "rdf:Description"


def expand_tag(ns_tag: str, namespaces: dict[str, str]) -> str:
Expand All @@ -79,35 +79,12 @@ def find_rdf_description_path(element: ET.Element) -> Path | None:
return Path(about)


def index_rdf_description_by_path(
xml_paths: T.Sequence[Path],
) -> dict[str, ET.Element]:
rdf_description_by_path: dict[str, ET.Element] = {}

for xml_path in utils.find_xml_files(xml_paths):
try:
etree = ET.parse(xml_path)
except ET.ParseError as ex:
verbose = LOG.getEffectiveLevel() <= logging.DEBUG
if verbose:
LOG.warning(f"Failed to parse {xml_path}", exc_info=verbose)
else:
LOG.warning(f"Failed to parse {xml_path}: {ex}", exc_info=verbose)
continue

rdf_description_by_path.update(
index_rdf_description_by_path_from_xml_element(etree.getroot())
)

return rdf_description_by_path


def index_rdf_description_by_path_from_xml_element(
element: ET.Element,
) -> dict[str, ET.Element]:
rdf_description_by_path: dict[str, ET.Element] = {}

elements = element.iterfind(_DESCRIPTION_TAG, namespaces=EXIFTOOL_NAMESPACES)
elements = element.iterfind(DESCRIPTION_TAG, namespaces=EXIFTOOL_NAMESPACES)
for element in elements:
path = find_rdf_description_path(element)
if path is not None:
Expand Down
1 change: 0 additions & 1 deletion mapillary_tools/geotag/__init__.py

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -8,41 +8,30 @@
from tqdm import tqdm

from .. import exceptions, types, utils
from .image_extractors.base import BaseImageExtractor
from .video_extractors.base import BaseVideoExtractor


LOG = logging.getLogger(__name__)


class GenericImageExtractor(abc.ABC):
"""
Extracts metadata from an image file.
"""

def __init__(self, image_path: Path):
self.image_path = image_path

def extract(self) -> types.ImageMetadataOrError:
raise NotImplementedError


TImageExtractor = T.TypeVar("TImageExtractor", bound=GenericImageExtractor)
TImageExtractor = T.TypeVar("TImageExtractor", bound=BaseImageExtractor)


class GeotagImagesFromGeneric(abc.ABC, T.Generic[TImageExtractor]):
"""
Extracts metadata from a list of image files with multiprocessing.
"""

def __init__(
self, image_paths: T.Sequence[Path], num_processes: int | None = None
) -> None:
self.image_paths = image_paths
def __init__(self, num_processes: int | None = None) -> None:
self.num_processes = num_processes

def to_description(self) -> list[types.ImageMetadataOrError]:
extractor_or_errors = self._generate_image_extractors()
def to_description(
self, image_paths: T.Sequence[Path]
) -> list[types.ImageMetadataOrError]:
extractor_or_errors = self._generate_image_extractors(image_paths)

assert len(extractor_or_errors) == len(self.image_paths)
assert len(extractor_or_errors) == len(image_paths)

extractors, error_metadatas = types.separate_errors(extractor_or_errors)

Expand All @@ -64,11 +53,6 @@ def to_description(self) -> list[types.ImageMetadataOrError]:

return results + error_metadatas

def _generate_image_extractors(
self,
) -> T.Sequence[TImageExtractor | types.ErrorMetadata]:
raise NotImplementedError

# This method is passed to multiprocessing
# so it has to be classmethod or staticmethod to avoid pickling the instance
@classmethod
Expand All @@ -81,43 +65,39 @@ def run_extraction(cls, extractor: TImageExtractor) -> types.ImageMetadataOrErro
return types.describe_error_metadata(
ex, image_path, filetype=types.FileType.IMAGE
)
except exceptions.MapillaryUserError as ex:
# Considered as fatal error if not MapillaryDescriptionError
raise ex
except Exception as ex:
# TODO: hide details if not verbose mode
LOG.exception("Unexpected error extracting metadata from %s", image_path)
return types.describe_error_metadata(
ex, image_path, filetype=types.FileType.IMAGE
)


class GenericVideoExtractor(abc.ABC):
"""
Extracts metadata from a video file.
"""

def __init__(self, video_path: Path):
self.video_path = video_path

def extract(self) -> types.VideoMetadataOrError:
def _generate_image_extractors(
self, image_paths: T.Sequence[Path]
) -> T.Sequence[TImageExtractor | types.ErrorMetadata]:
raise NotImplementedError


TVideoExtractor = T.TypeVar("TVideoExtractor", bound=GenericVideoExtractor)
TVideoExtractor = T.TypeVar("TVideoExtractor", bound=BaseVideoExtractor)


class GeotagVideosFromGeneric(abc.ABC, T.Generic[TVideoExtractor]):
"""
Extracts metadata from a list of video files with multiprocessing.
"""

def __init__(
self, video_paths: T.Sequence[Path], num_processes: int | None = None
) -> None:
self.video_paths = video_paths
def __init__(self, num_processes: int | None = None) -> None:
self.num_processes = num_processes

def to_description(self) -> list[types.VideoMetadataOrError]:
extractor_or_errors = self._generate_video_extractors()
def to_description(
self, video_paths: T.Sequence[Path]
) -> list[types.VideoMetadataOrError]:
extractor_or_errors = self._generate_video_extractors(video_paths)

assert len(extractor_or_errors) == len(self.video_paths)
assert len(extractor_or_errors) == len(video_paths)

extractors, error_metadatas = types.separate_errors(extractor_or_errors)

Expand All @@ -139,11 +119,6 @@ def to_description(self) -> list[types.VideoMetadataOrError]:

return results + error_metadatas

def _generate_video_extractors(
self,
) -> T.Sequence[TVideoExtractor | types.ErrorMetadata]:
raise NotImplementedError

# This method is passed to multiprocessing
# so it has to be classmethod or staticmethod to avoid pickling the instance
@classmethod
Expand All @@ -156,8 +131,17 @@ def run_extraction(cls, extractor: TVideoExtractor) -> types.VideoMetadataOrErro
return types.describe_error_metadata(
ex, video_path, filetype=types.FileType.VIDEO
)
except exceptions.MapillaryUserError as ex:
# Considered as fatal error if not MapillaryDescriptionError
raise ex
except Exception as ex:
# TODO: hide details if not verbose mode
LOG.exception("Unexpected error extracting metadata from %s", video_path)
return types.describe_error_metadata(
ex, video_path, filetype=types.FileType.VIDEO
)

def _generate_video_extractors(
self, video_paths: T.Sequence[Path]
) -> T.Sequence[TVideoExtractor | types.ErrorMetadata]:
raise NotImplementedError
61 changes: 27 additions & 34 deletions mapillary_tools/geotag/factory.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,14 +8,13 @@
from .. import exceptions, types, utils
from ..types import FileType
from . import (
geotag_from_generic,
base,
geotag_images_from_exif,
geotag_images_from_exiftool,
geotag_images_from_exiftool_both_image_and_video,
geotag_images_from_gpx_file,
geotag_images_from_nmea_file,
geotag_images_from_video,
geotag_videos_from_exiftool_video,
geotag_videos_from_exiftool,
geotag_videos_from_gpx,
geotag_videos_from_video,
)
Expand Down Expand Up @@ -106,7 +105,7 @@ def _is_reprocessable(metadata: types.MetadataOrError) -> bool:


def _filter_images_and_videos(
file_paths: T.Iterable[Path],
paths: T.Iterable[Path],
filetypes: set[types.FileType] | None = None,
) -> tuple[list[Path], list[Path]]:
image_paths = []
Expand All @@ -121,7 +120,7 @@ def _filter_images_and_videos(
include_images = types.FileType.IMAGE in filetypes
include_videos = bool(filetypes & ALL_VIDEO_TYPES)

for path in file_paths:
for path in paths:
if utils.is_image_file(path):
if include_images:
image_paths.append(path)
Expand Down Expand Up @@ -154,60 +153,57 @@ def _geotag_images(
else:
interpolation = option.interpolation

geotag: geotag_from_generic.GeotagImagesFromGeneric
geotag: base.GeotagImagesFromGeneric

if option.source is SourceType.NATIVE:
geotag = geotag_images_from_exif.GeotagImagesFromEXIF(
image_paths, num_processes=option.num_processes
num_processes=option.num_processes
)
return geotag.to_description()
return geotag.to_description(image_paths)

if option.source is SourceType.EXIFTOOL_RUNTIME:
geotag = geotag_images_from_exiftool.GeotagImagesFromExifToolRunner(
image_paths, num_processes=option.num_processes
num_processes=option.num_processes
)
try:
return geotag.to_description()
return geotag.to_description(image_paths)
except exceptions.MapillaryExiftoolNotFoundError as ex:
LOG.warning('Skip "%s" because: %s', option.source.value, ex)
return []

elif option.source is SourceType.EXIFTOOL_XML:
# This is to ensure 'video_process --geotag={"source": "exiftool_xml", "source_path": "/tmp/xml_path"}'
# to work
geotag = geotag_images_from_exiftool_both_image_and_video.GeotagImagesFromExifToolBothImageAndVideo(
image_paths,
geotag = geotag_images_from_exiftool.GeotagImagesFromExifToolWithSamples(
xml_path=_ensure_source_path(option),
num_processes=option.num_processes,
)
return geotag.to_description()
return geotag.to_description(image_paths)

elif option.source is SourceType.GPX:
geotag = geotag_images_from_gpx_file.GeotagImagesFromGPXFile(
image_paths,
source_path=_ensure_source_path(option),
use_gpx_start_time=interpolation.use_gpx_start_time,
offset_time=interpolation.offset_time,
num_processes=option.num_processes,
)
return geotag.to_description()
return geotag.to_description(image_paths)

elif option.source is SourceType.NMEA:
geotag = geotag_images_from_nmea_file.GeotagImagesFromNMEAFile(
image_paths,
source_path=_ensure_source_path(option),
use_gpx_start_time=interpolation.use_gpx_start_time,
offset_time=interpolation.offset_time,
num_processes=option.num_processes,
)

return geotag.to_description()
return geotag.to_description(image_paths)

elif option.source is SourceType.EXIF:
geotag = geotag_images_from_exif.GeotagImagesFromEXIF(
image_paths, num_processes=option.num_processes
num_processes=option.num_processes
)
return geotag.to_description()
return geotag.to_description(image_paths)

elif option.source in [
SourceType.GOPRO,
Expand All @@ -225,17 +221,15 @@ def _geotag_images(
)
video_paths_with_image_samples = list(image_samples_by_video_path.keys())
video_metadatas = geotag_videos_from_video.GeotagVideosFromVideo(
video_paths_with_image_samples,
filetypes={map_geotag_source_to_filetype[option.source]},
num_processes=option.num_processes,
).to_description()
).to_description(video_paths_with_image_samples)
geotag = geotag_images_from_video.GeotagImagesFromVideo(
image_paths,
video_metadatas,
offset_time=interpolation.offset_time,
num_processes=option.num_processes,
)
return geotag.to_description()
return geotag.to_description(image_paths)

else:
raise ValueError(f"Invalid geotag source {option.source}")
Expand All @@ -249,34 +243,33 @@ def _geotag_videos(
if not video_paths:
return []

geotag: geotag_from_generic.GeotagVideosFromGeneric
geotag: base.GeotagVideosFromGeneric

if option.source is SourceType.NATIVE:
geotag = geotag_videos_from_video.GeotagVideosFromVideo(
video_paths, num_processes=option.num_processes, filetypes=option.filetypes
num_processes=option.num_processes, filetypes=option.filetypes
)
return geotag.to_description()
return geotag.to_description(video_paths)

if option.source is SourceType.EXIFTOOL_RUNTIME:
geotag = geotag_videos_from_exiftool_video.GeotagVideosFromExifToolRunner(
video_paths, num_processes=option.num_processes
geotag = geotag_videos_from_exiftool.GeotagVideosFromExifToolRunner(
num_processes=option.num_processes
)
try:
return geotag.to_description()
return geotag.to_description(video_paths)
except exceptions.MapillaryExiftoolNotFoundError as ex:
LOG.warning('Skip "%s" because: %s', option.source.value, ex)
return []

elif option.source is SourceType.EXIFTOOL_XML:
geotag = geotag_videos_from_exiftool_video.GeotagVideosFromExifToolVideo(
video_paths,
geotag = geotag_videos_from_exiftool.GeotagVideosFromExifToolXML(
xml_path=_ensure_source_path(option),
)
return geotag.to_description()
return geotag.to_description(video_paths)

elif option.source is SourceType.GPX:
geotag = geotag_videos_from_gpx.GeotagVideosFromGPX(video_paths)
return geotag.to_description()
geotag = geotag_videos_from_gpx.GeotagVideosFromGPX()
return geotag.to_description(video_paths)

elif option.source is SourceType.NMEA:
# TODO: geotag videos from NMEA
Expand Down
Loading
Loading