diff --git a/contrib/indexer-service/start.sh b/contrib/indexer-service/start.sh index e6f80718e..a39a912dd 100755 --- a/contrib/indexer-service/start.sh +++ b/contrib/indexer-service/start.sh @@ -69,7 +69,7 @@ receipts_verifier_address = "${GRAPH_TALLY_VERIFIER}" [service] free_query_auth_token = "freestuff" -host_and_port = "0.0.0.0:7600" +host_and_port = "0.0.0.0:7601" url_prefix = "/" serve_network_subgraph = false serve_escrow_subgraph = false diff --git a/crates/service/src/middleware/auth/tap.rs b/crates/service/src/middleware/auth/tap.rs index ff3bd605e..68895e035 100644 --- a/crates/service/src/middleware/auth/tap.rs +++ b/crates/service/src/middleware/auth/tap.rs @@ -55,21 +55,42 @@ where async move { let execute = || async { - let receipt = receipt.ok_or(IndexerServiceError::ReceiptNotFound)?; + let receipt = receipt.ok_or_else(|| { + tracing::debug!( + "TAP receipt validation failed: receipt not found in request extensions" + ); + IndexerServiceError::ReceiptNotFound + })?; + + let version = match &receipt { + TapReceipt::V1(_) => "V1", + TapReceipt::V2(_) => "V2", + }; + tracing::debug!(receipt_version = version, "Starting TAP receipt validation"); + // Verify the receipt and store it in the database tap_manager .verify_and_store_receipt(&ctx.unwrap_or_default(), receipt) .await - .inspect_err(|_| { - if let Some(labels) = labels { + .inspect_err(|err| { + tracing::debug!(error = %err, receipt_version = version, "TAP receipt validation failed"); + if let Some(labels) = &labels { failed_receipt_metric .with_label_values(&labels.get_labels()) .inc() } })?; + + tracing::debug!( + receipt_version = version, + "TAP receipt validation successful" + ); Ok::<_, IndexerServiceError>(request) }; - execute().await.map_err(|error| error.into_response()) + execute().await.map_err(|error| { + tracing::debug!(error = %error, "TAP authorization failed, returning HTTP error response"); + error.into_response() + }) } } } diff --git a/crates/service/src/middleware/tap_receipt.rs b/crates/service/src/middleware/tap_receipt.rs index e5188ab08..79c435e82 100644 --- a/crates/service/src/middleware/tap_receipt.rs +++ b/crates/service/src/middleware/tap_receipt.rs @@ -14,10 +14,21 @@ use crate::service::TapHeader; /// /// This is useful to not deserialize multiple times the same receipt pub async fn receipt_middleware(mut request: Request, next: Next) -> Response { - if let Ok(TypedHeader(TapHeader(receipt))) = - request.extract_parts::>().await - { - request.extensions_mut().insert(receipt); + match request.extract_parts::>().await { + Ok(TypedHeader(TapHeader(receipt))) => { + let version = match &receipt { + crate::tap::TapReceipt::V1(_) => "V1", + crate::tap::TapReceipt::V2(_) => "V2", + }; + tracing::debug!( + receipt_version = version, + "TAP receipt extracted successfully" + ); + request.extensions_mut().insert(receipt); + } + Err(e) => { + tracing::debug!(error = %e, "No TAP receipt found in request headers"); + } } next.run(request).await } diff --git a/crates/service/src/service/tap_receipt_header.rs b/crates/service/src/service/tap_receipt_header.rs index a5be7c9dd..14aa2c25e 100644 --- a/crates/service/src/service/tap_receipt_header.rs +++ b/crates/service/src/service/tap_receipt_header.rs @@ -31,19 +31,40 @@ impl Header for TapHeader { { let mut execute = || -> anyhow::Result { let raw_receipt = values.next().ok_or(headers::Error::invalid())?; + tracing::debug!( + raw_receipt_length = raw_receipt.len(), + "Processing TAP receipt header" + ); // we first try to decode a v2 receipt since it's cheaper and fail earlier than using // serde match BASE64_STANDARD.decode(raw_receipt) { Ok(raw_receipt) => { - tracing::debug!("Decoded v2"); - let receipt = grpc::v2::SignedReceipt::decode(raw_receipt.as_ref())?; - Ok(TapHeader(TapReceipt::V2(receipt.try_into()?))) + tracing::debug!( + decoded_length = raw_receipt.len(), + "Successfully base64 decoded v2 receipt" + ); + let receipt = + grpc::v2::SignedReceipt::decode(raw_receipt.as_ref()).map_err(|e| { + tracing::debug!(error = %e, "Failed to protobuf decode v2 receipt"); + e + })?; + tracing::debug!("Successfully protobuf decoded v2 receipt"); + let converted_receipt = receipt.try_into().map_err(|e: anyhow::Error| { + tracing::debug!(error = %e, "Failed to convert v2 receipt"); + e + })?; + tracing::debug!("Successfully converted v2 receipt to TapReceipt::V2"); + Ok(TapHeader(TapReceipt::V2(converted_receipt))) } - Err(_) => { - tracing::debug!("Could not decode v2, trying v1"); + Err(e) => { + tracing::debug!(error = %e, "Could not base64 decode v2 receipt, trying v1"); let parsed_receipt: SignedReceipt = - serde_json::from_slice(raw_receipt.as_ref())?; + serde_json::from_slice(raw_receipt.as_ref()).map_err(|e| { + tracing::debug!(error = %e, "Failed to JSON decode v1 receipt"); + e + })?; + tracing::debug!("Successfully decoded v1 receipt"); Ok(TapHeader(TapReceipt::V1(parsed_receipt))) } } diff --git a/crates/watcher/src/lib.rs b/crates/watcher/src/lib.rs index 444eb1bc4..1aa4ff2fb 100644 --- a/crates/watcher/src/lib.rs +++ b/crates/watcher/src/lib.rs @@ -37,7 +37,12 @@ where time_interval.tick().await; let result = function().await; match result { - Ok(value) => tx.send(value).expect("Failed to update channel"), + Ok(value) => { + if tx.send(value).is_err() { + tracing::debug!("Watcher channel closed, stopping watcher task"); + break; + } + } Err(err) => { // TODO mark it as delayed tracing::warn!(error = %err, "There was an error while updating watcher"); @@ -79,7 +84,10 @@ where let current_val_1 = receiver_1.borrow().clone(); let current_val_2 = receiver_2.borrow().clone(); let mapped_value = map_function((current_val_1, current_val_2)); - tx.send(mapped_value).expect("Failed to update channel"); + if tx.send(mapped_value).is_err() { + tracing::debug!("Watcher channel closed, stopping combined watcher task"); + break; + } } }); rx @@ -138,7 +146,10 @@ where let current_val = receiver.borrow().clone(); let mapped_value = map_function(current_val); - tx.send(mapped_value).expect("Failed to update channel"); + if tx.send(mapped_value).is_err() { + tracing::debug!("Watcher channel closed, stopping mapped watcher task"); + break; + } } }); rx diff --git a/integration-tests/src/constants.rs b/integration-tests/src/constants.rs index 753937cdb..d1d429cc1 100644 --- a/integration-tests/src/constants.rs +++ b/integration-tests/src/constants.rs @@ -17,6 +17,9 @@ pub const TAP_AGENT_METRICS_URL: &str = "http://localhost:7300/metrics"; // and the signing key account0_secret // they must match otherwise receipts would be rejected pub const TAP_VERIFIER_CONTRACT: &str = "0xC9a43158891282A2B1475592D5719c001986Aaec"; + +// V2 GraphTallyCollector contract address (for Horizon receipts) +pub const GRAPH_TALLY_COLLECTOR_CONTRACT: &str = "0xB0D4afd8879eD9F52b28595d31B441D079B2Ca07"; pub const ACCOUNT0_SECRET: &str = "ac0974bec39a17e36ba4a6b4d238ff944bacb478cbed5efcae784d7bf4f2ff80"; pub const CHAIN_ID: u64 = 1337; diff --git a/integration-tests/src/main.rs b/integration-tests/src/main.rs index 348ce6665..2fc491c27 100644 --- a/integration-tests/src/main.rs +++ b/integration-tests/src/main.rs @@ -5,6 +5,7 @@ mod constants; mod load_test; mod metrics; mod rav_tests; +mod signature_test; mod utils; use anyhow::Result; @@ -39,6 +40,9 @@ enum Commands { #[clap(long, short, value_parser)] num_receipts: usize, }, + + #[clap(name = "debug")] + Debug, } #[tokio::main] @@ -65,6 +69,10 @@ async fn main() -> Result<()> { let concurrency = num_cpus::get(); receipt_handler_load_test_v2(num_receipts, concurrency).await?; } + // cargo run -- debug + Commands::Debug => { + signature_test::test_v2_signature_recovery().await?; + } } Ok(()) diff --git a/integration-tests/src/rav_tests.rs b/integration-tests/src/rav_tests.rs index 179518d92..9dacfe15c 100644 --- a/integration-tests/src/rav_tests.rs +++ b/integration-tests/src/rav_tests.rs @@ -21,12 +21,12 @@ use crate::{ }; const WAIT_TIME_BATCHES: u64 = 40; -const NUM_RECEIPTS: u32 = 3; +const NUM_RECEIPTS: u32 = 30; // Increased to 30 receipts per batch // Send receipts in batches with a delay in between // to ensure some receipts get outside the timestamp buffer -const BATCHES: u32 = 2; -const MAX_TRIGGERS: usize = 100; +const BATCHES: u32 = 15; // Increased to 15 batches for total 450 receipts in Stage 1 +const MAX_TRIGGERS: usize = 200; // Increased trigger attempts to 200 // Function to test the tap RAV generation pub async fn test_tap_rav_v1() -> Result<()> { @@ -241,17 +241,23 @@ pub async fn test_tap_rav_v2() -> Result<()> { "\n=== V2 Initial metrics: RAVs created: {initial_ravs_created}, Unaggregated fees: {initial_unaggregated} ===" ); + // Calculate expected thresholds + let trigger_threshold = 2_000_000_000_000_000u128; // 0.002 GRT trigger value + let receipts_needed = trigger_threshold / (MAX_RECEIPT_VALUE / 10); // Using trigger receipt value + println!("📊 RAV trigger threshold: {trigger_threshold} wei (0.002 GRT)",); + let receipt_value = MAX_RECEIPT_VALUE / 10; + println!( + "📊 Receipts needed for trigger: ~{receipts_needed} receipts at {receipt_value} wei each", + ); + println!("\n=== V2 STAGE 1: Sending large receipt batches with small pauses ==="); // Send multiple V2 receipts in two batches with a gap between them let mut total_successful = 0; for batch in 0..BATCHES { - println!( - "Sending V2 batch {} of 2 with {} receipts each...", - batch + 1, - NUM_RECEIPTS - ); + let batch = batch + 1; + println!("Sending V2 batch {batch} of {BATCHES} with {NUM_RECEIPTS} receipts each...",); for i in 0..NUM_RECEIPTS { // Create V2 receipt @@ -267,16 +273,17 @@ pub async fn test_tap_rav_v2() -> Result<()> { let receipt_encoded = encode_v2_receipt(&receipt)?; - let response = create_request( - &http_client, - &format!("{INDEXER_URL}/subgraphs/id/{SUBGRAPH_ID}"), - &receipt_encoded, - &json!({ + let response = http_client + .post(format!("{GATEWAY_URL}/api/subgraphs/id/{SUBGRAPH_ID}")) + .header("Content-Type", "application/json") + .header("Authorization", format!("Bearer {GATEWAY_API_KEY}")) + .header("Tap-Receipt", receipt_encoded) + .json(&json!({ "query": "{ _meta { block { number } } }" - }), - ) - .send() - .await?; + })) + .timeout(Duration::from_secs(10)) + .send() + .await?; if response.status().is_success() { total_successful += 1; @@ -298,15 +305,22 @@ pub async fn test_tap_rav_v2() -> Result<()> { // Check metrics after batch let batch_metrics = metrics_checker.get_current_metrics().await?; + let current_unaggregated = + batch_metrics.unaggregated_fees_by_allocation(&allocation_id.to_string()); + let trigger_threshold = 2_000_000_000_000_000u128; + let progress_pct = + (current_unaggregated as f64 / trigger_threshold as f64 * 100.0).min(100.0); + println!( - "After V2 batch {}: RAVs created: {}, Unaggregated fees: {}", + "After V2 batch {}: RAVs created: {}, Unaggregated fees: {} ({:.1}% of trigger threshold)", batch + 1, batch_metrics.ravs_created_by_allocation(&allocation_id.to_string()), - batch_metrics.unaggregated_fees_by_allocation(&allocation_id.to_string()) + current_unaggregated, + progress_pct ); // Wait between batches - long enough for first batch to exit buffer - if batch < 1 { + if batch < BATCHES - 1 { println!("Waiting for buffer period + 5s..."); tokio::time::sleep(Duration::from_secs(WAIT_TIME_BATCHES)).await; } @@ -331,16 +345,17 @@ pub async fn test_tap_rav_v2() -> Result<()> { let receipt_encoded = encode_v2_receipt(&receipt)?; - let response = create_request( - &http_client, - &format!("{INDEXER_URL}/subgraphs/id/{SUBGRAPH_ID}"), - &receipt_encoded, - &json!({ + let response = http_client + .post(format!("{GATEWAY_URL}/api/subgraphs/id/{SUBGRAPH_ID}")) + .header("Content-Type", "application/json") + .header("Authorization", format!("Bearer {GATEWAY_API_KEY}")) + .header("Tap-Receipt", receipt_encoded) + .json(&json!({ "query": "{ _meta { block { number } } }" - }), - ) - .send() - .await?; + })) + .timeout(Duration::from_secs(10)) + .send() + .await?; if response.status().is_success() { total_successful += 1; @@ -361,11 +376,17 @@ pub async fn test_tap_rav_v2() -> Result<()> { let current_unaggregated = current_metrics.unaggregated_fees_by_allocation(&allocation_id.to_string()); + // Calculate progress toward trigger threshold + let trigger_threshold = 2_000_000_000_000_000u128; + let progress_pct = + (current_unaggregated as f64 / trigger_threshold as f64 * 100.0).min(100.0); + println!( - "After V2 trigger {}: RAVs created: {}, Unaggregated fees: {}", + "After V2 trigger {}: RAVs created: {}, Unaggregated fees: {} ({:.1}% of trigger threshold)", i + 1, current_ravs_created, - current_unaggregated + current_unaggregated, + progress_pct ); // If we've succeeded, exit early diff --git a/integration-tests/src/signature_test.rs b/integration-tests/src/signature_test.rs new file mode 100644 index 000000000..4564b2a79 --- /dev/null +++ b/integration-tests/src/signature_test.rs @@ -0,0 +1,70 @@ +// Copyright 2023-, Edge & Node, GraphOps, and Semiotic Labs. +// SPDX-License-Identifier: Apache-2.0 + +//! Test to verify V2 signature creation and recovery works correctly + +use anyhow::Result; +use std::str::FromStr; +use tap_core::{signed_message::Eip712SignedMessage, tap_eip712_domain}; +use tap_graph::v2::Receipt as V2Receipt; +use thegraph_core::{ + alloy::{primitives::Address, signers::local::PrivateKeySigner}, + CollectionId, +}; + +use crate::constants::{ + ACCOUNT0_SECRET, CHAIN_ID, GRAPH_TALLY_COLLECTOR_CONTRACT, TEST_DATA_SERVICE, +}; + +pub async fn test_v2_signature_recovery() -> Result<()> { + println!("=== V2 Signature Recovery Test ==="); + + let wallet: PrivateKeySigner = ACCOUNT0_SECRET.parse()?; + let wallet_address = wallet.address(); + println!("Wallet address: {wallet_address:?}"); + + // Create EIP-712 domain - V2 uses GraphTallyCollector + let domain = tap_eip712_domain(CHAIN_ID, Address::from_str(GRAPH_TALLY_COLLECTOR_CONTRACT)?); + println!("Using domain: chain_id={CHAIN_ID}, verifier={GRAPH_TALLY_COLLECTOR_CONTRACT}"); + + // Create a V2 receipt + let allocation_id = Address::from_str("0xc172ed1c6470dfa3b12a789317dda50cdd8b85df")?; + let collection_id = CollectionId::from(allocation_id); + let payer = wallet_address; + let service_provider = allocation_id; + let data_service = Address::from_str(TEST_DATA_SERVICE)?; + + println!("V2 Receipt parameters:"); + println!(" Collection ID: {collection_id:?}"); + println!(" Payer: {payer:?}"); + println!(" Service provider: {service_provider:?}"); + println!(" Data service: {data_service:?}"); + + let receipt = V2Receipt::new( + *collection_id, + payer, + data_service, + service_provider, + 100_000_000_000_000_000u128, // 0.1 GRT + )?; + + // Sign the receipt + let signed_receipt = Eip712SignedMessage::new(&domain, receipt, &wallet)?; + println!("Receipt signed successfully"); + + // Recover the signer + let recovered_signer = signed_receipt.recover_signer(&domain)?; + println!("Recovered signer: {recovered_signer:?}"); + println!("Expected signer: {wallet_address:?}"); + + // Check if they match + if recovered_signer == wallet_address { + println!("✅ SUCCESS: Signature recovery matches wallet address"); + Ok(()) + } else { + println!("❌ FAILURE: Signature recovery mismatch!"); + println!(" Expected: {wallet_address:?}"); + println!(" Got: {recovered_signer:?}"); + Err(anyhow::anyhow!("Signature recovery failed for V2 receipt")) + } +} diff --git a/integration-tests/src/utils.rs b/integration-tests/src/utils.rs index 4936bfec1..ae30b5ab0 100644 --- a/integration-tests/src/utils.rs +++ b/integration-tests/src/utils.rs @@ -19,7 +19,7 @@ use tap_graph::Receipt; use thegraph_core::alloy::{primitives::Address, signers::local::PrivateKeySigner}; use thegraph_core::CollectionId; -use crate::constants::TEST_DATA_SERVICE; +use crate::constants::{GRAPH_TALLY_COLLECTOR_CONTRACT, TEST_DATA_SERVICE}; pub fn create_tap_receipt( value: u128, @@ -58,8 +58,8 @@ pub fn create_tap_receipt( pub fn create_tap_receipt_v2( value: u128, - allocation_id: &Address, // Used to derive collection_id in V2 - verifier_contract: &str, + allocation_id: &Address, // Used to derive collection_id in V2 + _verifier_contract: &str, // V2 uses GraphTallyCollector, not TAPVerifier chain_id: u64, wallet: &PrivateKeySigner, payer: &Address, @@ -77,12 +77,21 @@ pub fn create_tap_receipt_v2( // For the migration period, we derive collection_id from allocation_id let collection_id = CollectionId::from(*allocation_id); - // Create domain separator + // Create domain separator - V2 uses GraphTallyCollector let eip712_domain_separator = - tap_eip712_domain(chain_id, Address::from_str(verifier_contract)?); + tap_eip712_domain(chain_id, Address::from_str(GRAPH_TALLY_COLLECTOR_CONTRACT)?); + let wallet_address = wallet.address(); // Create and sign V2 receipt println!("Creating and signing V2 receipt..."); + println!("V2 Receipt details:"); + println!(" Payer (from wallet): {payer:?}"); + println!(" Service provider: {service_provider:?}"); + println!(" Data service: {TEST_DATA_SERVICE}"); + println!(" Collection ID: {collection_id:?}"); + println!(" Wallet address: {wallet_address:?}"); + println!(" Using GraphTallyCollector: {GRAPH_TALLY_COLLECTOR_CONTRACT}"); + let receipt = Eip712SignedMessage::new( &eip712_domain_separator, tap_graph::v2::Receipt { diff --git a/setup-test-network.sh b/setup-test-network.sh index 17b0d926f..0862bf029 100755 --- a/setup-test-network.sh +++ b/setup-test-network.sh @@ -260,11 +260,26 @@ source local-network/.env docker build -t local-gateway:latest ./local-network/gateway echo "Running gateway container..." -# Updated to use the horizon file structure +# Verify required files exist before starting gateway +if [ ! -f "local-network/horizon.json" ]; then + echo "ERROR: local-network/horizon.json not found!" + exit 1 +fi +if [ ! -f "local-network/tap-contracts.json" ]; then + echo "ERROR: local-network/tap-contracts.json not found!" + exit 1 +fi +if [ ! -f "local-network/subgraph-service.json" ]; then + echo "ERROR: local-network/subgraph-service.json not found!" + exit 1 +fi + +# Updated to use the horizon file structure and include tap-contracts.json docker run -d --name gateway \ --network local-network_default \ -p 7700:7700 \ -v "$(pwd)/local-network/horizon.json":/opt/horizon.json:ro \ + -v "$(pwd)/local-network/tap-contracts.json":/opt/tap-contracts.json:ro \ -v "$(pwd)/local-network/subgraph-service.json":/opt/subgraph-service.json:ro \ -v "$(pwd)/local-network/.env":/opt/.env:ro \ -e RUST_LOG=info,graph_gateway=trace \