From 3b6a51d117ba0ed02bf1789117f73d2e9393c32d Mon Sep 17 00:00:00 2001 From: Fee Fladder Date: Thu, 20 Mar 2025 18:36:39 +0100 Subject: [PATCH 01/13] pulled `coalesce_ranges` conditionally in-tree, feature-gated object_store and reqwest --- Cargo.toml | 9 ++++- src/cog.rs | 1 + src/error.rs | 2 + src/lib.rs | 7 ++++ src/object_store/mod.rs | 1 + src/object_store/util.rs | 84 ++++++++++++++++++++++++++++++++++++++++ src/reader.rs | 23 +++++++---- 7 files changed, 117 insertions(+), 10 deletions(-) create mode 100644 src/object_store/mod.rs create mode 100644 src/object_store/util.rs diff --git a/Cargo.toml b/Cargo.toml index 1c4ed93..ae7990b 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -15,10 +15,10 @@ flate2 = "1.0.20" futures = "0.3.31" jpeg = { package = "jpeg-decoder", version = "0.3.0", default-features = false } num_enum = "0.7.3" -object_store = "0.12" +object_store = { version = "0.12", optional = true } # In the future we could make this feature-flagged, but for now we depend on # object_store which uses reqwest. -reqwest = { version = "0.12", default-features = false } +reqwest = { version = "0.12", default-features = false, optional = true } thiserror = "1" tokio = { version = "1.43.0", optional = true } weezl = "0.1.0" @@ -26,3 +26,8 @@ weezl = "0.1.0" [dev-dependencies] tiff = "0.9.1" tokio = { version = "1.9", features = ["macros", "fs", "rt-multi-thread"] } + +[features] +default = ["object_store"] +reqwest = ["dep:reqwest"] +object_store = ["dep:object_store", "reqwest"] diff --git a/src/cog.rs b/src/cog.rs index e9e3c5b..69a5d98 100644 --- a/src/cog.rs +++ b/src/cog.rs @@ -56,6 +56,7 @@ impl TIFF { } } +#[cfg(feature = "object_store")] #[cfg(test)] mod test { use std::io::BufReader; diff --git a/src/error.rs b/src/error.rs index 9f5b7f0..720f796 100644 --- a/src/error.rs +++ b/src/error.rs @@ -24,6 +24,7 @@ pub enum AsyncTiffError { JPEGDecodingError(#[from] jpeg::Error), /// Error while fetching data using object store. + #[cfg(feature = "object_store")] #[error(transparent)] ObjectStore(#[from] object_store::Error), @@ -32,6 +33,7 @@ pub enum AsyncTiffError { InternalTIFFError(#[from] crate::tiff::TiffError), /// Reqwest error + #[cfg(feature = "reqwest")] // see https://www.reddit.com/r/rust/comments/xyik51/comment/irhei39/ #[error(transparent)] ReqwestError(#[from] reqwest::Error), diff --git a/src/lib.rs b/src/lib.rs index 6c3445b..c34f51e 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -11,6 +11,13 @@ mod ifd; pub mod tiff; mod tile; +#[cfg(not(feature = "object_store"))] +mod object_store; +#[cfg(feature = "object_store")] +pub use object_store::coalesce_ranges; +#[cfg(not(feature = "object_store"))] +pub use object_store::util::coalesce_ranges; + pub use cog::TIFF; pub use ifd::{ImageFileDirectories, ImageFileDirectory}; pub use tile::Tile; diff --git a/src/object_store/mod.rs b/src/object_store/mod.rs new file mode 100644 index 0000000..812d1ed --- /dev/null +++ b/src/object_store/mod.rs @@ -0,0 +1 @@ +pub mod util; diff --git a/src/object_store/util.rs b/src/object_store/util.rs new file mode 100644 index 0000000..f9065c0 --- /dev/null +++ b/src/object_store/util.rs @@ -0,0 +1,84 @@ +use bytes::Bytes; +use futures::stream::{StreamExt, TryStreamExt}; +use std::ops::Range; + +const COALESCE_PARALLEL: usize = 10; + +/// Takes a function `fetch` that can fetch a range of bytes and uses this to +/// fetch the provided byte `ranges` +/// +/// To improve performance it will: +/// +/// * Combine ranges less than `coalesce` bytes apart into a single call to `fetch` +/// * Make multiple `fetch` requests in parallel (up to maximum of 10) +/// +pub async fn coalesce_ranges( + ranges: &[Range], + fetch: F, + coalesce: u64, +) -> Result, E> +where + F: Send + FnMut(Range) -> Fut, + E: Send, + Fut: std::future::Future> + Send, +{ + let fetch_ranges = merge_ranges(ranges, coalesce); + + let fetched: Vec<_> = futures::stream::iter(fetch_ranges.iter().cloned()) + .map(fetch) + .buffered(COALESCE_PARALLEL) + .try_collect() + .await?; + + Ok(ranges + .iter() + .map(|range| { + let idx = fetch_ranges.partition_point(|v| v.start <= range.start) - 1; + let fetch_range = &fetch_ranges[idx]; + let fetch_bytes = &fetched[idx]; + + let start = range.start - fetch_range.start; + let end = range.end - fetch_range.start; + let range = (start as usize)..(end as usize).min(fetch_bytes.len()); + fetch_bytes.slice(range) + }) + .collect()) +} + +/// Returns a sorted list of ranges that cover `ranges` +fn merge_ranges(ranges: &[Range], coalesce: u64) -> Vec> { + if ranges.is_empty() { + return vec![]; + } + + let mut ranges = ranges.to_vec(); + ranges.sort_unstable_by_key(|range| range.start); + + let mut ret = Vec::with_capacity(ranges.len()); + let mut start_idx = 0; + let mut end_idx = 1; + + while start_idx != ranges.len() { + let mut range_end = ranges[start_idx].end; + + while end_idx != ranges.len() + && ranges[end_idx] + .start + .checked_sub(range_end) + .map(|delta| delta <= coalesce) + .unwrap_or(true) + { + range_end = range_end.max(ranges[end_idx].end); + end_idx += 1; + } + + let start = ranges[start_idx].start; + let end = range_end; + ret.push(start..end); + + start_idx = end_idx; + end_idx += 1; + } + + ret +} diff --git a/src/reader.rs b/src/reader.rs index 8563077..e55eace 100644 --- a/src/reader.rs +++ b/src/reader.rs @@ -9,8 +9,11 @@ use byteorder::{BigEndian, LittleEndian, ReadBytesExt}; use bytes::buf::Reader; use bytes::{Buf, Bytes}; use futures::future::{BoxFuture, FutureExt, TryFutureExt}; +#[cfg(feature = "object_store")] use object_store::ObjectStore; +use crate::coalesce_ranges; + use crate::error::{AsyncTiffError, AsyncTiffResult}; /// The asynchronous interface used to read COG files @@ -33,8 +36,10 @@ pub trait AsyncFileReader: Debug + Send + Sync { /// Retrieve the bytes in `range` fn get_bytes(&self, range: Range) -> BoxFuture<'_, AsyncTiffResult>; - /// Retrieve multiple byte ranges. The default implementation will call `get_bytes` - /// sequentially + /// Retrieve multiple byte ranges. The default implementation will + /// coalesce ranges with: + /// - less than 1024*1024=1MB space in between + /// - 10 parallel requests fn get_byte_ranges( &self, ranges: Vec>, @@ -42,8 +47,8 @@ pub trait AsyncFileReader: Debug + Send + Sync { async move { let mut result = Vec::with_capacity(ranges.len()); - for range in ranges.into_iter() { - let data = self.get_bytes(range).await?; + for data in coalesce_ranges(&ranges, |range| self.get_bytes(range), 1024 * 1024).await? + { result.push(data); } @@ -91,12 +96,13 @@ impl AsyncFileReader for Box { // } /// An AsyncFileReader that reads from an [`ObjectStore`] instance. +#[cfg(feature = "object_store")] #[derive(Clone, Debug)] pub struct ObjectReader { store: Arc, path: object_store::path::Path, } - +#[cfg(feature = "object_store")] impl ObjectReader { /// Creates a new [`ObjectReader`] for the provided [`ObjectStore`] and path /// @@ -105,7 +111,7 @@ impl ObjectReader { Self { store, path } } } - +#[cfg(feature = "object_store")] impl AsyncFileReader for ObjectReader { fn get_bytes(&self, range: Range) -> BoxFuture<'_, AsyncTiffResult> { let range = range.start as _..range.end as _; @@ -134,19 +140,20 @@ impl AsyncFileReader for ObjectReader { } /// An AsyncFileReader that reads from a URL using reqwest. +#[cfg(feature = "reqwest")] #[derive(Debug, Clone)] pub struct ReqwestReader { client: reqwest::Client, url: reqwest::Url, } - +#[cfg(feature = "reqwest")] impl ReqwestReader { /// Construct a new ReqwestReader from a reqwest client and URL. pub fn new(client: reqwest::Client, url: reqwest::Url) -> Self { Self { client, url } } } - +#[cfg(feature = "reqwest")] impl AsyncFileReader for ReqwestReader { fn get_bytes(&self, range: Range) -> BoxFuture<'_, AsyncTiffResult> { let url = self.url.clone(); From 8476f2c21b7bb4b5de07c3c115d5803757bbf709 Mon Sep 17 00:00:00 2001 From: Fee Fladder Date: Thu, 20 Mar 2025 23:13:30 +0100 Subject: [PATCH 02/13] undid coalesce_ranges addition --- src/lib.rs | 7 ---- src/object_store/mod.rs | 1 - src/object_store/util.rs | 84 ---------------------------------------- src/reader.rs | 12 ++---- 4 files changed, 4 insertions(+), 100 deletions(-) delete mode 100644 src/object_store/mod.rs delete mode 100644 src/object_store/util.rs diff --git a/src/lib.rs b/src/lib.rs index c34f51e..6c3445b 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -11,13 +11,6 @@ mod ifd; pub mod tiff; mod tile; -#[cfg(not(feature = "object_store"))] -mod object_store; -#[cfg(feature = "object_store")] -pub use object_store::coalesce_ranges; -#[cfg(not(feature = "object_store"))] -pub use object_store::util::coalesce_ranges; - pub use cog::TIFF; pub use ifd::{ImageFileDirectories, ImageFileDirectory}; pub use tile::Tile; diff --git a/src/object_store/mod.rs b/src/object_store/mod.rs deleted file mode 100644 index 812d1ed..0000000 --- a/src/object_store/mod.rs +++ /dev/null @@ -1 +0,0 @@ -pub mod util; diff --git a/src/object_store/util.rs b/src/object_store/util.rs deleted file mode 100644 index f9065c0..0000000 --- a/src/object_store/util.rs +++ /dev/null @@ -1,84 +0,0 @@ -use bytes::Bytes; -use futures::stream::{StreamExt, TryStreamExt}; -use std::ops::Range; - -const COALESCE_PARALLEL: usize = 10; - -/// Takes a function `fetch` that can fetch a range of bytes and uses this to -/// fetch the provided byte `ranges` -/// -/// To improve performance it will: -/// -/// * Combine ranges less than `coalesce` bytes apart into a single call to `fetch` -/// * Make multiple `fetch` requests in parallel (up to maximum of 10) -/// -pub async fn coalesce_ranges( - ranges: &[Range], - fetch: F, - coalesce: u64, -) -> Result, E> -where - F: Send + FnMut(Range) -> Fut, - E: Send, - Fut: std::future::Future> + Send, -{ - let fetch_ranges = merge_ranges(ranges, coalesce); - - let fetched: Vec<_> = futures::stream::iter(fetch_ranges.iter().cloned()) - .map(fetch) - .buffered(COALESCE_PARALLEL) - .try_collect() - .await?; - - Ok(ranges - .iter() - .map(|range| { - let idx = fetch_ranges.partition_point(|v| v.start <= range.start) - 1; - let fetch_range = &fetch_ranges[idx]; - let fetch_bytes = &fetched[idx]; - - let start = range.start - fetch_range.start; - let end = range.end - fetch_range.start; - let range = (start as usize)..(end as usize).min(fetch_bytes.len()); - fetch_bytes.slice(range) - }) - .collect()) -} - -/// Returns a sorted list of ranges that cover `ranges` -fn merge_ranges(ranges: &[Range], coalesce: u64) -> Vec> { - if ranges.is_empty() { - return vec![]; - } - - let mut ranges = ranges.to_vec(); - ranges.sort_unstable_by_key(|range| range.start); - - let mut ret = Vec::with_capacity(ranges.len()); - let mut start_idx = 0; - let mut end_idx = 1; - - while start_idx != ranges.len() { - let mut range_end = ranges[start_idx].end; - - while end_idx != ranges.len() - && ranges[end_idx] - .start - .checked_sub(range_end) - .map(|delta| delta <= coalesce) - .unwrap_or(true) - { - range_end = range_end.max(ranges[end_idx].end); - end_idx += 1; - } - - let start = ranges[start_idx].start; - let end = range_end; - ret.push(start..end); - - start_idx = end_idx; - end_idx += 1; - } - - ret -} diff --git a/src/reader.rs b/src/reader.rs index e55eace..f343cfd 100644 --- a/src/reader.rs +++ b/src/reader.rs @@ -12,8 +12,6 @@ use futures::future::{BoxFuture, FutureExt, TryFutureExt}; #[cfg(feature = "object_store")] use object_store::ObjectStore; -use crate::coalesce_ranges; - use crate::error::{AsyncTiffError, AsyncTiffResult}; /// The asynchronous interface used to read COG files @@ -36,10 +34,8 @@ pub trait AsyncFileReader: Debug + Send + Sync { /// Retrieve the bytes in `range` fn get_bytes(&self, range: Range) -> BoxFuture<'_, AsyncTiffResult>; - /// Retrieve multiple byte ranges. The default implementation will - /// coalesce ranges with: - /// - less than 1024*1024=1MB space in between - /// - 10 parallel requests + /// Retrieve multiple byte ranges. The default implementation will call `get_bytes` + /// sequentially fn get_byte_ranges( &self, ranges: Vec>, @@ -47,8 +43,8 @@ pub trait AsyncFileReader: Debug + Send + Sync { async move { let mut result = Vec::with_capacity(ranges.len()); - for data in coalesce_ranges(&ranges, |range| self.get_bytes(range), 1024 * 1024).await? - { + for range in ranges.into_iter() { + let data = self.get_bytes(range).await?; result.push(data); } From 1b4de75b09ce2a4afc39137fad3baf1e635ab6bf Mon Sep 17 00:00:00 2001 From: Fee Fladder <33122845+feefladder@users.noreply.github.com> Date: Sat, 22 Mar 2025 17:58:19 +0100 Subject: [PATCH 03/13] Update src/error.rs Co-authored-by: Kyle Barron --- src/error.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/error.rs b/src/error.rs index 720f796..ae0b7eb 100644 --- a/src/error.rs +++ b/src/error.rs @@ -33,7 +33,7 @@ pub enum AsyncTiffError { InternalTIFFError(#[from] crate::tiff::TiffError), /// Reqwest error - #[cfg(feature = "reqwest")] // see https://www.reddit.com/r/rust/comments/xyik51/comment/irhei39/ + #[cfg(feature = "reqwest")] #[error(transparent)] ReqwestError(#[from] reqwest::Error), From 11fea2ad716944b4b378fb74c9a401c90c99501f Mon Sep 17 00:00:00 2001 From: Fee Fladder <33122845+feefladder@users.noreply.github.com> Date: Sat, 22 Mar 2025 17:58:26 +0100 Subject: [PATCH 04/13] Update Cargo.toml Co-authored-by: Kyle Barron --- Cargo.toml | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index ae7990b..844fc22 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -28,6 +28,6 @@ tiff = "0.9.1" tokio = { version = "1.9", features = ["macros", "fs", "rt-multi-thread"] } [features] -default = ["object_store"] +default = ["object_store", "reqwest"] reqwest = ["dep:reqwest"] -object_store = ["dep:object_store", "reqwest"] +object_store = ["dep:object_store"] From b1ea07901b4e8cce71e46d9467daa35da17263b7 Mon Sep 17 00:00:00 2001 From: Fee Fladder Date: Sun, 23 Mar 2025 03:07:48 +0100 Subject: [PATCH 05/13] added CI metadata to toml --- Cargo.toml | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/Cargo.toml b/Cargo.toml index 844fc22..96f1cd2 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -31,3 +31,11 @@ tokio = { version = "1.9", features = ["macros", "fs", "rt-multi-thread"] } default = ["object_store", "reqwest"] reqwest = ["dep:reqwest"] object_store = ["dep:object_store"] + +[package.metadata.cargo-all-features] + +# If your crate has a large number of optional dependencies, skip them for speed +skip_optional_dependencies = true + +# Exclude certain features from the build matrix +denylist = ["default"] From a78f2af063dc16ddf7d771a77baa0fa326eb8f44 Mon Sep 17 00:00:00 2001 From: Fee Fladder Date: Sun, 23 Mar 2025 14:47:20 +0100 Subject: [PATCH 06/13] fixed errors/warnings for all features: Added std::fs::File AsyncFileReader impl --- Cargo.toml | 3 +-- src/reader.rs | 21 +++++++++++++++++++-- tests/image_tiff/util.rs | 17 ++++++++++++++--- 3 files changed, 34 insertions(+), 7 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 96f1cd2..16a5b0c 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -33,9 +33,8 @@ reqwest = ["dep:reqwest"] object_store = ["dep:object_store"] [package.metadata.cargo-all-features] - # If your crate has a large number of optional dependencies, skip them for speed -skip_optional_dependencies = true +# skip_optional_dependencies = true # Exclude certain features from the build matrix denylist = ["default"] diff --git a/src/reader.rs b/src/reader.rs index f343cfd..dc88e5b 100644 --- a/src/reader.rs +++ b/src/reader.rs @@ -1,14 +1,16 @@ //! Abstractions for network reading. use std::fmt::Debug; -use std::io::Read; +use std::io::{Read, Seek}; use std::ops::Range; use std::sync::Arc; use byteorder::{BigEndian, LittleEndian, ReadBytesExt}; use bytes::buf::Reader; use bytes::{Buf, Bytes}; -use futures::future::{BoxFuture, FutureExt, TryFutureExt}; +use futures::future::{BoxFuture, FutureExt}; +#[cfg(feature = "object_store")] +use futures::TryFutureExt; #[cfg(feature = "object_store")] use object_store::ObjectStore; @@ -68,6 +70,21 @@ impl AsyncFileReader for Box { } } +impl AsyncFileReader for std::fs::File { + fn get_bytes(&self, range: Range) -> BoxFuture<'_, AsyncTiffResult> { + async move { + let mut file = self.try_clone()?; + file.seek(std::io::SeekFrom::Start(range.start))?; + let len = (range.end - range.start) as usize; + let mut buf = vec![0u8; len]; + file.read_exact(&mut buf)?; + let res = Bytes::copy_from_slice(&buf); + Ok(res) + } + .boxed() + } +} + // #[cfg(feature = "tokio")] // impl AsyncFileReader // for T diff --git a/tests/image_tiff/util.rs b/tests/image_tiff/util.rs index 834ccc7..161d043 100644 --- a/tests/image_tiff/util.rs +++ b/tests/image_tiff/util.rs @@ -1,15 +1,26 @@ -use std::env::current_dir; -use std::sync::Arc; - +#[cfg(feature = "object_store")] use async_tiff::reader::ObjectReader; use async_tiff::TIFF; +#[cfg(feature = "object_store")] use object_store::local::LocalFileSystem; +#[cfg(feature = "object_store")] +use std::env::current_dir; +use std::sync::Arc; const TEST_IMAGE_DIR: &str = "tests/image_tiff/images/"; +#[cfg(feature = "object_store")] pub(crate) async fn open_tiff(filename: &str) -> TIFF { let store = Arc::new(LocalFileSystem::new_with_prefix(current_dir().unwrap()).unwrap()); let path = format!("{TEST_IMAGE_DIR}/{filename}"); let reader = Arc::new(ObjectReader::new(store.clone(), path.as_str().into())); TIFF::try_open(reader).await.unwrap() } + +#[cfg(not(feature = "object_store"))] +pub(crate) async fn open_tiff(filename: &str) -> TIFF { + // let store = Arc::new(LocalFileSystem::new_with_prefix(current_dir().unwrap()).unwrap()); + let path = format!("{TEST_IMAGE_DIR}/{filename}"); + let reader = Arc::new(std::fs::File::open(path).expect("could not open file")); + TIFF::try_open(reader).await.unwrap() +} From a6821e6f68212c5148d48d06bcfa38d3ece6004e Mon Sep 17 00:00:00 2001 From: Fee Fladder Date: Sun, 23 Mar 2025 19:06:07 +0100 Subject: [PATCH 07/13] added CI --- .github/workflows/test.yml | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml index f38e8cc..a0bd812 100644 --- a/.github/workflows/test.yml +++ b/.github/workflows/test.yml @@ -29,7 +29,7 @@ jobs: - name: "cargo check" run: cargo check --all --all-features - - name: "cargo test" - run: | - cargo test --all - cargo test --all --all-features + - run: cargo install cargo-all-features + + - name: "cargo test all features" + run: cargo test-all-features From fb1c71acc212ea98867d42d145eb5d8d40a6ae89 Mon Sep 17 00:00:00 2001 From: Fee Fladder Date: Sun, 23 Mar 2025 19:33:57 +0100 Subject: [PATCH 08/13] removed AsyncFileReader impl for std::fs::File, added impls for tokio::fs::File --- Cargo.toml | 5 ++-- src/reader.rs | 48 +++++++++++--------------------- tests/image_tiff/util.rs | 60 +++++++++++++++++++++++++++++++++++++--- 3 files changed, 75 insertions(+), 38 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 16a5b0c..2dd104f 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -25,10 +25,11 @@ weezl = "0.1.0" [dev-dependencies] tiff = "0.9.1" -tokio = { version = "1.9", features = ["macros", "fs", "rt-multi-thread"] } +tokio = { version = "1.9", features = ["macros", "fs", "rt-multi-thread", "io-util"] } [features] -default = ["object_store", "reqwest"] +default = ["object_store", "reqwest", "tokio"] +tokio = ["dep:tokio"] reqwest = ["dep:reqwest"] object_store = ["dep:object_store"] diff --git a/src/reader.rs b/src/reader.rs index dc88e5b..f02ae40 100644 --- a/src/reader.rs +++ b/src/reader.rs @@ -1,7 +1,7 @@ //! Abstractions for network reading. use std::fmt::Debug; -use std::io::{Read, Seek}; +use std::io::Read; use std::ops::Range; use std::sync::Arc; @@ -70,44 +70,28 @@ impl AsyncFileReader for Box { } } -impl AsyncFileReader for std::fs::File { +#[cfg(feature = "tokio")] +impl AsyncFileReader for tokio::fs::File { fn get_bytes(&self, range: Range) -> BoxFuture<'_, AsyncTiffResult> { + use tokio::io::{AsyncReadExt, AsyncSeekExt}; + async move { - let mut file = self.try_clone()?; - file.seek(std::io::SeekFrom::Start(range.start))?; - let len = (range.end - range.start) as usize; - let mut buf = vec![0u8; len]; - file.read_exact(&mut buf)?; - let res = Bytes::copy_from_slice(&buf); - Ok(res) + let mut file = (*self).try_clone().await?; + file.seek(std::io::SeekFrom::Start(range.start)).await?; + + let to_read = (range.end - range.start).try_into().unwrap(); + let mut buffer = Vec::with_capacity(to_read); + let read = file.take(to_read as u64).read_to_end(&mut buffer).await?; + if read != to_read { + return Err(AsyncTiffError::EndOfFile(to_read, read)); + } + + Ok(buffer.into()) } .boxed() } } -// #[cfg(feature = "tokio")] -// impl AsyncFileReader -// for T -// { -// fn get_bytes(&self, range: Range) -> BoxFuture<'_, AsyncTiffResult> { -// use tokio::io::{AsyncReadExt, AsyncSeekExt}; - -// async move { -// self.seek(std::io::SeekFrom::Start(range.start)).await?; - -// let to_read = (range.end - range.start).try_into().unwrap(); -// let mut buffer = Vec::with_capacity(to_read); -// let read = self.take(to_read as u64).read_to_end(&mut buffer).await?; -// if read != to_read { -// return Err(AsyncTiffError::EndOfFile(to_read, read)); -// } - -// Ok(buffer.into()) -// } -// .boxed() -// } -// } - /// An AsyncFileReader that reads from an [`ObjectStore`] instance. #[cfg(feature = "object_store")] #[derive(Clone, Debug)] diff --git a/tests/image_tiff/util.rs b/tests/image_tiff/util.rs index 161d043..b4dd1c7 100644 --- a/tests/image_tiff/util.rs +++ b/tests/image_tiff/util.rs @@ -1,11 +1,24 @@ +use async_tiff::TIFF; +use std::sync::Arc; + #[cfg(feature = "object_store")] use async_tiff::reader::ObjectReader; -use async_tiff::TIFF; #[cfg(feature = "object_store")] use object_store::local::LocalFileSystem; #[cfg(feature = "object_store")] use std::env::current_dir; -use std::sync::Arc; + +#[cfg(not(any(feature = "tokio", feature = "object_store")))] +use async_tiff::{ + error::{AsyncTiffError, AsyncTiffResult}, + reader::AsyncFileReader, +}; +#[cfg(not(any(feature = "tokio", feature = "object_store")))] +use bytes::Bytes; +#[cfg(not(any(feature = "tokio", feature = "object_store")))] +use futures::{future::BoxFuture, FutureExt}; +#[cfg(not(any(feature = "tokio", feature = "object_store")))] +use std::ops::Range; const TEST_IMAGE_DIR: &str = "tests/image_tiff/images/"; @@ -17,10 +30,49 @@ pub(crate) async fn open_tiff(filename: &str) -> TIFF { TIFF::try_open(reader).await.unwrap() } -#[cfg(not(feature = "object_store"))] +#[cfg(all(feature = "tokio", not(feature = "object_store")))] pub(crate) async fn open_tiff(filename: &str) -> TIFF { // let store = Arc::new(LocalFileSystem::new_with_prefix(current_dir().unwrap()).unwrap()); let path = format!("{TEST_IMAGE_DIR}/{filename}"); - let reader = Arc::new(std::fs::File::open(path).expect("could not open file")); + let reader = Arc::new( + tokio::fs::File::open(path) + .await + .expect("could not open file"), + ); + TIFF::try_open(reader).await.unwrap() +} + +#[cfg(not(any(feature = "tokio", feature = "object_store")))] +#[derive(Debug)] +struct TokioFile(tokio::fs::File); +#[cfg(not(any(feature = "tokio", feature = "object_store")))] +impl AsyncFileReader for TokioFile { + fn get_bytes(&self, range: Range) -> BoxFuture<'_, AsyncTiffResult> { + use tokio::io::{AsyncReadExt, AsyncSeekExt}; + + async move { + let mut file = self.0.try_clone().await?; + file.seek(std::io::SeekFrom::Start(range.start)).await?; + + let to_read = (range.end - range.start).try_into().unwrap(); + let mut buffer = Vec::with_capacity(to_read); + let read = file.take(to_read as u64).read_to_end(&mut buffer).await?; + if read != to_read { + return Err(AsyncTiffError::EndOfFile(to_read, read)); + } + + Ok(buffer.into()) + } + .boxed() + } +} +#[cfg(not(any(feature = "tokio", feature = "object_store")))] +pub(crate) async fn open_tiff(filename: &str) -> TIFF { + let path = format!("{TEST_IMAGE_DIR}/{filename}"); + let reader = Arc::new(TokioFile( + tokio::fs::File::open(path) + .await + .expect("could not open file"), + )); TIFF::try_open(reader).await.unwrap() } From feefaddeda02b451fb4ae436e09bdf43878fb460 Mon Sep 17 00:00:00 2001 From: Fee Fladder Date: Sun, 23 Mar 2025 19:39:37 +0100 Subject: [PATCH 09/13] added required tokio features --- Cargo.toml | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 2dd104f..0ef18a9 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -20,7 +20,7 @@ object_store = { version = "0.12", optional = true } # object_store which uses reqwest. reqwest = { version = "0.12", default-features = false, optional = true } thiserror = "1" -tokio = { version = "1.43.0", optional = true } +tokio = { version = "1.43.0", optional = true, features = ["fs", "io-util"] } weezl = "0.1.0" [dev-dependencies] @@ -28,7 +28,7 @@ tiff = "0.9.1" tokio = { version = "1.9", features = ["macros", "fs", "rt-multi-thread", "io-util"] } [features] -default = ["object_store", "reqwest", "tokio"] +default = ["object_store", "reqwest"] tokio = ["dep:tokio"] reqwest = ["dep:reqwest"] object_store = ["dep:object_store"] From 713cf1bae430801a3f8c5793728a40f4f513f756 Mon Sep 17 00:00:00 2001 From: Fee Fladder Date: Sun, 23 Mar 2025 03:01:51 +0100 Subject: [PATCH 10/13] refactored tag reading so we first create an internal buffer on the data and use that --- src/ifd.rs | 307 +++++++++++------------------------------------ src/tiff/tags.rs | 16 +++ 2 files changed, 87 insertions(+), 236 deletions(-) diff --git a/src/ifd.rs b/src/ifd.rs index d522826..339b514 100644 --- a/src/ifd.rs +++ b/src/ifd.rs @@ -2,7 +2,7 @@ use std::collections::HashMap; use std::io::Read; use std::ops::Range; -use bytes::Bytes; +use bytes::{buf::Buf, Bytes}; use num_enum::TryFromPrimitive; use crate::error::{AsyncTiffError, AsyncTiffResult}; @@ -878,56 +878,37 @@ async fn read_tag_value( return Ok(Value::List(vec![])); } - let tag_size = match tag_type { - Type::BYTE | Type::SBYTE | Type::ASCII | Type::UNDEFINED => 1, - Type::SHORT | Type::SSHORT => 2, - Type::LONG | Type::SLONG | Type::FLOAT | Type::IFD => 4, - Type::LONG8 - | Type::SLONG8 - | Type::DOUBLE - | Type::RATIONAL - | Type::SRATIONAL - | Type::IFD8 => 8, - }; + let tag_size = tag_type.size(); let value_byte_length = count.checked_mul(tag_size).unwrap(); + // prefetch all tag data + let mut data = if (bigtiff && value_byte_length <= 8) || value_byte_length <= 4 { + // value fits in offset field + cursor.read(value_byte_length).await? + } else { + // Seek cursor + let offset = if bigtiff { + cursor.read_u64().await? + } else { + cursor.read_u32().await?.into() + }; + cursor.seek(offset); + cursor.read(value_byte_length).await? + }; // Case 2: there is one value. if count == 1 { - // 2a: the value is 5-8 bytes and we're in BigTiff mode. - if bigtiff && value_byte_length > 4 && value_byte_length <= 8 { - let mut data = cursor.read(value_byte_length).await?; - - return Ok(match tag_type { - Type::LONG8 => Value::UnsignedBig(data.read_u64()?), - Type::SLONG8 => Value::SignedBig(data.read_i64()?), - Type::DOUBLE => Value::Double(data.read_f64()?), - Type::RATIONAL => Value::Rational(data.read_u32()?, data.read_u32()?), - Type::SRATIONAL => Value::SRational(data.read_i32()?, data.read_i32()?), - Type::IFD8 => Value::IfdBig(data.read_u64()?), - Type::BYTE - | Type::SBYTE - | Type::ASCII - | Type::UNDEFINED - | Type::SHORT - | Type::SSHORT - | Type::LONG - | Type::SLONG - | Type::FLOAT - | Type::IFD => unreachable!(), - }); - } - - // NOTE: we should only be reading value_byte_length when it's 4 bytes or fewer. Right now - // we're reading even if it's 8 bytes, but then only using the first 4 bytes of this - // buffer. - let mut data = cursor.read(value_byte_length).await?; - - // 2b: the value is at most 4 bytes or doesn't fit in the offset field. return Ok(match tag_type { + Type::LONG8 => Value::UnsignedBig(data.read_u64()?), + Type::SLONG8 => Value::SignedBig(data.read_i64()?), + Type::DOUBLE => Value::Double(data.read_f64()?), + Type::RATIONAL => Value::Rational(data.read_u32()?, data.read_u32()?), + Type::SRATIONAL => Value::SRational(data.read_i32()?, data.read_i32()?), + Type::IFD8 => Value::IfdBig(data.read_u64()?), Type::BYTE | Type::UNDEFINED => Value::Byte(data.read_u8()?), Type::SBYTE => Value::Signed(data.read_i8()? as i32), Type::SHORT => Value::Short(data.read_u16()?), + Type::IFD => Value::Ifd(data.read_u32()?), Type::SSHORT => Value::Signed(data.read_i16()? as i32), Type::LONG => Value::Unsigned(data.read_u32()?), Type::SLONG => Value::Signed(data.read_i32()?), @@ -940,266 +921,120 @@ async fn read_tag_value( // return Err(TiffError::FormatError(TiffFormatError::InvalidTag)); } } - Type::LONG8 => { - let offset = data.read_u32()?; - cursor.seek(offset as _); - Value::UnsignedBig(cursor.read_u64().await?) - } - Type::SLONG8 => { - let offset = data.read_u32()?; - cursor.seek(offset as _); - Value::SignedBig(cursor.read_i64().await?) - } - Type::DOUBLE => { - let offset = data.read_u32()?; - cursor.seek(offset as _); - Value::Double(cursor.read_f64().await?) - } - Type::RATIONAL => { - let offset = data.read_u32()?; - cursor.seek(offset as _); - let numerator = cursor.read_u32().await?; - let denominator = cursor.read_u32().await?; - Value::Rational(numerator, denominator) - } - Type::SRATIONAL => { - let offset = data.read_u32()?; - cursor.seek(offset as _); - let numerator = cursor.read_i32().await?; - let denominator = cursor.read_i32().await?; - Value::SRational(numerator, denominator) - } - Type::IFD => Value::Ifd(data.read_u32()?), - Type::IFD8 => { - let offset = data.read_u32()?; - cursor.seek(offset as _); - Value::IfdBig(cursor.read_u64().await?) - } }); } - // Case 3: There is more than one value, but it fits in the offset field. - if value_byte_length <= 4 || bigtiff && value_byte_length <= 8 { - let mut data = cursor.read(value_byte_length).await?; - if bigtiff { - cursor.advance(8 - value_byte_length); - } else { - cursor.advance(4 - value_byte_length); - } - - match tag_type { - Type::BYTE | Type::UNDEFINED => { - return { - Ok(Value::List( - (0..count) - .map(|_| Value::Byte(data.read_u8().unwrap())) - .collect(), - )) - }; - } - Type::SBYTE => { - return { - Ok(Value::List( - (0..count) - .map(|_| Value::Signed(data.read_i8().unwrap() as i32)) - .collect(), - )) - } - } - Type::ASCII => { - let mut buf = vec![0; count as usize]; - data.read_exact(&mut buf)?; - if buf.is_ascii() && buf.ends_with(&[0]) { - let v = std::str::from_utf8(&buf) - .map_err(|err| AsyncTiffError::General(err.to_string()))?; - let v = v.trim_matches(char::from(0)); - return Ok(Value::Ascii(v.into())); - } else { - panic!("Invalid tag"); - // return Err(TiffError::FormatError(TiffFormatError::InvalidTag)); - } - } - Type::SHORT => { - let mut v = Vec::new(); - for _ in 0..count { - v.push(Value::Short(data.read_u16()?)); - } - return Ok(Value::List(v)); - } - Type::SSHORT => { - let mut v = Vec::new(); - for _ in 0..count { - v.push(Value::Signed(i32::from(data.read_i16()?))); - } - return Ok(Value::List(v)); - } - Type::LONG => { - let mut v = Vec::new(); - for _ in 0..count { - v.push(Value::Unsigned(data.read_u32()?)); - } - return Ok(Value::List(v)); - } - Type::SLONG => { - let mut v = Vec::new(); - for _ in 0..count { - v.push(Value::Signed(data.read_i32()?)); - } - return Ok(Value::List(v)); - } - Type::FLOAT => { - let mut v = Vec::new(); - for _ in 0..count { - v.push(Value::Float(data.read_f32()?)); - } - return Ok(Value::List(v)); - } - Type::IFD => { - let mut v = Vec::new(); - for _ in 0..count { - v.push(Value::Ifd(data.read_u32()?)); - } - return Ok(Value::List(v)); - } - Type::LONG8 - | Type::SLONG8 - | Type::RATIONAL - | Type::SRATIONAL - | Type::DOUBLE - | Type::IFD8 => { - unreachable!() - } - } - } - - // Seek cursor - let offset = if bigtiff { - cursor.read_u64().await? - } else { - cursor.read_u32().await?.into() - }; - cursor.seek(offset); - - // Case 4: there is more than one value, and it doesn't fit in the offset field. match tag_type { - // TODO check if this could give wrong results - // at a different endianess of file/computer. Type::BYTE | Type::UNDEFINED => { - let mut v = Vec::with_capacity(count as _); + let mut v = Vec::new(); for _ in 0..count { - v.push(Value::Byte(cursor.read_u8().await?)) + v.push(Value::Byte(data.read_u8()?)); } - Ok(Value::List(v)) + return Ok(Value::List(v)); } Type::SBYTE => { - let mut v = Vec::with_capacity(count as _); + let mut v = Vec::new(); for _ in 0..count { - v.push(Value::Signed(cursor.read_i8().await? as i32)) + v.push(Value::SignedByte(data.read_i8()?)); + } + return Ok(Value::List(v)); + } + Type::ASCII => { + let mut buf = vec![0; count as usize]; + data.read_exact(&mut buf)?; + if buf.is_ascii() && buf.ends_with(&[0]) { + let v = std::str::from_utf8(&buf) + .map_err(|err| AsyncTiffError::General(err.to_string()))?; + let v = v.trim_matches(char::from(0)); + return Ok(Value::Ascii(v.into())); + } else { + panic!("Invalid tag"); + // return Err(TiffError::FormatError(TiffFormatError::InvalidTag)); } - Ok(Value::List(v)) } Type::SHORT => { - let mut v = Vec::with_capacity(count as _); + let mut v = Vec::new(); for _ in 0..count { - v.push(Value::Short(cursor.read_u16().await?)) + v.push(Value::Short(data.read_u16()?)); } - Ok(Value::List(v)) + return Ok(Value::List(v)); } Type::SSHORT => { - let mut v = Vec::with_capacity(count as _); + let mut v = Vec::new(); for _ in 0..count { - v.push(Value::Signed(cursor.read_i16().await? as i32)) + v.push(Value::Signed(i32::from(data.read_i16()?))); } - Ok(Value::List(v)) + return Ok(Value::List(v)); } Type::LONG => { - let mut v = Vec::with_capacity(count as _); + let mut v = Vec::new(); for _ in 0..count { - v.push(Value::Unsigned(cursor.read_u32().await?)) + v.push(Value::Unsigned(data.read_u32()?)); } - Ok(Value::List(v)) + return Ok(Value::List(v)); } Type::SLONG => { - let mut v = Vec::with_capacity(count as _); + let mut v = Vec::new(); for _ in 0..count { - v.push(Value::Signed(cursor.read_i32().await?)) + v.push(Value::Signed(data.read_i32()?)); } - Ok(Value::List(v)) + return Ok(Value::List(v)); } Type::FLOAT => { - let mut v = Vec::with_capacity(count as _); + let mut v = Vec::new(); for _ in 0..count { - v.push(Value::Float(cursor.read_f32().await?)) + v.push(Value::Float(data.read_f32()?)); } - Ok(Value::List(v)) + return Ok(Value::List(v)); } Type::DOUBLE => { let mut v = Vec::with_capacity(count as _); for _ in 0..count { - v.push(Value::Double(cursor.read_f64().await?)) + v.push(Value::Double(data.read_f64()?)) } - Ok(Value::List(v)) + return Ok(Value::List(v)); } Type::RATIONAL => { let mut v = Vec::with_capacity(count as _); for _ in 0..count { - v.push(Value::Rational( - cursor.read_u32().await?, - cursor.read_u32().await?, - )) + v.push(Value::Rational(data.read_u32()?, data.read_u32()?)) } - Ok(Value::List(v)) + return Ok(Value::List(v)); } Type::SRATIONAL => { let mut v = Vec::with_capacity(count as _); for _ in 0..count { - v.push(Value::SRational( - cursor.read_i32().await?, - cursor.read_i32().await?, - )) + v.push(Value::SRational(data.read_i32()?, data.read_i32()?)) } - Ok(Value::List(v)) + return Ok(Value::List(v)); } Type::LONG8 => { let mut v = Vec::with_capacity(count as _); for _ in 0..count { - v.push(Value::UnsignedBig(cursor.read_u64().await?)) + v.push(Value::UnsignedBig(data.read_u64()?)) } - Ok(Value::List(v)) + return Ok(Value::List(v)); } Type::SLONG8 => { let mut v = Vec::with_capacity(count as _); for _ in 0..count { - v.push(Value::SignedBig(cursor.read_i64().await?)) + v.push(Value::SignedBig(data.read_i64()?)) } - Ok(Value::List(v)) + return Ok(Value::List(v)); } Type::IFD => { let mut v = Vec::with_capacity(count as _); for _ in 0..count { - v.push(Value::Ifd(cursor.read_u32().await?)) + v.push(Value::Ifd(data.read_u32()?)) } - Ok(Value::List(v)) + return Ok(Value::List(v)); } Type::IFD8 => { let mut v = Vec::with_capacity(count as _); for _ in 0..count { - v.push(Value::IfdBig(cursor.read_u64().await?)) - } - Ok(Value::List(v)) - } - Type::ASCII => { - let mut out = vec![0; count as _]; - let mut buf = cursor.read(count).await?; - buf.read_exact(&mut out)?; - - // Strings may be null-terminated, so we trim anything downstream of the null byte - if let Some(first) = out.iter().position(|&b| b == 0) { - out.truncate(first); + v.push(Value::IfdBig(data.read_u64()?)) } - Ok(Value::Ascii( - String::from_utf8(out).map_err(|err| AsyncTiffError::General(err.to_string()))?, - )) + return Ok(Value::List(v)); } } } diff --git a/src/tiff/tags.rs b/src/tiff/tags.rs index 524a726..567f314 100644 --- a/src/tiff/tags.rs +++ b/src/tiff/tags.rs @@ -175,6 +175,22 @@ pub enum Type(u16) { } } +impl Type { + pub const fn size(&self) -> u64 { + match self { + Type::BYTE | Type::SBYTE | Type::ASCII | Type::UNDEFINED => 1, + Type::SHORT | Type::SSHORT => 2, + Type::LONG | Type::SLONG | Type::FLOAT | Type::IFD => 4, + Type::LONG8 + | Type::SLONG8 + | Type::DOUBLE + | Type::RATIONAL + | Type::SRATIONAL + | Type::IFD8 => 8, + } + } +} + tags! { /// See [TIFF compression tags](https://www.awaresystems.be/imaging/tiff/tifftags/compression.html) /// for reference. From 3cd1b6b89d35a6c384dda817ae6e3f947e042977 Mon Sep 17 00:00:00 2001 From: Fee Fladder Date: Sun, 23 Mar 2025 15:30:59 +0100 Subject: [PATCH 11/13] fixed cursor handling --- src/ifd.rs | 24 ++++++++++++++++-------- src/reader.rs | 3 +++ 2 files changed, 19 insertions(+), 8 deletions(-) diff --git a/src/ifd.rs b/src/ifd.rs index 339b514..fe6e4b8 100644 --- a/src/ifd.rs +++ b/src/ifd.rs @@ -7,7 +7,7 @@ use num_enum::TryFromPrimitive; use crate::error::{AsyncTiffError, AsyncTiffResult}; use crate::geo::{GeoKeyDirectory, GeoKeyTag}; -use crate::reader::{AsyncCursor, AsyncFileReader}; +use crate::reader::{AsyncCursor, AsyncFileReader, EndianAwareReader}; use crate::tiff::tags::{ CompressionMethod, PhotometricInterpretation, PlanarConfiguration, Predictor, ResolutionUnit, SampleFormat, Tag, Type, @@ -839,7 +839,7 @@ impl ImageFileDirectory { /// Read a single tag from the cursor async fn read_tag(cursor: &mut AsyncCursor, bigtiff: bool) -> AsyncTiffResult<(Tag, Value)> { - let start_cursor_position = cursor.position(); + // let start_cursor_position = cursor.position(); let tag_name = Tag::from_u16_exhaustive(cursor.read_u16().await?); @@ -855,9 +855,9 @@ async fn read_tag(cursor: &mut AsyncCursor, bigtiff: bool) -> AsyncTiffResult<(T let tag_value = read_tag_value(cursor, tag_type, count, bigtiff).await?; - // TODO: better handle management of cursor state - let ifd_entry_size = if bigtiff { 20 } else { 12 }; - cursor.seek(start_cursor_position + ifd_entry_size); + // TODO: better handle management of cursor state <- should be done now + // let ifd_entry_size = if bigtiff { 20 } else { 12 }; + // cursor.seek(start_cursor_position + ifd_entry_size); Ok((tag_name, tag_value)) } @@ -885,7 +885,13 @@ async fn read_tag_value( // prefetch all tag data let mut data = if (bigtiff && value_byte_length <= 8) || value_byte_length <= 4 { // value fits in offset field - cursor.read(value_byte_length).await? + let res = cursor.read(value_byte_length).await?; + if bigtiff { + cursor.advance(8-value_byte_length); + } else { + cursor.advance(4-value_byte_length); + } + res } else { // Seek cursor let offset = if bigtiff { @@ -893,8 +899,10 @@ async fn read_tag_value( } else { cursor.read_u32().await?.into() }; - cursor.seek(offset); - cursor.read(value_byte_length).await? + let reader = cursor.reader().get_bytes(offset..offset+value_byte_length).await?.reader(); + EndianAwareReader::new(reader, cursor.endianness()) + // cursor.seek(offset); + // cursor.read(value_byte_length).await? }; // Case 2: there is one value. if count == 1 { diff --git a/src/reader.rs b/src/reader.rs index f02ae40..7cff498 100644 --- a/src/reader.rs +++ b/src/reader.rs @@ -349,6 +349,9 @@ pub(crate) struct EndianAwareReader { } impl EndianAwareReader { + pub(crate) fn new(reader: Reader, endianness: Endianness) -> Self { + Self { reader, endianness } + } /// Read a u8 from the cursor, advancing the internal state by 1 byte. pub(crate) fn read_u8(&mut self) -> AsyncTiffResult { Ok(self.reader.read_u8()?) From 72ffc228f310fcfdd6158c13c6fce3292f098d6b Mon Sep 17 00:00:00 2001 From: Fee Fladder Date: Sun, 23 Mar 2025 21:04:54 +0100 Subject: [PATCH 12/13] fixed clippy --- src/ifd.rs | 54 +++++++++++++++++++++++++++------------------------ src/reader.rs | 12 ++++++++++-- 2 files changed, 39 insertions(+), 27 deletions(-) diff --git a/src/ifd.rs b/src/ifd.rs index fe6e4b8..c146a02 100644 --- a/src/ifd.rs +++ b/src/ifd.rs @@ -887,9 +887,9 @@ async fn read_tag_value( // value fits in offset field let res = cursor.read(value_byte_length).await?; if bigtiff { - cursor.advance(8-value_byte_length); + cursor.advance(8 - value_byte_length); } else { - cursor.advance(4-value_byte_length); + cursor.advance(4 - value_byte_length); } res } else { @@ -899,7 +899,11 @@ async fn read_tag_value( } else { cursor.read_u32().await?.into() }; - let reader = cursor.reader().get_bytes(offset..offset+value_byte_length).await?.reader(); + let reader = cursor + .reader() + .get_bytes(offset..offset + value_byte_length) + .await? + .reader(); EndianAwareReader::new(reader, cursor.endianness()) // cursor.seek(offset); // cursor.read(value_byte_length).await? @@ -934,18 +938,18 @@ async fn read_tag_value( match tag_type { Type::BYTE | Type::UNDEFINED => { - let mut v = Vec::new(); + let mut v = Vec::with_capacity(count as _); for _ in 0..count { v.push(Value::Byte(data.read_u8()?)); } - return Ok(Value::List(v)); + Ok(Value::List(v)) } Type::SBYTE => { - let mut v = Vec::new(); + let mut v = Vec::with_capacity(count as _); for _ in 0..count { v.push(Value::SignedByte(data.read_i8()?)); } - return Ok(Value::List(v)); + Ok(Value::List(v)) } Type::ASCII => { let mut buf = vec![0; count as usize]; @@ -954,95 +958,95 @@ async fn read_tag_value( let v = std::str::from_utf8(&buf) .map_err(|err| AsyncTiffError::General(err.to_string()))?; let v = v.trim_matches(char::from(0)); - return Ok(Value::Ascii(v.into())); + Ok(Value::Ascii(v.into())) } else { panic!("Invalid tag"); // return Err(TiffError::FormatError(TiffFormatError::InvalidTag)); } } Type::SHORT => { - let mut v = Vec::new(); + let mut v = Vec::with_capacity(count as _); for _ in 0..count { v.push(Value::Short(data.read_u16()?)); } - return Ok(Value::List(v)); + Ok(Value::List(v)) } Type::SSHORT => { - let mut v = Vec::new(); + let mut v = Vec::with_capacity(count as _); for _ in 0..count { v.push(Value::Signed(i32::from(data.read_i16()?))); } - return Ok(Value::List(v)); + Ok(Value::List(v)) } Type::LONG => { - let mut v = Vec::new(); + let mut v = Vec::with_capacity(count as _); for _ in 0..count { v.push(Value::Unsigned(data.read_u32()?)); } - return Ok(Value::List(v)); + Ok(Value::List(v)) } Type::SLONG => { - let mut v = Vec::new(); + let mut v = Vec::with_capacity(count as _); for _ in 0..count { v.push(Value::Signed(data.read_i32()?)); } - return Ok(Value::List(v)); + Ok(Value::List(v)) } Type::FLOAT => { - let mut v = Vec::new(); + let mut v = Vec::with_capacity(count as _); for _ in 0..count { v.push(Value::Float(data.read_f32()?)); } - return Ok(Value::List(v)); + Ok(Value::List(v)) } Type::DOUBLE => { let mut v = Vec::with_capacity(count as _); for _ in 0..count { v.push(Value::Double(data.read_f64()?)) } - return Ok(Value::List(v)); + Ok(Value::List(v)) } Type::RATIONAL => { let mut v = Vec::with_capacity(count as _); for _ in 0..count { v.push(Value::Rational(data.read_u32()?, data.read_u32()?)) } - return Ok(Value::List(v)); + Ok(Value::List(v)) } Type::SRATIONAL => { let mut v = Vec::with_capacity(count as _); for _ in 0..count { v.push(Value::SRational(data.read_i32()?, data.read_i32()?)) } - return Ok(Value::List(v)); + Ok(Value::List(v)) } Type::LONG8 => { let mut v = Vec::with_capacity(count as _); for _ in 0..count { v.push(Value::UnsignedBig(data.read_u64()?)) } - return Ok(Value::List(v)); + Ok(Value::List(v)) } Type::SLONG8 => { let mut v = Vec::with_capacity(count as _); for _ in 0..count { v.push(Value::SignedBig(data.read_i64()?)) } - return Ok(Value::List(v)); + Ok(Value::List(v)) } Type::IFD => { let mut v = Vec::with_capacity(count as _); for _ in 0..count { v.push(Value::Ifd(data.read_u32()?)) } - return Ok(Value::List(v)); + Ok(Value::List(v)) } Type::IFD8 => { let mut v = Vec::with_capacity(count as _); for _ in 0..count { v.push(Value::IfdBig(data.read_u64()?)) } - return Ok(Value::List(v)); + Ok(Value::List(v)) } } } diff --git a/src/reader.rs b/src/reader.rs index 7cff498..96901aa 100644 --- a/src/reader.rs +++ b/src/reader.rs @@ -272,31 +272,37 @@ impl AsyncCursor { } /// Read a u8 from the cursor, advancing the internal state by 1 byte. + #[allow(dead_code)] pub(crate) async fn read_u8(&mut self) -> AsyncTiffResult { self.read(1).await?.read_u8() } /// Read a i8 from the cursor, advancing the internal state by 1 byte. + #[allow(dead_code)] pub(crate) async fn read_i8(&mut self) -> AsyncTiffResult { self.read(1).await?.read_i8() } /// Read a u16 from the cursor, advancing the internal state by 2 bytes. + #[allow(dead_code)] pub(crate) async fn read_u16(&mut self) -> AsyncTiffResult { self.read(2).await?.read_u16() } /// Read a i16 from the cursor, advancing the internal state by 2 bytes. + #[allow(dead_code)] pub(crate) async fn read_i16(&mut self) -> AsyncTiffResult { self.read(2).await?.read_i16() } /// Read a u32 from the cursor, advancing the internal state by 4 bytes. + #[allow(dead_code)] pub(crate) async fn read_u32(&mut self) -> AsyncTiffResult { self.read(4).await?.read_u32() } /// Read a i32 from the cursor, advancing the internal state by 4 bytes. + #[allow(dead_code)] pub(crate) async fn read_i32(&mut self) -> AsyncTiffResult { self.read(4).await?.read_i32() } @@ -307,24 +313,25 @@ impl AsyncCursor { } /// Read a i64 from the cursor, advancing the internal state by 8 bytes. + #[allow(dead_code)] pub(crate) async fn read_i64(&mut self) -> AsyncTiffResult { self.read(8).await?.read_i64() } + #[allow(dead_code)] pub(crate) async fn read_f32(&mut self) -> AsyncTiffResult { self.read(4).await?.read_f32() } + #[allow(dead_code)] pub(crate) async fn read_f64(&mut self) -> AsyncTiffResult { self.read(8).await?.read_f64() } - #[allow(dead_code)] pub(crate) fn reader(&self) -> &Arc { &self.reader } - #[allow(dead_code)] pub(crate) fn endianness(&self) -> Endianness { self.endianness } @@ -338,6 +345,7 @@ impl AsyncCursor { self.offset = offset; } + #[allow(dead_code)] pub(crate) fn position(&self) -> u64 { self.offset } From 5606b6251fa29ae70967991486e834111c767187 Mon Sep 17 00:00:00 2001 From: Fee Fladder Date: Sun, 23 Mar 2025 22:46:47 +0100 Subject: [PATCH 13/13] made ifd prefetch its data before processing, read_tag now operates on an EndianAwareReader and Arc --- src/ifd.rs | 105 ++++++++++++++++++++++++++------------------------ src/reader.rs | 16 ++++++++ 2 files changed, 70 insertions(+), 51 deletions(-) diff --git a/src/ifd.rs b/src/ifd.rs index c146a02..3d7e927 100644 --- a/src/ifd.rs +++ b/src/ifd.rs @@ -1,6 +1,7 @@ use std::collections::HashMap; use std::io::Read; use std::ops::Range; +use std::sync::Arc; use bytes::{buf::Buf, Bytes}; use num_enum::TryFromPrimitive; @@ -195,26 +196,24 @@ impl ImageFileDirectory { } else { cursor.read_u16().await?.into() }; - let mut tags = HashMap::with_capacity(tag_count as usize); - for _ in 0..tag_count { - let (tag_name, tag_value) = read_tag(cursor, bigtiff).await?; - tags.insert(tag_name, tag_value); - } - // Tag 2 bytes // Type 2 bytes // Count: // - bigtiff: 8 bytes // - else: 4 bytes // Value: - // - bigtiff: 8 bytes either a pointer the value itself - // - else: 4 bytes either a pointer the value itself + // - bigtiff: 8 bytes either a pointer or the value itself + // - else: 4 bytes either a pointer or the value itself let ifd_entry_byte_size = if bigtiff { 20 } else { 12 }; - // The size of `tag_count` that we read above - let tag_count_byte_size = if bigtiff { 8 } else { 2 }; - // Reset the cursor position before reading the next ifd offset - cursor.seek(ifd_start + (ifd_entry_byte_size * tag_count) + tag_count_byte_size); + // read all tag data into an EndianAwareReader + let mut reader = cursor.read(ifd_entry_byte_size * tag_count).await?; + + let mut tags = HashMap::with_capacity(tag_count as usize); + for _ in 0..tag_count { + let (tag_name, tag_value) = read_tag(&mut reader, cursor.reader(), bigtiff).await?; + tags.insert(tag_name, tag_value); + } let next_ifd_offset = if bigtiff { cursor.read_u64().await? @@ -838,46 +837,25 @@ impl ImageFileDirectory { } /// Read a single tag from the cursor -async fn read_tag(cursor: &mut AsyncCursor, bigtiff: bool) -> AsyncTiffResult<(Tag, Value)> { +async fn read_tag( + cursor: &mut EndianAwareReader, + file_reader: &Arc, + bigtiff: bool, +) -> AsyncTiffResult<(Tag, Value)> { // let start_cursor_position = cursor.position(); - let tag_name = Tag::from_u16_exhaustive(cursor.read_u16().await?); + let tag_name = Tag::from_u16_exhaustive(cursor.read_u16()?); - let tag_type_code = cursor.read_u16().await?; + let tag_type_code = cursor.read_u16()?; let tag_type = Type::from_u16(tag_type_code).expect( "Unknown tag type {tag_type_code}. TODO: we should skip entries with unknown tag types.", ); let count = if bigtiff { - cursor.read_u64().await? + cursor.read_u64()? } else { - cursor.read_u32().await?.into() + cursor.read_u32()?.into() }; - let tag_value = read_tag_value(cursor, tag_type, count, bigtiff).await?; - - // TODO: better handle management of cursor state <- should be done now - // let ifd_entry_size = if bigtiff { 20 } else { 12 }; - // cursor.seek(start_cursor_position + ifd_entry_size); - - Ok((tag_name, tag_value)) -} - -/// Read a tag's value from the cursor -/// -/// NOTE: this does not maintain cursor state -// This is derived from the upstream tiff crate: -// https://github.com/image-rs/image-tiff/blob/6dc7a266d30291db1e706c8133357931f9e2a053/src/decoder/ifd.rs#L369-L639 -async fn read_tag_value( - cursor: &mut AsyncCursor, - tag_type: Type, - count: u64, - bigtiff: bool, -) -> AsyncTiffResult { - // Case 1: there are no values so we can return immediately. - if count == 0 { - return Ok(Value::List(vec![])); - } - let tag_size = tag_type.size(); let value_byte_length = count.checked_mul(tag_size).unwrap(); @@ -885,22 +863,22 @@ async fn read_tag_value( // prefetch all tag data let mut data = if (bigtiff && value_byte_length <= 8) || value_byte_length <= 4 { // value fits in offset field - let res = cursor.read(value_byte_length).await?; + let mut res = vec![0u8; value_byte_length as usize]; + cursor.read_exact(&mut res)?; if bigtiff { - cursor.advance(8 - value_byte_length); + cursor.advance(8 - value_byte_length)?; } else { - cursor.advance(4 - value_byte_length); + cursor.advance(4 - value_byte_length)?; } - res + EndianAwareReader::new(Bytes::from_owner(res).reader(), cursor.endianness()) } else { - // Seek cursor + // fetch using file_reader let offset = if bigtiff { - cursor.read_u64().await? + cursor.read_u64()? } else { - cursor.read_u32().await?.into() + cursor.read_u32()?.into() }; - let reader = cursor - .reader() + let reader = file_reader .get_bytes(offset..offset + value_byte_length) .await? .reader(); @@ -908,6 +886,31 @@ async fn read_tag_value( // cursor.seek(offset); // cursor.read(value_byte_length).await? }; + + let tag_value = read_tag_value(&mut data, tag_type, count)?; + + // TODO: better handle management of cursor state <- should be done now + // let ifd_entry_size = if bigtiff { 20 } else { 12 }; + // cursor.seek(start_cursor_position + ifd_entry_size); + + Ok((tag_name, tag_value)) +} + +/// Read a tag's value from the cursor +/// +/// NOTE: this does not maintain cursor state +// This is derived from the upstream tiff crate: +// https://github.com/image-rs/image-tiff/blob/6dc7a266d30291db1e706c8133357931f9e2a053/src/decoder/ifd.rs#L369-L639 +fn read_tag_value( + data: &mut EndianAwareReader, + tag_type: Type, + count: u64, +) -> AsyncTiffResult { + // Case 1: there are no values so we can return immediately. + if count == 0 { + return Ok(Value::List(vec![])); + } + // Case 2: there is one value. if count == 1 { return Ok(match tag_type { diff --git a/src/reader.rs b/src/reader.rs index 96901aa..4cdc01d 100644 --- a/src/reader.rs +++ b/src/reader.rs @@ -332,11 +332,13 @@ impl AsyncCursor { &self.reader } + #[allow(dead_code)] pub(crate) fn endianness(&self) -> Endianness { self.endianness } /// Advance cursor position by a set amount + #[allow(dead_code)] pub(crate) fn advance(&mut self, amount: u64) { self.offset += amount; } @@ -360,6 +362,20 @@ impl EndianAwareReader { pub(crate) fn new(reader: Reader, endianness: Endianness) -> Self { Self { reader, endianness } } + + pub(crate) fn endianness(&self) -> Endianness { + self.endianness + } + + pub(crate) fn advance(&mut self, amt: u64) -> AsyncTiffResult { + // TODO: can we use consume? + // from https://stackoverflow.com/a/42247224 + Ok(std::io::copy( + &mut self.reader.by_ref().take(amt), + &mut std::io::sink(), + )?) + } + /// Read a u8 from the cursor, advancing the internal state by 1 byte. pub(crate) fn read_u8(&mut self) -> AsyncTiffResult { Ok(self.reader.read_u8()?)