Skip to content

Commit 5eaace8

Browse files
committed
Close pool connections concurrently
1 parent a987eae commit 5eaace8

File tree

2 files changed

+47
-5
lines changed

2 files changed

+47
-5
lines changed

src/pool.rs

Lines changed: 12 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -78,9 +78,16 @@ impl PoolBuilder {
7878

7979
/// Specify the number of sqlite connections to open as part of the pool.
8080
///
81-
/// Defaults to the number of logical CPUs of the current system.
81+
/// Defaults to the number of logical CPUs of the current system. Values
82+
/// less than `1` are clamped to `1`.
83+
///
84+
/// ```
85+
/// use async_sqlite::PoolBuilder;
86+
///
87+
/// let builder = PoolBuilder::new().num_conns(2);
88+
/// ```
8289
pub fn num_conns(mut self, num_conns: usize) -> Self {
83-
self.num_conns = Some(num_conns);
90+
self.num_conns = Some(num_conns.max(1));
8491
self
8592
}
8693

@@ -197,9 +204,9 @@ impl Pool {
197204
/// After this method returns, all calls to `self::conn()` or
198205
/// `self::conn_mut()` will return an [`Error::Closed`] error.
199206
pub async fn close(&self) -> Result<(), Error> {
200-
for client in self.state.clients.iter() {
201-
client.close().await?;
202-
}
207+
let closes = self.state.clients.iter().map(|client| client.close());
208+
let res = join_all(closes).await;
209+
res.into_iter().collect::<Result<Vec<_>, Error>>()?;
203210
Ok(())
204211
}
205212

tests/tests.rs

Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -84,6 +84,8 @@ async_test!(test_journal_mode);
8484
async_test!(test_concurrency);
8585
async_test!(test_pool);
8686
async_test!(test_pool_conn_for_each);
87+
async_test!(test_pool_close_concurrent);
88+
async_test!(test_pool_num_conns_zero_clamps);
8789

8890
async fn test_journal_mode() {
8991
let tmp_dir = tempfile::tempdir().unwrap();
@@ -227,3 +229,36 @@ async fn test_pool_conn_for_each() {
227229
// cleanup
228230
pool.close().await.expect("closing client conn");
229231
}
232+
233+
async fn test_pool_close_concurrent() {
234+
let tmp_dir = tempfile::tempdir().unwrap();
235+
let pool = PoolBuilder::new()
236+
.path(tmp_dir.path().join("sqlite.db"))
237+
.num_conns(2)
238+
.open()
239+
.await
240+
.expect("pool unable to be opened");
241+
242+
let c1 = pool.close();
243+
let c2 = pool.close();
244+
futures_util::future::join_all([c1, c2])
245+
.await
246+
.into_iter()
247+
.collect::<Result<Vec<_>, Error>>()
248+
.expect("closing concurrently");
249+
250+
let res = pool.conn(|c| c.execute("SELECT 1", ())).await;
251+
assert!(matches!(res, Err(Error::Closed)));
252+
}
253+
254+
async fn test_pool_num_conns_zero_clamps() {
255+
let tmp_dir = tempfile::tempdir().unwrap();
256+
let pool = PoolBuilder::new()
257+
.path(tmp_dir.path().join("clamp.db"))
258+
.num_conns(0)
259+
.open()
260+
.await
261+
.expect("pool unable to be opened");
262+
let results = pool.conn_for_each(|_| Ok(())).await;
263+
assert_eq!(results.len(), 1);
264+
}

0 commit comments

Comments
 (0)