Skip to content

Commit e7713c4

Browse files
committed
feat(tap-aggregator): add v2 endpoint
Signed-off-by: Joseph Livesey <[email protected]>
1 parent 5d6f4da commit e7713c4

File tree

6 files changed

+255
-9
lines changed

6 files changed

+255
-9
lines changed

Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -50,7 +50,7 @@ tap_eip712_message = { version = "0.2.1", path = "tap_eip712_message" }
5050
tap_core = { version = "4.1.2", path = "tap_core" }
5151
tap_graph = { version = "0.3.2", path = "tap_graph", features = ["v2"] }
5252
tap_receipt = { version = "1.1.2", path = "tap_receipt" }
53-
thegraph-core = "0.15.0"
53+
thegraph-core = "0.15.1"
5454
thiserror = "2.0.12"
5555
tokio = { version = "1.44.2", features = ["macros", "signal"] }
5656
tonic = { version = "0.13.0", features = ["transport", "zstd"] }

tap_aggregator/Cargo.toml

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,10 @@ repository.workspace = true
77
readme = "README.md"
88
description = "A JSON-RPC service for the Timeline Aggregation Protocol that lets clients request an aggregate receipt from a list of individual receipts."
99

10+
[features]
11+
default = ["v2"]
12+
v2 = ["tap_graph/v2"]
13+
1014
[[bin]]
1115
name = "tap_aggregator"
1216
path = "src/main.rs"

tap_aggregator/src/lib.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,4 +7,6 @@ pub mod error_codes;
77
pub mod grpc;
88
pub mod jsonrpsee_helpers;
99
pub mod metrics;
10+
pub mod protocol_mode;
11+
pub mod receipt_classifier;
1012
pub mod server;
Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,29 @@
1+
// Copyright 2024 The Graph Foundation
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
#[derive(Clone, Copy, Debug, PartialEq)]
16+
pub enum ProtocolMode {
17+
/// Pre-horizon: V1 receipts accepted, V1 aggregation used
18+
Legacy,
19+
/// Post-horizon: V2 for new receipts, V1 only for legacy receipt aggregation
20+
Horizon,
21+
}
22+
23+
#[derive(Clone, Copy, Debug, PartialEq)]
24+
pub enum ReceiptType {
25+
/// V1 receipt created before horizon activation (legacy, still needs aggregation)
26+
LegacyV1,
27+
/// V2 receipt (collection-based, created after horizon activation)
28+
V2,
29+
}
Lines changed: 88 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,88 @@
1+
// Copyright 2024 The Graph Foundation
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
use anyhow::Result;
16+
17+
use crate::protocol_mode::ProtocolMode;
18+
19+
/// Validate that a batch of v1 receipts is valid for legacy processing
20+
pub fn validate_v1_receipt_batch<T>(receipts: &[T]) -> Result<ProtocolMode> {
21+
if receipts.is_empty() {
22+
return Err(anyhow::anyhow!("Cannot aggregate empty receipt batch"));
23+
}
24+
// All v1 receipts use legacy mode
25+
Ok(ProtocolMode::Legacy)
26+
}
27+
28+
/// Validate that a batch of v2 receipts is valid for horizon processing
29+
#[cfg(feature = "v2")]
30+
pub fn validate_v2_receipt_batch<T>(receipts: &[T]) -> Result<ProtocolMode> {
31+
if receipts.is_empty() {
32+
return Err(anyhow::anyhow!("Cannot aggregate empty receipt batch"));
33+
}
34+
// All v2 receipts use horizon mode
35+
Ok(ProtocolMode::Horizon)
36+
}
37+
38+
#[cfg(test)]
39+
mod tests {
40+
use tap_graph::Receipt;
41+
42+
use super::*;
43+
44+
#[test]
45+
fn test_validate_v1_batch() {
46+
use thegraph_core::alloy::primitives::Address;
47+
let receipt = Receipt::new(Address::ZERO, 100).unwrap();
48+
let receipts = vec![receipt];
49+
assert_eq!(
50+
validate_v1_receipt_batch(&receipts).unwrap(),
51+
ProtocolMode::Legacy
52+
);
53+
}
54+
55+
#[test]
56+
fn test_validate_v1_empty_batch_fails() {
57+
let receipts: Vec<Receipt> = vec![];
58+
assert!(validate_v1_receipt_batch(&receipts).is_err());
59+
}
60+
61+
#[cfg(feature = "v2")]
62+
#[test]
63+
fn test_validate_v2_batch() {
64+
use tap_graph::v2;
65+
use thegraph_core::alloy::primitives::{Address, FixedBytes};
66+
let receipt = v2::Receipt::new(
67+
FixedBytes::ZERO,
68+
Address::ZERO,
69+
Address::ZERO,
70+
Address::ZERO,
71+
100,
72+
)
73+
.unwrap();
74+
let receipts = vec![receipt];
75+
assert_eq!(
76+
validate_v2_receipt_batch(&receipts).unwrap(),
77+
ProtocolMode::Horizon
78+
);
79+
}
80+
81+
#[cfg(feature = "v2")]
82+
#[test]
83+
fn test_validate_v2_empty_batch_fails() {
84+
use tap_graph::v2;
85+
let receipts: Vec<v2::Receipt> = vec![];
86+
assert!(validate_v2_receipt_batch(&receipts).is_err());
87+
}
88+
}

