From 3b6a51d117ba0ed02bf1789117f73d2e9393c32d Mon Sep 17 00:00:00 2001 From: Fee Fladder Date: Thu, 20 Mar 2025 18:36:39 +0100 Subject: [PATCH 1/6] pulled `coalesce_ranges` conditionally in-tree, feature-gated object_store and reqwest --- Cargo.toml | 9 ++++- src/cog.rs | 1 + src/error.rs | 2 + src/lib.rs | 7 ++++ src/object_store/mod.rs | 1 + src/object_store/util.rs | 84 ++++++++++++++++++++++++++++++++++++++++ src/reader.rs | 23 +++++++---- 7 files changed, 117 insertions(+), 10 deletions(-) create mode 100644 src/object_store/mod.rs create mode 100644 src/object_store/util.rs diff --git a/Cargo.toml b/Cargo.toml index 1c4ed93..ae7990b 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -15,10 +15,10 @@ flate2 = "1.0.20" futures = "0.3.31" jpeg = { package = "jpeg-decoder", version = "0.3.0", default-features = false } num_enum = "0.7.3" -object_store = "0.12" +object_store = { version = "0.12", optional = true } # In the future we could make this feature-flagged, but for now we depend on # object_store which uses reqwest. -reqwest = { version = "0.12", default-features = false } +reqwest = { version = "0.12", default-features = false, optional = true } thiserror = "1" tokio = { version = "1.43.0", optional = true } weezl = "0.1.0" @@ -26,3 +26,8 @@ weezl = "0.1.0" [dev-dependencies] tiff = "0.9.1" tokio = { version = "1.9", features = ["macros", "fs", "rt-multi-thread"] } + +[features] +default = ["object_store"] +reqwest = ["dep:reqwest"] +object_store = ["dep:object_store", "reqwest"] diff --git a/src/cog.rs b/src/cog.rs index e9e3c5b..69a5d98 100644 --- a/src/cog.rs +++ b/src/cog.rs @@ -56,6 +56,7 @@ impl TIFF { } } +#[cfg(feature = "object_store")] #[cfg(test)] mod test { use std::io::BufReader; diff --git a/src/error.rs b/src/error.rs index 9f5b7f0..720f796 100644 --- a/src/error.rs +++ b/src/error.rs @@ -24,6 +24,7 @@ pub enum AsyncTiffError { JPEGDecodingError(#[from] jpeg::Error), /// Error while fetching data using object store. 
+ #[cfg(feature = "object_store")] #[error(transparent)] ObjectStore(#[from] object_store::Error), @@ -32,6 +33,7 @@ pub enum AsyncTiffError { InternalTIFFError(#[from] crate::tiff::TiffError), /// Reqwest error + #[cfg(feature = "reqwest")] // see https://www.reddit.com/r/rust/comments/xyik51/comment/irhei39/ #[error(transparent)] ReqwestError(#[from] reqwest::Error), diff --git a/src/lib.rs b/src/lib.rs index 6c3445b..c34f51e 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -11,6 +11,13 @@ mod ifd; pub mod tiff; mod tile; +#[cfg(not(feature = "object_store"))] +mod object_store; +#[cfg(feature = "object_store")] +pub use object_store::coalesce_ranges; +#[cfg(not(feature = "object_store"))] +pub use object_store::util::coalesce_ranges; + pub use cog::TIFF; pub use ifd::{ImageFileDirectories, ImageFileDirectory}; pub use tile::Tile; diff --git a/src/object_store/mod.rs b/src/object_store/mod.rs new file mode 100644 index 0000000..812d1ed --- /dev/null +++ b/src/object_store/mod.rs @@ -0,0 +1 @@ +pub mod util; diff --git a/src/object_store/util.rs b/src/object_store/util.rs new file mode 100644 index 0000000..f9065c0 --- /dev/null +++ b/src/object_store/util.rs @@ -0,0 +1,84 @@ +use bytes::Bytes; +use futures::stream::{StreamExt, TryStreamExt}; +use std::ops::Range; + +const COALESCE_PARALLEL: usize = 10; + +/// Takes a function `fetch` that can fetch a range of bytes and uses this to +/// fetch the provided byte `ranges` +/// +/// To improve performance it will: +/// +/// * Combine ranges less than `coalesce` bytes apart into a single call to `fetch` +/// * Make multiple `fetch` requests in parallel (up to maximum of 10) +/// +pub async fn coalesce_ranges( + ranges: &[Range], + fetch: F, + coalesce: u64, +) -> Result, E> +where + F: Send + FnMut(Range) -> Fut, + E: Send, + Fut: std::future::Future> + Send, +{ + let fetch_ranges = merge_ranges(ranges, coalesce); + + let fetched: Vec<_> = futures::stream::iter(fetch_ranges.iter().cloned()) + .map(fetch) + 
.buffered(COALESCE_PARALLEL) + .try_collect() + .await?; + + Ok(ranges + .iter() + .map(|range| { + let idx = fetch_ranges.partition_point(|v| v.start <= range.start) - 1; + let fetch_range = &fetch_ranges[idx]; + let fetch_bytes = &fetched[idx]; + + let start = range.start - fetch_range.start; + let end = range.end - fetch_range.start; + let range = (start as usize)..(end as usize).min(fetch_bytes.len()); + fetch_bytes.slice(range) + }) + .collect()) +} + +/// Returns a sorted list of ranges that cover `ranges` +fn merge_ranges(ranges: &[Range], coalesce: u64) -> Vec> { + if ranges.is_empty() { + return vec![]; + } + + let mut ranges = ranges.to_vec(); + ranges.sort_unstable_by_key(|range| range.start); + + let mut ret = Vec::with_capacity(ranges.len()); + let mut start_idx = 0; + let mut end_idx = 1; + + while start_idx != ranges.len() { + let mut range_end = ranges[start_idx].end; + + while end_idx != ranges.len() + && ranges[end_idx] + .start + .checked_sub(range_end) + .map(|delta| delta <= coalesce) + .unwrap_or(true) + { + range_end = range_end.max(ranges[end_idx].end); + end_idx += 1; + } + + let start = ranges[start_idx].start; + let end = range_end; + ret.push(start..end); + + start_idx = end_idx; + end_idx += 1; + } + + ret +} diff --git a/src/reader.rs b/src/reader.rs index 8563077..e55eace 100644 --- a/src/reader.rs +++ b/src/reader.rs @@ -9,8 +9,11 @@ use byteorder::{BigEndian, LittleEndian, ReadBytesExt}; use bytes::buf::Reader; use bytes::{Buf, Bytes}; use futures::future::{BoxFuture, FutureExt, TryFutureExt}; +#[cfg(feature = "object_store")] use object_store::ObjectStore; +use crate::coalesce_ranges; + use crate::error::{AsyncTiffError, AsyncTiffResult}; /// The asynchronous interface used to read COG files @@ -33,8 +36,10 @@ pub trait AsyncFileReader: Debug + Send + Sync { /// Retrieve the bytes in `range` fn get_bytes(&self, range: Range) -> BoxFuture<'_, AsyncTiffResult>; - /// Retrieve multiple byte ranges. 
The default implementation will call `get_bytes` - /// sequentially + /// Retrieve multiple byte ranges. The default implementation will + /// coalesce ranges with: + /// - less than 1024*1024=1MB space in between + /// - 10 parallel requests fn get_byte_ranges( &self, ranges: Vec>, @@ -42,8 +47,8 @@ pub trait AsyncFileReader: Debug + Send + Sync { async move { let mut result = Vec::with_capacity(ranges.len()); - for range in ranges.into_iter() { - let data = self.get_bytes(range).await?; + for data in coalesce_ranges(&ranges, |range| self.get_bytes(range), 1024 * 1024).await? + { result.push(data); } @@ -91,12 +96,13 @@ impl AsyncFileReader for Box { // } /// An AsyncFileReader that reads from an [`ObjectStore`] instance. +#[cfg(feature = "object_store")] #[derive(Clone, Debug)] pub struct ObjectReader { store: Arc, path: object_store::path::Path, } - +#[cfg(feature = "object_store")] impl ObjectReader { /// Creates a new [`ObjectReader`] for the provided [`ObjectStore`] and path /// @@ -105,7 +111,7 @@ impl ObjectReader { Self { store, path } } } - +#[cfg(feature = "object_store")] impl AsyncFileReader for ObjectReader { fn get_bytes(&self, range: Range) -> BoxFuture<'_, AsyncTiffResult> { let range = range.start as _..range.end as _; @@ -134,19 +140,20 @@ impl AsyncFileReader for ObjectReader { } /// An AsyncFileReader that reads from a URL using reqwest. +#[cfg(feature = "reqwest")] #[derive(Debug, Clone)] pub struct ReqwestReader { client: reqwest::Client, url: reqwest::Url, } - +#[cfg(feature = "reqwest")] impl ReqwestReader { /// Construct a new ReqwestReader from a reqwest client and URL. 
pub fn new(client: reqwest::Client, url: reqwest::Url) -> Self { Self { client, url } } } - +#[cfg(feature = "reqwest")] impl AsyncFileReader for ReqwestReader { fn get_bytes(&self, range: Range) -> BoxFuture<'_, AsyncTiffResult> { let url = self.url.clone(); From 8476f2c21b7bb4b5de07c3c115d5803757bbf709 Mon Sep 17 00:00:00 2001 From: Fee Fladder Date: Thu, 20 Mar 2025 23:13:30 +0100 Subject: [PATCH 2/6] undid coalesce_ranges addition --- src/lib.rs | 7 ---- src/object_store/mod.rs | 1 - src/object_store/util.rs | 84 ---------------------------------------- src/reader.rs | 12 ++---- 4 files changed, 4 insertions(+), 100 deletions(-) delete mode 100644 src/object_store/mod.rs delete mode 100644 src/object_store/util.rs diff --git a/src/lib.rs b/src/lib.rs index c34f51e..6c3445b 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -11,13 +11,6 @@ mod ifd; pub mod tiff; mod tile; -#[cfg(not(feature = "object_store"))] -mod object_store; -#[cfg(feature = "object_store")] -pub use object_store::coalesce_ranges; -#[cfg(not(feature = "object_store"))] -pub use object_store::util::coalesce_ranges; - pub use cog::TIFF; pub use ifd::{ImageFileDirectories, ImageFileDirectory}; pub use tile::Tile; diff --git a/src/object_store/mod.rs b/src/object_store/mod.rs deleted file mode 100644 index 812d1ed..0000000 --- a/src/object_store/mod.rs +++ /dev/null @@ -1 +0,0 @@ -pub mod util; diff --git a/src/object_store/util.rs b/src/object_store/util.rs deleted file mode 100644 index f9065c0..0000000 --- a/src/object_store/util.rs +++ /dev/null @@ -1,84 +0,0 @@ -use bytes::Bytes; -use futures::stream::{StreamExt, TryStreamExt}; -use std::ops::Range; - -const COALESCE_PARALLEL: usize = 10; - -/// Takes a function `fetch` that can fetch a range of bytes and uses this to -/// fetch the provided byte `ranges` -/// -/// To improve performance it will: -/// -/// * Combine ranges less than `coalesce` bytes apart into a single call to `fetch` -/// * Make multiple `fetch` requests in parallel (up to 
maximum of 10) -/// -pub async fn coalesce_ranges( - ranges: &[Range], - fetch: F, - coalesce: u64, -) -> Result, E> -where - F: Send + FnMut(Range) -> Fut, - E: Send, - Fut: std::future::Future> + Send, -{ - let fetch_ranges = merge_ranges(ranges, coalesce); - - let fetched: Vec<_> = futures::stream::iter(fetch_ranges.iter().cloned()) - .map(fetch) - .buffered(COALESCE_PARALLEL) - .try_collect() - .await?; - - Ok(ranges - .iter() - .map(|range| { - let idx = fetch_ranges.partition_point(|v| v.start <= range.start) - 1; - let fetch_range = &fetch_ranges[idx]; - let fetch_bytes = &fetched[idx]; - - let start = range.start - fetch_range.start; - let end = range.end - fetch_range.start; - let range = (start as usize)..(end as usize).min(fetch_bytes.len()); - fetch_bytes.slice(range) - }) - .collect()) -} - -/// Returns a sorted list of ranges that cover `ranges` -fn merge_ranges(ranges: &[Range], coalesce: u64) -> Vec> { - if ranges.is_empty() { - return vec![]; - } - - let mut ranges = ranges.to_vec(); - ranges.sort_unstable_by_key(|range| range.start); - - let mut ret = Vec::with_capacity(ranges.len()); - let mut start_idx = 0; - let mut end_idx = 1; - - while start_idx != ranges.len() { - let mut range_end = ranges[start_idx].end; - - while end_idx != ranges.len() - && ranges[end_idx] - .start - .checked_sub(range_end) - .map(|delta| delta <= coalesce) - .unwrap_or(true) - { - range_end = range_end.max(ranges[end_idx].end); - end_idx += 1; - } - - let start = ranges[start_idx].start; - let end = range_end; - ret.push(start..end); - - start_idx = end_idx; - end_idx += 1; - } - - ret -} diff --git a/src/reader.rs b/src/reader.rs index e55eace..f343cfd 100644 --- a/src/reader.rs +++ b/src/reader.rs @@ -12,8 +12,6 @@ use futures::future::{BoxFuture, FutureExt, TryFutureExt}; #[cfg(feature = "object_store")] use object_store::ObjectStore; -use crate::coalesce_ranges; - use crate::error::{AsyncTiffError, AsyncTiffResult}; /// The asynchronous interface used to read COG 
files @@ -36,10 +34,8 @@ pub trait AsyncFileReader: Debug + Send + Sync { /// Retrieve the bytes in `range` fn get_bytes(&self, range: Range) -> BoxFuture<'_, AsyncTiffResult>; - /// Retrieve multiple byte ranges. The default implementation will - /// coalesce ranges with: - /// - less than 1024*1024=1MB space in between - /// - 10 parallel requests + /// Retrieve multiple byte ranges. The default implementation will call `get_bytes` + /// sequentially fn get_byte_ranges( &self, ranges: Vec>, @@ -47,8 +43,8 @@ pub trait AsyncFileReader: Debug + Send + Sync { async move { let mut result = Vec::with_capacity(ranges.len()); - for data in coalesce_ranges(&ranges, |range| self.get_bytes(range), 1024 * 1024).await? - { + for range in ranges.into_iter() { + let data = self.get_bytes(range).await?; result.push(data); } From 1b4de75b09ce2a4afc39137fad3baf1e635ab6bf Mon Sep 17 00:00:00 2001 From: Fee Fladder <33122845+feefladder@users.noreply.github.com> Date: Sat, 22 Mar 2025 17:58:19 +0100 Subject: [PATCH 3/6] Update src/error.rs Co-authored-by: Kyle Barron --- src/error.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/error.rs b/src/error.rs index 720f796..ae0b7eb 100644 --- a/src/error.rs +++ b/src/error.rs @@ -33,7 +33,7 @@ pub enum AsyncTiffError { InternalTIFFError(#[from] crate::tiff::TiffError), /// Reqwest error - #[cfg(feature = "reqwest")] // see https://www.reddit.com/r/rust/comments/xyik51/comment/irhei39/ + #[cfg(feature = "reqwest")] #[error(transparent)] ReqwestError(#[from] reqwest::Error), From 11fea2ad716944b4b378fb74c9a401c90c99501f Mon Sep 17 00:00:00 2001 From: Fee Fladder <33122845+feefladder@users.noreply.github.com> Date: Sat, 22 Mar 2025 17:58:26 +0100 Subject: [PATCH 4/6] Update Cargo.toml Co-authored-by: Kyle Barron --- Cargo.toml | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index ae7990b..844fc22 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -28,6 +28,6 @@ tiff = "0.9.1" 
tokio = { version = "1.9", features = ["macros", "fs", "rt-multi-thread"] } [features] -default = ["object_store"] +default = ["object_store", "reqwest"] reqwest = ["dep:reqwest"] -object_store = ["dep:object_store", "reqwest"] +object_store = ["dep:object_store"] From b1ea07901b4e8cce71e46d9467daa35da17263b7 Mon Sep 17 00:00:00 2001 From: Fee Fladder Date: Sun, 23 Mar 2025 03:07:48 +0100 Subject: [PATCH 5/6] added CI metadata to toml --- Cargo.toml | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/Cargo.toml b/Cargo.toml index 844fc22..96f1cd2 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -31,3 +31,11 @@ tokio = { version = "1.9", features = ["macros", "fs", "rt-multi-thread"] } default = ["object_store", "reqwest"] reqwest = ["dep:reqwest"] object_store = ["dep:object_store"] + +[package.metadata.cargo-all-features] + +# If your crate has a large number of optional dependencies, skip them for speed +skip_optional_dependencies = true + +# Exclude certain features from the build matrix +denylist = ["default"] From 0da465057f27843181b2be7a0020504bd23121b4 Mon Sep 17 00:00:00 2001 From: Fee Fladder Date: Fri, 21 Mar 2025 00:54:22 +0100 Subject: [PATCH 6/6] added specific functions to AsyncFileReader trait so we know where we are wrt. 
caching --- src/ifd.rs | 4 ++-- src/reader.rs | 12 ++++++++++++ 2 files changed, 14 insertions(+), 2 deletions(-) diff --git a/src/ifd.rs b/src/ifd.rs index d522826..d5b1f9c 100644 --- a/src/ifd.rs +++ b/src/ifd.rs @@ -779,7 +779,7 @@ impl ImageFileDirectory { let range = self .get_tile_byte_range(x, y) .ok_or(AsyncTiffError::General("Not a tiled TIFF".to_string()))?; - let compressed_bytes = reader.get_bytes(range).await?; + let compressed_bytes = reader.get_tile_bytes(range).await?; Ok(Tile { x, y, @@ -810,7 +810,7 @@ impl ImageFileDirectory { .collect::>>()?; // 2: Fetch using `get_ranges - let buffers = reader.get_byte_ranges(byte_ranges).await?; + let buffers = reader.get_tile_byte_ranges(byte_ranges).await?; // 3: Create tile objects let mut tiles = vec![]; diff --git a/src/reader.rs b/src/reader.rs index f343cfd..9e0e7fe 100644 --- a/src/reader.rs +++ b/src/reader.rs @@ -52,6 +52,18 @@ pub trait AsyncFileReader: Debug + Send + Sync { } .boxed() } + + /// Same as [`Self::get_bytes`], but this function is only called when retrieving + /// compressed tile data + fn get_tile_bytes(&self, range: Range) -> BoxFuture<'_, AsyncTiffResult> { + self.get_bytes(range) + } + + /// Same as [`Self::get_byte_ranges`], but this function is only called when retrieving + /// compressed tile data + fn get_tile_byte_ranges(&self, ranges: Vec>) -> BoxFuture<'_, AsyncTiffResult>> { + self.get_byte_ranges(ranges) + } } /// This allows Box to be used as an AsyncFileReader,