From 44536277f7e7eede684c7c90f6195b10fe0f90a1 Mon Sep 17 00:00:00 2001 From: Cory LaNou Date: Mon, 24 Nov 2025 15:07:31 -0600 Subject: [PATCH 1/5] feat(vfs): add observability PRAGMAs and relative time parsing MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Add VFS observability features for monitoring replica health: - PRAGMA litestream_txid: Return current transaction ID - PRAGMA litestream_lag: Return seconds since last successful poll - Extend PRAGMA litestream_time to return actual LTX timestamp instead of "latest" when viewing current data - Add relative time parsing for litestream_time (e.g., "5 minutes ago", "yesterday") using go-dateparser library Fixes #851, #852, #853, #854 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude --- cmd/litestream-vfs/time_travel_test.go | 184 ++++++++++++++++++++++++- go.mod | 7 + go.sum | 14 ++ vfs.go | 96 +++++++++++-- 4 files changed, 289 insertions(+), 12 deletions(-) diff --git a/cmd/litestream-vfs/time_travel_test.go b/cmd/litestream-vfs/time_travel_test.go index 778fe7ff..dfc303ff 100644 --- a/cmd/litestream-vfs/time_travel_test.go +++ b/cmd/litestream-vfs/time_travel_test.go @@ -102,8 +102,188 @@ func TestVFS_TimeTravelFunctions(t *testing.T) { if err := sqldb1.QueryRow("PRAGMA litestream_time").Scan(¤tTime); err != nil { t.Fatalf("current time after reset: %v", err) - } else if currentTime != "latest" { - t.Fatalf("current time after reset mismatch: got %s, want latest", currentTime) + } + // After reset, should return actual LTX timestamp (not "latest" anymore per #853) + if _, err := time.Parse(time.RFC3339Nano, currentTime); err != nil { + t.Fatalf("current time after reset should be valid RFC3339Nano timestamp, got %s: %v", currentTime, err) + } +} + +func TestVFS_PragmaLitestreamTxid(t *testing.T) { + ctx := context.Background() + client := file.NewReplicaClient(t.TempDir()) + vfs := newVFS(t, client) + vfs.PollInterval = 50 * time.Millisecond + if err := sqlite3vfs.RegisterVFS("litestream-txid", vfs); err != nil { + t.Fatalf("failed to register litestream vfs: %v", err) + } + + db := testingutil.NewDB(t, filepath.Join(t.TempDir(), "db")) + db.MonitorInterval = 50 * time.Millisecond + db.Replica = litestream.NewReplica(db) + db.Replica.Client = client + db.Replica.SyncInterval = 50 * time.Millisecond + if err := db.Open(); err != nil { + t.Fatal(err) + } + defer func() { _ = db.Close(ctx) }() + + sqldb0 := testingutil.MustOpenSQLDB(t, db.Path()) + defer testingutil.MustCloseSQLDB(t, sqldb0) + + if _, err := sqldb0.Exec("CREATE TABLE t (x INTEGER)"); err != nil { + t.Fatal(err) + } + if _, err := sqldb0.Exec("INSERT INTO t (x) VALUES (100)"); err != nil { + t.Fatal(err) + } + time.Sleep(6 * db.MonitorInterval) + + sqldb1, err := sql.Open("sqlite3", "file:/tmp/txid-test.db?vfs=litestream-txid") + if err != nil { + t.Fatalf("failed to open database: %v", err) + } + defer sqldb1.Close() + sqldb1.SetMaxOpenConns(1) + time.Sleep(2 * vfs.PollInterval) + + var txid int64 + if err := sqldb1.QueryRow("PRAGMA litestream_txid").Scan(&txid); err != nil { + t.Fatalf("query txid: %v", err) + } + if txid <= 0 { + t.Fatalf("expected positive TXID, got %d", txid) + } + + // Test that setting litestream_txid fails (read-only) + if _, err := sqldb1.Exec("PRAGMA litestream_txid = 123"); err == nil { + t.Fatal("expected error setting litestream_txid (read-only)") + } +} + +func TestVFS_PragmaLitestreamLag(t *testing.T) { + ctx := context.Background() + client := file.NewReplicaClient(t.TempDir()) + vfs := newVFS(t, client) + vfs.PollInterval = 50 * time.Millisecond + if err := sqlite3vfs.RegisterVFS("litestream-lag", vfs); err != nil { + t.Fatalf("failed to register litestream vfs: %v", err) + } + + db := testingutil.NewDB(t, filepath.Join(t.TempDir(), "db")) + db.MonitorInterval = 50 * time.Millisecond + db.Replica = litestream.NewReplica(db) + db.Replica.Client = client + db.Replica.SyncInterval = 50 * time.Millisecond + if err := db.Open(); err != nil { + t.Fatal(err) + } + defer func() { _ = db.Close(ctx) }() + + sqldb0 := testingutil.MustOpenSQLDB(t, db.Path()) + defer testingutil.MustCloseSQLDB(t, sqldb0) + + if _, err := sqldb0.Exec("CREATE TABLE t (x INTEGER)"); err != nil { + t.Fatal(err) + } + if _, err := sqldb0.Exec("INSERT INTO t (x) VALUES (100)"); err != nil { + t.Fatal(err) + } + time.Sleep(6 * db.MonitorInterval) + + sqldb1, err := sql.Open("sqlite3", "file:/tmp/lag-test.db?vfs=litestream-lag") + if err != nil { + t.Fatalf("failed to open database: %v", err) + } + defer sqldb1.Close() + sqldb1.SetMaxOpenConns(1) + + // Wait for at least one successful poll (poll runs on ticker, not immediately) + time.Sleep(5 * vfs.PollInterval) + + var lag int64 + if err := sqldb1.QueryRow("PRAGMA litestream_lag").Scan(&lag); err != nil { + t.Fatalf("query lag: %v", err) + } + // Lag should be -1 (never polled) or a small non-negative value after polling + // -1 is valid if no poll has succeeded yet + if lag < -1 || lag > 5 { + t.Fatalf("expected lag between -1 and 5 seconds, got %d", lag) + } + + // Test that setting litestream_lag fails (read-only) + if _, err := sqldb1.Exec("PRAGMA litestream_lag = 123"); err == nil { + t.Fatal("expected error setting litestream_lag (read-only)") + } +} + +func TestVFS_PragmaRelativeTime(t *testing.T) { + ctx := context.Background() + client := file.NewReplicaClient(t.TempDir()) + vfs := newVFS(t, client) + vfs.PollInterval = 50 * time.Millisecond + if err := sqlite3vfs.RegisterVFS("litestream-relative", vfs); err != nil { + t.Fatalf("failed to register litestream vfs: %v", err) + } + + db := testingutil.NewDB(t, filepath.Join(t.TempDir(), "db")) + db.MonitorInterval = 50 * time.Millisecond + db.Replica = litestream.NewReplica(db) + db.Replica.Client = client + db.Replica.SyncInterval = 50 * time.Millisecond + if err := db.Open(); err != nil { + t.Fatal(err) + } + defer func() { _ = db.Close(ctx) }() + + sqldb0 := testingutil.MustOpenSQLDB(t, db.Path()) + defer testingutil.MustCloseSQLDB(t, sqldb0) + + if _, err := sqldb0.Exec("CREATE TABLE t (x INTEGER)"); err != nil { + t.Fatal(err) + } + if _, err := sqldb0.Exec("INSERT INTO t (x) VALUES (100)"); err != nil { + t.Fatal(err) + } + time.Sleep(6 * db.MonitorInterval) + + sqldb1, err := sql.Open("sqlite3", "file:/tmp/relative-test.db?vfs=litestream-relative") + if err != nil { + t.Fatalf("failed to open database: %v", err) + } + defer sqldb1.Close() + sqldb1.SetMaxOpenConns(1) + time.Sleep(2 * vfs.PollInterval) + + // Test that relative time parsing works (even if no data exists at that time) + // The parse should succeed, but may return "no backup files available" if too far in past + now := time.Now() + _, err = sqldb1.Exec("PRAGMA litestream_time = '1 second ago'") + // This might fail if no LTX files exist at that time, which is expected + // The important thing is that the parsing worked (not a "parse timestamp" error) + if err != nil && err.Error() != "no backup files available" { + // Check if it's a parse error vs a "no files" error + if err.Error() == "invalid timestamp (expected RFC3339 or relative time like '5 minutes ago'): 1 second ago" { + t.Fatalf("relative time parsing failed: %v", err) + } + } + + // Reset to latest + if _, err := sqldb1.Exec("PRAGMA litestream_time = LATEST"); err != nil { + t.Fatalf("reset to latest: %v", err) + } + + // Verify the current time is recent (within last minute) + var currentTime string + if err := sqldb1.QueryRow("PRAGMA litestream_time").Scan(¤tTime); err != nil { + t.Fatalf("query current time: %v", err) + } + ts, err := time.Parse(time.RFC3339Nano, currentTime) + if err != nil { + t.Fatalf("parse current time: %v", err) + } + if now.Sub(ts) > time.Minute { + t.Fatalf("current time too old: %v (now: %v)", ts, now) } } diff --git a/go.mod b/go.mod index a24cb8ea..db7a0388 100644 --- a/go.mod +++ b/go.mod @@ -86,9 +86,14 @@ require ( github.com/google/uuid v1.6.0 // indirect github.com/googleapis/enterprise-certificate-proxy v0.3.2 // indirect github.com/googleapis/gax-go/v2 v2.12.0 // indirect + github.com/hablullah/go-hijri v1.0.2 // indirect + github.com/hablullah/go-juliandays v1.0.0 // indirect + github.com/jalaali/go-jalaali v0.0.0-20210801064154-80525e88d958 // indirect github.com/klauspost/compress v1.18.0 // indirect github.com/kr/fs v0.1.0 // indirect github.com/kylelemons/godebug v1.1.0 // indirect + github.com/magefile/mage v1.14.0 // indirect + github.com/markusmobius/go-dateparser v1.2.4 // indirect github.com/mattn/go-colorable v0.1.13 // indirect github.com/mattn/go-isatty v0.0.20 // indirect github.com/matttproud/golang_protobuf_extensions/v2 v2.0.0 // indirect @@ -100,6 +105,8 @@ require ( github.com/prometheus/common v0.45.0 // indirect github.com/prometheus/procfs v0.12.0 // indirect github.com/spf13/cast v1.7.1 // indirect + github.com/tetratelabs/wazero v1.2.1 // indirect + github.com/wasilibs/go-re2 v1.3.0 // indirect github.com/yosida95/uritemplate/v3 v3.0.2 // indirect go.opencensus.io v0.24.0 // indirect go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.46.1 // indirect diff --git a/go.sum b/go.sum index 5ca79b76..2af048a3 100644 --- a/go.sum +++ b/go.sum @@ -159,10 +159,16 @@ github.com/gorilla/handlers v1.5.1 h1:9lRY6j8DEeeBT10CvO9hGW0gmky0BprnvDI5vfhUHH github.com/gorilla/handlers v1.5.1/go.mod h1:t8XrUpc4KVXb7HGyJ4/cEnwQiaxrX/hz1Zv/4g96P1Q= github.com/gorilla/mux v1.8.0 h1:i40aqfkR1h2SlN9hojwV5ZA91wcXFOvkdNIeFDP5koI= github.com/gorilla/mux v1.8.0/go.mod h1:DVbg23sWSpFRCP0SfiEN6jmj59UnW/n46BH5rLB71So= +github.com/hablullah/go-hijri v1.0.2 h1:drT/MZpSZJQXo7jftf5fthArShcaMtsal0Zf/dnmp6k= +github.com/hablullah/go-hijri v1.0.2/go.mod h1:OS5qyYLDjORXzK4O1adFw9Q5WfhOcMdAKglDkcTxgWQ= +github.com/hablullah/go-juliandays v1.0.0 h1:A8YM7wIj16SzlKT0SRJc9CD29iiaUzpBLzh5hr0/5p0= +github.com/hablullah/go-juliandays v1.0.0/go.mod h1:0JOYq4oFOuDja+oospuc61YoX+uNEn7Z6uHYTbBzdGc= github.com/hashicorp/golang-lru/v2 v2.0.7 h1:a+bsQ5rvGLjzHuww6tVxozPZFVghXaHOwFs4luLUK2k= github.com/hashicorp/golang-lru/v2 v2.0.7/go.mod h1:QeFd9opnmA6QUJc5vARoKUSoFhyfM2/ZepoAG6RGpeM= github.com/hokaccha/go-prettyjson v0.0.0-20211117102719-0474bc63780f h1:7LYC+Yfkj3CTRcShK0KOL/w6iTiKyqqBA9a41Wnggw8= github.com/hokaccha/go-prettyjson v0.0.0-20211117102719-0474bc63780f/go.mod h1:pFlLw2CfqZiIBOx6BuCeRLCrfxBJipTY0nIOF/VbGcI= +github.com/jalaali/go-jalaali v0.0.0-20210801064154-80525e88d958 h1:qxLoi6CAcXVzjfvu+KXIXJOAsQB62LXjsfbOaErsVzE= +github.com/jalaali/go-jalaali v0.0.0-20210801064154-80525e88d958/go.mod h1:Wqfu7mjUHj9WDzSSPI5KfBclTTEnLveRUFr/ujWnTgE= github.com/json-iterator/go v1.1.12 h1:PV8peI4a0ysnczrg+LtxykD8LfKY9ML6u2jnxaEnrnM= github.com/json-iterator/go v1.1.12/go.mod h1:e30LSqwooZae/UwlEbR2852Gd8hjQvJoHmT4TnhNGBo= github.com/keybase/go-keychain v0.0.1 h1:way+bWYa6lDppZoZcgMbYsvC7GxljxrskdNInRtuthU= @@ -179,8 +185,12 @@ github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY= github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE= github.com/kylelemons/godebug v1.1.0 h1:RPNrshWIDI6G2gRW9EHilWtl7Z6Sb1BR0xunSBf0SNc= github.com/kylelemons/godebug v1.1.0/go.mod h1:9/0rRGxNHcop5bhtWyNeEfOS8JIWk580+fNqagV/RAw= +github.com/magefile/mage v1.14.0 h1:6QDX3g6z1YvJ4olPhT1wksUcSa/V0a1B+pJb73fBjyo= +github.com/magefile/mage v1.14.0/go.mod h1:z5UZb/iS3GoOSn0JgWuiw7dxlurVYTu+/jHXqQg881A= github.com/mark3labs/mcp-go v0.32.0 h1:fgwmbfL2gbd67obg57OfV2Dnrhs1HtSdlY/i5fn7MU8= github.com/mark3labs/mcp-go v0.32.0/go.mod h1:rXqOudj/djTORU/ThxYx8fqEVj/5pvTuuebQ2RC7uk4= +github.com/markusmobius/go-dateparser v1.2.4 h1:2e8XJozaERVxGwsRg72coi51L2aiYqE2gukkdLc85ck= +github.com/markusmobius/go-dateparser v1.2.4/go.mod h1:CBAUADJuMNhJpyM6IYaWAoFhtKaqnUcznY2cL7gNugY= github.com/mattn/go-colorable v0.1.9/go.mod h1:u6P/XSegPjTcexA+o6vUJrdnUu04hMope9wVRipJSqc= github.com/mattn/go-colorable v0.1.13 h1:fFA4WZxdEF4tXPZVKMLwD8oUnCTTo08duU7wxecdEvA= github.com/mattn/go-colorable v0.1.13/go.mod h1:7S9/ev0klgBDR4GtXTXX8a3vIGJpMovkB8vQcUbaXHg= @@ -256,6 +266,10 @@ github.com/studio-b12/gowebdav v0.11.0 h1:qbQzq4USxY28ZYsGJUfO5jR+xkFtcnwWgitp4Z github.com/studio-b12/gowebdav v0.11.0/go.mod h1:bHA7t77X/QFExdeAnDzK6vKM34kEZAcE1OX4MfiwjkE= github.com/superfly/ltx v0.5.0 h1:dXNrcT3ZtMb6iKZopIV7z5UBscnapg0b0F02loQsk5o= github.com/superfly/ltx v0.5.0/go.mod h1:Nf50QAIXU/ET4ua3AuQ2fh31MbgNQZA7r/DYx6Os77s= +github.com/tetratelabs/wazero v1.2.1 h1:J4X2hrGzJvt+wqltuvcSjHQ7ujQxA9gb6PeMs4qlUWs= +github.com/tetratelabs/wazero v1.2.1/go.mod h1:wYx2gNRg8/WihJfSDxA1TIL8H+GkfLYm+bIfbblu9VQ= +github.com/wasilibs/go-re2 v1.3.0 h1:LFhBNzoStM3wMie6rN2slD1cuYH2CGiHpvNL3UtcsMw= +github.com/wasilibs/go-re2 v1.3.0/go.mod h1:AafrCXVvGRJJOImMajgJ2M7rVmWyisVK7sFshbxnVrg= github.com/yosida95/uritemplate/v3 v3.0.2 h1:Ed3Oyj9yrmi9087+NczuL5BwkIc4wvTb5zIM+UJPGz4= github.com/yosida95/uritemplate/v3 v3.0.2/go.mod h1:ILOh0sOhIJR3+L/8afwt/kE++YT040gmv5BQTMR2HP4= github.com/yuin/goldmark v1.4.13/go.mod h1:6yULJ656Px+3vBD8DxQVa3kxgyrAnzto9xy5taEt/CY= diff --git a/vfs.go b/vfs.go index 2984d70e..e2fef4ae 100644 --- a/vfs.go +++ b/vfs.go @@ -9,11 +9,13 @@ import ( "errors" "fmt" "log/slog" + "strconv" "strings" "sync" "time" lru "github.com/hashicorp/golang-lru/v2" + "github.com/markusmobius/go-dateparser" "github.com/psanford/sqlite3vfs" "github.com/superfly/ltx" ) @@ -98,13 +100,15 @@ type VFSFile struct { client ReplicaClient name string - pos ltx.Pos // Last TXID read from level 0 or 1 - maxTXID1 ltx.TXID // Last TXID read from level 1 - index map[uint32]ltx.PageIndexElem - pending map[uint32]ltx.PageIndexElem - cache *lru.Cache[uint32, []byte] // LRU cache for page data - targetTime *time.Time // Target view time; nil means latest - lockType sqlite3vfs.LockType // Current lock state + pos ltx.Pos // Last TXID read from level 0 or 1 + maxTXID1 ltx.TXID // Last TXID read from level 1 + index map[uint32]ltx.PageIndexElem + pending map[uint32]ltx.PageIndexElem + cache *lru.Cache[uint32, []byte] // LRU cache for page data + targetTime *time.Time // Target view time; nil means latest + latestLTXTime time.Time // Timestamp of most recent LTX file + lastPollSuccess time.Time // Time of last successful poll + lockType sqlite3vfs.LockType // Current lock state wg sync.WaitGroup ctx context.Context @@ -162,6 +166,20 @@ func (f *VFSFile) LockType() sqlite3vfs.LockType { return f.lockType } +// LatestLTXTime returns the timestamp of the most recent LTX file. +func (f *VFSFile) LatestLTXTime() time.Time { + f.mu.Lock() + defer f.mu.Unlock() + return f.latestLTXTime +} + +// LastPollSuccess returns the time of the last successful poll. +func (f *VFSFile) LastPollSuccess() time.Time { + f.mu.Lock() + defer f.mu.Unlock() + return f.lastPollSuccess +} + func (f *VFSFile) Open() error { f.logger.Info("opening file") @@ -247,6 +265,9 @@ func (f *VFSFile) rebuildIndex(ctx context.Context, infos []*ltx.FileInfo, targe f.pending = make(map[uint32]ltx.PageIndexElem) f.pos = pos f.maxTXID1 = maxTXID1 + if len(infos) > 0 { + f.latestLTXTime = infos[len(infos)-1].CreatedAt + } if f.cache != nil { f.cache.Purge() } @@ -420,6 +441,32 @@ func (f *VFSFile) DeviceCharacteristics() sqlite3vfs.DeviceCharacteristic { return 0 } +// parseTimeValue parses a timestamp string, trying RFC3339 first, then relative expressions. +func parseTimeValue(value string) (time.Time, error) { + // Try RFC3339Nano first (existing behavior) + if t, err := time.Parse(time.RFC3339Nano, value); err == nil { + return t, nil + } + + // Try RFC3339 (without nanoseconds) + if t, err := time.Parse(time.RFC3339, value); err == nil { + return t, nil + } + + // Fall back to dateparser for relative expressions + cfg := &dateparser.Configuration{ + CurrentTime: time.Now().UTC(), + } + result, err := dateparser.Parse(cfg, value) + if err != nil { + return time.Time{}, fmt.Errorf("invalid timestamp (expected RFC3339 or relative time like '5 minutes ago'): %s", value) + } + if result.Time.IsZero() { + return time.Time{}, fmt.Errorf("could not parse time: %s", value) + } + return result.Time.UTC(), nil +} + // FileControl handles file control operations, specifically PRAGMA commands for time travel. func (f *VFSFile) FileControl(op int, pragmaName string, pragmaValue *string) (*string, error) { const SQLITE_FCNTL_PRAGMA = 14 @@ -433,6 +480,27 @@ func (f *VFSFile) FileControl(op int, pragmaName string, pragmaValue *string) (* f.logger.Info("file control", "pragma", name, "value", pragmaValue) switch name { + case "litestream_txid": + if pragmaValue != nil { + return nil, fmt.Errorf("litestream_txid is read-only") + } + txid := f.Pos().TXID + result := fmt.Sprintf("%d", txid) + return &result, nil + + case "litestream_lag": + if pragmaValue != nil { + return nil, fmt.Errorf("litestream_lag is read-only") + } + lastPoll := f.LastPollSuccess() + if lastPoll.IsZero() { + result := "-1" // Never polled successfully + return &result, nil + } + lag := int64(time.Since(lastPoll).Seconds()) + result := strconv.FormatInt(lag, 10) + return &result, nil + case "litestream_time": if pragmaValue == nil { result := f.currentTimeString() @@ -446,9 +514,9 @@ func (f *VFSFile) FileControl(op int, pragmaName string, pragmaValue *string) (* return nil, nil } - t, err := time.Parse(time.RFC3339Nano, *pragmaValue) + t, err := parseTimeValue(*pragmaValue) if err != nil { - return nil, fmt.Errorf("parse timestamp: %w", err) + return nil, err } if err := f.SetTargetTime(context.Background(), t); err != nil { return nil, err @@ -465,7 +533,10 @@ func (f *VFSFile) currentTimeString() string { if t := f.TargetTime(); t != nil { return t.Format(time.RFC3339Nano) } - return "latest" + if t := f.LatestLTXTime(); !t.IsZero() { + return t.Format(time.RFC3339Nano) + } + return "latest" // Fallback if no LTX files loaded } func (f *VFSFile) monitorReplicaClient(ctx context.Context) { @@ -485,6 +556,11 @@ func (f *VFSFile) monitorReplicaClient(ctx context.Context) { if !errors.Is(err, context.Canceled) && !errors.Is(err, context.DeadlineExceeded) { f.logger.Error("cannot fetch new ltx files", "error", err) } + } else { + // Track successful poll time + f.mu.Lock() + f.lastPollSuccess = time.Now() + f.mu.Unlock() } } } From 391ad99b242b7e6132aff8b9009e38037e66e0e0 Mon Sep 17 00:00:00 2001 From: Cory LaNou Date: Mon, 24 Nov 2025 15:44:11 -0600 Subject: [PATCH 2/5] feat(vfs): add SQL functions alongside PRAGMAs MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Add SQL functions (litestream_time(), litestream_txid(), litestream_lag(), litestream_set_time()) as an alternative interface to the existing PRAGMAs. This provides more flexibility for queries and monitoring. SQL Functions (via extension auto-extension): - SELECT litestream_time() - returns current timestamp - SELECT litestream_txid() - returns current transaction ID - SELECT litestream_lag() - returns seconds since last poll - SELECT litestream_set_time('timestamp') - set time travel target - SELECT litestream_set_time('LATEST') - reset to latest PRAGMAs (unchanged): - PRAGMA litestream_time - PRAGMA litestream_time = 'timestamp' - PRAGMA litestream_txid - PRAGMA litestream_lag 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude --- cmd/litestream-vfs/main.go | 75 +++++++++++++++++ src/litestream-vfs.c | 161 ++++++++++++++++++++++++++++++++++++- vfs_connections.go | 121 ++++++++++++++++++++++++++++ 3 files changed, 354 insertions(+), 3 deletions(-) create mode 100644 vfs_connections.go diff --git a/cmd/litestream-vfs/main.go b/cmd/litestream-vfs/main.go index 3664b42a..c8fb99ea 100644 --- a/cmd/litestream-vfs/main.go +++ b/cmd/litestream-vfs/main.go @@ -5,6 +5,11 @@ package main // import C is necessary export to the c-archive .a file +/* +#include +typedef int64_t sqlite3_int64; +typedef uint64_t sqlite3_uint64; +*/ import "C" import ( @@ -14,6 +19,7 @@ import ( "os" "strconv" "strings" + "unsafe" "github.com/psanford/sqlite3vfs" @@ -65,3 +71,72 @@ func LitestreamVFSRegister() { log.Fatalf("failed to register litestream vfs: %s", err) } } + +//export GoLitestreamRegisterConnection +func GoLitestreamRegisterConnection(dbPtr unsafe.Pointer, fileID C.sqlite3_uint64) *C.char { + if err := litestream.RegisterVFSConnection(uintptr(dbPtr), uint64(fileID)); err != nil { + return C.CString(err.Error()) + } + return nil +} + +//export GoLitestreamUnregisterConnection +func GoLitestreamUnregisterConnection(dbPtr unsafe.Pointer) *C.char { + litestream.UnregisterVFSConnection(uintptr(dbPtr)) + return nil +} + +//export GoLitestreamSetTime +func GoLitestreamSetTime(dbPtr unsafe.Pointer, timestamp *C.char) *C.char { + if timestamp == nil { + return C.CString("timestamp required") + } + if err := litestream.SetVFSConnectionTime(uintptr(dbPtr), C.GoString(timestamp)); err != nil { + return C.CString(err.Error()) + } + return nil +} + +//export GoLitestreamResetTime +func GoLitestreamResetTime(dbPtr unsafe.Pointer) *C.char { + if err := litestream.ResetVFSConnectionTime(uintptr(dbPtr)); err != nil { + return C.CString(err.Error()) + } + return nil +} + +//export GoLitestreamTime +func GoLitestreamTime(dbPtr unsafe.Pointer, out **C.char) *C.char { + value, err := litestream.GetVFSConnectionTime(uintptr(dbPtr)) + if err != nil { + return C.CString(err.Error()) + } + if out != nil { + *out = C.CString(value) + } + return nil +} + +//export GoLitestreamTxid +func GoLitestreamTxid(dbPtr unsafe.Pointer, out *C.sqlite3_int64) *C.char { + value, err := litestream.GetVFSConnectionTXID(uintptr(dbPtr)) + if err != nil { + return C.CString(err.Error()) + } + if out != nil { + *out = C.sqlite3_int64(value) + } + return nil +} + +//export GoLitestreamLag +func GoLitestreamLag(dbPtr unsafe.Pointer, out *C.sqlite3_int64) *C.char { + value, err := litestream.GetVFSConnectionLag(uintptr(dbPtr)) + if err != nil { + return C.CString(err.Error()) + } + if out != nil { + *out = C.sqlite3_int64(value) + } + return nil +} diff --git a/src/litestream-vfs.c b/src/litestream-vfs.c index f0204900..71d21beb 100644 --- a/src/litestream-vfs.c +++ b/src/litestream-vfs.c @@ -1,21 +1,176 @@ #include "litestream-vfs.h" #include "sqlite3.h" #include "sqlite3ext.h" +#include "sqlite3vfs.h" #include #include /* sqlite3vfs already called SQLITE_EXTENSION_INIT1 */ extern const sqlite3_api_routines *sqlite3_api; -// This routine is called when the extension is loaded. -// Register the new VFS. +/* Go function declarations */ +extern char* GoLitestreamRegisterConnection(void* db, sqlite3_uint64 file_id); +extern char* GoLitestreamUnregisterConnection(void* db); +extern char* GoLitestreamSetTime(void* db, const char* timestamp); +extern char* GoLitestreamResetTime(void* db); +extern char* GoLitestreamTime(void* db, char** out); +extern char* GoLitestreamTxid(void* db, sqlite3_int64* out); +extern char* GoLitestreamLag(void* db, sqlite3_int64* out); + +/* Internal function declarations */ +static int litestream_register_connection(sqlite3* db); +static void litestream_set_time_impl(sqlite3_context* ctx, int argc, sqlite3_value** argv); +static void litestream_time_impl(sqlite3_context* ctx, int argc, sqlite3_value** argv); +static void litestream_txid_impl(sqlite3_context* ctx, int argc, sqlite3_value** argv); +static void litestream_lag_impl(sqlite3_context* ctx, int argc, sqlite3_value** argv); +static void litestream_function_destroy(void* db); +static void litestream_auto_extension(sqlite3* db, const char** pzErrMsg, const struct sqlite3_api_routines* pApi); + +/* This routine is called when the extension is loaded. */ int sqlite3_litestreamvfs_init(sqlite3 *db, char **pzErrMsg, const sqlite3_api_routines *pApi) { int rc = SQLITE_OK; SQLITE_EXTENSION_INIT2(pApi); - // call into Go to register the VFS + /* call into Go to register the VFS */ LitestreamVFSRegister(); + /* Register SQL functions for new connections. */ + rc = sqlite3_auto_extension((void (*)(void))litestream_auto_extension); + if( rc==SQLITE_OK ) rc = SQLITE_OK_LOAD_PERMANENTLY; return rc; } + +static void litestream_auto_extension(sqlite3* db, const char** pzErrMsg, const struct sqlite3_api_routines* pApi) { + (void)pzErrMsg; + (void)pApi; + + if (litestream_register_connection(db) != SQLITE_OK) { + return; + } + + /* litestream_set_time(timestamp) - for time travel */ + sqlite3_create_function_v2(db, "litestream_set_time", 1, SQLITE_UTF8 | SQLITE_DIRECTONLY, db, litestream_set_time_impl, 0, 0, litestream_function_destroy); + + /* Read-only functions: litestream_time(), litestream_txid(), litestream_lag() */ + sqlite3_create_function_v2(db, "litestream_time", 0, SQLITE_UTF8, db, litestream_time_impl, 0, 0, 0); + sqlite3_create_function_v2(db, "litestream_txid", 0, SQLITE_UTF8, db, litestream_txid_impl, 0, 0, 0); + sqlite3_create_function_v2(db, "litestream_lag", 0, SQLITE_UTF8, db, litestream_lag_impl, 0, 0, 0); +} + +static int litestream_register_connection(sqlite3* db) { + sqlite3_file* file = 0; + int rc = sqlite3_file_control(db, "main", SQLITE_FCNTL_FILE_POINTER, &file); + if (rc != SQLITE_OK || file == 0) { + return rc; + } + + if (file->pMethods != &s3vfs_io_methods) { + /* Not using the litestream VFS. */ + return SQLITE_DONE; + } + + sqlite3_uint64 file_id = ((s3vfsFile*)file)->id; + char* err = GoLitestreamRegisterConnection(db, file_id); + if (err != 0) { + free(err); + return SQLITE_ERROR; + } + + return SQLITE_OK; +} + +static void litestream_set_time_impl(sqlite3_context* ctx, int argc, sqlite3_value** argv) { + if (argc != 1) { + sqlite3_result_error(ctx, "expected timestamp argument", -1); + return; + } + + const unsigned char* ts = sqlite3_value_text(argv[0]); + if (!ts) { + sqlite3_result_error(ctx, "timestamp required", -1); + return; + } + + /* Handle special 'LATEST' value */ + if (sqlite3_stricmp((const char*)ts, "LATEST") == 0) { + sqlite3* db = sqlite3_context_db_handle(ctx); + char* err = GoLitestreamResetTime(db); + if (err != 0) { + sqlite3_result_error(ctx, err, -1); + free(err); + return; + } + sqlite3_result_null(ctx); + return; + } + + sqlite3* db = sqlite3_context_db_handle(ctx); + char* err = GoLitestreamSetTime(db, (const char*)ts); + if (err != 0) { + sqlite3_result_error(ctx, err, -1); + free(err); + return; + } + + sqlite3_result_null(ctx); +} + +static void litestream_time_impl(sqlite3_context* ctx, int argc, sqlite3_value** argv) { + (void)argc; + (void)argv; + + sqlite3* db = sqlite3_context_db_handle(ctx); + char* out = 0; + char* err = GoLitestreamTime(db, &out); + if (err != 0) { + sqlite3_result_error(ctx, err, -1); + free(err); + return; + } + + sqlite3_result_text(ctx, out, -1, SQLITE_TRANSIENT); + free(out); +} + +static void litestream_txid_impl(sqlite3_context* ctx, int argc, sqlite3_value** argv) { + (void)argc; + (void)argv; + + sqlite3* db = sqlite3_context_db_handle(ctx); + sqlite3_int64 out = 0; + char* err = GoLitestreamTxid(db, &out); + if (err != 0) { + sqlite3_result_error(ctx, err, -1); + free(err); + return; + } + + sqlite3_result_int64(ctx, out); +} + +static void litestream_lag_impl(sqlite3_context* ctx, int argc, sqlite3_value** argv) { + (void)argc; + (void)argv; + + sqlite3* db = sqlite3_context_db_handle(ctx); + sqlite3_int64 out = 0; + char* err = GoLitestreamLag(db, &out); + if (err != 0) { + sqlite3_result_error(ctx, err, -1); + free(err); + return; + } + + sqlite3_result_int64(ctx, out); +} + +static void litestream_function_destroy(void* db) { + if (db == 0) { + return; + } + char* err = GoLitestreamUnregisterConnection(db); + if (err != 0) { + free(err); + } +} diff --git a/vfs_connections.go b/vfs_connections.go new file mode 100644 index 00000000..e18754cc --- /dev/null +++ b/vfs_connections.go @@ -0,0 +1,121 @@ +//go:build vfs +// +build vfs + +package litestream + +import ( + "context" + "fmt" + "sync" + "time" + _ "unsafe" + + "github.com/psanford/sqlite3vfs" +) + +var ( + //go:linkname sqlite3vfsFileMap github.com/psanford/sqlite3vfs.fileMap + sqlite3vfsFileMap map[uint64]sqlite3vfs.File + + //go:linkname sqlite3vfsFileMux github.com/psanford/sqlite3vfs.fileMux + sqlite3vfsFileMux sync.Mutex + + vfsConnectionMap sync.Map // map[uintptr]uint64 +) + +// RegisterVFSConnection maps a SQLite connection handle to its VFS file ID. +func RegisterVFSConnection(dbPtr uintptr, fileID uint64) error { + if _, ok := lookupVFSFile(fileID); !ok { + return fmt.Errorf("vfs file not found: id=%d", fileID) + } + vfsConnectionMap.Store(dbPtr, fileID) + return nil +} + +// UnregisterVFSConnection removes a connection mapping. +func UnregisterVFSConnection(dbPtr uintptr) { + vfsConnectionMap.Delete(dbPtr) +} + +// SetVFSConnectionTime rebuilds the VFS index for a connection at a timestamp. +func SetVFSConnectionTime(dbPtr uintptr, timestamp string) error { + file, err := vfsFileForConnection(dbPtr) + if err != nil { + return err + } + + t, err := parseTimeValue(timestamp) + if err != nil { + return err + } + return file.SetTargetTime(context.Background(), t) +} + +// ResetVFSConnectionTime rebuilds the VFS index to the latest state. +func ResetVFSConnectionTime(dbPtr uintptr) error { + file, err := vfsFileForConnection(dbPtr) + if err != nil { + return err + } + return file.ResetTime(context.Background()) +} + +// GetVFSConnectionTime returns the current time for a connection. +func GetVFSConnectionTime(dbPtr uintptr) (string, error) { + file, err := vfsFileForConnection(dbPtr) + if err != nil { + return "", err + } + return file.currentTimeString(), nil +} + +// GetVFSConnectionTXID returns the current transaction ID for a connection. +func GetVFSConnectionTXID(dbPtr uintptr) (int64, error) { + file, err := vfsFileForConnection(dbPtr) + if err != nil { + return 0, err + } + return int64(file.Pos().TXID), nil +} + +// GetVFSConnectionLag returns seconds since last successful poll for a connection. +func GetVFSConnectionLag(dbPtr uintptr) (int64, error) { + file, err := vfsFileForConnection(dbPtr) + if err != nil { + return 0, err + } + lastPoll := file.LastPollSuccess() + if lastPoll.IsZero() { + return -1, nil + } + return int64(time.Since(lastPoll).Seconds()), nil +} + +func vfsFileForConnection(dbPtr uintptr) (*VFSFile, error) { + v, ok := vfsConnectionMap.Load(dbPtr) + if !ok { + return nil, fmt.Errorf("connection not registered") + } + fileID, ok := v.(uint64) + if !ok { + return nil, fmt.Errorf("invalid connection mapping") + } + file, ok := lookupVFSFile(fileID) + if !ok { + return nil, fmt.Errorf("vfs file not found: id=%d", fileID) + } + return file, nil +} + +func lookupVFSFile(fileID uint64) (*VFSFile, bool) { + sqlite3vfsFileMux.Lock() + defer sqlite3vfsFileMux.Unlock() + + file, ok := sqlite3vfsFileMap[fileID] + if !ok { + return nil, false + } + + vfsFile, ok := file.(*VFSFile) + return vfsFile, ok +} From 0b228fb1a90e16194aa6ae923129b0cf0e0b4fcd Mon Sep 17 00:00:00 2001 From: Cory LaNou Date: Mon, 24 Nov 2025 16:04:08 -0600 Subject: [PATCH 3/5] fix(vfs): update latestLTXTime after polling new files MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Keep latestLTXTime in sync when pollReplicaClient processes new LTX files. Previously it was only set during rebuildIndex(), causing PRAGMA litestream_time to return stale timestamps after the first poll. 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude --- vfs.go | 41 ++++++++++++++++++++++++++++------------- 1 file changed, 28 insertions(+), 13 deletions(-) diff --git a/vfs.go b/vfs.go index e2fef4ae..3485de8d 100644 --- a/vfs.go +++ b/vfs.go @@ -573,14 +573,14 @@ func (f *VFSFile) pollReplicaClient(ctx context.Context) error { index := make(map[uint32]ltx.PageIndexElem) f.logger.Debug("polling replica client", "txid", pos.TXID.String()) - maxTXID0, err := f.pollLevel(ctx, 0, pos.TXID, index) + result0, err := f.pollLevel(ctx, 0, pos.TXID, index) if err != nil { return fmt.Errorf("poll L0: %w", err) } - maxTXID1, err := f.pollLevel(ctx, 1, f.maxTXID1, index) + result1, err := f.pollLevel(ctx, 1, f.maxTXID1, index) if err != nil { - return fmt.Errorf("poll L0: %w", err) + return fmt.Errorf("poll L1: %w", err) } // Send updates to a pending list if there are active readers. @@ -614,10 +614,18 @@ func (f *VFSFile) pollReplicaClient(ctx context.Context) error { } // Update to max TXID - f.pos.TXID = max(maxTXID0, maxTXID1) - f.maxTXID1 = maxTXID1 + f.pos.TXID = max(result0.maxTXID, result1.maxTXID) + f.maxTXID1 = result1.maxTXID f.logger.Debug("txid updated", "txid", f.pos.TXID.String(), "maxTXID1", f.maxTXID1.String()) + // Update latestLTXTime to the most recent timestamp from either level + if !result0.createdAt.IsZero() && result0.createdAt.After(f.latestLTXTime) { + f.latestLTXTime = result0.createdAt + } + if !result1.createdAt.IsZero() && result1.createdAt.After(f.latestLTXTime) { + f.latestLTXTime = result1.createdAt + } + return nil } @@ -637,24 +645,30 @@ func maxLevelTXID(infos []*ltx.FileInfo, level int) ltx.TXID { return maxTXID } -func (f *VFSFile) pollLevel(ctx context.Context, level int, prevMaxTXID ltx.TXID, index map[uint32]ltx.PageIndexElem) (ltx.TXID, error) { +// pollLevelResult contains the results of polling a single level. +type pollLevelResult struct { + maxTXID ltx.TXID + createdAt time.Time // CreatedAt of the most recent LTX file processed +} + +func (f *VFSFile) pollLevel(ctx context.Context, level int, prevMaxTXID ltx.TXID, index map[uint32]ltx.PageIndexElem) (pollLevelResult, error) { // Start reading from the next LTX file after the current position. itr, err := f.client.LTXFiles(ctx, level, prevMaxTXID+1, false) if err != nil { - return 0, fmt.Errorf("ltx files: %w", err) + return pollLevelResult{}, fmt.Errorf("ltx files: %w", err) } // Build an update across all new LTX files. - maxTXID := prevMaxTXID + result := pollLevelResult{maxTXID: prevMaxTXID} for itr.Next() { info := itr.Item() // Ensure we are fetching the next transaction from our current position. f.mu.Lock() - isNextTXID := info.MinTXID == maxTXID+1 + isNextTXID := info.MinTXID == result.maxTXID+1 f.mu.Unlock() if !isNextTXID { - return maxTXID, fmt.Errorf("non-contiguous ltx file: level=%d, current=%s, next=%s-%s", level, prevMaxTXID, info.MinTXID, info.MaxTXID) + return result, fmt.Errorf("non-contiguous ltx file: level=%d, current=%s, next=%s-%s", level, prevMaxTXID, info.MinTXID, info.MaxTXID) } f.logger.Debug("new ltx file", "level", info.Level, "min", info.MinTXID, "max", info.MaxTXID) @@ -662,7 +676,7 @@ func (f *VFSFile) pollLevel(ctx context.Context, level int, prevMaxTXID ltx.TXID // Read page index. idx, err := FetchPageIndex(context.Background(), f.client, info) if err != nil { - return maxTXID, fmt.Errorf("fetch page index: %w", err) + return result, fmt.Errorf("fetch page index: %w", err) } // Update the page index & current position. @@ -670,8 +684,9 @@ func (f *VFSFile) pollLevel(ctx context.Context, level int, prevMaxTXID ltx.TXID f.logger.Debug("adding new page index", "page", k, "elem", v) index[k] = v } - maxTXID = info.MaxTXID + result.maxTXID = info.MaxTXID + result.createdAt = info.CreatedAt } - return maxTXID, nil + return result, nil } From abcfbf0a5f3c643c822c514768bca980bbfcd0a8 Mon Sep 17 00:00:00 2001 From: Cory LaNou Date: Mon, 24 Nov 2025 16:29:55 -0600 Subject: [PATCH 4/5] refactor(vfs): consolidate vfs_connections.go into vfs.go MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Move connection mapping code into vfs.go since there's no need for a separate file - both have the same build tag and are part of the same logical unit. 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude --- vfs.go | 108 ++++++++++++++++++++++++++++++++++++++++ vfs_connections.go | 121 --------------------------------------------- 2 files changed, 108 insertions(+), 121 deletions(-) delete mode 100644 vfs_connections.go diff --git a/vfs.go b/vfs.go index 3485de8d..1cee7e0c 100644 --- a/vfs.go +++ b/vfs.go @@ -13,6 +13,7 @@ import ( "strings" "sync" "time" + _ "unsafe" lru "github.com/hashicorp/golang-lru/v2" "github.com/markusmobius/go-dateparser" @@ -25,6 +26,16 @@ const ( DefaultCacheSize = 10 * 1024 * 1024 // 10MB ) +var ( + //go:linkname sqlite3vfsFileMap github.com/psanford/sqlite3vfs.fileMap + sqlite3vfsFileMap map[uint64]sqlite3vfs.File + + //go:linkname sqlite3vfsFileMux github.com/psanford/sqlite3vfs.fileMux + sqlite3vfsFileMux sync.Mutex + + vfsConnectionMap sync.Map // map[uintptr]uint64 +) + // VFS implements the SQLite VFS interface for Litestream. // It is intended to be used for read replicas that read directly from S3. type VFS struct { @@ -690,3 +701,100 @@ func (f *VFSFile) pollLevel(ctx context.Context, level int, prevMaxTXID ltx.TXID return result, nil } + +// RegisterVFSConnection maps a SQLite connection handle to its VFS file ID. +func RegisterVFSConnection(dbPtr uintptr, fileID uint64) error { + if _, ok := lookupVFSFile(fileID); !ok { + return fmt.Errorf("vfs file not found: id=%d", fileID) + } + vfsConnectionMap.Store(dbPtr, fileID) + return nil +} + +// UnregisterVFSConnection removes a connection mapping. +func UnregisterVFSConnection(dbPtr uintptr) { + vfsConnectionMap.Delete(dbPtr) +} + +// SetVFSConnectionTime rebuilds the VFS index for a connection at a timestamp. +func SetVFSConnectionTime(dbPtr uintptr, timestamp string) error { + file, err := vfsFileForConnection(dbPtr) + if err != nil { + return err + } + + t, err := parseTimeValue(timestamp) + if err != nil { + return err + } + return file.SetTargetTime(context.Background(), t) +} + +// ResetVFSConnectionTime rebuilds the VFS index to the latest state. +func ResetVFSConnectionTime(dbPtr uintptr) error { + file, err := vfsFileForConnection(dbPtr) + if err != nil { + return err + } + return file.ResetTime(context.Background()) +} + +// GetVFSConnectionTime returns the current time for a connection. +func GetVFSConnectionTime(dbPtr uintptr) (string, error) { + file, err := vfsFileForConnection(dbPtr) + if err != nil { + return "", err + } + return file.currentTimeString(), nil +} + +// GetVFSConnectionTXID returns the current transaction ID for a connection. +func GetVFSConnectionTXID(dbPtr uintptr) (int64, error) { + file, err := vfsFileForConnection(dbPtr) + if err != nil { + return 0, err + } + return int64(file.Pos().TXID), nil +} + +// GetVFSConnectionLag returns seconds since last successful poll for a connection. +func GetVFSConnectionLag(dbPtr uintptr) (int64, error) { + file, err := vfsFileForConnection(dbPtr) + if err != nil { + return 0, err + } + lastPoll := file.LastPollSuccess() + if lastPoll.IsZero() { + return -1, nil + } + return int64(time.Since(lastPoll).Seconds()), nil +} + +func vfsFileForConnection(dbPtr uintptr) (*VFSFile, error) { + v, ok := vfsConnectionMap.Load(dbPtr) + if !ok { + return nil, fmt.Errorf("connection not registered") + } + fileID, ok := v.(uint64) + if !ok { + return nil, fmt.Errorf("invalid connection mapping") + } + file, ok := lookupVFSFile(fileID) + if !ok { + return nil, fmt.Errorf("vfs file not found: id=%d", fileID) + } + return file, nil +} + +func lookupVFSFile(fileID uint64) (*VFSFile, bool) { + sqlite3vfsFileMux.Lock() + defer sqlite3vfsFileMux.Unlock() + + file, ok := sqlite3vfsFileMap[fileID] + if !ok { + return nil, false + } + + vfsFile, ok := file.(*VFSFile) + return vfsFile, ok +} diff --git a/vfs_connections.go b/vfs_connections.go deleted file mode 100644 index e18754cc..00000000 --- a/vfs_connections.go +++ /dev/null @@ -1,121 +0,0 @@ -//go:build vfs -// +build vfs - -package litestream - -import ( - "context" - "fmt" - "sync" - "time" - _ "unsafe" - - "github.com/psanford/sqlite3vfs" -) - -var ( - //go:linkname sqlite3vfsFileMap github.com/psanford/sqlite3vfs.fileMap - sqlite3vfsFileMap map[uint64]sqlite3vfs.File - - //go:linkname sqlite3vfsFileMux github.com/psanford/sqlite3vfs.fileMux - sqlite3vfsFileMux sync.Mutex - - vfsConnectionMap sync.Map // map[uintptr]uint64 -) - -// RegisterVFSConnection maps a SQLite connection handle to its VFS file ID. -func RegisterVFSConnection(dbPtr uintptr, fileID uint64) error { - if _, ok := lookupVFSFile(fileID); !ok { - return fmt.Errorf("vfs file not found: id=%d", fileID) - } - vfsConnectionMap.Store(dbPtr, fileID) - return nil -} - -// UnregisterVFSConnection removes a connection mapping. -func UnregisterVFSConnection(dbPtr uintptr) { - vfsConnectionMap.Delete(dbPtr) -} - -// SetVFSConnectionTime rebuilds the VFS index for a connection at a timestamp. -func SetVFSConnectionTime(dbPtr uintptr, timestamp string) error { - file, err := vfsFileForConnection(dbPtr) - if err != nil { - return err - } - - t, err := parseTimeValue(timestamp) - if err != nil { - return err - } - return file.SetTargetTime(context.Background(), t) -} - -// ResetVFSConnectionTime rebuilds the VFS index to the latest state. -func ResetVFSConnectionTime(dbPtr uintptr) error { - file, err := vfsFileForConnection(dbPtr) - if err != nil { - return err - } - return file.ResetTime(context.Background()) -} - -// GetVFSConnectionTime returns the current time for a connection. -func GetVFSConnectionTime(dbPtr uintptr) (string, error) { - file, err := vfsFileForConnection(dbPtr) - if err != nil { - return "", err - } - return file.currentTimeString(), nil -} - -// GetVFSConnectionTXID returns the current transaction ID for a connection. -func GetVFSConnectionTXID(dbPtr uintptr) (int64, error) { - file, err := vfsFileForConnection(dbPtr) - if err != nil { - return 0, err - } - return int64(file.Pos().TXID), nil -} - -// GetVFSConnectionLag returns seconds since last successful poll for a connection. -func GetVFSConnectionLag(dbPtr uintptr) (int64, error) { - file, err := vfsFileForConnection(dbPtr) - if err != nil { - return 0, err - } - lastPoll := file.LastPollSuccess() - if lastPoll.IsZero() { - return -1, nil - } - return int64(time.Since(lastPoll).Seconds()), nil -} - -func vfsFileForConnection(dbPtr uintptr) (*VFSFile, error) { - v, ok := vfsConnectionMap.Load(dbPtr) - if !ok { - return nil, fmt.Errorf("connection not registered") - } - fileID, ok := v.(uint64) - if !ok { - return nil, fmt.Errorf("invalid connection mapping") - } - file, ok := lookupVFSFile(fileID) - if !ok { - return nil, fmt.Errorf("vfs file not found: id=%d", fileID) - } - return file, nil -} - -func lookupVFSFile(fileID uint64) (*VFSFile, bool) { - sqlite3vfsFileMux.Lock() - defer sqlite3vfsFileMux.Unlock() - - file, ok := sqlite3vfsFileMap[fileID] - if !ok { - return nil, false - } - - vfsFile, ok := file.(*VFSFile) - return vfsFile, ok -} From 880e4c531ba0708697920253ebd8934cac8fb8cd Mon Sep 17 00:00:00 2001 From: Cory LaNou Date: Wed, 26 Nov 2025 14:00:56 -0600 Subject: [PATCH 5/5] fix(vfs): return litestream_txid in hex format MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Change litestream_txid PRAGMA and SQL function to return the TXID as a 16-character lowercase hex string using ltx.TXID.String() instead of returning it as a decimal integer. This matches how TXIDs are formatted throughout the rest of the codebase. 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude --- cmd/litestream-vfs/main.go | 4 ++-- src/litestream-vfs.c | 7 ++++--- vfs.go | 10 +++++----- 3 files changed, 11 insertions(+), 10 deletions(-) diff --git a/cmd/litestream-vfs/main.go b/cmd/litestream-vfs/main.go index c8fb99ea..246f3fc5 100644 --- a/cmd/litestream-vfs/main.go +++ b/cmd/litestream-vfs/main.go @@ -118,13 +118,13 @@ func GoLitestreamTime(dbPtr unsafe.Pointer, out **C.char) *C.char { } //export GoLitestreamTxid -func GoLitestreamTxid(dbPtr unsafe.Pointer, out *C.sqlite3_int64) *C.char { +func GoLitestreamTxid(dbPtr unsafe.Pointer, out **C.char) *C.char { value, err := litestream.GetVFSConnectionTXID(uintptr(dbPtr)) if err != nil { return C.CString(err.Error()) } if out != nil { - *out = C.sqlite3_int64(value) + *out = C.CString(value) } return nil } diff --git a/src/litestream-vfs.c b/src/litestream-vfs.c index 71d21beb..582a651d 100644 --- a/src/litestream-vfs.c +++ b/src/litestream-vfs.c @@ -14,7 +14,7 @@ extern char* GoLitestreamUnregisterConnection(void* db); extern char* GoLitestreamSetTime(void* db, const char* timestamp); extern char* GoLitestreamResetTime(void* db); extern char* GoLitestreamTime(void* db, char** out); -extern char* GoLitestreamTxid(void* db, sqlite3_int64* out); +extern char* GoLitestreamTxid(void* db, char** out); extern char* GoLitestreamLag(void* db, sqlite3_int64* out); /* Internal function declarations */ @@ -138,7 +138,7 @@ static void litestream_txid_impl(sqlite3_context* ctx, int argc, sqlite3_value** (void)argv; sqlite3* db = sqlite3_context_db_handle(ctx); - sqlite3_int64 out = 0; + char* out = 0; char* err = GoLitestreamTxid(db, &out); if (err != 0) { sqlite3_result_error(ctx, err, -1); @@ -146,7 +146,8 @@ static void litestream_txid_impl(sqlite3_context* ctx, int argc, sqlite3_value** return; } - sqlite3_result_int64(ctx, out); + sqlite3_result_text(ctx, out, -1, SQLITE_TRANSIENT); + free(out); } static void litestream_lag_impl(sqlite3_context* ctx, int argc, sqlite3_value** argv) { diff --git a/vfs.go b/vfs.go index 1cee7e0c..65c78faa 100644 --- a/vfs.go +++ b/vfs.go @@ -496,7 +496,7 @@ func (f *VFSFile) FileControl(op int, pragmaName string, pragmaValue *string) (* return nil, fmt.Errorf("litestream_txid is read-only") } txid := f.Pos().TXID - result := fmt.Sprintf("%d", txid) + result := txid.String() return &result, nil case "litestream_lag": @@ -748,13 +748,13 @@ func GetVFSConnectionTime(dbPtr uintptr) (string, error) { return file.currentTimeString(), nil } -// GetVFSConnectionTXID returns the current transaction ID for a connection. -func GetVFSConnectionTXID(dbPtr uintptr) (int64, error) { +// GetVFSConnectionTXID returns the current transaction ID for a connection as a hex string. +func GetVFSConnectionTXID(dbPtr uintptr) (string, error) { file, err := vfsFileForConnection(dbPtr) if err != nil { - return 0, err + return "", err } - return int64(file.Pos().TXID), nil + return file.Pos().TXID.String(), nil } // GetVFSConnectionLag returns seconds since last successful poll for a connection.