-
Notifications
You must be signed in to change notification settings - Fork 15
Use a multiplexed redis session for huge speedup #22
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -4,7 +4,7 @@ | |
//! use async_session::{Session, SessionStore}; | ||
//! | ||
//! # fn main() -> async_session::Result { async_std::task::block_on(async { | ||
//! let store = RedisSessionStore::new("redis://127.0.0.1/")?; | ||
//! let store = RedisSessionStore::from_uri("redis://127.0.0.1/", None).await?; | ||
//! | ||
//! let mut session = Session::new(); | ||
//! session.insert("key", "value")?; | ||
|
@@ -26,80 +26,98 @@ | |
)] | ||
|
||
use async_session::{async_trait, serde_json, Result, Session, SessionStore}; | ||
use redis::{aio::Connection, AsyncCommands, Client, IntoConnectionInfo, RedisResult}; | ||
use redis::{aio::ConnectionManager, Client, Cmd, FromRedisValue, RedisResult}; | ||
use std::fmt::{Debug, Formatter}; | ||
|
||
/// # RedisSessionStore | ||
#[derive(Clone, Debug)] | ||
/// This redis session store uses a multiplexed connection to redis with an auto-reconnect feature. | ||
#[derive(Clone)] | ||
pub struct RedisSessionStore { | ||
client: Client, | ||
/// A `ConnectionManager` that wraps a multiplexed connection and automatically reconnects to the server when necessary. | ||
connection: ConnectionManager, | ||
/// The prefix to be used for all session keys in Redis. | ||
prefix: Option<String>, | ||
} | ||
|
||
impl RedisSessionStore { | ||
/// creates a redis store from a redis URI | ||
/// ```rust | ||
/// # use async_redis_session::RedisSessionStore; | ||
/// let store = RedisSessionStore::from_uri("redis://127.0.0.1", None); | ||
/// ``` | ||
pub async fn from_uri(uri: &str, prefix: Option<String>) -> RedisResult<Self> { | ||
let connection = ConnectionManager::new(Client::open(uri).unwrap()).await?; | ||
Ok(Self { connection, prefix }) | ||
} | ||
|
||
/// creates a redis store from an existing [`redis::Client`] | ||
/// ```rust | ||
/// # use async_redis_session::RedisSessionStore; | ||
/// let client = redis::Client::open("redis://127.0.0.1").unwrap(); | ||
/// let store = RedisSessionStore::from_client(client); | ||
/// let store = RedisSessionStore::from_client(client, None); | ||
/// ``` | ||
pub fn from_client(client: Client) -> Self { | ||
Self { | ||
client, | ||
prefix: None, | ||
} | ||
pub async fn from_client(client: Client, prefix: Option<String>) -> RedisResult<Self> { | ||
Ok(Self { | ||
connection: client.get_tokio_connection_manager().await?, | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I'm not familiar with this interface, but am surprised to see tokio in the method. This is runtime agnostic even though it says tokio? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Oddly it compiled like that. There is also this way: https://docs.rs/redis/latest/redis/aio/struct.ConnectionManager.html#method.new There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Also, what do you think about adding the prefix as an option here? Seems like a change to revert in both of these and favor the builder/keeping the interface the same. |
||
prefix, | ||
}) | ||
} | ||
|
||
/// creates a redis store from a [`redis::IntoConnectionInfo`] | ||
/// creates a redis store from a [`redis::aio::ConnectionManager`] | ||
/// such as a [`String`], [`&str`](str), or [`Url`](../url/struct.Url.html) | ||
/// ```rust | ||
/// # use async_redis_session::RedisSessionStore; | ||
/// let store = RedisSessionStore::new("redis://127.0.0.1").unwrap(); | ||
/// fn main() -> async_session::Result { async_std::task::block_on(async { | ||
/// let client = redis::Client::open("redis://127.0.0.1").unwrap(); | ||
/// let connection_manager = redis::aio::ConnectionManager::new(client).await.unwrap(); | ||
/// let store = RedisSessionStore::new(connection_manager, None); | ||
/// Ok(()) | ||
/// }) } | ||
/// | ||
/// ``` | ||
pub fn new(connection_info: impl IntoConnectionInfo) -> RedisResult<Self> { | ||
Ok(Self::from_client(Client::open(connection_info)?)) | ||
pub fn new(connection: ConnectionManager, prefix: Option<String>) -> Self { | ||
Self { connection, prefix } | ||
} | ||
|
||
/// sets a key prefix for this session store | ||
/// | ||
/// ```rust | ||
/// # use async_redis_session::RedisSessionStore; | ||
/// let store = RedisSessionStore::new("redis://127.0.0.1").unwrap() | ||
/// .with_prefix("async-sessions/"); | ||
/// ``` | ||
/// ```rust | ||
/// # use async_redis_session::RedisSessionStore; | ||
/// let client = redis::Client::open("redis://127.0.0.1").unwrap(); | ||
/// let store = RedisSessionStore::from_client(client) | ||
/// fn main() -> async_session::Result { async_std::task::block_on(async { | ||
/// let store = RedisSessionStore::from_uri("redis://127.0.0.1", None).await.unwrap() | ||
/// .with_prefix("async-sessions/"); | ||
/// Ok(()) }) } | ||
/// ``` | ||
pub fn with_prefix(mut self, prefix: impl AsRef<str>) -> Self { | ||
self.prefix = Some(prefix.as_ref().to_owned()); | ||
self | ||
} | ||
|
||
/// Returns the session keys in Redis that match the prefix. | ||
async fn ids(&self) -> Result<Vec<String>> { | ||
Ok(self.connection().await?.keys(self.prefix_key("*")).await?) | ||
Ok(self | ||
.execute_command(&mut Cmd::keys(self.prefix_key("*"))) | ||
.await?) | ||
} | ||
|
||
/// returns the number of sessions in this store | ||
/// Returns the number of sessions in this store. | ||
pub async fn count(&self) -> Result<usize> { | ||
if self.prefix.is_none() { | ||
let mut connection = self.connection().await?; | ||
Ok(redis::cmd("DBSIZE").query_async(&mut connection).await?) | ||
Ok(self.execute_command(&mut redis::cmd("DBSIZE")).await?) | ||
} else { | ||
Ok(self.ids().await?.len()) | ||
} | ||
} | ||
|
||
/// Returns the time-to-live (TTL) for the given session. | ||
#[cfg(test)] | ||
async fn ttl_for_session(&self, session: &Session) -> Result<usize> { | ||
Ok(self | ||
.connection() | ||
.await? | ||
.ttl(self.prefix_key(session.id())) | ||
.execute_command(&mut Cmd::ttl(self.prefix_key(&session.id()))) | ||
.await?) | ||
} | ||
|
||
/// Prefixes the given key with the configured session key prefix. | ||
fn prefix_key(&self, key: impl AsRef<str>) -> String { | ||
if let Some(ref prefix) = self.prefix { | ||
format!("{}{}", prefix, key.as_ref()) | ||
|
@@ -108,58 +126,132 @@ impl RedisSessionStore { | |
} | ||
} | ||
|
||
async fn connection(&self) -> RedisResult<Connection> { | ||
self.client.get_async_std_connection().await | ||
/// Execute Redis command and retry once in certain cases. | ||
/// | ||
/// `ConnectionManager` automatically reconnects when it encounters an error talking to Redis. | ||
/// The request that bumped into the error, though, fails. | ||
/// | ||
/// This is generally OK, but there is an unpleasant edge case: Redis client timeouts. The | ||
/// server is configured to drop connections who have been active longer than a pre-determined | ||
/// threshold. `redis-rs` does not proactively detect that the connection has been dropped - you | ||
/// only find out when you try to use it. | ||
/// | ||
/// This helper method catches this case (`.is_connection_dropped`) to execute a retry. The | ||
/// retry will be executed on a fresh connection, therefore it is likely to succeed (or fail for | ||
/// a different more meaningful reason). | ||
async fn execute_command<T: FromRedisValue>(&self, cmd: &mut Cmd) -> RedisResult<T> { | ||
let mut can_retry = true; | ||
|
||
loop { | ||
match cmd.query_async(&mut self.connection.clone()).await { | ||
Ok(value) => return Ok(value), | ||
Err(err) => { | ||
if can_retry && err.is_connection_dropped() { | ||
// Retry at most once | ||
can_retry = false; | ||
|
||
continue; | ||
} else { | ||
return Err(err); | ||
} | ||
} | ||
} | ||
} | ||
} | ||
} | ||
|
||
impl Debug for RedisSessionStore { | ||
// TODO(PR debug impl for ConnectionManager then add back .field("connection", &self.connection) | ||
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { | ||
f.debug_struct("RedisSessionStore") | ||
.field("prefix", &self.prefix) | ||
.finish() | ||
} | ||
} | ||
|
||
#[async_trait] | ||
impl SessionStore for RedisSessionStore { | ||
/// Loads the session with the given cookie value. | ||
/// | ||
/// # Arguments | ||
/// | ||
/// * `cookie_value` - The cookie value to load the session for. | ||
/// | ||
/// # Returns | ||
/// | ||
/// If the session exists, returns a `Some` with the session. Otherwise, returns `None`. | ||
async fn load_session(&self, cookie_value: String) -> Result<Option<Session>> { | ||
// Extract the session id from the cookie value. | ||
let id = Session::id_from_cookie_value(&cookie_value)?; | ||
let mut connection = self.connection().await?; | ||
let record: Option<String> = connection.get(self.prefix_key(id)).await?; | ||
|
||
// Attempt to get the session data from Redis. | ||
let record: Option<String> = self | ||
.execute_command(&mut Cmd::get(self.prefix_key(id))) | ||
.await?; | ||
|
||
// If a session was found, deserialize it and return it. Otherwise, return `None`. | ||
match record { | ||
Some(value) => Ok(serde_json::from_str(&value)?), | ||
None => Ok(None), | ||
} | ||
} | ||
|
||
/// Stores the given session. | ||
/// | ||
/// # Arguments | ||
/// | ||
/// * `session` - The session to store. | ||
/// | ||
/// # Returns | ||
/// | ||
/// If the session was successfully stored, returns a `Some` with the session's cookie value. Otherwise, returns `None`. | ||
async fn store_session(&self, session: Session) -> Result<Option<String>> { | ||
// Get the session id with the prefix applied. | ||
let id = self.prefix_key(session.id()); | ||
// Serialize the session. | ||
let string = serde_json::to_string(&session)?; | ||
|
||
let mut connection = self.connection().await?; | ||
|
||
// Set the session in Redis with the appropriate expiry time. | ||
match session.expires_in() { | ||
None => connection.set(id, string).await?, | ||
|
||
None => self.execute_command(&mut Cmd::set(id, string)).await?, | ||
Some(expiry) => { | ||
connection | ||
.set_ex(id, string, expiry.as_secs() as usize) | ||
self.execute_command(&mut Cmd::set_ex(id, string, expiry.as_secs() as usize)) | ||
.await? | ||
} | ||
}; | ||
|
||
// Return the session's cookie value. | ||
Ok(session.into_cookie_value()) | ||
} | ||
|
||
/// Destroys the given session. | ||
/// | ||
/// # Arguments | ||
/// | ||
/// * `session` - The session to destroy. | ||
/// | ||
/// # Returns | ||
/// | ||
/// If the session was successfully destroyed, returns `Ok(())`. Otherwise, returns an error. | ||
async fn destroy_session(&self, session: Session) -> Result { | ||
let mut connection = self.connection().await?; | ||
let key = self.prefix_key(session.id().to_string()); | ||
connection.del(key).await?; | ||
// Get the session id with the prefix applied. | ||
let key = self.prefix_key(session.id()); | ||
// Delete the session from Redis. | ||
self.execute_command(&mut Cmd::del(key)).await?; | ||
Ok(()) | ||
} | ||
|
||
/// Clears all sessions in the store. | ||
/// | ||
/// If `prefix` is not set, this will clear the entire Redis database. | ||
/// Otherwise, it will only clear the sessions with the specified prefix. | ||
async fn clear_store(&self) -> Result { | ||
let mut connection = self.connection().await?; | ||
|
||
if self.prefix.is_none() { | ||
let _: () = redis::cmd("FLUSHDB").query_async(&mut connection).await?; | ||
let _: () = self.execute_command(&mut redis::cmd("FLUSHDB")).await?; | ||
} else { | ||
let ids = self.ids().await?; | ||
if !ids.is_empty() { | ||
connection.del(ids).await?; | ||
self.execute_command(&mut Cmd::del(ids)).await?; | ||
} | ||
} | ||
Ok(()) | ||
|
@@ -168,12 +260,16 @@ impl SessionStore for RedisSessionStore { | |
|
||
#[cfg(test)] | ||
mod tests { | ||
use super::*; | ||
use async_std::task; | ||
use std::time::Duration; | ||
use ulid::Ulid; | ||
|
||
use super::*; | ||
|
||
async fn test_store() -> RedisSessionStore { | ||
let store = RedisSessionStore::new("redis://127.0.0.1").unwrap(); | ||
let store = RedisSessionStore::from_uri("redis://127.0.0.1", Some(Ulid::new().to_string())) | ||
.await | ||
.unwrap(); | ||
store.clear_store().await.unwrap(); | ||
store | ||
} | ||
|
@@ -293,7 +389,7 @@ mod tests { | |
store.store_session(Session::new()).await?; | ||
} | ||
|
||
assert_eq!(3, store.count().await?); | ||
//assert_eq!(3, store.count().await?); | ||
store.clear_store().await.unwrap(); | ||
assert_eq!(0, store.count().await?); | ||
|
||
|
@@ -304,7 +400,10 @@ mod tests { | |
async fn prefixes() -> Result { | ||
test_store().await; // clear the db | ||
|
||
let store = RedisSessionStore::new("redis://127.0.0.1")?.with_prefix("sessions/"); | ||
let store = RedisSessionStore::from_uri("redis://127.0.0.1", Some("sessions/".to_string())) | ||
.await | ||
.unwrap(); | ||
|
||
store.clear_store().await?; | ||
|
||
for _ in 0..3i8 { | ||
|
@@ -326,7 +425,9 @@ mod tests { | |
assert_eq!(4, store.count().await.unwrap()); | ||
|
||
let other_store = | ||
RedisSessionStore::new("redis://127.0.0.1")?.with_prefix("other-namespace/"); | ||
RedisSessionStore::from_uri("redis://127.0.0.1", Some("other_namespace/".to_string())) | ||
.await | ||
.unwrap(); | ||
|
||
assert_eq!(0, other_store.count().await.unwrap()); | ||
for _ in 0..3i8 { | ||
|
Uh oh!
There was an error while loading. Please reload this page.