Skip to content

Commit 389bcad

Browse files
authored
Merge pull request #8 from fly-apps/separate-pgbouncer-logic
Separate pgbouncer logic
2 parents 3ba479d + 333e0cf commit 389bcad

File tree

6 files changed

+139
-90
lines changed

6 files changed

+139
-90
lines changed

cmd/event_handler/main.go

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -3,9 +3,10 @@ package main
33
import (
44
"flag"
55
"fmt"
6-
"github.com/fly-apps/postgres-flex/pkg/flypg"
76
"strconv"
87

8+
"github.com/fly-apps/postgres-flex/pkg/flypg"
9+
910
"github.com/fly-apps/postgres-flex/pkg/flypg/state"
1011
)
1112

@@ -47,7 +48,7 @@ func main() {
4748
}
4849

4950
fmt.Println("Reconfiguring pgbouncer primary")
50-
if err := flypgNode.ConfigurePGBouncerPrimary(string(node.Value), true); err != nil {
51+
if err := flypgNode.PGBouncer.ConfigurePrimary(string(node.Value), true); err != nil {
5152
fmt.Printf("failed to reconfigure pgbouncer primary %s\n", err)
5253
}
5354
case "standby_follow":
@@ -68,7 +69,7 @@ func main() {
6869
fmt.Printf("failed to reconfigure pgbouncer primary %s\n", err)
6970
}
7071
fmt.Println("Reconfiguring pgbouncer primary")
71-
if err := flypgNode.ConfigurePGBouncerPrimary(string(node.Value), true); err != nil {
72+
if err := flypgNode.PGBouncer.ConfigurePrimary(string(node.Value), true); err != nil {
7273
fmt.Printf("failed to reconfigure pgbouncer primary %s\n", err)
7374
}
7475
default:

cmd/start/main.go

Lines changed: 7 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -36,25 +36,21 @@ func main() {
3636
fmt.Printf("failed post-init: %s. Retrying...\n", err)
3737
continue
3838
}
39-
4039
return
4140
}
4241
}()
4342

4443
svisor := supervisor.New("flypg", 5*time.Minute)
44+
svisor.AddProcess("flypg", fmt.Sprintf("gosu postgres postgres -D %s -p %d", node.DataDir, node.Port))
4545

46-
svisor.AddProcess("pgbouncer", "/usr/sbin/pgbouncer /fly/pgbouncer.ini", supervisor.WithRestart(0, 1*time.Second))
47-
48-
env := map[string]string{
49-
"PGDATA": os.Getenv("PGDATA"),
50-
"PGPASSFILE": os.Getenv("PGPASSFILE"),
51-
"PATH": os.Getenv("PATH"),
52-
}
46+
svisor.AddProcess("pgbouncer", fmt.Sprintf("pgbouncer %s/pgbouncer.ini", node.PGBouncer.ConfigPath),
47+
supervisor.WithRestart(0, 1*time.Second),
48+
)
49+
svisor.AddProcess("repmgrd", fmt.Sprintf("gosu postgres repmgrd -f %s --daemonize=false", node.ManagerConfigPath),
50+
supervisor.WithRestart(0, 5*time.Second),
51+
)
5352

54-
svisor.AddProcess("flypg", fmt.Sprintf("gosu postgres postgres -D %s -p %d", node.DataDir, node.PGPort))
55-
svisor.AddProcess("repmgrd", "gosu postgres repmgrd -f /data/repmgr.conf --daemonize=false", supervisor.WithEnv(env), supervisor.WithRestart(0, 5*time.Second))
5653
svisor.StopOnSignal(syscall.SIGINT, syscall.SIGTERM)
57-
5854
svisor.StartHttpListener()
5955

