Skip to content

Commit 11910d2

Browse files
authored
feat(common): batch insert receipts via mpsc (#285)
* feat(common): use channel to batch insert Signed-off-by: Gustavo Inacio <[email protected]> * fix(common): only cancel when queue empty Signed-off-by: Gustavo Inacio <[email protected]> --------- Signed-off-by: Gustavo Inacio <[email protected]>
1 parent 933f813 commit 11910d2

4 files changed

+166
-51
lines changed

.sqlx/query-85ff59ba99e8c59b2273bde484cc555a3268c17806410275b9ef943a293d7867.json

Lines changed: 19 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

.sqlx/query-ed054cb84e667373b57cbb62dbd4a96ec4b8b54f04355188eb7c16703e41e7b3.json

Lines changed: 0 additions & 19 deletions
This file was deleted.

common/src/tap.rs

Lines changed: 20 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -9,21 +9,25 @@ use crate::tap::checks::timestamp_check::TimestampCheck;
99
use crate::{escrow_accounts::EscrowAccounts, prelude::Allocation};
1010
use alloy::dyn_abi::Eip712Domain;
1111
use eventuals::Eventual;
12+
use receipt_store::{DatabaseReceipt, InnerContext};
1213
use sqlx::PgPool;
1314
use std::fmt::Debug;
1415
use std::time::Duration;
1516
use std::{collections::HashMap, sync::Arc};
1617
use tap_core::receipt::checks::ReceiptCheck;
1718
use thegraph_core::Address;
19+
use tokio::sync::mpsc::{self, Sender};
20+
use tokio_util::sync::CancellationToken;
1821
use tracing::error;
1922

2023
mod checks;
2124
mod receipt_store;
2225

23-
#[derive(Clone)]
2426
pub struct IndexerTapContext {
25-
pgpool: PgPool,
2627
domain_separator: Arc<Eip712Domain>,
28+
29+
receipt_producer: Sender<DatabaseReceipt>,
30+
cancelation_token: CancellationToken,
2731
}
2832

2933
#[derive(Debug, thiserror::Error)]
@@ -54,9 +58,22 @@ impl IndexerTapContext {
5458
}
5559

5660
pub async fn new(pgpool: PgPool, domain_separator: Eip712Domain) -> Self {
61+
const MAX_RECEIPT_QUEUE_SIZE: usize = 1000;
62+
let (tx, rx) = mpsc::channel(MAX_RECEIPT_QUEUE_SIZE);
63+
let cancelation_token = CancellationToken::new();
64+
let inner = InnerContext { pgpool };
65+
Self::spawn_store_receipt_task(inner, rx, cancelation_token.clone());
66+
5767
Self {
58-
pgpool,
68+
cancelation_token,
69+
receipt_producer: tx,
5970
domain_separator: Arc::new(domain_separator),
6071
}
6172
}
6273
}
74+
75+
impl Drop for IndexerTapContext {
76+
fn drop(&mut self) {
77+
self.cancelation_token.cancel();
78+
}
79+
}

common/src/tap/receipt_store.rs

Lines changed: 127 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -1,18 +1,101 @@
11
// Copyright 2023-, GraphOps and Semiotic Labs.
22
// SPDX-License-Identifier: Apache-2.0
33

4-
use alloy::hex::ToHexExt;
4+
use alloy::{dyn_abi::Eip712Domain, hex::ToHexExt};
55
use anyhow::anyhow;
66
use bigdecimal::num_bigint::BigInt;
7-
use sqlx::types::BigDecimal;
7+
use sqlx::{types::BigDecimal, PgPool};
88
use tap_core::{
99
manager::adapters::ReceiptStore,
1010
receipt::{state::Checking, ReceiptWithState},
1111
};
12+
use tokio::{select, sync::mpsc::Receiver, task::JoinHandle};
13+
use tokio_util::sync::CancellationToken;
1214
use tracing::error;
1315

1416
use super::{AdapterError, IndexerTapContext};
1517

