@@ -26,10 +26,7 @@ use tap_core::{
2626 } ,
2727 signed_message:: Eip712SignedMessage ,
2828} ;
29- use thegraph_core:: {
30- alloy:: { hex:: ToHexExt , primitives:: Address , sol_types:: Eip712Domain } ,
31- CollectionId ,
32- } ;
29+ use thegraph_core:: alloy:: { hex:: ToHexExt , primitives:: Address , sol_types:: Eip712Domain } ;
3330use thiserror:: Error ;
3431use tokio:: sync:: watch:: Receiver ;
3532
@@ -208,6 +205,10 @@ pub struct SenderAllocationState<T: NetworkVersion> {
208205 timestamp_buffer_ns : u64 ,
209206 /// Limit of receipts sent in a Rav Request
210207 rav_request_receipt_limit : u64 ,
208+ /// Data service address for Horizon mode
209+ /// - None for Legacy mode
210+ /// - Some(SubgraphService address) for Horizon mode from config
211+ data_service : Option < Address > ,
211212}
212213
213214/// Configuration derived from config.toml
@@ -559,6 +560,14 @@ where
559560 CheckList :: new ( required_checks) ,
560561 ) ;
561562
563+ // Extract data_service from config based on TapMode
564+ let data_service = match & config. tap_mode {
565+ indexer_config:: TapMode :: Legacy => None ,
566+ indexer_config:: TapMode :: Horizon {
567+ subgraph_service_address,
568+ } => Some ( * subgraph_service_address) ,
569+ } ;
570+
562571 Ok ( Self {
563572 pgpool,
564573 tap_manager,
@@ -574,6 +583,7 @@ where
574583 sender_aggregator,
575584 rav_request_receipt_limit : config. rav_request_receipt_limit ,
576585 timestamp_buffer_ns : config. timestamp_buffer_ns ,
586+ data_service,
577587 } )
578588 }
579589
@@ -1271,7 +1281,8 @@ impl DatabaseInteractions for SenderAllocationState<Horizon> {
12711281 "# ,
12721282 BigDecimal :: from( min_timestamp) ,
12731283 BigDecimal :: from( max_timestamp) ,
1274- CollectionId :: from( self . allocation_id) . encode_hex( ) ,
1284+ // self.allocation_id is already a CollectionId in Horizon state
1285+ self . allocation_id. encode_hex( ) ,
12751286 self . indexer_address. encode_hex( ) ,
12761287 & signers,
12771288 )
@@ -1296,7 +1307,8 @@ impl DatabaseInteractions for SenderAllocationState<Horizon> {
12961307 collection_id = $1
12971308 AND signer_address IN (SELECT unnest($2::text[]))
12981309 "# ,
1299- CollectionId :: from( self . allocation_id) . encode_hex( ) ,
1310+ // self.allocation_id is already a CollectionId in Horizon state
1311+ self . allocation_id. encode_hex( ) ,
13001312 & signers
13011313 )
13021314 . fetch_one ( & self . pgpool )
@@ -1345,7 +1357,8 @@ impl DatabaseInteractions for SenderAllocationState<Horizon> {
13451357 AND signer_address IN (SELECT unnest($4::text[]))
13461358 AND timestamp_ns > $5
13471359 "# ,
1348- CollectionId :: from( self . allocation_id) . encode_hex( ) ,
1360+ // self.allocation_id is already a CollectionId in Horizon state
1361+ self . allocation_id. encode_hex( ) ,
13491362 self . indexer_address. encode_hex( ) ,
13501363 last_id,
13511364 & signers,
@@ -1395,10 +1408,15 @@ impl DatabaseInteractions for SenderAllocationState<Horizon> {
13951408 collection_id = $1
13961409 AND payer = $2
13971410 AND service_provider = $3
1411+ AND data_service = $4
13981412 "# ,
1399- CollectionId :: from( self . allocation_id) . encode_hex( ) ,
1413+ // self.allocation_id is already a CollectionId in Horizon state
1414+ self . allocation_id. encode_hex( ) ,
14001415 self . sender. encode_hex( ) ,
14011416 self . indexer_address. encode_hex( ) ,
1417+ self . data_service
1418+ . expect( "data_service should be available in Horizon mode" )
1419+ . encode_hex( ) ,
14021420 )
14031421 . execute ( & self . pgpool )
14041422 . await ?;
0 commit comments