Skip to content

Commit f563a74

Browse files
authored
Improve local cache and prefer local reads (#1545)
* update WIP list * prefer local max_frame_no: this assumes that the cache is always up to date and uses it instead of calling the storage server. Later we will add replication and make sure that the cache is up to date. * Prefer local reads and also save page versions locally: let's store `page_no` along with the `frame_no`. Then, while reading, we do a range read and pick the latest frame for the page whose `frame_no` is not greater than the txn's `max_frame_no` * minor improvements: index creation, fix max_frame_num query * update local cache on inserts * avoid unnecessary call to the storage server in `find_frame` * remove unused `get_frame` * eliminate all calls to the storage server during the read path
1 parent 784d9f9 commit f563a74

File tree

2 files changed

+66
-50
lines changed

2 files changed

+66
-50
lines changed

libsql-storage/src/lib.rs

Lines changed: 24 additions & 42 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@ use libsql_sys::rusqlite;
1313
use libsql_sys::wal::{Result, Vfs, Wal, WalManager};
1414
use rpc::storage_client::StorageClient;
1515
use tonic::transport::Channel;
16-
use tracing::{error, trace};
16+
use tracing::{error, trace, warn};
1717

1818
pub mod rpc {
1919
#![allow(clippy::all)]
@@ -23,12 +23,7 @@ pub mod rpc {
2323
// What does (not) work:
2424
// - there are no read txn locks nor upgrades
2525
// - no lock stealing
26-
// - write set is kept in mem
27-
// - txns don't use max_frame_no yet
2826
// - no savepoints, yet
29-
// - no multi tenancy, uses `default` namespace
30-
// - txn can read new frames after it started (since there are no read locks)
31-
// - requires huge memory as it assumes all the txn data fits in the memory
3227

3328
#[derive(Clone, Default)]
3429
pub struct DurableWalConfig {
@@ -178,17 +173,6 @@ impl DurableWal {
178173
.flatten();
179174
Ok(frame_no)
180175
}
181-
182-
async fn frames_count(&self) -> u64 {
183-
let req = rpc::FramesInWalRequest {
184-
namespace: self.namespace.to_string(),
185-
};
186-
let mut binding = self.client.clone();
187-
let resp = binding.frames_in_wal(req).await.unwrap();
188-
let count = resp.into_inner().count;
189-
trace!("DurableWal::frames_in_wal() = {}", count);
190-
count
191-
}
192176
}
193177

194178
impl Wal for DurableWal {
@@ -200,9 +184,7 @@ impl Wal for DurableWal {
200184
// - create a read lock
201185
// - save the current max_frame_no for this txn
202186
trace!("DurableWal::begin_read_txn()");
203-
let rt = tokio::runtime::Handle::current();
204-
let frame_no = tokio::task::block_in_place(|| rt.block_on(self.frames_count()));
205-
self.max_frame_no = frame_no;
187+
self.max_frame_no = self.local_cache.get_max_frame_num().unwrap();
206188
Ok(true)
207189
}
208190

@@ -232,21 +214,14 @@ impl Wal for DurableWal {
232214
if self.max_frame_no == 0 {
233215
return Ok(None);
234216
}
235-
let rt = tokio::runtime::Handle::current();
236-
// TODO: find_frame should account for `max_frame_no` of this txn
237-
let frame_no =
238-
tokio::task::block_in_place(|| rt.block_on(self.find_frame_by_page_no(page_no)))
239-
.unwrap();
240-
if frame_no.is_none() {
241-
return Ok(None);
242-
}
243217
return Ok(Some(page_no));
244218
}
245219

246220
#[tracing::instrument(skip_all, fields(page_no))]
247221
fn read_frame(&mut self, page_no: std::num::NonZeroU32, buffer: &mut [u8]) -> Result<()> {
248222
trace!("DurableWal::read_frame()");
249-
let rt = tokio::runtime::Handle::current();
223+
// to read a frame, first we check in transaction cache, then frames cache and lastly
224+
// storage server
250225
if let Ok(Some(frame)) = self
251226
.local_cache
252227
.get_page(self.conn_id.as_str(), u32::from(page_no))
@@ -258,20 +233,23 @@ impl Wal for DurableWal {
258233
buffer.copy_from_slice(&frame);
259234
return Ok(());
260235
}
261-
// TODO: this call is unnecessary since `read_frame` is always called after `find_frame`
262-
let frame_no =
263-
tokio::task::block_in_place(|| rt.block_on(self.find_frame_by_page_no(page_no)))
264-
.unwrap()
265-
.unwrap();
266236
// check if the frame exists in the local cache
267-
if let Ok(Some(frame)) = self.local_cache.get_frame(frame_no.into()) {
237+
if let Ok(Some(frame)) = self
238+
.local_cache
239+
.get_frame_by_page(u32::from(page_no), self.max_frame_no)
240+
{
268241
trace!(
269242
"DurableWal::read_frame(page_no: {:?}) -- read cache hit",
270243
page_no
271244
);
272245
buffer.copy_from_slice(&frame);
273246
return Ok(());
274247
}
248+
let rt = tokio::runtime::Handle::current();
249+
let frame_no =
250+
tokio::task::block_in_place(|| rt.block_on(self.find_frame_by_page_no(page_no)))
251+
.unwrap()
252+
.unwrap();
275253
let req = rpc::ReadFrameRequest {
276254
namespace: self.namespace.to_string(),
277255
frame_no: frame_no.get(),
@@ -281,17 +259,17 @@ impl Wal for DurableWal {
281259
let resp = tokio::task::block_in_place(|| rt.block_on(resp)).unwrap();
282260
let frame = resp.into_inner().frame.unwrap();
283261
buffer.copy_from_slice(&frame);
284-
let _ = self.local_cache.insert_frame(frame_no.into(), &frame);
262+
let _ = self
263+
.local_cache
264+
.insert_frame(frame_no.into(), u32::from(page_no), &frame);
285265
Ok(())
286266
}
287267

288268
fn db_size(&self) -> u32 {
289-
let rt = tokio::runtime::Handle::current();
290-
let size = tokio::task::block_in_place(|| rt.block_on(self.frames_count()))
291-
.try_into()
292-
.unwrap();
269+
let size = self.local_cache.get_max_frame_num().unwrap();
293270
trace!("DurableWal::db_size() => {}", size);
294-
size
271+
// TODO: serve the db size from the meta table
272+
size as u32
295273
}
296274

297275
fn begin_write_txn(&mut self) -> Result<()> {
@@ -379,7 +357,7 @@ impl Wal for DurableWal {
379357

380358
let req = rpc::InsertFramesRequest {
381359
namespace: self.namespace.to_string(),
382-
frames,
360+
frames: frames.clone(),
383361
max_frame_no: self.max_frame_no,
384362
};
385363
let mut binding = self.client.clone();
@@ -392,6 +370,10 @@ impl Wal for DurableWal {
392370
}
393371
return Err(rusqlite::ffi::Error::new(SQLITE_ABORT));
394372
}
373+
// TODO: fix parity with storage server frame num with local cache
374+
self.local_cache
375+
.insert_frames(self.max_frame_no, frames)
376+
.unwrap();
395377
Ok(resp.unwrap().into_inner().num_frames as usize)
396378
}
397379

libsql-storage/src/local_cache.rs

Lines changed: 42 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
use std::path::{Path, PathBuf};
22
use std::sync::Arc;
33

4+
use crate::rpc::Frame;
45
use libsql_sys::rusqlite::{ffi, params, Connection, Error, Result};
56

67
/// We use LocalCache to cache frames and transaction state. Each namespace gets its own cache
@@ -36,10 +37,15 @@ impl LocalCache {
3637
self.conn.execute(
3738
"CREATE TABLE IF NOT EXISTS frames (
3839
frame_no INTEGER PRIMARY KEY NOT NULL,
40+
page_no INTEGER NOT NULL,
3941
data BLOB NOT NULL
4042
)",
4143
[],
4244
)?;
45+
self.conn.execute(
46+
"CREATE INDEX IF NOT EXISTS idx_page_no_frame_no ON frames (page_no, frame_no)",
47+
[],
48+
)?;
4349

4450
self.conn.execute(
4551
"CREATE TABLE IF NOT EXISTS transactions (
@@ -53,10 +59,10 @@ impl LocalCache {
5359
Ok(())
5460
}
5561

56-
pub fn insert_frame(&self, frame_no: u64, frame_data: &[u8]) -> Result<()> {
62+
pub fn insert_frame(&self, frame_no: u64, page_no: u32, frame_data: &[u8]) -> Result<()> {
5763
match self.conn.execute(
58-
"INSERT INTO frames (frame_no, data) VALUES (?1, ?2)",
59-
params![frame_no, frame_data],
64+
"INSERT INTO frames (frame_no, page_no, data) VALUES (?1, ?2, ?3)",
65+
params![frame_no, page_no, frame_data],
6066
) {
6167
Ok(_) => Ok(()),
6268
Err(Error::SqliteFailure(e, _)) if e.code == ffi::ErrorCode::ConstraintViolation => {
@@ -66,17 +72,45 @@ impl LocalCache {
6672
}
6773
}
6874

69-
pub fn get_frame(&self, frame_no: u64) -> Result<Option<Vec<u8>>> {
70-
let mut stmt = self
71-
.conn
72-
.prepare("SELECT data FROM frames WHERE frame_no = ?1")?;
73-
match stmt.query_row(params![frame_no], |row| row.get(0)) {
75+
pub fn insert_frames(&mut self, frame_no: u64, frames: Vec<Frame>) -> Result<()> {
76+
let tx = self.conn.transaction().unwrap();
77+
{
78+
let mut stmt =
79+
tx.prepare("INSERT INTO frames (frame_no, page_no, data) VALUES (?1, ?2, ?3)")?;
80+
let mut frame_no = frame_no;
81+
for f in frames {
82+
frame_no += 1;
83+
stmt.execute(params![frame_no, f.page_no, f.data]).unwrap();
84+
}
85+
}
86+
tx.commit().unwrap();
87+
Ok(())
88+
}
89+
90+
pub fn get_frame_by_page(&self, page_no: u32, max_frame_no: u64) -> Result<Option<Vec<u8>>> {
91+
let mut stmt = self.conn.prepare(
92+
"SELECT data FROM frames WHERE page_no=?1 AND frame_no <= ?2
93+
ORDER BY frame_no DESC LIMIT 1",
94+
)?;
95+
match stmt.query_row(params![page_no, max_frame_no], |row| row.get(0)) {
7496
Ok(frame_data) => Ok(Some(frame_data)),
7597
Err(Error::QueryReturnedNoRows) => Ok(None),
7698
Err(e) => Err(e),
7799
}
78100
}
79101

102+
pub fn get_max_frame_num(&self) -> Result<u64> {
103+
match self
104+
.conn
105+
.query_row("SELECT MAX(frame_no) from frames", (), |row| {
106+
row.get::<_, Option<u64>>(0)
107+
}) {
108+
Ok(Some(frame_no)) => Ok(frame_no),
109+
Ok(None) | Err(Error::QueryReturnedNoRows) => Ok(0),
110+
Err(e) => Err(e),
111+
}
112+
}
113+
80114
pub fn insert_page(&self, txn_id: &str, page_no: u32, frame_data: &[u8]) -> Result<()> {
81115
self.conn.execute(
82116
"INSERT INTO transactions (txn_id, page_no, data) VALUES (?1, ?2, ?3)

0 commit comments

Comments
 (0)