@@ -31,10 +31,10 @@ struct OutputEvent {
3131 message : Event ,
3232}
3333
34- #[ derive( clap :: ValueEnum , Clone , Copy ) ]
35- pub enum OutputFormat {
36- EventStream ,
37- SlotStream ,
34+ #[ derive( Clone , Copy ) ]
35+ enum OutputFormat {
36+ JsonStream ,
37+ CborStream ,
3838}
3939
4040pub struct EventMonitor {
@@ -44,15 +44,13 @@ pub struct EventMonitor {
4444 maximum_ib_age : u64 ,
4545 events_source : mpsc:: UnboundedReceiver < ( Event , Timestamp ) > ,
4646 output_path : Option < PathBuf > ,
47- output_format : OutputFormat ,
4847}
4948
5049impl EventMonitor {
5150 pub fn new (
5251 config : & SimConfiguration ,
5352 events_source : mpsc:: UnboundedReceiver < ( Event , Timestamp ) > ,
5453 output_path : Option < PathBuf > ,
55- output_format : Option < OutputFormat > ,
5654 ) -> Self {
5755 let node_ids = config. nodes . iter ( ) . map ( |p| p. id ) . collect ( ) ;
5856 let pool_ids = config
@@ -69,7 +67,6 @@ impl EventMonitor {
6967 maximum_ib_age,
7068 events_source,
7169 output_path,
72- output_format : output_format. unwrap_or ( OutputFormat :: EventStream ) ,
7370 }
7471 }
7572
@@ -128,12 +125,18 @@ impl EventMonitor {
128125 let mut output = match self . output_path {
129126 Some ( ref path) => {
130127 let file = File :: create ( path) . await ?;
131- match self . output_format {
132- OutputFormat :: EventStream => OutputTarget :: EventStream ( BufWriter :: new ( file) ) ,
133- OutputFormat :: SlotStream => OutputTarget :: SlotStream {
134- file : BufWriter :: new ( file) ,
135- next : None ,
136- } ,
128+ let format = if path
129+ . extension ( )
130+ . and_then ( |e| e. to_str ( ) )
131+ . is_some_and ( |ext| ext == "cbor" )
132+ {
133+ OutputFormat :: CborStream
134+ } else {
135+ OutputFormat :: JsonStream
136+ } ;
137+ OutputTarget :: EventStream {
138+ format,
139+ file : BufWriter :: new ( file) ,
137140 }
138141 }
139142 None => OutputTarget :: None ,
@@ -540,61 +543,46 @@ fn compute_stats<Iter: IntoIterator<Item = f64>>(data: Iter) -> Stats {
540543}
541544
542545enum OutputTarget {
543- EventStream ( BufWriter < File > ) ,
544- SlotStream {
546+ EventStream {
547+ format : OutputFormat ,
545548 file : BufWriter < File > ,
546- next : Option < SlotEvents > ,
547549 } ,
548550 None ,
549551}
550552
551- #[ derive( Serialize ) ]
552- struct SlotEvents {
553- slot : u64 ,
554- start_time : Timestamp ,
555- events : Vec < OutputEvent > ,
556- }
557553impl OutputTarget {
558554 async fn write ( & mut self , event : OutputEvent ) -> Result < ( ) > {
559555 match self {
560- Self :: EventStream ( file) => {
561- Self :: write_line ( file, event) . await ?;
562- }
563- Self :: SlotStream { file, next } => {
564- if let Event :: Slot { number } = & event. message {
565- if let Some ( slot) = next. take ( ) {
566- Self :: write_line ( file, slot) . await ?;
567- }
568- * next = Some ( SlotEvents {
569- slot : * number,
570- start_time : event. time ,
571- events : vec ! [ ] ,
572- } ) ;
573- } else if let Some ( slot) = next. as_mut ( ) {
574- slot. events . push ( event) ;
575- }
556+ Self :: EventStream { format, file } => {
557+ Self :: write_line ( * format, file, event) . await ?;
576558 }
577559 Self :: None => { }
578560 }
579561 Ok ( ( ) )
580562 }
581563
582- async fn write_line < T : Serialize > ( file : & mut BufWriter < File > , event : T ) -> Result < ( ) > {
583- let mut string = serde_json:: to_string ( & event) ?;
584- string. push ( '\n' ) ;
585- file. write_all ( string. as_bytes ( ) ) . await ?;
564+ async fn write_line < T : Serialize > (
565+ format : OutputFormat ,
566+ file : & mut BufWriter < File > ,
567+ event : T ,
568+ ) -> Result < ( ) > {
569+ match format {
570+ OutputFormat :: JsonStream => {
571+ let mut string = serde_json:: to_string ( & event) ?;
572+ string. push ( '\n' ) ;
573+ file. write_all ( string. as_bytes ( ) ) . await ?;
574+ }
575+ OutputFormat :: CborStream => {
576+ let bytes = serde_cbor:: to_vec ( & event) ?;
577+ file. write_all ( & bytes) . await ?;
578+ }
579+ }
586580 Ok ( ( ) )
587581 }
588582
589583 async fn flush ( self ) -> Result < ( ) > {
590584 match self {
591- Self :: EventStream ( mut file) => {
592- file. flush ( ) . await ?;
593- }
594- Self :: SlotStream { mut file, mut next } => {
595- if let Some ( slot) = next. take ( ) {
596- Self :: write_line ( & mut file, slot) . await ?;
597- }
585+ Self :: EventStream { mut file, .. } => {
598586 file. flush ( ) . await ?;
599587 }
600588 Self :: None => { }
0 commit comments