Skip to content
Merged
Show file tree
Hide file tree
Changes from 7 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions cmd/monitor/monitor_dead_members.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,13 +71,13 @@ func deadMemberMonitorTick(ctx context.Context, node *flypg.Node, seenAt map[int
}

for _, voter := range votingMembers {
sConn, err := node.RepMgr.NewRemoteConnection(ctx, voter.Hostname)
sConn, err := node.RepMgr.NewRemoteConnection(ctx, voter.NodeName)
if err != nil {
// TODO - Verify the exception that's getting thrown.
if time.Since(seenAt[voter.ID]) >= deadMemberRemovalThreshold {
log.Printf("Removing dead member: %s\n", voter.Hostname)
log.Printf("Removing dead member: %s\n", voter.NodeName)
if err := node.RepMgr.UnregisterMember(voter); err != nil {
log.Printf("failed to unregister member %s: %v", voter.Hostname, err)
log.Printf("failed to unregister member %s: %v", voter.NodeName, err)
continue
}
delete(seenAt, voter.ID)
Expand Down
66 changes: 62 additions & 4 deletions internal/flypg/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,12 @@ import (
"github.com/fly-apps/postgres-flex/internal/privnet"
"github.com/fly-apps/postgres-flex/internal/utils"
"github.com/jackc/pgx/v5"
"golang.org/x/exp/slices"
)

type Node struct {
AppName string
MachineID string
PrivateIP string
PrimaryRegion string
DataDir string
Expand Down Expand Up @@ -52,6 +54,8 @@ func NewNode() (*Node, error) {

node.PrivateIP = ipv6.String()

node.MachineID = os.Getenv("FLY_MACHINE_ID")

node.PrimaryRegion = os.Getenv("PRIMARY_REGION")
if node.PrimaryRegion == "" {
return nil, fmt.Errorf("PRIMARY_REGION environment variable must be set")
Expand Down Expand Up @@ -89,6 +93,7 @@ func NewNode() (*Node, error) {
PasswordConfigPath: "/data/.pgpass",
DataDir: node.DataDir,
PrivateIP: node.PrivateIP,
MachineID: node.MachineID,
Port: 5433,
DatabaseName: "repmgr",
Credentials: node.ReplCredentials,
Expand Down Expand Up @@ -187,7 +192,7 @@ func (n *Node) Init(ctx context.Context) error {
return fmt.Errorf("failed to resolve member over dns: %s", err)
}

if err := n.RepMgr.clonePrimary(cloneTarget.Hostname); err != nil {
if err := n.RepMgr.clonePrimary(cloneTarget.NodeName); err != nil {
// Clean-up the directory so it can be retried.
if rErr := os.Remove(n.DataDir); rErr != nil {
log.Printf("[ERROR] failed to cleanup postgresql dir after clone error: %s\n", rErr)
Expand Down Expand Up @@ -267,7 +272,9 @@ func (n *Node) PostInit(ctx context.Context) error {

// Restart repmgrd in the event the IP changes for an already registered node.
// This can happen if the underlying volume is moved to a different node.
daemonRestartRequired := n.RepMgr.daemonRestartRequired(member)
// TODO - this isn't an IP anymore
//daemonRestartRequired := n.RepMgr.daemonRestartRequired(member)
daemonRestartRequired := false

switch member.Role {
case PrimaryRoleName:
Expand Down Expand Up @@ -311,6 +318,10 @@ func (n *Node) PostInit(ctx context.Context) error {
}
}
case StandbyRoleName:
if err := n.migrateNodeNameIfNeeded(ctx, repConn); err != nil {
return fmt.Errorf("failed to migrate node name: %s", err)
}

// Register existing standby to apply any configuration changes.
if err := n.RepMgr.registerStandby(daemonRestartRequired); err != nil {
return fmt.Errorf("failed to register existing standby: %s", err)
Expand All @@ -322,7 +333,7 @@ func (n *Node) PostInit(ctx context.Context) error {
}

// Register existing witness to apply any configuration changes.
if err := n.RepMgr.registerWitness(primary.Hostname); err != nil {
if err := n.RepMgr.registerWitness(primary.NodeName); err != nil {
return fmt.Errorf("failed to register existing witness: %s", err)
}
default:
Expand Down Expand Up @@ -404,7 +415,7 @@ func (n *Node) PostInit(ctx context.Context) error {
return fmt.Errorf("failed to resolve primary member: %s", err)
}

if err := n.RepMgr.registerWitness(primary.Hostname); err != nil {
if err := n.RepMgr.registerWitness(primary.NodeName); err != nil {
return fmt.Errorf("failed to register witness: %s", err)
}
} else {
Expand Down Expand Up @@ -527,3 +538,50 @@ func (n *Node) handleRemoteRestore(ctx context.Context, store *state.Store) erro

return nil
}

// migrate node name from 6pn to machine ID if needed
func (n *Node) migrateNodeNameIfNeeded(ctx context.Context, repConn *pgx.Conn) error {
primary, err := n.RepMgr.PrimaryMember(ctx, repConn)
if err != nil {
return fmt.Errorf("failed to resolve primary member when updating standby: %s", err)
}

primaryConn, err := n.RepMgr.NewRemoteConnection(ctx, primary.NodeName)
if err != nil {
return fmt.Errorf("failed to establish connection to primary: %s", err)
}
defer func() { _ = primaryConn.Close(ctx) }()

rows, err := primaryConn.Query(ctx, "select application_name from pg_stat_replication")
if err != nil {
return fmt.Errorf("failed to query pg_stat_replication: %s", err)
}
defer rows.Close()

var applicationNames []string
for rows.Next() {
var applicationName string
if err := rows.Scan(&applicationName); err != nil {
return fmt.Errorf("failed to scan application_name: %s", err)
}
applicationNames = append(applicationNames, applicationName)
}
if err := rows.Err(); err != nil {
return fmt.Errorf("failed to iterate over rows: %s", err)
}

// if we find our 6pn as application_name, we need to regenerate postgresql.auto.conf and reload postgresql
if slices.Contains(applicationNames, n.PrivateIP) {
log.Printf("pg_stat_replication on the primary has our ipv6 address as application_name, converting to machine ID...")

if err := n.RepMgr.regenReplicationConf(ctx); err != nil {
return fmt.Errorf("failed to clone standby: %s", err)
}

if err := n.PGConfig.reload(ctx); err != nil {
return fmt.Errorf("failed to reload postgresql: %s", err)
}
}

return nil
}
8 changes: 8 additions & 0 deletions internal/flypg/pg.go
Original file line number Diff line number Diff line change
Expand Up @@ -606,3 +606,11 @@ func diskSizeInBytes(dir string) (uint64, error) {
}
return stat.Blocks * uint64(stat.Bsize), nil
}

func (*PGConfig) reload(ctx context.Context) error {
_, err := utils.RunCmd(ctx, "postgres", "pg_ctl", "-D", "/data/postgresql/", "reload")
if err != nil {
return fmt.Errorf("failed to reload postgres: %s", err)
}
return nil
}
12 changes: 6 additions & 6 deletions internal/flypg/readonly.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,31 +70,31 @@ func BroadcastReadonlyChange(ctx context.Context, n *Node, enabled bool) error {

for _, member := range members {
if member.Role == PrimaryRoleName {
endpoint := fmt.Sprintf("http://[%s]:5500/%s", member.Hostname, target)
endpoint := fmt.Sprintf("http://%s:5500/%s", fmt.Sprintf("%s.vm.%s.internal", member.NodeName, n.AppName), target)
resp, err := http.Get(endpoint)
if err != nil {
log.Printf("[WARN] Failed to broadcast readonly state change to member %s: %s", member.Hostname, err)
log.Printf("[WARN] Failed to broadcast readonly state change to member %s: %s", member.NodeName, err)
continue
}
defer func() { _ = resp.Body.Close() }()

if resp.StatusCode > 299 {
log.Printf("[WARN] Failed to broadcast readonly state change to member %s: %d\n", member.Hostname, resp.StatusCode)
log.Printf("[WARN] Failed to broadcast readonly state change to member %s: %d\n", member.NodeName, resp.StatusCode)
}
}
}

for _, member := range members {
endpoint := fmt.Sprintf("http://[%s]:5500/%s", member.Hostname, RestartHaproxyEndpoint)
endpoint := fmt.Sprintf("http://%s:5500/%s", fmt.Sprintf("%s.vm.%s.internal", member.NodeName, n.AppName), RestartHaproxyEndpoint)
resp, err := http.Get(endpoint)
if err != nil {
log.Printf("[WARN] Failed to restart haproxy on member %s: %s", member.Hostname, err)
log.Printf("[WARN] Failed to restart haproxy on member %s: %s", member.NodeName, err)
continue
}
defer func() { _ = resp.Body.Close() }()

if resp.StatusCode > 299 {
log.Printf("[WARN] Failed to restart haproxy on member %s: %d\n", member.Hostname, resp.StatusCode)
log.Printf("[WARN] Failed to restart haproxy on member %s: %d\n", member.NodeName, resp.StatusCode)
}
}

Expand Down
Loading