Skip to content

Commit d812994

Browse files
committed
chore: refactor logic for deleting zombie filters
1 parent 19704d9 commit d812994

File tree

4 files changed

+49
-17
lines changed

4 files changed

+49
-17
lines changed

src/handlers/http/logstream.rs

Lines changed: 6 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,6 @@ use crate::rbac::role::Action;
2929
use crate::stats::{Stats, event_labels_date, storage_size_labels_date};
3030
use crate::storage::retention::Retention;
3131
use crate::storage::{ObjectStoreFormat, StreamInfo, StreamType};
32-
use crate::users::filters::{FILTERS, Filter};
3332
use crate::utils::actix::extract_session_key_from_req;
3433
use crate::utils::json::flatten::{
3534
self, convert_to_array, generic_flattening, has_more_than_max_allowed_levels,
@@ -57,21 +56,6 @@ pub async fn delete(stream_name: Path<String>) -> Result<impl Responder, StreamE
5756

5857
let objectstore = PARSEABLE.storage.get_object_store();
5958

60-
let all_filters = PARSEABLE.metastore.get_filters().await?;
61-
// collect filters associated with the logstream being deleted
62-
let filters_for_stream: Vec<Filter> = all_filters
63-
.into_iter()
64-
.filter(|filter| filter.stream_name == stream_name)
65-
.collect();
66-
67-
for filter in filters_for_stream.iter() {
68-
PARSEABLE.metastore.delete_filter(filter).await?;
69-
70-
if let Some(filter_id) = filter.filter_id.as_ref() {
71-
FILTERS.delete_filter(filter_id).await;
72-
}
73-
}
74-
7559
// Delete from storage
7660
objectstore.delete_stream(&stream_name).await?;
7761
// Delete from staging
@@ -95,6 +79,12 @@ pub async fn delete(stream_name: Path<String>) -> Result<impl Responder, StreamE
9579
stats::delete_stats(&stream_name, "json")
9680
.unwrap_or_else(|e| warn!("failed to delete stats for stream {}: {:?}", stream_name, e));
9781

82+
83+
// clear filters associated to the deleted logstream
84+
if let Err(e) = PARSEABLE.metastore.delete_zombie_filters(&stream_name).await {
85+
warn!("failed to delete zombie filters associated to stream {}: {:?}", stream_name, e);
86+
}
87+
9888
Ok((format!("log stream {stream_name} deleted"), StatusCode::OK))
9989
}
10090

src/metastore/metastore_traits.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -108,6 +108,7 @@ pub trait Metastore: std::fmt::Debug + Send + Sync {
108108
async fn get_filters(&self) -> Result<Vec<Filter>, MetastoreError>;
109109
async fn put_filter(&self, obj: &dyn MetastoreObject) -> Result<(), MetastoreError>;
110110
async fn delete_filter(&self, obj: &dyn MetastoreObject) -> Result<(), MetastoreError>;
111+
async fn delete_zombie_filters(&self, stream_name: &str) -> Result<bool, MetastoreError>;
111112

112113
/// correlations
113114
async fn get_correlations(&self) -> Result<Vec<Bytes>, MetastoreError>;

src/metastore/metastores/object_store_metastore.rs

Lines changed: 33 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -56,7 +56,7 @@ use crate::{
5656
parseable_json_path, schema_path, stream_json_path, to_bytes,
5757
},
5858
},
59-
users::filters::{Filter, migrate_v1_v2},
59+
users::filters::{FILTERS, Filter, migrate_v1_v2},
6060
};
6161

6262
/// Using PARSEABLE's storage as a metastore (default)
@@ -546,6 +546,38 @@ impl Metastore for ObjectStoreMetastore {
546546
.await?)
547547
}
548548

549+
// clear filters associated to a deleted stream
550+
async fn delete_zombie_filters(&self, stream_name: &str) -> Result<bool, MetastoreError> {
551+
// stream should not exist in order to have zombie filters
552+
if PARSEABLE.check_stream_exists(stream_name) {
553+
warn!("no zombie filters cleared for [undeleted] stream {}", stream_name);
554+
return Ok(false);
555+
}
556+
557+
let all_filters = match PARSEABLE.metastore.get_filters().await {
558+
Ok(all_f) => all_f,
559+
Err(e) => {
560+
return Err(e);
561+
}
562+
};
563+
564+
// collect filters associated with the logstream being deleted
565+
let filters_for_stream: Vec<Filter> = all_filters
566+
.into_iter()
567+
.filter(|filter| filter.stream_name == stream_name)
568+
.collect();
569+
570+
for filter in filters_for_stream.iter() {
571+
PARSEABLE.metastore.delete_filter(filter).await?;
572+
573+
if let Some(filter_id) = filter.filter_id.as_ref() {
574+
FILTERS.delete_filter(filter_id).await;
575+
}
576+
}
577+
578+
return Ok(true);
579+
}
580+
549581
/// Get all correlations
550582
async fn get_correlations(&self) -> Result<Vec<Bytes>, MetastoreError> {
551583
let mut correlations = Vec::new();

src/parseable/mod.rs

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -235,6 +235,15 @@ impl Parseable {
235235
.unwrap_or_default()
236236
}
237237

238+
// check if a stream exists
239+
pub fn check_stream_exists(&self, stream_name: &str) -> bool {
240+
if self.streams.contains(stream_name) {
241+
return true;
242+
} else {
243+
return false;
244+
}
245+
}
246+
238247
// validate the storage, if the proper path for staging directory is provided
239248
// if the proper data directory is provided, or s3 bucket is provided etc
240249
pub async fn validate_storage(&self) -> Result<Option<Bytes>, ObjectStorageError> {

0 commit comments

Comments
 (0)