Skip to content

Commit cbe1103

Browse files
authored
Merge pull request #66 from fly-apps/pgbouncer-readonly-fixes
Readonly changes need to be broadcasted
2 parents 2dc657f + e1192c7 commit cbe1103

File tree

12 files changed

+397
-232
lines changed

12 files changed

+397
-232
lines changed

Dockerfile

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,8 +9,9 @@ COPY . .
99

1010
RUN CGO_ENABLED=0 GOOS=linux go build -v -o /fly/bin/event_handler ./cmd/event_handler
1111
RUN CGO_ENABLED=0 GOOS=linux go build -v -o /fly/bin/failover_validation ./cmd/failover_validation
12-
RUN CGO_ENABLED=0 GOOS=linux go build -v -o /fly/bin/standby_cleaner ./cmd/standby_cleaner
1312
RUN CGO_ENABLED=0 GOOS=linux go build -v -o /fly/bin/pg_unregister ./cmd/pg_unregister
13+
14+
RUN CGO_ENABLED=0 GOOS=linux go build -v -o /fly/bin/start_monitor ./cmd/monitor
1415
RUN CGO_ENABLED=0 GOOS=linux go build -v -o /fly/bin/start_admin_server ./cmd/admin_server
1516

1617
RUN CGO_ENABLED=0 GOOS=linux go build -v -o /fly/bin/start ./cmd/start

cmd/monitor/main.go

