Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
538 changes: 538 additions & 0 deletions libs/Cargo.lock

Large diffs are not rendered by default.

3 changes: 1 addition & 2 deletions libs/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,6 @@
members = [
"./pavex*",
"generate_from_path",
"pavex_cli_deps",
"pavexc_rustdoc_types",
"persist_if_changed",
"px_workspace_hack",
]
Expand Down Expand Up @@ -104,6 +102,7 @@ serde_stacker = "0.1"
sha2 = "0.10.8"
similar = "2.6.0"
smallvec = "1"
sqlx = { version = "0.8" }
socket2 = "0.5.7"
supports-color = "3.0.1"
supports-hyperlinks = "3.0.0"
Expand Down
11 changes: 4 additions & 7 deletions libs/deny.toml
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,10 @@ no-default-features = false

[advisories]
ignore = [
# `yaml` crate is unmaintained, but we depend on it transitively via `config`,
# so no easy way to remove it. Since it isn't vulnerable, we'll ignore the advisory
# for now.
"RUSTSEC-2024-0320",
# `proc-macro-error` is unmaintained, but we depend on it transitively via `vergen-lib`.
# There is no runtime risk, so we'll ignore the advisory for now.
"RUSTSEC-2024-0370",
# `instant` crate is unmaintained, but we depend on it transitively,
# so no easy way to remove it until all intermediate crates get rid of it.
# Since it isn't vulnerable, we'll ignore the advisory for now.
"RUSTSEC-2024-0384",
]

