Skip to content

Commit a166ff2

Browse files
authored
Merge pull request #33 from fly-apps/unregister-member-script
Unregister member + cleanup replication slots
2 parents 234cd08 + 4af8ee5 commit a166ff2

File tree

8 files changed

+291
-44
lines changed

8 files changed

+291
-44
lines changed

.flyctl/cmd/pg_unregister/main.go

Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,36 @@
1+
package main
2+
3+
import (
4+
"context"
5+
"encoding/base64"
6+
"fmt"
7+
"os"
8+
9+
"github.com/fly-apps/postgres-flex/pkg/flypg"
10+
"github.com/fly-apps/postgres-flex/pkg/utils"
11+
)
12+
13+
func main() {
14+
encodedArg := os.Args[1]
15+
hostnameBytes, err := base64.StdEncoding.DecodeString(encodedArg)
16+
if err != nil {
17+
utils.WriteError(fmt.Errorf("failed to decode hostname: %v", err))
18+
os.Exit(1)
19+
return
20+
}
21+
22+
node, err := flypg.NewNode()
23+
if err != nil {
24+
utils.WriteError(err)
25+
os.Exit(1)
26+
return
27+
}
28+
29+
if err := node.UnregisterMemberByHostname(context.Background(), string(hostnameBytes)); err != nil {
30+
utils.WriteError(fmt.Errorf("failed to unregister member: %v", err))
31+
os.Exit(1)
32+
return
33+
}
34+
35+
utils.WriteOutput("Member has been succesfully unregistered", "")
36+
}

Dockerfile

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,8 @@ RUN CGO_ENABLED=0 GOOS=linux go build -v -o /fly/bin/event_handler ./cmd/event_h
1010
RUN CGO_ENABLED=0 GOOS=linux go build -v -o /fly/bin/failover_validation ./cmd/failover_validation
1111