Lines changed: 219 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,219 @@
1+
package main
2+
3+
import (
4+
"context"
5+
"encoding/json"
6+
"fmt"
7+
"net/http"
8+
"os"
9+
"time"
10+
11+
"github.com/fly-apps/postgres-flex/internal/flypg"
12+
"github.com/fly-apps/postgres-flex/internal/flypg/admin"
13+
"github.com/jackc/pgx/v5"
14+
15+
"golang.org/x/exp/maps"
16+
)
17+
18+
var (
19+
deadMemberMonitorFrequency = time.Minute * 5
20+
readonlyStateMonitorFrequency = time.Minute * 1
21+
)
22+
23+
func main() {
24+
ctx := context.Background()
25+
flypgNode, err := flypg.NewNode()
26+
if err != nil {
27+
fmt.Printf("failed to reference node: %s\n", err)
28+
os.Exit(1)
29+
}
30+
31+
// Dead member monitor
32+
go func() {
33+
internal, err := flypg.ReadFromFile("/data/flypg.internal.conf")
34+
if err != nil {
35+
fmt.Printf("failed to open config: %s\n", err)
36+
os.Exit(1)
37+
}
38+
39+
user, err := flypg.ReadFromFile("/data/flypg.user.conf")
40+
if err != nil {
41+
fmt.Printf("failed to open config: %s\n", err)
42+
os.Exit(1)
43+
}
44+
45+
maps.Copy(user, internal)
46+
47+
deadMemberRemovalThreshold, err := time.ParseDuration(fmt.Sprint(internal["standby_clean_interval"]))
48+
if err != nil {
49+
fmt.Printf(fmt.Sprintf("Failed to parse config: %s", err))
50+
os.Exit(1)
51+
}
52+
53+
seenAt := map[int]time.Time{}
54+
55+
ticker := time.NewTicker(deadMemberMonitorFrequency)
56+
defer ticker.Stop()
57+
58+
fmt.Printf("Pruning every %s...\n", deadMemberRemovalThreshold)
59+
60+
for range ticker.C {
61+
err := handleDeadMemberMonitorTick(ctx, flypgNode, seenAt, deadMemberRemovalThreshold)
62+
if err != nil {
63+
fmt.Println(err)
64+
}
65+
}
66+
}()
67+
68+
// Readonly monitor
69+
ticker := time.NewTicker(readonlyStateMonitorFrequency)
70+
defer ticker.Stop()
71+
for range ticker.C {
72+
if err := handleReadonlyMonitorTick(ctx, flypgNode); err != nil {
73+
fmt.Println(err)
74+
}
75+
}
76+
77+
}
78+
79+
type readonlyStateResponse struct {
80+
Result bool
81+
}
82+
83+
func handleReadonlyMonitorTick(ctx context.Context, node *flypg.Node) error {
84+
conn, err := node.RepMgr.NewLocalConnection(ctx)
85+
if err != nil {
86+
return fmt.Errorf("failed to open local connection: %s", err)
87+
}
88+
defer conn.Close(ctx)
89+
90+
member, err := node.RepMgr.Member(ctx, conn)
91+
if err != nil {
92+
return fmt.Errorf("failed to query local member: %s", err)
93+
}
94+
95+
if member.Role == flypg.PrimaryRoleName {
96+
return nil
97+
}
98+
99+
primary, err := node.RepMgr.PrimaryMember(ctx, conn)
100+
if err != nil {
101+
return fmt.Errorf("failed to query primary member: %s", err)
102+
}
103+
104+
endpoint := fmt.Sprintf("http://[%s]:5500/%s", primary.Hostname, flypg.ReadOnlyStateEndpoint)
105+
resp, err := http.Get(endpoint)
106+
if err != nil {
107+
return fmt.Errorf("failed to query primary readonly state: %s", err)
108+
}
109+
defer resp.Body.Close()
110+
111+
var state readonlyStateResponse
112+
if err := json.NewDecoder(resp.Body).Decode(&state); err != nil {
113+
return fmt.Errorf("failed to decode result: %s", err)
114+
}
115+
116+
if state.Result {
117+
if !flypg.ReadOnlyLockExists() {
118+
fmt.Printf("Setting connections running under %s to readonly\n", node.PrivateIP)
119+
if err := flypg.EnableReadonly(ctx, node); err != nil {
120+
return fmt.Errorf("failed to set connection under %s to readonly: %s", node.PrivateIP, err)
121+
}
122+
}
123+
} else {
124+
if !flypg.ZombieLockExists() && flypg.ReadOnlyLockExists() {
125+
fmt.Printf("Setting connections running under %s to read/write\n", node.PrivateIP)
126+
if err := flypg.DisableReadonly(ctx, node); err != nil {
127+
return fmt.Errorf("failed to set connections under %s read/write: %s", node.PrivateIP, err)
128+
}
129+
}
130+
}
131+
132+
return nil
133+
}
134+
135+
func handleDeadMemberMonitorTick(ctx context.Context, node *flypg.Node, seenAt map[int]time.Time, deadMemberRemovalThreshold time.Duration) error {
136+
// TODO - We should connect using the flypgadmin user so we can differentiate between
137+
// internal admin connection usage and the actual repmgr process.
138+
conn, err := node.RepMgr.NewLocalConnection(ctx)
139+
if err != nil {
140+
fmt.Printf("failed to open local connection: %s\n", err)
141+
os.Exit(1)
142+
}
143+
defer conn.Close(ctx)
144+
145+
member, err := node.RepMgr.MemberByID(ctx, conn, int(node.RepMgr.ID))
146+
if err != nil {
147+
return err
148+
}
149+
150+
if member.Role != flypg.PrimaryRoleName {
151+
return nil
152+
}
153+
154+
standbys, err := node.RepMgr.StandbyMembers(ctx, conn)
155+
if err != nil {
156+
return fmt.Errorf("failed to query standbys: %s", err)
157+
}
158+
159+
for _, standby := range standbys {
160+
sConn, err := node.RepMgr.NewRemoteConnection(ctx, standby.Hostname)
161+
if err != nil {
162+
// TODO - Verify the exception that's getting thrown.
163+
if time.Since(seenAt[standby.ID]) >= deadMemberRemovalThreshold {
164+
if err := node.RepMgr.UnregisterMember(ctx, standby); err != nil {
165+
fmt.Printf("failed to unregister member %s: %v", standby.Hostname, err)
166+
continue
167+
}
168+
169+
delete(seenAt, standby.ID)
170+
}
171+
172+
continue
173+
}
174+
defer sConn.Close(ctx)
175+
176+
seenAt[standby.ID] = time.Now()
177+
}
178+
179+
removeOrphanedReplicationSlots(ctx, conn, standbys)
180+
181+
return nil
182+
}
183+
184+
func removeOrphanedReplicationSlots(ctx context.Context, conn *pgx.Conn, standbys []flypg.Member) {
185+
var orphanedSlots []admin.ReplicationSlot
186+
187+
slots, err := admin.ListReplicationSlots(ctx, conn)
188+
if err != nil {
189+
fmt.Printf("failed to list replication slots: %s", err)
190+
}
191+
192+
// An orphaned replication slot is defined as an inactive replication slot that is no longer tied to
193+
// and existing repmgr member.
194+
for _, slot := range slots {
195+
matchFound := false
196+
for _, standby := range standbys {
197+
if slot.MemberID == int32(standby.ID) {
198+
matchFound = true
199+
}
200+
}
201+
202+
if !matchFound && !slot.Active {
203+
orphanedSlots = append(orphanedSlots, slot)
204+
}
205+
}
206+
207+
if len(orphanedSlots) > 0 {
208+
fmt.Printf("%d orphaned replication slot(s) detected\n", len(orphanedSlots))
209+
210+
for _, slot := range orphanedSlots {
211+
fmt.Printf("Dropping replication slot: %s\n", slot.Name)
212+
213+
if err := admin.DropReplicationSlot(ctx, conn, slot.Name); err != nil {
214+
fmt.Printf("failed to drop replication slot %s: %v\n", slot.Name, err)
215+
continue
216+
}
217+
}
218+
}
219+
}

cmd/standby_cleaner/main.go

Lines changed: 0 additions & 147 deletions
This file was deleted.

cmd/start/main.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -54,7 +54,7 @@ func main() {
5454
svisor.AddProcess("repmgrd", fmt.Sprintf("gosu postgres repmgrd -f %s --daemonize=false", node.RepMgr.ConfigPath),
5555
supervisor.WithRestart(0, 5*time.Second),
5656
)
57-
svisor.AddProcess("standby_cleaner", "/usr/local/bin/standby_cleaner",
57+
svisor.AddProcess("monitor", "/usr/local/bin/start_monitor",
5858
supervisor.WithRestart(0, 5*time.Second),
5959
)
6060
svisor.AddProcess("admin", "/usr/local/bin/start_admin_server",

0 commit comments

Comments
 (0)