-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathlib.rs
More file actions
211 lines (184 loc) · 6.78 KB
/
lib.rs
File metadata and controls
211 lines (184 loc) · 6.78 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
//! # s3-cache
//!
//! A high-performance caching proxy for S3-compatible object storage services.
//!
//! This library implements an S3-FIFO cache with sharded async support, designed to
//! reduce latency and bandwidth usage when accessing frequently-requested S3 objects.
//!
//! ## Features
//!
//! - **S3-FIFO Caching**: Uses the S3-FIFO eviction algorithm for improved cache hit rates
//! - **Async/Sharded**: Lock-free sharded cache for high concurrency workloads
//! - **Range Request Support**: Caches partial object reads (byte ranges)
//! - **Cache Invalidation**: Automatic invalidation on PUT/DELETE operations
//! - **Dry-run Mode**: Validate cache correctness without serving cached data
//! - **Telemetry**: OpenTelemetry metrics support
use std::sync::Arc;
use std::time::Duration;
use aws_credential_types::Credentials;
use hyper_util::{
rt::{TokioExecutor, TokioIo},
server::conn::auto::Builder as ConnBuilder,
};
use reqwest::Url;
use s3s::service::S3ServiceBuilder;
use tokio::net::TcpListener;
use tracing::{debug, error, info};
use crate::service::S3CachingServiceProxy;
pub use self::config::Config;
pub use self::error::ApplicationError;
pub use self::fifo_cache::FifoCache;
pub use self::proxy::{S3CachingProxy, range_to_string};
pub use self::s3_cache::{CacheKey, CachedObject, S3Cache};
pub use self::s3_op::s3_operation_name;
pub use self::statistics::UniqueRequestedObjectsStatisticsTracker;
mod auth;
mod config;
mod error;
mod fifo_cache;
mod proxy;
mod s3_cache;
mod s3_op;
mod service;
mod statistics;
mod telemetry;
/// Result type alias using [`ApplicationError`] as the error type.
pub type Result<T> = std::result::Result<T, ApplicationError>;
/// Name of this crate, captured at compile time from the `CARGO_CRATE_NAME`
/// environment variable that Cargo sets while building.
static CARGO_CRATE_NAME: &str = env!("CARGO_CRATE_NAME");
/// Runs the S3 caching proxy until the process receives SIGINT (Ctrl+C).
///
/// This is a convenience wrapper around [`start_app_with_shutdown`]: it wires
/// the shutdown trigger to `tokio::signal::ctrl_c` and discards the
/// bound-address notification, since a caller of this function has no way to
/// receive it.
///
/// Returns `Ok(())` once the server has shut down, or the error produced by
/// [`start_app_with_shutdown`] if startup fails.
///
/// # Panics
///
/// Panics with `"ctrl_c failed"` if waiting for the Ctrl+C signal returns an
/// error.
///
/// # Example
///
/// ```no_run
/// use clap::Parser;
///
/// use s3_cache::{Config, start_app};
///
/// # #[tokio::main]
/// # async fn main() -> s3_cache::Result<()> {
/// let config = Config::parse();
/// start_app(config).await?;
/// # Ok(())
/// # }
/// ```
pub async fn start_app(config: Config) -> Result<()> {
    // Only the sender half is kept; the receiver is dropped at the end of this
    // statement, so the address report sent by the server is simply discarded.
    let addr_tx = tokio::sync::oneshot::channel().0;

    // Resolves once Ctrl+C is delivered to the process.
    let wait_for_sigint = async {
        tokio::signal::ctrl_c().await.expect("ctrl_c failed");
    };

    start_app_with_shutdown(config, wait_for_sigint, addr_tx).await
}
/// Starts the S3 caching proxy server with a custom shutdown signal.
///
/// Unlike [`start_app`], this function accepts an arbitrary future as the shutdown
/// trigger and reports the bound [`SocketAddr`](std::net::SocketAddr) via `addr_tx`
/// immediately after the TCP listener is bound (before the first connection is
/// accepted). This makes it suitable for embedding the server in integration tests
/// where the caller needs to know the actual port (e.g., when binding to
/// `127.0.0.1:0`).
///
/// Once `shutdown` resolves, the accept loop stops and in-flight connections are
/// given up to 10 seconds to finish before the function returns.
///
/// # Errors
///
/// Returns an error if telemetry initialization fails, if
/// `config.upstream_endpoint` is not a valid URL, or if the listener cannot be
/// bound or its local address cannot be read.
pub async fn start_app_with_shutdown<F>(
    config: Config,
    shutdown: F,
    addr_tx: tokio::sync::oneshot::Sender<std::net::SocketAddr>,
) -> Result<()>
where
    F: Future<Output = ()> + Send + 'static,
{
    let (metrics_provider, logs_provider) = telemetry::initialize_telemetry(&config)?;

    info!("Starting {CARGO_CRATE_NAME} with {config}");

    // Build AWS SDK config for upstream MinIO
    let credentials = Credentials::new(
        &config.upstream_access_key_id,
        &config.upstream_secret_access_key,
        None,
        None,
        "s3-cache-static",
    );

    let sdk_config = aws_config::from_env()
        .endpoint_url(&config.upstream_endpoint)
        .region(aws_sdk_s3::config::Region::new(
            config.upstream_region.clone(),
        ))
        .credentials_provider(credentials)
        .load()
        .await;

    // Path-style addressing keeps the bucket name in the URL path instead of
    // the host name.
    let s3_client = aws_sdk_s3::Client::from_conf(
        aws_sdk_s3::config::Builder::from(&sdk_config)
            .force_path_style(true)
            .build(),
    );

    let proxy = s3s_aws::Proxy::from(s3_client);

    // Build cache (`None` when caching is disabled in the configuration)
    let cache = config.cache_enabled.then(|| {
        Arc::new(S3Cache::new(
            config.cache_max_entries,
            config.cache_max_size_bytes,
            Duration::from_secs(config.cache_ttl_seconds as u64),
            config.cache_shards,
        ))
    });

    // Build caching proxy
    let caching_proxy = S3CachingProxy::from_aws_proxy(
        proxy,
        cache,
        config.cache_max_object_size_bytes,
        config.cache_dry_run,
    );

    // Build S3 service with auth, wrapped in a health check layer
    let service = {
        let mut b = S3ServiceBuilder::new(caching_proxy);
        b.set_auth(auth::create_auth(&config));

        // The endpoint comes from user-supplied configuration, so a malformed
        // value must surface as a startup error rather than a panic. It is
        // reported as an `InvalidInput` I/O error, a kind this function already
        // propagates with `?` (see the listener setup below).
        let upstream_health_endpoint = Url::parse(&config.upstream_endpoint)
            .and_then(|base| base.join("/minio/health/ready"))
            .map_err(|err| {
                std::io::Error::new(
                    std::io::ErrorKind::InvalidInput,
                    format!(
                        "invalid upstream endpoint {:?}: {err}",
                        config.upstream_endpoint
                    ),
                )
            })?;

        S3CachingServiceProxy::new(b.build(), upstream_health_endpoint)
    };

    // Start hyper server
    let listener = TcpListener::bind(config.listen_addr).await?;
    let local_addr = listener.local_addr()?;

    // Report the bound address before entering the accept loop so callers using
    // port 0 can discover which port was assigned. A dropped receiver is not an
    // error: the caller simply was not interested in the address.
    let _ = addr_tx.send(local_addr);

    let http_server = ConnBuilder::new(TokioExecutor::new());
    let graceful = hyper_util::server::graceful::GracefulShutdown::new();
    // Pinned so the same future can be polled again on every loop iteration.
    let mut shutdown = std::pin::pin!(shutdown);

    info!("Listening on http://{local_addr}/");

    loop {
        let (socket, remote_addr) = tokio::select! {
            res = listener.accept() => {
                match res {
                    Ok(conn) => conn,
                    Err(err) => {
                        // A failed accept only affects that one connection
                        // attempt; keep serving.
                        error!("Error accepting connection: {err}");
                        continue;
                    }
                }
            }
            _ = shutdown.as_mut() => { break; }
        };

        debug!("Accepted connection from {remote_addr}");

        let conn = http_server.serve_connection(TokioIo::new(socket), service.clone());
        // `into_owned` detaches the connection from the borrowed builder so it
        // can be moved into a spawned task; `watch` registers it for graceful
        // shutdown.
        let conn = graceful.watch(conn.into_owned());

        tokio::spawn(async move {
            if let Err(err) = conn.await {
                debug!("Connection error: {err}");
            }
        });
    }

    info!("Shutting down gracefully...");

    // Wait for in-flight connections to finish, but never longer than 10 seconds.
    tokio::select! {
        () = graceful.shutdown() => {
            info!("Graceful shutdown complete");
        },
        () = tokio::time::sleep(Duration::from_secs(10)) => {
            info!("Graceful shutdown timed out after 10s, aborting");
        }
    }

    telemetry::shutdown_metrics(metrics_provider);
    telemetry::shutdown_logs(logs_provider);

    Ok(())
}