Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
16 commits
Select commit Hold shift + click to select a range
c204643
feat(vfs): add comprehensive integration tests for VFS functionality
corylanou Nov 11, 2025
b4655f7
test(vfs): add comprehensive integration and unit tests
corylanou Nov 13, 2025
7950ca2
test(vfs): add advanced integration tests for edge cases and failure …
corylanou Nov 13, 2025
e6ae97b
fix(vfs): integrate dual-polling with resilient page fetch and enhanc…
corylanou Nov 14, 2025
888a6d8
refactor(vfs): simplify failure injection with per-read error hooks
corylanou Nov 15, 2025
26c5c01
refactor(vfs): replace global error injection with interceptor pattern
corylanou Nov 15, 2025
2766345
test(vfs): add comprehensive integration tests with failure injection
corylanou Nov 17, 2025
67c78b2
refactor(vfs): simplify error injection to use direct function calls
corylanou Nov 17, 2025
2cc044b
fix(vfs): improve temp file deletion and level-1 position initialization
corylanou Nov 17, 2025
5fc5bf3
fix(vfs): correct temp file lock state tracking
corylanou Nov 17, 2025
3cca58c
docs(replica): add documentation comment for FetchLTXHeader
corylanou Nov 17, 2025
3428aad
refactor(vfs): consolidate temp file tests into vfs_test.go
corylanou Nov 17, 2025
3e3e01a
fix: remove leftover merge conflict marker
corylanou Nov 19, 2025
32c440c
refactor(vfs): remove unused page fetch context functions
corylanou Nov 19, 2025
e691a58
fix(vfs): handle temp file deletion and page size in cache
corylanou Nov 19, 2025
b272795
fix(vfs): prevent temp file collisions for same basename in different…
corylanou Nov 19, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
230 changes: 230 additions & 0 deletions cmd/litestream-vfs/chaos_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,230 @@
//go:build vfs && chaos
// +build vfs,chaos

package main_test

import (
	"bytes"
	"context"
	"io"
	"math/rand"
	"sync"
	"sync/atomic"
	"testing"
	"time"

	"github.com/superfly/ltx"

	"github.com/benbjohnson/litestream"
	"github.com/benbjohnson/litestream/file"
	"github.com/benbjohnson/litestream/internal/testingutil"
)

