Skip to content

Commit 0ce2aab

Browse files
committed
feat!: add limit to receipts retrieve
Helps with keeping the rule of 15,000 max receipts per aggregation request. Also adds a helper function to implement the limit safely. Signed-off-by: Alexis Asseman <[email protected]>
1 parent e55ea03 commit 0ce2aab

File tree

7 files changed

+155
-15
lines changed

7 files changed

+155
-15
lines changed

tap_core/Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@ alloy-primitives = { version = "0.5.0", features = ["serde"] }
2323
strum = "0.24.1"
2424
strum_macros = "0.24.3"
2525
async-trait = "0.1.72"
26-
tokio = { version = "1.29.1", features = ["macros"] }
26+
tokio = { version = "1.29.1", features = ["macros", "rt-multi-thread"] }
2727

2828
[dev-dependencies]
2929
criterion = { version = "0.5", features = ["async_std"] }

tap_core/src/adapters/receipt_storage_adapter.rs

Lines changed: 55 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -57,10 +57,26 @@ pub trait ReceiptStorageAdapter {
5757
/// This method should be implemented to fetch all `ReceivedReceipts` within a specific timestamp range
5858
/// from your storage system. The returned receipts should be in the form of a vector of tuples where
5959
/// each tuple contains the unique receipt_id and the corresponding `ReceivedReceipt`.
60+
///
61+
/// If a limit is specified, the adapter should return at most that many receipts, while making
62+
/// sure that that no receipts are left behind for any timestamp that is returned. Examples:
63+
///
64+
/// - If the adapter has 10 receipts for timestamp 100, and 5 receipts for timestamp 200, and
65+
/// the limit is 10, the adapter should return all 10 receipts for timestamp 100, and none for
66+
/// timestamp 200.
67+
/// - If the adapter has 5 receipts for timestamp 100, and 10 receipts for timestamp 200, and
68+
/// the limit is 10, the adapter should return all 5 receipts for timestamp 100, and none for
69+
/// timestamp 200. (because it would have to leave behind 5 receipts for timestamp 200, which
70+
/// is not allowed).
71+
///
72+
/// You can use the [`safe_truncate_receipts()`] function to help with this, but feel free to
73+
/// implement a more efficient solution for your situation if you can.
74+
///
6075
/// Any errors that occur during this process should be captured and returned as an `AdapterError`.
6176
async fn retrieve_receipts_in_timestamp_range<R: RangeBounds<u64> + std::marker::Send>(
6277
&self,
6378
timestamp_range_ns: R,
79+
limit: Option<u64>,
6480
) -> Result<Vec<(u64, ReceivedReceipt)>, Self::AdapterError>;
6581

6682
/// Updates a specific `ReceivedReceipt` identified by a unique receipt_id.
@@ -84,3 +100,42 @@ pub trait ReceiptStorageAdapter {
84100
timestamp_ns: R,
85101
) -> Result<(), Self::AdapterError>;
86102
}
103+
104+
/// See [`ReceiptStorageAdapter::retrieve_receipts_in_timestamp_range()`] for details.
105+
///
106+
/// WARNING: Will sort the receipts by timestamp using
107+
/// [vec::sort_unstable](https://doc.rust-lang.org/std/vec/struct.Vec.html#method.sort_unstable).
108+
pub fn safe_truncate_receipts(receipts: &mut Vec<(u64, ReceivedReceipt)>, limit: u64) {
109+
if receipts.len() <= limit as usize {
110+
return;
111+
} else if limit == 0 {
112+
receipts.clear();
113+
return;
114+
}
115+
116+
receipts.sort_unstable_by_key(|(_, rx_receipt)| rx_receipt.signed_receipt.message.timestamp_ns);
117+
118+
// This one will be the last timestamp in `receipts` after naive truncation
119+
let last_timestamp = receipts[limit as usize - 1]
120+
.1
121+
.signed_receipt
122+
.message
123+
.timestamp_ns;
124+
// This one is the timestamp that comes just after the one above
125+
let after_last_timestamp = receipts[limit as usize]
126+
.1
127+
.signed_receipt
128+
.message
129+
.timestamp_ns;
130+
131+
receipts.truncate(limit as usize);
132+
133+
if last_timestamp == after_last_timestamp {
134+
// If the last timestamp is the same as the one that came after it, we need to
135+
// remove all the receipts with the same timestamp as the last one, because
136+
// otherwise we would leave behind part of the receipts for that timestamp.
137+
receipts.retain(|(_, rx_receipt)| {
138+
rx_receipt.signed_receipt.message.timestamp_ns != last_timestamp
139+
});
140+
}
141+
}

tap_core/src/adapters/test/receipt_storage_adapter_mock.rs

Lines changed: 14 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,8 @@ use async_trait::async_trait;
77
use tokio::sync::RwLock;
88

99
use crate::{
10-
adapters::receipt_storage_adapter::ReceiptStorageAdapter, tap_receipt::ReceivedReceipt,
10+
adapters::receipt_storage_adapter::{safe_truncate_receipts, ReceiptStorageAdapter},
11+
tap_receipt::ReceivedReceipt,
1112
};
1213

