@@ -167,6 +167,45 @@ impl ManagedConnectionWalWrapper {
167
167
extended_code : 517 , // stale read
168
168
} ) ;
169
169
}
170
+ // If other connection is about to checkpoint - we better to immediately return.
171
+ //
172
+ // The reason is that write transaction are upgraded from read transactions in SQLite.
173
+ // Due to this, every write transaction need to hold SHARED-WAL lock and if we will
174
+ // block write transaction here - we will prevent checkpoint process from restarting the WAL
175
+ // (because it needs to acquire EXCLUSIVE-WAL lock)
176
+ //
177
+ // So, the scenario is following:
178
+ // T0: we have a bunch of SELECT queries which will execute till time T2
179
+ // T1: CHECKPOINT process is starting: it holds CKPT and WRITE lock and attempt to acquire
180
+ // EXCLUSIVE-WAL locks one by one in order to check the position of readers. CHECKPOINT will
181
+ // use busy handler and can potentially acquire lock not from the first attempt.
182
+ // T2: CHECKPOINT process were able to check all WAL reader positions (by acquiring lock or atomically check reader position)
183
+ // and started to transfer WAL to the DB file
184
+ // T3: INSERT query starts executing: it started as a read transaction and holded SHARED-WAL lock but then it needs to
185
+ // upgrade to write transaction through begin_write_txn call
186
+ // T4: CHECKPOINT transferred all pages from WAL to DB file and need to check if it can restart the WAL. In order to
187
+ // do that it needs to hold all EXCLUSIVE-WAL locks to make sure that all readers use only DB file
188
+ //
189
+ // In the scenario above, if we will park INSERT at the time T3 - CHECKPOINT will be unable to hold EXCLUSIVE-WAL
190
+ // locks and so WAL will not be truncated.
191
+ // In case when DB has continious load with overlapping reads and writes - this problem became very noticeable
192
+ // as it can defer WAL truncation a lot.
193
+ //
194
+ // Also, such implementation is more aligned with LibSQL/SQLite behaviour where sqlite3WalBeginWriteTransaction
195
+ // immediately abort with SQLITE_BUSY error if it can't acquire WRITE lock (which CHECKPOINT also take before start of the work)
196
+ // and busy handler (e.g. retries) for writes are invoked by SQLite at upper layer of request processing.
197
+ match * current {
198
+ Some ( Slot {
199
+ id,
200
+ state : SlotState :: Acquired ( SlotType :: Checkpoint ) ,
201
+ ..
202
+ } ) if id != self . id => {
203
+ return Err ( rusqlite:: ffi:: Error :: new ( rusqlite:: ffi:: SQLITE_BUSY ) ) ;
204
+ }
205
+ _ => { }
206
+ }
207
+ // note, that it's important that we return SQLITE_BUSY error for CHECKPOINT starvation problem before that condition
208
+ // because after we will add something to the write_queue - we can't easily abort execution of acquire() method
170
209
if current. as_mut ( ) . map_or ( true , |slot| slot. id != self . id ) && !enqueued {
171
210
self . manager
172
211
. write_queue
@@ -196,7 +235,7 @@ impl ManagedConnectionWalWrapper {
196
235
let since_started = slot. started_at . elapsed ( ) ;
197
236
let deadline = slot. started_at + self . manager . txn_timeout_duration ;
198
237
match slot. state {
199
- SlotState :: Acquired => {
238
+ SlotState :: Acquired ( .. ) => {
200
239
if since_started >= self . manager . txn_timeout_duration {
201
240
let id = slot. id ;
202
241
drop ( current) ;
@@ -354,11 +393,17 @@ impl ManagedConnectionWalWrapper {
354
393
}
355
394
}
356
395
396
+ #[ derive( Copy , Clone , Debug , PartialEq ) ]
397
+ enum SlotType {
398
+ WriteTxn ,
399
+ Checkpoint ,
400
+ }
401
+
357
402
#[ derive( Copy , Clone , Debug ) ]
358
403
enum SlotState {
359
404
Notified ,
360
405
Acquiring ,
361
- Acquired ,
406
+ Acquired ( SlotType ) ,
362
407
Failure ,
363
408
}
364
409
@@ -389,7 +434,7 @@ impl WrapWal<InnerWal> for ManagedConnectionWalWrapper {
389
434
Ok ( _) => {
390
435
tracing:: debug!( "transaction acquired" ) ;
391
436
let mut lock = self . manager . current . lock ( ) ;
392
- lock. as_mut ( ) . unwrap ( ) . state = SlotState :: Acquired ;
437
+ lock. as_mut ( ) . unwrap ( ) . state = SlotState :: Acquired ( SlotType :: WriteTxn ) ;
393
438
394
439
Ok ( ( ) )
395
440
}
@@ -424,7 +469,8 @@ impl WrapWal<InnerWal> for ManagedConnectionWalWrapper {
424
469
) -> libsql_sys:: wal:: Result < ( ) > {
425
470
let before = Instant :: now ( ) ;
426
471
self . acquire ( ) ?;
427
- self . manager . current . lock ( ) . as_mut ( ) . unwrap ( ) . state = SlotState :: Acquired ;
472
+ self . manager . current . lock ( ) . as_mut ( ) . unwrap ( ) . state =
473
+ SlotState :: Acquired ( SlotType :: Checkpoint ) ;
428
474
429
475
let mode = if rand:: random :: < f32 > ( ) < 0.1 {
430
476
CheckpointMode :: Truncate
@@ -476,7 +522,7 @@ impl WrapWal<InnerWal> for ManagedConnectionWalWrapper {
476
522
// if the slot acquire the transaction lock
477
523
if let Some ( Slot {
478
524
id,
479
- state : SlotState :: Acquired ,
525
+ state : SlotState :: Acquired ( .. ) ,
480
526
..
481
527
} ) = * current
482
528
{
0 commit comments