Skip to content

Commit 4e62e7f

Browse files
committed
refactor: split Legacy and Horizon allocation id types
Signed-off-by: Gustavo Inacio <[email protected]>
1 parent 4bf596e commit 4e62e7f

10 files changed

+205
-162
lines changed

.sqlx/query-3498f286395489d0bd96e89485aa91792724b625cc7b41081d8d0d523ed3e977.json

Lines changed: 26 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

.sqlx/query-8fe28629e2453852a41abd452ed519167d3f358d25aa02f306779270a084f8c3.json

Lines changed: 0 additions & 26 deletions
This file was deleted.

.sqlx/query-dec31dcc0a429e4cf55be46bcd7129c11c694ed1cb3ca50fa66b021b4d078e9d.json

Lines changed: 26 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

.sqlx/query-fef5849dc64c15d1188d6398c93a59bfaafd9a38cf342739cdabf8b7bba073d3.json

Lines changed: 0 additions & 26 deletions
This file was deleted.

crates/tap-agent/src/agent/sender_account.rs

Lines changed: 59 additions & 46 deletions
Original file line numberDiff line numberDiff line change
@@ -34,8 +34,11 @@ use tokio::{sync::watch::Receiver, task::JoinHandle};
3434
use tonic::transport::{Channel, Endpoint};
3535
use tracing::Level;
3636