1314
pub struct ReceiptStorageAdapterMock {
@@ -52,7 +53,7 @@ impl ReceiptStorageAdapterMock {
5253
&self,
5354
timestamp_ns: u64,
5455
) -> Result<Vec<(u64, ReceivedReceipt)>, AdapterErrorMock> {
55-
self.retrieve_receipts_in_timestamp_range(..=timestamp_ns)
56+
self.retrieve_receipts_in_timestamp_range(..=timestamp_ns, None)
5657
.await
5758
}
5859
pub async fn remove_receipt_by_id(&mut self, receipt_id: u64) -> Result<(), AdapterErrorMock> {
@@ -96,15 +97,24 @@ impl ReceiptStorageAdapter for ReceiptStorageAdapterMock {
9697
async fn retrieve_receipts_in_timestamp_range<R: RangeBounds<u64> + std::marker::Send>(
9798
&self,
9899
timestamp_range_ns: R,
100+
limit: Option<u64>,
99101
) -> Result<Vec<(u64, ReceivedReceipt)>, Self::AdapterError> {
100102
let receipt_storage = self.receipt_storage.read().await;
101-
Ok(receipt_storage
103+
let mut receipts_in_range: Vec<(u64, ReceivedReceipt)> = receipt_storage
102104
.iter()
103105
.filter(|(_, rx_receipt)| {
104106
timestamp_range_ns.contains(&rx_receipt.signed_receipt.message.timestamp_ns)
105107
})
106108
.map(|(&id, rx_receipt)| (id, rx_receipt.clone()))
107-
.collect())
109+
.collect();
110+
111+
if limit.is_some_and(|limit| receipts_in_range.len() > limit as usize) {
112+
safe_truncate_receipts(&mut receipts_in_range, limit.unwrap());
113+
114+
Ok(receipts_in_range)
115+
} else {
116+
Ok(receipts_in_range)
117+
}
108118
}
109119
async fn update_receipt_by_id(
110120
&self,

tap_core/src/adapters/test/receipt_storage_adapter_test.rs

Lines changed: 70 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,8 @@
33

44
#[cfg(test)]
55
mod receipt_storage_adapter_unit_test {
6+
use rand::seq::SliceRandom;
7+
use rand::thread_rng;
68
use std::collections::HashMap;
79
use std::str::FromStr;
810
use std::sync::Arc;
@@ -179,4 +181,72 @@ mod receipt_storage_adapter_unit_test {
179181
.is_err());
180182
}
181183
}
184+
185+
/// The test code will shuffle the input timestamps prior to calling safe_truncate_receipts.
186+
#[rstest]
187+
#[case(vec![1, 2, 3, 4, 5], 3, vec![1, 2, 3])]
188+
#[case(vec![1, 2, 3, 3, 4, 5], 3, vec![1, 2])]
189+
#[case(vec![1, 2, 3, 4, 4, 4], 3, vec![1, 2, 3])]
190+
#[case(vec![1, 1, 1, 1, 2, 3], 3, vec![])]
191+
#[tokio::test]
192+
async fn safe_truncate_receipts_test(
193+
domain_separator: Eip712Domain,
194+
#[case] input: Vec<u64>,
195+
#[case] limit: u64,
196+
#[case] expected: Vec<u64>,
197+
) {
198+
let wallet: LocalWallet = MnemonicBuilder::<English>::default()
199+
.phrase("abandon abandon abandon abandon abandon abandon abandon abandon abandon abandon abandon about")
200+
.build()
201+
.unwrap();
202+
203+
// Vec of (id, receipt)
204+
let mut receipts_orig: Vec<(u64, ReceivedReceipt)> = Vec::new();
205+
206+
for (i, timestamp) in input.iter().enumerate() {
207+
// The contents of the receipt only need to be unique for this test (so we can check)
208+
receipts_orig.push((
209+
i as u64,
210+
ReceivedReceipt::new(
211+
EIP712SignedMessage::new(
212+
&domain_separator,
213+
Receipt {
214+
allocation_id: Address::ZERO,
215+
timestamp_ns: *timestamp,
216+
nonce: 0,
217+
value: 0,
218+
},
219+
&wallet,
220+
)
221+
.await
222+
.unwrap(),
223+
i as u64, // Will use that to check the IDs
224+
&get_full_list_of_checks(),
225+
),
226+
));
227+
}
228+
229+
let mut receipts_truncated = receipts_orig.clone();
230+
231+
// shuffle the input receipts
232+
receipts_truncated.shuffle(&mut thread_rng());
233+
234+
crate::adapters::receipt_storage_adapter::safe_truncate_receipts(
235+
&mut receipts_truncated,
236+
limit,
237+
);
238+
239+
assert_eq!(receipts_truncated.len(), expected.len());
240+
241+
for (elem_trun, expected_timestamp) in receipts_truncated.iter().zip(expected.iter()) {
242+
// Check timestamps
243+
assert_eq!(
244+
elem_trun.1.signed_receipt.message.timestamp_ns,
245+
*expected_timestamp
246+
);
247+
248+
// Check that the IDs are fine
249+
assert_eq!(elem_trun.0, elem_trun.1.query_id);
250+
}
251+
}
182252
}

tap_core/src/tap_manager/manager.rs

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -176,15 +176,19 @@ impl<
176176
///
177177
/// Returns [`Error::TimestampRangeError`] if the max timestamp of the previous RAV is greater than the min timestamp. Caused by timestamp buffer being too large, or requests coming too soon.
178178
///
179-
pub async fn create_rav_request(&self, timestamp_buffer_ns: u64) -> Result<RAVRequest, Error> {
179+
pub async fn create_rav_request(
180+
&self,
181+
timestamp_buffer_ns: u64,
182+
receipts_limit: Option<u64>,
183+
) -> Result<RAVRequest, Error> {
180184
let previous_rav = self.get_previous_rav().await?;
181185
let min_timestamp_ns = previous_rav
182186
.as_ref()
183187
.map(|rav| rav.message.timestamp_ns + 1)
184188
.unwrap_or(0);
185189

186190
let (valid_receipts, invalid_receipts) = self
187-
.collect_receipts(timestamp_buffer_ns, min_timestamp_ns)
191+
.collect_receipts(timestamp_buffer_ns, min_timestamp_ns, receipts_limit)
188192
.await?;
189193

190194
let expected_rav = Self::generate_expected_rav(&valid_receipts, previous_rav.clone())?;
@@ -216,6 +220,7 @@ impl<
216220
&self,
217221
timestamp_buffer_ns: u64,
218222
min_timestamp_ns: u64,
223+
limit: Option<u64>,
219224
) -> Result<(Vec<SignedReceipt>, Vec<ReceivedReceipt>), Error> {
220225
let max_timestamp_ns = crate::get_current_timestamp_u64_ns()? - timestamp_buffer_ns;
221226

@@ -227,7 +232,7 @@ impl<
227232
}
228233
let received_receipts = self
229234
.receipt_storage_adapter
230-
.retrieve_receipts_in_timestamp_range(min_timestamp_ns..max_timestamp_ns)
235+
.retrieve_receipts_in_timestamp_range(min_timestamp_ns..max_timestamp_ns, limit)
231236
.await
232237
.map_err(|err| Error::AdapterError {
233238
source_error: anyhow::Error::new(err),

tap_core/src/tap_manager/test/manager_test.rs

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -223,7 +223,7 @@ mod manager_unit_test {
223223
.await
224224
.is_ok());
225225
}
226-
let rav_request_result = manager.create_rav_request(0).await;
226+
let rav_request_result = manager.create_rav_request(0, None).await;
227227
assert!(rav_request_result.is_ok());
228228

229229
let rav_request = rav_request_result.unwrap();
@@ -303,7 +303,7 @@ mod manager_unit_test {
303303
.is_ok());
304304
expected_accumulated_value += value;
305305
}
306-
let rav_request_result = manager.create_rav_request(0).await;
306+
let rav_request_result = manager.create_rav_request(0, None).await;
307307
assert!(rav_request_result.is_ok());
308308

309309
let rav_request = rav_request_result.unwrap();
@@ -352,7 +352,7 @@ mod manager_unit_test {
352352
.is_ok());
353353
expected_accumulated_value += value;
354354
}
355-
let rav_request_result = manager.create_rav_request(0).await;
355+
let rav_request_result = manager.create_rav_request(0, None).await;
356356
assert!(rav_request_result.is_ok());
357357

358358
let rav_request = rav_request_result.unwrap();
@@ -443,7 +443,7 @@ mod manager_unit_test {
443443
manager.remove_obsolete_receipts().await.unwrap();
444444
}
445445

446-
let rav_request_1_result = manager.create_rav_request(0).await;
446+
let rav_request_1_result = manager.create_rav_request(0, None).await;
447447
assert!(rav_request_1_result.is_ok());
448448

449449
let rav_request_1 = rav_request_1_result.unwrap();
@@ -501,15 +501,15 @@ mod manager_unit_test {
501501
assert_eq!(
502502
manager
503503
.receipt_storage_adapter
504-
.retrieve_receipts_in_timestamp_range(..)
504+
.retrieve_receipts_in_timestamp_range(.., None)
505505
.await
506506
.unwrap()
507507
.len(),
508508
10
509509
);
510510
}
511511

512-
let rav_request_2_result = manager.create_rav_request(0).await;
512+
let rav_request_2_result = manager.create_rav_request(0, None).await;
513513
assert!(rav_request_2_result.is_ok());
514514

515515
let rav_request_2 = rav_request_2_result.unwrap();

tap_integration_tests/tests/indexer_mock/mod.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -218,7 +218,7 @@ async fn request_rav<
218218
threshold: usize,
219219
) -> Result<()> {
220220
// Create the aggregate_receipts request params
221-
let rav_request = manager.create_rav_request(time_stamp_buffer).await?;
221+
let rav_request = manager.create_rav_request(time_stamp_buffer, None).await?;
222222

223223
// To-do: Need to add previous RAV, when tap_manager supports replacing receipts
224224
let params = rpc_params!(

0 commit comments

Comments
 (0)