From 73dd2769d81e4d3acf714b52d49f5cd7c78e73cb Mon Sep 17 00:00:00 2001 From: William Law Date: Mon, 5 Jan 2026 11:11:24 -0500 Subject: [PATCH 01/11] perf: improve audit --- crates/audit/src/storage.rs | 27 +++++++++++++++++++-------- 1 file changed, 19 insertions(+), 8 deletions(-) diff --git a/crates/audit/src/storage.rs b/crates/audit/src/storage.rs index a0ba49f..6684b9f 100644 --- a/crates/audit/src/storage.rs +++ b/crates/audit/src/storage.rs @@ -532,21 +532,32 @@ impl EventWriter for S3EventReaderWriter { let bundle_id = event.event.bundle_id(); let transaction_ids = event.event.transaction_ids(); - let start = Instant::now(); - self.update_bundle_history(event.clone()).await?; - self.metrics - .update_bundle_history_duration - .record(start.elapsed().as_secs_f64()); + let bundle_writer = self.clone(); + let bundle_task = tokio::spawn(async move { + let start = Instant::now(); + let result = bundle_writer.update_bundle_history(event).await; + bundle_writer + .metrics + .update_bundle_history_duration + .record(start.elapsed().as_secs_f64()); + result + }); let start = Instant::now(); - for tx_id in &transaction_ids { - self.update_transaction_by_hash_index(tx_id, bundle_id) - .await?; + let mut join_set = tokio::task::JoinSet::new(); + for tx_id in transaction_ids { + let tx_writer = self.clone(); + join_set.spawn(async move { + tx_writer + .update_transaction_by_hash_index(&tx_id, bundle_id) + .await + }); } self.metrics .update_tx_indexes_duration .record(start.elapsed().as_secs_f64()); + bundle_task.await??; Ok(()) } } From 128bcecaf916e219666693567d77d90849cd4a6d Mon Sep 17 00:00:00 2001 From: William Law Date: Mon, 5 Jan 2026 11:35:45 -0500 Subject: [PATCH 02/11] fix metric --- crates/audit/src/storage.rs | 22 ++++++++++++++-------- 1 file changed, 14 insertions(+), 8 deletions(-) diff --git a/crates/audit/src/storage.rs b/crates/audit/src/storage.rs index 6684b9f..47b5e73 100644 --- a/crates/audit/src/storage.rs +++ b/crates/audit/src/storage.rs @@ -544,15 +544,21 @@ impl EventWriter for S3EventReaderWriter { }); let start = Instant::now(); - let mut join_set = tokio::task::JoinSet::new(); - for tx_id in transaction_ids { - let tx_writer = self.clone(); - join_set.spawn(async move { - tx_writer - .update_transaction_by_hash_index(&tx_id, bundle_id) - .await - }); + let tasks: Vec<_> = transaction_ids + .into_iter() + .map(|tx_id| { + let tx_writer = self.clone(); + tokio::spawn(async move { + tx_writer + .update_transaction_by_hash_index(&tx_id, bundle_id) + .await + }) + }) + .collect(); + for task in tasks { + task.await??; } + self.metrics .update_tx_indexes_duration .record(start.elapsed().as_secs_f64()); From 76520337c628966a17ca1a7b63bc2bc1b4f60bcf Mon Sep 17 00:00:00 2001 From: William Law Date: Mon, 5 Jan 2026 14:10:25 -0500 Subject: [PATCH 03/11] wip --- crates/audit/src/storage.rs | 34 ++++++++++++++-------------------- 1 file changed, 14 insertions(+), 20 deletions(-) diff --git a/crates/audit/src/storage.rs b/crates/audit/src/storage.rs index 47b5e73..9ce3c11 100644 --- a/crates/audit/src/storage.rs +++ b/crates/audit/src/storage.rs @@ -533,7 +533,8 @@ impl EventWriter for S3EventReaderWriter { let transaction_ids = event.event.transaction_ids(); let bundle_writer = self.clone(); - let bundle_task = tokio::spawn(async move { + // TODO: handle errors + tokio::spawn(async move { let start = Instant::now(); let result = bundle_writer.update_bundle_history(event).await; bundle_writer @@ -543,27 +544,20 @@ impl EventWriter for S3EventReaderWriter { result }); - let start 
= Instant::now(); - let tasks: Vec<_> = transaction_ids - .into_iter() - .map(|tx_id| { - let tx_writer = self.clone(); - tokio::spawn(async move { - tx_writer - .update_transaction_by_hash_index(&tx_id, bundle_id) - .await - }) - }) - .collect(); - for task in tasks { - task.await??; + for tx_id in transaction_ids { + let start = Instant::now(); + let tx_writer = self.clone(); + tokio::spawn(async move { + let _ = tx_writer + .update_transaction_by_hash_index(&tx_id, bundle_id) + .await; + tx_writer + .metrics + .update_tx_indexes_duration + .record(start.elapsed().as_secs_f64()); + }); } - self.metrics - .update_tx_indexes_duration - .record(start.elapsed().as_secs_f64()); - - bundle_task.await??; Ok(()) } } From b4f2e023e00a8c2aa490b1d2810211b4641196bc Mon Sep 17 00:00:00 2001 From: William Law Date: Mon, 5 Jan 2026 14:51:31 -0500 Subject: [PATCH 04/11] add error logs --- .gitignore | 2 +- crates/audit/src/storage.rs | 33 +++++++++++++++++++-------------- 2 files changed, 20 insertions(+), 15 deletions(-) diff --git a/.gitignore b/.gitignore index cd0eced..509ea6b 100644 --- a/.gitignore +++ b/.gitignore @@ -26,4 +26,4 @@ Thumbs.db /ui/.claude # e2e / load tests -wallets.json \ No newline at end of file +wallets.json diff --git a/crates/audit/src/storage.rs b/crates/audit/src/storage.rs index 9ce3c11..7bf7a34 100644 --- a/crates/audit/src/storage.rs +++ b/crates/audit/src/storage.rs @@ -15,7 +15,7 @@ use std::fmt; use std::fmt::Debug; use std::time::Instant; use tips_core::AcceptedBundle; -use tracing::info; +use tracing::{error, info}; /// S3 key types for storing different event types. #[derive(Debug)] @@ -533,28 +533,33 @@ impl EventWriter for S3EventReaderWriter { let transaction_ids = event.event.transaction_ids(); let bundle_writer = self.clone(); - // TODO: handle errors tokio::spawn(async move { let start = Instant::now(); - let result = bundle_writer.update_bundle_history(event).await; - bundle_writer - .metrics - .update_bundle_history_duration - .record(start.elapsed().as_secs_f64()); - result + if let Err(e) = bundle_writer.update_bundle_history(event).await { + error!(error = %e, "Failed to update bundle history"); + } else { + bundle_writer + .metrics + .update_bundle_history_duration + .record(start.elapsed().as_secs_f64()); + } }); for tx_id in transaction_ids { let start = Instant::now(); let tx_writer = self.clone(); tokio::spawn(async move { - let _ = tx_writer + if let Err(e) = tx_writer .update_transaction_by_hash_index(&tx_id, bundle_id) - .await; - tx_writer - .metrics - .update_tx_indexes_duration - .record(start.elapsed().as_secs_f64()); + .await + { + error!(error = %e, "Failed to update transaction by hash index"); + } else { + tx_writer + .metrics + .update_tx_indexes_duration + .record(start.elapsed().as_secs_f64()); + } }); } From 51b3d2475643371f7f975fe626ce88ce9842d1f4 Mon Sep 17 00:00:00 2001 From: William Law Date: Mon, 5 Jan 2026 15:47:50 -0500 Subject: [PATCH 05/11] worker pool --- crates/audit/src/archiver.rs | 82 ++++++++++++++++++++++++++---------- crates/audit/src/storage.rs | 55 ++++++++++++------------ 2 files changed, 87 insertions(+), 50 deletions(-) diff --git a/crates/audit/src/archiver.rs b/crates/audit/src/archiver.rs index 80f038a..ae6dfeb 100644 --- a/crates/audit/src/archiver.rs +++ b/crates/audit/src/archiver.rs @@ -1,12 +1,19 @@ use crate::metrics::Metrics; -use crate::reader::EventReader; +use crate::reader::{Event, EventReader}; use crate::storage::EventWriter; use anyhow::Result; use std::fmt; +use std::marker::PhantomData; +use 
std::sync::Arc; use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH}; +use tokio::sync::{Mutex, mpsc}; use tokio::time::sleep; use tracing::{error, info}; +// TODO: make this configurable +const WORKER_POOL_SIZE: usize = 80; +const CHANNEL_BUFFER_SIZE: usize = 500; + /// Archives audit events from Kafka to S3 storage. pub struct KafkaAuditArchiver where @@ -14,8 +21,9 @@ where W: EventWriter + Clone + Send + 'static, { reader: R, - writer: W, + event_tx: mpsc::Sender, metrics: Metrics, + _phantom: PhantomData, } impl fmt::Debug for KafkaAuditArchiver @@ -35,17 +43,59 @@ where { /// Creates a new archiver with the given reader and writer. pub fn new(reader: R, writer: W) -> Self { + let (event_tx, event_rx) = mpsc::channel(CHANNEL_BUFFER_SIZE); + let metrics = Metrics::default(); + + Self::spawn_workers(writer, event_rx, metrics.clone()); + Self { reader, - writer, - metrics: Metrics::default(), + event_tx, + metrics, + _phantom: PhantomData, + } + } + + fn spawn_workers(writer: W, event_rx: mpsc::Receiver, metrics: Metrics) { + let event_rx = Arc::new(Mutex::new(event_rx)); + + for worker_id in 0..WORKER_POOL_SIZE { + let writer = writer.clone(); + let metrics = metrics.clone(); + let event_rx = event_rx.clone(); + + tokio::spawn(async move { + loop { + let event = { + let mut rx = event_rx.lock().await; + rx.recv().await + }; + + match event { + Some(event) => { + let archive_start = Instant::now(); + if let Err(e) = writer.archive_event(event).await { + error!(worker_id, error = %e, "Failed to write event"); + } else { + metrics + .archive_event_duration + .record(archive_start.elapsed().as_secs_f64()); + metrics.events_processed.increment(1); + } + metrics.in_flight_archive_tasks.decrement(1.0); + } + None => { + info!(worker_id, "Worker stopped - channel closed"); + break; + } + } + } + }); } } /// Runs the archiver loop, reading events and writing them to storage. pub async fn run(&mut self) -> Result<()> { - info!("Starting Kafka bundle archiver"); - loop { let read_start = Instant::now(); match self.reader.read_event().await { @@ -61,23 +111,11 @@ where let event_age_ms = now_ms.saturating_sub(event.timestamp); self.metrics.event_age.record(event_age_ms as f64); - // TODO: the integration test breaks because Minio doesn't support etag - let writer = self.writer.clone(); - let metrics = self.metrics.clone(); self.metrics.in_flight_archive_tasks.increment(1.0); - tokio::spawn(async move { - let archive_start = Instant::now(); - if let Err(e) = writer.archive_event(event).await { - error!(error = %e, "Failed to write event"); - metrics.failed_archive_tasks.increment(1); - } else { - metrics - .archive_event_duration - .record(archive_start.elapsed().as_secs_f64()); - metrics.events_processed.increment(1); - } - metrics.in_flight_archive_tasks.decrement(1.0); - }); + if let Err(e) = self.event_tx.send(event).await { + error!(error = %e, "Failed to send event to worker pool"); + self.metrics.in_flight_archive_tasks.decrement(1.0); + } let commit_start = Instant::now(); if let Err(e) = self.reader.commit().await { diff --git a/crates/audit/src/storage.rs b/crates/audit/src/storage.rs index 7bf7a34..27469e9 100644 --- a/crates/audit/src/storage.rs +++ b/crates/audit/src/storage.rs @@ -15,7 +15,7 @@ use std::fmt; use std::fmt::Debug; use std::time::Instant; use tips_core::AcceptedBundle; -use tracing::{error, info}; +use tracing::info; /// S3 key types for storing different event types. 
#[derive(Debug)] @@ -532,36 +532,35 @@ impl EventWriter for S3EventReaderWriter { let bundle_id = event.event.bundle_id(); let transaction_ids = event.event.transaction_ids(); - let bundle_writer = self.clone(); - tokio::spawn(async move { - let start = Instant::now(); - if let Err(e) = bundle_writer.update_bundle_history(event).await { - error!(error = %e, "Failed to update bundle history"); - } else { - bundle_writer - .metrics - .update_bundle_history_duration - .record(start.elapsed().as_secs_f64()); + let bundle_start = Instant::now(); + let bundle_future = self.update_bundle_history(event); + + let tx_start = Instant::now(); + let tx_futures: Vec<_> = transaction_ids + .into_iter() + .map(|tx_id| async move { + self.update_transaction_by_hash_index(&tx_id, bundle_id) + .await + }) + .collect(); + + let (bundle_result, tx_results) = tokio::join!(bundle_future, async { + let mut results = Vec::new(); + for fut in tx_futures { + results.push(fut.await); } + results.into_iter().collect::>>() }); - for tx_id in transaction_ids { - let start = Instant::now(); - let tx_writer = self.clone(); - tokio::spawn(async move { - if let Err(e) = tx_writer - .update_transaction_by_hash_index(&tx_id, bundle_id) - .await - { - error!(error = %e, "Failed to update transaction by hash index"); - } else { - tx_writer - .metrics - .update_tx_indexes_duration - .record(start.elapsed().as_secs_f64()); - } - }); - } + bundle_result?; + tx_results?; + + self.metrics + .update_bundle_history_duration + .record(bundle_start.elapsed().as_secs_f64()); + self.metrics + .update_tx_indexes_duration + .record(tx_start.elapsed().as_secs_f64()); Ok(()) } From 6a1813a3b094f61a1329e2e9785a7f50f8e641e5 Mon Sep 17 00:00:00 2001 From: William Law Date: Tue, 6 Jan 2026 13:25:21 -0500 Subject: [PATCH 06/11] make params configurable --- bin/tips-audit/src/main.rs | 13 ++++++++++++- crates/audit/src/archiver.rs | 19 ++++++++++--------- 2 files changed, 22 insertions(+), 10 deletions(-) diff --git a/bin/tips-audit/src/main.rs b/bin/tips-audit/src/main.rs index db958eb..38be65b 100644 --- a/bin/tips-audit/src/main.rs +++ b/bin/tips-audit/src/main.rs @@ -53,6 +53,12 @@ struct Args { #[arg(long, env = "TIPS_AUDIT_METRICS_ADDR", default_value = "0.0.0.0:9002")] metrics_addr: SocketAddr, + + #[arg(long, env = "TIPS_AUDIT_WORKER_POOL_SIZE", default_value = "80")] + worker_pool_size: usize, + + #[arg(long, env = "TIPS_AUDIT_CHANNEL_BUFFER_SIZE", default_value = "500")] + channel_buffer_size: usize, } #[tokio::main] @@ -82,7 +88,12 @@ async fn main() -> Result<()> { let s3_bucket = args.s3_bucket.clone(); let writer = S3EventReaderWriter::new(s3_client, s3_bucket); - let mut archiver = KafkaAuditArchiver::new(reader, writer); + let mut archiver = KafkaAuditArchiver::new( + reader, + writer, + args.worker_pool_size, + args.channel_buffer_size, + ); info!("Audit archiver initialized, starting main loop"); diff --git a/crates/audit/src/archiver.rs b/crates/audit/src/archiver.rs index ae6dfeb..f09d34e 100644 --- a/crates/audit/src/archiver.rs +++ b/crates/audit/src/archiver.rs @@ -10,10 +10,6 @@ use tokio::sync::{Mutex, mpsc}; use tokio::time::sleep; use tracing::{error, info}; -// TODO: make this configurable -const WORKER_POOL_SIZE: usize = 80; -const CHANNEL_BUFFER_SIZE: usize = 500; - /// Archives audit events from Kafka to S3 storage. pub struct KafkaAuditArchiver where @@ -42,11 +38,11 @@ where W: EventWriter + Clone + Send + 'static, { /// Creates a new archiver with the given reader and writer. 
- pub fn new(reader: R, writer: W) -> Self { - let (event_tx, event_rx) = mpsc::channel(CHANNEL_BUFFER_SIZE); + pub fn new(reader: R, writer: W, worker_pool_size: usize, channel_buffer_size: usize) -> Self { + let (event_tx, event_rx) = mpsc::channel(channel_buffer_size); let metrics = Metrics::default(); - Self::spawn_workers(writer, event_rx, metrics.clone()); + Self::spawn_workers(writer, event_rx, metrics.clone(), worker_pool_size); Self { reader, @@ -56,10 +52,15 @@ where } } - fn spawn_workers(writer: W, event_rx: mpsc::Receiver, metrics: Metrics) { + fn spawn_workers( + writer: W, + event_rx: mpsc::Receiver, + metrics: Metrics, + worker_pool_size: usize, + ) { let event_rx = Arc::new(Mutex::new(event_rx)); - for worker_id in 0..WORKER_POOL_SIZE { + for worker_id in 0..worker_pool_size { let writer = writer.clone(); let metrics = metrics.clone(); let event_rx = event_rx.clone(); From 89a3effb1bd5f93f04dfe6a296ac71f38efbbc17 Mon Sep 17 00:00:00 2001 From: William Law Date: Tue, 6 Jan 2026 13:33:57 -0500 Subject: [PATCH 07/11] simplify try_join --- crates/audit/src/storage.rs | 12 ++++-------- 1 file changed, 4 insertions(+), 8 deletions(-) diff --git a/crates/audit/src/storage.rs b/crates/audit/src/storage.rs index 27469e9..812da27 100644 --- a/crates/audit/src/storage.rs +++ b/crates/audit/src/storage.rs @@ -544,16 +544,12 @@ impl EventWriter for S3EventReaderWriter { }) .collect(); - let (bundle_result, tx_results) = tokio::join!(bundle_future, async { - let mut results = Vec::new(); + tokio::try_join!(bundle_future, async { for fut in tx_futures { - results.push(fut.await); + fut.await?; } - results.into_iter().collect::>>() - }); - - bundle_result?; - tx_results?; + Ok(()) + })?; self.metrics .update_bundle_history_duration From 153a72619abb9e4529e330864d10f28fe4009ed9 Mon Sep 17 00:00:00 2001 From: William Law Date: Tue, 6 Jan 2026 15:21:50 -0500 Subject: [PATCH 08/11] try_join_all --- Cargo.lock | 1 + Cargo.toml | 1 + crates/audit/Cargo.toml | 1 + crates/audit/src/storage.rs | 9 +++------ 4 files changed, 6 insertions(+), 6 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index c0f4707..6442fa5 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -7096,6 +7096,7 @@ dependencies = [ "async-trait", "aws-sdk-s3", "bytes", + "futures", "metrics", "metrics-derive", "rdkafka", diff --git a/Cargo.toml b/Cargo.toml index bf8b84a..310ec9d 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -97,3 +97,4 @@ testcontainers = { version = "0.23.1", default-features = false } tracing-subscriber = { version = "0.3.20", default-features = false } testcontainers-modules = { version = "0.11.2", default-features = false } metrics-exporter-prometheus = { version = "0.17.0", default-features = false } +futures = { version = "0.3.31", default-features = false } diff --git a/crates/audit/Cargo.toml b/crates/audit/Cargo.toml index 76c5ad6..032fd9a 100644 --- a/crates/audit/Cargo.toml +++ b/crates/audit/Cargo.toml @@ -26,6 +26,7 @@ rdkafka = { workspace = true, features = ["tokio", "libz", "zstd", "ssl-vendored alloy-consensus = { workspace = true, features = ["std"] } alloy-primitives = { workspace = true, features = ["map-foldhash", "serde"] } aws-sdk-s3 = { workspace = true, features = ["rustls", "default-https-client", "rt-tokio"] } +futures = { workspace = true } [dev-dependencies] testcontainers = { workspace = true, features = ["blocking"] } diff --git a/crates/audit/src/storage.rs b/crates/audit/src/storage.rs index 812da27..ba481c1 100644 --- a/crates/audit/src/storage.rs +++ b/crates/audit/src/storage.rs @@ 
-10,6 +10,7 @@ use aws_sdk_s3::Client as S3Client; use aws_sdk_s3::error::SdkError; use aws_sdk_s3::operation::get_object::GetObjectError; use aws_sdk_s3::primitives::ByteStream; +use futures::future; use serde::{Deserialize, Serialize}; use std::fmt; use std::fmt::Debug; @@ -544,12 +545,8 @@ impl EventWriter for S3EventReaderWriter { }) .collect(); - tokio::try_join!(bundle_future, async { - for fut in tx_futures { - fut.await?; - } - Ok(()) - })?; + // Run the bundle and transaction futures concurrently and wait for them to complete + tokio::try_join!(bundle_future, future::try_join_all(tx_futures))?; self.metrics .update_bundle_history_duration From 5f5170838a79028ec7fb6a802e920f70f320c6d8 Mon Sep 17 00:00:00 2001 From: William Law Date: Wed, 7 Jan 2026 12:11:52 -0500 Subject: [PATCH 09/11] chore: add more audit logs --- crates/audit/src/reader.rs | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/crates/audit/src/reader.rs b/crates/audit/src/reader.rs index 50697a2..be1c3c5 100644 --- a/crates/audit/src/reader.rs +++ b/crates/audit/src/reader.rs @@ -10,7 +10,7 @@ use rdkafka::{ use std::time::{Duration, SystemTime, UNIX_EPOCH}; use tips_core::kafka::load_kafka_config_from_file; use tokio::time::sleep; -use tracing::{debug, error}; +use tracing::{debug, error, info}; /// Creates a Kafka consumer from a properties file. pub fn create_kafka_consumer(kafka_properties_file: &str) -> Result { @@ -100,8 +100,9 @@ impl EventReader for KafkaAuditLogReader { let event: BundleEvent = serde_json::from_slice(payload)?; - debug!( + info!( bundle_id = %event.bundle_id(), + tx_ids = ?event.transaction_ids(), timestamp = timestamp, offset = message.offset(), partition = message.partition(), From 9b0ec57928ae5af29d9707bc9e01863827fc70ee Mon Sep 17 00:00:00 2001 From: William Law Date: Fri, 9 Jan 2026 10:04:03 -0500 Subject: [PATCH 10/11] perf: deduplicate bundles with the same contents (#137) * chore: use uuid v5 for determinism * feat: add ttl * diffs * fix: in-flight archive task * Revert "fix: in-flight archive task" This reverts commit a759f2b84f51001645fdcc124f8b88614483496a. 
* tmp: clear backlog by nooping * chore: log out meterbundleres --- Cargo.lock | 95 +++++++++++++++++++++- Cargo.toml | 1 + bin/tips-audit/src/main.rs | 4 + crates/audit/Cargo.toml | 2 +- crates/audit/src/archiver.rs | 31 +++++++- crates/audit/src/storage.rs | 24 +++--- crates/audit/src/types.rs | 12 ++- crates/audit/tests/common/mod.rs | 5 +- crates/audit/tests/integration_tests.rs | 9 ++- crates/audit/tests/s3_test.rs | 22 ++++-- crates/core/Cargo.toml | 2 +- crates/core/src/types.rs | 45 ++++++++++- crates/ingress-rpc/Cargo.toml | 2 + crates/ingress-rpc/src/service.rs | 96 +++++++++++++++-------- crates/system-tests/Cargo.toml | 2 +- crates/system-tests/tests/common/kafka.rs | 8 +- 16 files changed, 296 insertions(+), 64 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 6442fa5..e08f1be 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1096,6 +1096,17 @@ dependencies = [ "serde_json", ] +[[package]] +name = "async-lock" +version = "3.4.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "290f7f2596bd5b78a9fec8088ccd89180d7f9f55b94b0576823bbbdc72ee8311" +dependencies = [ + "event-listener", + "event-listener-strategy", + "pin-project-lite", +] + [[package]] name = "async-stream" version = "0.3.6" @@ -1654,7 +1665,7 @@ dependencies = [ "bitflags 2.10.0", "cexpr", "clang-sys", - "itertools 0.10.5", + "itertools 0.13.0", "proc-macro2", "quote", "regex", @@ -1995,6 +2006,15 @@ dependencies = [ "memchr", ] +[[package]] +name = "concurrent-queue" +version = "2.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4ca0197aee26d1ae37445ee532fefce43251d24cc7c166799f4d46817f1d3973" +dependencies = [ + "crossbeam-utils", +] + [[package]] name = "console" version = "0.15.11" @@ -2133,6 +2153,15 @@ version = "1.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "790eea4361631c5e7d22598ecd5723ff611904e3344ce8720784c93e3d83d40b" +[[package]] +name = "crossbeam-channel" +version = "0.5.15" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "82b8f8f868b36967f9606790d1903570de9ceaf870a7bf9fbbd3016d636a2cb2" +dependencies = [ + "crossbeam-utils", +] + [[package]] name = "crossbeam-deque" version = "0.8.6" @@ -2596,6 +2625,27 @@ dependencies = [ "syn 2.0.111", ] +[[package]] +name = "event-listener" +version = "5.4.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e13b66accf52311f30a0db42147dadea9850cb48cd070028831ae5f5d4b856ab" +dependencies = [ + "concurrent-queue", + "parking", + "pin-project-lite", +] + +[[package]] +name = "event-listener-strategy" +version = "0.5.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8be9f3dfaaffdae2972880079a491a1a8bb7cbed0b8dd7a347f668b4150a3b93" +dependencies = [ + "event-listener", + "pin-project-lite", +] + [[package]] name = "fastrand" version = "2.3.0" @@ -3959,6 +4009,26 @@ dependencies = [ "syn 1.0.109", ] +[[package]] +name = "moka" +version = "0.12.12" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a3dec6bd31b08944e08b58fd99373893a6c17054d6f3ea5006cc894f4f4eee2a" +dependencies = [ + "async-lock", + "crossbeam-channel", + "crossbeam-epoch", + "crossbeam-utils", + "equivalent", + "event-listener", + "futures-util", + "parking_lot", + "portable-atomic", + "smallvec", + "tagptr", + "uuid", +] + [[package]] name = "native-tls" version = "0.2.14" @@ -3992,7 +4062,7 @@ version = "0.50.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum 
= "7957b9740744892f114936ab4a57b3f487491bbeafaf8083688b16841a4240e5" dependencies = [ - "windows-sys 0.59.0", + "windows-sys 0.61.2", ] [[package]] @@ -4332,6 +4402,12 @@ dependencies = [ "syn 2.0.111", ] +[[package]] +name = "parking" +version = "2.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f38d5652c16fde515bb1ecef450ab0f6a219d619a7274976324d5e377f7dceba" + [[package]] name = "parking_lot" version = "0.12.5" @@ -6596,6 +6672,12 @@ dependencies = [ "digest 0.10.7", ] +[[package]] +name = "sha1_smol" +version = "1.0.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bbfa15b3dddfee50a0fff136974b3e1bde555604ba463834a7eb7deb6417705d" + [[package]] name = "sha2" version = "0.10.9" @@ -6883,6 +6965,12 @@ dependencies = [ "libc", ] +[[package]] +name = "tagptr" +version = "0.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7b2093cf4c8eb1e67749a6762251bc9cd836b6fc171623bd0a9d324d37af2417" + [[package]] name = "tap" version = "1.0.1" @@ -7170,6 +7258,7 @@ dependencies = [ "metrics", "metrics-derive", "mockall", + "moka", "op-alloy-consensus", "op-alloy-network", "op-revm", @@ -7180,6 +7269,7 @@ dependencies = [ "tokio", "tracing", "url", + "uuid", "wiremock", ] @@ -7583,6 +7673,7 @@ dependencies = [ "getrandom 0.3.4", "js-sys", "serde_core", + "sha1_smol", "wasm-bindgen", ] diff --git a/Cargo.toml b/Cargo.toml index 310ec9d..623dedc 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -98,3 +98,4 @@ tracing-subscriber = { version = "0.3.20", default-features = false } testcontainers-modules = { version = "0.11.2", default-features = false } metrics-exporter-prometheus = { version = "0.17.0", default-features = false } futures = { version = "0.3.31", default-features = false } +moka = { version = "0.12.12", default-features = false } diff --git a/bin/tips-audit/src/main.rs b/bin/tips-audit/src/main.rs index 38be65b..f29f6d1 100644 --- a/bin/tips-audit/src/main.rs +++ b/bin/tips-audit/src/main.rs @@ -59,6 +59,9 @@ struct Args { #[arg(long, env = "TIPS_AUDIT_CHANNEL_BUFFER_SIZE", default_value = "500")] channel_buffer_size: usize, + + #[arg(long, env = "TIPS_AUDIT_NOOP_ARCHIVE", default_value = "false")] + noop_archive: bool, } #[tokio::main] @@ -93,6 +96,7 @@ async fn main() -> Result<()> { writer, args.worker_pool_size, args.channel_buffer_size, + args.noop_archive, ); info!("Audit archiver initialized, starting main loop"); diff --git a/crates/audit/Cargo.toml b/crates/audit/Cargo.toml index 032fd9a..159f2aa 100644 --- a/crates/audit/Cargo.toml +++ b/crates/audit/Cargo.toml @@ -18,7 +18,7 @@ metrics-derive.workspace = true tips-core = { workspace = true, features = ["test-utils"] } serde = { workspace = true, features = ["std", "derive"] } tokio = { workspace = true, features = ["full"] } -uuid = { workspace = true, features = ["v4", "serde"] } +uuid = { workspace = true, features = ["v5", "serde"] } tracing = { workspace = true, features = ["std"] } anyhow = { workspace = true, features = ["std"] } serde_json = { workspace = true, features = ["std"] } diff --git a/crates/audit/src/archiver.rs b/crates/audit/src/archiver.rs index f09d34e..415782e 100644 --- a/crates/audit/src/archiver.rs +++ b/crates/audit/src/archiver.rs @@ -38,11 +38,23 @@ where W: EventWriter + Clone + Send + 'static, { /// Creates a new archiver with the given reader and writer. 
- pub fn new(reader: R, writer: W, worker_pool_size: usize, channel_buffer_size: usize) -> Self { + pub fn new( + reader: R, + writer: W, + worker_pool_size: usize, + channel_buffer_size: usize, + noop_archive: bool, + ) -> Self { let (event_tx, event_rx) = mpsc::channel(channel_buffer_size); let metrics = Metrics::default(); - Self::spawn_workers(writer, event_rx, metrics.clone(), worker_pool_size); + Self::spawn_workers( + writer, + event_rx, + metrics.clone(), + worker_pool_size, + noop_archive, + ); Self { reader, @@ -57,6 +69,7 @@ where event_rx: mpsc::Receiver, metrics: Metrics, worker_pool_size: usize, + noop_archive: bool, ) { let event_rx = Arc::new(Mutex::new(event_rx)); @@ -75,6 +88,20 @@ where match event { Some(event) => { let archive_start = Instant::now(); + // tmp: only use this to clear kafka consumer offset + // TODO: use debug! later + if noop_archive { + info!( + worker_id, + bundle_id = %event.event.bundle_id(), + tx_ids = ?event.event.transaction_ids(), + timestamp = event.timestamp, + "Noop archive - skipping event" + ); + metrics.events_processed.increment(1); + metrics.in_flight_archive_tasks.decrement(1.0); + continue; + } if let Err(e) = writer.archive_event(event).await { error!(worker_id, error = %e, "Failed to write event"); } else { diff --git a/crates/audit/src/storage.rs b/crates/audit/src/storage.rs index ba481c1..ba9d114 100644 --- a/crates/audit/src/storage.rs +++ b/crates/audit/src/storage.rs @@ -601,7 +601,7 @@ mod tests { use crate::reader::Event; use crate::types::{BundleEvent, DropReason, UserOpDropReason, UserOpEvent}; use alloy_primitives::{Address, B256, TxHash, U256}; - use tips_core::test_utils::create_bundle_from_txn_data; + use tips_core::{BundleExtensions, test_utils::create_bundle_from_txn_data}; use uuid::Uuid; fn create_test_event(key: &str, timestamp: i64, bundle_event: BundleEvent) -> Event { @@ -616,7 +616,7 @@ mod tests { fn test_update_bundle_history_transform_adds_new_event() { let bundle_history = BundleHistory { history: vec![] }; let bundle = create_bundle_from_txn_data(); - let bundle_id = Uuid::new_v4(); + let bundle_id = Uuid::new_v5(&Uuid::NAMESPACE_OID, bundle.bundle_hash().as_slice()); let bundle_event = BundleEvent::Received { bundle_id, bundle: Box::new(bundle.clone()), @@ -655,7 +655,7 @@ mod tests { }; let bundle = create_bundle_from_txn_data(); - let bundle_id = Uuid::new_v4(); + let bundle_id = Uuid::new_v5(&Uuid::NAMESPACE_OID, bundle.bundle_hash().as_slice()); let bundle_event = BundleEvent::Received { bundle_id, bundle: Box::new(bundle), @@ -670,9 +670,9 @@ mod tests { #[test] fn test_update_bundle_history_transform_handles_all_event_types() { let bundle_history = BundleHistory { history: vec![] }; - let bundle_id = Uuid::new_v4(); - let bundle = create_bundle_from_txn_data(); + let bundle_id = Uuid::new_v5(&Uuid::NAMESPACE_OID, bundle.bundle_hash().as_slice()); + let bundle_event = BundleEvent::Received { bundle_id, bundle: Box::new(bundle), @@ -717,7 +717,8 @@ mod tests { #[test] fn test_update_transaction_metadata_transform_adds_new_bundle() { let metadata = TransactionMetadata { bundle_ids: vec![] }; - let bundle_id = Uuid::new_v4(); + let bundle = create_bundle_from_txn_data(); + let bundle_id = Uuid::new_v5(&Uuid::NAMESPACE_OID, bundle.bundle_hash().as_slice()); let result = update_transaction_metadata_transform(metadata, bundle_id); @@ -729,7 +730,8 @@ mod tests { #[test] fn test_update_transaction_metadata_transform_skips_existing_bundle() { - let bundle_id = Uuid::new_v4(); + let bundle = 
create_bundle_from_txn_data(); + let bundle_id = Uuid::new_v5(&Uuid::NAMESPACE_OID, bundle.bundle_hash().as_slice()); let metadata = TransactionMetadata { bundle_ids: vec![bundle_id], }; @@ -741,8 +743,12 @@ mod tests { #[test] fn test_update_transaction_metadata_transform_adds_to_existing_bundles() { - let existing_bundle_id = Uuid::new_v4(); - let new_bundle_id = Uuid::new_v4(); + // Some different, dummy bundle IDs since create_bundle_from_txn_data() returns the same bundle ID + // Even if the same txn is contained across multiple bundles, the bundle ID will be different since the + // UUID is based on the bundle hash. + let existing_bundle_id = Uuid::parse_str("550e8400-e29b-41d4-a716-446655440000").unwrap(); + let new_bundle_id = Uuid::parse_str("6ba7b810-9dad-11d1-80b4-00c04fd430c8").unwrap(); + let metadata = TransactionMetadata { bundle_ids: vec![existing_bundle_id], }; diff --git a/crates/audit/src/types.rs b/crates/audit/src/types.rs index 6200572..2dd0291 100644 --- a/crates/audit/src/types.rs +++ b/crates/audit/src/types.rs @@ -140,7 +140,11 @@ impl BundleEvent { format!("{bundle_id}-{block_hash}") } _ => { - format!("{}-{}", self.bundle_id(), Uuid::new_v4()) + format!( + "{}-{}", + self.bundle_id(), + Uuid::new_v5(&Uuid::NAMESPACE_OID, self.bundle_id().as_bytes()) + ) } } } @@ -200,7 +204,11 @@ impl UserOpEvent { format!("{user_op_hash}-{tx_hash}") } _ => { - format!("{}-{}", self.user_op_hash(), Uuid::new_v4()) + format!( + "{}-{}", + self.user_op_hash(), + Uuid::new_v5(&Uuid::NAMESPACE_OID, self.user_op_hash().as_slice()) + ) } } } diff --git a/crates/audit/tests/common/mod.rs b/crates/audit/tests/common/mod.rs index 4c5a67a..6db10f2 100644 --- a/crates/audit/tests/common/mod.rs +++ b/crates/audit/tests/common/mod.rs @@ -35,7 +35,10 @@ impl TestHarness { .await; let s3_client = aws_sdk_s3::Client::new(&config); - let bucket_name = format!("test-bucket-{}", Uuid::new_v4()); + let bucket_name = format!( + "test-bucket-{}", + Uuid::new_v5(&Uuid::NAMESPACE_OID, bundle.bundle_hash().as_slice()) + ); s3_client .create_bucket() diff --git a/crates/audit/tests/integration_tests.rs b/crates/audit/tests/integration_tests.rs index f2bb9d3..5b23fd0 100644 --- a/crates/audit/tests/integration_tests.rs +++ b/crates/audit/tests/integration_tests.rs @@ -9,7 +9,7 @@ use tips_audit_lib::{ storage::{BundleEventS3Reader, S3EventReaderWriter}, types::{BundleEvent, DropReason, UserOpEvent}, }; -use tips_core::test_utils::create_bundle_from_txn_data; +use tips_core::{BundleExtensions, test_utils::create_bundle_from_txn_data}; use uuid::Uuid; mod common; use common::TestHarness; @@ -24,11 +24,12 @@ async fn test_kafka_publisher_s3_archiver_integration() let s3_writer = S3EventReaderWriter::new(harness.s3_client.clone(), harness.bucket_name.clone()); - let test_bundle_id = Uuid::new_v4(); + let bundle = create_bundle_from_txn_data(); + let test_bundle_id = Uuid::new_v5(&Uuid::NAMESPACE_OID, bundle.bundle_hash().as_slice()); let test_events = [ BundleEvent::Received { bundle_id: test_bundle_id, - bundle: Box::new(create_bundle_from_txn_data()), + bundle: Box::new(bundle.clone()), }, BundleEvent::Dropped { bundle_id: test_bundle_id, @@ -45,6 +46,8 @@ async fn test_kafka_publisher_s3_archiver_integration() let mut consumer = KafkaAuditArchiver::new( KafkaAuditLogReader::new(harness.kafka_consumer, topic.to_string())?, s3_writer.clone(), + 1, + 100, ); tokio::spawn(async move { diff --git a/crates/audit/tests/s3_test.rs b/crates/audit/tests/s3_test.rs index d6bfba6..d555bc0 100644 --- 
a/crates/audit/tests/s3_test.rs +++ b/crates/audit/tests/s3_test.rs @@ -13,7 +13,10 @@ use uuid::Uuid; mod common; use common::TestHarness; -use tips_core::test_utils::{TXN_HASH, create_bundle_from_txn_data}; +use tips_core::{ + BundleExtensions, + test_utils::{TXN_HASH, create_bundle_from_txn_data}, +}; fn create_test_event(key: &str, timestamp: i64, bundle_event: BundleEvent) -> Event { Event { @@ -28,8 +31,8 @@ async fn test_event_write_and_read() -> Result<(), Box Result<(), Box Result<(), Box Result<(), Box Result<(), Box Result<(), Box Result<(), Box>> = + JoinSet::new(); for i in 0..4 { let writer_clone = writer.clone(); diff --git a/crates/core/Cargo.toml b/crates/core/Cargo.toml index 76d7d63..0eef255 100644 --- a/crates/core/Cargo.toml +++ b/crates/core/Cargo.toml @@ -18,7 +18,7 @@ test-utils = ["dep:alloy-signer-local", "dep:op-alloy-rpc-types"] op-alloy-flz.workspace = true alloy-serde.workspace = true serde = { workspace = true, features = ["std", "derive"] } -uuid = { workspace = true, features = ["v4", "serde"] } +uuid = { workspace = true, features = ["v5", "serde"] } tracing = { workspace = true, features = ["std"] } alloy-consensus = { workspace = true, features = ["std"] } alloy-rpc-types = { workspace = true, features = ["eth"] } diff --git a/crates/core/src/types.rs b/crates/core/src/types.rs index 5badc34..6c38978 100644 --- a/crates/core/src/types.rs +++ b/crates/core/src/types.rs @@ -244,7 +244,9 @@ impl BundleTxs for AcceptedBundle { impl AcceptedBundle { pub fn new(bundle: ParsedBundle, meter_bundle_response: MeterBundleResponse) -> Self { Self { - uuid: bundle.replacement_uuid.unwrap_or_else(Uuid::new_v4), + uuid: bundle.replacement_uuid.unwrap_or_else(|| { + Uuid::new_v5(&Uuid::NAMESPACE_OID, bundle.bundle_hash().as_slice()) + }), txs: bundle.txs, block_number: bundle.block_number, flashblock_number_min: bundle.flashblock_number_min, @@ -345,7 +347,7 @@ mod tests { assert_eq!(bundle.bundle_hash(), expected_bundle_hash_single); - let uuid = Uuid::new_v4(); + let uuid = Uuid::new_v5(&Uuid::NAMESPACE_OID, bundle.bundle_hash().as_slice()); let bundle = AcceptedBundle::new( Bundle { txs: vec![tx1_bytes.clone().into(), tx2_bytes.clone().into()], @@ -448,4 +450,43 @@ mod tests { assert_eq!(deserialized.state_block_number, 12345); assert_eq!(deserialized.total_gas_used, 21000); } + + #[test] + fn test_same_uuid_for_same_bundle_hash() { + let alice = PrivateKeySigner::random(); + let bob = PrivateKeySigner::random(); + + // suppose this is a spam tx + let tx1 = create_transaction(alice.clone(), 1, bob.address()); + let tx1_bytes = tx1.encoded_2718(); + + // we receive it the first time + let bundle1 = AcceptedBundle::new( + Bundle { + txs: vec![tx1_bytes.clone().into()], + block_number: 1, + replacement_uuid: None, + ..Default::default() + } + .try_into() + .unwrap(), + create_test_meter_bundle_response(), + ); + + // but we may receive it more than once + let bundle2 = AcceptedBundle::new( + Bundle { + txs: vec![tx1_bytes.clone().into()], + block_number: 1, + replacement_uuid: None, + ..Default::default() + } + .try_into() + .unwrap(), + create_test_meter_bundle_response(), + ); + + // however, the UUID should be the same + assert_eq!(bundle1.uuid(), bundle2.uuid()); + } } diff --git a/crates/ingress-rpc/Cargo.toml b/crates/ingress-rpc/Cargo.toml index 9291d8f..75e30e0 100644 --- a/crates/ingress-rpc/Cargo.toml +++ b/crates/ingress-rpc/Cargo.toml @@ -33,6 +33,8 @@ backon = { workspace = true, features = ["std", "tokio-sleep"] } axum = { workspace = true, features = 
["tokio", "http1", "json"] } clap = { version = "4.5.47", features = ["std", "derive", "env"] } op-alloy-consensus = { workspace = true, features = ["std", "k256", "serde"] } +moka = { workspace = true, features = ["future"] } +uuid = { workspace = true, features = ["v5"] } [dev-dependencies] mockall = "0.13" diff --git a/crates/ingress-rpc/src/service.rs b/crates/ingress-rpc/src/service.rs index 8560e48..0d9838b 100644 --- a/crates/ingress-rpc/src/service.rs +++ b/crates/ingress-rpc/src/service.rs @@ -12,6 +12,7 @@ use jsonrpsee::{ core::{RpcResult, async_trait}, proc_macros::rpc, }; +use moka::future::Cache; use op_alloy_consensus::OpTxEnvelope; use op_alloy_network::Optimism; use std::time::{SystemTime, UNIX_EPOCH}; @@ -23,6 +24,7 @@ use tips_core::{ use tokio::sync::{broadcast, mpsc}; use tokio::time::{Duration, Instant, timeout}; use tracing::{debug, info, warn}; +use uuid::Uuid; use crate::metrics::{Metrics, record_histogram}; use crate::queue::{BundleQueuePublisher, MessageQueue, UserOpQueuePublisher}; @@ -84,6 +86,7 @@ pub struct IngressService { builder_backrun_tx: broadcast::Sender, max_backrun_txs: usize, max_backrun_gas_limit: u64, + bundle_cache: Cache, } impl IngressService { @@ -108,6 +111,11 @@ impl IngressService { let reputation_service = mempool_engine .as_ref() .map(|engine| Arc::new(ReputationServiceImpl::new(engine.get_mempool()))); + + // A TTL cache to deduplicate bundles with the same Bundle ID + let bundle_cache = Cache::builder() + .time_to_live(Duration::from_secs(20)) + .build(); Self { mempool_provider, simulation_provider, @@ -134,6 +142,7 @@ impl IngressService { builder_backrun_tx, max_backrun_txs: config.max_backrun_txs, max_backrun_gas_limit: config.max_backrun_gas_limit, + bundle_cache, } } } @@ -301,7 +310,20 @@ impl IngressApiServer for Ingre self.metrics.bundles_parsed.increment(1); - let meter_bundle_response = self.meter_bundle(&bundle, bundle_hash).await.ok(); + let meter_bundle_response = match self.meter_bundle(&bundle, bundle_hash).await { + Ok(response) => { + info!(message = "Metering succeeded for raw transaction", bundle_hash = %bundle_hash, response = ?response); + Some(response) + } + Err(e) => { + warn!( + bundle_hash = %bundle_hash, + error = %e, + "Metering failed for raw transaction" + ); + None + } + }; if let Some(meter_info) = meter_bundle_response.as_ref() { self.metrics.successful_simulations.increment(1); @@ -313,42 +335,54 @@ impl IngressApiServer for Ingre let accepted_bundle = AcceptedBundle::new(parsed_bundle, meter_bundle_response.unwrap_or_default()); - if send_to_kafka { - if let Err(e) = self - .bundle_queue_publisher - .publish(&accepted_bundle, bundle_hash) - .await - { - warn!(message = "Failed to publish Queue::enqueue_bundle", bundle_hash = %bundle_hash, error = %e); - } - - self.metrics.sent_to_kafka.increment(1); - info!(message="queued singleton bundle", txn_hash=%transaction.tx_hash()); - } + let bundle_id = *accepted_bundle.uuid(); + if self.bundle_cache.get(&bundle_id).await.is_some() { + debug!( + message = "Duplicate bundle detected, skipping Kafka publish", + bundle_id = %bundle_id, + bundle_hash = %bundle_hash, + transaction_hash = %transaction.tx_hash(), + ); + } else { + self.bundle_cache.insert(bundle_id, ()).await; - if send_to_mempool { - let response = self - .mempool_provider - .send_raw_transaction(data.iter().as_slice()) - .await; - match response { - Ok(_) => { - self.metrics.sent_to_mempool.increment(1); - debug!(message = "sent transaction to the mempool", hash=%transaction.tx_hash()); + if 
send_to_kafka { + if let Err(e) = self + .bundle_queue_publisher + .publish(&accepted_bundle, bundle_hash) + .await + { + warn!(message = "Failed to publish Queue::enqueue_bundle", bundle_hash = %bundle_hash, error = %e); } - Err(e) => { - warn!(message = "Failed to send raw transaction to mempool", error = %e); + + self.metrics.sent_to_kafka.increment(1); + info!(message="queued singleton bundle", txn_hash=%transaction.tx_hash()); + } + + if send_to_mempool { + let response = self + .mempool_provider + .send_raw_transaction(data.iter().as_slice()) + .await; + match response { + Ok(_) => { + self.metrics.sent_to_mempool.increment(1); + debug!(message = "sent transaction to the mempool", hash=%transaction.tx_hash()); + } + Err(e) => { + warn!(message = "Failed to send raw transaction to mempool", error = %e); + } } } - } - info!( - message = "processed transaction", - bundle_hash = %bundle_hash, - transaction_hash = %transaction.tx_hash(), - ); + info!( + message = "processed transaction", + bundle_hash = %bundle_hash, + transaction_hash = %transaction.tx_hash(), + ); - self.send_audit_event(&accepted_bundle, accepted_bundle.bundle_hash()); + self.send_audit_event(&accepted_bundle, accepted_bundle.bundle_hash()); + } self.metrics .send_raw_transaction_duration diff --git a/crates/system-tests/Cargo.toml b/crates/system-tests/Cargo.toml index 3ce797b..6163b29 100644 --- a/crates/system-tests/Cargo.toml +++ b/crates/system-tests/Cargo.toml @@ -28,7 +28,7 @@ serde = { workspace = true, features = ["std", "derive"] } tokio = { workspace = true, features = ["full"] } tracing = { workspace = true, features = ["std"] } anyhow = { workspace = true, features = ["std"] } -uuid = { workspace = true, features = ["v4", "serde"] } +uuid = { workspace = true, features = ["v5", "serde"] } serde_json = { workspace = true, features = ["std"] } reqwest = { version = "0.12.12", features = ["json"] } rdkafka = { workspace = true, features = ["tokio", "libz", "zstd", "ssl-vendored"] } diff --git a/crates/system-tests/tests/common/kafka.rs b/crates/system-tests/tests/common/kafka.rs index a61e54c..5acda1b 100644 --- a/crates/system-tests/tests/common/kafka.rs +++ b/crates/system-tests/tests/common/kafka.rs @@ -39,7 +39,13 @@ fn build_kafka_consumer(properties_env: &str, default_path: &str) -> Result Date: Fri, 9 Jan 2026 10:12:54 -0500 Subject: [PATCH 11/11] make ttl configurable --- crates/ingress-rpc/src/lib.rs | 4 ++++ crates/ingress-rpc/src/service.rs | 3 ++- 2 files changed, 6 insertions(+), 1 deletion(-) diff --git a/crates/ingress-rpc/src/lib.rs b/crates/ingress-rpc/src/lib.rs index 1f43896..6fd1ad1 100644 --- a/crates/ingress-rpc/src/lib.rs +++ b/crates/ingress-rpc/src/lib.rs @@ -204,6 +204,10 @@ pub struct Config { /// URL of third-party RPC endpoint to forward raw transactions to (enables forwarding if set) #[arg(long, env = "TIPS_INGRESS_RAW_TX_FORWARD_RPC")] pub raw_tx_forward_rpc: Option, + + /// TTL for bundle cache in seconds + #[arg(long, env = "TIPS_INGRESS_BUNDLE_CACHE_TTL", default_value = "20")] + pub bundle_cache_ttl: u64, } pub fn connect_ingress_to_builder( diff --git a/crates/ingress-rpc/src/service.rs b/crates/ingress-rpc/src/service.rs index 0d9838b..3605dd8 100644 --- a/crates/ingress-rpc/src/service.rs +++ b/crates/ingress-rpc/src/service.rs @@ -114,7 +114,7 @@ impl IngressService { // A TTL cache to deduplicate bundles with the same Bundle ID let bundle_cache = Cache::builder() - .time_to_live(Duration::from_secs(20)) + .time_to_live(Duration::from_secs(config.bundle_cache_ttl)) 
.build(); Self { mempool_provider, @@ -649,6 +649,7 @@ mod tests { user_operation_topic: String::new(), max_backrun_txs: 5, max_backrun_gas_limit: 5000000, + bundle_cache_ttl: 20, } }
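
For reference, a condensed, standalone sketch of the bundle-deduplication scheme introduced in patches 10-11: bundle IDs are derived deterministically with UUID v5 from the bundle hash, and the ingress side skips re-publishing any ID it has already seen within a TTL window. The sketch assumes the `uuid` crate with the `v5` feature, `moka` with the `future` feature, and a Tokio runtime; the helper `bundle_id_from_hash` and the literal hash bytes are illustrative only and are not part of the codebase.

use std::time::Duration;

use moka::future::Cache;
use uuid::Uuid;

// Deterministic bundle ID: the same bundle contents always hash to the same
// UUID, so a bundle received more than once maps to the same cache key.
fn bundle_id_from_hash(bundle_hash: &[u8]) -> Uuid {
    Uuid::new_v5(&Uuid::NAMESPACE_OID, bundle_hash)
}

#[tokio::main]
async fn main() {
    // TTL cache mirroring the ingress-side dedup; entries expire after 20s,
    // matching the default TIPS_INGRESS_BUNDLE_CACHE_TTL.
    let bundle_cache: Cache<Uuid, ()> = Cache::builder()
        .time_to_live(Duration::from_secs(20))
        .build();

    let bundle_id = bundle_id_from_hash(b"example bundle hash bytes");

    if bundle_cache.get(&bundle_id).await.is_some() {
        println!("duplicate bundle {bundle_id}, skipping publish");
    } else {
        bundle_cache.insert(bundle_id, ()).await;
        println!("first sighting of bundle {bundle_id}, publishing");
    }
}

Because the ID is a pure function of the bundle hash (a caller-supplied replacement_uuid still takes precedence), duplicates collapse onto one cache key without any coordination between ingress instances, and the TTL bounds how long a spammed bundle is suppressed.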