Skip to content

Commit 811567e

Browse files
fix: skip processing of receipts when there's no receipt for a version (#653)
* fix: skip processing of receipts when there's no receipt for a version * refactor: rename `Processed::All` to `Processed::Both` * refactor: delegate empty checking to store receipts functions * style: Cargo.lock re-ordered itself
1 parent 228e701 commit 811567e

File tree

3 files changed

+210
-13
lines changed

3 files changed

+210
-13
lines changed

Cargo.lock

Lines changed: 2 additions & 2 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

crates/service/Cargo.toml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -63,6 +63,7 @@ pin-project = "1.1.7"
6363
tonic.workspace = true
6464
itertools = "0.14.0"
6565

66+
6667
[dev-dependencies]
6768
hex-literal = "0.4.1"
6869
test-assets = { path = "../test-assets" }
@@ -74,6 +75,7 @@ tokio-test = "0.4.4"
7475
wiremock.workspace = true
7576
insta = "1.41.1"
7677
test-log.workspace = true
78+
futures = "0.3.31"
7779

7880
[build-dependencies]
7981
build-info-build = { version = "0.0.40", default-features = false }

crates/service/src/tap/receipt_store.rs

Lines changed: 206 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -27,29 +27,49 @@ enum ProcessReceiptError {
2727
Both(anyhow::Error, anyhow::Error),
2828
}
2929

