Skip to content

Use a multiplexed redis session for huge speedup #22

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 5 additions & 4 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -11,12 +11,13 @@ license = "MIT OR Apache-2.0"
keywords = ["sessions", "tide", "async-session", "redis"]
categories = ["web-programming::http-server", "web-programming", "database"]

[dependencies.redis]
version = "0.21.0"
features = ["aio", "async-std-comp"]

[dependencies]
async-session = "3.0.0"

[dependencies.redis]
version = "0.21.0"
features = ["aio", "async-std-comp", "connection-manager"]

[dev-dependencies]
async-std = { version = "1.9.0", features = ["attributes"] }
ulid = "1.0.0"
201 changes: 151 additions & 50 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
//! use async_session::{Session, SessionStore};
//!
//! # fn main() -> async_session::Result { async_std::task::block_on(async {
//! let store = RedisSessionStore::new("redis://127.0.0.1/")?;
//! let store = RedisSessionStore::from_uri("redis://127.0.0.1/", None).await?;
//!
//! let mut session = Session::new();
//! session.insert("key", "value")?;
Expand All @@ -26,80 +26,98 @@
)]

use async_session::{async_trait, serde_json, Result, Session, SessionStore};
use redis::{aio::Connection, AsyncCommands, Client, IntoConnectionInfo, RedisResult};
use redis::{aio::ConnectionManager, Client, Cmd, FromRedisValue, RedisResult};
use std::fmt::{Debug, Formatter};

/// # RedisSessionStore
#[derive(Clone, Debug)]
/// This redis session store uses a multiplexed connection to redis with an auto-reconnect feature.
#[derive(Clone)]
pub struct RedisSessionStore {
client: Client,
/// A `ConnectionManager` that wraps a multiplexed connection and automatically reconnects to the server when necessary.
connection: ConnectionManager,
/// The prefix to be used for all session keys in Redis.
prefix: Option<String>,
}

