11use std:: {
22 collections:: { BTreeMap , BTreeSet } ,
33 path:: PathBuf ,
4+ pin:: Pin ,
45} ;
56
67use aggregate:: TraceAggregator ;
78use anyhow:: Result ;
9+ use async_compression:: tokio:: write:: GzipEncoder ;
810use average:: Variance ;
911use itertools:: Itertools as _;
1012use pretty_bytes_rust:: { pretty_bytes, PrettyBytesOptions } ;
@@ -17,7 +19,7 @@ use sim_core::{
1719} ;
1820use tokio:: {
1921 fs:: { self , File } ,
20- io:: { AsyncWriteExt as _, BufWriter } ,
22+ io:: { AsyncWrite , AsyncWriteExt as _, BufWriter } ,
2123 sync:: mpsc,
2224} ;
2325use tracing:: { info, info_span} ;
@@ -28,6 +30,8 @@ type InputBlockId = sim_core::model::InputBlockId<Node>;
2830type EndorserBlockId = sim_core:: model:: EndorserBlockId < Node > ;
2931type VoteBundleId = sim_core:: model:: VoteBundleId < Node > ;
3032
33+ type TraceSink = Pin < Box < dyn AsyncWrite + Send + Sync + ' static > > ;
34+
3135#[ derive( Clone , Serialize ) ]
3236struct OutputEvent {
3337 time_s : Timestamp ,
@@ -130,9 +134,27 @@ impl EventMonitor {
130134 }
131135 }
132136
133- let mut output = match self . output_path {
134- Some ( ref path) => {
135- let file = File :: create ( path) . await ?;
137+ let mut output = match self . output_path . as_mut ( ) {
138+ Some ( path) => {
139+ let file = File :: create ( & path) . await ?;
140+
141+ let mut gzipped = false ;
142+ if path
143+ . extension ( )
144+ . and_then ( |e| e. to_str ( ) )
145+ . is_some_and ( |ext| ext == "gz" )
146+ {
147+ path. set_extension ( "" ) ;
148+ gzipped = true ;
149+ }
150+
151+ let file: TraceSink = if gzipped {
152+ let encoder = GzipEncoder :: new ( file) ;
153+ Box :: pin ( BufWriter :: new ( encoder) )
154+ } else {
155+ Box :: pin ( BufWriter :: new ( file) )
156+ } ;
157+
136158 let format = if path
137159 . extension ( )
138160 . and_then ( |e| e. to_str ( ) )
@@ -146,13 +168,10 @@ impl EventMonitor {
146168 OutputTarget :: AggregatedEventStream {
147169 aggregation : TraceAggregator :: new ( ) ,
148170 format,
149- file : BufWriter :: new ( file ) ,
171+ file,
150172 }
151173 } else {
152- OutputTarget :: EventStream {
153- format,
154- file : BufWriter :: new ( file) ,
155- }
174+ OutputTarget :: EventStream { format, file }
156175 }
157176 }
158177 None => OutputTarget :: None ,
@@ -592,11 +611,11 @@ enum OutputTarget {
592611 AggregatedEventStream {
593612 aggregation : TraceAggregator ,
594613 format : OutputFormat ,
595- file : BufWriter < File > ,
614+ file : TraceSink ,
596615 } ,
597616 EventStream {
598617 format : OutputFormat ,
599- file : BufWriter < File > ,
618+ file : TraceSink ,
600619 } ,
601620 None ,
602621}
@@ -621,9 +640,9 @@ impl OutputTarget {
621640 Ok ( ( ) )
622641 }
623642
624- async fn write_line < T : Serialize > (
643+ async fn write_line < T : Serialize , W : AsyncWrite + Unpin > (
625644 format : OutputFormat ,
626- file : & mut BufWriter < File > ,
645+ file : & mut W ,
627646 event : T ,
628647 ) -> Result < ( ) > {
629648 match format {
0 commit comments