Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions src/handlers/http/logstream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -57,9 +57,9 @@ pub async fn delete(stream_name: Path<String>) -> Result<impl Responder, StreamE
objectstore.delete_stream(&stream_name).await?;
// Delete from staging
let stream_dir = PARSEABLE.get_or_create_stream(&stream_name);
if fs::remove_dir_all(&stream_dir.data_path).is_err() {
if let Err(err) = fs::remove_dir_all(&stream_dir.data_path) {
warn!(
"failed to delete local data for stream {}. Clean {} manually",
"failed to delete local data for stream {} with error {err}. Clean {} manually",
stream_name,
stream_dir.data_path.to_string_lossy()
)
Expand Down
15 changes: 2 additions & 13 deletions src/handlers/http/modal/ingest/ingestor_logstream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -59,23 +59,12 @@ pub async fn retention_cleanup(

pub async fn delete(stream_name: Path<String>) -> Result<impl Responder, StreamError> {
let stream_name = stream_name.into_inner();
// if the stream not found in memory map,
//check if it exists in the storage
//create stream and schema from storage
if !PARSEABLE.streams.contains(&stream_name)
&& !PARSEABLE
.create_stream_and_schema_from_storage(&stream_name)
.await
.unwrap_or(false)
{
return Err(StreamNotFound(stream_name.clone()).into());
}

// Delete from staging
let stream_dir = PARSEABLE.get_stream(&stream_name)?;
if fs::remove_dir_all(&stream_dir.data_path).is_err() {
if let Err(err) = fs::remove_dir_all(&stream_dir.data_path) {
warn!(
"failed to delete local data for stream {}. Clean {} manually",
"failed to delete local data for stream {} with error {err}. Clean {} manually",
stream_name,
stream_dir.data_path.to_string_lossy()
)
Expand Down
4 changes: 2 additions & 2 deletions src/handlers/http/modal/query/querier_logstream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -68,9 +68,9 @@ pub async fn delete(stream_name: Path<String>) -> Result<impl Responder, StreamE
// Delete from storage
objectstore.delete_stream(&stream_name).await?;
let stream_dir = PARSEABLE.get_or_create_stream(&stream_name);
if fs::remove_dir_all(&stream_dir.data_path).is_err() {
if let Err(err) = fs::remove_dir_all(&stream_dir.data_path) {
warn!(
"failed to delete local data for stream {}. Clean {} manually",
"failed to delete local data for stream {} with error {err}. Clean {} manually",
stream_name,
stream_dir.data_path.to_string_lossy()
)
Expand Down
25 changes: 19 additions & 6 deletions src/stats.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,10 @@
*
*/

use std::collections::HashMap;
use std::sync::Arc;

use itertools::Itertools;
use prometheus::core::Collector;
use prometheus::proto::MetricFamily;
use prometheus::IntGaugeVec;
Expand Down Expand Up @@ -191,12 +193,23 @@ pub fn delete_stats(stream_name: &str, format: &'static str) -> prometheus::Resu
fn delete_with_label_prefix(metrics: &IntGaugeVec, prefix: &[&str]) {
let families: Vec<MetricFamily> = metrics.collect().into_iter().collect();
for metric in families.iter().flat_map(|m| m.get_metric()) {
let label: Vec<&str> = metric.get_label().iter().map(|l| l.get_value()).collect();
if !label.starts_with(prefix) {
continue;
}
if let Err(err) = metrics.remove_label_values(&label) {
warn!("Error = {err}");
let label_map: HashMap<&str, &str> = metric
.get_label()
.iter()
.map(|l| (l.get_name(), l.get_value()))
.collect();

let mut should_continue = false;
prefix.iter().for_each(|p| {
// label map doesn't have the key present in prefix, hence continue
if !label_map.values().contains(p) {
should_continue = true;
}
});
if !should_continue {
if let Err(err) = metrics.remove(&label_map) {
warn!("Error = {err}");
}
}
}
}
Expand Down
Loading