Skip to content

Commit a50e23d

Browse files
authored
refactor: use sender in tap context (#507)
1 parent 40e7612 commit a50e23d

File tree

6 files changed

+40
-78
lines changed

6 files changed

+40
-78
lines changed

crates/service/src/middleware.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,6 @@ pub use attestation_signer::{signer_middleware, AttestationState};
1818
pub use deployment::deployment_middleware;
1919
pub use labels::labels_middleware;
2020
pub use prometheus_metrics::PrometheusMetricsMiddlewareLayer;
21-
pub use sender::{sender_middleware, SenderState};
21+
pub use sender::{sender_middleware, Sender, SenderState};
2222
pub use tap_context::context_middleware;
2323
pub use tap_receipt::receipt_middleware;

crates/service/src/middleware/tap_context.rs

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,8 @@ use thegraph_core::DeploymentId;
2020

2121
use crate::{error::IndexerServiceError, tap::AgoraQuery};
2222

23+
use super::sender::Sender;
24+
2325
/// Graphql query body to be decoded and passed to agora context
2426
#[derive(Debug, serde::Deserialize, serde::Serialize)]
2527
struct QueryBody {
@@ -39,6 +41,7 @@ pub async fn context_middleware(
3941
Err(_) => return Err(IndexerServiceError::DeploymentIdNotFound),
4042
},
4143
};
44+
let sender = request.extensions().get::<Sender>().cloned();
4245

4346
let (mut parts, body) = request.into_parts();
4447
let bytes = to_bytes(body, usize::MAX).await?;
@@ -56,6 +59,10 @@ pub async fn context_middleware(
5659
query: query_body.query.clone(),
5760
variables,
5861
});
62+
63+
if let Some(sender) = sender {
64+
ctx.insert(sender);
65+
}
5966
parts.extensions.insert(Arc::new(ctx));
6067
let request = Request::from_parts(parts, bytes.into());
6168
Ok(next.run(request).await)

crates/service/src/service/indexer_service.rs

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -238,7 +238,6 @@ pub async fn run(options: IndexerServiceOptions) -> Result<(), anyhow::Error> {
238238
database,
239239
allocations.clone(),
240240
escrow_accounts.clone(),
241-
domain_separator.clone(),
242241
timestamp_error_tolerance,
243242
receipt_max_value,
244243
)

crates/service/src/tap.rs

