@@ -4,7 +4,7 @@ use std::sync::{Arc, Mutex};
44use anyhow:: { anyhow, Context , Result } ;
55use payjoin:: bitcoin:: consensus:: encode:: serialize_hex;
66use payjoin:: bitcoin:: { Amount , FeeRate } ;
7- use payjoin:: persist:: OptionalTransitionOutcome ;
7+ use payjoin:: persist:: { OptionalTransitionOutcome , SessionPersister } ;
88use payjoin:: receive:: v2:: {
99 replay_event_log as replay_receiver_event_log, HasReplyableError , Initialized ,
1010 MaybeInputsOwned , MaybeInputsSeen , Monitor , OutputsUnknown , PayjoinProposal ,
@@ -286,26 +286,38 @@ impl AppTrait for App {
286286
287287 let mut tasks = Vec :: new ( ) ;
288288
289+ // Process receiver sessions
289290 for session_id in recv_session_ids {
290291 let self_clone = self . clone ( ) ;
291- let recv_persister = ReceiverPersister :: from_id ( self . db . clone ( ) , session_id) ;
292- let receiver_state = replay_receiver_event_log ( & recv_persister)
293- . map_err ( |e| anyhow ! ( "Failed to replay receiver event log: {:?}" , e) ) ?
294- . 0 ;
295- tasks. push ( tokio:: spawn ( async move {
296- self_clone. process_receiver_session ( receiver_state, & recv_persister) . await
297- } ) ) ;
292+ let recv_persister = ReceiverPersister :: from_id ( self . db . clone ( ) , session_id. clone ( ) ) ;
293+ match replay_receiver_event_log ( & recv_persister) {
294+ Ok ( ( receiver_state, _) ) => {
295+ tasks. push ( tokio:: spawn ( async move {
296+ self_clone. process_receiver_session ( receiver_state, & recv_persister) . await
297+ } ) ) ;
298+ }
299+ Err ( e) => {
300+ tracing:: error!( "An error {:?} occurred while replaying receiver session" , e) ;
301+ Self :: close_failed_session ( & recv_persister, & session_id, "receiver" ) ;
302+ }
303+ }
298304 }
299305
306+ // Process sender sessions
300307 for session_id in send_session_ids {
301- let sender_persiter = SenderPersister :: from_id ( self . db . clone ( ) , session_id) ;
302- let sender_state = replay_sender_event_log ( & sender_persiter)
303- . map_err ( |e| anyhow ! ( "Failed to replay sender event log: {:?}" , e) ) ?
304- . 0 ;
305- let self_clone = self . clone ( ) ;
306- tasks. push ( tokio:: spawn ( async move {
307- self_clone. process_sender_session ( sender_state, & sender_persiter) . await
308- } ) ) ;
308+ let sender_persiter = SenderPersister :: from_id ( self . db . clone ( ) , session_id. clone ( ) ) ;
309+ match replay_sender_event_log ( & sender_persiter) {
310+ Ok ( ( sender_state, _) ) => {
311+ let self_clone = self . clone ( ) ;
312+ tasks. push ( tokio:: spawn ( async move {
313+ self_clone. process_sender_session ( sender_state, & sender_persiter) . await
314+ } ) ) ;
315+ }
316+ Err ( e) => {
317+ tracing:: error!( "An error {:?} occurred while replaying Sender session" , e) ;
318+ Self :: close_failed_session ( & sender_persiter, & session_id, "sender" ) ;
319+ }
320+ }
309321 }
310322
311323 let mut interrupt = self . interrupt . clone ( ) ;
@@ -450,6 +462,17 @@ impl AppTrait for App {
450462}
451463
452464impl App {
465+ fn close_failed_session < P > ( persister : & P , session_id : & SessionId , role : & str )
466+ where
467+ P : SessionPersister ,
468+ {
469+ if let Err ( close_err) = SessionPersister :: close ( persister) {
470+ tracing:: error!( "Failed to close {} session {}: {:?}" , role, session_id, close_err) ;
471+ } else {
472+ tracing:: error!( "Closed failed {} session: {}" , role, session_id) ;
473+ }
474+ }
475+
453476 async fn process_sender_session (
454477 & self ,
455478 session : SendSession ,
0 commit comments