Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
179 changes: 144 additions & 35 deletions mapillary_tools/exif_read.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
import abc
import datetime
import io
import logging
import re
import struct
import typing as T
import xml.etree.ElementTree as et
from fractions import Fraction
Expand All @@ -11,6 +13,7 @@
from exifread.utils import Ratio


LOG = logging.getLogger(__name__)
XMP_NAMESPACES = {
"exif": "http://ns.adobe.com/exif/1.0/",
"tiff": "http://ns.adobe.com/tiff/1.0/",
Expand Down Expand Up @@ -508,6 +511,86 @@ def _extract_alternative_fields(
return None


def extract_xmp_efficiently(fp) -> T.Optional[str]:
"""
Extract XMP metadata from a JPEG file efficiently by reading only necessary chunks.

Args:
image_path (str): Path to the JPEG image file

Returns:
str: Formatted XML string containing XMP metadata, or None if no XMP data found
"""
# JPEG markers
SOI_MARKER = b"\xff\xd8" # Start of Image
APP1_MARKER = b"\xff\xe1" # Application Segment 1 (where XMP usually lives)
XMP_IDENTIFIER = b"http://ns.adobe.com/xap/1.0/\x00"
XMP_META_TAG_BEGIN = b"<x:xmpmeta"
XMP_META_TAG_END = b"</x:xmpmeta>"

# Check for JPEG signature (SOI marker)
if fp.read(2) != SOI_MARKER:
return None

while True:
# Read marker
marker_bytes = fp.read(2)
if len(marker_bytes) < 2:
# End of file
break

# If not APP1, skip this segment
if marker_bytes != APP1_MARKER:
# Read length field (includes the length bytes themselves)
length_bytes = fp.read(2)
if len(length_bytes) < 2:
break

length = struct.unpack(">H", length_bytes)[0]
# Skip the rest of this segment (-2 because length includes length bytes)
fp.seek(length - 2, io.SEEK_CUR)
continue

# It's an APP1 segment - read length
length_bytes = fp.read(2)
if len(length_bytes) < 2:
break

length = struct.unpack(">H", length_bytes)[0]
segment_data_length = length - 2 # Subtract length field size

# Read enough bytes to check for XMP identifier
identifier_check = fp.read(len(XMP_IDENTIFIER))
if len(identifier_check) < len(XMP_IDENTIFIER):
break

# Check if this APP1 contains XMP data
if identifier_check == XMP_IDENTIFIER:
# We found XMP data - read the rest of the segment
remaining_length = segment_data_length - len(XMP_IDENTIFIER)
if remaining_length > 128 * 1024 * 1024:
raise ValueError("XMP data too large")
xmp_data = fp.read(remaining_length)

# Process the XMP data
begin_idx = xmp_data.find(XMP_META_TAG_BEGIN)
if begin_idx >= 0:
end_idx = xmp_data.rfind(XMP_META_TAG_END, begin_idx)
if end_idx >= 0:
xmp_data = xmp_data[begin_idx : end_idx + len(XMP_META_TAG_END)]
else:
xmp_data = xmp_data[begin_idx:]

return xmp_data.decode("utf-8")
else:
# Not XMP data - skip the rest of this APP1 segment
# We already read the identifier_check bytes, so subtract that
fp.seek(segment_data_length - len(identifier_check), io.SEEK_CUR)

# If we reach here, no XMP data was found
return None


class ExifReadFromEXIF(ExifReadABC):
"""
EXIF class for reading exif from an image
Expand All @@ -520,16 +603,20 @@ def __init__(self, path_or_stream: T.Union[Path, T.BinaryIO]) -> None:
if isinstance(path_or_stream, Path):
with path_or_stream.open("rb") as fp:
try:
self.tags = exifread.process_file(fp, details=True, debug=True)
except Exception:
# Turn off details and debug for performance reasons
self.tags = exifread.process_file(fp, details=False, debug=False)
except Exception as ex:
LOG.warning("Error reading EXIF from %s: %s", path_or_stream, ex)
self.tags = {}

else:
try:
# Turn off details and debug for performance reasons
self.tags = exifread.process_file(
path_or_stream, details=True, debug=True
path_or_stream, details=False, debug=False
)
except Exception:
except Exception as ex:
LOG.warning("Error reading EXIF: %s", ex)
self.tags = {}

def extract_altitude(self) -> T.Optional[float]:
Expand Down Expand Up @@ -775,27 +862,54 @@ def extract_application_notes(self) -> T.Optional[str]:


class ExifRead(ExifReadFromEXIF):
"""
Extract properties from EXIF first and then XMP
NOTE: For performance reasons, XMP is only extracted if EXIF does not contain the required fields
"""

def __init__(self, path_or_stream: T.Union[Path, T.BinaryIO]) -> None:
super().__init__(path_or_stream)
self._xmp = self._extract_xmp()
self._path_or_stream = path_or_stream
self._xml_extracted: bool = False
self._cached_xml: T.Optional[ExifReadFromXMP] = None

def _xmp_with_reason(self, reason: str) -> T.Optional[ExifReadFromXMP]:
if not self._xml_extracted:
LOG.debug('Extracting XMP for "%s"', reason)
self._cached_xml = self._extract_xmp()
self._xml_extracted = True

return self._cached_xml

def _extract_xmp(self) -> T.Optional[ExifReadFromXMP]:
application_notes = self.extract_application_notes()
if application_notes is None:
return None
xml_str = self.extract_application_notes()
if xml_str is None:
if isinstance(self._path_or_stream, Path):
with self._path_or_stream.open("rb") as fp:
xml_str = extract_xmp_efficiently(fp)
else:
self._path_or_stream.seek(0, io.SEEK_SET)
xml_str = extract_xmp_efficiently(self._path_or_stream)

if xml_str is None:
return None

try:
e = et.fromstring(application_notes)
except et.ParseError:
e = et.fromstring(xml_str)
except et.ParseError as ex:
LOG.warning("Error parsing XMP XML: %s: %s", ex, xml_str)
return None

return ExifReadFromXMP(et.ElementTree(e))

def extract_altitude(self) -> T.Optional[float]:
val = super().extract_altitude()
if val is not None:
return val
if self._xmp is None:
xmp = self._xmp_with_reason("altitude")
if xmp is None:
return None
val = self._xmp.extract_altitude()
val = xmp.extract_altitude()
if val is not None:
return val
return None
Expand All @@ -804,20 +918,10 @@ def extract_capture_time(self) -> T.Optional[datetime.datetime]:
val = super().extract_capture_time()
if val is not None:
return val
if self._xmp is None:
return None
val = self._xmp.extract_capture_time()
if val is not None:
return val
return None

def extract_direction(self) -> T.Optional[float]:
val = super().extract_direction()
if val is not None:
return val
if self._xmp is None:
xmp = self._xmp_with_reason("capture_time")
if xmp is None:
return None
val = self._xmp.extract_direction()
val = xmp.extract_capture_time()
if val is not None:
return val
return None
Expand All @@ -826,9 +930,10 @@ def extract_lon_lat(self) -> T.Optional[T.Tuple[float, float]]:
val = super().extract_lon_lat()
if val is not None:
return val
if self._xmp is None:
xmp = self._xmp_with_reason("lon_lat")
if xmp is None:
return None
val = self._xmp.extract_lon_lat()
val = xmp.extract_lon_lat()
if val is not None:
return val
return None
Expand All @@ -837,9 +942,10 @@ def extract_make(self) -> T.Optional[str]:
val = super().extract_make()
if val is not None:
return val
if self._xmp is None:
xmp = self._xmp_with_reason("make")
if xmp is None:
return None
val = self._xmp.extract_make()
val = xmp.extract_make()
if val is not None:
return val
return None
Expand All @@ -848,9 +954,10 @@ def extract_model(self) -> T.Optional[str]:
val = super().extract_model()
if val is not None:
return val
if self._xmp is None:
xmp = self._xmp_with_reason("model")
if xmp is None:
return None
val = self._xmp.extract_model()
val = xmp.extract_model()
if val is not None:
return val
return None
Expand All @@ -859,9 +966,10 @@ def extract_width(self) -> T.Optional[int]:
val = super().extract_width()
if val is not None:
return val
if self._xmp is None:
xmp = self._xmp_with_reason("width")
if xmp is None:
return None
val = self._xmp.extract_width()
val = xmp.extract_width()
if val is not None:
return val
return None
Expand All @@ -870,9 +978,10 @@ def extract_height(self) -> T.Optional[int]:
val = super().extract_height()
if val is not None:
return val
if self._xmp is None:
xmp = self._xmp_with_reason("width")
if xmp is None:
return None
val = self._xmp.extract_height()
val = xmp.extract_height()
if val is not None:
return val
return None
46 changes: 5 additions & 41 deletions mapillary_tools/geotag/geotag_images_from_exif.py
Original file line number Diff line number Diff line change
@@ -1,39 +1,17 @@
import io
import logging
import typing as T
from multiprocessing import Pool
from pathlib import Path

from tqdm import tqdm

from .. import exceptions, exif_write, geo, types, utils
from .. import exceptions, geo, types, utils
from ..exif_read import ExifRead, ExifReadABC
from .geotag_from_generic import GeotagImagesFromGeneric

LOG = logging.getLogger(__name__)


def verify_image_exif_write(
metadata: types.ImageMetadata,
image_bytes: T.Optional[bytes] = None,
) -> None:
if image_bytes is None:
edit = exif_write.ExifEdit(metadata.filename)
else:
edit = exif_write.ExifEdit(image_bytes)

# The cast is to fix the type error in Python3.6:
# Argument 1 to "add_image_description" of "ExifEdit" has incompatible type "ImageDescription"; expected "Dict[str, Any]"
edit.add_image_description(
T.cast(T.Dict, types.desc_file_to_exif(types.as_desc(metadata)))
)

# Possible errors thrown here:
# - struct.error: 'H' format requires 0 <= number <= 65535
# - piexif.InvalidImageDataError
edit.dump_image_bytes()


class GeotagImagesFromEXIF(GeotagImagesFromGeneric):
def __init__(
self, image_paths: T.Sequence[Path], num_processes: T.Optional[int] = None
Expand Down Expand Up @@ -84,30 +62,16 @@ def geotag_image(
image_path: Path, skip_lonlat_error: bool = False
) -> types.ImageMetadataOrError:
try:
# load the image bytes into memory to avoid reading it multiple times
with image_path.open("rb") as fp:
image_bytesio = io.BytesIO(fp.read())

image_bytesio.seek(0, io.SEEK_SET)
exif = ExifRead(image_bytesio)

image_metadata = GeotagImagesFromEXIF.build_image_metadata(
image_path, exif, skip_lonlat_error=skip_lonlat_error
)

image_bytesio.seek(0, io.SEEK_SET)
verify_image_exif_write(
image_metadata,
image_bytes=image_bytesio.read(),
)
exif = ExifRead(fp)
image_metadata = GeotagImagesFromEXIF.build_image_metadata(
image_path, exif, skip_lonlat_error=skip_lonlat_error
)
except Exception as ex:
return types.describe_error_metadata(
ex, image_path, filetype=types.FileType.IMAGE
)

image_bytesio.seek(0, io.SEEK_SET)
image_metadata.update_md5sum(image_bytesio)

return image_metadata

def to_description(self) -> T.List[types.ImageMetadataOrError]:
Expand Down
6 changes: 1 addition & 5 deletions mapillary_tools/geotag/geotag_images_from_exiftool.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@

from .. import exceptions, exiftool_read, types
from .geotag_from_generic import GeotagImagesFromGeneric
from .geotag_images_from_exif import GeotagImagesFromEXIF, verify_image_exif_write
from .geotag_images_from_exif import GeotagImagesFromEXIF

LOG = logging.getLogger(__name__)

Expand Down Expand Up @@ -40,10 +40,6 @@ def geotag_image(element: ET.Element) -> types.ImageMetadataOrError:
with image_path.open("rb") as fp:
image_bytesio = io.BytesIO(fp.read())
image_bytesio.seek(0, io.SEEK_SET)
verify_image_exif_write(
image_metadata,
image_bytes=image_bytesio.read(),
)
except Exception as ex:
return types.describe_error_metadata(
ex, image_path, filetype=types.FileType.IMAGE
Expand Down
Loading
Loading