Skip to content

Commit 3122f19

Browse files
committed
Add tokio wrapper
1 parent 9e04ae9 commit 3122f19

File tree

3 files changed

+45
-14
lines changed

3 files changed

+45
-14
lines changed

Cargo.toml

Lines changed: 8 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -20,12 +20,18 @@ object_store = { version = "0.12", optional = true }
2020
# object_store which uses reqwest.
2121
reqwest = { version = "0.12", default-features = false, optional = true }
2222
thiserror = "1"
23-
tokio = { version = "1.43.0", optional = true, features = ["fs", "io-util"] }
23+
tokio = { version = "1.43.0", optional = true }
2424
weezl = "0.1.0"
2525

2626
[dev-dependencies]
27+
object_store = "0.12"
2728
tiff = "0.9.1"
28-
tokio = { version = "1.9", features = ["macros", "fs", "rt-multi-thread", "io-util"] }
29+
tokio = { version = "1.9", features = [
30+
"macros",
31+
"fs",
32+
"rt-multi-thread",
33+
"io-util",
34+
] }
2935

3036
[features]
3137
default = ["object_store", "reqwest"]
@@ -34,8 +40,3 @@ reqwest = ["dep:reqwest"]
3440
object_store = ["dep:object_store"]
3541

3642
[package.metadata.cargo-all-features]
37-
# If your crate has a large number of optional dependencies, skip them for speed
38-
# skip_optional_dependencies = true
39-
40-
# Exclude certain features from the build matrix
41-
denylist = ["default"]

src/error.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@ use thiserror::Error;
99
pub enum AsyncTiffError {
1010
/// End of file error.
1111
#[error("End of File: expected to read {0} bytes, got {1}")]
12-
EndOfFile(usize, usize),
12+
EndOfFile(u64, u64),
1313

1414
/// General error.
1515
#[error("General error: {0}")]

src/reader.rs

Lines changed: 36 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -70,18 +70,44 @@ impl AsyncFileReader for Box<dyn AsyncFileReader + '_> {
7070
}
7171
}
7272

73+
/// A wrapper for things that implement [AsyncRead] and [AsyncSeek] to also implement
74+
/// [AsyncFileReader].
75+
///
76+
/// This wrapper is needed because `AsyncRead` and `AsyncSeek` require mutable access to seek and
77+
/// read data, while the `AsyncFileReader` trait requires immutable access to read data.
78+
///
79+
/// [AsyncRead]: tokio::io::AsyncRead
80+
/// [AsyncSeek]: tokio::io::AsyncSeek
81+
#[cfg(feature = "tokio")]
82+
#[derive(Debug)]
83+
pub struct TokioReader<T: tokio::io::AsyncRead + tokio::io::AsyncSeek + Unpin + Send + Debug>(
84+
tokio::sync::Mutex<T>,
85+
);
86+
87+
#[cfg(feature = "tokio")]
88+
impl<T: tokio::io::AsyncRead + tokio::io::AsyncSeek + Unpin + Send + Debug> TokioReader<T> {
89+
/// Create a new TokioReader from a reader.
90+
pub fn new(inner: T) -> Self {
91+
Self(tokio::sync::Mutex::new(inner))
92+
}
93+
}
94+
7395
#[cfg(feature = "tokio")]
74-
impl AsyncFileReader for tokio::fs::File {
96+
impl<T: tokio::io::AsyncRead + tokio::io::AsyncSeek + Unpin + Send + Debug> AsyncFileReader
97+
for TokioReader<T>
98+
{
7599
fn get_bytes(&self, range: Range<u64>) -> BoxFuture<'_, AsyncTiffResult<Bytes>> {
100+
use std::io::SeekFrom;
76101
use tokio::io::{AsyncReadExt, AsyncSeekExt};
77102

78103
async move {
79-
let mut file = (*self).try_clone().await?;
80-
file.seek(std::io::SeekFrom::Start(range.start)).await?;
104+
let mut file = self.0.lock().await;
81105

82-
let to_read = (range.end - range.start).try_into().unwrap();
83-
let mut buffer = Vec::with_capacity(to_read);
84-
let read = file.take(to_read as u64).read_to_end(&mut buffer).await?;
106+
file.seek(SeekFrom::Start(range.start)).await?;
107+
108+
let to_read = range.end - range.start;
109+
let mut buffer = Vec::with_capacity(to_read as usize);
110+
let read = file.read(&mut buffer).await? as u64;
85111
if read != to_read {
86112
return Err(AsyncTiffError::EndOfFile(to_read, read));
87113
}
@@ -99,6 +125,7 @@ pub struct ObjectReader {
99125
store: Arc<dyn ObjectStore>,
100126
path: object_store::path::Path,
101127
}
128+
102129
#[cfg(feature = "object_store")]
103130
impl ObjectReader {
104131
/// Creates a new [`ObjectReader`] for the provided [`ObjectStore`] and path
@@ -108,6 +135,7 @@ impl ObjectReader {
108135
Self { store, path }
109136
}
110137
}
138+
111139
#[cfg(feature = "object_store")]
112140
impl AsyncFileReader for ObjectReader {
113141
fn get_bytes(&self, range: Range<u64>) -> BoxFuture<'_, AsyncTiffResult<Bytes>> {
@@ -143,13 +171,15 @@ pub struct ReqwestReader {
143171
client: reqwest::Client,
144172
url: reqwest::Url,
145173
}
174+
146175
#[cfg(feature = "reqwest")]
147176
impl ReqwestReader {
148177
/// Construct a new ReqwestReader from a reqwest client and URL.
149178
pub fn new(client: reqwest::Client, url: reqwest::Url) -> Self {
150179
Self { client, url }
151180
}
152181
}
182+
153183
#[cfg(feature = "reqwest")]
154184
impl AsyncFileReader for ReqwestReader {
155185
fn get_bytes(&self, range: Range<u64>) -> BoxFuture<'_, AsyncTiffResult<Bytes>> {

0 commit comments

Comments
 (0)