@@ -18,11 +18,13 @@ use ethers::{
1818 signers:: { coins_bip39:: English , LocalWallet , MnemonicBuilder , Signer } ,
1919 types:: { Address , H160 } ,
2020} ;
21- use jsonrpsee:: { core:: client:: ClientT , http_client:: HttpClientBuilder , server:: ServerHandle } ;
21+ use jsonrpsee:: {
22+ core:: client:: ClientT , http_client:: HttpClientBuilder , rpc_params, server:: ServerHandle ,
23+ } ;
2224use rand:: { rngs:: StdRng , Rng , SeedableRng } ;
2325use rstest:: * ;
2426
25- use tap_aggregator:: server as agg_server;
27+ use tap_aggregator:: { jsonrpsee_helpers , server as agg_server} ;
2628use tap_core:: {
2729 adapters:: {
2830 collateral_adapter_mock:: CollateralAdapterMock ,
@@ -31,8 +33,8 @@ use tap_core::{
3133 receipt_storage_adapter_mock:: ReceiptStorageAdapterMock ,
3234 } ,
3335 eip_712_signed_message:: EIP712SignedMessage ,
34- tap_receipt :: ReceiptCheck ,
35- tap_receipt:: { Receipt , ReceivedReceipt } ,
36+ tap_manager :: SignedRAV ,
37+ tap_receipt:: { Receipt , ReceiptCheck , ReceivedReceipt } ,
3638} ;
3739
3840use crate :: indexer_mock;
@@ -80,7 +82,7 @@ fn receipt_threshold_1(num_queries: u64, num_batches: u64) -> u64 {
8082// The number of receipts collected before Indexer 2 sends a RAV request
8183#[ fixture]
8284fn receipt_threshold_2 ( num_queries : u64 , num_batches : u64 ) -> u64 {
83- num_queries * num_batches
85+ num_queries * num_batches / 2
8486}
8587
8688// The private key (LocalWallet) and public key (Address) of a Gateway
@@ -279,6 +281,72 @@ async fn requests_2(
279281 Ok ( requests)
280282}
281283
284+ #[ fixture]
285+ async fn repeated_timestamp_request (
286+ keys_gateway : ( LocalWallet , Address ) ,
287+ query_price : Vec < u128 > ,
288+ allocation_ids : Vec < H160 > ,
289+ num_batches : u64 ,
290+ receipt_threshold_1 : u64 ,
291+ ) -> Result < Vec < ( EIP712SignedMessage < Receipt > , u64 ) > > {
292+ let ( gateway_key, _) = keys_gateway;
293+
294+ // Create signed receipts
295+ let mut requests =
296+ generate_requests ( query_price, num_batches, & gateway_key, allocation_ids[ 0 ] ) . await ?;
297+
298+ // Create a new receipt with the timestamp equal to the latest receipt in the first RAV request batch
299+ let repeat_timestamp = requests[ receipt_threshold_1 as usize - 1 ]
300+ . 0
301+ . message
302+ . timestamp_ns ;
303+ let target_receipt = & requests[ receipt_threshold_1 as usize ] . 0 . message ;
304+ let repeat_receipt = Receipt {
305+ allocation_id : target_receipt. allocation_id ,
306+ timestamp_ns : repeat_timestamp,
307+ nonce : target_receipt. nonce ,
308+ value : target_receipt. value ,
309+ } ;
310+
311+ // Sign the new receipt and insert it in the second batch
312+ requests[ receipt_threshold_1 as usize ] . 0 =
313+ EIP712SignedMessage :: new ( repeat_receipt, & gateway_key) . await ?;
314+ Ok ( requests)
315+ }
316+
317+ #[ fixture]
318+ async fn repeated_timestamp_incremented_by_one_request (
319+ keys_gateway : ( LocalWallet , Address ) ,
320+ query_price : Vec < u128 > ,
321+ allocation_ids : Vec < H160 > ,
322+ num_batches : u64 ,
323+ receipt_threshold_1 : u64 ,
324+ ) -> Result < Vec < ( EIP712SignedMessage < Receipt > , u64 ) > > {
325+ let ( gateway_key, _) = keys_gateway;
326+ // Create your Receipt here
327+ let mut requests =
328+ generate_requests ( query_price, num_batches, & gateway_key, allocation_ids[ 0 ] ) . await ?;
329+
330+ // Create a new receipt with the timestamp equal to the latest receipt timestamp+1 in the first RAV request batch
331+ let repeat_timestamp = requests[ receipt_threshold_1 as usize - 1 ]
332+ . 0
333+ . message
334+ . timestamp_ns
335+ + 1 ;
336+ let target_receipt = & requests[ receipt_threshold_1 as usize ] . 0 . message ;
337+ let repeat_receipt = Receipt {
338+ allocation_id : target_receipt. allocation_id ,
339+ timestamp_ns : repeat_timestamp,
340+ nonce : target_receipt. nonce ,
341+ value : target_receipt. value ,
342+ } ;
343+
344+ // Sign the new receipt and insert it in the second batch
345+ requests[ receipt_threshold_1 as usize ] . 0 =
346+ EIP712SignedMessage :: new ( repeat_receipt, & gateway_key) . await ?;
347+ Ok ( requests)
348+ }
349+
282350#[ fixture]
283351async fn wrong_requests (
284352 wrong_keys_gateway : ( LocalWallet , Address ) ,
@@ -634,6 +702,175 @@ async fn test_manager_wrong_requestor_keys(
634702 Ok ( ( ) )
635703}
636704
705+ #[ rstest]
706+ #[ tokio:: test]
707+ async fn test_tap_manager_rav_timestamp_cuttoff (
708+ #[ future] two_indexers_test_servers : Result <
709+ (
710+ ServerHandle ,
711+ SocketAddr ,
712+ ServerHandle ,
713+ SocketAddr ,
714+ ServerHandle ,
715+ SocketAddr ,
716+ ) ,
717+ Error ,
718+ > ,
719+ #[ future] repeated_timestamp_request : Result < Vec < ( EIP712SignedMessage < Receipt > , u64 ) > > ,
720+ #[ future] repeated_timestamp_incremented_by_one_request : Result <
721+ Vec < ( EIP712SignedMessage < Receipt > , u64 ) > ,
722+ > ,
723+ receipt_threshold_1 : u64 ,
724+ ) -> Result < ( ) , Box < dyn std:: error:: Error > > {
725+ // This test checks that tap_core is correctly filtering receipts by timestamp.
726+ let (
727+ server_handle_1,
728+ socket_addr_1,
729+ _server_handle_2,
730+ socket_addr_2,
731+ _gateway_handle,
732+ _gateway_addr,
733+ ) = two_indexers_test_servers. await ?;
734+
735+ let indexer_1_address = "http://" . to_string ( ) + & socket_addr_1. to_string ( ) ;
736+ let indexer_2_address = "http://" . to_string ( ) + & socket_addr_2. to_string ( ) ;
737+ let client_1 = HttpClientBuilder :: default ( ) . build ( indexer_1_address) ?;
738+ let client_2 = HttpClientBuilder :: default ( ) . build ( indexer_2_address) ?;
739+ let requests = repeated_timestamp_request. await ?;
740+
741+ let mut counter = 1 ;
742+ for ( receipt_1, id) in requests {
743+ let result = client_1. request ( "request" , ( id, receipt_1) ) . await ;
744+
745+ // The first receipt in the second batch has the same timestamp as the last receipt in the first batch.
746+ // TAP manager should ignore this receipt when creating the second RAV request.
747+ // The indexer_mock will throw an error if the number of receipts in RAV request is less than the expected number.
748+ // An error is expected when requesting the second RAV.
749+ if counter == 2 * receipt_threshold_1 {
750+ match result {
751+ Ok ( ( ) ) => panic ! ( "Should have failed RAV request" ) ,
752+ Err ( _) => { }
753+ }
754+ } else {
755+ match result {
756+ Ok ( ( ) ) => { }
757+ Err ( e) => panic ! ( "Error making receipt request: {:?}" , e) ,
758+ }
759+ }
760+ counter += 1 ;
761+ }
762+
763+ server_handle_1. stop ( ) ?;
764+
765+ // Here the timestamp first receipt in the second batch is equal to timestamp + 1 of the last receipt in the first batch.
766+ // No errors are expected.
767+ let requests = repeated_timestamp_incremented_by_one_request. await ?;
768+ for ( receipt_1, id) in requests {
769+ let result = client_2. request ( "request" , ( id, receipt_1) ) . await ;
770+ match result {
771+ Ok ( ( ) ) => { }
772+ Err ( e) => panic ! ( "Error making receipt request: {:?}" , e) ,
773+ }
774+ }
775+ Ok ( ( ) )
776+ }
777+
778+ #[ rstest]
779+ #[ tokio:: test]
780+ async fn test_tap_aggregator_rav_timestamp_cuttoff (
781+ keys_gateway : ( LocalWallet , Address ) ,
782+ http_request_size_limit : u32 ,
783+ http_response_size_limit : u32 ,
784+ http_max_concurrent_connections : u32 ,
785+ #[ future] repeated_timestamp_request : Result < Vec < ( EIP712SignedMessage < Receipt > , u64 ) > > ,
786+ #[ future] repeated_timestamp_incremented_by_one_request : Result <
787+ Vec < ( EIP712SignedMessage < Receipt > , u64 ) > ,
788+ > ,
789+ receipt_threshold_1 : u64 ,
790+ ) -> Result < ( ) , Box < dyn std:: error:: Error > > {
791+ // This test checks that tap_aggregator is correctly rejecting receipts with invalid timestamps
792+ let ( gateway_handle, gateway_addr) = start_gateway_aggregator (
793+ keys_gateway,
794+ http_request_size_limit,
795+ http_response_size_limit,
796+ http_max_concurrent_connections,
797+ )
798+ . await ?;
799+ let client =
800+ HttpClientBuilder :: default ( ) . build ( format ! ( "http://{}" , gateway_addr. to_string( ) ) ) ?;
801+
802+ // This is the first part of the test, two batches of receipts are sent to the aggregator.
803+ // The second batch has one receipt with the same timestamp as the latest receipt in the first batch.
804+ // The first RAV will have the same timestamp as one receipt in the second batch.
805+ // tap_aggregator should reject the second RAV request due to the repeated timestamp.
806+ let requests = repeated_timestamp_request. await ?;
807+ let first_batch = & requests[ 0 ..receipt_threshold_1 as usize ] ;
808+ let second_batch = & requests[ receipt_threshold_1 as usize ..2 * receipt_threshold_1 as usize ] ;
809+
810+ let receipts = first_batch
811+ . iter ( )
812+ . map ( |( r, _) | r. clone ( ) )
813+ . collect :: < Vec < _ > > ( ) ;
814+ let params = rpc_params ! ( & aggregate_server_api_version( ) , & receipts, None :: <( ) >) ;
815+ let first_rav_response: jsonrpsee_helpers:: JsonRpcResponse < SignedRAV > =
816+ client. request ( "aggregate_receipts" , params) . await ?;
817+
818+ let receipts = second_batch
819+ . iter ( )
820+ . map ( |( r, _) | r. clone ( ) )
821+ . collect :: < Vec < _ > > ( ) ;
822+ let params = rpc_params ! (
823+ & aggregate_server_api_version( ) ,
824+ & receipts,
825+ first_rav_response. data
826+ ) ;
827+ let second_rav_response: Result <
828+ jsonrpsee_helpers:: JsonRpcResponse < SignedRAV > ,
829+ jsonrpsee:: core:: Error ,
830+ > = client. request ( "aggregate_receipts" , params) . await ;
831+ match second_rav_response {
832+ Ok ( _) => panic ! ( "Should have failed RAV request" ) ,
833+ Err ( _) => { }
834+ }
835+
836+ // This is the second part of the test, two batches of receipts are sent to the aggregator.
837+ // The second batch has one receipt with the timestamp = timestamp+1 of the latest receipt in the first batch.
838+ // tap_aggregator should accept the second RAV request.
839+ let requests = repeated_timestamp_incremented_by_one_request. await ?;
840+ let first_batch = & requests[ 0 ..receipt_threshold_1 as usize ] ;
841+ let second_batch = & requests[ receipt_threshold_1 as usize ..2 * receipt_threshold_1 as usize ] ;
842+
843+ let receipts = first_batch
844+ . iter ( )
845+ . map ( |( r, _) | r. clone ( ) )
846+ . collect :: < Vec < _ > > ( ) ;
847+ let params = rpc_params ! ( & aggregate_server_api_version( ) , & receipts, None :: <( ) >) ;
848+ let first_rav_response: jsonrpsee_helpers:: JsonRpcResponse < SignedRAV > =
849+ client. request ( "aggregate_receipts" , params) . await ?;
850+
851+ let receipts = second_batch
852+ . iter ( )
853+ . map ( |( r, _) | r. clone ( ) )
854+ . collect :: < Vec < _ > > ( ) ;
855+ let params = rpc_params ! (
856+ & aggregate_server_api_version( ) ,
857+ & receipts,
858+ first_rav_response. data
859+ ) ;
860+ let second_rav_response: jsonrpsee_helpers:: JsonRpcResponse < SignedRAV > =
861+ client. request ( "aggregate_receipts" , params) . await ?;
862+
863+ // Compute the expected aggregate value and check that it matches the latest RAV.
864+ let mut expected_value = 0 ;
865+ for ( receipt, _) in first_batch. iter ( ) . chain ( second_batch. iter ( ) ) {
866+ expected_value += receipt. message . value ;
867+ }
868+ assert ! ( expected_value == second_rav_response. data. message. value_aggregate) ;
869+
870+ gateway_handle. stop ( ) ?;
871+ Ok ( ( ) )
872+ }
873+
637874async fn generate_requests (
638875 query_price : Vec < u128 > ,
639876 num_batches : u64 ,
0 commit comments