tap_aggregator/src/server.rs

Lines changed: 131 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -94,6 +94,17 @@ pub trait Rpc {
9494
receipts: Vec<Eip712SignedMessage<Receipt>>,
9595
previous_rav: Option<Eip712SignedMessage<ReceiptAggregateVoucher>>,
9696
) -> JsonRpcResult<Eip712SignedMessage<ReceiptAggregateVoucher>>;
97+
98+
/// Aggregates the given v2 receipts into a v2 receipt aggregate voucher.
99+
/// Uses the Horizon protocol for collection-based aggregation.
100+
#[cfg(feature = "v2")]
101+
#[method(name = "aggregate_receipts_v2")]
102+
fn aggregate_receipts_v2(
103+
&self,
104+
api_version: String,
105+
receipts: Vec<Eip712SignedMessage<tap_graph::v2::Receipt>>,
106+
previous_rav: Option<Eip712SignedMessage<tap_graph::v2::ReceiptAggregateVoucher>>,
107+
) -> JsonRpcResult<Eip712SignedMessage<tap_graph::v2::ReceiptAggregateVoucher>>;
97108
}
98109

99110
#[derive(Clone)]
@@ -141,6 +152,8 @@ fn aggregate_receipts_(
141152
receipts: Vec<Eip712SignedMessage<Receipt>>,
142153
previous_rav: Option<Eip712SignedMessage<ReceiptAggregateVoucher>>,
143154
) -> JsonRpcResult<Eip712SignedMessage<ReceiptAggregateVoucher>> {
155+
use crate::receipt_classifier::validate_v1_receipt_batch;
156+
144157
// Return an error if the API version is not supported.
145158
let api_version = match parse_api_version(api_version.as_str()) {
146159
Ok(v) => v,
@@ -157,16 +170,85 @@ fn aggregate_receipts_(
157170
DEPRECATION_WARNING_COUNT.inc();
158171
}
159172

160-
let res = match api_version {
161-
TapRpcApiVersion::V0_0 => aggregator::v1::check_and_aggregate_receipts(
162-
domain_separator,
163-
&receipts,
164-
previous_rav,
165-
wallet,
166-
accepted_addresses,
167-
),
173+
// This endpoint handles v1 receipts for legacy aggregation
174+
// V2 receipts are handled through the aggregate_receipts_v2 endpoint
175+
if let Err(e) = validate_v1_receipt_batch(&receipts) {
176+
return Err(jsonrpsee::types::ErrorObject::owned(
177+
JsonRpcErrorCode::Aggregation as i32,
178+
e.to_string(),
179+
None::<()>,
180+
));
181+
}
182+
183+
log::debug!("Processing V1 receipts");
184+
185+
// Execute v1 aggregation
186+
let res = aggregator::v1::check_and_aggregate_receipts(
187+
domain_separator,
188+
&receipts,
189+
previous_rav,
190+
wallet,
191+
accepted_addresses,
192+
);
193+
194+
// Handle aggregation error
195+
match res {
196+
Ok(res) => Ok(JsonRpcResponse::warn(res, warnings)),
197+
Err(e) => Err(jsonrpsee::types::ErrorObject::owned(
198+
JsonRpcErrorCode::Aggregation as i32,
199+
e.to_string(),
200+
None::<()>,
201+
)),
202+
}
203+
}
204+
205+
#[cfg(feature = "v2")]
206+
fn aggregate_receipts_v2_(
207+
api_version: String,
208+
wallet: &PrivateKeySigner,
209+
accepted_addresses: &HashSet<Address>,
210+
domain_separator: &Eip712Domain,
211+
receipts: Vec<Eip712SignedMessage<tap_graph::v2::Receipt>>,
212+
previous_rav: Option<Eip712SignedMessage<tap_graph::v2::ReceiptAggregateVoucher>>,
213+
) -> JsonRpcResult<Eip712SignedMessage<tap_graph::v2::ReceiptAggregateVoucher>> {
214+
use crate::receipt_classifier::validate_v2_receipt_batch;
215+
216+
// Return an error if the API version is not supported.
217+
let api_version = match parse_api_version(api_version.as_str()) {
218+
Ok(v) => v,
219+
Err(e) => {
220+
VERSION_ERROR_COUNT.inc();
221+
return Err(e);
222+
}
168223
};
169224

225+
// Add a warning if the API version is to be deprecated.
226+
let mut warnings: Vec<JsonRpcWarning> = Vec::new();
227+
if let Some(w) = check_api_version_deprecation(&api_version) {
228+
warnings.push(w);
229+
DEPRECATION_WARNING_COUNT.inc();
230+
}
231+
232+
// Validate v2 receipt batch for horizon processing
233+
if let Err(e) = validate_v2_receipt_batch(&receipts) {
234+
return Err(jsonrpsee::types::ErrorObject::owned(
235+
JsonRpcErrorCode::Aggregation as i32,
236+
e.to_string(),
237+
None::<()>,
238+
));
239+
}
240+
241+
log::debug!("Processing V2 receipts with Horizon protocol");
242+
243+
// Execute v2 aggregation
244+
let res = aggregator::v2::check_and_aggregate_receipts(
245+
domain_separator,
246+
&receipts,
247+
previous_rav,
248+
wallet,
249+
accepted_addresses,
250+
);
251+
170252
// Handle aggregation error
171253
match res {
172254
Ok(res) => Ok(JsonRpcResponse::warn(res, warnings)),
@@ -337,6 +419,47 @@ impl RpcServer for RpcImpl {
337419
}
338420
}
339421
}
422+
423+
#[cfg(feature = "v2")]
424+
fn aggregate_receipts_v2(
425+
&self,
426+
api_version: String,
427+
receipts: Vec<Eip712SignedMessage<tap_graph::v2::Receipt>>,
428+
previous_rav: Option<Eip712SignedMessage<tap_graph::v2::ReceiptAggregateVoucher>>,
429+
) -> JsonRpcResult<Eip712SignedMessage<tap_graph::v2::ReceiptAggregateVoucher>> {
430+
// Values for Prometheus metrics
431+
let receipts_grt: u128 = receipts.iter().map(|r| r.message.value).sum();
432+
let receipts_count: u64 = receipts.len() as u64;
433+
434+
match aggregate_receipts_v2_(
435+
api_version,
436+
&self.wallet,
437+
&self.accepted_addresses,
438+
&self.domain_separator,
439+
receipts,
440+
previous_rav,
441+
) {
442+
Ok(res) => {
443+
TOTAL_GRT_AGGREGATED.inc_by(receipts_grt as f64);
444+
TOTAL_AGGREGATED_RECEIPTS.inc_by(receipts_count);
445+
AGGREGATION_SUCCESS_COUNTER.inc();
446+
if let Some(kafka) = &self.kafka {
447+
// V2 RAVs use collectionId instead of allocationId
448+
produce_kafka_records(
449+
kafka,
450+
&self.wallet.address(),
451+
&res.data.message.collectionId,
452+
res.data.message.valueAggregate,
453+
);
454+
}
455+
Ok(res)
456+
}
457+
Err(e) => {
458+
AGGREGATION_FAILURE_COUNTER.inc();
459+
Err(e)
460+
}
461+
}
462+
}
340463
}
341464

342465
#[allow(clippy::too_many_arguments)]

0 commit comments

Comments
 (0)