Skip to content

Commit fba4ee2

Browse files
authored
Merge pull request #6 from fly-apps/pgbouncer
Rip out haproxy and replace it with pgbouncer
2 parents 904d023 + 9fa52d8 commit fba4ee2

File tree

8 files changed

+145
-49
lines changed

8 files changed

+145
-49
lines changed

.gitignore

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
.idea/

Dockerfile

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@ LABEL fly.version=${VERSION}
2020
LABEL fly.pg-version=${PG_VERSION}
2121

2222
RUN apt-get update && apt-get install --no-install-recommends -y \
23-
ca-certificates iproute2 haproxy postgresql-14-repmgr curl bash dnsutils vim procps jq \
23+
ca-certificates iproute2 postgresql-14-repmgr curl bash dnsutils vim procps jq pgbouncer \
2424
&& apt autoremove -y
2525

2626
COPY --from=0 /fly/bin/* /usr/local/bin

cmd/event_handler/main.go

Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,13 +3,20 @@ package main
33
import (
44
"flag"
55
"fmt"
6+
"github.com/fly-apps/postgres-flex/pkg/flypg"
7+
"strconv"
68

79
"github.com/fly-apps/postgres-flex/pkg/flypg/state"
810
)
911

1012
func main() {
1113
event := flag.String("event", "", "event type")
1214
nodeID := flag.Int("node-id", 0, "the node id")
15+
// This might not actually always be the new primary. %p from repmgr is variably the new or
16+
// old primary. In the events that we subscribe to it's always either empty or the new primary.
17+
// In the future if we subscribe to repmgrd_failover_promote, then we would have to change this
18+
// name.
19+
newPrimary := flag.String("new-node-id", "", "the new primary node id")
1320
success := flag.String("success", "", "success (1) failure (0)")
1421
details := flag.String("details", "", "details")
1522
flag.Parse()
@@ -33,6 +40,37 @@ func main() {
3340
if err := client.RegisterPrimary(string(node.Value)); err != nil {
3441
fmt.Printf("failed to register primary: %s", err)
3542
}
43+
44+
flypgNode, err := flypg.NewNode()
45+
if err != nil {
46+
fmt.Printf("failed to reconfigure pgbouncer primary %s\n", err)
47+
}
48+
49+
fmt.Println("Reconfiguring pgbouncer primary")
50+
if err := flypgNode.ConfigurePGBouncerPrimary(string(node.Value), true); err != nil {
51+
fmt.Printf("failed to reconfigure pgbouncer primary %s\n", err)
52+
}
53+
case "standby_follow":
54+
client, err := state.NewConsulClient()
55+
if err != nil {
56+
fmt.Printf("failed to initialize consul client: %s", err)
57+
}
58+
newNodeID, err := strconv.Atoi(*newPrimary)
59+
if err != nil {
60+
fmt.Printf("failed to parse new node id: %s", err)
61+
}
62+
node, err := client.Node(int32(newNodeID))
63+
if err != nil {
64+
fmt.Printf("failed to find node: %s", err)
65+
}
66+
flypgNode, err := flypg.NewNode()
67+
if err != nil {
68+
fmt.Printf("failed to reconfigure pgbouncer primary %s\n", err)
69+
}
70+
fmt.Println("Reconfiguring pgbouncer primary")
71+
if err := flypgNode.ConfigurePGBouncerPrimary(string(node.Value), true); err != nil {
72+
fmt.Printf("failed to reconfigure pgbouncer primary %s\n", err)
73+
}
3674
default:
3775
// noop
3876
}

cmd/start/main.go

Lines changed: 2 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,7 @@ func main() {
3333
for range t.C {
3434

3535
if err := node.PostInit(); err != nil {
36-
fmt.Printf("failed post-init: %s. Retrying...", err)
36+
fmt.Printf("failed post-init: %s. Retrying...\n", err)
3737
continue
3838
}
3939

@@ -43,12 +43,7 @@ func main() {
4343

4444
svisor := supervisor.New("flypg", 5*time.Minute)
4545

46-
proxyEnv := map[string]string{
47-
"FLY_APP_NAME": os.Getenv("FLY_APP_NAME"),
48-
"PRIMARY_REGION": os.Getenv("PRIMARY_REGION"),
49-
"PG_LISTEN_ADDRESS": node.PrivateIP,
50-
}
51-
svisor.AddProcess("proxy", "/usr/sbin/haproxy -W -db -f /fly/haproxy.cfg", supervisor.WithEnv(proxyEnv), supervisor.WithRestart(0, 1*time.Second))
46+
svisor.AddProcess("pgbouncer", "/usr/sbin/pgbouncer /fly/pgbouncer.ini", supervisor.WithRestart(0, 1*time.Second))
5247

5348
env := map[string]string{
5449
"PGDATA": os.Getenv("PGDATA"),

config/haproxy.cfg

Lines changed: 0 additions & 38 deletions
This file was deleted.

config/pgbouncer.ini

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,26 @@
1+
[pgbouncer]
2+
3+
listen_addr = *
4+
listen_port = 5432
5+
unix_socket_dir = /tmp
6+
7+
auth_user = postgres
8+
auth_file = /data/pgbouncer.auth
9+
10+
admin_users = postgres
11+
12+
user = postgres
13+
14+
pool_mode = transaction
15+
16+
max_client_conn = 100
17+
default_pool_size = 20
18+
min_pool_size = 5
19+
reserve_pool_size = 5
20+
reserve_pool_timeout = 3
21+
22+
log_connections = 1
23+
log_disconnections = 1
24+
log_pooler_errors = 1
25+
26+
%include /data/pgbouncer.database.ini

pkg/flypg/node.go

Lines changed: 69 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@ type Node struct {
3333
DataDir string
3434
Region string
3535
PGPort int
36+
ProxyPort int
3637

3738
SUCredentials Credentials
3839
OperatorCredentials Credentials
@@ -46,6 +47,7 @@ func NewNode() (*Node, error) {
4647
node := &Node{
4748
AppName: "local",
4849
PGPort: 5433,
50+
ProxyPort: 5432,
4951
DataDir: "/data/postgresql",
5052
ManagerDatabaseName: "repmgr",
5153
ManagerConfigPath: "/data/repmgr.conf",
@@ -148,6 +150,11 @@ func (n *Node) Init() error {
148150
return fmt.Errorf("failed to configure postgres %s", err)
149151
}
150152

153+
fmt.Println("Configuring pgbouncer auth")
154+
if err := n.ConfigurePGBouncerAuth(); err != nil {
155+
return fmt.Errorf("failed to configure pgbouncer auth %s", err)
156+
}
157+
151158
return nil
152159
}
153160

@@ -258,9 +265,24 @@ func (n *Node) PostInit() error {
258265
}
259266
}
260267

268+
primaryIP, err = client.CurrentPrimary()
269+
if err != nil {
270+
return fmt.Errorf("failed to query current primary: %s", err)
271+
}
272+
273+
fmt.Println("Configuring pgbouncer primary")
274+
if err := n.ConfigurePGBouncerPrimary(primaryIP, false); err != nil {
275+
return fmt.Errorf("failed to configure pgbouncer primary %s", err)
276+
}
277+
261278
return nil
262279
}
263280

281+
func (n *Node) NewPGBouncerConnection(ctx context.Context) (*pgx.Conn, error) {
282+
host := net.JoinHostPort(n.PrivateIP, strconv.Itoa(n.ProxyPort))
283+
return openConnection(ctx, host, "pgbouncer", n.OperatorCredentials)
284+
}
285+
264286
func (n *Node) NewLocalConnection(ctx context.Context) (*pgx.Conn, error) {
265287
host := net.JoinHostPort(n.PrivateIP, strconv.Itoa(n.PGPort))
266288
return openConnection(ctx, host, "postgres", n.OperatorCredentials)
@@ -325,6 +347,53 @@ func (n *Node) initializePostgres() error {
325347
return err
326348
}
327349

350+
func (n *Node) ConfigurePGBouncerAuth() error {
351+
path := fmt.Sprintf("%s/pgbouncer.auth", "/data")
352+
file, err := os.OpenFile(path, os.O_RDWR|os.O_TRUNC|os.O_CREATE, 0644)
353+
if err != nil {
354+
return err
355+
}
356+
contents := fmt.Sprintf("\"%s\" \"%s\"", n.OperatorCredentials.Username, n.OperatorCredentials.Password)
357+
_, err = file.Write([]byte(contents))
358+
if err != nil {
359+
return err
360+
}
361+
return nil
362+
}
363+
364+
func (n *Node) ConfigurePGBouncerPrimary(primary string, reload bool) error {
365+
path := fmt.Sprintf("%s/pgbouncer.database.ini", "/data")
366+
file, err := os.OpenFile(path, os.O_RDWR|os.O_TRUNC|os.O_CREATE, 0644)
367+
if err != nil {
368+
return err
369+
}
370+
contents := fmt.Sprintf("[databases]\n* = host=%s port=%d\n", primary, n.PGPort)
371+
_, err = file.Write([]byte(contents))
372+
if err != nil {
373+
return err
374+
}
375+
376+
if reload {
377+
err = n.ReloadPGBouncerConfig()
378+
if err != nil {
379+
fmt.Printf("failed to reconfigure pgbouncer primary %s\n", err)
380+
}
381+
}
382+
return nil
383+
}
384+
385+
func (n *Node) ReloadPGBouncerConfig() error {
386+
conn, err := n.NewPGBouncerConnection(context.TODO())
387+
if err != nil {
388+
return err
389+
}
390+
_, err = conn.Exec(context.TODO(), "RELOAD;")
391+
if err != nil {
392+
return err
393+
}
394+
return nil
395+
}
396+
328397
func (n *Node) configurePostgres() error {
329398
cmdStr := fmt.Sprintf("sed -i \"s/#shared_preload_libraries.*/shared_preload_libraries = 'repmgr'/\" /data/postgresql/postgresql.conf")
330399
return runCommand(cmdStr)

