Skip to content

Commit 5f88c1c

Browse files
jaymellsvix-james
authored andcommitted
Remove connection pool for redis cache variant
Replace connection pooling with unpooled `redis::aio::ConnectionManager`. The ConnectionManager uses a single multiplexed connection to handle all requests and also handles reconnections directly, making connection pooling unnecessary. Note that these changes applying *only* to redis caching, not queuing. This is safe in the redis caching layer because we do not make blocking calls there. However, we *do* make blocking calls currently in our redis queue implementation, so we must continue to use connection pooling there to avoid blocking concurrent requests. Also note that names for our many methods of acquiring a `RedisPool` have been standardized for less ambiguity.
1 parent 6026dda commit 5f88c1c

File tree

8 files changed

+112
-27
lines changed

8 files changed

+112
-27
lines changed

server/Cargo.lock

Lines changed: 19 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

server/svix-server/Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -49,7 +49,7 @@ chrono = { version="0.4.26", features = ["serde"] }
4949
reqwest = { version = "0.11.27", features = ["json", "rustls-tls", "hickory-resolver"], default-features = false }
5050
bb8 = "0.8"
5151
bb8-redis = "0.15.0"
52-
redis = { version = "0.25.4", features = ["tokio-comp", "tokio-native-tls-comp", "streams", "cluster-async", "tcp_nodelay"] }
52+
redis = { version = "0.25.4", features = ["tokio-comp", "tokio-native-tls-comp", "streams", "cluster-async", "tcp_nodelay", "connection-manager"] }
5353
thiserror = "1.0.30"
5454
bytes = "1.1.0"
5555
blake2 = "0.10.4"

server/svix-server/src/core/cache/redis.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -115,7 +115,7 @@ mod tests {
115115
async fn get_pool(redis_dsn: &str, cfg: &crate::cfg::Configuration) -> RedisPool {
116116
match cfg.cache_type {
117117
CacheType::RedisCluster => crate::redis::new_redis_clustered_unpooled(redis_dsn).await,
118-
CacheType::Redis => crate::redis::new_redis_pool(redis_dsn, cfg).await,
118+
CacheType::Redis => crate::redis::new_redis_unpooled(redis_dsn).await,
119119
_ => panic!(
120120
"This test should only be run when redis is configured as the cache provider"
121121
),

server/svix-server/src/lib.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -110,7 +110,7 @@ pub async fn run_with_prefix(
110110
CacheBackend::None => cache::none::new(),
111111
CacheBackend::Memory => cache::memory::new(),
112112
CacheBackend::Redis(dsn) => {
113-
let mgr = crate::redis::new_redis_pool(dsn, &cfg).await;
113+
let mgr = crate::redis::new_redis_unpooled(dsn).await;
114114
cache::redis::new(mgr)
115115
}
116116
CacheBackend::RedisCluster(dsn) => {

server/svix-server/src/queue/redis.rs

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -134,8 +134,8 @@ async fn new_pair_inner(
134134
// - redis tests that only makes sense to run with the DSN set
135135
let dsn = cfg.redis_dsn.as_deref().unwrap();
136136
let pool = match &cfg.queue_type {
137-
QueueType::RedisCluster => crate::redis::new_redis_pool_clustered(dsn, cfg).await,
138-
_ => crate::redis::new_redis_pool(dsn, cfg).await,
137+
QueueType::RedisCluster => crate::redis::new_redis_clustered_pooled(dsn, cfg).await,
138+
_ => crate::redis::new_redis_pooled(dsn, cfg).await,
139139
};
140140

141141
// Create the stream and consumer group for the MAIN queue should it not already exist. The
@@ -364,10 +364,11 @@ pub mod tests {
364364
async fn get_pool(cfg: &Configuration) -> RedisPool {
365365
match cfg.queue_type {
366366
QueueType::RedisCluster => {
367-
crate::redis::new_redis_pool_clustered(cfg.redis_dsn.as_deref().unwrap(), cfg).await
367+
crate::redis::new_redis_clustered_pooled(cfg.redis_dsn.as_deref().unwrap(), cfg)
368+
.await
368369
}
369370
QueueType::Redis => {
370-
crate::redis::new_redis_pool(cfg.redis_dsn.as_deref().unwrap(), cfg).await
371+
crate::redis::new_redis_pooled(cfg.redis_dsn.as_deref().unwrap(), cfg).await
371372
}
372373
_ => {
373374
panic!("This test should only be run when redis is configured as the queue backend")

server/svix-server/src/redis/mod.rs

Lines changed: 79 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ pub enum RedisPool {
1616
Clustered(ClusteredRedisPool),
1717
ClusteredUnpooled(ClusteredRedisUnpooled),
1818
NonClustered(NonClusteredRedisPool),
19+
NonClusteredUnpooled(NonClusteredRedisUnpooled),
1920
}
2021

2122
impl RedisPool {
@@ -24,6 +25,7 @@ impl RedisPool {
2425
Self::Clustered(pool) => pool.get().await,
2526
Self::NonClustered(pool) => pool.get().await,
2627
Self::ClusteredUnpooled(pool) => pool.get().await,
28+
Self::NonClusteredUnpooled(pool) => pool.get().await,
2729
}
2830
}
2931
}
@@ -63,6 +65,27 @@ impl std::fmt::Debug for ClusteredRedisUnpooled {
6365
}
6466
}
6567

68+
#[derive(Clone)]
69+
pub struct NonClusteredRedisUnpooled {
70+
con: redis::aio::ConnectionManager,
71+
}
72+
73+
impl NonClusteredRedisUnpooled {
74+
pub async fn get(&self) -> Result<PooledConnection<'_>, RunError<RedisError>> {
75+
Ok(PooledConnection::NonClusteredUnpooled(
76+
NonClusteredUnpooledConnection {
77+
con: self.con.clone(),
78+
},
79+
))
80+
}
81+
}
82+
83+
impl std::fmt::Debug for NonClusteredRedisUnpooled {
84+
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
85+
f.debug_struct("NonClusteredRedisUnpooled").finish()
86+
}
87+
}
88+
6689
#[derive(Clone, Debug)]
6790
pub struct NonClusteredRedisPool {
6891
pool: Pool<RedisConnectionManager>,
@@ -80,6 +103,7 @@ pub enum PooledConnection<'a> {
80103
Clustered(ClusteredPooledConnection<'a>),
81104
ClusteredUnpooled(ClusteredUnpooledConnection),
82105
NonClustered(NonClusteredPooledConnection<'a>),
106+
NonClusteredUnpooled(NonClusteredUnpooledConnection),
83107
}
84108

85109
impl PooledConnection<'_> {
@@ -104,6 +128,7 @@ impl redis::aio::ConnectionLike for PooledConnection<'_> {
104128
PooledConnection::Clustered(conn) => conn.con.req_packed_command(cmd),
105129
PooledConnection::NonClustered(conn) => conn.con.req_packed_command(cmd),
106130
PooledConnection::ClusteredUnpooled(conn) => conn.con.req_packed_command(cmd),
131+
PooledConnection::NonClusteredUnpooled(conn) => conn.con.req_packed_command(cmd),
107132
}
108133
}
109134

@@ -121,6 +146,9 @@ impl redis::aio::ConnectionLike for PooledConnection<'_> {
121146
PooledConnection::ClusteredUnpooled(conn) => {
122147
conn.con.req_packed_commands(cmd, offset, count)
123148
}
149+
PooledConnection::NonClusteredUnpooled(conn) => {
150+
conn.con.req_packed_commands(cmd, offset, count)
151+
}
124152
}
125153
}
126154

@@ -129,6 +157,7 @@ impl redis::aio::ConnectionLike for PooledConnection<'_> {
129157
PooledConnection::Clustered(conn) => conn.con.get_db(),
130158
PooledConnection::NonClustered(conn) => conn.con.get_db(),
131159
PooledConnection::ClusteredUnpooled(conn) => conn.con.get_db(),
160+
PooledConnection::NonClusteredUnpooled(conn) => conn.con.get_db(),
132161
}
133162
}
134163
}
@@ -150,6 +179,23 @@ impl<'a> NonClusteredPooledConnection<'a> {
150179
}
151180
}
152181

