Skip to content

Commit fb3ab3e

Browse files
committed
add r2d2 connection pool
1 parent fc51cdf commit fb3ab3e

File tree

3 files changed

+235
-0
lines changed

3 files changed

+235
-0
lines changed

Cargo.toml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,7 @@ cast = { version = "0.3", features = ["std"] }
4848
arrow = { version = "6.5.0", default-features = false, features = ["prettyprint"] }
4949
rust_decimal = "1.14"
5050
strum = { version = "0.23", features = ["derive"] }
51+
r2d2 = "0.8.9"
5152

5253
[dev-dependencies]
5354
doc-comment = "0.3"
@@ -57,6 +58,7 @@ regex = "1.3"
5758
uuid = { version = "0.8", features = ["v4"] }
5859
unicase = "2.6.0"
5960
rand = "0.8.3"
61+
tempdir = "0.3.7"
6062
# criterion = "0.3"
6163

6264
# [[bench]]

src/lib.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -82,6 +82,7 @@ pub use crate::row::{AndThenRows, Map, MappedRows, Row, RowIndex, Rows};
8282
pub use crate::statement::Statement;
8383
pub use crate::transaction::{DropBehavior, Savepoint, Transaction, TransactionBehavior};
8484
pub use crate::types::ToSql;
85+
pub use crate::r2d2::DuckdbConnectionManager;
8586

8687
#[macro_use]
8788
mod error;
@@ -97,6 +98,7 @@ mod raw_statement;
9798
mod row;
9899
mod statement;
99100
mod transaction;
101+
mod r2d2;
100102

101103
pub mod types;
102104

src/r2d2.rs

