Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions crates/service/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ pin-project = "1.1.7"
tonic.workspace = true
itertools = "0.14.0"


[dev-dependencies]
hex-literal = "0.4.1"
test-assets = { path = "../test-assets" }
Expand All @@ -74,6 +75,7 @@ tokio-test = "0.4.4"
wiremock.workspace = true
insta = "1.41.1"
test-log.workspace = true
futures = "0.3.31"

[build-dependencies]
build-info-build = { version = "0.0.40", default-features = false }
217 changes: 206 additions & 11 deletions crates/service/src/tap/receipt_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,29 +27,49 @@ enum ProcessReceiptError {
Both(anyhow::Error, anyhow::Error),
}

/// Indicates which versions of Receipts where processed
/// It's intended to be used for migration tests
#[derive(Debug, PartialEq, Eq)]
pub enum ProcessedReceipt {
V1,
V2,
Both,
None,
}

impl InnerContext {
async fn process_db_receipts(
&self,
buffer: Vec<DatabaseReceipt>,
) -> Result<(), ProcessReceiptError> {
) -> Result<ProcessedReceipt, ProcessReceiptError> {
let (v1_receipts, v2_receipts): (Vec<_>, Vec<_>) =
buffer.into_iter().partition_map(|r| match r {
DatabaseReceipt::V1(db_receipt_v1) => Either::Left(db_receipt_v1),
DatabaseReceipt::V2(db_receipt_v2) => Either::Right(db_receipt_v2),
});

let (insert_v1, insert_v2) = tokio::join!(
self.store_receipts_v1(v1_receipts),
self.store_receipts_v2(v2_receipts)
self.store_receipts_v2(v2_receipts),
);

match (insert_v1, insert_v2) {
(Err(e1), Err(e2)) => Err(ProcessReceiptError::Both(e1.into(), e2.into())),
(Err(e1), _) => Err(ProcessReceiptError::V1(e1.into())),
(_, Err(e2)) => Err(ProcessReceiptError::V2(e2.into())),
_ => Ok(()),

(Err(e1), Ok(_)) => Err(ProcessReceiptError::V1(e1.into())),
(Ok(_), Err(e2)) => Err(ProcessReceiptError::V2(e2.into())),

(Ok(0), Ok(0)) => Ok(ProcessedReceipt::None),
(Ok(_), Ok(0)) => Ok(ProcessedReceipt::V1),
(Ok(0), Ok(_)) => Ok(ProcessedReceipt::V2),
(Ok(_), Ok(_)) => Ok(ProcessedReceipt::Both),
}
}

async fn store_receipts_v1(&self, receipts: Vec<DbReceiptV1>) -> Result<(), AdapterError> {
async fn store_receipts_v1(&self, receipts: Vec<DbReceiptV1>) -> Result<u64, AdapterError> {
if receipts.is_empty() {
return Ok(0);
}
let receipts_len = receipts.len();
let mut signers = Vec::with_capacity(receipts_len);
let mut signatures = Vec::with_capacity(receipts_len);
Expand All @@ -66,7 +86,7 @@ impl InnerContext {
nonces.push(receipt.nonce);
values.push(receipt.value);
}
sqlx::query!(
let query_res = sqlx::query!(
r#"INSERT INTO scalar_tap_receipts (
signer_address,
signature,
Expand Down Expand Up @@ -96,10 +116,13 @@ impl InnerContext {
anyhow!(e)
})?;

Ok(())
Ok(query_res.rows_affected())
}

async fn store_receipts_v2(&self, receipts: Vec<DbReceiptV2>) -> Result<(), AdapterError> {
async fn store_receipts_v2(&self, receipts: Vec<DbReceiptV2>) -> Result<u64, AdapterError> {
if receipts.is_empty() {
return Ok(0);
}
let receipts_len = receipts.len();
let mut signers = Vec::with_capacity(receipts_len);
let mut signatures = Vec::with_capacity(receipts_len);
Expand All @@ -122,7 +145,7 @@ impl InnerContext {
nonces.push(receipt.nonce);
values.push(receipt.value);
}
sqlx::query!(
let query_res = sqlx::query!(
r#"INSERT INTO tap_horizon_receipts (
signer_address,
signature,
Expand Down Expand Up @@ -161,7 +184,7 @@ impl InnerContext {
anyhow!(e)
})?;

Ok(())
Ok(query_res.rows_affected())
}
}

Expand Down Expand Up @@ -305,3 +328,175 @@ impl DbReceiptV2 {
})
}
}

#[cfg(test)]
mod tests {
use std::{path::PathBuf, sync::LazyLock};

use futures::future::BoxFuture;
use sqlx::{
migrate::{MigrationSource, Migrator},
PgPool,
};
use test_assets::{
create_signed_receipt, create_signed_receipt_v2, SignedReceiptRequest, INDEXER_ALLOCATIONS,
TAP_EIP712_DOMAIN,
};

use crate::tap::{
receipt_store::{
DatabaseReceipt, DbReceiptV1, DbReceiptV2, InnerContext, ProcessReceiptError,
ProcessedReceipt,
},
AdapterError,
};

async fn create_v1() -> DatabaseReceipt {
let alloc = INDEXER_ALLOCATIONS.values().next().unwrap().clone();
let v1 = create_signed_receipt(
SignedReceiptRequest::builder()
.allocation_id(alloc.id)
.value(100)
.build(),
)
.await;
DatabaseReceipt::V1(DbReceiptV1::from_receipt(&v1, &TAP_EIP712_DOMAIN).unwrap())
}

async fn create_v2() -> DatabaseReceipt {
let v2 = create_signed_receipt_v2().call().await;
DatabaseReceipt::V2(DbReceiptV2::from_receipt(&v2, &TAP_EIP712_DOMAIN).unwrap())
}

mod when_all_migrations_are_run {
use super::*;

#[rstest::rstest]
#[case(ProcessedReceipt::None, async { vec![] })]
#[case(ProcessedReceipt::V1, async { vec![create_v1().await] })]
#[case(ProcessedReceipt::V2, async { vec![create_v2().await] })]
#[case(ProcessedReceipt::Both, async { vec![create_v2().await, create_v1().await] })]
#[sqlx::test(migrations = "../../migrations")]
async fn v1_and_v2_are_processed_successfully(
#[ignore] pgpool: PgPool,
#[case] expected: ProcessedReceipt,
#[future(awt)]
#[case]
receipts: Vec<DatabaseReceipt>,
) {
let context = InnerContext { pgpool };

let res = context.process_db_receipts(receipts).await.unwrap();

assert_eq!(res, expected);
}
}

mod when_horizon_migrations_are_ignored {
use super::*;

#[sqlx::test(migrator = "WITHOUT_HORIZON_MIGRATIONS")]
async fn test_empty_receipts_are_processed_successfully(pgpool: PgPool) {
let context = InnerContext { pgpool };

let res = context.process_db_receipts(vec![]).await.unwrap();

assert_eq!(res, ProcessedReceipt::None);
}

#[sqlx::test(migrator = "WITHOUT_HORIZON_MIGRATIONS")]
async fn test_v1_receipts_are_processed_successfully(pgpool: PgPool) {
let context = InnerContext { pgpool };

let v1 = create_v1().await;
let receipts = vec![v1];

let res = context.process_db_receipts(receipts).await.unwrap();

assert_eq!(res, ProcessedReceipt::V1);
}

#[rstest::rstest]
#[case(async { vec![create_v2().await] })]
#[case(async { vec![create_v2().await, create_v1().await] })]
#[sqlx::test(migrator = "WITHOUT_HORIZON_MIGRATIONS")]
async fn test_cases_with_v2_receipts_fails_to_process(
#[ignore] pgpool: PgPool,
#[future(awt)]
#[case]
receipts: Vec<DatabaseReceipt>,
) {
let context = InnerContext { pgpool };

let error = context.process_db_receipts(receipts).await.unwrap_err();

let ProcessReceiptError::V2(error) = error else {
panic!()
};
let d = error.downcast_ref::<AdapterError>().unwrap().to_string();

assert_eq!(
d,
"error returned from database: relation \"tap_horizon_receipts\" does not exist"
);
}

pub static WITHOUT_HORIZON_MIGRATIONS: LazyLock<Migrator> = LazyLock::new(create_migrator);

pub fn create_migrator() -> Migrator {
futures::executor::block_on(Migrator::new(MigrationRunner::new(
"../../migrations",
["horizon"],
)))
.unwrap()
}

#[derive(Debug)]
pub struct MigrationRunner {
migration_path: PathBuf,
ignored_migrations: Vec<String>,
}

impl MigrationRunner {
/// Construct a new MigrationRunner that does not apply the given migrations.
///
/// `ignored_migrations` is any iterable of strings that describes which
/// migrations to be ignored.
pub fn new<I>(path: impl Into<PathBuf>, ignored_migrations: I) -> Self
where
I: IntoIterator,
I::Item: Into<String>,
{
Self {
migration_path: path.into(),
ignored_migrations: ignored_migrations.into_iter().map(Into::into).collect(),
}
}
}

impl MigrationSource<'static> for MigrationRunner {
fn resolve(
self,
) -> BoxFuture<'static, Result<Vec<sqlx::migrate::Migration>, sqlx::error::BoxDynError>>
{
Box::pin(async move {
let canonical = self.migration_path.canonicalize()?;
let migrations_with_paths =
sqlx::migrate::resolve_blocking(&canonical).unwrap();

let migrations_with_paths = migrations_with_paths
.into_iter()
.filter(|(_, p)| {
let path = p.to_str().unwrap();
self.ignored_migrations
.iter()
.any(|ignored| !path.contains(ignored))
})
.collect::<Vec<_>>();

Ok(migrations_with_paths.into_iter().map(|(m, _p)| m).collect())
})
}
}
}
}
Loading