// TestVFS_ChaosEngineering drives one writer and sixteen reader goroutines
// against a VFS replica while the underlying replica client randomly injects
// faults (timeouts and truncated reads), then verifies the replica still
// converges to the primary's row count and that faults actually fired.
func TestVFS_ChaosEngineering(t *testing.T) {
client := file.NewReplicaClient(t.TempDir())
// Short sync/monitor intervals keep total test time small.
db, primary := openReplicatedPrimary(t, client, 15*time.Millisecond, 15*time.Millisecond)
defer testingutil.MustCloseSQLDB(t, primary)

if _, err := primary.Exec(`CREATE TABLE chaos (
id INTEGER PRIMARY KEY AUTOINCREMENT,
value TEXT,
grp INTEGER
)`); err != nil {
t.Fatalf("create table: %v", err)
}
// Seed 64 rows across 8 groups so readers have data to query immediately.
for i := 0; i < 64; i++ {
if _, err := primary.Exec("INSERT INTO chaos (value, grp) VALUES (?, ?)", randomPayload(rand.New(rand.NewSource(int64(i))), 48), i%8); err != nil {
t.Fatalf("seed chaos: %v", err)
}
}

// Give the replication monitor a few cycles to ship the seed data.
time.Sleep(5 * db.MonitorInterval)

// Wrap the replica client with fault injection and open the VFS over it.
// Injection stays disabled until the replica has caught up once (below).
chaosClient := newChaosReplicaClient(client)
vfs := newVFS(t, chaosClient)
vfs.PollInterval = 15 * time.Millisecond
vfsName := registerTestVFS(t, vfs)

replica := openVFSReplicaDB(t, vfsName)
defer replica.Close()

// Require a clean initial catch-up before enabling chaos.
waitForTableRowCount(t, primary, replica, "chaos", 5*time.Second)
chaosClient.active.Store(true)

// The whole chaos phase runs for 3 seconds.
ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
defer cancel()

// Writer goroutine: random inserts/updates/deletes against the primary
// until the context expires. SQLITE_BUSY is expected and retried by
// continuing the loop; any other error is reported via writerDone.
writerDone := make(chan error, 1)
go func() {
rnd := rand.New(rand.NewSource(42))
for {
select {
case <-ctx.Done():
writerDone <- nil
return
default:
}
switch rnd.Intn(3) {
case 0:
if _, err := primary.Exec("INSERT INTO chaos (value, grp) VALUES (?, ?)", randomPayload(rnd, 32), rnd.Intn(8)); err != nil && !isBusyError(err) {
writerDone <- err
return
}
case 1:
if _, err := primary.Exec("UPDATE chaos SET value = ? WHERE id = (ABS(random()) % 64) + 1", randomPayload(rnd, 24)); err != nil && !isBusyError(err) {
writerDone <- err
return
}
case 2:
if _, err := primary.Exec("DELETE FROM chaos WHERE id IN (SELECT id FROM chaos ORDER BY RANDOM() LIMIT 1)"); err != nil && !isBusyError(err) {
writerDone <- err
return
}
}
time.Sleep(5 * time.Millisecond)
}
}()

// Reader goroutines: each loops over three query shapes (group count,
// ordered page scan, range aggregate) against the replica. Busy errors
// are retried; the first real error per goroutine is sent on readerErrs,
// and nil is sent on clean shutdown so the drain loop below can count.
const readers = 16
readerErrs := make(chan error, readers)
for i := 0; i < readers; i++ {
go func() {
rnd := rand.New(rand.NewSource(time.Now().UnixNano()))
for {
select {
case <-ctx.Done():
readerErrs <- nil
return
default:
}
var count int
switch rnd.Intn(3) {
case 0:
err := replica.QueryRow("SELECT COUNT(*) FROM chaos WHERE grp = ?", rnd.Intn(8)).Scan(&count)
if err != nil {
if isBusyError(err) {
continue
}
readerErrs <- err
return
}
case 1:
rows, err := replica.Query("SELECT id, value FROM chaos ORDER BY id DESC LIMIT 5 OFFSET ?", rnd.Intn(10))
if err != nil {
if isBusyError(err) {
continue
}
readerErrs <- err
return
}
// retryRows defers the `continue` until after the rows loop,
// since `continue` inside rows.Next() would target that loop.
retryRows := false
for rows.Next() {
var id int
var value string
if err := rows.Scan(&id, &value); err != nil {
rows.Close()
if isBusyError(err) {
retryRows = true
break
}
readerErrs <- err
return
}
}
if retryRows {
continue
}
if err := rows.Err(); err != nil {
rows.Close()
if isBusyError(err) {
continue
}
readerErrs <- err
return
}
rows.Close()
case 2:
err := replica.QueryRow("SELECT SUM(LENGTH(value)) FROM chaos WHERE id BETWEEN ? AND ?",
rnd.Intn(32)+1, rnd.Intn(32)+33).Scan(&count)
if err != nil {
if isBusyError(err) {
continue
}
readerErrs <- err
return
}
}
}
}()
}

// Wait for the chaos window to close, then drain every goroutine's result.
<-ctx.Done()
for i := 0; i < readers; i++ {
if err := <-readerErrs; err != nil {
t.Fatalf("reader error: %v", err)
}
}
if err := <-writerDone; err != nil {
t.Fatalf("writer error: %v", err)
}

// With injection still active, the replica must still converge, and at
// least one fault must have been injected for the test to mean anything.
waitForTableRowCount(t, primary, replica, "chaos", 5*time.Second)
if chaosClient.failures.Load() == 0 {
t.Fatalf("expected injected failures")
}
}

// newChaosReplicaClient wraps base in a chaosReplicaClient seeded with a
// fixed PRNG so a given run produces a reproducible fault pattern. Fault
// injection is disabled until the caller sets active to true.
func newChaosReplicaClient(base litestream.ReplicaClient) *chaosReplicaClient {
	return &chaosReplicaClient{
		ReplicaClient: base,
		rnd:           rand.New(rand.NewSource(99)),
	}
}

// chaosReplicaClient decorates a litestream.ReplicaClient with random fault
// injection: deadline errors on listing/opening LTX files, added latency,
// and truncated reads. It is called concurrently by many reader goroutines,
// and math/rand.Rand is not safe for concurrent use, so all access to rnd
// is serialized through mu.
type chaosReplicaClient struct {
	litestream.ReplicaClient

	mu  sync.Mutex // guards rnd
	rnd *rand.Rand

	failures atomic.Int32 // count of faults injected so far
	active   atomic.Bool  // when false, all calls pass through untouched
}

// chance reports whether a fault should fire, with probability p.
func (c *chaosReplicaClient) chance(p float64) bool {
	c.mu.Lock()
	defer c.mu.Unlock()
	return c.rnd.Float64() < p
}

// jitter returns a random delay in [0ms, 5ms).
func (c *chaosReplicaClient) jitter() time.Duration {
	c.mu.Lock()
	defer c.mu.Unlock()
	return time.Duration(c.rnd.Intn(5)) * time.Millisecond
}

