Skip to content

Commit 634129b

Browse files
authored
Deleting a logstream should delete its stats. (#419)
Return 404 in get_stats if no metrics are present for the log stream. Deleting a log stream also deletes its corresponding stats. Fixes #412
1 parent 0626261 commit 634129b

File tree

2 files changed

+59
-3
lines changed

2 files changed

+59
-3
lines changed

server/src/handlers/http/logstream.rs

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,9 @@ pub async fn delete(req: HttpRequest) -> Result<impl Responder, StreamError> {
4646
objectstore.delete_stream(&stream_name).await?;
4747
metadata::STREAM_INFO.delete_stream(&stream_name);
4848
event::STREAM_WRITERS.delete_stream(&stream_name);
49+
stats::delete_stats(&stream_name, "json").unwrap_or_else(|e| {
50+
log::warn!("failed to delete stats for stream {}: {:?}", stream_name, e)
51+
});
4952

5053
let stream_dir = StorageDir::new(&stream_name);
5154
if fs::remove_dir_all(&stream_dir.data_path).is_err() {
@@ -229,6 +232,10 @@ pub async fn put_retention(
229232
pub async fn get_stats(req: HttpRequest) -> Result<impl Responder, StreamError> {
230233
let stream_name: String = req.match_info().get("logstream").unwrap().parse().unwrap();
231234

235+
if !metadata::STREAM_INFO.stream_exists(&stream_name) {
236+
return Err(StreamError::StreamNotFound(stream_name));
237+
}
238+
232239
let stats = stats::get_current_stats(&stream_name, "json")
233240
.ok_or(StreamError::StreamNotFound(stream_name.clone()))?;
234241

@@ -363,3 +370,30 @@ pub mod error {
363370
}
364371
}
365372
}
373+
374+
#[cfg(test)]
375+
mod tests {
376+
use crate::handlers::http::logstream::error::StreamError;
377+
use crate::handlers::http::logstream::get_stats;
378+
use actix_web::test::TestRequest;
379+
use anyhow::bail;
380+
381+
#[actix_web::test]
382+
#[should_panic]
383+
async fn get_stats_panics_without_logstream() {
384+
let req = TestRequest::default().to_http_request();
385+
let _ = get_stats(req).await;
386+
}
387+
388+
#[actix_web::test]
389+
async fn get_stats_stream_not_found_error_for_unknown_logstream() -> anyhow::Result<()> {
390+
let req = TestRequest::default()
391+
.param("logstream", "test")
392+
.to_http_request();
393+
394+
match get_stats(req).await {
395+
Err(StreamError::StreamNotFound(_)) => Ok(()),
396+
_ => bail!("expected StreamNotFound error"),
397+
}
398+
}
399+
}

server/src/stats.rs

Lines changed: 25 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -27,16 +27,19 @@ pub struct Stats {
2727
}
2828

2929
pub fn get_current_stats(stream_name: &str, format: &'static str) -> Option<Stats> {
30+
let event_labels = event_labels(stream_name, format);
31+
let storage_size_labels = storage_size_labels(stream_name);
32+
3033
let events_ingested = EVENTS_INGESTED
31-
.get_metric_with_label_values(&[stream_name, format])
34+
.get_metric_with_label_values(&event_labels)
3235
.ok()?
3336
.get();
3437
let ingestion_size = EVENTS_INGESTED_SIZE
35-
.get_metric_with_label_values(&[stream_name, format])
38+
.get_metric_with_label_values(&event_labels)
3639
.ok()?
3740
.get();
3841
let storage_size = STORAGE_SIZE
39-
.get_metric_with_label_values(&["data", stream_name, "parquet"])
42+
.get_metric_with_label_values(&storage_size_labels)
4043
.ok()?
4144
.get();
4245
// this should be valid for all cases given that gauge must never go negative
@@ -49,3 +52,22 @@ pub fn get_current_stats(stream_name: &str, format: &'static str) -> Option<Stat
4952
storage: storage_size,
5053
})
5154
}
55+
56+
pub fn delete_stats(stream_name: &str, format: &'static str) -> prometheus::Result<()> {
57+
let event_labels = event_labels(stream_name, format);
58+
let storage_size_labels = storage_size_labels(stream_name);
59+
60+
EVENTS_INGESTED.remove_label_values(&event_labels)?;
61+
EVENTS_INGESTED_SIZE.remove_label_values(&event_labels)?;
62+
STORAGE_SIZE.remove_label_values(&storage_size_labels)?;
63+
64+
Ok(())
65+
}
66+
67+
fn event_labels<'a>(stream_name: &'a str, format: &'static str) -> [&'a str; 2] {
68+
[stream_name, format]
69+
}
70+
71+
fn storage_size_labels(stream_name: &str) -> [&str; 3] {
72+
["data", stream_name, "parquet"]
73+
}

0 commit comments

Comments
 (0)