Skip to content

Commit ab4941a

Browse files
corylanouclaude
andauthored
feat(vfs): add PRAGMA-based time travel for querying historical database states (#850)
Co-authored-by: Claude <[email protected]>
1 parent e4e82ee commit ab4941a

File tree

7 files changed

+610
-29
lines changed

7 files changed

+610
-29
lines changed
Lines changed: 130 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,130 @@
1+
//go:build vfs
2+
// +build vfs
3+
4+
package main_test
5+
6+
import (
7+
"context"
8+
"database/sql"
9+
"fmt"
10+
"path/filepath"
11+
"testing"
12+
"time"
13+
14+
_ "github.com/mattn/go-sqlite3"
15+
"github.com/psanford/sqlite3vfs"
16+
17+
"github.com/benbjohnson/litestream"
18+
"github.com/benbjohnson/litestream/file"
19+
"github.com/benbjohnson/litestream/internal/testingutil"
20+
)
21+
22+
func TestVFS_TimeTravelFunctions(t *testing.T) {
23+
ctx := context.Background()
24+
client := file.NewReplicaClient(t.TempDir())
25+
vfs := newVFS(t, client)
26+
vfs.PollInterval = 50 * time.Millisecond
27+
if err := sqlite3vfs.RegisterVFS("litestream-time", vfs); err != nil {
28+
t.Fatalf("failed to register litestream vfs: %v", err)
29+
}
30+
31+
db := testingutil.NewDB(t, filepath.Join(t.TempDir(), "db"))
32+
db.MonitorInterval = 50 * time.Millisecond
33+
db.Replica = litestream.NewReplica(db)
34+
db.Replica.Client = client
35+
db.Replica.SyncInterval = 50 * time.Millisecond
36+
if err := db.Open(); err != nil {
37+
t.Fatal(err)
38+
}
39+
defer func() { _ = db.Close(ctx) }()
40+
41+
sqldb0 := testingutil.MustOpenSQLDB(t, db.Path())
42+
defer testingutil.MustCloseSQLDB(t, sqldb0)
43+
44+
if _, err := sqldb0.Exec("CREATE TABLE t (x INTEGER)"); err != nil {
45+
t.Fatal(err)
46+
}
47+
if _, err := sqldb0.Exec("INSERT INTO t (x) VALUES (100)"); err != nil {
48+
t.Fatal(err)
49+
}
50+
time.Sleep(6 * db.MonitorInterval)
51+
52+
firstCreatedAt := fetchLTXCreatedAt(t, ctx, client)
53+
54+
time.Sleep(20 * time.Millisecond) // Ensure a different timestamp for the next file.
55+
if _, err := sqldb0.Exec("UPDATE t SET x = 200"); err != nil {
56+
t.Fatal(err)
57+
}
58+
time.Sleep(4 * db.MonitorInterval)
59+
60+
sqldb1, err := sql.Open("sqlite3", "file:/tmp/time-travel.db?vfs=litestream-time")
61+
if err != nil {
62+
t.Fatalf("failed to open database: %v", err)
63+
}
64+
defer sqldb1.Close()
65+
sqldb1.SetMaxOpenConns(1)
66+
time.Sleep(2 * vfs.PollInterval)
67+
68+
var value int
69+
if err := sqldb1.QueryRow("SELECT x FROM t").Scan(&value); err != nil {
70+
t.Fatalf("query latest value: %v", err)
71+
} else if got, want := value, 200; got != want {
72+
t.Fatalf("latest value: got %d, want %d", got, want)
73+
}
74+
75+
target := firstCreatedAt.Add(1 * time.Millisecond).UTC().Format(time.RFC3339Nano)
76+
if _, err := sqldb1.Exec(fmt.Sprintf("PRAGMA LITESTREAM_TIME = '%s'", target)); err != nil {
77+
t.Fatalf("set target time: %v", err)
78+
}
79+
80+
if err := sqldb1.QueryRow("SELECT x FROM t").Scan(&value); err != nil {
81+
t.Fatalf("query historical value: %v", err)
82+
} else if got, want := value, 100; got != want {
83+
t.Fatalf("historical value: got %d, want %d", got, want)
84+
}
85+
86+
var currentTime string
87+
if err := sqldb1.QueryRow("PRAGMA litestream_time").Scan(&currentTime); err != nil {
88+
t.Fatalf("current time: %v", err)
89+
} else if currentTime != target {
90+
t.Fatalf("current time mismatch: got %s, want %s", currentTime, target)
91+
}
92+
93+
if _, err := sqldb1.Exec("PRAGMA LITESTREAM_TIME = LATEST"); err != nil {
94+
t.Fatalf("reset time: %v", err)
95+
}
96+
97+
if err := sqldb1.QueryRow("SELECT x FROM t").Scan(&value); err != nil {
98+
t.Fatalf("query reset value: %v", err)
99+
} else if got, want := value, 200; got != want {
100+
t.Fatalf("reset value: got %d, want %d", got, want)
101+
}
102+
103+
if err := sqldb1.QueryRow("PRAGMA litestream_time").Scan(&currentTime); err != nil {
104+
t.Fatalf("current time after reset: %v", err)
105+
} else if currentTime != "latest" {
106+
t.Fatalf("current time after reset mismatch: got %s, want latest", currentTime)
107+
}
108+
}
109+
110+
func fetchLTXCreatedAt(tb testing.TB, ctx context.Context, client litestream.ReplicaClient) time.Time {
111+
tb.Helper()
112+
113+
itr, err := client.LTXFiles(ctx, 0, 0, true)
114+
if err != nil {
115+
tb.Fatalf("ltx files: %v", err)
116+
}
117+
defer itr.Close()
118+
119+
var ts time.Time
120+
for itr.Next() {
121+
ts = itr.Item().CreatedAt
122+
}
123+
if err := itr.Close(); err != nil {
124+
tb.Fatalf("close iterator: %v", err)
125+
}
126+
if ts.IsZero() {
127+
tb.Fatalf("no ltx files found")
128+
}
129+
return ts.UTC()
130+
}

go.mod

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@ require (
2626
github.com/nats-io/nats.go v1.44.0
2727
github.com/pkg/sftp v1.13.6
2828
github.com/prometheus/client_golang v1.17.0
29+
github.com/psanford/sqlite3vfs v0.0.0-20240315230605-24e1d98cf361 // direct
2930
github.com/studio-b12/gowebdav v0.11.0
3031
github.com/superfly/ltx v0.5.0
3132
golang.org/x/crypto v0.41.0
@@ -98,7 +99,6 @@ require (
9899
github.com/prometheus/client_model v0.5.0 // indirect
99100
github.com/prometheus/common v0.45.0 // indirect
100101
github.com/prometheus/procfs v0.12.0 // indirect
101-
github.com/psanford/sqlite3vfs v0.0.0-20240315230605-24e1d98cf361 // direct
102102
github.com/spf13/cast v1.7.1 // indirect
103103
github.com/yosida95/uritemplate/v3 v3.0.2 // indirect
104104
go.opencensus.io v0.24.0 // indirect
@@ -119,3 +119,6 @@ require (
119119
google.golang.org/grpc v1.60.1 // indirect
120120
google.golang.org/protobuf v1.31.0 // indirect
121121
)
122+
123+
// TODO: Remove this replace directive once https://github.com/psanford/sqlite3vfs/pull/15 is merged
124+
replace github.com/psanford/sqlite3vfs => github.com/corylanou/sqlite3vfs v0.0.0-20251124192245-ee0c382650c8

go.sum

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -83,6 +83,8 @@ github.com/client9/misspell v0.3.4/go.mod h1:qj6jICC3Q7zFZvVWo7KLAzC3yx5G7kyvSDk
8383
github.com/cncf/udpa/go v0.0.0-20191209042840-269d4d468f6f/go.mod h1:M8M6+tZqaGXZJjfX53e64911xZQV5JYwmTeXPW+k8Sc=
8484
github.com/cncf/xds/go v0.0.0-20230607035331-e9ce68804cb4 h1:/inchEIKaYC1Akx+H+gqO04wryn5h75LSazbRlnya1k=
8585
github.com/cncf/xds/go v0.0.0-20230607035331-e9ce68804cb4/go.mod h1:eXthEFrGJvWHgFFCl3hGmgk+/aYT6PnTQLykKQRLhEs=
86+
github.com/corylanou/sqlite3vfs v0.0.0-20251124192245-ee0c382650c8 h1:0JEMTJClO0eYJXrsA/YWQqTyXVuunIddI2XWm33eNPw=
87+
github.com/corylanou/sqlite3vfs v0.0.0-20251124192245-ee0c382650c8/go.mod h1:iW4cSew5PAb1sMZiTEkVJAIBNrepaB6jTYjeP47WtI0=
8688
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
8789
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
8890
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
@@ -231,8 +233,6 @@ github.com/prometheus/common v0.45.0 h1:2BGz0eBc2hdMDLnO/8n0jeB3oPrt2D08CekT0lne
231233
github.com/prometheus/common v0.45.0/go.mod h1:YJmSTw9BoKxJplESWWxlbyttQR4uaEcGyv9MZjVOJsY=
232234
github.com/prometheus/procfs v0.12.0 h1:jluTpSng7V9hY0O2R9DzzJHYb2xULk9VTR1V1R/k6Bo=
233235
github.com/prometheus/procfs v0.12.0/go.mod h1:pcuDEFsWDnvcgNzo4EEweacyhjeA9Zk3cnaOZAZEfOo=
234-
github.com/psanford/sqlite3vfs v0.0.0-20240315230605-24e1d98cf361 h1:vAKifIJuYY306ZJSrwDgKonWcJGELijdaenABqbV03E=
235-
github.com/psanford/sqlite3vfs v0.0.0-20240315230605-24e1d98cf361/go.mod h1:iW4cSew5PAb1sMZiTEkVJAIBNrepaB6jTYjeP47WtI0=
236236
github.com/remyoudompheng/bigfft v0.0.0-20230129092748-24d4a6f8daec h1:W09IVJc94icq4NjY3clb7Lk8O1qJ8BdBEF8z0ibU0rE=
237237
github.com/remyoudompheng/bigfft v0.0.0-20230129092748-24d4a6f8daec/go.mod h1:qqbHyh8v60DhA7CoWK5oRCqLrMHRGoxYCSS9EjAz6Eo=
238238
github.com/rogpeppe/go-internal v1.12.0 h1:exVL4IDcn6na9z1rAb56Vxr+CgyK3nn3O+epU5NdKM8=

src/litestream-vfs.c

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -2,19 +2,18 @@
22
#include "sqlite3.h"
33
#include "sqlite3ext.h"
44
#include <stdio.h>
5+
#include <stdlib.h>
56

67
/* sqlite3vfs already called SQLITE_EXTENSION_INIT1 */
78
extern const sqlite3_api_routines *sqlite3_api;
89

9-
extern void Sqlite3HTTPRegister();
10-
1110
// This routine is called when the extension is loaded.
1211
// Register the new VFS.
1312
int sqlite3_litestreamvfs_init(sqlite3 *db, char **pzErrMsg, const sqlite3_api_routines *pApi) {
1413
int rc = SQLITE_OK;
1514
SQLITE_EXTENSION_INIT2(pApi);
1615

17-
// call into Go
16+
// call into Go to register the VFS
1817
LitestreamVFSRegister();
1918

2019
if( rc==SQLITE_OK ) rc = SQLITE_OK_LOAD_PERMANENTLY;

src/sqlite3vfs.h

Lines changed: 53 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,53 @@
1+
#ifndef SQLITE3VFS_H
2+
#define SQLITE3VFS_H
3+
4+
#ifdef SQLITE3VFS_LOADABLE_EXT
5+
#include "sqlite3ext.h"
6+
#else
7+
#include "sqlite3-binding.h"
8+
#endif
9+
10+
typedef struct s3vfsFile {
11+
sqlite3_file base; /* IO methods */
12+
sqlite3_uint64 id; /* Go object id */
13+
} s3vfsFile;
14+
15+
int s3vfsNew(char* name, int maxPathName);
16+
17+
int s3vfsClose(sqlite3_file*);
18+
int s3vfsRead(sqlite3_file*, void*, int iAmt, sqlite3_int64 iOfst);
19+
int s3vfsWrite(sqlite3_file*,const void*,int iAmt, sqlite3_int64 iOfst);
20+
int s3vfsTruncate(sqlite3_file*, sqlite3_int64 size);
21+
int s3vfsSync(sqlite3_file*, int flags);
22+
int s3vfsFileSize(sqlite3_file*, sqlite3_int64 *pSize);
23+
int s3vfsLock(sqlite3_file*, int);
24+
int s3vfsUnlock(sqlite3_file*, int);
25+
int s3vfsCheckReservedLock(sqlite3_file*, int *pResOut);
26+
int s3vfsFileControl(sqlite3_file*, int op, void *pArg);
27+
int s3vfsSectorSize(sqlite3_file*);
28+
int s3vfsDeviceCharacteristics(sqlite3_file*);
29+
int s3vfsShmMap(sqlite3_file*, int iPg, int pgsz, int, void volatile**);
30+
int s3vfsShmLock(sqlite3_file*, int offset, int n, int flags);
31+
void s3vfsShmBarrier(sqlite3_file*);
32+
int s3vfsShmUnmap(sqlite3_file*, int deleteFlag);
33+
int s3vfsFetch(sqlite3_file*, sqlite3_int64 iOfst, int iAmt, void **pp);
34+
int s3vfsUnfetch(sqlite3_file*, sqlite3_int64 iOfst, void *p);
35+
36+
37+
int s3vfsOpen(sqlite3_vfs*, const char *, sqlite3_file*, int , int *);
38+
int s3vfsDelete(sqlite3_vfs*, const char *, int);
39+
int s3vfsAccess(sqlite3_vfs*, const char *, int, int *);
40+
int s3vfsFullPathname(sqlite3_vfs*, const char *zName, int, char *zOut);
41+
void *s3vfsDlOpen(sqlite3_vfs*, const char *zFilename);
42+
void s3vfsDlError(sqlite3_vfs*, int nByte, char *zErrMsg);
43+
void (*s3vfsDlSym(sqlite3_vfs *pVfs, void *p, const char*zSym))(void);
44+
void s3vfsDlClose(sqlite3_vfs*, void*);
45+
int s3vfsRandomness(sqlite3_vfs*, int nByte, char *zOut);
46+
int s3vfsSleep(sqlite3_vfs*, int microseconds);
47+
int s3vfsCurrentTime(sqlite3_vfs*, double*);
48+
int s3vfsGetLastError(sqlite3_vfs*, int, char *);
49+
int s3vfsCurrentTimeInt64(sqlite3_vfs*, sqlite3_int64*);
50+
51+
const extern sqlite3_io_methods s3vfs_io_methods;
52+
53+
#endif /* SQLITE3_VFS */

0 commit comments

Comments
 (0)