Skip to content

Commit 7aaa2c6

Browse files
authored
Merge pull request #161 from semiotic-ai/async
refactor!(core): make it all async
2 parents 77166f7 + 6f9816b commit 7aaa2c6

22 files changed

+392
-296
lines changed

tap_core/Cargo.toml

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,6 @@ rand="0.8.5"
1212
thiserror="1.0.38"
1313
ethereum-types={version="0.14.1"}
1414
rstest = "0.17.0"
15-
async-std = { version = "1.5", features = ["attributes"] }
1615
ethers = { version = "2.0.0", default-features = false }
1716
ethers-core = "2.0.0"
1817
ethers-contract = "2.0.0"
@@ -21,6 +20,8 @@ anyhow = "1"
2120

2221
strum = "0.24.1"
2322
strum_macros = "0.24.3"
23+
async-trait = "0.1.72"
24+
tokio = { version = "1.29.1", features = ["macros"] }
2425

2526
[dev-dependencies]
2627
criterion = { version = "0.5", features = ["async_std"] }

tap_core/benches/timeline_aggretion_protocol_benchmark.rs

Lines changed: 12 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,6 @@
1010
1111
use std::str::FromStr;
1212

13-
use async_std::task::block_on;
1413
use criterion::async_executor::AsyncStdExecutor;
1514
use criterion::{black_box, criterion_group, criterion_main, Criterion};
1615
use ethereum_types::Address;
@@ -21,6 +20,7 @@ use tap_core::{
2120
eip_712_signed_message::EIP712SignedMessage,
2221
receipt_aggregate_voucher::ReceiptAggregateVoucher, tap_receipt::Receipt,
2322
};
23+
use tokio::runtime::Runtime;
2424

2525
pub async fn create_and_sign_receipt(
2626
allocation_id: Address,
@@ -33,6 +33,8 @@ pub async fn create_and_sign_receipt(
3333
}
3434

3535
pub fn criterion_benchmark(c: &mut Criterion) {
36+
let async_runtime = Runtime::new().unwrap();
37+
3638
let wallet = LocalWallet::new(&mut OsRng);
3739

3840
// Arbitrary values wrapped in black box to avoid compiler optimizing them out
@@ -49,7 +51,7 @@ pub fn criterion_benchmark(c: &mut Criterion) {
4951
})
5052
});
5153

52-
let receipt = block_on(create_and_sign_receipt(allocation_id, value, &wallet));
54+
let receipt = async_runtime.block_on(create_and_sign_receipt(allocation_id, value, &wallet));
5355

