@@ -3,6 +3,7 @@ use std::{
33 path:: PathBuf ,
44} ;
55
6+ use aggregate:: TraceAggregator ;
67use anyhow:: Result ;
78use average:: Variance ;
89use itertools:: Itertools as _;
@@ -21,6 +22,8 @@ use tokio::{
2122} ;
2223use tracing:: { info, info_span} ;
2324
25+ mod aggregate;
26+
2427type InputBlockId = sim_core:: model:: InputBlockId < Node > ;
2528type EndorserBlockId = sim_core:: model:: EndorserBlockId < Node > ;
2629type VoteBundleId = sim_core:: model:: VoteBundleId < Node > ;
@@ -44,6 +47,7 @@ pub struct EventMonitor {
4447 maximum_eb_age : u64 ,
4548 events_source : mpsc:: UnboundedReceiver < ( Event , Timestamp ) > ,
4649 output_path : Option < PathBuf > ,
50+ aggregate : bool ,
4751}
4852
4953impl EventMonitor {
@@ -67,6 +71,7 @@ impl EventMonitor {
6771 maximum_eb_age : config. max_eb_age ,
6872 events_source,
6973 output_path,
74+ aggregate : config. aggregate_events ,
7075 }
7176 }
7277
@@ -137,9 +142,17 @@ impl EventMonitor {
137142 } else {
138143 OutputFormat :: JsonStream
139144 } ;
140- OutputTarget :: EventStream {
141- format,
142- file : BufWriter :: new ( file) ,
145+ if self . aggregate {
146+ OutputTarget :: AggregatedEventStream {
147+ aggregation : TraceAggregator :: new ( ) ,
148+ format,
149+ file : BufWriter :: new ( file) ,
150+ }
151+ } else {
152+ OutputTarget :: EventStream {
153+ format,
154+ file : BufWriter :: new ( file) ,
155+ }
143156 }
144157 }
145158 None => OutputTarget :: None ,
@@ -576,6 +589,11 @@ fn compute_stats<Iter: IntoIterator<Item = f64>>(data: Iter) -> Stats {
576589}
577590
578591enum OutputTarget {
592+ AggregatedEventStream {
593+ aggregation : TraceAggregator ,
594+ format : OutputFormat ,
595+ file : BufWriter < File > ,
596+ } ,
579597 EventStream {
580598 format : OutputFormat ,
581599 file : BufWriter < File > ,
@@ -586,6 +604,15 @@ enum OutputTarget {
586604impl OutputTarget {
587605 async fn write ( & mut self , event : OutputEvent ) -> Result < ( ) > {
588606 match self {
607+ Self :: AggregatedEventStream {
608+ aggregation,
609+ format,
610+ file,
611+ } => {
612+ if let Some ( summary) = aggregation. process ( event) {
613+ Self :: write_line ( * format, file, summary) . await ?;
614+ }
615+ }
589616 Self :: EventStream { format, file } => {
590617 Self :: write_line ( * format, file, event) . await ?;
591618 }
@@ -615,6 +642,16 @@ impl OutputTarget {
615642
616643 async fn flush ( self ) -> Result < ( ) > {
617644 match self {
645+ Self :: AggregatedEventStream {
646+ aggregation,
647+ format,
648+ mut file,
649+ } => {
650+ if let Some ( summary) = aggregation. finish ( ) {
651+ Self :: write_line ( format, & mut file, summary) . await ?;
652+ }
653+ file. flush ( ) . await ?;
654+ }
618655 Self :: EventStream { mut file, .. } => {
619656 file. flush ( ) . await ?;
620657 }
0 commit comments