diff --git a/tap_aggregator/Cargo.toml b/tap_aggregator/Cargo.toml index 3cb871e..d695648 100644 --- a/tap_aggregator/Cargo.toml +++ b/tap_aggregator/Cargo.toml @@ -36,7 +36,7 @@ tap_graph.workspace = true thegraph-core = { workspace = true, features = ["alloy-eip712"] } tokio.workspace = true tonic.workspace = true -tonic-prost = "0.14.1" +tonic-prost.workspace = true tower = { workspace = true, features = ["util", "steer", "limit"] } tracing-subscriber.workspace = true @@ -45,5 +45,6 @@ tonic-build.workspace = true tonic-prost-build.workspace = true [dev-dependencies] +jsonrpsee = { workspace = true, features = ["macros", "server", "client", "http-client"] } rand.workspace = true rstest.workspace = true diff --git a/tap_aggregator/src/main.rs b/tap_aggregator/src/main.rs index 4ab3a67..928769e 100644 --- a/tap_aggregator/src/main.rs +++ b/tap_aggregator/src/main.rs @@ -9,10 +9,7 @@ use anyhow::Result; use clap::Parser; use log::{debug, info}; use tap_aggregator::{metrics, server}; -use tap_core::tap_eip712_domain; -use thegraph_core::alloy::{ - dyn_abi::Eip712Domain, primitives::Address, signers::local::PrivateKeySigner, -}; +use thegraph_core::alloy::{primitives::Address, signers::local::PrivateKeySigner}; #[derive(Parser, Debug)] #[command(author, version, about, long_about = None)] @@ -66,9 +63,15 @@ struct Args { #[arg(long, env = "TAP_DOMAIN_CHAIN_ID")] domain_chain_id: Option, - /// Domain verifying contract to be used for the EIP-712 domain separator. - #[arg(long, env = "TAP_DOMAIN_VERIFYING_CONTRACT")] - domain_verifying_contract: Option
, + /// Domain verifying contract for V1 receipts (TAPVerifier). + /// Default: 0xC9a43158891282A2B1475592D5719c001986Aaec + #[arg(long, env = "TAP_DOMAIN_VERIFYING_CONTRACT_V1")] + domain_verifying_contract_v1: Option
, + + /// Domain verifying contract for V2 receipts (GraphTallyCollector). + /// Default: 0xB0D4afd8879eD9F52b28595d31B441D079B2Ca07 + #[arg(long, env = "TAP_DOMAIN_VERIFYING_CONTRACT_V2")] + domain_verifying_contract_v2: Option
, /// Domain salt to be used for the EIP-712 domain separator. #[arg(long, env = "TAP_DOMAIN_SALT")] @@ -99,8 +102,8 @@ async fn main() -> Result<()> { info!("Wallet address: {:#40x}", wallet.address()); - // Create the EIP-712 domain separator. - let domain_separator = create_eip712_domain(&args)?; + // Create the domain configuration + let domain_config = create_domain_config(&args)?; // Create HashSet of *all* allowed signers let mut accepted_addresses: HashSet
= std::collections::HashSet::new(); @@ -126,7 +129,7 @@ async fn main() -> Result<()> { args.port, wallet, accepted_addresses, - domain_separator, + domain_config, args.max_request_body_size, args.max_response_body_size, args.max_connections, @@ -142,30 +145,31 @@ async fn main() -> Result<()> { Ok(()) } -fn create_eip712_domain(args: &Args) -> Result { - // Transform the args into the types expected by Eip712Domain::new(). - - // Transform optional strings into optional Cow. - // Transform optional strings into optional U256. - if args.domain_chain_id.is_some() { - debug!("Parsing domain chain ID..."); - } +fn create_domain_config(args: &Args) -> Result { let chain_id: Option = args .domain_chain_id .as_ref() .map(|s| s.parse()) .transpose()?; - if args.domain_salt.is_some() { - debug!("Parsing domain salt..."); + let chain_id = chain_id.unwrap_or(1); + + // Use custom addresses if provided, otherwise use defaults + if args.domain_verifying_contract_v1.is_some() || args.domain_verifying_contract_v2.is_some() { + let v1_contract = args.domain_verifying_contract_v1.unwrap_or_else(|| { + Address::from_str("0xC9a43158891282A2B1475592D5719c001986Aaec").unwrap() + }); + let v2_contract = args.domain_verifying_contract_v2.unwrap_or_else(|| { + Address::from_str("0xB0D4afd8879eD9F52b28595d31B441D079B2Ca07").unwrap() + }); + + Ok(server::DomainConfig::custom( + chain_id, + v1_contract, + v2_contract, + )) + } else { + server::DomainConfig::new(chain_id) + .map_err(|e| anyhow::anyhow!("Failed to create domain config: {}", e)) } - - // Transform optional strings into optional Address. - let verifying_contract: Option
= args.domain_verifying_contract; - - // Create the EIP-712 domain separator. - Ok(tap_eip712_domain( - chain_id.unwrap_or(1), - verifying_contract.unwrap_or_default(), - )) } diff --git a/tap_aggregator/src/server.rs b/tap_aggregator/src/server.rs index b58d1e6..5c64671 100644 --- a/tap_aggregator/src/server.rs +++ b/tap_aggregator/src/server.rs @@ -107,11 +107,46 @@ pub trait Rpc { ) -> JsonRpcResult>; } +/// Configuration for domain separators based on protocol version +#[derive(Clone)] +pub struct DomainConfig { + pub v1: Eip712Domain, + pub v2: Eip712Domain, +} + +impl DomainConfig { + /// Create a new DomainConfig with default contract addresses + pub fn new(chain_id: u64) -> Result> { + use std::str::FromStr; + + use tap_core::tap_eip712_domain; + + // Default contract addresses + let tapverifier_address = Address::from_str("0xC9a43158891282A2B1475592D5719c001986Aaec")?; + let graphtally_address = Address::from_str("0xB0D4afd8879eD9F52b28595d31B441D079B2Ca07")?; + + Ok(DomainConfig { + v1: tap_eip712_domain(chain_id, tapverifier_address), + v2: tap_eip712_domain(chain_id, graphtally_address), + }) + } + + /// Create a custom DomainConfig with specific contract addresses + pub fn custom(chain_id: u64, v1_contract: Address, v2_contract: Address) -> Self { + use tap_core::tap_eip712_domain; + + DomainConfig { + v1: tap_eip712_domain(chain_id, v1_contract), + v2: tap_eip712_domain(chain_id, v2_contract), + } + } +} + #[derive(Clone)] struct RpcImpl { wallet: PrivateKeySigner, accepted_addresses: HashSet
, - domain_separator: Eip712Domain, + domain_config: DomainConfig, kafka: Option>, } @@ -148,7 +183,7 @@ fn aggregate_receipts_( api_version: String, wallet: &PrivateKeySigner, accepted_addresses: &HashSet
, - domain_separator: &Eip712Domain, + domain_config: &DomainConfig, receipts: Vec>, previous_rav: Option>, ) -> JsonRpcResult> { @@ -184,7 +219,7 @@ fn aggregate_receipts_( // Execute v1 aggregation let res = aggregator::v1::check_and_aggregate_receipts( - domain_separator, + &domain_config.v1, &receipts, previous_rav, wallet, @@ -207,7 +242,7 @@ fn aggregate_receipts_v2_( api_version: String, wallet: &PrivateKeySigner, accepted_addresses: &HashSet
, - domain_separator: &Eip712Domain, + domain_config: &DomainConfig, receipts: Vec>, previous_rav: Option>, ) -> JsonRpcResult> { @@ -242,7 +277,7 @@ fn aggregate_receipts_v2_( // Execute v2 aggregation let res = aggregator::v2::check_and_aggregate_receipts( - domain_separator, + &domain_config.v2, &receipts, previous_rav, wallet, @@ -284,7 +319,7 @@ impl v1::tap_aggregator_server::TapAggregator for RpcImpl { let receipts_count: u64 = receipts.len() as u64; match aggregator::v1::check_and_aggregate_receipts( - &self.domain_separator, + &self.domain_config.v1, receipts.as_slice(), previous_rav, &self.wallet, @@ -340,7 +375,7 @@ impl v2::tap_aggregator_server::TapAggregator for RpcImpl { let receipts_count: u64 = receipts.len() as u64; match aggregator::v2::check_and_aggregate_receipts( - &self.domain_separator, + &self.domain_config.v2, receipts.as_slice(), previous_rav, &self.wallet, @@ -378,7 +413,9 @@ impl RpcServer for RpcImpl { } fn eip712_domain_info(&self) -> JsonRpcResult { - Ok(JsonRpcResponse::ok(self.domain_separator.clone())) + // For backward compatibility, return the V1 domain separator + // Clients can determine V2 domain separator from the contract addresses + Ok(JsonRpcResponse::ok(self.domain_config.v1.clone())) } fn aggregate_receipts( @@ -395,7 +432,7 @@ impl RpcServer for RpcImpl { api_version, &self.wallet, &self.accepted_addresses, - &self.domain_separator, + &self.domain_config, receipts, previous_rav, ) { @@ -435,7 +472,7 @@ impl RpcServer for RpcImpl { api_version, &self.wallet, &self.accepted_addresses, - &self.domain_separator, + &self.domain_config, receipts, previous_rav, ) { @@ -467,7 +504,7 @@ pub async fn run_server( port: u16, wallet: PrivateKeySigner, accepted_addresses: HashSet
, - domain_separator: Eip712Domain, + domain_config: DomainConfig, max_request_body_size: u32, max_response_body_size: u32, max_concurrent_connections: u32, @@ -477,7 +514,7 @@ pub async fn run_server( let rpc_impl = RpcImpl { wallet, accepted_addresses, - domain_separator, + domain_config, kafka, }; let (json_rpc_service, _) = create_json_rpc_service( @@ -633,6 +670,7 @@ mod tests { dyn_abi::Eip712Domain, primitives::Address, signers::local::PrivateKeySigner, }; + use super::DomainConfig; use crate::server; #[derive(Clone)] @@ -680,7 +718,7 @@ mod tests { #[rstest] #[tokio::test] async fn protocol_version( - domain_separator: Eip712Domain, + _domain_separator: Eip712Domain, http_request_size_limit: u32, http_response_size_limit: u32, http_max_concurrent_connections: u32, @@ -693,7 +731,7 @@ mod tests { 0, keys_main.wallet, HashSet::from([keys_main.address]), - domain_separator, + DomainConfig::custom(1, Address::from([0x11u8; 20]), Address::from([0x11u8; 20])), http_request_size_limit, http_response_size_limit, http_max_concurrent_connections, @@ -745,7 +783,7 @@ mod tests { 0, keys_main.wallet.clone(), HashSet::from([keys_main.address, keys_0.address, keys_1.address]), - domain_separator.clone(), + DomainConfig::custom(1, Address::from([0x11u8; 20]), Address::from([0x11u8; 20])), http_request_size_limit, http_response_size_limit, http_max_concurrent_connections, @@ -828,7 +866,7 @@ mod tests { 0, keys_main.wallet.clone(), HashSet::from([keys_main.address, keys_0.address, keys_1.address]), - domain_separator.clone(), + DomainConfig::custom(1, Address::from([0x11u8; 20]), Address::from([0x11u8; 20])), http_request_size_limit, http_response_size_limit, http_max_concurrent_connections, @@ -906,7 +944,7 @@ mod tests { 0, keys_main.wallet.clone(), HashSet::from([keys_main.address]), - domain_separator.clone(), + DomainConfig::custom(1, Address::from([0x11u8; 20]), Address::from([0x11u8; 20])), http_request_size_limit, http_response_size_limit, http_max_concurrent_connections, @@ -998,7 +1036,7 @@ mod tests { 0, keys_main.wallet.clone(), HashSet::from([keys_main.address]), - domain_separator.clone(), + DomainConfig::custom(1, Address::from([0x11u8; 20]), Address::from([0x11u8; 20])), http_request_size_limit, http_response_size_limit, http_max_concurrent_connections, diff --git a/tap_aggregator/tests/aggregate_test.rs b/tap_aggregator/tests/aggregate_test.rs index eced28c..66fb6f4 100644 --- a/tap_aggregator/tests/aggregate_test.rs +++ b/tap_aggregator/tests/aggregate_test.rs @@ -26,11 +26,13 @@ async fn aggregation_test() { let accepted_addresses = HashSet::from([wallet.address()]); + let domain_config = server::DomainConfig::custom(1, Address::ZERO, Address::ZERO); + let (join_handle, local_addr) = server::run_server( 0, wallet.clone(), accepted_addresses, - domain_separator.clone(), + domain_config, max_request_body_size, max_response_body_size, max_concurrent_connections, diff --git a/tap_aggregator/tests/aggregate_v1_and_v2.rs b/tap_aggregator/tests/aggregate_v1_and_v2.rs index 3ea4d9b..ce16bea 100644 --- a/tap_aggregator/tests/aggregate_v1_and_v2.rs +++ b/tap_aggregator/tests/aggregate_v1_and_v2.rs @@ -30,11 +30,13 @@ async fn aggregation_test() { let accepted_addresses = HashSet::from([wallet.address()]); + let domain_config = server::DomainConfig::custom(1, Address::ZERO, Address::ZERO); + let (_, local_addr) = server::run_server( 0, wallet.clone(), accepted_addresses, - domain_separator.clone(), + domain_config, max_request_body_size, max_response_body_size, max_concurrent_connections, diff --git a/tap_integration_tests/tests/showcase.rs b/tap_integration_tests/tests/showcase.rs index 3763914..150be85 100644 --- a/tap_integration_tests/tests/showcase.rs +++ b/tap_integration_tests/tests/showcase.rs @@ -838,11 +838,17 @@ async fn start_sender_aggregator( let accepted_addresses = HashSet::from([keys.address()]); + let domain_config = agg_server::DomainConfig::custom( + 1, + domain_separator.verifying_contract.unwrap_or_default(), + domain_separator.verifying_contract.unwrap_or_default(), + ); + let (server_handle, socket_addr) = agg_server::run_server( http_port, keys, accepted_addresses, - domain_separator, + domain_config, http_request_size_limit, http_response_size_limit, http_max_concurrent_connections,