Skip to content

Commit 5fb1fdf

Browse files
committed
refactor: split into files
Signed-off-by: Gustavo Inacio <[email protected]>
1 parent d6cadae commit 5fb1fdf

File tree

12 files changed

+892
-754
lines changed

12 files changed

+892
-754
lines changed

tap-agent/src/agent.rs

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,6 @@ use sender_accounts_manager::SenderAccountsManager;
2121
pub mod sender_account;
2222
pub mod sender_accounts_manager;
2323
pub mod sender_allocation;
24-
pub mod sender_fee_tracker;
2524
pub mod unaggregated_receipts;
2625

2726
pub async fn start_agent() -> (ActorRef<SenderAccountsManagerMessage>, JoinHandle<()>) {

tap-agent/src/agent/sender_account.rs

Lines changed: 67 additions & 89 deletions
Original file line numberDiff line numberDiff line change
@@ -27,10 +27,9 @@ use tap_core::rav::SignedRAV;
2727
use tracing::{error, Level};
2828

2929
use super::sender_allocation::{SenderAllocation, SenderAllocationArgs};
30-
use super::sender_fee_tracker::{BufferedReceiptFee, DurationInfo};
3130
use crate::agent::sender_allocation::SenderAllocationMessage;
32-
use crate::agent::sender_fee_tracker::SenderFeeTracker;
3331
use crate::agent::unaggregated_receipts::UnaggregatedReceipts;
32+
use crate::tracker::{SenderFeeTracker, SimpleFeeTracker};
3433
use crate::{
3534
config::{self},
3635
tap::escrow_adapter::EscrowAdapter,
@@ -98,7 +97,7 @@ pub enum SenderAccountMessage {
9897
UpdateInvalidReceiptFees(Address, UnaggregatedReceipts),
9998
UpdateRav(SignedRAV),
10099
#[cfg(test)]
101-
GetSenderFeeTracker(ractor::RpcReplyPort<SenderFeeTracker<BufferedReceiptFee, DurationInfo>>),
100+
GetSenderFeeTracker(ractor::RpcReplyPort<SenderFeeTracker>),
102101
#[cfg(test)]
103102
GetDeny(ractor::RpcReplyPort<bool>),
104103
#[cfg(test)]
@@ -132,9 +131,9 @@ pub struct SenderAccountArgs {
132131
}
133132
pub struct State {
134133
prefix: Option<String>,
135-
sender_fee_tracker: SenderFeeTracker<BufferedReceiptFee, DurationInfo>,
136-
rav_tracker: SenderFeeTracker<u128>,
137-
invalid_receipts_tracker: SenderFeeTracker<u128>,
134+
sender_fee_tracker: SenderFeeTracker,
135+
rav_tracker: SimpleFeeTracker,
136+
invalid_receipts_tracker: SimpleFeeTracker,
138137
allocation_ids: HashSet<Address>,
139138
_indexer_allocations_handle: JoinHandle<()>,
140139
_escrow_account_monitor: PipeHandle,
@@ -239,6 +238,53 @@ impl State {
239238
Ok(())
240239
}
241240

241+
fn finalize_rav_request(
242+
&mut self,
243+
allocation_id: Address,
244+
rav_result: anyhow::Result<(UnaggregatedReceipts, Option<SignedRAV>)>,
245+
) {
246+
self.sender_fee_tracker.finish_rav_request(allocation_id);
247+
match rav_result {
248+
Ok((fees, rav)) => {
249+
self.sender_fee_tracker.ok_rav_request(allocation_id);
250+
251+
let rav_value = rav.map_or(0, |rav| rav.message.valueAggregate);
252+
self.update_rav(allocation_id, rav_value);
253+
254+
// update sender fee tracker
255+
self.update_sender_fee(allocation_id, fees);
256+
}
257+
Err(err) => {
258+
// TODO we should update the total value too
259+
self.sender_fee_tracker.failed_rav_backoff(allocation_id);
260+
error!(
261+
"Error while requesting RAV for sender {} and allocation {}: {}",
262+
self.sender, allocation_id, err
263+
);
264+
}
265+
};
266+
}
267+
268+
fn update_rav(&mut self, allocation_id: Address, rav_value: u128) {
269+
self.rav_tracker.update(allocation_id, rav_value);
270+
PENDING_RAV
271+
.with_label_values(&[&self.sender.to_string(), &allocation_id.to_string()])
272+
.set(rav_value as f64);
273+
}
274+
275+
fn update_sender_fee(
276+
&mut self,
277+
allocation_id: Address,
278+
unaggregated_fees: UnaggregatedReceipts,
279+
) {
280+
self.sender_fee_tracker
281+
.update(allocation_id, unaggregated_fees);
282+
283+
UNAGGREGATED_FEES
284+
.with_label_values(&[&self.sender.to_string(), &allocation_id.to_string()])
285+
.set(unaggregated_fees.value as f64);
286+
}
287+
242288
fn deny_condition_reached(&self) -> bool {
243289
let pending_ravs = self.rav_tracker.get_total_fee();
244290
let unaggregated_fees = self.sender_fee_tracker.get_total_fee();
@@ -472,9 +518,8 @@ impl Actor for SenderAccount {
472518
sender_fee_tracker: SenderFeeTracker::new(Duration::from_millis(
473519
config.tap.rav_request_timestamp_buffer_ms,
474520
)),
475-
// sender_fee_tracker: SenderFeeTracker::new(),
476-
rav_tracker: SenderFeeTracker::default(),
477-
invalid_receipts_tracker: SenderFeeTracker::default(),
521+
rav_tracker: SimpleFeeTracker::default(),
522+
invalid_receipts_tracker: SimpleFeeTracker::default(),
478523
allocation_ids: allocation_ids.clone(),
479524
_indexer_allocations_handle,
480525
_escrow_account_monitor,
@@ -522,16 +567,7 @@ impl Actor for SenderAccount {
522567

523568
match message {
524569
SenderAccountMessage::UpdateRav(rav) => {
525-
state
526-
.rav_tracker
527-
.update(rav.message.allocationId, rav.message.valueAggregate, 0);
528-
529-
PENDING_RAV
530-
.with_label_values(&[
531-
&state.sender.to_string(),
532-
&rav.message.allocationId.to_string(),
533-
])
534-
.set(rav.message.valueAggregate as f64);
570+
state.update_rav(rav.message.allocationId, rav.message.valueAggregate);
535571

536572
let should_deny = !state.denied && state.deny_condition_reached();
537573
if should_deny {
@@ -545,7 +581,7 @@ impl Actor for SenderAccount {
545581

546582
state
547583
.invalid_receipts_tracker
548-
.update(allocation_id, unaggregated_fees.value, 0);
584+
.update(allocation_id, unaggregated_fees.value);
549585

550586
// invalid receipts can't go down
551587
let should_deny = !state.denied && state.deny_condition_reached();
@@ -573,8 +609,9 @@ impl Actor for SenderAccount {
573609
);
574610
SenderAccount::deny_sender(&state.pgpool, state.sender).await;
575611
}
576-
state.sender_fee_tracker.add(allocation_id, value);
577612

613+
// add new value
614+
state.sender_fee_tracker.add(allocation_id, value);
578615
UNAGGREGATED_FEES
579616
.with_label_values(&[
580617
&state.sender.to_string(),
@@ -583,58 +620,10 @@ impl Actor for SenderAccount {
583620
.add(value as f64);
584621
}
585622
ReceiptFees::RavRequestResponse(rav_result) => {
586-
state.sender_fee_tracker.finish_rav_request(allocation_id);
587-
match rav_result {
588-
Ok((fees, rav)) => {
589-
state.sender_fee_tracker.ok_rav_request(allocation_id);
590-
591-
let rav_value = rav.map_or(0, |rav| rav.message.valueAggregate);
592-
// update rav tracker
593-
state.rav_tracker.update(allocation_id, rav_value, 0);
594-
PENDING_RAV
595-
.with_label_values(&[
596-
&state.sender.to_string(),
597-
&allocation_id.to_string(),
598-
])
599-
.set(rav_value as f64);
600-
601-
// update sender fee tracker
602-
state.sender_fee_tracker.update(
603-
allocation_id,
604-
fees.value,
605-
fees.counter,
606-
);
607-
UNAGGREGATED_FEES
608-
.with_label_values(&[
609-
&state.sender.to_string(),
610-
&allocation_id.to_string(),
611-
])
612-
.set(fees.value as f64);
613-
}
614-
Err(err) => {
615-
state.sender_fee_tracker.failed_rav_backoff(allocation_id);
616-
error!(
617-
"Error while requesting RAV for sender {} and allocation {}: {}",
618-
state.sender,
619-
allocation_id,
620-
err
621-
);
622-
}
623-
};
623+
state.finalize_rav_request(allocation_id, rav_result);
624624
}
625625
ReceiptFees::UpdateValue(unaggregated_fees) => {
626-
state.sender_fee_tracker.update(
627-
allocation_id,
628-
unaggregated_fees.value,
629-
unaggregated_fees.counter,
630-
);
631-
632-
UNAGGREGATED_FEES
633-
.with_label_values(&[
634-
&state.sender.to_string(),
635-
&allocation_id.to_string(),
636-
])
637-
.set(unaggregated_fees.value as f64);
626+
state.update_sender_fee(allocation_id, unaggregated_fees);
638627
}
639628
ReceiptFees::Retry => {}
640629
}
@@ -648,12 +637,11 @@ impl Actor for SenderAccount {
648637
}
649638
let total_counter_for_allocation = state
650639
.sender_fee_tracker
651-
.get_total_counter_outside_buffer_for_allocation(&allocation_id);
640+
.get_count_outside_buffer_for_allocation(&allocation_id);
641+
let can_trigger_rav = state.sender_fee_tracker.can_trigger_rav(allocation_id);
652642
let counter_greater_receipt_limit = total_counter_for_allocation
653643
>= state.config.tap.rav_request_receipt_limit
654-
&& !state
655-
.sender_fee_tracker
656-
.check_allocation_has_rav_request_running(allocation_id);
644+
&& can_trigger_rav;
657645
let total_fee_outside_buffer =
658646
state.sender_fee_tracker.get_total_fee_outside_buffer();
659647
let total_fee_greater_trigger_value =
@@ -777,7 +765,7 @@ impl Actor for SenderAccount {
777765
for allocation_id in tracked_allocation_ids.difference(&active_allocation_ids) {
778766
// if it's being tracked and we didn't receive any update from the non_final_last_ravs
779767
// remove from the tracker
780-
state.rav_tracker.update(*allocation_id, 0, 0);
768+
state.rav_tracker.remove(*allocation_id);
781769

782770
let _ = PENDING_RAV.remove_label_values(&[
783771
&state.sender.to_string(),
@@ -786,10 +774,7 @@ impl Actor for SenderAccount {
786774
}
787775

788776
for (allocation_id, value) in non_final_last_ravs {
789-
state.rav_tracker.update(allocation_id, value, 0);
790-
PENDING_RAV
791-
.with_label_values(&[&state.sender.to_string(), &allocation_id.to_string()])
792-
.set(value as f64);
777+
state.update_rav(allocation_id, value);
793778
}
794779
// now that balance and rav tracker is updated, check
795780
match (state.denied, state.deny_condition_reached()) {
@@ -853,15 +838,8 @@ impl Actor for SenderAccount {
853838
return Ok(());
854839
};
855840

856-
// clean up hashset
857-
state
858-
.sender_fee_tracker
859-
.unblock_allocation_id(allocation_id);
860-
// update the receipt fees by reseting to 0
861-
myself.cast(SenderAccountMessage::UpdateReceiptFees(
862-
allocation_id,
863-
ReceiptFees::UpdateValue(UnaggregatedReceipts::default()),
864-
))?;
841+
// remove from sender_fee_tracker
842+
state.sender_fee_tracker.remove(allocation_id);
865843

866844
// rav tracker is not updated because it's still not redeemed
867845
}

tap-agent/src/agent/sender_allocation.rs

Lines changed: 11 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -149,7 +149,7 @@ impl Actor for SenderAllocation {
149149
if state.invalid_receipts_fees.value > 0 {
150150
sender_account_ref.cast(SenderAccountMessage::UpdateInvalidReceiptFees(
151151
allocation_id,
152-
state.invalid_receipts_fees.clone(),
152+
state.invalid_receipts_fees,
153153
))?;
154154
}
155155

@@ -158,7 +158,7 @@ impl Actor for SenderAllocation {
158158

159159
sender_account_ref.cast(SenderAccountMessage::UpdateReceiptFees(
160160
allocation_id,
161-
ReceiptFees::UpdateValue(state.unaggregated_fees.clone()),
161+
ReceiptFees::UpdateValue(state.unaggregated_fees),
162162
))?;
163163

164164
// update rav tracker for sender account
@@ -196,7 +196,12 @@ impl Actor for SenderAllocation {
196196
}
197197

198198
while let Err(err) = state.mark_rav_last().await {
199-
error!(error = %err, %state.allocation_id, %state.sender, "Error while marking allocation last. Retrying in 30 seconds...");
199+
error!(
200+
error = %err,
201+
%state.allocation_id,
202+
%state.sender,
203+
"Error while marking allocation last. Retrying in 30 seconds..."
204+
);
200205
tokio::time::sleep(Duration::from_secs(30)).await;
201206
}
202207

@@ -263,7 +268,7 @@ impl Actor for SenderAllocation {
263268
state
264269
.request_rav()
265270
.await
266-
.map(|_| (state.unaggregated_fees.clone(), state.latest_rav.clone()))
271+
.map(|_| (state.unaggregated_fees, state.latest_rav.clone()))
267272
} else {
268273
Err(anyhow!("Unaggregated fee equals zero"))
269274
};
@@ -278,7 +283,7 @@ impl Actor for SenderAllocation {
278283
#[cfg(test)]
279284
SenderAllocationMessage::GetUnaggregatedReceipts(reply) => {
280285
if !reply.is_closed() {
281-
let _ = reply.send(unaggregated_fees.clone());
286+
let _ = reply.send(*unaggregated_fees);
282287
}
283288
}
284289
}
@@ -769,7 +774,7 @@ impl SenderAllocationState {
769774
self.sender_account_ref
770775
.cast(SenderAccountMessage::UpdateInvalidReceiptFees(
771776
self.allocation_id,
772-
self.invalid_receipts_fees.clone(),
777+
self.invalid_receipts_fees,
773778
))?;
774779

775780
Ok(())

0 commit comments

Comments
 (0)