Lines changed: 2 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -47,18 +47,14 @@ impl IndexerTapContext {
4747
pgpool: PgPool,
4848
indexer_allocations: Receiver<HashMap<Address, Allocation>>,
4949
escrow_accounts: Receiver<EscrowAccounts>,
50-
domain_separator: Eip712Domain,
5150
timestamp_error_tolerance: Duration,
5251
receipt_max_value: u128,
5352
) -> Vec<ReceiptCheck> {
5453
vec![
5554
Arc::new(AllocationEligible::new(indexer_allocations)),
56-
Arc::new(SenderBalanceCheck::new(
57-
escrow_accounts.clone(),
58-
domain_separator.clone(),
59-
)),
55+
Arc::new(SenderBalanceCheck::new(escrow_accounts)),
6056
Arc::new(TimestampCheck::new(timestamp_error_tolerance)),
61-
Arc::new(DenyListCheck::new(pgpool.clone(), escrow_accounts, domain_separator).await),
57+
Arc::new(DenyListCheck::new(pgpool.clone()).await),
6258
Arc::new(ReceiptMaxValueCheck::new(receipt_max_value)),
6359
Arc::new(MinimumValue::new(pgpool, Duration::from_secs(GRACE_PERIOD)).await),
6460
]

crates/service/src/tap/checks/deny_list_check.rs

Lines changed: 19 additions & 44 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,8 @@
11
// Copyright 2023-, Edge & Node, GraphOps, and Semiotic Labs.
22
// SPDX-License-Identifier: Apache-2.0
33

4-
use alloy::dyn_abi::Eip712Domain;
4+
use crate::middleware::Sender;
55
use alloy::primitives::Address;
6-
use indexer_monitor::EscrowAccounts;
76
use sqlx::postgres::PgListener;
87
use sqlx::PgPool;
98
use std::collections::HashSet;
@@ -15,23 +14,16 @@ use tap_core::receipt::{
1514
state::Checking,
1615
ReceiptWithState,
1716
};
18-
use tokio::sync::watch::Receiver;
1917
use tracing::error;
2018

2119
pub struct DenyListCheck {
22-
escrow_accounts: Receiver<EscrowAccounts>,
23-
domain_separator: Eip712Domain,
2420
sender_denylist: Arc<RwLock<HashSet<Address>>>,
2521
_sender_denylist_watcher_handle: Arc<tokio::task::JoinHandle<()>>,
2622
sender_denylist_watcher_cancel_token: tokio_util::sync::CancellationToken,
2723
}
2824

2925
impl DenyListCheck {
30-
pub async fn new(
31-
pgpool: PgPool,
32-
escrow_accounts: Receiver<EscrowAccounts>,
33-
domain_separator: Eip712Domain,
34-
) -> Self {
26+
pub async fn new(pgpool: PgPool) -> Self {
3527
// Listen to pg_notify events. We start it before updating the sender_denylist so that we
3628
// don't miss any updates. PG will buffer the notifications until we start consuming them.
3729
let mut pglistener = PgListener::connect_with(&pgpool.clone()).await.unwrap();
@@ -57,8 +49,6 @@ impl DenyListCheck {
5749
sender_denylist_watcher_cancel_token.clone(),
5850
)));
5951
Self {
60-
domain_separator,
61-
escrow_accounts,
6252
sender_denylist,
6353
_sender_denylist_watcher_handle: sender_denylist_watcher_handle,
6454
sender_denylist_watcher_cancel_token,
@@ -152,29 +142,19 @@ impl DenyListCheck {
152142
impl Check for DenyListCheck {
153143
async fn check(
154144
&self,
155-
_: &tap_core::receipt::Context,
156-
receipt: &ReceiptWithState<Checking>,
145+
ctx: &tap_core::receipt::Context,
146+
_: &ReceiptWithState<Checking>,
157147
) -> CheckResult {
158-
let receipt_signer = receipt
159-
.signed_receipt()
160-
.recover_signer(&self.domain_separator)
161-
.map_err(|e| {
162-
error!("Failed to recover receipt signer: {}", e);
163-
anyhow::anyhow!(e)
164-
})
165-
.map_err(CheckError::Failed)?;
166-
let escrow_accounts_snapshot = self.escrow_accounts.borrow();
167-
168-
let receipt_sender = escrow_accounts_snapshot
169-
.get_sender_for_signer(&receipt_signer)
170-
.map_err(|e| CheckError::Failed(e.into()))?;
148+
let Sender(receipt_sender) = ctx
149+
.get::<Sender>()
150+
.ok_or(CheckError::Failed(anyhow::anyhow!("Could not find sender")))?;
171151

172152
// Check that the sender is not denylisted
173153
if self
174154
.sender_denylist
175155
.read()
176156
.unwrap()
177-
.contains(&receipt_sender)
157+
.contains(receipt_sender)
178158
{
179159
return Err(CheckError::Failed(anyhow::anyhow!(
180160
"Received a receipt from a denylisted sender: {}",
@@ -200,26 +180,16 @@ mod tests {
200180

201181
use alloy::hex::ToHexExt;
202182
use tap_core::receipt::{Context, ReceiptWithState};
203-
use tokio::sync::watch;
204183

205-
use test_assets::{
206-
self, create_signed_receipt, ESCROW_ACCOUNTS_BALANCES, ESCROW_ACCOUNTS_SENDERS_TO_SIGNERS,
207-
TAP_EIP712_DOMAIN, TAP_SENDER,
208-
};
184+
use test_assets::{self, create_signed_receipt, TAP_SENDER};
209185

210186
use super::*;
211187

212188
const ALLOCATION_ID: &str = "0xdeadbeefcafebabedeadbeefcafebabedeadbeef";
213189

214190
async fn new_deny_list_check(pgpool: PgPool) -> DenyListCheck {
215191
// Mock escrow accounts
216-
let escrow_accounts_rx = watch::channel(EscrowAccounts::new(
217-
ESCROW_ACCOUNTS_BALANCES.to_owned(),
218-
ESCROW_ACCOUNTS_SENDERS_TO_SIGNERS.to_owned(),
219-
))
220-
.1;
221-
222-
DenyListCheck::new(pgpool, escrow_accounts_rx, TAP_EIP712_DOMAIN.to_owned()).await
192+
DenyListCheck::new(pgpool).await
223193
}
224194

225195
#[sqlx::test(migrations = "../../migrations")]
@@ -244,9 +214,12 @@ mod tests {
244214

245215
let checking_receipt = ReceiptWithState::new(signed_receipt);
246216

217+
let mut ctx = Context::new();
218+
ctx.insert(Sender(TAP_SENDER.1));
219+
247220
// Check that the receipt is rejected
248221
assert!(deny_list_check
249-
.check(&Context::new(), &checking_receipt)
222+
.check(&ctx, &checking_receipt)
250223
.await
251224
.is_err());
252225
}
@@ -262,8 +235,10 @@ mod tests {
262235
// Check that the receipt is valid
263236
let checking_receipt = ReceiptWithState::new(signed_receipt);
264237

238+
let mut ctx = Context::new();
239+
ctx.insert(Sender(TAP_SENDER.1));
265240
deny_list_check
266-
.check(&Context::new(), &checking_receipt)
241+
.check(&ctx, &checking_receipt)
267242
.await
268243
.unwrap();
269244

@@ -282,7 +257,7 @@ mod tests {
282257
// Check that the receipt is rejected
283258
tokio::time::sleep(std::time::Duration::from_millis(100)).await;
284259
assert!(deny_list_check
285-
.check(&Context::new(), &checking_receipt)
260+
.check(&ctx, &checking_receipt)
286261
.await
287262
.is_err());
288263

@@ -301,7 +276,7 @@ mod tests {
301276
// Check that the receipt is valid again
302277
tokio::time::sleep(std::time::Duration::from_millis(100)).await;
303278
deny_list_check
304-
.check(&Context::new(), &checking_receipt)
279+
.check(&ctx, &checking_receipt)
305280
.await
306281
.unwrap();
307282
}

crates/service/src/tap/checks/sender_balance_check.rs

Lines changed: 11 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,6 @@
11
// Copyright 2023-, Edge & Node, GraphOps, and Semiotic Labs.
22
// SPDX-License-Identifier: Apache-2.0
33

4-
use alloy::dyn_abi::Eip712Domain;
54
use alloy::primitives::U256;
65
use anyhow::anyhow;
76
use indexer_monitor::EscrowAccounts;
@@ -11,55 +10,41 @@ use tap_core::receipt::{
1110
ReceiptWithState,
1211
};
1312
use tokio::sync::watch::Receiver;
14-
use tracing::error;
13+
14+
use crate::middleware::Sender;
1515

1616
pub struct SenderBalanceCheck {
1717
escrow_accounts: Receiver<EscrowAccounts>,
18-
19-
domain_separator: Eip712Domain,
2018
}
2119

2220
impl SenderBalanceCheck {
23-
pub fn new(escrow_accounts: Receiver<EscrowAccounts>, domain_separator: Eip712Domain) -> Self {
24-
Self {
25-
escrow_accounts,
26-
domain_separator,
27-
}
21+
pub fn new(escrow_accounts: Receiver<EscrowAccounts>) -> Self {
22+
Self { escrow_accounts }
2823
}
2924
}
3025

3126
#[async_trait::async_trait]
3227
impl Check for SenderBalanceCheck {
3328
async fn check(
3429
&self,
35-
_: &tap_core::receipt::Context,
36-
receipt: &ReceiptWithState<Checking>,
30+
ctx: &tap_core::receipt::Context,
31+
_: &ReceiptWithState<Checking>,
3732
) -> CheckResult {
3833
let escrow_accounts_snapshot = self.escrow_accounts.borrow();
3934

40-
let receipt_signer = receipt
41-
.signed_receipt()
42-
.recover_signer(&self.domain_separator)
43-
.inspect_err(|e| {
44-
error!("Failed to recover receipt signer: {}", e);
45-
})
46-
.map_err(|e| CheckError::Failed(e.into()))?;
47-
48-
// We bail if the receipt signer does not have a corresponding sender in the escrow
49-
// accounts.
50-
let receipt_sender = escrow_accounts_snapshot
51-
.get_sender_for_signer(&receipt_signer)
52-
.map_err(|e| CheckError::Failed(e.into()))?;
35+
let Sender(receipt_sender) = ctx
36+
.get::<Sender>()
37+
.ok_or(CheckError::Failed(anyhow::anyhow!("Could not find sender")))?;
5338

5439
// Check that the sender has a non-zero balance -- more advanced accounting is done in
5540
// `tap-agent`.
5641
if !escrow_accounts_snapshot
57-
.get_balance_for_sender(&receipt_sender)
42+
.get_balance_for_sender(receipt_sender)
5843
.map_or(false, |balance| balance > U256::ZERO)
5944
{
6045
return Err(CheckError::Failed(anyhow!(
6146
"Receipt sender `{}` does not have a sufficient balance",
62-
receipt_signer,
47+
receipt_sender,
6348
)));
6449
}
6550
Ok(())

0 commit comments

Comments
 (0)