Add support for Postgres store with tokio-postgres along with bb8-postgres. Implementation attached. #120
Replies: 3 comments
-
|
Thanks for this! We very much want folks to write their own |
Beta Was this translation helpful? Give feedback.
-
|
Hi again, if you publish this as a crate please let us know and we'll add it to the list of available session stores. |
Beta Was this translation helpful? Give feedback.
-
|
Maybe not the best practice, but here is what I do with native `tokio-postgres`:
use crate::conf::SessionDbConfig;
use async_trait::async_trait;
use std::error::Error;
use std::sync::Arc;
use tokio_postgres::{Error as PgError, NoTls};
use tower_sessions_core::{
SessionStore,
session::{Id, Record},
session_store::{self, ExpiredDeletion},
};
/// How often the background task purges expired sessions (3600 seconds).
const DEFAULT_CLEANUP_INTERVAL: std::time::Duration = std::time::Duration::from_secs(3600);

// `expiry_date` is TIMESTAMPTZ, so it is compared against `now()` (also TIMESTAMPTZ)
// directly. Comparing against `now() at time zone 'utc'` yields a zone-less TIMESTAMP
// that Postgres re-interprets in the session's `TimeZone`, shifting the comparison by
// the UTC offset whenever that setting is not UTC.
const CLEANUP_SQL: &str = "DELETE FROM sessions WHERE expiry_date <= now()";

const INIT_DB_SQL: &str = "CREATE TABLE IF NOT EXISTS sessions (
    id TEXT PRIMARY KEY,
    expiry_date TIMESTAMPTZ NOT NULL,
    data BYTEA NOT NULL
);
CREATE INDEX IF NOT EXISTS idx_sessions_expiry ON sessions (expiry_date);";

// Action SQLs
//
// The store binds the expiry as Unix seconds (an `i64`). The explicit `::BIGINT` cast
// makes Postgres infer an int8 parameter, and `to_timestamp` converts it to the
// TIMESTAMPTZ the column expects; binding an `i64` straight into a TIMESTAMPTZ
// parameter is rejected by tokio-postgres as a type mismatch.
const CREATE_SESSION_SQL: &str =
    "INSERT INTO sessions (id, expiry_date, data) VALUES ($1, to_timestamp($2::BIGINT), $3)";
const LOAD_SESSION_SQL: &str =
    "SELECT id, expiry_date, data FROM sessions WHERE id = $1 AND expiry_date > now()";
const DELETE_SESSION_SQL: &str = "DELETE FROM sessions WHERE id = $1";
const UPDATE_SESSION_SQL: &str = "
INSERT INTO sessions (id, expiry_date, data)
VALUES ($1, to_timestamp($2::BIGINT), $3)
ON CONFLICT (id)
DO UPDATE SET expiry_date = EXCLUDED.expiry_date, data = EXCLUDED.data
";
/// Session store that keeps its records in a Postgres `sessions` table.
///
/// Clones share the same underlying client through the `Arc`.
#[derive(Debug, Clone)]
pub struct PgSessionStore {
    /// Shared handle to the `tokio_postgres` client used for every query.
    client: Arc<tokio_postgres::Client>,
}
impl PgSessionStore {
    /// Connects to Postgres using `conf.conn_conf` and returns a ready-to-use store.
    ///
    /// In order, this:
    /// 1. opens the connection (without TLS) and spawns the task that drives it,
    /// 2. creates the `sessions` table and its index if they do not exist,
    /// 3. spawns a background task that deletes expired sessions every
    ///    `DEFAULT_CLEANUP_INTERVAL`.
    ///
    /// # Errors
    ///
    /// Returns an error if the connection cannot be established or the schema
    /// cannot be created. No cleanup task is started in either case.
    pub async fn new(conf: &SessionDbConfig) -> Result<Self, Box<dyn Error>> {
        let (client, connection) = tokio_postgres::connect(&conf.conn_conf, NoTls).await?;

        // The client only makes progress while the connection future is being
        // polled, so it has to live on its own task.
        tokio::spawn(async move {
            if let Err(e) = connection.await {
                eprintln!("Connection error: {}", e);
            }
        });

        let store = Self {
            client: Arc::new(client),
        };

        // Create the schema *before* starting the cleaner: the interval's first tick
        // completes immediately, so a cleaner spawned earlier could run its first
        // sweep against a table that does not exist yet. Doing it in this order
        // also avoids leaving a cleanup task running when initialization fails.
        store.init_database().await?;

        let cleaner = store.clone();
        tokio::spawn(async move {
            cleaner.continuously_cleanup(DEFAULT_CLEANUP_INTERVAL).await;
        });

        Ok(store)
    }

