Skip to content

Commit 2b6204a

Browse files
committed
Support pre-aggregated event streams in the UI
1 parent 982c414 commit 2b6204a

File tree

14 files changed

+528
-53
lines changed

14 files changed

+528
-53
lines changed

sim-rs/sim-cli/src/events.rs

Lines changed: 40 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ use std::{
33
path::PathBuf,
44
};
55

6+
use aggregate::TraceAggregator;
67
use anyhow::Result;
78
use average::Variance;
89
use itertools::Itertools as _;
@@ -21,6 +22,8 @@ use tokio::{
2122
};
2223
use tracing::{info, info_span};
2324

25+
mod aggregate;
26+
2427
type InputBlockId = sim_core::model::InputBlockId<Node>;
2528
type EndorserBlockId = sim_core::model::EndorserBlockId<Node>;
2629
type VoteBundleId = sim_core::model::VoteBundleId<Node>;
@@ -44,6 +47,7 @@ pub struct EventMonitor {
4447
maximum_eb_age: u64,
4548
events_source: mpsc::UnboundedReceiver<(Event, Timestamp)>,
4649
output_path: Option<PathBuf>,
50+
aggregate: bool,
4751
}
4852

4953
impl EventMonitor {
@@ -67,6 +71,7 @@ impl EventMonitor {
6771
maximum_eb_age: config.max_eb_age,
6872
events_source,
6973
output_path,
74+
aggregate: config.aggregate_events,
7075
}
7176
}
7277

@@ -137,9 +142,17 @@ impl EventMonitor {
137142
} else {
138143
OutputFormat::JsonStream
139144
};
140-
OutputTarget::EventStream {
141-
format,
142-
file: BufWriter::new(file),
145+
if self.aggregate {
146+
OutputTarget::AggregatedEventStream {
147+
aggregation: TraceAggregator::new(),
148+
format,
149+
file: BufWriter::new(file),
150+
}
151+
} else {
152+
OutputTarget::EventStream {
153+
format,
154+
file: BufWriter::new(file),
155+
}
143156
}
144157
}
145158
None => OutputTarget::None,
@@ -576,6 +589,11 @@ fn compute_stats<Iter: IntoIterator<Item = f64>>(data: Iter) -> Stats {
576589
}
577590

578591
enum OutputTarget {
592+
AggregatedEventStream {
593+
aggregation: TraceAggregator,
594+
format: OutputFormat,
595+
file: BufWriter<File>,
596+
},
579597
EventStream {
580598
format: OutputFormat,
581599
file: BufWriter<File>,
@@ -586,6 +604,15 @@ enum OutputTarget {
586604
impl OutputTarget {
587605
async fn write(&mut self, event: OutputEvent) -> Result<()> {
588606
match self {
607+
Self::AggregatedEventStream {
608+
aggregation,
609+
format,
610+
file,
611+
} => {
612+
if let Some(summary) = aggregation.process(event) {
613+
Self::write_line(*format, file, summary).await?;
614+
}
615+
}
589616
Self::EventStream { format, file } => {
590617
Self::write_line(*format, file, event).await?;
591618
}
@@ -615,6 +642,16 @@ impl OutputTarget {
615642

616643
async fn flush(self) -> Result<()> {
617644
match self {
645+
Self::AggregatedEventStream {
646+
aggregation,
647+
format,
648+
mut file,
649+
} => {
650+
if let Some(summary) = aggregation.finish() {
651+
Self::write_line(format, &mut file, summary).await?;
652+
}
653+
file.flush().await?;
654+
}
618655
Self::EventStream { mut file, .. } => {
619656
file.flush().await?;
620657
}

0 commit comments

Comments
 (0)