Skip to content

Commit 1b97e67

Browse files
committed
Bug fixes
1 parent a7cc6c4 commit 1b97e67

File tree

2 files changed

+38
-35
lines changed

2 files changed

+38
-35
lines changed

cmd/standby_cleaner/main.go

Lines changed: 37 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -12,8 +12,9 @@ import (
1212
)
1313

1414
var (
15-
monitorFrequency = time.Minute * 5
16-
deadMemberRemovalThreshold = time.Hour * 24
15+
monitorFrequency = time.Minute * 5
16+
// TODO - Make this configurable and/or extend this to 12-24 hours.
17+
deadMemberRemovalThreshold = time.Hour * 1
1718
)
1819

1920
func main() {
@@ -37,44 +38,47 @@ func main() {
3738
ticker := time.NewTicker(monitorFrequency)
3839
defer ticker.Stop()
3940

40-
for range ticker.C {
41-
role, err := flypgNode.RepMgr.CurrentRole(ctx, conn)
42-
if err != nil {
43-
fmt.Printf("Failed to check role: %s\n", err)
44-
continue
45-
}
46-
47-
if role != flypg.PrimaryRoleName {
48-
continue
49-
}
41+
for {
42+
select {
43+
case <-ticker.C:
44+
role, err := flypgNode.RepMgr.CurrentRole(ctx, conn)
45+
if err != nil {
46+
fmt.Printf("Failed to check role: %s\n", err)
47+
continue
48+
}
5049

51-
standbys, err := flypgNode.RepMgr.Standbys(ctx, conn)
52-
if err != nil {
53-
fmt.Printf("Failed to query standbys: %s\n", err)
54-
continue
55-
}
50+
if role != flypg.PrimaryRoleName {
51+
continue
52+
}
5653

57-
for _, standby := range standbys {
58-
newConn, err := flypgNode.RepMgr.NewRemoteConnection(ctx, standby.Ip)
59-
defer newConn.Close(ctx)
54+
standbys, err := flypgNode.RepMgr.Standbys(ctx, conn)
6055
if err != nil {
61-
// TODO - Verify the exception that's getting thrown.
62-
if time.Now().Sub(seenAt[standby.Id]) >= deadMemberRemovalThreshold {
63-
if err := flypgNode.UnregisterMemberByID(ctx, int32(standby.Id)); err != nil {
64-
fmt.Printf("failed to unregister member %d: %v\n", standby.Id, err.Error())
65-
continue
56+
fmt.Printf("Failed to query standbys: %s\n", err)
57+
continue
58+
}
59+
60+
for _, standby := range standbys {
61+
newConn, err := flypgNode.RepMgr.NewRemoteConnection(ctx, standby.Ip)
62+
defer newConn.Close(ctx)
63+
if err != nil {
64+
// TODO - Verify the exception that's getting thrown.
65+
if time.Now().Sub(seenAt[standby.Id]) >= deadMemberRemovalThreshold {
66+
if err := flypgNode.UnregisterMemberByID(ctx, int32(standby.Id)); err != nil {
67+
fmt.Printf("failed to unregister member %d: %v\n", standby.Id, err.Error())
68+
continue
69+
}
70+
71+
delete(seenAt, standby.Id)
6672
}
6773

68-
delete(seenAt, standby.Id)
74+
continue
6975
}
7076

71-
continue
77+
seenAt[standby.Id] = time.Now()
7278
}
7379

74-
seenAt[standby.Id] = time.Now()
80+
removeOrphanedReplicationSlots(ctx, conn, standbys)
7581
}
76-
77-
removeOrphanedReplicationSlots(ctx, conn, standbys)
7882
}
7983
}
8084

@@ -102,13 +106,13 @@ func removeOrphanedReplicationSlots(ctx context.Context, conn *pgx.Conn, standby
102106
}
103107

104108
if len(orphanedSlots) > 0 {
105-
fmt.Printf("%d orphaned replication slots detected", len(orphanedSlots))
109+
fmt.Printf("%d orphaned replication slot(s) detected\n", len(orphanedSlots))
106110

107111
for _, slot := range orphanedSlots {
108-
fmt.Printf("dropping replication slot: %s", slot.Name)
112+
fmt.Printf("Dropping replication slot: %s\n", slot.Name)
109113

110114
if err := admin.DropReplicationSlot(ctx, conn, slot.Name); err != nil {
111-
fmt.Printf("failed to drop replication slot %s: %v", slot.Name, err)
115+
fmt.Printf("failed to drop replication slot %s: %v\n", slot.Name, err)
112116
continue
113117
}
114118
}

pkg/flypg/admin/admin.go

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -109,8 +109,7 @@ func ListReplicationSlots(ctx context.Context, pg *pgx.Conn) ([]ReplicationSlot,
109109

110110
num, err := strconv.ParseInt(idStr, 10, 32)
111111
if err != nil {
112-
fmt.Printf("failed to parse member id %s", idStr)
113-
continue
112+
return nil, err
114113
}
115114

116115
slot.MemberID = int32(num)

0 commit comments

Comments
 (0)