Skip to content

Commit 21609bf

Browse files
committed
wip: enable horizon
Signed-off-by: Joseph Livesey <[email protected]>
1 parent 715930b commit 21609bf

File tree

22 files changed

+659
-323
lines changed

22 files changed

+659
-323
lines changed

tap_aggregator/Cargo.toml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,3 +41,8 @@ tonic-build.workspace = true
4141
[dev-dependencies]
4242
rand.workspace = true
4343
rstest.workspace = true
44+
jsonrpsee = { workspace = true, features = ["http-client"] }
45+
46+
[features]
47+
default = ["v2"]
48+
v2 = []

tap_aggregator/src/server.rs

Lines changed: 137 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,9 @@ use lazy_static::lazy_static;
1414
use log::{error, info};
1515
use prometheus::{register_counter, register_int_counter, Counter, IntCounter};
1616
use tap_core::signed_message::Eip712SignedMessage;
17+
#[cfg(feature = "v2")]
18+
use tap_graph::v2::{Receipt, ReceiptAggregateVoucher};
19+
#[cfg(not(feature = "v2"))]
1720
use tap_graph::{Receipt, ReceiptAggregateVoucher, SignedReceipt};
1821
use thegraph_core::alloy::{
1922
dyn_abi::Eip712Domain, primitives::Address, signers::local::PrivateKeySigner,
@@ -159,13 +162,28 @@ fn aggregate_receipts_(
159162
}
160163

161164
let res = match api_version {
162-
TapRpcApiVersion::V0_0 => aggregator::v1::check_and_aggregate_receipts(
163-
domain_separator,
164-
&receipts,
165-
previous_rav,
166-
wallet,
167-
accepted_addresses,
168-
),
165+
TapRpcApiVersion::V0_0 => {
166+
#[cfg(feature = "v2")]
167+
{
168+
aggregator::v2::check_and_aggregate_receipts(
169+
domain_separator,
170+
&receipts,
171+
previous_rav,
172+
wallet,
173+
accepted_addresses,
174+
)
175+
}
176+
#[cfg(not(feature = "v2"))]
177+
{
178+
aggregator::v1::check_and_aggregate_receipts(
179+
domain_separator,
180+
&receipts,
181+
previous_rav,
182+
wallet,
183+
accepted_addresses,
184+
)
185+
}
186+
}
169187
};
170188

171189
// Handle aggregation error
@@ -186,7 +204,9 @@ impl v1::tap_aggregator_server::TapAggregator for RpcImpl {
186204
request: Request<v1::RavRequest>,
187205
) -> Result<Response<v1::RavResponse>, Status> {
188206
let rav_request = request.into_inner();
189-
let receipts: Vec<SignedReceipt> = rav_request
207+
208+
// Always use v1 aggregation for v1 gRPC calls to maintain compatibility
209+
let receipts: Vec<tap_graph::SignedReceipt> = rav_request
190210
.receipts
191211
.into_iter()
192212
.map(TryFrom::try_from)
@@ -506,6 +526,9 @@ mod tests {
506526
use jsonrpsee::{core::client::ClientT, http_client::HttpClientBuilder, rpc_params};
507527
use rstest::*;
508528
use tap_core::{signed_message::Eip712SignedMessage, tap_eip712_domain};
529+
#[cfg(feature = "v2")]
530+
use tap_graph::v2::{Receipt, ReceiptAggregateVoucher};
531+
#[cfg(not(feature = "v2"))]
509532
use tap_graph::{Receipt, ReceiptAggregateVoucher};
510533
use thegraph_core::alloy::{
511534
dyn_abi::Eip712Domain, primitives::Address, signers::local::PrivateKeySigner,
@@ -535,6 +558,21 @@ mod tests {
535558
]
536559
}
537560

561+
#[fixture]
562+
fn payer() -> Address {
563+
Address::from_str("0xabababababababababababababababababababab").unwrap()
564+
}
565+
566+
#[fixture]
567+
fn data_service() -> Address {
568+
Address::from_str("0xdeaddeaddeaddeaddeaddeaddeaddeaddeaddead").unwrap()
569+
}
570+
571+
#[fixture]
572+
fn service_provider() -> Address {
573+
Address::from_str("0xbeefbeefbeefbeefbeefbeefbeefbeefbeefbeef").unwrap()
574+
}
575+
538576
#[fixture]
539577
fn domain_separator() -> Eip712Domain {
540578
tap_eip712_domain(1, Address::from([0x11u8; 20]))
@@ -602,6 +640,9 @@ mod tests {
602640
http_response_size_limit: u32,
603641
http_max_concurrent_connections: u32,
604642
allocation_ids: Vec<Address>,
643+
payer: Address,
644+
data_service: Address,
645+
service_provider: Address,
605646
#[case] values: Vec<u128>,
606647
#[values("0.0")] api_version: &str,
607648
#[values(0, 1, 2)] random_seed: u64,
@@ -643,6 +684,16 @@ mod tests {
643684
receipts.push(
644685
Eip712SignedMessage::new(
645686
&domain_separator,
687+
#[cfg(feature = "v2")]
688+
Receipt::new(
689+
allocation_ids[0],
690+
payer,
691+
data_service,
692+
service_provider,
693+
value,
694+
)
695+
.unwrap(),
696+
#[cfg(not(feature = "v2"))]
646697
Receipt::new(allocation_ids[0], value).unwrap(),
647698
&all_wallets.choose(&mut rng).unwrap().wallet,
648699
)
@@ -662,9 +713,25 @@ mod tests {
662713

663714
let remote_rav = res.data;
664715

665-
let local_rav =
666-
ReceiptAggregateVoucher::aggregate_receipts(allocation_ids[0], &receipts, None)
667-
.unwrap();
716+
let local_rav = {
717+
#[cfg(feature = "v2")]
718+
{
719+
ReceiptAggregateVoucher::aggregate_receipts(
720+
allocation_ids[0],
721+
payer,
722+
data_service,
723+
service_provider,
724+
&receipts,
725+
None,
726+
)
727+
.unwrap()
728+
}
729+
#[cfg(not(feature = "v2"))]
730+
{
731+
ReceiptAggregateVoucher::aggregate_receipts(allocation_ids[0], &receipts, None)
732+
.unwrap()
733+
}
734+
};
668735

669736
assert!(remote_rav.message.allocationId == local_rav.allocationId);
670737
assert!(remote_rav.message.timestampNs == local_rav.timestampNs);
@@ -685,6 +752,9 @@ mod tests {
685752
http_response_size_limit: u32,
686753
http_max_concurrent_connections: u32,
687754
allocation_ids: Vec<Address>,
755+
payer: Address,
756+
data_service: Address,
757+
service_provider: Address,
688758
#[case] values: Vec<u128>,
689759
#[values("0.0")] api_version: &str,
690760
#[values(0, 1, 2, 3, 4)] random_seed: u64,
@@ -726,6 +796,16 @@ mod tests {
726796
receipts.push(
727797
Eip712SignedMessage::new(
728798
&domain_separator,
799+
#[cfg(feature = "v2")]
800+
Receipt::new(
801+
allocation_ids[0],
802+
payer,
803+
data_service,
804+
service_provider,
805+
value,
806+
)
807+
.unwrap(),
808+
#[cfg(not(feature = "v2"))]
729809
Receipt::new(allocation_ids[0], value).unwrap(),
730810
&all_wallets.choose(&mut rng).unwrap().wallet,
731811
)
@@ -734,12 +814,29 @@ mod tests {
734814
}
735815

736816
// Create previous RAV from first half of receipts locally
737-
let prev_rav = ReceiptAggregateVoucher::aggregate_receipts(
738-
allocation_ids[0],
739-
&receipts[0..receipts.len() / 2],
740-
None,
741-
)
742-
.unwrap();
817+
let prev_rav = {
818+
#[cfg(feature = "v2")]
819+
{
820+
ReceiptAggregateVoucher::aggregate_receipts(
821+
allocation_ids[0],
822+
payer,
823+
data_service,
824+
service_provider,
825+
&receipts[0..receipts.len() / 2],
826+
None,
827+
)
828+
.unwrap()
829+
}
830+
#[cfg(not(feature = "v2"))]
831+
{
832+
ReceiptAggregateVoucher::aggregate_receipts(
833+
allocation_ids[0],
834+
&receipts[0..receipts.len() / 2],
835+
None,
836+
)
837+
.unwrap()
838+
}
839+
};
743840
let signed_prev_rav = Eip712SignedMessage::new(
744841
&domain_separator,
745842
prev_rav,
@@ -775,6 +872,9 @@ mod tests {
775872
http_response_size_limit: u32,
776873
http_max_concurrent_connections: u32,
777874
allocation_ids: Vec<Address>,
875+
payer: Address,
876+
data_service: Address,
877+
service_provider: Address,
778878
) {
779879
// The keys that will be used to sign the new RAVs
780880
let keys_main = keys();
@@ -801,6 +901,9 @@ mod tests {
801901
// Create receipts
802902
let receipts = vec![Eip712SignedMessage::new(
803903
&domain_separator,
904+
#[cfg(feature = "v2")]
905+
Receipt::new(allocation_ids[0], payer, data_service, service_provider, 42).unwrap(),
906+
#[cfg(not(feature = "v2"))]
804907
Receipt::new(allocation_ids[0], 42).unwrap(),
805908
&keys_main.wallet,
806909
)
@@ -857,6 +960,9 @@ mod tests {
857960
http_response_size_limit: u32,
858961
http_max_concurrent_connections: u32,
859962
allocation_ids: Vec<Address>,
963+
payer: Address,
964+
data_service: Address,
965+
service_provider: Address,
860966
#[values("0.0")] api_version: &str,
861967
) {
862968
// The keys that will be used to sign the new RAVs
@@ -869,6 +975,10 @@ mod tests {
869975
// Number of receipts that is just above the number that would fit within the
870976
// request size limit. This value is hard-coded here because it supports the
871977
// maximum number of receipts per aggregate value we wrote in the spec / docs.
978+
// Reduced for v2 receipts which are larger due to additional fields
979+
#[cfg(feature = "v2")]
980+
let number_of_receipts_to_exceed_limit = 200;
981+
#[cfg(not(feature = "v2"))]
872982
let number_of_receipts_to_exceed_limit = 300;
873983

874984
// Start the JSON-RPC server.
@@ -896,6 +1006,16 @@ mod tests {
8961006
receipts.push(
8971007
Eip712SignedMessage::new(
8981008
&domain_separator,
1009+
#[cfg(feature = "v2")]
1010+
Receipt::new(
1011+
allocation_ids[0],
1012+
payer,
1013+
data_service,
1014+
service_provider,
1015+
u128::MAX / 1000,
1016+
)
1017+
.unwrap(),
1018+
#[cfg(not(feature = "v2"))]
8991019
Receipt::new(allocation_ids[0], u128::MAX / 1000).unwrap(),
9001020
&keys_main.wallet,
9011021
)

tap_aggregator/tests/aggregate_test.rs

Lines changed: 47 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,9 @@ use tap_aggregator::{
1010
server,
1111
};
1212
use tap_core::{signed_message::Eip712SignedMessage, tap_eip712_domain};
13+
#[cfg(feature = "v2")]
14+
use tap_graph::v2::{Receipt, ReceiptAggregateVoucher};
15+
#[cfg(not(feature = "v2"))]
1316
use tap_graph::{Receipt, ReceiptAggregateVoucher};
1417
use thegraph_core::alloy::{primitives::Address, signers::local::PrivateKeySigner};
1518
use tonic::codec::CompressionEncoding;
@@ -47,24 +50,45 @@ async fn aggregation_test() {
4750
.send_compressed(CompressionEncoding::Zstd);
4851

4952
let allocation_id = Address::from_str("0xabababababababababababababababababababab").unwrap();
53+
let payer = Address::from_str("0xabababababababababababababababababababab").unwrap();
54+
let data_service = Address::from_str("0xdeaddeaddeaddeaddeaddeaddeaddeaddeaddead").unwrap();
55+
let service_provider = Address::from_str("0xbeefbeefbeefbeefbeefbeefbeefbeefbeefbeef").unwrap();
5056

51-
// Create receipts
52-
let mut receipts = Vec::new();
53-
for value in 50..60 {
54-
receipts.push(
55-
Eip712SignedMessage::new(
56-
&domain_separator,
57-
Receipt::new(allocation_id, value).unwrap(),
58-
&wallet,
59-
)
60-
.unwrap(),
61-
);
57+
// Use a fixed timestamp to ensure both v1 and v2 receipts have the same timestamps
58+
let fixed_timestamp = 1700000000000000000u64; // Fixed timestamp in nanoseconds
59+
60+
// Create v1 receipts for gRPC v1 compatibility
61+
let mut v1_receipts = Vec::new();
62+
for (i, value) in (50..60).enumerate() {
63+
let mut receipt = tap_graph::Receipt::new(allocation_id, value).unwrap();
64+
receipt.timestamp_ns = fixed_timestamp + i as u64; // Ensure increasing timestamps
65+
v1_receipts.push(Eip712SignedMessage::new(&domain_separator, receipt, &wallet).unwrap());
6266
}
6367

64-
let rav_request = RavRequest::new(receipts.clone(), None);
68+
let rav_request = RavRequest::new(v1_receipts, None);
6569
let res = client.aggregate_receipts(rav_request).await.unwrap();
6670
let signed_rav: tap_graph::SignedRav = res.into_inner().signed_rav().unwrap();
6771

72+
// Create v2 receipts for JSON-RPC API with the same timestamps
73+
let mut v2_receipts = Vec::new();
74+
for (i, value) in (50..60).enumerate() {
75+
#[cfg(feature = "v2")]
76+
{
77+
let mut receipt =
78+
Receipt::new(allocation_id, payer, data_service, service_provider, value).unwrap();
79+
receipt.timestamp_ns = fixed_timestamp + i as u64; // Same timestamps as v1
80+
v2_receipts
81+
.push(Eip712SignedMessage::new(&domain_separator, receipt, &wallet).unwrap());
82+
}
83+
#[cfg(not(feature = "v2"))]
84+
{
85+
let mut receipt = Receipt::new(allocation_id, value).unwrap();
86+
receipt.timestamp_ns = fixed_timestamp + i as u64;
87+
v2_receipts
88+
.push(Eip712SignedMessage::new(&domain_separator, receipt, &wallet).unwrap());
89+
}
90+
}
91+
6892
let sender_aggregator = HttpClientBuilder::default().build(&endpoint).unwrap();
6993

7094
let previous_rav: Option<tap_graph::SignedRav> = None;
@@ -74,13 +98,22 @@ async fn aggregation_test() {
7498
"aggregate_receipts",
7599
rpc_params!(
76100
"0.0", // TODO: Set the version in a smarter place.
77-
receipts,
101+
v2_receipts,
78102
previous_rav
79103
),
80104
)
81105
.await
82106
.unwrap();
83107
let response = response.data;
84-
assert_eq!(signed_rav, response);
108+
// Compare the core fields since the types might differ between v1 and v2
109+
assert_eq!(
110+
signed_rav.message.allocationId,
111+
response.message.allocationId
112+
);
113+
assert_eq!(signed_rav.message.timestampNs, response.message.timestampNs);
114+
assert_eq!(
115+
signed_rav.message.valueAggregate,
116+
response.message.valueAggregate
117+
);
85118
join_handle.abort();
86119
}

tap_aggregator/tests/aggregate_v1_and_v2.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -68,6 +68,9 @@ async fn aggregation_test() {
6868
let rav_request = ReqV1::new(receipts.clone(), None);
6969
let res = client.aggregate_receipts(rav_request).await;
7070

71+
if res.is_err() {
72+
println!("V1 gRPC Error: {:?}", res.as_ref().err());
73+
}
7174
assert!(res.is_ok());
7275

7376
let mut client = ClientV2::connect(endpoint.clone())

tap_core/Cargo.toml

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,5 +25,6 @@ rstest.workspace = true
2525
serde_json.workspace = true
2626

2727
[features]
28-
default = ["in_memory"]
28+
default = ["in_memory", "v2"]
2929
in_memory = ["dep:tap_graph"]
30+
v2 = []

0 commit comments

Comments
 (0)