5456
c.bench_function("Validate Receipt", |b| {
5557
b.iter(|| {
@@ -63,7 +65,7 @@ pub fn criterion_benchmark(c: &mut Criterion) {
6365

6466
for log_number_of_receipts in 10..30 {
6567
let receipts = (0..2 ^ log_number_of_receipts)
66-
.map(|_| block_on(create_and_sign_receipt(allocation_id, value, &wallet)))
68+
.map(|_| async_runtime.block_on(create_and_sign_receipt(allocation_id, value, &wallet)))
6769
.collect::<Vec<_>>();
6870

6971
rav_group.bench_function(
@@ -79,11 +81,13 @@ pub fn criterion_benchmark(c: &mut Criterion) {
7981
},
8082
);
8183

82-
let signed_rav = block_on(EIP712SignedMessage::new(
83-
ReceiptAggregateVoucher::aggregate_receipts(allocation_id, &receipts, None).unwrap(),
84-
&wallet,
85-
))
86-
.unwrap();
84+
let signed_rav = async_runtime
85+
.block_on(EIP712SignedMessage::new(
86+
ReceiptAggregateVoucher::aggregate_receipts(allocation_id, &receipts, None)
87+
.unwrap(),
88+
&wallet,
89+
))
90+
.unwrap();
8791

8892
rav_group.bench_function(
8993
&format!("Validate RAV w/ 2^{} receipt's", log_number_of_receipts),

tap_core/src/adapters/collateral_adapter.rs

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
// Copyright 2023-, Semiotic AI, Inc.
22
// SPDX-License-Identifier: Apache-2.0
33

4+
use async_trait::async_trait;
45
use ethereum_types::Address;
56

67
/// `CollateralAdapter` defines a trait for adapters to handle collateral related operations.
@@ -27,6 +28,7 @@ use ethereum_types::Address;
2728
///
2829
/// For example code see [crate::adapters::collateral_adapter_mock]
2930
31+
#[async_trait]
3032
pub trait CollateralAdapter {
3133
/// Defines the user-specified error type.
3234
///
@@ -39,15 +41,18 @@ pub trait CollateralAdapter {
3941
/// This method should be implemented to fetch the local accounting amount of available collateral for a
4042
/// specified gateway from your system. Any errors that occur during this process should
4143
/// be captured and returned as an `AdapterError`.
42-
fn get_available_collateral(&self, gateway_id: Address) -> Result<u128, Self::AdapterError>;
44+
async fn get_available_collateral(
45+
&self,
46+
gateway_id: Address,
47+
) -> Result<u128, Self::AdapterError>;
4348

4449
/// Deducts a specified value from the local accounting of available collateral for a specified gateway.
4550
///
4651
/// This method should be implemented to deduct a specified value from the local accounting of
4752
/// available collateral of a specified gateway in your system. Any errors that occur during this
4853
/// process should be captured and returned as an `AdapterError`.
49-
fn subtract_collateral(
50-
&mut self,
54+
async fn subtract_collateral(
55+
&self,
5156
gateway_id: Address,
5257
value: u128,
5358
) -> Result<(), Self::AdapterError>;

tap_core/src/adapters/rav_storage_adapter.rs

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,8 @@
11
// Copyright 2023-, Semiotic AI, Inc.
22
// SPDX-License-Identifier: Apache-2.0
33

4+
use async_trait::async_trait;
5+
46
use crate::tap_manager::SignedRAV;
57

68
/// `RAVStorageAdapter` defines a trait for storage adapters to handle `SignedRAV` data.
@@ -27,6 +29,7 @@ use crate::tap_manager::SignedRAV;
2729
///
2830
/// For example code see [crate::adapters::rav_storage_adapter_mock]
2931
32+
#[async_trait]
3033
pub trait RAVStorageAdapter {
3134
/// Defines the user-specified error type.
3235
///
@@ -38,12 +41,12 @@ pub trait RAVStorageAdapter {
3841
///
3942
/// This method should be implemented to store the most recent validated `SignedRAV` into your chosen storage system.
4043
/// Any errors that occur during this process should be captured and returned as an `AdapterError`.
41-
fn update_last_rav(&mut self, rav: SignedRAV) -> Result<(), Self::AdapterError>;
44+
async fn update_last_rav(&self, rav: SignedRAV) -> Result<(), Self::AdapterError>;
4245

4346
/// Retrieves the latest `SignedRAV` from the storage.
4447
///
4548
/// This method should be implemented to fetch the latest `SignedRAV` from your storage system.
4649
/// If no `SignedRAV` is available, this method should return `None`.
4750
/// Any errors that occur during this process should be captured and returned as an `AdapterError`.
48-
fn last_rav(&self) -> Result<Option<SignedRAV>, Self::AdapterError>;
51+
async fn last_rav(&self) -> Result<Option<SignedRAV>, Self::AdapterError>;
4952
}

tap_core/src/adapters/receipt_checks_adapter.rs

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22
// SPDX-License-Identifier: Apache-2.0
33

44
use crate::{eip_712_signed_message::EIP712SignedMessage, tap_receipt::Receipt};
5+
use async_trait::async_trait;
56
use ethereum_types::Address;
67

78
/// `ReceiptChecksAdapter` defines a trait for adapters to handle checks related to TAP receipts.
@@ -27,28 +28,29 @@ use ethereum_types::Address;
2728
///
2829
/// For example code see [crate::adapters::receipt_checks_adapter_mock]
2930
31+
#[async_trait]
3032
pub trait ReceiptChecksAdapter {
3133
/// Checks if the given receipt is unique in the system.
3234
///
3335
/// This method should be implemented to verify the uniqueness of a given receipt in your system. Keep in mind that
3436
/// the receipt likely will be in storage when this check is performed so the receipt id should be used to check
3537
/// for uniqueness.
36-
fn is_unique(&self, receipt: &EIP712SignedMessage<Receipt>, receipt_id: u64) -> bool;
38+
async fn is_unique(&self, receipt: &EIP712SignedMessage<Receipt>, receipt_id: u64) -> bool;
3739

3840
/// Verifies if the allocation ID is valid.
3941
///
4042
/// This method should be implemented to validate the given allocation ID is a valid allocation for the indexer. Valid is defined as
4143
/// an allocation ID that is owned by the indexer and still available for redeeming.
42-
fn is_valid_allocation_id(&self, allocation_id: Address) -> bool;
44+
async fn is_valid_allocation_id(&self, allocation_id: Address) -> bool;
4345

4446
/// Confirms the value of the receipt is valid for the given query ID.
4547
///
4648
/// This method should be implemented to confirm the validity of the given value for a specific query ID.
47-
fn is_valid_value(&self, value: u128, query_id: u64) -> bool;
49+
async fn is_valid_value(&self, value: u128, query_id: u64) -> bool;
4850

4951
/// Confirms the gateway ID is valid.
5052
///
5153
/// This method should be implemented to validate the given gateway ID is one associated with a gateway the indexer considers valid.
5254
/// The provided gateway ID is the address of the gateway that is recovered from the signature of the receipt.
53-
fn is_valid_gateway_id(&self, gateway_id: Address) -> bool;
55+
async fn is_valid_gateway_id(&self, gateway_id: Address) -> bool;
5456
}

tap_core/src/adapters/receipt_storage_adapter.rs

Lines changed: 9 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,8 @@
33

44
use std::ops::RangeBounds;
55

6+
use async_trait::async_trait;
7+
68
use crate::tap_receipt::ReceivedReceipt;
79

810
/// `ReceiptStorageAdapter` defines a trait for storage adapters to manage `ReceivedReceipt` data.
@@ -35,6 +37,7 @@ use crate::tap_receipt::ReceivedReceipt;
3537
///
3638
/// For example code see [crate::adapters::receipt_storage_adapter_mock]
3739
40+
#[async_trait]
3841
pub trait ReceiptStorageAdapter {
3942
/// Defines the user-specified error type.
4043
///
@@ -47,15 +50,15 @@ pub trait ReceiptStorageAdapter {
4750
/// This method should be implemented to store a new `ReceivedReceipt` into your chosen storage system.
4851
/// It returns a unique receipt_id associated with the stored receipt. Any errors that occur during
4952
/// this process should be captured and returned as an `AdapterError`.
50-
fn store_receipt(&mut self, receipt: ReceivedReceipt) -> Result<u64, Self::AdapterError>;
53+
async fn store_receipt(&self, receipt: ReceivedReceipt) -> Result<u64, Self::AdapterError>;
5154

5255
/// Retrieves all `ReceivedReceipts` within a specific timestamp range.
5356
///
5457
/// This method should be implemented to fetch all `ReceivedReceipts` within a specific timestamp range
5558
/// from your storage system. The returned receipts should be in the form of a vector of tuples where
5659
/// each tuple contains the unique receipt_id and the corresponding `ReceivedReceipt`.
5760
/// Any errors that occur during this process should be captured and returned as an `AdapterError`.
58-
fn retrieve_receipts_in_timestamp_range<R: RangeBounds<u64>>(
61+
async fn retrieve_receipts_in_timestamp_range<R: RangeBounds<u64> + std::marker::Send>(
5962
&self,
6063
timestamp_range_ns: R,
6164
) -> Result<Vec<(u64, ReceivedReceipt)>, Self::AdapterError>;
@@ -65,8 +68,8 @@ pub trait ReceiptStorageAdapter {
6568
/// This method should be implemented to update a specific `ReceivedReceipt` identified by a unique
6669
/// receipt_id in your storage system. Any errors that occur during this process should be captured
6770
/// and returned as an `AdapterError`.
68-
fn update_receipt_by_id(
69-
&mut self,
71+
async fn update_receipt_by_id(
72+
&self,
7073
receipt_id: u64,
7174
receipt: ReceivedReceipt,
7275
) -> Result<(), Self::AdapterError>;
@@ -76,8 +79,8 @@ pub trait ReceiptStorageAdapter {
7679
/// This method should be implemented to remove all `ReceivedReceipts` within a specific timestamp
7780
/// range from your storage system. Any errors that occur during this process should be captured and
7881
/// returned as an `AdapterError`.
79-
fn remove_receipts_in_timestamp_range<R: RangeBounds<u64>>(
80-
&mut self,
82+
async fn remove_receipts_in_timestamp_range<R: RangeBounds<u64> + std::marker::Send>(
83+
&self,
8184
timestamp_ns: R,
8285
) -> Result<(), Self::AdapterError>;
8386
}

tap_core/src/adapters/test/collateral_adapter_mock.rs

Lines changed: 20 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,11 @@
11
// Copyright 2023-, Semiotic AI, Inc.
22
// SPDX-License-Identifier: Apache-2.0
33

4-
use std::{
5-
collections::HashMap,
6-
sync::{Arc, RwLock},
7-
};
4+
use std::{collections::HashMap, sync::Arc};
85

6+
use async_trait::async_trait;
97
use ethereum_types::Address;
8+
use tokio::sync::RwLock;
109

1110
use crate::adapters::collateral_adapter::CollateralAdapter;
1211

@@ -27,8 +26,8 @@ impl CollateralAdapterMock {
2726
gateway_collateral_storage,
2827
}
2928
}
30-
pub fn collateral(&self, gateway_id: Address) -> Result<u128, AdpaterErrorMock> {
31-
let gateway_collateral_storage = self.gateway_collateral_storage.read().unwrap();
29+
pub async fn collateral(&self, gateway_id: Address) -> Result<u128, AdpaterErrorMock> {
30+
let gateway_collateral_storage = self.gateway_collateral_storage.read().await;
3231
if let Some(collateral) = gateway_collateral_storage.get(&gateway_id) {
3332
return Ok(*collateral);
3433
}
@@ -37,23 +36,23 @@ impl CollateralAdapterMock {
3736
})
3837
}
3938

40-
pub fn increase_collateral(&mut self, gateway_id: Address, value: u128) {
41-
let mut gateway_collateral_storage = self.gateway_collateral_storage.write().unwrap();
39+
pub async fn increase_collateral(&mut self, gateway_id: Address, value: u128) {
40+
let mut gateway_collateral_storage = self.gateway_collateral_storage.write().await;
4241

4342
if let Some(current_value) = gateway_collateral_storage.get(&gateway_id) {
44-
let mut gateway_collateral_storage = self.gateway_collateral_storage.write().unwrap();
43+
let mut gateway_collateral_storage = self.gateway_collateral_storage.write().await;
4544
gateway_collateral_storage.insert(gateway_id, current_value + value);
4645
} else {
4746
gateway_collateral_storage.insert(gateway_id, value);
4847
}
4948
}
5049

51-
pub fn reduce_collateral(
52-
&mut self,
50+
pub async fn reduce_collateral(
51+
&self,
5352
gateway_id: Address,
5453
value: u128,
5554
) -> Result<(), AdpaterErrorMock> {
56-
let mut gateway_collateral_storage = self.gateway_collateral_storage.write().unwrap();
55+
let mut gateway_collateral_storage = self.gateway_collateral_storage.write().await;
5756

5857
if let Some(current_value) = gateway_collateral_storage.get(&gateway_id) {
5958
let checked_new_value = current_value.checked_sub(value);
@@ -68,16 +67,20 @@ impl CollateralAdapterMock {
6867
}
6968
}
7069

70+
#[async_trait]
7171
impl CollateralAdapter for CollateralAdapterMock {
7272
type AdapterError = AdpaterErrorMock;
73-
fn get_available_collateral(&self, gateway_id: Address) -> Result<u128, Self::AdapterError> {
74-
self.collateral(gateway_id)
73+
async fn get_available_collateral(
74+
&self,
75+
gateway_id: Address,
76+
) -> Result<u128, Self::AdapterError> {
77+
self.collateral(gateway_id).await
7578
}
76-
fn subtract_collateral(
77-
&mut self,
79+
async fn subtract_collateral(
80+
&self,
7881
gateway_id: Address,
7982
value: u128,
8083
) -> Result<(), Self::AdapterError> {
81-
self.reduce_collateral(gateway_id, value)
84+
self.reduce_collateral(gateway_id, value).await
8285
}
8386
}

0 commit comments

Comments
 (0)