182+
pub struct NonClusteredUnpooledConnection {
183+
con: redis::aio::ConnectionManager,
184+
}
185+
186+
impl NonClusteredUnpooledConnection {
187+
pub async fn query_async<T: FromRedisValue>(&mut self, cmd: redis::Cmd) -> RedisResult<T> {
188+
cmd.query_async(&mut self.con).await
189+
}
190+
191+
pub async fn query_async_pipeline<T: FromRedisValue>(
192+
&mut self,
193+
pipe: redis::Pipeline,
194+
) -> RedisResult<T> {
195+
pipe.query_async(&mut self.con).await
196+
}
197+
}
198+
153199
pub struct ClusteredPooledConnection<'a> {
154200
con: bb8::PooledConnection<'a, RedisClusterConnectionManager>,
155201
}
@@ -211,31 +257,50 @@ async fn new_redis_pool_helper(
211257
}
212258
}
213259

214-
async fn new_redis_unpooled_helper(redis_dsn: &str) -> RedisPool {
215-
let cli = redis::cluster::ClusterClient::builder(vec![redis_dsn])
216-
.retries(1)
217-
.connection_timeout(REDIS_CONN_TIMEOUT)
218-
.build()
219-
.expect("Error initializing redis-unpooled client");
220-
let con = cli
221-
.get_async_connection()
260+
async fn new_redis_unpooled_helper(redis_dsn: &str, clustered: bool) -> RedisPool {
261+
if clustered {
262+
let cli = redis::cluster::ClusterClient::builder(vec![redis_dsn])
263+
.retries(1)
264+
.connection_timeout(REDIS_CONN_TIMEOUT)
265+
.build()
266+
.expect("Error initializing redis-unpooled cluster client");
267+
let con = cli
268+
.get_async_connection()
269+
.await
270+
.expect("Failed to get redis-cluster-unpooled connection");
271+
RedisPool::ClusteredUnpooled(ClusteredRedisUnpooled { con })
272+
} else {
273+
let cli = redis::Client::open(redis_dsn).expect("Error initializing redis unpooled client");
274+
let con = redis::aio::ConnectionManager::new_with_backoff_and_timeouts(
275+
cli,
276+
2,
277+
100,
278+
1,
279+
Duration::MAX,
280+
REDIS_CONN_TIMEOUT,
281+
)
222282
.await
223-
.expect("Failed to get redis-cluster-unpooled connection");
224-
RedisPool::ClusteredUnpooled(ClusteredRedisUnpooled { con })
283+
.expect("Failed to get redis-unpooled connection manager");
284+
RedisPool::NonClusteredUnpooled(NonClusteredRedisUnpooled { con })
285+
}
225286
}
226287

227-
pub async fn new_redis_pool_clustered(redis_dsn: &str, cfg: &Configuration) -> RedisPool {
288+
pub async fn new_redis_clustered_pooled(redis_dsn: &str, cfg: &Configuration) -> RedisPool {
228289
new_redis_pool_helper(redis_dsn, true, cfg.redis_pool_max_size).await
229290
}
230291

231292
pub async fn new_redis_clustered_unpooled(redis_dsn: &str) -> RedisPool {
232-
new_redis_unpooled_helper(redis_dsn).await
293+
new_redis_unpooled_helper(redis_dsn, true).await
233294
}
234295

235-
pub async fn new_redis_pool(redis_dsn: &str, cfg: &Configuration) -> RedisPool {
296+
pub async fn new_redis_pooled(redis_dsn: &str, cfg: &Configuration) -> RedisPool {
236297
new_redis_pool_helper(redis_dsn, false, cfg.redis_pool_max_size).await
237298
}
238299

300+
pub async fn new_redis_unpooled(redis_dsn: &str) -> RedisPool {
301+
new_redis_unpooled_helper(redis_dsn, false).await
302+
}
303+
239304
#[cfg(test)]
240305
mod tests {
241306
use redis::AsyncCommands;
@@ -246,7 +311,7 @@ mod tests {
246311
async fn get_pool(redis_dsn: &str, cfg: &Configuration) -> RedisPool {
247312
match cfg.cache_type {
248313
CacheType::RedisCluster => super::new_redis_clustered_unpooled(redis_dsn).await,
249-
CacheType::Redis => super::new_redis_pool(redis_dsn, cfg).await,
314+
CacheType::Redis => super::new_redis_unpooled(redis_dsn).await,
250315
_ => panic!(
251316
"This test should only be run when redis is configured as the cache provider"
252317
),

server/svix-server/tests/it/message_app.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@ use svix_server::{
1212
message_app::AppEndpointKey,
1313
types::{BaseId, OrganizationId},
1414
},
15-
redis::{new_redis_clustered_unpooled, new_redis_pool},
15+
redis::{new_redis_clustered_unpooled, new_redis_unpooled},
1616
};
1717

1818
use crate::utils::{
@@ -66,7 +66,7 @@ async fn test_app_deletion() {
6666
let cache = match cfg.cache_backend() {
6767
CacheBackend::None => cache::none::new(),
6868
CacheBackend::Redis(dsn) => {
69-
let mgr = new_redis_pool(dsn, &cfg).await;
69+
let mgr = new_redis_unpooled(dsn).await;
7070
cache::redis::new(mgr)
7171
}
7272
CacheBackend::RedisCluster(dsn) => {
@@ -150,7 +150,7 @@ async fn test_endp_deletion() {
150150
let cache = match cfg.cache_backend() {
151151
CacheBackend::None => cache::none::new(),
152152
CacheBackend::Redis(dsn) => {
153-
let mgr = new_redis_pool(dsn, &cfg).await;
153+
let mgr = new_redis_unpooled(dsn).await;
154154
cache::redis::new(mgr)
155155
}
156156
CacheBackend::RedisCluster(dsn) => {

server/svix-server/tests/it/redis_queue.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -13,16 +13,16 @@ use svix_server::{
1313
queue::{
1414
new_pair, MessageTask, QueueTask, TaskQueueConsumer, TaskQueueDelivery, TaskQueueProducer,
1515
},
16-
redis::{new_redis_pool, new_redis_pool_clustered, RedisPool},
16+
redis::{new_redis_clustered_pooled, new_redis_pooled, RedisPool},
1717
};
1818

1919
// TODO: Don't copy this from the Redis queue test directly, place the fn somewhere both can access
2020
async fn get_pool(cfg: &Configuration) -> RedisPool {
2121
match cfg.queue_type {
2222
QueueType::RedisCluster => {
23-
new_redis_pool_clustered(cfg.redis_dsn.as_deref().unwrap(), cfg).await
23+
new_redis_clustered_pooled(cfg.redis_dsn.as_deref().unwrap(), cfg).await
2424
}
25-
QueueType::Redis => new_redis_pool(cfg.redis_dsn.as_deref().unwrap(), cfg).await,
25+
QueueType::Redis => new_redis_pooled(cfg.redis_dsn.as_deref().unwrap(), cfg).await,
2626
_ => {
2727
panic!("This test should only be run when redis is configured as the queue backend")
2828
}

0 commit comments

Comments
 (0)