
Commit 4b7bf99

Add retention mode (#332)
* Add API for setting up retention
* Support for both local mode and s3 mode
1 parent 26d3200 commit 4b7bf99

7 files changed: +435 −3 lines


server/src/handlers/logstream.rs

Lines changed: 34 additions & 0 deletions
@@ -27,6 +27,7 @@ use crate::alerts::Alerts;
 use crate::event;
 use crate::metadata::STREAM_INFO;
 use crate::option::CONFIG;
+use crate::storage::retention::{self, Retention};
 use crate::storage::{LogStream, StorageDir};
 use crate::{metadata, validator};
@@ -173,6 +174,36 @@ pub async fn put_alert(
     ))
 }

+pub async fn put_retention(
+    req: HttpRequest,
+    body: web::Json<serde_json::Value>,
+) -> Result<impl Responder, StreamError> {
+    let stream_name: String = req.match_info().get("logstream").unwrap().parse().unwrap();
+    let body = body.into_inner();
+
+    let retention: Retention = match serde_json::from_value(body) {
+        Ok(retention) => retention,
+        Err(err) => return Err(StreamError::InvalidRetentionConfig(err)),
+    };
+
+    if !STREAM_INFO.stream_initialized(&stream_name)? {
+        return Err(StreamError::UninitializedLogstream);
+    }
+
+    CONFIG
+        .storage()
+        .get_object_store()
+        .put_retention(&stream_name, &retention)
+        .await?;
+
+    retention::init_scheduler(&stream_name, retention);
+
+    Ok((
+        format!("set retention configuration for log stream {stream_name}"),
+        StatusCode::OK,
+    ))
+}
+
 pub async fn get_stats(req: HttpRequest) -> Result<impl Responder, StreamError> {
     let stream_name: String = req.match_info().get("logstream").unwrap().parse().unwrap();

@@ -270,6 +301,8 @@ pub mod error {
         AlertValidation(#[from] AlertValidationError),
         #[error("alert - \"{0}\" is invalid, please check if alert is valid according to this stream's schema and try again")]
         InvalidAlert(String),
+        #[error("failed to set retention configuration due to err: {0}")]
+        InvalidRetentionConfig(serde_json::Error),
         #[error("{msg}")]
         Custom { msg: String, status: StatusCode },
     }
@@ -286,6 +319,7 @@ pub mod error {
             StreamError::BadAlertJson { .. } => StatusCode::BAD_REQUEST,
             StreamError::AlertValidation(_) => StatusCode::BAD_REQUEST,
             StreamError::InvalidAlert(_) => StatusCode::BAD_REQUEST,
+            StreamError::InvalidRetentionConfig(_) => StatusCode::BAD_REQUEST,
         }
     }
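
For context, configuring retention now comes down to a single PUT against the new endpoint, with a body that deserializes into Retention. The exact shape of that payload is not visible in this diff, so the JSON below is a hypothetical placeholder; only the PUT method, the retention route, and the 200 OK response come from the commit, while the /api/v1 prefix, host, and stream name are assumptions. A minimal client-side sketch, assuming the reqwest, tokio, and serde_json crates:

// Hypothetical client sketch: set retention for a stream named "backend".
// Only the PUT method and the ".../logstream/{logstream}/retention" route come
// from this commit; the payload fields and URL prefix are placeholders.
use serde_json::json;

#[tokio::main]
async fn main() -> Result<(), reqwest::Error> {
    let body = json!([
        { "description": "delete logs after 30 days", "action": "delete", "duration": "30d" }
    ]);

    let resp = reqwest::Client::new()
        .put("http://localhost:8000/api/v1/logstream/backend/retention")
        .json(&body)
        .send()
        .await?;

    // On success the handler responds with
    // "set retention configuration for log stream backend" and StatusCode::OK.
    println!("{}", resp.status());
    Ok(())
}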

server/src/main.rs

Lines changed: 10 additions & 1 deletion
@@ -100,7 +100,7 @@ async fn main() -> anyhow::Result<()> {

     // track all parquet files already in the data directory
     storage::CACHED_FILES.track_parquet();
-
+    storage::retention::load_retention_from_global().await;
     // load data from stats back to prometheus metrics
     metrics::load_from_global_stats();

@@ -342,6 +342,11 @@ pub fn configure_routes(cfg: &mut web::ServiceConfig) {
            web::resource(stats_path("{logstream}"))
                .route(web::get().to(handlers::logstream::get_stats)),
        )
+        .service(
+            // PUT "/logstream/{logstream}/retention" ==> Set retention for given log stream
+            web::resource(retention_path("{logstream}"))
+                .route(web::put().to(handlers::logstream::put_retention)),
+        )
        // GET "/liveness" ==> Liveness check as per https://kubernetes.io/docs/tasks/configure-pod-container/configure-liveness-readiness-startup-probes/#define-a-liveness-command
        .service(web::resource(liveness_path()).route(web::get().to(handlers::liveness)))
        // GET "/readiness" ==> Readiness check as per https://kubernetes.io/docs/tasks/configure-pod-container/configure-liveness-readiness-startup-probes/#define-readiness-probes
@@ -412,3 +417,7 @@ fn schema_path(stream_name: &str) -> String {
 fn stats_path(stream_name: &str) -> String {
     format!("{}/stats", logstream_path(stream_name))
 }
+
+fn retention_path(stream_name: &str) -> String {
+    format!("{}/retention", logstream_path(stream_name))
+}

server/src/storage.rs

Lines changed: 1 addition & 0 deletions
@@ -38,6 +38,7 @@ use std::sync::{Arc, Mutex};
 mod file_link;
 mod localfs;
 mod object_storage;
+pub mod retention;
 mod s3;
 mod store_metadata;

server/src/storage/localfs.rs

Lines changed: 31 additions & 0 deletions
@@ -142,6 +142,12 @@ impl ObjectStorage for LocalFS {
         res.map_err(Into::into)
     }

+    async fn delete_prefix(&self, path: &RelativePath) -> Result<(), ObjectStorageError> {
+        let path = self.path_in_root(path);
+        tokio::fs::remove_dir_all(path).await?;
+        Ok(())
+    }
+
     async fn check(&self) -> Result<(), ObjectStorageError> {
         fs::create_dir_all(&self.root).await?;
         validate_path_is_writeable(&self.root)
@@ -173,6 +179,16 @@ impl ObjectStorage for LocalFS {
         Ok(logstreams)
     }

+    async fn list_dates(&self, stream_name: &str) -> Result<Vec<String>, ObjectStorageError> {
+        let path = self.root.join(stream_name);
+        let directories = ReadDirStream::new(fs::read_dir(&path).await?);
+        let entries: Vec<DirEntry> = directories.try_collect().await?;
+        let entries = entries.into_iter().map(dir_name);
+        let dates: Vec<_> = FuturesUnordered::from_iter(entries).try_collect().await?;
+
+        Ok(dates.into_iter().flatten().collect())
+    }
+
     async fn upload_file(&self, key: &str, path: &Path) -> Result<(), ObjectStorageError> {
         let op = CopyOptions {
             overwrite: true,
@@ -255,6 +271,21 @@ async fn dir_with_stream(
     }
 }

+async fn dir_name(entry: DirEntry) -> Result<Option<String>, ObjectStorageError> {
+    if entry.file_type().await?.is_dir() {
+        let dir_name = entry
+            .path()
+            .file_name()
+            .expect("valid path")
+            .to_str()
+            .expect("valid unicode")
+            .to_owned();
+        Ok(Some(dir_name))
+    } else {
+        Ok(None)
+    }
+}
+
 impl From<fs_extra::error::Error> for ObjectStorageError {
     fn from(e: fs_extra::error::Error) -> Self {
         ObjectStorageError::UnhandledError(Box::new(e))
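
Together, list_dates and delete_prefix are the primitives a retention pass needs: enumerate the per-day prefixes under a stream, then drop the ones that have aged out. The sketch below is a hypothetical pruning loop, not the actual scheduler from storage::retention (which is not part of the hunks shown); it assumes date directories are named like date=YYYY-MM-DD and that the chrono and relative-path crates are available:

// Hypothetical pruning pass built only from the two new ObjectStorage methods.
// Assumes date prefixes look like "date=2023-01-31"; the real retention
// scheduler lives in storage::retention and is not shown in this commit.
use chrono::{Duration, NaiveDate, Utc};
use relative_path::RelativePath;

async fn prune_old_dates(
    store: &impl ObjectStorage,
    stream_name: &str,
    keep_days: i64,
) -> Result<(), ObjectStorageError> {
    let cutoff = Utc::now().naive_utc().date() - Duration::days(keep_days);
    for date_dir in store.list_dates(stream_name).await? {
        // Parse "date=YYYY-MM-DD" and delete any prefix older than the cutoff.
        if let Some(date) = date_dir.strip_prefix("date=") {
            if let Ok(date) = date.parse::<NaiveDate>() {
                if date < cutoff {
                    let prefix = RelativePath::new(stream_name).join(date_dir.as_str());
                    store.delete_prefix(&prefix).await?;
                }
            }
        }
    }
    Ok(())
}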

server/src/storage/object_storage.rs

Lines changed: 39 additions & 2 deletions
@@ -17,8 +17,8 @@
 */

 use super::{
-    file_link::CacheState, LogStream, MoveDataError, ObjectStorageError, ObjectStoreFormat,
-    Permisssion, StorageDir, StorageMetadata, CACHED_FILES,
+    file_link::CacheState, retention::Retention, LogStream, MoveDataError, ObjectStorageError,
+    ObjectStoreFormat, Permisssion, StorageDir, StorageMetadata, CACHED_FILES,
 };
 use crate::{
     alerts::Alerts,
@@ -77,9 +77,11 @@ pub trait ObjectStorage: Sync + 'static {
        path: &RelativePath,
        resource: Bytes,
    ) -> Result<(), ObjectStorageError>;
+    async fn delete_prefix(&self, path: &RelativePath) -> Result<(), ObjectStorageError>;
    async fn check(&self) -> Result<(), ObjectStorageError>;
    async fn delete_stream(&self, stream_name: &str) -> Result<(), ObjectStorageError>;
    async fn list_streams(&self) -> Result<Vec<LogStream>, ObjectStorageError>;
+    async fn list_dates(&self, stream_name: &str) -> Result<Vec<String>, ObjectStorageError>;
    async fn upload_file(&self, key: &str, path: &Path) -> Result<(), ObjectStorageError>;
    fn query_table(
        &self,
@@ -142,6 +144,23 @@ pub trait ObjectStorage: Sync + 'static {
         self.put_object(&path, to_bytes(&stream_metadata)).await
     }

+    async fn put_retention(
+        &self,
+        stream_name: &str,
+        retention: &Retention,
+    ) -> Result<(), ObjectStorageError> {
+        let path = stream_json_path(stream_name);
+        let stream_metadata = self.get_object(&path).await?;
+        let retention_value =
+            serde_json::to_value(retention).expect("retention tasks are perfectly serializable");
+        let mut stream_metadata: serde_json::Value =
+            serde_json::from_slice(&stream_metadata).expect("parseable config is valid json");
+
+        stream_metadata["retention"] = retention_value;
+
+        self.put_object(&path, to_bytes(&stream_metadata)).await
+    }
+
     async fn put_metadata(
         &self,
         parseable_metadata: &StorageMetadata,
@@ -216,6 +235,24 @@ pub trait ObjectStorage: Sync + 'static {
         Ok(stats)
     }

+    async fn get_retention(&self, stream_name: &str) -> Result<Retention, ObjectStorageError> {
+        let stream_metadata = self.get_object(&stream_json_path(stream_name)).await?;
+        let stream_metadata: Value =
+            serde_json::from_slice(&stream_metadata).expect("parseable config is valid json");
+
+        let retention = stream_metadata
+            .as_object()
+            .expect("is object")
+            .get("retention")
+            .cloned();
+
+        if let Some(retention) = retention {
+            Ok(serde_json::from_value(retention).unwrap())
+        } else {
+            Ok(Retention::default())
+        }
+    }
+
     async fn get_metadata(&self) -> Result<Option<StorageMetadata>, ObjectStorageError> {
         let parseable_metadata: Option<StorageMetadata> =
             match self.get_object(&parseable_json_path()).await {
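
The put_retention/get_retention pair round-trips the retention config through the stream's metadata JSON: writing index-assigns a "retention" key into the document, and reading falls back to a default when the key is absent. A standalone sketch of that merge-and-read-back pattern, using a stand-in serde_json::Value rather than the real stream.json schema:

// Standalone sketch of the merge/read-back pattern above. The object here is
// a stand-in, not the real stream.json contents.
use serde_json::{json, Value};

fn main() {
    let mut stream_metadata: Value = json!({ "version": "v1", "stats": {} });

    // put_retention: index-assignment inserts (or overwrites) the "retention" key.
    stream_metadata["retention"] = json!([{ "action": "delete", "duration": "30d" }]);

    // get_retention: a missing key would fall back to a default configuration.
    let retention = stream_metadata
        .as_object()
        .and_then(|obj| obj.get("retention"))
        .cloned()
        .unwrap_or_else(|| json!([]));

    println!("{retention}");
}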
