@@ -27,25 +27,50 @@ enum ProcessReceiptError {
2727 Both ( anyhow:: Error , anyhow:: Error ) ,
2828}
2929
30+ /// Indicates which versions of Receipts where processed
31+ /// It's intended to be used for migration tests
32+ #[ derive( Debug , PartialEq , Eq ) ]
33+ pub enum Processed {
34+ V1 ,
35+ V2 ,
36+ All ,
37+ None ,
38+ }
39+
3040impl InnerContext {
3141 async fn process_db_receipts (
3242 & self ,
3343 buffer : Vec < DatabaseReceipt > ,
34- ) -> Result < ( ) , ProcessReceiptError > {
44+ ) -> Result < Processed , ProcessReceiptError > {
3545 let ( v1_receipts, v2_receipts) : ( Vec < _ > , Vec < _ > ) =
3646 buffer. into_iter ( ) . partition_map ( |r| match r {
3747 DatabaseReceipt :: V1 ( db_receipt_v1) => Either :: Left ( db_receipt_v1) ,
3848 DatabaseReceipt :: V2 ( db_receipt_v2) => Either :: Right ( db_receipt_v2) ,
3949 } ) ;
40- let ( insert_v1, insert_v2) = tokio:: join!(
41- self . store_receipts_v1( v1_receipts) ,
42- self . store_receipts_v2( v2_receipts)
43- ) ;
50+
51+ let ( insert_v1, insert_v2) = match ( v1_receipts. is_empty ( ) , v2_receipts. is_empty ( ) ) {
52+ ( true , true ) => ( None , None ) ,
53+ ( false , true ) => ( Some ( self . store_receipts_v1 ( v1_receipts) . await ) , None ) ,
54+ ( true , false ) => ( None , Some ( self . store_receipts_v2 ( v2_receipts) . await ) ) ,
55+ ( false , false ) => {
56+ let ( v1, v2) = tokio:: join!(
57+ self . store_receipts_v1( v1_receipts) ,
58+ self . store_receipts_v2( v2_receipts) ,
59+ ) ;
60+ ( Some ( v1) , Some ( v2) )
61+ }
62+ } ;
63+
4464 match ( insert_v1, insert_v2) {
45- ( Err ( e1) , Err ( e2) ) => Err ( ProcessReceiptError :: Both ( e1. into ( ) , e2. into ( ) ) ) ,
46- ( Err ( e1) , _) => Err ( ProcessReceiptError :: V1 ( e1. into ( ) ) ) ,
47- ( _, Err ( e2) ) => Err ( ProcessReceiptError :: V2 ( e2. into ( ) ) ) ,
48- _ => Ok ( ( ) ) ,
65+ ( Some ( Err ( e1) ) , Some ( Err ( e2) ) ) => Err ( ProcessReceiptError :: Both ( e1. into ( ) , e2. into ( ) ) ) ,
66+ ( Some ( Err ( e1) ) , _) => Err ( ProcessReceiptError :: V1 ( e1. into ( ) ) ) ,
67+ ( _, Some ( Err ( e2) ) ) => Err ( ProcessReceiptError :: V2 ( e2. into ( ) ) ) ,
68+
69+ // only useful for testing
70+ ( Some ( Ok ( _) ) , None ) => Ok ( Processed :: V1 ) ,
71+ ( None , Some ( Ok ( _) ) ) => Ok ( Processed :: V2 ) ,
72+ ( Some ( Ok ( _) ) , Some ( Ok ( _) ) ) => Ok ( Processed :: All ) ,
73+ ( None , None ) => Ok ( Processed :: None ) ,
4974 }
5075 }
5176
@@ -305,3 +330,174 @@ impl DbReceiptV2 {
305330 } )
306331 }
307332}
333+
334+ #[ cfg( test) ]
335+ mod tests {
336+ use std:: { path:: PathBuf , sync:: LazyLock } ;
337+
338+ use futures:: future:: BoxFuture ;
339+ use sqlx:: {
340+ migrate:: { MigrationSource , Migrator } ,
341+ PgPool ,
342+ } ;
343+ use test_assets:: {
344+ create_signed_receipt, create_signed_receipt_v2, SignedReceiptRequest , INDEXER_ALLOCATIONS ,
345+ TAP_EIP712_DOMAIN ,
346+ } ;
347+
348+ use crate :: tap:: {
349+ receipt_store:: {
350+ DatabaseReceipt , DbReceiptV1 , DbReceiptV2 , InnerContext , ProcessReceiptError , Processed ,
351+ } ,
352+ AdapterError ,
353+ } ;
354+
355+ async fn create_v1 ( ) -> DatabaseReceipt {
356+ let alloc = INDEXER_ALLOCATIONS . values ( ) . next ( ) . unwrap ( ) . clone ( ) ;
357+ let v1 = create_signed_receipt (
358+ SignedReceiptRequest :: builder ( )
359+ . allocation_id ( alloc. id )
360+ . value ( 100 )
361+ . build ( ) ,
362+ )
363+ . await ;
364+ DatabaseReceipt :: V1 ( DbReceiptV1 :: from_receipt ( & v1, & TAP_EIP712_DOMAIN ) . unwrap ( ) )
365+ }
366+
367+ async fn create_v2 ( ) -> DatabaseReceipt {
368+ let v2 = create_signed_receipt_v2 ( ) . call ( ) . await ;
369+ DatabaseReceipt :: V2 ( DbReceiptV2 :: from_receipt ( & v2, & TAP_EIP712_DOMAIN ) . unwrap ( ) )
370+ }
371+
372+ mod when_all_migrations_are_run {
373+ use super :: * ;
374+
375+ #[ rstest:: rstest]
376+ #[ case( Processed :: None , async { vec![ ] } ) ]
377+ #[ case( Processed :: V1 , async { vec![ create_v1( ) . await ] } ) ]
378+ #[ case( Processed :: V2 , async { vec![ create_v2( ) . await ] } ) ]
379+ #[ case( Processed :: All , async { vec![ create_v2( ) . await , create_v1( ) . await ] } ) ]
380+ #[ sqlx:: test( migrations = "../../migrations" ) ]
381+ async fn v1_and_v2_are_processed_successfully (
382+ #[ ignore] pgpool : PgPool ,
383+ #[ case] expected : Processed ,
384+ #[ future( awt) ]
385+ #[ case]
386+ receipts : Vec < DatabaseReceipt > ,
387+ ) {
388+ let context = InnerContext { pgpool } ;
389+
390+ let res = context. process_db_receipts ( receipts) . await . unwrap ( ) ;
391+
392+ assert_eq ! ( res, expected) ;
393+ }
394+ }
395+
396+ mod when_horizon_migrations_are_ignored {
397+ use super :: * ;
398+
399+ #[ sqlx:: test( migrator = "WITHOUT_HORIZON_MIGRATIONS" ) ]
400+ async fn test_empty_receipts_are_processed_successfully ( pgpool : PgPool ) {
401+ let context = InnerContext { pgpool } ;
402+
403+ let res = context. process_db_receipts ( vec ! [ ] ) . await . unwrap ( ) ;
404+
405+ assert_eq ! ( res, Processed :: None ) ;
406+ }
407+
408+ #[ sqlx:: test( migrator = "WITHOUT_HORIZON_MIGRATIONS" ) ]
409+ async fn test_v1_receipts_are_processed_successfully ( pgpool : PgPool ) {
410+ let context = InnerContext { pgpool } ;
411+
412+ let v1 = create_v1 ( ) . await ;
413+ let receipts = vec ! [ v1] ;
414+
415+ let res = context. process_db_receipts ( receipts) . await . unwrap ( ) ;
416+
417+ assert_eq ! ( res, Processed :: V1 ) ;
418+ }
419+
420+ #[ rstest:: rstest]
421+ #[ case( async { vec![ create_v2( ) . await ] } ) ]
422+ #[ case( async { vec![ create_v2( ) . await , create_v1( ) . await ] } ) ]
423+ #[ sqlx:: test( migrator = "WITHOUT_HORIZON_MIGRATIONS" ) ]
424+ async fn test_cases_with_v2_receipts_fails_to_process (
425+ #[ ignore] pgpool : PgPool ,
426+ #[ future( awt) ]
427+ #[ case]
428+ receipts : Vec < DatabaseReceipt > ,
429+ ) {
430+ let context = InnerContext { pgpool } ;
431+
432+ let error = context. process_db_receipts ( receipts) . await . unwrap_err ( ) ;
433+
434+ let ProcessReceiptError :: V2 ( error) = error else {
435+ panic ! ( )
436+ } ;
437+ let d = error. downcast_ref :: < AdapterError > ( ) . unwrap ( ) . to_string ( ) ;
438+
439+ assert_eq ! (
440+ d,
441+ "error returned from database: relation \" tap_horizon_receipts\" does not exist"
442+ ) ;
443+ }
444+
445+ pub static WITHOUT_HORIZON_MIGRATIONS : LazyLock < Migrator > = LazyLock :: new ( create_migrator) ;
446+
447+ pub fn create_migrator ( ) -> Migrator {
448+ futures:: executor:: block_on ( Migrator :: new ( MigrationRunner :: new (
449+ "../../migrations" ,
450+ [ "horizon" ] ,
451+ ) ) )
452+ . unwrap ( )
453+ }
454+
455+ #[ derive( Debug ) ]
456+ pub struct MigrationRunner {
457+ migration_path : PathBuf ,
458+ ignored_migrations : Vec < String > ,
459+ }
460+
461+ impl MigrationRunner {
462+ /// Construct a new MigrationRunner that does not apply the given migrations.
463+ ///
464+ /// `ignored_migrations` is any iterable of strings that describes which
465+ /// migrations to be ignored.
466+ pub fn new < I > ( path : impl Into < PathBuf > , ignored_migrations : I ) -> Self
467+ where
468+ I : IntoIterator ,
469+ I :: Item : Into < String > ,
470+ {
471+ Self {
472+ migration_path : path. into ( ) ,
473+ ignored_migrations : ignored_migrations. into_iter ( ) . map ( Into :: into) . collect ( ) ,
474+ }
475+ }
476+ }
477+
478+ impl MigrationSource < ' static > for MigrationRunner {
479+ fn resolve (
480+ self ,
481+ ) -> BoxFuture < ' static , Result < Vec < sqlx:: migrate:: Migration > , sqlx:: error:: BoxDynError > >
482+ {
483+ Box :: pin ( async move {
484+ let canonical = self . migration_path . canonicalize ( ) ?;
485+ let migrations_with_paths =
486+ sqlx:: migrate:: resolve_blocking ( & canonical) . unwrap ( ) ;
487+
488+ let migrations_with_paths = migrations_with_paths
489+ . into_iter ( )
490+ . filter ( |( _, p) | {
491+ let path = p. to_str ( ) . unwrap ( ) ;
492+ self . ignored_migrations
493+ . iter ( )
494+ . any ( |ignored| !path. contains ( ignored) )
495+ } )
496+ . collect :: < Vec < _ > > ( ) ;
497+
498+ Ok ( migrations_with_paths. into_iter ( ) . map ( |( m, _p) | m) . collect ( ) )
499+ } )
500+ }
501+ }
502+ }
503+ }
0 commit comments