Skip to content

Commit e9799f5

Browse files
committed
wip: horizon migration exploration
1 parent ceefaeb commit e9799f5

File tree

14 files changed

+645
-559
lines changed

14 files changed

+645
-559
lines changed

Cargo.lock

Lines changed: 475 additions & 456 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -82,13 +82,13 @@ sqlx = { version = "0.8.2", features = [
8282
"uuid",
8383
], default-features = false }
8484
stdext = "0.3.3"
85-
tap_aggregator = { version = "0.5.5", default-features = false }
86-
tap_core = { version = "4.1.3", default-features = false }
87-
tap_graph = { version = "0.3.3", features = ["v2"] }
85+
tap_aggregator = { version = "0.5.6", default-features = false }
86+
tap_core = { version = "4.1.4", default-features = false }
87+
tap_graph = { version = "0.3.4", features = ["v2"] }
8888
tempfile = "3.8.0"
8989
test-log = { version = "0.2.12", default-features = false }
9090
test-with = "0.14.6"
91-
thegraph-core = { version = "0.15.0", features = [
91+
thegraph-core = { version = "0.15.0", git = "https://github.com/edgeandnode/toolshed", branch = "tmigone/collection-id", features = [
9292
"attestation",
9393
"alloy-eip712",
9494
"alloy-sol-types",

crates/indexer-receipt/src/lib.rs

Lines changed: 16 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,10 @@ use tap_core::{
99
},
1010
signed_message::SignatureBytes,
1111
};
12-
use thegraph_core::alloy::{dyn_abi::Eip712Domain, primitives::Address, signers::Signature};
12+
use thegraph_core::{
13+
alloy::{dyn_abi::Eip712Domain, primitives::Address, signers::Signature},
14+
CollectionId,
15+
};
1316

1417
#[derive(Debug, Clone, PartialEq, Eq)]
1518
pub enum TapReceipt {
@@ -70,13 +73,13 @@ impl Aggregate<TapReceipt> for tap_graph::v2::ReceiptAggregateVoucher {
7073
})
7174
.collect::<Result<_, _>>()
7275
.map_err(AggregationError::Other)?;
73-
let allocation_id = receipts[0].message.allocation_id;
76+
let collection_id = receipts[0].message.collection_id;
7477
let payer = receipts[0].message.payer;
7578
let data_service = receipts[0].message.data_service;
7679
let service_provider = receipts[0].message.service_provider;
7780

7881
tap_graph::v2::ReceiptAggregateVoucher::aggregate_receipts(
79-
allocation_id,
82+
collection_id,
8083
payer,
8184
data_service,
8285
service_provider,
@@ -115,10 +118,17 @@ impl TapReceipt {
115118
}
116119
}
117120

118-
pub fn allocation_id(&self) -> Address {
121+
pub fn allocation_id(&self) -> Option<Address> {
122+
match self {
123+
TapReceipt::V1(receipt) => Some(receipt.message.allocation_id),
124+
_ => None,
125+
}
126+
}
127+
128+
pub fn collection_id(&self) -> Option<CollectionId> {
119129
match self {
120-
TapReceipt::V1(receipt) => receipt.message.allocation_id,
121-
TapReceipt::V2(receipt) => receipt.message.allocation_id,
130+
TapReceipt::V1(_) => None,
131+
TapReceipt::V2(receipt) => Some(receipt.message.collection_id.into()),
122132
}
123133
}
124134

crates/service/src/tap/checks/allocation_eligible.rs

Lines changed: 23 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -29,16 +29,29 @@ impl Check<TapReceipt> for AllocationEligible {
2929
_: &tap_core::receipt::Context,
3030
receipt: &CheckingReceipt,
3131
) -> CheckResult {
32-
let allocation_id = receipt.signed_receipt().allocation_id();
33-
if !self
34-
.indexer_allocations
35-
.borrow()
36-
.contains_key(&allocation_id)
37-
{
38-
return Err(CheckError::Failed(anyhow!(
39-
"Receipt allocation ID `{}` is not eligible for this indexer",
40-
allocation_id
41-
)));
32+
if let Some(allocation_id) = receipt.signed_receipt().allocation_id() {
33+
if !self
34+
.indexer_allocations
35+
.borrow()
36+
.contains_key(&allocation_id)
37+
{
38+
return Err(CheckError::Failed(anyhow!(
39+
"Receipt allocation ID `{}` is not eligible for this indexer",
40+
allocation_id
41+
)));
42+
}
43+
} else if let Some(collection_id) = receipt.signed_receipt().collection_id() {
44+
let collection_id_address = collection_id.as_address();
45+
if !self
46+
.indexer_allocations
47+
.borrow()
48+
.contains_key(&collection_id_address)
49+
{
50+
return Err(CheckError::Failed(anyhow!(
51+
"Receipt collection ID `{}` is not eligible for this indexer",
52+
collection_id
53+
)));
54+
}
4255
}
4356
Ok(())
4457
}

crates/service/src/tap/receipt_store.rs

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -164,7 +164,7 @@ impl InnerContext {
164164
let receipts_len = receipts.len();
165165
let mut signers = Vec::with_capacity(receipts_len);
166166
let mut signatures = Vec::with_capacity(receipts_len);
167-
let mut allocation_ids = Vec::with_capacity(receipts_len);
167+
let mut collection_ids = Vec::with_capacity(receipts_len);
168168
let mut payers = Vec::with_capacity(receipts_len);
169169
let mut data_services = Vec::with_capacity(receipts_len);
170170
let mut service_providers = Vec::with_capacity(receipts_len);
@@ -175,7 +175,7 @@ impl InnerContext {
175175
for receipt in receipts {
176176
signers.push(receipt.signer_address);
177177
signatures.push(receipt.signature);
178-
allocation_ids.push(receipt.allocation_id);
178+
collection_ids.push(receipt.collection_id);
179179
payers.push(receipt.payer);
180180
data_services.push(receipt.data_service);
181181
service_providers.push(receipt.service_provider);
@@ -187,7 +187,7 @@ impl InnerContext {
187187
r#"INSERT INTO tap_horizon_receipts (
188188
signer_address,
189189
signature,
190-
allocation_id,
190+
collection_id,
191191
payer,
192192
data_service,
193193
service_provider,
@@ -207,7 +207,7 @@ impl InnerContext {
207207
)"#,
208208
&signers,
209209
&signatures,
210-
&allocation_ids,
210+
&collection_ids,
211211
&payers,
212212
&data_services,
213213
&service_providers,
@@ -328,7 +328,7 @@ impl DbReceiptV1 {
328328
pub struct DbReceiptV2 {
329329
signer_address: String,
330330
signature: Vec<u8>,
331-
allocation_id: String,
331+
collection_id: String,
332332
payer: String,
333333
data_service: String,
334334
service_provider: String,
@@ -342,7 +342,7 @@ impl DbReceiptV2 {
342342
receipt: &tap_graph::v2::SignedReceipt,
343343
separator: &Eip712Domain,
344344
) -> anyhow::Result<Self> {
345-
let allocation_id = receipt.message.allocation_id.encode_hex();
345+
let collection_id = receipt.message.collection_id.encode_hex();
346346
let payer = receipt.message.payer.encode_hex();
347347
let data_service = receipt.message.data_service.encode_hex();
348348
let service_provider = receipt.message.service_provider.encode_hex();
@@ -360,7 +360,7 @@ impl DbReceiptV2 {
360360
let nonce = BigDecimal::from(receipt.message.nonce);
361361
let value = BigDecimal::from(BigInt::from(receipt.value()));
362362
Ok(Self {
363-
allocation_id,
363+
collection_id,
364364
payer,
365365
data_service,
366366
service_provider,

crates/tap-agent/src/agent/sender_account.rs

Lines changed: 10 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -25,10 +25,13 @@ use tap_aggregator::grpc::{
2525
v1::tap_aggregator_client::TapAggregatorClient as AggregatorV1,
2626
v2::tap_aggregator_client::TapAggregatorClient as AggregatorV2,
2727
};
28-
use thegraph_core::alloy::{
29-
hex::ToHexExt,
30-
primitives::{Address, U256},
31-
sol_types::Eip712Domain,
28+
use thegraph_core::{
29+
alloy::{
30+
hex::ToHexExt,
31+
primitives::{Address, U256},
32+
sol_types::Eip712Domain,
33+
},
34+
CollectionId,
3235
};
3336
use tokio::{sync::watch::Receiver, task::JoinHandle};
3437
use tonic::transport::{Channel, Endpoint};
@@ -144,7 +147,7 @@ impl From<tap_graph::SignedRav> for RavInformation {
144147
impl From<&tap_graph::v2::SignedRav> for RavInformation {
145148
fn from(value: &tap_graph::v2::SignedRav) -> Self {
146149
RavInformation {
147-
allocation_id: value.message.allocationId,
150+
allocation_id: CollectionId::from(value.message.collectionId).as_address(),
148151
value_aggregate: value.message.valueAggregate,
149152
}
150153
}
@@ -815,7 +818,7 @@ impl Actor for SenderAccount {
815818
if config.horizon_enabled {
816819
sqlx::query!(
817820
r#"
818-
SELECT allocation_id, value_aggregate
821+
SELECT collection_id, value_aggregate
819822
FROM tap_horizon_ravs
820823
WHERE payer = $1 AND last AND NOT final;
821824
"#,
@@ -825,7 +828,7 @@ impl Actor for SenderAccount {
825828
.await
826829
.expect("Should not fail to fetch from \"horizon\" scalar_tap_ravs")
827830
.into_iter()
828-
.map(|record| (record.allocation_id, record.value_aggregate))
831+
.map(|record| (record.collection_id, record.value_aggregate))
829832
.collect()
830833
} else {
831834
vec![]

crates/tap-agent/src/agent/sender_accounts_manager.rs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -700,13 +700,13 @@ impl State {
700700
let receipts_signer_allocations_in_db = sqlx::query!(
701701
r#"
702702
WITH grouped AS (
703-
SELECT signer_address, allocation_id
703+
SELECT signer_address, collection_id
704704
FROM tap_horizon_receipts
705-
GROUP BY signer_address, allocation_id
705+
GROUP BY signer_address, collection_id
706706
)
707707
SELECT
708708
signer_address,
709-
ARRAY_AGG(allocation_id) AS allocation_ids
709+
ARRAY_AGG(collection_id) AS allocation_ids
710710
FROM grouped
711711
GROUP BY signer_address
712712
"#
@@ -746,7 +746,7 @@ impl State {
746746
r#"
747747
SELECT
748748
payer,
749-
ARRAY_AGG(DISTINCT allocation_id) FILTER (WHERE NOT last) AS allocation_ids
749+
ARRAY_AGG(DISTINCT collection_id) FILTER (WHERE NOT last) AS allocation_ids
750750
FROM tap_horizon_ravs
751751
GROUP BY payer
752752
"#

crates/tap-agent/src/agent/sender_allocation.rs

Lines changed: 10 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -818,7 +818,7 @@ where
818818
let reciepts_len = receipts.len();
819819
let mut reciepts_signers = Vec::with_capacity(reciepts_len);
820820
let mut encoded_signatures = Vec::with_capacity(reciepts_len);
821-
let mut allocation_ids = Vec::with_capacity(reciepts_len);
821+
let mut collection_ids = Vec::with_capacity(reciepts_len);
822822
let mut payers = Vec::with_capacity(reciepts_len);
823823
let mut data_services = Vec::with_capacity(reciepts_len);
824824
let mut service_providers = Vec::with_capacity(reciepts_len);
@@ -828,7 +828,7 @@ where
828828
let mut error_logs = Vec::with_capacity(reciepts_len);
829829

830830
for (receipt, receipt_error) in receipts {
831-
let allocation_id = receipt.message.allocation_id;
831+
let collection_id = receipt.message.collection_id;
832832
let payer = receipt.message.payer;
833833
let data_service = receipt.message.data_service;
834834
let service_provider = receipt.message.service_provider;
@@ -841,13 +841,13 @@ where
841841
})?;
842842
tracing::debug!(
843843
"Receipt for allocation {} and signer {} failed reason: {}",
844-
allocation_id.encode_hex(),
844+
collection_id.encode_hex(),
845845
receipt_signer.encode_hex(),
846846
receipt_error
847847
);
848848
reciepts_signers.push(receipt_signer.encode_hex());
849849
encoded_signatures.push(encoded_signature);
850-
allocation_ids.push(allocation_id.encode_hex());
850+
collection_ids.push(collection_id.encode_hex());
851851
payers.push(payer.encode_hex());
852852
data_services.push(data_service.encode_hex());
853853
service_providers.push(service_provider.encode_hex());
@@ -860,7 +860,7 @@ where
860860
r#"INSERT INTO tap_horizon_receipts_invalid (
861861
signer_address,
862862
signature,
863-
allocation_id,
863+
collection_id,
864864
payer,
865865
data_service,
866866
service_provider,
@@ -871,7 +871,7 @@ where
871871
) SELECT * FROM UNNEST(
872872
$1::CHAR(40)[],
873873
$2::BYTEA[],
874-
$3::CHAR(40)[],
874+
$3::CHAR(64)[],
875875
$4::CHAR(40)[],
876876
$5::CHAR(40)[],
877877
$6::CHAR(40)[],
@@ -882,7 +882,7 @@ where
882882
)"#,
883883
&reciepts_signers,
884884
&encoded_signatures,
885-
&allocation_ids,
885+
&collection_ids,
886886
&payers,
887887
&data_services,
888888
&service_providers,
@@ -1159,7 +1159,7 @@ impl DatabaseInteractions for SenderAllocationState<Horizon> {
11591159
FROM
11601160
tap_horizon_receipts_invalid
11611161
WHERE
1162-
allocation_id = $1
1162+
collection_id = $1
11631163
AND signer_address IN (SELECT unnest($2::text[]))
11641164
"#,
11651165
self.allocation_id.encode_hex(),
@@ -1205,7 +1205,7 @@ impl DatabaseInteractions for SenderAllocationState<Horizon> {
12051205
FROM
12061206
tap_horizon_receipts
12071207
WHERE
1208-
allocation_id = $1
1208+
collection_id = $1
12091209
AND service_provider = $2
12101210
AND id <= $3
12111211
AND signer_address IN (SELECT unnest($4::text[]))
@@ -1258,7 +1258,7 @@ impl DatabaseInteractions for SenderAllocationState<Horizon> {
12581258
UPDATE tap_horizon_ravs
12591259
SET last = true
12601260
WHERE
1261-
allocation_id = $1
1261+
collection_id = $1
12621262
AND payer = $2
12631263
AND service_provider = $3
12641264
"#,

crates/tap-agent/src/tap/context.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -155,6 +155,8 @@ impl NetworkVersion for Horizon {
155155
#[derive(Clone, bon::Builder)]
156156
pub struct TapAgentContext<T> {
157157
pgpool: PgPool,
158+
/// For Legacy network: represents an allocation ID
159+
/// For Horizon network: represents a collection ID (stored in collection_id database column)
158160
#[cfg_attr(test, builder(default = crate::test::ALLOCATION_ID_0))]
159161
allocation_id: Address,
160162
#[cfg_attr(test, builder(default = test_assets::TAP_SENDER.1))]

crates/tap-agent/src/tap/context/checks/allocation_id.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -54,7 +54,7 @@ impl Check<TapReceipt> for AllocationId {
5454
_: &tap_core::receipt::Context,
5555
receipt: &CheckingReceipt,
5656
) -> CheckResult {
57-
let allocation_id = receipt.signed_receipt().allocation_id();
57+
let allocation_id = receipt.signed_receipt().allocation_id().unwrap();
5858
// TODO: Remove the if block below? Each TAP Monitor is specific to an allocation
5959
// ID. So the receipts that are received here should already have been filtered by
6060
// allocation ID.

0 commit comments

Comments
 (0)