Skip to content

Commit d3eafbe

Browse files
gusinacioaasseman
andauthored
Gusinacio/dont crash sender account (#190)
* feat(tap-agent): ignore blocked allocation ids Signed-off-by: Gustavo Inacio <[email protected]> * fix(tap-agent): retry last rav forever Signed-off-by: Gustavo Inacio <[email protected]> * fix(tap-agent): never fail sender account, add block allocation when closing it Signed-off-by: Gustavo Inacio <[email protected]> * docs(tap-agent): update comments Co-authored-by: Alexis Asseman <[email protected]> --------- Signed-off-by: Gustavo Inacio <[email protected]> Co-authored-by: Alexis Asseman <[email protected]>
1 parent 11bc9d1 commit d3eafbe

File tree

3 files changed

+57
-27
lines changed

3 files changed

+57
-27
lines changed

tap-agent/src/agent/sender_account.rs

Lines changed: 21 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -136,7 +136,10 @@ impl State {
136136

137137
async fn rav_requester_single(&mut self) -> Result<()> {
138138
let Some(allocation_id) = self.sender_fee_tracker.get_heaviest_allocation_id() else {
139-
anyhow::bail!("Error while getting the heaviest allocation because none has unaggregated fees tracked");
139+
anyhow::bail!(
140+
"Error while getting the heaviest allocation because \
141+
no unblocked allocation has enough unaggregated fees tracked"
142+
);
140143
};
141144
let sender_allocation_id = self.format_sender_allocation(&allocation_id);
142145
let allocation = ActorRef::<SenderAllocationMessage>::where_is(sender_allocation_id);
@@ -147,7 +150,9 @@ impl State {
147150
);
148151
};
149152
// we call and wait for the response so we don't process anymore update
150-
let (fees, rav) = call!(allocation, SenderAllocationMessage::TriggerRAVRequest)?;
153+
let Ok((fees, rav)) = call!(allocation, SenderAllocationMessage::TriggerRAVRequest) else {
154+
anyhow::bail!("Error while sending and waiting message for actor {allocation_id}");
155+
};
151156

152157
// update rav tracker
153158
self.rav_tracker.update(
@@ -470,14 +475,13 @@ impl Actor for SenderAccount {
470475
trigger_value = state.config.tap.rav_request_trigger_value,
471476
"Total fee greater than the trigger value. Triggering RAV request"
472477
);
473-
// There are only 3 scenarios where this can fail:
474-
// - No allocation ids
475-
// - The SenderAllocation could not be found
476-
// - The SenderAllocation could not process the message
477-
// In any case, we want to respawn this whole actor
478-
// and respawn all its children actors. Thus, we can safely
479-
// panic the actor by using ?
480-
state.rav_requester_single().await?;
478+
// In case we fail, we want our actor to keep running
479+
if let Err(err) = state.rav_requester_single().await {
480+
tracing::error!(
481+
error = %err,
482+
"There was an error while requesting a RAV."
483+
);
484+
}
481485
}
482486

483487
// Maybe allow the sender right after the potential RAV request. This way, the
@@ -502,6 +506,9 @@ impl Actor for SenderAccount {
502506
state.format_sender_allocation(allocation_id),
503507
) {
504508
tracing::trace!(%allocation_id, "SenderAccount shutting down SenderAllocation");
509+
// we can not send a rav request to this allocation
510+
// because it's gonna trigger the last rav
511+
state.sender_fee_tracker.block_allocation_id(*allocation_id);
505512
sender_handle.stop(None);
506513
}
507514
}
@@ -594,6 +601,10 @@ impl Actor for SenderAccount {
594601

595602
let tracker = &mut state.sender_fee_tracker;
596603
tracker.update(allocation_id, 0);
604+
// clean up hashset
605+
state
606+
.sender_fee_tracker
607+
.unblock_allocation_id(allocation_id);
597608
// rav tracker is not updated because it's still not redeemed
598609
}
599610
SupervisionEvent::ActorPanicked(cell, error) => {

tap-agent/src/agent/sender_allocation.rs

Lines changed: 9 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -194,27 +194,18 @@ impl Actor for SenderAllocation {
194194
);
195195
// Request a RAV and mark the allocation as final.
196196
while state.unaggregated_fees.value > 0 {
197-
if state.request_rav().await.is_err() {
198-
break;
197+
if let Err(err) = state.request_rav().await {
198+
error!(error = %err, "There was an error while requesting rav. Retrying in 30 seconds...");
199+
tokio::time::sleep(Duration::from_secs(30)).await;
199200
}
200201
}
201-
if state.unaggregated_fees.value > 0 {
202-
Err(anyhow!(
203-
"There are still pending unaggregated_fees for sender {} and allocation {}.\
204-
Not marking as last.",
205-
state.sender,
206-
state.allocation_id
207-
))?;
208-
}
209202

210-
state.mark_rav_last().await.inspect_err(|e| {
211-
error!(
212-
"Error while marking allocation {} as last for sender {}: {}",
213-
state.allocation_id, state.sender, e
214-
);
215-
})?;
203+
while let Err(err) = state.mark_rav_last().await {
204+
error!(error = %err, %state.allocation_id, %state.sender, "Error while marking allocation last. Retrying in 30 seconds...");
205+
tokio::time::sleep(Duration::from_secs(30)).await;
206+
}
216207

217-
//Since this is only triggered after allocation is closed will be counted here
208+
// Since this is only triggered after allocation is closed will be counted here
218209
CLOSED_SENDER_ALLOCATIONS.inc();
219210

220211
Ok(())
@@ -250,6 +241,7 @@ impl Actor for SenderAllocation {
250241
);
251242
u128::MAX
252243
});
244+
// it's fine to crash the actor, could not send a message to its parent
253245
state
254246
.sender_account_ref
255247
.cast(SenderAccountMessage::UpdateReceiptFees(

tap-agent/src/agent/sender_fee_tracker.rs

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,10 @@ use tracing::error;
99
pub struct SenderFeeTracker {
1010
id_to_fee: HashMap<Address, u128>,
1111
total_fee: u128,
12+
// there are some allocations that we don't want it to be
13+
// heaviest allocation, because they are already marked for finalization,
14+
// and thus requesting RAVs on their own in their `post_stop` routine.
15+
blocked_addresses: HashSet<Address>,
1216
}
1317

1418
impl SenderFeeTracker {
@@ -32,10 +36,19 @@ impl SenderFeeTracker {
3236
}
3337
}
3438

39+
pub fn block_allocation_id(&mut self, address: Address) {
40+
self.blocked_addresses.insert(address);
41+
}
42+
43+
pub fn unblock_allocation_id(&mut self, address: Address) {
44+
self.blocked_addresses.remove(&address);
45+
}
46+
3547
pub fn get_heaviest_allocation_id(&self) -> Option<Address> {
3648
// just loop over and get the biggest fee
3749
self.id_to_fee
3850
.iter()
51+
.filter(|(addr, _)| !self.blocked_addresses.contains(*addr))
3952
.fold(None, |acc: Option<(&Address, u128)>, (addr, fee)| {
4053
if let Some((_, max_fee)) = acc {
4154
if *fee > max_fee {
@@ -82,10 +95,24 @@ mod tests {
8295
assert_eq!(tracker.get_heaviest_allocation_id(), Some(allocation_id_0));
8396
assert_eq!(tracker.get_total_fee(), 10);
8497

98+
tracker.block_allocation_id(allocation_id_0);
99+
assert_eq!(tracker.get_heaviest_allocation_id(), None);
100+
assert_eq!(tracker.get_total_fee(), 10);
101+
102+
tracker.unblock_allocation_id(allocation_id_0);
103+
assert_eq!(tracker.get_heaviest_allocation_id(), Some(allocation_id_0));
104+
85105
tracker.update(allocation_id_2, 20);
86106
assert_eq!(tracker.get_heaviest_allocation_id(), Some(allocation_id_2));
87107
assert_eq!(tracker.get_total_fee(), 30);
88108

109+
tracker.block_allocation_id(allocation_id_2);
110+
assert_eq!(tracker.get_heaviest_allocation_id(), Some(allocation_id_0));
111+
assert_eq!(tracker.get_total_fee(), 30);
112+
113+
tracker.unblock_allocation_id(allocation_id_2);
114+
assert_eq!(tracker.get_heaviest_allocation_id(), Some(allocation_id_2));
115+
89116
tracker.update(allocation_id_1, 30);
90117
assert_eq!(tracker.get_heaviest_allocation_id(), Some(allocation_id_1));
91118
assert_eq!(tracker.get_total_fee(), 60);

0 commit comments

Comments
 (0)