6056
if err := svisor.Run(); err != nil {

config/pgbouncer.ini

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -5,9 +5,9 @@ listen_port = 5432
55
unix_socket_dir = /tmp
66

77
auth_user = postgres
8-
auth_file = /data/pgbouncer.auth
8+
auth_file = /data/pgbouncer/pgbouncer.auth
99

10-
admin_users = postgres,flypgadmin
10+
admin_users = postgres
1111

1212
user = postgres
1313

@@ -23,4 +23,4 @@ log_connections = 1
2323
log_disconnections = 1
2424
log_pooler_errors = 1
2525

26-
%include /data/pgbouncer.database.ini
26+
%include /data/pgbouncer/pgbouncer.database.ini

pkg/flypg/node.go

Lines changed: 27 additions & 71 deletions
Original file line numberDiff line numberDiff line change
@@ -32,22 +32,22 @@ type Node struct {
3232
PrivateIP string
3333
DataDir string
3434
Region string
35-
PGPort int
36-
ProxyPort int
35+
Port int
36+
37+
PGBouncer PGBouncer
3738

3839
SUCredentials Credentials
3940
OperatorCredentials Credentials
40-
ManagerCredentials Credentials
4141

42+
ManagerCredentials Credentials
4243
ManagerConfigPath string
4344
ManagerDatabaseName string
4445
}
4546

4647
func NewNode() (*Node, error) {
4748
node := &Node{
4849
AppName: "local",
49-
PGPort: 5433,
50-
ProxyPort: 5432,
50+
Port: 5433,
5151
DataDir: "/data/postgresql",
5252
ManagerDatabaseName: "repmgr",
5353
ManagerConfigPath: "/data/repmgr.conf",
@@ -64,14 +64,14 @@ func NewNode() (*Node, error) {
6464
}
6565
node.PrivateIP = ipv6.String()
6666

67-
machineID := os.Getenv("FLY_ALLOC_ID")
6867
// Generate a random, reconstructable signed int32
68+
machineID := os.Getenv("FLY_ALLOC_ID")
6969
seed := binary.LittleEndian.Uint64([]byte(machineID))
7070
rand.Seed(int64(seed))
7171
node.ID = rand.Int31()
7272

7373
if port, err := strconv.Atoi(os.Getenv("PG_PORT")); err == nil {
74-
node.PGPort = port
74+
node.Port = port
7575
}
7676

7777
// Internal user
@@ -92,6 +92,14 @@ func NewNode() (*Node, error) {
9292
Password: "supersecret",
9393
}
9494

95+
node.PGBouncer = PGBouncer{
96+
PrivateIP: node.PrivateIP,
97+
Port: 5432,
98+
ForwardPort: 5433,
99+
ConfigPath: "/data/pgbouncer",
100+
Credentials: node.OperatorCredentials,
101+
}
102+
95103
return node, nil
96104
}
97105

@@ -116,6 +124,12 @@ func (n *Node) Init() error {
116124
fmt.Printf("Failed to initialize replmgr: %s\n", err.Error())
117125
}
118126

127+
// Initialize PGBouncer
128+
fmt.Println("Initializing PGBouncer")
129+
if err := n.PGBouncer.initialize(); err != nil {
130+
return err
131+
}
132+
119133
switch primaryIP {
120134
case n.PrivateIP:
121135
// Noop
@@ -163,11 +177,6 @@ func (n *Node) Init() error {
163177
return fmt.Errorf("failed to configure postgres %s", err)
164178
}
165179

166-
fmt.Println("Configuring pgbouncer auth")
167-
if err := n.ConfigurePGBouncerAuth(); err != nil {
168-
return fmt.Errorf("failed to configure pgbouncer auth %s", err)
169-
}
170-
171180
return nil
172181
}
173182

@@ -270,31 +279,25 @@ func (n *Node) PostInit() error {
270279
return fmt.Errorf("failed to query current primary: %s", err)
271280
}
272281

273-
fmt.Println("Configuring pgbouncer primary")
274-
if err := n.ConfigurePGBouncerPrimary(primaryIP, false); err != nil {
275-
return fmt.Errorf("failed to configure pgbouncer primary %s", err)
282+
if err := n.PGBouncer.ConfigurePrimary(primaryIP, true); err != nil {
283+
return fmt.Errorf("failed to configure pgbouncer's primary: %s", err)
276284
}
277285

278286
return nil
279287
}
280288

281-
func (n *Node) NewPGBouncerConnection(ctx context.Context) (*pgx.Conn, error) {
282-
host := net.JoinHostPort(n.PrivateIP, strconv.Itoa(n.ProxyPort))
283-
return openConnection(ctx, host, "pgbouncer", n.SUCredentials)
284-
}
285-
286289
func (n *Node) NewLocalConnection(ctx context.Context) (*pgx.Conn, error) {
287-
host := net.JoinHostPort(n.PrivateIP, strconv.Itoa(n.PGPort))
288-
return openConnection(ctx, host, "postgres", n.SUCredentials)
290+
host := net.JoinHostPort(n.PrivateIP, strconv.Itoa(n.Port))
291+
return openConnection(ctx, host, "postgres", n.OperatorCredentials)
289292
}
290293

291294
func (n *Node) NewRepLocalConnection(ctx context.Context) (*pgx.Conn, error) {
292-
host := net.JoinHostPort(n.PrivateIP, strconv.Itoa(n.PGPort))
295+
host := net.JoinHostPort(n.PrivateIP, strconv.Itoa(n.Port))
293296
return openConnection(ctx, host, "repmgr", n.ManagerCredentials)
294297
}
295298

296299
func (n *Node) NewRepRemoteConnection(ctx context.Context, hostname string) (*pgx.Conn, error) {
297-
host := net.JoinHostPort(hostname, strconv.Itoa(n.PGPort))
300+
host := net.JoinHostPort(hostname, strconv.Itoa(n.Port))
298301
return openConnection(ctx, host, "repmgr", n.ManagerCredentials)
299302
}
300303

@@ -370,53 +373,6 @@ func (n *Node) initializePostgres() error {
370373
return nil
371374
}
372375

373-
func (n *Node) ConfigurePGBouncerAuth() error {
374-
path := fmt.Sprintf("%s/pgbouncer.auth", "/data")
375-
file, err := os.OpenFile(path, os.O_RDWR|os.O_TRUNC|os.O_CREATE, 0644)
376-
if err != nil {
377-
return err
378-
}
379-
contents := fmt.Sprintf("\"%s\" \"%s\"", n.OperatorCredentials.Username, n.OperatorCredentials.Password)
380-
_, err = file.Write([]byte(contents))
381-
if err != nil {
382-
return err
383-
}
384-
return nil
385-
}
386-
387-
func (n *Node) ConfigurePGBouncerPrimary(primary string, reload bool) error {
388-
path := fmt.Sprintf("%s/pgbouncer.database.ini", "/data")
389-
file, err := os.OpenFile(path, os.O_RDWR|os.O_TRUNC|os.O_CREATE, 0644)
390-
if err != nil {
391-
return err
392-
}
393-
contents := fmt.Sprintf("[databases]\n* = host=%s port=%d\n", primary, n.PGPort)
394-
_, err = file.Write([]byte(contents))
395-
if err != nil {
396-
return err
397-
}
398-
399-
if reload {
400-
err = n.ReloadPGBouncerConfig()
401-
if err != nil {
402-
fmt.Printf("failed to reconfigure pgbouncer primary %s\n", err)
403-
}
404-
}
405-
return nil
406-
}
407-
408-
func (n *Node) ReloadPGBouncerConfig() error {
409-
conn, err := n.NewPGBouncerConnection(context.TODO())
410-
if err != nil {
411-
return err
412-
}
413-
_, err = conn.Exec(context.TODO(), "RELOAD;")
414-
if err != nil {
415-
return err
416-
}
417-
return nil
418-
}
419-
420376
func (n *Node) configurePostgres() error {
421377
cmdStr := fmt.Sprintf("sed -i \"s/#shared_preload_libraries.*/shared_preload_libraries = 'repmgr'/\" /data/postgresql/postgresql.conf")
422378
return runCommand(cmdStr)

pkg/flypg/pgbouncer.go

Lines changed: 96 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,96 @@
1+
package flypg
2+
3+
import (
4+
"context"
5+
"fmt"
6+
"net"
7+
"os"
8+
"strconv"
9+
10+
"github.com/jackc/pgx/v4"
11+
)
12+
13+
type PGBouncer struct {
14+
PrivateIP string
15+
Credentials Credentials
16+
ConfigPath string
17+
Port int
18+
ForwardPort int
19+
}
20+
21+
func (p *PGBouncer) NewConnection(ctx context.Context) (*pgx.Conn, error) {
22+
host := net.JoinHostPort(p.PrivateIP, strconv.Itoa(p.Port))
23+
return openConnection(ctx, host, "pgbouncer", p.Credentials)
24+
}
25+
26+
func (p *PGBouncer) ConfigurePrimary(primary string, reload bool) error {
27+
path := fmt.Sprintf("%s/pgbouncer.database.ini", p.ConfigPath)
28+
file, err := os.OpenFile(path, os.O_RDWR|os.O_TRUNC|os.O_CREATE, 0644)
29+
if err != nil {
30+
return err
31+
}
32+
contents := fmt.Sprintf("[databases]\n* = host=%s port=%d\n", primary, p.ForwardPort)
33+
_, err = file.Write([]byte(contents))
34+
if err != nil {
35+
return err
36+
}
37+
38+
if reload {
39+
err = p.reloadConfig()
40+
if err != nil {
41+
fmt.Printf("failed to reconfigure pgbouncer primary %s\n", err)
42+
}
43+
}
44+
return nil
45+
}
46+
47+
func (p *PGBouncer) initialize() error {
48+
cmdStr := fmt.Sprintf("mkdir -p %s", p.ConfigPath)
49+
if err := runCommand(cmdStr); err != nil {
50+
return err
51+
}
52+
53+
// If pgbouncer.ini file is not present, set defaults.
54+
if _, err := os.Stat(fmt.Sprintf("%s/pgbouncer.ini", p.ConfigPath)); err != nil {
55+
if os.IsNotExist(err) {
56+
cmdStr := fmt.Sprintf("cp /fly/pgbouncer.ini %s", p.ConfigPath)
57+
if err := runCommand(cmdStr); err != nil {
58+
return err
59+
}
60+
} else {
61+
return err
62+
}
63+
}
64+
65+
if err := p.configureAuth(); err != nil {
66+
return fmt.Errorf("failed to configure pgbouncer auth. %s", err)
67+
}
68+
69+
return nil
70+
}
71+
72+
func (p *PGBouncer) configureAuth() error {
73+
path := fmt.Sprintf("%s/pgbouncer.auth", p.ConfigPath)
74+
file, err := os.OpenFile(path, os.O_RDWR|os.O_TRUNC|os.O_CREATE, 0644)
75+
if err != nil {
76+
return err
77+
}
78+
contents := fmt.Sprintf("\"%s\" \"%s\"", p.Credentials.Username, p.Credentials.Password)
79+
_, err = file.Write([]byte(contents))
80+
if err != nil {
81+
return err
82+
}
83+
return nil
84+
}
85+
86+
func (p *PGBouncer) reloadConfig() error {
87+
conn, err := p.NewConnection(context.TODO())
88+
if err != nil {
89+
return err
90+
}
91+
_, err = conn.Exec(context.TODO(), "RELOAD;")
92+
if err != nil {
93+
return err
94+
}
95+
return nil
96+
}

pkg/flypg/repmgr.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -81,7 +81,7 @@ func cloneFromPrimary(node Node, ipStr string) error {
8181

8282
cmdStr = fmt.Sprintf("repmgr -h %s -p %d -d %s -U %s -f %s standby clone -F",
8383
ipStr,
84-
node.PGPort,
84+
node.Port,
8585
node.ManagerDatabaseName,
8686
node.ManagerCredentials.Username,
8787
node.ManagerConfigPath)
@@ -103,7 +103,7 @@ func writeManagerConf(node Node) error {
103103
conf := map[string]interface{}{
104104
"node_id": fmt.Sprint(node.ID),
105105
"node_name": fmt.Sprintf("'%s'", node.PrivateIP),
106-
"conninfo": fmt.Sprintf("'host=%s port=%d user=%s dbname=%s connect_timeout=10'", node.PrivateIP, node.PGPort, node.ManagerCredentials.Username, node.ManagerDatabaseName),
106+
"conninfo": fmt.Sprintf("'host=%s port=%d user=%s dbname=%s connect_timeout=10'", node.PrivateIP, node.Port, node.ManagerCredentials.Username, node.ManagerDatabaseName),
107107
"data_directory": fmt.Sprintf("'%s'", node.DataDir),
108108
"failover": "'automatic'",
109109
"promote_command": fmt.Sprintf("'repmgr standby promote -f %s --log-to-file'", node.ManagerConfigPath),

0 commit comments

Comments
 (0)