diff --git a/Cargo.lock b/Cargo.lock
index 6442fa54..e08f1bec 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -1096,6 +1096,17 @@ dependencies = [
  "serde_json",
 ]
 
+[[package]]
+name = "async-lock"
+version = "3.4.2"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "290f7f2596bd5b78a9fec8088ccd89180d7f9f55b94b0576823bbbdc72ee8311"
+dependencies = [
+ "event-listener",
+ "event-listener-strategy",
+ "pin-project-lite",
+]
+
 [[package]]
 name = "async-stream"
 version = "0.3.6"
@@ -1654,7 +1665,7 @@ dependencies = [
  "bitflags 2.10.0",
  "cexpr",
  "clang-sys",
- "itertools 0.10.5",
+ "itertools 0.13.0",
  "proc-macro2",
  "quote",
  "regex",
@@ -1995,6 +2006,15 @@ dependencies = [
  "memchr",
 ]
 
+[[package]]
+name = "concurrent-queue"
+version = "2.5.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "4ca0197aee26d1ae37445ee532fefce43251d24cc7c166799f4d46817f1d3973"
+dependencies = [
+ "crossbeam-utils",
+]
+
 [[package]]
 name = "console"
 version = "0.15.11"
@@ -2133,6 +2153,15 @@ version = "1.2.0"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "790eea4361631c5e7d22598ecd5723ff611904e3344ce8720784c93e3d83d40b"
 
+[[package]]
+name = "crossbeam-channel"
+version = "0.5.15"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "82b8f8f868b36967f9606790d1903570de9ceaf870a7bf9fbbd3016d636a2cb2"
+dependencies = [
+ "crossbeam-utils",
+]
+
 [[package]]
 name = "crossbeam-deque"
 version = "0.8.6"
@@ -2596,6 +2625,27 @@ dependencies = [
  "syn 2.0.111",
 ]
 
+[[package]]
+name = "event-listener"
+version = "5.4.1"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "e13b66accf52311f30a0db42147dadea9850cb48cd070028831ae5f5d4b856ab"
+dependencies = [
+ "concurrent-queue",
+ "parking",
+ "pin-project-lite",
+]
+
+[[package]]
+name = "event-listener-strategy"
+version = "0.5.4"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "8be9f3dfaaffdae2972880079a491a1a8bb7cbed0b8dd7a347f668b4150a3b93"
+dependencies = [
+ "event-listener",
+ "pin-project-lite",
+]
+
 [[package]]
 name = "fastrand"
 version = "2.3.0"
@@ -3959,6 +4009,26 @@ dependencies = [
  "syn 1.0.109",
 ]
 
+[[package]]
+name = "moka"
+version = "0.12.12"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "a3dec6bd31b08944e08b58fd99373893a6c17054d6f3ea5006cc894f4f4eee2a"
+dependencies = [
+ "async-lock",
+ "crossbeam-channel",
+ "crossbeam-epoch",
+ "crossbeam-utils",
+ "equivalent",
+ "event-listener",
+ "futures-util",
+ "parking_lot",
+ "portable-atomic",
+ "smallvec",
+ "tagptr",
+ "uuid",
+]
+
 [[package]]
 name = "native-tls"
 version = "0.2.14"
@@ -3992,7 +4062,7 @@ version = "0.50.3"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "7957b9740744892f114936ab4a57b3f487491bbeafaf8083688b16841a4240e5"
 dependencies = [
- "windows-sys 0.59.0",
+ "windows-sys 0.61.2",
 ]
 
@@ -4332,6 +4402,12 @@ dependencies = [
  "syn 2.0.111",
 ]
 
+[[package]]
+name = "parking"
+version = "2.2.1"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "f38d5652c16fde515bb1ecef450ab0f6a219d619a7274976324d5e377f7dceba"
+
 [[package]]
 name = "parking_lot"
 version = "0.12.5"
@@ -6596,6 +6672,12 @@ dependencies = [
  "digest 0.10.7",
 ]
 
+[[package]]
+name = "sha1_smol"
+version = "1.0.1"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "bbfa15b3dddfee50a0fff136974b3e1bde555604ba463834a7eb7deb6417705d"
+
 [[package]]
 name = "sha2"
 version = "0.10.9"
version = "0.10.9" @@ -6883,6 +6965,12 @@ dependencies = [ "libc", ] +[[package]] +name = "tagptr" +version = "0.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7b2093cf4c8eb1e67749a6762251bc9cd836b6fc171623bd0a9d324d37af2417" + [[package]] name = "tap" version = "1.0.1" @@ -7170,6 +7258,7 @@ dependencies = [ "metrics", "metrics-derive", "mockall", + "moka", "op-alloy-consensus", "op-alloy-network", "op-revm", @@ -7180,6 +7269,7 @@ dependencies = [ "tokio", "tracing", "url", + "uuid", "wiremock", ] @@ -7583,6 +7673,7 @@ dependencies = [ "getrandom 0.3.4", "js-sys", "serde_core", + "sha1_smol", "wasm-bindgen", ] diff --git a/Cargo.toml b/Cargo.toml index 310ec9df..623dedc7 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -98,3 +98,4 @@ tracing-subscriber = { version = "0.3.20", default-features = false } testcontainers-modules = { version = "0.11.2", default-features = false } metrics-exporter-prometheus = { version = "0.17.0", default-features = false } futures = { version = "0.3.31", default-features = false } +moka = { version = "0.12.12", default-features = false } diff --git a/bin/tips-audit/src/main.rs b/bin/tips-audit/src/main.rs index 38be65bb..f29f6d19 100644 --- a/bin/tips-audit/src/main.rs +++ b/bin/tips-audit/src/main.rs @@ -59,6 +59,9 @@ struct Args { #[arg(long, env = "TIPS_AUDIT_CHANNEL_BUFFER_SIZE", default_value = "500")] channel_buffer_size: usize, + + #[arg(long, env = "TIPS_AUDIT_NOOP_ARCHIVE", default_value = "false")] + noop_archive: bool, } #[tokio::main] @@ -93,6 +96,7 @@ async fn main() -> Result<()> { writer, args.worker_pool_size, args.channel_buffer_size, + args.noop_archive, ); info!("Audit archiver initialized, starting main loop"); diff --git a/crates/audit/Cargo.toml b/crates/audit/Cargo.toml index 032fd9a9..159f2aa5 100644 --- a/crates/audit/Cargo.toml +++ b/crates/audit/Cargo.toml @@ -18,7 +18,7 @@ metrics-derive.workspace = true tips-core = { workspace = true, features = ["test-utils"] } serde = { workspace = true, features = ["std", "derive"] } tokio = { workspace = true, features = ["full"] } -uuid = { workspace = true, features = ["v4", "serde"] } +uuid = { workspace = true, features = ["v5", "serde"] } tracing = { workspace = true, features = ["std"] } anyhow = { workspace = true, features = ["std"] } serde_json = { workspace = true, features = ["std"] } diff --git a/crates/audit/src/archiver.rs b/crates/audit/src/archiver.rs index f09d34e2..415782e5 100644 --- a/crates/audit/src/archiver.rs +++ b/crates/audit/src/archiver.rs @@ -38,11 +38,23 @@ where W: EventWriter + Clone + Send + 'static, { /// Creates a new archiver with the given reader and writer. - pub fn new(reader: R, writer: W, worker_pool_size: usize, channel_buffer_size: usize) -> Self { + pub fn new( + reader: R, + writer: W, + worker_pool_size: usize, + channel_buffer_size: usize, + noop_archive: bool, + ) -> Self { let (event_tx, event_rx) = mpsc::channel(channel_buffer_size); let metrics = Metrics::default(); - Self::spawn_workers(writer, event_rx, metrics.clone(), worker_pool_size); + Self::spawn_workers( + writer, + event_rx, + metrics.clone(), + worker_pool_size, + noop_archive, + ); Self { reader, @@ -57,6 +69,7 @@ where event_rx: mpsc::Receiver, metrics: Metrics, worker_pool_size: usize, + noop_archive: bool, ) { let event_rx = Arc::new(Mutex::new(event_rx)); @@ -75,6 +88,20 @@ where match event { Some(event) => { let archive_start = Instant::now(); + // tmp: only use this to clear kafka consumer offset + // TODO: use debug! 
diff --git a/crates/audit/src/storage.rs b/crates/audit/src/storage.rs
index ba481c1a..ba9d1143 100644
--- a/crates/audit/src/storage.rs
+++ b/crates/audit/src/storage.rs
@@ -601,7 +601,7 @@ mod tests {
     use crate::reader::Event;
     use crate::types::{BundleEvent, DropReason, UserOpDropReason, UserOpEvent};
     use alloy_primitives::{Address, B256, TxHash, U256};
-    use tips_core::test_utils::create_bundle_from_txn_data;
+    use tips_core::{BundleExtensions, test_utils::create_bundle_from_txn_data};
     use uuid::Uuid;
 
     fn create_test_event(key: &str, timestamp: i64, bundle_event: BundleEvent) -> Event {
@@ -616,7 +616,7 @@ mod tests {
     fn test_update_bundle_history_transform_adds_new_event() {
         let bundle_history = BundleHistory { history: vec![] };
         let bundle = create_bundle_from_txn_data();
-        let bundle_id = Uuid::new_v4();
+        let bundle_id = Uuid::new_v5(&Uuid::NAMESPACE_OID, bundle.bundle_hash().as_slice());
         let bundle_event = BundleEvent::Received {
             bundle_id,
             bundle: Box::new(bundle.clone()),
@@ -655,7 +655,7 @@ mod tests {
         };
 
         let bundle = create_bundle_from_txn_data();
-        let bundle_id = Uuid::new_v4();
+        let bundle_id = Uuid::new_v5(&Uuid::NAMESPACE_OID, bundle.bundle_hash().as_slice());
         let bundle_event = BundleEvent::Received {
             bundle_id,
             bundle: Box::new(bundle),
@@ -670,9 +670,10 @@ mod tests {
     #[test]
     fn test_update_bundle_history_transform_handles_all_event_types() {
         let bundle_history = BundleHistory { history: vec![] };
-        let bundle_id = Uuid::new_v4();
         let bundle = create_bundle_from_txn_data();
+        let bundle_id = Uuid::new_v5(&Uuid::NAMESPACE_OID, bundle.bundle_hash().as_slice());
+
         let bundle_event = BundleEvent::Received {
             bundle_id,
             bundle: Box::new(bundle),
@@ -717,7 +717,8 @@ mod tests {
     #[test]
     fn test_update_transaction_metadata_transform_adds_new_bundle() {
         let metadata = TransactionMetadata { bundle_ids: vec![] };
-        let bundle_id = Uuid::new_v4();
+        let bundle = create_bundle_from_txn_data();
+        let bundle_id = Uuid::new_v5(&Uuid::NAMESPACE_OID, bundle.bundle_hash().as_slice());
 
         let result = update_transaction_metadata_transform(metadata, bundle_id);
 
@@ -729,7 +730,8 @@ mod tests {
     #[test]
     fn test_update_transaction_metadata_transform_skips_existing_bundle() {
-        let bundle_id = Uuid::new_v4();
+        let bundle = create_bundle_from_txn_data();
+        let bundle_id = Uuid::new_v5(&Uuid::NAMESPACE_OID, bundle.bundle_hash().as_slice());
         let metadata = TransactionMetadata {
             bundle_ids: vec![bundle_id],
         };
@@ -741,8 +743,12 @@ mod tests {
     #[test]
     fn test_update_transaction_metadata_transform_adds_to_existing_bundles() {
-        let existing_bundle_id = Uuid::new_v4();
-        let new_bundle_id = Uuid::new_v4();
+        // Two distinct, fixed bundle IDs, since create_bundle_from_txn_data() always returns the
+        // same bundle and hence the same v5 UUID. Even when the same txn appears across multiple
+        // bundles, the bundle IDs differ because the UUID is derived from the bundle hash.
+        let existing_bundle_id = Uuid::parse_str("550e8400-e29b-41d4-a716-446655440000").unwrap();
+        let new_bundle_id = Uuid::parse_str("6ba7b810-9dad-11d1-80b4-00c04fd430c8").unwrap();
+
         let metadata = TransactionMetadata {
             bundle_ids: vec![existing_bundle_id],
         };
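These tests lean on `Uuid::new_v5` being fully deterministic: a v5 UUID is a SHA-1 hash of a namespace plus the input bytes (which is why `sha1_smol` showed up in the lockfile), so equal inputs collide by design. A quick illustration; the hash bytes here are made up:

```rust
use uuid::Uuid;

fn main() {
    let bundle_hash = [0xab_u8; 32]; // stand-in for a real bundle hash

    let a = Uuid::new_v5(&Uuid::NAMESPACE_OID, &bundle_hash);
    let b = Uuid::new_v5(&Uuid::NAMESPACE_OID, &bundle_hash);
    assert_eq!(a, b); // same namespace + same input = same UUID, no randomness

    let other = Uuid::new_v5(&Uuid::NAMESPACE_OID, &[0xcd_u8; 32]);
    assert_ne!(a, other); // a different bundle hash yields a different ID
}
```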
diff --git a/crates/audit/src/types.rs b/crates/audit/src/types.rs
index 6200572c..2dd0291d 100644
--- a/crates/audit/src/types.rs
+++ b/crates/audit/src/types.rs
@@ -140,7 +140,11 @@ impl BundleEvent {
                 format!("{bundle_id}-{block_hash}")
             }
             _ => {
-                format!("{}-{}", self.bundle_id(), Uuid::new_v4())
+                format!(
+                    "{}-{}",
+                    self.bundle_id(),
+                    Uuid::new_v5(&Uuid::NAMESPACE_OID, self.bundle_id().as_bytes())
+                )
             }
         }
     }
@@ -200,7 +204,11 @@ impl UserOpEvent {
                 format!("{user_op_hash}-{tx_hash}")
             }
             _ => {
-                format!("{}-{}", self.user_op_hash(), Uuid::new_v4())
+                format!(
+                    "{}-{}",
+                    self.user_op_hash(),
+                    Uuid::new_v5(&Uuid::NAMESPACE_OID, self.user_op_hash().as_slice())
+                )
             }
         }
     }
diff --git a/crates/audit/tests/common/mod.rs b/crates/audit/tests/common/mod.rs
index 4c5a67ac..6db10f25 100644
--- a/crates/audit/tests/common/mod.rs
+++ b/crates/audit/tests/common/mod.rs
@@ -35,7 +35,16 @@ impl TestHarness {
             .await;
         let s3_client = aws_sdk_s3::Client::new(&config);
 
-        let bucket_name = format!("test-bucket-{}", Uuid::new_v4());
+        // No bundle is in scope here and bucket names must stay unique per test
+        // run, so seed the v5 UUID from a nanosecond timestamp.
+        let nanos = std::time::SystemTime::now()
+            .duration_since(std::time::UNIX_EPOCH)
+            .unwrap()
+            .as_nanos();
+        let bucket_name = format!(
+            "test-bucket-{}",
+            Uuid::new_v5(&Uuid::NAMESPACE_OID, &nanos.to_le_bytes())
+        );
 
         s3_client
             .create_bucket()
diff --git a/crates/audit/tests/integration_tests.rs b/crates/audit/tests/integration_tests.rs
index f2bb9d3d..5b23fd02 100644
--- a/crates/audit/tests/integration_tests.rs
+++ b/crates/audit/tests/integration_tests.rs
@@ -9,7 +9,7 @@ use tips_audit_lib::{
     storage::{BundleEventS3Reader, S3EventReaderWriter},
     types::{BundleEvent, DropReason, UserOpEvent},
 };
-use tips_core::test_utils::create_bundle_from_txn_data;
+use tips_core::{BundleExtensions, test_utils::create_bundle_from_txn_data};
 use uuid::Uuid;
 mod common;
 use common::TestHarness;
@@ -24,11 +24,12 @@ async fn test_kafka_publisher_s3_archiver_integration()
     let s3_writer =
         S3EventReaderWriter::new(harness.s3_client.clone(), harness.bucket_name.clone());
 
-    let test_bundle_id = Uuid::new_v4();
+    let bundle = create_bundle_from_txn_data();
+    let test_bundle_id = Uuid::new_v5(&Uuid::NAMESPACE_OID, bundle.bundle_hash().as_slice());
     let test_events = [
         BundleEvent::Received {
             bundle_id: test_bundle_id,
-            bundle: Box::new(create_bundle_from_txn_data()),
+            bundle: Box::new(bundle.clone()),
         },
         BundleEvent::Dropped {
             bundle_id: test_bundle_id,
@@ -45,6 +46,9 @@ async fn test_kafka_publisher_s3_archiver_integration()
     let mut consumer = KafkaAuditArchiver::new(
         KafkaAuditLogReader::new(harness.kafka_consumer, topic.to_string())?,
        s3_writer.clone(),
+        1,
+        100,
+        false,
     );
 
     tokio::spawn(async move {
diff --git a/crates/audit/tests/s3_test.rs b/crates/audit/tests/s3_test.rs
index d6bfba6f..d555bc0b 100644
--- a/crates/audit/tests/s3_test.rs
+++ b/crates/audit/tests/s3_test.rs
@@ -13,7 +13,10 @@ use uuid::Uuid;
 mod common;
 use common::TestHarness;
 
-use tips_core::test_utils::{TXN_HASH, create_bundle_from_txn_data};
+use tips_core::{
+    BundleExtensions,
+    test_utils::{TXN_HASH, create_bundle_from_txn_data},
+};
 
 fn create_test_event(key: &str, timestamp: i64, bundle_event: BundleEvent) -> Event {
     Event {
@@ -28,8 +31,8 @@ async fn test_event_write_and_read() -> Result<(), Box<dyn std::error::Error>>
[the remaining s3_test.rs hunks are garbled in the source; the surviving fragments show only the other test signatures and the tail of a JoinSet-based concurrency test:]
+        JoinSet::new();
     for i in 0..4 {
         let writer_clone = writer.clone();
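The truncated concurrency test appears to fan writes out with `tokio::task::JoinSet`. Under that assumption, the general shape of such a loop is as follows; the writer handle is replaced by a plain integer and the error type is simplified to `String`:

```rust
use tokio::task::JoinSet;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let mut tasks: JoinSet<Result<(), String>> = JoinSet::new();

    for i in 0..4 {
        // In the real test each task would get a clone of the S3 writer;
        // here `i` stands in for that handle.
        tasks.spawn(async move {
            println!("task {i} writing its events");
            Ok(())
        });
    }

    // Surface both panics (join errors) and task-level write errors.
    while let Some(res) = tasks.join_next().await {
        res??;
    }
    Ok(())
}
```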
diff --git a/crates/core/Cargo.toml b/crates/core/Cargo.toml
index 76d7d631..0eef255d 100644
--- a/crates/core/Cargo.toml
+++ b/crates/core/Cargo.toml
@@ -18,7 +18,7 @@ test-utils = ["dep:alloy-signer-local", "dep:op-alloy-rpc-types"]
 op-alloy-flz.workspace = true
 alloy-serde.workspace = true
 serde = { workspace = true, features = ["std", "derive"] }
-uuid = { workspace = true, features = ["v4", "serde"] }
+uuid = { workspace = true, features = ["v5", "serde"] }
 tracing = { workspace = true, features = ["std"] }
 alloy-consensus = { workspace = true, features = ["std"] }
 alloy-rpc-types = { workspace = true, features = ["eth"] }
diff --git a/crates/core/src/types.rs b/crates/core/src/types.rs
index 5badc344..6c389782 100644
--- a/crates/core/src/types.rs
+++ b/crates/core/src/types.rs
@@ -244,7 +244,9 @@ impl BundleTxs for AcceptedBundle {
 impl AcceptedBundle {
     pub fn new(bundle: ParsedBundle, meter_bundle_response: MeterBundleResponse) -> Self {
         Self {
-            uuid: bundle.replacement_uuid.unwrap_or_else(Uuid::new_v4),
+            uuid: bundle.replacement_uuid.unwrap_or_else(|| {
+                Uuid::new_v5(&Uuid::NAMESPACE_OID, bundle.bundle_hash().as_slice())
+            }),
             txs: bundle.txs,
             block_number: bundle.block_number,
             flashblock_number_min: bundle.flashblock_number_min,
@@ -345,7 +347,7 @@ mod tests {
 
         assert_eq!(bundle.bundle_hash(), expected_bundle_hash_single);
 
-        let uuid = Uuid::new_v4();
+        let uuid = Uuid::new_v5(&Uuid::NAMESPACE_OID, bundle.bundle_hash().as_slice());
         let bundle = AcceptedBundle::new(
             Bundle {
                 txs: vec![tx1_bytes.clone().into(), tx2_bytes.clone().into()],
@@ -448,4 +450,43 @@ mod tests {
         assert_eq!(deserialized.state_block_number, 12345);
         assert_eq!(deserialized.total_gas_used, 21000);
     }
+
+    #[test]
+    fn test_same_uuid_for_same_bundle_hash() {
+        let alice = PrivateKeySigner::random();
+        let bob = PrivateKeySigner::random();
+
+        // suppose this is a spam tx
+        let tx1 = create_transaction(alice.clone(), 1, bob.address());
+        let tx1_bytes = tx1.encoded_2718();
+
+        // we receive it the first time
+        let bundle1 = AcceptedBundle::new(
+            Bundle {
+                txs: vec![tx1_bytes.clone().into()],
+                block_number: 1,
+                replacement_uuid: None,
+                ..Default::default()
+            }
+            .try_into()
+            .unwrap(),
+            create_test_meter_bundle_response(),
+        );
+
+        // but we may receive it more than once
+        let bundle2 = AcceptedBundle::new(
+            Bundle {
+                txs: vec![tx1_bytes.clone().into()],
+                block_number: 1,
+                replacement_uuid: None,
+                ..Default::default()
+            }
+            .try_into()
+            .unwrap(),
+            create_test_meter_bundle_response(),
+        );
+
+        // however, the UUID should be the same
+        assert_eq!(bundle1.uuid(), bundle2.uuid());
+    }
 }
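`AcceptedBundle::new` now reduces ID selection to one rule: an explicit `replacement_uuid` wins, otherwise the ID is content-addressed from the bundle hash, which is what makes resubmitted spam collapse to a single identity. A sketch of just that rule, with `ParsedBundle` simplified to a hypothetical two-field struct and an arbitrary well-known UUID standing in for a caller-supplied one:

```rust
use uuid::Uuid;

struct ParsedBundleLite {
    replacement_uuid: Option<Uuid>,
    bundle_hash: [u8; 32], // stand-in for the real B256 bundle hash
}

fn bundle_id(bundle: &ParsedBundleLite) -> Uuid {
    // An explicit replacement UUID is always honored; otherwise derive a
    // stable, content-addressed ID from the bundle hash.
    bundle
        .replacement_uuid
        .unwrap_or_else(|| Uuid::new_v5(&Uuid::NAMESPACE_OID, &bundle.bundle_hash))
}

fn main() {
    let spam = ParsedBundleLite { replacement_uuid: None, bundle_hash: [1; 32] };
    // Re-submissions of an identical bundle map to the same ID...
    assert_eq!(bundle_id(&spam), bundle_id(&spam));

    // ...while a caller-supplied replacement UUID takes precedence.
    let pinned = ParsedBundleLite {
        replacement_uuid: Some(Uuid::NAMESPACE_DNS),
        bundle_hash: [1; 32],
    };
    assert_eq!(bundle_id(&pinned), Uuid::NAMESPACE_DNS);
}
```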
diff --git a/crates/ingress-rpc/Cargo.toml b/crates/ingress-rpc/Cargo.toml
index 9291d8fe..75e30e00 100644
--- a/crates/ingress-rpc/Cargo.toml
+++ b/crates/ingress-rpc/Cargo.toml
@@ -33,6 +33,8 @@ backon = { workspace = true, features = ["std", "tokio-sleep"] }
 axum = { workspace = true, features = ["tokio", "http1", "json"] }
 clap = { version = "4.5.47", features = ["std", "derive", "env"] }
 op-alloy-consensus = { workspace = true, features = ["std", "k256", "serde"] }
+moka = { workspace = true, features = ["future"] }
+uuid = { workspace = true, features = ["v5"] }
 
 [dev-dependencies]
 mockall = "0.13"
diff --git a/crates/ingress-rpc/src/service.rs b/crates/ingress-rpc/src/service.rs
index 8560e48e..0d9838b1 100644
--- a/crates/ingress-rpc/src/service.rs
+++ b/crates/ingress-rpc/src/service.rs
@@ -12,6 +12,7 @@ use jsonrpsee::{
     core::{RpcResult, async_trait},
     proc_macros::rpc,
 };
+use moka::future::Cache;
 use op_alloy_consensus::OpTxEnvelope;
 use op_alloy_network::Optimism;
 use std::time::{SystemTime, UNIX_EPOCH};
@@ -23,6 +24,7 @@ use tips_core::{
 use tokio::sync::{broadcast, mpsc};
 use tokio::time::{Duration, Instant, timeout};
 use tracing::{debug, info, warn};
+use uuid::Uuid;
 
 use crate::metrics::{Metrics, record_histogram};
 use crate::queue::{BundleQueuePublisher, MessageQueue, UserOpQueuePublisher};
@@ -84,6 +86,7 @@ pub struct IngressService {
     builder_backrun_tx: broadcast::Sender<OpTxEnvelope>,
     max_backrun_txs: usize,
     max_backrun_gas_limit: u64,
+    bundle_cache: Cache<Uuid, ()>,
 }
 
 impl IngressService {
@@ -108,6 +111,11 @@ impl IngressService {
         let reputation_service = mempool_engine
             .as_ref()
             .map(|engine| Arc::new(ReputationServiceImpl::new(engine.get_mempool())));
+
+        // A TTL cache to deduplicate bundles with the same bundle ID
+        let bundle_cache = Cache::builder()
+            .time_to_live(Duration::from_secs(20))
+            .build();
         Self {
             mempool_provider,
             simulation_provider,
@@ -134,6 +142,7 @@ impl IngressService {
             builder_backrun_tx,
             max_backrun_txs: config.max_backrun_txs,
             max_backrun_gas_limit: config.max_backrun_gas_limit,
+            bundle_cache,
         }
     }
 }
@@ -301,7 +310,20 @@ impl IngressApiServer for IngressService {
 
         self.metrics.bundles_parsed.increment(1);
 
-        let meter_bundle_response = self.meter_bundle(&bundle, bundle_hash).await.ok();
+        let meter_bundle_response = match self.meter_bundle(&bundle, bundle_hash).await {
+            Ok(response) => {
+                info!(message = "Metering succeeded for raw transaction", bundle_hash = %bundle_hash, response = ?response);
+                Some(response)
+            }
+            Err(e) => {
+                warn!(
+                    bundle_hash = %bundle_hash,
+                    error = %e,
+                    "Metering failed for raw transaction"
+                );
+                None
+            }
+        };
 
         if let Some(meter_info) = meter_bundle_response.as_ref() {
             self.metrics.successful_simulations.increment(1);
@@ -313,42 +335,54 @@ impl IngressApiServer for IngressService {
         let accepted_bundle =
             AcceptedBundle::new(parsed_bundle, meter_bundle_response.unwrap_or_default());
 
-        if send_to_kafka {
-            if let Err(e) = self
-                .bundle_queue_publisher
-                .publish(&accepted_bundle, bundle_hash)
-                .await
-            {
-                warn!(message = "Failed to publish Queue::enqueue_bundle", bundle_hash = %bundle_hash, error = %e);
-            }
-
-            self.metrics.sent_to_kafka.increment(1);
-            info!(message="queued singleton bundle", txn_hash=%transaction.tx_hash());
-        }
-
-        if send_to_mempool {
-            let response = self
-                .mempool_provider
-                .send_raw_transaction(data.iter().as_slice())
-                .await;
-            match response {
-                Ok(_) => {
-                    self.metrics.sent_to_mempool.increment(1);
-                    debug!(message = "sent transaction to the mempool", hash=%transaction.tx_hash());
-                }
-                Err(e) => {
-                    warn!(message = "Failed to send raw transaction to mempool", error = %e);
-                }
-            }
-        }
-
-        info!(
-            message = "processed transaction",
-            bundle_hash = %bundle_hash,
-            transaction_hash = %transaction.tx_hash(),
-        );
-
-        self.send_audit_event(&accepted_bundle, accepted_bundle.bundle_hash());
+        let bundle_id = *accepted_bundle.uuid();
+        if self.bundle_cache.get(&bundle_id).await.is_some() {
+            debug!(
+                message = "Duplicate bundle detected, skipping Kafka publish",
+                bundle_id = %bundle_id,
+                bundle_hash = %bundle_hash,
+                transaction_hash = %transaction.tx_hash(),
+            );
+        } else {
+            self.bundle_cache.insert(bundle_id, ()).await;
+
+            if send_to_kafka {
+                if let Err(e) = self
+                    .bundle_queue_publisher
+                    .publish(&accepted_bundle, bundle_hash)
+                    .await
+                {
+                    warn!(message = "Failed to publish Queue::enqueue_bundle", bundle_hash = %bundle_hash, error = %e);
+                }
+
+                self.metrics.sent_to_kafka.increment(1);
+                info!(message="queued singleton bundle", txn_hash=%transaction.tx_hash());
+            }
+
+            if send_to_mempool {
+                let response = self
+                    .mempool_provider
+                    .send_raw_transaction(data.iter().as_slice())
+                    .await;
+                match response {
+                    Ok(_) => {
+                        self.metrics.sent_to_mempool.increment(1);
+                        debug!(message = "sent transaction to the mempool", hash=%transaction.tx_hash());
+                    }
+                    Err(e) => {
+                        warn!(message = "Failed to send raw transaction to mempool", error = %e);
+                    }
+                }
+            }
+
+            info!(
+                message = "processed transaction",
+                bundle_hash = %bundle_hash,
+                transaction_hash = %transaction.tx_hash(),
+            );
+
+            self.send_audit_event(&accepted_bundle, accepted_bundle.bundle_hash());
+        }
 
         self.metrics
             .send_raw_transaction_duration
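The dedup gate above is a plain get-then-insert on a moka TTL cache. A runnable reduction, using the same 20-second TTL as the diff and a made-up bundle hash; note the get/insert pair is not atomic, so two requests racing inside the window can both miss and both publish — the TTL only bounds how long the dedup window stays open:

```rust
use moka::future::Cache;
use std::time::Duration;
use uuid::Uuid;

#[tokio::main]
async fn main() {
    // Entries silently expire 20 seconds after insertion, bounding both
    // memory use and the deduplication window.
    let bundle_cache: Cache<Uuid, ()> = Cache::builder()
        .time_to_live(Duration::from_secs(20))
        .build();

    let bundle_id = Uuid::new_v5(&Uuid::NAMESPACE_OID, b"example-bundle-hash");

    for attempt in 0..2 {
        if bundle_cache.get(&bundle_id).await.is_some() {
            println!("attempt {attempt}: duplicate, skipping publish");
        } else {
            bundle_cache.insert(bundle_id, ()).await;
            println!("attempt {attempt}: first sight, publishing");
        }
    }
}
```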
diff --git a/crates/system-tests/Cargo.toml b/crates/system-tests/Cargo.toml
index 3ce797bd..6163b29e 100644
--- a/crates/system-tests/Cargo.toml
+++ b/crates/system-tests/Cargo.toml
@@ -28,7 +28,7 @@ serde = { workspace = true, features = ["std", "derive"] }
 tokio = { workspace = true, features = ["full"] }
 tracing = { workspace = true, features = ["std"] }
 anyhow = { workspace = true, features = ["std"] }
-uuid = { workspace = true, features = ["v4", "serde"] }
+uuid = { workspace = true, features = ["v5", "serde"] }
 serde_json = { workspace = true, features = ["std"] }
 reqwest = { version = "0.12.12", features = ["json"] }
 rdkafka = { workspace = true, features = ["tokio", "libz", "zstd", "ssl-vendored"] }
diff --git a/crates/system-tests/tests/common/kafka.rs b/crates/system-tests/tests/common/kafka.rs
index a61e54c6..5acda1b8 100644
--- a/crates/system-tests/tests/common/kafka.rs
+++ b/crates/system-tests/tests/common/kafka.rs
@@ -39,7 +39,13 @@ fn build_kafka_consumer(properties_env: &str, default_path: &str) -> Result