Skip to content
Open
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 2 additions & 5 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,7 @@ num_enum = "0.7.3"
object_store = { version = "0.12", optional = true }
reqwest = { version = "0.12", default-features = false, optional = true }
thiserror = "1"
tokio = { version = "1.43.0", optional = true, default-features = false, features = [
"io-util",
"sync",
] }
tokio = { version = "1.43.0", default-features = false, features = ["sync"] }
weezl = "0.1.0"

[dev-dependencies]
Expand All @@ -37,7 +34,7 @@ tokio-test = "0.4.4"

[features]
default = ["object_store", "reqwest"]
tokio = ["dep:tokio"]
tokio = ["tokio/io-util"]
reqwest = ["dep:reqwest"]
object_store = ["dep:object_store"]

Expand Down
7 changes: 7 additions & 0 deletions python/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

141 changes: 141 additions & 0 deletions src/metadata/cache.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,141 @@
//! Caching strategies for metadata fetching.

use std::ops::Range;
use std::sync::Arc;

use bytes::{Bytes, BytesMut};
use futures::future::BoxFuture;
use tokio::sync::Mutex;

use crate::error::AsyncTiffResult;
use crate::metadata::MetadataFetch;

/// Logic for managing a cache of sequential buffers
struct SequentialCache {
    /// Contiguous blocks from offset 0
    ///
    /// # Invariant
    /// - Buffers are contiguous from offset 0 (block `i + 1` starts exactly
    ///   where block `i` ends), so a file offset maps directly into this list.
    buffers: Vec<Bytes>,

    /// Total length cached (== sum of buffers lengths)
    len: u64,
}

impl SequentialCache {
    /// Create a new, empty SequentialCache
    fn new() -> Self {
        Self {
            buffers: vec![],
            len: 0,
        }
    }

    /// Check if the given range is fully contained within the cached buffers
    ///
    /// Because buffers are contiguous from offset 0, containment reduces to
    /// comparing the range end against the total cached length.
    fn contains(&self, range: Range<u64>) -> bool {
        range.end <= self.len
    }

    /// Slice out the given range from the cached buffers
    ///
    /// The caller must ensure the range is contained (see [`Self::contains`]);
    /// otherwise the underlying `Bytes::slice` may panic.
    fn slice(&self, range: Range<u64>) -> Bytes {
        let out_len = (range.end - range.start) as usize;
        // `remaining` is rebased as we walk past blocks: offsets are always
        // relative to the start of the current block.
        let mut remaining = range;
        let mut out_buffers: Vec<Bytes> = vec![];

        for b in &self.buffers {
            let b_len = b.len() as u64;

            // this block falls entirely before the desired range start
            if remaining.start >= b_len {
                remaining.start -= b_len;
                remaining.end -= b_len;
                continue;
            }

            // we slice bytes out of *this* block
            //
            // BUGFIX: the end offset is the (block-relative) range end clamped
            // to this block's length. The previous code computed the chunk
            // *length* here and used it as an end offset, which produced empty
            // or truncated chunks whenever the range started mid-block.
            let start = remaining.start as usize;
            let end = remaining.end.min(b_len) as usize;

            let chunk = b.slice(start..end);
            out_buffers.push(chunk);

            // consumed some portion; update and potentially break
            remaining.start = 0;
            if remaining.end <= b_len {
                break;
            }
            remaining.end -= b_len;
        }

        if out_buffers.len() == 1 {
            // Single-block case: zero-copy slice of the cached block.
            out_buffers.into_iter().next().unwrap()
        } else {
            // Range spans multiple blocks: copy into one contiguous buffer.
            let mut out = BytesMut::with_capacity(out_len);
            for b in out_buffers {
                out.extend_from_slice(&b);
            }
            out.into()
        }
    }

    /// Append a newly fetched buffer to the end of the cache, extending the
    /// contiguous region by its length.
    fn append_buffer(&mut self, buffer: Bytes) {
        self.len += buffer.len() as u64;
        self.buffers.push(buffer);
    }
}

/// A MetadataFetch implementation that caches fetched data in exponentially growing chunks,
/// sequentially from the beginning of the file.
pub struct ExponentialMetadataCache<F: MetadataFetch> {
    // The wrapped fetcher that performs the actual upstream reads.
    inner: F,
    // Shared sequential cache, guarded by an async mutex so a fetch can be
    // awaited while the lock is held.
    cache: Arc<Mutex<SequentialCache>>,
}

impl<F: MetadataFetch> ExponentialMetadataCache<F> {
    /// Wrap the given [`MetadataFetch`] in a new, initially empty cache.
    pub fn new(inner: F) -> AsyncTiffResult<Self> {
        let cache = Arc::new(Mutex::new(SequentialCache::new()));
        Ok(Self { inner, cache })
    }
}

/// Compute the size of the next upstream fetch, doubling the cached length
/// each time so `n` bytes of metadata are covered in O(log n) requests.
///
/// Starts at 64 KiB for an empty cache. Uses a saturating multiply so a
/// pathological `existing_len` cannot overflow (which would panic in debug
/// builds).
fn next_fetch_size(existing_len: u64) -> u64 {
    if existing_len == 0 {
        64 * 1024
    } else {
        existing_len.saturating_mul(2)
    }
}

impl<F: MetadataFetch + Send + Sync> MetadataFetch for ExponentialMetadataCache<F> {
    /// Serve `range` from the sequential cache, fetching more data from the
    /// inner source on a miss.
    ///
    /// On a miss the cache is extended from its current end by at least
    /// `next_fetch_size(len)` bytes (doubling), and always far enough to
    /// cover `range.end`, so repeated small reads coalesce into
    /// exponentially fewer upstream requests.
    fn fetch(&self, range: Range<u64>) -> BoxFuture<'_, AsyncTiffResult<Bytes>> {
        let cache = self.cache.clone();

        Box::pin(async move {
            // All concurrent fetches serialize on this async mutex.
            let mut g = cache.lock().await;

            // First check if we already have the range cached
            if g.contains(range.start..range.end) {
                return Ok(g.slice(range));
            }

            // Compute the correct fetch range: start at the current cache end
            // (buffers must stay contiguous from offset 0) and cover at least
            // up to `range.end`.
            let start_len = g.len;
            let needed = range.end.saturating_sub(start_len);
            let fetch_size = next_fetch_size(start_len).max(needed);
            // NOTE(review): this range may extend past the end of the file;
            // whether the inner fetch truncates or errors (e.g. UnexpectedEof,
            // as seen in review) depends on the `MetadataFetch` impl — confirm
            // behavior near EOF.
            let fetch_range = start_len..start_len + fetch_size;

            // Perform the fetch while holding mutex
            // (this is OK because the mutex is async)
            let bytes = self.inner.fetch(fetch_range).await?;

            // Now append safely
            g.append_buffer(bytes);

            // NOTE(review): if the inner fetch returned fewer bytes than
            // requested, `range` may still not be contained here and `slice`
            // could read out of bounds — verify against callers.
            Ok(g.slice(range))
        })
    }
}
2 changes: 2 additions & 0 deletions src/metadata/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -58,8 +58,10 @@
//! fetches the first `N` bytes out of a file.
//!

pub mod cache;
mod fetch;
mod reader;

pub use cache::ExponentialMetadataCache;
pub use fetch::{MetadataFetch, PrefetchBuffer};
pub use reader::{ImageFileDirectoryReader, TiffMetadataReader};
Loading