Skip to content

Commit b696a24

Browse files
committed
feat: wal_checkpoint(): Do wal_checkpoint(PASSIVE) and wal_checkpoint(FULL) before wal_checkpoint(TRUNCATE)
This way the subsequent `wal_checkpoint(TRUNCATE)` is faster. We don't want to block writers and readers for a long period.
1 parent 7e4822c commit b696a24

File tree

2 files changed

+75
-21
lines changed

2 files changed

+75
-21
lines changed

src/sql.rs

Lines changed: 64 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@
33
use std::collections::{HashMap, HashSet};
44
use std::path::{Path, PathBuf};
55

6-
use anyhow::{Context as _, Result, bail};
6+
use anyhow::{Context as _, Result, bail, ensure};
77
use rusqlite::{Connection, OpenFlags, Row, config::DbConfig, types::ValueRef};
88
use tokio::sync::RwLock;
99

@@ -13,7 +13,6 @@ use crate::config::Config;
1313
use crate::constants::DC_CHAT_ID_TRASH;
1414
use crate::context::Context;
1515
use crate::debug_logging::set_debug_logging_xdc;
16-
use crate::ensure_and_debug_assert;
1716
use crate::ephemeral::start_ephemeral_timers;
1817
use crate::imex::BLOBS_BACKUP_NAME;
1918
use crate::location::delete_orphaned_poi_locations;
@@ -24,7 +23,7 @@ use crate::net::http::http_cache_cleanup;
2423
use crate::net::prune_connection_history;
2524
use crate::param::{Param, Params};
2625
use crate::stock_str;
27-
use crate::tools::{SystemTime, delete_file, time};
26+
use crate::tools::{SystemTime, Time, delete_file, time, time_elapsed};
2827

