Skip to content
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 4 additions & 2 deletions src/handlers/http/modal/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -148,8 +148,10 @@ impl ParseableServer for Server {
}

impl Server {
pub fn get_prism_home() -> Resource {
web::resource("/home").route(web::get().to(http::prism_home::home_api))
pub fn get_prism_home() -> Scope {
web::scope("/home")
.service(web::resource("").route(web::get().to(http::prism_home::home_api)))
.service(web::resource("/search").route(web::get().to(http::prism_home::home_search)))
}

pub fn get_prism_logstream() -> Scope {
Expand Down
11 changes: 10 additions & 1 deletion src/handlers/http/prism_home.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
use actix_web::{web, HttpRequest, Responder};

use crate::{
prism::home::{generate_home_response, PrismHomeError},
prism::home::{generate_home_response, generate_home_search_response, PrismHomeError},
utils::actix::extract_session_key_from_req,
};

Expand All @@ -37,3 +37,12 @@ pub async fn home_api(req: HttpRequest) -> Result<impl Responder, PrismHomeError

Ok(web::Json(res))
}

pub async fn home_search(req: HttpRequest) -> Result<impl Responder, PrismHomeError> {
let key = extract_session_key_from_req(&req)
.map_err(|err| PrismHomeError::Anyhow(anyhow::Error::msg(err.to_string())))?;

let res = generate_home_search_response(&key).await?;

Ok(web::Json(res))
}
241 changes: 109 additions & 132 deletions src/prism/home/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,20 +36,12 @@ use crate::{
},
parseable::PARSEABLE,
rbac::{map::SessionKey, role::Action, Users},
stats::Stats,
storage::{ObjectStorageError, ObjectStoreFormat, STREAM_ROOT_DIRECTORY},
users::{dashboards::DASHBOARDS, filters::FILTERS},
};

type StreamMetadataResponse = Result<(String, Vec<ObjectStoreFormat>, DataSetType), PrismHomeError>;

#[derive(Debug, Serialize, Default)]
struct StreamInfo {
// stream_count: u32,
// log_source_count: u32,
stats_summary: Stats,
}

#[derive(Debug, Serialize, Default)]
struct DatedStats {
date: String,
Expand Down Expand Up @@ -79,49 +71,33 @@ struct DataSet {

#[derive(Debug, Serialize)]
pub struct HomeResponse {
alert_titles: Vec<TitleAndId>,
alerts_info: AlertsInfo,
correlation_titles: Vec<TitleAndId>,
stream_info: StreamInfo,
stats_details: Vec<DatedStats>,
stream_titles: Vec<String>,
datasets: Vec<DataSet>,
dashboard_titles: Vec<TitleAndId>,
filter_titles: Vec<TitleAndId>,
}

#[derive(Debug, Serialize)]
pub struct HomeSearchResponse {
alerts: Vec<TitleAndId>,
correlations: Vec<TitleAndId>,
dashboards: Vec<TitleAndId>,
filters: Vec<TitleAndId>,
}

pub async fn generate_home_response(key: &SessionKey) -> Result<HomeResponse, PrismHomeError> {
// Execute these operations concurrently
let (
stream_titles_result,
alert_titles_result,
correlation_titles_result,
dashboards_result,
filters_result,
alerts_info_result,
) = tokio::join!(
get_stream_titles(key),
get_alert_titles(key),
get_correlation_titles(key),
get_dashboard_titles(key),
get_filter_titles(key),
get_alerts_info()
);
let (stream_titles_result, alerts_info_result) =
tokio::join!(get_stream_titles(key), get_alerts_info());

let stream_titles = stream_titles_result?;
let alert_titles = alert_titles_result?;
let correlation_titles = correlation_titles_result?;
let dashboard_titles = dashboards_result?;
let filter_titles = filters_result?;
let alerts_info = alerts_info_result?;

// Generate dates for date-wise stats
let mut dates = (0..7)
.map(|i| {
Utc::now()
.checked_sub_signed(chrono::Duration::days(i))
.ok_or_else(|| anyhow::Error::msg("Date conversion failed"))
.unwrap()
.expect("Date conversion failed")
})
.map(|date| date.format("%Y-%m-%d").to_string())
.collect_vec();
Expand Down Expand Up @@ -161,14 +137,10 @@ pub async fn generate_home_response(key: &SessionKey) -> Result<HomeResponse, Pr
futures::future::join_all(stats_futures).await;

let mut stream_details = Vec::new();
let mut summary = StreamInfo::default();

for result in stats_results {
match result {
Ok(dated_stats) => {
summary.stats_summary.events += dated_stats.events;
summary.stats_summary.ingestion += dated_stats.ingestion_size;
summary.stats_summary.storage += dated_stats.storage_size;
stream_details.push(dated_stats);
}
Err(e) => {
Expand All @@ -179,104 +151,11 @@ pub async fn generate_home_response(key: &SessionKey) -> Result<HomeResponse, Pr
}

Ok(HomeResponse {
stream_info: summary,
stats_details: stream_details,
stream_titles,
datasets,
alert_titles,
correlation_titles,
dashboard_titles,
filter_titles,
alerts_info,
})
}

// Helper functions to split the work

async fn get_stream_titles(key: &SessionKey) -> Result<Vec<String>, PrismHomeError> {
let stream_titles: Vec<String> = PARSEABLE
.storage
.get_object_store()
.list_streams()
.await
.map_err(|e| PrismHomeError::Anyhow(anyhow::Error::new(e)))?
.into_iter()
.filter(|logstream| {
Users.authorize(key.clone(), Action::ListStream, Some(logstream), None)
== crate::rbac::Response::Authorized
})
.sorted()
.collect_vec();

Ok(stream_titles)
}

async fn get_alert_titles(key: &SessionKey) -> Result<Vec<TitleAndId>, PrismHomeError> {
let alert_titles = ALERTS
.list_alerts_for_user(key.clone())
.await?
.iter()
.map(|alert| TitleAndId {
title: alert.title.clone(),
id: alert.id.to_string(),
})
.collect_vec();

Ok(alert_titles)
}

async fn get_correlation_titles(key: &SessionKey) -> Result<Vec<TitleAndId>, PrismHomeError> {
let correlation_titles = CORRELATIONS
.list_correlations(key)
.await?
.iter()
.map(|corr| TitleAndId {
title: corr.title.clone(),
id: corr.id.clone(),
})
.collect_vec();

Ok(correlation_titles)
}

async fn get_dashboard_titles(key: &SessionKey) -> Result<Vec<TitleAndId>, PrismHomeError> {
let dashboard_titles = DASHBOARDS
.list_dashboards(key)
.await
.iter()
.map(|dashboard| TitleAndId {
title: dashboard.name.clone(),
id: dashboard
.dashboard_id
.as_ref()
.ok_or_else(|| anyhow::Error::msg("Dashboard ID is null"))
.unwrap()
.clone(),
})
.collect_vec();

Ok(dashboard_titles)
}

async fn get_filter_titles(key: &SessionKey) -> Result<Vec<TitleAndId>, PrismHomeError> {
let filter_titles = FILTERS
.list_filters(key)
.await
.iter()
.map(|filter| TitleAndId {
title: filter.filter_name.clone(),
id: filter
.filter_id
.as_ref()
.ok_or_else(|| anyhow::Error::msg("Filter ID is null"))
.unwrap()
.clone(),
})
.collect_vec();

Ok(filter_titles)
}

async fn get_stream_metadata(
stream: String,
) -> Result<(String, Vec<ObjectStoreFormat>, DataSetType), PrismHomeError> {
Expand Down Expand Up @@ -374,6 +253,104 @@ async fn get_stream_stats_for_date(
))
}

pub async fn generate_home_search_response(
key: &SessionKey,
) -> Result<HomeSearchResponse, PrismHomeError> {
let (alert_titles, correlation_titles, dashboard_titles, filter_titles) = tokio::join!(
get_alert_titles(key),
get_correlation_titles(key),
get_dashboard_titles(key),
get_filter_titles(key)
);

let alerts = alert_titles?;
let correlations = correlation_titles?;
let dashboards = dashboard_titles?;
let filters = filter_titles?;

Ok(HomeSearchResponse {
alerts,
correlations,
dashboards,
filters,
})
}

// Helper functions to split the work
async fn get_stream_titles(key: &SessionKey) -> Result<Vec<String>, PrismHomeError> {
let stream_titles: Vec<String> = PARSEABLE
.storage
.get_object_store()
.list_streams()
.await
.map_err(|e| PrismHomeError::Anyhow(anyhow::Error::new(e)))?
.into_iter()
.filter(|logstream| {
Users.authorize(key.clone(), Action::ListStream, Some(logstream), None)
== crate::rbac::Response::Authorized
})
.sorted()
.collect_vec();

Ok(stream_titles)
}

async fn get_alert_titles(key: &SessionKey) -> Result<Vec<TitleAndId>, PrismHomeError> {
let alert_titles = ALERTS
.list_alerts_for_user(key.clone())
.await?
.iter()
.map(|alert| TitleAndId {
title: alert.title.clone(),
id: alert.id.to_string(),
})
.collect_vec();

Ok(alert_titles)
}

async fn get_correlation_titles(key: &SessionKey) -> Result<Vec<TitleAndId>, PrismHomeError> {
let correlation_titles = CORRELATIONS
.list_correlations(key)
.await?
.iter()
.map(|corr| TitleAndId {
title: corr.title.clone(),
id: corr.id.clone(),
})
.collect_vec();

Ok(correlation_titles)
}

async fn get_dashboard_titles(key: &SessionKey) -> Result<Vec<TitleAndId>, PrismHomeError> {
let dashboard_titles = DASHBOARDS
.list_dashboards(key)
.await
.iter()
.map(|dashboard| TitleAndId {
title: dashboard.name.clone(),
id: dashboard.dashboard_id.as_ref().unwrap().clone(),
})
.collect_vec();

Ok(dashboard_titles)
}

async fn get_filter_titles(key: &SessionKey) -> Result<Vec<TitleAndId>, PrismHomeError> {
let filter_titles = FILTERS
.list_filters(key)
.await
.iter()
.map(|filter| TitleAndId {
title: filter.filter_name.clone(),
id: filter.filter_id.as_ref().unwrap().clone(),
})
.collect_vec();

Ok(filter_titles)
}

#[derive(Debug, thiserror::Error)]
pub enum PrismHomeError {
#[error("Error: {0}")]
Expand Down
Loading
Loading