diff --git a/src/async_geotiff/__init__.py b/src/async_geotiff/__init__.py index b2ad35e..2c8e2cb 100644 --- a/src/async_geotiff/__init__.py +++ b/src/async_geotiff/__init__.py @@ -1,4 +1,5 @@ from ._geotiff import GeoTIFF +from ._overview import Overview from ._version import __version__ -__all__ = ["GeoTIFF", "__version__"] +__all__ = ["GeoTIFF", "Overview", "__version__"] diff --git a/src/async_geotiff/_geotiff.py b/src/async_geotiff/_geotiff.py index ec3fbe2..341a595 100644 --- a/src/async_geotiff/_geotiff.py +++ b/src/async_geotiff/_geotiff.py @@ -1,11 +1,13 @@ from __future__ import annotations +from dataclasses import dataclass, field from functools import cached_property from typing import TYPE_CHECKING, Literal, Self from affine import Affine from async_tiff import TIFF +from async_geotiff._overview import Overview from async_geotiff.enums import Compression, Interleaving, PhotometricInterp if TYPE_CHECKING: @@ -14,23 +16,28 @@ from async_tiff.store import ObjectStore +@dataclass(frozen=True, init=False, kw_only=True, repr=False) class GeoTIFF: - """A class representing a GeoTIFF dataset.""" + """A class representing a GeoTIFF image.""" _tiff: TIFF """The underlying async-tiff TIFF instance that we wrap. """ - _primary_ifd: ImageFileDirectory + _primary_ifd: ImageFileDirectory = field(init=False) """The primary (first) IFD of the GeoTIFF. Some tags, like most geo tags, only exist on the primary IFD. """ - _gkd: GeoKeyDirectory + _gkd: GeoKeyDirectory = field(init=False) """The GeoKeyDirectory of the primary IFD. """ + _overviews: list[Overview] = field(init=False) + """A list of overviews for the GeoTIFF. + """ + def __init__(self, tiff: TIFF) -> None: """Create a GeoTIFF from an existing TIFF instance.""" @@ -44,9 +51,44 @@ def __init__(self, tiff: TIFF) -> None: if len(tiff.ifds) == 0: raise ValueError("TIFF does not contain any IFDs") - self._tiff = tiff - self._primary_ifd = first_ifd - self._gkd = gkd + # We use object.__setattr__ because the dataclass is frozen + object.__setattr__(self, "_tiff", tiff) + object.__setattr__(self, "_primary_ifd", first_ifd) + object.__setattr__(self, "_gkd", gkd) + + # Skip the first IFD, since it's the primary image + ifd_idx = 1 + overviews: list[Overview] = [] + while True: + try: + data_ifd = (ifd_idx, tiff.ifds[ifd_idx]) + except IndexError: + # No more IFDs + break + + ifd_idx += 1 + + mask_ifd = None + next_ifd = None + try: + next_ifd = tiff.ifds[ifd_idx] + except IndexError: + # No more IFDs + pass + finally: + if next_ifd is not None and is_mask_ifd(next_ifd): + mask_ifd = (ifd_idx, next_ifd) + ifd_idx += 1 + + ovr = Overview._create( + geotiff=self, + gkd=gkd, + ifd=data_ifd, + mask_ifd=mask_ifd, + ) + overviews.append(ovr) + + object.__setattr__(self, "_overviews", overviews) @classmethod async def open( @@ -214,6 +256,11 @@ def nodata(self) -> float | None: return float(nodata) + @property + def overviews(self) -> list[Overview]: + """A list of overview levels for the dataset.""" + return self._overviews + @property def photometric(self) -> PhotometricInterp | None: """The photometric interpretation of the dataset.""" @@ -330,3 +377,15 @@ def has_geokeys(ifd: ImageFileDirectory) -> bool: """ return ifd.geo_key_directory is not None + + +def is_mask_ifd(ifd: ImageFileDirectory) -> bool: + """Check if an IFD is a mask IFD.""" + if ( + ifd.compression == Compression.deflate + and ifd.new_subfile_type + and ifd.photometric_interpretation == 4 + ): + return True + + return False diff --git a/src/async_geotiff/_overview.py b/src/async_geotiff/_overview.py new file mode 100644 index 0000000..f96dade --- /dev/null +++ b/src/async_geotiff/_overview.py @@ -0,0 +1,75 @@ +from __future__ import annotations + +from dataclasses import dataclass +from functools import cached_property +from typing import TYPE_CHECKING + +from affine import Affine + +if TYPE_CHECKING: + from async_tiff import GeoKeyDirectory, ImageFileDirectory + + from async_geotiff import GeoTIFF + + +@dataclass(init=False, frozen=True, kw_only=True, eq=False, repr=False) +class Overview: + """An overview level of a Cloud-Optimized GeoTIFF image.""" + + _geotiff: GeoTIFF + """A reference to the parent GeoTIFF object. + """ + + _gkd: GeoKeyDirectory + """The GeoKeyDirectory of the primary IFD. + """ + + _ifd: tuple[int, ImageFileDirectory] + """The IFD for this overview level. + + (positional index of the IFD in the TIFF file, IFD object) + """ + + _mask_ifd: tuple[int, ImageFileDirectory] | None + """The IFD for the mask associated with this overview level, if any. + + (positional index of the IFD in the TIFF file, IFD object) + """ + + @classmethod + def _create( + cls, + *, + geotiff: GeoTIFF, + gkd: GeoKeyDirectory, + ifd: tuple[int, ImageFileDirectory], + mask_ifd: tuple[int, ImageFileDirectory] | None, + ) -> Overview: + instance = cls.__new__(cls) + + # We use object.__setattr__ because the dataclass is frozen + object.__setattr__(instance, "_geotiff", geotiff) + object.__setattr__(instance, "_gkd", gkd) + object.__setattr__(instance, "_ifd", ifd) + object.__setattr__(instance, "_mask_ifd", mask_ifd) + + return instance + + @cached_property + def transform(self) -> Affine: + """The affine transform mapping pixel coordinates to geographic coordinates. + + Returns: + Affine: The affine transform. + """ + full_transform = self._geotiff.transform + + overview_width = self._ifd[1].image_width + full_width = self._geotiff.width + overview_height = self._ifd[1].image_height + full_height = self._geotiff.height + + scale_x = full_width / overview_width + scale_y = full_height / overview_height + + return full_transform * Affine.scale(scale_x, scale_y) diff --git a/tests/test_overview.py b/tests/test_overview.py new file mode 100644 index 0000000..2a1eb46 --- /dev/null +++ b/tests/test_overview.py @@ -0,0 +1,33 @@ +from __future__ import annotations + +from typing import TYPE_CHECKING, Awaitable, Callable + +import pytest +from affine import Affine + +from async_geotiff import GeoTIFF + +if TYPE_CHECKING: + from rasterio.io import DatasetReader + + LoadGeoTIFF = Callable[[str], Awaitable[GeoTIFF]] + LoadRasterio = Callable[[str], DatasetReader] + + +@pytest.mark.asyncio +async def test_overview_transform( + load_geotiff: LoadGeoTIFF, load_rasterio: LoadRasterio +) -> None: + name = "uint8_rgb_deflate_block64_cog" + + geotiff = await load_geotiff(name) + ovr = geotiff.overviews[0] + + with load_rasterio(name) as rasterio_ds: + assert len(geotiff.overviews) == len(rasterio_ds.overviews(1)) + + overviews = rasterio_ds.overviews(1) + overview_level = overviews[0] + decimated_transform = rasterio_ds.transform * Affine.scale(overview_level) + + assert ovr.transform == decimated_transform