diff --git a/Cargo.lock b/Cargo.lock index 7c3206cc..723036d2 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1637,6 +1637,24 @@ version = "1.8.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0e050f626429857a27ddccb31e0aca21356bfa709c04041aefddac081a8f068a" +[[package]] +name = "bindgen" +version = "0.72.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "993776b509cfb49c750f11b8f07a46fa23e0a1386ffc01fb1e7d343efc387895" +dependencies = [ + "bitflags 2.10.0", + "cexpr", + "clang-sys", + "itertools 0.10.5", + "proc-macro2", + "quote", + "regex", + "rustc-hash", + "shlex", + "syn 2.0.111", +] + [[package]] name = "bit-set" version = "0.8.0" @@ -1860,6 +1878,15 @@ version = "1.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "6d43a04d8753f35258c91f8ec639f792891f748a1edbd759cf1dcea3382ad83c" +[[package]] +name = "cexpr" +version = "0.6.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6fac387a98bb7c37292057cffc56d62ecb629900026402633ae9160df93a8766" +dependencies = [ + "nom", +] + [[package]] name = "cfg-if" version = "1.0.4" @@ -1884,6 +1911,17 @@ dependencies = [ "windows-link", ] +[[package]] +name = "clang-sys" +version = "1.8.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0b023947811758c97c59bf9d1c188fd619ad4718dcaa767947df1cadb14f39f4" +dependencies = [ + "glob", + "libc", + "libloading", +] + [[package]] name = "clap" version = "4.5.53" @@ -3644,6 +3682,16 @@ version = "0.2.178" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "37c93d8daa9d8a012fd8ab92f088405fb202ea0b6ab73ee2482ae66af4f42091" +[[package]] +name = "libloading" +version = "0.8.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d7c4b02199fee7c5d21a5ae7d8cfa79a6ef5bb2fc834d6e9058e89c825efdc55" +dependencies = [ + "cfg-if", + "windows-link", +] + [[package]] name = "libm" version = "0.2.15" @@ -3830,6 +3878,12 @@ version = "0.3.17" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "6877bb514081ee2a7ff5ef9de3281f14a4dd4bceac4c09388074a6b5df8a139a" +[[package]] +name = "minimal-lexical" +version = "0.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "68354c5c6bd36d73ff3feceb05efa59b6acb7626617f4962be322a825e61f79a" + [[package]] name = "miniz_oxide" version = "0.8.9" @@ -3914,6 +3968,16 @@ dependencies = [ "tempfile", ] +[[package]] +name = "nom" +version = "7.1.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d273983c5a657a70a3e8f2a01329822f3b8c8172b73826411a55751e404a0a4a" +dependencies = [ + "memchr", + "minimal-lexical", +] + [[package]] name = "nu-ansi-term" version = "0.50.3" @@ -4192,6 +4256,15 @@ version = "0.1.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d05e27ee213611ffe7d6348b942e8f942b37114c00cc03cec254295a4a17852e" +[[package]] +name = "openssl-src" +version = "300.5.4+3.5.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a507b3792995dae9b0df8a1c1e3771e8418b7c2d9f0baeba32e6fe8b06c7cb72" +dependencies = [ + "cc", +] + [[package]] name = "openssl-sys" version = "0.9.111" @@ -4200,6 +4273,7 @@ checksum = "82cab2d520aa75e3c58898289429321eb788c3106963d0dc886ec7a5f4adc321" dependencies = [ "cc", "libc", + "openssl-src", "pkg-config", "vcpkg", ] @@ -4789,7 +4863,9 @@ dependencies = [ "libc", "libz-sys", "num_enum", + "openssl-sys", "pkg-config", + "zstd-sys", ] [[package]] @@ -8287,6 +8363,7 @@ version = "2.0.16+zstd.1.5.7" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "91e19ebc2adc8f83e43039e79776e3fda8ca919132d68a1fed6a5faca2683748" dependencies = [ + "bindgen", "cc", "pkg-config", ] diff --git a/bin/tips-ingress-rpc/src/main.rs b/bin/tips-ingress-rpc/src/main.rs index fb47d72f..dcb22c2d 100644 --- a/bin/tips-ingress-rpc/src/main.rs +++ b/bin/tips-ingress-rpc/src/main.rs @@ -73,18 +73,27 @@ async fn main() -> anyhow::Result<()> { let (audit_tx, audit_rx) = mpsc::unbounded_channel::(); connect_audit_to_publisher(audit_rx, audit_publisher); - let user_op_properties_file = &config.user_operation_consumer_properties; - - let mempool_engine = create_mempool_engine( - user_op_properties_file, - &config.user_operation_topic, - &config.user_operation_consumer_group_id, - None, - )?; - - let mempool_engine_handle = { - let engine = mempool_engine.clone(); - tokio::spawn(async move { engine.run().await }) + let (mempool_engine, mempool_engine_handle) = if let Some(user_op_properties_file) = + &config.user_operation_consumer_properties + { + let engine = create_mempool_engine( + user_op_properties_file, + &config.user_operation_topic, + &config.user_operation_consumer_group_id, + None, + )?; + + let handle = { + let engine_clone = engine.clone(); + tokio::spawn(async move { engine_clone.run().await }) + }; + + (Some(engine), Some(handle)) + } else { + info!( + "User operation consumer properties not provided, skipping mempool engine initialization" + ); + (None, None) }; let (builder_tx, _) = @@ -126,7 +135,9 @@ async fn main() -> anyhow::Result<()> { handle.stopped().await; health_handle.abort(); - mempool_engine_handle.abort(); + if let Some(engine_handle) = mempool_engine_handle { + engine_handle.abort(); + } Ok(()) } diff --git a/crates/account-abstraction-core/Cargo.toml b/crates/account-abstraction-core/Cargo.toml index 6a61a622..6f4f5c9b 100644 --- a/crates/account-abstraction-core/Cargo.toml +++ b/crates/account-abstraction-core/Cargo.toml @@ -19,7 +19,7 @@ tokio = { workspace = true, features = ["full"] } tracing = { workspace = true, features = ["std"] } anyhow = { workspace = true, features = ["std"] } serde_json = { workspace = true, features = ["std"] } -rdkafka = { workspace = true, features = ["tokio", "libz"] } +rdkafka = { workspace = true, features = ["tokio", "libz", "zstd", "ssl-vendored"] } alloy-rpc-types = { workspace = true, features = ["eth"] } jsonrpsee = { workspace = true, features = ["server", "macros"] } alloy-provider = { workspace = true, features = ["reqwest"] } diff --git a/crates/audit/Cargo.toml b/crates/audit/Cargo.toml index d8957d99..76c5ad68 100644 --- a/crates/audit/Cargo.toml +++ b/crates/audit/Cargo.toml @@ -22,7 +22,7 @@ uuid = { workspace = true, features = ["v4", "serde"] } tracing = { workspace = true, features = ["std"] } anyhow = { workspace = true, features = ["std"] } serde_json = { workspace = true, features = ["std"] } -rdkafka = { workspace = true, features = ["tokio", "libz"] } +rdkafka = { workspace = true, features = ["tokio", "libz", "zstd", "ssl-vendored"] } alloy-consensus = { workspace = true, features = ["std"] } alloy-primitives = { workspace = true, features = ["map-foldhash", "serde"] } aws-sdk-s3 = { workspace = true, features = ["rustls", "default-https-client", "rt-tokio"] } diff --git a/crates/ingress-rpc/Cargo.toml b/crates/ingress-rpc/Cargo.toml index d2b28658..354a54ce 100644 --- a/crates/ingress-rpc/Cargo.toml +++ b/crates/ingress-rpc/Cargo.toml @@ -26,7 +26,7 @@ tokio = { workspace = true, features = ["full"] } tracing = { workspace = true, features = ["std"] } anyhow = { workspace = true, features = ["std"] } serde_json = { workspace = true, features = ["std"] } -rdkafka = { workspace = true, features = ["tokio", "libz"] } +rdkafka = { workspace = true, features = ["tokio", "libz", "zstd", "ssl-vendored"] } alloy-consensus = { workspace = true, features = ["std"] } alloy-provider = { workspace = true, features = ["reqwest"] } jsonrpsee = { workspace = true, features = ["server", "macros"] } diff --git a/crates/ingress-rpc/src/lib.rs b/crates/ingress-rpc/src/lib.rs index 87c2dc53..1f438961 100644 --- a/crates/ingress-rpc/src/lib.rs +++ b/crates/ingress-rpc/src/lib.rs @@ -90,7 +90,7 @@ pub struct Config { long, env = "TIPS_INGRESS_KAFKA_USER_OPERATION_CONSUMER_PROPERTIES_FILE" )] - pub user_operation_consumer_properties: String, + pub user_operation_consumer_properties: Option, /// Consumer group id for user operation topic (set uniquely per deployment) #[arg( diff --git a/crates/ingress-rpc/src/service.rs b/crates/ingress-rpc/src/service.rs index 4ac51fb7..8e8a81ee 100644 --- a/crates/ingress-rpc/src/service.rs +++ b/crates/ingress-rpc/src/service.rs @@ -73,7 +73,7 @@ pub struct IngressService { tx_submission_method: TxSubmissionMethod, bundle_queue_publisher: BundleQueuePublisher, user_op_queue_publisher: UserOpQueuePublisher, - reputation_service: Arc>, + reputation_service: Option>>, audit_channel: mpsc::UnboundedSender, send_transaction_default_lifetime_seconds: u64, metrics: Metrics, @@ -93,9 +93,10 @@ impl IngressService { audit_channel: mpsc::UnboundedSender, builder_tx: broadcast::Sender, builder_backrun_tx: broadcast::Sender, - mempool_engine: Arc>, + mempool_engine: impl Into>>>, config: Config, ) -> Self { + let mempool_engine = mempool_engine.into(); let mempool_provider = Arc::new(providers.mempool); let simulation_provider = Arc::new(providers.simulation); let raw_tx_forward_provider = providers.raw_tx_forward.map(Arc::new); @@ -104,7 +105,9 @@ impl IngressService { config.validate_user_operation_timeout_ms, ); let queue_connection = Arc::new(queue); - let reputation_service = ReputationServiceImpl::new(mempool_engine.get_mempool()); + let reputation_service = mempool_engine + .as_ref() + .map(|engine| Arc::new(ReputationServiceImpl::new(engine.get_mempool()))); Self { mempool_provider, simulation_provider, @@ -119,7 +122,7 @@ impl IngressService { queue_connection.clone(), config.ingress_topic, ), - reputation_service: Arc::new(reputation_service), + reputation_service, audit_channel, send_transaction_default_lifetime_seconds: config .send_transaction_default_lifetime_seconds, @@ -384,11 +387,11 @@ impl IngressApiServer for Ingre chain_id: 1, }; - // DO Nothing with reputation at the moment as this is scafolding - let _ = self - .reputation_service - .get_reputation(&request.user_operation.sender()) - .await; + if let Some(reputation_service) = &self.reputation_service { + let _ = reputation_service + .get_reputation(&request.user_operation.sender()) + .await; + } let user_op_hash = request.hash().map_err(|e| { warn!(message = "Failed to hash user operation", error = %e); @@ -592,7 +595,7 @@ mod tests { ingress_topic: String::new(), audit_kafka_properties: String::new(), audit_topic: String::new(), - user_operation_consumer_properties: String::new(), + user_operation_consumer_properties: Some(String::new()), user_operation_consumer_group_id: "tips-user-operation".to_string(), log_level: String::from("info"), log_format: tips_core::logger::LogFormat::Pretty, diff --git a/crates/system-tests/Cargo.toml b/crates/system-tests/Cargo.toml index d460ede7..3ce797bd 100644 --- a/crates/system-tests/Cargo.toml +++ b/crates/system-tests/Cargo.toml @@ -31,7 +31,7 @@ anyhow = { workspace = true, features = ["std"] } uuid = { workspace = true, features = ["v4", "serde"] } serde_json = { workspace = true, features = ["std"] } reqwest = { version = "0.12.12", features = ["json"] } -rdkafka = { workspace = true, features = ["tokio", "libz"] } +rdkafka = { workspace = true, features = ["tokio", "libz", "zstd", "ssl-vendored"] } alloy-consensus = { workspace = true, features = ["std"] } alloy-provider = { workspace = true, features = ["reqwest"] } jsonrpsee = { workspace = true, features = ["server", "macros"] }