[licenses]
Expand Down
2 changes: 1 addition & 1 deletion libs/pavex_cli/src/command.rs
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ pub enum Command {
/// Optional.
/// If provided, Pavex will serialize diagnostic information about
/// the application to the specified path.
#[clap(long, value_parser)]
#[clap(long, env = "PAVEX_DIAGNOSTICS", value_parser)]
diagnostics: Option<PathBuf>,
#[clap(long)]
/// Verify that the generated server SDK is up-to-date.
Expand Down
16 changes: 12 additions & 4 deletions libs/pavex_session/src/id.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,13 @@
///
/// # Format stability
///
/// From an API perspective, a session id is an opaque sequence of bytes.
/// Do **not** depend on the specifics of the underlying representation.
/// It may change between versions and those changes will not be considered
/// breaking changes.
/// The session ID is guaranteed to be a valid UUID.
/// The format of the UUID is not guaranteed to be stable across different versions of this library.
///
/// It is recommended to treat the session ID as an opaque value in your application.
/// Knowing the format is primarily useful when implementing custom session storage backends, as
/// it allows you to leverage optimizations in your data store that are specific to the UUID format
/// (e.g. a dedicated data type, such as `UUID` in PostgreSQL).
pub struct SessionId(uuid::Uuid);

impl SessionId {
Expand All @@ -18,4 +21,9 @@ impl SessionId {
pub fn random() -> Self {
Self(uuid::Uuid::new_v4())
}

/// Returns the inner `uuid::Uuid` value.
pub fn inner(&self) -> uuid::Uuid {
self.0
}
}
12 changes: 9 additions & 3 deletions libs/pavex_session/src/session_.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use super::state::errors::{ServerGetError, ServerSetError, SyncError, ValueDeserializationError};
use anyhow::Context;
use errors::{FinalizeError, ValueSerializationError};
use pavex::cookie::{RemovalCookie, ResponseCookie};
use serde::de::DeserializeOwned;
Expand Down Expand Up @@ -277,7 +278,7 @@ impl<'session> ClientSessionStateMut<'session> {
pub fn set<T: Serialize>(
&mut self,
key: String,
value: T
value: T,
) -> Result<Option<Value>, serde_json::Error> {
let value = serde_json::to_value(value)?;
Ok(self.set_value(key, value))
Expand Down Expand Up @@ -309,7 +310,10 @@ impl<'session> ClientSessionStateMut<'session> {
///
/// If the key exists, the removed value is returned.
/// If the removed value cannot be serialized, an error is returned.
pub fn remove<T: DeserializeOwned>(&mut self, key: &str) -> Result<Option<T>, serde_json::Error> {
pub fn remove<T: DeserializeOwned>(
&mut self,
key: &str,
) -> Result<Option<T>, serde_json::Error> {
self.remove_value(key)
.map(|value| serde_json::from_value(value))
.transpose()
Expand Down Expand Up @@ -447,9 +451,11 @@ impl<'session, 'store> ServerSessionState<'session, 'store> {
/// If the key exists, the removed value is returned.
/// If the removed value cannot be serialized, an error is returned.
pub async fn remove<T: DeserializeOwned>(&mut self, key: &str) -> Result<Option<T>, LoadError> {
self.remove_value(key).await?
self.remove_value(key)
.await?
.map(serde_json::from_value)
.transpose()
.context("Failed to deserialize the removed value.")
.map_err(LoadError::DeserializationError)
}

Expand Down
46 changes: 43 additions & 3 deletions libs/pavex_session/src/store_.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,10 @@
use crate::SessionId;
use errors::{ChangeIdError, CreateError, DeleteError, LoadError, UpdateError, UpdateTtlError};
use errors::{
ChangeIdError, CreateError, DeleteError, DeleteExpiredError, LoadError, UpdateError,
UpdateTtlError,
};
use serde_json::Value;
use std::{borrow::Cow, collections::HashMap};
use std::{borrow::Cow, collections::HashMap, num::NonZeroUsize};

/// Where server-side session records are stored.
///
Expand Down Expand Up @@ -78,6 +81,14 @@ impl SessionStore {
) -> Result<(), ChangeIdError> {
self.0.change_id(old_id, new_id).await
}

/// Deletes expired session records from the store.
pub async fn delete_expired(
&self,
batch_size: Option<NonZeroUsize>,
) -> Result<usize, DeleteExpiredError> {
self.0.delete_expired(batch_size).await
}
}

#[async_trait::async_trait(?Send)]
Expand Down Expand Up @@ -118,6 +129,30 @@ pub trait SessionStorageBackend: std::fmt::Debug + Send + Sync {
///
/// The server-side state is left unchanged.
async fn change_id(&self, old_id: &SessionId, new_id: &SessionId) -> Result<(), ChangeIdError>;

/// Deletes expired session records from the store.
///
/// If `batch_size` is provided, the query will delete at most `batch_size` expired sessions.
/// In either case, if successful, the method returns the number of expired sessions that
/// have been deleted.
///
/// # When should you delete in batches?
///
/// If there are a lot of expired sessions in the database, deleting them all at once can
/// cause performance issues. By deleting in batches, you can limit the number of sessions
/// deleted in a single query, reducing the impact.
///
/// # Do I need to call this method?
///
/// It depends on the storage backend you are using. Some backends (e.g. Redis) have
/// built-in support for expiring keys, so you may not need to call this method at all.
///
/// If you're adding support for a new backend that has built-in support for expiring keys,
/// you can simply return `Ok(0)` from this method.
async fn delete_expired(
&self,
batch_size: Option<NonZeroUsize>,
) -> Result<usize, DeleteExpiredError>;
}

/// A server-side session record that's going to be stored in the
Expand Down Expand Up @@ -201,7 +236,7 @@ pub mod errors {
pub enum LoadError {
#[error("Failed to deserialize the session state.")]
/// Failed to deserialize the session state.
DeserializationError(#[from] serde_json::Error),
DeserializationError(#[source] anyhow::Error),
/// Something else went wrong when loading the session record.
#[error("Something went wrong when loading the session record.")]
Other(#[source] anyhow::Error),
Expand Down Expand Up @@ -234,6 +269,11 @@ pub mod errors {
Other(#[source] anyhow::Error),
}

/// The error returned by [`SessionStorageBackend::delete_expired`][super::SessionStorageBackend::delete_expired].
#[derive(Debug, thiserror::Error)]
#[error("Something went wrong when deleting expired sessions")]
pub struct DeleteExpiredError(#[from] anyhow::Error);

#[derive(Debug, thiserror::Error)]
#[error("There is no session with the given id")]
/// There is no session with the given ID.
Expand Down
42 changes: 36 additions & 6 deletions libs/pavex_session_memory_store/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,13 +1,13 @@
//! An in-memory session store for `pavex_session`, geared towards testing and local development.
use std::{collections::HashMap, sync::Arc, time::Duration};
use std::{collections::HashMap, num::NonZeroUsize, sync::Arc, time::Duration};
use time::OffsetDateTime;
use tokio::sync::{Mutex, MutexGuard};

use pavex_session::{
store::{
errors::{
ChangeIdError, CreateError, DeleteError, DuplicateIdError, LoadError, UnknownIdError,
UpdateError, UpdateTtlError,
ChangeIdError, CreateError, DeleteError, DeleteExpiredError, DuplicateIdError,
LoadError, UnknownIdError, UpdateError, UpdateTtlError,
},
SessionRecord, SessionRecordRef, SessionStorageBackend,
},
Expand All @@ -22,7 +22,11 @@ use pavex_session::{
/// This store won't persist data between server restarts.
/// It also won't synchronize data between multiple server instances.
/// It is primarily intended for testing and local development.
pub struct SessionMemoryStore(Arc<Mutex<HashMap<SessionId, StoreRecord>>>);
pub struct InMemorySessionStore(Arc<Mutex<HashMap<SessionId, StoreRecord>>>);

#[doc(hidden)]
// Here for backwards compatibility.
pub type SessionStoreMemory = InMemorySessionStore;

#[derive(Debug)]
struct StoreRecord {
Expand All @@ -35,7 +39,7 @@ impl StoreRecord {
}
}

impl SessionMemoryStore {
impl InMemorySessionStore {
/// Creates a new (empty) in-memory session store.
pub fn new() -> Self {
Self(Arc::new(Mutex::new(HashMap::new())))
Expand Down Expand Up @@ -72,7 +76,7 @@ impl SessionMemoryStore {
}

#[async_trait::async_trait(?Send)]
impl SessionStorageBackend for SessionMemoryStore {
impl SessionStorageBackend for InMemorySessionStore {
/// Creates a new session record in the store using the provided ID.
#[tracing::instrument(name = "Create server-side session record", level = tracing::Level::TRACE, skip_all)]
async fn create(
Expand Down Expand Up @@ -174,4 +178,30 @@ impl SessionStorageBackend for SessionMemoryStore {
guard.insert(*new_id, record);
Ok(())
}

/// Delete all expired records from the store.
#[tracing::instrument(name = "Delete expired records", level = tracing::Level::TRACE, skip_all)]
async fn delete_expired(
&self,
batch_size: Option<NonZeroUsize>,
) -> Result<usize, DeleteExpiredError> {
let mut guard = self.0.lock().await;
let now = time::OffsetDateTime::now_utc();
let mut stale_ids = Vec::new();
for (id, record) in guard.iter() {
if record.deadline <= now {
stale_ids.push(*id);
if let Some(batch_size) = batch_size {
if stale_ids.len() >= batch_size.get() {
break;
}
}
}
}
let num_deleted = stale_ids.len();
for id in stale_ids {
guard.remove(&id);
}
Ok(num_deleted)
}
}
29 changes: 29 additions & 0 deletions libs/pavex_session_sqlx/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
[package]
name = "pavex_session_sqlx"
edition.workspace = true
repository.workspace = true
homepage.workspace = true
license.workspace = true
version.workspace = true

[features]
default = []
postgres = ["sqlx/postgres"]

[package.metadata.docs.rs]
all-features = true

[dependencies]
pavex_session = { version = "0.1.53", path = "../pavex_session" }
time = { workspace = true, features = ["std"] }
serde_json = { workspace = true }
async-trait = { workspace = true }
tokio = { workspace = true, features = ["sync"] }
tracing = { workspace = true }
anyhow = { workspace = true }
sqlx = { workspace = true, features = ["uuid", "time"] }
px_workspace_hack = { version = "0.1", path = "../px_workspace_hack" }

[dev-dependencies]
pavex_session_sqlx = { path = ".", version = "0.1.53", features = ["postgres"] }
pavex_tracing = { version = "0.1.53", path = "../pavex_tracing" }
10 changes: 10 additions & 0 deletions libs/pavex_session_sqlx/src/lib.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
//! Storage backends for `pavex_session`, implemented using the `sqlx` crate.
//!
//! There is a dedicated feature flag for each supported database backend:
//!
//! - `postgres`: Support for PostgreSQL.

#[cfg(feature = "postgres")]
mod postgres;
#[cfg(feature = "postgres")]
pub use postgres::PostgresSessionStore;
Loading