18+
#[derive(Clone)]
19+
pub struct InnerContext {
20+
pub pgpool: PgPool,
21+
}
22+
23+
impl InnerContext {
24+
async fn store_receipts(&self, receipts: Vec<DatabaseReceipt>) -> Result<(), AdapterError> {
25+
let receipts_len = receipts.len();
26+
let mut signers = Vec::with_capacity(receipts_len);
27+
let mut signatures = Vec::with_capacity(receipts_len);
28+
let mut allocation_ids = Vec::with_capacity(receipts_len);
29+
let mut timestamps = Vec::with_capacity(receipts_len);
30+
let mut nonces = Vec::with_capacity(receipts_len);
31+
let mut values = Vec::with_capacity(receipts_len);
32+
33+
for receipt in receipts {
34+
signers.push(receipt.signer_address);
35+
signatures.push(receipt.signature);
36+
allocation_ids.push(receipt.allocation_id);
37+
timestamps.push(receipt.timestamp_ns);
38+
nonces.push(receipt.nonce);
39+
values.push(receipt.value);
40+
}
41+
sqlx::query!(
42+
r#"INSERT INTO scalar_tap_receipts (
43+
signer_address,
44+
signature,
45+
allocation_id,
46+
timestamp_ns,
47+
nonce,
48+
value
49+
) SELECT * FROM UNNEST(
50+
$1::CHAR(40)[],
51+
$2::BYTEA[],
52+
$3::CHAR(40)[],
53+
$4::NUMERIC(20)[],
54+
$5::NUMERIC(20)[],
55+
$6::NUMERIC(40)[]
56+
)"#,
57+
&signers,
58+
&signatures,
59+
&allocation_ids,
60+
&timestamps,
61+
&nonces,
62+
&values,
63+
)
64+
.execute(&self.pgpool)
65+
.await
66+
.map_err(|e| {
67+
error!("Failed to store receipt: {}", e);
68+
anyhow!(e)
69+
})?;
70+
71+
Ok(())
72+
}
73+
}
74+
75+
impl IndexerTapContext {
76+
pub fn spawn_store_receipt_task(
77+
inner_context: InnerContext,
78+
mut receiver: Receiver<DatabaseReceipt>,
79+
cancelation_token: CancellationToken,
80+
) -> JoinHandle<()> {
81+
const BUFFER_SIZE: usize = 100;
82+
tokio::spawn(async move {
83+
loop {
84+
let mut buffer = Vec::with_capacity(BUFFER_SIZE);
85+
select! {
86+
biased;
87+
_ = receiver.recv_many(&mut buffer, BUFFER_SIZE) => {
88+
if let Err(e) = inner_context.store_receipts(buffer).await {
89+
error!("Failed to store receipts: {}", e);
90+
}
91+
}
92+
_ = cancelation_token.cancelled() => { break },
93+
}
94+
}
95+
})
96+
}
97+
}
98+
1699
#[async_trait::async_trait]
17100
impl ReceiptStore for IndexerTapContext {
18101
type AdapterError = AdapterError;
@@ -21,38 +104,53 @@ impl ReceiptStore for IndexerTapContext {
21104
&self,
22105
receipt: ReceiptWithState<Checking>,
23106
) -> Result<u64, Self::AdapterError> {
107+
let db_receipt = DatabaseReceipt::from_receipt(receipt, &self.domain_separator)?;
108+
self.receipt_producer.send(db_receipt).await.map_err(|e| {
109+
error!("Failed to queue receipt for storage: {}", e);
110+
anyhow!(e)
111+
})?;
112+
113+
// We don't need receipt_ids
114+
Ok(0)
115+
}
116+
}
117+
118+
pub struct DatabaseReceipt {
119+
signer_address: String,
120+
signature: Vec<u8>,
121+
allocation_id: String,
122+
timestamp_ns: BigDecimal,
123+
nonce: BigDecimal,
124+
value: BigDecimal,
125+
}
126+
127+
impl DatabaseReceipt {
128+
fn from_receipt(
129+
receipt: ReceiptWithState<Checking>,
130+
separator: &Eip712Domain,
131+
) -> anyhow::Result<Self> {
24132
let receipt = receipt.signed_receipt();
25-
let allocation_id = receipt.message.allocation_id;
26-
let encoded_signature = receipt.signature.as_bytes().to_vec();
133+
let allocation_id = receipt.message.allocation_id.encode_hex();
134+
let signature = receipt.signature.as_bytes().to_vec();
27135

28-
let receipt_signer = receipt
29-
.recover_signer(self.domain_separator.as_ref())
136+
let signer_address = receipt
137+
.recover_signer(separator)
30138
.map_err(|e| {
31139
error!("Failed to recover receipt signer: {}", e);
32140
anyhow!(e)
33-
})?;
34-
35-
// TODO: consider doing this in another async task to avoid slowing down the paid query flow.
36-
sqlx::query!(
37-
r#"
38-
INSERT INTO scalar_tap_receipts (signer_address, signature, allocation_id, timestamp_ns, nonce, value)
39-
VALUES ($1, $2, $3, $4, $5, $6)
40-
"#,
41-
receipt_signer.encode_hex(),
42-
encoded_signature,
43-
allocation_id.encode_hex(),
44-
BigDecimal::from(receipt.message.timestamp_ns),
45-
BigDecimal::from(receipt.message.nonce),
46-
BigDecimal::from(BigInt::from(receipt.message.value)),
47-
)
48-
.execute(&self.pgpool)
49-
.await
50-
.map_err(|e| {
51-
error!("Failed to store receipt: {}", e);
52-
anyhow!(e)
53-
})?;
141+
})?
142+
.encode_hex();
54143

55-
// We don't need receipt_ids
56-
Ok(0)
144+
let timestamp_ns = BigDecimal::from(receipt.message.timestamp_ns);
145+
let nonce = BigDecimal::from(receipt.message.nonce);
146+
let value = BigDecimal::from(BigInt::from(receipt.message.value));
147+
Ok(Self {
148+
allocation_id,
149+
nonce,
150+
signature,
151+
signer_address,
152+
timestamp_ns,
153+
value,
154+
})
57155
}
58156
}

0 commit comments

Comments
 (0)