Skip to content

Commit 21e61d7

Browse files
committed
feat(tap-agent): Add horizon config flag safety checks
Add conditional initialization of pglistener_v2 and escrow account listener based on horizon_enabled flag, this as an extray safety measure. Improve error messages specificity for v2 components.
1 parent 8593439 commit 21e61d7

File tree

1 file changed

+71
-36
lines changed

1 file changed

+71
-36
lines changed

crates/tap-agent/src/agent/sender_accounts_manager.rs

Lines changed: 71 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -191,13 +191,20 @@ impl Actor for SenderAccountsManager {
191191
allocation_id
192192
.keys()
193193
.cloned()
194-
// TODO map based on the allocation type returned by the subgraph
194+
// TODO: map based on the allocation type returned by the subgraph
195195
.map(AllocationId::Legacy)
196196
.collect::<HashSet<_>>()
197197
});
198198
// we need two connections because each one will listen to different notify events
199199
let pglistener_v1 = PgListener::connect_with(&pgpool.clone()).await.unwrap();
200-
let pglistener_v2 = PgListener::connect_with(&pgpool.clone()).await.unwrap();
200+
201+
// Extra safety, we don't want to have a listener if horizon is not enabled
202+
let pglistener_v2 = if config.horizon_enabled {
203+
Some(PgListener::connect_with(&pgpool.clone()).await.unwrap())
204+
} else {
205+
None
206+
};
207+
201208
let myself_clone = myself.clone();
202209
let accounts_clone = escrow_accounts_v1.clone();
203210
watch_pipe(accounts_clone, move |escrow_accounts| {
@@ -212,19 +219,23 @@ impl Actor for SenderAccountsManager {
212219
async {}
213220
});
214221

215-
let myself_clone = myself.clone();
216-
let _escrow_accounts_v2 = escrow_accounts_v2.clone();
217-
watch_pipe(_escrow_accounts_v2, move |escrow_accounts| {
218-
let senders = escrow_accounts.get_senders();
219-
myself_clone
220-
.cast(SenderAccountsManagerMessage::UpdateSenderAccountsV2(
221-
senders,
222-
))
223-
.unwrap_or_else(|e| {
224-
tracing::error!("Error while updating sender_accounts v2: {:?}", e);
225-
});
226-
async {}
227-
});
222+
// Extra safety, we don't want to have a
223+
// escrow account listener if horizon is not enabled
224+
if config.horizon_enabled {
225+
let myself_clone = myself.clone();
226+
let _escrow_accounts_v2 = escrow_accounts_v2.clone();
227+
watch_pipe(_escrow_accounts_v2, move |escrow_accounts| {
228+
let senders = escrow_accounts.get_senders();
229+
myself_clone
230+
.cast(SenderAccountsManagerMessage::UpdateSenderAccountsV2(
231+
senders,
232+
))
233+
.unwrap_or_else(|e| {
234+
tracing::error!("Error while updating sender_accounts v2: {:?}", e);
235+
});
236+
async {}
237+
});
238+
}
228239

229240
let mut state = State {
230241
config,
@@ -305,17 +316,18 @@ impl Actor for SenderAccountsManager {
305316
// after starting all senders
306317
state.new_receipts_watcher_handle_v2 = None;
307318

308-
if state.config.horizon_enabled {
319+
// Extra safety, we don't want to have a listener if horizon is not enabled
320+
if let Some(listener_v2) = pglistener_v2 {
309321
state.new_receipts_watcher_handle_v2 = Some(tokio::spawn(
310322
new_receipts_watcher()
311323
.actor_cell(myself.get_cell())
312-
.pglistener(pglistener_v2)
324+
.pglistener(listener_v2)
313325
.escrow_accounts_rx(escrow_accounts_v2)
314326
.sender_type(SenderType::Horizon)
315327
.maybe_prefix(prefix)
316328
.call(),
317329
));
318-
}
330+
};
319331

320332
tracing::info!("SenderAccountManager created!");
321333
Ok(state)
@@ -446,18 +458,41 @@ impl Actor for SenderAccountsManager {
446458
}
447459
};
448460

449-
let mut sender_allocation = select! {
450-
sender_allocation = state.get_pending_sender_allocation_id_v1() => sender_allocation,
451-
_ = tokio::time::sleep(state.config.tap_sender_timeout) => {
452-
tracing::error!("Timeout while getting pending sender allocation ids");
453-
return Ok(());
461+
// Get the sender's allocations taking into account
462+
// the sender type
463+
let allocations = match sender_type {
464+
SenderType::Legacy => {
465+
let mut sender_allocation = select! {
466+
sender_allocation = state.get_pending_sender_allocation_id_v1() => sender_allocation,
467+
_ = tokio::time::sleep(state.config.tap_sender_timeout) => {
468+
tracing::error!("Timeout while getting pending sender allocation ids");
469+
return Ok(());
470+
}
471+
};
472+
sender_allocation
473+
.remove(&sender_id)
474+
.unwrap_or(HashSet::new())
475+
}
476+
SenderType::Horizon => {
477+
if !state.config.horizon_enabled {
478+
tracing::info!(%sender_id, "Horizon sender failed but horizon is disabled, not restarting");
479+
480+
return Ok(());
481+
}
482+
483+
let mut sender_allocation = select! {
484+
sender_allocation = state.get_pending_sender_allocation_id_v2() => sender_allocation,
485+
_ = tokio::time::sleep(state.config.tap_sender_timeout) => {
486+
tracing::error!("Timeout while getting pending V2 sender allocation ids");
487+
return Ok(());
488+
}
489+
};
490+
sender_allocation
491+
.remove(&sender_id)
492+
.unwrap_or(HashSet::new())
454493
}
455494
};
456495

457-
let allocations = sender_allocation
458-
.remove(&sender_id)
459-
.unwrap_or(HashSet::new());
460-
461496
state
462497
.create_or_deny_sender(myself.get_cell(), sender_id, allocations, sender_type)
463498
.await;
@@ -575,12 +610,12 @@ impl State {
575610
)
576611
.fetch_all(&self.pgpool)
577612
.await
578-
.expect("should be able to fetch pending receipts from the database");
613+
.expect("should be able to fetch pending receipts V1 from the database");
579614

580615
for row in receipts_signer_allocations_in_db {
581616
let allocation_ids = row
582617
.allocation_ids
583-
.expect("all receipts should have an allocation_id")
618+
.expect("all receipts V1 should have an allocation_id")
584619
.iter()
585620
.map(|allocation_id| {
586621
AllocationId::Legacy(
@@ -615,12 +650,12 @@ impl State {
615650
)
616651
.fetch_all(&self.pgpool)
617652
.await
618-
.expect("should be able to fetch unfinalized RAVs from the database");
653+
.expect("should be able to fetch unfinalized RAVs V1 from the database");
619654

620655
for row in nonfinal_ravs_sender_allocations_in_db {
621656
let allocation_ids = row
622657
.allocation_ids
623-
.expect("all RAVs should have an allocation_id")
658+
.expect("all RAVs V1 should have an allocation_id")
624659
.iter()
625660
.map(|allocation_id| {
626661
AllocationId::Legacy(
@@ -668,12 +703,12 @@ impl State {
668703
)
669704
.fetch_all(&self.pgpool)
670705
.await
671-
.expect("should be able to fetch pending receipts from the database");
706+
.expect("should be able to fetch pending V2 receipts from the database");
672707

673708
for row in receipts_signer_allocations_in_db {
674709
let allocation_ids = row
675710
.allocation_ids
676-
.expect("all receipts should have an allocation_id")
711+
.expect("all receipts V2 should have an allocation_id")
677712
.iter()
678713
.map(|allocation_id| {
679714
AllocationId::Legacy(
@@ -708,12 +743,12 @@ impl State {
708743
)
709744
.fetch_all(&self.pgpool)
710745
.await
711-
.expect("should be able to fetch unfinalized RAVs from the database");
746+
.expect("should be able to fetch unfinalized V2 RAVs from the database");
712747

713748
for row in nonfinal_ravs_sender_allocations_in_db {
714749
let allocation_ids = row
715750
.allocation_ids
716-
.expect("all RAVs should have an allocation_id")
751+
.expect("all RAVs V2 should have an allocation_id")
717752
.iter()
718753
.map(|allocation_id| {
719754
AllocationId::Legacy(
@@ -1081,7 +1116,7 @@ mod tests {
10811116
}
10821117

10831118
#[sqlx::test(migrations = "../../migrations")]
1084-
async fn test_update_sender_allocation(pgpool: PgPool) {
1119+
async fn test_update_sender_account(pgpool: PgPool) {
10851120
let (prefix, mut notify, (actor, join_handle)) =
10861121
create_sender_accounts_manager().pgpool(pgpool).call().await;
10871122

0 commit comments

Comments
 (0)