7 changes: 4 additions & 3 deletions Cargo.lock

Some generated files are not rendered by default.

2 changes: 2 additions & 0 deletions bin/tips-audit/Cargo.toml
@@ -14,6 +14,8 @@ clap.workspace = true
tokio.workspace = true
anyhow.workspace = true
tracing.workspace = true
tracing-subscriber = { workspace = true, features = ["std", "fmt", "ansi", "env-filter", "json"] }
metrics-exporter-prometheus = { workspace = true, features = ["http-listener"] }
dotenvy.workspace = true
rdkafka.workspace = true
aws-config.workspace = true
69 changes: 69 additions & 0 deletions bin/tips-audit/src/logger.rs
@@ -0,0 +1,69 @@
use std::str::FromStr;
use tracing::warn;
use tracing_subscriber::{fmt, layer::SubscriberExt, util::SubscriberInitExt};

#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum LogFormat {
Pretty,
Json,
Compact,
}

impl FromStr for LogFormat {
type Err = String;

fn from_str(s: &str) -> Result<Self, Self::Err> {
match s.to_lowercase().as_str() {
"json" => Ok(Self::Json),
"compact" => Ok(Self::Compact),
"pretty" => Ok(Self::Pretty),
_ => {
warn!("Invalid log format '{}', defaulting to 'pretty'", s);
Ok(Self::Pretty)
}
}
}
}

pub fn init_logger_with_format(log_level: &str, format: LogFormat) {
let level = match log_level.to_lowercase().as_str() {
"trace" => tracing::Level::TRACE,
"debug" => tracing::Level::DEBUG,
"info" => tracing::Level::INFO,
"warn" => tracing::Level::WARN,
"error" => tracing::Level::ERROR,
_ => {
warn!("Invalid log level '{}', defaulting to 'info'", log_level);
tracing::Level::INFO
}
};

let env_filter = tracing_subscriber::EnvFilter::try_from_default_env()
.unwrap_or_else(|_| tracing_subscriber::EnvFilter::new(level.to_string()));

match format {
LogFormat::Json => {
tracing_subscriber::registry()
.with(env_filter)
.with(
fmt::layer()
.json()
.flatten_event(true)
.with_current_span(true),
)
.init();
}
LogFormat::Compact => {
tracing_subscriber::registry()
.with(env_filter)
.with(fmt::layer().compact())
.init();
}
LogFormat::Pretty => {
tracing_subscriber::registry()
.with(env_filter)
.with(fmt::layer().pretty())
.init();
}
}
}
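For reference, a minimal sketch of how the new module composes (hypothetical call site, assuming the items are in scope; the real wiring is in main.rs below). Note that the `warn!` calls in `from_str` and `init_logger_with_format` fire before any subscriber is installed, so invalid-value warnings are silently dropped:

```rust
use std::str::FromStr;

// Hypothetical startup snippet, not part of this PR.
fn setup_logging() {
    // Unrecognized format strings fall back to Pretty instead of erroring,
    // so from_str never returns Err.
    let format = LogFormat::from_str("json").expect("from_str is infallible");
    init_logger_with_format("debug", format);
    tracing::info!("logging initialized");
}
```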
9 changes: 6 additions & 3 deletions bin/tips-audit/src/main.rs
@@ -8,8 +8,11 @@ use std::net::SocketAddr;
use tips_audit_lib::{
KafkaAuditArchiver, KafkaAuditLogReader, S3EventReaderWriter, create_kafka_consumer,
};
use tips_core::logger::init_logger_with_format;
use tips_core::metrics::init_prometheus_exporter;
mod logger;
mod metrics;

use logger::init_logger_with_format;
use metrics::init_prometheus_exporter;
use tracing::info;

#[derive(Debug, Clone, ValueEnum)]
@@ -34,7 +37,7 @@ struct Args {
log_level: String,

#[arg(long, env = "TIPS_AUDIT_LOG_FORMAT", default_value = "pretty")]
log_format: tips_core::logger::LogFormat,
log_format: logger::LogFormat,

#[arg(long, env = "TIPS_AUDIT_S3_CONFIG_TYPE", default_value = "aws")]
s3_config_type: S3ConfigType,
3 changes: 2 additions & 1 deletion crates/core/src/metrics.rs → bin/tips-audit/src/metrics.rs
@@ -1,6 +1,7 @@
use metrics_exporter_prometheus::PrometheusBuilder;
use std::net::SocketAddr;

use metrics_exporter_prometheus::PrometheusBuilder;

pub fn init_prometheus_exporter(addr: SocketAddr) -> Result<(), Box<dyn std::error::Error>> {
PrometheusBuilder::new()
.with_http_listener(addr)
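A minimal sketch of calling the relocated exporter at startup (the listen address is an assumption):

```rust
use std::net::SocketAddr;

// Hypothetical startup snippet, not part of this PR.
fn start_metrics() -> Result<(), Box<dyn std::error::Error>> {
    let addr: SocketAddr = "127.0.0.1:9090".parse()?;
    init_prometheus_exporter(addr)?;
    // Anything recorded via the `metrics` facade is now served over HTTP.
    Ok(())
}
```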
6 changes: 3 additions & 3 deletions bin/tips-ingress-rpc/src/main.rs
@@ -5,14 +5,14 @@ use jsonrpsee::server::Server;
use op_alloy_network::Optimism;
use rdkafka::ClientConfig;
use rdkafka::producer::FutureProducer;
use tips_audit_lib::load_kafka_config_from_file;
use tips_audit_lib::{BundleEvent, KafkaBundleEventPublisher, connect_audit_to_publisher};
use tips_core::kafka::load_kafka_config_from_file;
use tips_core::logger::init_logger_with_format;
use tips_core::metrics::init_prometheus_exporter;
use tips_core::{AcceptedBundle, MeterBundleResponse};
use tips_ingress_rpc_lib::Config;
use tips_ingress_rpc_lib::connect_ingress_to_builder;
use tips_ingress_rpc_lib::health::bind_health_server;
use tips_ingress_rpc_lib::logger::init_logger_with_format;
use tips_ingress_rpc_lib::metrics::init_prometheus_exporter;
use tips_ingress_rpc_lib::queue::KafkaMessageQueue;
use tips_ingress_rpc_lib::service::{IngressApiServer, IngressService, Providers};
use tokio::sync::{broadcast, mpsc};
@@ -1,13 +1,13 @@
use crate::domain::mempool::PoolConfig;
use crate::infrastructure::in_memory::mempool::InMemoryMempool;
use crate::infrastructure::kafka::consumer::KafkaEventSource;
use crate::kafka::load_kafka_config_from_file;
use crate::services::mempool_engine::MempoolEngine;
use rdkafka::{
ClientConfig,
consumer::{Consumer, StreamConsumer},
};
use std::sync::Arc;
use tips_core::kafka::load_kafka_config_from_file;
use tokio::sync::RwLock;

pub fn create_mempool_engine(
File renamed without changes.
1 change: 1 addition & 0 deletions crates/account-abstraction-core/src/lib.rs
@@ -4,6 +4,7 @@
pub mod domain;
pub mod factories;
pub mod infrastructure;
mod kafka;
pub mod services;

// Convenient re-exports for common imports
2 changes: 1 addition & 1 deletion crates/audit/src/lib.rs
@@ -22,7 +22,7 @@ pub use publisher::{
mod reader;
pub use reader::{
Event, EventReader, KafkaAuditLogReader, KafkaUserOpAuditLogReader, UserOpEventReader,
UserOpEventWrapper, assign_topic_partition, create_kafka_consumer,
UserOpEventWrapper, assign_topic_partition, create_kafka_consumer, load_kafka_config_from_file,
};

mod storage;
24 changes: 23 additions & 1 deletion crates/audit/src/reader.rs
@@ -7,11 +7,33 @@ use rdkafka::{
consumer::{Consumer, StreamConsumer},
message::Message,
};
use std::collections::HashMap;
use std::fs;
use std::time::{Duration, SystemTime, UNIX_EPOCH};
use tips_core::kafka::load_kafka_config_from_file;
use tokio::time::sleep;
use tracing::{debug, error};

/// Loads Kafka configuration from a properties file.
pub fn load_kafka_config_from_file(
properties_file_path: &str,
) -> Result<HashMap<String, String>, std::io::Error> {
let kafka_properties = fs::read_to_string(properties_file_path)?;

let mut config = HashMap::new();

for line in kafka_properties.lines() {
let line = line.trim();
if line.is_empty() || line.starts_with('#') {
continue;
}
if let Some((key, value)) = line.split_once('=') {
config.insert(key.trim().to_string(), value.trim().to_string());
}
}

Ok(config)
}

/// Creates a Kafka consumer from a properties file.
pub fn create_kafka_consumer(kafka_properties_file: &str) -> Result<StreamConsumer> {
let client_config: ClientConfig =
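A minimal sketch of how the duplicated loader composes with rdkafka (`build_consumer` is a hypothetical wrapper; `create_kafka_consumer` above presumably does essentially this internally):

```rust
use rdkafka::{ClientConfig, consumer::StreamConsumer};

// Hypothetical wrapper, assuming a key=value properties file on disk.
fn build_consumer(path: &str) -> anyhow::Result<StreamConsumer> {
    // Blank lines and '#' comments are skipped by the loader.
    let props = load_kafka_config_from_file(path)?;
    let mut config = ClientConfig::new();
    for (key, value) in props {
        config.set(key, value);
    }
    Ok(config.create()?)
}
```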
3 changes: 0 additions & 3 deletions crates/core/Cargo.toml
@@ -19,16 +19,13 @@ op-alloy-flz.workspace = true
alloy-serde.workspace = true
serde = { workspace = true, features = ["std", "derive"] }
uuid = { workspace = true, features = ["v4", "serde"] }
tracing = { workspace = true, features = ["std"] }
alloy-consensus = { workspace = true, features = ["std"] }
alloy-rpc-types = { workspace = true, features = ["eth"] }
alloy-provider = { workspace = true, features = ["reqwest"] }
alloy-signer-local = { workspace = true, optional = true }
op-alloy-consensus = { workspace = true, features = ["std", "k256", "serde"] }
alloy-primitives = { workspace = true, features = ["map-foldhash", "serde"] }
op-alloy-rpc-types = { workspace = true, features = ["std"], optional = true }
metrics-exporter-prometheus = { workspace = true, features = ["http-listener"] }
tracing-subscriber = { workspace = true, features = ["std", "fmt", "ansi", "env-filter", "json"] }

[dev-dependencies]
alloy-signer-local.workspace = true
3 changes: 0 additions & 3 deletions crates/core/src/lib.rs
@@ -6,9 +6,6 @@

use alloy_rpc_types as _;

pub mod kafka;
pub mod logger;
pub mod metrics;
#[cfg(any(test, feature = "test-utils"))]
pub mod test_utils;
pub mod types;
2 changes: 2 additions & 0 deletions crates/ingress-rpc/Cargo.toml
@@ -16,13 +16,15 @@ dotenvy.workspace = true
tips-audit-lib.workspace = true
async-trait.workspace = true
metrics-derive.workspace = true
metrics-exporter-prometheus = { workspace = true, features = ["http-listener"] }
op-alloy-network.workspace = true
alloy-signer-local.workspace = true
base-reth-rpc-types.workspace = true
account-abstraction-core.workspace = true
alloy-primitives = { workspace = true, features = ["map-foldhash", "serde"] }
tokio = { workspace = true, features = ["full"] }
tracing = { workspace = true, features = ["std"] }
tracing-subscriber = { workspace = true, features = ["std", "fmt", "ansi", "env-filter", "json"] }
anyhow = { workspace = true, features = ["std"] }
serde_json = { workspace = true, features = ["std"] }
rdkafka = { workspace = true, features = ["tokio", "libz", "zstd", "ssl-vendored"] }
3 changes: 2 additions & 1 deletion crates/ingress-rpc/src/lib.rs
@@ -1,4 +1,5 @@
pub mod health;
pub mod logger;
pub mod metrics;
pub mod queue;
pub mod service;
@@ -112,7 +113,7 @@ pub struct Config {
pub log_level: String,

#[arg(long, env = "TIPS_INGRESS_LOG_FORMAT", default_value = "pretty")]
pub log_format: tips_core::logger::LogFormat,
pub log_format: crate::logger::LogFormat,

/// Default lifetime for sent transactions in seconds (default: 3 hours)
#[arg(
File renamed without changes.
10 changes: 10 additions & 0 deletions crates/ingress-rpc/src/metrics.rs
@@ -1,7 +1,17 @@
use std::net::SocketAddr;

use metrics::{Counter, Histogram};
use metrics_derive::Metrics;
use metrics_exporter_prometheus::PrometheusBuilder;
use tokio::time::Duration;

pub fn init_prometheus_exporter(addr: SocketAddr) -> Result<(), Box<dyn std::error::Error>> {
PrometheusBuilder::new()
.with_http_listener(addr)
.install()
.map_err(|e| Box::new(e) as Box<dyn std::error::Error>)
}

pub fn record_histogram(rpc_latency: Duration, rpc: String) {
metrics::histogram!("tips_ingress_rpc_rpc_latency", "rpc" => rpc)
.record(rpc_latency.as_secs_f64());
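And a sketch of the existing histogram helper in use (the RPC name and call site are assumptions):

```rust
use std::time::Instant;

// Hypothetical call site, not part of this PR.
fn timed_handler() {
    let start = Instant::now();
    // ... handle the RPC ...
    record_histogram(start.elapsed(), "eth_sendBundle".to_string());
}
```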