Skip to content
Merged
Show file tree
Hide file tree
Changes from 11 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 32 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ apalis = { version = "0.7", features = ["limit", "retry", "catch-panic", "timeou
apalis-redis = { version = "0.7" }
apalis-cron = { version = "0.7" }
redis = { version = "0.32", features = ["aio", "connection-manager", "tokio-comp"] }
deadpool-redis = { version = "0.22", features = ["rt_tokio_1"] }
tokio = { version = "1.43", features = ["sync", "io-util", "time"] }
rand = "0.9"
parking_lot = "0.12"
Expand Down
2 changes: 2 additions & 0 deletions docs/configuration/index.mdx
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,8 @@ This table lists the environment variables and their default values.
| `METRICS_PORT` | `8081` | `<any tcp port (preferably choose non-privileged ports i.e. (1024-65535))>` | Port to use for metrics server. |
| `REDIS_URL` | `redis://localhost:6379` | `<redis connection string>` | Redis connection URL for the relayer. See [Storage Configuration](./configuration/storage) for Redis setup details. |
| `REDIS_CONNECTION_TIMEOUT_MS` | `10000` | `<timeout in milliseconds>` | Connection timeout for Redis in milliseconds. See [Storage Configuration](./configuration/storage) for Redis configuration. |
| `REDIS_POOL_MAX_SIZE` | `500` | `<number>` | Maximum number of connections in the Redis connection pool. See [Storage Configuration](./configuration/storage) for tuning guidance. |
| `REDIS_POOL_TIMEOUT_MS` | `10000` | `<timeout in milliseconds>` | Maximum time to wait for a connection from the pool. See [Storage Configuration](./configuration/storage) for performance tuning. |
| `REDIS_KEY_PREFIX` | `oz-relayer` | `string` | Redis key prefix for namespacing. See [Storage Configuration](./configuration/storage) for more information. |
| `STORAGE_ENCRYPTION_KEY` | `` | `string` | Encryption key used to encrypt data at rest in Redis storage. See [Storage Configuration](./configuration/storage) for security details. |
| `RPC_TIMEOUT_MS` | `10000` | `<timeout in milliseconds>` | Sets the maximum time to wait for RPC connections before timing out. |
Expand Down
20 changes: 20 additions & 0 deletions docs/configuration/storage.mdx
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,8 @@ STORAGE_ENCRYPTION_KEY=your-encryption-key-here
| --- | --- | --- | --- |
| `REDIS_URL` | `redis://localhost:6379` | Redis connection string | Full connection URL for the Redis instance. Supports Redis, Redis Sentinel, and Redis Cluster configurations. |
| `REDIS_CONNECTION_TIMEOUT_MS` | `10000` | `number` (milliseconds) | Maximum time to wait when connecting to Redis before timing out. |
| `REDIS_POOL_MAX_SIZE` | `500` | `number` | Maximum number of connections in the Redis connection pool. Adjust based on your Redis instance capacity, workload, and deployment scale. |
| `REDIS_POOL_TIMEOUT_MS` | `10000` | `number` (milliseconds) | Maximum time to wait for a connection from the pool before timing out. Higher values help handle traffic spikes. |
| `REDIS_KEY_PREFIX` | `oz-relayer` | `string` | Prefix added to all Redis keys. Useful for namespacing when sharing Redis with other applications. |
| `STORAGE_ENCRYPTION_KEY` | `` | `string` (base64) | Encryption key used to encrypt sensitive data at rest in Redis. Generate using `cargo run --example generate_encryption_key`. |

Expand All @@ -114,6 +116,24 @@ When using Redis storage in production:

</Callout>

### Connection Pool Tuning

Redis connection pooling improves performance and prevents connection exhaustion. Configure pool size based on your Redis instance capacity, workload, and deployment scale.

***Factors to consider:***

* **Redis instance capacity**: Check your Redis instance's maximum connection limit (e.g., ElastiCache small instances support 65,000+ connections)
* **Transaction volume**: Higher TPS requires more connections
* **Number of relayers**: More active relayers increase concurrent operations
* **Instance count**: Divide total connection budget across multiple relayer instances
* **Traffic patterns**: Account for traffic spikes and peak loads

***Configuration:***
```bash
REDIS_POOL_MAX_SIZE=500 # Adjust based on instance capacity and workload
REDIS_POOL_TIMEOUT_MS=15000 # Time to wait for available connection
```

### Encryption at Rest

Sensitive configuration data is encrypted before being stored in Redis when `STORAGE_ENCRYPTION_KEY` is provided.
Expand Down
45 changes: 31 additions & 14 deletions src/bootstrap/initialize_app_state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ use crate::{
};
use actix_web::web;
use color_eyre::Result;
use deadpool_redis::Pool;
use std::sync::Arc;
use tracing::warn;

Expand All @@ -31,12 +32,20 @@ pub struct RepositoryCollection {

/// Initializes repositories based on the server configuration
///
/// # Arguments
///
/// * `config` - Server configuration
/// * `pool` - Redis connection pool (required for Redis storage type, None for in-memory)
///
/// # Returns
///
/// * `Result<RepositoryCollection>` - Initialized repositories
///
/// # Errors
pub async fn initialize_repositories(config: &ServerConfig) -> eyre::Result<RepositoryCollection> {
pub async fn initialize_repositories(
config: &ServerConfig,
pool: Option<Arc<Pool>>,
) -> eyre::Result<RepositoryCollection> {
let repositories = match config.repository_storage_type {
RepositoryStorageType::InMemory => RepositoryCollection {
relayer: Arc::new(RelayerRepositoryStorage::new_in_memory()),
Expand All @@ -54,39 +63,41 @@ pub async fn initialize_repositories(config: &ServerConfig) -> eyre::Result<Repo
return Err(eyre::eyre!("Storage encryption key is not set. Please set the STORAGE_ENCRYPTION_KEY environment variable."));
}

let connection_manager = initialize_redis_connection(config).await?;
let pool =
pool.ok_or_else(|| eyre::eyre!("Redis pool is required for Redis storage type"))?;

// Use the shared pool for all repositories
RepositoryCollection {
relayer: Arc::new(RelayerRepositoryStorage::new_redis(
connection_manager.clone(),
pool.clone(),
config.redis_key_prefix.clone(),
)?),
transaction: Arc::new(TransactionRepositoryStorage::new_redis(
connection_manager.clone(),
pool.clone(),
config.redis_key_prefix.clone(),
)?),
signer: Arc::new(SignerRepositoryStorage::new_redis(
connection_manager.clone(),
pool.clone(),
config.redis_key_prefix.clone(),
)?),
notification: Arc::new(NotificationRepositoryStorage::new_redis(
connection_manager.clone(),
pool.clone(),
config.redis_key_prefix.clone(),
)?),
network: Arc::new(NetworkRepositoryStorage::new_redis(
connection_manager.clone(),
pool.clone(),
config.redis_key_prefix.clone(),
)?),
transaction_counter: Arc::new(TransactionCounterRepositoryStorage::new_redis(
connection_manager.clone(),
pool.clone(),
config.redis_key_prefix.clone(),
)?),
plugin: Arc::new(PluginRepositoryStorage::new_redis(
connection_manager.clone(),
pool.clone(),
config.redis_key_prefix.clone(),
)?),
api_key: Arc::new(ApiKeyRepositoryStorage::new_redis(
connection_manager,
pool,
config.redis_key_prefix.clone(),
)?),
}
Expand All @@ -110,9 +121,13 @@ pub async fn initialize_repositories(config: &ServerConfig) -> eyre::Result<Repo
pub async fn initialize_app_state(
server_config: Arc<ServerConfig>,
) -> Result<web::ThinData<DefaultAppState>> {
let repositories = initialize_repositories(&server_config).await?;
// Initialize Redis pool once - shared by both repositories and job queues
let redis_pool = initialize_redis_connection(&server_config).await?;

let repositories = initialize_repositories(&server_config, Some(redis_pool.clone())).await?;

let queue = Queue::setup().await?;
// Reuse the same pool for job queues
let queue = Queue::setup(redis_pool).await?;
let job_producer = Arc::new(jobs::JobProducer::new(queue.clone()));

let app_state = web::ThinData(AppState {
Expand Down Expand Up @@ -146,7 +161,8 @@ mod tests {
#[tokio::test]
async fn test_initialize_repositories_in_memory() {
let config = create_test_server_config(RepositoryStorageType::InMemory);
let result = initialize_repositories(&config).await;
// For in-memory storage, pool is not required
let result = initialize_repositories(&config, None).await;

assert!(result.is_ok());
let repositories = result.unwrap();
Expand All @@ -165,7 +181,8 @@ mod tests {
#[tokio::test]
async fn test_repository_collection_functionality() {
let config = create_test_server_config(RepositoryStorageType::InMemory);
let repositories = initialize_repositories(&config).await.unwrap();
// For in-memory storage, pool is not required
let repositories = initialize_repositories(&config, None).await.unwrap();

// Test basic repository operations
let relayer = create_mock_relayer("test-relayer".to_string(), false);
Expand Down
98 changes: 98 additions & 0 deletions src/config/server_config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,10 @@ pub struct ServerConfig {
pub redis_connection_timeout_ms: u64,
/// The prefix for the Redis key.
pub redis_key_prefix: String,
/// Maximum number of connections in the Redis pool.
pub redis_pool_max_size: usize,
/// Timeout in milliseconds waiting to get a connection from the pool.
pub redis_pool_timeout_ms: u64,
/// The number of milliseconds to wait for an RPC response.
pub rpc_timeout_ms: u64,
/// Maximum number of retry attempts for provider operations.
Expand Down Expand Up @@ -120,6 +124,8 @@ impl ServerConfig {
enable_swagger: Self::get_enable_swagger(),
redis_connection_timeout_ms: Self::get_redis_connection_timeout_ms(),
redis_key_prefix: Self::get_redis_key_prefix(),
redis_pool_max_size: Self::get_redis_pool_max_size(),
redis_pool_timeout_ms: Self::get_redis_pool_timeout_ms(),
rpc_timeout_ms: Self::get_rpc_timeout_ms(),
provider_max_retries: Self::get_provider_max_retries(),
provider_retry_base_delay_ms: Self::get_provider_retry_base_delay_ms(),
Expand Down Expand Up @@ -245,6 +251,28 @@ impl ServerConfig {
env::var("REDIS_KEY_PREFIX").unwrap_or_else(|_| "oz-relayer".to_string())
}

/// Gets the Redis pool max size from environment variable or default
/// Returns default (500) if value is 0 or invalid
pub fn get_redis_pool_max_size() -> usize {
env::var("REDIS_POOL_MAX_SIZE")
.unwrap_or_else(|_| "500".to_string())
.parse()
.ok()
.filter(|&v| v > 0)
.unwrap_or(500)
}

/// Gets the Redis pool timeout from environment variable or default
/// Returns default (10000) if value is 0 or invalid
pub fn get_redis_pool_timeout_ms() -> u64 {
env::var("REDIS_POOL_TIMEOUT_MS")
.unwrap_or_else(|_| "10000".to_string())
.parse()
.ok()
.filter(|&v| v > 0)
.unwrap_or(10000)
}

/// Gets the RPC timeout from environment variable or default
pub fn get_rpc_timeout_ms() -> u64 {
env::var("RPC_TIMEOUT_MS")
Expand Down Expand Up @@ -606,6 +634,8 @@ mod tests {
env::remove_var("RESET_STORAGE_ON_START");
env::remove_var("STORAGE_ENCRYPTION_KEY");
env::remove_var("TRANSACTION_EXPIRATION_HOURS");
env::remove_var("REDIS_POOL_MAX_SIZE");
env::remove_var("REDIS_POOL_TIMEOUT_MS");

// Test individual getters with defaults
assert_eq!(ServerConfig::get_host(), "0.0.0.0");
Expand All @@ -631,6 +661,8 @@ mod tests {
assert!(!ServerConfig::get_reset_storage_on_start());
assert!(ServerConfig::get_storage_encryption_key().is_none());
assert_eq!(ServerConfig::get_transaction_expiration_hours(), 4);
assert_eq!(ServerConfig::get_redis_pool_max_size(), 500);
assert_eq!(ServerConfig::get_redis_pool_timeout_ms(), 10000);
}

#[test]
Expand Down Expand Up @@ -662,6 +694,8 @@ mod tests {
env::set_var("RESET_STORAGE_ON_START", "true");
env::set_var("STORAGE_ENCRYPTION_KEY", "my-encryption-key");
env::set_var("TRANSACTION_EXPIRATION_HOURS", "12");
env::set_var("REDIS_POOL_MAX_SIZE", "200");
env::set_var("REDIS_POOL_TIMEOUT_MS", "20000");

// Test individual getters with custom values
assert_eq!(ServerConfig::get_host(), "192.168.1.1");
Expand Down Expand Up @@ -693,6 +727,70 @@ mod tests {
assert!(ServerConfig::get_reset_storage_on_start());
assert!(ServerConfig::get_storage_encryption_key().is_some());
assert_eq!(ServerConfig::get_transaction_expiration_hours(), 12);
assert_eq!(ServerConfig::get_redis_pool_max_size(), 200);
assert_eq!(ServerConfig::get_redis_pool_timeout_ms(), 20000);
}

#[test]
fn test_get_redis_pool_max_size() {
let _lock = match ENV_MUTEX.lock() {
Ok(guard) => guard,
Err(poisoned) => poisoned.into_inner(),
};

// Test default value when env var is not set
env::remove_var("REDIS_POOL_MAX_SIZE");
assert_eq!(ServerConfig::get_redis_pool_max_size(), 500);

// Test custom value
env::set_var("REDIS_POOL_MAX_SIZE", "100");
assert_eq!(ServerConfig::get_redis_pool_max_size(), 100);

// Test invalid value returns default
env::set_var("REDIS_POOL_MAX_SIZE", "not_a_number");
assert_eq!(ServerConfig::get_redis_pool_max_size(), 500);

// Test zero value returns default (invalid)
env::set_var("REDIS_POOL_MAX_SIZE", "0");
assert_eq!(ServerConfig::get_redis_pool_max_size(), 500);

// Test large value
env::set_var("REDIS_POOL_MAX_SIZE", "10000");
assert_eq!(ServerConfig::get_redis_pool_max_size(), 10000);

// Cleanup
env::remove_var("REDIS_POOL_MAX_SIZE");
}

#[test]
fn test_get_redis_pool_timeout_ms() {
let _lock = match ENV_MUTEX.lock() {
Ok(guard) => guard,
Err(poisoned) => poisoned.into_inner(),
};

// Test default value when env var is not set
env::remove_var("REDIS_POOL_TIMEOUT_MS");
assert_eq!(ServerConfig::get_redis_pool_timeout_ms(), 10000);

// Test custom value
env::set_var("REDIS_POOL_TIMEOUT_MS", "15000");
assert_eq!(ServerConfig::get_redis_pool_timeout_ms(), 15000);

// Test invalid value returns default
env::set_var("REDIS_POOL_TIMEOUT_MS", "not_a_number");
assert_eq!(ServerConfig::get_redis_pool_timeout_ms(), 10000);

// Test zero value returns default (invalid)
env::set_var("REDIS_POOL_TIMEOUT_MS", "0");
assert_eq!(ServerConfig::get_redis_pool_timeout_ms(), 10000);

// Test large value
env::set_var("REDIS_POOL_TIMEOUT_MS", "60000");
assert_eq!(ServerConfig::get_redis_pool_timeout_ms(), 60000);

// Cleanup
env::remove_var("REDIS_POOL_TIMEOUT_MS");
}

#[test]
Expand Down
15 changes: 7 additions & 8 deletions src/domain/transaction/evm/price_calculator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,18 +13,17 @@
//!
//! # Example
//! ```rust, ignore
//! # use your_crate::{PriceCalculator, EvmTransactionData, RelayerRepoModel, EvmGasPriceService};
//! # async fn example<P: EvmProviderTrait>(
//! # use openzeppelin_relayer::domain::transaction::evm::PriceCalculator;
//! # use openzeppelin_relayer::models::{EvmTransactionData, RelayerRepoModel, TransactionError};
//! # use openzeppelin_relayer::services::gas::evm_gas_price::EvmGasPriceServiceTrait;
//! # async fn example<G: EvmGasPriceServiceTrait>(
//! # calculator: &PriceCalculator<G>,
//! # tx_data: &EvmTransactionData,
//! # relayer: &RelayerRepoModel,
//! # gas_price_service: &EvmGasPriceService<P>,
//! # provider: &P
//! # ) -> Result<(), TransactionError> {
//! let price_params = PriceCalculator::get_transaction_price_params(
//! let price_params = calculator.get_transaction_price_params(
//! tx_data,
//! relayer,
//! gas_price_service,
//! provider
//! relayer
//! ).await?;
//! # Ok(())
//! # }
Expand Down
Loading
Loading