// LTXFiles passes through to the wrapped client, except that while fault
// injection is active it fails ~5% of calls with context.DeadlineExceeded.
func (c *chaosReplicaClient) LTXFiles(ctx context.Context, level int, seek ltx.TXID, useMetadata bool) (ltx.FileIterator, error) {
	if !c.active.Load() {
		return c.ReplicaClient.LTXFiles(ctx, level, seek, useMetadata)
	}
	if c.chance(0.05) {
		c.failures.Add(1)
		return nil, context.DeadlineExceeded
	}
	return c.ReplicaClient.LTXFiles(ctx, level, seek, useMetadata)
}

// OpenLTXFile passes through to the wrapped client. While fault injection is
// active it adds up to 5ms of latency, fails ~5% of calls with
// context.DeadlineExceeded, and truncates ~5% of successful reads to half
// their length to simulate a corrupted or partial download.
func (c *chaosReplicaClient) OpenLTXFile(ctx context.Context, level int, minTXID, maxTXID ltx.TXID, offset, size int64) (io.ReadCloser, error) {
	if !c.active.Load() {
		return c.ReplicaClient.OpenLTXFile(ctx, level, minTXID, maxTXID, offset, size)
	}
	if delay := c.jitter(); delay > 0 {
		time.Sleep(delay)
	}
	if c.chance(0.05) {
		c.failures.Add(1)
		return nil, context.DeadlineExceeded
	}
	rc, err := c.ReplicaClient.OpenLTXFile(ctx, level, minTXID, maxTXID, offset, size)
	if err != nil {
		return nil, err
	}
	if c.chance(0.05) && size > 0 {
		data, err := io.ReadAll(rc)
		rc.Close()
		if err != nil {
			return nil, err
		}
		// Only truncate payloads large enough that half is still non-trivial.
		if len(data) > 32 {
			data = data[:len(data)/2]
		}
		c.failures.Add(1)
		return io.NopCloser(bytes.NewReader(data)), nil
	}
	return rc, nil
}
162 changes: 162 additions & 0 deletions cmd/litestream-vfs/fuzz_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,162 @@
//go:build vfs
// +build vfs

package main_test

import (
"database/sql"
"fmt"
"os"
"strings"
"testing"
"time"

"github.com/benbjohnson/litestream/file"
"github.com/benbjohnson/litestream/internal/testingutil"
)

// TestVFS_FuzzSeedCorpus replays a small fixed set of corpus entries so that
// a plain `go test` run exercises the same code path as the fuzz harness,
// without needing `-fuzz=...`.
func TestVFS_FuzzSeedCorpus(t *testing.T) {
	corpora := [][]byte{
		{0x00, 0x01, 0x02},
		[]byte("litestream vfs fuzz"),
		{0xFF, 0x10, 0x42, 0x7F},
	}
	for _, corpus := range corpora {
		runVFSFuzzWorkload(t, corpus)
	}
}

// FuzzVFSReplicaReadPatterns feeds random byte strings into the VFS replica
// read workload, exercising mixed point reads, aggregates, and ordering
// queries. Run it with:
//
//	go test ./cmd/litestream-vfs -tags vfs -fuzz=FuzzVFSReplicaReadPatterns
func FuzzVFSReplicaReadPatterns(f *testing.F) {
	for _, seed := range [][]byte{
		[]byte("seed"),
		{0x1, 0x2, 0x3, 0x4},
		{0xAA, 0xBB, 0xCC},
	} {
		f.Add(seed)
	}

	f.Fuzz(func(t *testing.T, data []byte) {
		runVFSFuzzWorkload(t, data)
	})
}

