diff --git a/Cargo.toml b/Cargo.toml index eebb6d4..b079cf3 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -50,7 +50,7 @@ tap_eip712_message = { version = "0.2.1", path = "tap_eip712_message" } tap_core = { version = "4.1.2", path = "tap_core" } tap_graph = { version = "0.3.2", path = "tap_graph", features = ["v2"] } tap_receipt = { version = "1.1.2", path = "tap_receipt" } -thegraph-core = "0.15.0" +thegraph-core = "0.15.1" thiserror = "2.0.12" tokio = { version = "1.44.2", features = ["macros", "signal"] } tonic = { version = "0.13.0", features = ["transport", "zstd"] } diff --git a/tap_aggregator/Cargo.toml b/tap_aggregator/Cargo.toml index 66305bb..0482a61 100644 --- a/tap_aggregator/Cargo.toml +++ b/tap_aggregator/Cargo.toml @@ -7,6 +7,10 @@ repository.workspace = true readme = "README.md" description = "A JSON-RPC service for the Timeline Aggregation Protocol that lets clients request an aggregate receipt from a list of individual receipts." +[features] +default = ["v2"] +v2 = ["tap_graph/v2"] + [[bin]] name = "tap_aggregator" path = "src/main.rs" diff --git a/tap_aggregator/README.md b/tap_aggregator/README.md index efaaa70..b9ec1ac 100644 --- a/tap_aggregator/README.md +++ b/tap_aggregator/README.md @@ -2,6 +2,8 @@ A stateless JSON-RPC service that lets clients request an aggregate receipt from a list of individual receipts. +Supports both V1 (allocation-based) and V2 (collection-based) aggregation protocols for seamless migration to the Horizon protocol. + TAP Aggregator is run by [gateway](https://github.com/edgeandnode/gateway/blob/main/README.md) operators. @@ -189,6 +191,11 @@ In addition to the official spec, we define a few special errors: ### Methods +This aggregator supports both V1 (legacy) and V2 (Horizon) protocols: + +- **V1 Endpoints**: `aggregate_receipts` - allocation-based aggregation +- **V2 Endpoints**: `aggregate_receipts_v2` - collection-based aggregation with enhanced fields + #### `api_versions()` [source](server::RpcServer::api_versions) @@ -318,3 +325,170 @@ Example: } } ``` + +#### `aggregate_receipts_v2(api_version, receipts, previous_rav)` + +Aggregates the given V2 receipts into a V2 receipt aggregate voucher using the Horizon protocol. +This method supports collection-based aggregation with enhanced fields for payer, data service, and service provider tracking. + +**V2 Receipt Structure:** + +- `collection_id`: 32-byte identifier for the collection (replaces `allocation_id`) +- `payer`: Address of the payer +- `data_service`: Address of the data service +- `service_provider`: Address of the service provider +- `timestamp_ns`: Timestamp in nanoseconds +- `nonce`: Unique nonce +- `value`: Receipt value + +**V2 RAV Structure:** + +- `collectionId`: Collection identifier (replaces `allocation_id`) +- `payer`: Payer address +- `dataService`: Data service address +- `serviceProvider`: Service provider address +- `timestampNs`: Latest timestamp +- `valueAggregate`: Total aggregated value +- `metadata`: Additional metadata (bytes) + +Example: + +*Request*: + +```json +{ + "jsonrpc": "2.0", + "id": 0, + "method": "aggregate_receipts_v2", + "params": [ + "1.0.0", + [ + { + "message": { + "collection_id": "0xdeaddeaddeaddeaddeaddeaddeaddeaddeaddeaddeaddeaddeaddeaddeaddead", + "payer": "0xabababababababababababababababababababab", + "data_service": "0xdeaddeaddeaddeaddeaddeaddeaddeaddeaddead", + "service_provider": "0xbeefbeefbeefbeefbeefbeefbeefbeefbeefbeef", + "timestamp_ns": 1685670449225087255, + "nonce": 11835827017881841442, + "value": 34 + }, + "signature": { + "r": "0xa9fa1acf3cc3be503612f75602e68cc22286592db1f4f944c78397cbe529353b", + "s": "0x566cfeb7e80a393021a443d5846c0734d25bcf54ed90d97effe93b1c8aef0911", + "v": 27 + } + }, + { + "message": { + "collection_id": "0xdeaddeaddeaddeaddeaddeaddeaddeaddeaddeaddeaddeaddeaddeaddeaddead", + "payer": "0xabababababababababababababababababababab", + "data_service": "0xdeaddeaddeaddeaddeaddeaddeaddeaddeaddead", + "service_provider": "0xbeefbeefbeefbeefbeefbeefbeefbeefbeefbeef", + "timestamp_ns": 1685670449225830106, + "nonce": 17711980309995246801, + "value": 23 + }, + "signature": { + "r": "0x51ca5a2b839558654326d3a3f544a97d94effb9a7dd9cac7492007bc974e91f0", + "s": "0x3d9d398ea6b0dd9fac97726f51c0840b8b314821fb4534cb40383850c431fd9e", + "v": 28 + } + } + ], + { + "message": { + "collectionId": "0xdeaddeaddeaddeaddeaddeaddeaddeaddeaddeaddeaddeaddeaddeaddeaddead", + "payer": "0xabababababababababababababababababababab", + "dataService": "0xdeaddeaddeaddeaddeaddeaddeaddeaddeaddead", + "serviceProvider": "0xbeefbeefbeefbeefbeefbeefbeefbeefbeefbeef", + "timestampNs": 1685670449224324338, + "valueAggregate": 101, + "metadata": "0x" + }, + "signature": { + "r": "0x601a1f399cf6223d1414a89b7bbc90ee13eeeec006bd59e0c96042266c6ad7dc", + "s": "0x3172e795bd190865afac82e3a8be5f4ccd4b65958529986c779833625875f0b2", + "v": 28 + } + } + ] +} +``` + +*Response*: + +```json +{ + "id": 0, + "jsonrpc": "2.0", + "result": { + "data": { + "message": { + "collectionId": "0xdeaddeaddeaddeaddeaddeaddeaddeaddeaddeaddeaddeaddeaddeaddeaddead", + "payer": "0xabababababababababababababababababababab", + "dataService": "0xdeaddeaddeaddeaddeaddeaddeaddeaddeaddead", + "serviceProvider": "0xbeefbeefbeefbeefbeefbeefbeefbeefbeefbeef", + "timestampNs": 1685670449225830106, + "valueAggregate": 158, + "metadata": "0x" + }, + "signature": { + "r": "0x60eb38374119bbabf1ac6960f532124ba2a9c5990d9fb50875b512e611847eb5", + "s": "0x1b9a330cc9e2ecbda340a4757afaee8f55b6dbf278428f8cf49dd5ad8438f83d", + "v": 27 + } + } + } +} +``` + +## Feature Flags + +### V2 Protocol Support + +The aggregator supports both V1 and V2 protocols: + +- **V2 is enabled by default** via the `v2` feature flag in `tap_aggregator/Cargo.toml` +- To disable V2: `cargo build --no-default-features` +- Both protocols can run simultaneously for gradual migration +- V1 endpoints remain unchanged and fully functional + +### Feature Flag Usage + +```bash +# Build with V2 support (default) +cargo build --release + +# Build without V2 support (V1 only) +cargo build --release --no-default-features + +# Explicitly enable V2 feature +cargo build --release --features v2 +``` + +## Migration Guide + +### V1 to V2 Migration + +The V2 protocol introduces collection-based aggregation with enhanced tracking. Here's how to migrate: + +#### Key Changes + +1. **Receipt Structure**: `allocation_id` → `collection_id` + additional fields +2. **RAV Structure**: `allocation_id` → `collectionId` + additional fields +3. **New Fields**: `payer`, `data_service`/`dataService`, `service_provider`/`serviceProvider` +4. **Metadata**: V2 RAVs include optional `metadata` field + +#### Migration Steps + +1. **Phase 1**: Deploy aggregator with V2 support (both endpoints available) +2. **Phase 2**: Update clients to use V2 receipt structure +3. **Phase 3**: Switch clients to `aggregate_receipts_v2` endpoint +4. **Phase 4**: V1 endpoints remain for backward compatibility + +#### Backward Compatibility + +- **V1 endpoints**: Unchanged and fully functional +- **gRPC support**: Both V1 and V2 with automatic conversion +- **No breaking changes**: Existing V1 clients continue to work diff --git a/tap_aggregator/src/lib.rs b/tap_aggregator/src/lib.rs index 6746f3a..47b2292 100644 --- a/tap_aggregator/src/lib.rs +++ b/tap_aggregator/src/lib.rs @@ -7,4 +7,6 @@ pub mod error_codes; pub mod grpc; pub mod jsonrpsee_helpers; pub mod metrics; +pub mod protocol_mode; +pub mod receipt_classifier; pub mod server; diff --git a/tap_aggregator/src/protocol_mode.rs b/tap_aggregator/src/protocol_mode.rs new file mode 100644 index 0000000..e88acda --- /dev/null +++ b/tap_aggregator/src/protocol_mode.rs @@ -0,0 +1,29 @@ +// Copyright 2024 The Graph Foundation +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#[derive(Clone, Copy, Debug, PartialEq)] +pub enum ProtocolMode { + /// Pre-horizon: V1 receipts accepted, V1 aggregation used + Legacy, + /// Post-horizon: V2 for new receipts, V1 only for legacy receipt aggregation + Horizon, +} + +#[derive(Clone, Copy, Debug, PartialEq)] +pub enum ReceiptType { + /// V1 receipt created before horizon activation (legacy, still needs aggregation) + LegacyV1, + /// V2 receipt (collection-based, created after horizon activation) + V2, +} diff --git a/tap_aggregator/src/receipt_classifier.rs b/tap_aggregator/src/receipt_classifier.rs new file mode 100644 index 0000000..b7091bc --- /dev/null +++ b/tap_aggregator/src/receipt_classifier.rs @@ -0,0 +1,88 @@ +// Copyright 2024 The Graph Foundation +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use anyhow::Result; + +use crate::protocol_mode::ProtocolMode; + +/// Validate that a batch of v1 receipts is valid for legacy processing +pub fn validate_v1_receipt_batch(receipts: &[T]) -> Result { + if receipts.is_empty() { + return Err(anyhow::anyhow!("Cannot aggregate empty receipt batch")); + } + // All v1 receipts use legacy mode + Ok(ProtocolMode::Legacy) +} + +/// Validate that a batch of v2 receipts is valid for horizon processing +#[cfg(feature = "v2")] +pub fn validate_v2_receipt_batch(receipts: &[T]) -> Result { + if receipts.is_empty() { + return Err(anyhow::anyhow!("Cannot aggregate empty receipt batch")); + } + // All v2 receipts use horizon mode + Ok(ProtocolMode::Horizon) +} + +#[cfg(test)] +mod tests { + use tap_graph::Receipt; + + use super::*; + + #[test] + fn test_validate_v1_batch() { + use thegraph_core::alloy::primitives::Address; + let receipt = Receipt::new(Address::ZERO, 100).unwrap(); + let receipts = vec![receipt]; + assert_eq!( + validate_v1_receipt_batch(&receipts).unwrap(), + ProtocolMode::Legacy + ); + } + + #[test] + fn test_validate_v1_empty_batch_fails() { + let receipts: Vec = vec![]; + assert!(validate_v1_receipt_batch(&receipts).is_err()); + } + + #[cfg(feature = "v2")] + #[test] + fn test_validate_v2_batch() { + use tap_graph::v2; + use thegraph_core::alloy::primitives::{Address, FixedBytes}; + let receipt = v2::Receipt::new( + FixedBytes::ZERO, + Address::ZERO, + Address::ZERO, + Address::ZERO, + 100, + ) + .unwrap(); + let receipts = vec![receipt]; + assert_eq!( + validate_v2_receipt_batch(&receipts).unwrap(), + ProtocolMode::Horizon + ); + } + + #[cfg(feature = "v2")] + #[test] + fn test_validate_v2_empty_batch_fails() { + use tap_graph::v2; + let receipts: Vec = vec![]; + assert!(validate_v2_receipt_batch(&receipts).is_err()); + } +} diff --git a/tap_aggregator/src/server.rs b/tap_aggregator/src/server.rs index d7ed979..b58d1e6 100644 --- a/tap_aggregator/src/server.rs +++ b/tap_aggregator/src/server.rs @@ -94,6 +94,17 @@ pub trait Rpc { receipts: Vec>, previous_rav: Option>, ) -> JsonRpcResult>; + + /// Aggregates the given v2 receipts into a v2 receipt aggregate voucher. + /// Uses the Horizon protocol for collection-based aggregation. + #[cfg(feature = "v2")] + #[method(name = "aggregate_receipts_v2")] + fn aggregate_receipts_v2( + &self, + api_version: String, + receipts: Vec>, + previous_rav: Option>, + ) -> JsonRpcResult>; } #[derive(Clone)] @@ -141,6 +152,8 @@ fn aggregate_receipts_( receipts: Vec>, previous_rav: Option>, ) -> JsonRpcResult> { + use crate::receipt_classifier::validate_v1_receipt_batch; + // Return an error if the API version is not supported. let api_version = match parse_api_version(api_version.as_str()) { Ok(v) => v, @@ -157,16 +170,85 @@ fn aggregate_receipts_( DEPRECATION_WARNING_COUNT.inc(); } - let res = match api_version { - TapRpcApiVersion::V0_0 => aggregator::v1::check_and_aggregate_receipts( - domain_separator, - &receipts, - previous_rav, - wallet, - accepted_addresses, - ), + // This endpoint handles v1 receipts for legacy aggregation + // V2 receipts are handled through the aggregate_receipts_v2 endpoint + if let Err(e) = validate_v1_receipt_batch(&receipts) { + return Err(jsonrpsee::types::ErrorObject::owned( + JsonRpcErrorCode::Aggregation as i32, + e.to_string(), + None::<()>, + )); + } + + log::debug!("Processing V1 receipts"); + + // Execute v1 aggregation + let res = aggregator::v1::check_and_aggregate_receipts( + domain_separator, + &receipts, + previous_rav, + wallet, + accepted_addresses, + ); + + // Handle aggregation error + match res { + Ok(res) => Ok(JsonRpcResponse::warn(res, warnings)), + Err(e) => Err(jsonrpsee::types::ErrorObject::owned( + JsonRpcErrorCode::Aggregation as i32, + e.to_string(), + None::<()>, + )), + } +} + +#[cfg(feature = "v2")] +fn aggregate_receipts_v2_( + api_version: String, + wallet: &PrivateKeySigner, + accepted_addresses: &HashSet
, + domain_separator: &Eip712Domain, + receipts: Vec>, + previous_rav: Option>, +) -> JsonRpcResult> { + use crate::receipt_classifier::validate_v2_receipt_batch; + + // Return an error if the API version is not supported. + let api_version = match parse_api_version(api_version.as_str()) { + Ok(v) => v, + Err(e) => { + VERSION_ERROR_COUNT.inc(); + return Err(e); + } }; + // Add a warning if the API version is to be deprecated. + let mut warnings: Vec = Vec::new(); + if let Some(w) = check_api_version_deprecation(&api_version) { + warnings.push(w); + DEPRECATION_WARNING_COUNT.inc(); + } + + // Validate v2 receipt batch for horizon processing + if let Err(e) = validate_v2_receipt_batch(&receipts) { + return Err(jsonrpsee::types::ErrorObject::owned( + JsonRpcErrorCode::Aggregation as i32, + e.to_string(), + None::<()>, + )); + } + + log::debug!("Processing V2 receipts with Horizon protocol"); + + // Execute v2 aggregation + let res = aggregator::v2::check_and_aggregate_receipts( + domain_separator, + &receipts, + previous_rav, + wallet, + accepted_addresses, + ); + // Handle aggregation error match res { Ok(res) => Ok(JsonRpcResponse::warn(res, warnings)), @@ -337,6 +419,47 @@ impl RpcServer for RpcImpl { } } } + + #[cfg(feature = "v2")] + fn aggregate_receipts_v2( + &self, + api_version: String, + receipts: Vec>, + previous_rav: Option>, + ) -> JsonRpcResult> { + // Values for Prometheus metrics + let receipts_grt: u128 = receipts.iter().map(|r| r.message.value).sum(); + let receipts_count: u64 = receipts.len() as u64; + + match aggregate_receipts_v2_( + api_version, + &self.wallet, + &self.accepted_addresses, + &self.domain_separator, + receipts, + previous_rav, + ) { + Ok(res) => { + TOTAL_GRT_AGGREGATED.inc_by(receipts_grt as f64); + TOTAL_AGGREGATED_RECEIPTS.inc_by(receipts_count); + AGGREGATION_SUCCESS_COUNTER.inc(); + if let Some(kafka) = &self.kafka { + // V2 RAVs use collectionId instead of allocationId + produce_kafka_records( + kafka, + &self.wallet.address(), + &res.data.message.collectionId, + res.data.message.valueAggregate, + ); + } + Ok(res) + } + Err(e) => { + AGGREGATION_FAILURE_COUNTER.inc(); + Err(e) + } + } + } } #[allow(clippy::too_many_arguments)]