Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions tap_core/src/manager/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -62,15 +62,15 @@
//! # #[tokio::main]
//! # async fn main() {
//! # use alloy::{dyn_abi::Eip712Domain, primitives::Address, signers::local::PrivateKeySigner};
//! # use tap_graph::{Receipt, SignedReceipt, ReceiptAggregateVoucher};
//! # use tap_graph::{Receipt, SignedReceipt};
//! # use tap_core::signed_message::Eip712SignedMessage;
//! # let domain_separator = Eip712Domain::default();
//! # let wallet = PrivateKeySigner::random();
//! # let message = Receipt::new(Address::from([0x11u8; 20]), 100).unwrap();
//!
//! let receipt = Eip712SignedMessage::new(&domain_separator, message, &wallet).unwrap();
//!
//! let manager = Manager::<_, _, ReceiptAggregateVoucher>::new(domain_separator, MyContext, CheckList::empty());
//! let manager = Manager::new(domain_separator, MyContext, CheckList::empty());
//! manager.verify_and_store_receipt(&Context::new(), receipt).await.unwrap()
//! # }
//! ```
Expand Down
90 changes: 41 additions & 49 deletions tap_core/src/manager/tap_manager.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,6 @@
// Copyright 2023-, Semiotic AI, Inc.
// SPDX-License-Identifier: Apache-2.0

use std::marker::PhantomData;

use alloy::{dyn_abi::Eip712Domain, sol_types::SolStruct};
use tap_receipt::rav::Aggregate;

Expand All @@ -20,7 +18,7 @@ use crate::{
Error,
};

