Skip to content

Commit d71d4f1

Browse files
committed
Rework the standby cleaner a bit to remove replication slots
1 parent 49dec7e commit d71d4f1

File tree

2 files changed

+89
-27
lines changed

2 files changed

+89
-27
lines changed

cmd/standby_cleaner/main.go

Lines changed: 64 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -7,16 +7,14 @@ import (
77
"time"
88

99
"github.com/fly-apps/postgres-flex/pkg/flypg"
10+
"github.com/fly-apps/postgres-flex/pkg/flypg/admin"
11+
"github.com/jackc/pgx/v4"
1012
)
1113

12-
// We need to adjust this to look at replication slots.
13-
// Pull ids from repmgr.show_nodes
14-
// Pull replication_slot ids that are inactive.
15-
16-
// Remove replication slot if the slot_name id is inactive and is not
17-
// present as a repmgr node.
18-
19-
var Minute int64 = 60
14+
var (
15+
monitorFrequency = time.Minute * 5
16+
deadMemberRemovalThreshold = time.Hour * 24
17+
)
2018

2119
func main() {
2220
ctx := context.Background()
@@ -26,44 +24,91 @@ func main() {
2624
os.Exit(1)
2725
}
2826

27+
// TODO - We should connect using the flypgadmin user so we can differentiate between
28+
// internal admin connection usage and the actual repmgr process.
2929
conn, err := flypgNode.RepMgr.NewLocalConnection(ctx)
3030
if err != nil {
3131
fmt.Printf("failed to open local connection: %s\n", err)
3232
os.Exit(1)
3333
}
3434

35-
ticker := time.NewTicker(5 * time.Second)
36-
defer ticker.Stop()
35+
seenAt := map[int]time.Time{}
3736

38-
seenAt := map[int]int64{}
37+
ticker := time.NewTicker(monitorFrequency)
38+
defer ticker.Stop()
3939

40-
for _ = range ticker.C {
40+
for range ticker.C {
4141
role, err := flypgNode.RepMgr.CurrentRole(ctx, conn)
4242
if err != nil {
43-
fmt.Printf("Failed to check role: %s", err)
43+
fmt.Printf("Failed to check role: %s\n", err)
4444
continue
4545
}
46+
4647
if role != flypg.PrimaryRoleName {
4748
continue
4849
}
50+
4951
standbys, err := flypgNode.RepMgr.Standbys(ctx, conn)
5052
if err != nil {
51-
fmt.Printf("Failed to get standbys: %s", err)
53+
fmt.Printf("Failed to query standbys: %s\n", err)
5254
continue
5355
}
56+
5457
for _, standby := range standbys {
5558
newConn, err := flypgNode.RepMgr.NewRemoteConnection(ctx, standby.Ip)
59+
defer newConn.Close(ctx)
5660
if err != nil {
57-
if time.Now().Unix()-seenAt[standby.Id] >= 10*Minute {
61+
// TODO - Verify the error that's being returned.
62+
if time.Now().Sub(seenAt[standby.Id]) >= deadMemberRemovalThreshold {
5863
if err := flypg.UnregisterMemberByID(ctx, int32(standby.Id)); err != nil {
59-
fmt.Println(err.Error())
64+
fmt.Printf("failed to unregister member %d: %v\n", standby.Id, err.Error())
65+
continue
6066
}
6167

6268
delete(seenAt, standby.Id)
6369
}
64-
} else {
65-
seenAt[standby.Id] = time.Now().Unix()
66-
newConn.Close(ctx)
70+
71+
continue
72+
}
73+
74+
seenAt[standby.Id] = time.Now()
75+
}
76+
77+
removeOrphanedReplicationSlots(ctx, conn, standbys)
78+
}
79+
}
80+
81+
func removeOrphanedReplicationSlots(ctx context.Context, conn *pgx.Conn, standbys []flypg.Standby) {
82+
var orphanedSlots []admin.ReplicationSlot
83+
84+
slots, err := admin.ListReplicationSlots(ctx, conn)
85+
if err != nil {
86+
fmt.Printf("failed to list replication slots: %s", err)
87+
}
88+
89+
// Identify orphaned replication slots: inactive slots whose member id matches none of the listed standbys.
90+
for _, slot := range slots {
91+
matchFound := false
92+
for _, standby := range standbys {
93+
if slot.MemberID == int32(standby.Id) {
94+
matchFound = true
95+
}
96+
}
97+
98+
if !matchFound && !slot.Active {
99+
orphanedSlots = append(orphanedSlots, slot)
100+
}
101+
}
102+
103+
if len(orphanedSlots) > 0 {
104+
fmt.Printf("%d orphaned replication slots detected", len(orphanedSlots))
105+
106+
for _, slot := range orphanedSlots {
107+
fmt.Printf("dropping replication slot: %s", slot.Name)
108+
109+
if err := admin.DropReplicationSlot(ctx, conn, slot.Name); err != nil {
110+
fmt.Printf("failed to drop replication slot %s: %v", slot.Name, err)
111+
continue
67112
}
68113
}
69114
}

pkg/flypg/admin/admin.go

Lines changed: 25 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,8 @@ package admin
33
import (
44
"context"
55
"fmt"
6+
"strconv"
7+
"strings"
68

79
"github.com/jackc/pgx/v4"
810
)
@@ -75,6 +77,14 @@ func DeleteDatabase(ctx context.Context, pg *pgx.Conn, name string) error {
7577
return nil
7678
}
7779

80+
type ReplicationSlot struct {
81+
MemberID int32
82+
Name string
83+
Type string
84+
Active bool
85+
WalStatus string
86+
}
87+
7888
func ListReplicationSlots(ctx context.Context, pg *pgx.Conn) ([]ReplicationSlot, error) {
7989
sql := fmt.Sprintf("SELECT slot_name, slot_type, active, wal_status from pg_replication_slots;")
8090
rows, err := pg.Query(ctx, sql)
@@ -91,7 +101,21 @@ func ListReplicationSlots(ctx context.Context, pg *pgx.Conn) ([]ReplicationSlot,
91101
return nil, err
92102
}
93103

94-
slots = append(slots, slot)
104+
slotArr := strings.Split(slot.Name, "_")
105+
// Only look at repmgr replication slots.
106+
if slotArr[0] == "repmgr" {
107+
// Resolve member id from slot name.
108+
idStr := slotArr[2]
109+
110+
num, err := strconv.ParseInt(idStr, 10, 32)
111+
if err != nil {
112+
fmt.Printf("failed to parse member id %s", idStr)
113+
continue
114+
}
115+
116+
slot.MemberID = int32(num)
117+
slots = append(slots, slot)
118+
}
95119
}
96120

97121
return slots, nil
@@ -174,13 +198,6 @@ type DbInfo struct {
174198
Users []string `json:"users"`
175199
}
176200

177-
type ReplicationSlot struct {
178-
Name string
179-
Type string
180-
Active bool
181-
WalStatus string
182-
}
183-
184201
func ListUsers(ctx context.Context, pg *pgx.Conn) ([]UserInfo, error) {
185202
sql := `
186203
select u.usename,

0 commit comments

Comments
 (0)