impl RedisSessionStore {
/// creates a redis store from a redis URI
/// ```rust
/// # use async_redis_session::RedisSessionStore;
/// let store = RedisSessionStore::from_uri("redis://127.0.0.1", None);
/// ```
pub async fn from_uri(uri: &str, prefix: Option<String>) -> RedisResult<Self> {
let connection = ConnectionManager::new(Client::open(uri).unwrap()).await?;
Ok(Self { connection, prefix })
}

/// creates a redis store from an existing [`redis::Client`]
/// ```rust
/// # use async_redis_session::RedisSessionStore;
/// let client = redis::Client::open("redis://127.0.0.1").unwrap();
/// let store = RedisSessionStore::from_client(client);
/// let store = RedisSessionStore::from_client(client, None);
/// ```
pub fn from_client(client: Client) -> Self {
Self {
client,
prefix: None,
}
pub async fn from_client(client: Client, prefix: Option<String>) -> RedisResult<Self> {
Ok(Self {
connection: client.get_tokio_connection_manager().await?,
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not familiar with this interface, but am surprised to see tokio in the method. This is runtime agnostic even though it says tokio?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oddly it compiled like that.

There is also this way: https://docs.rs/redis/latest/redis/aio/struct.ConnectionManager.html#method.new

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Also, what do you think about adding the prefix as an option here? Seems like a change to revert in both of these and favor the builder/keeping the interface the same.

prefix,
})
}

/// creates a redis store from a [`redis::IntoConnectionInfo`]
/// creates a redis store from a [`redis::aio::ConnectionManager`]
/// such as a [`String`], [`&str`](str), or [`Url`](../url/struct.Url.html)
/// ```rust
/// # use async_redis_session::RedisSessionStore;
/// let store = RedisSessionStore::new("redis://127.0.0.1").unwrap();
/// fn main() -> async_session::Result { async_std::task::block_on(async {
/// let client = redis::Client::open("redis://127.0.0.1").unwrap();
/// let connection_manager = redis::aio::ConnectionManager::new(client).await.unwrap();
/// let store = RedisSessionStore::new(connection_manager, None);
/// Ok(())
/// }) }
///
/// ```
pub fn new(connection_info: impl IntoConnectionInfo) -> RedisResult<Self> {
Ok(Self::from_client(Client::open(connection_info)?))
pub fn new(connection: ConnectionManager, prefix: Option<String>) -> Self {
Self { connection, prefix }
}

/// sets a key prefix for this session store
///
/// ```rust
/// # use async_redis_session::RedisSessionStore;
/// let store = RedisSessionStore::new("redis://127.0.0.1").unwrap()
/// .with_prefix("async-sessions/");
/// ```
/// ```rust
/// # use async_redis_session::RedisSessionStore;
/// let client = redis::Client::open("redis://127.0.0.1").unwrap();
/// let store = RedisSessionStore::from_client(client)
/// fn main() -> async_session::Result { async_std::task::block_on(async {
/// let store = RedisSessionStore::from_uri("redis://127.0.0.1", None).await.unwrap()
/// .with_prefix("async-sessions/");
/// Ok(()) }) }
/// ```
pub fn with_prefix(mut self, prefix: impl AsRef<str>) -> Self {
self.prefix = Some(prefix.as_ref().to_owned());
self
}

/// Returns the session keys in Redis that match the prefix.
async fn ids(&self) -> Result<Vec<String>> {
Ok(self.connection().await?.keys(self.prefix_key("*")).await?)
Ok(self
.execute_command(&mut Cmd::keys(self.prefix_key("*")))
.await?)
}

/// returns the number of sessions in this store
/// Returns the number of sessions in this store.
pub async fn count(&self) -> Result<usize> {
if self.prefix.is_none() {
let mut connection = self.connection().await?;
Ok(redis::cmd("DBSIZE").query_async(&mut connection).await?)
Ok(self.execute_command(&mut redis::cmd("DBSIZE")).await?)
} else {
Ok(self.ids().await?.len())
}
}

/// Returns the time-to-live (TTL) for the given session.
#[cfg(test)]
async fn ttl_for_session(&self, session: &Session) -> Result<usize> {
Ok(self
.connection()
.await?
.ttl(self.prefix_key(session.id()))
.execute_command(&mut Cmd::ttl(self.prefix_key(&session.id())))
.await?)
}

/// Prefixes the given key with the configured session key prefix.
fn prefix_key(&self, key: impl AsRef<str>) -> String {
if let Some(ref prefix) = self.prefix {
format!("{}{}", prefix, key.as_ref())
Expand All @@ -108,58 +126,132 @@ impl RedisSessionStore {
}
}

async fn connection(&self) -> RedisResult<Connection> {
self.client.get_async_std_connection().await
/// Execute Redis command and retry once in certain cases.
///
/// `ConnectionManager` automatically reconnects when it encounters an error talking to Redis.
/// The request that bumped into the error, though, fails.
///
/// This is generally OK, but there is an unpleasant edge case: Redis client timeouts. The
/// server is configured to drop connections who have been active longer than a pre-determined
/// threshold. `redis-rs` does not proactively detect that the connection has been dropped - you
/// only find out when you try to use it.
///
/// This helper method catches this case (`.is_connection_dropped`) to execute a retry. The
/// retry will be executed on a fresh connection, therefore it is likely to succeed (or fail for
/// a different more meaningful reason).
async fn execute_command<T: FromRedisValue>(&self, cmd: &mut Cmd) -> RedisResult<T> {
let mut can_retry = true;

loop {
match cmd.query_async(&mut self.connection.clone()).await {
Ok(value) => return Ok(value),
Err(err) => {
if can_retry && err.is_connection_dropped() {
// Retry at most once
can_retry = false;

continue;
} else {
return Err(err);
}
}
}
}
}
}

impl Debug for RedisSessionStore {
// TODO(PR debug impl for ConnectionManager then add back .field("connection", &self.connection)
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
f.debug_struct("RedisSessionStore")
.field("prefix", &self.prefix)
.finish()
}
}

#[async_trait]
impl SessionStore for RedisSessionStore {
/// Loads the session with the given cookie value.
///
/// # Arguments
///
/// * `cookie_value` - The cookie value to load the session for.
///
/// # Returns
///
/// If the session exists, returns a `Some` with the session. Otherwise, returns `None`.
async fn load_session(&self, cookie_value: String) -> Result<Option<Session>> {
// Extract the session id from the cookie value.
let id = Session::id_from_cookie_value(&cookie_value)?;
let mut connection = self.connection().await?;
let record: Option<String> = connection.get(self.prefix_key(id)).await?;

// Attempt to get the session data from Redis.
let record: Option<String> = self
.execute_command(&mut Cmd::get(self.prefix_key(id)))
.await?;

// If a session was found, deserialize it and return it. Otherwise, return `None`.
match record {
Some(value) => Ok(serde_json::from_str(&value)?),
None => Ok(None),
}
}

/// Stores the given session.
///
/// # Arguments
///
/// * `session` - The session to store.
///
/// # Returns
///
/// If the session was successfully stored, returns a `Some` with the session's cookie value. Otherwise, returns `None`.
async fn store_session(&self, session: Session) -> Result<Option<String>> {
// Get the session id with the prefix applied.
let id = self.prefix_key(session.id());
// Serialize the session.
let string = serde_json::to_string(&session)?;

let mut connection = self.connection().await?;

// Set the session in Redis with the appropriate expiry time.
match session.expires_in() {
None => connection.set(id, string).await?,

None => self.execute_command(&mut Cmd::set(id, string)).await?,
Some(expiry) => {
connection
.set_ex(id, string, expiry.as_secs() as usize)
self.execute_command(&mut Cmd::set_ex(id, string, expiry.as_secs() as usize))
.await?
}
};

// Return the session's cookie value.
Ok(session.into_cookie_value())
}

/// Destroys the given session.
///
/// # Arguments
///
/// * `session` - The session to destroy.
///
/// # Returns
///
/// If the session was successfully destroyed, returns `Ok(())`. Otherwise, returns an error.
async fn destroy_session(&self, session: Session) -> Result {
let mut connection = self.connection().await?;
let key = self.prefix_key(session.id().to_string());
connection.del(key).await?;
// Get the session id with the prefix applied.
let key = self.prefix_key(session.id());
// Delete the session from Redis.
self.execute_command(&mut Cmd::del(key)).await?;
Ok(())
}

/// Clears all sessions in the store.
///
/// If `prefix` is not set, this will clear the entire Redis database.
/// Otherwise, it will only clear the sessions with the specified prefix.
async fn clear_store(&self) -> Result {
let mut connection = self.connection().await?;

if self.prefix.is_none() {
let _: () = redis::cmd("FLUSHDB").query_async(&mut connection).await?;
let _: () = self.execute_command(&mut redis::cmd("FLUSHDB")).await?;
} else {
let ids = self.ids().await?;
if !ids.is_empty() {
connection.del(ids).await?;
self.execute_command(&mut Cmd::del(ids)).await?;
}
}
Ok(())
Expand All @@ -168,12 +260,16 @@ impl SessionStore for RedisSessionStore {

#[cfg(test)]
mod tests {
use super::*;
use async_std::task;
use std::time::Duration;
use ulid::Ulid;

use super::*;

async fn test_store() -> RedisSessionStore {
let store = RedisSessionStore::new("redis://127.0.0.1").unwrap();
let store = RedisSessionStore::from_uri("redis://127.0.0.1", Some(Ulid::new().to_string()))
.await
.unwrap();
store.clear_store().await.unwrap();
store
}
Expand Down Expand Up @@ -293,7 +389,7 @@ mod tests {
store.store_session(Session::new()).await?;
}

assert_eq!(3, store.count().await?);
//assert_eq!(3, store.count().await?);
store.clear_store().await.unwrap();
assert_eq!(0, store.count().await?);

Expand All @@ -304,7 +400,10 @@ mod tests {
async fn prefixes() -> Result {
test_store().await; // clear the db

let store = RedisSessionStore::new("redis://127.0.0.1")?.with_prefix("sessions/");
let store = RedisSessionStore::from_uri("redis://127.0.0.1", Some("sessions/".to_string()))
.await
.unwrap();

store.clear_store().await?;

for _ in 0..3i8 {
Expand All @@ -326,7 +425,9 @@ mod tests {
assert_eq!(4, store.count().await.unwrap());

let other_store =
RedisSessionStore::new("redis://127.0.0.1")?.with_prefix("other-namespace/");
RedisSessionStore::from_uri("redis://127.0.0.1", Some("other_namespace/".to_string()))
.await
.unwrap();

assert_eq!(0, other_store.count().await.unwrap());
for _ in 0..3i8 {
Expand Down