Lines changed: 231 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,231 @@
1+
#![deny(warnings)]
2+
//! # Duckdb-rs support for the `r2d2` connection pool.
3+
//!
4+
//!
5+
//! Integrated with: [r2d2](https://crates.io/crates/r2d2)
6+
//!
7+
//!
8+
//! ## Example
9+
//!
10+
//! ```rust,no_run
11+
//! extern crate r2d2;
12+
//! extern crate duckdb;
13+
//!
14+
//!
15+
//! use std::thread;
16+
//! use duckdb::{DuckdbConnectionManager, params};
17+
//!
18+
//!
19+
//! fn main() {
20+
//! let manager = DuckdbConnectionManager::file("file.db").unwrap();
21+
//! let pool = r2d2::Pool::new(manager).unwrap();
22+
//! pool.get()
23+
//! .unwrap()
24+
//! .execute("CREATE TABLE IF NOT EXISTS foo (bar INTEGER)", params![])
25+
//! .unwrap();
26+
//!
27+
//! (0..10)
28+
//! .map(|i| {
29+
//! let pool = pool.clone();
30+
//! thread::spawn(move || {
31+
//! let conn = pool.get().unwrap();
32+
//! conn.execute("INSERT INTO foo (bar) VALUES (?)", &[&i])
33+
//! .unwrap();
34+
//! })
35+
//! })
36+
//! .collect::<Vec<_>>()
37+
//! .into_iter()
38+
//! .map(thread::JoinHandle::join)
39+
//! .collect::<Result<_, _>>()
40+
//! .unwrap()
41+
//! }
42+
//! ```
43+
use std::{path::Path, sync::{Mutex, Arc}};
44+
use crate::{Result, Connection, Error, Config};
45+
46+
/// An `r2d2::ManageConnection` for `duckdb::Connection`s.
47+
pub struct DuckdbConnectionManager {
48+
connection: Arc<Mutex<Connection>>,
49+
}
50+
51+
impl DuckdbConnectionManager {
52+
53+
/// Creates a new `DuckdbConnectionManager` from file.
54+
pub fn file<P: AsRef<Path>>(path: P) -> Result<Self> {
55+
Ok(Self {
56+
connection: Arc::new(Mutex::new(Connection::open(path)?)),
57+
})
58+
}
59+
/// Creates a new `DuckdbConnectionManager` from file with flags.
60+
pub fn file_with_flags<P: AsRef<Path>>(path: P, config: Config) -> Result<Self> {
61+
Ok(Self {
62+
connection: Arc::new(Mutex::new(Connection::open_with_flags(path, config)?)),
63+
})
64+
}
65+
66+
/// Creates a new `DuckdbConnectionManager` from memory.
67+
pub fn memory() -> Result<Self> {
68+
Ok(Self {
69+
connection: Arc::new(Mutex::new(Connection::open_in_memory()?)),
70+
})
71+
}
72+
73+
/// Creates a new `DuckdbConnectionManager` from memory with flags.
74+
pub fn memory_with_flags(config: Config) -> Result<Self> {
75+
Ok(Self {
76+
connection: Arc::new(Mutex::new(Connection::open_in_memory_with_flags(config)?)),
77+
})
78+
}
79+
}
80+
81+
impl r2d2::ManageConnection for DuckdbConnectionManager {
82+
type Connection = Connection;
83+
type Error = Error;
84+
85+
fn connect(&self) -> Result<Self::Connection, Self::Error> {
86+
let conn = self.connection.lock().unwrap();
87+
Ok(conn.clone())
88+
}
89+
90+
fn is_valid(&self, conn: &mut Self::Connection) -> Result<(), Self::Error> {
91+
conn.execute_batch("").map_err(Into::into)
92+
}
93+
94+
fn has_broken(&self, _: &mut Self::Connection) -> bool {
95+
false
96+
}
97+
}
98+
99+
100+
#[cfg(test)]
101+
mod test {
102+
extern crate r2d2;
103+
use super::*;
104+
use crate::Result;
105+
use crate::types::Value;
106+
use std::{sync::mpsc, thread};
107+
108+
use tempdir::TempDir;
109+
110+
111+
#[test]
112+
fn test_basic() -> Result<()>{
113+
let manager = DuckdbConnectionManager::file("file.db")?;
114+
let pool = r2d2::Pool::builder().max_size(2).build(manager).unwrap();
115+
116+
let (s1, r1) = mpsc::channel();
117+
let (s2, r2) = mpsc::channel();
118+
119+
let pool1 = pool.clone();
120+
let t1 = thread::spawn(move || {
121+
let conn = pool1.get().unwrap();
122+
s1.send(()).unwrap();
123+
r2.recv().unwrap();
124+
drop(conn);
125+
});
126+
127+
let pool2 = pool.clone();
128+
let t2 = thread::spawn(move || {
129+
let conn = pool2.get().unwrap();
130+
s2.send(()).unwrap();
131+
r1.recv().unwrap();
132+
drop(conn);
133+
});
134+
135+
t1.join().unwrap();
136+
t2.join().unwrap();
137+
138+
pool.get().unwrap();
139+
Ok(())
140+
}
141+
142+
#[test]
143+
fn test_file() -> Result<()>{
144+
let manager = DuckdbConnectionManager::file("file.db")?;
145+
let pool = r2d2::Pool::builder().max_size(2).build(manager).unwrap();
146+
147+
let (s1, r1) = mpsc::channel();
148+
let (s2, r2) = mpsc::channel();
149+
150+
let pool1 = pool.clone();
151+
let t1 = thread::spawn(move || {
152+
let conn = pool1.get().unwrap();
153+
let conn1: &Connection = &*conn;
154+
s1.send(()).unwrap();
155+
r2.recv().unwrap();
156+
drop(conn1);
157+
});
158+
159+
let pool2 = pool.clone();
160+
let t2 = thread::spawn(move || {
161+
let conn = pool2.get().unwrap();
162+
s2.send(()).unwrap();
163+
r1.recv().unwrap();
164+
drop(conn);
165+
});
166+
167+
t1.join().unwrap();
168+
t2.join().unwrap();
169+
170+
pool.get().unwrap();
171+
Ok(())
172+
}
173+
174+
#[test]
175+
fn test_is_valid() -> Result<()>{
176+
let manager = DuckdbConnectionManager::file("file.db")?;
177+
let pool = r2d2::Pool::builder()
178+
.max_size(1)
179+
.test_on_check_out(true)
180+
.build(manager)
181+
.unwrap();
182+
183+
pool.get().unwrap();
184+
Ok(())
185+
}
186+
187+
#[test]
188+
fn test_error_handling() -> Result<()> {
189+
//! We specify a directory as a database. This is bound to fail.
190+
let dir = TempDir::new("r2d2-duckdb").expect("Could not create temporary directory");
191+
let dirpath = dir.path().to_str().unwrap();
192+
assert!(DuckdbConnectionManager::file(dirpath).is_err());
193+
Ok(())
194+
}
195+
196+
#[test]
197+
fn test_with_flags() -> Result<()> {
198+
let config = Config::default()
199+
.access_mode(crate::AccessMode::ReadWrite)?
200+
.default_null_order(crate::DefaultNullOrder::NullsLast)?
201+
.default_order(crate::DefaultOrder::Desc)?
202+
.enable_external_access(true)?
203+
.enable_object_cache(false)?
204+
.max_memory("2GB")?
205+
.threads(4)?;
206+
let manager = DuckdbConnectionManager::file_with_flags("file.db", config)?;
207+
let pool = r2d2::Pool::builder().max_size(2).build(manager).unwrap();
208+
let conn = pool.get().unwrap();
209+
conn.execute_batch("CREATE TABLE foo(x Text)")?;
210+
211+
let mut stmt = conn.prepare("INSERT INTO foo(x) VALUES (?)")?;
212+
stmt.execute(&[&"a"])?;
213+
stmt.execute(&[&"b"])?;
214+
stmt.execute(&[&"c"])?;
215+
stmt.execute([Value::Null])?;
216+
217+
let val: Result<Vec<Option<String>>> = conn
218+
.prepare("SELECT x FROM foo ORDER BY x")?
219+
.query_and_then([], |row| row.get(0))?
220+
.collect();
221+
let val = val?;
222+
let mut iter = val.iter();
223+
assert_eq!(iter.next().unwrap().as_ref().unwrap(), "c");
224+
assert_eq!(iter.next().unwrap().as_ref().unwrap(), "b");
225+
assert_eq!(iter.next().unwrap().as_ref().unwrap(), "a");
226+
assert!(iter.next().unwrap().is_none());
227+
assert_eq!(iter.next(), None);
228+
229+
Ok(())
230+
}
231+
}

0 commit comments

Comments
 (0)