Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 11 additions & 19 deletions tap_core/src/manager/adapters/receipt.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use async_trait::async_trait;

use crate::receipt::{
state::{Checking, ReceiptState},
ReceiptWithState,
ReceiptWithState, WithValueAndTimestamp,
};

/// Stores receipts in the storage.
Expand All @@ -16,7 +16,7 @@ use crate::receipt::{
///
/// For example code see [crate::manager::context::memory::ReceiptStorage]
#[async_trait]
pub trait ReceiptStore {
pub trait ReceiptStore<Rcpt> {
/// Defines the user-specified error type.
///
/// This error type should implement the `Error` and `Debug` traits from the standard library.
Expand All @@ -29,7 +29,7 @@ pub trait ReceiptStore {
/// this process should be captured and returned as an `AdapterError`.
async fn store_receipt(
&self,
receipt: ReceiptWithState<Checking>,
receipt: ReceiptWithState<Checking, Rcpt>,
) -> Result<u64, Self::AdapterError>;
}

Expand Down Expand Up @@ -62,7 +62,7 @@ pub trait ReceiptDelete {
///
/// For example code see [crate::manager::context::memory::ReceiptStorage]
#[async_trait]
pub trait ReceiptRead {
pub trait ReceiptRead<Rcpt> {
/// Defines the user-specified error type.
///
/// This error type should implement the `Error` and `Debug` traits from
Expand Down Expand Up @@ -92,15 +92,15 @@ pub trait ReceiptRead {
&self,
timestamp_range_ns: R,
limit: Option<u64>,
) -> Result<Vec<ReceiptWithState<Checking>>, Self::AdapterError>;
) -> Result<Vec<ReceiptWithState<Checking, Rcpt>>, Self::AdapterError>;
}

/// See [`ReceiptRead::retrieve_receipts_in_timestamp_range()`] for details.
///
/// WARNING: Will sort the receipts by timestamp using
/// [vec::sort_unstable](https://doc.rust-lang.org/std/vec/struct.Vec.html#method.sort_unstable).
pub fn safe_truncate_receipts<T: ReceiptState>(
receipts: &mut Vec<ReceiptWithState<T>>,
pub fn safe_truncate_receipts<T: ReceiptState, Rcpt: WithValueAndTimestamp>(
receipts: &mut Vec<ReceiptWithState<T, Rcpt>>,
limit: u64,
) {
if receipts.len() <= limit as usize {
Expand All @@ -110,27 +110,19 @@ pub fn safe_truncate_receipts<T: ReceiptState>(
return;
}

receipts.sort_unstable_by_key(|rx_receipt| rx_receipt.signed_receipt().message.timestamp_ns);
receipts.sort_unstable_by_key(|rx_receipt| rx_receipt.signed_receipt().timestamp_ns());

// This one will be the last timestamp in `receipts` after naive truncation
let last_timestamp = receipts[limit as usize - 1]
.signed_receipt()
.message
.timestamp_ns;
let last_timestamp = receipts[limit as usize - 1].signed_receipt().timestamp_ns();
// This one is the timestamp that comes just after the one above
let after_last_timestamp = receipts[limit as usize]
.signed_receipt()
.message
.timestamp_ns;
let after_last_timestamp = receipts[limit as usize].signed_receipt().timestamp_ns();

receipts.truncate(limit as usize);

if last_timestamp == after_last_timestamp {
// If the last timestamp is the same as the one that came after it, we need to
// remove all the receipts with the same timestamp as the last one, because
// otherwise we would leave behind part of the receipts for that timestamp.
receipts.retain(|rx_receipt| {
rx_receipt.signed_receipt().message.timestamp_ns != last_timestamp
});
receipts.retain(|rx_receipt| rx_receipt.signed_receipt().timestamp_ns() != last_timestamp);
}
}
40 changes: 24 additions & 16 deletions tap_core/src/manager/context/memory.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,13 @@ use async_trait::async_trait;
use crate::{
manager::adapters::*,
rav::SignedRAV,
receipt::{checks::StatefulTimestampCheck, state::Checking, ReceiptWithState},
receipt::{checks::StatefulTimestampCheck, state::Checking, ReceiptWithState, SignedReceipt},
signed_message::MessageId,
};

pub type EscrowStorage = Arc<RwLock<HashMap<Address, u128>>>;
pub type QueryAppraisals = Arc<RwLock<HashMap<MessageId, u128>>>;
pub type ReceiptStorage = Arc<RwLock<HashMap<u64, ReceiptWithState<Checking>>>>;
pub type ReceiptStorage = Arc<RwLock<HashMap<u64, ReceiptWithState<Checking, SignedReceipt>>>>;
pub type RAVStorage = Arc<RwLock<Option<SignedRAV>>>;

use thiserror::Error;
Expand Down Expand Up @@ -71,7 +71,7 @@ impl InMemoryContext {
pub async fn retrieve_receipt_by_id(
&self,
receipt_id: u64,
) -> Result<ReceiptWithState<Checking>, InMemoryError> {
) -> Result<ReceiptWithState<Checking, SignedReceipt>, InMemoryError> {
let receipt_storage = self.receipt_storage.read().unwrap();

receipt_storage
Expand All @@ -85,7 +85,7 @@ impl InMemoryContext {
pub async fn retrieve_receipts_by_timestamp(
&self,
timestamp_ns: u64,
) -> Result<Vec<(u64, ReceiptWithState<Checking>)>, InMemoryError> {
) -> Result<Vec<(u64, ReceiptWithState<Checking, SignedReceipt>)>, InMemoryError> {
let receipt_storage = self.receipt_storage.read().unwrap();
Ok(receipt_storage
.iter()
Expand All @@ -99,7 +99,7 @@ impl InMemoryContext {
pub async fn retrieve_receipts_upto_timestamp(
&self,
timestamp_ns: u64,
) -> Result<Vec<ReceiptWithState<Checking>>, InMemoryError> {
) -> Result<Vec<ReceiptWithState<Checking, SignedReceipt>>, InMemoryError> {
self.retrieve_receipts_in_timestamp_range(..=timestamp_ns, None)
.await
}
Expand Down Expand Up @@ -147,12 +147,12 @@ impl RAVRead for InMemoryContext {
}

#[async_trait]
impl ReceiptStore for InMemoryContext {
impl ReceiptStore<SignedReceipt> for InMemoryContext {
type AdapterError = InMemoryError;

async fn store_receipt(
&self,
receipt: ReceiptWithState<Checking>,
receipt: ReceiptWithState<Checking, SignedReceipt>,
) -> Result<u64, Self::AdapterError> {
let mut id_pointer = self.unique_id.write().unwrap();
let id_previous = *id_pointer;
Expand All @@ -179,15 +179,15 @@ impl ReceiptDelete for InMemoryContext {
}
}
#[async_trait]
impl ReceiptRead for InMemoryContext {
impl ReceiptRead<SignedReceipt> for InMemoryContext {
type AdapterError = InMemoryError;
async fn retrieve_receipts_in_timestamp_range<R: RangeBounds<u64> + std::marker::Send>(
&self,
timestamp_range_ns: R,
limit: Option<u64>,
) -> Result<Vec<ReceiptWithState<Checking>>, Self::AdapterError> {
) -> Result<Vec<ReceiptWithState<Checking, SignedReceipt>>, Self::AdapterError> {
let receipt_storage = self.receipt_storage.read().unwrap();
let mut receipts_in_range: Vec<ReceiptWithState<Checking>> = receipt_storage
let mut receipts_in_range: Vec<ReceiptWithState<Checking, SignedReceipt>> = receipt_storage
.iter()
.filter(|(_, rx_receipt)| {
timestamp_range_ns.contains(&rx_receipt.signed_receipt().message.timestamp_ns)
Expand Down Expand Up @@ -264,7 +264,7 @@ pub mod checks {
receipt::{
checks::{Check, CheckError, CheckResult, ReceiptCheck},
state::Checking,
Context, ReceiptError, ReceiptWithState,
Context, ReceiptError, ReceiptWithState, SignedReceipt,
},
signed_message::MessageId,
};
Expand All @@ -274,7 +274,7 @@ pub mod checks {
valid_signers: HashSet<Address>,
allocation_ids: Arc<RwLock<HashSet<Address>>>,
_query_appraisals: Arc<RwLock<HashMap<MessageId, u128>>>,
) -> Vec<ReceiptCheck> {
) -> Vec<ReceiptCheck<SignedReceipt>> {
vec![
// Arc::new(UniqueCheck ),
// Arc::new(ValueCheck { query_appraisals }),
Expand All @@ -291,8 +291,12 @@ pub mod checks {
}

#[async_trait::async_trait]
impl Check for AllocationIdCheck {
async fn check(&self, _: &Context, receipt: &ReceiptWithState<Checking>) -> CheckResult {
impl Check<SignedReceipt> for AllocationIdCheck {
async fn check(
&self,
_: &Context,
receipt: &ReceiptWithState<Checking, SignedReceipt>,
) -> CheckResult {
let received_allocation_id = receipt.signed_receipt().message.allocation_id;
if self
.allocation_ids
Expand All @@ -318,8 +322,12 @@ pub mod checks {
}

#[async_trait::async_trait]
impl Check for SignatureCheck {
async fn check(&self, _: &Context, receipt: &ReceiptWithState<Checking>) -> CheckResult {
impl Check<SignedReceipt> for SignatureCheck {
async fn check(
&self,
_: &Context,
receipt: &ReceiptWithState<Checking, SignedReceipt>,
) -> CheckResult {
let recovered_address = receipt
.signed_receipt()
.recover_signer(&self.domain_separator)
Expand Down
4 changes: 2 additions & 2 deletions tap_core/src/manager/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,10 +51,10 @@
//! struct MyContext;
//!
//! #[async_trait]
//! impl ReceiptStore for MyContext {
//! impl<T: Send + 'static> ReceiptStore<T> for MyContext {
//! type AdapterError = ReceiptError;
//!
//! async fn store_receipt(&self, receipt: ReceiptWithState<Checking>) -> Result<u64, Self::AdapterError> {
//! async fn store_receipt(&self, receipt: ReceiptWithState<Checking, T>) -> Result<u64, Self::AdapterError> {
//! // ...
//! # Ok(0)
//! }
Expand Down
46 changes: 27 additions & 19 deletions tap_core/src/manager/tap_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,29 +11,34 @@ use crate::{
receipt::{
checks::{CheckBatch, CheckList, TimestampCheck, UniqueCheck},
state::{Checked, Failed},
Context, ReceiptError, ReceiptWithState, SignedReceipt,
Context, ReceiptError, ReceiptWithState, SignedReceipt, WithUniqueId,
WithValueAndTimestamp,
},
Error,
};

pub struct Manager<E> {
pub struct Manager<E, Rcpt> {
/// Context that implements adapters
context: E,

/// Checks that must be completed for each receipt before being confirmed or denied for rav request
checks: CheckList,
checks: CheckList<Rcpt>,

/// Struct responsible for doing checks for receipt. Ownership stays with manager allowing manager
/// to update configuration ( like minimum timestamp ).
domain_separator: Eip712Domain,
}

impl<E> Manager<E> {
impl<E, Rcpt> Manager<E, Rcpt> {
/// Creates new manager with provided `adapters`, any receipts received by this manager
/// will complete all `required_checks` before being accepted or declined from RAV.
/// `starting_min_timestamp` will be used as min timestamp until the first RAV request is created.
///
pub fn new(domain_separator: Eip712Domain, context: E, checks: impl Into<CheckList>) -> Self {
pub fn new(
domain_separator: Eip712Domain,
context: E,
checks: impl Into<CheckList<Rcpt>>,
) -> Self {
Self {
context,
domain_separator,
Expand All @@ -42,7 +47,7 @@ impl<E> Manager<E> {
}
}

impl<E> Manager<E>
impl<E, Rcpt> Manager<E, Rcpt>
where
E: RAVStore + SignatureChecker,
{
Expand Down Expand Up @@ -79,7 +84,7 @@ where
}
}

impl<E> Manager<E>
impl<E, Rcpt> Manager<E, Rcpt>
where
E: RAVRead,
{
Expand All @@ -95,9 +100,10 @@ where
}
}

impl<E> Manager<E>
impl<E, Rcpt> Manager<E, Rcpt>
where
E: ReceiptRead + SignatureChecker,
E: ReceiptRead<Rcpt> + SignatureChecker,
Rcpt: WithUniqueId + WithValueAndTimestamp + Sync,
{
async fn collect_receipts(
&self,
Expand All @@ -107,8 +113,8 @@ where
limit: Option<u64>,
) -> Result<
(
Vec<ReceiptWithState<Checked>>,
Vec<ReceiptWithState<Failed>>,
Vec<ReceiptWithState<Checked, Rcpt>>,
Vec<ReceiptWithState<Failed, Rcpt>>,
),
Error,
> {
Expand Down Expand Up @@ -156,9 +162,11 @@ where
}
}

impl<E> Manager<E>
// TODO make create_rav_request generic over receipt
impl<E> Manager<E, SignedReceipt>
where
E: ReceiptRead + RAVRead + SignatureChecker,
E: ReceiptRead<SignedReceipt> + RAVRead + SignatureChecker,
//Rcpt: WithUniqueId + WithValueAndTimestamp + Sync,
{
/// Completes remaining checks on all receipts up to
/// (current time - `timestamp_buffer_ns`). Returns them in two lists
Expand All @@ -180,7 +188,7 @@ where
ctx: &Context,
timestamp_buffer_ns: u64,
receipts_limit: Option<u64>,
) -> Result<RAVRequest, Error> {
) -> Result<RAVRequest<SignedReceipt>, Error> {
let previous_rav = self.get_previous_rav().await?;
let min_timestamp_ns = previous_rav
.as_ref()
Expand All @@ -202,7 +210,7 @@ where
}

fn generate_expected_rav(
receipts: &[ReceiptWithState<Checked>],
receipts: &[ReceiptWithState<Checked, SignedReceipt>],
previous_rav: Option<SignedRAV>,
) -> Result<ReceiptAggregateVoucher, Error> {
if receipts.is_empty() {
Expand All @@ -221,7 +229,7 @@ where
}
}

impl<E> Manager<E>
impl<E, Rcpt> Manager<E, Rcpt>
where
E: ReceiptDelete + RAVRead,
{
Expand Down Expand Up @@ -251,9 +259,9 @@ where
}
}

impl<E> Manager<E>
impl<E, Rcpt> Manager<E, Rcpt>
where
E: ReceiptStore,
E: ReceiptStore<Rcpt>,
{
/// Runs `initial_checks` on `signed_receipt` for initial verification,
/// then stores received receipt.
Expand All @@ -266,7 +274,7 @@ where
pub async fn verify_and_store_receipt(
&self,
ctx: &Context,
signed_receipt: SignedReceipt,
signed_receipt: Rcpt,
) -> std::result::Result<(), Error> {
let mut received_receipt = ReceiptWithState::new(signed_receipt);

Expand Down
6 changes: 3 additions & 3 deletions tap_core/src/rav/request.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,13 +12,13 @@ use crate::{

/// Request to `tap_aggregator` to aggregate receipts into a Signed RAV.
#[derive(Debug)]
pub struct RAVRequest {
pub struct RAVRequest<Rcpt> {
/// List of checked and reserved receipts to aggregate
pub valid_receipts: Vec<ReceiptWithState<Checked>>,
pub valid_receipts: Vec<ReceiptWithState<Checked, Rcpt>>,
/// Optional previous RAV to aggregate with
pub previous_rav: Option<SignedRAV>,
/// List of failed receipt used to log invalid receipts
pub invalid_receipts: Vec<ReceiptWithState<Failed>>,
pub invalid_receipts: Vec<ReceiptWithState<Failed, Rcpt>>,
/// Expected RAV to be created
pub expected_rav: Result<ReceiptAggregateVoucher, Error>,
}
Loading
Loading