@@ -7,7 +7,7 @@ use itertools::{Either, Itertools};
77use sqlx:: { types:: BigDecimal , PgPool } ;
88use tap_core:: { manager:: adapters:: ReceiptStore , receipt:: WithValueAndTimestamp } ;
99use thegraph_core:: alloy:: { hex:: ToHexExt , sol_types:: Eip712Domain } ;
10- use tokio:: { sync:: mpsc:: Receiver , task:: JoinHandle } ;
10+ use tokio:: { sync:: mpsc:: Receiver , sync :: oneshot :: Sender as OneShotSender , task:: JoinHandle } ;
1111use tokio_util:: sync:: CancellationToken ;
1212
1313use super :: { AdapterError , CheckingReceipt , IndexerTapContext , TapReceipt } ;
@@ -40,19 +40,28 @@ pub enum ProcessedReceipt {
4040impl InnerContext {
4141 async fn process_db_receipts (
4242 & self ,
43- buffer : Vec < DatabaseReceipt > ,
43+ buffer : Vec < ( DatabaseReceipt , OneShotSender < Result < ( ) , AdapterError > > ) > ,
4444 ) -> Result < ProcessedReceipt , ProcessReceiptError > {
45- let ( v1_receipts, v2_receipts) : ( Vec < _ > , Vec < _ > ) =
46- buffer. into_iter ( ) . partition_map ( |r| match r {
47- DatabaseReceipt :: V1 ( db_receipt_v1) => Either :: Left ( db_receipt_v1) ,
48- DatabaseReceipt :: V2 ( db_receipt_v2) => Either :: Right ( db_receipt_v2) ,
49- } ) ;
45+ let ( v1_data, v2_data) : ( Vec < _ > , Vec < _ > ) =
46+ buffer
47+ . into_iter ( )
48+ . partition_map ( |( receipt, sender) | match receipt {
49+ DatabaseReceipt :: V1 ( receipt) => Either :: Left ( ( receipt, sender) ) ,
50+ DatabaseReceipt :: V2 ( receipt) => Either :: Right ( ( receipt, sender) ) ,
51+ } ) ;
52+
53+ let ( v1_receipts, v1_senders) : ( Vec < _ > , Vec < _ > ) = v1_data. into_iter ( ) . unzip ( ) ;
54+ let ( v2_receipts, v2_senders) : ( Vec < _ > , Vec < _ > ) = v2_data. into_iter ( ) . unzip ( ) ;
5055
5156 let ( insert_v1, insert_v2) = tokio:: join!(
5257 self . store_receipts_v1( v1_receipts) ,
5358 self . store_receipts_v2( v2_receipts) ,
5459 ) ;
5560
61+ // send back the result of storing receipts to callers
62+ Self :: notify_senders ( v1_senders, & insert_v1, "V1" ) ;
63+ Self :: notify_senders ( v2_senders, & insert_v2, "V2" ) ;
64+
5665 match ( insert_v1, insert_v2) {
5766 ( Err ( e1) , Err ( e2) ) => Err ( ProcessReceiptError :: Both ( e1. into ( ) , e2. into ( ) ) ) ,
5867
@@ -66,6 +75,29 @@ impl InnerContext {
6675 }
6776 }
6877
78+ fn notify_senders (
79+ senders : Vec < OneShotSender < Result < ( ) , AdapterError > > > ,
80+ result : & Result < Option < u64 > , AdapterError > ,
81+ version : & str ,
82+ ) {
83+ match result {
84+ Ok ( _) => {
85+ for sender in senders {
86+ let _ = sender. send ( Ok ( ( ) ) ) ;
87+ }
88+ }
89+ Err ( e) => {
90+ // Create error message once
91+ let err_msg = format ! ( "Failed to store {} receipts: {}" , version, e) ;
92+ tracing:: error!( "{}" , err_msg) ;
93+ for sender in senders {
94+ // Convert to AdapterError for each sender
95+ let _ = sender. send ( Err ( anyhow ! ( err_msg. clone( ) ) . into ( ) ) ) ;
96+ }
97+ }
98+ }
99+ }
100+
69101 async fn store_receipts_v1 (
70102 & self ,
71103 receipts : Vec < DbReceiptV1 > ,
@@ -197,7 +229,7 @@ impl InnerContext {
197229impl IndexerTapContext {
198230 pub fn spawn_store_receipt_task (
199231 inner_context : InnerContext ,
200- mut receiver : Receiver < DatabaseReceipt > ,
232+ mut receiver : Receiver < ( DatabaseReceipt , OneShotSender < Result < ( ) , AdapterError > > ) > ,
201233 cancelation_token : CancellationToken ,
202234 ) -> JoinHandle < ( ) > {
203235 const BUFFER_SIZE : usize = 100 ;
@@ -224,13 +256,19 @@ impl ReceiptStore<TapReceipt> for IndexerTapContext {
224256
225257 async fn store_receipt ( & self , receipt : CheckingReceipt ) -> Result < u64 , Self :: AdapterError > {
226258 let db_receipt = DatabaseReceipt :: from_receipt ( receipt, & self . domain_separator ) ?;
227- self . receipt_producer . send ( db_receipt) . await . map_err ( |e| {
228- tracing:: error!( "Failed to queue receipt for storage: {}" , e) ;
229- anyhow ! ( e)
230- } ) ?;
259+ let ( result_tx, result_rx) = tokio:: sync:: oneshot:: channel ( ) ;
260+ self . receipt_producer
261+ . send ( ( db_receipt, result_tx) )
262+ . await
263+ . map_err ( |e| {
264+ tracing:: error!( "Failed to queue receipt for storage: {}" , e) ;
265+ anyhow ! ( e)
266+ } ) ?;
267+
268+ let res = result_rx. await . map_err ( |e| anyhow ! ( e) ) ?;
231269
232270 // We don't need receipt_ids
233- Ok ( 0 )
271+ res . map ( |_| 0 )
234272 }
235273}
236274
@@ -374,6 +412,23 @@ mod tests {
374412 DatabaseReceipt :: V2 ( DbReceiptV2 :: from_receipt ( & v2, & TAP_EIP712_DOMAIN ) . unwrap ( ) )
375413 }
376414
415+ pub type VecReceiptTx = Vec < (
416+ DatabaseReceipt ,
417+ tokio:: sync:: oneshot:: Sender < Result < ( ) , AdapterError > > ,
418+ ) > ;
419+ pub type VecRx = Vec < tokio:: sync:: oneshot:: Receiver < Result < ( ) , AdapterError > > > ;
420+
421+ pub fn attach_oneshot_channels ( receipts : Vec < DatabaseReceipt > ) -> ( VecReceiptTx , VecRx ) {
422+ let mut txs = Vec :: with_capacity ( receipts. len ( ) ) ;
423+ let mut rxs = Vec :: with_capacity ( receipts. len ( ) ) ;
424+ for r in receipts. into_iter ( ) {
425+ let ( tx, rx) = tokio:: sync:: oneshot:: channel ( ) ;
426+ txs. push ( ( r, tx) ) ;
427+ rxs. push ( rx) ;
428+ }
429+ ( txs, rxs)
430+ }
431+
377432 mod when_all_migrations_are_run {
378433 use super :: * ;
379434
@@ -391,6 +446,7 @@ mod tests {
391446 receipts : Vec < DatabaseReceipt > ,
392447 ) {
393448 let context = InnerContext { pgpool } ;
449+ let ( receipts, _rxs) = attach_oneshot_channels ( receipts) ;
394450
395451 let res = context. process_db_receipts ( receipts) . await . unwrap ( ) ;
396452
@@ -415,7 +471,9 @@ mod tests {
415471 let context = InnerContext { pgpool } ;
416472
417473 let v1 = create_v1 ( ) . await ;
474+
418475 let receipts = vec ! [ v1] ;
476+ let ( receipts, _rxs) = attach_oneshot_channels ( receipts) ;
419477
420478 let res = context. process_db_receipts ( receipts) . await . unwrap ( ) ;
421479
@@ -434,6 +492,7 @@ mod tests {
434492 ) {
435493 let context = InnerContext { pgpool } ;
436494
495+ let ( receipts, _rxs) = attach_oneshot_channels ( receipts) ;
437496 let error = context. process_db_receipts ( receipts) . await . unwrap_err ( ) ;
438497
439498 let ProcessReceiptError :: V2 ( error) = error else {
0 commit comments