diff --git a/cmd/litestream-vfs/main.go b/cmd/litestream-vfs/main.go index 3664b42a..246f3fc5 100644 --- a/cmd/litestream-vfs/main.go +++ b/cmd/litestream-vfs/main.go @@ -5,6 +5,11 @@ package main // import C is necessary export to the c-archive .a file +/* +#include +typedef int64_t sqlite3_int64; +typedef uint64_t sqlite3_uint64; +*/ import "C" import ( @@ -14,6 +19,7 @@ import ( "os" "strconv" "strings" + "unsafe" "github.com/psanford/sqlite3vfs" @@ -65,3 +71,72 @@ func LitestreamVFSRegister() { log.Fatalf("failed to register litestream vfs: %s", err) } } + +//export GoLitestreamRegisterConnection +func GoLitestreamRegisterConnection(dbPtr unsafe.Pointer, fileID C.sqlite3_uint64) *C.char { + if err := litestream.RegisterVFSConnection(uintptr(dbPtr), uint64(fileID)); err != nil { + return C.CString(err.Error()) + } + return nil +} + +//export GoLitestreamUnregisterConnection +func GoLitestreamUnregisterConnection(dbPtr unsafe.Pointer) *C.char { + litestream.UnregisterVFSConnection(uintptr(dbPtr)) + return nil +} + +//export GoLitestreamSetTime +func GoLitestreamSetTime(dbPtr unsafe.Pointer, timestamp *C.char) *C.char { + if timestamp == nil { + return C.CString("timestamp required") + } + if err := litestream.SetVFSConnectionTime(uintptr(dbPtr), C.GoString(timestamp)); err != nil { + return C.CString(err.Error()) + } + return nil +} + +//export GoLitestreamResetTime +func GoLitestreamResetTime(dbPtr unsafe.Pointer) *C.char { + if err := litestream.ResetVFSConnectionTime(uintptr(dbPtr)); err != nil { + return C.CString(err.Error()) + } + return nil +} + +//export GoLitestreamTime +func GoLitestreamTime(dbPtr unsafe.Pointer, out **C.char) *C.char { + value, err := litestream.GetVFSConnectionTime(uintptr(dbPtr)) + if err != nil { + return C.CString(err.Error()) + } + if out != nil { + *out = C.CString(value) + } + return nil +} + +//export GoLitestreamTxid +func GoLitestreamTxid(dbPtr unsafe.Pointer, out **C.char) *C.char { + value, err := litestream.GetVFSConnectionTXID(uintptr(dbPtr)) + if err != nil { + return C.CString(err.Error()) + } + if out != nil { + *out = C.CString(value) + } + return nil +} + +//export GoLitestreamLag +func GoLitestreamLag(dbPtr unsafe.Pointer, out *C.sqlite3_int64) *C.char { + value, err := litestream.GetVFSConnectionLag(uintptr(dbPtr)) + if err != nil { + return C.CString(err.Error()) + } + if out != nil { + *out = C.sqlite3_int64(value) + } + return nil +} diff --git a/cmd/litestream-vfs/time_travel_test.go b/cmd/litestream-vfs/time_travel_test.go index 778fe7ff..dfc303ff 100644 --- a/cmd/litestream-vfs/time_travel_test.go +++ b/cmd/litestream-vfs/time_travel_test.go @@ -102,8 +102,188 @@ func TestVFS_TimeTravelFunctions(t *testing.T) { if err := sqldb1.QueryRow("PRAGMA litestream_time").Scan(¤tTime); err != nil { t.Fatalf("current time after reset: %v", err) - } else if currentTime != "latest" { - t.Fatalf("current time after reset mismatch: got %s, want latest", currentTime) + } + // After reset, should return actual LTX timestamp (not "latest" anymore per #853) + if _, err := time.Parse(time.RFC3339Nano, currentTime); err != nil { + t.Fatalf("current time after reset should be valid RFC3339Nano timestamp, got %s: %v", currentTime, err) + } +} + +func TestVFS_PragmaLitestreamTxid(t *testing.T) { + ctx := context.Background() + client := file.NewReplicaClient(t.TempDir()) + vfs := newVFS(t, client) + vfs.PollInterval = 50 * time.Millisecond + if err := sqlite3vfs.RegisterVFS("litestream-txid", vfs); err != nil { + t.Fatalf("failed to register litestream vfs: %v", err) + } + + db := testingutil.NewDB(t, filepath.Join(t.TempDir(), "db")) + db.MonitorInterval = 50 * time.Millisecond + db.Replica = litestream.NewReplica(db) + db.Replica.Client = client + db.Replica.SyncInterval = 50 * time.Millisecond + if err := db.Open(); err != nil { + t.Fatal(err) + } + defer func() { _ = db.Close(ctx) }() + + sqldb0 := testingutil.MustOpenSQLDB(t, db.Path()) + defer testingutil.MustCloseSQLDB(t, sqldb0) + + if _, err := sqldb0.Exec("CREATE TABLE t (x INTEGER)"); err != nil { + t.Fatal(err) + } + if _, err := sqldb0.Exec("INSERT INTO t (x) VALUES (100)"); err != nil { + t.Fatal(err) + } + time.Sleep(6 * db.MonitorInterval) + + sqldb1, err := sql.Open("sqlite3", "file:/tmp/txid-test.db?vfs=litestream-txid") + if err != nil { + t.Fatalf("failed to open database: %v", err) + } + defer sqldb1.Close() + sqldb1.SetMaxOpenConns(1) + time.Sleep(2 * vfs.PollInterval) + + var txid int64 + if err := sqldb1.QueryRow("PRAGMA litestream_txid").Scan(&txid); err != nil { + t.Fatalf("query txid: %v", err) + } + if txid <= 0 { + t.Fatalf("expected positive TXID, got %d", txid) + } + + // Test that setting litestream_txid fails (read-only) + if _, err := sqldb1.Exec("PRAGMA litestream_txid = 123"); err == nil { + t.Fatal("expected error setting litestream_txid (read-only)") + } +} + +func TestVFS_PragmaLitestreamLag(t *testing.T) { + ctx := context.Background() + client := file.NewReplicaClient(t.TempDir()) + vfs := newVFS(t, client) + vfs.PollInterval = 50 * time.Millisecond + if err := sqlite3vfs.RegisterVFS("litestream-lag", vfs); err != nil { + t.Fatalf("failed to register litestream vfs: %v", err) + } + + db := testingutil.NewDB(t, filepath.Join(t.TempDir(), "db")) + db.MonitorInterval = 50 * time.Millisecond + db.Replica = litestream.NewReplica(db) + db.Replica.Client = client + db.Replica.SyncInterval = 50 * time.Millisecond + if err := db.Open(); err != nil { + t.Fatal(err) + } + defer func() { _ = db.Close(ctx) }() + + sqldb0 := testingutil.MustOpenSQLDB(t, db.Path()) + defer testingutil.MustCloseSQLDB(t, sqldb0) + + if _, err := sqldb0.Exec("CREATE TABLE t (x INTEGER)"); err != nil { + t.Fatal(err) + } + if _, err := sqldb0.Exec("INSERT INTO t (x) VALUES (100)"); err != nil { + t.Fatal(err) + } + time.Sleep(6 * db.MonitorInterval) + + sqldb1, err := sql.Open("sqlite3", "file:/tmp/lag-test.db?vfs=litestream-lag") + if err != nil { + t.Fatalf("failed to open database: %v", err) + } + defer sqldb1.Close() + sqldb1.SetMaxOpenConns(1) + + // Wait for at least one successful poll (poll runs on ticker, not immediately) + time.Sleep(5 * vfs.PollInterval) + + var lag int64 + if err := sqldb1.QueryRow("PRAGMA litestream_lag").Scan(&lag); err != nil { + t.Fatalf("query lag: %v", err) + } + // Lag should be -1 (never polled) or a small non-negative value after polling + // -1 is valid if no poll has succeeded yet + if lag < -1 || lag > 5 { + t.Fatalf("expected lag between -1 and 5 seconds, got %d", lag) + } + + // Test that setting litestream_lag fails (read-only) + if _, err := sqldb1.Exec("PRAGMA litestream_lag = 123"); err == nil { + t.Fatal("expected error setting litestream_lag (read-only)") + } +} + +func TestVFS_PragmaRelativeTime(t *testing.T) { + ctx := context.Background() + client := file.NewReplicaClient(t.TempDir()) + vfs := newVFS(t, client) + vfs.PollInterval = 50 * time.Millisecond + if err := sqlite3vfs.RegisterVFS("litestream-relative", vfs); err != nil { + t.Fatalf("failed to register litestream vfs: %v", err) + } + + db := testingutil.NewDB(t, filepath.Join(t.TempDir(), "db")) + db.MonitorInterval = 50 * time.Millisecond + db.Replica = litestream.NewReplica(db) + db.Replica.Client = client + db.Replica.SyncInterval = 50 * time.Millisecond + if err := db.Open(); err != nil { + t.Fatal(err) + } + defer func() { _ = db.Close(ctx) }() + + sqldb0 := testingutil.MustOpenSQLDB(t, db.Path()) + defer testingutil.MustCloseSQLDB(t, sqldb0) + + if _, err := sqldb0.Exec("CREATE TABLE t (x INTEGER)"); err != nil { + t.Fatal(err) + } + if _, err := sqldb0.Exec("INSERT INTO t (x) VALUES (100)"); err != nil { + t.Fatal(err) + } + time.Sleep(6 * db.MonitorInterval) + + sqldb1, err := sql.Open("sqlite3", "file:/tmp/relative-test.db?vfs=litestream-relative") + if err != nil { + t.Fatalf("failed to open database: %v", err) + } + defer sqldb1.Close() + sqldb1.SetMaxOpenConns(1) + time.Sleep(2 * vfs.PollInterval) + + // Test that relative time parsing works (even if no data exists at that time) + // The parse should succeed, but may return "no backup files available" if too far in past + now := time.Now() + _, err = sqldb1.Exec("PRAGMA litestream_time = '1 second ago'") + // This might fail if no LTX files exist at that time, which is expected + // The important thing is that the parsing worked (not a "parse timestamp" error) + if err != nil && err.Error() != "no backup files available" { + // Check if it's a parse error vs a "no files" error + if err.Error() == "invalid timestamp (expected RFC3339 or relative time like '5 minutes ago'): 1 second ago" { + t.Fatalf("relative time parsing failed: %v", err) + } + } + + // Reset to latest + if _, err := sqldb1.Exec("PRAGMA litestream_time = LATEST"); err != nil { + t.Fatalf("reset to latest: %v", err) + } + + // Verify the current time is recent (within last minute) + var currentTime string + if err := sqldb1.QueryRow("PRAGMA litestream_time").Scan(¤tTime); err != nil { + t.Fatalf("query current time: %v", err) + } + ts, err := time.Parse(time.RFC3339Nano, currentTime) + if err != nil { + t.Fatalf("parse current time: %v", err) + } + if now.Sub(ts) > time.Minute { + t.Fatalf("current time too old: %v (now: %v)", ts, now) } } diff --git a/go.mod b/go.mod index a24cb8ea..db7a0388 100644 --- a/go.mod +++ b/go.mod @@ -86,9 +86,14 @@ require ( github.com/google/uuid v1.6.0 // indirect github.com/googleapis/enterprise-certificate-proxy v0.3.2 // indirect github.com/googleapis/gax-go/v2 v2.12.0 // indirect + github.com/hablullah/go-hijri v1.0.2 // indirect + github.com/hablullah/go-juliandays v1.0.0 // indirect + github.com/jalaali/go-jalaali v0.0.0-20210801064154-80525e88d958 // indirect github.com/klauspost/compress v1.18.0 // indirect github.com/kr/fs v0.1.0 // indirect github.com/kylelemons/godebug v1.1.0 // indirect + github.com/magefile/mage v1.14.0 // indirect + github.com/markusmobius/go-dateparser v1.2.4 // indirect github.com/mattn/go-colorable v0.1.13 // indirect github.com/mattn/go-isatty v0.0.20 // indirect github.com/matttproud/golang_protobuf_extensions/v2 v2.0.0 // indirect @@ -100,6 +105,8 @@ require ( github.com/prometheus/common v0.45.0 // indirect github.com/prometheus/procfs v0.12.0 // indirect github.com/spf13/cast v1.7.1 // indirect + github.com/tetratelabs/wazero v1.2.1 // indirect + github.com/wasilibs/go-re2 v1.3.0 // indirect github.com/yosida95/uritemplate/v3 v3.0.2 // indirect go.opencensus.io v0.24.0 // indirect go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.46.1 // indirect diff --git a/go.sum b/go.sum index 5ca79b76..2af048a3 100644 --- a/go.sum +++ b/go.sum @@ -159,10 +159,16 @@ github.com/gorilla/handlers v1.5.1 h1:9lRY6j8DEeeBT10CvO9hGW0gmky0BprnvDI5vfhUHH github.com/gorilla/handlers v1.5.1/go.mod h1:t8XrUpc4KVXb7HGyJ4/cEnwQiaxrX/hz1Zv/4g96P1Q= github.com/gorilla/mux v1.8.0 h1:i40aqfkR1h2SlN9hojwV5ZA91wcXFOvkdNIeFDP5koI= github.com/gorilla/mux v1.8.0/go.mod h1:DVbg23sWSpFRCP0SfiEN6jmj59UnW/n46BH5rLB71So= +github.com/hablullah/go-hijri v1.0.2 h1:drT/MZpSZJQXo7jftf5fthArShcaMtsal0Zf/dnmp6k= +github.com/hablullah/go-hijri v1.0.2/go.mod h1:OS5qyYLDjORXzK4O1adFw9Q5WfhOcMdAKglDkcTxgWQ= +github.com/hablullah/go-juliandays v1.0.0 h1:A8YM7wIj16SzlKT0SRJc9CD29iiaUzpBLzh5hr0/5p0= +github.com/hablullah/go-juliandays v1.0.0/go.mod h1:0JOYq4oFOuDja+oospuc61YoX+uNEn7Z6uHYTbBzdGc= github.com/hashicorp/golang-lru/v2 v2.0.7 h1:a+bsQ5rvGLjzHuww6tVxozPZFVghXaHOwFs4luLUK2k= github.com/hashicorp/golang-lru/v2 v2.0.7/go.mod h1:QeFd9opnmA6QUJc5vARoKUSoFhyfM2/ZepoAG6RGpeM= github.com/hokaccha/go-prettyjson v0.0.0-20211117102719-0474bc63780f h1:7LYC+Yfkj3CTRcShK0KOL/w6iTiKyqqBA9a41Wnggw8= github.com/hokaccha/go-prettyjson v0.0.0-20211117102719-0474bc63780f/go.mod h1:pFlLw2CfqZiIBOx6BuCeRLCrfxBJipTY0nIOF/VbGcI= +github.com/jalaali/go-jalaali v0.0.0-20210801064154-80525e88d958 h1:qxLoi6CAcXVzjfvu+KXIXJOAsQB62LXjsfbOaErsVzE= +github.com/jalaali/go-jalaali v0.0.0-20210801064154-80525e88d958/go.mod h1:Wqfu7mjUHj9WDzSSPI5KfBclTTEnLveRUFr/ujWnTgE= github.com/json-iterator/go v1.1.12 h1:PV8peI4a0ysnczrg+LtxykD8LfKY9ML6u2jnxaEnrnM= github.com/json-iterator/go v1.1.12/go.mod h1:e30LSqwooZae/UwlEbR2852Gd8hjQvJoHmT4TnhNGBo= github.com/keybase/go-keychain v0.0.1 h1:way+bWYa6lDppZoZcgMbYsvC7GxljxrskdNInRtuthU= @@ -179,8 +185,12 @@ github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY= github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE= github.com/kylelemons/godebug v1.1.0 h1:RPNrshWIDI6G2gRW9EHilWtl7Z6Sb1BR0xunSBf0SNc= github.com/kylelemons/godebug v1.1.0/go.mod h1:9/0rRGxNHcop5bhtWyNeEfOS8JIWk580+fNqagV/RAw= +github.com/magefile/mage v1.14.0 h1:6QDX3g6z1YvJ4olPhT1wksUcSa/V0a1B+pJb73fBjyo= +github.com/magefile/mage v1.14.0/go.mod h1:z5UZb/iS3GoOSn0JgWuiw7dxlurVYTu+/jHXqQg881A= github.com/mark3labs/mcp-go v0.32.0 h1:fgwmbfL2gbd67obg57OfV2Dnrhs1HtSdlY/i5fn7MU8= github.com/mark3labs/mcp-go v0.32.0/go.mod h1:rXqOudj/djTORU/ThxYx8fqEVj/5pvTuuebQ2RC7uk4= +github.com/markusmobius/go-dateparser v1.2.4 h1:2e8XJozaERVxGwsRg72coi51L2aiYqE2gukkdLc85ck= +github.com/markusmobius/go-dateparser v1.2.4/go.mod h1:CBAUADJuMNhJpyM6IYaWAoFhtKaqnUcznY2cL7gNugY= github.com/mattn/go-colorable v0.1.9/go.mod h1:u6P/XSegPjTcexA+o6vUJrdnUu04hMope9wVRipJSqc= github.com/mattn/go-colorable v0.1.13 h1:fFA4WZxdEF4tXPZVKMLwD8oUnCTTo08duU7wxecdEvA= github.com/mattn/go-colorable v0.1.13/go.mod h1:7S9/ev0klgBDR4GtXTXX8a3vIGJpMovkB8vQcUbaXHg= @@ -256,6 +266,10 @@ github.com/studio-b12/gowebdav v0.11.0 h1:qbQzq4USxY28ZYsGJUfO5jR+xkFtcnwWgitp4Z github.com/studio-b12/gowebdav v0.11.0/go.mod h1:bHA7t77X/QFExdeAnDzK6vKM34kEZAcE1OX4MfiwjkE= github.com/superfly/ltx v0.5.0 h1:dXNrcT3ZtMb6iKZopIV7z5UBscnapg0b0F02loQsk5o= github.com/superfly/ltx v0.5.0/go.mod h1:Nf50QAIXU/ET4ua3AuQ2fh31MbgNQZA7r/DYx6Os77s= +github.com/tetratelabs/wazero v1.2.1 h1:J4X2hrGzJvt+wqltuvcSjHQ7ujQxA9gb6PeMs4qlUWs= +github.com/tetratelabs/wazero v1.2.1/go.mod h1:wYx2gNRg8/WihJfSDxA1TIL8H+GkfLYm+bIfbblu9VQ= +github.com/wasilibs/go-re2 v1.3.0 h1:LFhBNzoStM3wMie6rN2slD1cuYH2CGiHpvNL3UtcsMw= +github.com/wasilibs/go-re2 v1.3.0/go.mod h1:AafrCXVvGRJJOImMajgJ2M7rVmWyisVK7sFshbxnVrg= github.com/yosida95/uritemplate/v3 v3.0.2 h1:Ed3Oyj9yrmi9087+NczuL5BwkIc4wvTb5zIM+UJPGz4= github.com/yosida95/uritemplate/v3 v3.0.2/go.mod h1:ILOh0sOhIJR3+L/8afwt/kE++YT040gmv5BQTMR2HP4= github.com/yuin/goldmark v1.4.13/go.mod h1:6yULJ656Px+3vBD8DxQVa3kxgyrAnzto9xy5taEt/CY= diff --git a/src/litestream-vfs.c b/src/litestream-vfs.c index f0204900..582a651d 100644 --- a/src/litestream-vfs.c +++ b/src/litestream-vfs.c @@ -1,21 +1,177 @@ #include "litestream-vfs.h" #include "sqlite3.h" #include "sqlite3ext.h" +#include "sqlite3vfs.h" #include #include /* sqlite3vfs already called SQLITE_EXTENSION_INIT1 */ extern const sqlite3_api_routines *sqlite3_api; -// This routine is called when the extension is loaded. -// Register the new VFS. +/* Go function declarations */ +extern char* GoLitestreamRegisterConnection(void* db, sqlite3_uint64 file_id); +extern char* GoLitestreamUnregisterConnection(void* db); +extern char* GoLitestreamSetTime(void* db, const char* timestamp); +extern char* GoLitestreamResetTime(void* db); +extern char* GoLitestreamTime(void* db, char** out); +extern char* GoLitestreamTxid(void* db, char** out); +extern char* GoLitestreamLag(void* db, sqlite3_int64* out); + +/* Internal function declarations */ +static int litestream_register_connection(sqlite3* db); +static void litestream_set_time_impl(sqlite3_context* ctx, int argc, sqlite3_value** argv); +static void litestream_time_impl(sqlite3_context* ctx, int argc, sqlite3_value** argv); +static void litestream_txid_impl(sqlite3_context* ctx, int argc, sqlite3_value** argv); +static void litestream_lag_impl(sqlite3_context* ctx, int argc, sqlite3_value** argv); +static void litestream_function_destroy(void* db); +static void litestream_auto_extension(sqlite3* db, const char** pzErrMsg, const struct sqlite3_api_routines* pApi); + +/* This routine is called when the extension is loaded. */ int sqlite3_litestreamvfs_init(sqlite3 *db, char **pzErrMsg, const sqlite3_api_routines *pApi) { int rc = SQLITE_OK; SQLITE_EXTENSION_INIT2(pApi); - // call into Go to register the VFS + /* call into Go to register the VFS */ LitestreamVFSRegister(); + /* Register SQL functions for new connections. */ + rc = sqlite3_auto_extension((void (*)(void))litestream_auto_extension); + if( rc==SQLITE_OK ) rc = SQLITE_OK_LOAD_PERMANENTLY; return rc; } + +static void litestream_auto_extension(sqlite3* db, const char** pzErrMsg, const struct sqlite3_api_routines* pApi) { + (void)pzErrMsg; + (void)pApi; + + if (litestream_register_connection(db) != SQLITE_OK) { + return; + } + + /* litestream_set_time(timestamp) - for time travel */ + sqlite3_create_function_v2(db, "litestream_set_time", 1, SQLITE_UTF8 | SQLITE_DIRECTONLY, db, litestream_set_time_impl, 0, 0, litestream_function_destroy); + + /* Read-only functions: litestream_time(), litestream_txid(), litestream_lag() */ + sqlite3_create_function_v2(db, "litestream_time", 0, SQLITE_UTF8, db, litestream_time_impl, 0, 0, 0); + sqlite3_create_function_v2(db, "litestream_txid", 0, SQLITE_UTF8, db, litestream_txid_impl, 0, 0, 0); + sqlite3_create_function_v2(db, "litestream_lag", 0, SQLITE_UTF8, db, litestream_lag_impl, 0, 0, 0); +} + +static int litestream_register_connection(sqlite3* db) { + sqlite3_file* file = 0; + int rc = sqlite3_file_control(db, "main", SQLITE_FCNTL_FILE_POINTER, &file); + if (rc != SQLITE_OK || file == 0) { + return rc; + } + + if (file->pMethods != &s3vfs_io_methods) { + /* Not using the litestream VFS. */ + return SQLITE_DONE; + } + + sqlite3_uint64 file_id = ((s3vfsFile*)file)->id; + char* err = GoLitestreamRegisterConnection(db, file_id); + if (err != 0) { + free(err); + return SQLITE_ERROR; + } + + return SQLITE_OK; +} + +static void litestream_set_time_impl(sqlite3_context* ctx, int argc, sqlite3_value** argv) { + if (argc != 1) { + sqlite3_result_error(ctx, "expected timestamp argument", -1); + return; + } + + const unsigned char* ts = sqlite3_value_text(argv[0]); + if (!ts) { + sqlite3_result_error(ctx, "timestamp required", -1); + return; + } + + /* Handle special 'LATEST' value */ + if (sqlite3_stricmp((const char*)ts, "LATEST") == 0) { + sqlite3* db = sqlite3_context_db_handle(ctx); + char* err = GoLitestreamResetTime(db); + if (err != 0) { + sqlite3_result_error(ctx, err, -1); + free(err); + return; + } + sqlite3_result_null(ctx); + return; + } + + sqlite3* db = sqlite3_context_db_handle(ctx); + char* err = GoLitestreamSetTime(db, (const char*)ts); + if (err != 0) { + sqlite3_result_error(ctx, err, -1); + free(err); + return; + } + + sqlite3_result_null(ctx); +} + +static void litestream_time_impl(sqlite3_context* ctx, int argc, sqlite3_value** argv) { + (void)argc; + (void)argv; + + sqlite3* db = sqlite3_context_db_handle(ctx); + char* out = 0; + char* err = GoLitestreamTime(db, &out); + if (err != 0) { + sqlite3_result_error(ctx, err, -1); + free(err); + return; + } + + sqlite3_result_text(ctx, out, -1, SQLITE_TRANSIENT); + free(out); +} + +static void litestream_txid_impl(sqlite3_context* ctx, int argc, sqlite3_value** argv) { + (void)argc; + (void)argv; + + sqlite3* db = sqlite3_context_db_handle(ctx); + char* out = 0; + char* err = GoLitestreamTxid(db, &out); + if (err != 0) { + sqlite3_result_error(ctx, err, -1); + free(err); + return; + } + + sqlite3_result_text(ctx, out, -1, SQLITE_TRANSIENT); + free(out); +} + +static void litestream_lag_impl(sqlite3_context* ctx, int argc, sqlite3_value** argv) { + (void)argc; + (void)argv; + + sqlite3* db = sqlite3_context_db_handle(ctx); + sqlite3_int64 out = 0; + char* err = GoLitestreamLag(db, &out); + if (err != 0) { + sqlite3_result_error(ctx, err, -1); + free(err); + return; + } + + sqlite3_result_int64(ctx, out); +} + +static void litestream_function_destroy(void* db) { + if (db == 0) { + return; + } + char* err = GoLitestreamUnregisterConnection(db); + if (err != 0) { + free(err); + } +} diff --git a/vfs.go b/vfs.go index 2984d70e..65c78faa 100644 --- a/vfs.go +++ b/vfs.go @@ -9,11 +9,14 @@ import ( "errors" "fmt" "log/slog" + "strconv" "strings" "sync" "time" + _ "unsafe" lru "github.com/hashicorp/golang-lru/v2" + "github.com/markusmobius/go-dateparser" "github.com/psanford/sqlite3vfs" "github.com/superfly/ltx" ) @@ -23,6 +26,16 @@ const ( DefaultCacheSize = 10 * 1024 * 1024 // 10MB ) +var ( + //go:linkname sqlite3vfsFileMap github.com/psanford/sqlite3vfs.fileMap + sqlite3vfsFileMap map[uint64]sqlite3vfs.File + + //go:linkname sqlite3vfsFileMux github.com/psanford/sqlite3vfs.fileMux + sqlite3vfsFileMux sync.Mutex + + vfsConnectionMap sync.Map // map[uintptr]uint64 +) + // VFS implements the SQLite VFS interface for Litestream. // It is intended to be used for read replicas that read directly from S3. type VFS struct { @@ -98,13 +111,15 @@ type VFSFile struct { client ReplicaClient name string - pos ltx.Pos // Last TXID read from level 0 or 1 - maxTXID1 ltx.TXID // Last TXID read from level 1 - index map[uint32]ltx.PageIndexElem - pending map[uint32]ltx.PageIndexElem - cache *lru.Cache[uint32, []byte] // LRU cache for page data - targetTime *time.Time // Target view time; nil means latest - lockType sqlite3vfs.LockType // Current lock state + pos ltx.Pos // Last TXID read from level 0 or 1 + maxTXID1 ltx.TXID // Last TXID read from level 1 + index map[uint32]ltx.PageIndexElem + pending map[uint32]ltx.PageIndexElem + cache *lru.Cache[uint32, []byte] // LRU cache for page data + targetTime *time.Time // Target view time; nil means latest + latestLTXTime time.Time // Timestamp of most recent LTX file + lastPollSuccess time.Time // Time of last successful poll + lockType sqlite3vfs.LockType // Current lock state wg sync.WaitGroup ctx context.Context @@ -162,6 +177,20 @@ func (f *VFSFile) LockType() sqlite3vfs.LockType { return f.lockType } +// LatestLTXTime returns the timestamp of the most recent LTX file. +func (f *VFSFile) LatestLTXTime() time.Time { + f.mu.Lock() + defer f.mu.Unlock() + return f.latestLTXTime +} + +// LastPollSuccess returns the time of the last successful poll. +func (f *VFSFile) LastPollSuccess() time.Time { + f.mu.Lock() + defer f.mu.Unlock() + return f.lastPollSuccess +} + func (f *VFSFile) Open() error { f.logger.Info("opening file") @@ -247,6 +276,9 @@ func (f *VFSFile) rebuildIndex(ctx context.Context, infos []*ltx.FileInfo, targe f.pending = make(map[uint32]ltx.PageIndexElem) f.pos = pos f.maxTXID1 = maxTXID1 + if len(infos) > 0 { + f.latestLTXTime = infos[len(infos)-1].CreatedAt + } if f.cache != nil { f.cache.Purge() } @@ -420,6 +452,32 @@ func (f *VFSFile) DeviceCharacteristics() sqlite3vfs.DeviceCharacteristic { return 0 } +// parseTimeValue parses a timestamp string, trying RFC3339 first, then relative expressions. +func parseTimeValue(value string) (time.Time, error) { + // Try RFC3339Nano first (existing behavior) + if t, err := time.Parse(time.RFC3339Nano, value); err == nil { + return t, nil + } + + // Try RFC3339 (without nanoseconds) + if t, err := time.Parse(time.RFC3339, value); err == nil { + return t, nil + } + + // Fall back to dateparser for relative expressions + cfg := &dateparser.Configuration{ + CurrentTime: time.Now().UTC(), + } + result, err := dateparser.Parse(cfg, value) + if err != nil { + return time.Time{}, fmt.Errorf("invalid timestamp (expected RFC3339 or relative time like '5 minutes ago'): %s", value) + } + if result.Time.IsZero() { + return time.Time{}, fmt.Errorf("could not parse time: %s", value) + } + return result.Time.UTC(), nil +} + // FileControl handles file control operations, specifically PRAGMA commands for time travel. func (f *VFSFile) FileControl(op int, pragmaName string, pragmaValue *string) (*string, error) { const SQLITE_FCNTL_PRAGMA = 14 @@ -433,6 +491,27 @@ func (f *VFSFile) FileControl(op int, pragmaName string, pragmaValue *string) (* f.logger.Info("file control", "pragma", name, "value", pragmaValue) switch name { + case "litestream_txid": + if pragmaValue != nil { + return nil, fmt.Errorf("litestream_txid is read-only") + } + txid := f.Pos().TXID + result := txid.String() + return &result, nil + + case "litestream_lag": + if pragmaValue != nil { + return nil, fmt.Errorf("litestream_lag is read-only") + } + lastPoll := f.LastPollSuccess() + if lastPoll.IsZero() { + result := "-1" // Never polled successfully + return &result, nil + } + lag := int64(time.Since(lastPoll).Seconds()) + result := strconv.FormatInt(lag, 10) + return &result, nil + case "litestream_time": if pragmaValue == nil { result := f.currentTimeString() @@ -446,9 +525,9 @@ func (f *VFSFile) FileControl(op int, pragmaName string, pragmaValue *string) (* return nil, nil } - t, err := time.Parse(time.RFC3339Nano, *pragmaValue) + t, err := parseTimeValue(*pragmaValue) if err != nil { - return nil, fmt.Errorf("parse timestamp: %w", err) + return nil, err } if err := f.SetTargetTime(context.Background(), t); err != nil { return nil, err @@ -465,7 +544,10 @@ func (f *VFSFile) currentTimeString() string { if t := f.TargetTime(); t != nil { return t.Format(time.RFC3339Nano) } - return "latest" + if t := f.LatestLTXTime(); !t.IsZero() { + return t.Format(time.RFC3339Nano) + } + return "latest" // Fallback if no LTX files loaded } func (f *VFSFile) monitorReplicaClient(ctx context.Context) { @@ -485,6 +567,11 @@ func (f *VFSFile) monitorReplicaClient(ctx context.Context) { if !errors.Is(err, context.Canceled) && !errors.Is(err, context.DeadlineExceeded) { f.logger.Error("cannot fetch new ltx files", "error", err) } + } else { + // Track successful poll time + f.mu.Lock() + f.lastPollSuccess = time.Now() + f.mu.Unlock() } } } @@ -497,14 +584,14 @@ func (f *VFSFile) pollReplicaClient(ctx context.Context) error { index := make(map[uint32]ltx.PageIndexElem) f.logger.Debug("polling replica client", "txid", pos.TXID.String()) - maxTXID0, err := f.pollLevel(ctx, 0, pos.TXID, index) + result0, err := f.pollLevel(ctx, 0, pos.TXID, index) if err != nil { return fmt.Errorf("poll L0: %w", err) } - maxTXID1, err := f.pollLevel(ctx, 1, f.maxTXID1, index) + result1, err := f.pollLevel(ctx, 1, f.maxTXID1, index) if err != nil { - return fmt.Errorf("poll L0: %w", err) + return fmt.Errorf("poll L1: %w", err) } // Send updates to a pending list if there are active readers. @@ -538,10 +625,18 @@ func (f *VFSFile) pollReplicaClient(ctx context.Context) error { } // Update to max TXID - f.pos.TXID = max(maxTXID0, maxTXID1) - f.maxTXID1 = maxTXID1 + f.pos.TXID = max(result0.maxTXID, result1.maxTXID) + f.maxTXID1 = result1.maxTXID f.logger.Debug("txid updated", "txid", f.pos.TXID.String(), "maxTXID1", f.maxTXID1.String()) + // Update latestLTXTime to the most recent timestamp from either level + if !result0.createdAt.IsZero() && result0.createdAt.After(f.latestLTXTime) { + f.latestLTXTime = result0.createdAt + } + if !result1.createdAt.IsZero() && result1.createdAt.After(f.latestLTXTime) { + f.latestLTXTime = result1.createdAt + } + return nil } @@ -561,24 +656,30 @@ func maxLevelTXID(infos []*ltx.FileInfo, level int) ltx.TXID { return maxTXID } -func (f *VFSFile) pollLevel(ctx context.Context, level int, prevMaxTXID ltx.TXID, index map[uint32]ltx.PageIndexElem) (ltx.TXID, error) { +// pollLevelResult contains the results of polling a single level. +type pollLevelResult struct { + maxTXID ltx.TXID + createdAt time.Time // CreatedAt of the most recent LTX file processed +} + +func (f *VFSFile) pollLevel(ctx context.Context, level int, prevMaxTXID ltx.TXID, index map[uint32]ltx.PageIndexElem) (pollLevelResult, error) { // Start reading from the next LTX file after the current position. itr, err := f.client.LTXFiles(ctx, level, prevMaxTXID+1, false) if err != nil { - return 0, fmt.Errorf("ltx files: %w", err) + return pollLevelResult{}, fmt.Errorf("ltx files: %w", err) } // Build an update across all new LTX files. - maxTXID := prevMaxTXID + result := pollLevelResult{maxTXID: prevMaxTXID} for itr.Next() { info := itr.Item() // Ensure we are fetching the next transaction from our current position. f.mu.Lock() - isNextTXID := info.MinTXID == maxTXID+1 + isNextTXID := info.MinTXID == result.maxTXID+1 f.mu.Unlock() if !isNextTXID { - return maxTXID, fmt.Errorf("non-contiguous ltx file: level=%d, current=%s, next=%s-%s", level, prevMaxTXID, info.MinTXID, info.MaxTXID) + return result, fmt.Errorf("non-contiguous ltx file: level=%d, current=%s, next=%s-%s", level, prevMaxTXID, info.MinTXID, info.MaxTXID) } f.logger.Debug("new ltx file", "level", info.Level, "min", info.MinTXID, "max", info.MaxTXID) @@ -586,7 +687,7 @@ func (f *VFSFile) pollLevel(ctx context.Context, level int, prevMaxTXID ltx.TXID // Read page index. idx, err := FetchPageIndex(context.Background(), f.client, info) if err != nil { - return maxTXID, fmt.Errorf("fetch page index: %w", err) + return result, fmt.Errorf("fetch page index: %w", err) } // Update the page index & current position. @@ -594,8 +695,106 @@ func (f *VFSFile) pollLevel(ctx context.Context, level int, prevMaxTXID ltx.TXID f.logger.Debug("adding new page index", "page", k, "elem", v) index[k] = v } - maxTXID = info.MaxTXID + result.maxTXID = info.MaxTXID + result.createdAt = info.CreatedAt + } + + return result, nil +} + +// RegisterVFSConnection maps a SQLite connection handle to its VFS file ID. +func RegisterVFSConnection(dbPtr uintptr, fileID uint64) error { + if _, ok := lookupVFSFile(fileID); !ok { + return fmt.Errorf("vfs file not found: id=%d", fileID) + } + vfsConnectionMap.Store(dbPtr, fileID) + return nil +} + +// UnregisterVFSConnection removes a connection mapping. +func UnregisterVFSConnection(dbPtr uintptr) { + vfsConnectionMap.Delete(dbPtr) +} + +// SetVFSConnectionTime rebuilds the VFS index for a connection at a timestamp. +func SetVFSConnectionTime(dbPtr uintptr, timestamp string) error { + file, err := vfsFileForConnection(dbPtr) + if err != nil { + return err + } + + t, err := parseTimeValue(timestamp) + if err != nil { + return err + } + return file.SetTargetTime(context.Background(), t) +} + +// ResetVFSConnectionTime rebuilds the VFS index to the latest state. +func ResetVFSConnectionTime(dbPtr uintptr) error { + file, err := vfsFileForConnection(dbPtr) + if err != nil { + return err + } + return file.ResetTime(context.Background()) +} + +// GetVFSConnectionTime returns the current time for a connection. +func GetVFSConnectionTime(dbPtr uintptr) (string, error) { + file, err := vfsFileForConnection(dbPtr) + if err != nil { + return "", err + } + return file.currentTimeString(), nil +} + +// GetVFSConnectionTXID returns the current transaction ID for a connection as a hex string. +func GetVFSConnectionTXID(dbPtr uintptr) (string, error) { + file, err := vfsFileForConnection(dbPtr) + if err != nil { + return "", err + } + return file.Pos().TXID.String(), nil +} + +// GetVFSConnectionLag returns seconds since last successful poll for a connection. +func GetVFSConnectionLag(dbPtr uintptr) (int64, error) { + file, err := vfsFileForConnection(dbPtr) + if err != nil { + return 0, err + } + lastPoll := file.LastPollSuccess() + if lastPoll.IsZero() { + return -1, nil + } + return int64(time.Since(lastPoll).Seconds()), nil +} + +func vfsFileForConnection(dbPtr uintptr) (*VFSFile, error) { + v, ok := vfsConnectionMap.Load(dbPtr) + if !ok { + return nil, fmt.Errorf("connection not registered") + } + fileID, ok := v.(uint64) + if !ok { + return nil, fmt.Errorf("invalid connection mapping") + } + file, ok := lookupVFSFile(fileID) + if !ok { + return nil, fmt.Errorf("vfs file not found: id=%d", fileID) + } + return file, nil +} + +func lookupVFSFile(fileID uint64) (*VFSFile, bool) { + sqlite3vfsFileMux.Lock() + defer sqlite3vfsFileMux.Unlock() + + file, ok := sqlite3vfsFileMap[fileID] + if !ok { + return nil, false } - return maxTXID, nil + vfsFile, ok := file.(*VFSFile) + return vfsFile, ok }