Skip to content

Commit 00c0ca5

Browse files
authored
Periodically check backup server for updates (#362)
1 parent 39b247a commit 00c0ca5

File tree

4 files changed

+138
-37
lines changed

4 files changed

+138
-37
lines changed

cmd/litefs/config.go

Lines changed: 8 additions & 6 deletions
Original file line number | Diff line number | Diff line change
@@ -53,6 +53,7 @@ func NewConfig() Config {
5353
config.Lease.DemoteDelay = litefs.DefaultDemoteDelay
5454

5555
config.Backup.Delay = litefs.DefaultBackupDelay
56+
config.Backup.FullSyncInterval = litefs.DefaultBackupFullSyncInterval
5657

5758
config.Log.Format = "text"
5859

@@ -155,12 +156,13 @@ type LeaseConfig struct {
155156

156157
// BackupConfig represents a config for backup services.
157158
type BackupConfig struct {
158-
Type string `yaml:"type"`
159-
Path string `yaml:"path"` // "file" only
160-
URL string `yaml:"url"` // "litefs-cloud" only
161-
Cluster string `yaml:"cluster"` // "litefs-cloud" only
162-
AuthToken string `yaml:"auth-token"` // "litefs-cloud" only
163-
Delay time.Duration `yaml:"-"`
159+
Type string `yaml:"type"`
160+
Path string `yaml:"path"` // "file" only
161+
URL string `yaml:"url"` // "litefs-cloud" only
162+
Cluster string `yaml:"cluster"` // "litefs-cloud" only
163+
AuthToken string `yaml:"auth-token"` // "litefs-cloud" only
164+
Delay time.Duration `yaml:"-"`
165+
FullSyncInterval time.Duration `yaml:"-"`
164166
}
165167

166168
// LogConfig represents the configuration for logging.

cmd/litefs/mount_linux.go

Lines changed: 4 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -378,6 +378,10 @@ func (c *MountCommand) initEnvironment(ctx context.Context) {
378378
func (c *MountCommand) initStoreBackupClient(ctx context.Context) error {
379379
c.initBackupConfigFromEnv(ctx)
380380

381+
// Copy common fields.
382+
c.Store.BackupDelay = c.Config.Backup.Delay
383+
c.Store.BackupFullSyncInterval = c.Config.Backup.FullSyncInterval
384+
381385
switch typ := c.Config.Backup.Type; typ {
382386
case "":
383387
log.Printf("no backup client configured, skipping")

cmd/litefs/mount_test.go

Lines changed: 87 additions & 12 deletions
Original file line number | Diff line number | Diff line change
@@ -357,7 +357,7 @@ func TestSingleNode_BackupClient(t *testing.T) {
357357
t.Run("InitiateSnapshot", func(t *testing.T) {
358358
cmd0 := newMountCommand(t, t.TempDir(), nil)
359359
cmd0.Config.Data.Retention = 1 * time.Microsecond
360-
cmd0.Config.Backup = main.BackupConfig{Type: "file", Path: t.TempDir()}
360+
cmd0.Config.Backup = main.BackupConfig{Type: "file", Path: t.TempDir(), Delay: 1 * time.Millisecond}
361361
runMountCommand(t, cmd0)
362362

363363
// Create a simple table with a single value.
@@ -394,8 +394,10 @@ func TestSingleNode_BackupClient(t *testing.T) {
394394
// Ensure LiteFS can restore a database from backup if it doesn't exist locally.
395395
t.Run("RestoreFromBackup/NoLocalDatabase", func(t *testing.T) {
396396
backupDir := t.TempDir()
397+
backupConfig := main.BackupConfig{Type: "file", Path: backupDir, Delay: 1 * time.Millisecond}
398+
397399
cmd0 := newMountCommand(t, t.TempDir(), nil)
398-
cmd0.Config.Backup = main.BackupConfig{Type: "file", Path: backupDir}
400+
cmd0.Config.Backup = backupConfig
399401
runMountCommand(t, cmd0)
400402

401403
// Create a simple table with a single value.
@@ -416,7 +418,7 @@ func TestSingleNode_BackupClient(t *testing.T) {
416418
t.Logf("shutdown original mount, starting new one")
417419

418420
cmd1 := newMountCommand(t, t.TempDir(), nil)
419-
cmd1.Config.Backup = main.BackupConfig{Type: "file", Path: backupDir}
421+
cmd1.Config.Backup = backupConfig
420422
runMountCommand(t, cmd1)
421423
waitForBackupSync(t, cmd1)
422424

@@ -434,13 +436,14 @@ func TestSingleNode_BackupClient(t *testing.T) {
434436
t.Run("RestoreFromBackup/RemoteAhead", func(t *testing.T) {
435437
dir0, dir1 := t.TempDir(), t.TempDir()
436438
backupDir := t.TempDir()
439+
backupConfig := main.BackupConfig{Type: "file", Path: backupDir, Delay: 1 * time.Millisecond}
437440

438441
cmd0 := newMountCommand(t, dir0, nil)
439-
cmd0.Config.Backup = main.BackupConfig{Type: "file", Path: backupDir}
442+
cmd0.Config.Backup = backupConfig
440443
waitForPrimary(t, runMountCommand(t, cmd0))
441444

442445
cmd1 := newMountCommand(t, dir1, cmd0)
443-
cmd1.Config.Backup = main.BackupConfig{Type: "file", Path: backupDir}
446+
cmd1.Config.Backup = backupConfig
444447
runMountCommand(t, cmd1)
445448

446449
// Create a simple table with a single value.
@@ -479,7 +482,7 @@ func TestSingleNode_BackupClient(t *testing.T) {
479482
// Restart the previous replica so it becomes the new primary.
480483
t.Logf("restart replica and wait for promotion")
481484
cmd1 = newMountCommand(t, dir1, nil)
482-
cmd1.Config.Backup = main.BackupConfig{Type: "file", Path: backupDir}
485+
cmd1.Config.Backup = backupConfig
483486
waitForPrimary(t, runMountCommand(t, cmd1))
484487
waitForBackupSync(t, cmd1)
485488

@@ -498,13 +501,14 @@ func TestSingleNode_BackupClient(t *testing.T) {
498501
t.Run("RestoreFromBackup/ChecksumMismatch", func(t *testing.T) {
499502
dir0, dir1 := t.TempDir(), t.TempDir()
500503
backupDir := t.TempDir()
504+
backupConfig := main.BackupConfig{Type: "file", Path: backupDir, Delay: 1 * time.Millisecond}
501505

502506
cmd0 := newMountCommand(t, dir0, nil)
503-
cmd0.Config.Backup = main.BackupConfig{Type: "file", Path: backupDir}
507+
cmd0.Config.Backup = backupConfig
504508
waitForPrimary(t, runMountCommand(t, cmd0))
505509

506510
cmd1 := newMountCommand(t, dir1, cmd0)
507-
cmd1.Config.Backup = main.BackupConfig{Type: "file", Path: backupDir}
511+
cmd1.Config.Backup = backupConfig
508512
runMountCommand(t, cmd1)
509513

510514
// Create a simple table with a single value.
@@ -661,14 +665,85 @@ func TestSingleNode_BackupClient(t *testing.T) {
661665
}
662666

663667
if got, want := cmd0.Config.Backup, (main.BackupConfig{
664-
Type: "litefs-cloud",
665-
URL: "https://litefs.fly.io",
666-
AuthToken: "TOKENDATA",
667-
Delay: litefs.DefaultBackupDelay,
668+
Type: "litefs-cloud",
669+
URL: "https://litefs.fly.io",
670+
AuthToken: "TOKENDATA",
671+
Delay: litefs.DefaultBackupDelay,
672+
FullSyncInterval: litefs.DefaultBackupFullSyncInterval,
668673
}); got != want {
669674
t.Fatalf("config=%#v, want %#v", got, want)
670675
}
671676
})
677+
678+
// Ensure LiteFS periodically syncs with the backup server to detect restores.
679+
t.Run("FullSync", func(t *testing.T) {
680+
backupConfig := main.BackupConfig{
681+
Type: "file",
682+
Path: t.TempDir(),
683+
Delay: 10 * time.Microsecond,
684+
FullSyncInterval: 10 * time.Millisecond,
685+
}
686+
687+
cmd0 := newMountCommand(t, t.TempDir(), nil)
688+
cmd0.Config.Backup = backupConfig
689+
runMountCommand(t, cmd0)
690+
waitForPrimary(t, cmd0)
691+
692+
// Create a simple table with a single value.
693+
db0 := testingutil.OpenSQLDB(t, filepath.Join(cmd0.Config.FUSE.Dir, "db"))
694+
if _, err := db0.Exec(`CREATE TABLE t (x)`); err != nil {
695+
t.Fatal(err)
696+
} else if _, err := db0.Exec(`INSERT INTO t VALUES (100)`); err != nil {
697+
t.Fatal(err)
698+
} else if err := db0.Close(); err != nil {
699+
t.Fatal(err)
700+
}
701+
702+
// Sync to backup.
703+
if err := cmd0.Store.SyncBackup(context.Background()); err != nil {
704+
t.Fatal(err)
705+
}
706+
707+
// Start replica, sync & shutdown.
708+
dir1 := t.TempDir()
709+
cmd1 := newMountCommand(t, dir1, cmd0)
710+
cmd1.Config.Backup = backupConfig
711+
runMountCommand(t, cmd1)
712+
waitForSync(t, "db", cmd0, cmd1)
713+
if err := cmd1.Close(); err != nil {
714+
t.Fatal(err)
715+
}
716+
717+
// Restart replica as its own cluster so we can update the backup while
718+
// the old primary is still live.
719+
cmd1 = newMountCommand(t, dir1, nil)
720+
cmd1.Config.Backup = backupConfig
721+
runMountCommand(t, cmd1)
722+
waitForPrimary(t, cmd1)
723+
724+
// Insert data into second primary.
725+
db1 := testingutil.OpenSQLDB(t, filepath.Join(cmd1.Config.FUSE.Dir, "db"))
726+
if _, err := db1.Exec(`INSERT INTO t VALUES (200)`); err != nil {
727+
t.Fatal(err)
728+
} else if err := db1.Close(); err != nil {
729+
t.Fatal(err)
730+
}
731+
if err := cmd1.Store.SyncBackup(context.Background()); err != nil {
732+
t.Fatal(err)
733+
}
734+
waitForBackupSync(t, cmd1)
735+
736+
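// Wait a moment for the first primary to pick up the changes. NOTE(review): the query below opens cmd1's mount (the second primary, which already holds both rows), not cmd0's — confirm cmd0 was intended.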
737+
time.Sleep(1 * time.Second)
738+
739+
db0 = testingutil.OpenSQLDB(t, filepath.Join(cmd1.Config.FUSE.Dir, "db"))
740+
var n int
741+
if err := db0.QueryRow(`SELECT SUM(x) FROM t`).Scan(&n); err != nil {
742+
t.Fatal(err)
743+
} else if got, want := n, 300; got != want {
744+
t.Fatalf("sum=%d, want %d", got, want)
745+
}
746+
})
672747
}
673748

674749
func TestMultiNode_Simple(t *testing.T) {

store.go

Lines changed: 39 additions & 19 deletions
Original file line number | Diff line number | Diff line change
@@ -41,7 +41,8 @@ const (
4141
DefaultHaltLockTTL = 30 * time.Second
4242
DefaultHaltLockMonitorInterval = 5 * time.Second
4343

44-
DefaultBackupDelay = 1 * time.Second
44+
DefaultBackupDelay = 1 * time.Second
45+
DefaultBackupFullSyncInterval = 10 * time.Second
4546
)
4647

4748
var ErrStoreClosed = fmt.Errorf("store closed")
@@ -103,6 +104,10 @@ type Store struct {
103104
// This allows multiple changes in quick succession to be batched together.
104105
BackupDelay time.Duration
105106

107+
// Interval between checks to re-fetch the position map. This ensures that
108+
// restores on the backup server are detected by the LiteFS primary. A value of zero or less disables the periodic re-fetch.
109+
BackupFullSyncInterval time.Duration
110+
106111
// Callback to notify kernel of file changes.
107112
Invalidator Invalidator
108113

@@ -140,7 +145,8 @@ func NewStore(path string, candidate bool) *Store {
140145
HaltLockTTL: DefaultHaltLockTTL,
141146
HaltLockMonitorInterval: DefaultHaltLockMonitorInterval,
142147

143-
BackupDelay: DefaultBackupDelay,
148+
BackupDelay: DefaultBackupDelay,
149+
BackupFullSyncInterval: DefaultBackupFullSyncInterval,
144150

145151
Environment: &nopEnvironment{},
146152
}
@@ -995,33 +1001,44 @@ func (s *Store) SyncBackup(ctx context.Context) error {
9951001
}
9961002

9971003
// streamBackup connects to a backup server and continuously streams LTX files.
998-
func (s *Store) streamBackup(ctx context.Context, oneTime bool) error {
1004+
func (s *Store) streamBackup(ctx context.Context, oneTime bool) (err error) {
9991005
// Start subscription immediately so we can collect any changes.
10001006
subscription := s.Subscribe(0)
10011007
defer func() { _ = subscription.Close() }()
10021008

1003-
// Fetch position map from backup server.
1004-
posMap, err := s.BackupClient.PosMap(ctx)
1005-
if err != nil {
1006-
return fmt.Errorf("fetch position map: %w", err)
1007-
}
1008-
1009-
slog.Info("begin streaming backup", slog.Int("n", len(posMap)))
1009+
slog.Info("begin streaming backup", slog.Duration("full-sync-interval", s.BackupFullSyncInterval))
10101010
defer func() { slog.Info("exiting streaming backup") }()
10111011

1012-
// Build initial dirty set of databases.
1013-
dirtySet := make(map[string]struct{})
1014-
for name := range posMap {
1015-
dirtySet[name] = struct{}{}
1016-
}
1017-
for _, db := range s.DBs() {
1018-
dirtySet[db.Name()] = struct{}{}
1019-
}
1020-
10211012
timer := time.NewTimer(0)
10221013
defer timer.Stop()
10231014

1015+
var posMapTickerCh <-chan time.Time
1016+
if s.BackupFullSyncInterval > 0 {
1017+
ticker := time.NewTicker(s.BackupFullSyncInterval)
1018+
defer ticker.Stop()
1019+
posMapTickerCh = ticker.C
1020+
}
1021+
1022+
var posMap map[string]ltx.Pos
1023+
dirtySet := make(map[string]struct{})
1024+
1025+
LOOP:
10241026
for {
1027+
// If we don't have a position map yet or if it's been reset then fetch
1028+
// a new one from the backup server and update our dirty set.
1029+
if posMap == nil {
1030+
slog.Debug("fetching position map from backup server")
1031+
if posMap, err = s.BackupClient.PosMap(ctx); err != nil {
1032+
return fmt.Errorf("fetch position map: %w", err)
1033+
}
1034+
for name := range posMap {
1035+
dirtySet[name] = struct{}{}
1036+
}
1037+
for _, db := range s.DBs() {
1038+
dirtySet[db.Name()] = struct{}{}
1039+
}
1040+
}
1041+
10251042
slog.Debug("syncing databases to backup", slog.Int("dirty", len(dirtySet)))
10261043

10271044
// Send pending transactions for each database.
@@ -1064,6 +1081,9 @@ func (s *Store) streamBackup(ctx context.Context, oneTime bool) error {
10641081
select {
10651082
case <-ctx.Done():
10661083
return nil
1084+
case <-posMapTickerCh: // periodically re-fetch position map from backup server
1085+
posMap = nil
1086+
continue LOOP
10671087
case <-subscription.NotifyCh():
10681088
}
10691089

0 commit comments

Comments
 (0)