Skip to content

Commit d795e8e

Browse files
authored
fix: init_scheduler (#649)
Earlier, separate scheduler was initialized for each stream on load time or whenever retention period is set. Now, a single scheduler is initialized which checks retention config of all the streams and performs the retention cleanup. Fixes #636
1 parent 3e39328 commit d795e8e

File tree

3 files changed

+39
-34
lines changed

3 files changed

+39
-34
lines changed

server/src/handlers/http/logstream.rs

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@ use serde_json::Value;
2626
use crate::alerts::Alerts;
2727
use crate::metadata::STREAM_INFO;
2828
use crate::option::CONFIG;
29-
use crate::storage::retention::{self, Retention};
29+
use crate::storage::retention::Retention;
3030
use crate::storage::{LogStream, StorageDir};
3131
use crate::{event, stats};
3232
use crate::{metadata, validator};
@@ -219,8 +219,6 @@ pub async fn put_retention(
219219
.put_retention(&stream_name, &retention)
220220
.await?;
221221

222-
retention::init_scheduler(&stream_name, retention);
223-
224222
Ok((
225223
format!("set retention configuration for log stream {stream_name}"),
226224
StatusCode::OK,

server/src/main.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -76,7 +76,7 @@ async fn main() -> anyhow::Result<()> {
7676
}
7777

7878
// track all parquet files already in the data directory
79-
storage::retention::load_retention_from_global().await;
79+
storage::retention::load_retention_from_global();
8080
// load data from stats back to prometheus metrics
8181
metrics::load_from_stats_from_storage().await;
8282

server/src/storage/retention.rs

Lines changed: 37 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -43,40 +43,47 @@ fn async_runtime() -> tokio::runtime::Runtime {
4343
.unwrap()
4444
}
4545

46-
pub async fn load_retention_from_global() {
46+
pub fn load_retention_from_global() {
4747
log::info!("loading retention for all streams");
48-
for stream in STREAM_INFO.list_streams() {
49-
let res = CONFIG
50-
.storage()
51-
.get_object_store()
52-
.get_retention(&stream)
53-
.await;
54-
match res {
55-
Ok(config) => {
56-
if config.tasks.is_empty() {
57-
log::info!("skipping loading retention for {stream}");
58-
continue;
59-
}
60-
init_scheduler(&stream, config)
61-
}
62-
Err(err) => log::warn!("failed to load retention config for {stream} due to {err:?}"),
63-
}
64-
}
48+
init_scheduler();
6549
}
6650

67-
pub fn init_scheduler(stream: &str, config: Retention) {
68-
log::info!("Setting up schedular for {stream}");
51+
pub fn init_scheduler() {
52+
log::info!("Setting up schedular");
6953
let mut scheduler = AsyncScheduler::new();
70-
for Task { action, days, .. } in config.tasks.into_iter() {
71-
let func = match action {
72-
Action::Delete => {
73-
let stream = stream.to_string();
74-
move || action::delete(stream.clone(), u32::from(days))
75-
}
76-
};
54+
let func = move || async {
55+
for stream in STREAM_INFO.list_streams() {
56+
let res = CONFIG
57+
.storage()
58+
.get_object_store()
59+
.get_retention(&stream)
60+
.await;
61+
62+
match res {
63+
Ok(config) => {
64+
for Task { action, days, .. } in config.tasks.into_iter() {
65+
match action {
66+
Action::Delete => {
67+
let stream = stream.to_string();
68+
thread::spawn(move || {
69+
let rt = tokio::runtime::Runtime::new().unwrap();
70+
rt.block_on(async {
71+
// Run the asynchronous delete action
72+
action::delete(stream.clone(), u32::from(days)).await;
73+
});
74+
});
75+
}
76+
};
77+
}
78+
}
79+
Err(err) => {
80+
log::warn!("failed to load retention config for {stream} due to {err:?}")
81+
}
82+
};
83+
}
84+
};
7785

78-
scheduler.every(1.day()).at("00:00").run(func);
79-
}
86+
scheduler.every(1.day()).at("00:00").run(func);
8087

8188
let handler = thread::spawn(|| {
8289
let rt = async_runtime();
@@ -183,7 +190,7 @@ mod action {
183190
use crate::option::CONFIG;
184191

185192
pub(super) async fn delete(stream_name: String, days: u32) {
186-
log::info!("running retention task - delete");
193+
log::info!("running retention task - delete for stream={stream_name}");
187194
let retain_until = get_retain_until(Utc::now().date_naive(), days as u64);
188195

189196
let Ok(dates) = CONFIG

0 commit comments

Comments
 (0)