
Commit b3de156

Where the requested chunk has an offset/size, the cache stores the downloaded byte range under a key that doesn't account for that range.
When another request for the same object is received we get a cache hit even though the requested byte range may differ. Fix this by downloading and caching the entire chunk, then applying the byte range after the download or cache hit. If caching is disabled, honour the byte range in the S3 client download.
1 parent 4de39db commit b3de156
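
The failure mode in miniature — a hypothetical sketch, not the project's actual key scheme: if the cache key is derived from bucket and object alone, two requests for different byte ranges of the same chunk collide on one cache entry.

// Hypothetical key scheme for illustration only.
fn cache_key(bucket: &str, object: &str) -> String {
    format!("{}/{}", bucket, object)
}

fn main() {
    // Request A wants bytes 0-99; request B wants bytes 100-199.
    // Neither range is encoded in the key, so both map to the same entry
    // and request B is served request A's bytes.
    assert_eq!(
        cache_key("my-bucket", "my-object"),
        cache_key("my-bucket", "my-object"),
    );
}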

3 files changed: +57 −28 lines

src/app.rs — 33 additions, 24 deletions

@@ -182,45 +182,39 @@ async fn schema() -> &'static str {
 
 /// Download an object from S3
 ///
-/// Requests a byte range if `offset` or `size` is specified in the request.
-///
 /// # Arguments
 ///
 /// * `client`: S3 client object
-/// * `request_data`: RequestData object for the request
+/// * `bucket`: Name of the bucket
+/// * `key`: Name of the object in the bucket
+/// * `range`: Optional byte range
 /// * `resource_manager`: ResourceManager object
-#[tracing::instrument(level = "DEBUG", skip(client, request_data, resource_manager))]
+/// * `mem_permits`: Memory permits for the request
+#[tracing::instrument(level = "DEBUG", skip(client, bucket, key, range, resource_manager))]
 async fn download_s3_object<'a>(
     client: &s3_client::S3Client,
-    request_data: &models::RequestData,
+    bucket: &str,
+    key: &str,
+    range: Option<String>,
     resource_manager: &'a ResourceManager,
     mut mem_permits: Option<SemaphorePermit<'a>>,
 ) -> Result<Bytes, ActiveStorageError> {
-    // Convert request data to byte range for S3 request
-    let range = s3_client::get_range(request_data.offset, request_data.size);
     // Acquire connection permit to be freed via drop when this function returns
     let _conn_permits = resource_manager.s3_connection().await?;
 
     client
-        .download_object(
-            &request_data.bucket,
-            &request_data.object,
-            range,
-            resource_manager,
-            &mut mem_permits,
-        )
+        .download_object(bucket, key, range, resource_manager, &mut mem_permits)
         .await
 }
 
 /// Download and cache an object from S3
 ///
-/// Requests a byte range if `offset` or `size` is specified in the request.
-///
 /// # Arguments
 ///
 /// * `client`: S3 client object
 /// * `request_data`: RequestData object for the request
 /// * `resource_manager`: ResourceManager object
+/// * `mem_permits`: Memory permits for the request
 /// * `chunk_cache`: ChunkCache object
 #[tracing::instrument(
     level = "DEBUG",
@@ -285,19 +279,35 @@ async fn download_and_cache_s3_object<'a>(
             .instrument(tracing::Span::current())
             .await?;
         if let Some(bytes) = cache_value {
-            return Ok(bytes);
+            return Ok(s3_client::apply_range(
+                bytes,
+                request_data.offset,
+                request_data.size,
+            ));
         }
     }
 
-    let data = download_s3_object(client, request_data, resource_manager, mem_permits).await?;
+    let data = download_s3_object(
+        client,
+        &request_data.bucket,
+        &request_data.object,
+        None,
+        resource_manager,
+        mem_permits,
+    )
+    .await?;
 
     // Write data to cache
-    chunk_cache.set(&key, data.clone()).await?;
+    chunk_cache.set(&key, &data).await?;
 
     // Increment the prometheus metric for cache misses
     LOCAL_CACHE_MISSES.with_label_values(&["disk"]).inc();
 
-    Ok(data)
+    Ok(s3_client::apply_range(
+        data,
+        request_data.offset,
+        request_data.size,
+    ))
 }
 
 /// Handler for Active Storage operations
@@ -341,7 +351,9 @@ async fn operation_handler<T: operation::Operation>(
 
     let data = match (&state.args.use_chunk_cache, &state.chunk_cache) {
         (false, _) => {
-            download_s3_object(&s3_client, &request_data, &state.resource_manager, _mem_permits)
+            // Convert request data offset and size to byte range for S3 request
+            let range = s3_client::get_range(request_data.offset, request_data.size);
+            download_s3_object(&s3_client, &request_data.bucket, &request_data.object, range, &state.resource_manager, _mem_permits)
                 .instrument(tracing::Span::current())
                 .await?
         }
@@ -393,10 +405,7 @@ fn operation<T: operation::Operation>(
         assert_eq!(ptr, data.as_ptr());
     }
     // Convert to a mutable vector to allow in-place byte order conversion.
-    let ptr = data.as_ptr();
     let vec: Vec<u8> = data.into();
-    // Assert that we're using zero-copy.
-    assert_eq!(ptr, vec.as_ptr());
    debug_span!("operation").in_scope(|| T::execute(&request_data, vec))
 }
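
In the no-cache path above, `get_range` converts the offset/size pair into an HTTP Range header value. A sketch of that mapping, assuming the standard inclusive `bytes=first-last` form (the actual implementation is `get_range` in src/s3_client.rs):

// Sketch assuming the standard inclusive "bytes=first-last" Range form;
// see `get_range` in src/s3_client.rs for the real implementation.
fn get_range_sketch(offset: Option<usize>, size: Option<usize>) -> Option<String> {
    match (offset, size) {
        (None, None) => None,
        // A size (with optional offset, defaulting to 0) gives a bounded range.
        (offset, Some(size)) => {
            let first = offset.unwrap_or(0);
            Some(format!("bytes={}-{}", first, first + size - 1))
        }
        // An offset alone requests everything from that byte to the end.
        (Some(offset), None) => Some(format!("bytes={}-", offset)),
    }
}

// get_range_sketch(Some(2), Some(4)) == Some("bytes=2-5".to_string())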

src/chunk_cache.rs — 4 additions, 4 deletions

@@ -22,11 +22,11 @@ struct ChunkCacheEntry {
 
 impl ChunkCacheEntry {
     /// Return a ChunkCacheEntry object
-    fn new(key: &str, value: Bytes) -> Self {
+    fn new(key: &str, value: &Bytes) -> Self {
         let key = key.to_owned();
         // Make sure we own the `Bytes` so we don't see unexpected, but not incorrect,
         // behaviour caused by the zero copy of `Bytes`. i.e. let us choose when to copy.
-        let value = Bytes::copy_from_slice(&value);
+        let value = Bytes::copy_from_slice(value);
         Self { key, value }
     }
 }
@@ -103,8 +103,8 @@ impl ChunkCache {
     ///
     /// * `key`: Unique key identifying the chunk
     /// * `value`: Chunk `Bytes` to be cached
-    pub async fn set(&self, key: &str, value: Bytes) -> Result<(), ActiveStorageError> {
-        match self.sender.send(ChunkCacheEntry::new(key, value)).await {
+    pub async fn set(&self, key: &str, value: &Bytes) -> Result<(), ActiveStorageError> {
+        match self.sender.send(ChunkCacheEntry::new(key, &value)).await {
             Ok(_) => Ok(()),
             Err(e) => Err(ActiveStorageError::ChunkCacheError {
                 error: format!("{}", e),
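
The switch to `&Bytes` is about copy control rather than correctness: cloning a `Bytes` is a cheap reference-count bump that shares the backing buffer, so `ChunkCacheEntry::new` copies explicitly to decouple the cached bytes from the caller. A sketch of the distinction, assuming only the `bytes` crate:

use bytes::Bytes;

fn main() {
    let data = Bytes::from(vec![1u8, 2, 3, 4]);

    // `clone` is zero-copy: both handles share the same backing buffer.
    let shared = data.clone();
    assert_eq!(shared.as_ptr(), data.as_ptr());

    // `copy_from_slice` allocates a fresh buffer, as `ChunkCacheEntry::new`
    // does, so the cached bytes have a lifetime independent of the caller's.
    let copied = Bytes::copy_from_slice(&data);
    assert_ne!(copied.as_ptr(), data.as_ptr());
}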

src/s3_client.rs — 20 additions, 0 deletions

@@ -273,6 +273,26 @@ pub fn get_range(offset: Option<usize>, size: Option<usize>) -> Option<String> {
     }
 }
 
+/// Apply an optional byte range to a `Bytes` object
+/// to yield the same bytes as would be returned by a
+/// download request with HTTP Range header.
+///
+/// # Arguments
+///
+/// * `offset`: Optional offset of data in bytes
+/// * `size`: Optional size of data in bytes
+pub fn apply_range(bytes: Bytes, offset: Option<usize>, size: Option<usize>) -> Bytes {
+    match (offset, size) {
+        (offset, Some(size)) => {
+            let offset = offset.unwrap_or(0);
+            let end = offset + size;
+            bytes.slice(offset..end)
+        }
+        (Some(offset), None) => bytes.slice(offset..),
+        _ => bytes,
+    }
+}
+
 #[cfg(test)]
 mod tests {
     use super::*;
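
A usage sketch for the new helper, written as a test module that would sit alongside `apply_range` in src/s3_client.rs (illustrative values, not the commit's test suite): each call yields the same bytes as the corresponding HTTP Range request.

#[cfg(test)]
mod apply_range_sketch {
    use super::*;

    #[test]
    fn slices_like_an_http_range_request() {
        let chunk = Bytes::from_static(b"0123456789");
        // offset + size: bytes 2..6, equivalent to "Range: bytes=2-5".
        assert_eq!(apply_range(chunk.clone(), Some(2), Some(4)), Bytes::from_static(b"2345"));
        // offset only: byte 7 to the end, equivalent to "Range: bytes=7-".
        assert_eq!(apply_range(chunk.clone(), Some(7), None), Bytes::from_static(b"789"));
        // size only: offset defaults to 0, equivalent to "Range: bytes=0-2".
        assert_eq!(apply_range(chunk.clone(), None, Some(3)), Bytes::from_static(b"012"));
        // neither: the whole chunk is returned unchanged.
        assert_eq!(apply_range(chunk, None, None), Bytes::from_static(b"0123456789"));
    }
}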
