Skip to content

Commit 6c0a744

Browse files
committed
refactor: split into files
Signed-off-by: Gustavo Inacio <[email protected]>
1 parent 4592c42 commit 6c0a744

File tree

12 files changed

+892
-754
lines changed

12 files changed

+892
-754
lines changed

tap-agent/src/agent.rs

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,6 @@ use sender_accounts_manager::SenderAccountsManager;
2121
pub mod sender_account;
2222
pub mod sender_accounts_manager;
2323
pub mod sender_allocation;
24-
pub mod sender_fee_tracker;
2524
pub mod unaggregated_receipts;
2625

2726
pub async fn start_agent() -> (ActorRef<SenderAccountsManagerMessage>, JoinHandle<()>) {

tap-agent/src/agent/sender_account.rs

Lines changed: 67 additions & 89 deletions
Original file line numberDiff line numberDiff line change
@@ -26,10 +26,9 @@ use tap_core::rav::SignedRAV;
2626
use tracing::{error, Level};
2727

2828
use super::sender_allocation::{SenderAllocation, SenderAllocationArgs};
29-
use super::sender_fee_tracker::{BufferedReceiptFee, DurationInfo};
3029
use crate::agent::sender_allocation::SenderAllocationMessage;
31-
use crate::agent::sender_fee_tracker::SenderFeeTracker;
3230
use crate::agent::unaggregated_receipts::UnaggregatedReceipts;
31+
use crate::tracker::{SenderFeeTracker, SimpleFeeTracker};
3332
use crate::{
3433
config::{self},
3534
tap::escrow_adapter::EscrowAdapter,
@@ -97,7 +96,7 @@ pub enum SenderAccountMessage {
9796
UpdateInvalidReceiptFees(Address, UnaggregatedReceipts),
9897
UpdateRav(SignedRAV),
9998
#[cfg(test)]
100-
GetSenderFeeTracker(ractor::RpcReplyPort<SenderFeeTracker<BufferedReceiptFee, DurationInfo>>),
99+
GetSenderFeeTracker(ractor::RpcReplyPort<SenderFeeTracker>),
101100
#[cfg(test)]
102101
GetDeny(ractor::RpcReplyPort<bool>),
103102
#[cfg(test)]
@@ -131,9 +130,9 @@ pub struct SenderAccountArgs {
131130
}
132131
pub struct State {
133132
prefix: Option<String>,
134-
sender_fee_tracker: SenderFeeTracker<BufferedReceiptFee, DurationInfo>,
135-
rav_tracker: SenderFeeTracker<u128>,
136-
invalid_receipts_tracker: SenderFeeTracker<u128>,
133+
sender_fee_tracker: SenderFeeTracker,
134+
rav_tracker: SimpleFeeTracker,
135+
invalid_receipts_tracker: SimpleFeeTracker,
137136
allocation_ids: HashSet<Address>,
138137
_indexer_allocations_handle: PipeHandle,
139138
_escrow_account_monitor: PipeHandle,
@@ -238,6 +237,53 @@ impl State {
238237
Ok(())
239238
}
240239

240+
fn finalize_rav_request(
241+
&mut self,
242+
allocation_id: Address,
243+
rav_result: anyhow::Result<(UnaggregatedReceipts, Option<SignedRAV>)>,
244+
) {
245+
self.sender_fee_tracker.finish_rav_request(allocation_id);
246+
match rav_result {
247+
Ok((fees, rav)) => {
248+
self.sender_fee_tracker.ok_rav_request(allocation_id);
249+
250+
let rav_value = rav.map_or(0, |rav| rav.message.valueAggregate);
251+
self.update_rav(allocation_id, rav_value);
252+
253+
// update sender fee tracker
254+
self.update_sender_fee(allocation_id, fees);
255+
}
256+
Err(err) => {
257+
// TODO we should update the total value too
258+
self.sender_fee_tracker.failed_rav_backoff(allocation_id);
259+
error!(
260+
"Error while requesting RAV for sender {} and allocation {}: {}",
261+
self.sender, allocation_id, err
262+
);
263+
}
264+
};
265+
}
266+
267+
fn update_rav(&mut self, allocation_id: Address, rav_value: u128) {
268+
self.rav_tracker.update(allocation_id, rav_value);
269+
PENDING_RAV
270+
.with_label_values(&[&self.sender.to_string(), &allocation_id.to_string()])
271+
.set(rav_value as f64);
272+
}
273+
274+
fn update_sender_fee(
275+
&mut self,
276+
allocation_id: Address,
277+
unaggregated_fees: UnaggregatedReceipts,
278+
) {
279+
self.sender_fee_tracker
280+
.update(allocation_id, unaggregated_fees);
281+
282+
UNAGGREGATED_FEES
283+
.with_label_values(&[&self.sender.to_string(), &allocation_id.to_string()])
284+
.set(unaggregated_fees.value as f64);
285+
}
286+
241287
fn deny_condition_reached(&self) -> bool {
242288
let pending_ravs = self.rav_tracker.get_total_fee();
243289
let unaggregated_fees = self.sender_fee_tracker.get_total_fee();
@@ -470,9 +516,8 @@ impl Actor for SenderAccount {
470516
sender_fee_tracker: SenderFeeTracker::new(Duration::from_millis(
471517
config.tap.rav_request_timestamp_buffer_ms,
472518
)),
473-
// sender_fee_tracker: SenderFeeTracker::new(),
474-
rav_tracker: SenderFeeTracker::default(),
475-
invalid_receipts_tracker: SenderFeeTracker::default(),
519+
rav_tracker: SimpleFeeTracker::default(),
520+
invalid_receipts_tracker: SimpleFeeTracker::default(),
476521
allocation_ids: allocation_ids.clone(),
477522
_indexer_allocations_handle,
478523
_escrow_account_monitor,
@@ -520,16 +565,7 @@ impl Actor for SenderAccount {
520565

521566
match message {
522567
SenderAccountMessage::UpdateRav(rav) => {
523-
state
524-
.rav_tracker
525-
.update(rav.message.allocationId, rav.message.valueAggregate, 0);
526-
527-
PENDING_RAV
528-
.with_label_values(&[
529-
&state.sender.to_string(),
530-
&rav.message.allocationId.to_string(),
531-
])
532-
.set(rav.message.valueAggregate as f64);
568+
state.update_rav(rav.message.allocationId, rav.message.valueAggregate);
533569

534570
let should_deny = !state.denied && state.deny_condition_reached();
535571
if should_deny {
@@ -543,7 +579,7 @@ impl Actor for SenderAccount {
543579

544580
state
545581
.invalid_receipts_tracker
546-
.update(allocation_id, unaggregated_fees.value, 0);
582+
.update(allocation_id, unaggregated_fees.value);
547583

548584
// invalid receipts can't go down
549585
let should_deny = !state.denied && state.deny_condition_reached();
@@ -571,8 +607,9 @@ impl Actor for SenderAccount {
571607
);
572608
SenderAccount::deny_sender(&state.pgpool, state.sender).await;
573609
}
574-
state.sender_fee_tracker.add(allocation_id, value);
575610

611+
// add new value
612+
state.sender_fee_tracker.add(allocation_id, value);
576613
UNAGGREGATED_FEES
577614
.with_label_values(&[
578615
&state.sender.to_string(),
@@ -581,58 +618,10 @@ impl Actor for SenderAccount {
581618
.add(value as f64);
582619
}
583620
ReceiptFees::RavRequestResponse(rav_result) => {
584-
state.sender_fee_tracker.finish_rav_request(allocation_id);
585-
match rav_result {
586-
Ok((fees, rav)) => {
587-
state.sender_fee_tracker.ok_rav_request(allocation_id);
588-
589-
let rav_value = rav.map_or(0, |rav| rav.message.valueAggregate);
590-
// update rav tracker
591-
state.rav_tracker.update(allocation_id, rav_value, 0);
592-
PENDING_RAV
593-
.with_label_values(&[
594-
&state.sender.to_string(),
595-
&allocation_id.to_string(),
596-
])
597-
.set(rav_value as f64);
598-
599-
// update sender fee tracker
600-
state.sender_fee_tracker.update(
601-
allocation_id,
602-
fees.value,
603-
fees.counter,
604-
);
605-
UNAGGREGATED_FEES
606-
.with_label_values(&[
607-
&state.sender.to_string(),
608-
&allocation_id.to_string(),
609-
])
610-
.set(fees.value as f64);
611-
}
612-
Err(err) => {
613-
state.sender_fee_tracker.failed_rav_backoff(allocation_id);
614-
error!(
615-
"Error while requesting RAV for sender {} and allocation {}: {}",
616-
state.sender,
617-
allocation_id,
618-
err
619-
);
620-
}
621-
};
621+
state.finalize_rav_request(allocation_id, rav_result);
622622
}
623623
ReceiptFees::UpdateValue(unaggregated_fees) => {
624-
state.sender_fee_tracker.update(
625-
allocation_id,
626-
unaggregated_fees.value,
627-
unaggregated_fees.counter,
628-
);
629-
630-
UNAGGREGATED_FEES
631-
.with_label_values(&[
632-
&state.sender.to_string(),
633-
&allocation_id.to_string(),
634-
])
635-
.set(unaggregated_fees.value as f64);
624+
state.update_sender_fee(allocation_id, unaggregated_fees);
636625
}
637626
ReceiptFees::Retry => {}
638627
}
@@ -646,12 +635,11 @@ impl Actor for SenderAccount {
646635
}
647636
let total_counter_for_allocation = state
648637
.sender_fee_tracker
649-
.get_total_counter_outside_buffer_for_allocation(&allocation_id);
638+
.get_count_outside_buffer_for_allocation(&allocation_id);
639+
let can_trigger_rav = state.sender_fee_tracker.can_trigger_rav(allocation_id);
650640
let counter_greater_receipt_limit = total_counter_for_allocation
651641
>= state.config.tap.rav_request_receipt_limit
652-
&& !state
653-
.sender_fee_tracker
654-
.check_allocation_has_rav_request_running(allocation_id);
642+
&& can_trigger_rav;
655643
let total_fee_outside_buffer =
656644
state.sender_fee_tracker.get_total_fee_outside_buffer();
657645
let total_fee_greater_trigger_value =
@@ -775,7 +763,7 @@ impl Actor for SenderAccount {
775763
for allocation_id in tracked_allocation_ids.difference(&active_allocation_ids) {
776764
// if it's being tracked and we didn't receive any update from the non_final_last_ravs
777765
// remove from the tracker
778-
state.rav_tracker.update(*allocation_id, 0, 0);
766+
state.rav_tracker.remove(*allocation_id);
779767

780768
let _ = PENDING_RAV.remove_label_values(&[
781769
&state.sender.to_string(),
@@ -784,10 +772,7 @@ impl Actor for SenderAccount {
784772
}
785773

786774
for (allocation_id, value) in non_final_last_ravs {
787-
state.rav_tracker.update(allocation_id, value, 0);
788-
PENDING_RAV
789-
.with_label_values(&[&state.sender.to_string(), &allocation_id.to_string()])
790-
.set(value as f64);
775+
state.update_rav(allocation_id, value);
791776
}
792777
// now that balance and rav tracker is updated, check
793778
match (state.denied, state.deny_condition_reached()) {
@@ -851,15 +836,8 @@ impl Actor for SenderAccount {
851836
return Ok(());
852837
};
853838

854-
// clean up hashset
855-
state
856-
.sender_fee_tracker
857-
.unblock_allocation_id(allocation_id);
858-
// update the receipt fees by reseting to 0
859-
myself.cast(SenderAccountMessage::UpdateReceiptFees(
860-
allocation_id,
861-
ReceiptFees::UpdateValue(UnaggregatedReceipts::default()),
862-
))?;
839+
// remove from sender_fee_tracker
840+
state.sender_fee_tracker.remove(allocation_id);
863841

864842
// rav tracker is not updated because it's still not redeemed
865843
}

tap-agent/src/agent/sender_allocation.rs

Lines changed: 11 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -149,7 +149,7 @@ impl Actor for SenderAllocation {
149149
if state.invalid_receipts_fees.value > 0 {
150150
sender_account_ref.cast(SenderAccountMessage::UpdateInvalidReceiptFees(
151151
allocation_id,
152-
state.invalid_receipts_fees.clone(),
152+
state.invalid_receipts_fees,
153153
))?;
154154
}
155155

@@ -158,7 +158,7 @@ impl Actor for SenderAllocation {
158158

159159
sender_account_ref.cast(SenderAccountMessage::UpdateReceiptFees(
160160
allocation_id,
161-
ReceiptFees::UpdateValue(state.unaggregated_fees.clone()),
161+
ReceiptFees::UpdateValue(state.unaggregated_fees),
162162
))?;
163163

164164
// update rav tracker for sender account
@@ -196,7 +196,12 @@ impl Actor for SenderAllocation {
196196
}
197197

198198
while let Err(err) = state.mark_rav_last().await {
199-
error!(error = %err, %state.allocation_id, %state.sender, "Error while marking allocation last. Retrying in 30 seconds...");
199+
error!(
200+
error = %err,
201+
%state.allocation_id,
202+
%state.sender,
203+
"Error while marking allocation last. Retrying in 30 seconds..."
204+
);
200205
tokio::time::sleep(Duration::from_secs(30)).await;
201206
}
202207

@@ -263,7 +268,7 @@ impl Actor for SenderAllocation {
263268
state
264269
.request_rav()
265270
.await
266-
.map(|_| (state.unaggregated_fees.clone(), state.latest_rav.clone()))
271+
.map(|_| (state.unaggregated_fees, state.latest_rav.clone()))
267272
} else {
268273
Err(anyhow!("Unaggregated fee equals zero"))
269274
};
@@ -278,7 +283,7 @@ impl Actor for SenderAllocation {
278283
#[cfg(test)]
279284
SenderAllocationMessage::GetUnaggregatedReceipts(reply) => {
280285
if !reply.is_closed() {
281-
let _ = reply.send(unaggregated_fees.clone());
286+
let _ = reply.send(*unaggregated_fees);
282287
}
283288
}
284289
}
@@ -769,7 +774,7 @@ impl SenderAllocationState {
769774
self.sender_account_ref
770775
.cast(SenderAccountMessage::UpdateInvalidReceiptFees(
771776
self.allocation_id,
772-
self.invalid_receipts_fees.clone(),
777+
self.invalid_receipts_fees,
773778
))?;
774779

775780
Ok(())

0 commit comments

Comments
 (0)