Skip to content

Commit 3b6a51d

Browse files
committed
pulled coalesce_ranges conditionally in-tree, feature-gated object_store and reqwest
1 parent 6a7a2dc commit 3b6a51d

File tree

7 files changed

+117
-10
lines changed

7 files changed

+117
-10
lines changed

Cargo.toml

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -15,14 +15,19 @@ flate2 = "1.0.20"
1515
futures = "0.3.31"
1616
jpeg = { package = "jpeg-decoder", version = "0.3.0", default-features = false }
1717
num_enum = "0.7.3"
18-
object_store = "0.12"
18+
object_store = { version = "0.12", optional = true }
1919
# object_store and reqwest are optional dependencies; they are enabled through
2020
# the `object_store` and `reqwest` features declared under [features] below.
21-
reqwest = { version = "0.12", default-features = false }
21+
reqwest = { version = "0.12", default-features = false, optional = true }
2222
thiserror = "1"
2323
tokio = { version = "1.43.0", optional = true }
2424
weezl = "0.1.0"
2525

2626
[dev-dependencies]
2727
tiff = "0.9.1"
2828
tokio = { version = "1.9", features = ["macros", "fs", "rt-multi-thread"] }
29+
30+
[features]
31+
default = ["object_store"]
32+
reqwest = ["dep:reqwest"]
33+
object_store = ["dep:object_store", "reqwest"]

src/cog.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,7 @@ impl TIFF {
5656
}
5757
}
5858

