Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ tap_eip712_message = { version = "0.2.1", path = "tap_eip712_message" }
tap_core = { version = "4.1.2", path = "tap_core" }
tap_graph = { version = "0.3.2", path = "tap_graph", features = ["v2"] }
tap_receipt = { version = "1.1.2", path = "tap_receipt" }
thegraph-core = "0.15.0"
thegraph-core = "0.15.1"
thiserror = "2.0.12"
tokio = { version = "1.44.2", features = ["macros", "signal"] }
tonic = { version = "0.13.0", features = ["transport", "zstd"] }
Expand Down
4 changes: 4 additions & 0 deletions tap_aggregator/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,10 @@ repository.workspace = true
readme = "README.md"
description = "A JSON-RPC service for the Timeline Aggregation Protocol that lets clients request an aggregate receipt from a list of individual receipts."

[features]
default = ["v2"]
v2 = ["tap_graph/v2"]

[[bin]]
name = "tap_aggregator"
path = "src/main.rs"
Expand Down
174 changes: 174 additions & 0 deletions tap_aggregator/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@

A stateless JSON-RPC service that lets clients request an aggregate receipt from a list of individual receipts.

Supports both V1 (allocation-based) and V2 (collection-based) aggregation protocols for seamless migration to the Horizon protocol.

TAP Aggregator is run by [gateway](https://github.com/edgeandnode/gateway/blob/main/README.md)
operators.

Expand Down Expand Up @@ -189,6 +191,11 @@ In addition to the official spec, we define a few special errors:

### Methods

This aggregator supports both V1 (legacy) and V2 (Horizon) protocols:

- **V1 Endpoints**: `aggregate_receipts` - allocation-based aggregation
- **V2 Endpoints**: `aggregate_receipts_v2` - collection-based aggregation with enhanced fields

#### `api_versions()`

[source](server::RpcServer::api_versions)
Expand Down Expand Up @@ -318,3 +325,170 @@ Example:
}
}
```

#### `aggregate_receipts_v2(api_version, receipts, previous_rav)`

Aggregates the given V2 receipts into a V2 receipt aggregate voucher using the Horizon protocol.
This method supports collection-based aggregation with enhanced fields for payer, data service, and service provider tracking.

**V2 Receipt Structure:**

- `collection_id`: 32-byte identifier for the collection (replaces `allocation_id`)
- `payer`: Address of the payer
- `data_service`: Address of the data service
- `service_provider`: Address of the service provider
- `timestamp_ns`: Timestamp in nanoseconds
- `nonce`: Unique nonce
- `value`: Receipt value

**V2 RAV Structure:**

- `collectionId`: Collection identifier (replaces `allocation_id`)
- `payer`: Payer address
- `dataService`: Data service address
- `serviceProvider`: Service provider address
- `timestampNs`: Latest timestamp
- `valueAggregate`: Total aggregated value
- `metadata`: Additional metadata (bytes)

Example:

*Request*:

```json
{
"jsonrpc": "2.0",
"id": 0,
"method": "aggregate_receipts_v2",
"params": [
"1.0.0",
[
{
"message": {
"collection_id": "0xdeaddeaddeaddeaddeaddeaddeaddeaddeaddeaddeaddeaddeaddeaddeaddead",
"payer": "0xabababababababababababababababababababab",
"data_service": "0xdeaddeaddeaddeaddeaddeaddeaddeaddeaddead",
"service_provider": "0xbeefbeefbeefbeefbeefbeefbeefbeefbeefbeef",
"timestamp_ns": 1685670449225087255,
"nonce": 11835827017881841442,
"value": 34
},
"signature": {
"r": "0xa9fa1acf3cc3be503612f75602e68cc22286592db1f4f944c78397cbe529353b",
"s": "0x566cfeb7e80a393021a443d5846c0734d25bcf54ed90d97effe93b1c8aef0911",
"v": 27
}
},
{
"message": {
"collection_id": "0xdeaddeaddeaddeaddeaddeaddeaddeaddeaddeaddeaddeaddeaddeaddeaddead",
"payer": "0xabababababababababababababababababababab",
"data_service": "0xdeaddeaddeaddeaddeaddeaddeaddeaddeaddead",
"service_provider": "0xbeefbeefbeefbeefbeefbeefbeefbeefbeefbeef",
"timestamp_ns": 1685670449225830106,
"nonce": 17711980309995246801,
"value": 23
},
"signature": {
"r": "0x51ca5a2b839558654326d3a3f544a97d94effb9a7dd9cac7492007bc974e91f0",
"s": "0x3d9d398ea6b0dd9fac97726f51c0840b8b314821fb4534cb40383850c431fd9e",
"v": 28
}
}
],
{
"message": {
"collectionId": "0xdeaddeaddeaddeaddeaddeaddeaddeaddeaddeaddeaddeaddeaddeaddeaddead",
"payer": "0xabababababababababababababababababababab",
"dataService": "0xdeaddeaddeaddeaddeaddeaddeaddeaddeaddead",
"serviceProvider": "0xbeefbeefbeefbeefbeefbeefbeefbeefbeefbeef",
"timestampNs": 1685670449224324338,
"valueAggregate": 101,
"metadata": "0x"
},
"signature": {
"r": "0x601a1f399cf6223d1414a89b7bbc90ee13eeeec006bd59e0c96042266c6ad7dc",
"s": "0x3172e795bd190865afac82e3a8be5f4ccd4b65958529986c779833625875f0b2",
"v": 28
}
}
]
}
```

*Response*:

```json
{
"id": 0,
"jsonrpc": "2.0",
"result": {
"data": {
"message": {
"collectionId": "0xdeaddeaddeaddeaddeaddeaddeaddeaddeaddeaddeaddeaddeaddeaddeaddead",
"payer": "0xabababababababababababababababababababab",
"dataService": "0xdeaddeaddeaddeaddeaddeaddeaddeaddeaddead",
"serviceProvider": "0xbeefbeefbeefbeefbeefbeefbeefbeefbeefbeef",
"timestampNs": 1685670449225830106,
"valueAggregate": 158,
"metadata": "0x"
},
"signature": {
"r": "0x60eb38374119bbabf1ac6960f532124ba2a9c5990d9fb50875b512e611847eb5",
"s": "0x1b9a330cc9e2ecbda340a4757afaee8f55b6dbf278428f8cf49dd5ad8438f83d",
"v": 27
}
}
}
}
```

## Feature Flags

### V2 Protocol Support

The aggregator supports both V1 and V2 protocols:

- **V2 is enabled by default** via the `v2` feature flag in `tap_aggregator/Cargo.toml`
- To disable V2: `cargo build --no-default-features`
- Both protocols can run simultaneously for gradual migration
- V1 endpoints remain unchanged and fully functional

### Feature Flag Usage

```bash
# Build with V2 support (default)
cargo build --release

