@@ -122,10 +122,23 @@ impl NewReceiptNotification {
122122 AllocationId :: Legacy ( AllocationIdCore :: from ( n. allocation_id ) )
123123 }
124124 NewReceiptNotification :: V2 ( n) => {
125- // Convert the hex string to CollectionId
126- let collection_id = CollectionId :: from_str ( & n. collection_id )
127- . expect ( "Valid collection_id in database" ) ;
128- AllocationId :: Horizon ( collection_id)
125+ // Convert the hex string to CollectionId (trim spaces from fixed-length DB field)
126+ let trimmed_collection_id = n. collection_id . trim ( ) ;
127+ match CollectionId :: from_str ( trimmed_collection_id) {
128+ Ok ( collection_id) => AllocationId :: Horizon ( collection_id) ,
129+ Err ( e) => {
130+ tracing:: error!(
131+ collection_id = %n. collection_id,
132+ trimmed_collection_id = %trimmed_collection_id,
133+ error = %e,
134+ "Failed to parse collection_id from database notification"
135+ ) ;
136+ // Fall back to treating as address for now
137+ let fallback_address =
138+ trimmed_collection_id. parse ( ) . unwrap_or ( Address :: ZERO ) ;
139+ AllocationId :: Legacy ( AllocationIdCore :: from ( fallback_address) )
140+ }
141+ }
129142 }
130143 }
131144 }
@@ -185,7 +198,7 @@ impl Display for AllocationId {
185198
186199/// Type used in [SenderAccountsManager] and [SenderAccount] to route the correct escrow queries
187200/// and to use the correct set of tables
188- #[ derive( Clone , Copy ) ]
201+ #[ derive( Clone , Copy , Debug ) ]
189202pub enum SenderType {
190203 /// SenderAccounts that are found in Escrow Subgraph v1 (Legacy)
191204 Legacy ,
@@ -973,14 +986,28 @@ async fn new_receipts_watcher(
973986 ) ;
974987 }
975988 }
989+
990+ tracing:: info!(
991+ "New receipts watcher started and listening for notifications, sender_type: {:?}, prefix: {:?}" ,
992+ sender_type, prefix
993+ ) ;
994+
976995 loop {
996+ tracing:: debug!( "Waiting for notification from pglistener..." ) ;
997+
977998 let Ok ( pg_notification) = pglistener. recv ( ) . await else {
978999 tracing:: error!(
9791000 "should be able to receive Postgres Notify events on the channel \
9801001 'scalar_tap_receipt_notification'/'tap_horizon_receipt_notification'"
9811002 ) ;
9821003 break ;
9831004 } ;
1005+
1006+ tracing:: info!(
1007+ channel = pg_notification. channel( ) ,
1008+ payload = pg_notification. payload( ) ,
1009+ "Received notification from database"
1010+ ) ;
9841011 // Determine notification format based on the channel name
9851012 let new_receipt_notification = match pg_notification. channel ( ) {
9861013 "scalar_tap_receipt_notification" => {
@@ -1019,15 +1046,20 @@ async fn new_receipts_watcher(
10191046 break ;
10201047 }
10211048 } ;
1022- if let Err ( e ) = handle_notification (
1049+ match handle_notification (
10231050 new_receipt_notification,
10241051 escrow_accounts_rx. clone ( ) ,
10251052 sender_type,
10261053 prefix. as_deref ( ) ,
10271054 )
10281055 . await
10291056 {
1030- tracing:: error!( "{}" , e) ;
1057+ Ok ( ( ) ) => {
1058+ tracing:: debug!( "Successfully handled notification" ) ;
1059+ }
1060+ Err ( e) => {
1061+ tracing:: error!( "Error handling notification: {}" , e) ;
1062+ }
10311063 }
10321064 }
10331065 // shutdown the whole system
0 commit comments