// runVFSFuzzWorkload stands up a replicated primary with deterministic seed
// data, opens a VFS replica over it, waits for the replica to catch up, then
// interprets corpus as a byte-coded script of read operations against the
// replica: point lookups, group/range aggregates, ordered scans, LIKE
// queries, and primary-vs-replica count cross-checks. Any failure other than
// sql.ErrNoRows fails the test.
func runVFSFuzzWorkload(tb testing.TB, corpus []byte) {
	tb.Helper()
	// Normalize the corpus: never empty, and capped at 256 bytes so a
	// pathological fuzz input cannot make one run unboundedly long.
	if len(corpus) == 0 {
		corpus = []byte{0}
	}
	if len(corpus) > 256 {
		corpus = corpus[:256]
	}

	client := file.NewReplicaClient(tb.TempDir())
	if err := os.MkdirAll(client.LTXLevelDir(0), 0o755); err != nil {
		tb.Fatalf("init replica dir: %v", err)
	}
	db, primary := openReplicatedPrimary(tb, client, 15*time.Millisecond, 15*time.Millisecond)
	defer testingutil.MustCloseSQLDB(tb, primary)

	if _, err := primary.Exec(`CREATE TABLE fuzz (
		id INTEGER PRIMARY KEY,
		value TEXT,
		grp INTEGER
	)`); err != nil {
		tb.Fatalf("create table: %v", err)
	}

	// Deterministic seed data so we have plenty of rows/pages to hydrate.
	for i := 0; i < 128; i++ {
		payload := fmt.Sprintf("row-%03d-%s", i, strings.Repeat("x", (i%17)+8))
		if _, err := primary.Exec("INSERT INTO fuzz (value, grp) VALUES (?, ?)", payload, i%11); err != nil {
			tb.Fatalf("seed insert: %v", err)
		}
	}
	time.Sleep(5 * db.MonitorInterval)

	vfs := newVFS(tb, client)
	vfs.PollInterval = 15 * time.Millisecond
	vfsName := registerTestVFS(tb, vfs)
	replica := openVFSReplicaDB(tb, vfsName)
	defer replica.Close()

	// Poll until the replica row count matches the primary. Errors from the
	// replica query are deliberately ignored here: the replica may not be
	// readable yet while it is still hydrating.
	deadline := time.Now().Add(5 * time.Second)
	for {
		var primaryCount, replicaCount int
		if err := primary.QueryRow("SELECT COUNT(*) FROM fuzz").Scan(&primaryCount); err != nil {
			tb.Fatalf("primary count: %v", err)
		}
		if err := replica.QueryRow("SELECT COUNT(*) FROM fuzz").Scan(&replicaCount); err == nil {
			if primaryCount == replicaCount {
				break
			}
		}
		if time.Now().After(deadline) {
			tb.Fatalf("replica never caught up: primary=%d", primaryCount)
		}
		time.Sleep(20 * time.Millisecond)
	}

	const maxOps = 128
	for i := 0; i < len(corpus) && i < maxOps; i++ {
		op := corpus[i] % 6
		switch op {
		case 0: // Point lookup by id; a missing row is acceptable.
			id := int(corpus[i])%128 + 1
			var value string
			err := replica.QueryRow("SELECT value FROM fuzz WHERE id = ?", id).Scan(&value)
			if err != nil && err != sql.ErrNoRows {
				tb.Fatalf("select by id: %v", err)
			}
		case 1: // Aggregate count over a single group.
			var count int
			if err := replica.QueryRow("SELECT COUNT(*) FROM fuzz WHERE grp = ?", int(corpus[i])%11).Scan(&count); err != nil {
				tb.Fatalf("count grp: %v", err)
			}
		case 2: // Ordered scan with a small corpus-derived offset.
			rows, err := replica.Query("SELECT value FROM fuzz ORDER BY value DESC LIMIT 5 OFFSET ?", int(corpus[i])%10)
			if err != nil {
				tb.Fatalf("ordered scan: %v", err)
			}
			for rows.Next() {
				var v string
				if err := rows.Scan(&v); err != nil {
					// Under `go test -fuzz`, Fatalf fails only this input and
					// the process keeps running — close the cursor so failed
					// iterations don't leak statement handles.
					rows.Close()
					tb.Fatalf("scan ordered: %v", err)
				}
			}
			if err := rows.Err(); err != nil {
				rows.Close()
				tb.Fatalf("ordered rows err: %v", err)
			}
			rows.Close()
		case 3: // Range aggregate over a corpus-derived id window.
			var sum int
			if err := replica.QueryRow("SELECT SUM(LENGTH(value)) FROM fuzz WHERE id BETWEEN ? AND ?",
				int(corpus[i])%64+1, int(corpus[i])%64+64).Scan(&sum); err != nil {
				tb.Fatalf("sum lengths: %v", err)
			}
		case 4: // Cross-check counts between primary & replica for one group.
			grp := int(corpus[i]) % 11
			var pc, rc int
			if err := primary.QueryRow("SELECT COUNT(*) FROM fuzz WHERE grp = ?", grp).Scan(&pc); err != nil {
				tb.Fatalf("primary grp count: %v", err)
			}
			if err := replica.QueryRow("SELECT COUNT(*) FROM fuzz WHERE grp = ?", grp).Scan(&rc); err != nil {
				tb.Fatalf("replica grp count: %v", err)
			}
			if pc != rc {
				tb.Fatalf("count mismatch grp=%d primary=%d replica=%d", grp, pc, rc)
			}
		case 5: // Random LIKE query to exercise page cache churn.
			pattern := fmt.Sprintf("row-%%%02x%%", corpus[i])
			rows, err := replica.Query("SELECT id FROM fuzz WHERE value LIKE ? LIMIT 3", pattern)
			if err != nil {
				tb.Fatalf("like query: %v", err)
			}
			rows.Close()
		}
	}
}
Loading