Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion contrib/base/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ FROM debian:bookworm-slim

RUN apt-get update && apt-get install -y --no-install-recommends \
openssl ca-certificates protobuf-compiler postgresql-client curl \
jq git linux-perf \
jq git linux-perf libsasl2-dev\
strace valgrind procps \
bpftrace linux-headers-generic \
&& rm -rf /var/lib/apt/lists/*
Expand Down
2 changes: 1 addition & 1 deletion contrib/indexer-service/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ COPY ../../ .
#
ENV SQLX_OFFLINE=true
RUN apt-get update && apt-get install -y --no-install-recommends \
protobuf-compiler && rm -rf /var/lib/apt/lists/*
protobuf-compiler libsasl2-dev && rm -rf /var/lib/apt/lists/*
RUN cargo build --release --bin indexer-service-rs

########################################################################################
Expand Down
2 changes: 1 addition & 1 deletion contrib/tap-agent/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ COPY . .
# the prepared files in the `.sqlx` directory.
ENV SQLX_OFFLINE=true
RUN apt-get update && apt-get install -y --no-install-recommends \
protobuf-compiler && rm -rf /var/lib/apt/lists/*
protobuf-compiler libsasl2-dev && rm -rf /var/lib/apt/lists/*
RUN cargo build --release --bin indexer-tap-agent

########################################################################################
Expand Down
5 changes: 4 additions & 1 deletion crates/service/src/tap.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,10 @@ const GRACE_PERIOD: u64 = 60;
#[derive(Clone)]
pub struct IndexerTapContext {
domain_separator: Arc<Eip712Domain>,
receipt_producer: Sender<DatabaseReceipt>,
receipt_producer: Sender<(
DatabaseReceipt,
tokio::sync::oneshot::Sender<Result<(), AdapterError>>,
)>,
cancelation_token: CancellationToken,
}

Expand Down
85 changes: 72 additions & 13 deletions crates/service/src/tap/receipt_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use itertools::{Either, Itertools};
use sqlx::{types::BigDecimal, PgPool};
use tap_core::{manager::adapters::ReceiptStore, receipt::WithValueAndTimestamp};
use thegraph_core::alloy::{hex::ToHexExt, sol_types::Eip712Domain};
use tokio::{sync::mpsc::Receiver, task::JoinHandle};
use tokio::{sync::mpsc::Receiver, sync::oneshot::Sender as OneShotSender, task::JoinHandle};
use tokio_util::sync::CancellationToken;

use super::{AdapterError, CheckingReceipt, IndexerTapContext, TapReceipt};
Expand Down Expand Up @@ -40,19 +40,28 @@ pub enum ProcessedReceipt {
impl InnerContext {
async fn process_db_receipts(
&self,
buffer: Vec<DatabaseReceipt>,
buffer: Vec<(DatabaseReceipt, OneShotSender<Result<(), AdapterError>>)>,
) -> Result<ProcessedReceipt, ProcessReceiptError> {
let (v1_receipts, v2_receipts): (Vec<_>, Vec<_>) =
buffer.into_iter().partition_map(|r| match r {
DatabaseReceipt::V1(db_receipt_v1) => Either::Left(db_receipt_v1),
DatabaseReceipt::V2(db_receipt_v2) => Either::Right(db_receipt_v2),
});
let (v1_data, v2_data): (Vec<_>, Vec<_>) =
buffer
.into_iter()
.partition_map(|(receipt, sender)| match receipt {
DatabaseReceipt::V1(receipt) => Either::Left((receipt, sender)),
DatabaseReceipt::V2(receipt) => Either::Right((receipt, sender)),
});

let (v1_receipts, v1_senders): (Vec<_>, Vec<_>) = v1_data.into_iter().unzip();
let (v2_receipts, v2_senders): (Vec<_>, Vec<_>) = v2_data.into_iter().unzip();

let (insert_v1, insert_v2) = tokio::join!(
self.store_receipts_v1(v1_receipts),
self.store_receipts_v2(v2_receipts),
);

// send back the result of storing receipts to callers
Self::notify_senders(v1_senders, &insert_v1, "V1");
Self::notify_senders(v2_senders, &insert_v2, "V2");

match (insert_v1, insert_v2) {
(Err(e1), Err(e2)) => Err(ProcessReceiptError::Both(e1.into(), e2.into())),

Expand All @@ -66,6 +75,29 @@ impl InnerContext {
}
}

fn notify_senders(
senders: Vec<OneShotSender<Result<(), AdapterError>>>,
result: &Result<Option<u64>, AdapterError>,
version: &str,
) {
match result {
Ok(_) => {
for sender in senders {
let _ = sender.send(Ok(()));
}
}
Err(e) => {
// Create error message once
let err_msg = format!("Failed to store {} receipts: {}", version, e);
tracing::error!("{}", err_msg);
for sender in senders {
// Convert to AdapterError for each sender
let _ = sender.send(Err(anyhow!(err_msg.clone()).into()));
}
}
}
}

async fn store_receipts_v1(
&self,
receipts: Vec<DbReceiptV1>,
Expand Down Expand Up @@ -197,7 +229,7 @@ impl InnerContext {
impl IndexerTapContext {
pub fn spawn_store_receipt_task(
inner_context: InnerContext,
mut receiver: Receiver<DatabaseReceipt>,
mut receiver: Receiver<(DatabaseReceipt, OneShotSender<Result<(), AdapterError>>)>,
cancelation_token: CancellationToken,
) -> JoinHandle<()> {
const BUFFER_SIZE: usize = 100;
Expand All @@ -224,13 +256,19 @@ impl ReceiptStore<TapReceipt> for IndexerTapContext {

async fn store_receipt(&self, receipt: CheckingReceipt) -> Result<u64, Self::AdapterError> {
let db_receipt = DatabaseReceipt::from_receipt(receipt, &self.domain_separator)?;
self.receipt_producer.send(db_receipt).await.map_err(|e| {
tracing::error!("Failed to queue receipt for storage: {}", e);
anyhow!(e)
})?;
let (result_tx, result_rx) = tokio::sync::oneshot::channel();
self.receipt_producer
.send((db_receipt, result_tx))
.await
.map_err(|e| {
tracing::error!("Failed to queue receipt for storage: {}", e);
anyhow!(e)
})?;

let res = result_rx.await.map_err(|e| anyhow!(e))?;

// We don't need receipt_ids
Ok(0)
res.map(|_| 0)
}
}

Expand Down Expand Up @@ -374,6 +412,23 @@ mod tests {
DatabaseReceipt::V2(DbReceiptV2::from_receipt(&v2, &TAP_EIP712_DOMAIN).unwrap())
}

pub type VecReceiptTx = Vec<(
DatabaseReceipt,
tokio::sync::oneshot::Sender<Result<(), AdapterError>>,
)>;
pub type VecRx = Vec<tokio::sync::oneshot::Receiver<Result<(), AdapterError>>>;

pub fn attach_oneshot_channels(receipts: Vec<DatabaseReceipt>) -> (VecReceiptTx, VecRx) {
let mut txs = Vec::with_capacity(receipts.len());
let mut rxs = Vec::with_capacity(receipts.len());
for r in receipts.into_iter() {
let (tx, rx) = tokio::sync::oneshot::channel();
txs.push((r, tx));
rxs.push(rx);
}
(txs, rxs)
}

mod when_all_migrations_are_run {
use super::*;

Expand All @@ -391,6 +446,7 @@ mod tests {
receipts: Vec<DatabaseReceipt>,
) {
let context = InnerContext { pgpool };
let (receipts, _rxs) = attach_oneshot_channels(receipts);

let res = context.process_db_receipts(receipts).await.unwrap();

Expand All @@ -415,7 +471,9 @@ mod tests {
let context = InnerContext { pgpool };

let v1 = create_v1().await;

let receipts = vec![v1];
let (receipts, _rxs) = attach_oneshot_channels(receipts);

let res = context.process_db_receipts(receipts).await.unwrap();

Expand All @@ -434,6 +492,7 @@ mod tests {
) {
let context = InnerContext { pgpool };

let (receipts, _rxs) = attach_oneshot_channels(receipts);
let error = context.process_db_receipts(receipts).await.unwrap_err();

let ProcessReceiptError::V2(error) = error else {
Expand Down
7 changes: 4 additions & 3 deletions integration-tests/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,16 +2,17 @@
// SPDX-License-Identifier: Apache-2.0

use anyhow::Result;
use rav_tests::test_tap_rav_v1;
use rav_tests::{test_invalid_chain_id, test_tap_rav_v1};

mod metrics;
mod rav_tests;
mod receipt;
mod utils;

use metrics::MetricsChecker;
use receipt::create_tap_receipt;

#[tokio::main]
async fn main() -> Result<()> {
// Run the TAP receipt test
test_invalid_chain_id().await?;
test_tap_rav_v1().await
}
Loading
Loading