30+
/// Indicates which versions of Receipts where processed
31+
/// It's intended to be used for migration tests
32+
#[derive(Debug, PartialEq, Eq)]
33+
pub enum ProcessedReceipt {
34+
V1,
35+
V2,
36+
Both,
37+
None,
38+
}
39+
3040
impl InnerContext {
3141
async fn process_db_receipts(
3242
&self,
3343
buffer: Vec<DatabaseReceipt>,
34-
) -> Result<(), ProcessReceiptError> {
44+
) -> Result<ProcessedReceipt, ProcessReceiptError> {
3545
let (v1_receipts, v2_receipts): (Vec<_>, Vec<_>) =
3646
buffer.into_iter().partition_map(|r| match r {
3747
DatabaseReceipt::V1(db_receipt_v1) => Either::Left(db_receipt_v1),
3848
DatabaseReceipt::V2(db_receipt_v2) => Either::Right(db_receipt_v2),
3949
});
50+
4051
let (insert_v1, insert_v2) = tokio::join!(
4152
self.store_receipts_v1(v1_receipts),
42-
self.store_receipts_v2(v2_receipts)
53+
self.store_receipts_v2(v2_receipts),
4354
);
55+
4456
match (insert_v1, insert_v2) {
4557
(Err(e1), Err(e2)) => Err(ProcessReceiptError::Both(e1.into(), e2.into())),
46-
(Err(e1), _) => Err(ProcessReceiptError::V1(e1.into())),
47-
(_, Err(e2)) => Err(ProcessReceiptError::V2(e2.into())),
48-
_ => Ok(()),
58+
59+
(Err(e1), Ok(_)) => Err(ProcessReceiptError::V1(e1.into())),
60+
(Ok(_), Err(e2)) => Err(ProcessReceiptError::V2(e2.into())),
61+
62+
(Ok(0), Ok(0)) => Ok(ProcessedReceipt::None),
63+
(Ok(_), Ok(0)) => Ok(ProcessedReceipt::V1),
64+
(Ok(0), Ok(_)) => Ok(ProcessedReceipt::V2),
65+
(Ok(_), Ok(_)) => Ok(ProcessedReceipt::Both),
4966
}
5067
}
5168

52-
async fn store_receipts_v1(&self, receipts: Vec<DbReceiptV1>) -> Result<(), AdapterError> {
69+
async fn store_receipts_v1(&self, receipts: Vec<DbReceiptV1>) -> Result<u64, AdapterError> {
70+
if receipts.is_empty() {
71+
return Ok(0);
72+
}
5373
let receipts_len = receipts.len();
5474
let mut signers = Vec::with_capacity(receipts_len);
5575
let mut signatures = Vec::with_capacity(receipts_len);
@@ -66,7 +86,7 @@ impl InnerContext {
6686
nonces.push(receipt.nonce);
6787
values.push(receipt.value);
6888
}
69-
sqlx::query!(
89+
let query_res = sqlx::query!(
7090
r#"INSERT INTO scalar_tap_receipts (
7191
signer_address,
7292
signature,
@@ -96,10 +116,13 @@ impl InnerContext {
96116
anyhow!(e)
97117
})?;
98118

99-
Ok(())
119+
Ok(query_res.rows_affected())
100120
}
101121

102-
async fn store_receipts_v2(&self, receipts: Vec<DbReceiptV2>) -> Result<(), AdapterError> {
122+
async fn store_receipts_v2(&self, receipts: Vec<DbReceiptV2>) -> Result<u64, AdapterError> {
123+
if receipts.is_empty() {
124+
return Ok(0);
125+
}
103126
let receipts_len = receipts.len();
104127
let mut signers = Vec::with_capacity(receipts_len);
105128
let mut signatures = Vec::with_capacity(receipts_len);
@@ -122,7 +145,7 @@ impl InnerContext {
122145
nonces.push(receipt.nonce);
123146
values.push(receipt.value);
124147
}
125-
sqlx::query!(
148+
let query_res = sqlx::query!(
126149
r#"INSERT INTO tap_horizon_receipts (
127150
signer_address,
128151
signature,
@@ -161,7 +184,7 @@ impl InnerContext {
161184
anyhow!(e)
162185
})?;
163186

164-
Ok(())
187+
Ok(query_res.rows_affected())
165188
}
166189
}
167190

@@ -305,3 +328,175 @@ impl DbReceiptV2 {
305328
})
306329
}
307330
}
331+
332+
#[cfg(test)]
333+
mod tests {
334+
use std::{path::PathBuf, sync::LazyLock};
335+
336+
use futures::future::BoxFuture;
337+
use sqlx::{
338+
migrate::{MigrationSource, Migrator},
339+
PgPool,
340+
};
341+
use test_assets::{
342+
create_signed_receipt, create_signed_receipt_v2, SignedReceiptRequest, INDEXER_ALLOCATIONS,
343+
TAP_EIP712_DOMAIN,
344+
};
345+
346+
use crate::tap::{
347+
receipt_store::{
348+
DatabaseReceipt, DbReceiptV1, DbReceiptV2, InnerContext, ProcessReceiptError,
349+
ProcessedReceipt,
350+
},
351+
AdapterError,
352+
};
353+
354+
async fn create_v1() -> DatabaseReceipt {
355+
let alloc = INDEXER_ALLOCATIONS.values().next().unwrap().clone();
356+
let v1 = create_signed_receipt(
357+
SignedReceiptRequest::builder()
358+
.allocation_id(alloc.id)
359+
.value(100)
360+
.build(),
361+
)
362+
.await;
363+
DatabaseReceipt::V1(DbReceiptV1::from_receipt(&v1, &TAP_EIP712_DOMAIN).unwrap())
364+
}
365+
366+
async fn create_v2() -> DatabaseReceipt {
367+
let v2 = create_signed_receipt_v2().call().await;
368+
DatabaseReceipt::V2(DbReceiptV2::from_receipt(&v2, &TAP_EIP712_DOMAIN).unwrap())
369+
}
370+
371+
mod when_all_migrations_are_run {
372+
use super::*;
373+
374+
#[rstest::rstest]
375+
#[case(ProcessedReceipt::None, async { vec![] })]
376+
#[case(ProcessedReceipt::V1, async { vec![create_v1().await] })]
377+
#[case(ProcessedReceipt::V2, async { vec![create_v2().await] })]
378+
#[case(ProcessedReceipt::Both, async { vec![create_v2().await, create_v1().await] })]
379+
#[sqlx::test(migrations = "../../migrations")]
380+
async fn v1_and_v2_are_processed_successfully(
381+
#[ignore] pgpool: PgPool,
382+
#[case] expected: ProcessedReceipt,
383+
#[future(awt)]
384+
#[case]
385+
receipts: Vec<DatabaseReceipt>,
386+
) {
387+
let context = InnerContext { pgpool };
388+
389+
let res = context.process_db_receipts(receipts).await.unwrap();
390+
391+
assert_eq!(res, expected);
392+
}
393+
}
394+
395+
mod when_horizon_migrations_are_ignored {
396+
use super::*;
397+
398+
#[sqlx::test(migrator = "WITHOUT_HORIZON_MIGRATIONS")]
399+
async fn test_empty_receipts_are_processed_successfully(pgpool: PgPool) {
400+
let context = InnerContext { pgpool };
401+
402+
let res = context.process_db_receipts(vec![]).await.unwrap();
403+
404+
assert_eq!(res, ProcessedReceipt::None);
405+
}
406+
407+
#[sqlx::test(migrator = "WITHOUT_HORIZON_MIGRATIONS")]
408+
async fn test_v1_receipts_are_processed_successfully(pgpool: PgPool) {
409+
let context = InnerContext { pgpool };
410+
411+
let v1 = create_v1().await;
412+
let receipts = vec![v1];
413+
414+
let res = context.process_db_receipts(receipts).await.unwrap();
415+
416+
assert_eq!(res, ProcessedReceipt::V1);
417+
}
418+
419+
#[rstest::rstest]
420+
#[case(async { vec![create_v2().await] })]
421+
#[case(async { vec![create_v2().await, create_v1().await] })]
422+
#[sqlx::test(migrator = "WITHOUT_HORIZON_MIGRATIONS")]
423+
async fn test_cases_with_v2_receipts_fails_to_process(
424+
#[ignore] pgpool: PgPool,
425+
#[future(awt)]
426+
#[case]
427+
receipts: Vec<DatabaseReceipt>,
428+
) {
429+
let context = InnerContext { pgpool };
430+
431+
let error = context.process_db_receipts(receipts).await.unwrap_err();
432+
433+
let ProcessReceiptError::V2(error) = error else {
434+
panic!()
435+
};
436+
let d = error.downcast_ref::<AdapterError>().unwrap().to_string();
437+
438+
assert_eq!(
439+
d,
440+
"error returned from database: relation \"tap_horizon_receipts\" does not exist"
441+
);
442+
}
443+
444+
pub static WITHOUT_HORIZON_MIGRATIONS: LazyLock<Migrator> = LazyLock::new(create_migrator);
445+
446+
pub fn create_migrator() -> Migrator {
447+
futures::executor::block_on(Migrator::new(MigrationRunner::new(
448+
"../../migrations",
449+
["horizon"],
450+
)))
451+
.unwrap()
452+
}
453+
454+
#[derive(Debug)]
455+
pub struct MigrationRunner {
456+
migration_path: PathBuf,
457+
ignored_migrations: Vec<String>,
458+
}
459+
460+
impl MigrationRunner {
461+
/// Construct a new MigrationRunner that does not apply the given migrations.
462+
///
463+
/// `ignored_migrations` is any iterable of strings that describes which
464+
/// migrations to be ignored.
465+
pub fn new<I>(path: impl Into<PathBuf>, ignored_migrations: I) -> Self
466+
where
467+
I: IntoIterator,
468+
I::Item: Into<String>,
469+
{
470+
Self {
471+
migration_path: path.into(),
472+
ignored_migrations: ignored_migrations.into_iter().map(Into::into).collect(),
473+
}
474+
}
475+
}
476+
477+
impl MigrationSource<'static> for MigrationRunner {
478+
fn resolve(
479+
self,
480+
) -> BoxFuture<'static, Result<Vec<sqlx::migrate::Migration>, sqlx::error::BoxDynError>>
481+
{
482+
Box::pin(async move {
483+
let canonical = self.migration_path.canonicalize()?;
484+
let migrations_with_paths =
485+
sqlx::migrate::resolve_blocking(&canonical).unwrap();
486+
487+
let migrations_with_paths = migrations_with_paths
488+
.into_iter()
489+
.filter(|(_, p)| {
490+
let path = p.to_str().unwrap();
491+
self.ignored_migrations
492+
.iter()
493+
.any(|ignored| !path.contains(ignored))
494+
})
495+
.collect::<Vec<_>>();
496+
497+
Ok(migrations_with_paths.into_iter().map(|(m, _p)| m).collect())
498+
})
499+
}
500+
}
501+
}
502+
}

0 commit comments

Comments
 (0)