Skip to content

Commit d43e0c9

Browse files
committed
fix: fix in after collection id update on v2
Signed-off-by: Joseph Livesey <[email protected]>
1 parent b467d94 commit d43e0c9

File tree

18 files changed

+276
-187
lines changed

18 files changed

+276
-187
lines changed

tap_aggregator/proto/v2_u256.proto

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@ package tap_aggregator.v2_u256;
77
import "uint256.proto";
88

99
message Receipt {
10-
bytes allocation_id = 1;
10+
bytes collection_id = 1;
1111
uint64 timestamp_ns = 2;
1212
uint64 nonce = 3;
1313
grpc.uint256.Uint256 value = 4;
@@ -22,7 +22,7 @@ message SignedReceipt {
2222
}
2323

2424
message ReceiptAggregateVoucher {
25-
bytes allocation_id = 1;
25+
bytes collection_id = 1;
2626
uint64 timestamp_ns = 2;
2727
grpc.uint256.Uint256 value_aggregate = 3;
2828
bytes payer = 4;

tap_aggregator/src/aggregator/v2.rs

Lines changed: 66 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -192,7 +192,7 @@ mod tests {
192192
use tap_graph::v2::{Receipt, ReceiptAggregateVoucher};
193193
use thegraph_core::alloy::{
194194
dyn_abi::Eip712Domain,
195-
primitives::{address, fixed_bytes, Address, Bytes, FixedBytes},
195+
primitives::{address, fixed_bytes, Address, Bytes, FixedBytes, U256},
196196
signers::local::PrivateKeySigner,
197197
};
198198

@@ -246,7 +246,14 @@ mod tests {
246246
let mut receipts = Vec::new();
247247
let receipt = Eip712SignedMessage::new(
248248
&domain_separator,
249-
Receipt::new(collection_id, payer, data_service, service_provider, 42).unwrap(),
249+
Receipt::new(
250+
collection_id,
251+
payer,
252+
data_service,
253+
service_provider,
254+
U256::from(42),
255+
)
256+
.unwrap(),
250257
&keys.0,
251258
)
252259
.unwrap();
@@ -271,13 +278,27 @@ mod tests {
271278
let receipts = vec![
272279
Eip712SignedMessage::new(
273280
&domain_separator,
274-
Receipt::new(collection_id, payer, data_service, service_provider, 42).unwrap(),
281+
Receipt::new(
282+
collection_id,
283+
payer,
284+
data_service,
285+
service_provider,
286+
U256::from(42),
287+
)
288+
.unwrap(),
275289
&keys.0,
276290
)
277291
.unwrap(),
278292
Eip712SignedMessage::new(
279293
&domain_separator,
280-
Receipt::new(collection_id, payer, data_service, service_provider, 42).unwrap(),
294+
Receipt::new(
295+
collection_id,
296+
payer,
297+
data_service,
298+
service_provider,
299+
U256::from(42),
300+
)
301+
.unwrap(),
281302
&keys.0,
282303
)
283304
.unwrap(),
@@ -390,13 +411,27 @@ mod tests {
390411
let receipts = vec![
391412
Eip712SignedMessage::new(
392413
&domain_separator,
393-
Receipt::new(collection_id, payer, data_service, service_provider, 42).unwrap(),
414+
Receipt::new(
415+
collection_id,
416+
payer,
417+
data_service,
418+
service_provider,
419+
U256::from(42),
420+
)
421+
.unwrap(),
394422
&keys.0,
395423
)
396424
.unwrap(),
397425
Eip712SignedMessage::new(
398426
&domain_separator,
399-
Receipt::new(collection_id, payer, data_service, service_provider, 43).unwrap(),
427+
Receipt::new(
428+
collection_id,
429+
payer,
430+
data_service,
431+
service_provider,
432+
U256::from(43),
433+
)
434+
.unwrap(),
400435
&keys.0,
401436
)
402437
.unwrap(),
@@ -407,7 +442,7 @@ mod tests {
407442
payer,
408443
data_service,
409444
service_provider,
410-
44,
445+
U256::from(44),
411446
)
412447
.unwrap(),
413448
&keys.0,
@@ -440,19 +475,40 @@ mod tests {
440475
let receipts = vec![
441476
Eip712SignedMessage::new(
442477
&domain_separator,
443-
Receipt::new(collection_id, payer, data_service, service_provider, 42).unwrap(),
478+
Receipt::new(
479+
collection_id,
480+
payer,
481+
data_service,
482+
service_provider,
483+
U256::from(42),
484+
)
485+
.unwrap(),
444486
&keys.0,
445487
)
446488
.unwrap(),
447489
Eip712SignedMessage::new(
448490
&domain_separator,
449-
Receipt::new(collection_id, payer, data_service, service_provider, 43).unwrap(),
491+
Receipt::new(
492+
collection_id,
493+
payer,
494+
data_service,
495+
service_provider,
496+
U256::from(43),
497+
)
498+
.unwrap(),
450499
&keys.0,
451500
)
452501
.unwrap(),
453502
Eip712SignedMessage::new(
454503
&domain_separator,
455-
Receipt::new(collection_id, payer, data_service, service_provider, 44).unwrap(),
504+
Receipt::new(
505+
collection_id,
506+
payer,
507+
data_service,
508+
service_provider,
509+
U256::from(44),
510+
)
511+
.unwrap(),
456512
&keys.0,
457513
)
458514
.unwrap(),

tap_aggregator/src/grpc.rs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -450,7 +450,7 @@ pub mod v2_u256 {
450450
type Error = anyhow::Error;
451451
fn try_from(receipt: self::Receipt) -> Result<Self, Self::Error> {
452452
Ok(Self {
453-
allocation_id: receipt.allocation_id.as_slice().try_into()?,
453+
collection_id: receipt.collection_id.as_slice().try_into()?,
454454
timestamp_ns: receipt.timestamp_ns,
455455
value: receipt.value.ok_or(anyhow!("Missing value"))?.into(),
456456
nonce: receipt.nonce,
@@ -477,7 +477,7 @@ pub mod v2_u256 {
477477
impl From<tap_graph::v2::Receipt> for self::Receipt {
478478
fn from(value: tap_graph::v2::Receipt) -> Self {
479479
Self {
480-
allocation_id: value.allocation_id.as_slice().to_vec(),
480+
collection_id: value.collection_id.as_slice().to_vec(),
481481
timestamp_ns: value.timestamp_ns,
482482
nonce: value.nonce,
483483
value: Some(U256::from(value.value).into()),
@@ -523,7 +523,7 @@ pub mod v2_u256 {
523523
type Error = anyhow::Error;
524524
fn try_from(voucher: self::ReceiptAggregateVoucher) -> Result<Self, Self::Error> {
525525
Ok(Self {
526-
allocationId: voucher.allocation_id.as_slice().try_into()?,
526+
collectionId: voucher.collection_id.as_slice().try_into()?,
527527
timestampNs: voucher.timestamp_ns,
528528
valueAggregate: voucher
529529
.value_aggregate
@@ -540,7 +540,7 @@ pub mod v2_u256 {
540540
impl From<tap_graph::v2::ReceiptAggregateVoucher> for self::ReceiptAggregateVoucher {
541541
fn from(voucher: tap_graph::v2::ReceiptAggregateVoucher) -> Self {
542542
Self {
543-
allocation_id: voucher.allocationId.to_vec(),
543+
collection_id: voucher.collectionId.to_vec(),
544544
timestamp_ns: voucher.timestampNs,
545545
value_aggregate: Some(U256::from(voucher.valueAggregate).into()),
546546
payer: voucher.payer.to_vec(),

tap_aggregator/src/server.rs

Lines changed: 35 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -347,7 +347,7 @@ impl RpcServer for RpcImpl {
347347
produce_kafka_records(
348348
kafka,
349349
&self.wallet.address(),
350-
&res.data.message.allocationId,
350+
&res.data.message.collectionId,
351351
res.data.message.valueAggregate,
352352
);
353353
}
@@ -504,7 +504,7 @@ fn produce_kafka_records<K: Debug>(
504504
kafka: &rdkafka::producer::ThreadedProducer<rdkafka::producer::DefaultProducerContext>,
505505
sender: &Address,
506506
key_fragment: &K,
507-
aggregated_value: u128,
507+
aggregated_value: U256,
508508
) {
509509
let topic = "gateway_ravs";
510510
let key = format!("{sender:?}:{key_fragment:?}");
@@ -554,12 +554,25 @@ mod tests {
554554
}
555555

556556
#[fixture]
557-
fn allocation_ids() -> Vec<Address> {
557+
fn collection_ids() -> Vec<thegraph_core::alloy::primitives::FixedBytes<32>> {
558+
use thegraph_core::alloy::primitives::FixedBytes;
558559
vec![
559-
address!("0xabababababababababababababababababababab"),
560-
address!("0xdeaddeaddeaddeaddeaddeaddeaddeaddeaddead"),
561-
address!("0xbeefbeefbeefbeefbeefbeefbeefbeefbeefbeef"),
562-
address!("0x1234567890abcdef1234567890abcdef12345678"),
560+
FixedBytes::from([0xab; 32]),
561+
FixedBytes::from([
562+
0xde, 0xad, 0xde, 0xad, 0xde, 0xad, 0xde, 0xad, 0xde, 0xad, 0xde, 0xad, 0xde, 0xad,
563+
0xde, 0xad, 0xde, 0xad, 0xde, 0xad, 0xde, 0xad, 0xde, 0xad, 0xde, 0xad, 0xde, 0xad,
564+
0xde, 0xad, 0xde, 0xad,
565+
]),
566+
FixedBytes::from([
567+
0xbe, 0xef, 0xbe, 0xef, 0xbe, 0xef, 0xbe, 0xef, 0xbe, 0xef, 0xbe, 0xef, 0xbe, 0xef,
568+
0xbe, 0xef, 0xbe, 0xef, 0xbe, 0xef, 0xbe, 0xef, 0xbe, 0xef, 0xbe, 0xef, 0xbe, 0xef,
569+
0xbe, 0xef, 0xbe, 0xef,
570+
]),
571+
FixedBytes::from([
572+
0x12, 0x34, 0x56, 0x78, 0x90, 0xab, 0xcd, 0xef, 0x12, 0x34, 0x56, 0x78, 0x90, 0xab,
573+
0xcd, 0xef, 0x12, 0x34, 0x56, 0x78, 0x90, 0xab, 0xcd, 0xef, 0x12, 0x34, 0x56, 0x78,
574+
0x90, 0xab, 0xcd, 0xef,
575+
]),
563576
]
564577
}
565578

@@ -644,7 +657,7 @@ mod tests {
644657
http_request_size_limit: u32,
645658
http_response_size_limit: u32,
646659
http_max_concurrent_connections: u32,
647-
allocation_ids: Vec<Address>,
660+
collection_ids: Vec<thegraph_core::alloy::primitives::FixedBytes<32>>,
648661
payer: Address,
649662
data_service: Address,
650663
service_provider: Address,
@@ -691,7 +704,7 @@ mod tests {
691704
&domain_separator,
692705
#[cfg(feature = "v2")]
693706
Receipt::new(
694-
allocation_ids[0],
707+
collection_ids[0],
695708
payer,
696709
data_service,
697710
service_provider,
@@ -722,7 +735,7 @@ mod tests {
722735
#[cfg(feature = "v2")]
723736
{
724737
ReceiptAggregateVoucher::aggregate_receipts(
725-
allocation_ids[0],
738+
collection_ids[0],
726739
payer,
727740
data_service,
728741
service_provider,
@@ -733,12 +746,12 @@ mod tests {
733746
}
734747
#[cfg(not(feature = "v2"))]
735748
{
736-
ReceiptAggregateVoucher::aggregate_receipts(allocation_ids[0], &receipts, None)
749+
ReceiptAggregateVoucher::aggregate_receipts(collection_ids[0], &receipts, None)
737750
.unwrap()
738751
}
739752
};
740753

741-
assert!(remote_rav.message.allocationId == local_rav.allocationId);
754+
assert!(remote_rav.message.collectionId == local_rav.collectionId);
742755
assert!(remote_rav.message.timestampNs == local_rav.timestampNs);
743756
assert!(remote_rav.message.valueAggregate == local_rav.valueAggregate);
744757

@@ -756,7 +769,7 @@ mod tests {
756769
http_request_size_limit: u32,
757770
http_response_size_limit: u32,
758771
http_max_concurrent_connections: u32,
759-
allocation_ids: Vec<Address>,
772+
collection_ids: Vec<thegraph_core::alloy::primitives::FixedBytes<32>>,
760773
payer: Address,
761774
data_service: Address,
762775
service_provider: Address,
@@ -803,15 +816,15 @@ mod tests {
803816
&domain_separator,
804817
#[cfg(feature = "v2")]
805818
Receipt::new(
806-
allocation_ids[0],
819+
collection_ids[0],
807820
payer,
808821
data_service,
809822
service_provider,
810823
super::U256::from(value),
811824
)
812825
.unwrap(),
813826
#[cfg(not(feature = "v2"))]
814-
Receipt::new(allocation_ids[0], value).unwrap(),
827+
Receipt::new(collection_ids[0], value).unwrap(),
815828
&all_wallets.choose(&mut rng).unwrap().wallet,
816829
)
817830
.unwrap(),
@@ -823,7 +836,7 @@ mod tests {
823836
#[cfg(feature = "v2")]
824837
{
825838
ReceiptAggregateVoucher::aggregate_receipts(
826-
allocation_ids[0],
839+
collection_ids[0],
827840
payer,
828841
data_service,
829842
service_provider,
@@ -835,7 +848,7 @@ mod tests {
835848
#[cfg(not(feature = "v2"))]
836849
{
837850
ReceiptAggregateVoucher::aggregate_receipts(
838-
allocation_ids[0],
851+
collection_ids[0],
839852
&receipts[0..receipts.len() / 2],
840853
None,
841854
)
@@ -876,7 +889,7 @@ mod tests {
876889
http_request_size_limit: u32,
877890
http_response_size_limit: u32,
878891
http_max_concurrent_connections: u32,
879-
allocation_ids: Vec<Address>,
892+
collection_ids: Vec<thegraph_core::alloy::primitives::FixedBytes<32>>,
880893
payer: Address,
881894
data_service: Address,
882895
service_provider: Address,
@@ -908,15 +921,15 @@ mod tests {
908921
&domain_separator,
909922
#[cfg(feature = "v2")]
910923
Receipt::new(
911-
allocation_ids[0],
924+
collection_ids[0],
912925
payer,
913926
data_service,
914927
service_provider,
915928
super::U256::from(42),
916929
)
917930
.unwrap(),
918931
#[cfg(not(feature = "v2"))]
919-
Receipt::new(allocation_ids[0], 42).unwrap(),
932+
Receipt::new(collection_ids[0], 42).unwrap(),
920933
&keys_main.wallet,
921934
)
922935
.unwrap()];
@@ -971,7 +984,7 @@ mod tests {
971984
domain_separator: Eip712Domain,
972985
http_response_size_limit: u32,
973986
http_max_concurrent_connections: u32,
974-
allocation_ids: Vec<Address>,
987+
collection_ids: Vec<thegraph_core::alloy::primitives::FixedBytes<32>>,
975988
payer: Address,
976989
data_service: Address,
977990
service_provider: Address,
@@ -1020,7 +1033,7 @@ mod tests {
10201033
&domain_separator,
10211034
#[cfg(feature = "v2")]
10221035
Receipt::new(
1023-
allocation_ids[0],
1036+
collection_ids[0],
10241037
payer,
10251038
data_service,
10261039
service_provider,

0 commit comments

Comments
 (0)