diff --git a/Cargo.lock b/Cargo.lock index fe2715b7e..c327db410 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4083,7 +4083,7 @@ dependencies = [ [[package]] name = "indexer-tap-agent" -version = "1.9.4" +version = "1.9.5" dependencies = [ "anyhow", "async-trait", @@ -4206,7 +4206,9 @@ version = "0.1.0" dependencies = [ "anyhow", "bip39", + "clap", "indexer-receipt", + "num_cpus", "rand 0.9.1", "reqwest 0.12.15", "serde", diff --git a/contrib/docker-compose.prof.yml b/contrib/docker-compose.prof.yml index 5ff8b8f2a..58b998495 100644 --- a/contrib/docker-compose.prof.yml +++ b/contrib/docker-compose.prof.yml @@ -9,10 +9,10 @@ services: - ./local-network/.env:/opt/.env:ro - ./profiling:/opt/profiling:rw - ./indexer-service/config.toml:/opt/config/config.toml - - ./indexer-service/start-perf.sh:/usr/local/bin/start-perf.sh + - ./indexer-service/start.sh:/usr/local/bin/start.sh - ../migrations:/opt/migrations:ro - ../target/release/indexer-service-rs:/usr/local/bin/indexer-service-rs - entrypoint: ["/usr/local/bin/start-perf.sh"] + entrypoint: ["/usr/local/bin/start.sh"] environment: - RUST_BACKTRACE=1 - RUST_LOG=debug diff --git a/contrib/indexer-service/Dockerfile b/contrib/indexer-service/Dockerfile index b5b7ce3a9..651ea8af9 100644 --- a/contrib/indexer-service/Dockerfile +++ b/contrib/indexer-service/Dockerfile @@ -23,9 +23,7 @@ COPY --from=build /root/target/release/indexer-service-rs /usr/local/bin/indexer # Copy our start script into the image COPY contrib/indexer-service/start.sh /usr/local/bin/start.sh COPY contrib/indexer-service/config.toml /opt/config/config.toml -COPY contrib/indexer-service/start-perf.sh /usr/local/bin/start-perf.sh RUN chmod +x /usr/local/bin/start.sh -RUN chmod +x /usr/local/bin/start-perf.sh -ENTRYPOINT [ "/usr/local/bin/start-perf.sh" ] +ENTRYPOINT [ "/usr/local/bin/start.sh" ] diff --git a/contrib/indexer-service/start-perf.sh b/contrib/indexer-service/start-perf.sh deleted file mode 100755 index ed94d03da..000000000 --- a/contrib/indexer-service/start-perf.sh +++ /dev/null @@ -1,126 +0,0 @@ -#!/bin/bash -set -eu -# Source environment variables if available -if [ -f "/opt/.env" ]; then - source /opt/.env -fi - -cat /opt/.env - -# Extract TAPVerifier address from contracts.json -VERIFIER_ADDRESS=$(jq -r '."1337".TAPVerifier.address' /opt/contracts.json) - -# Override with test values taken from test-assets/src/lib.rs -ALLOCATION_ID="0xfa44c72b753a66591f241c7dc04e8178c30e13af" # ALLOCATION_ID_0 - -# Get network subgraph deployment ID -NETWORK_DEPLOYMENT=$(curl -s "http://graph-node:8000/subgraphs/name/graph-network" \ - -H 'content-type: application/json' \ - -d '{"query": "{ _meta { deployment } }"}' | jq -r '.data._meta.deployment' 2>/dev/null) -stdbuf -oL echo "Graph-network subgraph deployment ID: $NETWORK_DEPLOYMENT" - -# Get escrow subgraph deployment ID -ESCROW_DEPLOYMENT=$(curl -s "http://graph-node:8000/subgraphs/name/semiotic/tap" \ - -H 'content-type: application/json' \ - -d '{"query": "{ _meta { deployment } }"}' | jq -r '.data._meta.deployment' 2>/dev/null) - -stdbuf -oL echo "Escrow subgraph deployment ID: $ESCROW_DEPLOYMENT" -stdbuf -oL echo "Using test Network subgraph deployment ID: $NETWORK_DEPLOYMENT" -stdbuf -oL echo "Using test Verifier address: $VERIFIER_ADDRESS" -stdbuf -oL echo "Using test Indexer address: $RECEIVER_ADDRESS" -stdbuf -oL echo "Using TAPVerifier address from contracts.json: $VERIFIER_ADDRESS" -stdbuf -oL echo "Using test Account0 address: $ACCOUNT0_ADDRESS" - -# Create/copy config file -cp /opt/config/config.toml /opt/config.toml - -# Replace the placeholders with actual values -sed -i "s/NETWORK_DEPLOYMENT_PLACEHOLDER/$NETWORK_DEPLOYMENT/g" /opt/config.toml -sed -i "s/ESCROW_DEPLOYMENT_PLACEHOLDER/$ESCROW_DEPLOYMENT/g" /opt/config.toml -sed -i "s/VERIFIER_ADDRESS_PLACEHOLDER/$VERIFIER_ADDRESS/g" /opt/config.toml -sed -i "s/INDEXER_ADDRESS_PLACEHOLDER/$RECEIVER_ADDRESS/g" /opt/config.toml -sed -i "s/INDEXER_MNEMONIC_PLACEHOLDER/$INDEXER_MNEMONIC/g" /opt/config.toml -sed -i "s/ACCOUNT0_ADDRESS_PLACEHOLDER/$ACCOUNT0_ADDRESS/g" /opt/config.toml -sed -i "s/POSTGRES_PORT_PLACEHOLDER/$POSTGRES/g" /opt/config.toml - -stdbuf -oL echo "Starting indexer-service with config:" -cat /opt/config.toml - -# Run basic connectivity tests -stdbuf -oL echo "Testing graph-node endpoints..." -curl -s "http://graph-node:8000" >/dev/null && stdbuf -oL echo "Query endpoint OK" || stdbuf -oL echo "Query endpoint FAILED" -curl -s "http://graph-node:8030/graphql" >/dev/null && stdbuf -oL echo "Status endpoint OK" || stdbuf -oL echo "Status endpoint FAILED" - -# Set profiling tool based on environment variable -# Default is no profiling -PROFILER="${PROFILER:-none}" -stdbuf -oL echo "🔍 DEBUG: Profiling with: $PROFILER" - -# Set environment variables for the service -export RUST_BACKTRACE=full -export RUST_LOG="${RUST_LOG:-trace}" - -# Create output directory if it doesn't exist -mkdir -p /opt/profiling/indexer-service -chmod 777 /opt/profiling -chmod 777 /opt/profiling/indexer-service - -stdbuf -oL echo "📁 DEBUG: Profiling output directory: $(ls -la /opt/profiling)" - -case "$PROFILER" in -flamegraph) - stdbuf -oL echo "🔥 Starting with profiler..." - - # Start the service in the background with output redirection - stdbuf -oL echo "🚀 Starting service..." - exec /usr/local/bin/indexer-service-rs --config /opt/config.toml - ;; -strace) - stdbuf -oL echo "🔍 Starting with strace..." - # -f: follow child processes - # -tt: print timestamps with microsecond precision - # -T: show time spent in each syscall - # -e trace=all: trace all system calls - # -s 256: show up to 256 characters per string - # -o: output file - exec strace -f -tt -T -e trace=all -s 256 -o /opt/profiling/indexer-service/strace.log /usr/local/bin/indexer-service-rs --config /opt/config.toml - ;; -valgrind) - stdbuf -oL echo "🔍 Starting with Valgrind profiling..." - - # Start with Massif memory profiler - stdbuf -oL echo "🔄 Starting Valgrind Massif memory profiling..." - exec valgrind --tool=massif \ - --massif-out-file=/opt/profiling/indexer-service/massif.out \ - --time-unit=B \ - --detailed-freq=10 \ - --max-snapshots=100 \ - --threshold=0.5 \ - /usr/local/bin/indexer-service-rs --config /opt/config.toml - ;; -# Use callgrind_annotate indexer-service.callgrind.out -# for humand friendly report of callgrind output -# Ideally you should set: -# [profile.release.package."*"] -# debug = true -# force-frame-pointers = true -# in the Cargo.toml -callgrind) - stdbuf -oL echo "🔍 Starting with Callgrind CPU profiling..." - exec valgrind --tool=callgrind \ - --callgrind-out-file=/opt/profiling/indexer-service/callgrind.out \ - --cache-sim=yes \ - --branch-sim=yes \ - --collect-jumps=yes \ - --collect-systime=yes \ - --collect-bus=yes \ - --dump-instr=yes \ - --dump-line=yes \ - --compress-strings=no \ - /usr/local/bin/indexer-service-rs --config /opt/config.toml - ;; -none) - stdbuf -oL echo "🔍 Starting without profiling..." - exec /usr/local/bin/indexer-service-rs --config /opt/config.toml - ;; -esac diff --git a/contrib/indexer-service/start.sh b/contrib/indexer-service/start.sh index 5080bec22..2f82d4ee6 100755 --- a/contrib/indexer-service/start.sh +++ b/contrib/indexer-service/start.sh @@ -13,12 +13,6 @@ VERIFIER_ADDRESS=$(jq -r '."1337".TAPVerifier.address' /opt/contracts.json) # Override with test values taken from test-assets/src/lib.rs ALLOCATION_ID="0xfa44c72b753a66591f241c7dc04e8178c30e13af" # ALLOCATION_ID_0 -# Wait for postgres to be ready -until pg_isready -h postgres -U postgres -d indexer_components_1; do - stdbuf -oL echo "Waiting for postgres..." - sleep 2 -done - # Get network subgraph deployment ID NETWORK_DEPLOYMENT=$(curl -s "http://graph-node:8000/subgraphs/name/graph-network" \ -H 'content-type: application/json' \ @@ -57,8 +51,76 @@ stdbuf -oL echo "Testing graph-node endpoints..." curl -s "http://graph-node:8000" >/dev/null && stdbuf -oL echo "Query endpoint OK" || stdbuf -oL echo "Query endpoint FAILED" curl -s "http://graph-node:8030/graphql" >/dev/null && stdbuf -oL echo "Status endpoint OK" || stdbuf -oL echo "Status endpoint FAILED" -# Run service with verbose logging +# Set profiling tool based on environment variable +# Default is no profiling +PROFILER="${PROFILER:-none}" +stdbuf -oL echo "🔍 DEBUG: Profiling with: $PROFILER" + +# Set environment variables for the service export RUST_BACKTRACE=full -# export RUST_LOG=debug -export RUST_LOG=trace -exec /usr/local/bin/indexer-service-rs --config /opt/config.toml +export RUST_LOG="${RUST_LOG:-trace}" + +# Create output directory if it doesn't exist +mkdir -p /opt/profiling/indexer-service +chmod 777 /opt/profiling +chmod 777 /opt/profiling/indexer-service + +stdbuf -oL echo "📁 DEBUG: Profiling output directory: $(ls -la /opt/profiling)" + +case "$PROFILER" in +flamegraph) + stdbuf -oL echo "🔥 Starting with profiler..." + + # Start the service in the background with output redirection + stdbuf -oL echo "🚀 Starting service..." + exec /usr/local/bin/indexer-service-rs --config /opt/config.toml + ;; +strace) + stdbuf -oL echo "🔍 Starting with strace..." + # -f: follow child processes + # -tt: print timestamps with microsecond precision + # -T: show time spent in each syscall + # -e trace=all: trace all system calls + # -s 256: show up to 256 characters per string + # -o: output file + exec strace -f -tt -T -e trace=all -s 256 -o /opt/profiling/indexer-service/strace.log /usr/local/bin/indexer-service-rs --config /opt/config.toml + ;; +valgrind) + stdbuf -oL echo "🔍 Starting with Valgrind profiling..." + + # Start with Massif memory profiler + stdbuf -oL echo "🔄 Starting Valgrind Massif memory profiling..." + exec valgrind --tool=massif \ + --massif-out-file=/opt/profiling/indexer-service/massif.out \ + --time-unit=B \ + --detailed-freq=10 \ + --max-snapshots=100 \ + --threshold=0.5 \ + /usr/local/bin/indexer-service-rs --config /opt/config.toml + ;; +# Use callgrind_annotate indexer-service.callgrind.out +# for human-friendly report of callgrind output +# Ideally you should set: +# [profile.release.package."*"] +# debug = true +# force-frame-pointers = true +# in the Cargo.toml +callgrind) + stdbuf -oL echo "🔍 Starting with Callgrind CPU profiling..." + exec valgrind --tool=callgrind \ + --callgrind-out-file=/opt/profiling/indexer-service/callgrind.out \ + --cache-sim=yes \ + --branch-sim=yes \ + --collect-jumps=yes \ + --collect-systime=yes \ + --collect-bus=yes \ + --dump-instr=yes \ + --dump-line=yes \ + --compress-strings=no \ + /usr/local/bin/indexer-service-rs --config /opt/config.toml + ;; +none) + stdbuf -oL echo "🔍 Starting without profiling..." + exec /usr/local/bin/indexer-service-rs --config /opt/config.toml + ;; +esac diff --git a/integration-tests/Cargo.toml b/integration-tests/Cargo.toml index 99abf2328..9fa9d908e 100644 --- a/integration-tests/Cargo.toml +++ b/integration-tests/Cargo.toml @@ -23,3 +23,5 @@ bip39 = { workspace = true } rand.workspace = true indexer-receipt = { path = "../crates/indexer-receipt" } +num_cpus = "1.16.0" +clap = { version = "4.0", features = ["derive"] } diff --git a/integration-tests/src/constants.rs b/integration-tests/src/constants.rs new file mode 100644 index 000000000..df532f407 --- /dev/null +++ b/integration-tests/src/constants.rs @@ -0,0 +1,31 @@ +// Copyright 2023-, Edge & Node, GraphOps, and Semiotic Labs. +// SPDX-License-Identifier: Apache-2.0 +// +// +//! Constants used in the integration tests for the TAP RAV generation +//! their value is taken from local-network .env variable + +pub const INDEXER_URL: &str = "http://localhost:7601"; + +pub const GATEWAY_API_KEY: &str = "deadbeefdeadbeefdeadbeefdeadbeef"; +pub const GATEWAY_URL: &str = "http://localhost:7700"; +pub const TAP_AGENT_METRICS_URL: &str = "http://localhost:7300/metrics"; + +// The deployed gateway and indexer +// use this verifier contract +// which must be part of the eip712 domain +// and the signing key account0_secret +// they must match otherwise receipts would be rejected +pub const TAP_VERIFIER_CONTRACT: &str = "0x8198f5d8F8CfFE8f9C413d98a0A55aEB8ab9FbB7"; +pub const ACCOUNT0_SECRET: &str = + "ac0974bec39a17e36ba4a6b4d238ff944bacb478cbed5efcae784d7bf4f2ff80"; +pub const CHAIN_ID: u64 = 1337; + +pub const SUBGRAPH_ID: &str = "QmV4R5g7Go94bVFmKTVFG7vaMTb1ztUUWb45mNrsc7Yyqs"; + +pub const GRAPH_URL: &str = "http://localhost:8000/subgraphs/name/graph-network"; + +pub const GRT_DECIMALS: u8 = 18; +pub const GRT_BASE: u128 = 10u128.pow(GRT_DECIMALS as u32); + +pub const MAX_RECEIPT_VALUE: u128 = GRT_BASE / 10_000; diff --git a/integration-tests/src/load_test.rs b/integration-tests/src/load_test.rs new file mode 100644 index 000000000..87eb85bea --- /dev/null +++ b/integration-tests/src/load_test.rs @@ -0,0 +1,135 @@ +// Copyright 2023-, Edge & Node, GraphOps, and Semiotic Labs. +// SPDX-License-Identifier: Apache-2.0 + +use anyhow::Result; +use reqwest::Client; +use serde_json::json; +use std::str::FromStr; +use std::sync::Arc; +use thegraph_core::alloy::primitives::Address; +use thegraph_core::alloy::signers::local::PrivateKeySigner; +use tokio::sync::Semaphore; +use tokio::task; +use tokio::time::Instant; + +use crate::constants::{ + ACCOUNT0_SECRET, CHAIN_ID, GRAPH_URL, INDEXER_URL, MAX_RECEIPT_VALUE, SUBGRAPH_ID, + TAP_VERIFIER_CONTRACT, +}; +use crate::utils::{create_request, create_tap_receipt, find_allocation}; + +// Function to test indexer service component +// which is in charge of validating receipt signature, +// amount, timestamp and so on, and store them into the database. +// it is the entry point for the TAP receipts +// processing into RAVs(the slower part) +pub async fn receipt_handler_load_test(num_receipts: usize, concurrency: usize) -> Result<()> { + let wallet: PrivateKeySigner = ACCOUNT0_SECRET.parse().unwrap(); + + // Setup HTTP client + let http_client = Arc::new(Client::new()); + + // Query the network subgraph to find active allocations + let allocation_id = find_allocation(http_client.clone(), GRAPH_URL).await?; + let allocation_id = Address::from_str(&allocation_id)?; + + let start = Instant::now(); + let semaphore = Arc::new(Semaphore::new(concurrency)); + let mut handles = vec![]; + + for _ in 0..num_receipts { + let signer = wallet.clone(); + let client = http_client.clone(); + + let permit = semaphore.clone().acquire_owned().await.unwrap(); + let handle = task::spawn(async move { + let res = create_and_send_receipts(allocation_id, signer, client).await; + drop(permit); + res + }); + handles.push(handle); + } + + let mut successful_sends = 0; + let mut failed_sends = 0; + + for (index, handle) in handles.into_iter().enumerate() { + match handle.await { + Ok(send_result) => { + // Check if the send was Ok + if let Err(e) = send_result { + failed_sends += 1; + eprintln!("Receipt {} failed to send: {:?}", index, e); // Log the specific error + } else { + successful_sends += 1; + } + } + Err(join_error) => { + // The task panicked or was cancelled + failed_sends += 1; + eprintln!( + "Receipt {} task execution failed (e.g., panic): {:?}", + index, join_error + ); + } + } + } + + let duration = start.elapsed(); + println!( + "Completed processing {} requests in {:?}", + num_receipts, duration + ); + if num_receipts > 0 { + println!( + "Average time per request: {:?}", + duration / num_receipts as u32 + ); + } + println!("Successfully sent receipts: {}", successful_sends); + println!("Failed receipts: {}", failed_sends); + + if failed_sends > 0 { + return Err(anyhow::anyhow!( + "Load test completed with {} failures.", + failed_sends + )); + } + + Ok(()) +} + +async fn create_and_send_receipts( + id: Address, + signer: PrivateKeySigner, + http_client: Arc, +) -> Result<()> { + let receipt = create_tap_receipt( + MAX_RECEIPT_VALUE, + &id, + TAP_VERIFIER_CONTRACT, + CHAIN_ID, + &signer, + )?; + + let receipt_json = serde_json::to_string(&receipt).unwrap(); + let response = create_request( + &http_client, + format!("{}/subgraphs/id/{}", INDEXER_URL, SUBGRAPH_ID).as_str(), + &receipt_json, + &json!({ + "query": "{ _meta { block { number } } }" + }), + ) + .send() + .await?; + + if !response.status().is_success() { + return Err(anyhow::anyhow!( + "Failed to send receipt: {}", + response.text().await.unwrap_or_default() + )); + } + + Ok(()) +} diff --git a/integration-tests/src/main.rs b/integration-tests/src/main.rs index 491e981e7..a96573884 100644 --- a/integration-tests/src/main.rs +++ b/integration-tests/src/main.rs @@ -1,18 +1,55 @@ // Copyright 2023-, Edge & Node, GraphOps, and Semiotic Labs. // SPDX-License-Identifier: Apache-2.0 -use anyhow::Result; -use rav_tests::{test_invalid_chain_id, test_tap_rav_v1}; - +mod constants; +mod load_test; mod metrics; mod rav_tests; mod utils; +use anyhow::Result; +use clap::Parser; +use load_test::receipt_handler_load_test; +pub(crate) use rav_tests::{test_invalid_chain_id, test_tap_rav_v1}; + use metrics::MetricsChecker; +/// Main CLI parser structure +#[derive(Parser, Debug)] +#[clap(author, version, about = "Integration Test Suite Runner", long_about = None)] +struct Cli { + #[clap(subcommand)] + command: Commands, +} + +#[derive(clap::Subcommand, Debug)] +enum Commands { + Rav1, + + #[clap(name = "load")] + LoadService { + // for example: --num-receipts 10000 or -n 10000 + #[clap(long, short, value_parser)] + num_receipts: usize, + }, +} + #[tokio::main] async fn main() -> Result<()> { - // Run the TAP receipt test - test_invalid_chain_id().await?; - test_tap_rav_v1().await + let cli = Cli::parse(); + + match cli.command { + // cargo run -- rav1 + Commands::Rav1 => { + test_invalid_chain_id().await?; + test_tap_rav_v1().await?; + } + // cargo run -- load --num-receipts 1000 + Commands::LoadService { num_receipts } => { + let concurrency = num_cpus::get(); + receipt_handler_load_test(num_receipts, concurrency).await?; + } + } + + Ok(()) } diff --git a/integration-tests/src/rav_tests.rs b/integration-tests/src/rav_tests.rs index fc697af56..ae917785d 100644 --- a/integration-tests/src/rav_tests.rs +++ b/integration-tests/src/rav_tests.rs @@ -10,29 +10,14 @@ use std::time::Duration; use thegraph_core::alloy::primitives::Address; use thegraph_core::alloy::signers::local::PrivateKeySigner; +use crate::constants::{ + ACCOUNT0_SECRET, CHAIN_ID, GATEWAY_API_KEY, GATEWAY_URL, GRAPH_URL, INDEXER_URL, + MAX_RECEIPT_VALUE, SUBGRAPH_ID, TAP_AGENT_METRICS_URL, TAP_VERIFIER_CONTRACT, +}; use crate::utils::{create_request, create_tap_receipt, find_allocation}; use crate::MetricsChecker; -const INDEXER_URL: &str = "http://localhost:7601"; -// Taken from .env -// this is the key gateway uses -const ACCOUNT0_SECRET: &str = "ac0974bec39a17e36ba4a6b4d238ff944bacb478cbed5efcae784d7bf4f2ff80"; - -// The deployed gateway and indexer -// use this verifier contract -// which must be part of the eip712 domain -const TAP_VERIFIER_CONTRACT: &str = "0x8198f5d8F8CfFE8f9C413d98a0A55aEB8ab9FbB7"; -const CHAIN_ID: u64 = 1337; - -const GATEWAY_URL: &str = "http://localhost:7700"; -const SUBGRAPH_ID: &str = "BFr2mx7FgkJ36Y6pE5BiXs1KmNUmVDCnL82KUSdcLW1g"; -const GATEWAY_API_KEY: &str = "deadbeefdeadbeefdeadbeefdeadbeef"; -const TAP_AGENT_METRICS_URL: &str = "http://localhost:7300/metrics"; - -const GRAPH_URL: &str = "http://localhost:8000/subgraphs/name/graph-network"; - const WAIT_TIME_BATCHES: u64 = 40; - const NUM_RECEIPTS: u32 = 3; // Send receipts in batches with a delay in between @@ -40,11 +25,6 @@ const NUM_RECEIPTS: u32 = 3; const BATCHES: u32 = 2; const MAX_TRIGGERS: usize = 100; -const GRT_DECIMALS: u8 = 18; -const GRT_BASE: u128 = 10u128.pow(GRT_DECIMALS as u32); - -const MAX_RECEIPT_VALUE: u128 = GRT_BASE / 10_000; - // Function to test the tap RAV generation pub async fn test_tap_rav_v1() -> Result<()> { // Setup HTTP client