1
1
use async_session:: { async_trait, chrono:: Utc , log, serde_json, Result , Session , SessionStore } ;
2
+ use async_std:: task;
2
3
use sqlx:: prelude:: * ;
3
4
use sqlx:: { pool:: PoolConnection , sqlite:: SqlitePool , SqliteConnection } ;
4
-
5
+ use std:: time:: Duration ;
6
+
7
+ /// sqlx sqlite session store for async-sessions
8
+ ///
9
+ /// ```rust
10
+ /// use async_sqlx_session::SqliteStore;
11
+ /// use async_session::SessionStore;
12
+ /// # fn main() -> async_session::Result { async_std::task::block_on(async {
13
+ /// let store = SqliteStore::new("sqlite:%3Amemory:").await?;
14
+ /// store.migrate().await?;
15
+ /// store.spawn_cleanup_task(std::time::Duration::from_secs(60 * 60));
16
+ ///
17
+ /// let mut session = async_session::Session::new();
18
+ /// session.insert("key".into(), "value".into());
19
+ ///
20
+ /// let cookie_value = store.store_session(session).await.unwrap();
21
+ /// let session = store.load_session(cookie_value).await.unwrap();
22
+ /// assert_eq!(session.get("key"), Some("value".to_owned()));
23
+ /// # Ok(()) }) }
24
+ ///
5
25
#[ derive( Clone , Debug ) ]
6
26
pub struct SqliteStore {
7
27
client : SqlitePool ,
@@ -20,15 +40,24 @@ impl SqliteStore {
20
40
Ok ( Self :: from_client ( SqlitePool :: new ( database_url) . await ?) )
21
41
}
22
42
23
- pub fn with_table_name ( mut self , table_name : String ) -> Self {
24
- if table_name. chars ( ) . any ( |c| !c. is_ascii_alphanumeric ( ) ) {
43
+ pub async fn new_with_table_name ( database_url : & str , table_name : & str ) -> sqlx:: Result < Self > {
44
+ Ok ( Self :: from_client ( SqlitePool :: new ( database_url) . await ?) . with_table_name ( table_name) )
45
+ }
46
+
47
+ pub fn with_table_name ( mut self , table_name : impl AsRef < str > ) -> Self {
48
+ let table_name = table_name. as_ref ( ) ;
49
+ if table_name. is_empty ( )
50
+ || !table_name
51
+ . chars ( )
52
+ . all ( |c| c. is_ascii_alphanumeric ( ) || c == '-' || c == '_' )
53
+ {
25
54
panic ! (
26
- "table name must be alphanumeric , but {} was not" ,
55
+ "table name must be [a-zA-Z0-9_-]+ , but {} was not" ,
27
56
table_name
28
57
) ;
29
58
}
30
59
31
- self . table_name = table_name;
60
+ self . table_name = table_name. to_owned ( ) ;
32
61
self
33
62
}
34
63
@@ -40,7 +69,7 @@ impl SqliteStore {
40
69
r#"
41
70
CREATE TABLE IF NOT EXISTS %%TABLE_NAME%% (
42
71
id TEXT PRIMARY KEY NOT NULL,
43
- expires DATETIME NOT NULL,
72
+ expires INTEGER NULL,
44
73
session TEXT NOT NULL
45
74
)
46
75
"# ,
@@ -57,6 +86,33 @@ impl SqliteStore {
57
86
async fn connection ( & self ) -> sqlx:: Result < PoolConnection < SqliteConnection > > {
58
87
self . client . acquire ( ) . await
59
88
}
89
+
90
+ pub fn spawn_cleanup_task ( & self , period : Duration ) -> task:: JoinHandle < ( ) > {
91
+ let store = self . clone ( ) ;
92
+ task:: spawn ( async move {
93
+ loop {
94
+ task:: sleep ( period) . await ;
95
+ if let Err ( error) = store. cleanup ( ) . await {
96
+ log:: error!( "cleanup error: {}" , error) ;
97
+ }
98
+ }
99
+ } )
100
+ }
101
+
102
+ pub async fn cleanup ( & self ) -> sqlx:: Result < ( ) > {
103
+ let mut connection = self . connection ( ) . await ?;
104
+ sqlx:: query ( & self . substitute_table_name (
105
+ r#"
106
+ DELETE FROM %%TABLE_NAME%%
107
+ WHERE expires < ?
108
+ "# ,
109
+ ) )
110
+ . bind ( Utc :: now ( ) . timestamp ( ) )
111
+ . execute ( & mut connection)
112
+ . await ?;
113
+
114
+ Ok ( ( ) )
115
+ }
60
116
}
61
117
62
118
#[ async_trait]
@@ -68,7 +124,7 @@ impl SessionStore for SqliteStore {
68
124
let ( session, ) : ( String , ) = sqlx:: query_as ( & self . substitute_table_name (
69
125
r#"
70
126
SELECT session FROM %%TABLE_NAME%%
71
- WHERE id = ? AND expires > ?
127
+ WHERE id = ? AND ( expires IS NULL || expires > ?)
72
128
"# ,
73
129
) )
74
130
. bind ( & id)
@@ -85,23 +141,27 @@ impl SessionStore for SqliteStore {
85
141
let string = serde_json:: to_string ( & session) . ok ( ) ?;
86
142
let mut connection = self . connection ( ) . await . ok ( ) ?;
87
143
88
- sqlx:: query ( & self . substitute_table_name (
144
+ let result = sqlx:: query ( & self . substitute_table_name (
89
145
r#"
90
146
INSERT INTO %%TABLE_NAME%%
91
- (id, expires, session ) VALUES (?, ?, ?)
147
+ (id, session, expires ) VALUES (?, ?, ?)
92
148
ON CONFLICT(id) DO UPDATE SET
93
149
expires = excluded.expires,
94
150
session = excluded.session
95
151
"# ,
96
152
) )
97
153
. bind ( & id)
98
- . bind ( & session. expiry ( ) . map ( |expiry| expiry. timestamp ( ) ) )
99
154
. bind ( & string)
155
+ . bind ( & session. expiry ( ) . map ( |expiry| expiry. timestamp ( ) ) )
100
156
. execute ( & mut connection)
101
- . await
102
- . unwrap ( ) ;
157
+ . await ;
103
158
104
- session. into_cookie_value ( )
159
+ if let Err ( e) = result {
160
+ dbg ! ( e) ;
161
+ None
162
+ } else {
163
+ session. into_cookie_value ( )
164
+ }
105
165
}
106
166
107
167
async fn destroy_session ( & self , session : Session ) -> Result {
@@ -131,19 +191,4 @@ impl SessionStore for SqliteStore {
131
191
132
192
Ok ( ( ) )
133
193
}
134
-
135
- async fn cleanup ( & self ) -> Result {
136
- let mut connection = self . connection ( ) . await ?;
137
- sqlx:: query ( & self . substitute_table_name (
138
- r#"
139
- DELETE FROM %%TABLE_NAME%%
140
- WHERE expires < ?
141
- "# ,
142
- ) )
143
- . bind ( Utc :: now ( ) . timestamp ( ) )
144
- . execute ( & mut connection)
145
- . await ?;
146
-
147
- Ok ( ( ) )
148
- }
149
194
}
0 commit comments