diff --git a/Cargo.lock b/Cargo.lock index ddbd7d70a..81f2cf02b 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3897,9 +3897,8 @@ dependencies = [ "indexer-attestation", "indexer-query", "indexer-watcher", - "rstest 0.24.0", "reqwest 0.12.13", - + "rstest 0.24.0", "serde", "serde_json", "test-assets", @@ -3951,6 +3950,7 @@ dependencies = [ "build-info-build", "clap", "cost-model", + "futures", "governor", "graphql 0.3.0", "graphql_client", diff --git a/crates/service/Cargo.toml b/crates/service/Cargo.toml index 3a5aea43c..0ba991988 100644 --- a/crates/service/Cargo.toml +++ b/crates/service/Cargo.toml @@ -63,6 +63,7 @@ pin-project = "1.1.7" tonic.workspace = true itertools = "0.14.0" + [dev-dependencies] hex-literal = "0.4.1" test-assets = { path = "../test-assets" } @@ -74,6 +75,7 @@ tokio-test = "0.4.4" wiremock.workspace = true insta = "1.41.1" test-log.workspace = true +futures = "0.3.31" [build-dependencies] build-info-build = { version = "0.0.40", default-features = false } diff --git a/crates/service/src/tap/receipt_store.rs b/crates/service/src/tap/receipt_store.rs index 0198aaa35..3efa63ccf 100644 --- a/crates/service/src/tap/receipt_store.rs +++ b/crates/service/src/tap/receipt_store.rs @@ -27,29 +27,49 @@ enum ProcessReceiptError { Both(anyhow::Error, anyhow::Error), } +/// Indicates which versions of Receipts where processed +/// It's intended to be used for migration tests +#[derive(Debug, PartialEq, Eq)] +pub enum ProcessedReceipt { + V1, + V2, + Both, + None, +} + impl InnerContext { async fn process_db_receipts( &self, buffer: Vec, - ) -> Result<(), ProcessReceiptError> { + ) -> Result { let (v1_receipts, v2_receipts): (Vec<_>, Vec<_>) = buffer.into_iter().partition_map(|r| match r { DatabaseReceipt::V1(db_receipt_v1) => Either::Left(db_receipt_v1), DatabaseReceipt::V2(db_receipt_v2) => Either::Right(db_receipt_v2), }); + let (insert_v1, insert_v2) = tokio::join!( self.store_receipts_v1(v1_receipts), - self.store_receipts_v2(v2_receipts) + self.store_receipts_v2(v2_receipts), ); + match (insert_v1, insert_v2) { (Err(e1), Err(e2)) => Err(ProcessReceiptError::Both(e1.into(), e2.into())), - (Err(e1), _) => Err(ProcessReceiptError::V1(e1.into())), - (_, Err(e2)) => Err(ProcessReceiptError::V2(e2.into())), - _ => Ok(()), + + (Err(e1), Ok(_)) => Err(ProcessReceiptError::V1(e1.into())), + (Ok(_), Err(e2)) => Err(ProcessReceiptError::V2(e2.into())), + + (Ok(0), Ok(0)) => Ok(ProcessedReceipt::None), + (Ok(_), Ok(0)) => Ok(ProcessedReceipt::V1), + (Ok(0), Ok(_)) => Ok(ProcessedReceipt::V2), + (Ok(_), Ok(_)) => Ok(ProcessedReceipt::Both), } } - async fn store_receipts_v1(&self, receipts: Vec) -> Result<(), AdapterError> { + async fn store_receipts_v1(&self, receipts: Vec) -> Result { + if receipts.is_empty() { + return Ok(0); + } let receipts_len = receipts.len(); let mut signers = Vec::with_capacity(receipts_len); let mut signatures = Vec::with_capacity(receipts_len); @@ -66,7 +86,7 @@ impl InnerContext { nonces.push(receipt.nonce); values.push(receipt.value); } - sqlx::query!( + let query_res = sqlx::query!( r#"INSERT INTO scalar_tap_receipts ( signer_address, signature, @@ -96,10 +116,13 @@ impl InnerContext { anyhow!(e) })?; - Ok(()) + Ok(query_res.rows_affected()) } - async fn store_receipts_v2(&self, receipts: Vec) -> Result<(), AdapterError> { + async fn store_receipts_v2(&self, receipts: Vec) -> Result { + if receipts.is_empty() { + return Ok(0); + } let receipts_len = receipts.len(); let mut signers = Vec::with_capacity(receipts_len); let mut signatures = Vec::with_capacity(receipts_len); @@ -122,7 +145,7 @@ impl InnerContext { nonces.push(receipt.nonce); values.push(receipt.value); } - sqlx::query!( + let query_res = sqlx::query!( r#"INSERT INTO tap_horizon_receipts ( signer_address, signature, @@ -161,7 +184,7 @@ impl InnerContext { anyhow!(e) })?; - Ok(()) + Ok(query_res.rows_affected()) } } @@ -305,3 +328,175 @@ impl DbReceiptV2 { }) } } + +#[cfg(test)] +mod tests { + use std::{path::PathBuf, sync::LazyLock}; + + use futures::future::BoxFuture; + use sqlx::{ + migrate::{MigrationSource, Migrator}, + PgPool, + }; + use test_assets::{ + create_signed_receipt, create_signed_receipt_v2, SignedReceiptRequest, INDEXER_ALLOCATIONS, + TAP_EIP712_DOMAIN, + }; + + use crate::tap::{ + receipt_store::{ + DatabaseReceipt, DbReceiptV1, DbReceiptV2, InnerContext, ProcessReceiptError, + ProcessedReceipt, + }, + AdapterError, + }; + + async fn create_v1() -> DatabaseReceipt { + let alloc = INDEXER_ALLOCATIONS.values().next().unwrap().clone(); + let v1 = create_signed_receipt( + SignedReceiptRequest::builder() + .allocation_id(alloc.id) + .value(100) + .build(), + ) + .await; + DatabaseReceipt::V1(DbReceiptV1::from_receipt(&v1, &TAP_EIP712_DOMAIN).unwrap()) + } + + async fn create_v2() -> DatabaseReceipt { + let v2 = create_signed_receipt_v2().call().await; + DatabaseReceipt::V2(DbReceiptV2::from_receipt(&v2, &TAP_EIP712_DOMAIN).unwrap()) + } + + mod when_all_migrations_are_run { + use super::*; + + #[rstest::rstest] + #[case(ProcessedReceipt::None, async { vec![] })] + #[case(ProcessedReceipt::V1, async { vec![create_v1().await] })] + #[case(ProcessedReceipt::V2, async { vec![create_v2().await] })] + #[case(ProcessedReceipt::Both, async { vec![create_v2().await, create_v1().await] })] + #[sqlx::test(migrations = "../../migrations")] + async fn v1_and_v2_are_processed_successfully( + #[ignore] pgpool: PgPool, + #[case] expected: ProcessedReceipt, + #[future(awt)] + #[case] + receipts: Vec, + ) { + let context = InnerContext { pgpool }; + + let res = context.process_db_receipts(receipts).await.unwrap(); + + assert_eq!(res, expected); + } + } + + mod when_horizon_migrations_are_ignored { + use super::*; + + #[sqlx::test(migrator = "WITHOUT_HORIZON_MIGRATIONS")] + async fn test_empty_receipts_are_processed_successfully(pgpool: PgPool) { + let context = InnerContext { pgpool }; + + let res = context.process_db_receipts(vec![]).await.unwrap(); + + assert_eq!(res, ProcessedReceipt::None); + } + + #[sqlx::test(migrator = "WITHOUT_HORIZON_MIGRATIONS")] + async fn test_v1_receipts_are_processed_successfully(pgpool: PgPool) { + let context = InnerContext { pgpool }; + + let v1 = create_v1().await; + let receipts = vec![v1]; + + let res = context.process_db_receipts(receipts).await.unwrap(); + + assert_eq!(res, ProcessedReceipt::V1); + } + + #[rstest::rstest] + #[case(async { vec![create_v2().await] })] + #[case(async { vec![create_v2().await, create_v1().await] })] + #[sqlx::test(migrator = "WITHOUT_HORIZON_MIGRATIONS")] + async fn test_cases_with_v2_receipts_fails_to_process( + #[ignore] pgpool: PgPool, + #[future(awt)] + #[case] + receipts: Vec, + ) { + let context = InnerContext { pgpool }; + + let error = context.process_db_receipts(receipts).await.unwrap_err(); + + let ProcessReceiptError::V2(error) = error else { + panic!() + }; + let d = error.downcast_ref::().unwrap().to_string(); + + assert_eq!( + d, + "error returned from database: relation \"tap_horizon_receipts\" does not exist" + ); + } + + pub static WITHOUT_HORIZON_MIGRATIONS: LazyLock = LazyLock::new(create_migrator); + + pub fn create_migrator() -> Migrator { + futures::executor::block_on(Migrator::new(MigrationRunner::new( + "../../migrations", + ["horizon"], + ))) + .unwrap() + } + + #[derive(Debug)] + pub struct MigrationRunner { + migration_path: PathBuf, + ignored_migrations: Vec, + } + + impl MigrationRunner { + /// Construct a new MigrationRunner that does not apply the given migrations. + /// + /// `ignored_migrations` is any iterable of strings that describes which + /// migrations to be ignored. + pub fn new(path: impl Into, ignored_migrations: I) -> Self + where + I: IntoIterator, + I::Item: Into, + { + Self { + migration_path: path.into(), + ignored_migrations: ignored_migrations.into_iter().map(Into::into).collect(), + } + } + } + + impl MigrationSource<'static> for MigrationRunner { + fn resolve( + self, + ) -> BoxFuture<'static, Result, sqlx::error::BoxDynError>> + { + Box::pin(async move { + let canonical = self.migration_path.canonicalize()?; + let migrations_with_paths = + sqlx::migrate::resolve_blocking(&canonical).unwrap(); + + let migrations_with_paths = migrations_with_paths + .into_iter() + .filter(|(_, p)| { + let path = p.to_str().unwrap(); + self.ignored_migrations + .iter() + .any(|ignored| !path.contains(ignored)) + }) + .collect::>(); + + Ok(migrations_with_paths.into_iter().map(|(m, _p)| m).collect()) + }) + } + } + } +}