Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion contrib/indexer-service/start.sh
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ receipts_verifier_address = "${GRAPH_TALLY_VERIFIER}"

[service]
free_query_auth_token = "freestuff"
host_and_port = "0.0.0.0:7600"
host_and_port = "0.0.0.0:7601"
url_prefix = "/"
serve_network_subgraph = false
serve_escrow_subgraph = false
Expand Down
29 changes: 25 additions & 4 deletions crates/service/src/middleware/auth/tap.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,21 +55,42 @@ where

async move {
let execute = || async {
let receipt = receipt.ok_or(IndexerServiceError::ReceiptNotFound)?;
let receipt = receipt.ok_or_else(|| {
tracing::debug!(
"TAP receipt validation failed: receipt not found in request extensions"
);
IndexerServiceError::ReceiptNotFound
})?;

let version = match &receipt {
TapReceipt::V1(_) => "V1",
TapReceipt::V2(_) => "V2",
};
tracing::debug!(receipt_version = version, "Starting TAP receipt validation");

// Verify the receipt and store it in the database
tap_manager
.verify_and_store_receipt(&ctx.unwrap_or_default(), receipt)
.await
.inspect_err(|_| {
if let Some(labels) = labels {
.inspect_err(|err| {
tracing::debug!(error = %err, receipt_version = version, "TAP receipt validation failed");
if let Some(labels) = &labels {
failed_receipt_metric
.with_label_values(&labels.get_labels())
.inc()
}
})?;

tracing::debug!(
receipt_version = version,
"TAP receipt validation successful"
);
Ok::<_, IndexerServiceError>(request)
};
execute().await.map_err(|error| error.into_response())
execute().await.map_err(|error| {
tracing::debug!(error = %error, "TAP authorization failed, returning HTTP error response");
error.into_response()
})
}
}
}
Expand Down
19 changes: 15 additions & 4 deletions crates/service/src/middleware/tap_receipt.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,21 @@ use crate::service::TapHeader;
///
/// This is useful to not deserialize multiple times the same receipt
pub async fn receipt_middleware(mut request: Request, next: Next) -> Response {
if let Ok(TypedHeader(TapHeader(receipt))) =
request.extract_parts::<TypedHeader<TapHeader>>().await
{
request.extensions_mut().insert(receipt);
match request.extract_parts::<TypedHeader<TapHeader>>().await {
Ok(TypedHeader(TapHeader(receipt))) => {
let version = match &receipt {
crate::tap::TapReceipt::V1(_) => "V1",
crate::tap::TapReceipt::V2(_) => "V2",
};
tracing::debug!(
receipt_version = version,
"TAP receipt extracted successfully"
);
request.extensions_mut().insert(receipt);
}
Err(e) => {
tracing::debug!(error = %e, "No TAP receipt found in request headers");
}
}
next.run(request).await
}
Expand Down
33 changes: 27 additions & 6 deletions crates/service/src/service/tap_receipt_header.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,19 +31,40 @@ impl Header for TapHeader {
{
let mut execute = || -> anyhow::Result<TapHeader> {
let raw_receipt = values.next().ok_or(headers::Error::invalid())?;
tracing::debug!(
raw_receipt_length = raw_receipt.len(),
"Processing TAP receipt header"
);

// we first try to decode a v2 receipt since it's cheaper and fail earlier than using
// serde
match BASE64_STANDARD.decode(raw_receipt) {
Ok(raw_receipt) => {
tracing::debug!("Decoded v2");
let receipt = grpc::v2::SignedReceipt::decode(raw_receipt.as_ref())?;
Ok(TapHeader(TapReceipt::V2(receipt.try_into()?)))
tracing::debug!(
decoded_length = raw_receipt.len(),
"Successfully base64 decoded v2 receipt"
);
let receipt =
grpc::v2::SignedReceipt::decode(raw_receipt.as_ref()).map_err(|e| {
tracing::debug!(error = %e, "Failed to protobuf decode v2 receipt");
e
})?;
tracing::debug!("Successfully protobuf decoded v2 receipt");
let converted_receipt = receipt.try_into().map_err(|e: anyhow::Error| {
tracing::debug!(error = %e, "Failed to convert v2 receipt");
e
})?;
tracing::debug!("Successfully converted v2 receipt to TapReceipt::V2");
Ok(TapHeader(TapReceipt::V2(converted_receipt)))
}
Err(_) => {
tracing::debug!("Could not decode v2, trying v1");
Err(e) => {
tracing::debug!(error = %e, "Could not base64 decode v2 receipt, trying v1");
let parsed_receipt: SignedReceipt =
serde_json::from_slice(raw_receipt.as_ref())?;
serde_json::from_slice(raw_receipt.as_ref()).map_err(|e| {
tracing::debug!(error = %e, "Failed to JSON decode v1 receipt");
e
})?;
tracing::debug!("Successfully decoded v1 receipt");
Ok(TapHeader(TapReceipt::V1(parsed_receipt)))
}
}
Expand Down
17 changes: 14 additions & 3 deletions crates/watcher/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,12 @@ where
time_interval.tick().await;
let result = function().await;
match result {
Ok(value) => tx.send(value).expect("Failed to update channel"),
Ok(value) => {
if tx.send(value).is_err() {
tracing::debug!("Watcher channel closed, stopping watcher task");
break;
}
}
Err(err) => {
// TODO mark it as delayed
tracing::warn!(error = %err, "There was an error while updating watcher");
Expand Down Expand Up @@ -79,7 +84,10 @@ where
let current_val_1 = receiver_1.borrow().clone();
let current_val_2 = receiver_2.borrow().clone();
let mapped_value = map_function((current_val_1, current_val_2));
tx.send(mapped_value).expect("Failed to update channel");
if tx.send(mapped_value).is_err() {
tracing::debug!("Watcher channel closed, stopping combined watcher task");
break;
}
}
});
rx
Expand Down Expand Up @@ -138,7 +146,10 @@ where

let current_val = receiver.borrow().clone();
let mapped_value = map_function(current_val);
tx.send(mapped_value).expect("Failed to update channel");
if tx.send(mapped_value).is_err() {
tracing::debug!("Watcher channel closed, stopping mapped watcher task");
break;
}
}
});
rx
Expand Down
3 changes: 3 additions & 0 deletions integration-tests/src/constants.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,9 @@ pub const TAP_AGENT_METRICS_URL: &str = "http://localhost:7300/metrics";
// and the signing key account0_secret
// they must match otherwise receipts would be rejected
pub const TAP_VERIFIER_CONTRACT: &str = "0xC9a43158891282A2B1475592D5719c001986Aaec";

// V2 GraphTallyCollector contract address (for Horizon receipts)
pub const GRAPH_TALLY_COLLECTOR_CONTRACT: &str = "0xB0D4afd8879eD9F52b28595d31B441D079B2Ca07";
pub const ACCOUNT0_SECRET: &str =
"ac0974bec39a17e36ba4a6b4d238ff944bacb478cbed5efcae784d7bf4f2ff80";
pub const CHAIN_ID: u64 = 1337;
Expand Down
8 changes: 8 additions & 0 deletions integration-tests/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ mod constants;
mod load_test;
mod metrics;
mod rav_tests;
mod signature_test;
mod utils;

use anyhow::Result;
Expand Down Expand Up @@ -39,6 +40,9 @@ enum Commands {
#[clap(long, short, value_parser)]
num_receipts: usize,
},

#[clap(name = "debug")]
Debug,
}

#[tokio::main]
Expand All @@ -65,6 +69,10 @@ async fn main() -> Result<()> {
let concurrency = num_cpus::get();
receipt_handler_load_test_v2(num_receipts, concurrency).await?;
}
// cargo run -- debug
Commands::Debug => {
signature_test::test_v2_signature_recovery().await?;
}
}

