Skip to content

Commit d7fa9ec

Browse files
committed
add semaphore for database read transaction limiting
1 parent 4309fb1 commit d7fa9ec

File tree

2 files changed

+15
-18
lines changed

2 files changed

+15
-18
lines changed

crates/database/db/src/db.rs

Lines changed: 10 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -8,15 +8,15 @@ use sea_orm::{
88
DatabaseConnection, SqlxSqliteConnector, TransactionTrait,
99
};
1010
use std::sync::Arc;
11-
use tokio::sync::Mutex;
11+
use tokio::sync::{Mutex, Semaphore};
1212

1313
// TODO: make these configurable via CLI.
1414

1515
/// The timeout duration for database busy errors.
1616
const BUSY_TIMEOUT_SECS: u64 = 5;
1717

1818
/// The maximum number of connections in the database connection pool.
19-
const MAX_CONNECTIONS: u32 = 10;
19+
const MAX_CONNECTIONS: u32 = 32;
2020

2121
/// The minimum number of connections in the database connection pool.
2222
const MIN_CONNECTIONS: u32 = 5;
@@ -36,6 +36,8 @@ pub struct Database {
3636
connection: DatabaseConnection,
3737
/// A mutex to ensure that only one mutable transaction is active at a time.
3838
write_lock: Arc<Mutex<()>>,
39+
/// A semaphore to limit the number of concurrent read-only transactions.
40+
read_locks: Arc<Semaphore>,
3941
/// The database metrics.
4042
metrics: DatabaseMetrics,
4143
/// The temporary directory used for testing. We keep it here to ensure it lives as long as the
@@ -80,9 +82,13 @@ impl Database {
8082
.connect_with(options)
8183
.await?;
8284

85+
// We reserve one connection for write transactions.
86+
let read_connection_limit = max_connections as usize - 1;
87+
8388
Ok(Self {
8489
connection: SqlxSqliteConnector::from_sqlx_sqlite_pool(sqlx_pool),
8590
write_lock: Arc::new(Mutex::new(())),
91+
read_locks: Arc::new(Semaphore::new(read_connection_limit)),
8692
metrics: DatabaseMetrics::default(),
8793
#[cfg(feature = "test-utils")]
8894
tmp_dir: None,
@@ -111,7 +117,8 @@ impl DatabaseTransactionProvider for Database {
111117
/// Creates a new [`TX`] which can be used for read-only operations.
112118
async fn tx(&self) -> Result<TX, DatabaseError> {
113119
tracing::trace!(target: "scroll::db", "Creating new read-only transaction");
114-
Ok(TX::new(self.connection.clone().begin().await?))
120+
let permit = self.read_locks.clone().acquire_owned().await.unwrap();
121+
Ok(TX::new(self.connection.clone().begin().await?, permit))
115122
}
116123

117124
/// Creates a new [`TXMut`] which can be used for atomic read and write operations.
@@ -148,18 +155,6 @@ impl DatabaseConnectionProvider for Database {
148155
}
149156
}
150157

151-
impl From<DatabaseConnection> for Database {
152-
fn from(connection: DatabaseConnection) -> Self {
153-
Self {
154-
connection,
155-
write_lock: Arc::new(Mutex::new(())),
156-
metrics: DatabaseMetrics::default(),
157-
#[cfg(feature = "test-utils")]
158-
tmp_dir: None,
159-
}
160-
}
161-
}
162-
163158
#[cfg(test)]
164159
mod test {
165160
use super::*;

crates/database/db/src/transaction.rs

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
use crate::DatabaseConnectionProvider;
22

33
use super::{DatabaseError, ReadConnectionProvider, WriteConnectionProvider};
4-
use tokio::sync::OwnedMutexGuard;
4+
use tokio::sync::{OwnedMutexGuard, OwnedSemaphorePermit};
55

66
/// A type that represents a read-only database transaction.
77
///
@@ -10,12 +10,14 @@ use tokio::sync::OwnedMutexGuard;
1010
pub struct TX {
1111
/// The underlying database transaction.
1212
tx: sea_orm::DatabaseTransaction,
13+
/// A permit for the read transaction semaphore.
14+
_permit: OwnedSemaphorePermit,
1315
}
1416

1517
impl TX {
1618
/// Creates a new [`TX`] instance associated with the provided [`sea_orm::DatabaseTransaction`].
17-
pub const fn new(tx: sea_orm::DatabaseTransaction) -> Self {
18-
Self { tx }
19+
pub const fn new(tx: sea_orm::DatabaseTransaction, permit: OwnedSemaphorePermit) -> Self {
20+
Self { tx, _permit: permit }
1921
}
2022
}
2123

0 commit comments

Comments
 (0)