Skip to content

Commit df2ed74

Browse files
committed
Functional disk cache implementation
1 parent 99e61ba commit df2ed74

File tree

2 files changed

+56
-21
lines changed

2 files changed

+56
-21
lines changed

src/app.rs

Lines changed: 51 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@ use axum::{
2222
Router, TypedHeader,
2323
};
2424
use bytes::Bytes;
25-
// use cached::{proc_macro::io_cached, stores::DiskCacheBuilder};
25+
use cached::{proc_macro::io_cached, stores::DiskCacheBuilder};
2626

2727
use std::sync::Arc;
2828
use tokio::sync::SemaphorePermit;
@@ -160,6 +160,37 @@ async fn schema() -> &'static str {
160160
"Hello, world!"
161161
}
162162

163+
// /// Download an object from S3
164+
// ///
165+
// /// Requests a byte range if `offset` or `size` is specified in the request.
166+
// ///
167+
// /// # Arguments
168+
// ///
169+
// /// * `client`: S3 client object
170+
// /// * `request_data`: RequestData object for the request
171+
// #[tracing::instrument(
172+
// level = "DEBUG",
173+
// skip(client, request_data, resource_manager, mem_permits)
174+
// )]
175+
// async fn download_object<'a>(
176+
// client: &s3_client::S3Client,
177+
// request_data: &models::RequestData,
178+
// resource_manager: &'a ResourceManager,
179+
// mem_permits: &mut Option<SemaphorePermit<'a>>,
180+
// ) -> Result<Bytes, ActiveStorageError> {
181+
// let range = s3_client::get_range(request_data.offset, request_data.size);
182+
// let _conn_permits = resource_manager.s3_connection().await?;
183+
// client
184+
// .download_object(
185+
// &request_data.bucket,
186+
// &request_data.object,
187+
// range,
188+
// resource_manager,
189+
// mem_permits,
190+
// )
191+
// .await
192+
// }
193+
163194
/// Download an object from S3
164195
///
165196
/// Requests a byte range if `offset` or `size` is specified in the request.
@@ -170,30 +201,34 @@ async fn schema() -> &'static str {
170201
/// * `request_data`: RequestData object for the request
171202
#[tracing::instrument(
172203
level = "DEBUG",
173-
skip(client, request_data, resource_manager, mem_permits)
204+
// skip(client, request_data, resource_manager, mem_permits)
205+
skip(client, request_data)
206+
)]
207+
#[io_cached(
208+
map_error = r##"|e| ActiveStorageError::CacheError{ error: format!("{:?}", e) }"##,
209+
disk = true,
210+
create = r##"{ DiskCacheBuilder::new("test-cache").set_disk_directory("./").build().expect("valid disk cache builder") }"##,
211+
key = "String",
212+
convert = r##"{ format!("{:?}", request_data) }"##
174213
)]
175-
// #[io_cached(
176-
// map_error = r##"|e| ActiveStorageError::CacheError{ error: format!("{:?}", e) }"##,
177-
// disk = true,
178-
// create = r##"{ DiskCacheBuilder::new("test-cache").set_disk_directory("./").build().expect("valid disk cache builder") }"##,
179-
// key = "String",
180-
// convert = r##"{ format!("{:?},{:?},{:?},{:?}", client, request_data, resource_manager, mem_permits) }"##
181-
// )]
182214
async fn download_object<'a>(
183215
client: &s3_client::S3Client,
184216
request_data: &models::RequestData,
185-
resource_manager: &'a ResourceManager,
186-
mem_permits: &mut Option<SemaphorePermit<'a>>,
217+
// resource_manager: &'a ResourceManager,
218+
// mem_permits: &mut Option<SemaphorePermit<'a>>,
187219
) -> Result<Bytes, ActiveStorageError> {
188220
let range = s3_client::get_range(request_data.offset, request_data.size);
189-
let _conn_permits = resource_manager.s3_connection().await?;
221+
// let _conn_permits = resource_manager.s3_connection().await?;
222+
// println!("{:?},{:?}", client, request_data);
223+
// TODO: Add cache hit / miss statistics?
224+
println!("Downloading object");
190225
client
191226
.download_object(
192227
&request_data.bucket,
193228
&request_data.object,
194229
range,
195-
resource_manager,
196-
mem_permits,
230+
// resource_manager,
231+
// mem_permits,
197232
)
198233
.await
199234
}
@@ -232,8 +267,8 @@ async fn operation_handler<T: operation::Operation>(
232267
let data = download_object(
233268
&s3_client,
234269
&request_data,
235-
&state.resource_manager,
236-
&mut _mem_permits,
270+
// &state.resource_manager,
271+
// &mut _mem_permits,
237272
)
238273
.instrument(tracing::Span::current())
239274
.await?;

src/s3_client.rs

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -137,8 +137,8 @@ impl S3Client {
137137
bucket: &str,
138138
key: &str,
139139
range: Option<String>,
140-
resource_manager: &'a ResourceManager,
141-
mem_permits: &mut Option<SemaphorePermit<'a>>,
140+
// resource_manager: &'a ResourceManager,
141+
// mem_permits: &mut Option<SemaphorePermit<'a>>,
142142
) -> Result<Bytes, ActiveStorageError> {
143143
let mut response = self
144144
.client
@@ -156,9 +156,9 @@ impl S3Client {
156156
.try_into()?;
157157

158158
// FIXME: how to account for compressed data?
159-
if mem_permits.is_none() {
160-
*mem_permits = resource_manager.memory(content_length).await?;
161-
};
159+
// if mem_permits.is_none() {
160+
// *mem_permits = resource_manager.memory(content_length).await?;
161+
// };
162162
// The data returned by the S3 client does not have any alignment guarantees. In order to
163163
// reinterpret the data as an array of numbers with a higher alignment than 1, we need to
164164
// return the data in Bytes object in which the underlying data has a higher alignment.

0 commit comments

Comments
 (0)