Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion tap_aggregator/src/aggregator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ pub fn check_and_aggregate_receipts(
// Get the allocation id from the first receipt, return error if there are no receipts
let allocation_id = match receipts.first() {
Some(receipt) => receipt.message.allocation_id,
None => return Err(tap_core::Error::NoValidReceiptsForRAVRequest.into()),
None => return Err(tap_core::Error::NoValidReceiptsForRavRequest.into()),
};

// Check that the receipts all have the same allocation id
Expand Down
2 changes: 1 addition & 1 deletion tap_core/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ pub enum Error {
AdapterError { source_error: anyhow::Error },
/// Error when no valid receipts are found for a RAV request
#[error("Failed to produce rav request, no valid receipts")]
NoValidReceiptsForRAVRequest,
NoValidReceiptsForRavRequest,

/// Error when the previous RAV allocation id does not match the allocation id from the new receipt
#[error("Previous RAV allocation id ({prev_id}) doesn't match the allocation id from the new receipt ({new_id}).")]
Expand Down
30 changes: 11 additions & 19 deletions tap_core/src/manager/adapters/receipt.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use async_trait::async_trait;

use crate::receipt::{
state::{Checking, ReceiptState},
ReceiptWithState,
ReceiptWithState, WithValueAndTimestamp,
};

/// Stores receipts in the storage.
Expand All @@ -16,7 +16,7 @@ use crate::receipt::{
///
/// For example code see [crate::manager::context::memory::ReceiptStorage]
#[async_trait]
pub trait ReceiptStore {
pub trait ReceiptStore<Rcpt> {
/// Defines the user-specified error type.
///
/// This error type should implement the `Error` and `Debug` traits from the standard library.
Expand All @@ -29,7 +29,7 @@ pub trait ReceiptStore {
/// this process should be captured and returned as an `AdapterError`.
async fn store_receipt(
&self,
receipt: ReceiptWithState<Checking>,
receipt: ReceiptWithState<Checking, Rcpt>,
) -> Result<u64, Self::AdapterError>;
}

Expand Down Expand Up @@ -62,7 +62,7 @@ pub trait ReceiptDelete {
///
/// For example code see [crate::manager::context::memory::ReceiptStorage]
#[async_trait]
pub trait ReceiptRead {
pub trait ReceiptRead<Rcpt> {
/// Defines the user-specified error type.
///
/// This error type should implement the `Error` and `Debug` traits from
Expand Down Expand Up @@ -92,15 +92,15 @@ pub trait ReceiptRead {
&self,
timestamp_range_ns: R,
limit: Option<u64>,
) -> Result<Vec<ReceiptWithState<Checking>>, Self::AdapterError>;
) -> Result<Vec<ReceiptWithState<Checking, Rcpt>>, Self::AdapterError>;
}

/// See [`ReceiptRead::retrieve_receipts_in_timestamp_range()`] for details.
///
/// WARNING: Will sort the receipts by timestamp using
/// [vec::sort_unstable](https://doc.rust-lang.org/std/vec/struct.Vec.html#method.sort_unstable).
pub fn safe_truncate_receipts<T: ReceiptState>(
receipts: &mut Vec<ReceiptWithState<T>>,
pub fn safe_truncate_receipts<T: ReceiptState, Rcpt: WithValueAndTimestamp>(
receipts: &mut Vec<ReceiptWithState<T, Rcpt>>,
limit: u64,
) {
if receipts.len() <= limit as usize {
Expand All @@ -110,27 +110,19 @@ pub fn safe_truncate_receipts<T: ReceiptState>(
return;
}

receipts.sort_unstable_by_key(|rx_receipt| rx_receipt.signed_receipt().message.timestamp_ns);
receipts.sort_unstable_by_key(|rx_receipt| rx_receipt.signed_receipt().timestamp_ns());

// This one will be the last timestamp in `receipts` after naive truncation
let last_timestamp = receipts[limit as usize - 1]
.signed_receipt()
.message
.timestamp_ns;
let last_timestamp = receipts[limit as usize - 1].signed_receipt().timestamp_ns();
// This one is the timestamp that comes just after the one above
let after_last_timestamp = receipts[limit as usize]
.signed_receipt()
.message
.timestamp_ns;
let after_last_timestamp = receipts[limit as usize].signed_receipt().timestamp_ns();

receipts.truncate(limit as usize);

if last_timestamp == after_last_timestamp {
// If the last timestamp is the same as the one that came after it, we need to
// remove all the receipts with the same timestamp as the last one, because
// otherwise we would leave behind part of the receipts for that timestamp.
receipts.retain(|rx_receipt| {
rx_receipt.signed_receipt().message.timestamp_ns != last_timestamp
});
receipts.retain(|rx_receipt| rx_receipt.signed_receipt().timestamp_ns() != last_timestamp);
}
}
40 changes: 24 additions & 16 deletions tap_core/src/manager/context/memory.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,13 @@ use async_trait::async_trait;
use crate::{
manager::adapters::*,
rav::SignedRAV,
receipt::{checks::StatefulTimestampCheck, state::Checking, ReceiptWithState},
receipt::{checks::StatefulTimestampCheck, state::Checking, ReceiptWithState, SignedReceipt},
signed_message::MessageId,
};

pub type EscrowStorage = Arc<RwLock<HashMap<Address, u128>>>;
pub type QueryAppraisals = Arc<RwLock<HashMap<MessageId, u128>>>;
pub type ReceiptStorage = Arc<RwLock<HashMap<u64, ReceiptWithState<Checking>>>>;
pub type ReceiptStorage = Arc<RwLock<HashMap<u64, ReceiptWithState<Checking, SignedReceipt>>>>;
pub type RAVStorage = Arc<RwLock<Option<SignedRAV>>>;

use thiserror::Error;
Expand Down Expand Up @@ -71,7 +71,7 @@ impl InMemoryContext {
pub async fn retrieve_receipt_by_id(
&self,
receipt_id: u64,
) -> Result<ReceiptWithState<Checking>, InMemoryError> {
) -> Result<ReceiptWithState<Checking, SignedReceipt>, InMemoryError> {
let receipt_storage = self.receipt_storage.read().unwrap();

receipt_storage
Expand All @@ -85,7 +85,7 @@ impl InMemoryContext {
pub async fn retrieve_receipts_by_timestamp(
&self,
timestamp_ns: u64,
) -> Result<Vec<(u64, ReceiptWithState<Checking>)>, InMemoryError> {
) -> Result<Vec<(u64, ReceiptWithState<Checking, SignedReceipt>)>, InMemoryError> {
let receipt_storage = self.receipt_storage.read().unwrap();
Ok(receipt_storage
.iter()
Expand All @@ -99,7 +99,7 @@ impl InMemoryContext {
pub async fn retrieve_receipts_upto_timestamp(
&self,
timestamp_ns: u64,
) -> Result<Vec<ReceiptWithState<Checking>>, InMemoryError> {
) -> Result<Vec<ReceiptWithState<Checking, SignedReceipt>>, InMemoryError> {
self.retrieve_receipts_in_timestamp_range(..=timestamp_ns, None)
.await
}
Expand Down Expand Up @@ -147,12 +147,12 @@ impl RAVRead for InMemoryContext {
}

#[async_trait]
impl ReceiptStore for InMemoryContext {
impl ReceiptStore<SignedReceipt> for InMemoryContext {
type AdapterError = InMemoryError;

async fn store_receipt(
&self,
receipt: ReceiptWithState<Checking>,
receipt: ReceiptWithState<Checking, SignedReceipt>,
) -> Result<u64, Self::AdapterError> {
let mut id_pointer = self.unique_id.write().unwrap();
let id_previous = *id_pointer;
Expand All @@ -179,15 +179,15 @@ impl ReceiptDelete for InMemoryContext {
}
}
#[async_trait]
impl ReceiptRead for InMemoryContext {
impl ReceiptRead<SignedReceipt> for InMemoryContext {
type AdapterError = InMemoryError;
async fn retrieve_receipts_in_timestamp_range<R: RangeBounds<u64> + std::marker::Send>(
&self,
timestamp_range_ns: R,
limit: Option<u64>,
) -> Result<Vec<ReceiptWithState<Checking>>, Self::AdapterError> {
) -> Result<Vec<ReceiptWithState<Checking, SignedReceipt>>, Self::AdapterError> {
let receipt_storage = self.receipt_storage.read().unwrap();
let mut receipts_in_range: Vec<ReceiptWithState<Checking>> = receipt_storage
let mut receipts_in_range: Vec<ReceiptWithState<Checking, SignedReceipt>> = receipt_storage
.iter()
.filter(|(_, rx_receipt)| {
timestamp_range_ns.contains(&rx_receipt.signed_receipt().message.timestamp_ns)
Expand Down Expand Up @@ -264,7 +264,7 @@ pub mod checks {
receipt::{
checks::{Check, CheckError, CheckResult, ReceiptCheck},
state::Checking,
Context, ReceiptError, ReceiptWithState,
Context, ReceiptError, ReceiptWithState, SignedReceipt,
},
signed_message::MessageId,
};
Expand All @@ -274,7 +274,7 @@ pub mod checks {
valid_signers: HashSet<Address>,
allocation_ids: Arc<RwLock<HashSet<Address>>>,
_query_appraisals: Arc<RwLock<HashMap<MessageId, u128>>>,
) -> Vec<ReceiptCheck> {
) -> Vec<ReceiptCheck<SignedReceipt>> {
vec![
// Arc::new(UniqueCheck ),
// Arc::new(ValueCheck { query_appraisals }),
Expand All @@ -291,8 +291,12 @@ pub mod checks {
}

#[async_trait::async_trait]
impl Check for AllocationIdCheck {
async fn check(&self, _: &Context, receipt: &ReceiptWithState<Checking>) -> CheckResult {
impl Check<SignedReceipt> for AllocationIdCheck {
async fn check(
&self,
_: &Context,
receipt: &ReceiptWithState<Checking, SignedReceipt>,
) -> CheckResult {
let received_allocation_id = receipt.signed_receipt().message.allocation_id;
if self
.allocation_ids
Expand All @@ -318,8 +322,12 @@ pub mod checks {
}

#[async_trait::async_trait]
impl Check for SignatureCheck {
async fn check(&self, _: &Context, receipt: &ReceiptWithState<Checking>) -> CheckResult {
impl Check<SignedReceipt> for SignatureCheck {
async fn check(
&self,
_: &Context,
receipt: &ReceiptWithState<Checking, SignedReceipt>,
) -> CheckResult {
let recovered_address = receipt
.signed_receipt()
.recover_signer(&self.domain_separator)
Expand Down
4 changes: 2 additions & 2 deletions tap_core/src/manager/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,10 +51,10 @@
//! struct MyContext;
//!
//! #[async_trait]
//! impl ReceiptStore for MyContext {
//! impl<T: Send + 'static> ReceiptStore<T> for MyContext {
//! type AdapterError = ReceiptError;
//!
//! async fn store_receipt(&self, receipt: ReceiptWithState<Checking>) -> Result<u64, Self::AdapterError> {
//! async fn store_receipt(&self, receipt: ReceiptWithState<Checking, T>) -> Result<u64, Self::AdapterError> {
//! // ...
//! # Ok(0)
//! }
Expand Down
Loading
Loading