Skip to content

Commit 37c6c30

Browse files
committed
refactor: split Legacy and Horizon allocation id types
Signed-off-by: Gustavo Inacio <[email protected]>
1 parent 4bf596e commit 37c6c30

10 files changed

+256
-205
lines changed

.sqlx/query-3498f286395489d0bd96e89485aa91792724b625cc7b41081d8d0d523ed3e977.json

Lines changed: 26 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

.sqlx/query-8fe28629e2453852a41abd452ed519167d3f358d25aa02f306779270a084f8c3.json

Lines changed: 0 additions & 26 deletions
This file was deleted.

.sqlx/query-dec31dcc0a429e4cf55be46bcd7129c11c694ed1cb3ca50fa66b021b4d078e9d.json

Lines changed: 26 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

.sqlx/query-fef5849dc64c15d1188d6398c93a59bfaafd9a38cf342739cdabf8b7bba073d3.json

Lines changed: 0 additions & 26 deletions
This file was deleted.

crates/tap-agent/src/agent/sender_account.rs

Lines changed: 59 additions & 41 deletions
Original file line numberDiff line numberDiff line change
@@ -34,8 +34,11 @@ use tokio::{sync::watch::Receiver, task::JoinHandle};
3434
use tonic::transport::{Channel, Endpoint};
3535
use tracing::Level;
3636

