@@ -142,7 +142,8 @@ pub struct SenderAccountsManagerArgs {
142142pub struct State {
143143 sender_ids_v1 : HashSet < Address > ,
144144 sender_ids_v2 : HashSet < Address > ,
145- new_receipts_watcher_handle : Option < tokio:: task:: JoinHandle < ( ) > > ,
145+ new_receipts_watcher_handle_v1 : Option < tokio:: task:: JoinHandle < ( ) > > ,
146+ new_receipts_watcher_handle_v2 : Option < tokio:: task:: JoinHandle < ( ) > > ,
146147
147148 config : & ' static SenderAccountConfig ,
148149 domain_separator : Eip712Domain ,
@@ -191,7 +192,9 @@ impl Actor for SenderAccountsManager {
191192 . map ( AllocationId :: Legacy )
192193 . collect :: < HashSet < _ > > ( )
193194 } ) ;
194- let pglistener = PgListener :: connect_with ( & pgpool. clone ( ) ) . await . unwrap ( ) ;
195+ // we need two connections because each one will listen to different notify events
196+ let pglistener_v1 = PgListener :: connect_with ( & pgpool. clone ( ) ) . await . unwrap ( ) ;
197+ let pglistener_v2 = PgListener :: connect_with ( & pgpool. clone ( ) ) . await . unwrap ( ) ;
195198 let myself_clone = myself. clone ( ) ;
196199 let accounts_clone = escrow_accounts_v1. clone ( ) ;
197200 watch_pipe ( accounts_clone, move |escrow_accounts| {
@@ -225,8 +228,9 @@ impl Actor for SenderAccountsManager {
225228 domain_separator,
226229 sender_ids_v1 : HashSet :: new ( ) ,
227230 sender_ids_v2 : HashSet :: new ( ) ,
228- new_receipts_watcher_handle : None ,
229- pgpool,
231+ new_receipts_watcher_handle_v1 : None ,
232+ new_receipts_watcher_handle_v2 : None ,
233+ pgpool : pgpool. clone ( ) ,
230234 indexer_allocations,
231235 escrow_accounts_v1 : escrow_accounts_v1. clone ( ) ,
232236 escrow_accounts_v2 : escrow_accounts_v2. clone ( ) ,
@@ -279,10 +283,21 @@ impl Actor for SenderAccountsManager {
279283
280284 // Start the new_receipts_watcher task that will consume from the `pglistener`
281285 // after starting all senders
282- state. new_receipts_watcher_handle = Some ( tokio:: spawn ( new_receipts_watcher (
286+ state. new_receipts_watcher_handle_v1 = Some ( tokio:: spawn ( new_receipts_watcher (
283287 myself. get_cell ( ) ,
284- pglistener ,
288+ pglistener_v1 ,
285289 escrow_accounts_v1,
290+ SenderType :: Legacy ,
291+ prefix. clone ( ) ,
292+ ) ) ) ;
293+
294+ // Start the new_receipts_watcher task that will consume from the `pglistener`
295+ // after starting all senders
296+ state. new_receipts_watcher_handle_v2 = Some ( tokio:: spawn ( new_receipts_watcher (
297+ myself. get_cell ( ) ,
298+ pglistener_v2,
299+ escrow_accounts_v2,
300+ SenderType :: Horizon ,
286301 prefix,
287302 ) ) ) ;
288303
@@ -296,9 +311,14 @@ impl Actor for SenderAccountsManager {
296311 ) -> Result < ( ) , ActorProcessingErr > {
297312 // Abort the notification watcher on drop. Otherwise it may panic because the PgPool could
298313 // get dropped before. (Observed in tests)
299- if let Some ( handle) = & state. new_receipts_watcher_handle {
314+ if let Some ( handle) = & state. new_receipts_watcher_handle_v1 {
315+ handle. abort ( ) ;
316+ }
317+
318+ if let Some ( handle) = & state. new_receipts_watcher_handle_v2 {
300319 handle. abort ( ) ;
301320 }
321+
302322 Ok ( ( ) )
303323 }
304324
@@ -741,15 +761,29 @@ async fn new_receipts_watcher(
741761 actor_cell : ActorCell ,
742762 mut pglistener : PgListener ,
743763 escrow_accounts_rx : Receiver < EscrowAccounts > ,
764+ sender_type : SenderType ,
744765 prefix : Option < String > ,
745766) {
746- pglistener
747- . listen ( "scalar_tap_receipt_notification" )
748- . await
749- . expect (
750- "should be able to subscribe to Postgres Notify events on the channel \
767+ match sender_type {
768+ SenderType :: Legacy => {
769+ pglistener
770+ . listen ( "scalar_tap_receipt_notification" )
771+ . await
772+ . expect (
773+ "should be able to subscribe to Postgres Notify events on the channel \
751774 'scalar_tap_receipt_notification'",
752- ) ;
775+ ) ;
776+ }
777+ SenderType :: Horizon => {
778+ pglistener
779+ . listen ( "scalar_tap_receipt_notification" )
780+ . await
781+ . expect (
782+ "should be able to subscribe to Postgres Notify events on the channel \
783+ 'scalar_tap_receipt_notification'",
784+ ) ;
785+ }
786+ }
753787 loop {
754788 let Ok ( pg_notification) = pglistener. recv ( ) . await else {
755789 tracing:: error!(
@@ -770,6 +804,7 @@ async fn new_receipts_watcher(
770804 if let Err ( e) = handle_notification (
771805 new_receipt_notification,
772806 escrow_accounts_rx. clone ( ) ,
807+ sender_type,
773808 prefix. as_deref ( ) ,
774809 )
775810 . await
@@ -798,6 +833,7 @@ async fn new_receipts_watcher(
798833async fn handle_notification (
799834 new_receipt_notification : NewReceiptNotification ,
800835 escrow_accounts_rx : Receiver < EscrowAccounts > ,
836+ sender_type : SenderType ,
801837 prefix : Option < & str > ,
802838) -> anyhow:: Result < ( ) > {
803839 tracing:: trace!(
@@ -835,10 +871,14 @@ async fn handle_notification(
835871 allocation_id
836872 ) ;
837873 let sender_account_name = format ! (
838- "{}{sender_address}" ,
874+ "{}{}{ sender_address}" ,
839875 prefix
840876 . as_ref( )
841- . map_or( String :: default ( ) , |prefix| format!( "{prefix}:" ) )
877+ . map_or( String :: default ( ) , |prefix| format!( "{prefix}:" ) ) ,
878+ match sender_type {
879+ SenderType :: Legacy => "legacy:" ,
880+ SenderType :: Horizon => "horizon:" ,
881+ }
842882 ) ;
843883
844884 let Some ( sender_account) = ActorRef :: < SenderAccountMessage > :: where_is ( sender_account_name)
@@ -849,9 +889,10 @@ async fn handle_notification(
849889 ) ;
850890 } ;
851891 sender_account
852- . cast ( SenderAccountMessage :: NewAllocationId ( AllocationId :: Legacy (
853- * allocation_id,
854- ) ) )
892+ . cast ( SenderAccountMessage :: NewAllocationId ( match sender_type {
893+ SenderType :: Legacy => AllocationId :: Legacy ( * allocation_id) ,
894+ SenderType :: Horizon => AllocationId :: Horizon ( * allocation_id) ,
895+ } ) )
855896 . map_err ( |e| {
856897 anyhow ! (
857898 "Error while sendeing new allocation id message to sender_account: {:?}" ,
@@ -946,7 +987,8 @@ mod tests {
946987 domain_separator : TAP_EIP712_DOMAIN_SEPARATOR . clone ( ) ,
947988 sender_ids_v1 : HashSet :: new ( ) ,
948989 sender_ids_v2 : HashSet :: new ( ) ,
949- new_receipts_watcher_handle : None ,
990+ new_receipts_watcher_handle_v1 : None ,
991+ new_receipts_watcher_handle_v2 : None ,
950992 pgpool,
951993 indexer_allocations : watch:: channel ( HashSet :: new ( ) ) . 1 ,
952994 escrow_accounts_v1 : watch:: channel ( escrow_accounts. clone ( ) ) . 1 ,
@@ -1131,6 +1173,7 @@ mod tests {
11311173 dummy_actor. get_cell ( ) ,
11321174 pglistener,
11331175 escrow_accounts_rx,
1176+ SenderType :: Legacy ,
11341177 Some ( prefix. clone ( ) ) ,
11351178 ) ) ;
11361179
@@ -1174,6 +1217,7 @@ mod tests {
11741217 dummy_actor. get_cell ( ) ,
11751218 pglistener,
11761219 escrow_accounts_rx,
1220+ SenderType :: Legacy ,
11771221 None ,
11781222 ) ) ;
11791223 pgpool. close ( ) . await ;
@@ -1210,9 +1254,14 @@ mod tests {
12101254 value : 1 ,
12111255 } ;
12121256
1213- handle_notification ( new_receipt_notification, escrow_accounts, Some ( & prefix) )
1214- . await
1215- . unwrap ( ) ;
1257+ handle_notification (
1258+ new_receipt_notification,
1259+ escrow_accounts,
1260+ SenderType :: Legacy ,
1261+ Some ( & prefix) ,
1262+ )
1263+ . await
1264+ . unwrap ( ) ;
12161265
12171266 assert_eq ! (
12181267 rx. recv( ) . await . unwrap( ) ,
0 commit comments