# Build without V2 support (V1 only)
cargo build --release --no-default-features

# Explicitly enable V2 feature
cargo build --release --features v2
```

## Migration Guide

### V1 to V2 Migration

The V2 protocol introduces collection-based aggregation with enhanced tracking. Here's how to migrate:

#### Key Changes

1. **Receipt Structure**: `allocation_id` → `collection_id` + additional fields
2. **RAV Structure**: `allocation_id` → `collectionId` + additional fields
3. **New Fields**: `payer`, `data_service`/`dataService`, `service_provider`/`serviceProvider`
4. **Metadata**: V2 RAVs include optional `metadata` field

#### Migration Steps

1. **Phase 1**: Deploy aggregator with V2 support (both endpoints available)
2. **Phase 2**: Update clients to use V2 receipt structure
3. **Phase 3**: Switch clients to `aggregate_receipts_v2` endpoint
4. **Phase 4**: V1 endpoints remain for backward compatibility

#### Backward Compatibility

- **V1 endpoints**: Unchanged and fully functional
- **gRPC support**: Both V1 and V2 with automatic conversion
- **No breaking changes**: Existing V1 clients continue to work
2 changes: 2 additions & 0 deletions tap_aggregator/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,4 +7,6 @@ pub mod error_codes;
pub mod grpc;
pub mod jsonrpsee_helpers;
pub mod metrics;
pub mod protocol_mode;
pub mod receipt_classifier;
pub mod server;
29 changes: 29 additions & 0 deletions tap_aggregator/src/protocol_mode.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
// Copyright 2024 The Graph Foundation
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#[derive(Clone, Copy, Debug, PartialEq)]
pub enum ProtocolMode {
/// Pre-horizon: V1 receipts accepted, V1 aggregation used
Legacy,
/// Post-horizon: V2 for new receipts, V1 only for legacy receipt aggregation
Horizon,
}

#[derive(Clone, Copy, Debug, PartialEq)]
pub enum ReceiptType {
/// V1 receipt created before horizon activation (legacy, still needs aggregation)
LegacyV1,
/// V2 receipt (collection-based, created after horizon activation)
V2,
}
88 changes: 88 additions & 0 deletions tap_aggregator/src/receipt_classifier.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
// Copyright 2024 The Graph Foundation
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use anyhow::Result;

use crate::protocol_mode::ProtocolMode;

/// Validate that a batch of v1 receipts is valid for legacy processing
pub fn validate_v1_receipt_batch<T>(receipts: &[T]) -> Result<ProtocolMode> {
if receipts.is_empty() {
return Err(anyhow::anyhow!("Cannot aggregate empty receipt batch"));
}
// All v1 receipts use legacy mode
Ok(ProtocolMode::Legacy)
}

/// Validate that a batch of v2 receipts is valid for horizon processing
#[cfg(feature = "v2")]
pub fn validate_v2_receipt_batch<T>(receipts: &[T]) -> Result<ProtocolMode> {
if receipts.is_empty() {
return Err(anyhow::anyhow!("Cannot aggregate empty receipt batch"));
}
// All v2 receipts use horizon mode
Ok(ProtocolMode::Horizon)
}

#[cfg(test)]
mod tests {
use tap_graph::Receipt;

use super::*;

#[test]
fn test_validate_v1_batch() {
use thegraph_core::alloy::primitives::Address;
let receipt = Receipt::new(Address::ZERO, 100).unwrap();
let receipts = vec![receipt];
assert_eq!(
validate_v1_receipt_batch(&receipts).unwrap(),
ProtocolMode::Legacy
);
}

#[test]
fn test_validate_v1_empty_batch_fails() {
let receipts: Vec<Receipt> = vec![];
assert!(validate_v1_receipt_batch(&receipts).is_err());
}

#[cfg(feature = "v2")]
#[test]
fn test_validate_v2_batch() {
use tap_graph::v2;
use thegraph_core::alloy::primitives::{Address, FixedBytes};
let receipt = v2::Receipt::new(
FixedBytes::ZERO,
Address::ZERO,
Address::ZERO,
Address::ZERO,
100,
)
.unwrap();
let receipts = vec![receipt];
assert_eq!(
validate_v2_receipt_batch(&receipts).unwrap(),
ProtocolMode::Horizon
);
}

#[cfg(feature = "v2")]
#[test]
fn test_validate_v2_empty_batch_fails() {
use tap_graph::v2;
let receipts: Vec<v2::Receipt> = vec![];
assert!(validate_v2_receipt_batch(&receipts).is_err());
}
}
Loading