Ok(())
Expand Down
83 changes: 52 additions & 31 deletions integration-tests/src/rav_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,12 @@ use crate::{
};

const WAIT_TIME_BATCHES: u64 = 40;
const NUM_RECEIPTS: u32 = 3;
const NUM_RECEIPTS: u32 = 30; // Increased to 30 receipts per batch

// Send receipts in batches with a delay in between
// to ensure some receipts get outside the timestamp buffer
const BATCHES: u32 = 2;
const MAX_TRIGGERS: usize = 100;
const BATCHES: u32 = 15; // Increased to 15 batches for total 450 receipts in Stage 1
const MAX_TRIGGERS: usize = 200; // Increased trigger attempts to 200

// Function to test the tap RAV generation
pub async fn test_tap_rav_v1() -> Result<()> {
Expand Down Expand Up @@ -241,17 +241,23 @@ pub async fn test_tap_rav_v2() -> Result<()> {
"\n=== V2 Initial metrics: RAVs created: {initial_ravs_created}, Unaggregated fees: {initial_unaggregated} ==="
);

// Calculate expected thresholds
let trigger_threshold = 2_000_000_000_000_000u128; // 0.002 GRT trigger value
let receipts_needed = trigger_threshold / (MAX_RECEIPT_VALUE / 10); // Using trigger receipt value
println!("📊 RAV trigger threshold: {trigger_threshold} wei (0.002 GRT)",);
let receipt_value = MAX_RECEIPT_VALUE / 10;
println!(
"📊 Receipts needed for trigger: ~{receipts_needed} receipts at {receipt_value} wei each",
);

println!("\n=== V2 STAGE 1: Sending large receipt batches with small pauses ===");

// Send multiple V2 receipts in two batches with a gap between them
let mut total_successful = 0;

for batch in 0..BATCHES {
println!(
"Sending V2 batch {} of 2 with {} receipts each...",
batch + 1,
NUM_RECEIPTS
);
let batch = batch + 1;
println!("Sending V2 batch {batch} of {BATCHES} with {NUM_RECEIPTS} receipts each...",);

for i in 0..NUM_RECEIPTS {
// Create V2 receipt
Expand All @@ -267,16 +273,17 @@ pub async fn test_tap_rav_v2() -> Result<()> {

let receipt_encoded = encode_v2_receipt(&receipt)?;

let response = create_request(
&http_client,
&format!("{INDEXER_URL}/subgraphs/id/{SUBGRAPH_ID}"),
&receipt_encoded,
&json!({
let response = http_client
.post(format!("{GATEWAY_URL}/api/subgraphs/id/{SUBGRAPH_ID}"))
.header("Content-Type", "application/json")
.header("Authorization", format!("Bearer {GATEWAY_API_KEY}"))
.header("Tap-Receipt", receipt_encoded)
.json(&json!({
"query": "{ _meta { block { number } } }"
}),
)
.send()
.await?;
}))
.timeout(Duration::from_secs(10))
.send()
.await?;

if response.status().is_success() {
total_successful += 1;
Expand All @@ -298,15 +305,22 @@ pub async fn test_tap_rav_v2() -> Result<()> {

// Check metrics after batch
let batch_metrics = metrics_checker.get_current_metrics().await?;
let current_unaggregated =
batch_metrics.unaggregated_fees_by_allocation(&allocation_id.to_string());
let trigger_threshold = 2_000_000_000_000_000u128;
let progress_pct =
(current_unaggregated as f64 / trigger_threshold as f64 * 100.0).min(100.0);

println!(
"After V2 batch {}: RAVs created: {}, Unaggregated fees: {}",
"After V2 batch {}: RAVs created: {}, Unaggregated fees: {} ({:.1}% of trigger threshold)",
batch + 1,
batch_metrics.ravs_created_by_allocation(&allocation_id.to_string()),
batch_metrics.unaggregated_fees_by_allocation(&allocation_id.to_string())
current_unaggregated,
progress_pct
);

// Wait between batches - long enough for first batch to exit buffer
if batch < 1 {
if batch < BATCHES - 1 {
println!("Waiting for buffer period + 5s...");
tokio::time::sleep(Duration::from_secs(WAIT_TIME_BATCHES)).await;
}
Expand All @@ -331,16 +345,17 @@ pub async fn test_tap_rav_v2() -> Result<()> {

let receipt_encoded = encode_v2_receipt(&receipt)?;

let response = create_request(
&http_client,
&format!("{INDEXER_URL}/subgraphs/id/{SUBGRAPH_ID}"),
&receipt_encoded,
&json!({
let response = http_client
.post(format!("{GATEWAY_URL}/api/subgraphs/id/{SUBGRAPH_ID}"))
.header("Content-Type", "application/json")
.header("Authorization", format!("Bearer {GATEWAY_API_KEY}"))
.header("Tap-Receipt", receipt_encoded)
.json(&json!({
"query": "{ _meta { block { number } } }"
}),
)
.send()
.await?;
}))
.timeout(Duration::from_secs(10))
.send()
.await?;

if response.status().is_success() {
total_successful += 1;
Expand All @@ -361,11 +376,17 @@ pub async fn test_tap_rav_v2() -> Result<()> {
let current_unaggregated =
current_metrics.unaggregated_fees_by_allocation(&allocation_id.to_string());

// Calculate progress toward trigger threshold
let trigger_threshold = 2_000_000_000_000_000u128;
let progress_pct =
(current_unaggregated as f64 / trigger_threshold as f64 * 100.0).min(100.0);

println!(
"After V2 trigger {}: RAVs created: {}, Unaggregated fees: {}",
"After V2 trigger {}: RAVs created: {}, Unaggregated fees: {} ({:.1}% of trigger threshold)",
i + 1,
current_ravs_created,
current_unaggregated
current_unaggregated,
progress_pct
);

// If we've succeeded, exit early
Expand Down
Loading
Loading