Skip to content

Commit d3a0829

Browse files
committed
fix: add metric and unbounded sender for receipt store
Signed-off-by: Gustavo Inacio <[email protected]>
1 parent e018aab commit d3a0829

File tree

2 files changed

+21
-8
lines changed

2 files changed

+21
-8
lines changed

crates/service/src/tap.rs

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@ use sqlx::PgPool;
1010
use tap_core::receipt::checks::ReceiptCheck;
1111
use thegraph_core::alloy::{primitives::Address, sol_types::Eip712Domain};
1212
use tokio::sync::{
13-
mpsc::{self, Sender},
13+
mpsc::{self, UnboundedSender},
1414
watch::Receiver,
1515
};
1616
use tokio_util::sync::CancellationToken;
@@ -31,7 +31,7 @@ const GRACE_PERIOD: u64 = 60;
3131
#[derive(Clone)]
3232
pub struct IndexerTapContext {
3333
domain_separator: Arc<Eip712Domain>,
34-
receipt_producer: Sender<DatabaseReceipt>,
34+
receipt_producer: UnboundedSender<DatabaseReceipt>,
3535
cancelation_token: CancellationToken,
3636
}
3737

@@ -60,8 +60,9 @@ impl IndexerTapContext {
6060
}
6161

6262
pub async fn new(pgpool: PgPool, domain_separator: Eip712Domain) -> Self {
63-
const MAX_RECEIPT_QUEUE_SIZE: usize = 1000;
64-
let (tx, rx) = mpsc::channel(MAX_RECEIPT_QUEUE_SIZE);
63+
// const MAX_RECEIPT_QUEUE_SIZE: usize = 1000;
64+
// let (tx, rx) = mpsc::channel(MAX_RECEIPT_QUEUE_SIZE);
65+
let (tx, rx) = mpsc::unbounded_channel();
6566
let cancelation_token = CancellationToken::new();
6667
let inner = InnerContext { pgpool };
6768
Self::spawn_store_receipt_task(inner, rx, cancelation_token.clone());

crates/service/src/tap/receipt_store.rs

Lines changed: 16 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -3,17 +3,27 @@
33

44
use anyhow::anyhow;
55
use bigdecimal::num_bigint::BigInt;
6+
use lazy_static::lazy_static;
7+
use prometheus::{register_gauge, Gauge};
68
use sqlx::{types::BigDecimal, PgPool};
79
use tap_core::{
810
manager::adapters::ReceiptStore,
911
receipt::{state::Checking, ReceiptWithState},
1012
};
1113
use thegraph_core::alloy::{hex::ToHexExt, sol_types::Eip712Domain};
12-
use tokio::{sync::mpsc::Receiver, task::JoinHandle};
14+
use tokio::{sync::mpsc::UnboundedReceiver, task::JoinHandle};
1315
use tokio_util::sync::CancellationToken;
1416

1517
use super::{AdapterError, IndexerTapContext};
1618

19+
lazy_static! {
20+
pub static ref RECEIPTS_IN_QUEUE: Gauge = register_gauge!(
21+
"indexer_receipts_in_queue_total",
22+
"Total receipts waiting to be stored",
23+
)
24+
.unwrap();
25+
}
26+
1727
#[derive(Clone)]
1828
pub struct InnerContext {
1929
pub pgpool: PgPool,
@@ -74,7 +84,7 @@ impl InnerContext {
7484
impl IndexerTapContext {
7585
pub fn spawn_store_receipt_task(
7686
inner_context: InnerContext,
77-
mut receiver: Receiver<DatabaseReceipt>,
87+
mut receiver: UnboundedReceiver<DatabaseReceipt>,
7888
cancelation_token: CancellationToken,
7989
) -> JoinHandle<()> {
8090
const BUFFER_SIZE: usize = 100;
@@ -83,7 +93,8 @@ impl IndexerTapContext {
8393
let mut buffer = Vec::with_capacity(BUFFER_SIZE);
8494
tokio::select! {
8595
biased;
86-
_ = receiver.recv_many(&mut buffer, BUFFER_SIZE) => {
96+
size = receiver.recv_many(&mut buffer, BUFFER_SIZE) => {
97+
RECEIPTS_IN_QUEUE.sub(size as f64);
8798
if let Err(e) = inner_context.store_receipts(buffer).await {
8899
tracing::error!("Failed to store receipts: {}", e);
89100
}
@@ -104,10 +115,11 @@ impl ReceiptStore for IndexerTapContext {
104115
receipt: ReceiptWithState<Checking>,
105116
) -> Result<u64, Self::AdapterError> {
106117
let db_receipt = DatabaseReceipt::from_receipt(receipt, &self.domain_separator)?;
107-
self.receipt_producer.send(db_receipt).await.map_err(|e| {
118+
self.receipt_producer.send(db_receipt).map_err(|e| {
108119
tracing::error!("Failed to queue receipt for storage: {}", e);
109120
anyhow!(e)
110121
})?;
122+
RECEIPTS_IN_QUEUE.inc();
111123

112124
// We don't need receipt_ids
113125
Ok(0)

0 commit comments

Comments
 (0)