1212
RUN CGO_ENABLED=0 GOOS=linux go build -v -o /fly/bin/standby_cleaner ./cmd/standby_cleaner
13+
RUN CGO_ENABLED=0 GOOS=linux go build -v -o /fly/bin/pg_unregister ./.flyctl/cmd/pg_unregister
14+
1315
RUN CGO_ENABLED=0 GOOS=linux go build -v -o /fly/bin/start ./cmd/start
1416
COPY ./bin/* /fly/bin/
1517

cmd/event_handler/main.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,7 @@ func main() {
3535
fmt.Printf("failed initialize cluster state store. %v", err)
3636
}
3737

38-
member, err := cs.FindMember(int32(*nodeID))
38+
member, err := cs.FindMemberByID(int32(*nodeID))
3939
if err != nil {
4040
fmt.Printf("failed to find member %v: %s", *nodeID, err)
4141
}
@@ -64,7 +64,7 @@ func main() {
6464
fmt.Printf("failed to parse new member id: %s", err)
6565
}
6666

67-
member, err := cs.FindMember(int32(newMemberID))
67+
member, err := cs.FindMemberByID(int32(newMemberID))
6868
if err != nil {
6969
fmt.Printf("failed to find member in consul: %s", err)
7070
}

cmd/standby_cleaner/main.go

Lines changed: 82 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -7,10 +7,15 @@ import (
77
"time"
88

99
"github.com/fly-apps/postgres-flex/pkg/flypg"
10-
"github.com/fly-apps/postgres-flex/pkg/flypg/state"
10+
"github.com/fly-apps/postgres-flex/pkg/flypg/admin"
11+
"github.com/jackc/pgx/v4"
1112
)
1213

13-
var Minute int64 = 60
14+
var (
15+
monitorFrequency = time.Minute * 5
16+
// TODO - Make this configurable and/or extend this to 12-24 hours.
17+
deadMemberRemovalThreshold = time.Hour * 1
18+
)
1419

1520
func main() {
1621
ctx := context.Background()
@@ -20,55 +25,95 @@ func main() {
2025
os.Exit(1)
2126
}
2227

28+
// TODO - We should connect using the flypgadmin user so we can differentiate between
29+
// internal admin connection usage and the actual repmgr process.
2330
conn, err := flypgNode.RepMgr.NewLocalConnection(ctx)
2431
if err != nil {
2532
fmt.Printf("failed to open local connection: %s\n", err)
2633
os.Exit(1)
2734
}
2835

29-
ticker := time.NewTicker(5 * time.Second)
36+
seenAt := map[int]time.Time{}
37+
38+
ticker := time.NewTicker(monitorFrequency)
3039
defer ticker.Stop()
3140

32-
seenAt := map[int]int64{}
41+
for {
42+
select {
43+
case <-ticker.C:
44+
role, err := flypgNode.RepMgr.CurrentRole(ctx, conn)
45+
if err != nil {
46+
fmt.Printf("Failed to check role: %s\n", err)
47+
continue
48+
}
3349

34-
for _ = range ticker.C {
35-
role, err := flypgNode.RepMgr.CurrentRole(ctx, conn)
36-
if err != nil {
37-
fmt.Printf("Failed to check role: %s", err)
38-
continue
39-
}
40-
if role != "primary" {
41-
continue
42-
}
43-
standbys, err := flypgNode.RepMgr.Standbys(ctx, conn)
44-
if err != nil {
45-
fmt.Printf("Failed to get standbys: %s", err)
46-
continue
47-
}
48-
for _, standby := range standbys {
49-
newConn, err := flypgNode.RepMgr.NewRemoteConnection(ctx, standby.Ip)
50+
if role != flypg.PrimaryRoleName {
51+
continue
52+
}
53+
54+
standbys, err := flypgNode.RepMgr.Standbys(ctx, conn)
5055
if err != nil {
51-
if time.Now().Unix()-seenAt[standby.Id] >= 10*Minute {
52-
cs, err := state.NewClusterState()
53-
if err != nil {
54-
fmt.Printf("failed initialize cluster state store. %v", err)
55-
}
56+
fmt.Printf("Failed to query standbys: %s\n", err)
57+
continue
58+
}
5659

57-
err = flypgNode.RepMgr.UnregisterStandby(standby.Id)
58-
if err != nil {
59-
fmt.Printf("Failed to unregister %d: %s", standby.Id, err)
60-
continue
61-
}
62-
delete(seenAt, standby.Id)
60+
for _, standby := range standbys {
61+
newConn, err := flypgNode.RepMgr.NewRemoteConnection(ctx, standby.Ip)
62+
defer newConn.Close(ctx)
63+
if err != nil {
64+
// TODO - Verify the exception that's getting thrown.
65+
if time.Now().Sub(seenAt[standby.Id]) >= deadMemberRemovalThreshold {
66+
if err := flypgNode.UnregisterMemberByID(ctx, int32(standby.Id)); err != nil {
67+
fmt.Printf("failed to unregister member %d: %v\n", standby.Id, err.Error())
68+
continue
69+
}
6370

64-
// Remove from Consul
65-
if err = cs.UnregisterMember(int32(standby.Id)); err != nil {
66-
fmt.Printf("Failed to unregister %d from consul: %s", standby.Id, err)
71+
delete(seenAt, standby.Id)
6772
}
73+
74+
continue
6875
}
69-
} else {
70-
seenAt[standby.Id] = time.Now().Unix()
71-
newConn.Close(ctx)
76+
77+
seenAt[standby.Id] = time.Now()
78+
}
79+
80+
removeOrphanedReplicationSlots(ctx, conn, standbys)
81+
}
82+
}
83+
}
84+
85+
func removeOrphanedReplicationSlots(ctx context.Context, conn *pgx.Conn, standbys []flypg.Standby) {
86+
var orphanedSlots []admin.ReplicationSlot
87+
88+
slots, err := admin.ListReplicationSlots(ctx, conn)
89+
if err != nil {
90+
fmt.Printf("failed to list replication slots: %s", err)
91+
}
92+
93+
// An orphaned replication slot is defined as an inactive replication slot that is no longer tied to
94+
// an existing repmgr member.
95+
for _, slot := range slots {
96+
matchFound := false
97+
for _, standby := range standbys {
98+
if slot.MemberID == int32(standby.Id) {
99+
matchFound = true
100+
}
101+
}
102+
103+
if !matchFound && !slot.Active {
104+
orphanedSlots = append(orphanedSlots, slot)
105+
}
106+
}
107+
108+
if len(orphanedSlots) > 0 {
109+
fmt.Printf("%d orphaned replication slot(s) detected\n", len(orphanedSlots))
110+
111+
for _, slot := range orphanedSlots {
112+
fmt.Printf("Dropping replication slot: %s\n", slot.Name)
113+
114+
if err := admin.DropReplicationSlot(ctx, conn, slot.Name); err != nil {
115+
fmt.Printf("failed to drop replication slot %s: %v\n", slot.Name, err)
116+
continue
72117
}
73118
}
74119
}

pkg/flypg/admin/admin.go

Lines changed: 56 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,8 @@ package admin
33
import (
44
"context"
55
"fmt"
6+
"strconv"
7+
"strings"
68

79
"github.com/jackc/pgx/v4"
810
)
@@ -75,6 +77,60 @@ func DeleteDatabase(ctx context.Context, pg *pgx.Conn, name string) error {
7577
return nil
7678
}
7779

80+
// ReplicationSlot describes one row of pg_replication_slots as returned by
// ListReplicationSlots.
type ReplicationSlot struct {
	// MemberID is the repmgr member id parsed from the slot name.
	MemberID int32
	// Name is the slot_name column.
	Name string
	// Type is the slot_type column.
	Type string
	// Active is the active column.
	Active bool
	// WalStatus is the wal_status column.
	WalStatus string
}
87+
88+
func ListReplicationSlots(ctx context.Context, pg *pgx.Conn) ([]ReplicationSlot, error) {
89+
sql := fmt.Sprintf("SELECT slot_name, slot_type, active, wal_status from pg_replication_slots;")
90+
rows, err := pg.Query(ctx, sql)
91+
defer rows.Close()
92+
if err != nil {
93+
return nil, err
94+
}
95+
96+
var slots []ReplicationSlot
97+
98+
for rows.Next() {
99+
var slot ReplicationSlot
100+
if err := rows.Scan(&slot.Name, &slot.Type, &slot.Active, &slot.WalStatus); err != nil {
101+
return nil, err
102+
}
103+
104+
// Extract the repmgr member id from the slot name.
105+
// Slot name has the following format: repmgr_slot_<member-id>
106+
slotArr := strings.Split(slot.Name, "_")
107+
if slotArr[0] == "repmgr" {
108+
idStr := slotArr[2]
109+
110+
num, err := strconv.ParseInt(idStr, 10, 32)
111+
if err != nil {
112+
return nil, err
113+
}
114+
115+
slot.MemberID = int32(num)
116+
slots = append(slots, slot)
117+
}
118+
}
119+
120+
return slots, nil
121+
}
122+
123+
func DropReplicationSlot(ctx context.Context, pg *pgx.Conn, name string) error {
124+
sql := fmt.Sprintf("SELECT pg_drop_replication_slot('%s');", name)
125+
126+
_, err := pg.Exec(ctx, sql)
127+
if err != nil {
128+
return err
129+
}
130+
131+
return nil
132+
}
133+
78134
func EnableExtension(ctx context.Context, pg *pgx.Conn, extension string) error {
79135
sql := fmt.Sprintf("CREATE EXTENSION IF NOT EXISTS %s;", extension)
80136
_, err := pg.Exec(context.Background(), sql)

pkg/flypg/node.go

Lines changed: 47 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -305,7 +305,6 @@ func (n *Node) PostInit(ctx context.Context) error {
305305
return fmt.Errorf("failed to register member with consul: %s", err)
306306
}
307307
}
308-
309308
// Requery the primaryIP from consul in case the primary was assigned above.
310309
primary, err = cs.PrimaryMember()
311310
if err != nil {
@@ -324,6 +323,34 @@ func (n *Node) NewLocalConnection(ctx context.Context, database string) (*pgx.Co
324323
return openConnection(ctx, host, database, n.OperatorCredentials)
325324
}
326325

326+
func (n *Node) UnregisterMemberByHostname(ctx context.Context, hostname string) error {
327+
cs, err := state.NewClusterState()
328+
if err != nil {
329+
fmt.Printf("failed initialize cluster state store. %v", err)
330+
}
331+
332+
member, err := cs.FindMemberByHostname(hostname)
333+
if err != nil {
334+
return err
335+
}
336+
337+
return n.unregisterNode(ctx, cs, member)
338+
}
339+
340+
func (n *Node) UnregisterMemberByID(ctx context.Context, id int32) error {
341+
cs, err := state.NewClusterState()
342+
if err != nil {
343+
fmt.Printf("failed initialize cluster state store. %v", err)
344+
}
345+
346+
member, err := cs.FindMemberByID(id)
347+
if err != nil {
348+
return err
349+
}
350+
351+
return n.unregisterNode(ctx, cs, member)
352+
}
353+
327354
func (n *Node) isInitialized() bool {
328355
_, err := os.Stat(n.DataDir)
329356
if os.IsNotExist(err) {
@@ -384,6 +411,25 @@ func (n *Node) createRequiredUsers(ctx context.Context, conn *pgx.Conn) error {
384411
return nil
385412
}
386413

414+
func (n *Node) unregisterNode(ctx context.Context, cs *state.ClusterState, member *state.Member) error {
415+
if member == nil {
416+
return state.ErrMemberNotFound
417+
}
418+
419+
// Unregister from repmgr
420+
err := n.RepMgr.UnregisterStandby(int(member.ID))
421+
if err != nil {
422+
return fmt.Errorf("failed to unregister member %d from repmgr: %s", member.ID, err)
423+
}
424+
425+
// Unregister from consul
426+
if err := cs.UnregisterMember(member.ID); err != nil {
427+
return fmt.Errorf("failed to unregister member %d from consul: %v", member.ID, err)
428+
}
429+
430+
return nil
431+
}
432+
387433
type HBAEntry struct {
388434
Type string
389435
Database string

0 commit comments

Comments
 (0)