Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion tap-agent/src/agent.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ use sender_accounts_manager::SenderAccountsManager;
pub mod sender_account;
pub mod sender_accounts_manager;
pub mod sender_allocation;
pub mod sender_fee_tracker;
pub mod unaggregated_receipts;

pub async fn start_agent() -> (ActorRef<SenderAccountsManagerMessage>, JoinHandle<()>) {
Expand Down
188 changes: 92 additions & 96 deletions tap-agent/src/agent/sender_account.rs

Large diffs are not rendered by default.

33 changes: 23 additions & 10 deletions tap-agent/src/agent/sender_allocation.rs
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,7 @@ impl Actor for SenderAllocation {
if state.invalid_receipts_fees.value > 0 {
sender_account_ref.cast(SenderAccountMessage::UpdateInvalidReceiptFees(
allocation_id,
state.invalid_receipts_fees.clone(),
state.invalid_receipts_fees,
))?;
}

Expand All @@ -158,7 +158,7 @@ impl Actor for SenderAllocation {

sender_account_ref.cast(SenderAccountMessage::UpdateReceiptFees(
allocation_id,
ReceiptFees::UpdateValue(state.unaggregated_fees.clone()),
ReceiptFees::UpdateValue(state.unaggregated_fees),
))?;

// update rav tracker for sender account
Expand Down Expand Up @@ -196,7 +196,12 @@ impl Actor for SenderAllocation {
}

while let Err(err) = state.mark_rav_last().await {
error!(error = %err, %state.allocation_id, %state.sender, "Error while marking allocation last. Retrying in 30 seconds...");
error!(
error = %err,
%state.allocation_id,
%state.sender,
"Error while marking allocation last. Retrying in 30 seconds..."
);
tokio::time::sleep(Duration::from_secs(30)).await;
}

Expand Down Expand Up @@ -224,7 +229,10 @@ impl Actor for SenderAllocation {
match message {
SenderAllocationMessage::NewReceipt(notification) => {
let NewReceiptNotification {
id, value: fees, ..
id,
value: fees,
timestamp_ns,
..
} = notification;
if id <= unaggregated_fees.last_id {
// our world assumption is wrong
Expand Down Expand Up @@ -255,15 +263,15 @@ impl Actor for SenderAllocation {
.sender_account_ref
.cast(SenderAccountMessage::UpdateReceiptFees(
state.allocation_id,
ReceiptFees::NewReceipt(fees),
ReceiptFees::NewReceipt(fees, timestamp_ns),
))?;
}
SenderAllocationMessage::TriggerRAVRequest => {
let rav_result = if state.unaggregated_fees.value > 0 {
state
.request_rav()
.await
.map(|_| (state.unaggregated_fees.clone(), state.latest_rav.clone()))
.map(|_| (state.unaggregated_fees, state.latest_rav.clone()))
} else {
Err(anyhow!("Unaggregated fee equals zero"))
};
Expand All @@ -278,7 +286,7 @@ impl Actor for SenderAllocation {
#[cfg(test)]
SenderAllocationMessage::GetUnaggregatedReceipts(reply) => {
if !reply.is_closed() {
let _ = reply.send(unaggregated_fees.clone());
let _ = reply.send(*unaggregated_fees);
}
}
}
Expand Down Expand Up @@ -769,7 +777,7 @@ impl SenderAllocationState {
self.sender_account_ref
.cast(SenderAccountMessage::UpdateInvalidReceiptFees(
self.allocation_id,
self.invalid_receipts_fees.clone(),
self.invalid_receipts_fees,
))?;

Ok(())
Expand Down Expand Up @@ -1104,14 +1112,19 @@ pub mod tests {
)
.unwrap();

let timestamp_ns = SystemTime::now()
.duration_since(UNIX_EPOCH)
.unwrap()
.as_nanos() as u64;

cast!(
sender_allocation,
SenderAllocationMessage::NewReceipt(NewReceiptNotification {
id: 1,
value: 20,
allocation_id: *ALLOCATION_ID_0,
signer_address: SIGNER.1,
timestamp_ns: 0,
timestamp_ns,
})
)
.unwrap();
Expand All @@ -1121,7 +1134,7 @@ pub mod tests {
// should emit update aggregate fees message to sender account
let expected_message = SenderAccountMessage::UpdateReceiptFees(
*ALLOCATION_ID_0,
ReceiptFees::NewReceipt(20u128),
ReceiptFees::NewReceipt(20u128, timestamp_ns),
);
let startup_load_msg = message_receiver.recv().await.unwrap();
assert_eq!(
Expand Down
Loading
Loading