Skip to content
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 6 additions & 2 deletions python/src/reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -115,11 +115,15 @@ struct ObspecReader {
}

impl AsyncFileReader for ObspecReader {
fn get_bytes(&self, range: Range<u64>) -> BoxFuture<'_, AsyncTiffResult<Bytes>> {
fn get_metadata_bytes(&self, range: Range<u64>) -> BoxFuture<'_, AsyncTiffResult<Bytes>> {
self.backend.get_range_wrapper(&self.path, range).boxed()
}

fn get_byte_ranges(
fn get_image_bytes(&self, range: Range<u64>) -> BoxFuture<'_, AsyncTiffResult<Bytes>> {
self.backend.get_range_wrapper(&self.path, range).boxed()
}

fn get_image_byte_ranges(
&self,
ranges: Vec<Range<u64>>,
) -> BoxFuture<'_, AsyncTiffResult<Vec<Bytes>>> {
Expand Down
4 changes: 2 additions & 2 deletions src/ifd.rs
Original file line number Diff line number Diff line change
Expand Up @@ -779,7 +779,7 @@ impl ImageFileDirectory {
let range = self
.get_tile_byte_range(x, y)
.ok_or(AsyncTiffError::General("Not a tiled TIFF".to_string()))?;
let compressed_bytes = reader.get_bytes(range).await?;
let compressed_bytes = reader.get_image_bytes(range).await?;
Ok(Tile {
x,
y,
Expand Down Expand Up @@ -810,7 +810,7 @@ impl ImageFileDirectory {
.collect::<AsyncTiffResult<Vec<_>>>()?;

// 2: Fetch using `get_ranges
let buffers = reader.get_byte_ranges(byte_ranges).await?;
let buffers = reader.get_image_byte_ranges(byte_ranges).await?;

// 3: Create tile objects
let mut tiles = vec![];
Expand Down
134 changes: 87 additions & 47 deletions src/reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,20 +30,23 @@ use crate::error::{AsyncTiffError, AsyncTiffResult};
///
/// [`tokio::fs::File`]: https://docs.rs/tokio/latest/tokio/fs/struct.File.html
pub trait AsyncFileReader: Debug + Send + Sync {
/// Retrieve the bytes in `range`
fn get_bytes(&self, range: Range<u64>) -> BoxFuture<'_, AsyncTiffResult<Bytes>>;
/// Retrieve the bytes in `range` as part of a request for header metadata.
fn get_metadata_bytes(&self, range: Range<u64>) -> BoxFuture<'_, AsyncTiffResult<Bytes>>;

/// Retrieve multiple byte ranges. The default implementation will call `get_bytes`
/// sequentially
fn get_byte_ranges(
/// Retrieve the bytes in `range` as part of a request for image data, not header metadata.
fn get_image_bytes(&self, range: Range<u64>) -> BoxFuture<'_, AsyncTiffResult<Bytes>>;

/// Retrieve multiple byte ranges as part of a request for image data, not header metadata. The
/// default implementation will call `get_image_bytes` sequentially
fn get_image_byte_ranges(
&self,
ranges: Vec<Range<u64>>,
) -> BoxFuture<'_, AsyncTiffResult<Vec<Bytes>>> {
async move {
let mut result = Vec::with_capacity(ranges.len());

for range in ranges.into_iter() {
let data = self.get_bytes(range).await?;
let data = self.get_image_bytes(range).await?;
result.push(data);
}

Expand All @@ -55,15 +58,19 @@ pub trait AsyncFileReader: Debug + Send + Sync {

/// This allows Box<dyn AsyncFileReader + '_> to be used as an AsyncFileReader,
impl AsyncFileReader for Box<dyn AsyncFileReader + '_> {
fn get_bytes(&self, range: Range<u64>) -> BoxFuture<'_, AsyncTiffResult<Bytes>> {
self.as_ref().get_bytes(range)
fn get_metadata_bytes(&self, range: Range<u64>) -> BoxFuture<'_, AsyncTiffResult<Bytes>> {
self.as_ref().get_metadata_bytes(range)
}

fn get_image_bytes(&self, range: Range<u64>) -> BoxFuture<'_, AsyncTiffResult<Bytes>> {
self.as_ref().get_image_bytes(range)
}

fn get_byte_ranges(
fn get_image_byte_ranges(
&self,
ranges: Vec<Range<u64>>,
) -> BoxFuture<'_, AsyncTiffResult<Vec<Bytes>>> {
self.as_ref().get_byte_ranges(ranges)
self.as_ref().get_image_byte_ranges(ranges)
}
}

Expand All @@ -89,31 +96,36 @@ impl<T: tokio::io::AsyncRead + tokio::io::AsyncSeek + Unpin + Send + Debug> Toki
pub fn new(inner: T) -> Self {
Self(tokio::sync::Mutex::new(inner))
}
}

#[cfg(feature = "tokio")]
impl<T: tokio::io::AsyncRead + tokio::io::AsyncSeek + Unpin + Send + Debug> AsyncFileReader
for TokioReader<T>
{
fn get_bytes(&self, range: Range<u64>) -> BoxFuture<'_, AsyncTiffResult<Bytes>> {
async fn make_range_request(&self, range: Range<u64>) -> AsyncTiffResult<Bytes> {
use std::io::SeekFrom;
use tokio::io::{AsyncReadExt, AsyncSeekExt};

async move {
let mut file = self.0.lock().await;

file.seek(SeekFrom::Start(range.start)).await?;
let mut file = self.0.lock().await;

let to_read = range.end - range.start;
let mut buffer = Vec::with_capacity(to_read as usize);
let read = file.read(&mut buffer).await? as u64;
if read != to_read {
return Err(AsyncTiffError::EndOfFile(to_read, read));
}
file.seek(SeekFrom::Start(range.start)).await?;

Ok(buffer.into())
let to_read = range.end - range.start;
let mut buffer = Vec::with_capacity(to_read as usize);
let read = file.read(&mut buffer).await? as u64;
if read != to_read {
return Err(AsyncTiffError::EndOfFile(to_read, read));
}
.boxed()

Ok(buffer.into())
}
}

#[cfg(feature = "tokio")]
impl<T: tokio::io::AsyncRead + tokio::io::AsyncSeek + Unpin + Send + Debug> AsyncFileReader
for TokioReader<T>
{
fn get_metadata_bytes(&self, range: Range<u64>) -> BoxFuture<'_, AsyncTiffResult<Bytes>> {
self.make_range_request(range).boxed()
}

fn get_image_bytes(&self, range: Range<u64>) -> BoxFuture<'_, AsyncTiffResult<Bytes>> {
self.make_range_request(range).boxed()
}
}

Expand All @@ -133,19 +145,30 @@ impl ObjectReader {
pub fn new(store: Arc<dyn object_store::ObjectStore>, path: object_store::path::Path) -> Self {
Self { store, path }
}
}

#[cfg(feature = "object_store")]
impl AsyncFileReader for ObjectReader {
fn get_bytes(&self, range: Range<u64>) -> BoxFuture<'_, AsyncTiffResult<Bytes>> {
async fn make_range_request(&self, range: Range<u64>) -> AsyncTiffResult<Bytes> {
let range = range.start as _..range.end as _;
self.store
.get_range(&self.path, range)
.map_err(|e| e.into())
.boxed()
.await
}
}

#[cfg(feature = "object_store")]
impl AsyncFileReader for ObjectReader {
fn get_metadata_bytes(&self, range: Range<u64>) -> BoxFuture<'_, AsyncTiffResult<Bytes>> {
self.make_range_request(range).boxed()
}

fn get_byte_ranges(&self, ranges: Vec<Range<u64>>) -> BoxFuture<'_, AsyncTiffResult<Vec<Bytes>>>
fn get_image_bytes(&self, range: Range<u64>) -> BoxFuture<'_, AsyncTiffResult<Bytes>> {
self.make_range_request(range).boxed()
}

fn get_image_byte_ranges(
&self,
ranges: Vec<Range<u64>>,
) -> BoxFuture<'_, AsyncTiffResult<Vec<Bytes>>>
where
Self: Send,
{
Expand Down Expand Up @@ -177,11 +200,8 @@ impl ReqwestReader {
pub fn new(client: reqwest::Client, url: reqwest::Url) -> Self {
Self { client, url }
}
}

#[cfg(feature = "reqwest")]
impl AsyncFileReader for ReqwestReader {
fn get_bytes(&self, range: Range<u64>) -> BoxFuture<'_, AsyncTiffResult<Bytes>> {
fn make_range_request(&self, range: Range<u64>) -> BoxFuture<'_, AsyncTiffResult<Bytes>> {
let url = self.url.clone();
let client = self.client.clone();
// HTTP range is inclusive, so we need to subtract 1 from the end
Expand All @@ -195,6 +215,17 @@ impl AsyncFileReader for ReqwestReader {
}
}

#[cfg(feature = "reqwest")]
impl AsyncFileReader for ReqwestReader {
fn get_metadata_bytes(&self, range: Range<u64>) -> BoxFuture<'_, AsyncTiffResult<Bytes>> {
self.make_range_request(range)
}

fn get_image_bytes(&self, range: Range<u64>) -> BoxFuture<'_, AsyncTiffResult<Bytes>> {
self.make_range_request(range)
}
}

/// An AsyncFileReader that caches the first `prefetch` bytes of a file.
#[derive(Debug)]
pub struct PrefetchReader {
Expand All @@ -205,34 +236,43 @@ pub struct PrefetchReader {
impl PrefetchReader {
/// Construct a new PrefetchReader, catching the first `prefetch` bytes of the file.
pub async fn new(reader: Arc<dyn AsyncFileReader>, prefetch: u64) -> AsyncTiffResult<Self> {
let buffer = reader.get_bytes(0..prefetch).await?;
let buffer = reader.get_metadata_bytes(0..prefetch).await?;
Ok(Self { reader, buffer })
}
}

impl AsyncFileReader for PrefetchReader {
fn get_bytes(&self, range: Range<u64>) -> BoxFuture<'_, AsyncTiffResult<Bytes>> {
fn get_metadata_bytes(&self, range: Range<u64>) -> BoxFuture<'_, AsyncTiffResult<Bytes>> {
if range.start < self.buffer.len() as _ {
if range.end < self.buffer.len() as _ {
let usize_range = range.start as usize..range.end as usize;
let result = self.buffer.slice(usize_range);
async { Ok(result) }.boxed()
} else {
// TODO: reuse partial internal buffer
self.reader.get_bytes(range)
self.reader.get_metadata_bytes(range)
}
} else {
self.reader.get_bytes(range)
self.reader.get_metadata_bytes(range)
}
}

fn get_byte_ranges(&self, ranges: Vec<Range<u64>>) -> BoxFuture<'_, AsyncTiffResult<Vec<Bytes>>>
fn get_image_bytes(&self, range: Range<u64>) -> BoxFuture<'_, AsyncTiffResult<Bytes>> {
// In practice, get_image_bytes is only used for fetching tiles, which are unlikely
// to overlap a metadata prefetch.
self.reader.get_image_bytes(range)
}

fn get_image_byte_ranges(
&self,
ranges: Vec<Range<u64>>,
) -> BoxFuture<'_, AsyncTiffResult<Vec<Bytes>>>
where
Self: Send,
{
// In practice, get_byte_ranges is only used for fetching tiles, which are unlikely to
// overlap a metadata prefetch.
self.reader.get_byte_ranges(ranges)
// In practice, get_image_byte_ranges is only used for fetching tiles, which are unlikely
// to overlap a metadata prefetch.
self.reader.get_image_byte_ranges(ranges)
}
}

Expand Down Expand Up @@ -293,7 +333,7 @@ impl AsyncCursor {
pub(crate) async fn read(&mut self, length: u64) -> AsyncTiffResult<EndianAwareReader> {
let range = self.offset as _..(self.offset + length) as _;
self.offset += length;
let bytes = self.reader.get_bytes(range).await?;
let bytes = self.reader.get_metadata_bytes(range).await?;
Ok(EndianAwareReader {
reader: bytes.reader(),
endianness: self.endianness,
Expand Down