From 3b6a51d117ba0ed02bf1789117f73d2e9393c32d Mon Sep 17 00:00:00 2001 From: Fee Fladder Date: Thu, 20 Mar 2025 18:36:39 +0100 Subject: [PATCH 1/3] pulled `coalesce_ranges` conditionally in-tree, feature-gated object_store and reqwest --- Cargo.toml | 9 ++++- src/cog.rs | 1 + src/error.rs | 2 + src/lib.rs | 7 ++++ src/object_store/mod.rs | 1 + src/object_store/util.rs | 84 ++++++++++++++++++++++++++++++++++++++++ src/reader.rs | 23 +++++++---- 7 files changed, 117 insertions(+), 10 deletions(-) create mode 100644 src/object_store/mod.rs create mode 100644 src/object_store/util.rs diff --git a/Cargo.toml b/Cargo.toml index 1c4ed93..ae7990b 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -15,10 +15,10 @@ flate2 = "1.0.20" futures = "0.3.31" jpeg = { package = "jpeg-decoder", version = "0.3.0", default-features = false } num_enum = "0.7.3" -object_store = "0.12" +object_store = { version = "0.12", optional = true } # In the future we could make this feature-flagged, but for now we depend on # object_store which uses reqwest. -reqwest = { version = "0.12", default-features = false } +reqwest = { version = "0.12", default-features = false, optional = true } thiserror = "1" tokio = { version = "1.43.0", optional = true } weezl = "0.1.0" @@ -26,3 +26,8 @@ weezl = "0.1.0" [dev-dependencies] tiff = "0.9.1" tokio = { version = "1.9", features = ["macros", "fs", "rt-multi-thread"] } + +[features] +default = ["object_store"] +reqwest = ["dep:reqwest"] +object_store = ["dep:object_store", "reqwest"] diff --git a/src/cog.rs b/src/cog.rs index e9e3c5b..69a5d98 100644 --- a/src/cog.rs +++ b/src/cog.rs @@ -56,6 +56,7 @@ impl TIFF { } } +#[cfg(feature = "object_store")] #[cfg(test)] mod test { use std::io::BufReader; diff --git a/src/error.rs b/src/error.rs index 9f5b7f0..720f796 100644 --- a/src/error.rs +++ b/src/error.rs @@ -24,6 +24,7 @@ pub enum AsyncTiffError { JPEGDecodingError(#[from] jpeg::Error), /// Error while fetching data using object store. + #[cfg(feature = "object_store")] #[error(transparent)] ObjectStore(#[from] object_store::Error), @@ -32,6 +33,7 @@ pub enum AsyncTiffError { InternalTIFFError(#[from] crate::tiff::TiffError), /// Reqwest error + #[cfg(feature = "reqwest")] // see https://www.reddit.com/r/rust/comments/xyik51/comment/irhei39/ #[error(transparent)] ReqwestError(#[from] reqwest::Error), diff --git a/src/lib.rs b/src/lib.rs index 6c3445b..c34f51e 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -11,6 +11,13 @@ mod ifd; pub mod tiff; mod tile; +#[cfg(not(feature = "object_store"))] +mod object_store; +#[cfg(feature = "object_store")] +pub use object_store::coalesce_ranges; +#[cfg(not(feature = "object_store"))] +pub use object_store::util::coalesce_ranges; + pub use cog::TIFF; pub use ifd::{ImageFileDirectories, ImageFileDirectory}; pub use tile::Tile; diff --git a/src/object_store/mod.rs b/src/object_store/mod.rs new file mode 100644 index 0000000..812d1ed --- /dev/null +++ b/src/object_store/mod.rs @@ -0,0 +1 @@ +pub mod util; diff --git a/src/object_store/util.rs b/src/object_store/util.rs new file mode 100644 index 0000000..f9065c0 --- /dev/null +++ b/src/object_store/util.rs @@ -0,0 +1,84 @@ +use bytes::Bytes; +use futures::stream::{StreamExt, TryStreamExt}; +use std::ops::Range; + +const COALESCE_PARALLEL: usize = 10; + +/// Takes a function `fetch` that can fetch a range of bytes and uses this to +/// fetch the provided byte `ranges` +/// +/// To improve performance it will: +/// +/// * Combine ranges less than `coalesce` bytes apart into a single call to `fetch` +/// * Make multiple `fetch` requests in parallel (up to maximum of 10) +/// +pub async fn coalesce_ranges( + ranges: &[Range], + fetch: F, + coalesce: u64, +) -> Result, E> +where + F: Send + FnMut(Range) -> Fut, + E: Send, + Fut: std::future::Future> + Send, +{ + let fetch_ranges = merge_ranges(ranges, coalesce); + + let fetched: Vec<_> = futures::stream::iter(fetch_ranges.iter().cloned()) + .map(fetch) + .buffered(COALESCE_PARALLEL) + .try_collect() + .await?; + + Ok(ranges + .iter() + .map(|range| { + let idx = fetch_ranges.partition_point(|v| v.start <= range.start) - 1; + let fetch_range = &fetch_ranges[idx]; + let fetch_bytes = &fetched[idx]; + + let start = range.start - fetch_range.start; + let end = range.end - fetch_range.start; + let range = (start as usize)..(end as usize).min(fetch_bytes.len()); + fetch_bytes.slice(range) + }) + .collect()) +} + +/// Returns a sorted list of ranges that cover `ranges` +fn merge_ranges(ranges: &[Range], coalesce: u64) -> Vec> { + if ranges.is_empty() { + return vec![]; + } + + let mut ranges = ranges.to_vec(); + ranges.sort_unstable_by_key(|range| range.start); + + let mut ret = Vec::with_capacity(ranges.len()); + let mut start_idx = 0; + let mut end_idx = 1; + + while start_idx != ranges.len() { + let mut range_end = ranges[start_idx].end; + + while end_idx != ranges.len() + && ranges[end_idx] + .start + .checked_sub(range_end) + .map(|delta| delta <= coalesce) + .unwrap_or(true) + { + range_end = range_end.max(ranges[end_idx].end); + end_idx += 1; + } + + let start = ranges[start_idx].start; + let end = range_end; + ret.push(start..end); + + start_idx = end_idx; + end_idx += 1; + } + + ret +} diff --git a/src/reader.rs b/src/reader.rs index 8563077..e55eace 100644 --- a/src/reader.rs +++ b/src/reader.rs @@ -9,8 +9,11 @@ use byteorder::{BigEndian, LittleEndian, ReadBytesExt}; use bytes::buf::Reader; use bytes::{Buf, Bytes}; use futures::future::{BoxFuture, FutureExt, TryFutureExt}; +#[cfg(feature = "object_store")] use object_store::ObjectStore; +use crate::coalesce_ranges; + use crate::error::{AsyncTiffError, AsyncTiffResult}; /// The asynchronous interface used to read COG files @@ -33,8 +36,10 @@ pub trait AsyncFileReader: Debug + Send + Sync { /// Retrieve the bytes in `range` fn get_bytes(&self, range: Range) -> BoxFuture<'_, AsyncTiffResult>; - /// Retrieve multiple byte ranges. The default implementation will call `get_bytes` - /// sequentially + /// Retrieve multiple byte ranges. The default implementation will + /// coalesce ranges with: + /// - less than 1024*1024=1MB space in between + /// - 10 parallel requests fn get_byte_ranges( &self, ranges: Vec>, @@ -42,8 +47,8 @@ pub trait AsyncFileReader: Debug + Send + Sync { async move { let mut result = Vec::with_capacity(ranges.len()); - for range in ranges.into_iter() { - let data = self.get_bytes(range).await?; + for data in coalesce_ranges(&ranges, |range| self.get_bytes(range), 1024 * 1024).await? + { result.push(data); } @@ -91,12 +96,13 @@ impl AsyncFileReader for Box { // } /// An AsyncFileReader that reads from an [`ObjectStore`] instance. +#[cfg(feature = "object_store")] #[derive(Clone, Debug)] pub struct ObjectReader { store: Arc, path: object_store::path::Path, } - +#[cfg(feature = "object_store")] impl ObjectReader { /// Creates a new [`ObjectReader`] for the provided [`ObjectStore`] and path /// @@ -105,7 +111,7 @@ impl ObjectReader { Self { store, path } } } - +#[cfg(feature = "object_store")] impl AsyncFileReader for ObjectReader { fn get_bytes(&self, range: Range) -> BoxFuture<'_, AsyncTiffResult> { let range = range.start as _..range.end as _; @@ -134,19 +140,20 @@ impl AsyncFileReader for ObjectReader { } /// An AsyncFileReader that reads from a URL using reqwest. +#[cfg(feature = "reqwest")] #[derive(Debug, Clone)] pub struct ReqwestReader { client: reqwest::Client, url: reqwest::Url, } - +#[cfg(feature = "reqwest")] impl ReqwestReader { /// Construct a new ReqwestReader from a reqwest client and URL. pub fn new(client: reqwest::Client, url: reqwest::Url) -> Self { Self { client, url } } } - +#[cfg(feature = "reqwest")] impl AsyncFileReader for ReqwestReader { fn get_bytes(&self, range: Range) -> BoxFuture<'_, AsyncTiffResult> { let url = self.url.clone(); From 8476f2c21b7bb4b5de07c3c115d5803757bbf709 Mon Sep 17 00:00:00 2001 From: Fee Fladder Date: Thu, 20 Mar 2025 23:13:30 +0100 Subject: [PATCH 2/3] undid coalesce_ranges addition --- src/lib.rs | 7 ---- src/object_store/mod.rs | 1 - src/object_store/util.rs | 84 ---------------------------------------- src/reader.rs | 12 ++---- 4 files changed, 4 insertions(+), 100 deletions(-) delete mode 100644 src/object_store/mod.rs delete mode 100644 src/object_store/util.rs diff --git a/src/lib.rs b/src/lib.rs index c34f51e..6c3445b 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -11,13 +11,6 @@ mod ifd; pub mod tiff; mod tile; -#[cfg(not(feature = "object_store"))] -mod object_store; -#[cfg(feature = "object_store")] -pub use object_store::coalesce_ranges; -#[cfg(not(feature = "object_store"))] -pub use object_store::util::coalesce_ranges; - pub use cog::TIFF; pub use ifd::{ImageFileDirectories, ImageFileDirectory}; pub use tile::Tile; diff --git a/src/object_store/mod.rs b/src/object_store/mod.rs deleted file mode 100644 index 812d1ed..0000000 --- a/src/object_store/mod.rs +++ /dev/null @@ -1 +0,0 @@ -pub mod util; diff --git a/src/object_store/util.rs b/src/object_store/util.rs deleted file mode 100644 index f9065c0..0000000 --- a/src/object_store/util.rs +++ /dev/null @@ -1,84 +0,0 @@ -use bytes::Bytes; -use futures::stream::{StreamExt, TryStreamExt}; -use std::ops::Range; - -const COALESCE_PARALLEL: usize = 10; - -/// Takes a function `fetch` that can fetch a range of bytes and uses this to -/// fetch the provided byte `ranges` -/// -/// To improve performance it will: -/// -/// * Combine ranges less than `coalesce` bytes apart into a single call to `fetch` -/// * Make multiple `fetch` requests in parallel (up to maximum of 10) -/// -pub async fn coalesce_ranges( - ranges: &[Range], - fetch: F, - coalesce: u64, -) -> Result, E> -where - F: Send + FnMut(Range) -> Fut, - E: Send, - Fut: std::future::Future> + Send, -{ - let fetch_ranges = merge_ranges(ranges, coalesce); - - let fetched: Vec<_> = futures::stream::iter(fetch_ranges.iter().cloned()) - .map(fetch) - .buffered(COALESCE_PARALLEL) - .try_collect() - .await?; - - Ok(ranges - .iter() - .map(|range| { - let idx = fetch_ranges.partition_point(|v| v.start <= range.start) - 1; - let fetch_range = &fetch_ranges[idx]; - let fetch_bytes = &fetched[idx]; - - let start = range.start - fetch_range.start; - let end = range.end - fetch_range.start; - let range = (start as usize)..(end as usize).min(fetch_bytes.len()); - fetch_bytes.slice(range) - }) - .collect()) -} - -/// Returns a sorted list of ranges that cover `ranges` -fn merge_ranges(ranges: &[Range], coalesce: u64) -> Vec> { - if ranges.is_empty() { - return vec![]; - } - - let mut ranges = ranges.to_vec(); - ranges.sort_unstable_by_key(|range| range.start); - - let mut ret = Vec::with_capacity(ranges.len()); - let mut start_idx = 0; - let mut end_idx = 1; - - while start_idx != ranges.len() { - let mut range_end = ranges[start_idx].end; - - while end_idx != ranges.len() - && ranges[end_idx] - .start - .checked_sub(range_end) - .map(|delta| delta <= coalesce) - .unwrap_or(true) - { - range_end = range_end.max(ranges[end_idx].end); - end_idx += 1; - } - - let start = ranges[start_idx].start; - let end = range_end; - ret.push(start..end); - - start_idx = end_idx; - end_idx += 1; - } - - ret -} diff --git a/src/reader.rs b/src/reader.rs index e55eace..f343cfd 100644 --- a/src/reader.rs +++ b/src/reader.rs @@ -12,8 +12,6 @@ use futures::future::{BoxFuture, FutureExt, TryFutureExt}; #[cfg(feature = "object_store")] use object_store::ObjectStore; -use crate::coalesce_ranges; - use crate::error::{AsyncTiffError, AsyncTiffResult}; /// The asynchronous interface used to read COG files @@ -36,10 +34,8 @@ pub trait AsyncFileReader: Debug + Send + Sync { /// Retrieve the bytes in `range` fn get_bytes(&self, range: Range) -> BoxFuture<'_, AsyncTiffResult>; - /// Retrieve multiple byte ranges. The default implementation will - /// coalesce ranges with: - /// - less than 1024*1024=1MB space in between - /// - 10 parallel requests + /// Retrieve multiple byte ranges. The default implementation will call `get_bytes` + /// sequentially fn get_byte_ranges( &self, ranges: Vec>, @@ -47,8 +43,8 @@ pub trait AsyncFileReader: Debug + Send + Sync { async move { let mut result = Vec::with_capacity(ranges.len()); - for data in coalesce_ranges(&ranges, |range| self.get_bytes(range), 1024 * 1024).await? - { + for range in ranges.into_iter() { + let data = self.get_bytes(range).await?; result.push(data); } From 2cd95c91724be0a1df5702f1ffcddc53930075cb Mon Sep 17 00:00:00 2001 From: Fee Fladder Date: Fri, 21 Mar 2025 00:27:05 +0100 Subject: [PATCH 3/3] updated AsyncFileReader trait and impls to not have Send futures in wasm --- Cargo.toml | 1 + src/reader.rs | 91 +++++++++++++++++++++++++-------------------------- 2 files changed, 46 insertions(+), 46 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index ae7990b..ee6a936 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -9,6 +9,7 @@ description = "Low-level asynchronous TIFF reader." readme = "README.md" [dependencies] +async-trait = "0.1.88" byteorder = "1" bytes = "1.7.0" flate2 = "1.0.20" diff --git a/src/reader.rs b/src/reader.rs index f343cfd..16c7d15 100644 --- a/src/reader.rs +++ b/src/reader.rs @@ -5,10 +5,11 @@ use std::io::Read; use std::ops::Range; use std::sync::Arc; +use async_trait::async_trait; use byteorder::{BigEndian, LittleEndian, ReadBytesExt}; use bytes::buf::Reader; use bytes::{Buf, Bytes}; -use futures::future::{BoxFuture, FutureExt, TryFutureExt}; +use futures::future::TryFutureExt; #[cfg(feature = "object_store")] use object_store::ObjectStore; @@ -30,41 +31,36 @@ use crate::error::{AsyncTiffError, AsyncTiffResult}; /// [`ObjectStore`]: object_store::ObjectStore /// /// [`tokio::fs::File`]: https://docs.rs/tokio/latest/tokio/fs/struct.File.html +#[cfg_attr(target_arch = "wasm32", async_trait(?Send))] +#[cfg_attr(not(target_arch = "wasm32"), async_trait)] pub trait AsyncFileReader: Debug + Send + Sync { /// Retrieve the bytes in `range` - fn get_bytes(&self, range: Range) -> BoxFuture<'_, AsyncTiffResult>; + async fn get_bytes(&self, range: Range) -> AsyncTiffResult; /// Retrieve multiple byte ranges. The default implementation will call `get_bytes` /// sequentially - fn get_byte_ranges( - &self, - ranges: Vec>, - ) -> BoxFuture<'_, AsyncTiffResult>> { - async move { - let mut result = Vec::with_capacity(ranges.len()); - - for range in ranges.into_iter() { - let data = self.get_bytes(range).await?; - result.push(data); - } + async fn get_byte_ranges(&self, ranges: Vec>) -> AsyncTiffResult> { + let mut result = Vec::with_capacity(ranges.len()); - Ok(result) + for range in ranges.into_iter() { + let data = self.get_bytes(range).await?; + result.push(data); } - .boxed() + + Ok(result) } } /// This allows Box to be used as an AsyncFileReader, -impl AsyncFileReader for Box { - fn get_bytes(&self, range: Range) -> BoxFuture<'_, AsyncTiffResult> { - self.as_ref().get_bytes(range) +#[cfg_attr(target_arch = "wasm32", async_trait(?Send))] +#[cfg_attr(not(target_arch = "wasm32"), async_trait)] +impl AsyncFileReader for Box { + async fn get_bytes(&self, range: Range) -> AsyncTiffResult { + self.get_bytes(range).await } - fn get_byte_ranges( - &self, - ranges: Vec>, - ) -> BoxFuture<'_, AsyncTiffResult>> { - self.as_ref().get_byte_ranges(ranges) + async fn get_byte_ranges(&self, ranges: Vec>) -> AsyncTiffResult> { + self.get_byte_ranges(ranges).await } } @@ -108,16 +104,18 @@ impl ObjectReader { } } #[cfg(feature = "object_store")] +#[cfg_attr(target_arch = "wasm32", async_trait(?Send))] +#[cfg_attr(not(target_arch = "wasm32"), async_trait)] impl AsyncFileReader for ObjectReader { - fn get_bytes(&self, range: Range) -> BoxFuture<'_, AsyncTiffResult> { + async fn get_bytes(&self, range: Range) -> AsyncTiffResult { let range = range.start as _..range.end as _; self.store .get_range(&self.path, range) .map_err(|e| e.into()) - .boxed() + .await } - fn get_byte_ranges(&self, ranges: Vec>) -> BoxFuture<'_, AsyncTiffResult>> + async fn get_byte_ranges(&self, ranges: Vec>) -> AsyncTiffResult> where Self: Send, { @@ -125,13 +123,10 @@ impl AsyncFileReader for ObjectReader { .into_iter() .map(|r| r.start as _..r.end as _) .collect::>(); - async move { - self.store - .get_ranges(&self.path, &ranges) - .await - .map_err(|e| e.into()) - } - .boxed() + self.store + .get_ranges(&self.path, &ranges) + .await + .map_err(|e| e.into()) } } @@ -150,18 +145,20 @@ impl ReqwestReader { } } #[cfg(feature = "reqwest")] +#[cfg_attr(target_arch = "wasm32", async_trait(?Send))] +#[cfg_attr(not(target_arch = "wasm32"), async_trait)] impl AsyncFileReader for ReqwestReader { - fn get_bytes(&self, range: Range) -> BoxFuture<'_, AsyncTiffResult> { + async fn get_bytes(&self, range: Range) -> AsyncTiffResult { + // } + // fn get_bytes<'async_trait>(&'async_trait self, range: Range) -> BoxFuture<'async_trait, AsyncTiffResult> + // { let url = self.url.clone(); let client = self.client.clone(); // HTTP range is inclusive, so we need to subtract 1 from the end let range = format!("bytes={}-{}", range.start, range.end - 1); - async move { - let response = client.get(url).header("Range", range).send().await?; - let bytes = response.bytes().await?; - Ok(bytes) - } - .boxed() + let response = client.get(url).header("Range", range).send().await?; + let bytes = response.bytes().await?; + Ok(bytes) } } @@ -180,29 +177,31 @@ impl PrefetchReader { } } +#[cfg_attr(target_arch = "wasm32", async_trait(?Send))] +#[cfg_attr(not(target_arch = "wasm32"), async_trait)] impl AsyncFileReader for PrefetchReader { - fn get_bytes(&self, range: Range) -> BoxFuture<'_, AsyncTiffResult> { + async fn get_bytes(&self, range: Range) -> AsyncTiffResult { if range.start < self.buffer.len() as _ { if range.end < self.buffer.len() as _ { let usize_range = range.start as usize..range.end as usize; let result = self.buffer.slice(usize_range); - async { Ok(result) }.boxed() + Ok(result) } else { // TODO: reuse partial internal buffer - self.reader.get_bytes(range) + self.reader.get_bytes(range).await } } else { - self.reader.get_bytes(range) + self.reader.get_bytes(range).await } } - fn get_byte_ranges(&self, ranges: Vec>) -> BoxFuture<'_, AsyncTiffResult>> + async fn get_byte_ranges(&self, ranges: Vec>) -> AsyncTiffResult> where Self: Send, { // In practice, get_byte_ranges is only used for fetching tiles, which are unlikely to // overlap a metadata prefetch. - self.reader.get_byte_ranges(ranges) + self.reader.get_byte_ranges(ranges).await } }