Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions tap_aggregator/src/grpc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,7 @@ impl From<u128> for Uint128 {
impl RavRequest {
pub fn new(
receipts: Vec<tap_core::receipt::SignedReceipt>,
previous_rav: Option<tap_core::rav::SignedRAV>,
previous_rav: Option<tap_core::rav::SignedRav>,
) -> Self {
Self {
receipts: receipts.into_iter().map(Into::into).collect(),
Expand All @@ -124,8 +124,8 @@ impl RavRequest {
}

impl RavResponse {
pub fn signed_rav(mut self) -> anyhow::Result<tap_core::rav::SignedRAV> {
let signed_rav: tap_core::rav::SignedRAV = self
pub fn signed_rav(mut self) -> anyhow::Result<tap_core::rav::SignedRav> {
let signed_rav: tap_core::rav::SignedRav = self
.rav
.take()
.ok_or(anyhow!("Couldn't find rav"))?
Expand Down
4 changes: 2 additions & 2 deletions tap_aggregator/tests/aggregate_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -64,11 +64,11 @@ async fn aggregation_test() {

let rav_request = RavRequest::new(receipts.clone(), None);
let res = client.aggregate_receipts(rav_request).await.unwrap();
let signed_rav: tap_core::rav::SignedRAV = res.into_inner().signed_rav().unwrap();
let signed_rav: tap_core::rav::SignedRav = res.into_inner().signed_rav().unwrap();

let sender_aggregator = HttpClientBuilder::default().build(&endpoint).unwrap();

let previous_rav: Option<tap_core::rav::SignedRAV> = None;
let previous_rav: Option<tap_core::rav::SignedRav> = None;

let response: JsonRpcResponse<EIP712SignedMessage<ReceiptAggregateVoucher>> = sender_aggregator
.request(
Expand Down
10 changes: 5 additions & 5 deletions tap_core/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ use std::result::Result as StdResult;
use alloy::primitives::{Address, SignatureError};
use thiserror::Error as ThisError;

use crate::{rav::ReceiptAggregateVoucher, receipt::ReceiptError};
use crate::receipt::ReceiptError;

/// Error type for the TAP protocol
#[derive(ThisError, Debug)]
Expand All @@ -36,10 +36,10 @@ pub enum Error {
},

/// Error when the received RAV does not match the expected RAV
#[error("Received RAV does not match expexted RAV")]
InvalidReceivedRAV {
received_rav: ReceiptAggregateVoucher,
expected_rav: ReceiptAggregateVoucher,
#[error("Received RAV does not match expected RAV")]
InvalidReceivedRav {
received_rav: String,
expected_rav: String,
},
/// Generic error from the adapter
#[error("Error from adapter.\n Caused by: {source_error}")]
Expand Down
11 changes: 6 additions & 5 deletions tap_core/src/manager/adapters/rav.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
// Copyright 2023-, Semiotic AI, Inc.
// SPDX-License-Identifier: Apache-2.0

use alloy::sol_types::SolStruct;
use async_trait::async_trait;

use crate::rav::SignedRAV;
use crate::signed_message::EIP712SignedMessage;

/// Stores the latest RAV in the storage.
///
Expand All @@ -12,7 +13,7 @@ use crate::rav::SignedRAV;
/// For example code see [crate::manager::context::memory::RAVStorage]

#[async_trait]
pub trait RAVStore {
pub trait RavStore<T: SolStruct> {
/// Defines the user-specified error type.
///
/// This error type should implement the `Error` and `Debug` traits from
Expand All @@ -25,7 +26,7 @@ pub trait RAVStore {
/// This method should be implemented to store the most recent validated
/// `SignedRAV` into your chosen storage system. Any errors that occur
/// during this process should be captured and returned as an `AdapterError`.
async fn update_last_rav(&self, rav: SignedRAV) -> Result<(), Self::AdapterError>;
async fn update_last_rav(&self, rav: EIP712SignedMessage<T>) -> Result<(), Self::AdapterError>;
}

/// Reads the RAV from storage
Expand All @@ -35,7 +36,7 @@ pub trait RAVStore {
/// For example code see [crate::manager::context::memory::RAVStorage]

#[async_trait]
pub trait RAVRead {
pub trait RavRead<T: SolStruct> {
/// Defines the user-specified error type.
///
/// This error type should implement the `Error` and `Debug` traits from
Expand All @@ -46,5 +47,5 @@ pub trait RAVRead {
/// Retrieves the latest `SignedRAV` from the storage.
///
/// If no `SignedRAV` is available, this method should return `None`.
async fn last_rav(&self) -> Result<Option<SignedRAV>, Self::AdapterError>;
async fn last_rav(&self) -> Result<Option<EIP712SignedMessage<T>>, Self::AdapterError>;
}
12 changes: 6 additions & 6 deletions tap_core/src/manager/context/memory.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,15 +17,15 @@ use async_trait::async_trait;

use crate::{
manager::adapters::*,
rav::SignedRAV,
rav::{ReceiptAggregateVoucher, SignedRav},
receipt::{checks::StatefulTimestampCheck, state::Checking, ReceiptWithState, SignedReceipt},
signed_message::MessageId,
};

pub type EscrowStorage = Arc<RwLock<HashMap<Address, u128>>>;
pub type QueryAppraisals = Arc<RwLock<HashMap<MessageId, u128>>>;
pub type ReceiptStorage = Arc<RwLock<HashMap<u64, ReceiptWithState<Checking, SignedReceipt>>>>;
pub type RAVStorage = Arc<RwLock<Option<SignedRAV>>>;
pub type RAVStorage = Arc<RwLock<Option<SignedRav>>>;

use thiserror::Error;

Expand Down Expand Up @@ -125,10 +125,10 @@ impl InMemoryContext {
}

#[async_trait]
impl RAVStore for InMemoryContext {
impl RavStore<ReceiptAggregateVoucher> for InMemoryContext {
type AdapterError = InMemoryError;

async fn update_last_rav(&self, rav: SignedRAV) -> Result<(), Self::AdapterError> {
async fn update_last_rav(&self, rav: SignedRav) -> Result<(), Self::AdapterError> {
let mut rav_storage = self.rav_storage.write().unwrap();
let timestamp = rav.message.timestampNs;
*rav_storage = Some(rav);
Expand All @@ -138,10 +138,10 @@ impl RAVStore for InMemoryContext {
}

#[async_trait]
impl RAVRead for InMemoryContext {
impl RavRead<ReceiptAggregateVoucher> for InMemoryContext {
type AdapterError = InMemoryError;

async fn last_rav(&self) -> Result<Option<SignedRAV>, Self::AdapterError> {
async fn last_rav(&self) -> Result<Option<SignedRav>, Self::AdapterError> {
Ok(self.rav_storage.read().unwrap().clone())
}
}
Expand Down
3 changes: 2 additions & 1 deletion tap_core/src/manager/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
//! ReceiptError,
//! Context
//! },
//! rav::ReceiptAggregateVoucher,
//! manager::{
//! Manager,
//! adapters::ReceiptStore
Expand Down Expand Up @@ -70,7 +71,7 @@
//!
//! let receipt = EIP712SignedMessage::new(&domain_separator, message, &wallet).unwrap();
//!
//! let manager = Manager::new(domain_separator, MyContext, CheckList::empty());
//! let manager = Manager::<_, _, ReceiptAggregateVoucher>::new(domain_separator, MyContext, CheckList::empty());
//! manager.verify_and_store_receipt(&Context::new(), receipt).await.unwrap()
//! # }
//! ```
Expand Down
85 changes: 37 additions & 48 deletions tap_core/src/manager/tap_manager.rs
Original file line number Diff line number Diff line change
@@ -1,23 +1,25 @@
// Copyright 2023-, Semiotic AI, Inc.
// SPDX-License-Identifier: Apache-2.0

use alloy::dyn_abi::Eip712Domain;
use std::marker::PhantomData;

use alloy::{dyn_abi::Eip712Domain, sol_types::SolStruct};

use super::adapters::{
RAVRead, RAVStore, ReceiptDelete, ReceiptRead, ReceiptStore, SignatureChecker,
RavRead, RavStore, ReceiptDelete, ReceiptRead, ReceiptStore, SignatureChecker,
};
use crate::{
rav::{RavRequest, ReceiptAggregateVoucher, SignedRAV},
rav::{Aggregate, RavRequest},
receipt::{
checks::{CheckBatch, CheckList, TimestampCheck, UniqueCheck},
state::{Checked, Failed},
Context, ReceiptError, ReceiptWithState, SignedReceipt, WithUniqueId,
WithValueAndTimestamp,
Context, ReceiptError, ReceiptWithState, WithUniqueId, WithValueAndTimestamp,
},
signed_message::EIP712SignedMessage,
Error,
};

pub struct Manager<E, Rcpt> {
pub struct Manager<E, Rcpt, Rav> {
/// Context that implements adapters
context: E,

Expand All @@ -27,9 +29,11 @@ pub struct Manager<E, Rcpt> {
/// Struct responsible for doing checks for receipt. Ownership stays with manager allowing manager
/// to update configuration ( like minimum timestamp ).
domain_separator: Eip712Domain,

_phantom: PhantomData<Rav>,
}

impl<E, Rcpt> Manager<E, Rcpt> {
impl<E, Rcpt, Rav> Manager<E, Rcpt, Rav> {
/// Creates new manager with provided `adapters`, any receipts received by this manager
/// will complete all `required_checks` before being accepted or declined from RAV.
/// `starting_min_timestamp` will be used as min timestamp until the first RAV request is created.
Expand All @@ -43,13 +47,15 @@ impl<E, Rcpt> Manager<E, Rcpt> {
context,
domain_separator,
checks: checks.into(),
_phantom: PhantomData,
}
}
}

impl<E, Rcpt> Manager<E, Rcpt>
impl<E, Rcpt, Rav> Manager<E, Rcpt, Rav>
where
E: RAVStore + SignatureChecker,
E: RavStore<Rav> + SignatureChecker,
Rav: SolStruct + PartialEq<Rav> + Sync + std::fmt::Debug,
{
/// Verify `signed_rav` matches all values on `expected_rav`, and that `signed_rav` has a valid signer.
///
Expand All @@ -59,17 +65,17 @@ where
///
pub async fn verify_and_store_rav(
&self,
expected_rav: ReceiptAggregateVoucher,
signed_rav: SignedRAV,
expected_rav: Rav,
signed_rav: EIP712SignedMessage<Rav>,
) -> std::result::Result<(), Error> {
self.context
.check_signature(&signed_rav, &self.domain_separator)
.await?;

if signed_rav.message != expected_rav {
return Err(Error::InvalidReceivedRAV {
received_rav: signed_rav.message,
expected_rav,
return Err(Error::InvalidReceivedRav {
received_rav: format!("{:?}", signed_rav.message),
expected_rav: format!("{:?}", expected_rav),
});
}

Expand All @@ -84,11 +90,12 @@ where
}
}

impl<E, Rcpt> Manager<E, Rcpt>
impl<E, Rcpt, Rav> Manager<E, Rcpt, Rav>
where
E: RAVRead,
E: RavRead<Rav>,
Rav: SolStruct,
{
async fn get_previous_rav(&self) -> Result<Option<SignedRAV>, Error> {
async fn get_previous_rav(&self) -> Result<Option<EIP712SignedMessage<Rav>>, Error> {
let previous_rav = self
.context
.last_rav()
Expand All @@ -100,7 +107,7 @@ where
}
}

impl<E, Rcpt> Manager<E, Rcpt>
impl<E, Rcpt, Rav> Manager<E, Rcpt, Rav>
where
E: ReceiptRead<Rcpt> + SignatureChecker,
Rcpt: WithUniqueId + WithValueAndTimestamp + Sync,
Expand Down Expand Up @@ -162,11 +169,11 @@ where
}
}

// TODO make create_rav_request generic over receipt
impl<E> Manager<E, SignedReceipt>
impl<E, Rcpt, Rav> Manager<E, Rcpt, Rav>
where
E: ReceiptRead<SignedReceipt> + RAVRead + SignatureChecker,
//Rcpt: WithUniqueId + WithValueAndTimestamp + Sync,
E: ReceiptRead<Rcpt> + RavRead<Rav> + SignatureChecker,
Rav: SolStruct + WithValueAndTimestamp + Clone + Aggregate<Rcpt>,
Rcpt: WithUniqueId + WithValueAndTimestamp + Sync,
{
/// Completes remaining checks on all receipts up to
/// (current time - `timestamp_buffer_ns`). Returns them in two lists
Expand All @@ -188,18 +195,18 @@ where
ctx: &Context,
timestamp_buffer_ns: u64,
receipts_limit: Option<u64>,
) -> Result<RavRequest<SignedReceipt>, Error> {
) -> Result<RavRequest<Rcpt, Rav>, Error> {
let previous_rav = self.get_previous_rav().await?;
let min_timestamp_ns = previous_rav
.as_ref()
.map(|rav| rav.message.timestampNs + 1)
.map(|rav| rav.message.timestamp_ns() + 1)
.unwrap_or(0);

let (valid_receipts, invalid_receipts) = self
.collect_receipts(ctx, timestamp_buffer_ns, min_timestamp_ns, receipts_limit)
.await?;

let expected_rav = Self::generate_expected_rav(&valid_receipts, previous_rav.clone());
let expected_rav = Rav::aggregate_receipts(&valid_receipts, previous_rav.clone());

Ok(RavRequest {
valid_receipts,
Expand All @@ -208,30 +215,12 @@ where
expected_rav,
})
}

fn generate_expected_rav(
receipts: &[ReceiptWithState<Checked, SignedReceipt>],
previous_rav: Option<SignedRAV>,
) -> Result<ReceiptAggregateVoucher, Error> {
if receipts.is_empty() {
return Err(Error::NoValidReceiptsForRavRequest);
}
let allocation_id = receipts[0].signed_receipt().message.allocation_id;
let receipts = receipts
.iter()
.map(|rx_receipt| rx_receipt.signed_receipt().clone())
.collect::<Vec<_>>();
ReceiptAggregateVoucher::aggregate_receipts(
allocation_id,
receipts.as_slice(),
previous_rav,
)
}
}

impl<E, Rcpt> Manager<E, Rcpt>
impl<E, Rcpt, Rav> Manager<E, Rcpt, Rav>
where
E: ReceiptDelete + RAVRead,
E: ReceiptDelete + RavRead<Rav>,
Rav: SolStruct + WithValueAndTimestamp,
{
/// Removes obsolete receipts from storage. Obsolete receipts are receipts
/// that are older than the last RAV, and therefore already aggregated into the RAV.
Expand All @@ -247,7 +236,7 @@ where
match self.get_previous_rav().await? {
Some(last_rav) => {
self.context
.remove_receipts_in_timestamp_range(..=last_rav.message.timestampNs)
.remove_receipts_in_timestamp_range(..=last_rav.message.timestamp_ns())
.await
.map_err(|err| Error::AdapterError {
source_error: anyhow::Error::new(err),
Expand All @@ -259,7 +248,7 @@ where
}
}

impl<E, Rcpt> Manager<E, Rcpt>
impl<E, Rcpt, Rav> Manager<E, Rcpt, Rav>
where
E: ReceiptStore<Rcpt>,
{
Expand Down
Loading
Loading