Skip to content
Merged
Show file tree
Hide file tree
Changes from 13 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,10 @@ require (
github.com/hashicorp/consul/api v1.18.0
github.com/jackc/pgconn v1.14.3
github.com/jackc/pgx/v5 v5.5.4
github.com/olekukonko/tablewriter v0.0.5
github.com/pkg/errors v0.9.1
github.com/pkg/term v1.1.0
github.com/spf13/cobra v1.8.1
github.com/superfly/fly-checks v0.0.0-20230510154016-d189351293f2
golang.org/x/exp v0.0.0-20230105202349-8879d0199aa3
golang.org/x/sync v0.1.0
Expand All @@ -36,8 +38,6 @@ require (
github.com/mattn/go-runewidth v0.0.9 // indirect
github.com/mitchellh/go-homedir v1.1.0 // indirect
github.com/mitchellh/mapstructure v1.4.1 // indirect
github.com/olekukonko/tablewriter v0.0.5 // indirect
github.com/spf13/cobra v1.8.1 // indirect
github.com/spf13/pflag v1.0.5 // indirect
github.com/stretchr/objx v0.5.0 // indirect
golang.org/x/crypto v0.20.0 // indirect
Expand Down
62 changes: 59 additions & 3 deletions internal/flypg/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,12 @@ import (
"github.com/fly-apps/postgres-flex/internal/privnet"
"github.com/fly-apps/postgres-flex/internal/utils"
"github.com/jackc/pgx/v5"
"golang.org/x/exp/slices"
)

type Node struct {
AppName string
MachineID string
PrivateIP string
PrimaryRegion string
DataDir string
Expand Down Expand Up @@ -52,6 +54,8 @@ func NewNode() (*Node, error) {

node.PrivateIP = ipv6.String()

node.MachineID = os.Getenv("FLY_MACHINE_ID")

node.PrimaryRegion = os.Getenv("PRIMARY_REGION")
if node.PrimaryRegion == "" {
return nil, fmt.Errorf("PRIMARY_REGION environment variable must be set")
Expand Down Expand Up @@ -89,6 +93,7 @@ func NewNode() (*Node, error) {
PasswordConfigPath: "/data/.pgpass",
DataDir: node.DataDir,
PrivateIP: node.PrivateIP,
MachineID: node.MachineID,
Port: 5433,
DatabaseName: "repmgr",
Credentials: node.ReplCredentials,
Expand Down Expand Up @@ -265,7 +270,7 @@ func (n *Node) PostInit(ctx context.Context) error {
return fmt.Errorf("failed to resolve member role: %s", err)
}

// Restart repmgrd in the event the IP changes for an already registered node.
// Restart repmgrd in the event the machine ID changes for an already registered node.
// This can happen if the underlying volume is moved to a different node.
daemonRestartRequired := n.RepMgr.daemonRestartRequired(member)

Expand All @@ -292,10 +297,10 @@ func (n *Node) PostInit(ctx context.Context) error {
}

// This should never happen
if primary != n.PrivateIP {
if primary != n.RepMgr.machineIdToDNS(n.MachineID) {
return fmt.Errorf("resolved primary '%s' does not match ourself '%s'. this should not happen",
primary,
n.PrivateIP,
n.RepMgr.machineIdToDNS(n.MachineID),
)
}

Expand All @@ -311,6 +316,10 @@ func (n *Node) PostInit(ctx context.Context) error {
}
}
case StandbyRoleName:
if err := n.migrateNodeNameIfNeeded(ctx, repConn); err != nil {
return fmt.Errorf("failed to migrate node name: %s", err)
}

// Register existing standby to apply any configuration changes.
if err := n.RepMgr.registerStandby(daemonRestartRequired); err != nil {
return fmt.Errorf("failed to register existing standby: %s", err)
Expand Down Expand Up @@ -527,3 +536,50 @@ func (n *Node) handleRemoteRestore(ctx context.Context, store *state.Store) erro

return nil
}

// migrate node name from 6pn to machine ID if needed
func (n *Node) migrateNodeNameIfNeeded(ctx context.Context, repConn *pgx.Conn) error {
primary, err := n.RepMgr.PrimaryMember(ctx, repConn)
if err != nil {
return fmt.Errorf("failed to resolve primary member when updating standby: %s", err)
}

primaryConn, err := n.RepMgr.NewRemoteConnection(ctx, primary.Hostname)
if err != nil {
return fmt.Errorf("failed to establish connection to primary: %s", err)
}
defer func() { _ = primaryConn.Close(ctx) }()

rows, err := primaryConn.Query(ctx, "select application_name from pg_stat_replication")
if err != nil {
return fmt.Errorf("failed to query pg_stat_replication: %s", err)
}
defer rows.Close()

var applicationNames []string
for rows.Next() {
var applicationName string
if err := rows.Scan(&applicationName); err != nil {
return fmt.Errorf("failed to scan application_name: %s", err)
}
applicationNames = append(applicationNames, applicationName)
}
if err := rows.Err(); err != nil {
return fmt.Errorf("failed to iterate over rows: %s", err)
}

// if we find our 6pn as application_name, we need to regenerate postgresql.auto.conf and reload postgresql
if slices.Contains(applicationNames, n.PrivateIP) {
log.Printf("pg_stat_replication on the primary has our ipv6 address as application_name, converting to machine ID...")

if err := n.RepMgr.regenReplicationConf(ctx); err != nil {
return fmt.Errorf("failed to clone standby: %s", err)
}

if err := admin.ReloadPostgresConfig(ctx, repConn); err != nil {
return fmt.Errorf("failed to reload postgresql: %s", err)
}
}

return nil
}
4 changes: 2 additions & 2 deletions internal/flypg/readonly.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ func BroadcastReadonlyChange(ctx context.Context, n *Node, enabled bool) error {

for _, member := range members {
if member.Role == PrimaryRoleName {
endpoint := fmt.Sprintf("http://[%s]:5500/%s", member.Hostname, target)
endpoint := fmt.Sprintf("http://%s:5500/%s", member.Hostname, target)
resp, err := http.Get(endpoint)
if err != nil {
log.Printf("[WARN] Failed to broadcast readonly state change to member %s: %s", member.Hostname, err)
Expand All @@ -85,7 +85,7 @@ func BroadcastReadonlyChange(ctx context.Context, n *Node, enabled bool) error {
}

for _, member := range members {
endpoint := fmt.Sprintf("http://[%s]:5500/%s", member.Hostname, RestartHaproxyEndpoint)
endpoint := fmt.Sprintf("http://%s:5500/%s", member.Hostname, RestartHaproxyEndpoint)
resp, err := http.Get(endpoint)
if err != nil {
log.Printf("[WARN] Failed to restart haproxy on member %s: %s", member.Hostname, err)
Expand Down
61 changes: 51 additions & 10 deletions internal/flypg/repmgr.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ type RepMgr struct {
PrimaryRegion string
Region string
PrivateIP string
MachineID string
DataDir string
DatabaseName string
Credentials admin.Credential
Expand Down Expand Up @@ -161,10 +162,12 @@ func (r *RepMgr) setDefaults() error {
return err
}

hostname := r.machineIdToDNS(r.MachineID)

conf := ConfigMap{
"node_id": nodeID,
"node_name": fmt.Sprintf("'%s'", r.PrivateIP),
"conninfo": fmt.Sprintf("'host=%s port=%d user=%s dbname=%s connect_timeout=5'", r.PrivateIP, r.Port, r.Credentials.Username, r.DatabaseName),
"node_name": fmt.Sprintf("'%s'", hostname),
"conninfo": fmt.Sprintf("'host=%s port=%d user=%s dbname=%s connect_timeout=5'", hostname, r.Port, r.Credentials.Username, r.DatabaseName),
"data_directory": fmt.Sprintf("'%s'", r.DataDir),
"failover": "'automatic'",
"use_replication_slots": "yes",
Expand Down Expand Up @@ -276,7 +279,7 @@ func (*RepMgr) restartDaemon() error {
}

func (r *RepMgr) daemonRestartRequired(m *Member) bool {
return m.Hostname != r.PrivateIP
return m.Hostname != r.MachineID
}

func (r *RepMgr) unregisterWitness(id int) error {
Expand All @@ -301,14 +304,14 @@ func (r *RepMgr) rejoinCluster(hostname string) error {
return err
}

func (r *RepMgr) clonePrimary(ipStr string) error {
func (r *RepMgr) clonePrimary(hostname string) error {
cmdStr := fmt.Sprintf("mkdir -p %s", r.DataDir)
if _, err := utils.RunCommand(cmdStr, "postgres"); err != nil {
return fmt.Errorf("failed to create pg directory: %s", err)
}

cmdStr = fmt.Sprintf("repmgr -h %s -p %d -d %s -U %s -f %s standby clone -c -F",
ipStr,
hostname,
r.Port,
r.DatabaseName,
r.Credentials.Username,
Expand All @@ -322,6 +325,21 @@ func (r *RepMgr) clonePrimary(ipStr string) error {
return nil
}

func (r *RepMgr) regenReplicationConf(ctx context.Context) error {
// TODO: do we need -c?
if _, err := utils.RunCmd(ctx, "postgres",
"repmgr", "--replication-conf-only",
"-h", "",
"-p", fmt.Sprint(r.Port),
"-d", r.DatabaseName,
"-U", r.Credentials.Username,
"-f", r.ConfigPath,
"standby", "clone", "-F"); err != nil {
return fmt.Errorf("failed to regenerate replication conf: %s", err)
}
return nil
}

type Member struct {
ID int
Hostname string
Expand Down Expand Up @@ -432,25 +450,25 @@ func (*RepMgr) MemberByHostname(ctx context.Context, pg *pgx.Conn, hostname stri
}

func (r *RepMgr) ResolveMemberOverDNS(ctx context.Context) (*Member, error) {
ips, err := r.InRegionPeerIPs(ctx)
machineIds, err := r.InRegionPeerMachines(ctx)
if err != nil {
return nil, err
}

var target *Member

for _, ip := range ips {
if ip.String() == r.PrivateIP {
for _, machineId := range machineIds {
if machineId == r.MachineID {
continue
}

conn, err := r.NewRemoteConnection(ctx, ip.String())
conn, err := r.NewRemoteConnection(ctx, r.machineIdToDNS(machineId))
if err != nil {
continue
}
defer func() { _ = conn.Close(ctx) }()

member, err := r.MemberByHostname(ctx, conn, ip.String())
member, err := r.MemberByHostname(ctx, conn, r.machineIdToDNS(machineId))
if err != nil {
continue
}
Expand All @@ -477,6 +495,21 @@ func (r *RepMgr) InRegionPeerIPs(ctx context.Context) ([]net.IPAddr, error) {
return privnet.AllPeers(ctx, targets)
}

func (r *RepMgr) InRegionPeerMachines(ctx context.Context) ([]string, error) {
machines, err := privnet.AllMachines(ctx, r.AppName)
if err != nil {
return nil, err
}

var machineIDs []string
for _, machine := range machines {
if machine.Region == r.PrimaryRegion {
machineIDs = append(machineIDs, machine.Id)
}
}
return machineIDs, nil
}

func (r *RepMgr) HostInRegion(ctx context.Context, hostname string) (bool, error) {
ips, err := r.InRegionPeerIPs(ctx)
if err != nil {
Expand Down Expand Up @@ -514,3 +547,11 @@ func (r *RepMgr) UnregisterMember(member Member) error {
func (r *RepMgr) eligiblePrimary() bool {
return r.Region == r.PrimaryRegion
}

func (r *RepMgr) machineIdToDNS(nodeName string) string {
if len(nodeName) != 14 {
panic("invalid machine id")
}

return fmt.Sprintf("%s.vm.%s.internal", nodeName, r.AppName)
}
6 changes: 4 additions & 2 deletions internal/flypg/repmgr_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ func TestRepmgrInitialization(t *testing.T) {
UserConfigPath: repgmrUserConfigFilePath,
PasswordConfigPath: repgmrPasswordConfigFilePath,
DataDir: repmgrTestDirectory,
MachineID: "abcdefg1234567",
PrivateIP: "127.0.0.1",
Credentials: admin.Credential{
Username: "user",
Expand Down Expand Up @@ -91,8 +92,8 @@ func TestRepmgrInitialization(t *testing.T) {
t.Fatal(err)
}

if config["node_name"] != "'127.0.0.1'" {
t.Fatalf("expected node_name to be '127.0.0.1', got %v", config["node_name"])
if config["node_name"] != "'abcdefg1234567.vm.test-app.internal'" {
t.Fatalf("expected node_name to be 'abcdefg1234567.vm.test-app.internal', got %v", config["node_name"])
}

if config["location"] != "'dev'" {
Expand Down Expand Up @@ -122,6 +123,7 @@ func TestRepmgrNodeIDGeneration(t *testing.T) {

DataDir: repmgrTestDirectory,
PrivateIP: "127.0.0.1",
MachineID: "abcdefg1234567",
Port: 5433,
DatabaseName: "repmgr",
Credentials: admin.Credential{
Expand Down
19 changes: 7 additions & 12 deletions internal/flypg/zombie.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ import (
"errors"
"fmt"
"log"
"net"
"os"

"github.com/fly-apps/postgres-flex/internal/utils"
Expand Down Expand Up @@ -85,7 +84,7 @@ type DNASample struct {

func TakeDNASample(ctx context.Context, node *Node, standbys []Member) (*DNASample, error) {
sample := &DNASample{
hostname: node.PrivateIP,
hostname: node.RepMgr.machineIdToDNS(node.MachineID),
totalMembers: len(standbys) + 1,
totalActive: 1,
totalInactive: 0,
Expand Down Expand Up @@ -118,7 +117,8 @@ func TakeDNASample(ctx context.Context, node *Node, standbys []Member) (*DNASamp
sample.totalActive++

// Record conflict when primary doesn't match.
if primary.Hostname != node.PrivateIP {
// We're checking PrivateIP here for backwards compatibility
if primary.Hostname != node.RepMgr.machineIdToDNS(node.MachineID) && primary.Hostname != node.PrivateIP {
sample.totalConflicts++
sample.conflictMap[primary.Hostname]++
}
Expand Down Expand Up @@ -199,24 +199,19 @@ func handleZombieLock(ctx context.Context, n *Node) error {
// If the zombie lock contains a hostname, it means we were able to
// resolve the real primary and will attempt to rejoin it.
if primaryStr != "" {
ip := net.ParseIP(primaryStr)
if ip == nil {
return fmt.Errorf("zombie.lock file contains an invalid ipv6 address")
}

conn, err := n.RepMgr.NewRemoteConnection(ctx, ip.String())
conn, err := n.RepMgr.NewRemoteConnection(ctx, primaryStr)
if err != nil {
return fmt.Errorf("failed to establish a connection to our rejoin target %s: %s", ip.String(), err)
return fmt.Errorf("failed to establish a connection to our rejoin target %s: %s", primaryStr, err)
}
defer func() { _ = conn.Close(ctx) }()

primary, err := n.RepMgr.PrimaryMember(ctx, conn)
if err != nil {
return fmt.Errorf("failed to confirm primary on recover target %s: %s", ip.String(), err)
return fmt.Errorf("failed to confirm primary on recover target %s: %s", primaryStr, err)
}

// Confirm that our rejoin target still identifies itself as the primary.
if primary.Hostname != ip.String() {
if primary.Hostname != primaryStr {
// Clear the zombie.lock file so we can attempt to re-resolve the correct primary.
if err := RemoveZombieLock(); err != nil {
return fmt.Errorf("failed to remove zombie lock: %s", err)
Expand Down
Loading