Skip to content

update: Prism home changes #1371

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 3 commits into from
Jul 14, 2025
Merged
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
45 changes: 44 additions & 1 deletion src/prism/home/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,12 +33,21 @@ use crate::{
handlers::http::{cluster::fetch_daily_stats, logstream::error::StreamError},
parseable::PARSEABLE,
rbac::{map::SessionKey, role::Action, Users},
stats::Stats,
storage::{ObjectStorageError, ObjectStoreFormat, StreamType, STREAM_ROOT_DIRECTORY},
users::{dashboards::DASHBOARDS, filters::FILTERS},
};

type StreamMetadataResponse = Result<(String, Vec<ObjectStoreFormat>, DataSetType), PrismHomeError>;

#[derive(Debug, Serialize, Default)]
pub struct DatasetStats {
dataset_name: String,
events: u64,
ingestion_size: u64,
storage_size: u64,
}

#[derive(Debug, Serialize, Default)]
pub struct DatedStats {
date: String,
Expand All @@ -65,6 +74,7 @@ pub struct HomeResponse {
pub alerts_info: AlertsInfo,
pub stats_details: Vec<DatedStats>,
pub datasets: Vec<DataSet>,
pub top_five_ingestion: Vec<(String, Stats)>,
}

#[derive(Debug, Serialize)]
Expand Down Expand Up @@ -117,7 +127,7 @@ pub async fn generate_home_response(
let stream_metadata_results: Vec<StreamMetadataResponse> =
futures::future::join_all(stream_metadata_futures).await;

let mut stream_wise_stream_json = HashMap::new();
let mut stream_wise_stream_json: HashMap<String, Vec<ObjectStoreFormat>> = HashMap::new();
let mut datasets = Vec::new();

for result in stream_metadata_results {
Expand All @@ -144,6 +154,8 @@ pub async fn generate_home_response(
}
}

let top_five_ingestion = get_top_5_streams_by_ingestion(&stream_wise_stream_json);

// Process stats for all dates concurrently
let stats_futures = dates
.iter()
Expand All @@ -169,8 +181,39 @@ pub async fn generate_home_response(
stats_details: stream_details,
datasets,
alerts_info,
top_five_ingestion,
})
}

fn get_top_5_streams_by_ingestion(
stream_wise_stream_json: &HashMap<String, Vec<ObjectStoreFormat>>,
) -> Vec<(String, Stats)> {
let mut result: Vec<_> = stream_wise_stream_json
.iter()
.map(|(stream_name, formats)| {
let total_stats = formats.iter().fold(
Stats {
events: 0,
ingestion: 0,
storage: 0,
},
|mut acc, osf| {
let current = &osf.stats.current_stats;
acc.events += current.events;
acc.ingestion += current.ingestion;
acc.storage += current.storage;
acc
},
);
(stream_name.clone(), total_stats)
})
.collect();

result.sort_by_key(|(_, stats)| std::cmp::Reverse(stats.ingestion));
result.truncate(5);
result
}

async fn get_stream_metadata(
stream: String,
) -> Result<(String, Vec<ObjectStoreFormat>, DataSetType), PrismHomeError> {
Expand Down
Loading