Skip to content

Commit 7c3a44a

Browse files
authored
VFS: Implement page cache (#838)
1 parent fa1e0ea commit 7c3a44a

File tree

3 files changed

+72
-11
lines changed

3 files changed

+72
-11
lines changed

go.mod

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ require (
1919
github.com/aws/smithy-go v1.22.5
2020
github.com/dustin/go-humanize v1.0.1
2121
github.com/fsouza/fake-gcs-server v1.47.3
22+
github.com/hashicorp/golang-lru/v2 v2.0.7
2223
github.com/mark3labs/mcp-go v0.32.0
2324
github.com/mattn/go-shellwords v1.0.12
2425
github.com/mattn/go-sqlite3 v1.14.19

go.sum

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -157,6 +157,8 @@ github.com/gorilla/handlers v1.5.1 h1:9lRY6j8DEeeBT10CvO9hGW0gmky0BprnvDI5vfhUHH
157157
github.com/gorilla/handlers v1.5.1/go.mod h1:t8XrUpc4KVXb7HGyJ4/cEnwQiaxrX/hz1Zv/4g96P1Q=
158158
github.com/gorilla/mux v1.8.0 h1:i40aqfkR1h2SlN9hojwV5ZA91wcXFOvkdNIeFDP5koI=
159159
github.com/gorilla/mux v1.8.0/go.mod h1:DVbg23sWSpFRCP0SfiEN6jmj59UnW/n46BH5rLB71So=
160+
github.com/hashicorp/golang-lru/v2 v2.0.7 h1:a+bsQ5rvGLjzHuww6tVxozPZFVghXaHOwFs4luLUK2k=
161+
github.com/hashicorp/golang-lru/v2 v2.0.7/go.mod h1:QeFd9opnmA6QUJc5vARoKUSoFhyfM2/ZepoAG6RGpeM=
160162
github.com/hokaccha/go-prettyjson v0.0.0-20211117102719-0474bc63780f h1:7LYC+Yfkj3CTRcShK0KOL/w6iTiKyqqBA9a41Wnggw8=
161163
github.com/hokaccha/go-prettyjson v0.0.0-20211117102719-0474bc63780f/go.mod h1:pFlLw2CfqZiIBOx6BuCeRLCrfxBJipTY0nIOF/VbGcI=
162164
github.com/json-iterator/go v1.1.12 h1:PV8peI4a0ysnczrg+LtxykD8LfKY9ML6u2jnxaEnrnM=

vfs.go

Lines changed: 69 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -13,12 +13,14 @@ import (
1313
"sync"
1414
"time"
1515

16+
lru "github.com/hashicorp/golang-lru/v2"
1617
"github.com/psanford/sqlite3vfs"
1718
"github.com/superfly/ltx"
1819
)
1920

2021
const (
2122
DefaultPollInterval = 1 * time.Second
23+
DefaultCacheSize = 10 * 1024 * 1024 // 10MB
2224
)
2325

2426
// VFS implements the SQLite VFS interface for Litestream.
@@ -30,13 +32,17 @@ type VFS struct {
3032
// PollInterval is the interval at which to poll the replica client for new
3133
// LTX files. The index will be fetched for the new files automatically.
3234
PollInterval time.Duration
35+
36+
// CacheSize is the maximum size of the page cache in bytes.
37+
CacheSize int
3338
}
3439

3540
func NewVFS(client ReplicaClient, logger *slog.Logger) *VFS {
3641
return &VFS{
3742
client: client,
3843
logger: logger.With("vfs", "true"),
3944
PollInterval: DefaultPollInterval,
45+
CacheSize: DefaultCacheSize,
4046
}
4147
}
4248

@@ -54,6 +60,7 @@ func (vfs *VFS) Open(name string, flags sqlite3vfs.OpenFlag) (sqlite3vfs.File, s
5460
func (vfs *VFS) openMainDB(name string, flags sqlite3vfs.OpenFlag) (sqlite3vfs.File, sqlite3vfs.OpenFlag, error) {
5561
f := NewVFSFile(vfs.client, name, vfs.logger.With("name", name))
5662
f.PollInterval = vfs.PollInterval
63+
f.CacheSize = vfs.CacheSize
5764
if err := f.Open(); err != nil {
5865
return nil, 0, err
5966
}
@@ -95,7 +102,8 @@ type VFSFile struct {
95102
maxTXID1 ltx.TXID // Last TXID read from level 1
96103
index map[uint32]ltx.PageIndexElem
97104
pending map[uint32]ltx.PageIndexElem
98-
lockType sqlite3vfs.LockType // Current lock state
105+
cache *lru.Cache[uint32, []byte] // LRU cache for page data
106+
lockType sqlite3vfs.LockType // Current lock state
99107

100108
wg sync.WaitGroup
101109
ctx context.Context
@@ -104,6 +112,7 @@ type VFSFile struct {
104112
logger *slog.Logger
105113

106114
PollInterval time.Duration
115+
CacheSize int
107116
}
108117

109118
func NewVFSFile(client ReplicaClient, name string, logger *slog.Logger) *VFSFile {
@@ -114,6 +123,7 @@ func NewVFSFile(client ReplicaClient, name string, logger *slog.Logger) *VFSFile
114123
pending: make(map[uint32]ltx.PageIndexElem),
115124
logger: logger,
116125
PollInterval: DefaultPollInterval,
126+
CacheSize: DefaultCacheSize,
117127
}
118128
f.ctx, f.cancel = context.WithCancel(context.Background())
119129
return f
@@ -143,6 +153,18 @@ func (f *VFSFile) LockType() sqlite3vfs.LockType {
143153
func (f *VFSFile) Open() error {
144154
f.logger.Info("opening file")
145155

156+
// Initialize page cache. Convert byte size to number of 4KB pages.
157+
const pageSize = 4096
158+
cacheEntries := f.CacheSize / pageSize
159+
if cacheEntries < 1 {
160+
cacheEntries = 1
161+
}
162+
cache, err := lru.New[uint32, []byte](cacheEntries)
163+
if err != nil {
164+
return fmt.Errorf("create page cache: %w", err)
165+
}
166+
f.cache = cache
167+
146168
infos, err := CalcRestorePlan(context.Background(), f.client, 0, time.Time{}, f.logger)
147169
if err != nil {
148170
f.logger.Error("cannot calc restore plan", "error", err)
@@ -207,6 +229,21 @@ func (f *VFSFile) ReadAt(p []byte, off int64) (n int, err error) {
207229
f.logger.Info("reading at", "off", off, "len", len(p))
208230
pgno := uint32(off/4096) + 1
209231

232+
// Check cache first (cache is thread-safe)
233+
if data, ok := f.cache.Get(pgno); ok {
234+
n = copy(p, data[off%4096:])
235+
f.logger.Info("cache hit", "page", pgno, "n", n)
236+
237+
// Update the first page to pretend like we are in journal mode.
238+
if off == 0 {
239+
p[18], p[19] = 0x01, 0x01
240+
_, _ = rand.Read(p[24:28])
241+
}
242+
243+
return n, nil
244+
}
245+
246+
// Get page index element
210247
f.mu.Lock()
211248
elem, ok := f.index[pgno]
212249
f.mu.Unlock()
@@ -216,14 +253,18 @@ func (f *VFSFile) ReadAt(p []byte, off int64) (n int, err error) {
216253
return 0, fmt.Errorf("page not found: %d", pgno)
217254
}
218255

256+
// Fetch from storage (cache miss)
219257
_, data, err := FetchPage(context.Background(), f.client, elem.Level, elem.MinTXID, elem.MaxTXID, elem.Offset, elem.Size)
220258
if err != nil {
221259
f.logger.Error("cannot fetch page", "error", err)
222260
return 0, fmt.Errorf("fetch page: %w", err)
223261
}
224262

263+
// Add to cache (cache is thread-safe)
264+
f.cache.Add(pgno, data)
265+
225266
n = copy(p, data[off%4096:])
226-
f.logger.Info("data read", "n", n, "data", len(data))
267+
f.logger.Info("data read from storage", "page", pgno, "n", n, "data", len(data))
227268

228269
// Update the first page to pretend like we are in journal mode.
229270
if off == 0 {
@@ -282,11 +323,16 @@ func (f *VFSFile) Unlock(elock sqlite3vfs.LockType) error {
282323

283324
f.lockType = elock
284325

285-
// Copy pending index to main index.
286-
for k, v := range f.pending {
287-
f.index[k] = v
326+
// Copy pending index to main index and invalidate affected pages in cache.
327+
if len(f.pending) > 0 {
328+
count := len(f.pending)
329+
for k, v := range f.pending {
330+
f.index[k] = v
331+
f.cache.Remove(k)
332+
}
333+
f.pending = make(map[uint32]ltx.PageIndexElem)
334+
f.logger.Debug("cache invalidated pages", "count", count)
288335
}
289-
f.pending = make(map[uint32]ltx.PageIndexElem)
290336

291337
return nil
292338
}
@@ -346,12 +392,24 @@ func (f *VFSFile) pollReplicaClient(ctx context.Context) error {
346392
f.mu.Lock()
347393
defer f.mu.Unlock()
348394

349-
target := f.index
350-
if f.lockType >= sqlite3vfs.LockShared {
351-
target = f.pending
352-
}
395+
// Apply updates and invalidate cache entries for updated pages
396+
invalidateN := 0
353397
for k, v := range index {
354-
target[k] = v
398+
// If we are holding a shared lock, add to pending index instead of main index.
399+
// We will copy these over once the shared lock is released.
400+
if f.lockType >= sqlite3vfs.LockShared {
401+
f.pending[k] = v
402+
continue
403+
}
404+
405+
// Otherwise update main index and invalidate cache entry.
406+
f.index[k] = v
407+
f.cache.Remove(k)
408+
invalidateN++
409+
}
410+
411+
if invalidateN > 0 {
412+
f.logger.Debug("cache invalidated pages due to new ltx files", "count", invalidateN)
355413
}
356414

357415
// Update to max TXID

0 commit comments

Comments
 (0)