59+
#[cfg(feature = "object_store")]
5960
#[cfg(test)]
6061
mod test {
6162
use std::io::BufReader;

src/error.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ pub enum AsyncTiffError {
2424
JPEGDecodingError(#[from] jpeg::Error),
2525

2626
/// Error while fetching data using object store.
27+
#[cfg(feature = "object_store")]
2728
#[error(transparent)]
2829
ObjectStore(#[from] object_store::Error),
2930

@@ -32,6 +33,7 @@ pub enum AsyncTiffError {
3233
InternalTIFFError(#[from] crate::tiff::TiffError),
3334

3435
/// Reqwest error
36+
#[cfg(feature = "reqwest")] // see https://www.reddit.com/r/rust/comments/xyik51/comment/irhei39/
3537
#[error(transparent)]
3638
ReqwestError(#[from] reqwest::Error),
3739

src/lib.rs

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,13 @@ mod ifd;
1111
pub mod tiff;
1212
mod tile;
1313

14+
#[cfg(not(feature = "object_store"))]
15+
mod object_store;
16+
#[cfg(feature = "object_store")]
17+
pub use object_store::coalesce_ranges;
18+
#[cfg(not(feature = "object_store"))]
19+
pub use object_store::util::coalesce_ranges;
20+
1421
pub use cog::TIFF;
1522
pub use ifd::{ImageFileDirectories, ImageFileDirectory};
1623
pub use tile::Tile;

src/object_store/mod.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
pub mod util;

src/object_store/util.rs

Lines changed: 84 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,84 @@
1+
use bytes::Bytes;
2+
use futures::stream::{StreamExt, TryStreamExt};
3+
use std::ops::Range;
4+
5+
/// Maximum number of `fetch` futures that `coalesce_ranges` buffers concurrently.
const COALESCE_PARALLEL: usize = 10;
6+
7+
/// Takes a function `fetch` that can fetch a range of bytes and uses this to
8+
/// fetch the provided byte `ranges`
9+
///
10+
/// To improve performance it will:
11+
///
12+
/// * Combine ranges less than `coalesce` bytes apart into a single call to `fetch`
13+
/// * Make multiple `fetch` requests in parallel (up to maximum of 10)
14+
///
15+
pub async fn coalesce_ranges<F, E, Fut>(
16+
ranges: &[Range<u64>],
17+
fetch: F,
18+
coalesce: u64,
19+
) -> Result<Vec<Bytes>, E>
20+
where
21+
F: Send + FnMut(Range<u64>) -> Fut,
22+
E: Send,
23+
Fut: std::future::Future<Output = Result<Bytes, E>> + Send,
24+
{
25+
let fetch_ranges = merge_ranges(ranges, coalesce);
26+
27+
let fetched: Vec<_> = futures::stream::iter(fetch_ranges.iter().cloned())
28+
.map(fetch)
29+
.buffered(COALESCE_PARALLEL)
30+
.try_collect()
31+
.await?;
32+
33+
Ok(ranges
34+
.iter()
35+
.map(|range| {
36+
let idx = fetch_ranges.partition_point(|v| v.start <= range.start) - 1;
37+
let fetch_range = &fetch_ranges[idx];
38+
let fetch_bytes = &fetched[idx];
39+
40+
let start = range.start - fetch_range.start;
41+
let end = range.end - fetch_range.start;
42+
let range = (start as usize)..(end as usize).min(fetch_bytes.len());
43+
fetch_bytes.slice(range)
44+
})
45+
.collect())
46+
}
47+
48+
/// Returns a list of ranges, sorted by start, that together cover `ranges`.
///
/// The input is sorted by start offset and then merged in a single pass: a
/// range is folded into the previous output range when it overlaps it or when
/// the gap between them is at most `coalesce` bytes. An empty input yields an
/// empty vector.
fn merge_ranges(ranges: &[Range<u64>], coalesce: u64) -> Vec<Range<u64>> {
    let mut sorted = ranges.to_vec();
    sorted.sort_unstable_by_key(|range| range.start);

    let mut merged: Vec<Range<u64>> = Vec::with_capacity(sorted.len());
    for range in sorted {
        match merged.last_mut() {
            // `saturating_sub` yields 0 for overlapping ranges, so they always
            // merge regardless of `coalesce`.
            Some(last) if range.start.saturating_sub(last.end) <= coalesce => {
                last.end = last.end.max(range.end);
            }
            _ => merged.push(range),
        }
    }

    merged
}

src/reader.rs

Lines changed: 15 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -9,8 +9,11 @@ use byteorder::{BigEndian, LittleEndian, ReadBytesExt};
99
use bytes::buf::Reader;
1010
use bytes::{Buf, Bytes};
1111
use futures::future::{BoxFuture, FutureExt, TryFutureExt};
12+
#[cfg(feature = "object_store")]
1213
use object_store::ObjectStore;
1314

15+
use crate::coalesce_ranges;
16+
1417
use crate::error::{AsyncTiffError, AsyncTiffResult};
1518

1619
/// The asynchronous interface used to read COG files
@@ -33,17 +36,19 @@ pub trait AsyncFileReader: Debug + Send + Sync {
3336
/// Retrieve the bytes in `range`
3437
fn get_bytes(&self, range: Range<u64>) -> BoxFuture<'_, AsyncTiffResult<Bytes>>;
3538

36-
/// Retrieve multiple byte ranges. The default implementation will call `get_bytes`
37-
/// sequentially
39+
/// Retrieve multiple byte ranges. The default implementation will
40+
/// coalesce ranges with:
41+
/// - at most 1024 * 1024 bytes (1 MiB) of space in between
42+
/// - up to 10 parallel requests
3843
fn get_byte_ranges(
3944
&self,
4045
ranges: Vec<Range<u64>>,
4146
) -> BoxFuture<'_, AsyncTiffResult<Vec<Bytes>>> {
4247
async move {
4348
let mut result = Vec::with_capacity(ranges.len());
4449

45-
for range in ranges.into_iter() {
46-
let data = self.get_bytes(range).await?;
50+
for data in coalesce_ranges(&ranges, |range| self.get_bytes(range), 1024 * 1024).await?
51+
{
4752
result.push(data);
4853
}
4954

@@ -91,12 +96,13 @@ impl AsyncFileReader for Box<dyn AsyncFileReader + '_> {
9196
// }
9297

9398
/// An AsyncFileReader that reads from an [`ObjectStore`] instance.
99+
#[cfg(feature = "object_store")]
94100
#[derive(Clone, Debug)]
95101
pub struct ObjectReader {
96102
store: Arc<dyn ObjectStore>,
97103
path: object_store::path::Path,
98104
}
99-
105+
#[cfg(feature = "object_store")]
100106
impl ObjectReader {
101107
/// Creates a new [`ObjectReader`] for the provided [`ObjectStore`] and path
102108
///
@@ -105,7 +111,7 @@ impl ObjectReader {
105111
Self { store, path }
106112
}
107113
}
108-
114+
#[cfg(feature = "object_store")]
109115
impl AsyncFileReader for ObjectReader {
110116
fn get_bytes(&self, range: Range<u64>) -> BoxFuture<'_, AsyncTiffResult<Bytes>> {
111117
let range = range.start as _..range.end as _;
@@ -134,19 +140,20 @@ impl AsyncFileReader for ObjectReader {
134140
}
135141

136142
/// An AsyncFileReader that reads from a URL using reqwest.
143+
#[cfg(feature = "reqwest")]
137144
#[derive(Debug, Clone)]
138145
pub struct ReqwestReader {
139146
client: reqwest::Client,
140147
url: reqwest::Url,
141148
}
142-
149+
#[cfg(feature = "reqwest")]
143150
impl ReqwestReader {
144151
/// Construct a new ReqwestReader from a reqwest client and URL.
145152
pub fn new(client: reqwest::Client, url: reqwest::Url) -> Self {
146153
Self { client, url }
147154
}
148155
}
149-
156+
#[cfg(feature = "reqwest")]
150157
impl AsyncFileReader for ReqwestReader {
151158
fn get_bytes(&self, range: Range<u64>) -> BoxFuture<'_, AsyncTiffResult<Bytes>> {
152159
let url = self.url.clone();

0 commit comments

Comments
 (0)