Skip to content

Commit b498941

Browse files
committed
Initial sketch of a statement pooling cache
Connection isn't Send yet, so this isn't actually useable
1 parent 824e2d1 commit b498941

File tree

2 files changed

+172
-1
lines changed

2 files changed

+172
-1
lines changed

src/lib.rs

Lines changed: 157 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,24 @@
11
#![doc(html_root_url="https://sfackler.github.io/doc")]
2+
#![feature(if_let, unsafe_destructor)]
23
extern crate r2d2;
34
extern crate postgres;
45

6+
use std::cell::RefCell;
7+
use std::collections::LruCache;
8+
use std::default::Default;
59
use std::fmt;
6-
use postgres::{PostgresConnection, PostgresConnectParams, IntoConnectParams, SslMode};
10+
use std::mem;
11+
use std::rc::Rc;
12+
use postgres::{PostgresConnection,
13+
PostgresConnectParams,
14+
IntoConnectParams,
15+
SslMode,
16+
PostgresResult,
17+
PostgresStatement,
18+
PostgresCopyInStatement,
19+
PostgresTransaction};
720
use postgres::error::{PostgresConnectError, PostgresError};
21+
use postgres::types::ToSql;
822

923
pub enum Error {
1024
ConnectError(PostgresConnectError),
@@ -52,3 +66,145 @@ impl r2d2::PoolManager<PostgresConnection, Error> for PostgresPoolManager {
5266
conn.is_desynchronized()
5367
}
5468
}
69+
70+
pub struct Config {
71+
pub statement_pool_size: uint,
72+
}
73+
74+
impl Default for Config {
75+
fn default() -> Config {
76+
Config {
77+
statement_pool_size: 10,
78+
}
79+
}
80+
}
81+
82+
pub struct StatementPoolingManager {
83+
manager: PostgresPoolManager,
84+
config: Config,
85+
}
86+
87+
impl StatementPoolingManager {
88+
pub fn new<T>(params: T, ssl_mode: SslMode, config: Config) -> StatementPoolingManager
89+
where T: IntoConnectParams {
90+
StatementPoolingManager {
91+
manager: PostgresPoolManager::new(params, ssl_mode),
92+
config: config
93+
}
94+
}
95+
}
96+
97+
impl r2d2::PoolManager<Connection, Error> for StatementPoolingManager {
98+
fn connect(&self) -> Result<Connection, Error> {
99+
Ok(Connection {
100+
conn: box try!(self.manager.connect()),
101+
stmts: RefCell::new(LruCache::new(self.config.statement_pool_size))
102+
})
103+
}
104+
105+
fn is_valid(&self, conn: &mut Connection) -> Result<(), Error> {
106+
self.manager.is_valid(&mut *conn.conn)
107+
}
108+
109+
fn has_broken(&self, conn: &mut Connection) -> bool {
110+
self.manager.has_broken(&mut *conn.conn)
111+
}
112+
}
113+
114+
pub trait GenericConnection {
115+
/// Like `PostgresConnection::prepare`.
116+
fn prepare<'a>(&'a self, query: &str) -> PostgresResult<Rc<PostgresStatement<'a>>>;
117+
118+
/// Like `PostgresConnection::execute`.
119+
fn execute(&self, query: &str, params: &[&ToSql]) -> PostgresResult<uint> {
120+
self.prepare(query).and_then(|s| s.execute(params))
121+
}
122+
123+
/// Like `PostgresConnection::prepare_copy_in`.
124+
fn prepare_copy_in<'a>(&'a self, table: &str, columns: &[&str])
125+
-> PostgresResult<PostgresCopyInStatement<'a>>;
126+
127+
/// Like `PostgresConnection::transaction`.
128+
fn transaction<'a>(&'a self) -> PostgresResult<Transaction<'a>>;
129+
130+
/// Like `PostgresConnection::batch_execute`.
131+
fn batch_execute(&self, query: &str) -> PostgresResult<()>;
132+
}
133+
134+
pub struct Connection {
135+
conn: Box<PostgresConnection>,
136+
stmts: RefCell<LruCache<String, Rc<PostgresStatement<'static>>>>,
137+
}
138+
139+
#[unsafe_destructor]
140+
impl Drop for Connection {
141+
// Just make sure that all the statements drop before the connection
142+
fn drop(&mut self) {
143+
self.stmts.borrow_mut().change_capacity(0);
144+
}
145+
}
146+
147+
impl GenericConnection for Connection {
148+
fn prepare<'a>(&'a self, query: &str) -> PostgresResult<Rc<PostgresStatement<'a>>> {
149+
let query = query.into_string();
150+
let mut stmts = self.stmts.borrow_mut();
151+
152+
if let Some(stmt) = stmts.get(&query) {
153+
return Ok(unsafe { mem::transmute(stmt.clone()) });
154+
}
155+
156+
let stmt = Rc::new(try!(self.conn.prepare(query[])));
157+
stmts.put(query, unsafe { mem::transmute(stmt.clone()) });
158+
Ok(stmt)
159+
}
160+
161+
fn prepare_copy_in<'a>(&'a self, table: &str, columns: &[&str])
162+
-> PostgresResult<PostgresCopyInStatement<'a>> {
163+
self.conn.prepare_copy_in(table, columns)
164+
}
165+
166+
fn transaction<'a>(&'a self) -> PostgresResult<Transaction<'a>> {
167+
Ok(Transaction {
168+
conn: self,
169+
trans: try!(self.conn.transaction())
170+
})
171+
}
172+
173+
fn batch_execute(&self, query: &str) -> PostgresResult<()> {
174+
self.conn.batch_execute(query)
175+
}
176+
}
177+
178+
pub struct Transaction<'a> {
179+
conn: &'a Connection,
180+
trans: PostgresTransaction<'a>
181+
}
182+
183+
impl<'a> GenericConnection for Transaction<'a> {
184+
fn prepare<'a>(&'a self, query: &str) -> PostgresResult<Rc<PostgresStatement<'a>>> {
185+
let query = query.into_string();
186+
let mut stmts = self.conn.stmts.borrow_mut();
187+
188+
if let Some(stmt) = stmts.get(&query) {
189+
return Ok(unsafe { mem::transmute(stmt.clone()) });
190+
}
191+
192+
Ok(Rc::new(try!(self.trans.prepare(query[]))))
193+
}
194+
195+
fn prepare_copy_in<'a>(&'a self, table: &str, columns: &[&str])
196+
-> PostgresResult<PostgresCopyInStatement<'a>> {
197+
self.trans.prepare_copy_in(table, columns)
198+
}
199+
200+
fn transaction<'a>(&'a self) -> PostgresResult<Transaction<'a>> {
201+
Ok(Transaction {
202+
conn: self.conn,
203+
trans: try!(self.trans.transaction())
204+
})
205+
}
206+
207+
fn batch_execute(&self, query: &str) -> PostgresResult<()> {
208+
self.trans.batch_execute(query)
209+
}
210+
}

tests/test.rs

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ use std::comm;
88

99
use postgres::NoSsl;
1010
use r2d2_postgres::PostgresPoolManager;
11+
use r2d2_postgres::GenericConnection;
1112

1213
#[test]
1314
fn test_basic() {
@@ -57,3 +58,17 @@ fn test_is_valid() {
5758

5859
pool.get().unwrap();
5960
}
61+
62+
/*
63+
#[test]
64+
fn test_statement_pool() {
65+
let manager = r2d2_postgres::StatementPoolingManager::new(
66+
"postgres://postgres@localhost", NoSsl, Default::default());
67+
let pool = r2d2::Pool::new(Default::default(), manager, r2d2::NoopErrorHandler).unwrap();
68+
69+
let conn = pool.get().unwrap();
70+
let stmt = conn.prepare("SELECT 1::INT").unwrap();
71+
let stmt2 = conn.prepare("SELECT 1::INT").unwrap();
72+
assert_eq!(*stmt as *mut _, *stmt2 as *mut _);
73+
}
74+
*/

0 commit comments

Comments
 (0)