Skip to content

Commit 35046f6

Browse files
committed
refactor: add generic for legacy or horizon
Signed-off-by: Gustavo Inacio <[email protected]>
1 parent d4b9778 commit 35046f6

File tree

8 files changed

+153
-50
lines changed

8 files changed

+153
-50
lines changed

Cargo.lock

Lines changed: 12 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

crates/tap-agent/Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,6 @@ tracing-subscriber.workspace = true
3939
tonic.workspace = true
4040
bigdecimal = { workspace = true, features = ["serde"] }
4141
graphql_client.workspace = true
42-
4342
ruint = { version = "1.12.3", features = [
4443
"num-traits",
4544
], default-features = false }
@@ -52,6 +51,7 @@ tap_aggregator.workspace = true
5251
futures = { version = "0.3.30", default-features = false }
5352
bon = "3.3"
5453
test-assets = { path = "../test-assets", optional = true }
54+
sealed = "0.6.0"
5555

5656
[dev-dependencies]
5757
# Release-please breaks with cyclical dependencies if dev-dependencies

crates/tap-agent/src/agent/sender_account.rs

Lines changed: 33 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@ use crate::{
3939
adaptative_concurrency::AdaptiveLimiter,
4040
agent::unaggregated_receipts::UnaggregatedReceipts,
4141
backoff::BackoffInfo,
42+
tap::context::Legacy,
4243
tracker::{SenderFeeTracker, SimpleFeeTracker},
4344
};
4445

@@ -210,11 +211,17 @@ impl SenderAccountConfig {
210211
}
211212
}
212213

