Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
62 changes: 17 additions & 45 deletions mapillary_tools/commands/process.py
Original file line number Diff line number Diff line change
@@ -1,14 +1,15 @@
from __future__ import annotations

import argparse
import inspect
import typing as T
from pathlib import Path

from .. import constants
from .. import constants, types
from ..process_geotag_properties import (
FileType,
GeotagSource,
DEFAULT_GEOTAG_SOURCE_OPTIONS,
process_finalize,
process_geotag_properties,
SourceType,
)
from ..process_sequence_properties import process_sequence_properties

Expand All @@ -24,24 +25,13 @@ class Command:
help = "process images and videos"

def add_basic_arguments(self, parser: argparse.ArgumentParser):
geotag_sources: T.List[GeotagSource] = [
"blackvue_videos",
"camm",
"exif",
"exiftool",
"gopro_videos",
"gpx",
"nmea",
]
geotag_gpx_based_sources: T.List[GeotagSource] = [
"gpx",
"gopro_videos",
"nmea",
"blackvue_videos",
"camm",
geotag_gpx_based_sources: list[str] = [
SourceType.GPX.value,
SourceType.NMEA.value,
SourceType.GOPRO.value,
SourceType.BLACKVUE.value,
SourceType.CAMM.value,
]
for source in geotag_gpx_based_sources:
assert source in geotag_sources

parser.add_argument(
"--skip_process_errors",
Expand All @@ -53,9 +43,9 @@ def add_basic_arguments(self, parser: argparse.ArgumentParser):
parser.add_argument(
"--filetypes",
"--file_types",
help=f"Process files of the specified types only. Supported file types: {','.join(sorted(t.value for t in FileType))} [default: %(default)s]",
type=lambda option: set(FileType(t) for t in option.split(",")),
default=",".join(sorted(t.value for t in FileType)),
help=f"Process files of the specified types only. Supported file types: {','.join(sorted(t.value for t in types.FileType))} [default: %(default)s]",
type=lambda option: set(types.FileType(t) for t in option.split(",")),
default=None,
required=False,
)
group = parser.add_argument_group(bold_text("PROCESS EXIF OPTIONS"))
Expand Down Expand Up @@ -122,10 +112,9 @@ def add_basic_arguments(self, parser: argparse.ArgumentParser):
)
group_geotagging.add_argument(
"--geotag_source",
help="Provide the source of date/time and GPS information needed for geotagging. [default: %(default)s]",
action="store",
choices=geotag_sources,
default="exif",
help=f"Provide the source of date/time and GPS information needed for geotagging. Supported source types: {', '.join(g.value for g in SourceType)} [default: {','.join(DEFAULT_GEOTAG_SOURCE_OPTIONS)}]",
action="append",
default=[],
required=False,
)
group_geotagging.add_argument(
Expand Down Expand Up @@ -216,24 +205,7 @@ def add_basic_arguments(self, parser: argparse.ArgumentParser):
)

def run(self, vars_args: dict):
if (
"geotag_source" in vars_args
and vars_args["geotag_source"] == "blackvue_videos"
and (
"device_make" not in vars_args
or ("device_make" in vars_args and not vars_args["device_make"])
)
):
vars_args["device_make"] = "Blackvue"
if (
"device_make" in vars_args
and vars_args["device_make"]
and vars_args["device_make"].lower() == "blackvue"
):
vars_args["duplicate_angle"] = 360

metadatas = process_geotag_properties(
vars_args=vars_args,
**(
{
k: v
Expand Down
5 changes: 4 additions & 1 deletion mapillary_tools/constants.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
from __future__ import annotations

import os
import typing as T

Expand Down Expand Up @@ -30,7 +32,8 @@ def _yes_or_no(val: str) -> bool:
VIDEO_DURATION_RATIO = float(os.getenv(_ENV_PREFIX + "VIDEO_DURATION_RATIO", 1))
FFPROBE_PATH: str = os.getenv(_ENV_PREFIX + "FFPROBE_PATH", "ffprobe")
FFMPEG_PATH: str = os.getenv(_ENV_PREFIX + "FFMPEG_PATH", "ffmpeg")
EXIFTOOL_PATH: str = os.getenv(_ENV_PREFIX + "EXIFTOOL_PATH", "exiftool")
# When not set, MT will try to check both "exiftool" and "exiftool.exe" from $PATH
EXIFTOOL_PATH: str | None = os.getenv(_ENV_PREFIX + "EXIFTOOL_PATH")
IMAGE_DESCRIPTION_FILENAME = os.getenv(
_ENV_PREFIX + "IMAGE_DESCRIPTION_FILENAME", "mapillary_image_description.json"
)
Expand Down
24 changes: 19 additions & 5 deletions mapillary_tools/exiftool_read.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
from __future__ import annotations

import datetime
import logging
import typing as T
Expand Down Expand Up @@ -93,11 +95,23 @@ def index_rdf_description_by_path(
LOG.warning(f"Failed to parse {xml_path}: {ex}", exc_info=verbose)
continue

elements = etree.iterfind(_DESCRIPTION_TAG, namespaces=EXIFTOOL_NAMESPACES)
for element in elements:
path = find_rdf_description_path(element)
if path is not None:
rdf_description_by_path[canonical_path(path)] = element
rdf_description_by_path.update(
index_rdf_description_by_path_from_xml_element(etree.getroot())
)

return rdf_description_by_path


def index_rdf_description_by_path_from_xml_element(
element: ET.Element,
) -> dict[str, ET.Element]:
rdf_description_by_path: dict[str, ET.Element] = {}

elements = element.iterfind(_DESCRIPTION_TAG, namespaces=EXIFTOOL_NAMESPACES)
for element in elements:
path = find_rdf_description_path(element)
if path is not None:
rdf_description_by_path[canonical_path(path)] = element

return rdf_description_by_path

Expand Down
77 changes: 77 additions & 0 deletions mapillary_tools/exiftool_runner.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
from __future__ import annotations

import platform
import shutil
import subprocess
import typing as T
from pathlib import Path


class ExiftoolRunner:
"""
Wrapper around ExifTool to run it in a subprocess
"""

def __init__(self, exiftool_path: str | None = None, recursive: bool = False):
if exiftool_path is None:
exiftool_path = self._search_preferred_exiftool_path()
self.exiftool_path = exiftool_path
self.recursive = recursive

def _search_preferred_exiftool_path(self) -> str:
system = platform.system()

if system and system.lower() == "windows":
exiftool_paths = ["exiftool.exe", "exiftool"]
else:
exiftool_paths = ["exiftool", "exiftool.exe"]

for path in exiftool_paths:
full_path = shutil.which(path)
if full_path:
return path

# Always return the prefered one, even if it is not found,
# and let the subprocess.run figure out the error later
return exiftool_paths[0]

def _build_args_read_stdin(self) -> list[str]:
args: list[str] = [
self.exiftool_path,
"-q",
"-n", # Disable print conversion
"-X", # XML output
"-ee",
*["-api", "LargeFileSupport=1"],
*["-charset", "filename=utf8"],
*["-@", "-"],
]

if self.recursive:
args.append("-r")

return args

def extract_xml(self, paths: T.Sequence[Path]) -> str:
if not paths:
# ExifTool will show its full manual if no files are provided
raise ValueError("No files provided to exiftool")

# To handle non-latin1 filenames under Windows, we pass the path
# via stdin. See https://exiftool.org/faq.html#Q18
stdin = "\n".join([str(p.resolve()) for p in paths])

args = self._build_args_read_stdin()

# Raise FileNotFoundError here if self.exiftool_path not found
process = subprocess.run(
args,
capture_output=True,
text=True,
input=stdin,
encoding="utf-8",
# Do not check exit status to allow some files not found
# check=True,
)

return process.stdout
Loading
Loading