Skip to content

Commit 51588fb

Browse files
committed
wip: enable horizon
Signed-off-by: Joseph Livesey <[email protected]>
1 parent 715930b commit 51588fb

File tree

22 files changed

+661
-328
lines changed

22 files changed

+661
-328
lines changed

tap_aggregator/Cargo.toml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,3 +41,8 @@ tonic-build.workspace = true
4141
[dev-dependencies]
4242
rand.workspace = true
4343
rstest.workspace = true
44+
jsonrpsee = { workspace = true, features = ["http-client"] }
45+
46+
[features]
47+
default = ["v2"]
48+
v2 = []

tap_aggregator/src/server.rs

Lines changed: 136 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,9 @@ use lazy_static::lazy_static;
1414
use log::{error, info};
1515
use prometheus::{register_counter, register_int_counter, Counter, IntCounter};
1616
use tap_core::signed_message::Eip712SignedMessage;
17+
#[cfg(feature = "v2")]
18+
use tap_graph::v2::{Receipt, ReceiptAggregateVoucher};
19+
#[cfg(not(feature = "v2"))]
1720
use tap_graph::{Receipt, ReceiptAggregateVoucher, SignedReceipt};
1821
use thegraph_core::alloy::{
1922
dyn_abi::Eip712Domain, primitives::Address, signers::local::PrivateKeySigner,
@@ -159,13 +162,28 @@ fn aggregate_receipts_(
159162
}
160163

161164
let res = match api_version {
162-
TapRpcApiVersion::V0_0 => aggregator::v1::check_and_aggregate_receipts(
163-
domain_separator,
164-
&receipts,
165-
previous_rav,
166-
wallet,
167-
accepted_addresses,
168-
),
165+
TapRpcApiVersion::V0_0 => {
166+
#[cfg(feature = "v2")]
167+
{
168+
aggregator::v2::check_and_aggregate_receipts(
169+
domain_separator,
170+
&receipts,
171+
previous_rav,
172+
wallet,
173+
accepted_addresses,
174+
)
175+
}
176+
#[cfg(not(feature = "v2"))]
177+
{
178+
aggregator::v1::check_and_aggregate_receipts(
179+
domain_separator,
180+
&receipts,
181+
previous_rav,
182+
wallet,
183+
accepted_addresses,
184+
)
185+
}
186+
}
169187
};
170188

171189
// Handle aggregation error
@@ -186,7 +204,8 @@ impl v1::tap_aggregator_server::TapAggregator for RpcImpl {
186204
request: Request<v1::RavRequest>,
187205
) -> Result<Response<v1::RavResponse>, Status> {
188206
let rav_request = request.into_inner();
189-
let receipts: Vec<SignedReceipt> = rav_request
207+
208+
let receipts: Vec<tap_graph::SignedReceipt> = rav_request
190209
.receipts
191210
.into_iter()
192211
.map(TryFrom::try_from)
@@ -506,6 +525,9 @@ mod tests {
506525
use jsonrpsee::{core::client::ClientT, http_client::HttpClientBuilder, rpc_params};
507526
use rstest::*;
508527
use tap_core::{signed_message::Eip712SignedMessage, tap_eip712_domain};
528+
#[cfg(feature = "v2")]
529+
use tap_graph::v2::{Receipt, ReceiptAggregateVoucher};
530+
#[cfg(not(feature = "v2"))]
509531
use tap_graph::{Receipt, ReceiptAggregateVoucher};
510532
use thegraph_core::alloy::{
511533
dyn_abi::Eip712Domain, primitives::Address, signers::local::PrivateKeySigner,
@@ -535,6 +557,21 @@ mod tests {
535557
]
536558
}
537559

560+
#[fixture]
561+
fn payer() -> Address {
562+
Address::from_str("0xabababababababababababababababababababab").unwrap()
563+
}
564+
565+
#[fixture]
566+
fn data_service() -> Address {
567+
Address::from_str("0xdeaddeaddeaddeaddeaddeaddeaddeaddeaddead").unwrap()
568+
}
569+
570+
#[fixture]
571+
fn service_provider() -> Address {
572+
Address::from_str("0xbeefbeefbeefbeefbeefbeefbeefbeefbeefbeef").unwrap()
573+
}
574+
538575
#[fixture]
539576
fn domain_separator() -> Eip712Domain {
540577
tap_eip712_domain(1, Address::from([0x11u8; 20]))
@@ -602,6 +639,9 @@ mod tests {
602639
http_response_size_limit: u32,
603640
http_max_concurrent_connections: u32,
604641
allocation_ids: Vec<Address>,
642+
payer: Address,
643+
data_service: Address,
644+
service_provider: Address,
605645
#[case] values: Vec<u128>,
606646
#[values("0.0")] api_version: &str,
607647
#[values(0, 1, 2)] random_seed: u64,
@@ -643,6 +683,16 @@ mod tests {
643683
receipts.push(
644684
Eip712SignedMessage::new(
645685
&domain_separator,
686+
#[cfg(feature = "v2")]
687+
Receipt::new(
688+
allocation_ids[0],
689+
payer,
690+
data_service,
691+
service_provider,
692+
value,
693+
)
694+
.unwrap(),
695+
#[cfg(not(feature = "v2"))]
646696
Receipt::new(allocation_ids[0], value).unwrap(),
647697
&all_wallets.choose(&mut rng).unwrap().wallet,
648698
)
@@ -662,9 +712,25 @@ mod tests {
662712

663713
let remote_rav = res.data;
664714

665-
let local_rav =
666-
ReceiptAggregateVoucher::aggregate_receipts(allocation_ids[0], &receipts, None)
667-
.unwrap();
715+
let local_rav = {
716+
#[cfg(feature = "v2")]
717+
{
718+
ReceiptAggregateVoucher::aggregate_receipts(
719+
allocation_ids[0],
720+
payer,
721+
data_service,
722+
service_provider,
723+
&receipts,
724+
None,
725+
)
726+
.unwrap()
727+
}
728+
#[cfg(not(feature = "v2"))]
729+
{
730+
ReceiptAggregateVoucher::aggregate_receipts(allocation_ids[0], &receipts, None)
731+
.unwrap()
732+
}
733+
};
668734

669735
assert!(remote_rav.message.allocationId == local_rav.allocationId);
670736
assert!(remote_rav.message.timestampNs == local_rav.timestampNs);
@@ -685,6 +751,9 @@ mod tests {
685751
http_response_size_limit: u32,
686752
http_max_concurrent_connections: u32,
687753
allocation_ids: Vec<Address>,
754+
payer: Address,
755+
data_service: Address,
756+
service_provider: Address,
688757
#[case] values: Vec<u128>,
689758
#[values("0.0")] api_version: &str,
690759
#[values(0, 1, 2, 3, 4)] random_seed: u64,
@@ -726,6 +795,16 @@ mod tests {
726795
receipts.push(
727796
Eip712SignedMessage::new(
728797
&domain_separator,
798+
#[cfg(feature = "v2")]
799+
Receipt::new(
800+
allocation_ids[0],
801+
payer,
802+
data_service,
803+
service_provider,
804+
value,
805+
)
806+
.unwrap(),
807+
#[cfg(not(feature = "v2"))]
729808
Receipt::new(allocation_ids[0], value).unwrap(),
730809
&all_wallets.choose(&mut rng).unwrap().wallet,
731810
)
@@ -734,12 +813,29 @@ mod tests {
734813
}
735814

736815
// Create previous RAV from first half of receipts locally
737-
let prev_rav = ReceiptAggregateVoucher::aggregate_receipts(
738-
allocation_ids[0],
739-
&receipts[0..receipts.len() / 2],
740-
None,
741-
)
742-
.unwrap();
816+
let prev_rav = {
817+
#[cfg(feature = "v2")]
818+
{
819+
ReceiptAggregateVoucher::aggregate_receipts(
820+
allocation_ids[0],
821+
payer,
822+
data_service,
823+
service_provider,
824+
&receipts[0..receipts.len() / 2],
825+
None,
826+
)
827+
.unwrap()
828+
}
829+
#[cfg(not(feature = "v2"))]
830+
{
831+
ReceiptAggregateVoucher::aggregate_receipts(
832+
allocation_ids[0],
833+
&receipts[0..receipts.len() / 2],
834+
None,
835+
)
836+
.unwrap()
837+
}
838+
};
743839
let signed_prev_rav = Eip712SignedMessage::new(
744840
&domain_separator,
745841
prev_rav,
@@ -775,6 +871,9 @@ mod tests {
775871
http_response_size_limit: u32,
776872
http_max_concurrent_connections: u32,
777873
allocation_ids: Vec<Address>,
874+
payer: Address,
875+
data_service: Address,
876+
service_provider: Address,
778877
) {
779878
// The keys that will be used to sign the new RAVs
780879
let keys_main = keys();
@@ -801,6 +900,9 @@ mod tests {
801900
// Create receipts
802901
let receipts = vec![Eip712SignedMessage::new(
803902
&domain_separator,
903+
#[cfg(feature = "v2")]
904+
Receipt::new(allocation_ids[0], payer, data_service, service_provider, 42).unwrap(),
905+
#[cfg(not(feature = "v2"))]
804906
Receipt::new(allocation_ids[0], 42).unwrap(),
805907
&keys_main.wallet,
806908
)
@@ -857,6 +959,9 @@ mod tests {
857959
http_response_size_limit: u32,
858960
http_max_concurrent_connections: u32,
859961
allocation_ids: Vec<Address>,
962+
payer: Address,
963+
data_service: Address,
964+
service_provider: Address,
860965
#[values("0.0")] api_version: &str,
861966
) {
862967
// The keys that will be used to sign the new RAVs
@@ -869,6 +974,10 @@ mod tests {
869974
// Number of receipts that is just above the number that would fit within the
870975
// request size limit. This value is hard-coded here because it supports the
871976
// maximum number of receipts per aggregate value we wrote in the spec / docs.
977+
// Reduced for v2 receipts which are larger due to additional fields
978+
#[cfg(feature = "v2")]
979+
let number_of_receipts_to_exceed_limit = 200;
980+
#[cfg(not(feature = "v2"))]
872981
let number_of_receipts_to_exceed_limit = 300;
873982

874983
// Start the JSON-RPC server.
@@ -896,6 +1005,16 @@ mod tests {
8961005
receipts.push(
8971006
Eip712SignedMessage::new(
8981007
&domain_separator,
1008+
#[cfg(feature = "v2")]
1009+
Receipt::new(
1010+
allocation_ids[0],
1011+
payer,
1012+
data_service,
1013+
service_provider,
1014+
u128::MAX / 1000,
1015+
)
1016+
.unwrap(),
1017+
#[cfg(not(feature = "v2"))]
8991018
Receipt::new(allocation_ids[0], u128::MAX / 1000).unwrap(),
9001019
&keys_main.wallet,
9011020
)

tap_aggregator/tests/aggregate_test.rs

Lines changed: 47 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,9 @@ use tap_aggregator::{
1010
server,
1111
};
1212
use tap_core::{signed_message::Eip712SignedMessage, tap_eip712_domain};
13+
#[cfg(feature = "v2")]
14+
use tap_graph::v2::{Receipt, ReceiptAggregateVoucher};
15+
#[cfg(not(feature = "v2"))]
1316
use tap_graph::{Receipt, ReceiptAggregateVoucher};
1417
use thegraph_core::alloy::{primitives::Address, signers::local::PrivateKeySigner};
1518
use tonic::codec::CompressionEncoding;
@@ -47,24 +50,45 @@ async fn aggregation_test() {
4750
.send_compressed(CompressionEncoding::Zstd);
4851

4952
let allocation_id = Address::from_str("0xabababababababababababababababababababab").unwrap();
53+
let payer = Address::from_str("0xabababababababababababababababababababab").unwrap();
54+
let data_service = Address::from_str("0xdeaddeaddeaddeaddeaddeaddeaddeaddeaddead").unwrap();
55+
let service_provider = Address::from_str("0xbeefbeefbeefbeefbeefbeefbeefbeefbeefbeef").unwrap();
5056

51-
// Create receipts
52-
let mut receipts = Vec::new();
53-
for value in 50..60 {
54-
receipts.push(
55-
Eip712SignedMessage::new(
56-
&domain_separator,
57-
Receipt::new(allocation_id, value).unwrap(),
58-
&wallet,
59-
)
60-
.unwrap(),
61-
);
57+
// Use a fixed timestamp to ensure both v1 and v2 receipts have the same timestamps
58+
let fixed_timestamp = 1700000000000000000u64; // Fixed timestamp in nanoseconds
59+
60+
// Create v1 receipts for gRPC v1 compatibility
61+
let mut v1_receipts = Vec::new();
62+
for (i, value) in (50..60).enumerate() {
63+
let mut receipt = tap_graph::Receipt::new(allocation_id, value).unwrap();
64+
receipt.timestamp_ns = fixed_timestamp + i as u64; // Ensure increasing timestamps
65+
v1_receipts.push(Eip712SignedMessage::new(&domain_separator, receipt, &wallet).unwrap());
6266
}
6367

64-
let rav_request = RavRequest::new(receipts.clone(), None);
68+
let rav_request = RavRequest::new(v1_receipts, None);
6569
let res = client.aggregate_receipts(rav_request).await.unwrap();
6670
let signed_rav: tap_graph::SignedRav = res.into_inner().signed_rav().unwrap();
6771

72+
// Create v2 receipts for JSON-RPC API with the same timestamps
73+
let mut v2_receipts = Vec::new();
74+
for (i, value) in (50..60).enumerate() {
75+
#[cfg(feature = "v2")]
76+
{
77+
let mut receipt =
78+
Receipt::new(allocation_id, payer, data_service, service_provider, value).unwrap();
79+
receipt.timestamp_ns = fixed_timestamp + i as u64; // Same timestamps as v1
80+
v2_receipts
81+
.push(Eip712SignedMessage::new(&domain_separator, receipt, &wallet).unwrap());
82+
}
83+
#[cfg(not(feature = "v2"))]
84+
{
85+
let mut receipt = Receipt::new(allocation_id, value).unwrap();
86+
receipt.timestamp_ns = fixed_timestamp + i as u64;
87+
v2_receipts
88+
.push(Eip712SignedMessage::new(&domain_separator, receipt, &wallet).unwrap());
89+
}
90+
}
91+
6892
let sender_aggregator = HttpClientBuilder::default().build(&endpoint).unwrap();
6993

7094
let previous_rav: Option<tap_graph::SignedRav> = None;
@@ -74,13 +98,22 @@ async fn aggregation_test() {
7498
"aggregate_receipts",
7599
rpc_params!(
76100
"0.0", // TODO: Set the version in a smarter place.
77-
receipts,
101+
v2_receipts,
78102
previous_rav
79103
),
80104
)
81105
.await
82106
.unwrap();
83107
let response = response.data;
84-
assert_eq!(signed_rav, response);
108+
// Compare the core fields since the types might differ between v1 and v2
109+
assert_eq!(
110+
signed_rav.message.allocationId,
111+
response.message.allocationId
112+
);
113+
assert_eq!(signed_rav.message.timestampNs, response.message.timestampNs);
114+
assert_eq!(
115+
signed_rav.message.valueAggregate,
116+
response.message.valueAggregate
117+
);
85118
join_handle.abort();
86119
}

tap_aggregator/tests/aggregate_v1_and_v2.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -68,6 +68,9 @@ async fn aggregation_test() {
6868
let rav_request = ReqV1::new(receipts.clone(), None);
6969
let res = client.aggregate_receipts(rav_request).await;
7070

71+
if res.is_err() {
72+
println!("V1 gRPC Error: {:?}", res.as_ref().err());
73+
}
7174
assert!(res.is_ok());
7275

7376
let mut client = ClientV2::connect(endpoint.clone())

tap_core/Cargo.toml

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,5 +25,6 @@ rstest.workspace = true
2525
serde_json.workspace = true
2626

2727
[features]
28-
default = ["in_memory"]
28+
default = ["in_memory", "v2"]
2929
in_memory = ["dep:tap_graph"]
30+
v2 = []

0 commit comments

Comments
 (0)