@@ -12,7 +12,10 @@ use tap_core::manager::adapters::{RavRead, RavStore};
1212use tap_graph:: { ReceiptAggregateVoucher , SignedRav } ;
1313#[ allow( deprecated) ]
1414use thegraph_core:: alloy:: signers:: Signature ;
15- use thegraph_core:: alloy:: { hex:: ToHexExt , primitives:: Address } ;
15+ use thegraph_core:: alloy:: {
16+ hex:: ToHexExt ,
17+ primitives:: { Address , Bytes } ,
18+ } ;
1619
1720use super :: { error:: AdapterError , Horizon , Legacy , TapAgentContext } ;
1821
@@ -149,7 +152,113 @@ impl RavRead<tap_graph::v2::ReceiptAggregateVoucher> for TapAgentContext<Horizon
149152 type AdapterError = AdapterError ;
150153
151154 async fn last_rav ( & self ) -> Result < Option < tap_graph:: v2:: SignedRav > , Self :: AdapterError > {
152- unimplemented ! ( )
155+ // TODO add data service filter
156+ let row = sqlx:: query!(
157+ r#"
158+ SELECT
159+ signature,
160+ allocation_id,
161+ payer,
162+ data_service,
163+ service_provider,
164+ timestamp_ns,
165+ value_aggregate,
166+ metadata
167+ FROM tap_horizon_ravs
168+ WHERE
169+ allocation_id = $1
170+ AND payer = $2
171+ AND service_provider = $3
172+ "# ,
173+ self . allocation_id. encode_hex( ) ,
174+ self . sender. encode_hex( ) ,
175+ self . indexer_address. encode_hex( )
176+ )
177+ . fetch_optional ( & self . pgpool )
178+ . await
179+ . map_err ( |e| AdapterError :: RavRead {
180+ error : e. to_string ( ) ,
181+ } ) ?;
182+
183+ match row {
184+ Some ( row) => {
185+ #[ allow( deprecated) ]
186+ let signature: Signature =
187+ row. signature
188+ . as_slice ( )
189+ . try_into ( )
190+ . map_err ( |e| AdapterError :: RavRead {
191+ error : format ! (
192+ "Error decoding signature while retrieving RAV from database: {}" ,
193+ e
194+ ) ,
195+ } ) ?;
196+ let allocation_id =
197+ Address :: from_str ( & row. allocation_id ) . map_err ( |e| AdapterError :: RavRead {
198+ error : format ! (
199+ "Error decoding allocation_id while retrieving RAV from database: {}" ,
200+ e
201+ ) ,
202+ } ) ?;
203+
204+ let payer = Address :: from_str ( & row. payer ) . map_err ( |e| AdapterError :: RavRead {
205+ error : format ! (
206+ "Error decoding payer while retrieving receipt from database: {}" ,
207+ e
208+ ) ,
209+ } ) ?;
210+
211+ let data_service = Address :: from_str ( & row. data_service ) . map_err ( |e| {
212+ AdapterError :: RavRead {
213+ error : format ! (
214+ "Error decoding data_service while retrieving receipt from database: {}" ,
215+ e
216+ ) ,
217+ }
218+ } ) ?;
219+
220+ let service_provider = Address :: from_str ( & row. service_provider ) . map_err ( |e| {
221+ AdapterError :: RavRead {
222+ error : format ! (
223+ "Error decoding service_provider while retrieving receipt from database: {}" ,
224+ e
225+ ) ,
226+ }
227+ } ) ?;
228+
229+ let metadata = Bytes :: from ( row. metadata ) ;
230+
231+ let timestamp_ns = row. timestamp_ns . to_u64 ( ) . ok_or ( AdapterError :: RavRead {
232+ error : "Error decoding timestamp_ns while retrieving RAV from database"
233+ . to_string ( ) ,
234+ } ) ?;
235+ let value_aggregate = row
236+ . value_aggregate
237+ // Beware, BigDecimal::to_u128() actually uses to_u64() under the hood.
238+ // So we're converting to BigInt to get a proper implementation of to_u128().
239+ . to_bigint ( )
240+ . and_then ( |v| v. to_u128 ( ) )
241+ . ok_or ( AdapterError :: RavRead {
242+ error : "Error decoding value_aggregate while retrieving RAV from database"
243+ . to_string ( ) ,
244+ } ) ?;
245+
246+ let rav = tap_graph:: v2:: ReceiptAggregateVoucher {
247+ allocationId : allocation_id,
248+ timestampNs : timestamp_ns,
249+ valueAggregate : value_aggregate,
250+ dataService : data_service,
251+ serviceProvider : service_provider,
252+ payer,
253+ metadata,
254+ } ;
255+ Ok ( Some ( tap_graph:: v2:: SignedRav {
256+ message : rav,
257+ signature,
258+ } ) )
259+ }
260+ None => Ok ( None ) ,
261+ }
153262 }
154263}
155264
@@ -164,74 +273,145 @@ impl RavStore<tap_graph::v2::ReceiptAggregateVoucher> for TapAgentContext<Horizo
164273
165274 async fn update_last_rav (
166275 & self ,
167- _rav : tap_graph:: v2:: SignedRav ,
276+ rav : tap_graph:: v2:: SignedRav ,
168277 ) -> Result < ( ) , Self :: AdapterError > {
169- unimplemented ! ( )
278+ let signature_bytes: Vec < u8 > = rav. signature . as_bytes ( ) . to_vec ( ) ;
279+
280+ let _fut = sqlx:: query!(
281+ r#"
282+ INSERT INTO tap_horizon_ravs (
283+ payer,
284+ data_service,
285+ service_provider,
286+ metadata,
287+ signature,
288+ allocation_id,
289+ timestamp_ns,
290+ value_aggregate,
291+ created_at,
292+ updated_at
293+ )
294+ VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $9)
295+ ON CONFLICT (payer, data_service, service_provider, allocation_id)
296+ DO UPDATE SET
297+ signature = $5,
298+ timestamp_ns = $7,
299+ value_aggregate = $8,
300+ updated_at = $9
301+ "# ,
302+ rav. message. payer. encode_hex( ) ,
303+ rav. message. dataService. encode_hex( ) ,
304+ rav. message. serviceProvider. encode_hex( ) ,
305+ rav. message. metadata. as_ref( ) ,
306+ signature_bytes,
307+ rav. message. allocationId. encode_hex( ) ,
308+ BigDecimal :: from( rav. message. timestampNs) ,
309+ BigDecimal :: from( BigInt :: from( rav. message. valueAggregate) ) ,
310+ chrono:: Utc :: now( )
311+ )
312+ . execute ( & self . pgpool )
313+ . await
314+ . map_err ( |e| AdapterError :: RavStore {
315+ error : e. to_string ( ) ,
316+ } ) ?;
317+ Ok ( ( ) )
170318 }
171319}
172320
173321#[ cfg( test) ]
174322mod test {
323+
175324 use indexer_monitor:: EscrowAccounts ;
325+ use rstest:: rstest;
176326 use sqlx:: PgPool ;
327+ use tap_core:: signed_message:: Eip712SignedMessage ;
177328 use test_assets:: { TAP_SENDER as SENDER , TAP_SIGNER as SIGNER } ;
178329 use tokio:: sync:: watch;
179330
180331 use super :: * ;
181- use crate :: test:: { create_rav, ALLOCATION_ID_0 , INDEXER } ;
332+ use crate :: {
333+ tap:: context:: NetworkVersion ,
334+ test:: { CreateRav , ALLOCATION_ID_0 , INDEXER } ,
335+ } ;
182336
183337 #[ derive( Debug ) ]
184- struct TestableRav ( SignedRav ) ;
338+ struct TestableRav < T : NetworkVersion > ( Eip712SignedMessage < T :: Rav > ) ;
185339
186- impl Eq for TestableRav { }
340+ impl < T : NetworkVersion > Eq for TestableRav < T > { }
187341
188- impl PartialEq for TestableRav {
342+ impl < T : NetworkVersion > PartialEq for TestableRav < T > {
189343 fn eq ( & self , other : & Self ) -> bool {
190344 self . 0 . message == other. 0 . message
191345 && self . 0 . signature . as_bytes ( ) == other. 0 . signature . as_bytes ( )
192346 }
193347 }
194348
195- # [ sqlx :: test ( migrations = "../../migrations" ) ]
196- async fn update_and_retrieve_rav ( pool : PgPool ) {
197- let timestamp_ns = u64 :: MAX - 10 ;
198- let value_aggregate = u128 :: MAX ;
199- let context = TapAgentContext :: new (
200- pool . clone ( ) ,
349+ const TIMESTAMP_NS : u64 = u64 :: MAX - 10 ;
350+ const VALUE_AGGREGATE : u128 = u128 :: MAX ;
351+
352+ async fn legacy_adapter ( pgpool : PgPool ) -> TapAgentContext < Legacy > {
353+ TapAgentContext :: new (
354+ pgpool ,
201355 ALLOCATION_ID_0 ,
202356 INDEXER . 1 ,
203357 SENDER . 1 ,
204358 watch:: channel ( EscrowAccounts :: default ( ) ) . 1 ,
205- ) ;
359+ )
360+ }
361+
362+ async fn horizon_adapter ( pgpool : PgPool ) -> TapAgentContext < Horizon > {
363+ TapAgentContext :: new (
364+ pgpool,
365+ ALLOCATION_ID_0 ,
366+ INDEXER . 1 ,
367+ SENDER . 1 ,
368+ watch:: channel ( EscrowAccounts :: default ( ) ) . 1 ,
369+ )
370+ }
206371
372+ /// Insert a single receipt and retrieve it from the database using the adapter.
373+ /// The point here it to test the deserialization of large numbers.
374+ #[ rstest]
375+ #[ case( legacy_adapter( _pgpool. clone( ) ) ) ]
376+ #[ case( horizon_adapter( _pgpool. clone( ) ) ) ]
377+ #[ sqlx:: test( migrations = "../../migrations" ) ]
378+ async fn update_and_retrieve_rav < T > (
379+ #[ ignore] _pgpool : PgPool ,
380+ #[ case]
381+ #[ future( awt) ]
382+ context : TapAgentContext < T > ,
383+ ) where
384+ T : CreateRav + std:: fmt:: Debug ,
385+ TapAgentContext < T > : RavRead < T :: Rav > + RavStore < T :: Rav > ,
386+ {
207387 // Insert a rav
208- let mut new_rav = create_rav (
388+ let mut new_rav = T :: create_rav (
209389 ALLOCATION_ID_0 ,
210390 SIGNER . 0 . clone ( ) ,
211- timestamp_ns ,
212- value_aggregate ,
391+ TIMESTAMP_NS ,
392+ VALUE_AGGREGATE ,
213393 ) ;
214394 context. update_last_rav ( new_rav. clone ( ) ) . await . unwrap ( ) ;
215395
216396 // Should trigger a retrieve_last_rav So eventually the last rav should be the one
217397 // we inserted
218398 let last_rav = context. last_rav ( ) . await . unwrap ( ) . unwrap ( ) ;
219399
220- assert_eq ! ( TestableRav ( new_rav. clone( ) ) , TestableRav ( last_rav) ) ;
400+ assert_eq ! ( TestableRav :: < T > ( new_rav. clone( ) ) , TestableRav ( last_rav) ) ;
221401
222402 // Update the RAV 3 times in quick succession
223403 for i in 0 ..3 {
224- new_rav = create_rav (
404+ new_rav = T :: create_rav (
225405 ALLOCATION_ID_0 ,
226406 SIGNER . 0 . clone ( ) ,
227- timestamp_ns + i,
228- value_aggregate - ( i as u128 ) ,
407+ TIMESTAMP_NS + i,
408+ VALUE_AGGREGATE - ( i as u128 ) ,
229409 ) ;
230410 context. update_last_rav ( new_rav. clone ( ) ) . await . unwrap ( ) ;
231411 }
232412
233413 // Check that the last rav is the last one we inserted
234414 let last_rav = context. last_rav ( ) . await . unwrap ( ) ;
235- assert_eq ! ( TestableRav ( new_rav) , TestableRav ( last_rav. unwrap( ) ) ) ;
415+ assert_eq ! ( TestableRav :: < T > ( new_rav) , TestableRav ( last_rav. unwrap( ) ) ) ;
236416 }
237417}
0 commit comments