diff --git a/src/correlation/correlation_utils.rs b/src/correlation/correlation_utils.rs
new file mode 100644
index 000000000..0975836bf
--- /dev/null
+++ b/src/correlation/correlation_utils.rs
@@ -0,0 +1,62 @@
+/*
+ * Parseable Server (C) 2022 - 2024 Parseable, Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License as
+ * published by the Free Software Foundation, either version 3 of the
+ * License, or (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see .
+ *
+ */
+
+use itertools::Itertools;
+
+use crate::rbac::{
+ map::SessionKey,
+ role::{Action, Permission},
+ Users,
+};
+
+use super::{CorrelationError, TableConfig};
+
+pub async fn user_auth_for_query(
+ session_key: &SessionKey,
+ table_configs: &[TableConfig],
+) -> Result<(), CorrelationError> {
+ let tables = table_configs.iter().map(|t| &t.table_name).collect_vec();
+ let permissions = Users.get_permissions(session_key);
+
+ for table_name in tables {
+ let mut authorized = false;
+
+ // in permission check if user can run query on the stream.
+ // also while iterating add any filter tags for this stream
+ for permission in permissions.iter() {
+ match permission {
+ Permission::Stream(Action::All, _) => {
+ authorized = true;
+ break;
+ }
+ Permission::StreamWithTag(Action::Query, ref stream, _)
+ if stream == table_name || stream == "*" =>
+ {
+ authorized = true;
+ }
+ _ => (),
+ }
+ }
+
+ if !authorized {
+ return Err(CorrelationError::Unauthorized);
+ }
+ }
+
+ Ok(())
+}
diff --git a/src/correlation/mod.rs b/src/correlation/mod.rs
new file mode 100644
index 000000000..a5022407a
--- /dev/null
+++ b/src/correlation/mod.rs
@@ -0,0 +1,314 @@
+/*
+ * Parseable Server (C) 2022 - 2024 Parseable, Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License as
+ * published by the Free Software Foundation, either version 3 of the
+ * License, or (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see .
+ *
+ */
+
+use std::collections::HashSet;
+
+use actix_web::http::header::ContentType;
+use chrono::Utc;
+use correlation_utils::user_auth_for_query;
+use datafusion::error::DataFusionError;
+use http::StatusCode;
+use itertools::Itertools;
+use once_cell::sync::Lazy;
+use serde::{Deserialize, Serialize};
+use serde_json::Error as SerdeError;
+use tokio::sync::RwLock;
+use tracing::{error, trace, warn};
+
+use crate::{
+ handlers::http::rbac::RBACError, option::CONFIG, query::QUERY_SESSION, rbac::map::SessionKey,
+ storage::ObjectStorageError, users::filters::FilterQuery, utils::get_hash,
+};
+
+pub mod correlation_utils;
+
+pub static CORRELATIONS: Lazy = Lazy::new(Correlation::default);
+
+#[derive(Debug, Default)]
+pub struct Correlation(RwLock>);
+
+impl Correlation {
+ pub async fn load(&self) -> Result<(), CorrelationError> {
+ // lead correlations from storage
+ let store = CONFIG.storage().get_object_store();
+ let all_correlations = store.get_correlations().await.unwrap_or_default();
+
+ let mut correlations = vec![];
+ for corr in all_correlations {
+ if corr.is_empty() {
+ continue;
+ }
+
+ let correlation: CorrelationConfig = match serde_json::from_slice(&corr) {
+ Ok(c) => c,
+ Err(e) => {
+ error!("Unable to load correlation- {e}");
+ continue;
+ }
+ };
+
+ correlations.push(correlation);
+ }
+
+ let mut s = self.0.write().await;
+ s.append(&mut correlations.clone());
+ Ok(())
+ }
+
+ pub async fn list_correlations_for_user(
+ &self,
+ session_key: &SessionKey,
+ ) -> Result, CorrelationError> {
+ let correlations = self.0.read().await.iter().cloned().collect_vec();
+
+ let mut user_correlations = vec![];
+ for c in correlations {
+ if user_auth_for_query(session_key, &c.table_configs)
+ .await
+ .is_ok()
+ {
+ user_correlations.push(c);
+ }
+ }
+ Ok(user_correlations)
+ }
+
+ pub async fn get_correlation_by_id(
+ &self,
+ correlation_id: &str,
+ ) -> Result {
+ let read = self.0.read().await;
+ let correlation = read.iter().find(|c| c.id == correlation_id).cloned();
+
+ if let Some(c) = correlation {
+ Ok(c)
+ } else {
+ Err(CorrelationError::AnyhowError(anyhow::Error::msg(format!(
+ "Unable to find correlation with ID- {correlation_id}"
+ ))))
+ }
+ }
+
+ pub async fn update(&self, correlation: &CorrelationConfig) -> Result<(), CorrelationError> {
+ // save to memory
+ let mut s = self.0.write().await;
+ s.retain(|c| c.id != correlation.id);
+ s.push(correlation.clone());
+ Ok(())
+ }
+
+ pub async fn delete(&self, correlation_id: &str) -> Result<(), CorrelationError> {
+ // now delete from memory
+ let read_access = self.0.read().await;
+
+ let index = read_access
+ .iter()
+ .enumerate()
+ .find(|(_, c)| c.id == correlation_id)
+ .to_owned();
+
+ if let Some((index, _)) = index {
+ // drop the read access in order to get exclusive write access
+ drop(read_access);
+ self.0.write().await.remove(index);
+ trace!("removed correlation from memory");
+ } else {
+ warn!("Correlation ID- {correlation_id} not found in memory!");
+ }
+ Ok(())
+ }
+}
+
+#[derive(Debug, Clone, Serialize, Deserialize)]
+#[serde(rename_all = "camelCase")]
+pub enum CorrelationVersion {
+ V1,
+}
+
+#[derive(Debug, Clone, Serialize, Deserialize)]
+#[serde(rename_all = "camelCase")]
+pub struct CorrelationConfig {
+ pub version: CorrelationVersion,
+ pub id: String,
+ pub table_configs: Vec,
+ pub join_config: JoinConfig,
+ pub filter: Option,
+ pub start_time: Option,
+ pub end_time: Option,
+}
+
+impl CorrelationConfig {}
+
+#[derive(Debug, Clone, Serialize, Deserialize)]
+#[serde(rename_all = "camelCase")]
+pub struct CorrelationRequest {
+ pub version: CorrelationVersion,
+ pub table_configs: Vec,
+ pub join_config: JoinConfig,
+ pub filter: Option,
+ pub start_time: Option,
+ pub end_time: Option,
+}
+
+impl From for CorrelationConfig {
+ fn from(val: CorrelationRequest) -> Self {
+ Self {
+ version: val.version,
+ id: get_hash(Utc::now().timestamp_micros().to_string().as_str()),
+ table_configs: val.table_configs,
+ join_config: val.join_config,
+ filter: val.filter,
+ start_time: val.start_time,
+ end_time: val.end_time,
+ }
+ }
+}
+
+impl CorrelationRequest {
+ pub fn generate_correlation_config(self, id: String) -> CorrelationConfig {
+ CorrelationConfig {
+ version: self.version,
+ id,
+ table_configs: self.table_configs,
+ join_config: self.join_config,
+ filter: self.filter,
+ start_time: self.start_time,
+ end_time: self.end_time,
+ }
+ }
+
+ /// This function will validate the TableConfigs, JoinConfig, and user auth
+ pub async fn validate(&self, session_key: &SessionKey) -> Result<(), CorrelationError> {
+ let ctx = &QUERY_SESSION;
+
+ let h1: HashSet<&String> = self.table_configs.iter().map(|t| &t.table_name).collect();
+ let h2: HashSet<&String> = self
+ .join_config
+ .join_conditions
+ .iter()
+ .map(|j| &j.table_name)
+ .collect();
+
+ // check if table config tables are the same
+ if h1.len() != 2 {
+ return Err(CorrelationError::Metadata(
+ "Must provide config for two unique tables",
+ ));
+ }
+
+ // check that the tables mentioned in join config are
+ // the same as those in table config
+ if h1 != h2 {
+ return Err(CorrelationError::Metadata(
+ "Must provide same tables for join config and table config",
+ ));
+ }
+
+ // check if user has access to table
+ user_auth_for_query(session_key, &self.table_configs).await?;
+
+ // to validate table config, we need to check whether the mentioned fields
+ // are present in the table or not
+ for table_config in self.table_configs.iter() {
+ // table config check
+ let df = ctx.table(&table_config.table_name).await?;
+
+ let mut selected_fields = table_config
+ .selected_fields
+ .iter()
+ .map(|c| c.as_str())
+ .collect_vec();
+
+ // unwrap because we have determined that the tables in table config are the same as those in join config
+ let condition = self
+ .join_config
+ .join_conditions
+ .iter()
+ .find(|j| j.table_name == table_config.table_name)
+ .unwrap();
+ let join_field = condition.field.as_str();
+
+ if !selected_fields.contains(&join_field) {
+ selected_fields.push(join_field);
+ }
+
+ // if this errors out then the table config is incorrect or join config is incorrect
+ df.select_columns(selected_fields.as_slice())?;
+ }
+
+ Ok(())
+ }
+}
+
+#[derive(Debug, thiserror::Error)]
+pub enum CorrelationError {
+ #[error("Failed to connect to storage: {0}")]
+ ObjectStorage(#[from] ObjectStorageError),
+ #[error("Serde Error: {0}")]
+ Serde(#[from] SerdeError),
+ #[error("Cannot perform this operation: {0}")]
+ Metadata(&'static str),
+ #[error("User does not exist")]
+ UserDoesNotExist(#[from] RBACError),
+ #[error("Error: {0}")]
+ AnyhowError(#[from] anyhow::Error),
+ #[error("Unauthorized")]
+ Unauthorized,
+ #[error("DataFusion Error: {0}")]
+ DataFusion(#[from] DataFusionError),
+}
+
+impl actix_web::ResponseError for CorrelationError {
+ fn status_code(&self) -> http::StatusCode {
+ match self {
+ Self::ObjectStorage(_) => StatusCode::INTERNAL_SERVER_ERROR,
+ Self::Serde(_) => StatusCode::BAD_REQUEST,
+ Self::Metadata(_) => StatusCode::BAD_REQUEST,
+ Self::UserDoesNotExist(_) => StatusCode::NOT_FOUND,
+ Self::AnyhowError(_) => StatusCode::INTERNAL_SERVER_ERROR,
+ Self::Unauthorized => StatusCode::BAD_REQUEST,
+ Self::DataFusion(_) => StatusCode::INTERNAL_SERVER_ERROR,
+ }
+ }
+
+ fn error_response(&self) -> actix_web::HttpResponse {
+ actix_web::HttpResponse::build(self.status_code())
+ .insert_header(ContentType::plaintext())
+ .body(self.to_string())
+ }
+}
+
+#[derive(Debug, Clone, Serialize, Deserialize)]
+#[serde(rename_all = "camelCase")]
+pub struct TableConfig {
+ pub selected_fields: Vec,
+ pub table_name: String,
+}
+
+#[derive(Debug, Clone, Serialize, Deserialize)]
+#[serde(rename_all = "camelCase")]
+pub struct JoinCondition {
+ pub table_name: String,
+ pub field: String,
+}
+
+#[derive(Debug, Clone, Serialize, Deserialize)]
+#[serde(rename_all = "camelCase")]
+pub struct JoinConfig {
+ pub join_conditions: Vec,
+}
diff --git a/src/handlers/http/correlation.rs b/src/handlers/http/correlation.rs
new file mode 100644
index 000000000..dc5799c01
--- /dev/null
+++ b/src/handlers/http/correlation.rs
@@ -0,0 +1,136 @@
+/*
+ * Parseable Server (C) 2022 - 2024 Parseable, Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License as
+ * published by the Free Software Foundation, either version 3 of the
+ * License, or (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see .
+ *
+ */
+
+use actix_web::{web, HttpRequest, Responder};
+use bytes::Bytes;
+use relative_path::RelativePathBuf;
+
+use crate::{
+ option::CONFIG, storage::CORRELATIONS_ROOT_DIRECTORY,
+ utils::actix::extract_session_key_from_req,
+};
+
+use crate::correlation::{
+ correlation_utils::user_auth_for_query, CorrelationConfig, CorrelationError,
+ CorrelationRequest, CORRELATIONS,
+};
+
+pub async fn list(req: HttpRequest) -> Result {
+ let session_key = extract_session_key_from_req(&req)
+ .map_err(|err| CorrelationError::AnyhowError(anyhow::Error::msg(err.to_string())))?;
+
+ let correlations = CORRELATIONS
+ .list_correlations_for_user(&session_key)
+ .await?;
+
+ Ok(web::Json(correlations))
+}
+
+pub async fn get(req: HttpRequest) -> Result {
+ let session_key = extract_session_key_from_req(&req)
+ .map_err(|err| CorrelationError::AnyhowError(anyhow::Error::msg(err.to_string())))?;
+
+ let correlation_id = req
+ .match_info()
+ .get("correlation_id")
+ .ok_or(CorrelationError::Metadata("No correlation ID Provided"))?;
+
+ let correlation = CORRELATIONS.get_correlation_by_id(correlation_id).await?;
+
+ if user_auth_for_query(&session_key, &correlation.table_configs)
+ .await
+ .is_ok()
+ {
+ Ok(web::Json(correlation))
+ } else {
+ Err(CorrelationError::Unauthorized)
+ }
+}
+
+pub async fn post(req: HttpRequest, body: Bytes) -> Result {
+ let session_key = extract_session_key_from_req(&req)
+ .map_err(|err| CorrelationError::AnyhowError(anyhow::Error::msg(err.to_string())))?;
+
+ let correlation_request: CorrelationRequest = serde_json::from_slice(&body)?;
+
+ correlation_request.validate(&session_key).await?;
+
+ let correlation: CorrelationConfig = correlation_request.into();
+
+ // Save to disk
+ let store = CONFIG.storage().get_object_store();
+ store.put_correlation(&correlation).await?;
+
+ // Save to memory
+ CORRELATIONS.update(&correlation).await?;
+
+ Ok(format!("Saved correlation with ID- {}", correlation.id))
+}
+
+pub async fn modify(req: HttpRequest, body: Bytes) -> Result {
+ let session_key = extract_session_key_from_req(&req)
+ .map_err(|err| CorrelationError::AnyhowError(anyhow::Error::msg(err.to_string())))?;
+
+ let correlation_id = req
+ .match_info()
+ .get("correlation_id")
+ .ok_or(CorrelationError::Metadata("No correlation ID Provided"))?;
+
+ // validate whether user has access to this correlation object or not
+ let correlation = CORRELATIONS.get_correlation_by_id(correlation_id).await?;
+ user_auth_for_query(&session_key, &correlation.table_configs).await?;
+
+ let correlation_request: CorrelationRequest = serde_json::from_slice(&body)?;
+ correlation_request.validate(&session_key).await?;
+
+ let correlation = correlation_request.generate_correlation_config(correlation_id.to_owned());
+
+ // Save to disk
+ let store = CONFIG.storage().get_object_store();
+ store.put_correlation(&correlation).await?;
+
+ // Save to memory
+ CORRELATIONS.update(&correlation).await?;
+
+ Ok(format!("Modified correlation with ID- {}", correlation.id))
+}
+
+pub async fn delete(req: HttpRequest) -> Result {
+ let session_key = extract_session_key_from_req(&req)
+ .map_err(|err| CorrelationError::AnyhowError(anyhow::Error::msg(err.to_string())))?;
+
+ let correlation_id = req
+ .match_info()
+ .get("correlation_id")
+ .ok_or(CorrelationError::Metadata("No correlation ID Provided"))?;
+
+ let correlation = CORRELATIONS.get_correlation_by_id(correlation_id).await?;
+
+ // validate user's query auth
+ user_auth_for_query(&session_key, &correlation.table_configs).await?;
+
+ // Delete from disk
+ let store = CONFIG.storage().get_object_store();
+ let path =
+ RelativePathBuf::from_iter([CORRELATIONS_ROOT_DIRECTORY, &correlation.id.to_string()]);
+ store.delete_object(&path).await?;
+
+ // Delete from memory
+ CORRELATIONS.delete(correlation_id).await?;
+ Ok(format!("Deleted correlation with ID- {correlation_id}"))
+}
diff --git a/src/handlers/http/mod.rs b/src/handlers/http/mod.rs
index f627b613a..8fabc8f1f 100644
--- a/src/handlers/http/mod.rs
+++ b/src/handlers/http/mod.rs
@@ -29,6 +29,7 @@ use self::{cluster::get_ingestor_info, query::Query};
pub mod about;
pub mod cluster;
+pub mod correlation;
pub mod health_check;
pub mod ingest;
mod kinesis;
diff --git a/src/handlers/http/modal/query_server.rs b/src/handlers/http/modal/query_server.rs
index 792bb6571..32bf8e2d5 100644
--- a/src/handlers/http/modal/query_server.rs
+++ b/src/handlers/http/modal/query_server.rs
@@ -16,6 +16,7 @@
*
*/
+use crate::correlation::CORRELATIONS;
use crate::handlers::airplane;
use crate::handlers::http::cluster::{self, init_cluster_metrics_schedular};
use crate::handlers::http::logstream::create_internal_stream_if_not_exists;
@@ -50,6 +51,7 @@ impl ParseableServer for QueryServer {
config
.service(
web::scope(&base_path())
+ .service(Server::get_correlation_webscope())
.service(Server::get_query_factory())
.service(Server::get_trino_factory())
.service(Server::get_cache_webscope())
@@ -94,6 +96,9 @@ impl ParseableServer for QueryServer {
//create internal stream at server start
create_internal_stream_if_not_exists().await?;
+ if let Err(e) = CORRELATIONS.load().await {
+ error!("{e}");
+ }
FILTERS.load().await?;
DASHBOARDS.load().await?;
// track all parquet files already in the data directory
diff --git a/src/handlers/http/modal/server.rs b/src/handlers/http/modal/server.rs
index 6c0ec9fd8..d50edb96e 100644
--- a/src/handlers/http/modal/server.rs
+++ b/src/handlers/http/modal/server.rs
@@ -17,6 +17,7 @@
*/
use crate::analytics;
+use crate::correlation::CORRELATIONS;
use crate::handlers;
use crate::handlers::http::about;
use crate::handlers::http::base_path;
@@ -67,6 +68,7 @@ impl ParseableServer for Server {
config
.service(
web::scope(&base_path())
+ .service(Self::get_correlation_webscope())
.service(Self::get_query_factory())
.service(Self::get_trino_factory())
.service(Self::get_cache_webscope())
@@ -102,6 +104,9 @@ impl ParseableServer for Server {
migration::run_migration(&CONFIG).await?;
+ if let Err(e) = CORRELATIONS.load().await {
+ error!("{e}");
+ }
FILTERS.load().await?;
DASHBOARDS.load().await?;
@@ -172,6 +177,41 @@ impl Server {
)
}
+ pub fn get_correlation_webscope() -> Scope {
+ web::scope("/correlation")
+ .service(
+ web::resource("")
+ .route(
+ web::get()
+ .to(http::correlation::list)
+ .authorize(Action::GetCorrelation),
+ )
+ .route(
+ web::post()
+ .to(http::correlation::post)
+ .authorize(Action::CreateCorrelation),
+ ),
+ )
+ .service(
+ web::resource("/{correlation_id}")
+ .route(
+ web::get()
+ .to(http::correlation::get)
+ .authorize(Action::GetCorrelation),
+ )
+ .route(
+ web::put()
+ .to(http::correlation::modify)
+ .authorize(Action::PutCorrelation),
+ )
+ .route(
+ web::delete()
+ .to(http::correlation::delete)
+ .authorize(Action::DeleteCorrelation),
+ ),
+ )
+ }
+
// get the dashboards web scope
pub fn get_dashboards_webscope() -> Scope {
web::scope("/dashboards")
diff --git a/src/handlers/http/query.rs b/src/handlers/http/query.rs
index 27414b9d0..895032d20 100644
--- a/src/handlers/http/query.rs
+++ b/src/handlers/http/query.rs
@@ -52,7 +52,7 @@ use crate::utils::time::{TimeParseError, TimeRange};
use super::modal::utils::logstream_utils::create_stream_and_schema_from_storage;
/// Query Request through http endpoint.
-#[derive(Debug, serde::Deserialize, serde::Serialize)]
+#[derive(Debug, serde::Deserialize, serde::Serialize, Clone)]
#[serde(rename_all = "camelCase")]
pub struct Query {
pub query: String,
diff --git a/src/lib.rs b/src/lib.rs
index 7a85f54e7..5c8e09274 100644
--- a/src/lib.rs
+++ b/src/lib.rs
@@ -22,6 +22,7 @@ pub mod analytics;
pub mod banner;
mod catalog;
mod cli;
+pub mod correlation;
mod event;
pub mod handlers;
pub mod hottier;
diff --git a/src/rbac/role.rs b/src/rbac/role.rs
index f94c8f171..f383345ad 100644
--- a/src/rbac/role.rs
+++ b/src/rbac/role.rs
@@ -62,6 +62,10 @@ pub enum Action {
DeleteFilter,
Login,
Metrics,
+ GetCorrelation,
+ CreateCorrelation,
+ DeleteCorrelation,
+ PutCorrelation,
}
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
@@ -120,6 +124,10 @@ impl RoleBuilder {
| Action::ListStream
| Action::ListCluster
| Action::ListClusterMetrics
+ | Action::CreateCorrelation
+ | Action::DeleteCorrelation
+ | Action::GetCorrelation
+ | Action::PutCorrelation
| Action::Deleteingestor
| Action::PutHotTierEnabled
| Action::GetHotTierEnabled
@@ -208,6 +216,10 @@ pub mod model {
Action::DeleteStream,
Action::ListStream,
Action::GetStreamInfo,
+ Action::CreateCorrelation,
+ Action::DeleteCorrelation,
+ Action::GetCorrelation,
+ Action::PutCorrelation,
Action::DetectSchema,
Action::GetSchema,
Action::GetStats,
@@ -250,6 +262,10 @@ pub mod model {
Action::PutHotTierEnabled,
Action::GetHotTierEnabled,
Action::DeleteHotTierEnabled,
+ Action::CreateCorrelation,
+ Action::DeleteCorrelation,
+ Action::GetCorrelation,
+ Action::PutCorrelation,
Action::ListDashboard,
Action::GetDashboard,
Action::CreateDashboard,
@@ -282,6 +298,10 @@ pub mod model {
Action::GetFilter,
Action::CreateFilter,
Action::DeleteFilter,
+ Action::CreateCorrelation,
+ Action::DeleteCorrelation,
+ Action::GetCorrelation,
+ Action::PutCorrelation,
Action::ListDashboard,
Action::GetDashboard,
Action::CreateDashboard,
diff --git a/src/storage/localfs.rs b/src/storage/localfs.rs
index 78b8114c0..1229425b1 100644
--- a/src/storage/localfs.rs
+++ b/src/storage/localfs.rs
@@ -39,8 +39,9 @@ use crate::{
};
use super::{
- LogStream, ObjectStorage, ObjectStorageError, ObjectStorageProvider, PARSEABLE_ROOT_DIRECTORY,
- SCHEMA_FILE_NAME, STREAM_METADATA_FILE_NAME, STREAM_ROOT_DIRECTORY,
+ LogStream, ObjectStorage, ObjectStorageError, ObjectStorageProvider,
+ CORRELATIONS_ROOT_DIRECTORY, PARSEABLE_ROOT_DIRECTORY, SCHEMA_FILE_NAME,
+ STREAM_METADATA_FILE_NAME, STREAM_ROOT_DIRECTORY,
};
#[derive(Debug, Clone, clap::Args)]
@@ -295,7 +296,12 @@ impl ObjectStorage for LocalFS {
}
async fn list_streams(&self) -> Result, ObjectStorageError> {
- let ignore_dir = &["lost+found", PARSEABLE_ROOT_DIRECTORY, USERS_ROOT_DIR];
+ let ignore_dir = &[
+ "lost+found",
+ PARSEABLE_ROOT_DIRECTORY,
+ USERS_ROOT_DIR,
+ CORRELATIONS_ROOT_DIRECTORY,
+ ];
let directories = ReadDirStream::new(fs::read_dir(&self.root).await?);
let entries: Vec = directories.try_collect().await?;
let entries = entries
@@ -315,7 +321,11 @@ impl ObjectStorage for LocalFS {
}
async fn list_old_streams(&self) -> Result, ObjectStorageError> {
- let ignore_dir = &["lost+found", PARSEABLE_ROOT_DIRECTORY];
+ let ignore_dir = &[
+ "lost+found",
+ PARSEABLE_ROOT_DIRECTORY,
+ CORRELATIONS_ROOT_DIRECTORY,
+ ];
let directories = ReadDirStream::new(fs::read_dir(&self.root).await?);
let entries: Vec = directories.try_collect().await?;
let entries = entries
diff --git a/src/storage/mod.rs b/src/storage/mod.rs
index a018c2b1c..9b4c6163b 100644
--- a/src/storage/mod.rs
+++ b/src/storage/mod.rs
@@ -51,6 +51,7 @@ pub const PARSEABLE_ROOT_DIRECTORY: &str = ".parseable";
pub const SCHEMA_FILE_NAME: &str = ".schema";
pub const ALERT_FILE_NAME: &str = ".alert.json";
pub const MANIFEST_FILE: &str = "manifest.json";
+pub const CORRELATIONS_ROOT_DIRECTORY: &str = ".correlations";
/// local sync interval to move data.records to /tmp dir of that stream.
/// 60 sec is a reasonable value.
diff --git a/src/storage/object_storage.rs b/src/storage/object_storage.rs
index e7fb38e6e..b5b1e74f5 100644
--- a/src/storage/object_storage.rs
+++ b/src/storage/object_storage.rs
@@ -21,10 +21,11 @@ use super::{
ObjectStoreFormat, Permisssion, StorageDir, StorageMetadata,
};
use super::{
- ALERT_FILE_NAME, MANIFEST_FILE, PARSEABLE_METADATA_FILE_NAME, PARSEABLE_ROOT_DIRECTORY,
- SCHEMA_FILE_NAME, STREAM_METADATA_FILE_NAME, STREAM_ROOT_DIRECTORY,
+ ALERT_FILE_NAME, CORRELATIONS_ROOT_DIRECTORY, MANIFEST_FILE, PARSEABLE_METADATA_FILE_NAME,
+ PARSEABLE_ROOT_DIRECTORY, SCHEMA_FILE_NAME, STREAM_METADATA_FILE_NAME, STREAM_ROOT_DIRECTORY,
};
+use crate::correlation::{CorrelationConfig, CorrelationError};
use crate::handlers::http::modal::ingest_server::INGESTOR_META;
use crate::handlers::http::users::{DASHBOARDS_DIR, FILTER_DIR, USERS_ROOT_DIR};
use crate::metrics::{EVENTS_STORAGE_SIZE_DATE, LIFETIME_EVENTS_STORAGE_SIZE};
@@ -624,6 +625,30 @@ pub trait ObjectStorage: Send + Sync + 'static {
// pick a better name
fn get_bucket_name(&self) -> String;
+
+ async fn put_correlation(
+ &self,
+ correlation: &CorrelationConfig,
+ ) -> Result<(), ObjectStorageError> {
+ let path = RelativePathBuf::from_iter([
+ CORRELATIONS_ROOT_DIRECTORY,
+ &format!("{}.json", correlation.id),
+ ]);
+ self.put_object(&path, to_bytes(correlation)).await?;
+ Ok(())
+ }
+
+ async fn get_correlations(&self) -> Result, CorrelationError> {
+ let correlation_path = RelativePathBuf::from_iter([CORRELATIONS_ROOT_DIRECTORY]);
+ let correlation_bytes = self
+ .get_objects(
+ Some(&correlation_path),
+ Box::new(|file_name| file_name.ends_with(".json")),
+ )
+ .await?;
+
+ Ok(correlation_bytes)
+ }
}
pub async fn commit_schema_to_storage(