Skip to content

Commit cb95e9a

Browse files
committed
refactor: use typed builder for state and args
Signed-off-by: Gustavo Inacio <[email protected]>
1 parent 7be8db8 commit cb95e9a

File tree

8 files changed

+187
-175
lines changed

8 files changed

+187
-175
lines changed

crates/tap-agent/src/agent.rs

Lines changed: 10 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -76,17 +76,16 @@ pub async fn start_agent(
7676

7777
let config = SenderAccountConfig::from_config(&config);
7878

79-
let args = SenderAccountsManagerArgs {
80-
config,
81-
domain_separator,
82-
pgpool,
83-
indexer_allocations,
84-
escrow_accounts,
85-
escrow_subgraph,
86-
network_subgraph,
87-
sender_aggregator_endpoints: sender_aggregator_endpoints.clone(),
88-
prefix: None,
89-
};
79+
let args = SenderAccountsManagerArgs::builder()
80+
.config(config)
81+
.domain_separator(domain_separator)
82+
.pgpool(pgpool)
83+
.indexer_allocations(indexer_allocations)
84+
.escrow_accounts(escrow_accounts)
85+
.network_subgraph(network_subgraph)
86+
.escrow_subgraph(escrow_subgraph)
87+
.sender_aggregator_endpoints(sender_aggregator_endpoints.clone())
88+
.build();
9089

9190
SenderAccountsManager::spawn(None, SenderAccountsManager, args)
9291
.await

crates/tap-agent/src/agent/sender_account.rs

Lines changed: 36 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ use std::{
2424
use tap_core::rav::SignedRAV;
2525
use tokio::sync::watch::Receiver;
2626
use tracing::{error, warn, Level};
27+
use typed_builder::TypedBuilder;
2728

2829
use crate::{
2930
adaptative_concurrency::AdaptiveLimiter,
@@ -35,8 +36,7 @@ use crate::{
3536
sender_allocation::SenderAllocationMessage,
3637
unaggregated_receipts::UnaggregatedReceipts,
3738
},
38-
backoff::BackoffInfo,
39-
tracker::{SenderFeeTracker, SimpleFeeTracker},
39+
tracker::SenderFeeTracker,
4040
};
4141

4242
mod state;
@@ -72,21 +72,22 @@ pub enum SenderAccountMessage {
7272
IsSchedulerEnabled(ractor::RpcReplyPort<bool>),
7373
}
7474

75+
#[derive(TypedBuilder)]
7576
pub struct SenderAccountArgs {
76-
pub config: &'static SenderAccountConfig,
77-
78-
pub pgpool: PgPool,
79-
pub sender_id: Address,
80-
pub escrow_accounts: Receiver<EscrowAccounts>,
81-
pub indexer_allocations: Receiver<HashSet<Address>>,
82-
pub escrow_subgraph: &'static SubgraphClient,
83-
pub network_subgraph: &'static SubgraphClient,
84-
pub domain_separator: Eip712Domain,
85-
pub sender_aggregator_endpoint: Url,
86-
pub allocation_ids: HashSet<Address>,
87-
pub prefix: Option<String>,
88-
89-
pub retry_interval: Duration,
77+
config: &'static SenderAccountConfig,
78+
79+
pgpool: PgPool,
80+
sender_id: Address,
81+
escrow_accounts: Receiver<EscrowAccounts>,
82+
indexer_allocations: Receiver<HashSet<Address>>,
83+
escrow_subgraph: &'static SubgraphClient,
84+
network_subgraph: &'static SubgraphClient,
85+
domain_separator: Eip712Domain,
86+
sender_aggregator_endpoint: Url,
87+
allocation_ids: HashSet<Address>,
88+
prefix: Option<String>,
89+
90+
retry_interval: Duration,
9091
}
9192

9293
pub struct SenderAccountConfig {
@@ -290,29 +291,25 @@ impl Actor for SenderAccount {
290291
.request_timeout(config.rav_request_timeout)
291292
.build(&sender_aggregator_endpoint)?;
292293

293-
let state = State {
294-
prefix,
295-
sender_fee_tracker: SenderFeeTracker::new(config.rav_request_buffer),
296-
rav_tracker: SimpleFeeTracker::default(),
297-
invalid_receipts_tracker: SimpleFeeTracker::default(),
298-
allocation_ids: allocation_ids.clone(),
299-
_indexer_allocations_handle,
300-
_escrow_account_monitor,
301-
scheduled_rav_request: None,
302-
sender: sender_id,
303-
denied,
304-
sender_balance,
305-
retry_interval,
306-
adaptive_limiter: AdaptiveLimiter::new(INITIAL_RAV_REQUEST_CONCURRENT, 1..50),
307-
escrow_accounts,
308-
escrow_subgraph,
309-
network_subgraph,
310-
domain_separator,
311-
pgpool,
312-
sender_aggregator,
313-
backoff_info: BackoffInfo::default(),
314-
config,
315-
};
294+
let state = State::builder()
295+
.prefix(prefix)
296+
.sender_fee_tracker(SenderFeeTracker::new(config.rav_request_buffer))
297+
.allocation_ids(allocation_ids.clone())
298+
._indexer_allocations_handle(_indexer_allocations_handle)
299+
._escrow_account_monitor(_escrow_account_monitor)
300+
.sender(sender_id)
301+
.denied(denied)
302+
.sender_balance(sender_balance)
303+
.retry_interval(retry_interval)
304+
.adaptive_limiter(AdaptiveLimiter::new(INITIAL_RAV_REQUEST_CONCURRENT, 1..50))
305+
.escrow_accounts(escrow_accounts)
306+
.escrow_subgraph(escrow_subgraph)
307+
.network_subgraph(network_subgraph)
308+
.domain_separator(domain_separator)
309+
.pgpool(pgpool)
310+
.sender_aggregator(sender_aggregator)
311+
.config(config)
312+
.build();
316313

317314
stream::iter(allocation_ids)
318315
// Create a sender allocation for each allocation

crates/tap-agent/src/agent/sender_account/state.rs

Lines changed: 25 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ use std::{collections::HashSet, str::FromStr, time::Duration};
1616
use tap_core::rav::SignedRAV;
1717
use tokio::{sync::watch::Receiver, task::JoinHandle};
1818
use tracing::error;
19+
use typed_builder::TypedBuilder;
1920

2021
use crate::{
2122
adaptative_concurrency::AdaptiveLimiter,
@@ -34,14 +35,18 @@ use super::{
3435
SenderAccountConfig, SenderAccountMessage, PENDING_RAV, SENDER_FEE_TRACKER, UNAGGREGATED_FEES,
3536
};
3637

38+
#[derive(TypedBuilder)]
3739
pub struct State {
38-
pub(super) prefix: Option<String>,
40+
prefix: Option<String>,
3941
pub(super) sender_fee_tracker: SenderFeeTracker,
42+
#[builder(default)]
4043
pub(super) rav_tracker: SimpleFeeTracker,
44+
#[builder(default)]
4145
pub(super) invalid_receipts_tracker: SimpleFeeTracker,
4246
pub(super) allocation_ids: HashSet<Address>,
43-
pub(super) _indexer_allocations_handle: JoinHandle<()>,
44-
pub(super) _escrow_account_monitor: JoinHandle<()>,
47+
_indexer_allocations_handle: JoinHandle<()>,
48+
_escrow_account_monitor: JoinHandle<()>,
49+
#[builder(default)]
4550
pub(super) scheduled_rav_request:
4651
Option<JoinHandle<Result<(), MessagingErr<SenderAccountMessage>>>>,
4752

@@ -56,16 +61,17 @@ pub struct State {
5661
pub(super) adaptive_limiter: AdaptiveLimiter,
5762

5863
// Receivers
59-
pub(super) escrow_accounts: Receiver<EscrowAccounts>,
64+
escrow_accounts: Receiver<EscrowAccounts>,
6065

61-
pub(super) escrow_subgraph: &'static SubgraphClient,
62-
pub(super) network_subgraph: &'static SubgraphClient,
66+
escrow_subgraph: &'static SubgraphClient,
67+
network_subgraph: &'static SubgraphClient,
6368

64-
pub(super) domain_separator: Eip712Domain,
69+
domain_separator: Eip712Domain,
6570
pub(super) pgpool: PgPool,
66-
pub(super) sender_aggregator: jsonrpsee::http_client::HttpClient,
71+
sender_aggregator: jsonrpsee::http_client::HttpClient,
6772

6873
// Backoff info
74+
#[builder(default)]
6975
pub(super) backoff_info: BackoffInfo,
7076

7177
// Config
@@ -83,17 +89,17 @@ impl State {
8389
%allocation_id,
8490
"SenderAccount is creating allocation."
8591
);
86-
let args = SenderAllocationArgs {
87-
pgpool: self.pgpool.clone(),
88-
allocation_id,
89-
sender: self.sender,
90-
escrow_accounts: self.escrow_accounts.clone(),
91-
escrow_subgraph: self.escrow_subgraph,
92-
domain_separator: self.domain_separator.clone(),
93-
sender_account_ref: sender_account_ref.clone(),
94-
sender_aggregator: self.sender_aggregator.clone(),
95-
config: AllocationConfig::from_sender_config(self.config),
96-
};
92+
let args = SenderAllocationArgs::builder()
93+
.pgpool(self.pgpool.clone())
94+
.allocation_id(allocation_id)
95+
.sender(self.sender)
96+
.escrow_accounts(self.escrow_accounts.clone())
97+
.escrow_subgraph(self.escrow_subgraph)
98+
.domain_separator(self.domain_separator.clone())
99+
.sender_account_ref(sender_account_ref.clone())
100+
.sender_aggregator(self.sender_aggregator.clone())
101+
.config(AllocationConfig::from_sender_config(self.config))
102+
.build();
97103

98104
SenderAllocation::spawn_linked(
99105
Some(self.format_sender_allocation(&allocation_id)),

crates/tap-agent/src/agent/sender_accounts_manager.rs

Lines changed: 27 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ use tokio::{
2020
sync::watch::{self, Receiver},
2121
};
2222
use tracing::error;
23+
use typed_builder::TypedBuilder;
2324

2425
mod receipt_watcher;
2526
mod state;
@@ -42,19 +43,22 @@ pub enum SenderAccountsManagerMessage {
4243
UpdateSenderAccounts(HashSet<Address>),
4344
}
4445

46+
#[derive(TypedBuilder)]
4547
pub struct SenderAccountsManagerArgs {
46-
pub config: &'static SenderAccountConfig,
47-
pub domain_separator: Eip712Domain,
48-
49-
pub pgpool: PgPool,
50-
pub indexer_allocations: Receiver<HashMap<Address, Allocation>>,
51-
pub escrow_accounts: Receiver<EscrowAccounts>,
52-
pub escrow_subgraph: &'static SubgraphClient,
53-
pub network_subgraph: &'static SubgraphClient,
54-
pub sender_aggregator_endpoints: HashMap<Address, Url>,
55-
56-
pub prefix: Option<String>,
48+
config: &'static SenderAccountConfig,
49+
domain_separator: Eip712Domain,
50+
51+
pgpool: PgPool,
52+
indexer_allocations: Receiver<HashMap<Address, Allocation>>,
53+
escrow_accounts: Receiver<EscrowAccounts>,
54+
escrow_subgraph: &'static SubgraphClient,
55+
network_subgraph: &'static SubgraphClient,
56+
sender_aggregator_endpoints: HashMap<Address, Url>,
57+
58+
#[builder(default)]
59+
prefix: Option<String>,
5760
}
61+
5862
#[async_trait::async_trait]
5963
impl Actor for SenderAccountsManager {
6064
type Msg = SenderAccountsManagerMessage;
@@ -105,20 +109,18 @@ impl Actor for SenderAccountsManager {
105109
async {}
106110
});
107111

108-
let mut state = State {
109-
config,
110-
domain_separator,
111-
sender_ids: HashSet::new(),
112-
new_receipts_watcher_handle: None,
113-
_eligible_allocations_senders_handle,
114-
pgpool,
115-
indexer_allocations: allocations_rx,
116-
escrow_accounts: escrow_accounts.clone(),
117-
escrow_subgraph,
118-
network_subgraph,
119-
sender_aggregator_endpoints,
120-
prefix: prefix.clone(),
121-
};
112+
let mut state = State::builder()
113+
.config(config)
114+
.domain_separator(domain_separator)
115+
.sender_aggregator_endpoints(sender_aggregator_endpoints)
116+
.indexer_allocations(allocations_rx)
117+
.pgpool(pgpool)
118+
._eligible_allocations_senders_handle(_eligible_allocations_senders_handle)
119+
.escrow_subgraph(escrow_subgraph)
120+
.network_subgraph(network_subgraph)
121+
.escrow_accounts(escrow_accounts.clone())
122+
.prefix(prefix.clone())
123+
.build();
122124
let sender_allocation = select! {
123125
sender_allocation = state.get_pending_sender_allocation_id() => sender_allocation,
124126
_ = tokio::time::sleep(std::time::Duration::from_secs(30)) => {

crates/tap-agent/src/agent/sender_accounts_manager/state.rs

Lines changed: 41 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -16,21 +16,27 @@ use reqwest::Url;
1616
use sqlx::PgPool;
1717
use tokio::sync::watch::Receiver;
1818
use tracing::{error, warn};
19+
use typed_builder::TypedBuilder;
1920

21+
#[derive(TypedBuilder)]
2022
pub struct State {
21-
pub(super) sender_ids: HashSet<Address>,
22-
pub(super) new_receipts_watcher_handle: Option<tokio::task::JoinHandle<()>>,
23-
pub(super) _eligible_allocations_senders_handle: JoinHandle<()>,
24-
25-
pub(super) config: &'static SenderAccountConfig,
26-
pub(super) domain_separator: Eip712Domain,
27-
pub(super) pgpool: PgPool,
28-
pub(super) indexer_allocations: Receiver<HashSet<Address>>,
29-
pub(super) escrow_accounts: Receiver<EscrowAccounts>,
30-
pub(super) escrow_subgraph: &'static SubgraphClient,
31-
pub(super) network_subgraph: &'static SubgraphClient,
32-
pub(super) sender_aggregator_endpoints: HashMap<Address, Url>,
33-
pub(super) prefix: Option<String>,
23+
#[builder(default)]
24+
pub sender_ids: HashSet<Address>,
25+
26+
#[builder(default)]
27+
pub new_receipts_watcher_handle: Option<tokio::task::JoinHandle<()>>,
28+
_eligible_allocations_senders_handle: JoinHandle<()>,
29+
30+
config: &'static SenderAccountConfig,
31+
domain_separator: Eip712Domain,
32+
pgpool: PgPool,
33+
indexer_allocations: Receiver<HashSet<Address>>,
34+
escrow_accounts: Receiver<EscrowAccounts>,
35+
escrow_subgraph: &'static SubgraphClient,
36+
network_subgraph: &'static SubgraphClient,
37+
sender_aggregator_endpoints: HashMap<Address, Url>,
38+
#[builder(default)]
39+
prefix: Option<String>,
3440
}
3541

3642
impl State {
@@ -201,26 +207,27 @@ impl State {
201207
sender_id: &Address,
202208
allocation_ids: HashSet<Address>,
203209
) -> Result<SenderAccountArgs> {
204-
Ok(SenderAccountArgs {
205-
config: self.config,
206-
pgpool: self.pgpool.clone(),
207-
sender_id: *sender_id,
208-
escrow_accounts: self.escrow_accounts.clone(),
209-
indexer_allocations: self.indexer_allocations.clone(),
210-
escrow_subgraph: self.escrow_subgraph,
211-
network_subgraph: self.network_subgraph,
212-
domain_separator: self.domain_separator.clone(),
213-
sender_aggregator_endpoint: self
214-
.sender_aggregator_endpoints
215-
.get(sender_id)
216-
.ok_or(anyhow!(
217-
"No sender_aggregator_endpoints found for sender {}",
218-
sender_id
219-
))?
220-
.clone(),
221-
allocation_ids,
222-
prefix: self.prefix.clone(),
223-
retry_interval: Duration::from_secs(30),
224-
})
210+
Ok(SenderAccountArgs::builder()
211+
.config(self.config)
212+
.pgpool(self.pgpool.clone())
213+
.sender_id(*sender_id)
214+
.escrow_accounts(self.escrow_accounts.clone())
215+
.indexer_allocations(self.indexer_allocations.clone())
216+
.escrow_subgraph(self.escrow_subgraph)
217+
.network_subgraph(self.network_subgraph)
218+
.domain_separator(self.domain_separator.clone())
219+
.sender_aggregator_endpoint(
220+
self.sender_aggregator_endpoints
221+
.get(sender_id)
222+
.ok_or(anyhow!(
223+
"No sender_aggregator_endpoints found for sender {}",
224+
sender_id
225+
))?
226+
.clone(),
227+
)
228+
.allocation_ids(allocation_ids)
229+
.prefix(self.prefix.clone())
230+
.retry_interval(Duration::from_secs(30))
231+
.build())
225232
}
226233
}

0 commit comments

Comments
 (0)