diff --git a/contrib/base/Dockerfile b/contrib/base/Dockerfile index 88d9f0051..0991e1e6e 100644 --- a/contrib/base/Dockerfile +++ b/contrib/base/Dockerfile @@ -2,7 +2,7 @@ FROM debian:bookworm-slim RUN apt-get update && apt-get install -y --no-install-recommends \ openssl ca-certificates protobuf-compiler postgresql-client curl \ - jq git linux-perf \ + jq git linux-perf libsasl2-dev \ strace valgrind procps \ bpftrace linux-headers-generic \ && rm -rf /var/lib/apt/lists/* diff --git a/contrib/indexer-service/Dockerfile b/contrib/indexer-service/Dockerfile index 4e3eccd3b..b5b7ce3a9 100644 --- a/contrib/indexer-service/Dockerfile +++ b/contrib/indexer-service/Dockerfile @@ -7,7 +7,7 @@ COPY ../../ . # ENV SQLX_OFFLINE=true RUN apt-get update && apt-get install -y --no-install-recommends \ - protobuf-compiler && rm -rf /var/lib/apt/lists/* + protobuf-compiler libsasl2-dev && rm -rf /var/lib/apt/lists/* RUN cargo build --release --bin indexer-service-rs ######################################################################################## diff --git a/contrib/tap-agent/Dockerfile b/contrib/tap-agent/Dockerfile index a7693d8ca..35276edba 100644 --- a/contrib/tap-agent/Dockerfile +++ b/contrib/tap-agent/Dockerfile @@ -7,7 +7,7 @@ COPY . . # the prepared files in the `.sqlx` directory. 
ENV SQLX_OFFLINE=true RUN apt-get update && apt-get install -y --no-install-recommends \ - protobuf-compiler && rm -rf /var/lib/apt/lists/* + protobuf-compiler libsasl2-dev && rm -rf /var/lib/apt/lists/* RUN cargo build --release --bin indexer-tap-agent ######################################################################################## diff --git a/crates/service/src/tap.rs b/crates/service/src/tap.rs index 393c79c93..0c8cf03d5 100644 --- a/crates/service/src/tap.rs +++ b/crates/service/src/tap.rs @@ -34,7 +34,10 @@ const GRACE_PERIOD: u64 = 60; #[derive(Clone)] pub struct IndexerTapContext { domain_separator: Arc, - receipt_producer: Sender, + receipt_producer: Sender<( + DatabaseReceipt, + tokio::sync::oneshot::Sender>, + )>, cancelation_token: CancellationToken, } diff --git a/crates/service/src/tap/receipt_store.rs b/crates/service/src/tap/receipt_store.rs index b879dfb4e..af30d6e20 100644 --- a/crates/service/src/tap/receipt_store.rs +++ b/crates/service/src/tap/receipt_store.rs @@ -7,7 +7,7 @@ use itertools::{Either, Itertools}; use sqlx::{types::BigDecimal, PgPool}; use tap_core::{manager::adapters::ReceiptStore, receipt::WithValueAndTimestamp}; use thegraph_core::alloy::{hex::ToHexExt, sol_types::Eip712Domain}; -use tokio::{sync::mpsc::Receiver, task::JoinHandle}; +use tokio::{sync::mpsc::Receiver, sync::oneshot::Sender as OneShotSender, task::JoinHandle}; use tokio_util::sync::CancellationToken; use super::{AdapterError, CheckingReceipt, IndexerTapContext, TapReceipt}; @@ -40,19 +40,28 @@ pub enum ProcessedReceipt { impl InnerContext { async fn process_db_receipts( &self, - buffer: Vec, + buffer: Vec<(DatabaseReceipt, OneShotSender>)>, ) -> Result { - let (v1_receipts, v2_receipts): (Vec<_>, Vec<_>) = - buffer.into_iter().partition_map(|r| match r { - DatabaseReceipt::V1(db_receipt_v1) => Either::Left(db_receipt_v1), - DatabaseReceipt::V2(db_receipt_v2) => Either::Right(db_receipt_v2), - }); + let (v1_data, v2_data): (Vec<_>, Vec<_>) = + buffer + 
.into_iter() + .partition_map(|(receipt, sender)| match receipt { + DatabaseReceipt::V1(receipt) => Either::Left((receipt, sender)), + DatabaseReceipt::V2(receipt) => Either::Right((receipt, sender)), + }); + + let (v1_receipts, v1_senders): (Vec<_>, Vec<_>) = v1_data.into_iter().unzip(); + let (v2_receipts, v2_senders): (Vec<_>, Vec<_>) = v2_data.into_iter().unzip(); let (insert_v1, insert_v2) = tokio::join!( self.store_receipts_v1(v1_receipts), self.store_receipts_v2(v2_receipts), ); + // send back the result of storing receipts to callers + Self::notify_senders(v1_senders, &insert_v1, "V1"); + Self::notify_senders(v2_senders, &insert_v2, "V2"); + match (insert_v1, insert_v2) { (Err(e1), Err(e2)) => Err(ProcessReceiptError::Both(e1.into(), e2.into())), @@ -66,6 +75,29 @@ impl InnerContext { } } + fn notify_senders( + senders: Vec>>, + result: &Result, AdapterError>, + version: &str, + ) { + match result { + Ok(_) => { + for sender in senders { + let _ = sender.send(Ok(())); + } + } + Err(e) => { + // Create error message once + let err_msg = format!("Failed to store {} receipts: {}", version, e); + tracing::error!("{}", err_msg); + for sender in senders { + // Convert to AdapterError for each sender + let _ = sender.send(Err(anyhow!(err_msg.clone()).into())); + } + } + } + } + async fn store_receipts_v1( &self, receipts: Vec, @@ -197,7 +229,7 @@ impl InnerContext { impl IndexerTapContext { pub fn spawn_store_receipt_task( inner_context: InnerContext, - mut receiver: Receiver, + mut receiver: Receiver<(DatabaseReceipt, OneShotSender>)>, cancelation_token: CancellationToken, ) -> JoinHandle<()> { const BUFFER_SIZE: usize = 100; @@ -224,13 +256,19 @@ impl ReceiptStore for IndexerTapContext { async fn store_receipt(&self, receipt: CheckingReceipt) -> Result { let db_receipt = DatabaseReceipt::from_receipt(receipt, &self.domain_separator)?; - self.receipt_producer.send(db_receipt).await.map_err(|e| { - tracing::error!("Failed to queue receipt for storage: {}", e); - 
anyhow!(e) - })?; + let (result_tx, result_rx) = tokio::sync::oneshot::channel(); + self.receipt_producer + .send((db_receipt, result_tx)) + .await + .map_err(|e| { + tracing::error!("Failed to queue receipt for storage: {}", e); + anyhow!(e) + })?; + + let res = result_rx.await.map_err(|e| anyhow!(e))?; // We don't need receipt_ids - Ok(0) + res.map(|_| 0) } } @@ -374,6 +412,23 @@ mod tests { DatabaseReceipt::V2(DbReceiptV2::from_receipt(&v2, &TAP_EIP712_DOMAIN).unwrap()) } + pub type VecReceiptTx = Vec<( + DatabaseReceipt, + tokio::sync::oneshot::Sender>, + )>; + pub type VecRx = Vec>>; + + pub fn attach_oneshot_channels(receipts: Vec) -> (VecReceiptTx, VecRx) { + let mut txs = Vec::with_capacity(receipts.len()); + let mut rxs = Vec::with_capacity(receipts.len()); + for r in receipts.into_iter() { + let (tx, rx) = tokio::sync::oneshot::channel(); + txs.push((r, tx)); + rxs.push(rx); + } + (txs, rxs) + } + mod when_all_migrations_are_run { use super::*; @@ -391,6 +446,7 @@ mod tests { receipts: Vec, ) { let context = InnerContext { pgpool }; + let (receipts, _rxs) = attach_oneshot_channels(receipts); let res = context.process_db_receipts(receipts).await.unwrap(); @@ -415,7 +471,9 @@ mod tests { let context = InnerContext { pgpool }; let v1 = create_v1().await; + let receipts = vec![v1]; + let (receipts, _rxs) = attach_oneshot_channels(receipts); let res = context.process_db_receipts(receipts).await.unwrap(); @@ -434,6 +492,7 @@ mod tests { ) { let context = InnerContext { pgpool }; + let (receipts, _rxs) = attach_oneshot_channels(receipts); let error = context.process_db_receipts(receipts).await.unwrap_err(); let ProcessReceiptError::V2(error) = error else { diff --git a/integration-tests/src/main.rs b/integration-tests/src/main.rs index a1c445ecc..491e981e7 100644 --- a/integration-tests/src/main.rs +++ b/integration-tests/src/main.rs @@ -2,16 +2,17 @@ // SPDX-License-Identifier: Apache-2.0 use anyhow::Result; -use rav_tests::test_tap_rav_v1; +use 
rav_tests::{test_invalid_chain_id, test_tap_rav_v1}; mod metrics; mod rav_tests; -mod receipt; +mod utils; + use metrics::MetricsChecker; -use receipt::create_tap_receipt; #[tokio::main] async fn main() -> Result<()> { // Run the TAP receipt test + test_invalid_chain_id().await?; test_tap_rav_v1().await } diff --git a/integration-tests/src/rav_tests.rs b/integration-tests/src/rav_tests.rs index a97b31f0a..fc697af56 100644 --- a/integration-tests/src/rav_tests.rs +++ b/integration-tests/src/rav_tests.rs @@ -7,44 +7,32 @@ use serde_json::json; use std::str::FromStr; use std::sync::Arc; use std::time::Duration; -use thegraph_core::alloy::signers::local::coins_bip39::English; -use thegraph_core::alloy::{ - primitives::Address, - signers::local::{MnemonicBuilder, PrivateKeySigner}, -}; +use thegraph_core::alloy::primitives::Address; +use thegraph_core::alloy::signers::local::PrivateKeySigner; -use crate::create_tap_receipt; +use crate::utils::{create_request, create_tap_receipt, find_allocation}; use crate::MetricsChecker; -// TODO: Would be nice to read this values from: -// contrib/tap-agent/config.toml -// and contrib/local-network/.env +const INDEXER_URL: &str = "http://localhost:7601"; +// Taken from .env +// this is the key gateway uses +const ACCOUNT0_SECRET: &str = "ac0974bec39a17e36ba4a6b4d238ff944bacb478cbed5efcae784d7bf4f2ff80"; + +// The deployed gateway and indexer +// use this verifier contract +// which must be part of the eip712 domain +const TAP_VERIFIER_CONTRACT: &str = "0x8198f5d8F8CfFE8f9C413d98a0A55aEB8ab9FbB7"; +const CHAIN_ID: u64 = 1337; + const GATEWAY_URL: &str = "http://localhost:7700"; const SUBGRAPH_ID: &str = "BFr2mx7FgkJ36Y6pE5BiXs1KmNUmVDCnL82KUSdcLW1g"; -const TAP_ESCROW_CONTRACT: &str = "0x0355B7B8cb128fA5692729Ab3AAa199C1753f726"; const GATEWAY_API_KEY: &str = "deadbeefdeadbeefdeadbeefdeadbeef"; -// const RECEIVER_ADDRESS: &str = "0xf4EF6650E48d099a4972ea5B414daB86e1998Bd3"; const TAP_AGENT_METRICS_URL: &str = 
"http://localhost:7300/metrics"; -const MNEMONIC: &str = "test test test test test test test test test test test junk"; const GRAPH_URL: &str = "http://localhost:8000/subgraphs/name/graph-network"; -const GRT_DECIMALS: u8 = 18; -const GRT_BASE: u128 = 10u128.pow(GRT_DECIMALS as u32); - -// With trigger_value_divisor = 500_000 and max_amount_willing_to_lose_grt = 1000 -// trigger_value = 0.002 GRT -// We need to send at least 20 receipts to reach the trigger threshold -// Sending slightly more than required to ensure triggering -const MAX_RECEIPT_VALUE: u128 = GRT_BASE / 1_000; -// This value should match the timestamp_buffer_secs -// in the tap-agent setting + 10 seconds const WAIT_TIME_BATCHES: u64 = 40; -// Calculate required receipts to trigger RAV -// With MAX_RECEIPT_VALUE = GRT_BASE / 1_000 (0.001 GRT) -// And trigger_value = 0.002 GRT -// We need at least 3 receipts to trigger a RAV (0.003 GRT > 0.002 GRT) const NUM_RECEIPTS: u32 = 3; // Send receipts in batches with a delay in between @@ -52,51 +40,20 @@ const NUM_RECEIPTS: u32 = 3; const BATCHES: u32 = 2; const MAX_TRIGGERS: usize = 100; +const GRT_DECIMALS: u8 = 18; +const GRT_BASE: u128 = 10u128.pow(GRT_DECIMALS as u32); + +const MAX_RECEIPT_VALUE: u128 = GRT_BASE / 10_000; + // Function to test the tap RAV generation pub async fn test_tap_rav_v1() -> Result<()> { - // Setup wallet using your MnemonicBuilder - let index: u32 = 0; - let wallet: PrivateKeySigner = MnemonicBuilder::::default() - .phrase(MNEMONIC) - .index(index) - .unwrap() - .build() - .unwrap(); - // Setup HTTP client let http_client = Arc::new(Client::new()); // Query the network subgraph to find active allocations - println!("Querying for active allocations..."); - let response = http_client - .post(GRAPH_URL) - .json(&json!({ - "query": "{ allocations(where: { status: Active }) { id indexer { id } subgraphDeployment { id } } }" - })) - .send() - .await?; - - if !response.status().is_success() { - return Err(anyhow::anyhow!( - "Network 
subgraph request failed with status: {}", - response.status() - )); - } - - // Try to find a valid allocation - let response_text = response.text().await?; - - let json_value = serde_json::from_str::(&response_text)?; - let allocation_id = json_value - .get("data") - .and_then(|d| d.get("allocations")) - .and_then(|a| a.as_array()) - .filter(|arr| !arr.is_empty()) - .and_then(|arr| arr[0].get("id")) - .and_then(|id| id.as_str()) - .ok_or_else(|| anyhow::anyhow!("No valid allocation ID found"))?; + let allocation_id = find_allocation(http_client.clone(), GRAPH_URL).await?; - let allocation_id = Address::from_str(allocation_id)?; + let allocation_id = Address::from_str(&allocation_id)?; // Create a metrics checker let metrics_checker = @@ -127,20 +84,10 @@ pub async fn test_tap_rav_v1() -> Result<()> { ); for i in 0..NUM_RECEIPTS { - let receipt = create_tap_receipt( - MAX_RECEIPT_VALUE, - &allocation_id, - TAP_ESCROW_CONTRACT, - &wallet, - )?; - - let receipt_json = serde_json::to_string(&receipt).unwrap(); - let response = http_client .post(format!("{}/api/subgraphs/id/{}", GATEWAY_URL, SUBGRAPH_ID)) .header("Content-Type", "application/json") .header("Authorization", format!("Bearer {}", GATEWAY_API_KEY)) - .header("Tap-Receipt", receipt_json) .json(&json!({ "query": "{ _meta { block { number } } }" })) @@ -150,15 +97,15 @@ pub async fn test_tap_rav_v1() -> Result<()> { if response.status().is_success() { total_successful += 1; - println!("Receipt {} of batch {} sent successfully", i + 1, batch + 1); + println!("Query {} of batch {} sent successfully", i + 1, batch + 1); } else { return Err(anyhow::anyhow!( - "Failed to send receipt: {}", + "Failed to send query: {}", response.status() )); } - // Small pause between receipts within batch + // Small pause between queries within batch tokio::time::sleep(Duration::from_millis(100)).await; } @@ -180,24 +127,14 @@ pub async fn test_tap_rav_v1() -> Result<()> { println!("\n=== STAGE 2: Sending continuous trigger 
receipts ==="); - // Now send a series of regular receipts with short intervals until RAV is detected + // Now send a series of regular queries with short intervals until RAV is detected for i in 0..MAX_TRIGGERS { - println!("Sending trigger receipt {}/{}...", i + 1, MAX_TRIGGERS); - - let trigger_receipt = create_tap_receipt( - MAX_RECEIPT_VALUE, - &allocation_id, - TAP_ESCROW_CONTRACT, - &wallet, - )?; - - let receipt_json = serde_json::to_string(&trigger_receipt).unwrap(); + println!("Sending trigger query {}/{}...", i + 1, MAX_TRIGGERS); let response = http_client .post(format!("{}/api/subgraphs/id/{}", GATEWAY_URL, SUBGRAPH_ID)) .header("Content-Type", "application/json") .header("Authorization", format!("Bearer {}", GATEWAY_API_KEY)) - .header("Tap-Receipt", receipt_json) .json(&json!({ "query": "{ _meta { block { number } } }" })) @@ -210,7 +147,7 @@ pub async fn test_tap_rav_v1() -> Result<()> { println!("Trigger receipt {} sent successfully", i + 1); } else { return Err(anyhow::anyhow!( - "Failed to send trigger receipt: {}", + "Failed to send trigger query: {}", response.status() )); } @@ -250,13 +187,47 @@ pub async fn test_tap_rav_v1() -> Result<()> { } println!("\n=== Summary ==="); - println!("Total receipts sent successfully: {}", total_successful); - println!( - "Total value sent: {} GRT", - (MAX_RECEIPT_VALUE as f64 * total_successful as f64) / GRT_BASE as f64 - ); + println!("Total queries sent successfully: {}", total_successful); // If we got here, test failed println!("❌ TEST FAILED: No RAV generation detected"); Err(anyhow::anyhow!("Failed to detect RAV generation")) } + +pub async fn test_invalid_chain_id() -> Result<()> { + let wallet: PrivateKeySigner = ACCOUNT0_SECRET.parse().unwrap(); + + let http_client = Arc::new(Client::new()); + + let allocation_id = find_allocation(http_client.clone(), GRAPH_URL).await?; + + let allocation_id = Address::from_str(&allocation_id)?; + println!("Found allocation ID: {}", allocation_id); + + let receipt = 
create_tap_receipt( + MAX_RECEIPT_VALUE, + &allocation_id, + TAP_VERIFIER_CONTRACT, + CHAIN_ID + 18, + &wallet, + )?; + + let receipt_json = serde_json::to_string(&receipt).unwrap(); + let response = create_request( + &http_client, + format!("{}/subgraphs/id/{}", INDEXER_URL, SUBGRAPH_ID).as_str(), + &receipt_json, + &json!({ + "query": "{ _meta { block { number } } }" + }), + ) + .send() + .await?; + + assert!( + response.status().is_client_error(), + "Failed to send receipt" + ); + + Ok(()) +} diff --git a/integration-tests/src/receipt.rs b/integration-tests/src/receipt.rs deleted file mode 100644 index 61491609a..000000000 --- a/integration-tests/src/receipt.rs +++ /dev/null @@ -1,47 +0,0 @@ -// Copyright 2023-, Edge & Node, GraphOps, and Semiotic Labs. -// SPDX-License-Identifier: Apache-2.0 - -use rand::{rng, Rng}; -use std::str::FromStr; -use std::time::SystemTime; - -use anyhow::Result; -use tap_core::{signed_message::Eip712SignedMessage, tap_eip712_domain}; -use tap_graph::Receipt; -use thegraph_core::alloy::{primitives::Address, signers::local::PrivateKeySigner}; - -pub fn create_tap_receipt( - value: u128, - allocation_id: &Address, - escrow_contract: &str, - wallet: &PrivateKeySigner, -) -> Result> { - let nonce = rng().random::(); - - // Get timestamp in nanoseconds - let timestamp = SystemTime::now() - .duration_since(SystemTime::UNIX_EPOCH)? 
- .as_nanos(); - let timestamp_ns = timestamp as u64; - - // Create domain separator - let eip712_domain_separator = tap_eip712_domain( - 1337, // chain_id - Address::from_str(escrow_contract)?, // verifying_contract - ); - - // Create and sign receipt - println!("Creating and signing receipt..."); - let receipt = Eip712SignedMessage::new( - &eip712_domain_separator, - Receipt { - allocation_id: *allocation_id, - nonce, - timestamp_ns, - value, - }, - wallet, - )?; - - Ok(receipt) -} diff --git a/integration-tests/src/utils.rs b/integration-tests/src/utils.rs new file mode 100644 index 000000000..447518f18 --- /dev/null +++ b/integration-tests/src/utils.rs @@ -0,0 +1,95 @@ +// Copyright 2023-, Edge & Node, GraphOps, and Semiotic Labs. +// SPDX-License-Identifier: Apache-2.0 + +use rand::{rng, Rng}; +use serde_json::json; +use std::time::{Duration, SystemTime}; +use std::{str::FromStr, sync::Arc}; + +use anyhow::Result; +use reqwest::Client; +use tap_core::{signed_message::Eip712SignedMessage, tap_eip712_domain}; +use tap_graph::Receipt; +use thegraph_core::alloy::{primitives::Address, signers::local::PrivateKeySigner}; + +pub fn create_tap_receipt( + value: u128, + allocation_id: &Address, + verifier_contract: &str, + chain_id: u64, + wallet: &PrivateKeySigner, +) -> Result> { + let nonce = rng().random::(); + + // Get timestamp in nanoseconds + let timestamp = SystemTime::now() + .duration_since(SystemTime::UNIX_EPOCH)? 
+ .as_nanos(); + let timestamp_ns = timestamp as u64; + + // Create domain separator + let eip712_domain_separator = + tap_eip712_domain(chain_id, Address::from_str(verifier_contract)?); + + // Create and sign receipt + println!("Creating and signing receipt..."); + let receipt = Eip712SignedMessage::new( + &eip712_domain_separator, + Receipt { + allocation_id: *allocation_id, + nonce, + timestamp_ns, + value, + }, + wallet, + )?; + + Ok(receipt) +} + +// Function to create a configured request +pub fn create_request( + client: &reqwest::Client, + url: &str, + receipt_json: &str, + query: &serde_json::Value, +) -> reqwest::RequestBuilder { + client + .post(url) + .header("Content-Type", "application/json") + .header("Tap-Receipt", receipt_json) + .json(query) + .timeout(Duration::from_secs(10)) +} + +pub async fn find_allocation(http_client: Arc, url: &str) -> Result { + println!("Querying for active allocations..."); + let response = http_client + .post(url) + .json(&json!({ + "query": "{ allocations(where: { status: Active }) { id indexer { id } subgraphDeployment { id } } }" + })) + .send() + .await?; + + if !response.status().is_success() { + return Err(anyhow::anyhow!( + "Network subgraph request failed with status: {}", + response.status() + )); + } + + // Try to find a valid allocation + let response_text = response.text().await?; + + let json_value = serde_json::from_str::(&response_text)?; + json_value + .get("data") + .and_then(|d| d.get("allocations")) + .and_then(|a| a.as_array()) + .filter(|arr| !arr.is_empty()) + .and_then(|arr| arr[0].get("id")) + .and_then(|id| id.as_str()) + .map(|id| id.to_string()) + .ok_or_else(|| anyhow::anyhow!("No valid allocation ID found")) +} diff --git a/setup-test-network.sh b/setup-test-network.sh index 6ec78e561..46f4c46c0 100755 --- a/setup-test-network.sh +++ b/setup-test-network.sh @@ -56,7 +56,7 @@ if [ ! 
-d "local-network" ]; then git clone https://github.com/edgeandnode/local-network.git cd local-network # Checkout to a specific commit that is known to work - git checkout 006e2511d4b8262ff14ff6cd5e1b75f0663dee98 + git checkout ad98716661b033dd5e4cb9f09cebb80dba954c2d cd .. fi @@ -118,7 +118,7 @@ sleep 10 # tap-escrow-manager requires subgraph-deploy echo "Starting tap-escrow-manager..." docker compose up -d tap-escrow-manager -sleep 10 +timeout 90 bash -c 'until docker ps --filter "name=^tap-escrow-manager$" --format "{{.Names}}" | grep -q "^tap-escrow-manager$"; do echo "Waiting for tap-escrow-manager container to appear..."; sleep 5; done' # Start redpanda if it's not already started (required for gateway) if ! docker ps | grep -q redpanda; then @@ -191,7 +191,7 @@ docker run -d --name gateway \ echo "Waiting for gateway to be available..." # Ensure gateway is ready before testing -timeout 300 bash -c 'until curl -f http://localhost:7700/ > /dev/null 2>&1; do echo "Waiting for gateway service..."; sleep 5; done' +timeout 100 bash -c 'until curl -f http://localhost:7700/ > /dev/null 2>&1; do echo "Waiting for gateway service..."; sleep 5; done' # After all services are running, measure the disk space used END_SPACE=$(df -h --output=used /var/lib/docker | tail -1)