Skip to content

Commit 43524f9

Browse files
committed
Replace prefetch buffer
1 parent f109f22 commit 43524f9

File tree

4 files changed

+41
-69
lines changed

4 files changed

+41
-69
lines changed

src/cog.rs

Lines changed: 5 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,8 @@ mod test {
2727
use tiff::decoder::{DecodingResult, Limits};
2828

2929
use super::*;
30-
use crate::metadata::{PrefetchBuffer, TiffMetadataReader};
30+
use crate::metadata::cache::ReadaheadMetadataCache;
31+
use crate::metadata::TiffMetadataReader;
3132
use crate::reader::{AsyncFileReader, ObjectReader};
3233

3334
#[ignore = "local file"]
@@ -37,16 +38,9 @@ mod test {
3738
let path = object_store::path::Path::parse("m_4007307_sw_18_060_20220803.tif").unwrap();
3839
let store = Arc::new(LocalFileSystem::new_with_prefix(folder).unwrap());
3940
let reader = Arc::new(ObjectReader::new(store, path)) as Arc<dyn AsyncFileReader>;
40-
let prefetch_reader = PrefetchBuffer::new(reader.clone(), 32 * 1024)
41-
.await
42-
.unwrap();
43-
let mut metadata_reader = TiffMetadataReader::try_open(&prefetch_reader)
44-
.await
45-
.unwrap();
46-
let ifds = metadata_reader
47-
.read_all_ifds(&prefetch_reader)
48-
.await
49-
.unwrap();
41+
let cached_reader = ReadaheadMetadataCache::new(reader.clone()).unwrap();
42+
let mut metadata_reader = TiffMetadataReader::try_open(&cached_reader).await.unwrap();
43+
let ifds = metadata_reader.read_all_ifds(&cached_reader).await.unwrap();
5044
let tiff = TIFF::new(ifds);
5145

5246
let ifd = &tiff.ifds[1];

src/metadata/cache.rs

Lines changed: 35 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@ use crate::error::AsyncTiffResult;
1111
use crate::metadata::MetadataFetch;
1212

1313
/// Logic for managing a cache of sequential buffers
14-
struct SequentialCache {
14+
struct SequentialBlockCache {
1515
/// Contiguous blocks from offset 0
1616
///
1717
/// # Invariant
@@ -22,8 +22,8 @@ struct SequentialCache {
2222
len: u64,
2323
}
2424

25-
impl SequentialCache {
26-
/// Create a new, empty SequentialCache
25+
impl SequentialBlockCache {
26+
/// Create a new, empty SequentialBlockCache
2727
fn new() -> Self {
2828
Self {
2929
buffers: vec![],
@@ -55,7 +55,8 @@ impl SequentialCache {
5555

5656
// we slice bytes out of *this* block
5757
let start = remaining.start as usize;
58-
let end = (remaining.end - remaining.start).min(b_len - remaining.start) as usize;
58+
let size = (remaining.end - remaining.start).min(b_len - remaining.start) as usize;
59+
let end = start + size;
5960

6061
let chunk = b.slice(start..end);
6162
out_buffers.push(chunk);
@@ -87,35 +88,49 @@ impl SequentialCache {
8788

8889
/// A MetadataFetch implementation that caches fetched data in exponentially growing chunks,
8990
/// sequentially from the beginning of the file.
90-
pub struct ExponentialMetadataCache<F: MetadataFetch> {
91+
pub struct ReadaheadMetadataCache<F: MetadataFetch> {
9192
inner: F,
92-
cache: Arc<Mutex<SequentialCache>>,
93+
cache: Arc<Mutex<SequentialBlockCache>>,
94+
initial: u64,
95+
multiplier: f64,
9396
}
9497

95-
impl<F: MetadataFetch> ExponentialMetadataCache<F> {
96-
/// Create a new ExponentialMetadataCache wrapping the given MetadataFetch
98+
impl<F: MetadataFetch> ReadaheadMetadataCache<F> {
99+
/// Create a new ReadaheadMetadataCache wrapping the given MetadataFetch
97100
pub fn new(inner: F) -> AsyncTiffResult<Self> {
98101
Ok(Self {
99102
inner,
100-
cache: Arc::new(Mutex::new(SequentialCache::new())),
103+
cache: Arc::new(Mutex::new(SequentialBlockCache::new())),
104+
initial: 32 * 1024,
105+
multiplier: 2.0,
101106
})
102107
}
103-
}
104108

105-
fn next_fetch_size(existing_len: u64) -> u64 {
106-
if existing_len == 0 {
107-
64 * 1024
108-
} else {
109-
existing_len * 2
109+
/// Set the initial fetch size in bytes. Defaults to 32 KiB if not set.
110+
pub fn with_initial_size(mut self, initial: u64) -> Self {
111+
self.initial = initial;
112+
self
113+
}
114+
115+
/// Set the multiplier applied to the cached length to size each subsequent fetch. Defaults to 2.0 if not set.
116+
pub fn with_multiplier(mut self, multiplier: f64) -> Self {
117+
self.multiplier = multiplier;
118+
self
119+
}
120+
121+
fn next_fetch_size(&self, existing_len: u64) -> u64 {
122+
if existing_len == 0 {
123+
self.initial
124+
} else {
125+
(existing_len as f64 * self.multiplier).round() as u64
126+
}
110127
}
111128
}
112129

113-
impl<F: MetadataFetch + Send + Sync> MetadataFetch for ExponentialMetadataCache<F> {
130+
impl<F: MetadataFetch + Send + Sync> MetadataFetch for ReadaheadMetadataCache<F> {
114131
fn fetch(&self, range: Range<u64>) -> BoxFuture<'_, AsyncTiffResult<Bytes>> {
115-
let cache = self.cache.clone();
116-
117132
Box::pin(async move {
118-
let mut g = cache.lock().await;
133+
let mut g = self.cache.lock().await;
119134

120135
// First check if we already have the range cached
121136
if g.contains(range.start..range.end) {
@@ -125,7 +140,7 @@ impl<F: MetadataFetch + Send + Sync> MetadataFetch for ExponentialMetadataCache<
125140
// Compute the correct fetch range
126141
let start_len = g.len;
127142
let needed = range.end.saturating_sub(start_len);
128-
let fetch_size = next_fetch_size(start_len).max(needed);
143+
let fetch_size = self.next_fetch_size(start_len).max(needed);
129144
let fetch_range = start_len..start_len + fetch_size;
130145

131146
// Perform the fetch while holding mutex

src/metadata/fetch.rs

Lines changed: 0 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,6 @@ use std::ops::Range;
22

33
use bytes::Bytes;
44
use futures::future::BoxFuture;
5-
use futures::FutureExt;
65

76
use crate::error::AsyncTiffResult;
87
use crate::reader::{AsyncFileReader, EndianAwareReader, Endianness};
@@ -25,41 +24,6 @@ impl<T: AsyncFileReader> MetadataFetch for T {
2524
}
2625
}
2726

28-
/// Buffering for the first `N` bytes of a file.
29-
///
30-
/// This is designed so that the async requests made by the underlying tag reader get intercepted
31-
/// here and served from the existing buffer when possible.
32-
#[derive(Debug)]
33-
pub struct PrefetchBuffer<F: MetadataFetch> {
34-
fetch: F,
35-
buffer: Bytes,
36-
}
37-
38-
impl<F: MetadataFetch> PrefetchBuffer<F> {
39-
/// Construct a new PrefetchBuffer, caching the first `prefetch` bytes of the file.
40-
pub async fn new(fetch: F, prefetch: u64) -> AsyncTiffResult<Self> {
41-
let buffer = fetch.fetch(0..prefetch).await?;
42-
Ok(Self { fetch, buffer })
43-
}
44-
}
45-
46-
impl<F: MetadataFetch> MetadataFetch for PrefetchBuffer<F> {
47-
fn fetch(&self, range: Range<u64>) -> BoxFuture<'_, AsyncTiffResult<Bytes>> {
48-
if range.start < self.buffer.len() as _ {
49-
if range.end < self.buffer.len() as _ {
50-
let usize_range = range.start as usize..range.end as usize;
51-
let result = self.buffer.slice(usize_range);
52-
async { Ok(result) }.boxed()
53-
} else {
54-
// TODO: reuse partial internal buffer
55-
self.fetch.fetch(range)
56-
}
57-
} else {
58-
self.fetch.fetch(range)
59-
}
60-
}
61-
}
62-
6327
pub(crate) struct MetadataCursor<'a, F: MetadataFetch> {
6428
fetch: &'a F,
6529
offset: u64,

src/metadata/mod.rs

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -62,6 +62,5 @@ pub mod cache;
6262
mod fetch;
6363
mod reader;
6464

65-
pub use cache::ExponentialMetadataCache;
66-
pub use fetch::{MetadataFetch, PrefetchBuffer};
65+
pub use fetch::MetadataFetch;
6766
pub use reader::{ImageFileDirectoryReader, TiffMetadataReader};

0 commit comments

Comments
 (0)