Skip to content

Commit 2211a7b

Browse files
authored
fix(processor): Fix client handling and number of active connections (#4597)
1 parent 38985a2 commit 2211a7b

File tree

4 files changed

+18
-6
lines changed

4 files changed

+18
-6
lines changed

relay-config/src/config.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2425,6 +2425,7 @@ impl Config {
24252425
Some(create_redis_pools(
24262426
redis_configs,
24272427
self.cpu_concurrency() as u32,
2428+
self.pool_concurrency() as u32,
24282429
))
24292430
}
24302431

relay-config/src/redis.rs

Lines changed: 11 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -273,12 +273,20 @@ pub(super) fn create_redis_pool(
273273
}
274274
}
275275

276-
pub(super) fn create_redis_pools(configs: &RedisConfigs, cpu_concurrency: u32) -> RedisPoolConfigs {
276+
pub(super) fn create_redis_pools(
277+
configs: &RedisConfigs,
278+
cpu_concurrency: u32,
279+
max_pool_concurrency: u32,
280+
) -> RedisPoolConfigs {
277281
// Default `max_connections` for the `project_configs` pool.
278282
// In a unified config, this is used for all pools.
279283
let project_configs_default_connections =
280284
std::cmp::max(cpu_concurrency * 2, DEFAULT_MIN_MAX_CONNECTIONS);
281285

286+
// The number of default connections is equal to how many threads we have times the number of
287+
// futures we can concurrently drive times some leeway since we might use more connections.
288+
let default_connections = cpu_concurrency * max_pool_concurrency * 2;
289+
282290
match configs {
283291
RedisConfigs::Unified(cfg) => {
284292
let pool = create_redis_pool(cfg, project_configs_default_connections);
@@ -291,8 +299,8 @@ pub(super) fn create_redis_pools(configs: &RedisConfigs, cpu_concurrency: u32) -
291299
} => {
292300
let project_configs =
293301
create_redis_pool(project_configs, project_configs_default_connections);
294-
let cardinality = create_redis_pool(cardinality, cpu_concurrency);
295-
let quotas = create_redis_pool(quotas, cpu_concurrency);
302+
let cardinality = create_redis_pool(cardinality, default_connections);
303+
let quotas = create_redis_pool(quotas, default_connections);
296304
RedisPoolConfigs::Individual {
297305
project_configs,
298306
cardinality,

relay-quotas/src/redis.rs

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -271,7 +271,6 @@ impl<T: GlobalLimiter> RedisRateLimiter<T> {
271271
quantity: usize,
272272
over_accept_once: bool,
273273
) -> Result<RateLimits, RateLimitingError> {
274-
let mut client = self.pool.client().map_err(RateLimitingError::Redis)?;
275274
let timestamp = UnixTimestamp::now();
276275
let mut invocation = self.script.prepare_invoke();
277276
let mut tracked_quotas = Vec::new();
@@ -338,6 +337,10 @@ impl<T: GlobalLimiter> RedisRateLimiter<T> {
338337
return Ok(rate_limits);
339338
}
340339

340+
// We get the redis client after the global rate limiting since we don't want to hold the
341+
// client across await points, otherwise it might be held for too long, and we will run out
342+
// of connections.
343+
let mut client = self.pool.client().map_err(RateLimitingError::Redis)?;
341344
let rejections: Vec<bool> = invocation
342345
.invoke(&mut client.connection().map_err(RateLimitingError::Redis)?)
343346
.map_err(RedisError::Redis)

relay-server/src/service.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -56,9 +56,9 @@ pub enum ServiceError {
5656
#[error("could not initialize kafka producer: {0}")]
5757
Kafka(String),
5858

59-
/// Initializing the Redis cluster client failed.
59+
/// Initializing the Redis client failed.
6060
#[cfg(feature = "processing")]
61-
#[error("could not initialize redis cluster client")]
61+
#[error("could not initialize redis client during startup")]
6262
Redis,
6363
}
6464

0 commit comments

Comments
 (0)