Skip to content

Commit d96d263

Browse files
authored
Add 'promote' flag to config (#306)
1 parent 3e60f36 commit d96d263

File tree

3 files changed

+94
-4
lines changed

3 files changed

+94
-4
lines changed

cmd/litefs/config.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -104,6 +104,10 @@ type LeaseConfig struct {
104104
// Replicas in a state lease should set this to false.
105105
Candidate bool `yaml:"candidate"`
106106

107+
// If true & node is a candidate, it will attempt to promote itself
108+
// automatically once it connects to the cluster and syncs.
109+
Promote bool `yaml:"promote"`
110+
107111
// After disconnect, time before node tries to reconnect to primary or
108112
// becomes primary itself.
109113
ReconnectDelay time.Duration `yaml:"reconnect-delay"`

cmd/litefs/mount_linux.go

Lines changed: 50 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ import (
1313
"regexp"
1414
"strings"
1515
"sync"
16+
"time"
1617

1718
"github.com/mattn/go-shellwords"
1819
"github.com/superfly/litefs"
@@ -237,18 +238,32 @@ func (c *MountCommand) Run(ctx context.Context) (err error) {
237238
case <-c.Store.ReadyCh():
238239
log.Printf("connected to cluster, ready")
239240
}
240-
}
241241

242-
// Execute subcommand, if specified in config.
243-
if err := c.execCmd(ctx); err != nil {
244-
return fmt.Errorf("cannot exec: %w", err)
242+
// Automatically promote the server if requested & it is a candidate.
243+
if c.Config.Lease.Promote {
244+
if c.Store.Candidate() {
245+
log.Printf("node is a candidate, automatically promoting to primary")
246+
if err := c.promote(ctx); err != nil {
247+
return fmt.Errorf("promote: %w", err)
248+
}
249+
} else {
250+
log.Printf("node is not a candidate, skipping automatic promotion")
251+
}
252+
}
245253
}
246254

255+
// Start the proxy server before the subcommand in case we need to hold
256+
// requests after we promote but before the server is ready.
247257
if c.ProxyServer != nil {
248258
c.ProxyServer.Serve()
249259
log.Printf("proxy server listening on: %s", c.ProxyServer.URL())
250260
}
251261

262+
// Execute subcommand, if specified in config.
263+
if err := c.execCmd(ctx); err != nil {
264+
return fmt.Errorf("cannot exec: %w", err)
265+
}
266+
252267
return nil
253268
}
254269

@@ -395,4 +410,35 @@ func (c *MountCommand) execCmd(ctx context.Context) error {
395410
return nil
396411
}
397412

413+
// promote issues a lease handoff request to the current primary.
414+
func (c *MountCommand) promote(ctx context.Context) (err error) {
415+
isPrimary, info := c.Store.PrimaryInfo()
416+
if isPrimary {
417+
log.Printf("node is already primary, skipping promotion")
418+
return nil
419+
}
420+
421+
client := http.NewClient()
422+
if err := client.Handoff(ctx, info.AdvertiseURL, c.Store.ID()); err != nil {
423+
return fmt.Errorf("handoff: %w", err)
424+
}
425+
426+
// Wait for the local node to become primary.
427+
ticker := time.NewTicker(1 * time.Millisecond)
428+
defer ticker.Stop()
429+
timeout := time.NewTicker(10 * time.Second)
430+
defer timeout.Stop()
431+
432+
for {
433+
select {
434+
case <-timeout.C:
435+
return fmt.Errorf("timed out waiting for promotion")
436+
case <-ticker.C:
437+
if c.Store.IsPrimary() {
438+
return nil
439+
}
440+
}
441+
}
442+
}
443+
398444
var expvarOnce sync.Once

cmd/litefs/mount_test.go

Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1299,6 +1299,46 @@ func TestMultiNode_Handoff(t *testing.T) {
12991299
}
13001300
}
13011301

1302+
func TestMultiNode_Autopromotion(t *testing.T) {
1303+
cmd0 := newMountCommand(t, t.TempDir(), nil)
1304+
runMountCommand(t, cmd0)
1305+
waitForPrimary(t, cmd0)
1306+
1307+
// Create a simple table with a single value.
1308+
db0 := testingutil.OpenSQLDB(t, filepath.Join(cmd0.Config.FUSE.Dir, "db"))
1309+
if _, err := db0.Exec(`CREATE TABLE t (x)`); err != nil {
1310+
t.Fatal(err)
1311+
} else if _, err := db0.Exec(`INSERT INTO t VALUES (100)`); err != nil {
1312+
t.Fatal(err)
1313+
}
1314+
1315+
// Start a new instance with autopromotion enabled.
1316+
cmd1 := newMountCommand(t, t.TempDir(), cmd0)
1317+
cmd1.Config.Lease.Promote = true
1318+
runMountCommand(t, cmd1)
1319+
db1 := testingutil.OpenSQLDB(t, filepath.Join(cmd1.Config.FUSE.Dir, "db"))
1320+
1321+
// Ensure we can update the new primary node.
1322+
if _, err := db1.Exec(`INSERT INTO t VALUES (200)`); err != nil {
1323+
t.Fatal(err)
1324+
}
1325+
1326+
// Ensure the data exists on both nodes.
1327+
waitForSync(t, "db", cmd0, cmd1)
1328+
var sum int
1329+
if err := db1.QueryRow(`SELECT SUM(x) FROM t`).Scan(&sum); err != nil {
1330+
t.Fatal(err)
1331+
} else if got, want := sum, 300; got != want {
1332+
t.Fatalf("sum=%d, want %d", got, want)
1333+
}
1334+
1335+
if err := db0.QueryRow(`SELECT SUM(x) FROM t`).Scan(&sum); err != nil {
1336+
t.Fatal(err)
1337+
} else if got, want := sum, 300; got != want {
1338+
t.Fatalf("sum=%d, want %d", got, want)
1339+
}
1340+
}
1341+
13021342
func TestMultiNode_StaticLeaser(t *testing.T) {
13031343
dir0, dir1 := t.TempDir(), t.TempDir()
13041344
cmd0 := newMountCommand(t, dir0, nil)

0 commit comments

Comments
 (0)