Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,8 @@ tracing-subscriber = { version = "0.3", features = [

[dev-dependencies]
http = "1"
http-body-util = "0.1"
hyper = { version = "1", features = ["client"] }
mock_instant = "0.6"
testcontainers = "0.27"
testcontainers-modules = { version = "0.15", features = ["minio"] }
Expand Down
6 changes: 3 additions & 3 deletions src/bin/s3_cache_sim/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ use http::{HeaderMap, Method, Uri};
use s3s::dto::*;
use s3s::{S3, S3Request};

use s3_cache::{CachingProxy, S3Cache};
use s3_cache::{S3Cache, S3CachingProxy};

use simulated_backend::SimulatedBackend;
use tracing_subscriber::fmt::format::FmtSpan;
Expand Down Expand Up @@ -156,7 +156,7 @@ async fn main() {
.await;

// Build cache + proxy (or direct backend if --no-cache)
let proxy: Option<Arc<CachingProxy<_>>> = if args.no_cache {
let proxy: Option<Arc<S3CachingProxy<_>>> = if args.no_cache {
None
} else {
let cache = Arc::new(S3Cache::new(
Expand All @@ -165,7 +165,7 @@ async fn main() {
Duration::from_secs(args.cache_ttl_secs),
args.cache_shards,
));
Some(Arc::new(CachingProxy::new(
Some(Arc::new(S3CachingProxy::new(
backend.clone(),
Some(cache),
args.max_cacheable_size,
Expand Down
11 changes: 6 additions & 5 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ use tracing::{debug, error, info};
pub use self::config::Config;
pub use self::error::ApplicationError;
pub use self::fifo_cache::FifoCache;
pub use self::proxy_service::{CachingProxy, range_to_string};
pub use self::proxy::{S3CachingProxy, range_to_string};
pub use self::s3_cache::{CacheKey, CachedObject, S3Cache};
pub use self::statistics::UniqueRequestedObjectsStatisticsTracker;

Expand All @@ -38,8 +38,9 @@ mod config;
mod error;
mod fifo_cache;
mod metrics_writer;
mod proxy_service;
mod proxy;
mod s3_cache;
mod service;
mod statistics;
mod telemetry;

Expand Down Expand Up @@ -133,18 +134,18 @@ where
});

// Build caching proxy
let caching_proxy = proxy_service::CachingProxy::from_aws_proxy(
let caching_proxy = S3CachingProxy::from_aws_proxy(
proxy,
cache,
config.cache_max_object_size_bytes,
config.cache_dry_run,
);

// Build S3 service with auth
// Build S3 service with auth, wrapped in a health check layer
let service = {
let mut b = S3ServiceBuilder::new(caching_proxy);
b.set_auth(auth::create_auth(&config));
b.build()
service::S3CachingServiceProxy::new(b.build())
};

// Start Prometheus metrics writer if configured
Expand Down
8 changes: 4 additions & 4 deletions src/proxy_service.rs → src/proxy.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ use crate::telemetry;
///
/// The type parameter `T` defaults to [`s3s_aws::Proxy`] but can be any type
/// implementing the [`S3`] trait.
pub struct CachingProxy<T = Proxy> {
pub struct S3CachingProxy<T = Proxy> {
inner: T,
cache: Option<Arc<S3Cache>>,
max_cacheable_size: usize,
Expand All @@ -31,7 +31,7 @@ pub struct CachingProxy<T = Proxy> {
dry_run: bool,
}

impl<T> CachingProxy<T> {
impl<T> S3CachingProxy<T> {
/// Creates a new caching proxy wrapping an S3 implementation.
///
/// Pass `None` for `cache` to disable caching (passthrough mode).
Expand Down Expand Up @@ -70,7 +70,7 @@ impl<T> CachingProxy<T> {
}
}

impl CachingProxy<Proxy> {
impl S3CachingProxy<Proxy> {
/// Convenience constructor for wrapping [`s3s_aws::Proxy`].
///
/// This is equivalent to calling [`new`](Self::new) with a [`Proxy`] type parameter.
Expand Down Expand Up @@ -110,7 +110,7 @@ pub fn range_to_string(range: &Range) -> String {
}

#[async_trait::async_trait]
impl<T: S3 + Send + Sync> S3 for CachingProxy<T> {
impl<T: S3 + Send + Sync> S3 for S3CachingProxy<T> {
async fn get_object(
&self,
req: S3Request<GetObjectInput>,
Expand Down
52 changes: 52 additions & 0 deletions src/service.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
use bytes::Bytes;
use hyper::service::Service;
use hyper::{Method, Request, Response, body::Incoming};
use s3s::{Body, HttpError};
use std::future::Future;
use std::pin::Pin;

/// Boxed, type-erased future used as this service's `Future` associated type,
/// so both the short-circuit branch and the inner service's future fit one type.
type BoxFuture<T> = Pin<Box<dyn Future<Output = T> + Send + 'static>>;

/// Wraps an S3 service and short-circuits `GET /` and `GET /health` requests,
/// returning `200 OK` with a plain-text `"Status OK"` body without forwarding
/// them to the S3 layer or requiring authentication.
pub struct S3CachingServiceProxy<S> {
    // The wrapped S3 HTTP service; receives every request that is not a health check.
    inner: S,
}

impl<S> S3CachingServiceProxy<S> {
pub fn new(inner: S) -> Self {
Self { inner }
}
}

/// Cloning the proxy clones the wrapped service; available whenever `S: Clone`.
impl<S: Clone> Clone for S3CachingServiceProxy<S> {
    fn clone(&self) -> Self {
        Self::new(self.inner.clone())
    }
}

impl<S> Service<Request<Incoming>> for S3CachingServiceProxy<S>
where
    S: Service<Request<Incoming>, Response = Response<Body>, Error = HttpError>,
    S::Future: Send + 'static,
{
    type Response = Response<Body>;
    type Error = HttpError;
    type Future = BoxFuture<Result<Self::Response, Self::Error>>;

    /// Answers `GET /` and `GET /health` directly with `200 OK` and a
    /// plain-text `"Status OK"` body; every other request is forwarded to
    /// the wrapped service unchanged.
    fn call(&self, req: Request<Incoming>) -> Self::Future {
        // Evaluate the path once instead of re-borrowing the URI per comparison.
        let path = req.uri().path();
        if req.method() == Method::GET && matches!(path, "/" | "/health") {
            let response = Response::builder()
                .status(200)
                .body(Body::from(Bytes::from_static(b"Status OK")))
                // Infallible: a literal 200 status and no headers are always valid.
                .expect("building the static health-check response cannot fail");
            Box::pin(std::future::ready(Ok(response)))
        } else {
            Box::pin(self.inner.call(req))
        }
    }
}
28 changes: 14 additions & 14 deletions tests/integration_cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ mod common;
use bytes::Bytes;
use common::MockS3Backend;
use common::helpers::*;
use s3_cache::{CacheKey, CachingProxy};
use s3_cache::{CacheKey, S3CachingProxy};
use s3s::S3;

#[tokio::test]
Expand All @@ -16,7 +16,7 @@ async fn get_object_cache_miss_then_hit() {

// Setup: Cache + Proxy
let cache = create_test_cache(100, usize::MAX, 300);
let proxy = CachingProxy::new(backend.clone(), Some(cache.clone()), usize::MAX, false);
let proxy = S3CachingProxy::new(backend.clone(), Some(cache.clone()), usize::MAX, false);

// First request: cache miss
let req = build_get_request("test-bucket", "key.txt", None);
Expand Down Expand Up @@ -49,7 +49,7 @@ async fn cache_ttl_expiration() {

// Cache with 60 second TTL
let cache = create_test_cache(100, usize::MAX, 60);
let proxy = CachingProxy::new(backend.clone(), Some(cache.clone()), usize::MAX, false);
let proxy = S3CachingProxy::new(backend.clone(), Some(cache.clone()), usize::MAX, false);

// First request: populate cache
let req = build_get_request("test-bucket", "expiring.txt", None);
Expand Down Expand Up @@ -87,7 +87,7 @@ async fn cache_size_eviction() {

// Cache with room for only 5 entries
let cache = create_test_cache(5, usize::MAX, 300);
let proxy = CachingProxy::new(backend.clone(), Some(cache.clone()), usize::MAX, false);
let proxy = S3CachingProxy::new(backend.clone(), Some(cache.clone()), usize::MAX, false);

// Fetch all 10 objects
for i in 0..10 {
Expand Down Expand Up @@ -138,7 +138,7 @@ async fn cache_object_count_limit() {

// Cache limited to 10 entries
let cache = create_test_cache(10, usize::MAX, 300);
let proxy = CachingProxy::new(backend.clone(), Some(cache.clone()), usize::MAX, false);
let proxy = S3CachingProxy::new(backend.clone(), Some(cache.clone()), usize::MAX, false);

// Fetch 15 objects
for i in 0..15 {
Expand Down Expand Up @@ -183,7 +183,7 @@ async fn oversized_object_not_cached() {

// Cache with max size 100KB
let cache = create_test_cache(100, 100_000, 300);
let proxy = CachingProxy::new(backend.clone(), Some(cache.clone()), usize::MAX, false);
let proxy = S3CachingProxy::new(backend.clone(), Some(cache.clone()), usize::MAX, false);

// First request: object too large, streams through without caching
let req = build_get_request("test-bucket", "large.bin", None);
Expand Down Expand Up @@ -211,7 +211,7 @@ async fn concurrent_cache_access() {
.await;

let cache = create_test_cache(100, usize::MAX, 300);
let proxy = CachingProxy::new(backend.clone(), Some(cache.clone()), usize::MAX, false);
let proxy = S3CachingProxy::new(backend.clone(), Some(cache.clone()), usize::MAX, false);

for _ in 0..10 {
let req = build_get_request("test-bucket", "concurrent.txt", None);
Expand All @@ -236,7 +236,7 @@ async fn different_buckets_separate_cache() {
.await;

let cache = create_test_cache(100, usize::MAX, 300);
let proxy = CachingProxy::new(backend.clone(), Some(cache.clone()), usize::MAX, false);
let proxy = S3CachingProxy::new(backend.clone(), Some(cache.clone()), usize::MAX, false);

// Fetch from both buckets
let req = build_get_request("bucket-a", "key.txt", None);
Expand Down Expand Up @@ -270,7 +270,7 @@ async fn cache_byte_size_eviction() {

// Cache with max_size of 2000 bytes (room for ~4 objects of 500 bytes)
let cache = create_test_cache(100, 2000, 300);
let proxy = CachingProxy::new(backend.clone(), Some(cache.clone()), usize::MAX, false);
let proxy = S3CachingProxy::new(backend.clone(), Some(cache.clone()), usize::MAX, false);

// Fetch all 10 objects
for i in 0..10 {
Expand Down Expand Up @@ -306,7 +306,7 @@ async fn backend_error_not_cached() {
// Don't add the object — backend will return NoSuchKey

let cache = create_test_cache(100, usize::MAX, 300);
let proxy = CachingProxy::new(backend.clone(), Some(cache.clone()), usize::MAX, false);
let proxy = S3CachingProxy::new(backend.clone(), Some(cache.clone()), usize::MAX, false);

// Request non-existent object
let req = build_get_request("test-bucket", "missing.txt", None);
Expand All @@ -333,7 +333,7 @@ async fn max_cacheable_size_rejects_large_objects() {

// Proxy with max_cacheable_size = 1000 (rejects objects > 1KB)
let cache = create_test_cache(100, usize::MAX, 300);
let proxy = CachingProxy::new(backend.clone(), Some(cache.clone()), 1000, false);
let proxy = S3CachingProxy::new(backend.clone(), Some(cache.clone()), 1000, false);

// Small object: should be cached
let req = build_get_request("test-bucket", "small.bin", None);
Expand Down Expand Up @@ -363,7 +363,7 @@ async fn cache_hit_preserves_metadata() {
.await;

let cache = create_test_cache(100, usize::MAX, 300);
let proxy = CachingProxy::new(backend.clone(), Some(cache.clone()), usize::MAX, false);
let proxy = S3CachingProxy::new(backend.clone(), Some(cache.clone()), usize::MAX, false);

// First request: cache miss
let req = build_get_request("test-bucket", "meta.txt", None);
Expand Down Expand Up @@ -392,7 +392,7 @@ async fn head_object_does_not_populate_cache() {
.await;

let cache = create_test_cache(100, usize::MAX, 300);
let proxy = CachingProxy::new(backend.clone(), Some(cache.clone()), usize::MAX, false);
let proxy = S3CachingProxy::new(backend.clone(), Some(cache.clone()), usize::MAX, false);

// HEAD request — should be delegated, not cached
let req = s3s::S3Request {
Expand Down Expand Up @@ -425,7 +425,7 @@ async fn put_then_get_sees_new_content() {
.await;

let cache = create_test_cache(100, usize::MAX, 300);
let proxy = CachingProxy::new(backend.clone(), Some(cache.clone()), usize::MAX, false);
let proxy = S3CachingProxy::new(backend.clone(), Some(cache.clone()), usize::MAX, false);

// GET: caches "version1"
let req = build_get_request("test-bucket", "mutable.txt", None);
Expand Down
22 changes: 11 additions & 11 deletions tests/integration_dry_run.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ mod common;
use bytes::Bytes;
use common::MockS3Backend;
use common::helpers::*;
use s3_cache::CachingProxy;
use s3_cache::S3CachingProxy;
use s3s::S3;

#[tokio::test]
Expand All @@ -16,7 +16,7 @@ async fn cache_miss_populates_with_hash() {

// Setup: Cache + Proxy in dry-run mode
let cache = create_test_cache(100, usize::MAX, 300);
let proxy = CachingProxy::new(backend.clone(), Some(cache.clone()), usize::MAX, true);
let proxy = S3CachingProxy::new(backend.clone(), Some(cache.clone()), usize::MAX, true);

// First request in dry-run mode: cache miss
let req = build_get_request("test-bucket", "key.txt", None);
Expand All @@ -39,7 +39,7 @@ async fn always_fetches_from_upstream() {

// Setup: Cache + Proxy in dry-run mode
let cache = create_test_cache(100, usize::MAX, 300);
let proxy = CachingProxy::new(backend.clone(), Some(cache.clone()), usize::MAX, true);
let proxy = S3CachingProxy::new(backend.clone(), Some(cache.clone()), usize::MAX, true);

// First request: cache miss
let req = build_get_request("test-bucket", "key.txt", None);
Expand Down Expand Up @@ -67,7 +67,7 @@ async fn returns_fresh_data() {

// Setup: Cache + Proxy in dry-run mode
let cache = create_test_cache(100, usize::MAX, 300);
let proxy = CachingProxy::new(backend.clone(), Some(cache.clone()), usize::MAX, true);
let proxy = S3CachingProxy::new(backend.clone(), Some(cache.clone()), usize::MAX, true);

// First request: populates cache with "version1"
let req = build_get_request("test-bucket", "mutable.txt", None);
Expand Down Expand Up @@ -103,7 +103,7 @@ async fn with_matching_cache_data() {

// Setup: Cache + Proxy in dry-run mode
let cache = create_test_cache(100, usize::MAX, 300);
let proxy = CachingProxy::new(backend.clone(), Some(cache.clone()), usize::MAX, true);
let proxy = S3CachingProxy::new(backend.clone(), Some(cache.clone()), usize::MAX, true);

// First request: populates cache
let req = build_get_request("test-bucket", "stable.txt", None);
Expand All @@ -128,7 +128,7 @@ async fn with_mismatched_cache_data() {

// Setup: Cache + Proxy in dry-run mode
let cache = create_test_cache(100, usize::MAX, 300);
let proxy = CachingProxy::new(backend.clone(), Some(cache.clone()), usize::MAX, true);
let proxy = S3CachingProxy::new(backend.clone(), Some(cache.clone()), usize::MAX, true);

// First request: populates cache with "original"
let req = build_get_request("test-bucket", "changing.txt", None);
Expand Down Expand Up @@ -161,7 +161,7 @@ async fn multiple_objects() {

// Setup: Cache + Proxy in dry-run mode
let cache = create_test_cache(100, usize::MAX, 300);
let proxy = CachingProxy::new(backend.clone(), Some(cache.clone()), usize::MAX, true);
let proxy = S3CachingProxy::new(backend.clone(), Some(cache.clone()), usize::MAX, true);

// Fetch all objects
for i in 0..5 {
Expand Down Expand Up @@ -197,7 +197,7 @@ async fn with_large_objects() {

// Setup: Cache + Proxy in dry-run mode with size limit
let cache = create_test_cache(100, 100_000, 300);
let proxy = CachingProxy::new(backend.clone(), Some(cache.clone()), 100_000, true);
let proxy = S3CachingProxy::new(backend.clone(), Some(cache.clone()), 100_000, true);

// Request large object in dry-run mode
let req = build_get_request("test-bucket", "large.bin", None);
Expand Down Expand Up @@ -225,7 +225,7 @@ async fn concurrent_access() {

// Setup: Cache + Proxy in dry-run mode
let cache = create_test_cache(100, usize::MAX, 300);
let proxy = CachingProxy::new(backend.clone(), Some(cache.clone()), usize::MAX, true);
let proxy = S3CachingProxy::new(backend.clone(), Some(cache.clone()), usize::MAX, true);

for _ in 0..10 {
let req = build_get_request("test-bucket", "concurrent.txt", None);
Expand All @@ -248,7 +248,7 @@ async fn backend_error_not_cached() {

// Setup: Cache + Proxy in dry-run mode
let cache = create_test_cache(100, usize::MAX, 300);
let proxy = CachingProxy::new(backend.clone(), Some(cache.clone()), usize::MAX, true);
let proxy = S3CachingProxy::new(backend.clone(), Some(cache.clone()), usize::MAX, true);

// Request non-existent object
let req = build_get_request("test-bucket", "missing.txt", None);
Expand All @@ -270,7 +270,7 @@ async fn preserves_metadata() {

// Setup: Cache + Proxy in dry-run mode
let cache = create_test_cache(100, usize::MAX, 300);
let proxy = CachingProxy::new(backend.clone(), Some(cache.clone()), usize::MAX, true);
let proxy = S3CachingProxy::new(backend.clone(), Some(cache.clone()), usize::MAX, true);

// First request
let req = build_get_request("test-bucket", "meta.txt", None);
Expand Down
Loading