pkg/flypg/repmgr.go

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -71,8 +71,9 @@ func cloneFromPrimary(node Node, ipStr string) error {
7171
return err
7272
}
7373

74-
cmdStr = fmt.Sprintf("repmgr -h %s -d %s -U %s -f %s standby clone -F",
74+
cmdStr = fmt.Sprintf("repmgr -h %s -p %d -d %s -U %s -f %s standby clone -F",
7575
ipStr,
76+
node.PGPort,
7677
node.ManagerDatabaseName,
7778
node.ManagerCredentials.Username,
7879
node.ManagerConfigPath)
@@ -99,11 +100,15 @@ func writeManagerConf(node Node) error {
99100
"failover": "'automatic'",
100101
"promote_command": fmt.Sprintf("'repmgr standby promote -f %s --log-to-file'", node.ManagerConfigPath),
101102
"follow_command": fmt.Sprintf("'repmgr standby follow -f %s --log-to-file --upstream-node-id=%%n'", node.ManagerConfigPath),
102-
"event_notification_command": fmt.Sprintf("'/usr/local/bin/event_handler -node-id %%n -event %%e -success %%s -details \"%%d\"'"),
103-
"event_notifications": "'repmgrd_failover_promote,standby_promote'",
103+
"event_notification_command": fmt.Sprintf("'/usr/local/bin/event_handler -node-id %%n -event %%e -success %%s -details \"%%d\" -new-node-id \\'%%p\\''"),
104+
"event_notifications": "'repmgrd_failover_promote,standby_promote,standby_follow'",
104105
"location": node.Region,
105106
}
106107

108+
if !node.ValidPrimary() {
109+
conf["priority"] = "0"
110+
}
111+
107112
for key, value := range conf {
108113
str := fmt.Sprintf("%s=%s\n", key, value)
109114
_, err := file.Write([]byte(str))

0 commit comments

Comments
 (0)