Skip to content

Commit 901612f

Browse files
authored
Witness support (#174)
* Add support for witness cleanup this connection is not needed Conside witness nodes as well as standbys when calculating quorum Remove unnecessary logic * Cleanup * cleaning up logic
1 parent 4b8dc5c commit 901612f

File tree

6 files changed

+127
-65
lines changed

6 files changed

+127
-65
lines changed

cmd/monitor/monitor_dead_members.go

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -67,27 +67,27 @@ func deadMemberMonitorTick(ctx context.Context, node *flypg.Node, seenAt map[int
6767
return nil
6868
}
6969

70-
standbys, err := node.RepMgr.StandbyMembers(ctx, conn)
70+
votingMembers, err := node.RepMgr.VotingMembers(ctx, conn)
7171
if err != nil {
7272
return fmt.Errorf("failed to query standbys: %s", err)
7373
}
7474

75-
for _, standby := range standbys {
76-
sConn, err := node.RepMgr.NewRemoteConnection(ctx, standby.Hostname)
75+
for _, voter := range votingMembers {
76+
sConn, err := node.RepMgr.NewRemoteConnection(ctx, voter.Hostname)
7777
if err != nil {
7878
// TODO - Verify the exception that's getting thrown.
79-
if time.Since(seenAt[standby.ID]) >= deadMemberRemovalThreshold {
80-
log.Printf("Removing dead member: %s\n", standby.Hostname)
81-
if err := node.RepMgr.UnregisterMember(standby); err != nil {
82-
log.Printf("failed to unregister member %s: %v", standby.Hostname, err)
79+
if time.Since(seenAt[voter.ID]) >= deadMemberRemovalThreshold {
80+
log.Printf("Removing dead member: %s\n", voter.Hostname)
81+
if err := node.RepMgr.UnregisterMember(voter); err != nil {
82+
log.Printf("failed to unregister member %s: %v", voter.Hostname, err)
8383
continue
8484
}
85-
delete(seenAt, standby.ID)
85+
delete(seenAt, voter.ID)
8686
}
8787

8888
continue
8989
}
90-
seenAt[standby.ID] = time.Now()
90+
seenAt[voter.ID] = time.Now()
9191

9292
if err := sConn.Close(ctx); err != nil {
9393
return fmt.Errorf("failed to close connection: %s", err)

internal/flycheck/role.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,8 @@ func PostgreSQLRole(ctx context.Context, checks *check.CheckSuite) (*check.Check
4646

4747
case flypg.StandbyRoleName:
4848
return "replica", nil
49+
case flypg.WitnessRoleName:
50+
return "witness", nil
4951
default:
5052
return "unknown", nil
5153
}

internal/flypg/node.go

Lines changed: 67 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -94,6 +94,9 @@ func NewNode() (*Node, error) {
9494
Credentials: node.ReplCredentials,
9595
}
9696

97+
_, present := os.LookupEnv("WITNESS")
98+
node.RepMgr.Witness = present
99+
97100
node.PGConfig = PGConfig{
98101
DataDir: node.DataDir,
99102
Port: node.Port,
@@ -122,7 +125,6 @@ func (n *Node) Init(ctx context.Context) error {
122125

123126
// Check to see if we were just restored
124127
if os.Getenv("FLY_RESTORED_FROM") != "" {
125-
// Check to see if there's an active restore.
126128
active, err := isRestoreActive()
127129
if err != nil {
128130
return fmt.Errorf("failed to verify active restore: %s", err)
@@ -163,31 +165,41 @@ func (n *Node) Init(ctx context.Context) error {
163165
return fmt.Errorf("failed to verify cluster state %s", err)
164166
}
165167

166-
if !clusterInitialized {
168+
if clusterInitialized {
169+
if n.RepMgr.Witness {
170+
log.Println("Provisioning witness")
171+
if err := n.PGConfig.writePasswordFile(n.OperatorCredentials.Password); err != nil {
172+
return fmt.Errorf("failed to write pg password file: %s", err)
173+
}
174+
175+
if err := n.PGConfig.initdb(); err != nil {
176+
return fmt.Errorf("failed to initialize postgres %s", err)
177+
}
178+
} else {
179+
log.Println("Provisioning standby")
180+
cloneTarget, err := n.RepMgr.ResolveMemberOverDNS(ctx)
181+
if err != nil {
182+
return fmt.Errorf("failed to resolve member over dns: %s", err)
183+
}
184+
185+
if err := n.RepMgr.clonePrimary(cloneTarget.Hostname); err != nil {
186+
// Clean-up the directory so it can be retried.
187+
if rErr := os.Remove(n.DataDir); rErr != nil {
188+
log.Printf("[ERROR] failed to cleanup postgresql dir after clone error: %s\n", rErr)
189+
}
190+
191+
return fmt.Errorf("failed to clone primary: %s", err)
192+
}
193+
}
194+
} else {
167195
log.Println("Provisioning primary")
168-
// TODO - This should probably run on boot in case the password changes.
169196
if err := n.PGConfig.writePasswordFile(n.OperatorCredentials.Password); err != nil {
170197
return fmt.Errorf("failed to write pg password file: %s", err)
171198
}
172199

173200
if err := n.PGConfig.initdb(); err != nil {
174201
return fmt.Errorf("failed to initialize postgres %s", err)
175202
}
176-
} else {
177-
log.Println("Provisioning standby")
178-
cloneTarget, err := n.RepMgr.ResolveMemberOverDNS(ctx)
179-
if err != nil {
180-
return fmt.Errorf("failed to resolve member over dns: %s", err)
181-
}
182-
183-
if err := n.RepMgr.clonePrimary(cloneTarget.Hostname); err != nil {
184-
// Clean-up the directory so it can be retried.
185-
if rErr := os.Remove(n.DataDir); rErr != nil {
186-
log.Printf("[ERROR] failed to cleanup postgresql dir after clone error: %s\n", rErr)
187-
}
188-
189-
return fmt.Errorf("failed to clone primary: %s", err)
190-
}
191203
}
192204
}
193205

@@ -236,13 +248,13 @@ func (n *Node) PostInit(ctx context.Context) error {
236248
}
237249

238250
if registered {
239-
// Existing member
240251
repConn, err := n.RepMgr.NewLocalConnection(ctx)
241252
if err != nil {
242253
return fmt.Errorf("failed to establish connection to local repmgr: %s", err)
243254
}
244255
defer func() { _ = repConn.Close(ctx) }()
245256

257+
// Existing member
246258
member, err := n.RepMgr.Member(ctx, repConn)
247259
if err != nil {
248260
return fmt.Errorf("failed to resolve member role: %s", err)
@@ -289,6 +301,16 @@ func (n *Node) PostInit(ctx context.Context) error {
289301
if err := n.RepMgr.registerStandby(); err != nil {
290302
return fmt.Errorf("failed to register existing standby: %s", err)
291303
}
304+
case WitnessRoleName:
305+
primary, err := n.RepMgr.PrimaryMember(ctx, repConn)
306+
if err != nil {
307+
return fmt.Errorf("failed to resolve primary member when updating witness: %s", err)
308+
}
309+
310+
// Register existing witness to take-on any configuration changes.
311+
if err := n.RepMgr.registerWitness(primary.Hostname); err != nil {
312+
return fmt.Errorf("failed to register existing witness: %s", err)
313+
}
292314
default:
293315
return fmt.Errorf("member has unknown role: %q", member.Role)
294316
}
@@ -350,10 +372,32 @@ func (n *Node) PostInit(ctx context.Context) error {
350372
return fmt.Errorf("failed to issue registration certificate: %s", err)
351373
}
352374
} else {
353-
// Configure as standby
354-
log.Println("Registering standby")
355-
if err := n.RepMgr.registerStandby(); err != nil {
356-
return fmt.Errorf("failed to register new standby: %s", err)
375+
if n.RepMgr.Witness {
376+
log.Println("Registering witness")
377+
378+
// Create required users
379+
if err := n.setupCredentials(ctx, conn); err != nil {
380+
return fmt.Errorf("failed to create required users: %s", err)
381+
}
382+
383+
// Setup repmgr database and extension
384+
if err := n.RepMgr.enable(ctx, conn); err != nil {
385+
return fmt.Errorf("failed to enable repmgr: %s", err)
386+
}
387+
388+
primary, err := n.RepMgr.ResolveMemberOverDNS(ctx)
389+
if err != nil {
390+
return fmt.Errorf("failed to resolve primary member: %s", err)
391+
}
392+
393+
if err := n.RepMgr.registerWitness(primary.Hostname); err != nil {
394+
return fmt.Errorf("failed to register witness: %s", err)
395+
}
396+
} else {
397+
log.Println("Registering standby")
398+
if err := n.RepMgr.registerStandby(); err != nil {
399+
return fmt.Errorf("failed to register new standby: %s", err)
400+
}
357401
}
358402

359403
// Let the boot process know that we've already been configured.

internal/flypg/registration.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,7 @@ func isRegistered(ctx context.Context, conn *pgx.Conn, n *Node) (bool, error) {
4848
if errors.Is(err, pgx.ErrNoRows) {
4949
return false, nil
5050
}
51+
5152
return false, fmt.Errorf("failed to resolve member role: %s", err)
5253
}
5354

internal/flypg/repmgr.go

Lines changed: 46 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ import (
2222
const (
2323
PrimaryRoleName = "primary"
2424
StandbyRoleName = "standby"
25+
WitnessRoleName = "witness"
2526
UnknownRoleName = ""
2627

2728
repmgrConsulKey = "repmgr"
@@ -41,6 +42,7 @@ type RepMgr struct {
4142
PasswordConfigPath string
4243
InternalConfigPath string
4344
Port int
45+
Witness bool
4446

4547
internalConfig ConfigMap
4648
userConfig ConfigMap
@@ -216,7 +218,21 @@ func (r *RepMgr) resolveNodeID() (string, error) {
216218
}
217219

218220
func (r *RepMgr) registerPrimary() error {
219-
cmdStr := fmt.Sprintf("repmgr -f %s primary register -F -v", r.ConfigPath)
221+
cmdStr := fmt.Sprintf("repmgr primary register -f %s -F", r.ConfigPath)
222+
_, err := utils.RunCommand(cmdStr, "postgres")
223+
224+
return err
225+
}
226+
227+
func (r *RepMgr) registerStandby() error {
228+
cmdStr := fmt.Sprintf("repmgr standby register -f %s -F", r.ConfigPath)
229+
_, err := utils.RunCommand(cmdStr, "postgres")
230+
231+
return err
232+
}
233+
234+
func (r *RepMgr) registerWitness(primaryHostname string) error {
235+
cmdStr := fmt.Sprintf("repmgr witness register -f %s -h %s -F", r.ConfigPath, primaryHostname)
220236
_, err := utils.RunCommand(cmdStr, "postgres")
221237

222238
return err
@@ -229,6 +245,20 @@ func (r *RepMgr) unregisterPrimary(id int) error {
229245
return err
230246
}
231247

248+
func (r *RepMgr) unregisterStandby(id int) error {
249+
cmdStr := fmt.Sprintf("repmgr standby unregister -f %s --node-id=%d", r.ConfigPath, id)
250+
_, err := utils.RunCommand(cmdStr, "postgres")
251+
252+
return err
253+
}
254+
255+
func (r *RepMgr) unregisterWitness(id int) error {
256+
cmdStr := fmt.Sprintf("repmgr witness unregister -f %s --node-id=%d", r.ConfigPath, id)
257+
_, err := utils.RunCommand(cmdStr, "postgres")
258+
259+
return err
260+
}
261+
232262
func (r *RepMgr) rejoinCluster(hostname string) error {
233263
cmdStr := fmt.Sprintf("repmgr -f %s node rejoin -h %s -p %d -U %s -d %s --force-rewind --no-wait",
234264
r.ConfigPath,
@@ -244,25 +274,6 @@ func (r *RepMgr) rejoinCluster(hostname string) error {
244274
return err
245275
}
246276

247-
func (r *RepMgr) registerStandby() error {
248-
// Force re-registry to ensure the standby picks up any new configuration changes.
249-
cmdStr := fmt.Sprintf("repmgr -f %s standby register -F", r.ConfigPath)
250-
if _, err := utils.RunCommand(cmdStr, "postgres"); err != nil {
251-
return err
252-
}
253-
254-
return nil
255-
}
256-
257-
func (r *RepMgr) unregisterStandby(id int) error {
258-
cmdStr := fmt.Sprintf("repmgr standby unregister -f %s --node-id=%d", r.ConfigPath, id)
259-
if _, err := utils.RunCommand(cmdStr, "postgres"); err != nil {
260-
return err
261-
}
262-
263-
return nil
264-
}
265-
266277
func (r *RepMgr) clonePrimary(ipStr string) error {
267278
cmdStr := fmt.Sprintf("mkdir -p %s", r.DataDir)
268279
if _, err := utils.RunCommand(cmdStr, "postgres"); err != nil {
@@ -339,20 +350,20 @@ func (*RepMgr) PrimaryMember(ctx context.Context, pg *pgx.Conn) (*Member, error)
339350
return &member, nil
340351
}
341352

342-
func (r *RepMgr) StandbyMembers(ctx context.Context, conn *pgx.Conn) ([]Member, error) {
353+
func (r *RepMgr) VotingMembers(ctx context.Context, conn *pgx.Conn) ([]Member, error) {
343354
members, err := r.Members(ctx, conn)
344355
if err != nil {
345356
return nil, err
346357
}
347358

348-
var standbys []Member
359+
var voters []Member
349360
for _, member := range members {
350-
if member.Role == StandbyRoleName {
351-
standbys = append(standbys, member)
361+
if member.Role == StandbyRoleName || member.Role == WitnessRoleName {
362+
voters = append(voters, member)
352363
}
353364
}
354365

355-
return standbys, nil
366+
return voters, nil
356367
}
357368

358369
func (*RepMgr) MemberByID(ctx context.Context, pg *pgx.Conn, id int) (*Member, error) {
@@ -441,15 +452,19 @@ func (r *RepMgr) HostInRegion(ctx context.Context, hostname string) (bool, error
441452
}
442453

443454
func (r *RepMgr) UnregisterMember(member Member) error {
444-
if member.Role == PrimaryRoleName {
455+
switch member.Role {
456+
case PrimaryRoleName:
445457
if err := r.unregisterPrimary(member.ID); err != nil {
446458
return fmt.Errorf("failed to unregister member %d: %s", member.ID, err)
447459
}
448-
return nil
449-
}
450-
451-
if err := r.unregisterStandby(member.ID); err != nil {
452-
return fmt.Errorf("failed to unregister member %d: %s", member.ID, err)
460+
case StandbyRoleName:
461+
if err := r.unregisterStandby(member.ID); err != nil {
462+
return fmt.Errorf("failed to unregister standby %d: %s", member.ID, err)
463+
}
464+
case WitnessRoleName:
465+
if err := r.unregisterWitness(member.ID); err != nil {
466+
return fmt.Errorf("failed to unregister witness %d: %s", member.ID, err)
467+
}
453468
}
454469

455470
return nil

internal/flypg/zombie.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -57,14 +57,14 @@ func ReadZombieLock() (string, error) {
5757
}
5858

5959
func PerformScreening(ctx context.Context, conn *pgx.Conn, n *Node) (string, error) {
60-
standbys, err := n.RepMgr.StandbyMembers(ctx, conn)
60+
members, err := n.RepMgr.VotingMembers(ctx, conn)
6161
if err != nil {
6262
if !errors.Is(err, pgx.ErrNoRows) {
6363
return "", fmt.Errorf("failed to query standbys")
6464
}
6565
}
6666

67-
sample, err := TakeDNASample(ctx, n, standbys)
67+
sample, err := TakeDNASample(ctx, n, members)
6868
if err != nil {
6969
return "", fmt.Errorf("failed to evaluate cluster data: %s", err)
7070
}

0 commit comments

Comments
 (0)