Skip to content

Commit 4715e3e

Browse files
committed
Provide a S3 download handler and a cached S3 download handler with the operation handler deciding which to use based on app settings.
Ideally move the decision even further up registering one S3 download handler in the shared state, the actual one used being based on app settings. This would allow a http handler to be registered in the same way. Proves "fun" with the lifetime of objects.
1 parent 37f0c95 commit 4715e3e

File tree

2 files changed

+77
-46
lines changed

2 files changed

+77
-46
lines changed

src/app.rs

Lines changed: 62 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -58,7 +58,7 @@ struct AppState {
5858
resource_manager: ResourceManager,
5959

6060
/// Object chunk cache
61-
chunk_cache: ChunkCache,
61+
chunk_cache: Option<ChunkCache>,
6262
}
6363

6464
impl AppState {
@@ -67,7 +67,11 @@ impl AppState {
6767
let task_limit = args.thread_limit.or_else(|| Some(num_cpus::get() - 1));
6868
let resource_manager =
6969
ResourceManager::new(args.s3_connection_limit, args.memory_limit, task_limit);
70-
let chunk_cache = ChunkCache::new(args);
70+
let chunk_cache = if args.use_chunk_cache {
71+
Some(ChunkCache::new(args))
72+
} else {
73+
None
74+
};
7175

7276
Self {
7377
args: args.clone(),
@@ -165,7 +169,43 @@ async fn schema() -> &'static str {
165169
"Hello, world!"
166170
}
167171

168-
/// Download and optionally cache an object from S3
172+
/// Download an object from S3
173+
///
174+
/// Requests a byte range if `offset` or `size` is specified in the request.
175+
///
176+
/// # Arguments
177+
///
178+
/// * `client`: S3 client object
179+
/// * `request_data`: RequestData object for the request
180+
/// * `resource_manager`: ResourceManager object
181+
async fn download_s3_object<'a>(
182+
client: &s3_client::S3Client,
183+
request_data: &models::RequestData,
184+
resource_manager: &'a ResourceManager,
185+
) -> Result<Bytes, ActiveStorageError> {
186+
187+
// If we're given a size in the request data then use this to
188+
// get an initial guess at the required memory resources.
189+
let memory = request_data.size.unwrap_or(0);
190+
let mut mem_permits = resource_manager.memory(memory).await?;
191+
192+
let range = s3_client::get_range(request_data.offset, request_data.size);
193+
let _conn_permits = resource_manager.s3_connection().await?;
194+
195+
let data = client
196+
.download_object(
197+
&request_data.bucket,
198+
&request_data.object,
199+
range,
200+
resource_manager,
201+
&mut mem_permits,
202+
)
203+
.await;
204+
205+
data
206+
}
207+
208+
/// Download and cache an object from S3
169209
///
170210
/// Requests a byte range if `offset` or `size` is specified in the request.
171211
///
@@ -175,7 +215,7 @@ async fn schema() -> &'static str {
175215
/// * `request_data`: RequestData object for the request
176216
/// * `resource_manager`: ResourceManager object
177217
/// * `chunk_cache`: ChunkCache object
178-
async fn download_object<'a>(
218+
async fn download_and_cache_s3_object<'a>(
179219
client: &s3_client::S3Client,
180220
request_data: &models::RequestData,
181221
resource_manager: &'a ResourceManager,
@@ -184,21 +224,16 @@ async fn download_object<'a>(
184224

185225
let key = format!("{},{:?}", client, request_data);
186226

187-
// If we're using the chunk cache and have a hit it'll return Some(bytes), None when disabled.
188227
match chunk_cache.get(&key).await {
189228
Ok(value) => {
190229
if let Some(bytes) = value {
191-
// TODO: remove debug
192-
println!("Cache hit for key: {}", key);
193230
return Ok(bytes);
194231
}
195232
},
196233
Err(e) => {
197234
return Err(e);
198235
}
199236
}
200-
// TODO: remove debug
201-
println!("Cache miss for key: {}", key);
202237

203238
// If we're given a size in the request data then use this to
204239
// get an initial guess at the required memory resources.
@@ -264,14 +299,24 @@ async fn operation_handler<T: operation::Operation>(
264299
.instrument(tracing::Span::current())
265300
.await;
266301

267-
let data = download_object(
268-
&s3_client,
269-
&request_data,
270-
&state.resource_manager,
271-
&state.chunk_cache,
272-
)
273-
.instrument(tracing::Span::current())
274-
.await?;
302+
let data = if state.args.use_chunk_cache {
303+
download_and_cache_s3_object(
304+
&s3_client,
305+
&request_data,
306+
&state.resource_manager,
307+
&state.chunk_cache.as_ref().unwrap(),
308+
)
309+
.instrument(tracing::Span::current())
310+
.await?
311+
} else {
312+
download_s3_object(
313+
&s3_client,
314+
&request_data,
315+
&state.resource_manager,
316+
)
317+
.instrument(tracing::Span::current())
318+
.await?
319+
};
275320

276321
// All remaining work i s synchronous. If the use_rayon argument was specified, delegate to the
277322
// Rayon thread pool. Otherwise, execute as normal using Tokio.

src/chunk_cache.rs

Lines changed: 15 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -8,17 +8,11 @@ use std::{collections::HashMap, ops::Add, path::PathBuf, time::{SystemTime, UNIX
88
use tokio::fs;
99

1010
pub struct ChunkCache {
11-
cache: Option<SimpleDiskCache>,
11+
cache: SimpleDiskCache,
1212
}
1313

1414
impl ChunkCache {
1515
pub fn new(args: &CommandLineArgs) -> Self {
16-
if !args.use_chunk_cache {
17-
return Self {
18-
cache: None,
19-
};
20-
}
21-
2216
// Path to the cache directory.
2317
let path = <Option<String> as Clone>::clone(&args.chunk_cache_path)
2418
.expect("The chunk cache path must be specified when the chunk cache is enabled");
@@ -33,44 +27,36 @@ impl ChunkCache {
3327
};
3428

3529
Self {
36-
cache: Some(SimpleDiskCache::new(
30+
cache: SimpleDiskCache::new(
3731
"chunk_cache",
3832
&path,
3933
lifespan,
4034
60 * 60,
4135
max_size_bytes
42-
)),
36+
),
4337
}
4438
}
4539

4640
pub async fn set(&self, key: &String, value: Bytes) -> Result<Option<Bytes>, ActiveStorageError> {
47-
// Cache will be None with chunk caching disabled.
48-
if let Some(cache) = &self.cache {
49-
match cache.set(key, value).await {
50-
Ok(_) => {
51-
return Ok(None);
52-
},
53-
Err(e) => {
54-
return Err(ActiveStorageError::ChunkCacheError{ error: format!("{:?}", e) });
55-
}
41+
match self.cache.set(key, value).await {
42+
Ok(_) => {
43+
Ok(None)
44+
},
45+
Err(e) => {
46+
Err(ActiveStorageError::ChunkCacheError{ error: format!("{:?}", e) })
5647
}
5748
}
58-
Ok(None)
5949
}
6050

6151
pub async fn get(&self, key: &String) -> Result<Option<Bytes>, ActiveStorageError> {
62-
// Cache will be None with chunk caching disabled.
63-
if let Some(cache) = &self.cache {
64-
match cache.get(key).await {
65-
Ok(value) => {
66-
return Ok(value);
67-
},
68-
Err(e) => {
69-
return Err(ActiveStorageError::ChunkCacheError{ error: format!("{:?}", e) });
70-
}
52+
match self.cache.get(key).await {
53+
Ok(value) => {
54+
Ok(value)
55+
},
56+
Err(e) => {
57+
Err(ActiveStorageError::ChunkCacheError{ error: format!("{:?}", e) })
7158
}
7259
}
73-
Ok(None)
7460
}
7561
}
7662

0 commit comments

Comments
 (0)