Skip to content

Commit ac24570

Browse files
committed
bugfix
Stats were not getting deleted during stream deletion
1 parent a90ed13 commit ac24570

File tree

4 files changed

+25
-23
lines changed

4 files changed

+25
-23
lines changed

src/handlers/http/logstream.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -57,9 +57,9 @@ pub async fn delete(stream_name: Path<String>) -> Result<impl Responder, StreamE
5757
objectstore.delete_stream(&stream_name).await?;
5858
// Delete from staging
5959
let stream_dir = PARSEABLE.get_or_create_stream(&stream_name);
60-
if fs::remove_dir_all(&stream_dir.data_path).is_err() {
60+
if let Err(err) = fs::remove_dir_all(&stream_dir.data_path) {
6161
warn!(
62-
"failed to delete local data for stream {}. Clean {} manually",
62+
"failed to delete local data for stream {} with error {err}. Clean {} manually",
6363
stream_name,
6464
stream_dir.data_path.to_string_lossy()
6565
)

src/handlers/http/modal/ingest/ingestor_logstream.rs

Lines changed: 2 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -59,23 +59,12 @@ pub async fn retention_cleanup(
5959

6060
pub async fn delete(stream_name: Path<String>) -> Result<impl Responder, StreamError> {
6161
let stream_name = stream_name.into_inner();
62-
// if the stream not found in memory map,
63-
//check if it exists in the storage
64-
//create stream and schema from storage
65-
if !PARSEABLE.streams.contains(&stream_name)
66-
&& !PARSEABLE
67-
.create_stream_and_schema_from_storage(&stream_name)
68-
.await
69-
.unwrap_or(false)
70-
{
71-
return Err(StreamNotFound(stream_name.clone()).into());
72-
}
7362

7463
// Delete from staging
7564
let stream_dir = PARSEABLE.get_stream(&stream_name)?;
76-
if fs::remove_dir_all(&stream_dir.data_path).is_err() {
65+
if let Err(err) = fs::remove_dir_all(&stream_dir.data_path) {
7766
warn!(
78-
"failed to delete local data for stream {}. Clean {} manually",
67+
"failed to delete local data for stream {} with error {err}. Clean {} manually",
7968
stream_name,
8069
stream_dir.data_path.to_string_lossy()
8170
)

src/handlers/http/modal/query/querier_logstream.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -68,9 +68,9 @@ pub async fn delete(stream_name: Path<String>) -> Result<impl Responder, StreamE
6868
// Delete from storage
6969
objectstore.delete_stream(&stream_name).await?;
7070
let stream_dir = PARSEABLE.get_or_create_stream(&stream_name);
71-
if fs::remove_dir_all(&stream_dir.data_path).is_err() {
71+
if let Err(err) = fs::remove_dir_all(&stream_dir.data_path) {
7272
warn!(
73-
"failed to delete local data for stream {}. Clean {} manually",
73+
"failed to delete local data for stream {} with error {err}. Clean {} manually",
7474
stream_name,
7575
stream_dir.data_path.to_string_lossy()
7676
)

src/stats.rs

Lines changed: 19 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -16,8 +16,10 @@
1616
*
1717
*/
1818

19+
use std::collections::HashMap;
1920
use std::sync::Arc;
2021

22+
use itertools::Itertools;
2123
use prometheus::core::Collector;
2224
use prometheus::proto::MetricFamily;
2325
use prometheus::IntGaugeVec;
@@ -191,12 +193,23 @@ pub fn delete_stats(stream_name: &str, format: &'static str) -> prometheus::Resu
191193
fn delete_with_label_prefix(metrics: &IntGaugeVec, prefix: &[&str]) {
192194
let families: Vec<MetricFamily> = metrics.collect().into_iter().collect();
193195
for metric in families.iter().flat_map(|m| m.get_metric()) {
194-
let label: Vec<&str> = metric.get_label().iter().map(|l| l.get_value()).collect();
195-
if !label.starts_with(prefix) {
196-
continue;
197-
}
198-
if let Err(err) = metrics.remove_label_values(&label) {
199-
warn!("Error = {err}");
196+
let label_map: HashMap<&str, &str> = metric
197+
.get_label()
198+
.iter()
199+
.map(|l| (l.get_name(), l.get_value()))
200+
.collect();
201+
202+
let mut should_continue = false;
203+
prefix.iter().for_each(|p| {
204+
// label map doesn't have the key present in prefix, hence continue
205+
if !label_map.values().contains(p) {
206+
should_continue = true;
207+
}
208+
});
209+
if !should_continue {
210+
if let Err(err) = metrics.remove(&label_map) {
211+
warn!("Error = {err}");
212+
}
200213
}
201214
}
202215
}

0 commit comments

Comments
 (0)