pub struct Manager<E, Rcpt, Rav> {
pub struct Manager<E, Rcpt> {
/// Context that implements adapters
context: E,

Expand All @@ -30,11 +28,9 @@ pub struct Manager<E, Rcpt, Rav> {
/// Struct responsible for doing checks for receipt. Ownership stays with manager allowing manager
/// to update configuration ( like minimum timestamp ).
domain_separator: Eip712Domain,

_phantom: PhantomData<Rav>,
}

impl<E, Rcpt, Rav> Manager<E, Rcpt, Rav> {
impl<E, Rcpt> Manager<E, Rcpt> {
/// Creates new manager with provided `adapters`, any receipts received by this manager
/// will complete all `required_checks` before being accepted or declined from RAV.
/// `starting_min_timestamp` will be used as min timestamp until the first RAV request is created.
Expand All @@ -48,27 +44,40 @@ impl<E, Rcpt, Rav> Manager<E, Rcpt, Rav> {
context,
domain_separator,
checks: checks.into(),
_phantom: PhantomData,
}
}
}

impl<E, Rcpt, Rav> Manager<E, Rcpt, Rav>
where
E: RavStore<Rav> + SignatureChecker,
Rav: SolStruct + PartialEq<Rav> + Sync + std::fmt::Debug,
{
async fn get_previous_rav<Rav: SolStruct>(
&self,
) -> Result<Option<Eip712SignedMessage<Rav>>, Error>
where
E: RavRead<Rav>,
{
let previous_rav = self
.context
.last_rav()
.await
.map_err(|err| Error::AdapterError {
source_error: anyhow::Error::new(err),
})?;
Ok(previous_rav)
}

/// Verify `signed_rav` matches all values on `expected_rav`, and that `signed_rav` has a valid signer.
///
/// # Errors
///
/// Returns [`Error::AdapterError`] if there are any errors while storing RAV
///
pub async fn verify_and_store_rav(
pub async fn verify_and_store_rav<Rav>(
&self,
expected_rav: Rav,
signed_rav: Eip712SignedMessage<Rav>,
) -> std::result::Result<(), Error> {
) -> std::result::Result<(), Error>
where
E: RavStore<Rav> + SignatureChecker,
Rav: SolStruct + PartialEq<Rav> + Sync + std::fmt::Debug,
{
self.context
.check_signature(&signed_rav, &self.domain_separator)
.await?;
Expand All @@ -91,27 +100,10 @@ where
}
}

impl<E, Rcpt, Rav> Manager<E, Rcpt, Rav>
impl<E, Rcpt> Manager<E, Rcpt>
where
E: RavRead<Rav>,
Rav: SolStruct,
{
async fn get_previous_rav(&self) -> Result<Option<Eip712SignedMessage<Rav>>, Error> {
let previous_rav = self
.context
.last_rav()
.await
.map_err(|err| Error::AdapterError {
source_error: anyhow::Error::new(err),
})?;
Ok(previous_rav)
}
}

impl<E, Rcpt, Rav> Manager<E, Rcpt, Rav>
where
E: ReceiptRead<Rcpt> + SignatureChecker,
Rcpt: WithUniqueId + WithValueAndTimestamp + Sync,
E: ReceiptRead<Rcpt>,
Rcpt: WithUniqueId + WithValueAndTimestamp,
{
async fn collect_receipts(
&self,
Expand Down Expand Up @@ -168,14 +160,7 @@ where

Ok((checked_receipts, failed_receipts))
}
}

impl<E, Rcpt, Rav> Manager<E, Rcpt, Rav>
where
E: ReceiptRead<Rcpt> + RavRead<Rav> + SignatureChecker,
Rav: SolStruct + WithValueAndTimestamp + Clone + Aggregate<Rcpt>,
Rcpt: WithUniqueId + WithValueAndTimestamp + Sync,
{
/// Completes remaining checks on all receipts up to
/// (current time - `timestamp_buffer_ns`). Returns them in two lists
/// (valid receipts and invalid receipts) along with the expected RAV that
Expand All @@ -191,12 +176,16 @@ where
/// previous RAV is greater than the min timestamp. Caused by timestamp
/// buffer being too large, or requests coming too soon.
///
pub async fn create_rav_request(
pub async fn create_rav_request<Rav>(
&self,
ctx: &Context,
timestamp_buffer_ns: u64,
receipts_limit: Option<u64>,
) -> Result<RavRequest<Rcpt, Rav>, Error> {
) -> Result<RavRequest<Rcpt, Rav>, Error>
where
E: RavRead<Rav>,
Rav: SolStruct + WithValueAndTimestamp + Clone + Aggregate<Rcpt>,
{
let previous_rav = self.get_previous_rav().await?;
let min_timestamp_ns = previous_rav
.as_ref()
Expand All @@ -218,10 +207,9 @@ where
}
}

impl<E, Rcpt, Rav> Manager<E, Rcpt, Rav>
impl<E, Rcpt> Manager<E, Rcpt>
where
E: ReceiptDelete + RavRead<Rav>,
Rav: SolStruct + WithValueAndTimestamp,
E: ReceiptDelete,
{
/// Removes obsolete receipts from storage. Obsolete receipts are receipts
/// that are older than the last RAV, and therefore already aggregated into the RAV.
Expand All @@ -233,7 +221,11 @@ where
/// Returns [`Error::AdapterError`] if there are any errors while retrieving
/// last RAV or removing receipts
///
pub async fn remove_obsolete_receipts(&self) -> Result<(), Error> {
pub async fn remove_obsolete_receipts<Rav>(&self) -> Result<(), Error>
where
E: RavRead<Rav>,
Rav: SolStruct + WithValueAndTimestamp,
{
match self.get_previous_rav().await? {
Some(last_rav) => {
self.context
Expand All @@ -249,7 +241,7 @@ where
}
}

impl<E, Rcpt, Rav> Manager<E, Rcpt, Rav>
impl<E, Rcpt> Manager<E, Rcpt>
where
E: ReceiptStore<Rcpt>,
{
Expand Down
3 changes: 1 addition & 2 deletions tap_core/tests/manager_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -128,8 +128,7 @@ async fn manager_verify_and_store_varying_initial_checks(
signer,
..
} = context;
let manager =
Manager::<_, _, ReceiptAggregateVoucher>::new(domain_separator.clone(), context, checks);
let manager = Manager::new(domain_separator.clone(), context, checks);

let value = 20u128;
let signed_receipt = Eip712SignedMessage::new(
Expand Down
10 changes: 5 additions & 5 deletions tap_integration_tests/tests/indexer_mock.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,9 +44,9 @@ pub trait Rpc {
/// threshold is a limit to which receipt_count can increment, after reaching which RAV request is triggered.
/// aggregator_client is an HTTP client used for making JSON-RPC requests to another server.
pub struct RpcManager<E> {
manager: Arc<Manager<E, SignedReceipt, ReceiptAggregateVoucher>>, // Manager object reference counted with an Arc
receipt_count: Arc<AtomicU64>, // Thread-safe atomic counter for receipts
threshold: u64, // The count at which a RAV request will be triggered
manager: Arc<Manager<E, SignedReceipt>>, // Manager object reference counted with an Arc
receipt_count: Arc<AtomicU64>, // Thread-safe atomic counter for receipts
threshold: u64, // The count at which a RAV request will be triggered
aggregator_client: (HttpClient, String), // HTTP client for sending requests to the aggregator server
}

Expand All @@ -66,7 +66,7 @@ where
aggregate_server_api_version: String,
) -> Result<Self> {
Ok(Self {
manager: Arc::new(Manager::<E, SignedReceipt, ReceiptAggregateVoucher>::new(
manager: Arc::new(Manager::<E, SignedReceipt>::new(
domain_separator,
context,
required_checks,
Expand Down Expand Up @@ -184,7 +184,7 @@ where

// request_rav function creates a request for aggregate receipts (RAV), sends it to another server and verifies the result.
async fn request_rav<E>(
manager: &Arc<Manager<E, SignedReceipt, ReceiptAggregateVoucher>>,
manager: &Arc<Manager<E, SignedReceipt>>,
time_stamp_buffer: u64, // Buffer for timestamping, see tap_core for details
aggregator_client: &(HttpClient, String), // HttpClient for making requests to the tap_aggregator server
threshold: usize,
Expand Down
4 changes: 4 additions & 0 deletions tap_receipt/src/rav.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,4 +26,8 @@ pub enum AggregationError {
/// Error when no valid receipts are found for a RAV request
#[error("Failed to produce rav request, no valid receipts")]
NoValidReceiptsForRavRequest,

/// Other user-defined error
#[error(transparent)]
Other(anyhow::Error),
}