Skip to content

Commit 86e0057

Browse files
committed
wip: horizon migration exploration
1 parent 604a765 commit 86e0057

File tree

7 files changed

+77
-38
lines changed

7 files changed

+77
-38
lines changed

Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -83,7 +83,7 @@ sqlx = { version = "0.8.2", features = [
8383
], default-features = false }
8484
stdext = "0.3.3"
8585
tap_aggregator = { version = "0.5.6", default-features = false }
86-
tap_core = { version = "4.1.3", default-features = false }
86+
tap_core = { version = "4.1.4", default-features = false }
8787
tap_graph = { version = "0.3.4", features = ["v2"] }
8888
tempfile = "3.8.0"
8989
test-log = { version = "0.2.12", default-features = false }

crates/tap-agent/src/agent/sender_account.rs

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -25,10 +25,13 @@ use tap_aggregator::grpc::{
2525
v1::tap_aggregator_client::TapAggregatorClient as AggregatorV1,
2626
v2::tap_aggregator_client::TapAggregatorClient as AggregatorV2,
2727
};
28-
use thegraph_core::alloy::{
29-
hex::ToHexExt,
30-
primitives::{Address, U256},
31-
sol_types::Eip712Domain,
28+
use thegraph_core::{
29+
alloy::{
30+
hex::ToHexExt,
31+
primitives::{Address, U256},
32+
sol_types::Eip712Domain,
33+
},
34+
CollectionId,
3235
};
3336
use thegraph_core::{AllocationId as AllocationIdCore, CollectionId};
3437
use tokio::{sync::watch::Receiver, task::JoinHandle};

crates/tap-agent/src/agent/sender_accounts_manager.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -700,6 +700,7 @@ impl State {
700700
let receipts_signer_collections_in_db = sqlx::query!(
701701
r#"
702702
WITH grouped AS (
703+
SELECT signer_address, collection_id
703704
SELECT signer_address, collection_id
704705
FROM tap_horizon_receipts
705706
GROUP BY signer_address, collection_id
@@ -746,7 +747,7 @@ impl State {
746747
r#"
747748
SELECT
748749
payer,
749-
ARRAY_AGG(DISTINCT allocation_id) FILTER (WHERE NOT last) AS allocation_ids
750+
ARRAY_AGG(DISTINCT collection_id) FILTER (WHERE NOT last) AS allocation_ids
750751
FROM tap_horizon_ravs
751752
GROUP BY payer
752753
"#

crates/tap-agent/src/tap/context.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -155,6 +155,8 @@ impl NetworkVersion for Horizon {
155155
#[derive(Clone, bon::Builder)]
156156
pub struct TapAgentContext<T> {
157157
pgpool: PgPool,
158+
/// For Legacy network: represents an allocation ID
159+
/// For Horizon network: represents a collection ID (stored in collection_id database column)
158160
#[cfg_attr(test, builder(default = crate::test::ALLOCATION_ID_0))]
159161
allocation_id: Address,
160162
#[cfg_attr(test, builder(default = test_assets::TAP_SENDER.1))]

crates/tap-agent/src/tap/context/rav.rs

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -167,7 +167,7 @@ impl RavRead<tap_graph::v2::ReceiptAggregateVoucher> for TapAgentContext<Horizon
167167
metadata
168168
FROM tap_horizon_ravs
169169
WHERE
170-
allocation_id = $1
170+
collection_id = $1
171171
AND payer = $2
172172
AND service_provider = $3
173173
"#,
@@ -200,7 +200,8 @@ impl RavRead<tap_graph::v2::ReceiptAggregateVoucher> for TapAgentContext<Horizon
200200
"Error decoding collection_id while retrieving RAV from database: {}",
201201
e
202202
),
203-
})?;
203+
}
204+
})?;
204205

205206
let payer = Address::from_str(&row.payer).map_err(|e| AdapterError::RavRead {
206207
error: format!(
@@ -382,7 +383,8 @@ mod test {
382383
{
383384
// Insert a rav
384385
let mut new_rav = T::create_rav(
385-
ALLOCATION_ID_0,
386+
Some(ALLOCATION_ID_0),
387+
None,
386388
SIGNER.0.clone(),
387389
TIMESTAMP_NS,
388390
VALUE_AGGREGATE,
@@ -398,7 +400,8 @@ mod test {
398400
// Update the RAV 3 times in quick succession
399401
for i in 0..3 {
400402
new_rav = T::create_rav(
401-
ALLOCATION_ID_0,
403+
Some(ALLOCATION_ID_0),
404+
None,
402405
SIGNER.0.clone(),
403406
TIMESTAMP_NS + i,
404407
VALUE_AGGREGATE - (i as u128),

crates/tap-agent/src/tap/context/receipt.rs

Lines changed: 27 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -460,8 +460,14 @@ mod test {
460460
T: CreateReceipt,
461461
TapAgentContext<T>: ReceiptRead<TapReceipt> + ReceiptDelete,
462462
{
463-
let received_receipt =
464-
T::create_received_receipt(ALLOCATION_ID_0, &SIGNER.0, u64::MAX, u64::MAX, u128::MAX);
463+
let received_receipt = T::create_received_receipt(
464+
Some(ALLOCATION_ID_0),
465+
None,
466+
&SIGNER.0,
467+
u64::MAX,
468+
u64::MAX,
469+
u128::MAX,
470+
);
465471

466472
// Storing the receipt
467473
store_receipt(&context.pgpool, received_receipt.signed_receipt())
@@ -499,7 +505,7 @@ mod test {
499505
.filter(|received_receipt| {
500506
range.contains(&received_receipt.signed_receipt().timestamp_ns())
501507
&& (received_receipt.signed_receipt().allocation_id()
502-
== storage_adapter.allocation_id)
508+
== Some(storage_adapter.allocation_id))
503509
&& escrow_accounts_snapshot
504510
.get_sender_for_signer(
505511
&received_receipt
@@ -574,7 +580,7 @@ mod test {
574580
.iter()
575581
.filter(|(_, received_receipt)| {
576582
if (received_receipt.signed_receipt().allocation_id()
577-
== storage_adapter.allocation_id)
583+
== Some(storage_adapter.allocation_id))
578584
&& escrow_accounts_snapshot
579585
.get_sender_for_signer(
580586
&received_receipt
@@ -717,7 +723,7 @@ mod test {
717723
.iter()
718724
.filter(|(_, received_receipt)| {
719725
if (received_receipt.signed_receipt().allocation_id()
720-
== storage_adapter.allocation_id)
726+
== Some(storage_adapter.allocation_id))
721727
&& escrow_accounts_snapshot
722728
.get_sender_for_signer(
723729
&received_receipt
@@ -832,7 +838,8 @@ mod test {
832838
// Creating 100 receipts with timestamps 42 to 141
833839
for i in 0..100 {
834840
let receipt = T::create_received_receipt(
835-
ALLOCATION_ID_0,
841+
Some(ALLOCATION_ID_0),
842+
None,
836843
&SIGNER.0,
837844
i + 684,
838845
i + 42,
@@ -858,7 +865,8 @@ mod test {
858865
// add a copy in the same timestamp
859866
for i in 0..100 {
860867
let receipt = T::create_received_receipt(
861-
ALLOCATION_ID_0,
868+
Some(ALLOCATION_ID_0),
869+
None,
862870
&SIGNER.0,
863871
i + 684,
864872
i + 43,
@@ -900,7 +908,8 @@ mod test {
900908
let mut received_receipt_vec = Vec::new();
901909
for i in 0..10 {
902910
received_receipt_vec.push(T::create_received_receipt(
903-
ALLOCATION_ID_0,
911+
Some(ALLOCATION_ID_0),
912+
None,
904913
&SIGNER.0,
905914
i + 684,
906915
i + 42,
@@ -909,14 +918,16 @@ mod test {
909918

910919
// Adding irrelevant receipts to make sure they are not retrieved
911920
received_receipt_vec.push(T::create_received_receipt(
912-
ALLOCATION_ID_IRRELEVANT,
921+
Some(ALLOCATION_ID_IRRELEVANT),
922+
None,
913923
&SIGNER.0,
914924
i + 684,
915925
i + 42,
916926
(i + 124).into(),
917927
));
918928
received_receipt_vec.push(T::create_received_receipt(
919-
ALLOCATION_ID_0,
929+
Some(ALLOCATION_ID_0),
930+
None,
920931
&SENDER_IRRELEVANT.0,
921932
i + 684,
922933
i + 42,
@@ -1025,7 +1036,8 @@ mod test {
10251036
let mut received_receipt_vec = Vec::new();
10261037
for i in 0..10 {
10271038
received_receipt_vec.push(T::create_received_receipt(
1028-
ALLOCATION_ID_0,
1039+
Some(ALLOCATION_ID_0),
1040+
None,
10291041
&SIGNER.0,
10301042
i + 684,
10311043
i + 42,
@@ -1034,14 +1046,16 @@ mod test {
10341046

10351047
// Adding irrelevant receipts to make sure they are not retrieved
10361048
received_receipt_vec.push(T::create_received_receipt(
1037-
ALLOCATION_ID_IRRELEVANT,
1049+
Some(ALLOCATION_ID_IRRELEVANT),
1050+
None,
10381051
&SIGNER.0,
10391052
i + 684,
10401053
i + 42,
10411054
(i + 124).into(),
10421055
));
10431056
received_receipt_vec.push(T::create_received_receipt(
1044-
ALLOCATION_ID_0,
1057+
Some(ALLOCATION_ID_0),
1058+
None,
10451059
&SENDER_IRRELEVANT.0,
10461060
i + 684,
10471061
i + 42,

crates/tap-agent/src/test.rs

Lines changed: 31 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@ use tap_core::{signed_message::Eip712SignedMessage, tap_eip712_domain};
2323
use tap_graph::{Receipt, ReceiptAggregateVoucher, SignedRav, SignedReceipt};
2424
use test_assets::{flush_messages, TAP_SENDER as SENDER, TAP_SIGNER as SIGNER};
2525
use thegraph_core::alloy::{
26-
primitives::{hex::ToHexExt, Address, Bytes, U256},
26+
primitives::{hex::ToHexExt, Address, Bytes, FixedBytes, U256},
2727
signers::local::{coins_bip39::English, MnemonicBuilder, PrivateKeySigner},
2828
sol_types::Eip712Domain,
2929
};
@@ -268,7 +268,8 @@ pub trait CreateRav: NetworkVersion {
268268
/// function signature and don't require &self. The reason is that we can not match over T to get
269269
/// all variants because T is a trait and not an enum.
270270
fn create_rav(
271-
allocation_id: Address,
271+
allocation_id: Option<Address>,
272+
collection_id: Option<FixedBytes<32>>,
272273
signer_wallet: PrivateKeySigner,
273274
timestamp_ns: u64,
274275
value_aggregate: u128,
@@ -277,23 +278,35 @@ pub trait CreateRav: NetworkVersion {
277278

278279
impl CreateRav for Legacy {
279280
fn create_rav(
280-
allocation_id: Address,
281+
allocation_id: Option<Address>,
282+
_collection_id: Option<FixedBytes<32>>,
281283
signer_wallet: PrivateKeySigner,
282284
timestamp_ns: u64,
283285
value_aggregate: u128,
284286
) -> Eip712SignedMessage<Self::Rav> {
285-
create_rav(allocation_id, signer_wallet, timestamp_ns, value_aggregate)
287+
create_rav(
288+
allocation_id.unwrap(),
289+
signer_wallet,
290+
timestamp_ns,
291+
value_aggregate,
292+
)
286293
}
287294
}
288295

289296
impl CreateRav for Horizon {
290297
fn create_rav(
291-
allocation_id: Address,
298+
_allocation_id: Option<Address>,
299+
collection_id: Option<FixedBytes<32>>,
292300
signer_wallet: PrivateKeySigner,
293301
timestamp_ns: u64,
294302
value_aggregate: u128,
295303
) -> Eip712SignedMessage<Self::Rav> {
296-
create_rav_v2(allocation_id, signer_wallet, timestamp_ns, value_aggregate)
304+
create_rav_v2(
305+
collection_id.unwrap(),
306+
signer_wallet,
307+
timestamp_ns,
308+
value_aggregate,
309+
)
297310
}
298311
}
299312

@@ -318,15 +331,15 @@ pub fn create_rav(
318331

319332
/// Fixture to generate a RAV using the wallet from `keys()`
320333
pub fn create_rav_v2(
321-
allocation_id: Address,
334+
collection_id: FixedBytes<32>,
322335
signer_wallet: PrivateKeySigner,
323336
timestamp_ns: u64,
324337
value_aggregate: u128,
325338
) -> tap_graph::v2::SignedRav {
326339
Eip712SignedMessage::new(
327340
&TAP_EIP712_DOMAIN_SEPARATOR,
328341
tap_graph::v2::ReceiptAggregateVoucher {
329-
allocationId: allocation_id,
342+
collectionId: collection_id,
330343
timestampNs: timestamp_ns,
331344
valueAggregate: value_aggregate,
332345
payer: SENDER.1,
@@ -345,7 +358,8 @@ pub trait CreateReceipt {
345358
/// function signature and don't require &self. The reason is that we can not match over T to get
346359
/// all variants because T is a trait and not an enum.
347360
fn create_received_receipt(
348-
allocation_id: Address,
361+
allocation_id: Option<Address>,
362+
collection_id: Option<FixedBytes<32>>,
349363
signer_wallet: &PrivateKeySigner,
350364
nonce: u64,
351365
timestamp_ns: u64,
@@ -355,7 +369,8 @@ pub trait CreateReceipt {
355369

356370
impl CreateReceipt for Horizon {
357371
fn create_received_receipt(
358-
allocation_id: Address,
372+
_allocation_id: Option<Address>,
373+
collection_id: Option<FixedBytes<32>>,
359374
signer_wallet: &PrivateKeySigner,
360375
nonce: u64,
361376
timestamp_ns: u64,
@@ -364,7 +379,7 @@ impl CreateReceipt for Horizon {
364379
let receipt = Eip712SignedMessage::new(
365380
&TAP_EIP712_DOMAIN_SEPARATOR,
366381
tap_graph::v2::Receipt {
367-
allocation_id,
382+
collection_id: collection_id.unwrap(),
368383
payer: SENDER.1,
369384
service_provider: INDEXER.1,
370385
data_service: Address::ZERO,
@@ -381,7 +396,8 @@ impl CreateReceipt for Horizon {
381396

382397
impl CreateReceipt for Legacy {
383398
fn create_received_receipt(
384-
allocation_id: Address,
399+
allocation_id: Option<Address>,
400+
_collection_id: Option<FixedBytes<32>>,
385401
signer_wallet: &PrivateKeySigner,
386402
nonce: u64,
387403
timestamp_ns: u64,
@@ -390,7 +406,7 @@ impl CreateReceipt for Legacy {
390406
let receipt = Eip712SignedMessage::new(
391407
&TAP_EIP712_DOMAIN_SEPARATOR,
392408
Receipt {
393-
allocation_id,
409+
allocation_id: allocation_id.unwrap(),
394410
nonce,
395411
timestamp_ns,
396412
value,
@@ -480,7 +496,7 @@ pub async fn store_receipt_v2(
480496
INSERT INTO tap_horizon_receipts (
481497
signer_address,
482498
signature,
483-
allocation_id,
499+
collection_id,
484500
payer,
485501
data_service,
486502
service_provider,
@@ -492,7 +508,7 @@ pub async fn store_receipt_v2(
492508
"#,
493509
signer,
494510
encoded_signature,
495-
signed_receipt.message.allocation_id.encode_hex(),
511+
signed_receipt.message.collection_id.encode_hex(),
496512
signed_receipt.message.payer.encode_hex(),
497513
signed_receipt.message.data_service.encode_hex(),
498514
signed_receipt.message.service_provider.encode_hex(),

0 commit comments

Comments
 (0)