diff --git a/.github/workflows/tests.yml b/.github/workflows/tests.yml index b52d65e5a..f78f7bd8a 100644 --- a/.github/workflows/tests.yml +++ b/.github/workflows/tests.yml @@ -41,6 +41,8 @@ jobs: - uses: actions/checkout@11bd71901bbe5b1630ceea73d27597364c9af683 # v4 - name: Install libsasl2-dev run: apt-get update && apt-get install -y libsasl2-dev + - name: Install protobuf compiler + run: apt-get update && apt-get install protobuf-compiler -y - name: Cache dependencies uses: Swatinem/rust-cache@f0deed1e0edfc6a9be95417288c0e1099b1eeec3 # v2.7.7 if: ${{ !startsWith(github.head_ref, 'renovate/') }} @@ -52,8 +54,6 @@ jobs: - name: Run sccache-cache uses: mozilla-actions/sccache-action@7d986dd989559c6ecdb630a3fd2557667be217ad # v0.0.9 if: ${{ !startsWith(github.head_ref, 'renovate/') }} - - name: Install protobuf compiler - run: apt-get update && apt-get install protobuf-compiler -y - name: Install sqlx run: cargo install sqlx-cli --no-default-features --features postgres - name: Run the test sqlx migrations @@ -73,6 +73,8 @@ jobs: - uses: actions/checkout@11bd71901bbe5b1630ceea73d27597364c9af683 # v4 - name: Install libsasl2-dev run: apt-get update && apt-get install -y libsasl2-dev + - name: Install protobuf compiler + run: apt-get update && apt-get install protobuf-compiler -y - name: Cache dependencies uses: Swatinem/rust-cache@f0deed1e0edfc6a9be95417288c0e1099b1eeec3 # v2.7.7 if: ${{ !startsWith(github.head_ref, 'renovate/') }} @@ -84,8 +86,6 @@ jobs: - name: Run sccache-cache uses: mozilla-actions/sccache-action@7d986dd989559c6ecdb630a3fd2557667be217ad # v0.0.9 if: ${{ !startsWith(github.head_ref, 'renovate/') }} - - name: Install protobuf compiler - run: apt-get update && apt-get install protobuf-compiler -y - run: | rustup component add clippy # Temporarily allowing dead-code, while denying all other warnings @@ -94,31 +94,28 @@ jobs: test-and-coverage: name: cargo test and coverage runs-on: ubuntu-latest - permissions: - contents: write - pull-requests: write - actions: read - services: - postgres: - image: postgres:15 - env: - POSTGRES_HOST_AUTH_METHOD: trust - options: >- - --health-cmd pg_isready - --health-interval 10s - --health-timeout 5s - --health-retries 5 - ports: - - 5432:5432 container: image: rust:1.86-bookworm + options: --privileged -v /var/run/docker.sock:/var/run/docker.sock env: - DATABASE_URL: postgres://postgres@postgres:5432 CI: true + SQLX_OFFLINE: true + DOCKER_HOST: unix:///var/run/docker.sock steps: - uses: actions/checkout@11bd71901bbe5b1630ceea73d27597364c9af683 # v4 + - name: Install Docker CLI and verify + run: | + apt-get update + apt-get install -y docker.io + # Verify Docker is accessible + docker version + docker ps + # Pre-pull the postgres:15 image to avoid timeouts + docker pull postgres:15 - name: Install libsasl2-dev run: apt-get update && apt-get install -y libsasl2-dev + - name: Install protobuf compiler + run: apt-get update && apt-get install protobuf-compiler -y - name: Cache dependencies uses: Swatinem/rust-cache@f0deed1e0edfc6a9be95417288c0e1099b1eeec3 # v2.7.7 if: ${{ !startsWith(github.head_ref, 'renovate/') }} @@ -127,17 +124,11 @@ jobs: echo "RUSTC_WRAPPER=sccache" >> $GITHUB_ENV echo "SCCACHE_GHA_ENABLED=true" >> $GITHUB_ENV if: ${{ !startsWith(github.head_ref, 'renovate/') }} - - name: Install protobuf compiler - run: apt-get update && apt-get install protobuf-compiler -y - name: Run sccache-cache uses: mozilla-actions/sccache-action@7d986dd989559c6ecdb630a3fd2557667be217ad # v0.0.9 if: ${{ !startsWith(github.head_ref, 'renovate/') 
}} - name: Install cargo-llvm-cov uses: taiki-e/install-action@cargo-llvm-cov - - name: Install sqlx - run: cargo install sqlx-cli --no-default-features --features postgres - - name: Run the test sqlx migrations - run: cargo sqlx migrate run - name: Run tests and generate coverage report run: cargo llvm-cov test --all-features --workspace --lcov --output-path lcov.info - name: Upload coverage to Coveralls @@ -149,30 +140,27 @@ jobs: test-docs: name: cargo test docs code snippets runs-on: ubuntu-latest - permissions: - contents: write - pull-requests: write - actions: read - services: - postgres: - image: postgres:15 - env: - POSTGRES_HOST_AUTH_METHOD: trust - options: >- - --health-cmd pg_isready - --health-interval 10s - --health-timeout 5s - --health-retries 5 - ports: - - 5432:5432 container: image: rust:1.86-bookworm + options: --privileged -v /var/run/docker.sock:/var/run/docker.sock env: - DATABASE_URL: postgres://postgres@postgres:5432 + SQLX_OFFLINE: true + DOCKER_HOST: unix:///var/run/docker.sock steps: - uses: actions/checkout@11bd71901bbe5b1630ceea73d27597364c9af683 # v4 + - name: Install Docker CLI and verify + run: | + apt-get update + apt-get install -y docker.io + # Verify Docker is accessible + docker version + docker ps + # Pre-pull the postgres:15 image to avoid timeouts + docker pull postgres:15 - name: Install libsasl2-dev run: apt-get update && apt-get install -y libsasl2-dev + - name: Install protobuf compiler + run: apt-get update && apt-get install protobuf-compiler -y - name: Cache dependencies uses: Swatinem/rust-cache@f0deed1e0edfc6a9be95417288c0e1099b1eeec3 # v2.7.7 if: ${{ !startsWith(github.head_ref, 'renovate/') }} @@ -181,14 +169,8 @@ jobs: echo "RUSTC_WRAPPER=sccache" >> $GITHUB_ENV echo "SCCACHE_GHA_ENABLED=true" >> $GITHUB_ENV if: ${{ !startsWith(github.head_ref, 'renovate/') }} - - name: Install protobuf compiler - run: apt-get update && apt-get install protobuf-compiler -y - name: Run sccache-cache uses: mozilla-actions/sccache-action@7d986dd989559c6ecdb630a3fd2557667be217ad # v0.0.9 if: ${{ !startsWith(github.head_ref, 'renovate/') }} - - name: Install sqlx - run: cargo install sqlx-cli --no-default-features --features postgres - - name: Run the test sqlx migrations - run: cargo sqlx migrate run - name: Test documentation code snippets run: cargo test --doc --all-features --workspace diff --git a/Cargo.lock b/Cargo.lock index 075d44950..3725d91cd 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1672,6 +1672,56 @@ dependencies = [ "zeroize", ] +[[package]] +name = "bollard" +version = "0.18.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "97ccca1260af6a459d75994ad5acc1651bcabcbdbc41467cc9786519ab854c30" +dependencies = [ + "base64 0.22.1", + "bollard-stubs", + "bytes", + "futures-core", + "futures-util", + "hex", + "home", + "http 1.3.1", + "http-body-util", + "hyper 1.6.0", + "hyper-named-pipe", + "hyper-rustls 0.27.7", + "hyper-util", + "hyperlocal", + "log", + "pin-project-lite", + "rustls 0.23.28", + "rustls-native-certs", + "rustls-pemfile 2.2.0", + "rustls-pki-types", + "serde", + "serde_derive", + "serde_json", + "serde_repr", + "serde_urlencoded", + "thiserror 2.0.12", + "tokio", + "tokio-util", + "tower-service", + "url", + "winapi", +] + +[[package]] +name = "bollard-stubs" +version = "1.47.1-rc.27.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3f179cfbddb6e77a5472703d4b30436bff32929c0aa8a9008ecf23d1d3cdd0da" +dependencies = [ + "serde", + "serde_repr", + "serde_with", +] + 
[[package]] name = "bon" version = "2.3.0" @@ -2465,7 +2515,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8d162beedaa69905488a8da94f5ac3edb4dd4788b732fadb7bd120b2625c1976" dependencies = [ "data-encoding", - "syn 1.0.109", + "syn 2.0.104", ] [[package]] @@ -2606,6 +2656,17 @@ dependencies = [ "syn 2.0.104", ] +[[package]] +name = "docker_credential" +version = "1.3.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1d89dfcba45b4afad7450a99b39e751590463e45c04728cf555d36bb66940de8" +dependencies = [ + "base64 0.21.7", + "serde", + "serde_json", +] + [[package]] name = "dotenvy" version = "0.15.7" @@ -2816,6 +2877,17 @@ dependencies = [ "windows-sys 0.48.0", ] +[[package]] +name = "etcetera" +version = "0.10.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "26c7b13d0780cb82722fd59f6f57f925e143427e4a75313a6c77243bf5326ae6" +dependencies = [ + "cfg-if", + "home", + "windows-sys 0.59.0", +] + [[package]] name = "event-listener" version = "5.4.0" @@ -2879,6 +2951,18 @@ dependencies = [ "version_check", ] +[[package]] +name = "filetime" +version = "0.2.25" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "35c0522e981e68cbfa8c3f978441a5f34b30b96e146b33cd3359176b50fe8586" +dependencies = [ + "cfg-if", + "libc", + "libredox", + "windows-sys 0.59.0", +] + [[package]] name = "findshlibs" version = "0.10.2" @@ -3640,6 +3724,21 @@ dependencies = [ "hyper 0.14.32", ] +[[package]] +name = "hyper-named-pipe" +version = "0.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "73b7d8abf35697b81a825e386fc151e0d503e8cb5fcb93cc8669c376dfd6f278" +dependencies = [ + "hex", + "hyper 1.6.0", + "hyper-util", + "pin-project-lite", + "tokio", + "tower-service", + "winapi", +] + [[package]] name = "hyper-rustls" version = "0.24.2" @@ -3740,6 +3839,21 @@ dependencies = [ "windows-registry", ] +[[package]] +name = "hyperlocal" +version = "0.9.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "986c5ce3b994526b3cd75578e62554abd09f0899d6206de48b3e96ab34ccc8c7" +dependencies = [ + "hex", + "http-body-util", + "hyper 1.6.0", + "hyper-util", + "pin-project-lite", + "tokio", + "tower-service", +] + [[package]] name = "iana-time-zone" version = "0.1.63" @@ -3963,6 +4077,8 @@ dependencies = [ "serde_json", "serde_yaml", "sqlx", + "test-assets", + "testcontainers-modules", "thegraph-core", "thiserror 1.0.69", "tokio", @@ -4303,7 +4419,7 @@ checksum = "e04d7f318608d35d4b61ddd75cbdaee86b023ebe2bd5a66ee0915f0bf93095a9" dependencies = [ "hermit-abi", "libc", - "windows-sys 0.52.0", + "windows-sys 0.59.0", ] [[package]] @@ -4638,6 +4754,7 @@ checksum = "1580801010e535496706ba011c15f8532df6b42297d2e471fec38ceadd8c0638" dependencies = [ "bitflags 2.9.1", "libc", + "redox_syscall 0.5.13", ] [[package]] @@ -5431,11 +5548,36 @@ checksum = "bc838d2a56b5b1a6c25f55575dfc605fabb63bb2365f6c2353ef9159aa69e4a5" dependencies = [ "cfg-if", "libc", - "redox_syscall", + "redox_syscall 0.5.13", "smallvec", "windows-targets 0.52.6", ] +[[package]] +name = "parse-display" +version = "0.9.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "914a1c2265c98e2446911282c6ac86d8524f495792c38c5bd884f80499c7538a" +dependencies = [ + "parse-display-derive", + "regex", + "regex-syntax 0.8.5", +] + +[[package]] +name = "parse-display-derive" +version = "0.9.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = 
"2ae7800a4c974efd12df917266338e79a7a74415173caf7e70aa0a0707345281" +dependencies = [ + "proc-macro2", + "quote", + "regex", + "regex-syntax 0.8.5", + "structmeta", + "syn 2.0.104", +] + [[package]] name = "paste" version = "1.0.15" @@ -5884,7 +6026,7 @@ version = "0.13.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "be769465445e8c1474e9c5dac2018218498557af32d9ed057325ec9a41ae81bf" dependencies = [ - "heck 0.4.1", + "heck 0.5.0", "itertools 0.14.0", "log", "multimap", @@ -6209,6 +6351,15 @@ dependencies = [ "sasl2-sys", ] +[[package]] +name = "redox_syscall" +version = "0.3.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "567664f262709473930a4bf9e51bf2ebf3348f2e748ccc50dea20646858f8f29" +dependencies = [ + "bitflags 1.3.2", +] + [[package]] name = "redox_syscall" version = "0.5.13" @@ -6340,7 +6491,7 @@ dependencies = [ "percent-encoding", "pin-project-lite", "rustls 0.21.12", - "rustls-pemfile", + "rustls-pemfile 1.0.4", "serde", "serde_json", "serde_urlencoded", @@ -6649,7 +6800,7 @@ dependencies = [ "errno", "libc", "linux-raw-sys", - "windows-sys 0.52.0", + "windows-sys 0.59.0", ] [[package]] @@ -6700,6 +6851,15 @@ dependencies = [ "base64 0.21.7", ] +[[package]] +name = "rustls-pemfile" +version = "2.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "dce314e5fee3f39953d46bb63bb8a46d40c2f8fb7cc5a3b6cab2bde9721d6e50" +dependencies = [ + "rustls-pki-types", +] + [[package]] name = "rustls-pki-types" version = "1.12.0" @@ -6727,7 +6887,7 @@ dependencies = [ "security-framework 3.2.0", "security-framework-sys", "webpki-root-certs 0.26.11", - "windows-sys 0.52.0", + "windows-sys 0.59.0", ] [[package]] @@ -6874,9 +7034,9 @@ dependencies = [ [[package]] name = "sdd" -version = "3.0.8" +version = "3.0.10" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "584e070911c7017da6cb2eb0788d09f43d789029b5877d3e5ecc8acf86ceee21" +checksum = "490dcfcbfef26be6800d11870ff2df8774fa6e86d047e3e8c8a76b25655e41ca" [[package]] name = "seahash" @@ -7588,7 +7748,7 @@ dependencies = [ "chrono", "crc", "dotenvy", - "etcetera", + "etcetera 0.8.0", "futures-channel", "futures-core", "futures-util", @@ -7689,6 +7849,29 @@ version = "0.11.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7da8b5736845d9f2fcb837ea5d9e2628564b3b043a70948a3f0b778838c5fb4f" +[[package]] +name = "structmeta" +version = "0.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2e1575d8d40908d70f6fd05537266b90ae71b15dbbe7a8b7dffa2b759306d329" +dependencies = [ + "proc-macro2", + "quote", + "structmeta-derive", + "syn 2.0.104", +] + +[[package]] +name = "structmeta-derive" +version = "0.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "152a0b65a590ff6c3da95cabe2353ee04e6167c896b28e3b14478c2636c922fc" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.104", +] + [[package]] name = "strum" version = "0.26.3" @@ -7992,7 +8175,7 @@ dependencies = [ "getrandom 0.3.3", "once_cell", "rustix", - "windows-sys 0.52.0", + "windows-sys 0.59.0", ] [[package]] @@ -8007,8 +8190,10 @@ dependencies = [ "stdext", "tap_core", "tap_graph", + "testcontainers-modules", "thegraph-core", "tokio", + "tracing", ] [[package]] @@ -8053,6 +8238,44 @@ dependencies = [ "which", ] +[[package]] +name = "testcontainers" +version = "0.24.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = 
"23bb7577dca13ad86a78e8271ef5d322f37229ec83b8d98da6d996c588a1ddb1" +dependencies = [ + "async-trait", + "bollard", + "bollard-stubs", + "bytes", + "docker_credential", + "either", + "etcetera 0.10.0", + "futures", + "log", + "memchr", + "parse-display", + "pin-project-lite", + "serde", + "serde_json", + "serde_with", + "thiserror 2.0.12", + "tokio", + "tokio-stream", + "tokio-tar", + "tokio-util", + "url", +] + +[[package]] +name = "testcontainers-modules" +version = "0.12.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "eac95cde96549fc19c6bf19ef34cc42bd56e264c1cb97e700e21555be0ecf9e2" +dependencies = [ + "testcontainers", +] + [[package]] name = "thegraph-core" version = "0.15.1" @@ -8284,6 +8507,21 @@ dependencies = [ "tokio-util", ] +[[package]] +name = "tokio-tar" +version = "0.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9d5714c010ca3e5c27114c1cdeb9d14641ace49874aa5626d7149e47aedace75" +dependencies = [ + "filetime", + "futures-core", + "libc", + "redox_syscall 0.3.5", + "tokio", + "tokio-stream", + "xattr", +] + [[package]] name = "tokio-test" version = "0.4.4" @@ -9069,7 +9307,7 @@ version = "1.6.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "6994d13118ab492c3c80c1f81928718159254c53c472bf9ce36f8dae4add02a7" dependencies = [ - "redox_syscall", + "redox_syscall 0.5.13", "wasite", ] @@ -9095,7 +9333,7 @@ version = "0.1.9" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "cf221c93e13a30d793f7645a0e7762c55d169dbb0a49671918a2319d289b10bb" dependencies = [ - "windows-sys 0.48.0", + "windows-sys 0.59.0", ] [[package]] @@ -9601,6 +9839,16 @@ dependencies = [ "tap", ] +[[package]] +name = "xattr" +version = "1.5.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "af3a19837351dc82ba89f8a125e22a3c475f05aba604acc023d62b2739ae2909" +dependencies = [ + "libc", + "rustix", +] + [[package]] name = "yansi" version = "1.0.1" diff --git a/Cargo.toml b/Cargo.toml index faf3dc7d5..cb9ef32db 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -88,6 +88,7 @@ tap_core = { version = "4.1.4", default-features = false } tap_graph = { version = "0.3.4", features = ["v2"] } tempfile = "3.8.0" test-log = { version = "0.2.12", default-features = false } +testcontainers-modules = { version = "0.12.1", features = ["postgres"] } test-with = "0.14.6" thegraph-core = { version = "0.15.1", features = [ "attestation", diff --git a/crates/dips/Cargo.toml b/crates/dips/Cargo.toml index 12671f26f..07ca7ca01 100644 --- a/crates/dips/Cargo.toml +++ b/crates/dips/Cargo.toml @@ -39,6 +39,8 @@ serde_json.workspace = true [dev-dependencies] rand.workspace = true indexer-watcher = { path = "../watcher" } +testcontainers-modules = { workspace = true, features = ["postgres"] } +test-assets = { path = "../test-assets" } [build-dependencies] tonic-build = { workspace = true, optional = true } diff --git a/crates/dips/src/database.rs b/crates/dips/src/database.rs index df07454ea..039c8c612 100644 --- a/crates/dips/src/database.rs +++ b/crates/dips/src/database.rs @@ -135,7 +135,6 @@ pub(crate) mod test { use std::sync::Arc; use build_info::chrono::Duration; - use sqlx::PgPool; use thegraph_core::alloy::{ primitives::{ruint::aliases::U256, Address}, sol_types::SolValue, @@ -145,9 +144,10 @@ pub(crate) mod test { use super::*; use crate::{CancellationRequest, IndexingAgreementVoucher}; - #[sqlx::test(migrations = "../../migrations")] - async fn test_store_agreement(pool: PgPool) { - let 
store = Arc::new(PsqlAgreementStore { pool }); + #[tokio::test] + async fn test_store_agreement() { + let test_db = test_assets::setup_shared_test_db().await; + let store = Arc::new(PsqlAgreementStore { pool: test_db.pool }); let id = Uuid::now_v7(); // Create metadata first @@ -196,9 +196,10 @@ pub(crate) mod test { assert_eq!(row.subgraph_deployment_id, "Qm123"); } - #[sqlx::test(migrations = "../../migrations")] - async fn test_get_agreement_by_id(pool: PgPool) { - let store = Arc::new(PsqlAgreementStore { pool }); + #[tokio::test] + async fn test_get_agreement_by_id() { + let test_db = test_assets::setup_shared_test_db().await; + let store = Arc::new(PsqlAgreementStore { pool: test_db.pool }); let id = Uuid::parse_str("a1a2a3a4b1b2c1c2d1d2d3d4d5d6d7d9").unwrap(); // Create metadata first @@ -277,9 +278,10 @@ pub(crate) mod test { assert!(!stored_agreement.cancelled); } - #[sqlx::test(migrations = "../../migrations")] - async fn test_cancel_agreement(pool: PgPool) { - let store = Arc::new(PsqlAgreementStore { pool }); + #[tokio::test] + async fn test_cancel_agreement() { + let test_db = test_assets::setup_shared_test_db().await; + let store = Arc::new(PsqlAgreementStore { pool: test_db.pool }); let id = Uuid::parse_str("a1a2a3a4b1b2c1c2d1d2d3d4d5d6d7e9").unwrap(); // Create metadata first diff --git a/crates/service/src/database/cost_model.rs b/crates/service/src/database/cost_model.rs index 45de7c445..bb4e3e51f 100644 --- a/crates/service/src/database/cost_model.rs +++ b/crates/service/src/database/cost_model.rs @@ -251,8 +251,10 @@ pub(crate) mod test { ] } - #[sqlx::test(migrations = "../../migrations")] - async fn success_cost_models(pool: PgPool) { + #[tokio::test] + async fn success_cost_models() { + let test_db = test_assets::setup_shared_test_db().await; + let pool = test_db.pool; let test_models = test_data(); let test_deployments = test_models .iter() @@ -313,8 +315,10 @@ pub(crate) mod test { } } - #[sqlx::test(migrations = "../../migrations")] - async fn global_fallback_cost_models(pool: PgPool) { + #[tokio::test] + async fn global_fallback_cost_models() { + let test_db = test_assets::setup_shared_test_db().await; + let pool = test_db.pool; let test_models = test_data(); let test_deployments = test_models .iter() @@ -403,8 +407,10 @@ pub(crate) mod test { assert_eq!(missing_model.model, global_model.model); } - #[sqlx::test(migrations = "../../migrations")] - async fn success_cost_model(pool: PgPool) { + #[tokio::test] + async fn success_cost_model() { + let test_db = test_assets::setup_shared_test_db().await; + let pool = test_db.pool; add_cost_models(&pool, to_db_models(test_data())).await; let deployment_id_from_bytes = DeploymentId::from_str( @@ -425,8 +431,10 @@ pub(crate) mod test { assert_eq!(model.model, Some("default => 0.00025;".to_string())); } - #[sqlx::test(migrations = "../../migrations")] - async fn global_fallback_cost_model(pool: PgPool) { + #[tokio::test] + async fn global_fallback_cost_model() { + let test_db = test_assets::setup_shared_test_db().await; + let pool = test_db.pool; let test_models = test_data(); let global_model = global_cost_model(); diff --git a/crates/service/src/middleware/auth.rs b/crates/service/src/middleware/auth.rs index 95cba48c8..a6dbc5108 100644 --- a/crates/service/src/middleware/auth.rs +++ b/crates/service/src/middleware/auth.rs @@ -69,9 +69,10 @@ mod tests { service } - #[sqlx::test(migrations = "../../migrations")] - async fn test_composition_header_valid(pgpool: PgPool) { - let mut service = service(pgpool.clone()).await; + 
#[tokio::test] + async fn test_composition_header_valid() { + let test_db = test_assets::setup_shared_test_db().await; + let mut service = service(test_db.pool.clone()).await; // should allow queries that contains the free token // if the token does not match, return payment required let mut req = Request::new(Default::default()); @@ -83,9 +84,10 @@ mod tests { assert_eq!(res.status(), StatusCode::OK); } - #[sqlx::test(migrations = "../../migrations")] - async fn test_composition_header_invalid(pgpool: PgPool) { - let mut service = service(pgpool.clone()).await; + #[tokio::test] + async fn test_composition_header_invalid() { + let test_db = test_assets::setup_shared_test_db().await; + let mut service = service(test_db.pool.clone()).await; // if the token exists but is wrong, try the receipt let mut req = Request::new(Default::default()); @@ -96,9 +98,10 @@ mod tests { assert_eq!(res.status(), StatusCode::PAYMENT_REQUIRED); } - #[sqlx::test(migrations = "../../migrations")] - async fn test_composition_with_receipt(pgpool: PgPool) { - let mut service = service(pgpool.clone()).await; + #[tokio::test] + async fn test_composition_with_receipt() { + let test_db = test_assets::setup_shared_test_db().await; + let mut service = service(test_db.pool.clone()).await; let receipt = create_signed_receipt(SignedReceiptRequest::builder().build()).await; @@ -111,16 +114,17 @@ mod tests { // verify receipts assert_while_retry!({ let result = sqlx::query!("SELECT * FROM scalar_tap_receipts") - .fetch_all(&pgpool) + .fetch_all(&test_db.pool) .await .unwrap(); result.is_empty() }); } - #[sqlx::test(migrations = "../../migrations")] - async fn test_composition_without_header_or_receipt(pgpool: PgPool) { - let mut service = service(pgpool.clone()).await; + #[tokio::test] + async fn test_composition_without_header_or_receipt() { + let test_db = test_assets::setup_shared_test_db().await; + let mut service = service(test_db.pool.clone()).await; // if it has neither, should return payment required let req = Request::new(Default::default()); let res = service.call(req).await.unwrap(); diff --git a/crates/service/src/middleware/auth/tap.rs b/crates/service/src/middleware/auth/tap.rs index c6062e9ed..ff3bd605e 100644 --- a/crates/service/src/middleware/auth/tap.rs +++ b/crates/service/src/middleware/auth/tap.rs @@ -164,12 +164,10 @@ mod tests { } #[rstest] - #[sqlx::test(migrations = "../../migrations")] - async fn test_tap_valid_receipt( - metric: &'static prometheus::CounterVec, - #[ignore] pgpool: PgPool, - ) { - let mut service = service(metric, pgpool.clone()).await; + #[tokio::test] + async fn test_tap_valid_receipt(metric: &'static prometheus::CounterVec) { + let test_db = test_assets::setup_shared_test_db().await; + let mut service = service(metric, test_db.pool.clone()).await; let receipt = create_signed_receipt(SignedReceiptRequest::builder().build()).await; @@ -182,7 +180,7 @@ mod tests { // verify receipts assert_while_retry!({ sqlx::query!("SELECT * FROM scalar_tap_receipts") - .fetch_all(&pgpool) + .fetch_all(&test_db.pool) .await .unwrap() .is_empty() @@ -190,12 +188,10 @@ mod tests { } #[rstest] - #[sqlx::test(migrations = "../../migrations")] - async fn test_invalid_receipt_with_failed_metric( - metric: &'static prometheus::CounterVec, - #[ignore] pgpool: PgPool, - ) { - let mut service = service(metric, pgpool.clone()).await; + #[tokio::test] + async fn test_invalid_receipt_with_failed_metric(metric: &'static prometheus::CounterVec) { + let test_db = test_assets::setup_shared_test_db().await; + let 
mut service = service(metric, test_db.pool.clone()).await; // if it fails tap receipt, should return failed to process payment + tap message assert_eq!(metric.collect().first().unwrap().get_metric().len(), 0); @@ -224,12 +220,10 @@ mod tests { } #[rstest] - #[sqlx::test(migrations = "../../migrations")] - async fn test_tap_missing_signed_receipt( - metric: &'static prometheus::CounterVec, - #[ignore] pgpool: PgPool, - ) { - let mut service = service(metric, pgpool.clone()).await; + #[tokio::test] + async fn test_tap_missing_signed_receipt(metric: &'static prometheus::CounterVec) { + let test_db = test_assets::setup_shared_test_db().await; + let mut service = service(metric, test_db.pool.clone()).await; // if it doesnt contain the signed receipt // should return payment required let req = Request::new(Body::default()); diff --git a/crates/service/src/tap/checks/deny_list_check.rs b/crates/service/src/tap/checks/deny_list_check.rs index 3d52f3ab5..28aae1f18 100644 --- a/crates/service/src/tap/checks/deny_list_check.rs +++ b/crates/service/src/tap/checks/deny_list_check.rs @@ -262,8 +262,10 @@ mod tests { DenyListCheck::new(pgpool).await } - #[sqlx::test(migrations = "../../migrations")] - async fn test_sender_denylist(pgpool: PgPool) { + #[tokio::test] + async fn test_sender_denylist() { + let test_db = test_assets::setup_shared_test_db().await; + let pgpool = test_db.pool; // Add the sender to the denylist sqlx::query!( r#" @@ -292,8 +294,10 @@ mod tests { .is_err()); } - #[sqlx::test(migrations = "../../migrations")] - async fn test_sender_denylist_updates(pgpool: PgPool) { + #[tokio::test] + async fn test_sender_denylist_updates() { + let test_db = test_assets::setup_shared_test_db().await; + let pgpool = test_db.pool; let signed_receipt = create_signed_receipt(SignedReceiptRequest::builder().build()).await; let deny_list_check = new_deny_list_check(pgpool.clone()).await; diff --git a/crates/service/src/tap/checks/value_check.rs b/crates/service/src/tap/checks/value_check.rs index f7bc129c0..eb4713076 100644 --- a/crates/service/src/tap/checks/value_check.rs +++ b/crates/service/src/tap/checks/value_check.rs @@ -368,7 +368,6 @@ enum CostModelNotification { mod tests { use std::time::Duration; - use sqlx::PgPool; use tap_core::receipt::{checks::Check, Context}; use test_assets::{create_signed_receipt, flush_messages, SignedReceiptRequest}; use tokio::time::sleep; @@ -379,14 +378,18 @@ mod tests { tap::{CheckingReceipt, TapReceipt}, }; - #[sqlx::test(migrations = "../../migrations")] - async fn initialize_check(pgpool: PgPool) { + #[tokio::test] + async fn initialize_check() { + let test_db = test_assets::setup_shared_test_db().await; + let pgpool = test_db.pool; let check = MinimumValue::new(pgpool, Duration::from_secs(0)).await; assert_eq!(check.cost_model_map.read().unwrap().len(), 0); } - #[sqlx::test(migrations = "../../migrations")] - async fn should_initialize_check_with_models(pgpool: PgPool) { + #[tokio::test] + async fn should_initialize_check_with_models() { + let test_db = test_assets::setup_shared_test_db().await; + let pgpool = test_db.pool; // insert 2 cost models for different deployment_id let test_models = test::test_data(); @@ -399,8 +402,10 @@ mod tests { assert!(check.global_model.read().unwrap().is_none()); } - #[sqlx::test(migrations = "../../migrations")] - async fn should_watch_model_insert(pgpool: PgPool) { + #[tokio::test] + async fn should_watch_model_insert() { + let test_db = test_assets::setup_shared_test_db().await; + let pgpool = test_db.pool; let mut check 
= MinimumValue::new(pgpool.clone(), Duration::from_secs(0)).await; assert_eq!(check.cost_model_map.read().unwrap().len(), 0); @@ -416,8 +421,10 @@ mod tests { ); } - #[sqlx::test(migrations = "../../migrations")] - async fn should_watch_model_remove(pgpool: PgPool) { + #[tokio::test] + async fn should_watch_model_remove() { + let test_db = test_assets::setup_shared_test_db().await; + let pgpool = test_db.pool; // insert 2 cost models for different deployment_id let test_models = test::test_data(); add_cost_models(&pgpool, to_db_models(test_models.clone())).await; @@ -436,8 +443,10 @@ mod tests { assert_eq!(check.cost_model_map.read().unwrap().len(), 0); } - #[sqlx::test(migrations = "../../migrations")] - async fn should_start_global_model(pgpool: PgPool) { + #[tokio::test] + async fn should_start_global_model() { + let test_db = test_assets::setup_shared_test_db().await; + let pgpool = test_db.pool; let global_model = global_cost_model(); add_cost_models(&pgpool, vec![global_model.clone()]).await; @@ -445,8 +454,10 @@ mod tests { assert!(check.global_model.read().unwrap().is_some()); } - #[sqlx::test(migrations = "../../migrations")] - async fn should_watch_global_model(pgpool: PgPool) { + #[tokio::test] + async fn should_watch_global_model() { + let test_db = test_assets::setup_shared_test_db().await; + let pgpool = test_db.pool; let mut check = MinimumValue::new(pgpool.clone(), Duration::from_secs(0)).await; let global_model = global_cost_model(); @@ -457,8 +468,10 @@ mod tests { assert!(check.global_model.read().unwrap().is_some()); } - #[sqlx::test(migrations = "../../migrations")] - async fn should_remove_global_model(pgpool: PgPool) { + #[tokio::test] + async fn should_remove_global_model() { + let test_db = test_assets::setup_shared_test_db().await; + let pgpool = test_db.pool; let global_model = global_cost_model(); add_cost_models(&pgpool, vec![global_model.clone()]).await; @@ -475,8 +488,10 @@ mod tests { assert_eq!(check.cost_model_map.read().unwrap().len(), 0); } - #[sqlx::test(migrations = "../../migrations")] - async fn should_check_minimal_value(pgpool: PgPool) { + #[tokio::test] + async fn should_check_minimal_value() { + let test_db = test_assets::setup_shared_test_db().await; + let pgpool = test_db.pool; // insert cost models for different deployment_id let test_models = test::test_data(); @@ -565,8 +580,10 @@ mod tests { .expect("should accept more than minimal"); } - #[sqlx::test(migrations = "../../migrations")] - async fn should_check_using_global(pgpool: PgPool) { + #[tokio::test] + async fn should_check_using_global() { + let test_db = test_assets::setup_shared_test_db().await; + let pgpool = test_db.pool; // insert cost models for different deployment_id let test_models = test::test_data(); let global_model = global_cost_model(); diff --git a/crates/service/src/tap/receipt_store.rs b/crates/service/src/tap/receipt_store.rs index 287eac208..c11cca7e4 100644 --- a/crates/service/src/tap/receipt_store.rs +++ b/crates/service/src/tap/receipt_store.rs @@ -380,13 +380,10 @@ impl DbReceiptV2 { #[cfg(test)] mod tests { - use std::{path::PathBuf, sync::LazyLock}; + use std::path::PathBuf; use futures::future::BoxFuture; - use sqlx::{ - migrate::{MigrationSource, Migrator}, - PgPool, - }; + use sqlx::migrate::{MigrationSource, Migrator}; use test_assets::{ create_signed_receipt, create_signed_receipt_v2, SignedReceiptRequest, INDEXER_ALLOCATIONS, TAP_EIP712_DOMAIN, @@ -442,15 +439,17 @@ mod tests { #[case(ProcessedReceipt::V1, async { vec![create_v1().await] })] 
#[case(ProcessedReceipt::V2, async { vec![create_v2().await] })] #[case(ProcessedReceipt::Both, async { vec![create_v2().await, create_v1().await] })] - #[sqlx::test(migrations = "../../migrations")] + #[tokio::test] async fn v1_and_v2_are_processed_successfully( - #[ignore] pgpool: PgPool, #[case] expected: ProcessedReceipt, #[future(awt)] #[case] receipts: Vec, ) { - let context = InnerContext { pgpool }; + let test_db = test_assets::setup_shared_test_db().await; + let context = InnerContext { + pgpool: test_db.pool, + }; let (receipts, _rxs) = attach_oneshot_channels(receipts); let res = context.process_db_receipts(receipts).await.unwrap(); @@ -462,18 +461,26 @@ mod tests { mod when_horizon_migrations_are_ignored { use super::*; - #[sqlx::test(migrator = "WITHOUT_HORIZON_MIGRATIONS")] - async fn test_empty_receipts_are_processed_successfully(pgpool: PgPool) { - let context = InnerContext { pgpool }; + #[tokio::test] + async fn test_empty_receipts_are_processed_successfully() { + let migrator = create_migrator(); + let test_db = test_assets::setup_test_db_with_migrator(migrator).await; + let context = InnerContext { + pgpool: test_db.pool, + }; let res = context.process_db_receipts(vec![]).await.unwrap(); assert_eq!(res, ProcessedReceipt::None); } - #[sqlx::test(migrator = "WITHOUT_HORIZON_MIGRATIONS")] - async fn test_v1_receipts_are_processed_successfully(pgpool: PgPool) { - let context = InnerContext { pgpool }; + #[tokio::test] + async fn test_v1_receipts_are_processed_successfully() { + let migrator = create_migrator(); + let test_db = test_assets::setup_test_db_with_migrator(migrator).await; + let context = InnerContext { + pgpool: test_db.pool, + }; let v1 = create_v1().await; @@ -488,14 +495,20 @@ mod tests { #[rstest::rstest] #[case(async { vec![create_v2().await] })] #[case(async { vec![create_v2().await, create_v1().await] })] - #[sqlx::test(migrator = "WITHOUT_HORIZON_MIGRATIONS")] + #[tokio::test] async fn test_cases_with_v2_receipts_fails_to_process( - #[ignore] pgpool: PgPool, #[future(awt)] #[case] receipts: Vec, ) { - let context = InnerContext { pgpool }; + // Create a database without horizon migrations by running a custom migrator + // that excludes horizon-related migrations + let migrator = create_migrator(); + let test_db = test_assets::setup_test_db_with_migrator(migrator).await; + + let context = InnerContext { + pgpool: test_db.pool, + }; let (receipts, _rxs) = attach_oneshot_channels(receipts); let error = context.process_db_receipts(receipts).await.unwrap_err(); @@ -511,8 +524,6 @@ mod tests { ); } - pub static WITHOUT_HORIZON_MIGRATIONS: LazyLock = LazyLock::new(create_migrator); - pub fn create_migrator() -> Migrator { futures::executor::block_on(Migrator::new(MigrationRunner::new( "../../migrations", diff --git a/crates/service/tests/router_test.rs b/crates/service/tests/router_test.rs index 8c4691a54..19884d3c5 100644 --- a/crates/service/tests/router_test.rs +++ b/crates/service/tests/router_test.rs @@ -12,7 +12,6 @@ use indexer_service_rs::{ QueryBody, }; use reqwest::{Method, StatusCode, Url}; -use sqlx::PgPool; use test_assets::{ create_signed_receipt, SignedReceiptRequest, INDEXER_ALLOCATIONS, TAP_EIP712_DOMAIN, }; @@ -24,8 +23,10 @@ use wiremock::{ Mock, MockServer, ResponseTemplate, }; -#[sqlx::test(migrations = "../../migrations")] -async fn full_integration_test(database: PgPool) { +#[tokio::test] +async fn full_integration_test() { + let test_db = test_assets::setup_shared_test_db().await; + let database = test_db.pool; let http_client = 
reqwest::Client::builder() .tcp_nodelay(true) .build() diff --git a/crates/tap-agent/Cargo.toml b/crates/tap-agent/Cargo.toml index b5cb7daf7..213b9dc91 100644 --- a/crates/tap-agent/Cargo.toml +++ b/crates/tap-agent/Cargo.toml @@ -58,8 +58,8 @@ educe.workspace = true # import the current crate. For testing we import the current crate with the `test` # feature enabled in order to enable test-only infrastructure within our app when running tests. my-crate = { package = "indexer-tap-agent", path = ".", features = ["test"] } -serial_test.workspace = true tempfile.workspace = true +serial_test.workspace = true wiremock.workspace = true wiremock-grpc.workspace = true test-assets = { path = "../test-assets" } diff --git a/crates/tap-agent/src/agent/sender_account.rs b/crates/tap-agent/src/agent/sender_account.rs index 35226fd45..c1a08e8e8 100644 --- a/crates/tap-agent/src/agent/sender_account.rs +++ b/crates/tap-agent/src/agent/sender_account.rs @@ -606,6 +606,17 @@ impl State { .set(unaggregated_fees.value as f64); } + /// Determines whether the sender should be denied/blocked based on current fees and balance. + /// + /// The deny condition is reached when either: + /// 1. Total potential fees (pending RAVs + unaggregated fees) exceed the sender's balance + /// 2. Total risky fees (unaggregated + invalid) exceed max_amount_willing_to_lose + /// + /// When a successful RAV request clears unaggregated fees, this function should return + /// false, indicating the deny condition is resolved and retries can stop. + /// + /// This is the core logic that determines when the retry mechanism should continue + /// versus when it should stop after successful RAV processing. fn deny_condition_reached(&self) -> bool { let pending_ravs = self.rav_tracker.get_total_fee(); let unaggregated_fees = self.sender_fee_tracker.get_total_fee(); @@ -1254,12 +1265,19 @@ impl Actor for SenderAccount { } } + // Retry logic: Check if the deny condition is still met after RAV processing + // This is crucial for stopping retries when RAV requests successfully resolve + // the underlying issue (e.g., clearing unaggregated fees). match (state.denied, state.deny_condition_reached()) { - // Allow the sender right after the potential RAV request. This way, the - // sender can be allowed again as soon as possible if the RAV was successful. + // Case: Sender was denied BUT deny condition no longer met + // This happens when a successful RAV request clears unaggregated fees, + // reducing total_potential_fees below the balance threshold. + // Action: Remove from denylist and stop retrying. (true, false) => state.remove_from_denylist().await, - // if couldn't remove from denylist, resend the message in 30 seconds - // this may trigger another rav request + + // Case: Sender still denied AND deny condition still met + // This happens when RAV requests fail or don't sufficiently reduce fees. + // Action: Schedule another retry to attempt RAV creation again. 
(true, true) => { // retry in a moment state.scheduled_rav_request = @@ -1548,10 +1566,8 @@ pub mod tests { use indexer_monitor::EscrowAccounts; use ractor::{call, Actor, ActorRef, ActorStatus}; use serde_json::json; - use serial_test::serial; - use sqlx::PgPool; use test_assets::{ - flush_messages, pgpool, ALLOCATION_ID_0, ALLOCATION_ID_1, TAP_SENDER as SENDER, + flush_messages, ALLOCATION_ID_0, ALLOCATION_ID_1, TAP_SENDER as SENDER, TAP_SIGNER as SIGNER, }; use thegraph_core::{ @@ -1582,8 +1598,7 @@ pub mod tests { const BUFFER_DURATION: Duration = Duration::from_millis(100); const RETRY_DURATION: Duration = Duration::from_millis(1000); - #[rstest::fixture] - async fn mock_escrow_subgraph() -> MockServer { + async fn setup_mock_escrow_subgraph() -> MockServer { let mock_escrow_subgraph_server: MockServer = MockServer::start().await; mock_escrow_subgraph_server .register( @@ -1605,24 +1620,11 @@ pub mod tests { prefix: String, } - #[rstest::fixture] - async fn basic_sender_account(#[future(awt)] pgpool: PgPool) -> TestSenderAccount { - let (sender_account, msg_receiver, prefix, _) = - create_sender_account().pgpool(pgpool).call().await; - TestSenderAccount { - sender_account, - msg_receiver, - prefix, - } - } - - #[rstest::rstest] - #[sqlx::test(migrations = "../../migrations")] - #[serial] - async fn test_update_allocation_ids( - #[ignore] pgpool: PgPool, - #[future(awt)] mock_escrow_subgraph: MockServer, - ) { + #[tokio::test] + async fn test_update_allocation_ids() { + let mock_escrow_subgraph = setup_mock_escrow_subgraph().await; + let test_db = test_assets::setup_shared_test_db().await; + let pgpool = test_db.pool; // Start a mock graphql server using wiremock let mock_server = MockServer::start().await; @@ -1709,13 +1711,11 @@ pub mod tests { assert!(actor_ref.is_none()); } - #[rstest::rstest] - #[sqlx::test(migrations = "../../migrations")] - #[serial] - async fn test_new_allocation_id( - #[ignore] pgpool: PgPool, - #[future(awt)] mock_escrow_subgraph: MockServer, - ) { + #[tokio::test] + async fn test_new_allocation_id() { + let mock_escrow_subgraph = setup_mock_escrow_subgraph().await; + let test_db = test_assets::setup_shared_test_db().await; + let pgpool = test_db.pool; // Start a mock graphql server using wiremock let mock_server = MockServer::start().await; @@ -1826,11 +1826,17 @@ pub mod tests { .as_nanos() as u64 } - #[rstest::rstest] #[tokio::test] - async fn test_update_receipt_fees_no_rav( - #[future(awt)] basic_sender_account: TestSenderAccount, - ) { + async fn test_update_receipt_fees_no_rav() { + let test_db = test_assets::setup_shared_test_db().await; + let pgpool = test_db.pool; + let (sender_account, msg_receiver, prefix, _) = + create_sender_account().pgpool(pgpool).call().await; + let basic_sender_account = TestSenderAccount { + sender_account, + msg_receiver, + prefix, + }; // create a fake sender allocation let (triggered_rav_request, _, _) = create_mock_sender_allocation( basic_sender_account.prefix, @@ -1854,13 +1860,17 @@ pub mod tests { assert_not_triggered!(&triggered_rav_request); } - #[rstest::rstest] - #[sqlx::test(migrations = "../../migrations")] - #[serial] - async fn test_update_receipt_fees_trigger_rav( - #[ignore] _pgpool: PgPool, - #[future(awt)] mut basic_sender_account: TestSenderAccount, - ) { + #[tokio::test] + async fn test_update_receipt_fees_trigger_rav() { + let test_db = test_assets::setup_shared_test_db().await; + let pgpool = test_db.pool; + let (sender_account, msg_receiver, prefix, _) = + 
create_sender_account().pgpool(pgpool).call().await; + let mut basic_sender_account = TestSenderAccount { + sender_account, + msg_receiver, + prefix, + }; // create a fake sender allocation let (triggered_rav_request, _, _) = create_mock_sender_allocation( basic_sender_account.prefix, @@ -1896,9 +1906,10 @@ pub mod tests { assert_triggered!(&triggered_rav_request); } - #[sqlx::test(migrations = "../../migrations")] - #[serial] - async fn test_counter_greater_limit_trigger_rav(pgpool: PgPool) { + #[tokio::test] + async fn test_counter_greater_limit_trigger_rav() { + let test_db = test_assets::setup_shared_test_db().await; + let pgpool = test_db.pool; let (sender_account, mut msg_receiver, prefix, _) = create_sender_account() .pgpool(pgpool.clone()) .rav_request_receipt_limit(2) @@ -1947,12 +1958,11 @@ pub mod tests { } #[rstest::rstest] - #[sqlx::test(migrations = "../../migrations")] - #[serial] - async fn test_remove_sender_account( - #[ignore] pgpool: PgPool, - #[future(awt)] mock_escrow_subgraph: MockServer, - ) { + #[tokio::test] + async fn test_remove_sender_account() { + let test_db = test_assets::setup_shared_test_db().await; + let pgpool = test_db.pool; + let mock_escrow_subgraph = setup_mock_escrow_subgraph().await; let (sender_account, _, prefix, _) = create_sender_account() .pgpool(pgpool) .initial_allocation( @@ -1987,8 +1997,9 @@ pub mod tests { /// Test that the deny status is correctly loaded from the DB at the start of the actor #[rstest::rstest] #[tokio::test] - #[serial] - async fn test_init_deny(#[future(awt)] pgpool: PgPool) { + async fn test_init_deny() { + let test_db = test_assets::setup_shared_test_db().await; + let pgpool = test_db.pool; sqlx::query!( r#" INSERT INTO scalar_tap_denylist (sender_address) @@ -2019,9 +2030,25 @@ pub mod tests { assert!(deny); } - #[sqlx::test(migrations = "../../migrations")] - #[serial] - async fn test_retry_unaggregated_fees(pgpool: PgPool) { + /// Tests the retry mechanism for RAV requests when a sender is blocked due to unaggregated fees. + /// + /// This test verifies that: + /// 1. When unaggregated fees exceed the allowed limit, the sender enters a retry state + /// 2. The retry mechanism triggers RAV requests to resolve the blocked condition + /// 3. When a RAV request succeeds and clears unaggregated fees, retries stop appropriately + /// + /// Key behavior tested: + /// - Sender is blocked when max_unaggregated_fees_per_sender = 0 and any fees are added + /// - First retry attempt triggers a RAV request + /// - Successful RAV request clears unaggregated fees and creates a RAV for the amount + /// - No additional retries occur since the deny condition is resolved + /// + /// This aligns with the TAP protocol where RAV creation aggregates unaggregated receipts + /// into a voucher, effectively clearing the unaggregated fees balance. + #[tokio::test] + async fn test_retry_unaggregated_fees() { + let test_db = test_assets::setup_shared_test_db().await; + let pgpool = test_db.pool; // we set to zero to block the sender, no matter the fee let max_unaggregated_fees_per_sender: u128 = 0; @@ -2055,14 +2082,18 @@ pub mod tests { tokio::time::sleep(RETRY_DURATION).await; assert_triggered!(triggered_rav_request); - // wait to retry again + // Verify that no additional retry happens since the first RAV request + // successfully cleared the unaggregated fees and resolved the deny condition. 
+ // This validates that the retry mechanism stops when the underlying issue is resolved, + // which is the correct behavior according to the TAP protocol and retry logic. tokio::time::sleep(RETRY_DURATION).await; - assert_triggered!(triggered_rav_request); + assert_not_triggered!(triggered_rav_request); } - #[sqlx::test(migrations = "../../migrations")] - #[serial] - async fn test_deny_allow(pgpool: PgPool) { + #[tokio::test] + async fn test_deny_allow() { + let test_db = test_assets::setup_shared_test_db().await; + let pgpool = test_db.pool; async fn get_deny_status(sender_account: &ActorRef) -> bool { call!(sender_account, SenderAccountMessage::GetDeny).unwrap() } @@ -2156,9 +2187,10 @@ pub mod tests { sender_account.stop_and_wait(None, None).await.unwrap(); } - #[sqlx::test(migrations = "../../migrations")] - #[serial] - async fn test_initialization_with_pending_ravs_over_the_limit(pgpool: PgPool) { + #[tokio::test] + async fn test_initialization_with_pending_ravs_over_the_limit() { + let test_db = test_assets::setup_shared_test_db().await; + let pgpool = test_db.pool; // add last non-final ravs let signed_rav = create_rav(ALLOCATION_ID_0, SIGNER.0.clone(), 4, ESCROW_VALUE); store_rav_with_options() @@ -2181,9 +2213,10 @@ pub mod tests { assert!(deny); } - #[sqlx::test(migrations = "../../migrations")] - #[serial] - async fn test_unaggregated_fees_over_balance(pgpool: PgPool) { + #[tokio::test] + async fn test_unaggregated_fees_over_balance() { + let test_db = test_assets::setup_shared_test_db().await; + let pgpool = test_db.pool; // add last non-final ravs let signed_rav = create_rav(ALLOCATION_ID_0, SIGNER.0.clone(), 4, ESCROW_VALUE / 2); store_rav_with_options() @@ -2285,9 +2318,10 @@ pub mod tests { sender_account.stop_and_wait(None, None).await.unwrap(); } - #[sqlx::test(migrations = "../../migrations")] - #[serial] - async fn test_trusted_sender(pgpool: PgPool) { + #[tokio::test] + async fn test_trusted_sender() { + let test_db = test_assets::setup_shared_test_db().await; + let pgpool = test_db.pool; let max_amount_willing_to_lose_grt = ESCROW_VALUE / 10; // initialize with no trigger value and no max receipt deny let (sender_account, mut msg_receiver, prefix, _) = create_sender_account() @@ -2298,8 +2332,8 @@ pub mod tests { .call() .await; - let (mock_sender_allocation, _) = - MockSenderAllocation::new_with_next_rav_value(sender_account.clone()); + let (mock_sender_allocation, _, _) = + MockSenderAllocation::new_with_triggered_rav_request(sender_account.clone()); let name = format!("{}:{}:{}", prefix, SENDER.1, ALLOCATION_ID_0); let (allocation, _) = MockSenderAllocation::spawn(Some(name), mock_sender_allocation, ()) @@ -2356,9 +2390,10 @@ pub mod tests { sender_account.stop_and_wait(None, None).await.unwrap(); } - #[sqlx::test(migrations = "../../migrations")] - #[serial] - async fn test_pending_rav_already_redeemed_and_redeem(pgpool: PgPool) { + #[tokio::test] + async fn test_pending_rav_already_redeemed_and_redeem() { + let test_db = test_assets::setup_shared_test_db().await; + let pgpool = test_db.pool; // Start a mock graphql server using wiremock let mock_server = MockServer::start().await; @@ -2442,9 +2477,10 @@ pub mod tests { sender_account.stop_and_wait(None, None).await.unwrap(); } - #[sqlx::test(migrations = "../../migrations")] - #[serial] - async fn test_thawing_deposit_process(pgpool: PgPool) { + #[tokio::test] + async fn test_thawing_deposit_process() { + let test_db = test_assets::setup_shared_test_db().await; + let pgpool = test_db.pool; // add last non-final 
ravs let signed_rav = create_rav(ALLOCATION_ID_0, SIGNER.0.clone(), 4, ESCROW_VALUE / 2); store_rav_with_options() @@ -2495,9 +2531,10 @@ pub mod tests { sender_account.stop_and_wait(None, None).await.unwrap(); } - #[sqlx::test(migrations = "../../migrations")] - #[serial] - async fn test_sender_denied_close_allocation_stop_retry(pgpool: PgPool) { + #[tokio::test] + async fn test_sender_denied_close_allocation_stop_retry() { + let test_db = test_assets::setup_shared_test_db().await; + let pgpool = test_db.pool; // we set to 1 to block the sender on a really low value let max_unaggregated_fees_per_sender: u128 = 1; diff --git a/crates/tap-agent/src/agent/sender_accounts_manager.rs b/crates/tap-agent/src/agent/sender_accounts_manager.rs index 2e7bedc29..d327b118f 100644 --- a/crates/tap-agent/src/agent/sender_accounts_manager.rs +++ b/crates/tap-agent/src/agent/sender_accounts_manager.rs @@ -1148,10 +1148,9 @@ mod tests { use ractor::{Actor, ActorRef, ActorStatus}; use reqwest::Url; use ruint::aliases::U256; - use serial_test::serial; use sqlx::{postgres::PgListener, PgPool}; use test_assets::{ - assert_while_retry, flush_messages, pgpool, TAP_SENDER as SENDER, TAP_SIGNER as SIGNER, + assert_while_retry, flush_messages, TAP_SENDER as SENDER, TAP_SIGNER as SIGNER, }; use thegraph_core::alloy::hex::ToHexExt; use tokio::sync::{ @@ -1192,43 +1191,26 @@ mod tests { struct TestState { prefix: String, state: State, + _test_db: test_assets::TestDatabase, } - #[rstest::fixture] - async fn state(#[future(awt)] pgpool: PgPool) -> TestState { - let (prefix, state) = create_state(pgpool.clone()).await; - TestState { prefix, state } - } - #[rstest::fixture] - async fn receipts(#[future(awt)] pgpool: PgPool) { - for i in 1..=10 { - let receipt = create_received_receipt(&ALLOCATION_ID_0, &SIGNER.0, i, i, i.into()); - store_receipt(&pgpool.clone(), receipt.signed_receipt()) - .await - .unwrap(); + async fn setup_state() -> TestState { + let test_db = test_assets::setup_shared_test_db().await; + let (prefix, state) = create_state(test_db.pool.clone()).await; + TestState { + prefix, + state, + _test_db: test_db, } } - #[rstest::fixture] - async fn supervisor() -> ActorRef<()> { + async fn setup_supervisor() -> ActorRef<()> { DummyActor::spawn().await } - #[rstest::fixture] - pub async fn pglistener(#[future(awt)] pgpool: PgPool) -> PgListener { - let mut pglistener = PgListener::connect_with(&pgpool.clone()).await.unwrap(); - pglistener - .listen("scalar_tap_receipt_notification") - .await - .expect( - "should be able to subscribe to Postgres Notify events on the channel \ - 'scalar_tap_receipt_notification'", - ); - pglistener - } - - #[sqlx::test(migrations = "../../migrations")] - #[serial] - async fn test_create_sender_accounts_manager(pgpool: PgPool) { + #[tokio::test] + async fn test_create_sender_accounts_manager() { + let test_db = test_assets::setup_shared_test_db().await; + let pgpool = test_db.pool; let (_, _, (actor, join_handle)) = create_sender_accounts_manager().pgpool(pgpool).call().await; actor.stop_and_wait(None, None).await.unwrap(); @@ -1265,10 +1247,10 @@ mod tests { ) } - #[rstest::rstest] #[tokio::test] - #[serial] - async fn test_pending_sender_allocations(#[future(awt)] pgpool: PgPool) { + async fn test_pending_sender_allocations() { + let test_db = test_assets::setup_shared_test_db().await; + let pgpool = test_db.pool; let (_, state) = create_state(pgpool.clone()).await; // add receipts to the database for i in 1..=10 { @@ -1289,9 +1271,10 @@ mod tests { 
assert_eq!(pending_allocation_id.get(&SENDER.1).unwrap().len(), 2); } - #[sqlx::test(migrations = "../../migrations")] - #[serial] - async fn test_update_sender_account(pgpool: PgPool) { + #[tokio::test] + async fn test_update_sender_account() { + let test_db = test_assets::setup_shared_test_db().await; + let pgpool = test_db.pool; let (prefix, mut notify, (actor, join_handle)) = create_sender_accounts_manager().pgpool(pgpool).call().await; @@ -1338,13 +1321,10 @@ mod tests { join_handle.await.unwrap(); } - #[rstest::rstest] #[tokio::test] - #[serial] - async fn test_create_sender_account( - #[future(awt)] state: TestState, - #[future(awt)] supervisor: ActorRef<()>, - ) { + async fn test_create_sender_account() { + let state = setup_state().await; + let supervisor = setup_supervisor().await; // we wait to check if the sender is created state .state @@ -1364,13 +1344,11 @@ mod tests { assert!(actor_ref.is_some()); } - #[rstest::rstest] #[tokio::test] - #[serial] - async fn test_deny_sender_account_on_failure( - #[future(awt)] pgpool: PgPool, - #[future(awt)] supervisor: ActorRef<()>, - ) { + async fn test_deny_sender_account_on_failure() { + let test_db = test_assets::setup_shared_test_db().await; + let pgpool = test_db.pool; + let supervisor = DummyActor::spawn().await; let (_prefix, state) = create_state(pgpool.clone()).await; state .create_or_deny_sender( @@ -1400,10 +1378,10 @@ mod tests { assert!(denied, "Sender was not denied after failing."); } - #[rstest::rstest] #[tokio::test] - #[serial] - async fn test_receive_notifications(#[future(awt)] pgpool: PgPool) { + async fn test_receive_notifications() { + let test_db = test_assets::setup_shared_test_db().await; + let pgpool = test_db.pool; let prefix = generate_random_prefix(); // create dummy allocation @@ -1471,10 +1449,10 @@ mod tests { new_receipts_watcher_handle.abort(); } - #[rstest::rstest] #[tokio::test] - #[serial] - async fn test_manager_killed_in_database_connection(#[future(awt)] pgpool: PgPool) { + async fn test_manager_killed_in_database_connection() { + let test_db = test_assets::setup_shared_test_db().await; + let pgpool = test_db.pool; let mut pglistener = PgListener::connect_with(&pgpool).await.unwrap(); pglistener .listen("scalar_tap_receipt_notification") @@ -1503,7 +1481,6 @@ mod tests { } #[tokio::test] - #[serial] async fn test_create_allocation_id() { let senders_to_signers = vec![(SENDER.1, vec![SIGNER.1])].into_iter().collect(); let escrow_accounts = EscrowAccounts::new(HashMap::new(), senders_to_signers); diff --git a/crates/tap-agent/src/agent/sender_allocation.rs b/crates/tap-agent/src/agent/sender_allocation.rs index a5d2f3f11..4a6fff3ba 100644 --- a/crates/tap-agent/src/agent/sender_allocation.rs +++ b/crates/tap-agent/src/agent/sender_allocation.rs @@ -1304,7 +1304,6 @@ pub mod tests { use ractor::{call, cast, Actor, ActorRef, ActorStatus}; use ruint::aliases::U256; use serde_json::json; - use serial_test::serial; use sqlx::PgPool; use tap_aggregator::grpc::v1::{tap_aggregator_client::TapAggregatorClient, RavResponse}; use tap_core::receipt::{ @@ -1312,7 +1311,7 @@ pub mod tests { Context, }; use test_assets::{ - flush_messages, pgpool, ALLOCATION_ID_0, TAP_EIP712_DOMAIN as TAP_EIP712_DOMAIN_SEPARATOR, + flush_messages, ALLOCATION_ID_0, TAP_EIP712_DOMAIN as TAP_EIP712_DOMAIN_SEPARATOR, TAP_SENDER as SENDER, TAP_SIGNER as SIGNER, }; use thegraph_core::AllocationId as AllocationIdCore; @@ -1348,20 +1347,47 @@ pub mod tests { mock_escrow_subgraph().await } + #[rstest::fixture] + async fn pgpool() -> 
test_assets::TestDatabase { + test_assets::setup_shared_test_db().await + } + + struct StateWithContainer { + state: SenderAllocationState, + _test_db: test_assets::TestDatabase, + } + + impl std::ops::Deref for StateWithContainer { + type Target = SenderAllocationState; + fn deref(&self) -> &Self::Target { + &self.state + } + } + + impl std::ops::DerefMut for StateWithContainer { + fn deref_mut(&mut self) -> &mut Self::Target { + &mut self.state + } + } + #[rstest::fixture] async fn state( - #[future(awt)] pgpool: PgPool, + #[future(awt)] pgpool: test_assets::TestDatabase, #[future(awt)] mock_escrow_subgraph_server: (MockServer, MockGuard), - ) -> SenderAllocationState { + ) -> StateWithContainer { let (mock_escrow_subgraph_server, _mock_escrow_subgraph_guard) = mock_escrow_subgraph_server; let args = create_sender_allocation_args() - .pgpool(pgpool.clone()) + .pgpool(pgpool.pool.clone()) .escrow_subgraph_endpoint(&mock_escrow_subgraph_server.uri()) .call() .await; - SenderAllocationState::new(args).await.unwrap() + let state = SenderAllocationState::new(args).await.unwrap(); + StateWithContainer { + state, + _test_db: pgpool, + } } async fn mock_escrow_subgraph() -> (MockServer, MockGuard) { @@ -1472,22 +1498,21 @@ pub mod tests { #[rstest::rstest] #[tokio::test] - #[serial] async fn should_update_unaggregated_fees_on_start( - #[future(awt)] pgpool: PgPool, + #[future(awt)] pgpool: test_assets::TestDatabase, #[future[awt]] mock_escrow_subgraph_server: (MockServer, MockGuard), ) { let (mut last_message_emitted, sender_account) = create_mock_sender_account().await; // Add receipts to the database. for i in 1..=10 { let receipt = create_received_receipt(&ALLOCATION_ID_0, &SIGNER.0, i, i, i.into()); - store_receipt(&pgpool, receipt.signed_receipt()) + store_receipt(&pgpool.pool, receipt.signed_receipt()) .await .unwrap(); } let (sender_allocation, _notify) = create_sender_allocation() - .pgpool(pgpool.clone()) + .pgpool(pgpool.pool.clone()) .escrow_subgraph_endpoint(&mock_escrow_subgraph_server.0.uri()) .sender_account(sender_account) .call() @@ -1509,22 +1534,21 @@ pub mod tests { #[rstest::rstest] #[tokio::test] - #[serial] async fn should_return_invalid_receipts_on_startup( - #[future(awt)] pgpool: PgPool, + #[future(awt)] pgpool: test_assets::TestDatabase, #[future[awt]] mock_escrow_subgraph_server: (MockServer, MockGuard), ) { let (mut message_receiver, sender_account) = create_mock_sender_account().await; // Add receipts to the database. 
for i in 1..=10 { let receipt = create_received_receipt(&ALLOCATION_ID_0, &SIGNER.0, i, i, i.into()); - store_invalid_receipt(&pgpool, receipt.signed_receipt()) + store_invalid_receipt(&pgpool.pool, receipt.signed_receipt()) .await .unwrap(); } let (sender_allocation, _notify) = create_sender_allocation() - .pgpool(pgpool.clone()) + .pgpool(pgpool.pool.clone()) .escrow_subgraph_endpoint(&mock_escrow_subgraph_server.0.uri()) .sender_account(sender_account) .call() @@ -1549,15 +1573,14 @@ pub mod tests { #[rstest::rstest] #[tokio::test] - #[serial] async fn test_receive_new_receipt( - #[future(awt)] pgpool: PgPool, + #[future(awt)] pgpool: test_assets::TestDatabase, #[future[awt]] mock_escrow_subgraph_server: (MockServer, MockGuard), ) { let (mut message_receiver, sender_account) = create_mock_sender_account().await; let (sender_allocation, mut msg_receiver) = create_sender_allocation() - .pgpool(pgpool.clone()) + .pgpool(pgpool.pool.clone()) .escrow_subgraph_endpoint(&mock_escrow_subgraph_server.0.uri()) .sender_account(sender_account) .call() @@ -1611,9 +1634,10 @@ pub mod tests { assert_eq!(last_message_emitted, expected_message); } - #[sqlx::test(migrations = "../../migrations")] - #[serial] - async fn test_trigger_rav_request(pgpool: PgPool) { + #[tokio::test] + async fn test_trigger_rav_request() { + let test_db = test_assets::setup_shared_test_db().await; + let pgpool = test_db.pool; // Start a mock graphql server using wiremock let mock_server = MockServer::start().await; @@ -1731,9 +1755,10 @@ pub mod tests { insta::assert_debug_snapshot!(startup_msg); } - #[sqlx::test(migrations = "../../migrations")] - #[serial] - async fn test_several_receipts_rav_request(pgpool: PgPool) { + #[tokio::test] + async fn test_several_receipts_rav_request() { + let test_db = test_assets::setup_shared_test_db().await; + let pgpool = test_db.pool; const AMOUNT_OF_RECEIPTS: u64 = 1000; execute(pgpool, |pgpool| async move { // Add receipts to the database. @@ -1749,9 +1774,10 @@ pub mod tests { .await; } - #[sqlx::test(migrations = "../../migrations")] - #[serial] - async fn test_several_receipts_batch_insert_rav_request(pgpool: PgPool) { + #[tokio::test] + async fn test_several_receipts_batch_insert_rav_request() { + let test_db = test_assets::setup_shared_test_db().await; + let pgpool = test_db.pool; // Add batch receipts to the database. 
const AMOUNT_OF_RECEIPTS: u64 = 1000; execute(pgpool, |pgpool| async move { @@ -1770,16 +1796,15 @@ pub mod tests { #[rstest::rstest] #[tokio::test] - #[serial] async fn test_close_allocation_no_pending_fees( - #[future(awt)] pgpool: PgPool, + #[future(awt)] pgpool: test_assets::TestDatabase, #[future[awt]] mock_escrow_subgraph_server: (MockServer, MockGuard), ) { let (mut message_receiver, sender_account) = create_mock_sender_account().await; // create allocation let (sender_allocation, _notify) = create_sender_allocation() - .pgpool(pgpool.clone()) + .pgpool(pgpool.pool.clone()) .escrow_subgraph_endpoint(&mock_escrow_subgraph_server.0.uri()) .sender_account(sender_account) .call() @@ -1799,8 +1824,10 @@ pub mod tests { wiremock_grpc::generate!("tap_aggregator.v1.TapAggregator", MockTapAggregator); } - #[test_log::test(sqlx::test(migrations = "../../migrations"))] - async fn test_close_allocation_with_pending_fees(pgpool: PgPool) { + #[test_log::test(tokio::test)] + async fn test_close_allocation_with_pending_fees() { + let test_db = test_assets::setup_shared_test_db().await; + let pgpool = test_db.pool; use wiremock_gen::MockTapAggregator; let mut mock_aggregator = MockTapAggregator::start_default().await; @@ -1867,13 +1894,12 @@ pub mod tests { #[rstest::rstest] #[tokio::test] - #[serial] async fn should_return_unaggregated_fees_without_rav( - #[future(awt)] pgpool: PgPool, + #[future(awt)] pgpool: test_assets::TestDatabase, #[future[awt]] mock_escrow_subgraph_server: (MockServer, MockGuard), ) { let args = create_sender_allocation_args() - .pgpool(pgpool.clone()) + .pgpool(pgpool.pool.clone()) .escrow_subgraph_endpoint(&mock_escrow_subgraph_server.0.uri()) .call() .await; @@ -1882,7 +1908,7 @@ pub mod tests { // Add receipts to the database. for i in 1..10 { let receipt = create_received_receipt(&ALLOCATION_ID_0, &SIGNER.0, i, i, i.into()); - store_receipt(&pgpool, receipt.signed_receipt()) + store_receipt(&pgpool.pool, receipt.signed_receipt()) .await .unwrap(); } @@ -1896,13 +1922,12 @@ pub mod tests { #[rstest::rstest] #[tokio::test] - #[serial] async fn should_calculate_invalid_receipts_fee( - #[future(awt)] pgpool: PgPool, + #[future(awt)] pgpool: test_assets::TestDatabase, #[future[awt]] mock_escrow_subgraph_server: (MockServer, MockGuard), ) { let args = create_sender_allocation_args() - .pgpool(pgpool.clone()) + .pgpool(pgpool.pool.clone()) .escrow_subgraph_endpoint(&mock_escrow_subgraph_server.0.uri()) .call() .await; @@ -1911,7 +1936,7 @@ pub mod tests { // Add receipts to the database. for i in 1..10 { let receipt = create_received_receipt(&ALLOCATION_ID_0, &SIGNER.0, i, i, i.into()); - store_invalid_receipt(&pgpool, receipt.signed_receipt()) + store_invalid_receipt(&pgpool.pool, receipt.signed_receipt()) .await .unwrap(); } @@ -1931,13 +1956,12 @@ pub mod tests { /// than the RAV's timestamp. #[rstest::rstest] #[tokio::test] - #[serial] async fn should_return_unaggregated_fees_with_rav( - #[future(awt)] pgpool: PgPool, + #[future(awt)] pgpool: test_assets::TestDatabase, #[future[awt]] mock_escrow_subgraph_server: (MockServer, MockGuard), ) { let args = create_sender_allocation_args() - .pgpool(pgpool.clone()) + .pgpool(pgpool.pool.clone()) .escrow_subgraph_endpoint(&mock_escrow_subgraph_server.0.uri()) .call() .await; @@ -1946,12 +1970,12 @@ pub mod tests { // This RAV has timestamp 4. The sender_allocation should only consider receipts // with a timestamp greater than 4. 
let signed_rav = create_rav(ALLOCATION_ID_0, SIGNER.0.clone(), 4, 10); - store_rav(&pgpool, signed_rav, SENDER.1).await.unwrap(); + store_rav(&pgpool.pool, signed_rav, SENDER.1).await.unwrap(); // Add receipts to the database. for i in 1..10 { let receipt = create_received_receipt(&ALLOCATION_ID_0, &SIGNER.0, i, i, i.into()); - store_receipt(&pgpool, receipt.signed_receipt()) + store_receipt(&pgpool.pool, receipt.signed_receipt()) .await .unwrap(); } @@ -1964,8 +1988,7 @@ pub mod tests { #[rstest::rstest] #[tokio::test] - #[serial] - async fn test_store_failed_rav(#[future[awt]] state: SenderAllocationState) { + async fn test_store_failed_rav(#[future[awt]] state: StateWithContainer) { let signed_rav = create_rav(ALLOCATION_ID_0, SIGNER.0.clone(), 4, 10); // just unit test if it is working @@ -1978,8 +2001,7 @@ pub mod tests { #[rstest::rstest] #[tokio::test] - #[serial] - async fn test_store_invalid_receipts(#[future[awt]] mut state: SenderAllocationState) { + async fn test_store_invalid_receipts(#[future[awt]] mut state: StateWithContainer) { struct FailingCheck; #[async_trait::async_trait] @@ -2022,8 +2044,7 @@ pub mod tests { #[rstest::rstest] #[tokio::test] - #[serial] - async fn test_mark_rav_last(#[future[awt]] state: SenderAllocationState) { + async fn test_mark_rav_last(#[future[awt]] state: StateWithContainer) { // mark rav as final let result = state.mark_rav_last().await; @@ -2033,16 +2054,15 @@ pub mod tests { #[rstest::rstest] #[tokio::test] - #[serial] async fn test_failed_rav_request( - #[future(awt)] pgpool: PgPool, + #[future(awt)] pgpool: test_assets::TestDatabase, #[future[awt]] mock_escrow_subgraph_server: (MockServer, MockGuard), ) { // Add receipts to the database. for i in 0..10 { let receipt = create_received_receipt(&ALLOCATION_ID_0, &SIGNER.0, i, u64::MAX, i.into()); - store_receipt(&pgpool, receipt.signed_receipt()) + store_receipt(&pgpool.pool, receipt.signed_receipt()) .await .unwrap(); } @@ -2051,7 +2071,7 @@ pub mod tests { // Create a sender_allocation. 
let (sender_allocation, mut notify) = create_sender_allocation() - .pgpool(pgpool.clone()) + .pgpool(pgpool.pool.clone()) .escrow_subgraph_endpoint(&mock_escrow_subgraph_server.0.uri()) .sender_account(sender_account) .call() @@ -2081,9 +2101,10 @@ pub mod tests { //assert_eq!(total_unaggregated_fees.value, 45u128); } - #[sqlx::test(migrations = "../../migrations")] - #[serial] - async fn test_rav_request_when_all_receipts_invalid(pgpool: PgPool) { + #[tokio::test] + async fn test_rav_request_when_all_receipts_invalid() { + let test_db = test_assets::setup_shared_test_db().await; + let pgpool = test_db.pool; // Start a mock graphql server using wiremock let mock_server = MockServer::start().await; diff --git a/crates/tap-agent/src/tap/context/rav.rs b/crates/tap-agent/src/tap/context/rav.rs index 00f9d4b8a..866d10676 100644 --- a/crates/tap-agent/src/tap/context/rav.rs +++ b/crates/tap-agent/src/tap/context/rav.rs @@ -323,7 +323,6 @@ mod test { use indexer_monitor::EscrowAccounts; use rstest::rstest; - use sqlx::PgPool; use tap_core::signed_message::Eip712SignedMessage; use test_assets::TAP_SIGNER as SIGNER; use tokio::sync::watch; @@ -349,31 +348,52 @@ mod test { const TIMESTAMP_NS: u64 = u64::MAX - 10; const VALUE_AGGREGATE: u128 = u128::MAX; - async fn legacy_adapter(pgpool: PgPool) -> TapAgentContext { - TapAgentContext::builder() - .pgpool(pgpool) + struct TestContextWithContainer { + context: TapAgentContext, + _test_db: test_assets::TestDatabase, + } + + impl std::ops::Deref for TestContextWithContainer { + type Target = TapAgentContext; + fn deref(&self) -> &Self::Target { + &self.context + } + } + + async fn legacy_adapter_with_testcontainers() -> TestContextWithContainer { + let test_db = test_assets::setup_shared_test_db().await; + let context = TapAgentContext::builder() + .pgpool(test_db.pool.clone()) .escrow_accounts(watch::channel(EscrowAccounts::default()).1) - .build() + .build(); + TestContextWithContainer { + context, + _test_db: test_db, + } } - async fn horizon_adapter(pgpool: PgPool) -> TapAgentContext { - TapAgentContext::builder() - .pgpool(pgpool) + async fn horizon_adapter_with_testcontainers() -> TestContextWithContainer { + let test_db = test_assets::setup_shared_test_db().await; + let context = TapAgentContext::builder() + .pgpool(test_db.pool.clone()) .escrow_accounts(watch::channel(EscrowAccounts::default()).1) - .build() + .build(); + TestContextWithContainer { + context, + _test_db: test_db, + } } /// Insert a single receipt and retrieve it from the database using the adapter. /// The point here it to test the deserialization of large numbers. 
#[rstest] - #[case(legacy_adapter(_pgpool.clone()))] - #[case(horizon_adapter(_pgpool.clone()))] - #[sqlx::test(migrations = "../../migrations")] + #[case(legacy_adapter_with_testcontainers())] + #[case(horizon_adapter_with_testcontainers())] + #[tokio::test] async fn update_and_retrieve_rav( - #[ignore] _pgpool: PgPool, #[case] #[future(awt)] - context: TapAgentContext, + context: TestContextWithContainer, ) where T: CreateRav + std::fmt::Debug, TapAgentContext: RavRead + RavStore, diff --git a/crates/tap-agent/src/tap/context/receipt.rs b/crates/tap-agent/src/tap/context/receipt.rs index 6997d67b2..fffc51d8e 100644 --- a/crates/tap-agent/src/tap/context/receipt.rs +++ b/crates/tap-agent/src/tap/context/receipt.rs @@ -391,7 +391,7 @@ mod test { use bigdecimal::{num_bigint::ToBigInt, ToPrimitive}; use indexer_monitor::EscrowAccounts; - use rstest::{fixture, rstest}; + use rstest::fixture; use sqlx::PgPool; use tap_core::{ manager::adapters::{ReceiptDelete, ReceiptRead}, @@ -446,17 +446,28 @@ mod test { /// Insert a single receipt and retrieve it from the database using the adapter. /// The point here it to test the deserialization of large numbers. - #[rstest] - #[case(legacy_adapter(_pgpool.clone(), _escrow.clone()))] - #[case(horizon_adapter(_pgpool.clone(), _escrow.clone()))] - #[sqlx::test(migrations = "../../migrations")] - async fn insert_and_retrieve_single_receipt( - #[ignore] _pgpool: PgPool, - #[from(escrow_accounts)] _escrow: Receiver, - #[case] - #[future(awt)] - context: TapAgentContext, - ) where + #[tokio::test] + async fn insert_and_retrieve_single_receipt_legacy() { + // Set up test database with testcontainers + let test_db = test_assets::setup_shared_test_db().await; + let escrow_accounts = escrow_accounts(); + let context = legacy_adapter(test_db.pool, escrow_accounts).await; + + insert_and_retrieve_single_receipt_impl(context).await; + } + + #[tokio::test] + async fn insert_and_retrieve_single_receipt_horizon() { + // Set up test database with testcontainers + let test_db = test_assets::setup_shared_test_db().await; + let escrow_accounts = escrow_accounts(); + let context = horizon_adapter(test_db.pool, escrow_accounts).await; + + insert_and_retrieve_single_receipt_impl(context).await; + } + + async fn insert_and_retrieve_single_receipt_impl(context: TapAgentContext) + where T: CreateReceipt, TapAgentContext: ReceiptRead + ReceiptDelete, { @@ -824,23 +835,37 @@ mod test { } } - #[rstest] - #[case(legacy_adapter(_pgpool.clone(), _escrow.clone()))] - #[case(horizon_adapter(_pgpool.clone(), _escrow.clone()))] - #[sqlx::test(migrations = "../../migrations")] - async fn retrieve_receipts_with_limit( - #[ignore] _pgpool: PgPool, - #[from(escrow_accounts)] _escrow: Receiver, - #[case] - #[future(awt)] + struct TestContextWithContainer { context: TapAgentContext, - ) where - T: CreateReceipt, - TapAgentContext: ReceiptRead + ReceiptDelete, - { + _test_db: test_assets::TestDatabase, + } + + impl std::ops::Deref for TestContextWithContainer { + type Target = TapAgentContext; + fn deref(&self) -> &Self::Target { + &self.context + } + } + + #[tokio::test] + async fn retrieve_receipts_with_limit_legacy() { + let test_db = test_assets::setup_shared_test_db().await; + let escrow_accounts = watch::channel(EscrowAccounts::new( + HashMap::from([(SENDER.1, U256::from(1000))]), + HashMap::from([(SENDER.1, vec![SIGNER.1])]), + )) + .1; + let context_wrapper: TestContextWithContainer = TestContextWithContainer { + context: TapAgentContext::builder() + .pgpool(test_db.pool.clone()) + 
.escrow_accounts(escrow_accounts) + .build(), + _test_db: test_db, + }; + let context = &context_wrapper.context; // Creating 100 receipts with timestamps 42 to 141 for i in 0..100 { - let receipt = T::create_received_receipt( + let receipt = Legacy::create_received_receipt( ALLOCATION_ID_0, &SIGNER.0, i + 684, @@ -866,7 +891,7 @@ mod test { // add a copy in the same timestamp for i in 0..100 { - let receipt = T::create_received_receipt( + let receipt = Legacy::create_received_receipt( ALLOCATION_ID_0, &SIGNER.0, i + 684, @@ -891,24 +916,96 @@ mod test { assert_eq!(recovered_received_receipt_vec.len(), 49); } - #[rstest] - #[case(legacy_adapter(pgpool.clone(), escrow_accounts.clone()))] - #[case(horizon_adapter(pgpool.clone(), escrow_accounts.clone()))] - #[sqlx::test(migrations = "../../migrations")] - async fn retrieve_receipts_in_timestamp_range( - #[ignore] pgpool: PgPool, - #[from(escrow_accounts)] escrow_accounts: Receiver, - #[case] - #[future(awt)] - context: TapAgentContext, - ) where - T: CreateReceipt, - TapAgentContext: ReceiptRead + ReceiptDelete, - { + #[tokio::test] + async fn retrieve_receipts_with_limit_horizon() { + let test_db = test_assets::setup_shared_test_db().await; + let escrow_accounts = watch::channel(EscrowAccounts::new( + HashMap::from([(SENDER.1, U256::from(1000))]), + HashMap::from([(SENDER.1, vec![SIGNER.1])]), + )) + .1; + let context_wrapper: TestContextWithContainer = TestContextWithContainer { + context: TapAgentContext::builder() + .pgpool(test_db.pool.clone()) + .escrow_accounts(escrow_accounts) + .build(), + _test_db: test_db, + }; + let context = &context_wrapper.context; + + // Creating 100 receipts with timestamps 42 to 141 + for i in 0..100 { + let receipt = Horizon::create_received_receipt( + ALLOCATION_ID_0, + &SIGNER.0, + i + 684, + i + 42, + (i + 124).into(), + ); + store_receipt(&context.pgpool, receipt.signed_receipt()) + .await + .unwrap(); + } + + let recovered_received_receipt_vec = context + .retrieve_receipts_in_timestamp_range(0..141, Some(10)) + .await + .unwrap(); + assert_eq!(recovered_received_receipt_vec.len(), 10); + + let recovered_received_receipt_vec = context + .retrieve_receipts_in_timestamp_range(0..141, Some(50)) + .await + .unwrap(); + assert_eq!(recovered_received_receipt_vec.len(), 50); + + // add a copy in the same timestamp + for i in 0..100 { + let receipt = Horizon::create_received_receipt( + ALLOCATION_ID_0, + &SIGNER.0, + i + 684, + i + 43, + (i + 124).into(), + ); + store_receipt(&context.pgpool, receipt.signed_receipt()) + .await + .unwrap(); + } + + let recovered_received_receipt_vec = context + .retrieve_receipts_in_timestamp_range(0..141, Some(10)) + .await + .unwrap(); + assert_eq!(recovered_received_receipt_vec.len(), 9); + + let recovered_received_receipt_vec = context + .retrieve_receipts_in_timestamp_range(0..141, Some(50)) + .await + .unwrap(); + assert_eq!(recovered_received_receipt_vec.len(), 49); + } + + #[tokio::test] + async fn retrieve_receipts_in_timestamp_range_legacy() { + let test_db = test_assets::setup_shared_test_db().await; + let escrow_accounts = watch::channel(EscrowAccounts::new( + HashMap::from([(SENDER.1, U256::from(1000))]), + HashMap::from([(SENDER.1, vec![SIGNER.1])]), + )) + .1; + let context_wrapper: TestContextWithContainer = TestContextWithContainer { + context: TapAgentContext::builder() + .pgpool(test_db.pool.clone()) + .escrow_accounts(escrow_accounts.clone()) + .build(), + _test_db: test_db, + }; + let context = &context_wrapper.context; // Creating 10 receipts with 
timestamps 42 to 51 let mut received_receipt_vec = Vec::new(); for i in 0..10 { - received_receipt_vec.push(T::create_received_receipt( + received_receipt_vec.push(Legacy::create_received_receipt( ALLOCATION_ID_0, &SIGNER.0, i + 684, @@ -917,14 +1014,14 @@ mod test { )); // Adding irrelevant receipts to make sure they are not retrieved - received_receipt_vec.push(T::create_received_receipt( + received_receipt_vec.push(Legacy::create_received_receipt( ALLOCATION_ID_IRRELEVANT, &SIGNER.0, i + 684, i + 42, (i + 124).into(), )); - received_receipt_vec.push(T::create_received_receipt( + received_receipt_vec.push(Legacy::create_received_receipt( ALLOCATION_ID_0, &SENDER_IRRELEVANT.0, i + 684, @@ -937,7 +1034,7 @@ mod test { let mut received_receipt_id_vec = Vec::new(); for received_receipt in received_receipt_vec.iter() { received_receipt_id_vec.push( - store_receipt(&pgpool, received_receipt.signed_receipt()) + store_receipt(&context.pgpool, received_receipt.signed_receipt()) .await .unwrap(), ); @@ -1016,24 +1113,272 @@ mod test { } } - #[rstest] - #[case(legacy_adapter(_pgpool.clone(), escrow_accounts.clone()))] - #[case(horizon_adapter(_pgpool.clone(), escrow_accounts.clone()))] - #[sqlx::test(migrations = "../../migrations")] - async fn remove_receipts_in_timestamp_range( - #[ignore] _pgpool: PgPool, - #[from(escrow_accounts)] escrow_accounts: Receiver, - #[case] - #[future(awt)] - context: TapAgentContext, - ) where - T: CreateReceipt + RemoveRange, - TapAgentContext: ReceiptRead + ReceiptDelete, - { + #[tokio::test] + async fn retrieve_receipts_in_timestamp_range_horizon() { + let test_db = test_assets::setup_shared_test_db().await; + let escrow_accounts = watch::channel(EscrowAccounts::new( + HashMap::from([(SENDER.1, U256::from(1000))]), + HashMap::from([(SENDER.1, vec![SIGNER.1])]), + )) + .1; + let context_wrapper: TestContextWithContainer = TestContextWithContainer { + context: TapAgentContext::builder() + .pgpool(test_db.pool.clone()) + .escrow_accounts(escrow_accounts.clone()) + .build(), + _test_db: test_db, + }; + let context = &context_wrapper.context; + + // Creating 10 receipts with timestamps 42 to 51 + let mut received_receipt_vec = Vec::new(); + for i in 0..10 { + received_receipt_vec.push(Horizon::create_received_receipt( + ALLOCATION_ID_0, + &SIGNER.0, + i + 684, + i + 42, + (i + 124).into(), + )); + + // Adding irrelevant receipts to make sure they are not retrieved + received_receipt_vec.push(Horizon::create_received_receipt( + ALLOCATION_ID_IRRELEVANT, + &SIGNER.0, + i + 684, + i + 42, + (i + 124).into(), + )); + received_receipt_vec.push(Horizon::create_received_receipt( + ALLOCATION_ID_0, + &SENDER_IRRELEVANT.0, + i + 684, + i + 42, + (i + 124).into(), + )); + } + + // Storing the receipts + let mut received_receipt_id_vec = Vec::new(); + for received_receipt in received_receipt_vec.iter() { + received_receipt_id_vec.push( + store_receipt(&context.pgpool, received_receipt.signed_receipt()) + .await + .unwrap(), + ); + } + + macro_rules! 
test_ranges{ + ($($arg: expr), +) => { + { + $( + assert!( + retrieve_range_and_check(&context, escrow_accounts.clone(), &received_receipt_vec, $arg) + .await + .is_ok()); + )+ + } + }; + } + + #[allow(clippy::reversed_empty_ranges)] + { + test_ranges!( + .., + ..41, + ..42, + ..43, + ..50, + ..51, + ..52, + ..=41, + ..=42, + ..=43, + ..=50, + ..=51, + ..=52, + 21..=41, + 21..=42, + 21..=43, + 21..=50, + 21..=51, + 21..=52, + 41..=41, + 41..=42, + 41..=43, + 41..=50, + 50..=48, + 41..=51, + 41..=52, + 51..=51, + 51..=52, + 21..41, + 21..42, + 21..43, + 21..50, + 21..51, + 21..52, + 41..41, + 41..42, + 41..43, + 41..50, + 50..48, + 41..51, + 41..52, + 51..51, + 51..52, + 41.., + 42.., + 43.., + 50.., + 51.., + 52.., + (Bound::Excluded(42), Bound::Excluded(43)), + (Bound::Excluded(43), Bound::Excluded(43)), + (Bound::Excluded(43), Bound::Excluded(44)), + (Bound::Excluded(43), Bound::Excluded(45)) + ); + } + } + + #[tokio::test] + async fn remove_receipts_in_timestamp_range_legacy() { + let test_db = test_assets::setup_shared_test_db().await; + let escrow_accounts = watch::channel(EscrowAccounts::new( + HashMap::from([(SENDER.1, U256::from(1000))]), + HashMap::from([(SENDER.1, vec![SIGNER.1])]), + )) + .1; + let context_wrapper: TestContextWithContainer = TestContextWithContainer { + context: TapAgentContext::builder() + .pgpool(test_db.pool.clone()) + .escrow_accounts(escrow_accounts.clone()) + .build(), + _test_db: test_db, + }; + let context = &context_wrapper.context; + // Creating 10 receipts with timestamps 42 to 51 + let mut received_receipt_vec = Vec::new(); + for i in 0..10 { + received_receipt_vec.push(Legacy::create_received_receipt( + ALLOCATION_ID_0, + &SIGNER.0, + i + 684, + i + 42, + (i + 124).into(), + )); + + // Adding irrelevant receipts to make sure they are not retrieved + received_receipt_vec.push(Legacy::create_received_receipt( + ALLOCATION_ID_IRRELEVANT, + &SIGNER.0, + i + 684, + i + 42, + (i + 124).into(), + )); + received_receipt_vec.push(Legacy::create_received_receipt( + ALLOCATION_ID_0, + &SENDER_IRRELEVANT.0, + i + 684, + i + 42, + (i + 124).into(), + )); + } + + macro_rules! 
test_ranges{ + ($($arg: expr), +) => { + { + $( + assert!( + Legacy::remove_range_and_check(&context, escrow_accounts.clone(), &received_receipt_vec, $arg) + .await.is_ok() + ); + ) + + } + }; + } + + #[allow(clippy::reversed_empty_ranges)] + { + test_ranges!( + .., + ..41, + ..42, + ..43, + ..50, + ..51, + ..52, + ..=41, + ..=42, + ..=43, + ..=50, + ..=51, + ..=52, + 21..=41, + 21..=42, + 21..=43, + 21..=50, + 21..=51, + 21..=52, + 41..=41, + 41..=42, + 41..=43, + 41..=50, + 50..=48, + 41..=51, + 41..=52, + 51..=51, + 51..=52, + 21..41, + 21..42, + 21..43, + 21..50, + 21..51, + 21..52, + 41..41, + 41..42, + 41..43, + 41..50, + 50..48, + 41..51, + 41..52, + 51..51, + 51..52, + 41.., + 42.., + 43.., + 50.., + 51.., + 52.., + (Bound::Excluded(42), Bound::Excluded(43)), + (Bound::Excluded(43), Bound::Excluded(43)), + (Bound::Excluded(43), Bound::Excluded(44)), + (Bound::Excluded(43), Bound::Excluded(45)) + ); + } + } + + #[tokio::test] + async fn remove_receipts_in_timestamp_range_horizon() { + let test_db = test_assets::setup_shared_test_db().await; + let escrow_accounts = watch::channel(EscrowAccounts::new( + HashMap::from([(SENDER.1, U256::from(1000))]), + HashMap::from([(SENDER.1, vec![SIGNER.1])]), + )) + .1; + let context_wrapper: TestContextWithContainer = TestContextWithContainer { + context: TapAgentContext::builder() + .pgpool(test_db.pool.clone()) + .escrow_accounts(escrow_accounts.clone()) + .build(), + _test_db: test_db, + }; + let context = &context_wrapper.context; + // Creating 10 receipts with timestamps 42 to 51 let mut received_receipt_vec = Vec::new(); for i in 0..10 { - received_receipt_vec.push(T::create_received_receipt( + received_receipt_vec.push(Horizon::create_received_receipt( ALLOCATION_ID_0, &SIGNER.0, i + 684, @@ -1042,14 +1387,14 @@ mod test { )); // Adding irrelevant receipts to make sure they are not retrieved - received_receipt_vec.push(T::create_received_receipt( + received_receipt_vec.push(Horizon::create_received_receipt( ALLOCATION_ID_IRRELEVANT, &SIGNER.0, i + 684, i + 42, (i + 124).into(), )); - received_receipt_vec.push(T::create_received_receipt( + received_receipt_vec.push(Horizon::create_received_receipt( ALLOCATION_ID_0, &SENDER_IRRELEVANT.0, i + 684, @@ -1063,7 +1408,7 @@ mod test { { $( assert!( - T::remove_range_and_check(&context, escrow_accounts.clone(), &received_receipt_vec, $arg) + Horizon::remove_range_and_check(&context, escrow_accounts.clone(), &received_receipt_vec, $arg) .await.is_ok() ); ) + diff --git a/crates/tap-agent/src/test.rs b/crates/tap-agent/src/test.rs index 40b1bc627..41b6adc4d 100644 --- a/crates/tap-agent/src/test.rs +++ b/crates/tap-agent/src/test.rs @@ -755,13 +755,12 @@ pub mod actors { use std::{fmt::Debug, sync::Arc}; use ractor::{Actor, ActorProcessingErr, ActorRef, SupervisionEvent}; - use test_assets::{ALLOCATION_ID_0, TAP_SIGNER}; + use test_assets::ALLOCATION_ID_0; use thegraph_core::{alloy::primitives::Address, AllocationId as AllocationIdCore}; use tokio::sync::{mpsc, watch, Notify}; - use super::create_rav; use crate::agent::{ - sender_account::{ReceiptFees, SenderAccountMessage}, + sender_account::{RavInformation, ReceiptFees, SenderAccountMessage}, sender_accounts_manager::{AllocationId, NewReceiptNotification}, sender_allocation::SenderAllocationMessage, unaggregated_receipts::UnaggregatedReceipts, @@ -893,11 +892,21 @@ pub mod actors { } } + /// Mock implementation of SenderAllocation for testing purposes. 
+ /// + /// This mock simulates the behavior of a real sender allocation actor, particularly + /// for testing RAV request flows and retry mechanisms. When a RAV request is triggered, + /// it sends back a successful response that follows TAP protocol behavior: + /// + /// - Clears unaggregated fees to zero (they become part of the RAV) + /// - Creates a RAV for the full aggregated amount + /// - Properly resolves deny conditions to stop unnecessary retries + /// + /// This implementation aligns with the documented expectation: + /// "set the unnagregated fees to zero and the rav to the amount" pub struct MockSenderAllocation { triggered_rav_request: Arc, sender_actor: Option>, - - next_rav_value: watch::Receiver, next_unaggregated_fees_value: watch::Receiver, receipts: mpsc::Sender, } @@ -913,7 +922,6 @@ pub mod actors { sender_actor: Some(sender_actor), triggered_rav_request: triggered_rav_request.clone(), receipts: mpsc::channel(1).0, - next_rav_value: watch::channel(0).1, next_unaggregated_fees_value, }, triggered_rav_request, @@ -924,16 +932,15 @@ pub mod actors { pub fn new_with_next_rav_value( sender_actor: ActorRef, ) -> (Self, watch::Sender) { - let (next_rav_value_sender, next_rav_value) = watch::channel(0); + let (unaggregated_fees, next_unaggregated_fees_value) = watch::channel(0); ( Self { sender_actor: Some(sender_actor), triggered_rav_request: Arc::new(Notify::new()), receipts: mpsc::channel(1).0, - next_rav_value, - next_unaggregated_fees_value: watch::channel(0).1, + next_unaggregated_fees_value, }, - next_rav_value_sender, + unaggregated_fees, ) } @@ -945,7 +952,6 @@ pub mod actors { sender_actor: None, triggered_rav_request: Arc::new(Notify::new()), receipts: tx, - next_rav_value: watch::channel(0).1, next_unaggregated_fees_value: watch::channel(0).1, }, rx, @@ -977,21 +983,30 @@ pub mod actors { SenderAllocationMessage::TriggerRavRequest => { self.triggered_rav_request.notify_one(); if let Some(sender_account) = self.sender_actor.as_ref() { - let signed_rav = create_rav( - ALLOCATION_ID_0, - TAP_SIGNER.0.clone(), - 4, - *self.next_rav_value.borrow(), - ); + // Mock a successful RAV request response that follows TAP protocol behavior: + // 1. Aggregate unaggregated receipts into a Receipt Aggregate Voucher (RAV) + // 2. Clear unaggregated fees to zero (they're now represented in the RAV) + // 3. Create a RAV for the full aggregated amount + // + // This behavior aligns with the documented expectation: + // "set the unnagregated fees to zero and the rav to the amount" + // (see sender_account.rs test_deny_allow comment) + // + // Important: This correctly resolves the deny condition when unaggregated + // fees are cleared, which stops the retry mechanism as intended. 
+ let current_value = *self.next_unaggregated_fees_value.borrow(); sender_account.cast(SenderAccountMessage::UpdateReceiptFees( AllocationId::Legacy(AllocationIdCore::from(ALLOCATION_ID_0)), ReceiptFees::RavRequestResponse( UnaggregatedReceipts { - value: *self.next_unaggregated_fees_value.borrow(), + value: 0, // Clear unaggregated fees - they're now in the RAV last_id: 0, counter: 0, }, - Ok(Some(signed_rav.into())), + Ok(Some(RavInformation { + allocation_id: ALLOCATION_ID_0, + value_aggregate: current_value, // RAV for the full amount + })), ), ))?; } diff --git a/crates/tap-agent/tests/sender_account_manager_test.rs b/crates/tap-agent/tests/sender_account_manager_test.rs index 841e31328..b75ad0afb 100644 --- a/crates/tap-agent/tests/sender_account_manager_test.rs +++ b/crates/tap-agent/tests/sender_account_manager_test.rs @@ -17,7 +17,6 @@ use indexer_tap_agent::{ }; use ractor::{ActorRef, ActorStatus}; use serde_json::json; -use sqlx::PgPool; use test_assets::{assert_while_retry, flush_messages, TAP_SENDER as SENDER, TAP_SIGNER as SIGNER}; use thegraph_core::{alloy::primitives::U256, AllocationId as AllocationIdCore}; use wiremock::{ @@ -29,8 +28,10 @@ const TRIGGER_VALUE: u128 = 100; // This test should ensure the full flow starting from // sender account manager layer to work, up to closing an allocation -#[test_log::test(sqlx::test(migrations = "../../migrations"))] -async fn sender_account_manager_layer_test(pgpool: PgPool) { +#[test_log::test(tokio::test)] +async fn sender_account_manager_layer_test() { + let test_db = test_assets::setup_shared_test_db().await; + let pgpool = test_db.pool; let mock_network_subgraph_server: MockServer = MockServer::start().await; mock_network_subgraph_server .register( diff --git a/crates/tap-agent/tests/sender_account_test.rs b/crates/tap-agent/tests/sender_account_test.rs index 1b41a26b9..648922568 100644 --- a/crates/tap-agent/tests/sender_account_test.rs +++ b/crates/tap-agent/tests/sender_account_test.rs @@ -9,7 +9,6 @@ use indexer_tap_agent::{ }; use ractor::concurrency::Duration; use serde_json::json; -use sqlx::PgPool; use test_assets::{ALLOCATION_ID_0, TAP_SIGNER as SIGNER}; use thegraph_core::{alloy::hex::ToHexExt, AllocationId as AllocationIdCore}; use wiremock::{ @@ -21,8 +20,10 @@ const TRIGGER_VALUE: u128 = 500; // This test should ensure the full flow starting from // sender account layer to work, up to closing an allocation -#[sqlx::test(migrations = "../../migrations")] -async fn sender_account_layer_test(pgpool: PgPool) { +#[tokio::test] +async fn sender_account_layer_test() { + let test_db = test_assets::setup_shared_test_db().await; + let pgpool = test_db.pool; let mock_server = MockServer::start().await; let mock_escrow_subgraph_server: MockServer = MockServer::start().await; mock_escrow_subgraph_server diff --git a/crates/tap-agent/tests/tap_agent_test.rs b/crates/tap-agent/tests/tap_agent_test.rs index 4a48f5cc1..30e3f163b 100644 --- a/crates/tap-agent/tests/tap_agent_test.rs +++ b/crates/tap-agent/tests/tap_agent_test.rs @@ -113,8 +113,10 @@ pub async fn start_agent( (receiver, Actor::spawn(None, actor, args).await.unwrap()) } -#[sqlx::test(migrations = "../../migrations")] -async fn test_start_tap_agent(pgpool: PgPool) { +#[tokio::test] +async fn test_start_tap_agent() { + let test_db = test_assets::setup_shared_test_db().await; + let pgpool = test_db.pool; let (mut msg_receiver, (_actor_ref, _handle)) = start_agent(pgpool.clone()).await; flush_messages(&mut msg_receiver).await; diff --git a/crates/test-assets/Cargo.toml 
b/crates/test-assets/Cargo.toml index f2e5f2e35..eb79b4a66 100644 --- a/crates/test-assets/Cargo.toml +++ b/crates/test-assets/Cargo.toml @@ -14,3 +14,5 @@ sqlx.workspace = true tokio.workspace = true rstest.workspace = true stdext.workspace = true +testcontainers-modules.workspace = true +tracing.workspace = true diff --git a/crates/test-assets/src/lib.rs b/crates/test-assets/src/lib.rs index b2325f5ae..bdbb1a8de 100644 --- a/crates/test-assets/src/lib.rs +++ b/crates/test-assets/src/lib.rs @@ -12,6 +12,7 @@ use std::{ use bip39::Mnemonic; use indexer_allocation::{Allocation, AllocationStatus, SubgraphDeployment}; +use sqlx::migrate::Migrator; use sqlx::{migrate, PgPool, Postgres}; use tap_core::{signed_message::Eip712SignedMessage, tap_eip712_domain}; use tap_graph::{Receipt, SignedReceipt}; @@ -501,3 +502,178 @@ pub fn pgpool() -> Pin>> { .expect("failed to connect test pool") }) } + +// Testcontainers utilities for SQLX_OFFLINE compatibility + +use testcontainers_modules::{ + postgres, + testcontainers::{runners::AsyncRunner, ContainerAsync}, +}; + +/// Container handle returned by setup functions to keep container alive +pub struct TestDatabase { + pub pool: sqlx::PgPool, + pub url: String, + _container: ContainerAsync, +} + +/// Set up a test database using testcontainers +/// +/// This creates an isolated PostgreSQL container and database for testing. +/// The container will be kept alive as long as the returned TestDatabase +/// instance is not dropped. +pub async fn setup_shared_test_db() -> TestDatabase { + use std::sync::atomic::{AtomicU32, Ordering}; + static DB_COUNTER: AtomicU32 = AtomicU32::new(0); + + // Create unique database name for this test + let db_id = DB_COUNTER.fetch_add(1, Ordering::SeqCst); + let unique_db_name = format!("test_db_{db_id}"); + + let pg_container = postgres::Postgres::default() + .start() + .await + .expect("Failed to start PostgreSQL container"); + + let host_port = pg_container + .get_host_port_ipv4(5432) + .await + .expect("Failed to get container port"); + + // In CI environments, we might need to use the container's IP instead of localhost + let host = if std::env::var("CI").is_ok() { + pg_container + .get_host() + .await + .expect("Failed to get container host") + .to_string() + } else { + "localhost".to_string() + }; + + // Connect to postgres database first to create our test database + let admin_connection_string = + format!("postgres://postgres:postgres@{host}:{host_port}/postgres"); + + tracing::debug!( + "Attempting to connect to admin database: {}", + admin_connection_string + ); + let admin_pool = sqlx::PgPool::connect(&admin_connection_string) + .await + .expect("Failed to connect to admin database"); + + // Create unique database for this test + sqlx::query(&format!("CREATE DATABASE \"{unique_db_name}\"")) + .execute(&admin_pool) + .await + .expect("Failed to create test database"); + + // Connect to our test database + let connection_string = + format!("postgres://postgres:postgres@{host}:{host_port}/{unique_db_name}"); + let pool = sqlx::PgPool::connect(&connection_string) + .await + .expect("Failed to connect to test database"); + + // Run migrations to set up the database schema + // This matches the production architecture where indexer-agent runs migrations + sqlx::migrate!("../../migrations") + .run(&pool) + .await + .expect("Failed to run database migrations"); + + tracing::debug!( + "Isolated test PostgreSQL database created: {}", + connection_string + ); + + // Close admin pool + admin_pool.close().await; + + TestDatabase 
{ + pool, + url: connection_string, + _container: pg_container, + } +} + +/// Set up a test database using testcontainers with a custom migrator +/// +/// This creates an isolated PostgreSQL container and database for testing, +/// using the provided migrator to run migrations. This is useful for testing +/// scenarios where only certain migrations should be applied. +pub async fn setup_test_db_with_migrator(migrator: Migrator) -> TestDatabase { + use std::sync::atomic::{AtomicU32, Ordering}; + static DB_COUNTER: AtomicU32 = AtomicU32::new(0); + + // Create unique database name for this test + let db_id = DB_COUNTER.fetch_add(1, Ordering::SeqCst); + let unique_db_name = format!("test_db_custom_{db_id}"); + + let pg_container = postgres::Postgres::default() + .start() + .await + .expect("Failed to start PostgreSQL container"); + + let host_port = pg_container + .get_host_port_ipv4(5432) + .await + .expect("Failed to get container port"); + + // In CI environments, we might need to use the container's IP instead of localhost + let host = if std::env::var("CI").is_ok() { + pg_container + .get_host() + .await + .expect("Failed to get container host") + .to_string() + } else { + "localhost".to_string() + }; + + // Connect to postgres database first to create our test database + let admin_connection_string = + format!("postgres://postgres:postgres@{host}:{host_port}/postgres"); + + tracing::debug!( + "Attempting to connect to admin database: {}", + admin_connection_string + ); + let admin_pool = sqlx::PgPool::connect(&admin_connection_string) + .await + .expect("Failed to connect to admin database"); + + // Create unique database for this test + sqlx::query(&format!("CREATE DATABASE \"{unique_db_name}\"")) + .execute(&admin_pool) + .await + .expect("Failed to create test database"); + + // Connect to our test database + let connection_string = + format!("postgres://postgres:postgres@{host}:{host_port}/{unique_db_name}"); + let pool = sqlx::PgPool::connect(&connection_string) + .await + .expect("Failed to connect to test database"); + + // Run migrations using the custom migrator + migrator + .run(&pool) + .await + .expect("Failed to run database migrations with custom migrator"); + + tracing::debug!( + "Isolated test PostgreSQL database created with custom migrator: {}", + connection_string + ); + + // Close admin pool + admin_pool.close().await; + + TestDatabase { + pool, + url: connection_string, + _container: pg_container, + } +}
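
A minimal usage sketch for the two helpers above, assuming `test-assets` is pulled in as a dev-dependency of the crate under test. The test names and the `SELECT 1` probe are illustrative placeholders, and the `../../migrations` path simply mirrors the one `setup_shared_test_db` already embeds via `sqlx::migrate!`. The essential detail is that the returned `TestDatabase` must stay in scope for the whole test: dropping it tears down the PostgreSQL container along with the pool.

use sqlx::migrate::Migrator;
use std::path::Path;

#[tokio::test]
async fn example_uses_shared_test_db() {
    // Hold the whole TestDatabase, not just the pool; the container lives only
    // as long as this binding does.
    let test_db = test_assets::setup_shared_test_db().await;
    let pgpool = test_db.pool.clone();

    // The schema has already been migrated, so any query can run from here on.
    let one: i32 = sqlx::query_scalar("SELECT 1")
        .fetch_one(&pgpool)
        .await
        .unwrap();
    assert_eq!(one, 1);
}

#[tokio::test]
async fn example_uses_custom_migrator() {
    // Point the migrator at whatever subset of migrations the test needs;
    // the full migrations directory is used here only as an example.
    let migrator = Migrator::new(Path::new("../../migrations"))
        .await
        .expect("failed to load migrations");
    let test_db = test_assets::setup_test_db_with_migrator(migrator).await;

    assert!(test_db.url.starts_with("postgres://"));
}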