@@ -262,9 +262,19 @@ where
262262 }
263263
264264 async fn run ( self , init_receiver : Receiver < InitMessage < I > > , channels : Arc < dyn ClusterChannels > , io : I ) {
265+ let name = self . name ;
265266 let started_at = Instant :: now ( ) ;
266267 let result = self . do_run ( init_receiver, channels, & io) . await ;
267- info ! ( "State machine execution took {:?}" , started_at. elapsed( ) ) ;
268+ let elapsed = started_at. elapsed ( ) ;
269+ match & result {
270+ Ok ( _) => {
271+ info ! ( "State machine finished successfully after {elapsed:?}" ) ;
272+ }
273+ Err ( e) => {
274+ METRICS . inc_failures ( name) ;
275+ error ! ( "State machine execution failed after {elapsed:?}: {e:#}" ) ;
276+ }
277+ } ;
268278 io. handle_final_result ( result) . await ;
269279 }
270280
@@ -280,7 +290,8 @@ where
280290 for party in & parties {
281291 let party_id = & party. party_id ;
282292 info ! ( "Opening stream to {party_id}" ) ;
283- futs. push ( io. open_party_stream ( channels. deref ( ) , & party. party_id ) ) ;
293+ let future = io. open_party_stream ( channels. deref ( ) , & party. party_id ) ;
294+ futs. push ( PartyFuture { party : party_id. clone ( ) , future } ) ;
284295 }
285296 // ensure opening the channel succeeded
286297 let streams = self . await_requests_with_timeout ( futs) . await . context ( "failed to establish channel to peer" ) ?;
@@ -311,13 +322,25 @@ where
311322 }
312323 }
313324
314- async fn await_requests_with_timeout < F , T , E > ( & self , futs : Vec < F > ) -> anyhow:: Result < Vec < T > >
325+ async fn await_requests_with_timeout < F , T , E > ( & self , futs : Vec < PartyFuture < F > > ) -> anyhow:: Result < Vec < T > >
315326 where
316327 F : Future < Output = Result < T , E > > ,
317328 E : std:: error:: Error + Send + Sync + ' static ,
318329 {
330+ let ( parties, futs) : ( Vec < _ > , Vec < _ > ) = futs. into_iter ( ) . map ( |p| ( p. party , p. future ) ) . unzip ( ) ;
319331 match timeout ( self . timeout , join_all ( futs) ) . await {
320- Ok ( futs) => Ok ( futs. into_iter ( ) . collect :: < Result < _ , _ > > ( ) ?) ,
332+ Ok ( results) => {
333+ let mut outputs = Vec :: new ( ) ;
334+ for ( party, result) in parties. into_iter ( ) . zip ( results) {
335+ match result {
336+ Ok ( value) => outputs. push ( value) ,
337+ Err ( e) => {
338+ bail ! ( "failed to send request to {party}: {e}" ) ;
339+ }
340+ } ;
341+ }
342+ Ok ( outputs)
343+ }
321344 Err ( _) => bail ! ( "timed out waiting for request to be handled" ) ,
322345 }
323346 }
@@ -383,7 +406,7 @@ where
383406 messages : Vec < RecipientMessage < PartyId , I :: StateMachineMessage > > ,
384407 ) -> anyhow:: Result < (
385408 Vec < I :: StateMachineMessage > ,
386- Vec < impl Future < Output = Result < ( ) , SendError < I :: OutputMessage > > > + ' a > ,
409+ Vec < PartyFuture < impl Future < Output = Result < ( ) , SendError < I :: OutputMessage > > > + ' a > > ,
387410 ) > {
388411 // accumulate all of our own messages and the ones to be sent for later use.
389412 let mut self_messages = Vec :: new ( ) ;
@@ -412,7 +435,8 @@ where
412435 }
413436 } ;
414437 let message = I :: StateMachineMessage :: encoded_bytes_as_output_message ( encoded_message) ;
415- futs. push ( channel. send ( message) ) ;
438+ let future = channel. send ( message) ;
439+ futs. push ( PartyFuture { party, future } ) ;
416440 }
417441 }
418442 }
@@ -468,6 +492,11 @@ where
468492 }
469493}
470494
495+ struct PartyFuture < F > {
496+ party : PartyId ,
497+ future : F ,
498+ }
499+
471500enum HandleOutput < I >
472501where
473502 I : StateMachineIo ,
@@ -488,6 +517,7 @@ where
488517
489518struct Metrics {
490519 messages : MaybeMetric < Counter > ,
520+ failures : MaybeMetric < Counter > ,
491521 active_state_machines : MaybeMetric < Gauge > ,
492522 execution_duration : MaybeMetric < Histogram < Duration > > ,
493523}
@@ -500,6 +530,9 @@ impl Default for Metrics {
500530 & [ "state_machine" , "direction" ] ,
501531 )
502532 . into ( ) ;
533+ let failures =
534+ Counter :: new ( "state_machine_failures_total" , "Number of state machines that failed" , & [ "state_machine" ] )
535+ . into ( ) ;
503536 let active_state_machines =
504537 Gauge :: new ( "active_state_machines_total" , "Number of active state machines" , & [ "state_machine" ] ) . into ( ) ;
505538 let execution_duration = Histogram :: new (
@@ -509,7 +542,7 @@ impl Default for Metrics {
509542 TimingBuckets :: sub_minute ( ) ,
510543 )
511544 . into ( ) ;
512- Self { messages, active_state_machines, execution_duration }
545+ Self { messages, active_state_machines, execution_duration, failures }
513546 }
514547}
515548
@@ -518,6 +551,10 @@ impl Metrics {
518551 self . messages . with_labels ( [ ( "state_machine" , state_machine) , ( "direction" , direction) ] ) . inc_by ( count) ;
519552 }
520553
554+ fn inc_failures ( & self , state_machine : & str ) {
555+ self . failures . with_labels ( [ ( "state_machine" , state_machine) ] ) . inc ( ) ;
556+ }
557+
521558 fn active_state_machines_guard ( & self , state_machine : & str ) -> ScopedGauge < impl SingleGaugeMetric > {
522559 self . active_state_machines . with_labels ( [ ( "state_machine" , state_machine) ] ) . into_scoped_gauge ( )
523560 }
0 commit comments