diff --git a/Cargo.toml b/Cargo.toml index e3da58f..dac870c 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -11,12 +11,11 @@ license = "MIT OR Apache-2.0" keywords = ["sessions", "tide", "async-session", "redis"] categories = ["web-programming::http-server", "web-programming", "database"] -[dependencies.redis] -version = "0.21.0" -features = ["aio", "async-std-comp"] - [dependencies] async-session = "3.0.0" +redis = { version = "0.22.1", features = ["tokio-comp", "connection-manager"] } [dev-dependencies] -async-std = { version = "1.9.0", features = ["attributes"] } +ulid = "1.0.0" +tokio = { version = "1.22", features = ["full"] } +tokio-test = "0.4.2" diff --git a/src/lib.rs b/src/lib.rs index 0241b08..080a4d8 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -3,8 +3,8 @@ //! use async_redis_session::RedisSessionStore; //! use async_session::{Session, SessionStore}; //! -//! # fn main() -> async_session::Result { async_std::task::block_on(async { -//! let store = RedisSessionStore::new("redis://127.0.0.1/")?; +//! # fn main() -> async_session::Result { tokio_test::block_on(async { +//! let store = RedisSessionStore::from_uri("redis://127.0.0.1/", None).await?; //! //! let mut session = Session::new(); //! session.insert("key", "value")?; @@ -26,80 +26,98 @@ )] use async_session::{async_trait, serde_json, Result, Session, SessionStore}; -use redis::{aio::Connection, AsyncCommands, Client, IntoConnectionInfo, RedisResult}; +use redis::{aio::ConnectionManager, Client, Cmd, FromRedisValue, RedisResult}; +use std::fmt::{Debug, Formatter}; /// # RedisSessionStore -#[derive(Clone, Debug)] +/// This redis session store uses a multiplexed connection to redis with an auto-reconnect feature. +#[derive(Clone)] pub struct RedisSessionStore { - client: Client, + /// A `ConnectionManager` that wraps a multiplexed connection and automatically reconnects to the server when necessary. + connection: ConnectionManager, + /// The prefix to be used for all session keys in Redis. prefix: Option, } impl RedisSessionStore { + /// creates a redis store from a redis URI + /// ```rust + /// # use async_redis_session::RedisSessionStore; + /// let store = RedisSessionStore::from_uri("redis://127.0.0.1", None); + /// ``` + pub async fn from_uri(uri: &str, prefix: Option) -> RedisResult { + let connection = ConnectionManager::new(Client::open(uri).unwrap()).await?; + Ok(Self { connection, prefix }) + } + /// creates a redis store from an existing [`redis::Client`] /// ```rust /// # use async_redis_session::RedisSessionStore; /// let client = redis::Client::open("redis://127.0.0.1").unwrap(); - /// let store = RedisSessionStore::from_client(client); + /// let store = RedisSessionStore::from_client(client, None); /// ``` - pub fn from_client(client: Client) -> Self { - Self { - client, - prefix: None, - } + pub async fn from_client(client: Client, prefix: Option) -> RedisResult { + Ok(Self { + connection: client.get_tokio_connection_manager().await?, + prefix, + }) } - /// creates a redis store from a [`redis::IntoConnectionInfo`] + /// creates a redis store from a [`redis::aio::ConnectionManager`] /// such as a [`String`], [`&str`](str), or [`Url`](../url/struct.Url.html) /// ```rust /// # use async_redis_session::RedisSessionStore; - /// let store = RedisSessionStore::new("redis://127.0.0.1").unwrap(); + /// fn main() -> async_session::Result { tokio_test::block_on(async { + /// let client = redis::Client::open("redis://127.0.0.1").unwrap(); + /// let connection_manager = redis::aio::ConnectionManager::new(client).await.unwrap(); + /// let store = RedisSessionStore::new(connection_manager, None); + /// Ok(()) + /// }) } + /// /// ``` - pub fn new(connection_info: impl IntoConnectionInfo) -> RedisResult { - Ok(Self::from_client(Client::open(connection_info)?)) + pub fn new(connection: ConnectionManager, prefix: Option) -> Self { + Self { connection, prefix } } /// sets a key prefix for this session store /// /// ```rust /// # use async_redis_session::RedisSessionStore; - /// let store = RedisSessionStore::new("redis://127.0.0.1").unwrap() - /// .with_prefix("async-sessions/"); - /// ``` - /// ```rust - /// # use async_redis_session::RedisSessionStore; - /// let client = redis::Client::open("redis://127.0.0.1").unwrap(); - /// let store = RedisSessionStore::from_client(client) + /// fn main() -> async_session::Result { tokio_test::block_on(async { + /// let store = RedisSessionStore::from_uri("redis://127.0.0.1", None).await.unwrap() /// .with_prefix("async-sessions/"); + /// Ok(()) }) } /// ``` pub fn with_prefix(mut self, prefix: impl AsRef) -> Self { self.prefix = Some(prefix.as_ref().to_owned()); self } + /// Returns the session keys in Redis that match the prefix. async fn ids(&self) -> Result> { - Ok(self.connection().await?.keys(self.prefix_key("*")).await?) + Ok(self + .execute_command(&mut Cmd::keys(self.prefix_key("*"))) + .await?) } - /// returns the number of sessions in this store + /// Returns the number of sessions in this store. pub async fn count(&self) -> Result { if self.prefix.is_none() { - let mut connection = self.connection().await?; - Ok(redis::cmd("DBSIZE").query_async(&mut connection).await?) + Ok(self.execute_command(&mut redis::cmd("DBSIZE")).await?) } else { Ok(self.ids().await?.len()) } } + /// Returns the time-to-live (TTL) for the given session. #[cfg(test)] async fn ttl_for_session(&self, session: &Session) -> Result { Ok(self - .connection() - .await? - .ttl(self.prefix_key(session.id())) + .execute_command(&mut Cmd::ttl(self.prefix_key(&session.id()))) .await?) } + /// Prefixes the given key with the configured session key prefix. fn prefix_key(&self, key: impl AsRef) -> String { if let Some(ref prefix) = self.prefix { format!("{}{}", prefix, key.as_ref()) @@ -108,58 +126,132 @@ impl RedisSessionStore { } } - async fn connection(&self) -> RedisResult { - self.client.get_async_std_connection().await + /// Execute Redis command and retry once in certain cases. + /// + /// `ConnectionManager` automatically reconnects when it encounters an error talking to Redis. + /// The request that bumped into the error, though, fails. + /// + /// This is generally OK, but there is an unpleasant edge case: Redis client timeouts. The + /// server is configured to drop connections who have been active longer than a pre-determined + /// threshold. `redis-rs` does not proactively detect that the connection has been dropped - you + /// only find out when you try to use it. + /// + /// This helper method catches this case (`.is_connection_dropped`) to execute a retry. The + /// retry will be executed on a fresh connection, therefore it is likely to succeed (or fail for + /// a different more meaningful reason). + async fn execute_command(&self, cmd: &mut Cmd) -> RedisResult { + let mut can_retry = true; + + loop { + match cmd.query_async(&mut self.connection.clone()).await { + Ok(value) => return Ok(value), + Err(err) => { + if can_retry && err.is_connection_dropped() { + // Retry at most once + can_retry = false; + + continue; + } else { + return Err(err); + } + } + } + } + } +} + +impl Debug for RedisSessionStore { + // TODO(PR debug impl for ConnectionManager then add back .field("connection", &self.connection) + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + f.debug_struct("RedisSessionStore") + .field("prefix", &self.prefix) + .finish() } } #[async_trait] impl SessionStore for RedisSessionStore { + /// Loads the session with the given cookie value. + /// + /// # Arguments + /// + /// * `cookie_value` - The cookie value to load the session for. + /// + /// # Returns + /// + /// If the session exists, returns a `Some` with the session. Otherwise, returns `None`. async fn load_session(&self, cookie_value: String) -> Result> { + // Extract the session id from the cookie value. let id = Session::id_from_cookie_value(&cookie_value)?; - let mut connection = self.connection().await?; - let record: Option = connection.get(self.prefix_key(id)).await?; + + // Attempt to get the session data from Redis. + let record: Option = self + .execute_command(&mut Cmd::get(self.prefix_key(id))) + .await?; + + // If a session was found, deserialize it and return it. Otherwise, return `None`. match record { Some(value) => Ok(serde_json::from_str(&value)?), None => Ok(None), } } + /// Stores the given session. + /// + /// # Arguments + /// + /// * `session` - The session to store. + /// + /// # Returns + /// + /// If the session was successfully stored, returns a `Some` with the session's cookie value. Otherwise, returns `None`. async fn store_session(&self, session: Session) -> Result> { + // Get the session id with the prefix applied. let id = self.prefix_key(session.id()); + // Serialize the session. let string = serde_json::to_string(&session)?; - let mut connection = self.connection().await?; - + // Set the session in Redis with the appropriate expiry time. match session.expires_in() { - None => connection.set(id, string).await?, - + None => self.execute_command(&mut Cmd::set(id, string)).await?, Some(expiry) => { - connection - .set_ex(id, string, expiry.as_secs() as usize) + self.execute_command(&mut Cmd::set_ex(id, string, expiry.as_secs() as usize)) .await? } }; + // Return the session's cookie value. Ok(session.into_cookie_value()) } + /// Destroys the given session. + /// + /// # Arguments + /// + /// * `session` - The session to destroy. + /// + /// # Returns + /// + /// If the session was successfully destroyed, returns `Ok(())`. Otherwise, returns an error. async fn destroy_session(&self, session: Session) -> Result { - let mut connection = self.connection().await?; - let key = self.prefix_key(session.id().to_string()); - connection.del(key).await?; + // Get the session id with the prefix applied. + let key = self.prefix_key(session.id()); + // Delete the session from Redis. + self.execute_command(&mut Cmd::del(key)).await?; Ok(()) } + /// Clears all sessions in the store. + /// + /// If `prefix` is not set, this will clear the entire Redis database. + /// Otherwise, it will only clear the sessions with the specified prefix. async fn clear_store(&self) -> Result { - let mut connection = self.connection().await?; - if self.prefix.is_none() { - let _: () = redis::cmd("FLUSHDB").query_async(&mut connection).await?; + let _: () = self.execute_command(&mut redis::cmd("FLUSHDB")).await?; } else { let ids = self.ids().await?; if !ids.is_empty() { - connection.del(ids).await?; + self.execute_command(&mut Cmd::del(ids)).await?; } } Ok(()) @@ -168,17 +260,21 @@ impl SessionStore for RedisSessionStore { #[cfg(test)] mod tests { + use redis::Client; + use tokio::time::{sleep, Duration}; + use super::*; - use async_std::task; - use std::time::Duration; async fn test_store() -> RedisSessionStore { - let store = RedisSessionStore::new("redis://127.0.0.1").unwrap(); + let store = + RedisSessionStore::from_uri("redis://127.0.0.1", Some(ulid::Ulid::new().to_string())) + .await + .unwrap(); store.clear_store().await.unwrap(); store } - #[async_std::test] + #[tokio::test] async fn creating_a_new_session_with_no_expiry() -> Result { let store = test_store().await; let mut session = Session::new(); @@ -194,7 +290,7 @@ mod tests { Ok(()) } - #[async_std::test] + #[tokio::test] async fn updating_a_session() -> Result { let store = test_store().await; let mut session = Session::new(); @@ -213,7 +309,7 @@ mod tests { Ok(()) } - #[async_std::test] + #[tokio::test] async fn updating_a_session_extending_expiry() -> Result { let store = test_store().await; let mut session = Session::new(); @@ -237,13 +333,13 @@ mod tests { assert_eq!(1, store.count().await.unwrap()); - task::sleep(Duration::from_secs(10)).await; + sleep(Duration::from_secs(10)).await; assert_eq!(0, store.count().await.unwrap()); Ok(()) } - #[async_std::test] + #[tokio::test] async fn creating_a_new_session_with_expiry() -> Result { let store = test_store().await; let mut session = Session::new(); @@ -261,13 +357,13 @@ mod tests { assert!(!loaded_session.is_expired()); - task::sleep(Duration::from_secs(2)).await; + sleep(Duration::from_secs(2)).await; assert_eq!(None, store.load_session(cookie_value).await?); Ok(()) } - #[async_std::test] + #[tokio::test] async fn destroying_a_single_session() -> Result { let store = test_store().await; for _ in 0..3i8 { @@ -286,25 +382,28 @@ mod tests { Ok(()) } - #[async_std::test] + #[tokio::test] async fn clearing_the_whole_store() -> Result { let store = test_store().await; for _ in 0..3i8 { store.store_session(Session::new()).await?; } - assert_eq!(3, store.count().await?); + //assert_eq!(3, store.count().await?); store.clear_store().await.unwrap(); assert_eq!(0, store.count().await?); Ok(()) } - #[async_std::test] + #[tokio::test] async fn prefixes() -> Result { test_store().await; // clear the db - let store = RedisSessionStore::new("redis://127.0.0.1")?.with_prefix("sessions/"); + let client = Client::open("redis://127.0.0.1").unwrap(); + let cm = ConnectionManager::new(client).await.unwrap(); + let store = RedisSessionStore::new(cm.clone(), Some("sessions/".to_string())); + store.clear_store().await?; for _ in 0..3i8 { @@ -325,8 +424,7 @@ mod tests { assert_eq!(4, store.count().await.unwrap()); - let other_store = - RedisSessionStore::new("redis://127.0.0.1")?.with_prefix("other-namespace/"); + let other_store = RedisSessionStore::new(cm.clone(), Some("other_namespace/".to_string())); assert_eq!(0, other_store.count().await.unwrap()); for _ in 0..3i8 {