Skip to content

Commit 739c8ea

Browse files
authored
refactor: generic tracker and fee tracker (#395)
1 parent 8e827ee commit 739c8ea

File tree

12 files changed

+948
-640
lines changed

12 files changed

+948
-640
lines changed

tap-agent/src/agent.rs

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,6 @@ use sender_accounts_manager::SenderAccountsManager;
2121
pub mod sender_account;
2222
pub mod sender_accounts_manager;
2323
pub mod sender_allocation;
24-
pub mod sender_fee_tracker;
2524
pub mod unaggregated_receipts;
2625

2726
pub async fn start_agent() -> (ActorRef<SenderAccountsManagerMessage>, JoinHandle<()>) {

tap-agent/src/agent/sender_account.rs

Lines changed: 92 additions & 96 deletions
Large diffs are not rendered by default.

tap-agent/src/agent/sender_allocation.rs

Lines changed: 23 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -149,7 +149,7 @@ impl Actor for SenderAllocation {
149149
if state.invalid_receipts_fees.value > 0 {
150150
sender_account_ref.cast(SenderAccountMessage::UpdateInvalidReceiptFees(
151151
allocation_id,
152-
state.invalid_receipts_fees.clone(),
152+
state.invalid_receipts_fees,
153153
))?;
154154
}
155155

@@ -158,7 +158,7 @@ impl Actor for SenderAllocation {
158158

159159
sender_account_ref.cast(SenderAccountMessage::UpdateReceiptFees(
160160
allocation_id,
161-
ReceiptFees::UpdateValue(state.unaggregated_fees.clone()),
161+
ReceiptFees::UpdateValue(state.unaggregated_fees),
162162
))?;
163163

164164
// update rav tracker for sender account
@@ -196,7 +196,12 @@ impl Actor for SenderAllocation {
196196
}
197197

198198
while let Err(err) = state.mark_rav_last().await {
199-
error!(error = %err, %state.allocation_id, %state.sender, "Error while marking allocation last. Retrying in 30 seconds...");
199+
error!(
200+
error = %err,
201+
%state.allocation_id,
202+
%state.sender,
203+
"Error while marking allocation last. Retrying in 30 seconds..."
204+
);
200205
tokio::time::sleep(Duration::from_secs(30)).await;
201206
}
202207

@@ -224,7 +229,10 @@ impl Actor for SenderAllocation {
224229
match message {
225230
SenderAllocationMessage::NewReceipt(notification) => {
226231
let NewReceiptNotification {
227-
id, value: fees, ..
232+
id,
233+
value: fees,
234+
timestamp_ns,
235+
..
228236
} = notification;
229237
if id <= unaggregated_fees.last_id {
230238
// our world assumption is wrong
@@ -255,15 +263,15 @@ impl Actor for SenderAllocation {
255263
.sender_account_ref
256264
.cast(SenderAccountMessage::UpdateReceiptFees(
257265
state.allocation_id,
258-
ReceiptFees::NewReceipt(fees),
266+
ReceiptFees::NewReceipt(fees, timestamp_ns),
259267
))?;
260268
}
261269
SenderAllocationMessage::TriggerRAVRequest => {
262270
let rav_result = if state.unaggregated_fees.value > 0 {
263271
state
264272
.request_rav()
265273
.await
266-
.map(|_| (state.unaggregated_fees.clone(), state.latest_rav.clone()))
274+
.map(|_| (state.unaggregated_fees, state.latest_rav.clone()))
267275
} else {
268276
Err(anyhow!("Unaggregated fee equals zero"))
269277
};
@@ -278,7 +286,7 @@ impl Actor for SenderAllocation {
278286
#[cfg(test)]
279287
SenderAllocationMessage::GetUnaggregatedReceipts(reply) => {
280288
if !reply.is_closed() {
281-
let _ = reply.send(unaggregated_fees.clone());
289+
let _ = reply.send(*unaggregated_fees);
282290
}
283291
}
284292
}
@@ -769,7 +777,7 @@ impl SenderAllocationState {
769777
self.sender_account_ref
770778
.cast(SenderAccountMessage::UpdateInvalidReceiptFees(
771779
self.allocation_id,
772-
self.invalid_receipts_fees.clone(),
780+
self.invalid_receipts_fees,
773781
))?;
774782

775783
Ok(())
@@ -1104,14 +1112,19 @@ pub mod tests {
11041112
)
11051113
.unwrap();
11061114

1115+
let timestamp_ns = SystemTime::now()
1116+
.duration_since(UNIX_EPOCH)
1117+
.unwrap()
1118+
.as_nanos() as u64;
1119+
11071120
cast!(
11081121
sender_allocation,
11091122
SenderAllocationMessage::NewReceipt(NewReceiptNotification {
11101123
id: 1,
11111124
value: 20,
11121125
allocation_id: *ALLOCATION_ID_0,
11131126
signer_address: SIGNER.1,
1114-
timestamp_ns: 0,
1127+
timestamp_ns,
11151128
})
11161129
)
11171130
.unwrap();
@@ -1121,7 +1134,7 @@ pub mod tests {
11211134
// should emit update aggregate fees message to sender account
11221135
let expected_message = SenderAccountMessage::UpdateReceiptFees(
11231136
*ALLOCATION_ID_0,
1124-
ReceiptFees::NewReceipt(20u128),
1137+
ReceiptFees::NewReceipt(20u128, timestamp_ns),
11251138
);
11261139
let startup_load_msg = message_receiver.recv().await.unwrap();
11271140
assert_eq!(

0 commit comments

Comments
 (0)