37-
use super::sender_allocation::{
38-
AllocationConfig, SenderAllocation, SenderAllocationArgs, SenderAllocationMessage,
37+
use super::{
38+
sender_accounts_manager::AllocationId,
39+
sender_allocation::{
40+
AllocationConfig, SenderAllocation, SenderAllocationArgs, SenderAllocationMessage,
41+
},
3942
};
4043
use crate::{
4144
adaptative_concurrency::AdaptiveLimiter,
@@ -141,8 +144,8 @@ pub enum ReceiptFees {
141144
#[derive(Debug)]
142145
pub enum SenderAccountMessage {
143146
UpdateBalanceAndLastRavs(Balance, RavMap),
144-
UpdateAllocationIds(HashSet<Address>),
145-
NewAllocationId(Address),
147+
UpdateAllocationIds(HashSet<AllocationId>),
148+
NewAllocationId(AllocationId),
146149
UpdateReceiptFees(Address, ReceiptFees),
147150
UpdateInvalidReceiptFees(Address, UnaggregatedReceipts),
148151
UpdateRav(RavInformation),
@@ -171,12 +174,12 @@ pub struct SenderAccountArgs {
171174
pub pgpool: PgPool,
172175
pub sender_id: Address,
173176
pub escrow_accounts: Receiver<EscrowAccounts>,
174-
pub indexer_allocations: Receiver<HashSet<Address>>,
177+
pub indexer_allocations: Receiver<HashSet<AllocationId>>,
175178
pub escrow_subgraph: &'static SubgraphClient,
176179
pub network_subgraph: &'static SubgraphClient,
177180
pub domain_separator: Eip712Domain,
178181
pub sender_aggregator_endpoint: Url,
179-
pub allocation_ids: HashSet<Address>,
182+
pub allocation_ids: HashSet<AllocationId>,
180183
pub prefix: Option<String>,
181184

182185
pub retry_interval: Duration,
@@ -186,7 +189,7 @@ pub struct State {
186189
sender_fee_tracker: SenderFeeTracker,
187190
rav_tracker: SimpleFeeTracker,
188191
invalid_receipts_tracker: SimpleFeeTracker,
189-
allocation_ids: HashSet<Address>,
192+
allocation_ids: HashSet<AllocationId>,
190193
_indexer_allocations_handle: JoinHandle<()>,
191194
_escrow_account_monitor: JoinHandle<()>,
192195
scheduled_rav_request: Option<JoinHandle<Result<(), MessagingErr<SenderAccountMessage>>>>,
@@ -256,20 +259,19 @@ impl State {
256259
async fn create_sender_allocation(
257260
&self,
258261
sender_account_ref: ActorRef<SenderAccountMessage>,
259-
allocation_id: Address,
260-
allocation_type: AllocationType,
262+
allocation_id: AllocationId,
261263
) -> anyhow::Result<()> {
262264
tracing::trace!(
263265
%self.sender,
264266
%allocation_id,
265267
"SenderAccount is creating allocation."
266268
);
267269

268-
match allocation_type {
269-
AllocationType::Legacy => {
270+
match allocation_id {
271+
AllocationId::Legacy(id) => {
270272
let args = SenderAllocationArgs::builder()
271273
.pgpool(self.pgpool.clone())
272-
.allocation_id(allocation_id)
274+
.allocation_id(id)
273275
.sender(self.sender)
274276
.escrow_accounts(self.escrow_accounts.clone())
275277
.escrow_subgraph(self.escrow_subgraph)
@@ -279,17 +281,17 @@ impl State {
279281
.config(AllocationConfig::from_sender_config(self.config))
280282
.build();
281283
SenderAllocation::<Legacy>::spawn_linked(
282-
Some(self.format_sender_allocation(&allocation_id)),
284+
Some(self.format_sender_allocation(&id)),
283285
SenderAllocation::default(),
284286
args,
285287
sender_account_ref.get_cell(),
286288
)
287289
.await?;
288290
}
289-
AllocationType::Horizon => {
291+
AllocationId::Horizon(id) => {
290292
let args = SenderAllocationArgs::builder()
291293
.pgpool(self.pgpool.clone())
292-
.allocation_id(allocation_id)
294+
.allocation_id(id)
293295
.sender(self.sender)
294296
.escrow_accounts(self.escrow_accounts.clone())
295297
.escrow_subgraph(self.escrow_subgraph)
@@ -300,7 +302,7 @@ impl State {
300302
.build();
301303

302304
SenderAllocation::<Horizon>::spawn_linked(
303-
Some(self.format_sender_allocation(&allocation_id)),
305+
Some(self.format_sender_allocation(&id)),
304306
SenderAllocation::default(),
305307
args,
306308
sender_account_ref.get_cell(),
@@ -478,11 +480,13 @@ impl State {
478480
/// if they are really closed
479481
async fn check_closed_allocations(
480482
&self,
481-
allocation_ids: HashSet<&Address>,
482-
) -> anyhow::Result<HashSet<Address>> {
483+
allocation_ids: HashSet<&AllocationId>,
484+
) -> anyhow::Result<HashSet<AllocationId>> {
483485
if allocation_ids.is_empty() {
484486
return Ok(HashSet::new());
485487
}
488+
// We don't need to check what type of allocation it is since
489+
// legacy allocation ids can't be reused for horizon
486490
let allocation_ids: Vec<String> = allocation_ids
487491
.into_iter()
488492
.map(|addr| addr.to_string().to_lowercase())
@@ -522,8 +526,8 @@ impl State {
522526
}
523527
Ok(responses
524528
.into_iter()
525-
.map(|allocation| Address::from_str(&allocation.id))
526-
.collect::<Result<HashSet<Address>, _>>()?)
529+
.map(|allocation| Address::from_str(&allocation.id).map(AllocationId::Legacy))
530+
.collect::<Result<HashSet<_>, _>>()?)
527531
}
528532
}
529533

@@ -726,13 +730,7 @@ impl Actor for SenderAccount {
726730

727731
stream::iter(allocation_ids)
728732
// Create a sender allocation for each allocation
729-
.map(|allocation_id| {
730-
state.create_sender_allocation(
731-
myself.clone(),
732-
allocation_id,
733-
AllocationType::Legacy,
734-
)
735-
})
733+
.map(|allocation_id| state.create_sender_allocation(myself.clone(), allocation_id))
736734
.buffer_unordered(10) // Limit concurrency to 10 allocations at a time
737735
.collect::<Vec<anyhow::Result<()>>>()
738736
.await
@@ -908,11 +906,7 @@ impl Actor for SenderAccount {
908906
let mut new_allocation_ids = state.allocation_ids.clone();
909907
for allocation_id in allocation_ids.difference(&state.allocation_ids) {
910908
if let Err(error) = state
911-
.create_sender_allocation(
912-
myself.clone(),
913-
*allocation_id,
914-
AllocationType::Legacy,
915-
)
909+
.create_sender_allocation(myself.clone(), *allocation_id)
916910
.await
917911
{
918912
tracing::error!(
@@ -940,12 +934,14 @@ impl Actor for SenderAccount {
940934
for allocation_id in possibly_closed_allocations {
941935
if really_closed.contains(allocation_id) {
942936
if let Some(sender_handle) = ActorRef::<SenderAllocationMessage>::where_is(
943-
state.format_sender_allocation(allocation_id),
937+
state.format_sender_allocation(&allocation_id.address()),
944938
) {
945939
tracing::trace!(%allocation_id, "SenderAccount shutting down SenderAllocation");
946940
// we can not send a rav request to this allocation
947941
// because it's gonna trigger the last rav
948-
state.sender_fee_tracker.block_allocation_id(*allocation_id);
942+
state
943+
.sender_fee_tracker
944+
.block_allocation_id(allocation_id.address());
949945
sender_handle.stop(None);
950946
new_allocation_ids.remove(allocation_id);
951947
}
@@ -963,7 +959,7 @@ impl Actor for SenderAccount {
963959
}
964960
SenderAccountMessage::NewAllocationId(allocation_id) => {
965961
if let Err(error) = state
966-
.create_sender_allocation(myself.clone(), allocation_id, AllocationType::Legacy)
962+
.create_sender_allocation(myself.clone(), allocation_id)
967963
.await
968964
{
969965
tracing::error!(
@@ -985,6 +981,9 @@ impl Actor for SenderAccount {
985981

986982
let active_allocation_ids = state
987983
.allocation_ids
984+
.iter()
985+
.map(|id| id.address())
986+
.collect::<HashSet<_>>()
988987
.union(&non_final_last_ravs_set)
989988
.cloned()
990989
.collect::<HashSet<_>>();
@@ -1104,9 +1103,17 @@ impl Actor for SenderAccount {
11041103
tracing::error!(%allocation_id, "Could not convert allocation_id to Address");
11051104
return Ok(());
11061105
};
1106+
let Some(allocation_id) = state
1107+
.allocation_ids
1108+
.iter()
1109+
.find(|id| id.address() == allocation_id)
1110+
else {
1111+
tracing::error!(%allocation_id, "Could not get allocation id type from state");
1112+
return Ok(());
1113+
};
11071114

11081115
if let Err(error) = state
1109-
.create_sender_allocation(myself.clone(), allocation_id, AllocationType::Legacy)
1116+
.create_sender_allocation(myself.clone(), *allocation_id)
11101117
.await
11111118
{
11121119
tracing::error!(
@@ -1162,7 +1169,8 @@ pub mod tests {
11621169
use super::SenderAccountMessage;
11631170
use crate::{
11641171
agent::{
1165-
sender_account::ReceiptFees, sender_allocation::SenderAllocationMessage,
1172+
sender_account::ReceiptFees, sender_accounts_manager::AllocationId,
1173+
sender_allocation::SenderAllocationMessage,
11661174
unaggregated_receipts::UnaggregatedReceipts,
11671175
},
11681176
assert_not_triggered, assert_triggered,
@@ -1276,7 +1284,9 @@ pub mod tests {
12761284
// we expect it to create a sender allocation
12771285
sender_account
12781286
.cast(SenderAccountMessage::UpdateAllocationIds(
1279-
vec![ALLOCATION_ID_0].into_iter().collect(),
1287+
vec![AllocationId::Legacy(ALLOCATION_ID_0)]
1288+
.into_iter()
1289+
.collect(),
12801290
))
12811291
.unwrap();
12821292
notify.notified().await;
@@ -1361,7 +1371,9 @@ pub mod tests {
13611371

13621372
// we expect it to create a sender allocation
13631373
sender_account
1364-
.cast(SenderAccountMessage::NewAllocationId(ALLOCATION_ID_0))
1374+
.cast(SenderAccountMessage::NewAllocationId(AllocationId::Legacy(
1375+
ALLOCATION_ID_0,
1376+
)))
13651377
.unwrap();
13661378

13671379
flush_messages(&notify).await;
@@ -1374,7 +1386,9 @@ pub mod tests {
13741386
// nothing should change because we already created
13751387
sender_account
13761388
.cast(SenderAccountMessage::UpdateAllocationIds(
1377-
vec![ALLOCATION_ID_0].into_iter().collect(),
1389+
vec![AllocationId::Legacy(ALLOCATION_ID_0)]
1390+
.into_iter()
1391+
.collect(),
13781392
))
13791393
.unwrap();
13801394

@@ -1556,7 +1570,11 @@ pub mod tests {
15561570
) {
15571571
let (sender_account, _, prefix, _) = create_sender_account()
15581572
.pgpool(pgpool)
1559-
.initial_allocation(vec![ALLOCATION_ID_0].into_iter().collect())
1573+
.initial_allocation(
1574+
vec![AllocationId::Legacy(ALLOCATION_ID_0)]
1575+
.into_iter()
1576+
.collect(),
1577+
)
15601578
.escrow_subgraph_endpoint(&mock_escrow_subgraph.uri())
15611579
.call()
15621580
.await;

0 commit comments

Comments
 (0)