Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion tap_aggregator/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ tap_graph.workspace = true
thegraph-core = { workspace = true, features = ["alloy-eip712"] }
tokio.workspace = true
tonic.workspace = true
tonic-prost = "0.14.1"
tonic-prost.workspace = true
tower = { workspace = true, features = ["util", "steer", "limit"] }
tracing-subscriber.workspace = true

Expand All @@ -45,5 +45,6 @@ tonic-build.workspace = true
tonic-prost-build.workspace = true

[dev-dependencies]
jsonrpsee = { workspace = true, features = ["macros", "server", "client", "http-client"] }
rand.workspace = true
rstest.workspace = true
62 changes: 33 additions & 29 deletions tap_aggregator/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,7 @@ use anyhow::Result;
use clap::Parser;
use log::{debug, info};
use tap_aggregator::{metrics, server};
use tap_core::tap_eip712_domain;
use thegraph_core::alloy::{
dyn_abi::Eip712Domain, primitives::Address, signers::local::PrivateKeySigner,
};
use thegraph_core::alloy::{primitives::Address, signers::local::PrivateKeySigner};

#[derive(Parser, Debug)]
#[command(author, version, about, long_about = None)]
Expand Down Expand Up @@ -66,9 +63,15 @@ struct Args {
#[arg(long, env = "TAP_DOMAIN_CHAIN_ID")]
domain_chain_id: Option<String>,

/// Domain verifying contract to be used for the EIP-712 domain separator.
#[arg(long, env = "TAP_DOMAIN_VERIFYING_CONTRACT")]
domain_verifying_contract: Option<Address>,
/// Domain verifying contract for V1 receipts (TAPVerifier).
/// Default: 0xC9a43158891282A2B1475592D5719c001986Aaec
#[arg(long, env = "TAP_DOMAIN_VERIFYING_CONTRACT_V1")]
domain_verifying_contract_v1: Option<Address>,

/// Domain verifying contract for V2 receipts (GraphTallyCollector).
/// Default: 0xB0D4afd8879eD9F52b28595d31B441D079B2Ca07
#[arg(long, env = "TAP_DOMAIN_VERIFYING_CONTRACT_V2")]
domain_verifying_contract_v2: Option<Address>,

/// Domain salt to be used for the EIP-712 domain separator.
#[arg(long, env = "TAP_DOMAIN_SALT")]
Expand Down Expand Up @@ -99,8 +102,8 @@ async fn main() -> Result<()> {

info!("Wallet address: {:#40x}", wallet.address());

// Create the EIP-712 domain separator.
let domain_separator = create_eip712_domain(&args)?;
// Create the domain configuration
let domain_config = create_domain_config(&args)?;

// Create HashSet of *all* allowed signers
let mut accepted_addresses: HashSet<Address> = std::collections::HashSet::new();
Expand All @@ -126,7 +129,7 @@ async fn main() -> Result<()> {
args.port,
wallet,
accepted_addresses,
domain_separator,
domain_config,
args.max_request_body_size,
args.max_response_body_size,
args.max_connections,
Expand All @@ -142,30 +145,31 @@ async fn main() -> Result<()> {
Ok(())
}

fn create_eip712_domain(args: &Args) -> Result<Eip712Domain> {
// Transform the args into the types expected by Eip712Domain::new().

// Transform optional strings into optional Cow<str>.
// Transform optional strings into optional U256.
if args.domain_chain_id.is_some() {
debug!("Parsing domain chain ID...");
}
fn create_domain_config(args: &Args) -> Result<server::DomainConfig> {
let chain_id: Option<u64> = args
.domain_chain_id
.as_ref()
.map(|s| s.parse())
.transpose()?;

if args.domain_salt.is_some() {
debug!("Parsing domain salt...");
let chain_id = chain_id.unwrap_or(1);

// Use custom addresses if provided, otherwise use defaults
if args.domain_verifying_contract_v1.is_some() || args.domain_verifying_contract_v2.is_some() {
let v1_contract = args.domain_verifying_contract_v1.unwrap_or_else(|| {
Address::from_str("0xC9a43158891282A2B1475592D5719c001986Aaec").unwrap()
});
let v2_contract = args.domain_verifying_contract_v2.unwrap_or_else(|| {
Address::from_str("0xB0D4afd8879eD9F52b28595d31B441D079B2Ca07").unwrap()
});

Ok(server::DomainConfig::custom(
chain_id,
v1_contract,
v2_contract,
))
} else {
server::DomainConfig::new(chain_id)
.map_err(|e| anyhow::anyhow!("Failed to create domain config: {}", e))
}

// Transform optional strings into optional Address.
let verifying_contract: Option<Address> = args.domain_verifying_contract;

// Create the EIP-712 domain separator.
Ok(tap_eip712_domain(
chain_id.unwrap_or(1),
verifying_contract.unwrap_or_default(),
))
}
74 changes: 56 additions & 18 deletions tap_aggregator/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -107,11 +107,46 @@ pub trait Rpc {
) -> JsonRpcResult<Eip712SignedMessage<tap_graph::v2::ReceiptAggregateVoucher>>;
}

/// Configuration for domain separators based on protocol version
#[derive(Clone)]
pub struct DomainConfig {
pub v1: Eip712Domain,
pub v2: Eip712Domain,
}

impl DomainConfig {
/// Create a new DomainConfig with default contract addresses
pub fn new(chain_id: u64) -> Result<Self, Box<dyn std::error::Error>> {
use std::str::FromStr;

use tap_core::tap_eip712_domain;

// Default contract addresses
let tapverifier_address = Address::from_str("0xC9a43158891282A2B1475592D5719c001986Aaec")?;
let graphtally_address = Address::from_str("0xB0D4afd8879eD9F52b28595d31B441D079B2Ca07")?;

Ok(DomainConfig {
v1: tap_eip712_domain(chain_id, tapverifier_address),
v2: tap_eip712_domain(chain_id, graphtally_address),
})
}

/// Create a custom DomainConfig with specific contract addresses
pub fn custom(chain_id: u64, v1_contract: Address, v2_contract: Address) -> Self {
use tap_core::tap_eip712_domain;

DomainConfig {
v1: tap_eip712_domain(chain_id, v1_contract),
v2: tap_eip712_domain(chain_id, v2_contract),
}
}
}

#[derive(Clone)]
struct RpcImpl {
wallet: PrivateKeySigner,
accepted_addresses: HashSet<Address>,
domain_separator: Eip712Domain,
domain_config: DomainConfig,
kafka: Option<rdkafka::producer::ThreadedProducer<rdkafka::producer::DefaultProducerContext>>,
}

Expand Down Expand Up @@ -148,7 +183,7 @@ fn aggregate_receipts_(
api_version: String,
wallet: &PrivateKeySigner,
accepted_addresses: &HashSet<Address>,
domain_separator: &Eip712Domain,
domain_config: &DomainConfig,
receipts: Vec<Eip712SignedMessage<Receipt>>,
previous_rav: Option<Eip712SignedMessage<ReceiptAggregateVoucher>>,
) -> JsonRpcResult<Eip712SignedMessage<ReceiptAggregateVoucher>> {
Expand Down Expand Up @@ -184,7 +219,7 @@ fn aggregate_receipts_(

// Execute v1 aggregation
let res = aggregator::v1::check_and_aggregate_receipts(
domain_separator,
&domain_config.v1,
&receipts,
previous_rav,
wallet,
Expand All @@ -207,7 +242,7 @@ fn aggregate_receipts_v2_(
api_version: String,
wallet: &PrivateKeySigner,
accepted_addresses: &HashSet<Address>,
domain_separator: &Eip712Domain,
domain_config: &DomainConfig,
receipts: Vec<Eip712SignedMessage<tap_graph::v2::Receipt>>,
previous_rav: Option<Eip712SignedMessage<tap_graph::v2::ReceiptAggregateVoucher>>,
) -> JsonRpcResult<Eip712SignedMessage<tap_graph::v2::ReceiptAggregateVoucher>> {
Expand Down Expand Up @@ -242,7 +277,7 @@ fn aggregate_receipts_v2_(

// Execute v2 aggregation
let res = aggregator::v2::check_and_aggregate_receipts(
domain_separator,
&domain_config.v2,
&receipts,
previous_rav,
wallet,
Expand Down Expand Up @@ -284,7 +319,7 @@ impl v1::tap_aggregator_server::TapAggregator for RpcImpl {
let receipts_count: u64 = receipts.len() as u64;

match aggregator::v1::check_and_aggregate_receipts(
&self.domain_separator,
&self.domain_config.v1,
receipts.as_slice(),
previous_rav,
&self.wallet,
Expand Down Expand Up @@ -340,7 +375,7 @@ impl v2::tap_aggregator_server::TapAggregator for RpcImpl {
let receipts_count: u64 = receipts.len() as u64;

match aggregator::v2::check_and_aggregate_receipts(
&self.domain_separator,
&self.domain_config.v2,
receipts.as_slice(),
previous_rav,
&self.wallet,
Expand Down Expand Up @@ -378,7 +413,9 @@ impl RpcServer for RpcImpl {
}

fn eip712_domain_info(&self) -> JsonRpcResult<Eip712Domain> {
Ok(JsonRpcResponse::ok(self.domain_separator.clone()))
// For backward compatibility, return the V1 domain separator
// Clients can determine V2 domain separator from the contract addresses
Ok(JsonRpcResponse::ok(self.domain_config.v1.clone()))
}

fn aggregate_receipts(
Expand All @@ -395,7 +432,7 @@ impl RpcServer for RpcImpl {
api_version,
&self.wallet,
&self.accepted_addresses,
&self.domain_separator,
&self.domain_config,
receipts,
previous_rav,
) {
Expand Down Expand Up @@ -435,7 +472,7 @@ impl RpcServer for RpcImpl {
api_version,
&self.wallet,
&self.accepted_addresses,
&self.domain_separator,
&self.domain_config,
receipts,
previous_rav,
) {
Expand Down Expand Up @@ -467,7 +504,7 @@ pub async fn run_server(
port: u16,
wallet: PrivateKeySigner,
accepted_addresses: HashSet<Address>,
domain_separator: Eip712Domain,
domain_config: DomainConfig,
max_request_body_size: u32,
max_response_body_size: u32,
max_concurrent_connections: u32,
Expand All @@ -477,7 +514,7 @@ pub async fn run_server(
let rpc_impl = RpcImpl {
wallet,
accepted_addresses,
domain_separator,
domain_config,
kafka,
};
let (json_rpc_service, _) = create_json_rpc_service(
Expand Down Expand Up @@ -633,6 +670,7 @@ mod tests {
dyn_abi::Eip712Domain, primitives::Address, signers::local::PrivateKeySigner,
};

use super::DomainConfig;
use crate::server;

#[derive(Clone)]
Expand Down Expand Up @@ -680,7 +718,7 @@ mod tests {
#[rstest]
#[tokio::test]
async fn protocol_version(
domain_separator: Eip712Domain,
_domain_separator: Eip712Domain,
http_request_size_limit: u32,
http_response_size_limit: u32,
http_max_concurrent_connections: u32,
Expand All @@ -693,7 +731,7 @@ mod tests {
0,
keys_main.wallet,
HashSet::from([keys_main.address]),
domain_separator,
DomainConfig::custom(1, Address::from([0x11u8; 20]), Address::from([0x11u8; 20])),
http_request_size_limit,
http_response_size_limit,
http_max_concurrent_connections,
Expand Down Expand Up @@ -745,7 +783,7 @@ mod tests {
0,
keys_main.wallet.clone(),
HashSet::from([keys_main.address, keys_0.address, keys_1.address]),
domain_separator.clone(),
DomainConfig::custom(1, Address::from([0x11u8; 20]), Address::from([0x11u8; 20])),
http_request_size_limit,
http_response_size_limit,
http_max_concurrent_connections,
Expand Down Expand Up @@ -828,7 +866,7 @@ mod tests {
0,
keys_main.wallet.clone(),
HashSet::from([keys_main.address, keys_0.address, keys_1.address]),
domain_separator.clone(),
DomainConfig::custom(1, Address::from([0x11u8; 20]), Address::from([0x11u8; 20])),
http_request_size_limit,
http_response_size_limit,
http_max_concurrent_connections,
Expand Down Expand Up @@ -906,7 +944,7 @@ mod tests {
0,
keys_main.wallet.clone(),
HashSet::from([keys_main.address]),
domain_separator.clone(),
DomainConfig::custom(1, Address::from([0x11u8; 20]), Address::from([0x11u8; 20])),
http_request_size_limit,
http_response_size_limit,
http_max_concurrent_connections,
Expand Down Expand Up @@ -998,7 +1036,7 @@ mod tests {
0,
keys_main.wallet.clone(),
HashSet::from([keys_main.address]),
domain_separator.clone(),
DomainConfig::custom(1, Address::from([0x11u8; 20]), Address::from([0x11u8; 20])),
http_request_size_limit,
http_response_size_limit,
http_max_concurrent_connections,
Expand Down
4 changes: 3 additions & 1 deletion tap_aggregator/tests/aggregate_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,11 +26,13 @@ async fn aggregation_test() {

let accepted_addresses = HashSet::from([wallet.address()]);

let domain_config = server::DomainConfig::custom(1, Address::ZERO, Address::ZERO);

let (join_handle, local_addr) = server::run_server(
0,
wallet.clone(),
accepted_addresses,
domain_separator.clone(),
domain_config,
max_request_body_size,
max_response_body_size,
max_concurrent_connections,
Expand Down
4 changes: 3 additions & 1 deletion tap_aggregator/tests/aggregate_v1_and_v2.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,11 +30,13 @@ async fn aggregation_test() {

let accepted_addresses = HashSet::from([wallet.address()]);

let domain_config = server::DomainConfig::custom(1, Address::ZERO, Address::ZERO);

let (_, local_addr) = server::run_server(
0,
wallet.clone(),
accepted_addresses,
domain_separator.clone(),
domain_config,
max_request_body_size,
max_response_body_size,
max_concurrent_connections,
Expand Down
8 changes: 7 additions & 1 deletion tap_integration_tests/tests/showcase.rs
Original file line number Diff line number Diff line change
Expand Up @@ -838,11 +838,17 @@ async fn start_sender_aggregator(

let accepted_addresses = HashSet::from([keys.address()]);

let domain_config = agg_server::DomainConfig::custom(
1,
domain_separator.verifying_contract.unwrap_or_default(),
domain_separator.verifying_contract.unwrap_or_default(),
);

let (server_handle, socket_addr) = agg_server::run_server(
http_port,
keys,
accepted_addresses,
domain_separator,
domain_config,
http_request_size_limit,
http_response_size_limit,
http_max_concurrent_connections,
Expand Down
Loading