Skip to content

Commit 4127f47

Browse files
committed
Replace readOnlyStateMonitor with clusterStateMonitor
1 parent 5d7c1eb commit 4127f47

File tree

6 files changed

+120
-116
lines changed

6 files changed

+120
-116
lines changed

cmd/event_handler/main.go

Lines changed: 1 addition & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -71,27 +71,12 @@ func main() {
7171
}
7272

7373
func evaluateClusterState(ctx context.Context, conn *pgx.Conn, node *flypg.Node) error {
74-
standbys, err := node.RepMgr.StandbyMembers(ctx, conn)
75-
if err != nil {
76-
if !errors.Is(err, pgx.ErrNoRows) {
77-
return fmt.Errorf("failed to query standbys")
78-
}
79-
}
80-
81-
sample, err := flypg.TakeDNASample(ctx, node, standbys)
82-
if err != nil {
83-
return fmt.Errorf("failed to evaluate cluster data: %s", err)
84-
}
85-
86-
log.Println(flypg.DNASampleString(sample))
87-
88-
primary, err := flypg.ZombieDiagnosis(sample)
74+
primary, err := node.EvaluateClusterState(ctx, conn)
8975
if errors.Is(err, flypg.ErrZombieDiagnosisUndecided) || errors.Is(err, flypg.ErrZombieDiscovered) {
9076
// Quarantine primary
9177
if err := flypg.Quarantine(ctx, conn, node, primary); err != nil {
9278
return fmt.Errorf("failed to quarantine failed primary: %s", err)
9379
}
94-
9580
return fmt.Errorf("primary has been quarantined: %s", err)
9681
} else if err != nil {
9782
return fmt.Errorf("failed to run zombie diagnosis: %s", err)

cmd/monitor/main.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@ import (
1212
var (
1313
deadMemberMonitorFrequency = time.Hour * 1
1414
replicationStateMonitorFrequency = time.Hour * 1
15-
readonlyStateMonitorFrequency = time.Minute * 1
15+
clusterStateMonitorFrequency = time.Minute * 15
1616

1717
defaultDeadMemberRemovalThreshold = time.Hour * 12
1818
defaultInactiveSlotRemovalThreshold = time.Hour * 12
@@ -35,8 +35,8 @@ func main() {
3535
}()
3636

3737
// Cluster state monitor
38-
log.Println("Monitoring readonly state")
39-
go monitorReadOnly(ctx, node)
38+
log.Println("Monitoring cluster state")
39+
go monitorClusterState(ctx, node)
4040

4141
// Replication slot monitor
4242
log.Println("Monitoring replication slots")
Lines changed: 93 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,93 @@
1+
package main
2+
3+
import (
4+
"context"
5+
"errors"
6+
"fmt"
7+
"log"
8+
"time"
9+
10+
"github.com/fly-apps/postgres-flex/internal/flypg"
11+
"github.com/jackc/pgx/v5"
12+
)
13+
14+
func monitorClusterState(ctx context.Context, node *flypg.Node) {
15+
ticker := time.NewTicker(clusterStateMonitorFrequency)
16+
defer ticker.Stop()
17+
for range ticker.C {
18+
if err := clusterStateMonitorTick(ctx, node); err != nil {
19+
log.Printf("clusterStateMonitorTick failed with: %s", err)
20+
}
21+
}
22+
}
23+
24+
func clusterStateMonitorTick(ctx context.Context, node *flypg.Node) error {
25+
conn, err := node.RepMgr.NewLocalConnection(ctx)
26+
if err != nil {
27+
return fmt.Errorf("failed to open local connection: %s", err)
28+
}
29+
defer conn.Close(ctx)
30+
31+
member, err := node.RepMgr.Member(ctx, conn)
32+
if err != nil {
33+
return fmt.Errorf("failed to query local member: %s", err)
34+
}
35+
36+
// We only need to monitor the primary
37+
if member.Role != flypg.PrimaryRoleName {
38+
return nil
39+
}
40+
41+
primary, err := node.EvaluateClusterState(ctx, conn)
42+
if errors.Is(err, flypg.ErrZombieDiagnosisUndecided) || errors.Is(err, flypg.ErrZombieDiscovered) {
43+
if err := flypg.Quarantine(ctx, conn, node, primary); err != nil {
44+
return fmt.Errorf("failed to quarantine failed primary: %s", err)
45+
}
46+
return fmt.Errorf("primary has been quarantined: %s", err)
47+
} else if err != nil {
48+
return fmt.Errorf("failed to run zombie diagnosis: %s", err)
49+
}
50+
51+
// Clear zombie lock if it exists
52+
if flypg.ZombieLockExists() {
53+
log.Println("Clearing zombie lock and enabling read/write")
54+
if err := flypg.RemoveZombieLock(); err != nil {
55+
return fmt.Errorf("failed to remove zombie lock: %s", err)
56+
}
57+
58+
log.Println("Broadcasting readonly state change")
59+
if err := flypg.BroadcastReadonlyChange(ctx, node, false); err != nil {
60+
log.Printf("errors while disabling readonly: %s", err)
61+
}
62+
}
63+
64+
return nil
65+
}
66+
67+
func evaluateClusterState(ctx context.Context, conn *pgx.Conn, node *flypg.Node) error {
68+
primary, err := node.EvaluateClusterState(ctx, conn)
69+
if errors.Is(err, flypg.ErrZombieDiagnosisUndecided) || errors.Is(err, flypg.ErrZombieDiscovered) {
70+
// Quarantine primary
71+
if err := flypg.Quarantine(ctx, conn, node, primary); err != nil {
72+
return fmt.Errorf("failed to quarantine failed primary: %s", err)
73+
}
74+
return fmt.Errorf("primary has been quarantined: %s", err)
75+
} else if err != nil {
76+
return fmt.Errorf("failed to run zombie diagnosis: %s", err)
77+
}
78+
79+
// Clear zombie lock if it exists
80+
if flypg.ZombieLockExists() {
81+
log.Println("Clearing zombie lock and enabling read/write")
82+
if err := flypg.RemoveZombieLock(); err != nil {
83+
return fmt.Errorf("failed to remove zombie lock: %s", err)
84+
}
85+
86+
log.Println("Broadcasting readonly state change")
87+
if err := flypg.BroadcastReadonlyChange(ctx, node, false); err != nil {
88+
log.Printf("errors while disabling readonly: %s", err)
89+
}
90+
}
91+
92+
return nil
93+
}

cmd/monitor/monitor_readonly.go

Lines changed: 0 additions & 78 deletions
This file was deleted.

internal/flypg/node.go

Lines changed: 19 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -326,30 +326,12 @@ func (n *Node) PostInit(ctx context.Context) error {
326326

327327
switch role {
328328
case PrimaryRoleName:
329-
standbys, err := repmgr.StandbyMembers(ctx, conn)
330-
if err != nil {
331-
if !errors.Is(err, pgx.ErrNoRows) {
332-
return fmt.Errorf("failed to query standbys")
333-
}
334-
}
335-
336-
// Collect sample data from registered standbys
337-
sample, err := TakeDNASample(ctx, n, standbys)
338-
if err != nil {
339-
return fmt.Errorf("failed to resolve cluster metrics: %s", err)
340-
}
341-
342-
fmt.Println(DNASampleString(sample))
343-
344-
// Evaluate whether we are a zombie or not.
345-
primary, err := ZombieDiagnosis(sample)
329+
primary, err := n.EvaluateClusterState(ctx, conn)
346330
if errors.Is(err, ErrZombieDiagnosisUndecided) {
347331
fmt.Println("Unable to confirm that we are the true primary!")
348-
349332
if err := Quarantine(ctx, conn, n, primary); err != nil {
350333
return fmt.Errorf("failed to quarantine failed primary: %s", err)
351334
}
352-
353335
} else if errors.Is(err, ErrZombieDiscovered) {
354336
fmt.Printf("The majority of registered members agree that '%s' is the real primary.\n", primary)
355337

@@ -678,3 +660,21 @@ func setDirOwnership() error {
678660
_, err = cmd.Output()
679661
return err
680662
}
663+
664+
func (n *Node) EvaluateClusterState(ctx context.Context, conn *pgx.Conn) (string, error) {
665+
standbys, err := n.RepMgr.StandbyMembers(ctx, conn)
666+
if err != nil {
667+
if !errors.Is(err, pgx.ErrNoRows) {
668+
return "", fmt.Errorf("failed to query standbys")
669+
}
670+
}
671+
672+
sample, err := TakeDNASample(ctx, n, standbys)
673+
if err != nil {
674+
return "", fmt.Errorf("failed to evaluate cluster data: %s", err)
675+
}
676+
677+
fmt.Println(DNASampleString(sample))
678+
679+
return ZombieDiagnosis(sample)
680+
}

internal/flypg/zombie.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -157,6 +157,10 @@ func ZombieDiagnosis(s *DNASample) (string, error) {
157157
}
158158

159159
func Quarantine(ctx context.Context, conn *pgx.Conn, n *Node, primary string) error {
160+
if ZombieLockExists() {
161+
return nil
162+
}
163+
160164
fmt.Println("Writing zombie.lock file.")
161165
if err := writeZombieLock(primary); err != nil {
162166
return fmt.Errorf("failed to set zombie lock: %s", err)

0 commit comments

Comments
 (0)