214+
pub enum AllocationType {
215+
Legacy,
216+
Horizon,
217+
}
218+
213219
impl State {
214220
async fn create_sender_allocation(
215221
&self,
216222
sender_account_ref: ActorRef<SenderAccountMessage>,
217223
allocation_id: Address,
224+
allocation_type: AllocationType,
218225
) -> anyhow::Result<()> {
219226
tracing::trace!(
220227
%self.sender,
@@ -233,13 +240,18 @@ impl State {
233240
.config(AllocationConfig::from_sender_config(self.config))
234241
.build();
235242

236-
SenderAllocation::spawn_linked(
237-
Some(self.format_sender_allocation(&allocation_id)),
238-
SenderAllocation,
239-
args,
240-
sender_account_ref.get_cell(),
241-
)
242-
.await?;
243+
match allocation_type {
244+
AllocationType::Legacy => {
245+
SenderAllocation::spawn_linked(
246+
Some(self.format_sender_allocation(&allocation_id)),
247+
SenderAllocation::<Legacy>::default(),
248+
args,
249+
sender_account_ref.get_cell(),
250+
)
251+
.await?;
252+
}
253+
AllocationType::Horizon => unimplemented!(),
254+
}
243255
Ok(())
244256
}
245257
fn format_sender_allocation(&self, allocation_id: &Address) -> String {
@@ -646,7 +658,13 @@ impl Actor for SenderAccount {
646658

647659
stream::iter(allocation_ids)
648660
// Create a sender allocation for each allocation
649-
.map(|allocation_id| state.create_sender_allocation(myself.clone(), allocation_id))
661+
.map(|allocation_id| {
662+
state.create_sender_allocation(
663+
myself.clone(),
664+
allocation_id,
665+
AllocationType::Legacy,
666+
)
667+
})
650668
.buffer_unordered(10) // Limit concurrency to 10 allocations at a time
651669
.collect::<Vec<anyhow::Result<()>>>()
652670
.await
@@ -819,7 +837,11 @@ impl Actor for SenderAccount {
819837
let mut new_allocation_ids = state.allocation_ids.clone();
820838
for allocation_id in allocation_ids.difference(&state.allocation_ids) {
821839
if let Err(error) = state
822-
.create_sender_allocation(myself.clone(), *allocation_id)
840+
.create_sender_allocation(
841+
myself.clone(),
842+
*allocation_id,
843+
AllocationType::Legacy,
844+
)
823845
.await
824846
{
825847
tracing::error!(
@@ -870,7 +892,7 @@ impl Actor for SenderAccount {
870892
}
871893
SenderAccountMessage::NewAllocationId(allocation_id) => {
872894
if let Err(error) = state
873-
.create_sender_allocation(myself.clone(), allocation_id)
895+
.create_sender_allocation(myself.clone(), allocation_id, AllocationType::Legacy)
874896
.await
875897
{
876898
tracing::error!(
@@ -1013,7 +1035,7 @@ impl Actor for SenderAccount {
10131035
};
10141036

10151037
if let Err(error) = state
1016-
.create_sender_allocation(myself.clone(), allocation_id)
1038+
.create_sender_allocation(myself.clone(), allocation_id, AllocationType::Legacy)
10171039
.await
10181040
{
10191041
tracing::error!(

crates/tap-agent/src/agent/sender_allocation.rs

Lines changed: 30 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22
// SPDX-License-Identifier: Apache-2.0
33

44
use std::{
5+
marker::PhantomData,
56
sync::Arc,
67
time::{Duration, Instant},
78
};
@@ -16,7 +17,7 @@ use tap_aggregator::grpc::{
1617
tap_aggregator_client::TapAggregatorClient, RavRequest as AggregatorRequest,
1718
};
1819
use tap_core::{
19-
manager::adapters::RavRead,
20+
manager::adapters::{RavRead, RavStore, ReceiptDelete, ReceiptRead},
2021
rav_request::RavRequest,
2122
receipt::{
2223
checks::{Check, CheckList},
@@ -43,7 +44,7 @@ use crate::{
4344
tap::{
4445
context::{
4546
checks::{AllocationId, Signature},
46-
TapAgentContext,
47+
ReceiptType, TapAgentContext,
4748
},
4849
signers_trimmed, TapReceipt,
4950
},
@@ -100,29 +101,28 @@ pub enum RavError {
100101
Other(#[from] anyhow::Error),
101102
}
102103

103-
type TapManager = tap_core::manager::Manager<TapAgentContext, TapReceipt>;
104-
105-
pub enum AllocationType {
106-
Legacy,
107-
Horizon,
108-
}
104+
type TapManager<T> = tap_core::manager::Manager<TapAgentContext<T>, TapReceipt>;
109105

110106
/// Manages unaggregated fees and the TAP lifecyle for a specific (allocation, sender) pair.
111-
pub struct SenderAllocation;
107+
pub struct SenderAllocation<T>(PhantomData<T>);
108+
impl<T: ReceiptType> Default for SenderAllocation<T> {
109+
fn default() -> Self {
110+
Self(PhantomData)
111+
}
112+
}
112113

113-
pub struct SenderAllocationState {
114+
pub struct SenderAllocationState<T> {
114115
unaggregated_fees: UnaggregatedReceipts,
115116
invalid_receipts_fees: UnaggregatedReceipts,
116117
latest_rav: Option<SignedRav>,
117118
pgpool: PgPool,
118-
tap_manager: TapManager,
119+
tap_manager: TapManager<T>,
119120
allocation_id: Address,
120121
sender: Address,
121122
escrow_accounts: Receiver<EscrowAccounts>,
122123
domain_separator: Eip712Domain,
123124
sender_account_ref: ActorRef<SenderAccountMessage>,
124125
sender_aggregator: TapAggregatorClient<Channel>,
125-
allocation_type: AllocationType,
126126
//config
127127
timestamp_buffer_ns: u64,
128128
rav_request_receipt_limit: u64,
@@ -157,8 +157,6 @@ pub struct SenderAllocationArgs {
157157
pub domain_separator: Eip712Domain,
158158
pub sender_account_ref: ActorRef<SenderAccountMessage>,
159159
pub sender_aggregator: TapAggregatorClient<Channel>,
160-
#[builder(default = AllocationType::Legacy)]
161-
pub allocation_type: AllocationType,
162160

163161
//config
164162
pub config: AllocationConfig,
@@ -173,9 +171,16 @@ pub enum SenderAllocationMessage {
173171
}
174172

175173
#[async_trait::async_trait]
176-
impl Actor for SenderAllocation {
174+
impl<T> Actor for SenderAllocation<T>
175+
where
176+
T: ReceiptType + Send + Sync + 'static,
177+
TapAgentContext<T>: RavRead<ReceiptAggregateVoucher>
178+
+ RavStore<ReceiptAggregateVoucher>
179+
+ ReceiptDelete
180+
+ ReceiptRead<TapReceipt>,
181+
{
177182
type Msg = SenderAllocationMessage;
178-
type State = SenderAllocationState;
183+
type State = SenderAllocationState<T>;
179184
type Arguments = SenderAllocationArgs;
180185

181186
async fn pre_start(
@@ -350,10 +355,16 @@ impl Actor for SenderAllocation {
350355
}
351356
}
352357

353-
impl SenderAllocationState {
358+
impl<T> SenderAllocationState<T>
359+
where
360+
T: ReceiptType + Send + Sync,
361+
TapAgentContext<T>: RavRead<ReceiptAggregateVoucher>
362+
+ RavStore<ReceiptAggregateVoucher>
363+
+ ReceiptDelete
364+
+ ReceiptRead<TapReceipt>,
365+
{
354366
async fn new(
355367
SenderAllocationArgs {
356-
allocation_type,
357368
pgpool,
358369
allocation_id,
359370
sender,
@@ -397,7 +408,6 @@ impl SenderAllocationState {
397408
Ok(Self {
398409
pgpool,
399410
tap_manager,
400-
allocation_type,
401411
allocation_id,
402412
sender,
403413
escrow_accounts,
@@ -1028,7 +1038,7 @@ pub mod tests {
10281038
.rav_request_receipt_limit(rav_request_receipt_limit)
10291039
.call()
10301040
.await;
1031-
let actor = TestableActor::new(SenderAllocation);
1041+
let actor = TestableActor::new(SenderAllocation::default());
10321042
let notify = actor.notify.clone();
10331043

10341044
let (allocation_ref, _join_handle) = Actor::spawn(None, actor, args).await.unwrap();

crates/tap-agent/src/tap/context.rs

Lines changed: 18 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,8 @@
11
// Copyright 2023-, Edge & Node, GraphOps, and Semiotic Labs.
22
// SPDX-License-Identifier: Apache-2.0
33

4+
use std::marker::PhantomData;
5+
46
use indexer_monitor::EscrowAccounts;
57
use sqlx::PgPool;
68
use thegraph_core::alloy::primitives::Address;
@@ -14,15 +16,28 @@ mod receipt;
1416

1517
pub use error::AdapterError;
1618

19+
#[sealed::sealed]
20+
pub trait ReceiptType {}
21+
22+
pub enum Legacy {}
23+
pub enum Horizon {}
24+
25+
#[sealed::sealed]
26+
impl ReceiptType for Legacy {}
27+
28+
#[sealed::sealed]
29+
impl ReceiptType for Horizon {}
30+
1731
#[derive(Clone)]
18-
pub struct TapAgentContext {
32+
pub struct TapAgentContext<T> {
1933
pgpool: PgPool,
2034
allocation_id: Address,
2135
sender: Address,
2236
escrow_accounts: Receiver<EscrowAccounts>,
37+
_phantom: PhantomData<T>,
2338
}
2439

25-
impl TapAgentContext {
40+
impl<T: ReceiptType> TapAgentContext<T> {
2641
pub fn new(
2742
pgpool: PgPool,
2843
allocation_id: Address,
@@ -34,6 +49,7 @@ impl TapAgentContext {
3449
allocation_id,
3550
sender,
3651
escrow_accounts,
52+
_phantom: PhantomData,
3753
}
3854
}
3955
}

crates/tap-agent/src/tap/context/escrow.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@ use async_trait::async_trait;
55
use tap_core::manager::adapters::SignatureChecker;
66
use thegraph_core::alloy::primitives::Address;
77

8-
use super::{error::AdapterError, TapAgentContext};
8+
use super::{error::AdapterError, ReceiptType, TapAgentContext};
99

1010
// Conversion from eventuals::error::Closed to AdapterError::EscrowEventualError
1111
impl From<eventuals::error::Closed> for AdapterError {
@@ -22,7 +22,7 @@ impl From<eventuals::error::Closed> for AdapterError {
2222
// In any case, we don't want to fail a receipt because of this.
2323
// The receipt is fine, just the escrow account that is not.
2424
#[async_trait]
25-
impl SignatureChecker for TapAgentContext {
25+
impl<T: ReceiptType + Send + Sync> SignatureChecker for TapAgentContext<T> {
2626
type AdapterError = AdapterError;
2727

2828
async fn verify_signer(&self, signer: Address) -> Result<bool, Self::AdapterError> {

crates/tap-agent/src/tap/context/rav.rs

Lines changed: 24 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -14,10 +14,10 @@ use tap_graph::{ReceiptAggregateVoucher, SignedRav};
1414
use thegraph_core::alloy::signers::Signature;
1515
use thegraph_core::alloy::{hex::ToHexExt, primitives::Address};
1616

17-
use super::{error::AdapterError, TapAgentContext};
17+
use super::{error::AdapterError, Horizon, Legacy, TapAgentContext};
1818

1919
#[async_trait::async_trait]
20-
impl RavRead<ReceiptAggregateVoucher> for TapAgentContext {
20+
impl RavRead<ReceiptAggregateVoucher> for TapAgentContext<Legacy> {
2121
type AdapterError = AdapterError;
2222

2323
async fn last_rav(&self) -> Result<Option<SignedRav>, Self::AdapterError> {
@@ -87,7 +87,7 @@ impl RavRead<ReceiptAggregateVoucher> for TapAgentContext {
8787
}
8888

8989
#[async_trait::async_trait]
90-
impl RavStore<ReceiptAggregateVoucher> for TapAgentContext {
90+
impl RavStore<ReceiptAggregateVoucher> for TapAgentContext<Legacy> {
9191
type AdapterError = AdapterError;
9292

9393
async fn update_last_rav(&self, rav: SignedRav) -> Result<(), Self::AdapterError> {
@@ -129,6 +129,27 @@ impl RavStore<ReceiptAggregateVoucher> for TapAgentContext {
129129
}
130130
}
131131

132+
#[async_trait::async_trait]
133+
impl RavRead<tap_graph::v2::ReceiptAggregateVoucher> for TapAgentContext<Horizon> {
134+
type AdapterError = AdapterError;
135+
136+
async fn last_rav(&self) -> Result<Option<tap_graph::v2::SignedRav>, Self::AdapterError> {
137+
unimplemented!()
138+
}
139+
}
140+
141+
#[async_trait::async_trait]
142+
impl RavStore<tap_graph::v2::ReceiptAggregateVoucher> for TapAgentContext<Horizon> {
143+
type AdapterError = AdapterError;
144+
145+
async fn update_last_rav(
146+
&self,
147+
_rav: tap_graph::v2::SignedRav,
148+
) -> Result<(), Self::AdapterError> {
149+
unimplemented!()
150+
}
151+
}
152+
132153
#[cfg(test)]
133154
mod test {
134155
use indexer_monitor::EscrowAccounts;

0 commit comments

Comments
 (0)