Skip to content

Commit 00e1fe4

Browse files
jaymellsvix-james
authored andcommitted
Add redisclusterunpooled cache type
Adds downstream changes to support unpooled redis cluster. Simply put, we don't gain anything by using connection pooling for clustered redis. The redis cluster client handles establishing connections, handling errors, etc., internally. As long as we don't make blocking redis requests, we are fine without connection pooling in the redis caching layer.
1 parent 108c979 commit 00e1fe4

File tree

10 files changed

+139
-72
lines changed

10 files changed

+139
-72
lines changed

server/Cargo.lock

Lines changed: 7 additions & 45 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

server/run-tests.sh

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -50,7 +50,7 @@ echo "*********** RUN 3 ***********"
5050
echo "*********** RUN 4 ***********"
5151
(
5252
export SVIX_QUEUE_TYPE="rediscluster"
53-
export SVIX_CACHE_TYPE="rediscluster"
53+
export SVIX_CACHE_TYPE="redisclusterunpooled"
5454
export SVIX_REDIS_DSN="redis://localhost:6380"
5555
${TEST_COMMAND}
5656
${TEST_COMMAND} -- --ignored redis

server/svix-server/Cargo.toml

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -48,8 +48,8 @@ ed25519-compact = "2.1.1"
4848
chrono = { version="0.4.26", features = ["serde"] }
4949
reqwest = { version = "0.11.27", features = ["json", "rustls-tls", "hickory-resolver"], default-features = false }
5050
bb8 = "0.8"
51-
bb8-redis = "0.14.0"
52-
redis = { version = "0.24.0", features = ["tokio-comp", "tokio-native-tls-comp", "streams", "cluster-async", "tcp_nodelay"] }
51+
bb8-redis = "0.15.0"
52+
redis = { version = "0.25.4", features = ["tokio-comp", "tokio-native-tls-comp", "streams", "cluster-async", "tcp_nodelay"] }
5353
thiserror = "1.0.30"
5454
bytes = "1.1.0"
5555
blake2 = "0.10.4"

server/svix-server/src/cfg.rs

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -193,7 +193,7 @@ pub struct ConfigurationInner {
193193
fn validate_config_complete(config: &ConfigurationInner) -> Result<(), ValidationError> {
194194
match config.cache_type {
195195
CacheType::None | CacheType::Memory => {}
196-
CacheType::Redis | CacheType::RedisCluster => {
196+
CacheType::Redis | CacheType::RedisCluster | CacheType::RedisClusterUnpooled => {
197197
if config.cache_dsn().is_none() {
198198
return Err(ValidationError {
199199
code: Cow::from("missing field"),
@@ -267,6 +267,9 @@ impl ConfigurationInner {
267267
CacheType::Memory => CacheBackend::Memory,
268268
CacheType::Redis => CacheBackend::Redis(self.cache_dsn().expect(err)),
269269
CacheType::RedisCluster => CacheBackend::RedisCluster(self.cache_dsn().expect(err)),
270+
CacheType::RedisClusterUnpooled => {
271+
CacheBackend::RedisClusterUnpooled(self.cache_dsn().expect(err))
272+
}
270273
}
271274
}
272275
}
@@ -304,6 +307,7 @@ pub enum CacheBackend<'a> {
304307
Memory,
305308
Redis(&'a str),
306309
RedisCluster(&'a str),
310+
RedisClusterUnpooled(&'a str),
307311
}
308312

309313
#[derive(Clone, Debug, Deserialize)]
@@ -336,6 +340,7 @@ pub enum CacheType {
336340
Memory,
337341
Redis,
338342
RedisCluster,
343+
RedisClusterUnpooled,
339344
None,
340345
}
341346

server/svix-server/src/core/cache/redis.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -115,6 +115,9 @@ mod tests {
115115
async fn get_pool(redis_dsn: &str, cfg: &crate::cfg::Configuration) -> RedisPool {
116116
match cfg.cache_type {
117117
CacheType::RedisCluster => crate::redis::new_redis_pool_clustered(redis_dsn, cfg).await,
118+
CacheType::RedisClusterUnpooled => {
119+
crate::redis::new_redis_clustered_unpooled(redis_dsn).await
120+
}
118121
CacheType::Redis => crate::redis::new_redis_pool(redis_dsn, cfg).await,
119122
_ => panic!(
120123
"This test should only be run when redis is configured as the cache provider"

server/svix-server/src/lib.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -117,6 +117,10 @@ pub async fn run_with_prefix(
117117
let mgr = crate::redis::new_redis_pool_clustered(dsn, &cfg).await;
118118
cache::redis::new(mgr)
119119
}
120+
CacheBackend::RedisClusterUnpooled(dsn) => {
121+
let mgr = crate::redis::new_redis_clustered_unpooled(dsn).await;
122+
cache::redis::new(mgr)
123+
}
120124
};
121125
tracing::debug!("Cache: Started");
122126

server/svix-server/src/queue/redis.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -369,6 +369,9 @@ pub mod tests {
369369
CacheType::Redis => {
370370
crate::redis::new_redis_pool(cfg.redis_dsn.as_deref().unwrap(), cfg).await
371371
}
372+
CacheType::RedisClusterUnpooled => {
373+
crate::redis::new_redis_clustered_unpooled(cfg.redis_dsn.as_deref().unwrap()).await
374+
}
372375
_ => {
373376
panic!("This test should only be run when redis is configured as the queue backend")
374377
}

server/svix-server/src/redis/mod.rs

Lines changed: 100 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -1,15 +1,20 @@
11
mod cluster;
22

3+
use std::time::Duration;
4+
35
use bb8::{Pool, RunError};
4-
use bb8_redis::RedisMultiplexedConnectionManager;
6+
use bb8_redis::RedisConnectionManager;
57
use redis::{FromRedisValue, RedisError, RedisResult};
68

79
pub use self::cluster::RedisClusterConnectionManager;
810
use crate::cfg::Configuration;
911

12+
pub const REDIS_CONN_TIMEOUT: Duration = Duration::from_secs(2);
13+
1014
#[derive(Clone, Debug)]
1115
pub enum RedisPool {
1216
Clustered(ClusteredRedisPool),
17+
ClusteredUnpooled(ClusteredRedisUnpooled),
1318
NonClustered(NonClusteredRedisPool),
1419
}
1520

@@ -18,6 +23,7 @@ impl RedisPool {
1823
match self {
1924
Self::Clustered(pool) => pool.get().await,
2025
Self::NonClustered(pool) => pool.get().await,
26+
Self::ClusteredUnpooled(pool) => pool.get().await,
2127
}
2228
}
2329
}
@@ -36,9 +42,30 @@ impl ClusteredRedisPool {
3642
}
3743
}
3844

45+
#[derive(Clone)]
46+
pub struct ClusteredRedisUnpooled {
47+
con: redis::cluster_async::ClusterConnection,
48+
}
49+
50+
impl ClusteredRedisUnpooled {
51+
pub async fn get(&self) -> Result<PooledConnection<'_>, RunError<RedisError>> {
52+
Ok(PooledConnection::ClusteredUnpooled(
53+
ClusteredUnpooledConnection {
54+
con: self.con.clone(),
55+
},
56+
))
57+
}
58+
}
59+
60+
impl std::fmt::Debug for ClusteredRedisUnpooled {
61+
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
62+
f.debug_struct("ClusteredRedisUnpooled").finish()
63+
}
64+
}
65+
3966
#[derive(Clone, Debug)]
4067
pub struct NonClusteredRedisPool {
41-
pool: Pool<RedisMultiplexedConnectionManager>,
68+
pool: Pool<RedisConnectionManager>,
4269
}
4370

4471
impl NonClusteredRedisPool {
@@ -51,6 +78,7 @@ impl NonClusteredRedisPool {
5178

5279
pub enum PooledConnection<'a> {
5380
Clustered(ClusteredPooledConnection<'a>),
81+
ClusteredUnpooled(ClusteredUnpooledConnection),
5482
NonClustered(NonClusteredPooledConnection<'a>),
5583
}
5684

@@ -75,6 +103,7 @@ impl redis::aio::ConnectionLike for PooledConnection<'_> {
75103
match self {
76104
PooledConnection::Clustered(conn) => conn.con.req_packed_command(cmd),
77105
PooledConnection::NonClustered(conn) => conn.con.req_packed_command(cmd),
106+
PooledConnection::ClusteredUnpooled(conn) => conn.con.req_packed_command(cmd),
78107
}
79108
}
80109

@@ -89,19 +118,23 @@ impl redis::aio::ConnectionLike for PooledConnection<'_> {
89118
PooledConnection::NonClustered(conn) => {
90119
conn.con.req_packed_commands(cmd, offset, count)
91120
}
121+
PooledConnection::ClusteredUnpooled(conn) => {
122+
conn.con.req_packed_commands(cmd, offset, count)
123+
}
92124
}
93125
}
94126

95127
fn get_db(&self) -> i64 {
96128
match self {
97129
PooledConnection::Clustered(conn) => conn.con.get_db(),
98130
PooledConnection::NonClustered(conn) => conn.con.get_db(),
131+
PooledConnection::ClusteredUnpooled(conn) => conn.con.get_db(),
99132
}
100133
}
101134
}
102135

103136
pub struct NonClusteredPooledConnection<'a> {
104-
con: bb8::PooledConnection<'a, RedisMultiplexedConnectionManager>,
137+
con: bb8::PooledConnection<'a, RedisConnectionManager>,
105138
}
106139

107140
impl<'a> NonClusteredPooledConnection<'a> {
@@ -134,28 +167,73 @@ impl<'a> ClusteredPooledConnection<'a> {
134167
}
135168
}
136169

137-
pub async fn new_redis_pool_clustered(redis_dsn: &str, cfg: &Configuration) -> RedisPool {
138-
let mgr = RedisClusterConnectionManager::new(redis_dsn)
139-
.expect("Error initializing redis cluster client");
140-
let pool = bb8::Pool::builder()
141-
.max_size(cfg.redis_pool_max_size.into())
142-
.build(mgr)
170+
pub struct ClusteredUnpooledConnection {
171+
con: redis::cluster_async::ClusterConnection,
172+
}
173+
174+
impl ClusteredUnpooledConnection {
175+
pub async fn query_async<T: FromRedisValue>(&mut self, cmd: redis::Cmd) -> RedisResult<T> {
176+
cmd.query_async(&mut self.con).await
177+
}
178+
179+
pub async fn query_async_pipeline<T: FromRedisValue>(
180+
&mut self,
181+
pipe: redis::Pipeline,
182+
) -> RedisResult<T> {
183+
pipe.query_async(&mut self.con).await
184+
}
185+
}
186+
187+
async fn new_redis_pool_helper(
188+
redis_dsn: &str,
189+
clustered: bool,
190+
max_connections: u16,
191+
) -> RedisPool {
192+
if clustered {
193+
let mgr = RedisClusterConnectionManager::new(redis_dsn)
194+
.expect("Error initializing redis cluster client");
195+
let pool = bb8::Pool::builder()
196+
.max_size(max_connections.into())
197+
.build(mgr)
198+
.await
199+
.expect("Error initializing redis cluster connection pool");
200+
let pool = ClusteredRedisPool { pool };
201+
RedisPool::Clustered(pool)
202+
} else {
203+
let mgr = RedisConnectionManager::new(redis_dsn).expect("Error initializing redis client");
204+
let pool = bb8::Pool::builder()
205+
.max_size(max_connections.into())
206+
.build(mgr)
207+
.await
208+
.expect("Error initializing redis connection pool");
209+
let pool = NonClusteredRedisPool { pool };
210+
RedisPool::NonClustered(pool)
211+
}
212+
}
213+
214+
async fn new_redis_unpooled_helper(redis_dsn: &str) -> RedisPool {
215+
let cli = redis::cluster::ClusterClient::builder(vec![redis_dsn])
216+
.retries(1)
217+
.connection_timeout(REDIS_CONN_TIMEOUT)
218+
.build()
219+
.expect("Error initializing redis-unpooled client");
220+
let con = cli
221+
.get_async_connection()
143222
.await
144-
.expect("Error initializing redis cluster connection pool");
145-
let pool = ClusteredRedisPool { pool };
146-
RedisPool::Clustered(pool)
223+
.expect("Failed to get redis-cluster-unpooled connection");
224+
RedisPool::ClusteredUnpooled(ClusteredRedisUnpooled { con })
225+
}
226+
227+
pub async fn new_redis_pool_clustered(redis_dsn: &str, cfg: &Configuration) -> RedisPool {
228+
new_redis_pool_helper(redis_dsn, true, cfg.redis_pool_max_size).await
229+
}
230+
231+
pub async fn new_redis_clustered_unpooled(redis_dsn: &str) -> RedisPool {
232+
new_redis_unpooled_helper(redis_dsn).await
147233
}
148234

149235
pub async fn new_redis_pool(redis_dsn: &str, cfg: &Configuration) -> RedisPool {
150-
let mgr =
151-
RedisMultiplexedConnectionManager::new(redis_dsn).expect("Error initializing redis client");
152-
let pool = bb8::Pool::builder()
153-
.max_size(cfg.redis_pool_max_size.into())
154-
.build(mgr)
155-
.await
156-
.expect("Error initializing redis connection pool");
157-
let pool = NonClusteredRedisPool { pool };
158-
RedisPool::NonClustered(pool)
236+
new_redis_pool_helper(redis_dsn, false, cfg.redis_pool_max_size).await
159237
}
160238

161239
#[cfg(test)]
@@ -168,6 +246,7 @@ mod tests {
168246
async fn get_pool(redis_dsn: &str, cfg: &Configuration) -> RedisPool {
169247
match cfg.cache_type {
170248
CacheType::RedisCluster => super::new_redis_pool_clustered(redis_dsn, cfg).await,
249+
CacheType::RedisClusterUnpooled => super::new_redis_clustered_unpooled(redis_dsn).await,
171250
CacheType::Redis => super::new_redis_pool(redis_dsn, cfg).await,
172251
_ => panic!(
173252
"This test should only be run when redis is configured as the cache provider"

0 commit comments

Comments
 (0)