diff --git a/Cargo.lock b/Cargo.lock
index c0f4707..c793736 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -7079,11 +7079,13 @@ dependencies = [
  "aws-sdk-s3",
  "clap",
  "dotenvy",
+ "metrics-exporter-prometheus",
  "rdkafka",
  "tips-audit-lib",
  "tips-core",
  "tokio",
  "tracing",
+ "tracing-subscriber 0.3.22",
 ]

 [[package]]
@@ -7119,14 +7121,11 @@ dependencies = [
  "alloy-rpc-types",
  "alloy-serde",
  "alloy-signer-local",
- "metrics-exporter-prometheus",
  "op-alloy-consensus",
  "op-alloy-flz",
  "op-alloy-rpc-types",
  "serde",
  "serde_json",
- "tracing",
- "tracing-subscriber 0.3.22",
  "uuid",
 ]

 [[package]]
@@ -7168,6 +7167,7 @@ dependencies = [
  "jsonrpsee",
  "metrics",
  "metrics-derive",
+ "metrics-exporter-prometheus",
  "mockall",
  "op-alloy-consensus",
  "op-alloy-network",
@@ -7178,6 +7178,7 @@
  "tips-core",
  "tokio",
  "tracing",
+ "tracing-subscriber 0.3.22",
  "url",
  "wiremock",
 ]
diff --git a/bin/tips-audit/Cargo.toml b/bin/tips-audit/Cargo.toml
index 7d43244..c55fda6 100644
--- a/bin/tips-audit/Cargo.toml
+++ b/bin/tips-audit/Cargo.toml
@@ -14,6 +14,8 @@ clap.workspace = true
 tokio.workspace = true
 anyhow.workspace = true
 tracing.workspace = true
+tracing-subscriber = { workspace = true, features = ["std", "fmt", "ansi", "env-filter", "json"] }
+metrics-exporter-prometheus = { workspace = true, features = ["http-listener"] }
 dotenvy.workspace = true
 rdkafka.workspace = true
 aws-config.workspace = true
diff --git a/bin/tips-audit/src/logger.rs b/bin/tips-audit/src/logger.rs
new file mode 100644
index 0000000..ee2e823
--- /dev/null
+++ b/bin/tips-audit/src/logger.rs
@@ -0,0 +1,69 @@
+use std::str::FromStr;
+use tracing::warn;
+use tracing_subscriber::{fmt, layer::SubscriberExt, util::SubscriberInitExt};
+
+#[derive(Debug, Clone, Copy, PartialEq, Eq)]
+pub enum LogFormat {
+    Pretty,
+    Json,
+    Compact,
+}
+
+impl FromStr for LogFormat {
+    type Err = String;
+
+    fn from_str(s: &str) -> Result<Self, Self::Err> {
+        match s.to_lowercase().as_str() {
+            "json" => Ok(Self::Json),
+            "compact" => Ok(Self::Compact),
+            "pretty" => Ok(Self::Pretty),
+            _ => {
+                warn!("Invalid log format '{}', defaulting to 'pretty'", s);
+                Ok(Self::Pretty)
+            }
+        }
+    }
+}
+
+pub fn init_logger_with_format(log_level: &str, format: LogFormat) {
+    let level = match log_level.to_lowercase().as_str() {
+        "trace" => tracing::Level::TRACE,
+        "debug" => tracing::Level::DEBUG,
+        "info" => tracing::Level::INFO,
+        "warn" => tracing::Level::WARN,
+        "error" => tracing::Level::ERROR,
+        _ => {
+            warn!("Invalid log level '{}', defaulting to 'info'", log_level);
+            tracing::Level::INFO
+        }
+    };
+
+    let env_filter = tracing_subscriber::EnvFilter::try_from_default_env()
+        .unwrap_or_else(|_| tracing_subscriber::EnvFilter::new(level.to_string()));
+
+    match format {
+        LogFormat::Json => {
+            tracing_subscriber::registry()
+                .with(env_filter)
+                .with(
+                    fmt::layer()
+                        .json()
+                        .flatten_event(true)
+                        .with_current_span(true),
+                )
+                .init();
+        }
+        LogFormat::Compact => {
+            tracing_subscriber::registry()
+                .with(env_filter)
+                .with(fmt::layer().compact())
+                .init();
+        }
+        LogFormat::Pretty => {
+            tracing_subscriber::registry()
+                .with(env_filter)
+                .with(fmt::layer().pretty())
+                .init();
+        }
+    }
+}
diff --git a/bin/tips-audit/src/main.rs b/bin/tips-audit/src/main.rs
index db958eb..e2a49fc 100644
--- a/bin/tips-audit/src/main.rs
+++ b/bin/tips-audit/src/main.rs
@@ -8,8 +8,11 @@ use std::net::SocketAddr;
 use tips_audit_lib::{
     KafkaAuditArchiver, KafkaAuditLogReader, S3EventReaderWriter, create_kafka_consumer,
 };
-use tips_core::logger::init_logger_with_format;
-use tips_core::metrics::init_prometheus_exporter;
+mod logger;
+mod metrics;
+
+use logger::init_logger_with_format;
+use metrics::init_prometheus_exporter;
 use tracing::info;

 #[derive(Debug, Clone, ValueEnum)]
@@ -34,7 +37,7 @@ struct Args {
     log_level: String,

     #[arg(long, env = "TIPS_AUDIT_LOG_FORMAT", default_value = "pretty")]
-    log_format: tips_core::logger::LogFormat,
+    log_format: logger::LogFormat,

     #[arg(long, env = "TIPS_AUDIT_S3_CONFIG_TYPE", default_value = "aws")]
     s3_config_type: S3ConfigType,
diff --git a/crates/core/src/metrics.rs b/bin/tips-audit/src/metrics.rs
similarity index 99%
rename from crates/core/src/metrics.rs
rename to bin/tips-audit/src/metrics.rs
index de365ce..bb5f67f 100644
--- a/crates/core/src/metrics.rs
+++ b/bin/tips-audit/src/metrics.rs
@@ -1,6 +1,7 @@
-use metrics_exporter_prometheus::PrometheusBuilder;
 use std::net::SocketAddr;
+
+use metrics_exporter_prometheus::PrometheusBuilder;
+
 pub fn init_prometheus_exporter(addr: SocketAddr) -> Result<(), Box<dyn std::error::Error>> {
     PrometheusBuilder::new()
         .with_http_listener(addr)
diff --git a/bin/tips-ingress-rpc/src/main.rs b/bin/tips-ingress-rpc/src/main.rs
index dcb22c2..74562cf 100644
--- a/bin/tips-ingress-rpc/src/main.rs
+++ b/bin/tips-ingress-rpc/src/main.rs
@@ -5,14 +5,14 @@ use jsonrpsee::server::Server;
 use op_alloy_network::Optimism;
 use rdkafka::ClientConfig;
 use rdkafka::producer::FutureProducer;
+use tips_audit_lib::load_kafka_config_from_file;
 use tips_audit_lib::{BundleEvent, KafkaBundleEventPublisher, connect_audit_to_publisher};
-use tips_core::kafka::load_kafka_config_from_file;
-use tips_core::logger::init_logger_with_format;
-use tips_core::metrics::init_prometheus_exporter;
 use tips_core::{AcceptedBundle, MeterBundleResponse};
 use tips_ingress_rpc_lib::Config;
 use tips_ingress_rpc_lib::connect_ingress_to_builder;
 use tips_ingress_rpc_lib::health::bind_health_server;
+use tips_ingress_rpc_lib::logger::init_logger_with_format;
+use tips_ingress_rpc_lib::metrics::init_prometheus_exporter;
 use tips_ingress_rpc_lib::queue::KafkaMessageQueue;
 use tips_ingress_rpc_lib::service::{IngressApiServer, IngressService, Providers};
 use tokio::sync::{broadcast, mpsc};
diff --git a/crates/account-abstraction-core/src/factories/kafka_engine.rs b/crates/account-abstraction-core/src/factories/kafka_engine.rs
index a7d972b..cf7dcbc 100644
--- a/crates/account-abstraction-core/src/factories/kafka_engine.rs
+++ b/crates/account-abstraction-core/src/factories/kafka_engine.rs
@@ -1,13 +1,13 @@
 use crate::domain::mempool::PoolConfig;
 use crate::infrastructure::in_memory::mempool::InMemoryMempool;
 use crate::infrastructure::kafka::consumer::KafkaEventSource;
+use crate::kafka::load_kafka_config_from_file;
 use crate::services::mempool_engine::MempoolEngine;
 use rdkafka::{
     ClientConfig,
     consumer::{Consumer, StreamConsumer},
 };
 use std::sync::Arc;
-use tips_core::kafka::load_kafka_config_from_file;
 use tokio::sync::RwLock;

 pub fn create_mempool_engine(
diff --git a/crates/core/src/kafka.rs b/crates/account-abstraction-core/src/kafka.rs
similarity index 100%
rename from crates/core/src/kafka.rs
rename to crates/account-abstraction-core/src/kafka.rs
diff --git a/crates/account-abstraction-core/src/lib.rs b/crates/account-abstraction-core/src/lib.rs
index 0b88b37..fb2446d 100644
--- a/crates/account-abstraction-core/src/lib.rs
+++ b/crates/account-abstraction-core/src/lib.rs
@@ -4,6 +4,7 @@
 pub mod domain;
 pub mod factories;
 pub mod infrastructure;
+mod kafka;
 pub mod services;

 // Convenient re-exports for common imports
diff --git a/crates/audit/src/lib.rs b/crates/audit/src/lib.rs
index 8291936..45f7aa9 100644
--- a/crates/audit/src/lib.rs
+++ b/crates/audit/src/lib.rs
@@ -22,7 +22,7 @@ pub use publisher::{
 mod reader;
 pub use reader::{
     Event, EventReader, KafkaAuditLogReader, KafkaUserOpAuditLogReader, UserOpEventReader,
-    UserOpEventWrapper, assign_topic_partition, create_kafka_consumer,
+    UserOpEventWrapper, assign_topic_partition, create_kafka_consumer, load_kafka_config_from_file,
 };

 mod storage;
diff --git a/crates/audit/src/reader.rs b/crates/audit/src/reader.rs
index 50697a2..9571ed1 100644
--- a/crates/audit/src/reader.rs
+++ b/crates/audit/src/reader.rs
@@ -7,11 +7,33 @@ use rdkafka::{
     consumer::{Consumer, StreamConsumer},
     message::Message,
 };
+use std::collections::HashMap;
+use std::fs;
 use std::time::{Duration, SystemTime, UNIX_EPOCH};
-use tips_core::kafka::load_kafka_config_from_file;
 use tokio::time::sleep;
 use tracing::{debug, error};

+/// Loads Kafka configuration from a properties file.
+pub fn load_kafka_config_from_file(
+    properties_file_path: &str,
+) -> Result<HashMap<String, String>, std::io::Error> {
+    let kafka_properties = fs::read_to_string(properties_file_path)?;
+
+    let mut config = HashMap::new();
+
+    for line in kafka_properties.lines() {
+        let line = line.trim();
+        if line.is_empty() || line.starts_with('#') {
+            continue;
+        }
+        if let Some((key, value)) = line.split_once('=') {
+            config.insert(key.trim().to_string(), value.trim().to_string());
+        }
+    }
+
+    Ok(config)
+}
+
 /// Creates a Kafka consumer from a properties file.
 pub fn create_kafka_consumer(kafka_properties_file: &str) -> Result {
     let client_config: ClientConfig =
diff --git a/crates/core/Cargo.toml b/crates/core/Cargo.toml
index 76d7d63..7444c00 100644
--- a/crates/core/Cargo.toml
+++ b/crates/core/Cargo.toml
@@ -19,7 +19,6 @@ op-alloy-flz.workspace = true
 alloy-serde.workspace = true
 serde = { workspace = true, features = ["std", "derive"] }
 uuid = { workspace = true, features = ["v4", "serde"] }
-tracing = { workspace = true, features = ["std"] }
 alloy-consensus = { workspace = true, features = ["std"] }
 alloy-rpc-types = { workspace = true, features = ["eth"] }
 alloy-provider = { workspace = true, features = ["reqwest"] }
@@ -27,8 +26,6 @@ alloy-signer-local = { workspace = true, optional = true }
 op-alloy-consensus = { workspace = true, features = ["std", "k256", "serde"] }
 alloy-primitives = { workspace = true, features = ["map-foldhash", "serde"] }
 op-alloy-rpc-types = { workspace = true, features = ["std"], optional = true }
-metrics-exporter-prometheus = { workspace = true, features = ["http-listener"] }
-tracing-subscriber = { workspace = true, features = ["std", "fmt", "ansi", "env-filter", "json"] }

 [dev-dependencies]
 alloy-signer-local.workspace = true
diff --git a/crates/core/src/lib.rs b/crates/core/src/lib.rs
index c2a045c..222480a 100644
--- a/crates/core/src/lib.rs
+++ b/crates/core/src/lib.rs
@@ -6,9 +6,6 @@
 use alloy_rpc_types as _;

-pub mod kafka;
-pub mod logger;
-pub mod metrics;
 #[cfg(any(test, feature = "test-utils"))]
 pub mod test_utils;
 pub mod types;
diff --git a/crates/ingress-rpc/Cargo.toml b/crates/ingress-rpc/Cargo.toml
index 9291d8f..edb25b9 100644
--- a/crates/ingress-rpc/Cargo.toml
+++ b/crates/ingress-rpc/Cargo.toml
@@ -16,6 +16,7 @@ dotenvy.workspace = true
 tips-audit-lib.workspace = true
 async-trait.workspace = true
 metrics-derive.workspace = true
+metrics-exporter-prometheus = { workspace = true, features = ["http-listener"] }
 op-alloy-network.workspace = true
 alloy-signer-local.workspace = true
 base-reth-rpc-types.workspace = true
@@ -23,6 +24,7 @@ account-abstraction-core.workspace = true
 alloy-primitives = { workspace = true, features = ["map-foldhash", "serde"] }
 tokio = { workspace = true, features = ["full"] }
 tracing = { workspace = true, features = ["std"] }
+tracing-subscriber = { workspace = true, features = ["std", "fmt", "ansi", "env-filter", "json"] }
 anyhow = { workspace = true, features = ["std"] }
 serde_json = { workspace = true, features = ["std"] }
 rdkafka = { workspace = true, features = ["tokio", "libz", "zstd", "ssl-vendored"] }
diff --git a/crates/ingress-rpc/src/lib.rs b/crates/ingress-rpc/src/lib.rs
index 1f43896..6a07315 100644
--- a/crates/ingress-rpc/src/lib.rs
+++ b/crates/ingress-rpc/src/lib.rs
@@ -1,4 +1,5 @@
 pub mod health;
+pub mod logger;
 pub mod metrics;
 pub mod queue;
 pub mod service;
@@ -112,7 +113,7 @@ pub struct Config {
     pub log_level: String,

     #[arg(long, env = "TIPS_INGRESS_LOG_FORMAT", default_value = "pretty")]
-    pub log_format: tips_core::logger::LogFormat,
+    pub log_format: crate::logger::LogFormat,

     /// Default lifetime for sent transactions in seconds (default: 3 hours)
     #[arg(
diff --git a/crates/core/src/logger.rs b/crates/ingress-rpc/src/logger.rs
similarity index 100%
rename from crates/core/src/logger.rs
rename to crates/ingress-rpc/src/logger.rs
diff --git a/crates/ingress-rpc/src/metrics.rs b/crates/ingress-rpc/src/metrics.rs
index fb53aa5..cc8deb4 100644
--- a/crates/ingress-rpc/src/metrics.rs
+++ b/crates/ingress-rpc/src/metrics.rs
@@ -1,7 +1,17 @@
+use std::net::SocketAddr;
+
 use metrics::{Counter, Histogram};
 use metrics_derive::Metrics;
+use metrics_exporter_prometheus::PrometheusBuilder;
 use tokio::time::Duration;

+pub fn init_prometheus_exporter(addr: SocketAddr) -> Result<(), Box<dyn std::error::Error>> {
+    PrometheusBuilder::new()
+        .with_http_listener(addr)
+        .install()
+        .map_err(|e| Box::new(e) as Box<dyn std::error::Error>)
+}
+
 pub fn record_histogram(rpc_latency: Duration, rpc: String) {
     metrics::histogram!("tips_ingress_rpc_rpc_latency", "rpc" => rpc)
         .record(rpc_latency.as_secs_f64());
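
Taken together, the diff moves the logger, metrics, and Kafka-config helpers out of `tips-core` and into the binaries and crates that actually use them. A minimal wiring sketch of how a binary now initializes the relocated helpers (not part of the diff; the `"info"` level, `LogFormat::Json`, and `0.0.0.0:9090` metrics address are illustrative assumptions, and the real binaries use an async tokio main):

```rust
// Sketch only: uses the relocated helpers exposed by this diff
// (tips_ingress_rpc_lib::logger / tips_ingress_rpc_lib::metrics).
use std::net::SocketAddr;

use tips_ingress_rpc_lib::logger::{LogFormat, init_logger_with_format};
use tips_ingress_rpc_lib::metrics::init_prometheus_exporter;

fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Install the tracing subscriber first so later start-up errors are logged.
    init_logger_with_format("info", LogFormat::Json);

    // Start the Prometheus exporter's HTTP listener on the chosen address
    // (0.0.0.0:9090 is an assumed value, not taken from the diff).
    let metrics_addr: SocketAddr = "0.0.0.0:9090".parse()?;
    init_prometheus_exporter(metrics_addr)?;

    // ...the rest of the binary's start-up (Kafka, RPC server, etc.) follows.
    Ok(())
}
```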