
Commit 8e0534f

multi: add leader check to the healthcheck monitor
This commit extends our healthcheck with an optional leader check. This ensures that, in the event of a network partition or other cluster-wide failure, we act as soon as possible to avoid a split-brain situation where a new leader is elected while we still hold onto our etcd client.
1 parent 7784d6a commit 8e0534f

File tree

8 files changed: +121 −12 lines changed

cluster/etcd_elector.go
cluster/etcd_elector_test.go
cluster/interface.go
config.go
lncfg/healthcheck.go
lnd.go
lntest/node/config.go
server.go


cluster/etcd_elector.go

Lines changed: 20 additions & 2 deletions

@@ -99,9 +99,27 @@ func (e *etcdLeaderElector) Leader(ctx context.Context) (string, error) {
 		return "", err
 	}
 
+	if resp == nil || len(resp.Kvs) == 0 {
+		return "", nil
+	}
+
 	return string(resp.Kvs[0].Value), nil
 }
 
+// IsLeader returns true if the caller is the leader.
+func (e *etcdLeaderElector) IsLeader(ctx context.Context) (bool, error) {
+	resp, err := e.election.Leader(ctx)
+	if err != nil {
+		return false, err
+	}
+
+	if resp == nil || len(resp.Kvs) == 0 {
+		return false, nil
+	}
+
+	return string(resp.Kvs[0].Value) == e.id, nil
+}
+
 // Campaign will start a new leader election campaign. Campaign will block until
 // the elector context is canceled or the caller is elected as the leader.
 func (e *etcdLeaderElector) Campaign(ctx context.Context) error {
@@ -110,6 +128,6 @@ func (e *etcdLeaderElector) Campaign(ctx context.Context) error {
 
 // Resign resigns the leader role allowing other election members to take
 // the place.
-func (e *etcdLeaderElector) Resign() error {
-	return e.election.Resign(context.Background())
+func (e *etcdLeaderElector) Resign(ctx context.Context) error {
+	return e.election.Resign(ctx)
 }

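For readers unfamiliar with the elector, here is a rough usage sketch of the new IsLeader method; it is not part of the commit, and the helper name, the 5-second timeout, and the log calls are all hypothetical. It only relies on the cluster.LeaderElector interface as extended by this change:

package clustersketch

import (
	"context"
	"log"
	"time"

	"github.com/lightningnetwork/lnd/cluster"
)

// checkLeadership is a hypothetical helper, not lnd code: it asks the
// elector whether this node still holds the leader key and treats any
// etcd error as "leadership unknown".
func checkLeadership(elector cluster.LeaderElector) bool {
	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
	defer cancel()

	isLeader, err := elector.IsLeader(ctx)
	if err != nil {
		log.Printf("leader check failed: %v", err)
		return false
	}

	// IsLeader also reports false when no leader key exists at all,
	// which the new nil/empty-Kvs guards above make explicit.
	return isLeader
}
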
cluster/etcd_elector_test.go

Lines changed: 2 additions & 2 deletions

@@ -87,12 +87,12 @@ func TestEtcdElector(t *testing.T) {
 	tmp := <-ch
 	first, err := tmp.Leader(ctxb)
 	require.NoError(t, err)
-	require.NoError(t, tmp.Resign())
+	require.NoError(t, tmp.Resign(ctxb))
 
 	tmp = <-ch
 	second, err := tmp.Leader(ctxb)
 	require.NoError(t, err)
-	require.NoError(t, tmp.Resign())
+	require.NoError(t, tmp.Resign(ctxb))
 
 	require.Contains(t, []string{id1, id2}, first)
 	require.Contains(t, []string{id1, id2}, second)

cluster/interface.go

Lines changed: 4 additions & 1 deletion

@@ -19,8 +19,11 @@ type LeaderElector interface {
 
 	// Resign resigns from the leader role, allowing other election members
	// to take on leadership.
-	Resign() error
+	Resign(ctx context.Context) error
 
 	// Leader returns the leader value for the current election.
 	Leader(ctx context.Context) (string, error)
+
+	// IsLeader returns true if the caller is the leader.
+	IsLeader(ctx context.Context) (bool, error)
 }

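To make the updated contract concrete, the following throwaway in-memory elector satisfies the same method set. It re-declares a trimmed-down interface locally (the real cluster.LeaderElector also declares Campaign), so everything here is an illustration, for example for unit tests, rather than lnd code:

package main

import (
	"context"
	"fmt"
)

// leaderElector mirrors the methods touched by this commit; the real
// interface lives in cluster/interface.go.
type leaderElector interface {
	Resign(ctx context.Context) error
	Leader(ctx context.Context) (string, error)
	IsLeader(ctx context.Context) (bool, error)
}

// fakeElector is a hypothetical in-memory stand-in for a real elector.
type fakeElector struct {
	id     string
	leader string // current leader value, "" when nobody holds the lease
}

func (f *fakeElector) Resign(ctx context.Context) error {
	if f.leader == f.id {
		f.leader = ""
	}
	return nil
}

func (f *fakeElector) Leader(ctx context.Context) (string, error) {
	return f.leader, nil
}

func (f *fakeElector) IsLeader(ctx context.Context) (bool, error) {
	return f.leader == f.id, nil
}

func main() {
	var e leaderElector = &fakeElector{id: "node-1", leader: "node-1"}

	ok, _ := e.IsLeader(context.Background())
	fmt.Println("is leader:", ok) // true

	_ = e.Resign(context.Background())
	ok, _ = e.IsLeader(context.Background())
	fmt.Println("is leader:", ok) // false
}
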
config.go

Lines changed: 17 additions & 0 deletions

@@ -169,6 +169,17 @@ const (
 	defaultRSBackoff = time.Second * 30
 	defaultRSAttempts = 1
 
+	// Set defaults for a health check which ensures that the leader
+	// election is functioning correctly. Although this check is off by
+	// default (as etcd leader election is only used in a clustered setup),
+	// we still set the default values so that the health check can be
+	// easily enabled with sane defaults. Note that by default we only run
+	// this check once, as it is critical for the node's operation.
+	defaultLeaderCheckInterval = time.Minute
+	defaultLeaderCheckTimeout  = time.Second * 5
+	defaultLeaderCheckBackoff  = time.Second * 5
+	defaultLeaderCheckAttempts = 1
+
 	// defaultRemoteMaxHtlcs specifies the default limit for maximum
 	// concurrent HTLCs the remote party may add to commitment transactions.
 	// This value can be overridden with --default-remote-max-htlcs.
@@ -672,6 +683,12 @@ func DefaultConfig() Config {
 			Attempts: defaultRSAttempts,
 			Backoff: defaultRSBackoff,
 		},
+		LeaderCheck: &lncfg.CheckConfig{
+			Interval: defaultLeaderCheckInterval,
+			Timeout: defaultLeaderCheckTimeout,
+			Attempts: defaultLeaderCheckAttempts,
+			Backoff: defaultLeaderCheckBackoff,
+		},
 	},
 	Gossip: &lncfg.Gossip{
 		MaxChannelUpdateBurst: discovery.DefaultMaxChannelUpdateBurst,

lncfg/healthcheck.go

Lines changed: 2 additions & 0 deletions

@@ -34,6 +34,8 @@ type HealthCheckConfig struct {
 	TorConnection *CheckConfig `group:"torconnection" namespace:"torconnection"`
 
 	RemoteSigner *CheckConfig `group:"remotesigner" namespace:"remotesigner"`
+
+	LeaderCheck *CheckConfig `group:"leader" namespace:"leader"`
 }
 
 // Validate checks the values configured for our health checks.

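Because the new CheckConfig is registered under the leader group and namespace, the check is tuned through healthcheck.leader.* options, mirroring the existing health checks. Only the interval flag appears verbatim in this commit (in the itest harness below); the remaining names are inferred from CheckConfig's fields, so treat this invocation as an educated sketch rather than documentation. The check only has an effect when leader election is enabled, since the server skips it when no leader elector is present:

lnd --cluster.enable-leader-election \
    --healthcheck.leader.interval=10s \
    --healthcheck.leader.timeout=5s \
    --healthcheck.leader.backoff=5s \
    --healthcheck.leader.attempts=1
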
lnd.go

Lines changed: 23 additions & 3 deletions

@@ -24,6 +24,7 @@ import (
 	"github.com/lightningnetwork/lnd/build"
 	"github.com/lightningnetwork/lnd/chanacceptor"
 	"github.com/lightningnetwork/lnd/channeldb"
+	"github.com/lightningnetwork/lnd/cluster"
 	"github.com/lightningnetwork/lnd/keychain"
 	"github.com/lightningnetwork/lnd/lncfg"
 	"github.com/lightningnetwork/lnd/lnrpc"
@@ -56,6 +57,14 @@ const (
 	// admin macaroon unless the administrator explicitly allowed it. Thus
 	// there's no harm allowing group read.
 	adminMacaroonFilePermissions = 0640
+
+	// leaderResignTimeout is the timeout used when resigning from the
+	// leader role. This is kept short so LND can shut down quickly in case
+	// of a system failure or network partition making the cluster
+	// unresponsive. The cluster itself should ensure that the leader is not
+	// elected again until the previous leader has resigned or the leader
+	// election timeout has passed.
+	leaderResignTimeout = 5 * time.Second
 )
 
 // AdminAuthOptions returns a list of DialOptions that can be used to
@@ -381,6 +390,7 @@ func Main(cfg *Config, lisCfg ListenerCfg, implCfg *ImplementationCfg,
 	// blocked until this instance is elected as the current leader or
 	// shutting down.
 	elected := false
+	var leaderElector cluster.LeaderElector
 	if cfg.Cluster.EnableLeaderElection {
 		electionCtx, cancelElection := context.WithCancel(ctx)
 
@@ -392,7 +402,7 @@ func Main(cfg *Config, lisCfg ListenerCfg, implCfg *ImplementationCfg,
 		ltndLog.Infof("Using %v leader elector",
 			cfg.Cluster.LeaderElector)
 
-		leaderElector, err := cfg.Cluster.MakeLeaderElector(
+		leaderElector, err = cfg.Cluster.MakeLeaderElector(
 			electionCtx, cfg.DB,
 		)
 		if err != nil {
@@ -407,7 +417,17 @@ func Main(cfg *Config, lisCfg ListenerCfg, implCfg *ImplementationCfg,
 			ltndLog.Infof("Attempting to resign from leader role "+
 				"(%v)", cfg.Cluster.ID)
 
-			if err := leaderElector.Resign(); err != nil {
+			// Ensure that we don't block the shutdown process if
+			// the leader resigning process takes too long. The
+			// cluster will ensure that the leader is not elected
+			// again until the previous leader has resigned or the
+			// leader election timeout has passed.
+			timeoutCtx, cancel := context.WithTimeout(
+				ctx, leaderResignTimeout,
+			)
+			defer cancel()
+
+			if err := leaderElector.Resign(timeoutCtx); err != nil {
 				ltndLog.Errorf("Leader elector failed to "+
 					"resign: %v", err)
 			}
@@ -579,7 +599,7 @@ func Main(cfg *Config, lisCfg ListenerCfg, implCfg *ImplementationCfg,
 	server, err := newServer(
 		cfg, cfg.Listeners, dbs, activeChainControl, &idKeyDesc,
 		activeChainControl.Cfg.WalletUnlockParams.ChansToRestore,
-		multiAcceptor, torController, tlsManager,
+		multiAcceptor, torController, tlsManager, leaderElector,
 	)
 	if err != nil {
 		return mkErr("unable to create server: %v", err)

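A side note on the Main hunk above: the change from := to = is what lets the elector escape the if cfg.Cluster.EnableLeaderElection block. With :=, a new leaderElector would shadow the variable declared just before the block, and the outer one later passed to newServer would stay nil. A minimal standalone illustration of that pitfall (hypothetical names, not lnd code):

package main

import "fmt"

func main() {
	var leader string

	if true { // stands in for cfg.Cluster.EnableLeaderElection
		// ":=" declares a new "leader" that shadows the outer one,
		// so the outer variable never sees this assignment.
		leader, err := "node-1", error(nil)
		_, _ = leader, err
	}

	// Prints an empty string: the inner assignment was lost. The lnd.go
	// hunk avoids this by declaring leaderElector up front and assigning
	// with "=" so it can later be handed to newServer.
	fmt.Println("outer value:", leader)
}
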
lntest/node/config.go

Lines changed: 3 additions & 0 deletions

@@ -315,6 +315,9 @@ func ExtraArgsEtcd(etcdCfg *etcd.Config, name string, cluster bool,
 				leaderSessionTTL),
 		}
 		extraArgs = append(extraArgs, clusterArgs...)
+		extraArgs = append(
+			extraArgs, "--healthcheck.leader.interval=10s",
+		)
 	}
 
 	return extraArgs

server.go

Lines changed: 50 additions & 4 deletions

@@ -36,6 +36,7 @@ import (
 	"github.com/lightningnetwork/lnd/channeldb/models"
 	"github.com/lightningnetwork/lnd/channelnotifier"
 	"github.com/lightningnetwork/lnd/clock"
+	"github.com/lightningnetwork/lnd/cluster"
 	"github.com/lightningnetwork/lnd/contractcourt"
 	"github.com/lightningnetwork/lnd/discovery"
 	"github.com/lightningnetwork/lnd/feature"
@@ -484,8 +485,8 @@ func newServer(cfg *Config, listenAddrs []net.Addr,
 	nodeKeyDesc *keychain.KeyDescriptor,
 	chansToRestore walletunlocker.ChannelsToRecover,
 	chanPredicate chanacceptor.ChannelAcceptor,
-	torController *tor.Controller, tlsManager *TLSManager) (*server,
-	error) {
+	torController *tor.Controller, tlsManager *TLSManager,
+	leaderElector cluster.LeaderElector) (*server, error) {
 
 	var (
 		err error
@@ -1674,7 +1675,7 @@ func newServer(cfg *Config, listenAddrs []net.Addr,
 	}
 
 	// Create liveness monitor.
-	s.createLivenessMonitor(cfg, cc)
+	s.createLivenessMonitor(cfg, cc, leaderElector)
 
 	// Create the connection manager which will be responsible for
 	// maintaining persistent outbound connections and also accepting new
@@ -1721,7 +1722,9 @@ func (s *server) signAliasUpdate(u *lnwire.ChannelUpdate) (*ecdsa.Signature,
 //
 // If a health check has been disabled by setting attempts to 0, our monitor
 // will not run it.
-func (s *server) createLivenessMonitor(cfg *Config, cc *chainreg.ChainControl) {
+func (s *server) createLivenessMonitor(cfg *Config, cc *chainreg.ChainControl,
+	leaderElector cluster.LeaderElector) {
+
 	chainBackendAttempts := cfg.HealthChecks.ChainCheck.Attempts
 	if cfg.Bitcoin.Node == "nochainbackend" {
 		srvrLog.Info("Disabling chain backend checks for " +
@@ -1837,6 +1840,49 @@ func (s *server) createLivenessMonitor(cfg *Config, cc *chainreg.ChainControl) {
 		checks = append(checks, remoteSignerConnectionCheck)
 	}
 
+	// If we have a leader elector, we add a health check to ensure we are
+	// still the leader. During normal operation, we should always be the
+	// leader, but there are circumstances where this may change, such as
+	// when we lose network connectivity for long enough expiring out lease.
+	if leaderElector != nil {
+		leaderCheck := healthcheck.NewObservation(
+			"leader status",
+			func() error {
+				// Check if we are still the leader. Note that
+				// we don't need to use a timeout context here
+				// as the healthcheck observer will handle the
+				// timeout case for us.
+				timeoutCtx, cancel := context.WithTimeout(
+					context.Background(),
+					cfg.HealthChecks.LeaderCheck.Timeout,
+				)
+				defer cancel()
+
+				leader, err := leaderElector.IsLeader(
+					timeoutCtx,
+				)
+				if err != nil {
+					return fmt.Errorf("unable to check if "+
+						"still leader: %v", err)
+				}
+
+				if !leader {
+					srvrLog.Debug("Not the current leader")
+					return fmt.Errorf("not the current " +
						"leader")
+				}
+
+				return nil
+			},
+			cfg.HealthChecks.LeaderCheck.Interval,
+			cfg.HealthChecks.LeaderCheck.Timeout,
+			cfg.HealthChecks.LeaderCheck.Backoff,
+			cfg.HealthChecks.LeaderCheck.Attempts,
+		)
+
+		checks = append(checks, leaderCheck)
+	}
+
 	// If we have not disabled all of our health checks, we create a
 	// liveness monitor with our configured checks.
 	s.livenessMonitor = healthcheck.NewMonitor(

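The observation registered above inherits the monitor's usual semantics: the callback runs every Interval, each attempt is bounded by Timeout, failed attempts are retried after Backoff, and once all Attempts in a round fail the monitor treats the check as fatal, which for lnd means shutting the node down. That shutdown is exactly what prevents a deposed leader from carrying on. The following is a deliberately simplified, self-contained sketch of that loop, not the actual healthcheck package (per-attempt timeouts are omitted and all names are made up):

package main

import (
	"errors"
	"fmt"
	"time"
)

// runCheck is a simplified stand-in for a healthcheck observation: every
// interval it runs check, retrying up to attempts times with backoff in
// between, and reports a fatal failure once all attempts in a round fail.
func runCheck(check func() error, interval, backoff time.Duration,
	attempts int, fatal chan<- error) {

	for {
		time.Sleep(interval)

		var err error
		for i := 0; i < attempts; i++ {
			if err = check(); err == nil {
				break
			}
			if i < attempts-1 {
				time.Sleep(backoff)
			}
		}
		if err != nil {
			fatal <- err
			return
		}
	}
}

func main() {
	fatal := make(chan error, 1)

	// A leader check that always fails, standing in for IsLeader
	// returning false after an expired etcd lease.
	go runCheck(func() error {
		return errors.New("not the current leader")
	}, 100*time.Millisecond, 50*time.Millisecond, 1, fatal)

	fmt.Println("fatal health check failure:", <-fatal)
}
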