Skip to content

Commit 9528ef2

Browse files
authored
test: use gateway in local network testing (#814)
* test(contrib): fix indexer-service setup port mismatch * fix(watcher): handle watcher channels closing * chore(service): add debug logging to receipt handling * test: mount tap-contracts to gateway in local-network setup
1 parent c029807 commit 9528ef2

File tree

11 files changed

+245
-55
lines changed

11 files changed

+245
-55
lines changed

contrib/indexer-service/start.sh

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -69,7 +69,7 @@ receipts_verifier_address = "${GRAPH_TALLY_VERIFIER}"
6969
7070
[service]
7171
free_query_auth_token = "freestuff"
72-
host_and_port = "0.0.0.0:7600"
72+
host_and_port = "0.0.0.0:7601"
7373
url_prefix = "/"
7474
serve_network_subgraph = false
7575
serve_escrow_subgraph = false

crates/service/src/middleware/auth/tap.rs

Lines changed: 25 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -55,21 +55,42 @@ where
5555

5656
async move {
5757
let execute = || async {
58-
let receipt = receipt.ok_or(IndexerServiceError::ReceiptNotFound)?;
58+
let receipt = receipt.ok_or_else(|| {
59+
tracing::debug!(
60+
"TAP receipt validation failed: receipt not found in request extensions"
61+
);
62+
IndexerServiceError::ReceiptNotFound
63+
})?;
64+
65+
let version = match &receipt {
66+
TapReceipt::V1(_) => "V1",
67+
TapReceipt::V2(_) => "V2",
68+
};
69+
tracing::debug!(receipt_version = version, "Starting TAP receipt validation");
70+
5971
// Verify the receipt and store it in the database
6072
tap_manager
6173
.verify_and_store_receipt(&ctx.unwrap_or_default(), receipt)
6274
.await
63-
.inspect_err(|_| {
64-
if let Some(labels) = labels {
75+
.inspect_err(|err| {
76+
tracing::debug!(error = %err, receipt_version = version, "TAP receipt validation failed");
77+
if let Some(labels) = &labels {
6578
failed_receipt_metric
6679
.with_label_values(&labels.get_labels())
6780
.inc()
6881
}
6982
})?;
83+
84+
tracing::debug!(
85+
receipt_version = version,
86+
"TAP receipt validation successful"
87+
);
7088
Ok::<_, IndexerServiceError>(request)
7189
};
72-
execute().await.map_err(|error| error.into_response())
90+
execute().await.map_err(|error| {
91+
tracing::debug!(error = %error, "TAP authorization failed, returning HTTP error response");
92+
error.into_response()
93+
})
7394
}
7495
}
7596
}

crates/service/src/middleware/tap_receipt.rs

Lines changed: 15 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -14,10 +14,21 @@ use crate::service::TapHeader;
1414
///
1515
/// This is useful to not deserialize multiple times the same receipt
1616
pub async fn receipt_middleware(mut request: Request, next: Next) -> Response {
17-
if let Ok(TypedHeader(TapHeader(receipt))) =
18-
request.extract_parts::<TypedHeader<TapHeader>>().await
19-
{
20-
request.extensions_mut().insert(receipt);
17+
match request.extract_parts::<TypedHeader<TapHeader>>().await {
18+
Ok(TypedHeader(TapHeader(receipt))) => {
19+
let version = match &receipt {
20+
crate::tap::TapReceipt::V1(_) => "V1",
21+
crate::tap::TapReceipt::V2(_) => "V2",
22+
};
23+
tracing::debug!(
24+
receipt_version = version,
25+
"TAP receipt extracted successfully"
26+
);
27+
request.extensions_mut().insert(receipt);
28+
}
29+
Err(e) => {
30+
tracing::debug!(error = %e, "No TAP receipt found in request headers");
31+
}
2132
}
2233
next.run(request).await
2334
}

crates/service/src/service/tap_receipt_header.rs

Lines changed: 27 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -31,19 +31,40 @@ impl Header for TapHeader {
3131
{
3232
let mut execute = || -> anyhow::Result<TapHeader> {
3333
let raw_receipt = values.next().ok_or(headers::Error::invalid())?;
34+
tracing::debug!(
35+
raw_receipt_length = raw_receipt.len(),
36+
"Processing TAP receipt header"
37+
);
3438

3539
// we first try to decode a v2 receipt since it's cheaper and fail earlier than using
3640
// serde
3741
match BASE64_STANDARD.decode(raw_receipt) {
3842
Ok(raw_receipt) => {
39-
tracing::debug!("Decoded v2");
40-
let receipt = grpc::v2::SignedReceipt::decode(raw_receipt.as_ref())?;
41-
Ok(TapHeader(TapReceipt::V2(receipt.try_into()?)))
43+
tracing::debug!(
44+
decoded_length = raw_receipt.len(),
45+
"Successfully base64 decoded v2 receipt"
46+
);
47+
let receipt =
48+
grpc::v2::SignedReceipt::decode(raw_receipt.as_ref()).map_err(|e| {
49+
tracing::debug!(error = %e, "Failed to protobuf decode v2 receipt");
50+
e
51+
})?;
52+
tracing::debug!("Successfully protobuf decoded v2 receipt");
53+
let converted_receipt = receipt.try_into().map_err(|e: anyhow::Error| {
54+
tracing::debug!(error = %e, "Failed to convert v2 receipt");
55+
e
56+
})?;
57+
tracing::debug!("Successfully converted v2 receipt to TapReceipt::V2");
58+
Ok(TapHeader(TapReceipt::V2(converted_receipt)))
4259
}
43-
Err(_) => {
44-
tracing::debug!("Could not decode v2, trying v1");
60+
Err(e) => {
61+
tracing::debug!(error = %e, "Could not base64 decode v2 receipt, trying v1");
4562
let parsed_receipt: SignedReceipt =
46-
serde_json::from_slice(raw_receipt.as_ref())?;
63+
serde_json::from_slice(raw_receipt.as_ref()).map_err(|e| {
64+
tracing::debug!(error = %e, "Failed to JSON decode v1 receipt");
65+
e
66+
})?;
67+
tracing::debug!("Successfully decoded v1 receipt");
4768
Ok(TapHeader(TapReceipt::V1(parsed_receipt)))
4869
}
4970
}

crates/watcher/src/lib.rs

Lines changed: 14 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,12 @@ where
3737
time_interval.tick().await;
3838
let result = function().await;
3939
match result {
40-
Ok(value) => tx.send(value).expect("Failed to update channel"),
40+
Ok(value) => {
41+
if tx.send(value).is_err() {
42+
tracing::debug!("Watcher channel closed, stopping watcher task");
43+
break;
44+
}
45+
}
4146
Err(err) => {
4247
// TODO mark it as delayed
4348
tracing::warn!(error = %err, "There was an error while updating watcher");
@@ -79,7 +84,10 @@ where
7984
let current_val_1 = receiver_1.borrow().clone();
8085
let current_val_2 = receiver_2.borrow().clone();
8186
let mapped_value = map_function((current_val_1, current_val_2));
82-
tx.send(mapped_value).expect("Failed to update channel");
87+
if tx.send(mapped_value).is_err() {
88+
tracing::debug!("Watcher channel closed, stopping combined watcher task");
89+
break;
90+
}
8391
}
8492
});
8593
rx
@@ -138,7 +146,10 @@ where
138146

139147
let current_val = receiver.borrow().clone();
140148
let mapped_value = map_function(current_val);
141-
tx.send(mapped_value).expect("Failed to update channel");
149+
if tx.send(mapped_value).is_err() {
150+
tracing::debug!("Watcher channel closed, stopping mapped watcher task");
151+
break;
152+
}
142153
}
143154
});
144155
rx

integration-tests/src/constants.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,9 @@ pub const TAP_AGENT_METRICS_URL: &str = "http://localhost:7300/metrics";
1717
// and the signing key account0_secret
1818
// they must match otherwise receipts would be rejected
1919
pub const TAP_VERIFIER_CONTRACT: &str = "0xC9a43158891282A2B1475592D5719c001986Aaec";
20+
21+
// V2 GraphTallyCollector contract address (for Horizon receipts)
22+
pub const GRAPH_TALLY_COLLECTOR_CONTRACT: &str = "0xB0D4afd8879eD9F52b28595d31B441D079B2Ca07";
2023
pub const ACCOUNT0_SECRET: &str =
2124
"ac0974bec39a17e36ba4a6b4d238ff944bacb478cbed5efcae784d7bf4f2ff80";
2225
pub const CHAIN_ID: u64 = 1337;

integration-tests/src/main.rs

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ mod constants;
55
mod load_test;
66
mod metrics;
77
mod rav_tests;
8+
mod signature_test;
89
mod utils;
910

1011
use anyhow::Result;
@@ -39,6 +40,9 @@ enum Commands {
3940
#[clap(long, short, value_parser)]
4041
num_receipts: usize,
4142
},
43+
44+
#[clap(name = "debug")]
45+
Debug,
4246
}
4347

4448
#[tokio::main]
@@ -65,6 +69,10 @@ async fn main() -> Result<()> {
6569
let concurrency = num_cpus::get();
6670
receipt_handler_load_test_v2(num_receipts, concurrency).await?;
6771
}
72+
// cargo run -- debug
73+
Commands::Debug => {
74+
signature_test::test_v2_signature_recovery().await?;
75+
}
6876
}
6977

7078
Ok(())

integration-tests/src/rav_tests.rs

Lines changed: 52 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -21,12 +21,12 @@ use crate::{
2121
};
2222

2323
const WAIT_TIME_BATCHES: u64 = 40;
24-
const NUM_RECEIPTS: u32 = 3;
24+
const NUM_RECEIPTS: u32 = 30; // Increased to 30 receipts per batch
2525

2626
// Send receipts in batches with a delay in between
2727
// to ensure some receipts get outside the timestamp buffer
28-
const BATCHES: u32 = 2;
29-
const MAX_TRIGGERS: usize = 100;
28+
const BATCHES: u32 = 15; // Increased to 15 batches for total 450 receipts in Stage 1
29+
const MAX_TRIGGERS: usize = 200; // Increased trigger attempts to 200
3030

3131
// Function to test the tap RAV generation
3232
pub async fn test_tap_rav_v1() -> Result<()> {
@@ -241,17 +241,23 @@ pub async fn test_tap_rav_v2() -> Result<()> {
241241
"\n=== V2 Initial metrics: RAVs created: {initial_ravs_created}, Unaggregated fees: {initial_unaggregated} ==="
242242
);
243243

244+
// Calculate expected thresholds
245+
let trigger_threshold = 2_000_000_000_000_000u128; // 0.002 GRT trigger value
246+
let receipts_needed = trigger_threshold / (MAX_RECEIPT_VALUE / 10); // Using trigger receipt value
247+
println!("📊 RAV trigger threshold: {trigger_threshold} wei (0.002 GRT)",);
248+
let receipt_value = MAX_RECEIPT_VALUE / 10;
249+
println!(
250+
"📊 Receipts needed for trigger: ~{receipts_needed} receipts at {receipt_value} wei each",
251+
);
252+
244253
println!("\n=== V2 STAGE 1: Sending large receipt batches with small pauses ===");
245254

246255
// Send multiple V2 receipts in two batches with a gap between them
247256
let mut total_successful = 0;
248257

249258
for batch in 0..BATCHES {
250-
println!(
251-
"Sending V2 batch {} of 2 with {} receipts each...",
252-
batch + 1,
253-
NUM_RECEIPTS
254-
);
259+
let batch = batch + 1;
260+
println!("Sending V2 batch {batch} of {BATCHES} with {NUM_RECEIPTS} receipts each...",);
255261

256262
for i in 0..NUM_RECEIPTS {
257263
// Create V2 receipt
@@ -267,16 +273,17 @@ pub async fn test_tap_rav_v2() -> Result<()> {
267273

268274
let receipt_encoded = encode_v2_receipt(&receipt)?;
269275

270-
let response = create_request(
271-
&http_client,
272-
&format!("{INDEXER_URL}/subgraphs/id/{SUBGRAPH_ID}"),
273-
&receipt_encoded,
274-
&json!({
276+
let response = http_client
277+
.post(format!("{GATEWAY_URL}/api/subgraphs/id/{SUBGRAPH_ID}"))
278+
.header("Content-Type", "application/json")
279+
.header("Authorization", format!("Bearer {GATEWAY_API_KEY}"))
280+
.header("Tap-Receipt", receipt_encoded)
281+
.json(&json!({
275282
"query": "{ _meta { block { number } } }"
276-
}),
277-
)
278-
.send()
279-
.await?;
283+
}))
284+
.timeout(Duration::from_secs(10))
285+
.send()
286+
.await?;
280287

281288
if response.status().is_success() {
282289
total_successful += 1;
@@ -298,15 +305,22 @@ pub async fn test_tap_rav_v2() -> Result<()> {
298305

299306
// Check metrics after batch
300307
let batch_metrics = metrics_checker.get_current_metrics().await?;
308+
let current_unaggregated =
309+
batch_metrics.unaggregated_fees_by_allocation(&allocation_id.to_string());
310+
let trigger_threshold = 2_000_000_000_000_000u128;
311+
let progress_pct =
312+
(current_unaggregated as f64 / trigger_threshold as f64 * 100.0).min(100.0);
313+
301314
println!(
302-
"After V2 batch {}: RAVs created: {}, Unaggregated fees: {}",
315+
"After V2 batch {}: RAVs created: {}, Unaggregated fees: {} ({:.1}% of trigger threshold)",
303316
batch + 1,
304317
batch_metrics.ravs_created_by_allocation(&allocation_id.to_string()),
305-
batch_metrics.unaggregated_fees_by_allocation(&allocation_id.to_string())
318+
current_unaggregated,
319+
progress_pct
306320
);
307321

308322
// Wait between batches - long enough for first batch to exit buffer
309-
if batch < 1 {
323+
if batch < BATCHES - 1 {
310324
println!("Waiting for buffer period + 5s...");
311325
tokio::time::sleep(Duration::from_secs(WAIT_TIME_BATCHES)).await;
312326
}
@@ -331,16 +345,17 @@ pub async fn test_tap_rav_v2() -> Result<()> {
331345

332346
let receipt_encoded = encode_v2_receipt(&receipt)?;
333347

334-
let response = create_request(
335-
&http_client,
336-
&format!("{INDEXER_URL}/subgraphs/id/{SUBGRAPH_ID}"),
337-
&receipt_encoded,
338-
&json!({
348+
let response = http_client
349+
.post(format!("{GATEWAY_URL}/api/subgraphs/id/{SUBGRAPH_ID}"))
350+
.header("Content-Type", "application/json")
351+
.header("Authorization", format!("Bearer {GATEWAY_API_KEY}"))
352+
.header("Tap-Receipt", receipt_encoded)
353+
.json(&json!({
339354
"query": "{ _meta { block { number } } }"
340-
}),
341-
)
342-
.send()
343-
.await?;
355+
}))
356+
.timeout(Duration::from_secs(10))
357+
.send()
358+
.await?;
344359

345360
if response.status().is_success() {
346361
total_successful += 1;
@@ -361,11 +376,17 @@ pub async fn test_tap_rav_v2() -> Result<()> {
361376
let current_unaggregated =
362377
current_metrics.unaggregated_fees_by_allocation(&allocation_id.to_string());
363378

379+
// Calculate progress toward trigger threshold
380+
let trigger_threshold = 2_000_000_000_000_000u128;
381+
let progress_pct =
382+
(current_unaggregated as f64 / trigger_threshold as f64 * 100.0).min(100.0);
383+
364384
println!(
365-
"After V2 trigger {}: RAVs created: {}, Unaggregated fees: {}",
385+
"After V2 trigger {}: RAVs created: {}, Unaggregated fees: {} ({:.1}% of trigger threshold)",
366386
i + 1,
367387
current_ravs_created,
368-
current_unaggregated
388+
current_unaggregated,
389+
progress_pct
369390
);
370391

371392
// If we've succeeded, exit early

0 commit comments

Comments
 (0)