    /// Initializes the database by creating the sessions table (and the expiry
    /// index) if they do not exist.
    ///
    /// # Errors
    ///
    /// Returns the underlying Postgres error if the statements fail.
    pub async fn init_database(&self) -> Result<(), PgError> {
        self.client.batch_execute(INIT_DB_SQL).await
    }

    /// Deletes expired sessions once per `interval`, forever.
    ///
    /// This future never completes; run it on a dedicated task. A failed sweep is
    /// logged to stderr and retried on the next tick rather than aborting the loop.
    pub async fn continuously_cleanup(&self, interval: std::time::Duration) {
        let mut ticker = tokio::time::interval(interval);
        loop {
            ticker.tick().await;
            if let Err(e) = self.delete_expired().await {
                eprintln!("Error during session cleanup: {}", e);
            }
        }
    }
}
#[async_trait]
impl ExpiredDeletion for PgSessionStore {
async fn delete_expired(&self) -> session_store::Result<()> {
let client = self.client.as_ref();
client
.execute(CLEANUP_SQL, &[])
.await
.map(|_| ())
.map_err(|e| session_store::Error::Backend(e.to_string()))
}
}
#[async_trait]
impl SessionStore for PgSessionStore {
async fn create(&self, record: &mut Record) -> session_store::Result<()> {
let client = self.client.as_ref();
// Handling collisions similarly to the MemoryStore example:
// If the ID already exists (UNIQUE constraint violation), regenerate and retry.
loop {
let id_str = record.id.to_string();
let res = client
.execute(
CREATE_SESSION_SQL,
&[
&id_str,
&record.expiry_date.unix_timestamp(),
&rmp_serde::to_vec(&record).unwrap(),
],
)
.await;
match res {
Ok(_) => return Ok(()),
Err(e) => {
// Check for unique violation (Postgres SQLState 23505)
if e.code() == Some(&tokio_postgres::error::SqlState::UNIQUE_VIOLATION) {
record.id = Id::default();
continue;
}
return Err(session_store::Error::Backend(e.to_string()));
}
}
}
}
async fn save(&self, record: &Record) -> session_store::Result<()> {
let client = self.client.as_ref();
// UPSERT logic: Update the record if it exists, insert if it doesn't.
client
.execute(
UPDATE_SESSION_SQL,
&[
&record.id.to_string(),
&record.expiry_date.unix_timestamp(),
&rmp_serde::to_vec(&record).unwrap(),
],
)
.await
.map_err(|e| session_store::Error::Backend(e.to_string()))?;
Ok(())
}
async fn load(&self, session_id: &Id) -> session_store::Result<Option<Record>> {
let client = self.client.as_ref();
let row = client
.query_opt(LOAD_SESSION_SQL, &[&session_id.to_string()])
.await
.map_err(|e| session_store::Error::Backend(e.to_string()))?;
if let Some(row) = row {
let data: Vec<u8> = row.get(2);
// Deserialize the data
let record: Record = rmp_serde::from_slice(&data)
.map_err(|e| session_store::Error::Backend(e.to_string()))?;
Ok(Some(record))
} else {
Ok(None)
}
}
async fn delete(&self, session_id: &Id) -> session_store::Result<()> {
let client = self.client.as_ref();
client
.execute(DELETE_SESSION_SQL, &[&session_id.to_string()])
.await
.map_err(|e| session_store::Error::Backend(e.to_string()))?;
Ok(())
}
} |
Beta Was this translation helpful? Give feedback.
Uh oh!
There was an error while loading. Please reload this page.
-
I have implemented `PostgresStore` with the `tokio-postgres` and `bb8-postgres` libraries. I hope you can consider this for future implementation. The code is given below:
Beta Was this translation helpful? Give feedback.
All reactions