Skip to content
Merged
Show file tree
Hide file tree
Changes from 7 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion contrib/base/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ FROM debian:bookworm-slim

RUN apt-get update && apt-get install -y --no-install-recommends \
openssl ca-certificates protobuf-compiler postgresql-client curl \
jq git linux-perf \
jq git linux-perf libsasl2-dev\
strace valgrind procps \
bpftrace linux-headers-generic \
&& rm -rf /var/lib/apt/lists/*
Expand Down
2 changes: 1 addition & 1 deletion contrib/indexer-service/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ COPY ../../ .
#
ENV SQLX_OFFLINE=true
RUN apt-get update && apt-get install -y --no-install-recommends \
protobuf-compiler && rm -rf /var/lib/apt/lists/*
protobuf-compiler libsasl2-dev && rm -rf /var/lib/apt/lists/*
RUN cargo build --release --bin indexer-service-rs

########################################################################################
Expand Down
2 changes: 1 addition & 1 deletion contrib/tap-agent/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ COPY . .
# the prepared files in the `.sqlx` directory.
ENV SQLX_OFFLINE=true
RUN apt-get update && apt-get install -y --no-install-recommends \
protobuf-compiler && rm -rf /var/lib/apt/lists/*
protobuf-compiler libsasl2-dev && rm -rf /var/lib/apt/lists/*
RUN cargo build --release --bin indexer-tap-agent

########################################################################################
Expand Down
5 changes: 4 additions & 1 deletion crates/service/src/tap.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,10 @@ const GRACE_PERIOD: u64 = 60;
#[derive(Clone)]
pub struct IndexerTapContext {
domain_separator: Arc<Eip712Domain>,
receipt_producer: Sender<DatabaseReceipt>,
receipt_producer: Sender<(
DatabaseReceipt,
tokio::sync::oneshot::Sender<Result<(), AdapterError>>,
)>,
cancelation_token: CancellationToken,
}

Expand Down
85 changes: 72 additions & 13 deletions crates/service/src/tap/receipt_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use itertools::{Either, Itertools};
use sqlx::{types::BigDecimal, PgPool};
use tap_core::{manager::adapters::ReceiptStore, receipt::WithValueAndTimestamp};
use thegraph_core::alloy::{hex::ToHexExt, sol_types::Eip712Domain};
use tokio::{sync::mpsc::Receiver, task::JoinHandle};
use tokio::{sync::mpsc::Receiver, sync::oneshot::Sender as OneShotSender, task::JoinHandle};
use tokio_util::sync::CancellationToken;

use super::{AdapterError, CheckingReceipt, IndexerTapContext, TapReceipt};
Expand Down Expand Up @@ -40,19 +40,28 @@ pub enum ProcessedReceipt {
impl InnerContext {
async fn process_db_receipts(
&self,
buffer: Vec<DatabaseReceipt>,
buffer: Vec<(DatabaseReceipt, OneShotSender<Result<(), AdapterError>>)>,
) -> Result<ProcessedReceipt, ProcessReceiptError> {
let (v1_receipts, v2_receipts): (Vec<_>, Vec<_>) =
buffer.into_iter().partition_map(|r| match r {
DatabaseReceipt::V1(db_receipt_v1) => Either::Left(db_receipt_v1),
DatabaseReceipt::V2(db_receipt_v2) => Either::Right(db_receipt_v2),
});
let (v1_data, v2_data): (Vec<_>, Vec<_>) =
buffer
.into_iter()
.partition_map(|(receipt, sender)| match receipt {
DatabaseReceipt::V1(receipt) => Either::Left((receipt, sender)),
DatabaseReceipt::V2(receipt) => Either::Right((receipt, sender)),
});

let (v1_receipts, v1_senders): (Vec<_>, Vec<_>) = v1_data.into_iter().unzip();
let (v2_receipts, v2_senders): (Vec<_>, Vec<_>) = v2_data.into_iter().unzip();

let (insert_v1, insert_v2) = tokio::join!(
self.store_receipts_v1(v1_receipts),
self.store_receipts_v2(v2_receipts),
);

// send back the result of storing receipts to callers
Self::notify_senders(v1_senders, &insert_v1, "V1");
Self::notify_senders(v2_senders, &insert_v2, "V2");

match (insert_v1, insert_v2) {
(Err(e1), Err(e2)) => Err(ProcessReceiptError::Both(e1.into(), e2.into())),

Expand All @@ -66,6 +75,29 @@ impl InnerContext {
}
}

/// Delivers the outcome of a batched receipt insert to every caller still
/// waiting on its oneshot channel.
///
/// On success each waiter receives `Ok(())`. On failure the storage error is
/// logged once for the whole batch and a copy of the message is sent to each
/// waiter. Send failures are deliberately ignored: a dropped receiver just
/// means that caller has already gone away.
fn notify_senders(
    senders: Vec<OneShotSender<Result<(), AdapterError>>>,
    result: &Result<Option<u64>, AdapterError>,
    version: &str,
) {
    if let Err(storage_err) = result {
        // Format the message a single time and log it once per batch.
        let err_msg = format!("Failed to store {} receipts: {}", version, storage_err);
        tracing::error!("{}", err_msg);
        for waiter in senders {
            // Each waiter needs its own AdapterError, built from the shared message.
            let _ = waiter.send(Err(anyhow!(err_msg.clone()).into()));
        }
    } else {
        for waiter in senders {
            let _ = waiter.send(Ok(()));
        }
    }
}

async fn store_receipts_v1(
&self,
receipts: Vec<DbReceiptV1>,
Expand Down Expand Up @@ -197,7 +229,7 @@ impl InnerContext {
impl IndexerTapContext {
pub fn spawn_store_receipt_task(
inner_context: InnerContext,
mut receiver: Receiver<DatabaseReceipt>,
mut receiver: Receiver<(DatabaseReceipt, OneShotSender<Result<(), AdapterError>>)>,
cancelation_token: CancellationToken,
) -> JoinHandle<()> {
const BUFFER_SIZE: usize = 100;
Expand All @@ -224,13 +256,19 @@ impl ReceiptStore<TapReceipt> for IndexerTapContext {

async fn store_receipt(&self, receipt: CheckingReceipt) -> Result<u64, Self::AdapterError> {
let db_receipt = DatabaseReceipt::from_receipt(receipt, &self.domain_separator)?;
self.receipt_producer.send(db_receipt).await.map_err(|e| {
tracing::error!("Failed to queue receipt for storage: {}", e);
anyhow!(e)
})?;
let (result_tx, result_rx) = tokio::sync::oneshot::channel();
self.receipt_producer
.send((db_receipt, result_tx))
.await
.map_err(|e| {
tracing::error!("Failed to queue receipt for storage: {}", e);
anyhow!(e)
})?;

let res = result_rx.await.map_err(|e| anyhow!(e))?;

// We don't need receipt_ids
Ok(0)
res.map(|_| 0)
}
}

Expand Down Expand Up @@ -374,6 +412,23 @@ mod tests {
DatabaseReceipt::V2(DbReceiptV2::from_receipt(&v2, &TAP_EIP712_DOMAIN).unwrap())
}

/// A receipt paired with the oneshot sender used to report its storage outcome.
pub type VecReceiptTx = Vec<(
    DatabaseReceipt,
    tokio::sync::oneshot::Sender<Result<(), AdapterError>>,
)>;
/// The receiving halves matching the senders produced by `attach_oneshot_channels`.
pub type VecRx = Vec<tokio::sync::oneshot::Receiver<Result<(), AdapterError>>>;

/// Pairs every receipt with a fresh oneshot channel, returning the
/// `(receipt, sender)` tuples alongside the corresponding receivers.
pub fn attach_oneshot_channels(receipts: Vec<DatabaseReceipt>) -> (VecReceiptTx, VecRx) {
    receipts
        .into_iter()
        .map(|receipt| {
            let (tx, rx) = tokio::sync::oneshot::channel();
            ((receipt, tx), rx)
        })
        .unzip()
}

mod when_all_migrations_are_run {
use super::*;

Expand All @@ -391,6 +446,7 @@ mod tests {
receipts: Vec<DatabaseReceipt>,
) {
let context = InnerContext { pgpool };
let (receipts, _rxs) = attach_oneshot_channels(receipts);

let res = context.process_db_receipts(receipts).await.unwrap();

Expand All @@ -415,7 +471,9 @@ mod tests {
let context = InnerContext { pgpool };

let v1 = create_v1().await;

let receipts = vec![v1];
let (receipts, _rxs) = attach_oneshot_channels(receipts);

let res = context.process_db_receipts(receipts).await.unwrap();

Expand All @@ -434,6 +492,7 @@ mod tests {
) {
let context = InnerContext { pgpool };

let (receipts, _rxs) = attach_oneshot_channels(receipts);
let error = context.process_db_receipts(receipts).await.unwrap_err();

let ProcessReceiptError::V2(error) = error else {
Expand Down
7 changes: 4 additions & 3 deletions integration-tests/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,16 +2,17 @@
// SPDX-License-Identifier: Apache-2.0

use anyhow::Result;
use rav_tests::test_tap_rav_v1;
use rav_tests::{test_invalid_chain_id, test_tap_rav_v1};

mod metrics;
mod rav_tests;
mod receipt;
mod utils;

use metrics::MetricsChecker;
use receipt::create_tap_receipt;

#[tokio::main]
async fn main() -> Result<()> {
    // Run the TAP integration tests in sequence; `?` aborts on the first failure.
    test_invalid_chain_id().await?;
    test_tap_rav_v1().await
}
Loading
Loading