2928
/// Extension to [`rusqlite::ToSql`] trait
3029
/// which also includes [`Send`] and [`Sync`].
@@ -180,7 +179,7 @@ impl Sql {
180179

181180
/// Creates a new connection pool.
182181
fn new_pool(dbfile: &Path, passphrase: String) -> Result<Pool> {
183-
let mut connections = Vec::new();
182+
let mut connections = Vec::with_capacity(Self::N_DB_CONNECTIONS);
184183
for _ in 0..Self::N_DB_CONNECTIONS {
185184
let connection = new_connection(dbfile, &passphrase)?;
186185
connections.push(connection);
@@ -642,28 +641,74 @@ impl Sql {
642641
}
643642

644643
/// Runs a checkpoint operation in TRUNCATE mode, so the WAL file is truncated to 0 bytes.
645-
pub(crate) async fn wal_checkpoint(&self) -> Result<()> {
646-
let lock = self.pool.read().await;
647-
let pool = lock.as_ref().context("No SQL connection pool")?;
648-
let mut conns = Vec::new();
644+
pub(crate) async fn wal_checkpoint(context: &Context) -> Result<()> {
645+
let t_start = Time::now();
646+
let lock = context.sql.pool.read().await;
647+
let Some(pool) = lock.as_ref() else {
648+
// No db connections, nothing to checkpoint.
649+
return Ok(());
650+
};
651+
652+
// Do as much work as possible without blocking anybody.
649653
let query_only = true;
654+
let conn = pool.get(query_only).await?;
655+
tokio::task::block_in_place(|| {
656+
// Execute some transaction causing the WAL file to be opened so that the
657+
// `wal_checkpoint()` can proceed, otherwise it fails when called the first time,
658+
// see https://sqlite.org/forum/forumpost/7512d76a05268fc8.
659+
conn.query_row("PRAGMA table_list", [], |_| Ok(()))?;
660+
conn.query_row("PRAGMA wal_checkpoint(PASSIVE)", [], |_| Ok(()))
661+
})?;
662+
663+
// Kick out writers.
664+
const _: () = assert!(Sql::N_DB_CONNECTIONS > 1, "Deadlock possible");
665+
let _write_lock = pool.write_lock().await;
666+
let t_writers_blocked = Time::now();
667+
// Ensure that all readers use the most recent database snapshot (are at the end of WAL) so
668+
// that `wal_checkpoint(FULL)` isn't blocked. We could use `PASSIVE` as well, but it's
669+
// documented poorly, https://www.sqlite.org/pragma.html#pragma_wal_checkpoint and
670+
// https://www.sqlite.org/c3ref/wal_checkpoint_v2.html don't tell how it interacts with new
671+
// readers.
672+
let mut read_conns = Vec::with_capacity(Self::N_DB_CONNECTIONS - 1);
673+
for _ in 0..(Self::N_DB_CONNECTIONS - 1) {
674+
read_conns.push(pool.get(query_only).await?);
675+
}
676+
read_conns.clear();
677+
// Checkpoint the remaining WAL pages without blocking readers.
678+
let (pages_total, pages_checkpointed) = tokio::task::block_in_place(|| {
679+
conn.query_row("PRAGMA wal_checkpoint(FULL)", [], |row| {
680+
let pages_total: i64 = row.get(1)?;
681+
let pages_checkpointed: i64 = row.get(2)?;
682+
Ok((pages_total, pages_checkpointed))
683+
})
684+
})?;
685+
if pages_checkpointed < pages_total {
686+
warn!(
687+
context,
688+
"Cannot checkpoint whole WAL. Pages total: {pages_total}, checkpointed: {pages_checkpointed}. Make sure there are no external connections running transactions.",
689+
);
690+
}
650691
// Kick out readers to avoid blocking/SQLITE_BUSY.
651692
for _ in 0..(Self::N_DB_CONNECTIONS - 1) {
652-
conns.push(pool.get(query_only).await?);
693+
read_conns.push(pool.get(query_only).await?);
653694
}
654-
let conn = pool.get(query_only).await?;
655-
tokio::task::block_in_place(move || {
656-
// Execute some transaction causing the WAL file to be opened so that the
657-
// `wal_checkpoint()` can proceed, otherwise it fails when called the first time, see
658-
// https://sqlite.org/forum/forumpost/7512d76a05268fc8.
659-
conn.query_row("PRAGMA table_list", [], |_row| Ok(()))?;
695+
let t_readers_blocked = Time::now();
696+
tokio::task::block_in_place(|| {
660697
let blocked = conn.query_row("PRAGMA wal_checkpoint(TRUNCATE)", [], |row| {
661698
let blocked: i64 = row.get(0)?;
662699
Ok(blocked)
663700
})?;
664-
ensure_and_debug_assert!(blocked == 0,);
701+
ensure!(blocked == 0);
665702
Ok(())
666-
})
703+
})?;
704+
info!(
705+
context,
706+
"wal_checkpoint: Total time: {:?}. Writers blocked for: {:?}. Readers blocked for: {:?}.",
707+
time_elapsed(&t_start),
708+
time_elapsed(&t_writers_blocked),
709+
time_elapsed(&t_readers_blocked),
710+
);
711+
Ok(())
667712
}
668713
}
669714

@@ -792,8 +837,9 @@ pub async fn housekeeping(context: &Context) -> Result<()> {
792837
// bigger than 200M) and also make sure we truncate the WAL periodically. Auto-checkponting does
793838
// not normally truncate the WAL (unless the `journal_size_limit` pragma is set), see
794839
// https://www.sqlite.org/wal.html.
795-
if let Err(err) = context.sql.wal_checkpoint().await {
840+
if let Err(err) = Sql::wal_checkpoint(context).await {
796841
warn!(context, "wal_checkpoint() failed: {err:#}.");
842+
debug_assert!(false);
797843
}
798844

799845
context

src/sql/pool.rs

Lines changed: 11 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -67,7 +67,7 @@ struct InnerPool {
6767
///
6868
/// This mutex is locked when write connection
6969
/// is outside the pool.
70-
write_mutex: Arc<Mutex<()>>,
70+
pub(crate) write_mutex: Arc<Mutex<()>>,
7171
}
7272

7373
impl InnerPool {
@@ -96,13 +96,13 @@ impl InnerPool {
9696
.pop()
9797
.context("Got a permit when there are no connections in the pool")?
9898
};
99-
conn.pragma_update(None, "query_only", "1")?;
10099
let conn = PooledConnection {
101100
pool: Arc::downgrade(&self),
102101
conn: Some(conn),
103102
_permit: permit,
104103
_write_mutex_guard: None,
105104
};
105+
conn.pragma_update(None, "query_only", "1")?;
106106
Ok(conn)
107107
} else {
108108
// We get write guard first to avoid taking a permit
@@ -119,13 +119,13 @@ impl InnerPool {
119119
"Got a permit and write lock when there are no connections in the pool",
120120
)?
121121
};
122-
conn.pragma_update(None, "query_only", "0")?;
123122
let conn = PooledConnection {
124123
pool: Arc::downgrade(&self),
125124
conn: Some(conn),
126125
_permit: permit,
127126
_write_mutex_guard: Some(write_mutex_guard),
128127
};
128+
conn.pragma_update(None, "query_only", "0")?;
129129
Ok(conn)
130130
}
131131
}
@@ -195,4 +195,12 @@ impl Pool {
195195
pub async fn get(&self, query_only: bool) -> Result<PooledConnection> {
196196
Arc::clone(&self.inner).get(query_only).await
197197
}
198+
199+
/// Returns a mutex guard guaranteeing that there are no concurrent write connections.
200+
///
201+
/// NB: Make sure you're not holding all connections when calling this, otherwise it deadlocks
202+
/// if there is a concurrent writer waiting for available connection.
203+
pub(crate) async fn write_lock(&self) -> OwnedMutexGuard<()> {
204+
Arc::clone(&self.inner.write_mutex).lock_owned().await
205+
}
198206
}

0 commit comments

Comments
 (0)