37-
use super::sender_allocation::{
38-
AllocationConfig, SenderAllocation, SenderAllocationArgs, SenderAllocationMessage,
37+
use super::{
38+
sender_accounts_manager::AllocationId,
39+
sender_allocation::{
40+
AllocationConfig, SenderAllocation, SenderAllocationArgs, SenderAllocationMessage,
41+
},
3942
};
4043
use crate::{
4144
adaptative_concurrency::AdaptiveLimiter,
@@ -141,8 +144,8 @@ pub enum ReceiptFees {
141144
#[derive(Debug)]
142145
pub enum SenderAccountMessage {
143146
UpdateBalanceAndLastRavs(Balance, RavMap),
144-
UpdateAllocationIds(HashSet<Address>),
145-
NewAllocationId(Address),
147+
UpdateAllocationIds(HashSet<AllocationId>),
148+
NewAllocationId(AllocationId),
146149
UpdateReceiptFees(Address, ReceiptFees),
147150
UpdateInvalidReceiptFees(Address, UnaggregatedReceipts),
148151
UpdateRav(RavInformation),
@@ -171,12 +174,12 @@ pub struct SenderAccountArgs {
171174
pub pgpool: PgPool,
172175
pub sender_id: Address,
173176
pub escrow_accounts: Receiver<EscrowAccounts>,
174-
pub indexer_allocations: Receiver<HashSet<Address>>,
177+
pub indexer_allocations: Receiver<HashSet<AllocationId>>,
175178
pub escrow_subgraph: &'static SubgraphClient,
176179
pub network_subgraph: &'static SubgraphClient,
177180
pub domain_separator: Eip712Domain,
178181
pub sender_aggregator_endpoint: Url,
179-
pub allocation_ids: HashSet<Address>,
182+
pub allocation_ids: HashSet<AllocationId>,
180183
pub prefix: Option<String>,
181184

182185
pub retry_interval: Duration,
@@ -186,7 +189,7 @@ pub struct State {
186189
sender_fee_tracker: SenderFeeTracker,
187190
rav_tracker: SimpleFeeTracker,
188191
invalid_receipts_tracker: SimpleFeeTracker,
189-
allocation_ids: HashSet<Address>,
192+
allocation_ids: HashSet<AllocationId>,
190193
_indexer_allocations_handle: JoinHandle<()>,
191194
_escrow_account_monitor: JoinHandle<()>,
192195
scheduled_rav_request: Option<JoinHandle<Result<(), MessagingErr<SenderAccountMessage>>>>,
@@ -247,29 +250,23 @@ impl SenderAccountConfig {
247250
}
248251
}
249252

250-
pub enum AllocationType {
251-
Legacy,
252-
Horizon,
253-
}
254-
255253
impl State {
256254
async fn create_sender_allocation(
257255
&self,
258256
sender_account_ref: ActorRef<SenderAccountMessage>,
259-
allocation_id: Address,
260-
allocation_type: AllocationType,
257+
allocation_id: AllocationId,
261258
) -> anyhow::Result<()> {
262259
tracing::trace!(
263260
%self.sender,
264261
%allocation_id,
265262
"SenderAccount is creating allocation."
266263
);
267264

268-
match allocation_type {
269-
AllocationType::Legacy => {
265+
match allocation_id {
266+
AllocationId::Legacy(id) => {
270267
let args = SenderAllocationArgs::builder()
271268
.pgpool(self.pgpool.clone())
272-
.allocation_id(allocation_id)
269+
.allocation_id(id)
273270
.sender(self.sender)
274271
.escrow_accounts(self.escrow_accounts.clone())
275272
.escrow_subgraph(self.escrow_subgraph)
@@ -279,17 +276,17 @@ impl State {
279276
.config(AllocationConfig::from_sender_config(self.config))
280277
.build();
281278
SenderAllocation::<Legacy>::spawn_linked(
282-
Some(self.format_sender_allocation(&allocation_id)),
279+
Some(self.format_sender_allocation(&id)),
283280
SenderAllocation::default(),
284281
args,
285282
sender_account_ref.get_cell(),
286283
)
287284
.await?;
288285
}
289-
AllocationType::Horizon => {
286+
AllocationId::Horizon(id) => {
290287
let args = SenderAllocationArgs::builder()
291288
.pgpool(self.pgpool.clone())
292-
.allocation_id(allocation_id)
289+
.allocation_id(id)
293290
.sender(self.sender)
294291
.escrow_accounts(self.escrow_accounts.clone())
295292
.escrow_subgraph(self.escrow_subgraph)
@@ -300,7 +297,7 @@ impl State {
300297
.build();
301298

302299
SenderAllocation::<Horizon>::spawn_linked(
303-
Some(self.format_sender_allocation(&allocation_id)),
300+
Some(self.format_sender_allocation(&id)),
304301
SenderAllocation::default(),
305302
args,
306303
sender_account_ref.get_cell(),
@@ -478,11 +475,13 @@ impl State {
478475
/// if they are really closed
479476
async fn check_closed_allocations(
480477
&self,
481-
allocation_ids: HashSet<&Address>,
482-
) -> anyhow::Result<HashSet<Address>> {
478+
allocation_ids: HashSet<&AllocationId>,
479+
) -> anyhow::Result<HashSet<AllocationId>> {
483480
if allocation_ids.is_empty() {
484481
return Ok(HashSet::new());
485482
}
483+
// We don't need to check what type of allocation it is since
484+
// legacy allocation ids can't be reused for horizon
486485
let allocation_ids: Vec<String> = allocation_ids
487486
.into_iter()
488487
.map(|addr| addr.to_string().to_lowercase())
@@ -522,8 +521,8 @@ impl State {
522521
}
523522
Ok(responses
524523
.into_iter()
525-
.map(|allocation| Address::from_str(&allocation.id))
526-
.collect::<Result<HashSet<Address>, _>>()?)
524+
.map(|allocation| Address::from_str(&allocation.id).map(AllocationId::Legacy))
525+
.collect::<Result<HashSet<_>, _>>()?)
527526
}
528527
}
529528

@@ -726,13 +725,7 @@ impl Actor for SenderAccount {
726725

727726
stream::iter(allocation_ids)
728727
// Create a sender allocation for each allocation
729-
.map(|allocation_id| {
730-
state.create_sender_allocation(
731-
myself.clone(),
732-
allocation_id,
733-
AllocationType::Legacy,
734-
)
735-
})
728+
.map(|allocation_id| state.create_sender_allocation(myself.clone(), allocation_id))
736729
.buffer_unordered(10) // Limit concurrency to 10 allocations at a time
737730
.collect::<Vec<anyhow::Result<()>>>()
738731
.await
@@ -908,11 +901,7 @@ impl Actor for SenderAccount {
908901
let mut new_allocation_ids = state.allocation_ids.clone();
909902
for allocation_id in allocation_ids.difference(&state.allocation_ids) {
910903
if let Err(error) = state
911-
.create_sender_allocation(
912-
myself.clone(),
913-
*allocation_id,
914-
AllocationType::Legacy,
915-
)
904+
.create_sender_allocation(myself.clone(), *allocation_id)
916905
.await
917906
{
918907
tracing::error!(
@@ -940,12 +929,14 @@ impl Actor for SenderAccount {
940929
for allocation_id in possibly_closed_allocations {
941930
if really_closed.contains(allocation_id) {
942931
if let Some(sender_handle) = ActorRef::<SenderAllocationMessage>::where_is(
943-
state.format_sender_allocation(allocation_id),
932+
state.format_sender_allocation(&allocation_id.address()),
944933
) {
945934
tracing::trace!(%allocation_id, "SenderAccount shutting down SenderAllocation");
946935
// we can not send a rav request to this allocation
947936
// because it's gonna trigger the last rav
948-
state.sender_fee_tracker.block_allocation_id(*allocation_id);
937+
state
938+
.sender_fee_tracker
939+
.block_allocation_id(allocation_id.address());
949940
sender_handle.stop(None);
950941
new_allocation_ids.remove(allocation_id);
951942
}
@@ -963,7 +954,7 @@ impl Actor for SenderAccount {
963954
}
964955
SenderAccountMessage::NewAllocationId(allocation_id) => {
965956
if let Err(error) = state
966-
.create_sender_allocation(myself.clone(), allocation_id, AllocationType::Legacy)
957+
.create_sender_allocation(myself.clone(), allocation_id)
967958
.await
968959
{
969960
tracing::error!(
@@ -985,6 +976,9 @@ impl Actor for SenderAccount {
985976

986977
let active_allocation_ids = state
987978
.allocation_ids
979+
.iter()
980+
.map(|id| id.address())
981+
.collect::<HashSet<_>>()
988982
.union(&non_final_last_ravs_set)
989983
.cloned()
990984
.collect::<HashSet<_>>();
@@ -1104,9 +1098,17 @@ impl Actor for SenderAccount {
11041098
tracing::error!(%allocation_id, "Could not convert allocation_id to Address");
11051099
return Ok(());
11061100
};
1101+
let Some(allocation_id) = state
1102+
.allocation_ids
1103+
.iter()
1104+
.find(|id| id.address() == allocation_id)
1105+
else {
1106+
tracing::error!(%allocation_id, "Could not get allocation id type from state");
1107+
return Ok(());
1108+
};
11071109

11081110
if let Err(error) = state
1109-
.create_sender_allocation(myself.clone(), allocation_id, AllocationType::Legacy)
1111+
.create_sender_allocation(myself.clone(), *allocation_id)
11101112
.await
11111113
{
11121114
tracing::error!(
@@ -1162,7 +1164,8 @@ pub mod tests {
11621164
use super::SenderAccountMessage;
11631165
use crate::{
11641166
agent::{
1165-
sender_account::ReceiptFees, sender_allocation::SenderAllocationMessage,
1167+
sender_account::ReceiptFees, sender_accounts_manager::AllocationId,
1168+
sender_allocation::SenderAllocationMessage,
11661169
unaggregated_receipts::UnaggregatedReceipts,
11671170
},
11681171
assert_not_triggered, assert_triggered,
@@ -1276,7 +1279,9 @@ pub mod tests {
12761279
// we expect it to create a sender allocation
12771280
sender_account
12781281
.cast(SenderAccountMessage::UpdateAllocationIds(
1279-
vec![ALLOCATION_ID_0].into_iter().collect(),
1282+
vec![AllocationId::Legacy(ALLOCATION_ID_0)]
1283+
.into_iter()
1284+
.collect(),
12801285
))
12811286
.unwrap();
12821287
notify.notified().await;
@@ -1361,7 +1366,9 @@ pub mod tests {
13611366

13621367
// we expect it to create a sender allocation
13631368
sender_account
1364-
.cast(SenderAccountMessage::NewAllocationId(ALLOCATION_ID_0))
1369+
.cast(SenderAccountMessage::NewAllocationId(AllocationId::Legacy(
1370+
ALLOCATION_ID_0,
1371+
)))
13651372
.unwrap();
13661373

13671374
flush_messages(&notify).await;
@@ -1374,7 +1381,9 @@ pub mod tests {
13741381
// nothing should change because we already created
13751382
sender_account
13761383
.cast(SenderAccountMessage::UpdateAllocationIds(
1377-
vec![ALLOCATION_ID_0].into_iter().collect(),
1384+
vec![AllocationId::Legacy(ALLOCATION_ID_0)]
1385+
.into_iter()
1386+
.collect(),
13781387
))
13791388
.unwrap();
13801389

@@ -1556,7 +1565,11 @@ pub mod tests {
15561565
) {
15571566
let (sender_account, _, prefix, _) = create_sender_account()
15581567
.pgpool(pgpool)
1559-
.initial_allocation(vec![ALLOCATION_ID_0].into_iter().collect())
1568+
.initial_allocation(
1569+
vec![AllocationId::Legacy(ALLOCATION_ID_0)]
1570+
.into_iter()
1571+
.collect(),
1572+
)
15601573
.escrow_subgraph_endpoint(&mock_escrow_subgraph.uri())
15611574
.call()
15621575
.await;

0 commit comments

Comments
 (0)