Skip to content

Commit d557d3c

Browse files
authored
feat(all): add retryable checks (#293)
Signed-off-by: Gustavo Inacio <[email protected]>
1 parent b44ed8a commit d557d3c

File tree

11 files changed

+85
-60
lines changed

11 files changed

+85
-60
lines changed

Cargo.lock

Lines changed: 8 additions & 8 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,7 @@ sqlx = { version = "0.7.2", features = [
3838
tracing = { version = "0.1.40", default-features = false }
3939
bigdecimal = "0.4.3"
4040
build-info = "0.0.38"
41-
tap_core = { git = "https://github.com/semiotic-ai/timeline-aggregation-protocol", rev = "9e1915b", default-features = false }
41+
tap_core = { git = "https://github.com/semiotic-ai/timeline-aggregation-protocol", rev = "ff856d9", default-features = false }
4242
tracing-subscriber = { version = "0.3", features = [
4343
"json",
4444
"env-filter",

common/src/tap/checks/allocation_eligible.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@ use anyhow::anyhow;
88
use eventuals::Eventual;
99

1010
use tap_core::receipt::{
11-
checks::{Check, CheckResult},
11+
checks::{Check, CheckError, CheckResult},
1212
state::Checking,
1313
ReceiptWithState,
1414
};
@@ -36,10 +36,10 @@ impl Check for AllocationEligible {
3636
.map(|allocations| allocations.contains_key(&allocation_id))
3737
.unwrap_or(false)
3838
{
39-
return Err(anyhow!(
39+
return Err(CheckError::Failed(anyhow!(
4040
"Receipt allocation ID `{}` is not eligible for this indexer",
4141
allocation_id
42-
));
42+
)));
4343
}
4444
Ok(())
4545
}

common/src/tap/checks/deny_list_check.rs

Lines changed: 10 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ use sqlx::PgPool;
99
use std::collections::HashSet;
1010
use std::sync::RwLock;
1111
use std::{str::FromStr, sync::Arc};
12+
use tap_core::receipt::checks::CheckError;
1213
use tap_core::receipt::{
1314
checks::{Check, CheckResult},
1415
state::Checking,
@@ -153,12 +154,16 @@ impl Check for DenyListCheck {
153154
let receipt_signer = receipt
154155
.signed_receipt()
155156
.recover_signer(&self.domain_separator)
156-
.inspect_err(|e| {
157+
.map_err(|e| {
157158
error!("Failed to recover receipt signer: {}", e);
158-
})?;
159+
anyhow::anyhow!(e)
160+
})
161+
.map_err(CheckError::Failed)?;
159162
let escrow_accounts_snapshot = self.escrow_accounts.value_immediate().unwrap_or_default();
160163

161-
let receipt_sender = escrow_accounts_snapshot.get_sender_for_signer(&receipt_signer)?;
164+
let receipt_sender = escrow_accounts_snapshot
165+
.get_sender_for_signer(&receipt_signer)
166+
.map_err(|e| CheckError::Failed(e.into()))?;
162167

163168
// Check that the sender is not denylisted
164169
if self
@@ -167,10 +172,10 @@ impl Check for DenyListCheck {
167172
.unwrap()
168173
.contains(&receipt_sender)
169174
{
170-
return Err(anyhow::anyhow!(
175+
return Err(CheckError::Failed(anyhow::anyhow!(
171176
"Received a receipt from a denylisted sender: {}",
172177
receipt_sender
173-
));
178+
)));
174179
}
175180

176181
Ok(())

common/src/tap/checks/receipt_max_val_check.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@ pub struct ReceiptMaxValueCheck {
77
}
88

99
use tap_core::receipt::{
10-
checks::{Check, CheckResult},
10+
checks::{Check, CheckError, CheckResult},
1111
state::Checking,
1212
ReceiptWithState,
1313
};
@@ -26,10 +26,10 @@ impl Check for ReceiptMaxValueCheck {
2626
if receipt_value < self.receipt_max_value {
2727
Ok(())
2828
} else {
29-
Err(anyhow!(
29+
Err(CheckError::Failed(anyhow!(
3030
"Receipt value `{}` is higher than the limit set by the user",
3131
receipt_value
32-
))
32+
)))
3333
}
3434
}
3535
}

common/src/tap/checks/sender_balance_check.rs

Lines changed: 8 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@ use alloy::primitives::U256;
77
use anyhow::anyhow;
88
use eventuals::Eventual;
99
use tap_core::receipt::{
10-
checks::{Check, CheckResult},
10+
checks::{Check, CheckError, CheckResult},
1111
state::Checking,
1212
ReceiptWithState,
1313
};
@@ -38,22 +38,25 @@ impl Check for SenderBalanceCheck {
3838
.recover_signer(&self.domain_separator)
3939
.inspect_err(|e| {
4040
error!("Failed to recover receipt signer: {}", e);
41-
})?;
41+
})
42+
.map_err(|e| CheckError::Failed(e.into()))?;
4243

4344
// We bail if the receipt signer does not have a corresponding sender in the escrow
4445
// accounts.
45-
let receipt_sender = escrow_accounts_snapshot.get_sender_for_signer(&receipt_signer)?;
46+
let receipt_sender = escrow_accounts_snapshot
47+
.get_sender_for_signer(&receipt_signer)
48+
.map_err(|e| CheckError::Failed(e.into()))?;
4649

4750
// Check that the sender has a non-zero balance -- more advanced accounting is done in
4851
// `tap-agent`.
4952
if !escrow_accounts_snapshot
5053
.get_balance_for_sender(&receipt_sender)
5154
.map_or(false, |balance| balance > U256::ZERO)
5255
{
53-
return Err(anyhow!(
56+
return Err(CheckError::Failed(anyhow!(
5457
"Receipt sender `{}` does not have a sufficient balance",
5558
receipt_signer,
56-
));
59+
)));
5760
}
5861
Ok(())
5962
}

common/src/tap/checks/timestamp_check.rs

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@ pub struct TimestampCheck {
88
}
99

1010
use tap_core::receipt::{
11-
checks::{Check, CheckResult},
11+
checks::{Check, CheckError, CheckResult},
1212
state::Checking,
1313
ReceiptWithState,
1414
};
@@ -24,7 +24,9 @@ impl TimestampCheck {
2424
#[async_trait::async_trait]
2525
impl Check for TimestampCheck {
2626
async fn check(&self, receipt: &ReceiptWithState<Checking>) -> CheckResult {
27-
let timestamp_now = SystemTime::now().duration_since(SystemTime::UNIX_EPOCH)?;
27+
let timestamp_now = SystemTime::now()
28+
.duration_since(SystemTime::UNIX_EPOCH)
29+
.map_err(|e| CheckError::Failed(e.into()))?;
2830
let min_timestamp = timestamp_now - self.timestamp_error_tolerance;
2931
let max_timestamp = timestamp_now + self.timestamp_error_tolerance;
3032

@@ -33,10 +35,10 @@ impl Check for TimestampCheck {
3335
if receipt_timestamp < max_timestamp && receipt_timestamp > min_timestamp {
3436
Ok(())
3537
} else {
36-
Err(anyhow!(
38+
Err(CheckError::Failed(anyhow!(
3739
"Receipt timestamp `{}` is outside of current system time +/- timestamp_error_tolerance",
3840
receipt_timestamp.as_secs()
39-
))
41+
)))
4042
}
4143
}
4244
}

tap-agent/src/agent/sender_allocation.rs

Lines changed: 10 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -810,7 +810,7 @@ pub mod tests {
810810
};
811811
use tap_aggregator::{jsonrpsee_helpers::JsonRpcResponse, server::run_server};
812812
use tap_core::receipt::{
813-
checks::{Check, CheckList},
813+
checks::{Check, CheckError, CheckList, CheckResult},
814814
state::Checking,
815815
ReceiptWithState,
816816
};
@@ -1386,8 +1386,8 @@ pub mod tests {
13861386

13871387
#[async_trait::async_trait]
13881388
impl Check for FailingCheck {
1389-
async fn check(&self, _receipt: &ReceiptWithState<Checking>) -> anyhow::Result<()> {
1390-
Err(anyhow::anyhow!("Failing check"))
1389+
async fn check(&self, _receipt: &ReceiptWithState<Checking>) -> CheckResult {
1390+
Err(CheckError::Failed(anyhow::anyhow!("Failing check")))
13911391
}
13921392
}
13931393

@@ -1406,7 +1406,13 @@ pub mod tests {
14061406
// make sure to fail them
14071407
let failing_receipts = checking_receipts
14081408
.into_iter()
1409-
.map(|receipt| async { receipt.finalize_receipt_checks(&checks).await.unwrap_err() })
1409+
.map(|receipt| async {
1410+
receipt
1411+
.finalize_receipt_checks(&checks)
1412+
.await
1413+
.unwrap()
1414+
.unwrap_err()
1415+
})
14101416
.collect::<Vec<_>>();
14111417
let failing_receipts: Vec<_> = join_all(failing_receipts).await;
14121418

tap-agent/src/tap/context/checks/allocation_id.rs

Lines changed: 8 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@ use anyhow::anyhow;
88
use eventuals::{Eventual, EventualExt};
99
use indexer_common::subgraph_client::{Query, SubgraphClient};
1010
use tap_core::receipt::{
11-
checks::{Check, CheckResult},
11+
checks::{Check, CheckError, CheckResult},
1212
state::Checking,
1313
ReceiptWithState,
1414
};
@@ -52,17 +52,20 @@ impl Check for AllocationId {
5252
// ID. So the receipts that are received here should already have been filtered by
5353
// allocation ID.
5454
if allocation_id != self.allocation_id {
55-
return Err(anyhow!("Receipt allocation_id different from expected: allocation_id: {}, expected_allocation_id: {}", allocation_id, self.allocation_id));
55+
return Err(CheckError::Failed(anyhow!("Receipt allocation_id different from expected: allocation_id: {}, expected_allocation_id: {}", allocation_id, self.allocation_id)));
5656
};
5757

5858
// Check that the allocation ID is not redeemed yet for this consumer
5959
match self.tap_allocation_redeemed.value().await {
6060
Ok(false) => Ok(()),
61-
Ok(true) => Err(anyhow!("Allocation {} already redeemed", allocation_id)),
62-
Err(e) => Err(anyhow!(
61+
Ok(true) => Err(CheckError::Failed(anyhow!(
62+
"Allocation {} already redeemed",
63+
allocation_id
64+
))),
65+
Err(e) => Err(CheckError::Retryable(anyhow!(
6366
"Could not get allocation escrow redemption status from eventual: {:?}",
6467
e
65-
)),
68+
))),
6669
}
6770
}
6871
}

tap-agent/src/tap/context/checks/signature.rs

Lines changed: 19 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@ use anyhow::anyhow;
66
use eventuals::Eventual;
77
use indexer_common::escrow_accounts::EscrowAccounts;
88
use tap_core::receipt::{
9-
checks::{Check, CheckResult},
9+
checks::{Check, CheckError, CheckResult},
1010
state::Checking,
1111
ReceiptWithState,
1212
};
@@ -32,25 +32,31 @@ impl Check for Signature {
3232
async fn check(&self, receipt: &ReceiptWithState<Checking>) -> CheckResult {
3333
let signer = receipt
3434
.signed_receipt()
35-
.recover_signer(&self.domain_separator)?;
36-
let escrow_accounts =
37-
self.escrow_accounts
38-
.value()
39-
.await
40-
.map_err(|e| AdapterError::ValidationError {
41-
error: format!("Could not get escrow accounts from eventual: {:?}", e),
42-
})?;
35+
.recover_signer(&self.domain_separator)
36+
.map_err(|e| CheckError::Failed(e.into()))?;
37+
let escrow_accounts = self
38+
.escrow_accounts
39+
.value()
40+
.await
41+
.map_err(|e| AdapterError::ValidationError {
42+
error: format!("Could not get escrow accounts from eventual: {:?}", e),
43+
})
44+
.map_err(|e| CheckError::Retryable(e.into()))?;
4345

44-
let sender = escrow_accounts.get_sender_for_signer(&signer)?;
46+
let sender = escrow_accounts
47+
.get_sender_for_signer(&signer)
48+
.map_err(|e| CheckError::Failed(e.into()))?;
4549

46-
let balance = escrow_accounts.get_balance_for_sender(&sender)?;
50+
let balance = escrow_accounts
51+
.get_balance_for_sender(&sender)
52+
.map_err(|e| CheckError::Failed(e.into()))?;
4753

4854
if balance == U256::ZERO {
49-
Err(anyhow!(
55+
Err(CheckError::Failed(anyhow!(
5056
"Balance for sender {}, signer {} is not positive",
5157
sender,
5258
signer
53-
))
59+
)))
5460
} else {
5561
Ok(())
5662
}

0 commit comments

Comments
 (0)