Skip to content

Commit c83dff0

Browse files
committed
refactor: split in files
Signed-off-by: Gustavo Inacio <[email protected]>
1 parent e71f891 commit c83dff0

File tree

9 files changed

+509
-909
lines changed

9 files changed

+509
-909
lines changed

tap-agent/src/agent/sender_account.rs

Lines changed: 64 additions & 87 deletions
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@ use tracing::{error, Level};
2828
use super::sender_allocation::{SenderAllocation, SenderAllocationArgs};
2929
use crate::agent::sender_allocation::SenderAllocationMessage;
3030
use crate::agent::unaggregated_receipts::UnaggregatedReceipts;
31-
use crate::tracker::{DurationInfo, GlobalFeeTracker, SenderFeeStats, SimpleFeeTracker};
31+
use crate::tracker::{SenderFeeTracker, SimpleFeeTracker};
3232
use crate::{
3333
config::{self},
3434
tap::escrow_adapter::EscrowAdapter,
@@ -87,8 +87,6 @@ pub enum ReceiptFees {
8787
Retry,
8888
}
8989

90-
type SenderFeeTracker = SimpleFeeTracker<SenderFeeStats, DurationInfo, GlobalFeeTracker>;
91-
9290
#[derive(Debug)]
9391
pub enum SenderAccountMessage {
9492
UpdateBalanceAndLastRavs(Balance, RavMap),
@@ -133,8 +131,8 @@ pub struct SenderAccountArgs {
133131
pub struct State {
134132
prefix: Option<String>,
135133
sender_fee_tracker: SenderFeeTracker,
136-
rav_tracker: SimpleFeeTracker<u128>,
137-
invalid_receipts_tracker: SimpleFeeTracker<u128>,
134+
rav_tracker: SimpleFeeTracker,
135+
invalid_receipts_tracker: SimpleFeeTracker,
138136
allocation_ids: HashSet<Address>,
139137
_indexer_allocations_handle: PipeHandle,
140138
_escrow_account_monitor: PipeHandle,
@@ -239,6 +237,53 @@ impl State {
239237
Ok(())
240238
}
241239

240+
fn finalize_rav_request(
241+
&mut self,
242+
allocation_id: Address,
243+
rav_result: anyhow::Result<(UnaggregatedReceipts, Option<SignedRAV>)>,
244+
) {
245+
self.sender_fee_tracker.finish_rav_request(allocation_id);
246+
match rav_result {
247+
Ok((fees, rav)) => {
248+
self.sender_fee_tracker.ok_rav_request(allocation_id);
249+
250+
let rav_value = rav.map_or(0, |rav| rav.message.valueAggregate);
251+
self.update_rav(allocation_id, rav_value);
252+
253+
// update sender fee tracker
254+
self.update_sender_fee(allocation_id, fees);
255+
}
256+
Err(err) => {
257+
// TODO we should update the total value too
258+
self.sender_fee_tracker.failed_rav_backoff(allocation_id);
259+
error!(
260+
"Error while requesting RAV for sender {} and allocation {}: {}",
261+
self.sender, allocation_id, err
262+
);
263+
}
264+
};
265+
}
266+
267+
fn update_rav(&mut self, allocation_id: Address, rav_value: u128) {
268+
self.rav_tracker.update(allocation_id, rav_value);
269+
PENDING_RAV
270+
.with_label_values(&[&self.sender.to_string(), &allocation_id.to_string()])
271+
.set(rav_value as f64);
272+
}
273+
274+
fn update_sender_fee(
275+
&mut self,
276+
allocation_id: Address,
277+
unaggregated_fees: UnaggregatedReceipts,
278+
) {
279+
self.sender_fee_tracker
280+
.update(allocation_id, unaggregated_fees);
281+
282+
UNAGGREGATED_FEES
283+
.with_label_values(&[&self.sender.to_string(), &allocation_id.to_string()])
284+
.set(unaggregated_fees.value as f64);
285+
}
286+
242287
fn deny_condition_reached(&self) -> bool {
243288
let pending_ravs = self.rav_tracker.get_total_fee();
244289
let unaggregated_fees = self.sender_fee_tracker.get_total_fee();
@@ -468,10 +513,9 @@ impl Actor for SenderAccount {
468513
.build(&sender_aggregator_endpoint)?;
469514

470515
let state = State {
471-
sender_fee_tracker: SimpleFeeTracker::new(Duration::from_millis(
516+
sender_fee_tracker: SenderFeeTracker::new(Duration::from_millis(
472517
config.tap.rav_request_timestamp_buffer_ms,
473518
)),
474-
// sender_fee_tracker: SenderFeeTracker::new(),
475519
rav_tracker: SimpleFeeTracker::default(),
476520
invalid_receipts_tracker: SimpleFeeTracker::default(),
477521
allocation_ids: allocation_ids.clone(),
@@ -521,16 +565,7 @@ impl Actor for SenderAccount {
521565

522566
match message {
523567
SenderAccountMessage::UpdateRav(rav) => {
524-
state
525-
.rav_tracker
526-
.update(rav.message.allocationId, rav.message.valueAggregate, 0);
527-
528-
PENDING_RAV
529-
.with_label_values(&[
530-
&state.sender.to_string(),
531-
&rav.message.allocationId.to_string(),
532-
])
533-
.set(rav.message.valueAggregate as f64);
568+
state.update_rav(rav.message.allocationId, rav.message.valueAggregate);
534569

535570
let should_deny = !state.denied && state.deny_condition_reached();
536571
if should_deny {
@@ -544,7 +579,7 @@ impl Actor for SenderAccount {
544579

545580
state
546581
.invalid_receipts_tracker
547-
.update(allocation_id, unaggregated_fees.value, 0);
582+
.update(allocation_id, unaggregated_fees.value);
548583

549584
// invalid receipts can't go down
550585
let should_deny = !state.denied && state.deny_condition_reached();
@@ -572,8 +607,9 @@ impl Actor for SenderAccount {
572607
);
573608
SenderAccount::deny_sender(&state.pgpool, state.sender).await;
574609
}
575-
state.sender_fee_tracker.add(allocation_id, value);
576610

611+
// add new value
612+
state.sender_fee_tracker.add(allocation_id, value);
577613
UNAGGREGATED_FEES
578614
.with_label_values(&[
579615
&state.sender.to_string(),
@@ -582,58 +618,10 @@ impl Actor for SenderAccount {
582618
.add(value as f64);
583619
}
584620
ReceiptFees::RavRequestResponse(rav_result) => {
585-
state.sender_fee_tracker.finish_rav_request(allocation_id);
586-
match rav_result {
587-
Ok((fees, rav)) => {
588-
state.sender_fee_tracker.ok_rav_request(allocation_id);
589-
590-
let rav_value = rav.map_or(0, |rav| rav.message.valueAggregate);
591-
// update rav tracker
592-
state.rav_tracker.update(allocation_id, rav_value, 0);
593-
PENDING_RAV
594-
.with_label_values(&[
595-
&state.sender.to_string(),
596-
&allocation_id.to_string(),
597-
])
598-
.set(rav_value as f64);
599-
600-
// update sender fee tracker
601-
state.sender_fee_tracker.update(
602-
allocation_id,
603-
fees.value,
604-
fees.counter,
605-
);
606-
UNAGGREGATED_FEES
607-
.with_label_values(&[
608-
&state.sender.to_string(),
609-
&allocation_id.to_string(),
610-
])
611-
.set(fees.value as f64);
612-
}
613-
Err(err) => {
614-
state.sender_fee_tracker.failed_rav_backoff(allocation_id);
615-
error!(
616-
"Error while requesting RAV for sender {} and allocation {}: {}",
617-
state.sender,
618-
allocation_id,
619-
err
620-
);
621-
}
622-
};
621+
state.finalize_rav_request(allocation_id, rav_result);
623622
}
624623
ReceiptFees::UpdateValue(unaggregated_fees) => {
625-
state.sender_fee_tracker.update(
626-
allocation_id,
627-
unaggregated_fees.value,
628-
unaggregated_fees.counter,
629-
);
630-
631-
UNAGGREGATED_FEES
632-
.with_label_values(&[
633-
&state.sender.to_string(),
634-
&allocation_id.to_string(),
635-
])
636-
.set(unaggregated_fees.value as f64);
624+
state.update_sender_fee(allocation_id, unaggregated_fees);
637625
}
638626
ReceiptFees::Retry => {}
639627
}
@@ -647,12 +635,11 @@ impl Actor for SenderAccount {
647635
}
648636
let total_counter_for_allocation = state
649637
.sender_fee_tracker
650-
.get_total_counter_outside_buffer_for_allocation(&allocation_id);
638+
.get_count_outside_buffer_for_allocation(&allocation_id);
639+
let can_trigger_rav = state.sender_fee_tracker.can_trigger_rav(allocation_id);
651640
let counter_greater_receipt_limit = total_counter_for_allocation
652641
>= state.config.tap.rav_request_receipt_limit
653-
&& !state
654-
.sender_fee_tracker
655-
.check_allocation_has_rav_request_running(allocation_id);
642+
&& can_trigger_rav;
656643
let total_fee_outside_buffer =
657644
state.sender_fee_tracker.get_total_fee_outside_buffer();
658645
let total_fee_greater_trigger_value =
@@ -776,7 +763,7 @@ impl Actor for SenderAccount {
776763
for allocation_id in tracked_allocation_ids.difference(&active_allocation_ids) {
777764
// if it's being tracked and we didn't receive any update from the non_final_last_ravs
778765
// remove from the tracker
779-
state.rav_tracker.update(*allocation_id, 0, 0);
766+
state.rav_tracker.remove(*allocation_id);
780767

781768
let _ = PENDING_RAV.remove_label_values(&[
782769
&state.sender.to_string(),
@@ -785,10 +772,7 @@ impl Actor for SenderAccount {
785772
}
786773

787774
for (allocation_id, value) in non_final_last_ravs {
788-
state.rav_tracker.update(allocation_id, value, 0);
789-
PENDING_RAV
790-
.with_label_values(&[&state.sender.to_string(), &allocation_id.to_string()])
791-
.set(value as f64);
775+
state.update_rav(allocation_id, value);
792776
}
793777
// now that balance and rav tracker is updated, check
794778
match (state.denied, state.deny_condition_reached()) {
@@ -852,15 +836,8 @@ impl Actor for SenderAccount {
852836
return Ok(());
853837
};
854838

855-
// clean up hashset
856-
state
857-
.sender_fee_tracker
858-
.unblock_allocation_id(allocation_id);
859-
// update the receipt fees by reseting to 0
860-
myself.cast(SenderAccountMessage::UpdateReceiptFees(
861-
allocation_id,
862-
ReceiptFees::UpdateValue(UnaggregatedReceipts::default()),
863-
))?;
839+
// remove from sender_fee_tracker
840+
state.sender_fee_tracker.remove(allocation_id);
864841

865842
// rav tracker is not updated because it's still not redeemed
866843
}

tap-agent/src/agent/sender_allocation.rs

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -149,7 +149,7 @@ impl Actor for SenderAllocation {
149149
if state.invalid_receipts_fees.value > 0 {
150150
sender_account_ref.cast(SenderAccountMessage::UpdateInvalidReceiptFees(
151151
allocation_id,
152-
state.invalid_receipts_fees.clone(),
152+
state.invalid_receipts_fees,
153153
))?;
154154
}
155155

@@ -158,7 +158,7 @@ impl Actor for SenderAllocation {
158158

159159
sender_account_ref.cast(SenderAccountMessage::UpdateReceiptFees(
160160
allocation_id,
161-
ReceiptFees::UpdateValue(state.unaggregated_fees.clone()),
161+
ReceiptFees::UpdateValue(state.unaggregated_fees),
162162
))?;
163163

164164
// update rav tracker for sender account
@@ -268,7 +268,7 @@ impl Actor for SenderAllocation {
268268
state
269269
.request_rav()
270270
.await
271-
.map(|_| (state.unaggregated_fees.clone(), state.latest_rav.clone()))
271+
.map(|_| (state.unaggregated_fees, state.latest_rav.clone()))
272272
} else {
273273
Err(anyhow!("Unaggregated fee equals zero"))
274274
};
@@ -283,7 +283,7 @@ impl Actor for SenderAllocation {
283283
#[cfg(test)]
284284
SenderAllocationMessage::GetUnaggregatedReceipts(reply) => {
285285
if !reply.is_closed() {
286-
let _ = reply.send(unaggregated_fees.clone());
286+
let _ = reply.send(*unaggregated_fees);
287287
}
288288
}
289289
}
@@ -774,7 +774,7 @@ impl SenderAllocationState {
774774
self.sender_account_ref
775775
.cast(SenderAccountMessage::UpdateInvalidReceiptFees(
776776
self.allocation_id,
777-
self.invalid_receipts_fees.clone(),
777+
self.invalid_receipts_fees,
778778
))?;
779779

780780
Ok(())

0 commit comments

Comments
 (0)