diff --git a/python/Cargo.toml b/python/Cargo.toml index cad90e2..71d204a 100644 --- a/python/Cargo.toml +++ b/python/Cargo.toml @@ -25,6 +25,8 @@ pyo3 = { version = "0.23.0", features = ["macros"] } pyo3-async-runtimes = "0.23" pyo3-bytes = "0.1.2" pyo3-object_store = { git = "https://github.com/developmentseed/obstore", rev = "28ba07a621c1c104f084fb47ae7f8d08b1eae3ea" } +rayon = "1.10.0" +tokio-rayon = "2.1.0" thiserror = "1" # We opt-in to using rustls as the TLS provider for reqwest, which is the HTTP diff --git a/python/python/async_tiff/_async_tiff.pyi b/python/python/async_tiff/_async_tiff.pyi index b20bd8f..f5babf4 100644 --- a/python/python/async_tiff/_async_tiff.pyi +++ b/python/python/async_tiff/_async_tiff.pyi @@ -1,3 +1,7 @@ +from ._decoder import Decoder as Decoder +from ._decoder import DecoderRegistry as DecoderRegistry from ._geo import GeoKeyDirectory as GeoKeyDirectory from ._ifd import ImageFileDirectory as ImageFileDirectory +from ._thread_pool import ThreadPool as ThreadPool from ._tiff import TIFF as TIFF +from ._tile import Tile as Tile diff --git a/python/python/async_tiff/_decoder.pyi b/python/python/async_tiff/_decoder.pyi index 070eddc..f46d0e8 100644 --- a/python/python/async_tiff/_decoder.pyi +++ b/python/python/async_tiff/_decoder.pyi @@ -10,5 +10,6 @@ class Decoder(Protocol): def __call__(buffer: Buffer) -> Buffer: ... class DecoderRegistry: - def __init__(self) -> None: ... - def add(self, compression: CompressionMethod | int, decoder: Decoder) -> None: ... + def __init__( + self, decoders: dict[CompressionMethod | int, Decoder] | None = None + ) -> None: ... diff --git a/python/python/async_tiff/_thread_pool.pyi b/python/python/async_tiff/_thread_pool.pyi new file mode 100644 index 0000000..0363c7d --- /dev/null +++ b/python/python/async_tiff/_thread_pool.pyi @@ -0,0 +1,2 @@ +class ThreadPool: + def __init__(self, num_threads: int) -> None: ... diff --git a/python/python/async_tiff/_tile.pyi b/python/python/async_tiff/_tile.pyi new file mode 100644 index 0000000..884f30d --- /dev/null +++ b/python/python/async_tiff/_tile.pyi @@ -0,0 +1,12 @@ +from collections.abc import Buffer + +from ._decoder import DecoderRegistry +from ._thread_pool import ThreadPool + +class Tile: + async def decode( + self, + *, + decoder_registry: DecoderRegistry | None = None, + pool: ThreadPool | None = None, + ) -> Buffer: ... diff --git a/python/src/decoder.rs b/python/src/decoder.rs index 56bc8ed..06471a1 100644 --- a/python/src/decoder.rs +++ b/python/src/decoder.rs @@ -1,3 +1,6 @@ +use std::collections::HashMap; +use std::sync::Arc; + use async_tiff::decoder::{Decoder, DecoderRegistry}; use async_tiff::error::AiocogeoError; use async_tiff::tiff::tags::PhotometricInterpretation; @@ -5,30 +8,48 @@ use bytes::Bytes; use pyo3::exceptions::PyTypeError; use pyo3::intern; use pyo3::prelude::*; +use pyo3::sync::GILOnceCell; use pyo3::types::{PyDict, PyTuple}; use pyo3_bytes::PyBytes; use crate::enums::PyCompressionMethod; -#[pyclass(name = "DecoderRegistry")] -pub(crate) struct PyDecoderRegistry(DecoderRegistry); +static DEFAULT_DECODER_REGISTRY: GILOnceCell> = GILOnceCell::new(); + +pub fn get_default_decoder_registry(py: Python<'_>) -> Arc { + let registry = + DEFAULT_DECODER_REGISTRY.get_or_init(py, || Arc::new(DecoderRegistry::default())); + registry.clone() +} + +#[pyclass(name = "DecoderRegistry", frozen)] +#[derive(Debug, Default)] +pub(crate) struct PyDecoderRegistry(Arc); #[pymethods] impl PyDecoderRegistry { #[new] - fn new() -> Self { - Self(DecoderRegistry::default()) + #[pyo3(signature = (decoders = None))] + pub(crate) fn new(decoders: Option>) -> Self { + let mut decoder_registry = DecoderRegistry::default(); + if let Some(decoders) = decoders { + for (compression, decoder) in decoders.into_iter() { + decoder_registry + .as_mut() + .insert(compression.into(), Box::new(decoder)); + } + } + Self(Arc::new(decoder_registry)) } - - fn add(&mut self, compression: PyCompressionMethod, decoder: PyDecoder) { - self.0 - .as_mut() - .insert(compression.into(), Box::new(decoder)); +} +impl PyDecoderRegistry { + pub(crate) fn inner(&self) -> &Arc { + &self.0 } } #[derive(Debug)] -struct PyDecoder(PyObject); +pub(crate) struct PyDecoder(PyObject); impl PyDecoder { fn call(&self, py: Python, buffer: Bytes) -> PyResult { diff --git a/python/src/enums.rs b/python/src/enums.rs index 91baa90..c80fbb4 100644 --- a/python/src/enums.rs +++ b/python/src/enums.rs @@ -6,6 +6,7 @@ use pyo3::intern; use pyo3::prelude::*; use pyo3::types::{PyString, PyTuple}; +#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)] pub(crate) struct PyCompressionMethod(CompressionMethod); impl From for PyCompressionMethod { diff --git a/python/src/lib.rs b/python/src/lib.rs index e35cec0..acfa7a0 100644 --- a/python/src/lib.rs +++ b/python/src/lib.rs @@ -4,13 +4,16 @@ mod decoder; mod enums; mod geo; mod ifd; +mod thread_pool; mod tiff; +mod tile; use pyo3::prelude::*; use crate::decoder::PyDecoderRegistry; use crate::geo::PyGeoKeyDirectory; use crate::ifd::PyImageFileDirectory; +use crate::thread_pool::PyThreadPool; use crate::tiff::PyTIFF; const VERSION: &str = env!("CARGO_PKG_VERSION"); @@ -48,6 +51,7 @@ fn _async_tiff(py: Python, m: &Bound) -> PyResult<()> { m.add_class::()?; m.add_class::()?; m.add_class::()?; + m.add_class::()?; m.add_class::()?; pyo3_object_store::register_store_module(py, m, "async_tiff")?; diff --git a/python/src/thread_pool.rs b/python/src/thread_pool.rs new file mode 100644 index 0000000..0f86f01 --- /dev/null +++ b/python/src/thread_pool.rs @@ -0,0 +1,48 @@ +use std::sync::Arc; + +use pyo3::exceptions::PyValueError; +use pyo3::prelude::*; + +use pyo3::sync::GILOnceCell; +use rayon::{ThreadPool, ThreadPoolBuilder}; + +static DEFAULT_POOL: GILOnceCell> = GILOnceCell::new(); + +pub fn get_default_pool(py: Python<'_>) -> PyResult> { + let runtime = DEFAULT_POOL.get_or_try_init(py, || { + let pool = ThreadPoolBuilder::new().build().map_err(|err| { + PyValueError::new_err(format!("Could not create rayon threadpool. {}", err)) + })?; + Ok::<_, PyErr>(Arc::new(pool)) + })?; + Ok(runtime.clone()) +} + +#[pyclass(name = "ThreadPool", frozen, module = "async_tiff")] +pub(crate) struct PyThreadPool(Arc); + +#[pymethods] +impl PyThreadPool { + #[new] + fn new(num_threads: usize) -> PyResult { + let pool = ThreadPoolBuilder::new() + .num_threads(num_threads) + .build() + .map_err(|err| { + PyValueError::new_err(format!("Could not create rayon threadpool. {}", err)) + })?; + Ok(Self(Arc::new(pool))) + } +} + +impl PyThreadPool { + pub(crate) fn inner(&self) -> &Arc { + &self.0 + } +} + +impl AsRef for PyThreadPool { + fn as_ref(&self) -> &ThreadPool { + &self.0 + } +} diff --git a/python/src/tile.rs b/python/src/tile.rs new file mode 100644 index 0000000..5b34296 --- /dev/null +++ b/python/src/tile.rs @@ -0,0 +1,40 @@ +use async_tiff::Tile; +use pyo3::prelude::*; +use pyo3_async_runtimes::tokio::future_into_py; +use pyo3_bytes::PyBytes; +use tokio_rayon::AsyncThreadPool; + +use crate::decoder::get_default_decoder_registry; +use crate::thread_pool::{get_default_pool, PyThreadPool}; +use crate::PyDecoderRegistry; + +#[pyclass(name = "Tile")] +pub(crate) struct PyTile(Option); + +#[pymethods] +impl PyTile { + #[pyo3(signature = (*, decoder_registry=None, pool=None))] + fn decode_async( + &mut self, + py: Python, + decoder_registry: Option<&PyDecoderRegistry>, + pool: Option<&PyThreadPool>, + ) -> PyResult { + let decoder_registry = decoder_registry + .map(|r| r.inner().clone()) + .unwrap_or_else(|| get_default_decoder_registry(py)); + let pool = pool + .map(|p| Ok(p.inner().clone())) + .unwrap_or_else(|| get_default_pool(py))?; + let tile = self.0.take().unwrap(); + + let result = future_into_py(py, async move { + let decoded_bytes = pool + .spawn_async(move || tile.decode(&decoder_registry)) + .await + .unwrap(); + Ok(PyBytes::new(decoded_bytes)) + })?; + Ok(result.unbind()) + } +} diff --git a/src/cog.rs b/src/cog.rs index 8d17ff1..01e498f 100644 --- a/src/cog.rs +++ b/src/cog.rs @@ -9,8 +9,6 @@ pub struct COGReader { #[allow(dead_code)] cursor: AsyncCursor, ifds: ImageFileDirectories, - #[allow(dead_code)] - bigtiff: bool, } impl COGReader { @@ -46,30 +44,12 @@ impl COGReader { let ifds = ImageFileDirectories::open(&mut cursor, first_ifd_location, bigtiff).await?; - Ok(Self { - cursor, - ifds, - bigtiff, - }) + Ok(Self { cursor, ifds }) } pub fn ifds(&self) -> &ImageFileDirectories { &self.ifds } - - /// Return the EPSG code representing the crs of the image - pub fn epsg(&self) -> Option { - let ifd = &self.ifds.as_ref()[0]; - ifd.geo_key_directory - .as_ref() - .and_then(|gkd| gkd.epsg_code()) - } - - /// Return the bounds of the image in native crs - pub fn native_bounds(&self) -> Option<(f64, f64, f64, f64)> { - let ifd = &self.ifds.as_ref()[0]; - ifd.native_bounds() - } } #[cfg(test)] diff --git a/src/geo/affine.rs b/src/geo/affine.rs index 78f7ac6..de5cc89 100644 --- a/src/geo/affine.rs +++ b/src/geo/affine.rs @@ -1,3 +1,5 @@ +use crate::ImageFileDirectory; + /// Affine transformation values. #[derive(Debug)] pub struct AffineTransform(f64, f64, f64, f64, f64, f64); @@ -30,4 +32,22 @@ impl AffineTransform { pub fn f(&self) -> f64 { self.5 } + + /// Construct a new Affine Transform from the IFD + pub fn from_ifd(ifd: &ImageFileDirectory) -> Option { + if let (Some(model_pixel_scale), Some(model_tiepoint)) = + (&ifd.model_pixel_scale, &ifd.model_tiepoint) + { + Some(Self::new( + model_pixel_scale[0], + 0.0, + model_tiepoint[3], + 0.0, + -model_pixel_scale[1], + model_tiepoint[4], + )) + } else { + None + } + } } diff --git a/src/ifd.rs b/src/ifd.rs index 98d3931..c13731c 100644 --- a/src/ifd.rs +++ b/src/ifd.rs @@ -7,14 +7,14 @@ use num_enum::TryFromPrimitive; use crate::async_reader::AsyncCursor; use crate::error::{AiocogeoError, Result}; -use crate::geo::{AffineTransform, GeoKeyDirectory, GeoKeyTag}; +use crate::geo::{GeoKeyDirectory, GeoKeyTag}; use crate::tiff::tags::{ CompressionMethod, PhotometricInterpretation, PlanarConfiguration, Predictor, ResolutionUnit, SampleFormat, Tag, Type, }; use crate::tiff::TiffError; use crate::tiff::Value; -use crate::tile::TiffTile; +use crate::tile::Tile; use crate::AsyncFileReader; const DOCUMENT_NAME: u16 = 269; @@ -663,19 +663,6 @@ impl ImageFileDirectory { self.model_tiepoint.as_deref() } - /// Check if an IFD is masked based on a dictionary of tiff tags - /// https://www.awaresystems.be/imaging/tiff/tifftags/newsubfiletype.html - /// https://gdal.org/drivers/raster/gtiff.html#internal-nodata-masks - pub fn is_masked(&self) -> bool { - if let Some(subfile_type) = self.new_subfile_type { - (subfile_type == 1 || subfile_type == 2) - && self.photometric_interpretation == PhotometricInterpretation::TransparencyMask - && self.compression == CompressionMethod::Deflate - } else { - false - } - } - /// Construct colormap from colormap tag pub fn colormap(&self) -> Option> { fn cmap_transform(val: u16) -> u8 { @@ -709,15 +696,6 @@ impl ImageFileDirectory { } } - /// Returns true if this IFD contains a full resolution image (not an overview) - pub fn is_full_resolution(&self) -> bool { - if let Some(val) = self.new_subfile_type { - val != 0 - } else { - true - } - } - fn get_tile_byte_range(&self, x: usize, y: usize) -> Option> { let tile_offsets = self.tile_offsets.as_deref()?; let tile_byte_counts = self.tile_byte_counts.as_deref()?; @@ -734,12 +712,12 @@ impl ImageFileDirectory { x: usize, y: usize, reader: &dyn AsyncFileReader, - ) -> Result { + ) -> Result { let range = self .get_tile_byte_range(x, y) .ok_or(AiocogeoError::General("Not a tiled TIFF".to_string()))?; let compressed_bytes = reader.get_bytes(range).await?; - Ok(TiffTile { + Ok(Tile { x, y, compressed_bytes, @@ -754,7 +732,7 @@ impl ImageFileDirectory { x: &[usize], y: &[usize], reader: &dyn AsyncFileReader, - ) -> Result> { + ) -> Result> { assert_eq!(x.len(), y.len(), "x and y should have same len"); // 1: Get all the byte ranges for all tiles @@ -773,7 +751,7 @@ impl ImageFileDirectory { // 3: Create tile objects let mut tiles = vec![]; for ((compressed_bytes, &x), &y) in buffers.into_iter().zip(x).zip(y) { - let tile = TiffTile { + let tile = Tile { x, y, compressed_bytes, @@ -793,40 +771,6 @@ impl ImageFileDirectory { let y_count = (self.image_height as f64 / self.tile_height? as f64).ceil(); Some((x_count as usize, y_count as usize)) } - - /// Return the geotransform of the image - /// - /// This does not yet implement decimation - pub fn geotransform(&self) -> Option { - if let (Some(model_pixel_scale), Some(model_tiepoint)) = - (&self.model_pixel_scale, &self.model_tiepoint) - { - Some(AffineTransform::new( - model_pixel_scale[0], - 0.0, - model_tiepoint[3], - 0.0, - -model_pixel_scale[1], - model_tiepoint[4], - )) - } else { - None - } - } - - /// Return the bounds of the image in native crs - pub fn native_bounds(&self) -> Option<(f64, f64, f64, f64)> { - if let Some(gt) = self.geotransform() { - let tlx = gt.c(); - let tly = gt.f(); - - let brx = tlx + (gt.a() * self.image_width as f64); - let bry = tly + (gt.e() * self.image_height as f64); - Some((tlx, bry, brx, tly)) - } else { - None - } - } } /// Read a single tag from the cursor diff --git a/src/lib.rs b/src/lib.rs index 6aa8fb4..fdec48b 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -12,3 +12,4 @@ mod tile; pub use async_reader::{AsyncFileReader, ObjectReader, PrefetchReader}; pub use cog::COGReader; pub use ifd::{ImageFileDirectories, ImageFileDirectory}; +pub use tile::Tile; diff --git a/src/tile.rs b/src/tile.rs index c345603..92a56ee 100644 --- a/src/tile.rs +++ b/src/tile.rs @@ -12,7 +12,7 @@ use crate::tiff::{TiffError, TiffUnsupportedError}; /// /// This is returned by `fetch_tile`. #[derive(Debug)] -pub struct TiffTile { +pub struct Tile { pub(crate) x: usize, pub(crate) y: usize, pub(crate) compressed_bytes: Bytes, @@ -21,7 +21,7 @@ pub struct TiffTile { pub(crate) jpeg_tables: Option, } -impl TiffTile { +impl Tile { /// The column index of this tile. pub fn x(&self) -> usize { self.x @@ -60,7 +60,7 @@ impl TiffTile { /// /// Decoding is separate from fetching so that sync and async operations do not block the same /// runtime. - pub fn decode(&self, decoder_registry: &DecoderRegistry) -> Result { + pub fn decode(self, decoder_registry: &DecoderRegistry) -> Result { let decoder = decoder_registry .as_ref() .get(&self.compression_method)