Skip to content

Commit cfdca35

Browse files
committed
Making PG migration friendly
1 parent 870e083 commit cfdca35

File tree

10 files changed

+70
-26
lines changed

10 files changed

+70
-26
lines changed

bin/restart-repmgrd

Lines changed: 8 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -1,3 +1,10 @@
11
#!/bin/bash
22

3-
kill `cat /tmp/repmgrd.pid`
3+
if [ -f /tmp/repmgrd.pid ]; then
4+
PID=$(cat /tmp/repmgrd.pid)
5+
6+
# Check if the process is running
7+
if ps -p $PID > /dev/null 2>&1; then
8+
kill $PID
9+
fi
10+
fi

cmd/pg_unregister/main.go

Lines changed: 10 additions & 4 deletions
Original file line number | Diff line number | Diff line change
@@ -27,9 +27,9 @@ func main() {
2727

2828
func processUnregistration(ctx context.Context) error {
2929
encodedArg := os.Args[1]
30-
hostnameBytes, err := base64.StdEncoding.DecodeString(encodedArg)
30+
machineBytes, err := base64.StdEncoding.DecodeString(encodedArg)
3131
if err != nil {
32-
return fmt.Errorf("failed to decode hostname: %v", err)
32+
return fmt.Errorf("failed to decode machine: %v", err)
3333
}
3434

3535
node, err := flypg.NewNode()
@@ -43,9 +43,15 @@ func processUnregistration(ctx context.Context) error {
4343
}
4444
defer func() { _ = conn.Close(ctx) }()
4545

46-
member, err := node.RepMgr.MemberByHostname(ctx, conn, string(hostnameBytes))
46+
machineID := string(machineBytes)
47+
48+
if len(machineID) != 16 {
49+
return fmt.Errorf("invalid machine id: %s. (expected length 16, got %d)", machineID, len(machineBytes))
50+
}
51+
52+
member, err := node.RepMgr.MemberByNodeName(ctx, conn, machineID)
4753
if err != nil {
48-
return fmt.Errorf("failed to resolve member: %s", err)
54+
return fmt.Errorf("failed to resolve member using %s: %s", machineID, err)
4955
}
5056

5157
if err := node.RepMgr.UnregisterMember(*member); err != nil {

internal/flypg/node.go

Lines changed: 8 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -92,6 +92,7 @@ func NewNode() (*Node, error) {
9292
UserConfigPath: "/data/repmgr.user.conf",
9393
PasswordConfigPath: "/data/.pgpass",
9494
DataDir: node.DataDir,
95+
HostName: node.Hostname(),
9596
PrivateIP: node.PrivateIP,
9697
MachineID: node.MachineID,
9798
Port: 5433,
@@ -187,7 +188,7 @@ func (n *Node) Init(ctx context.Context) error {
187188
}
188189
} else {
189190
log.Println("Provisioning standby")
190-
cloneTarget, err := n.RepMgr.ResolveMemberOverDNS(ctx)
191+
cloneTarget, err := n.RepMgr.ResolvePrimaryOverDNS(ctx)
191192
if err != nil {
192193
return fmt.Errorf("failed to resolve member over dns: %s", err)
193194
}
@@ -408,7 +409,7 @@ func (n *Node) PostInit(ctx context.Context) error {
408409
return fmt.Errorf("failed to enable repmgr: %s", err)
409410
}
410411

411-
primary, err := n.RepMgr.ResolveMemberOverDNS(ctx)
412+
primary, err := n.RepMgr.ResolvePrimaryOverDNS(ctx)
412413
if err != nil {
413414
return fmt.Errorf("failed to resolve primary member: %s", err)
414415
}
@@ -583,3 +584,8 @@ func (n *Node) migrateNodeNameIfNeeded(ctx context.Context, repConn *pgx.Conn) e
583584

584585
return nil
585586
}
587+
588+
// Hostname returns the hostname of the node.
589+
func (n *Node) Hostname() string {
590+
return fmt.Sprintf("%s.vm.%s.internal", n.MachineID, n.AppName)
591+
}

internal/flypg/repmgr.go

Lines changed: 36 additions & 16 deletions
Original file line number | Diff line number | Diff line change
@@ -33,6 +33,7 @@ type RepMgr struct {
3333
AppName string
3434
PrimaryRegion string
3535
Region string
36+
HostName string
3637
PrivateIP string
3738
MachineID string
3839
DataDir string
@@ -162,12 +163,10 @@ func (r *RepMgr) setDefaults() error {
162163
return err
163164
}
164165

165-
hostname := r.machineIdToDNS(r.MachineID)
166-
167166
conf := ConfigMap{
168167
"node_id": nodeID,
169-
"node_name": fmt.Sprintf("'%s'", hostname),
170-
"conninfo": fmt.Sprintf("'host=%s port=%d user=%s dbname=%s connect_timeout=5'", hostname, r.Port, r.Credentials.Username, r.DatabaseName),
168+
"node_name": fmt.Sprintf("'%s'", r.MachineID),
169+
"conninfo": fmt.Sprintf("'host=%s port=%d user=%s dbname=%s connect_timeout=5'", r.HostName, r.Port, r.Credentials.Username, r.DatabaseName),
171170
"data_directory": fmt.Sprintf("'%s'", r.DataDir),
172171
"failover": "'automatic'",
173172
"use_replication_slots": "yes",
@@ -329,7 +328,7 @@ func (r *RepMgr) regenReplicationConf(ctx context.Context) error {
329328
// TODO: do we need -c?
330329
if _, err := utils.RunCmd(ctx, "postgres",
331330
"repmgr", "--replication-conf-only",
332-
"-h", r.PrivateIP,
331+
"-h", r.HostName,
333332
"-p", fmt.Sprint(r.Port),
334333
"-d", r.DatabaseName,
335334
"-U", r.Credentials.Username,
@@ -342,13 +341,14 @@ func (r *RepMgr) regenReplicationConf(ctx context.Context) error {
342341

343342
type Member struct {
344343
ID int
344+
Name string
345345
Hostname string
346346
Active bool
347347
Region string
348348
Role string
349349
}
350350

351-
func (*RepMgr) Members(ctx context.Context, pg *pgx.Conn) ([]Member, error) {
351+
func (r *RepMgr) Members(ctx context.Context, pg *pgx.Conn) ([]Member, error) {
352352
sql := "select node_id, node_name, location, active, type from repmgr.nodes;"
353353
rows, err := pg.Query(ctx, sql)
354354
if err != nil {
@@ -359,10 +359,12 @@ func (*RepMgr) Members(ctx context.Context, pg *pgx.Conn) ([]Member, error) {
359359
var members []Member
360360
for rows.Next() {
361361
var member Member
362-
if err := rows.Scan(&member.ID, &member.Hostname, &member.Region, &member.Active, &member.Role); err != nil {
362+
if err := rows.Scan(&member.ID, &member.Name, &member.Region, &member.Active, &member.Role); err != nil {
363363
return nil, err
364364
}
365365

366+
member.Hostname = r.machineIDToDNS(member.Name)
367+
366368
members = append(members, member)
367369
}
368370

@@ -389,14 +391,16 @@ func (r *RepMgr) Member(ctx context.Context, conn *pgx.Conn) (*Member, error) {
389391
return nil, pgx.ErrNoRows
390392
}
391393

392-
func (*RepMgr) PrimaryMember(ctx context.Context, pg *pgx.Conn) (*Member, error) {
394+
func (r *RepMgr) PrimaryMember(ctx context.Context, pg *pgx.Conn) (*Member, error) {
393395
var member Member
394396
sql := "select node_id, node_name, location, active, type from repmgr.nodes where type = 'primary' and active = true;"
395-
err := pg.QueryRow(ctx, sql).Scan(&member.ID, &member.Hostname, &member.Region, &member.Active, &member.Role)
397+
err := pg.QueryRow(ctx, sql).Scan(&member.ID, &member.Name, &member.Region, &member.Active, &member.Role)
396398
if err != nil {
397399
return nil, err
398400
}
399401

402+
member.Hostname = r.machineIDToDNS(member.Name)
403+
400404
return &member, nil
401405
}
402406

@@ -437,6 +441,20 @@ func (*RepMgr) MemberByID(ctx context.Context, pg *pgx.Conn, id int) (*Member, e
437441
return &member, nil
438442
}
439443

444+
func (r *RepMgr) MemberByNodeName(ctx context.Context, pg *pgx.Conn, name string) (*Member, error) {
445+
var member Member
446+
sql := fmt.Sprintf("select node_id, node_name, location, active, type from repmgr.nodes where node_name = '%s';", name)
447+
448+
err := pg.QueryRow(ctx, sql).Scan(&member.ID, &member.Name, &member.Region, &member.Active, &member.Role)
449+
if err != nil {
450+
return nil, err
451+
}
452+
453+
member.Hostname = r.machineIDToDNS(name)
454+
455+
return &member, nil
456+
}
457+
440458
func (*RepMgr) MemberByHostname(ctx context.Context, pg *pgx.Conn, hostname string) (*Member, error) {
441459
var member Member
442460
sql := fmt.Sprintf("select node_id, node_name, location, active, type from repmgr.nodes where node_name = '%s';", hostname)
@@ -449,26 +467,28 @@ func (*RepMgr) MemberByHostname(ctx context.Context, pg *pgx.Conn, hostname stri
449467
return &member, nil
450468
}
451469

452-
func (r *RepMgr) ResolveMemberOverDNS(ctx context.Context) (*Member, error) {
453-
machineIds, err := r.InRegionPeerMachines(ctx)
470+
func (r *RepMgr) ResolvePrimaryOverDNS(ctx context.Context) (*Member, error) {
471+
machineIDs, err := r.InRegionPeerMachines(ctx)
454472
if err != nil {
455473
return nil, err
456474
}
457475

458476
var target *Member
459477

460-
for _, machineId := range machineIds {
461-
if machineId == r.MachineID {
478+
for _, machineID := range machineIDs {
479+
if machineID == r.MachineID {
462480
continue
463481
}
464482

465-
conn, err := r.NewRemoteConnection(ctx, r.machineIdToDNS(machineId))
483+
hostname := r.machineIDToDNS(machineID)
484+
485+
conn, err := r.NewRemoteConnection(ctx, hostname)
466486
if err != nil {
467487
continue
468488
}
469489
defer func() { _ = conn.Close(ctx) }()
470490

471-
member, err := r.MemberByHostname(ctx, conn, r.machineIdToDNS(machineId))
491+
member, err := r.MemberByNodeName(ctx, conn, machineID)
472492
if err != nil {
473493
continue
474494
}
@@ -548,7 +568,7 @@ func (r *RepMgr) eligiblePrimary() bool {
548568
return r.Region == r.PrimaryRegion
549569
}
550570

551-
func (r *RepMgr) machineIdToDNS(nodeName string) string {
571+
func (r *RepMgr) machineIDToDNS(nodeName string) string {
552572
if len(nodeName) != 14 {
553573
panic("invalid machine id")
554574
}

internal/flypg/repmgr_test.go

Lines changed: 2 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -92,8 +92,8 @@ func TestRepmgrInitialization(t *testing.T) {
9292
t.Fatal(err)
9393
}
9494

95-
if config["node_name"] != "'abcdefg1234567.vm.test-app.internal'" {
96-
t.Fatalf("expected node_name to be 'abcdefg1234567.vm.test-app.internal', got %v", config["node_name"])
95+
if config["node_name"] != "'abcdefg1234567'" {
96+
t.Fatalf("expected node_name to be 'abcdefg1234567', got %v", config["node_name"])
9797
}
9898

9999
if config["location"] != "'dev'" {

internal/flypg/zombie.go

Lines changed: 2 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -117,7 +117,8 @@ func TakeDNASample(ctx context.Context, node *Node, standbys []Member) (*DNASamp
117117
sample.totalActive++
118118

119119
// Record conflict when primary doesn't match.
120-
if primary.Hostname != node.MachineID && primary.Hostname != node.PrivateIP {
120+
// TODO - Figure out why we are checking both conditions here.
121+
if primary.Hostname != node.Hostname() && primary.Hostname != node.PrivateIP {
121122
sample.totalConflicts++
122123
sample.conflictMap[primary.Hostname]++
123124
}

pg15/Dockerfile

Lines changed: 1 addition & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -26,6 +26,7 @@ ENV AWS_SHARED_CREDENTIALS_FILE=/data/.aws/credentials
2626

2727
ARG VERSION
2828
ARG PG_MAJOR_VERSION
29+
ARG PG_VERSION
2930
ARG POSTGIS_MAJOR=3
3031
ARG HAPROXY_VERSION=2.8
3132

pg15/Dockerfile-timescaledb

Lines changed: 1 addition & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -27,6 +27,7 @@ ENV AWS_SHARED_CREDENTIALS_FILE=/data/.aws/credentials
2727

2828
ARG VERSION
2929
ARG PG_MAJOR_VERSION
30+
ARG PG_VERSION
3031
ARG POSTGIS_MAJOR=3
3132
ARG HAPROXY_VERSION=2.8
3233

pg16/Dockerfile

Lines changed: 1 addition & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -23,6 +23,7 @@ FROM ubuntu:24.04
2323

2424
ARG VERSION
2525
ARG PG_MAJOR_VERSION
26+
ARG PG_VERSION
2627
ARG POSTGIS_MAJOR=3
2728
ARG HAPROXY_VERSION=2.8
2829
ARG REPMGR_VERSION=5.4.1-1build2

pg16/Dockerfile-timescaledb

Lines changed: 1 addition & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -23,6 +23,7 @@ FROM ubuntu:24.04
2323

2424
ARG VERSION
2525
ARG PG_MAJOR_VERSION
26+
ARG PG_VERSION
2627
ARG POSTGIS_MAJOR=3
2728
ARG HAPROXY_VERSION=2.8
2829
ARG REPMGR_VERSION=5.4.1-1build2

0 commit comments

Comments
 (0)