Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion src/handlers/http/cluster/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -393,7 +393,7 @@ pub async fn sync_role_update_with_ingestors(
.await
}

pub fn fetch_daily_stats_from_ingestors(
pub fn fetch_daily_stats(
date: &str,
stream_meta_list: &[ObjectStoreFormat],
) -> Result<Stats, StreamError> {
Expand Down
24 changes: 9 additions & 15 deletions src/handlers/http/modal/query/querier_logstream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,11 +36,10 @@ use crate::{
handlers::http::{
base_path_without_preceding_slash,
cluster::{
self, fetch_daily_stats_from_ingestors, fetch_stats_from_ingestors,
sync_streams_with_ingestors,
self, fetch_daily_stats, fetch_stats_from_ingestors, sync_streams_with_ingestors,
utils::{merge_quried_stats, IngestionStats, QueriedStats, StorageStats},
},
logstream::{error::StreamError, get_stats_date},
logstream::error::StreamError,
modal::{NodeMetadata, NodeType},
},
hottier::HotTierManager,
Expand Down Expand Up @@ -154,22 +153,18 @@ pub async fn get_stats(
}

if !date_value.is_empty() {
let querier_stats = get_stats_date(&stream_name, date_value).await?;

// this function requires all the ingestor stream jsons
let path = RelativePathBuf::from_iter([&stream_name, STREAM_ROOT_DIRECTORY]);
let obs = PARSEABLE
.storage
.get_object_store()
.get_objects(
Some(&path),
Box::new(|file_name| {
file_name.starts_with(".ingestor") && file_name.ends_with("stream.json")
}),
Box::new(|file_name| file_name.ends_with("stream.json")),
)
.await?;

let mut ingestor_stream_jsons = Vec::new();
let mut stream_jsons = Vec::new();
for ob in obs {
let stream_metadata: ObjectStoreFormat = match serde_json::from_slice(&ob) {
Ok(d) => d,
Expand All @@ -178,16 +173,15 @@ pub async fn get_stats(
continue;
}
};
ingestor_stream_jsons.push(stream_metadata);
stream_jsons.push(stream_metadata);
}

let ingestor_stats =
fetch_daily_stats_from_ingestors(date_value, &ingestor_stream_jsons)?;
let stats = fetch_daily_stats(date_value, &stream_jsons)?;

let total_stats = Stats {
events: querier_stats.events + ingestor_stats.events,
ingestion: querier_stats.ingestion + ingestor_stats.ingestion,
storage: querier_stats.storage + ingestor_stats.storage,
events: stats.events,
ingestion: stats.ingestion,
storage: stats.storage,
};
let stats = serde_json::to_value(total_stats)?;

Expand Down
23 changes: 7 additions & 16 deletions src/prism/home/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,10 +30,7 @@ use crate::{
alerts::{get_alerts_info, AlertError, AlertsInfo, ALERTS},
correlation::{CorrelationError, CORRELATIONS},
event::format::LogSource,
handlers::http::{
cluster::fetch_daily_stats_from_ingestors,
logstream::{error::StreamError, get_stats_date},
},
handlers::http::{cluster::fetch_daily_stats, logstream::error::StreamError},
parseable::PARSEABLE,
rbac::{map::SessionKey, role::Action, Users},
storage::{ObjectStorageError, ObjectStoreFormat, STREAM_ROOT_DIRECTORY},
Expand Down Expand Up @@ -221,9 +218,9 @@ async fn stats_for_date(
};

// Process each stream concurrently
let stream_stats_futures = stream_wise_meta.iter().map(|(stream, meta)| {
get_stream_stats_for_date(stream.clone(), date.clone(), meta.clone())
});
let stream_stats_futures = stream_wise_meta
.values()
.map(|meta| get_stream_stats_for_date(date.clone(), meta.clone()));

let stream_stats_results = futures::future::join_all(stream_stats_futures).await;

Expand All @@ -246,18 +243,12 @@ async fn stats_for_date(
}

/// Aggregates the daily stats for a single stream on the given `date`.
///
/// `meta` is the list of `stream.json` metadata objects for the stream
/// (one per node); `fetch_daily_stats` sums the per-node daily entries
/// matching `date`. Presumably `date` is the same `yyyy-mm-dd` key stored
/// in the metadata — TODO confirm against the caller.
///
/// Returns the `(events, ingestion, storage)` totals.
///
/// # Errors
/// Propagates any `PrismHomeError` produced by `fetch_daily_stats`
/// (converted via `?`).
async fn get_stream_stats_for_date(
    date: String,
    meta: Vec<ObjectStoreFormat>,
) -> Result<(u64, u64, u64), PrismHomeError> {
    // NOTE(review): after the fetch_daily_stats rename, all per-node stats
    // are read from the stream.json metadata in one pass — the former
    // querier/ingestor split (get_stats_date + fetch_daily_stats_from_ingestors)
    // is gone, so no awaits remain even though the fn stays async for callers.
    let stats = fetch_daily_stats(&date, &meta)?;

    Ok((stats.events, stats.ingestion, stats.storage))
}

pub async fn generate_home_search_response(
Expand Down
Loading