From 2bde496f798ffacd3f94b39f93c158c5dd4a37e8 Mon Sep 17 00:00:00 2001 From: Ben Iofel Date: Wed, 31 Jul 2024 01:35:51 -0400 Subject: [PATCH 01/20] WIP switch node names to machine ID --- internal/flypg/node.go | 4 +++ internal/flypg/repmgr.go | 32 ++++++++++++++++----- internal/privnet/sixpn.go | 58 +++++++++++++++++++++++++++++---------- 3 files changed, 73 insertions(+), 21 deletions(-) diff --git a/internal/flypg/node.go b/internal/flypg/node.go index 3b099218..bfb82519 100644 --- a/internal/flypg/node.go +++ b/internal/flypg/node.go @@ -20,6 +20,7 @@ import ( type Node struct { AppName string + MachineID string PrivateIP string PrimaryRegion string DataDir string @@ -52,6 +53,8 @@ func NewNode() (*Node, error) { node.PrivateIP = ipv6.String() + node.MachineID = os.Getenv("FLY_MACHINE_ID") + node.PrimaryRegion = os.Getenv("PRIMARY_REGION") if node.PrimaryRegion == "" { return nil, fmt.Errorf("PRIMARY_REGION environment variable must be set") @@ -89,6 +92,7 @@ func NewNode() (*Node, error) { PasswordConfigPath: "/data/.pgpass", DataDir: node.DataDir, PrivateIP: node.PrivateIP, + MachineID: node.MachineID, Port: 5433, DatabaseName: "repmgr", Credentials: node.ReplCredentials, diff --git a/internal/flypg/repmgr.go b/internal/flypg/repmgr.go index 46ceab8c..aaaa30e0 100644 --- a/internal/flypg/repmgr.go +++ b/internal/flypg/repmgr.go @@ -34,6 +34,7 @@ type RepMgr struct { PrimaryRegion string Region string PrivateIP string + MachineID string DataDir string DatabaseName string Credentials admin.Credential @@ -161,10 +162,12 @@ func (r *RepMgr) setDefaults() error { return err } + hostname := fmt.Sprintf("%s.vm.%s.internal", r.MachineID, r.AppName) + conf := ConfigMap{ "node_id": nodeID, - "node_name": fmt.Sprintf("'%s'", r.PrivateIP), - "conninfo": fmt.Sprintf("'host=%s port=%d user=%s dbname=%s connect_timeout=5'", r.PrivateIP, r.Port, r.Credentials.Username, r.DatabaseName), + "node_name": fmt.Sprintf("'%s'", r.MachineID), + "conninfo": fmt.Sprintf("'host=%s port=%d user=%s dbname=%s connect_timeout=5'", hostname, r.Port, r.Credentials.Username, r.DatabaseName), "data_directory": fmt.Sprintf("'%s'", r.DataDir), "failover": "'automatic'", "use_replication_slots": "yes", @@ -432,25 +435,25 @@ func (*RepMgr) MemberByHostname(ctx context.Context, pg *pgx.Conn, hostname stri } func (r *RepMgr) ResolveMemberOverDNS(ctx context.Context) (*Member, error) { - ips, err := r.InRegionPeerIPs(ctx) + machineIds, err := r.InRegionPeerMachines(ctx) if err != nil { return nil, err } var target *Member - for _, ip := range ips { - if ip.String() == r.PrivateIP { + for _, machineId := range machineIds { + if machineId == r.MachineID { continue } - conn, err := r.NewRemoteConnection(ctx, ip.String()) + conn, err := r.NewRemoteConnection(ctx, fmt.Sprintf("%s.vm.%s.internal", machineId, r.AppName)) if err != nil { continue } defer func() { _ = conn.Close(ctx) }() - member, err := r.MemberByHostname(ctx, conn, ip.String()) + member, err := r.MemberByHostname(ctx, conn, machineId) if err != nil { continue } @@ -477,6 +480,21 @@ func (r *RepMgr) InRegionPeerIPs(ctx context.Context) ([]net.IPAddr, error) { return privnet.AllPeers(ctx, targets) } +func (r *RepMgr) InRegionPeerMachines(ctx context.Context) ([]string, error) { + machines, err := privnet.AllMachines(ctx, r.AppName) + if err != nil { + return nil, err + } + + var machineIDs []string + for _, machine := range machines { + if machine.Region == r.PrimaryRegion { + machineIDs = append(machineIDs, machine.Id) + } + } + return machineIDs, nil +} + func (r *RepMgr) HostInRegion(ctx context.Context, hostname string) (bool, error) { ips, err := r.InRegionPeerIPs(ctx) if err != nil { diff --git a/internal/privnet/sixpn.go b/internal/privnet/sixpn.go index 85b1eaeb..7c4120f5 100644 --- a/internal/privnet/sixpn.go +++ b/internal/privnet/sixpn.go @@ -14,20 +14,7 @@ func AllPeers(ctx context.Context, appName string) ([]net.IPAddr, error) { } func Get6PN(ctx context.Context, hostname string) ([]net.IPAddr, error) { - nameserver := os.Getenv("FLY_NAMESERVER") - if nameserver == "" { - nameserver = "fdaa::3" - } - nameserver = net.JoinHostPort(nameserver, "53") - r := &net.Resolver{ - PreferGo: true, - Dial: func(ctx context.Context, network, address string) (net.Conn, error) { - d := net.Dialer{ - Timeout: 1 * time.Second, - } - return d.DialContext(ctx, "udp6", nameserver) - }, - } + r := getResolver() ips, err := r.LookupIPAddr(ctx, hostname) if err != nil { @@ -54,6 +41,49 @@ func Get6PN(ctx context.Context, hostname string) ([]net.IPAddr, error) { return ips, err } +type Machine struct { + Id string + Region string +} + +func AllMachines(ctx context.Context, appName string) ([]Machine, error) { + r := getResolver() + txts, err := r.LookupTXT(ctx, fmt.Sprintf("vms.%s.internal", appName)) + if err != nil { + return nil, err + } + + machines := make([]Machine, 0) + for _, txt := range txts { + parts := strings.Split(txt, ",") + for _, part := range parts { + parts := strings.Split(part, " ") + if len(parts) != 2 { + return nil, fmt.Errorf("invalid machine DNS TXT format: %s", txt) + } + machines = append(machines, Machine{Id: parts[0], Region: parts[1]}) + } + } + return machines, nil +} + +func getResolver() *net.Resolver { + nameserver := os.Getenv("FLY_NAMESERVER") + if nameserver == "" { + nameserver = "fdaa::3" + } + nameserver = net.JoinHostPort(nameserver, "53") + return &net.Resolver{ + PreferGo: true, + Dial: func(ctx context.Context, network, address string) (net.Conn, error) { + d := net.Dialer{ + Timeout: 1 * time.Second, + } + return d.DialContext(ctx, "udp6", nameserver) + }, + } +} + func PrivateIPv6() (net.IP, error) { ips, err := net.LookupIP("fly-local-6pn") if err != nil && !strings.HasSuffix(err.Error(), "no such host") && !strings.HasSuffix(err.Error(), "server misbehaving") { From b56391d7c385bd5d167a46474b9caf53f3c4cb23 Mon Sep 17 00:00:00 2001 From: Ben Iofel Date: Wed, 31 Jul 2024 01:48:45 -0400 Subject: [PATCH 02/20] fix replica clone --- internal/flypg/repmgr.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/internal/flypg/repmgr.go b/internal/flypg/repmgr.go index aaaa30e0..e8804d46 100644 --- a/internal/flypg/repmgr.go +++ b/internal/flypg/repmgr.go @@ -304,14 +304,14 @@ func (r *RepMgr) rejoinCluster(hostname string) error { return err } -func (r *RepMgr) clonePrimary(ipStr string) error { +func (r *RepMgr) clonePrimary(machineId string) error { cmdStr := fmt.Sprintf("mkdir -p %s", r.DataDir) if _, err := utils.RunCommand(cmdStr, "postgres"); err != nil { return fmt.Errorf("failed to create pg directory: %s", err) } cmdStr = fmt.Sprintf("repmgr -h %s -p %d -d %s -U %s -f %s standby clone -c -F", - ipStr, + fmt.Sprintf("%s.vm.%s.internal", machineId, r.AppName), r.Port, r.DatabaseName, r.Credentials.Username, From af508f69fe9aa8346c6bac98c1c8be17ef989337 Mon Sep 17 00:00:00 2001 From: Ben Iofel Date: Wed, 31 Jul 2024 20:10:46 -0400 Subject: [PATCH 03/20] updates --- internal/flypg/readonly.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/internal/flypg/readonly.go b/internal/flypg/readonly.go index a714c254..3fe424bf 100644 --- a/internal/flypg/readonly.go +++ b/internal/flypg/readonly.go @@ -70,7 +70,7 @@ func BroadcastReadonlyChange(ctx context.Context, n *Node, enabled bool) error { for _, member := range members { if member.Role == PrimaryRoleName { - endpoint := fmt.Sprintf("http://[%s]:5500/%s", member.Hostname, target) + endpoint := fmt.Sprintf("http://%s:5500/%s", fmt.Sprintf("%s.vm.%s.internal", member.Hostname, n.AppName), target) resp, err := http.Get(endpoint) if err != nil { log.Printf("[WARN] Failed to broadcast readonly state change to member %s: %s", member.Hostname, err) @@ -85,7 +85,7 @@ func BroadcastReadonlyChange(ctx context.Context, n *Node, enabled bool) error { } for _, member := range members { - endpoint := fmt.Sprintf("http://[%s]:5500/%s", member.Hostname, RestartHaproxyEndpoint) + endpoint := fmt.Sprintf("http://%s:5500/%s", fmt.Sprintf("%s.vm.%s.internal", member.Hostname, n.AppName), RestartHaproxyEndpoint) resp, err := http.Get(endpoint) if err != nil { log.Printf("[WARN] Failed to restart haproxy on member %s: %s", member.Hostname, err) From 68092141aa1059cfe55c974e13518bb51e676fb4 Mon Sep 17 00:00:00 2001 From: Ben Iofel Date: Thu, 1 Aug 2024 00:50:28 -0400 Subject: [PATCH 04/20] fixes --- cmd/monitor/monitor_dead_members.go | 6 +++--- internal/flypg/node.go | 6 +++--- internal/flypg/readonly.go | 12 ++++++------ internal/flypg/repmgr.go | 19 ++++++++++--------- internal/flypg/zombie.go | 29 ++++++++++++++--------------- 5 files changed, 36 insertions(+), 36 deletions(-) diff --git a/cmd/monitor/monitor_dead_members.go b/cmd/monitor/monitor_dead_members.go index 5602cc06..befb4f18 100644 --- a/cmd/monitor/monitor_dead_members.go +++ b/cmd/monitor/monitor_dead_members.go @@ -71,13 +71,13 @@ func deadMemberMonitorTick(ctx context.Context, node *flypg.Node, seenAt map[int } for _, voter := range votingMembers { - sConn, err := node.RepMgr.NewRemoteConnection(ctx, voter.Hostname) + sConn, err := node.RepMgr.NewRemoteConnection(ctx, voter.NodeName) if err != nil { // TODO - Verify the exception that's getting thrown. if time.Since(seenAt[voter.ID]) >= deadMemberRemovalThreshold { - log.Printf("Removing dead member: %s\n", voter.Hostname) + log.Printf("Removing dead member: %s\n", voter.NodeName) if err := node.RepMgr.UnregisterMember(voter); err != nil { - log.Printf("failed to unregister member %s: %v", voter.Hostname, err) + log.Printf("failed to unregister member %s: %v", voter.NodeName, err) continue } delete(seenAt, voter.ID) diff --git a/internal/flypg/node.go b/internal/flypg/node.go index bfb82519..168a33d5 100644 --- a/internal/flypg/node.go +++ b/internal/flypg/node.go @@ -191,7 +191,7 @@ func (n *Node) Init(ctx context.Context) error { return fmt.Errorf("failed to resolve member over dns: %s", err) } - if err := n.RepMgr.clonePrimary(cloneTarget.Hostname); err != nil { + if err := n.RepMgr.clonePrimary(cloneTarget.NodeName); err != nil { // Clean-up the directory so it can be retried. if rErr := os.Remove(n.DataDir); rErr != nil { log.Printf("[ERROR] failed to cleanup postgresql dir after clone error: %s\n", rErr) @@ -326,7 +326,7 @@ func (n *Node) PostInit(ctx context.Context) error { } // Register existing witness to apply any configuration changes. - if err := n.RepMgr.registerWitness(primary.Hostname); err != nil { + if err := n.RepMgr.registerWitness(primary.NodeName); err != nil { return fmt.Errorf("failed to register existing witness: %s", err) } default: @@ -408,7 +408,7 @@ func (n *Node) PostInit(ctx context.Context) error { return fmt.Errorf("failed to resolve primary member: %s", err) } - if err := n.RepMgr.registerWitness(primary.Hostname); err != nil { + if err := n.RepMgr.registerWitness(primary.NodeName); err != nil { return fmt.Errorf("failed to register witness: %s", err) } } else { diff --git a/internal/flypg/readonly.go b/internal/flypg/readonly.go index 3fe424bf..4419a48e 100644 --- a/internal/flypg/readonly.go +++ b/internal/flypg/readonly.go @@ -70,31 +70,31 @@ func BroadcastReadonlyChange(ctx context.Context, n *Node, enabled bool) error { for _, member := range members { if member.Role == PrimaryRoleName { - endpoint := fmt.Sprintf("http://%s:5500/%s", fmt.Sprintf("%s.vm.%s.internal", member.Hostname, n.AppName), target) + endpoint := fmt.Sprintf("http://%s:5500/%s", fmt.Sprintf("%s.vm.%s.internal", member.NodeName, n.AppName), target) resp, err := http.Get(endpoint) if err != nil { - log.Printf("[WARN] Failed to broadcast readonly state change to member %s: %s", member.Hostname, err) + log.Printf("[WARN] Failed to broadcast readonly state change to member %s: %s", member.NodeName, err) continue } defer func() { _ = resp.Body.Close() }() if resp.StatusCode > 299 { - log.Printf("[WARN] Failed to broadcast readonly state change to member %s: %d\n", member.Hostname, resp.StatusCode) + log.Printf("[WARN] Failed to broadcast readonly state change to member %s: %d\n", member.NodeName, resp.StatusCode) } } } for _, member := range members { - endpoint := fmt.Sprintf("http://%s:5500/%s", fmt.Sprintf("%s.vm.%s.internal", member.Hostname, n.AppName), RestartHaproxyEndpoint) + endpoint := fmt.Sprintf("http://%s:5500/%s", fmt.Sprintf("%s.vm.%s.internal", member.NodeName, n.AppName), RestartHaproxyEndpoint) resp, err := http.Get(endpoint) if err != nil { - log.Printf("[WARN] Failed to restart haproxy on member %s: %s", member.Hostname, err) + log.Printf("[WARN] Failed to restart haproxy on member %s: %s", member.NodeName, err) continue } defer func() { _ = resp.Body.Close() }() if resp.StatusCode > 299 { - log.Printf("[WARN] Failed to restart haproxy on member %s: %d\n", member.Hostname, resp.StatusCode) + log.Printf("[WARN] Failed to restart haproxy on member %s: %d\n", member.NodeName, resp.StatusCode) } } diff --git a/internal/flypg/repmgr.go b/internal/flypg/repmgr.go index e8804d46..65f9ce73 100644 --- a/internal/flypg/repmgr.go +++ b/internal/flypg/repmgr.go @@ -100,7 +100,8 @@ func (r *RepMgr) NewLocalConnection(ctx context.Context) (*pgx.Conn, error) { return openConnection(ctx, host, r.DatabaseName, r.Credentials) } -func (r *RepMgr) NewRemoteConnection(ctx context.Context, hostname string) (*pgx.Conn, error) { +func (r *RepMgr) NewRemoteConnection(ctx context.Context, machineID string) (*pgx.Conn, error) { + hostname := fmt.Sprintf("%s.vm.%s.internal", machineID, r.AppName) host := net.JoinHostPort(hostname, strconv.Itoa(r.Port)) return openConnection(ctx, host, r.DatabaseName, r.Credentials) } @@ -289,10 +290,10 @@ func (r *RepMgr) unregisterWitness(id int) error { return err } -func (r *RepMgr) rejoinCluster(hostname string) error { +func (r *RepMgr) rejoinCluster(machineID string) error { cmdStr := fmt.Sprintf("repmgr -f %s node rejoin -h %s -p %d -U %s -d %s --force-rewind --no-wait", r.ConfigPath, - hostname, + fmt.Sprintf("%s.vm.%s.internal", machineID, r.AppName), r.Port, r.Credentials.Username, r.DatabaseName, @@ -327,7 +328,7 @@ func (r *RepMgr) clonePrimary(machineId string) error { type Member struct { ID int - Hostname string + NodeName string Active bool Region string Role string @@ -344,7 +345,7 @@ func (*RepMgr) Members(ctx context.Context, pg *pgx.Conn) ([]Member, error) { var members []Member for rows.Next() { var member Member - if err := rows.Scan(&member.ID, &member.Hostname, &member.Region, &member.Active, &member.Role); err != nil { + if err := rows.Scan(&member.ID, &member.NodeName, &member.Region, &member.Active, &member.Role); err != nil { return nil, err } @@ -377,7 +378,7 @@ func (r *RepMgr) Member(ctx context.Context, conn *pgx.Conn) (*Member, error) { func (*RepMgr) PrimaryMember(ctx context.Context, pg *pgx.Conn) (*Member, error) { var member Member sql := "select node_id, node_name, location, active, type from repmgr.nodes where type = 'primary' and active = true;" - err := pg.QueryRow(ctx, sql).Scan(&member.ID, &member.Hostname, &member.Region, &member.Active, &member.Role) + err := pg.QueryRow(ctx, sql).Scan(&member.ID, &member.NodeName, &member.Region, &member.Active, &member.Role) if err != nil { return nil, err } @@ -414,7 +415,7 @@ func (*RepMgr) MemberByID(ctx context.Context, pg *pgx.Conn, id int) (*Member, e var member Member sql := fmt.Sprintf("select node_id, node_name, location, active, type from repmgr.nodes where node_id = %d;", id) - err := pg.QueryRow(ctx, sql).Scan(&member.ID, &member.Hostname, &member.Region, &member.Active, &member.Role) + err := pg.QueryRow(ctx, sql).Scan(&member.ID, &member.NodeName, &member.Region, &member.Active, &member.Role) if err != nil { return nil, err } @@ -426,7 +427,7 @@ func (*RepMgr) MemberByHostname(ctx context.Context, pg *pgx.Conn, hostname stri var member Member sql := fmt.Sprintf("select node_id, node_name, location, active, type from repmgr.nodes where node_name = '%s';", hostname) - err := pg.QueryRow(ctx, sql).Scan(&member.ID, &member.Hostname, &member.Region, &member.Active, &member.Role) + err := pg.QueryRow(ctx, sql).Scan(&member.ID, &member.NodeName, &member.Region, &member.Active, &member.Role) if err != nil { return nil, err } @@ -447,7 +448,7 @@ func (r *RepMgr) ResolveMemberOverDNS(ctx context.Context) (*Member, error) { continue } - conn, err := r.NewRemoteConnection(ctx, fmt.Sprintf("%s.vm.%s.internal", machineId, r.AppName)) + conn, err := r.NewRemoteConnection(ctx, machineId) if err != nil { continue } diff --git a/internal/flypg/zombie.go b/internal/flypg/zombie.go index 7204846d..571f1ca0 100644 --- a/internal/flypg/zombie.go +++ b/internal/flypg/zombie.go @@ -5,7 +5,6 @@ import ( "errors" "fmt" "log" - "net" "os" "github.com/fly-apps/postgres-flex/internal/utils" @@ -95,9 +94,9 @@ func TakeDNASample(ctx context.Context, node *Node, standbys []Member) (*DNASamp for _, standby := range standbys { // Check for connectivity - mConn, err := node.RepMgr.NewRemoteConnection(ctx, standby.Hostname) + mConn, err := node.RepMgr.NewRemoteConnection(ctx, standby.NodeName) if err != nil { - log.Printf("[WARN] Failed to connect to %s\n", standby.Hostname) + log.Printf("[WARN] Failed to connect to %s\n", standby.NodeName) sample.totalInactive++ continue } @@ -106,7 +105,7 @@ func TakeDNASample(ctx context.Context, node *Node, standbys []Member) (*DNASamp // Verify the primary primary, err := node.RepMgr.PrimaryMember(ctx, mConn) if err != nil { - log.Printf("[WARN] Failed to resolve primary from standby %s\n", standby.Hostname) + log.Printf("[WARN] Failed to resolve primary from standby %s\n", standby.NodeName) sample.totalInactive++ continue } @@ -118,9 +117,9 @@ func TakeDNASample(ctx context.Context, node *Node, standbys []Member) (*DNASamp sample.totalActive++ // Record conflict when primary doesn't match. - if primary.Hostname != node.PrivateIP { + if primary.NodeName != node.MachineID { sample.totalConflicts++ - sample.conflictMap[primary.Hostname]++ + sample.conflictMap[primary.NodeName]++ } } @@ -199,24 +198,24 @@ func handleZombieLock(ctx context.Context, n *Node) error { // If the zombie lock contains a hostname, it means we were able to // resolve the real primary and will attempt to rejoin it. if primaryStr != "" { - ip := net.ParseIP(primaryStr) - if ip == nil { - return fmt.Errorf("zombie.lock file contains an invalid ipv6 address") - } + //ip := net.ParseIP(primaryStr) + //if ip == nil { + // return fmt.Errorf("zombie.lock file contains an invalid ipv6 address") + //} - conn, err := n.RepMgr.NewRemoteConnection(ctx, ip.String()) + conn, err := n.RepMgr.NewRemoteConnection(ctx, primaryStr) if err != nil { - return fmt.Errorf("failed to establish a connection to our rejoin target %s: %s", ip.String(), err) + return fmt.Errorf("failed to establish a connection to our rejoin target %s: %s", primaryStr, err) } defer func() { _ = conn.Close(ctx) }() primary, err := n.RepMgr.PrimaryMember(ctx, conn) if err != nil { - return fmt.Errorf("failed to confirm primary on recover target %s: %s", ip.String(), err) + return fmt.Errorf("failed to confirm primary on recover target %s: %s", primaryStr, err) } // Confirm that our rejoin target still identifies itself as the primary. - if primary.Hostname != ip.String() { + if primary.NodeName != primaryStr { // Clear the zombie.lock file so we can attempt to re-resolve the correct primary. if err := RemoveZombieLock(); err != nil { return fmt.Errorf("failed to remove zombie lock: %s", err) @@ -231,7 +230,7 @@ func handleZombieLock(ctx context.Context, n *Node) error { return ErrZombieLockRegionMismatch } - if err := n.RepMgr.rejoinCluster(primary.Hostname); err != nil { + if err := n.RepMgr.rejoinCluster(primary.NodeName); err != nil { return fmt.Errorf("failed to rejoin cluster: %s", err) } From 74fadf31656bd34cd7e28eb03d1688191f2b3b08 Mon Sep 17 00:00:00 2001 From: Ben Iofel Date: Thu, 1 Aug 2024 01:51:54 -0400 Subject: [PATCH 05/20] TODO --- internal/flypg/node.go | 1 + internal/flypg/repmgr.go | 4 ++-- internal/flypg/zombie.go | 1 + 3 files changed, 4 insertions(+), 2 deletions(-) diff --git a/internal/flypg/node.go b/internal/flypg/node.go index 168a33d5..42d9016c 100644 --- a/internal/flypg/node.go +++ b/internal/flypg/node.go @@ -271,6 +271,7 @@ func (n *Node) PostInit(ctx context.Context) error { // Restart repmgrd in the event the IP changes for an already registered node. // This can happen if the underlying volume is moved to a different node. + // TODO - this isn't an IP anymore daemonRestartRequired := n.RepMgr.daemonRestartRequired(member) switch member.Role { diff --git a/internal/flypg/repmgr.go b/internal/flypg/repmgr.go index 65f9ce73..33ae2ae9 100644 --- a/internal/flypg/repmgr.go +++ b/internal/flypg/repmgr.go @@ -254,7 +254,7 @@ func (r *RepMgr) registerStandby(restartDaemon bool) error { } func (r *RepMgr) registerWitness(primaryHostname string) error { - cmdStr := fmt.Sprintf("repmgr witness register -f %s -h %s -F", r.ConfigPath, primaryHostname) + cmdStr := fmt.Sprintf("repmgr witness register -f %s -h %s -F", r.ConfigPath, primaryHostname) // TODO _, err := utils.RunCommand(cmdStr, "postgres") return err @@ -280,7 +280,7 @@ func (*RepMgr) restartDaemon() error { } func (r *RepMgr) daemonRestartRequired(m *Member) bool { - return m.Hostname != r.PrivateIP + return m.NodeName != r.PrivateIP // TODO } func (r *RepMgr) unregisterWitness(id int) error { diff --git a/internal/flypg/zombie.go b/internal/flypg/zombie.go index 571f1ca0..ff45c157 100644 --- a/internal/flypg/zombie.go +++ b/internal/flypg/zombie.go @@ -215,6 +215,7 @@ func handleZombieLock(ctx context.Context, n *Node) error { } // Confirm that our rejoin target still identifies itself as the primary. + // TODO - machine IDs don't change if primary.NodeName != primaryStr { // Clear the zombie.lock file so we can attempt to re-resolve the correct primary. if err := RemoveZombieLock(); err != nil { From bbd55f02891e1bf20bffda3bb018d3191df60d9e Mon Sep 17 00:00:00 2001 From: Ben Iofel Date: Fri, 2 Aug 2024 00:13:11 -0400 Subject: [PATCH 06/20] fix application_name on replica startup --- internal/flypg/node.go | 47 +++++++++++++++++++++++++++++++++++++++- internal/flypg/pg.go | 8 +++++++ internal/flypg/repmgr.go | 30 +++++++++++++++++++++++-- 3 files changed, 82 insertions(+), 3 deletions(-) diff --git a/internal/flypg/node.go b/internal/flypg/node.go index 42d9016c..7f852b00 100644 --- a/internal/flypg/node.go +++ b/internal/flypg/node.go @@ -16,6 +16,7 @@ import ( "github.com/fly-apps/postgres-flex/internal/privnet" "github.com/fly-apps/postgres-flex/internal/utils" "github.com/jackc/pgx/v5" + "golang.org/x/exp/slices" ) type Node struct { @@ -272,7 +273,8 @@ func (n *Node) PostInit(ctx context.Context) error { // Restart repmgrd in the event the IP changes for an already registered node. // This can happen if the underlying volume is moved to a different node. // TODO - this isn't an IP anymore - daemonRestartRequired := n.RepMgr.daemonRestartRequired(member) + //daemonRestartRequired := n.RepMgr.daemonRestartRequired(member) + daemonRestartRequired := false switch member.Role { case PrimaryRoleName: @@ -316,6 +318,49 @@ func (n *Node) PostInit(ctx context.Context) error { } } case StandbyRoleName: + // This section handles migration from 6pn as repmgr node name to machine ID as repmgr node name + primary, err := n.RepMgr.PrimaryMember(ctx, repConn) + if err != nil { + return fmt.Errorf("failed to resolve primary member when updating standby: %s", err) + } + + primaryConn, err := n.RepMgr.NewRemoteConnection(ctx, primary.NodeName) + if err != nil { + return fmt.Errorf("failed to establish connection to primary: %s", err) + } + defer func() { _ = primaryConn.Close(ctx) }() + + rows, err := primaryConn.Query(ctx, "select application_name from pg_stat_replication") + if err != nil { + return fmt.Errorf("failed to query pg_stat_replication: %s", err) + } + defer rows.Close() + + var applicationNames []string + for rows.Next() { + var applicationName string + if err := rows.Scan(&applicationName); err != nil { + return fmt.Errorf("failed to scan application_name: %s", err) + } + applicationNames = append(applicationNames, applicationName) + } + if err := rows.Err(); err != nil { + return fmt.Errorf("failed to iterate over rows: %s", err) + } + + if slices.Contains(applicationNames, n.PrivateIP) { + log.Printf("pg_stat_replication on the primary has our ipv6 address as application_name, converting to machine ID...") + + if err := n.RepMgr.regenReplicationConf(ctx); err != nil { + return fmt.Errorf("failed to clone standby: %s", err) + } + + if err := n.PGConfig.reload(ctx); err != nil { + return fmt.Errorf("failed to reload postgresql: %s", err) + } + } + // end of 6pn -> machine ID migration stuff + // Register existing standby to apply any configuration changes. if err := n.RepMgr.registerStandby(daemonRestartRequired); err != nil { return fmt.Errorf("failed to register existing standby: %s", err) diff --git a/internal/flypg/pg.go b/internal/flypg/pg.go index 94a294ab..af7d648d 100644 --- a/internal/flypg/pg.go +++ b/internal/flypg/pg.go @@ -606,3 +606,11 @@ func diskSizeInBytes(dir string) (uint64, error) { } return stat.Blocks * uint64(stat.Bsize), nil } + +func (*PGConfig) reload(ctx context.Context) error { + _, err := utils.RunCmd(ctx, "postgres", "pg_ctl", "-D", "/data/postgresql/", "reload") + if err != nil { + return fmt.Errorf("failed to reload postgres: %s", err) + } + return nil +} diff --git a/internal/flypg/repmgr.go b/internal/flypg/repmgr.go index 33ae2ae9..4e01cbd3 100644 --- a/internal/flypg/repmgr.go +++ b/internal/flypg/repmgr.go @@ -100,8 +100,19 @@ func (r *RepMgr) NewLocalConnection(ctx context.Context) (*pgx.Conn, error) { return openConnection(ctx, host, r.DatabaseName, r.Credentials) } -func (r *RepMgr) NewRemoteConnection(ctx context.Context, machineID string) (*pgx.Conn, error) { - hostname := fmt.Sprintf("%s.vm.%s.internal", machineID, r.AppName) +// target - can be an IP address, machine ID in the current app, or other hostname +func (r *RepMgr) NewRemoteConnection(ctx context.Context, target string) (*pgx.Conn, error) { + var hostname string + + ip := net.ParseIP(target) + if ip != nil { + hostname = target + } else if len(target) == 14 { + hostname = fmt.Sprintf("%s.vm.%s.internal", target, r.AppName) + } else { + hostname = target + } + host := net.JoinHostPort(hostname, strconv.Itoa(r.Port)) return openConnection(ctx, host, r.DatabaseName, r.Credentials) } @@ -326,6 +337,21 @@ func (r *RepMgr) clonePrimary(machineId string) error { return nil } +func (r *RepMgr) regenReplicationConf(ctx context.Context) error { + // TODO: do we need -c? + if _, err := utils.RunCmd(ctx, "postgres", + "repmgr", "--replication-conf-only", + "-h", r.PrivateIP, // TODO: should this be the hostname, or even just localhost + "-p", fmt.Sprint(r.Port), + "-d", r.DatabaseName, + "-U", r.Credentials.Username, + "-f", r.ConfigPath, + "standby", "clone", "-F"); err != nil { + return fmt.Errorf("failed to regenerate replication conf: %s", err) + } + return nil +} + type Member struct { ID int NodeName string From 09167358c630c60b66abcf3f6fc95d380a157d6b Mon Sep 17 00:00:00 2001 From: Ben Iofel Date: Fri, 2 Aug 2024 00:18:39 -0400 Subject: [PATCH 07/20] move to function --- internal/flypg/node.go | 90 +++++++++++++++++++++++------------------- 1 file changed, 49 insertions(+), 41 deletions(-) diff --git a/internal/flypg/node.go b/internal/flypg/node.go index 7f852b00..4726865b 100644 --- a/internal/flypg/node.go +++ b/internal/flypg/node.go @@ -318,48 +318,9 @@ func (n *Node) PostInit(ctx context.Context) error { } } case StandbyRoleName: - // This section handles migration from 6pn as repmgr node name to machine ID as repmgr node name - primary, err := n.RepMgr.PrimaryMember(ctx, repConn) - if err != nil { - return fmt.Errorf("failed to resolve primary member when updating standby: %s", err) - } - - primaryConn, err := n.RepMgr.NewRemoteConnection(ctx, primary.NodeName) - if err != nil { - return fmt.Errorf("failed to establish connection to primary: %s", err) - } - defer func() { _ = primaryConn.Close(ctx) }() - - rows, err := primaryConn.Query(ctx, "select application_name from pg_stat_replication") - if err != nil { - return fmt.Errorf("failed to query pg_stat_replication: %s", err) - } - defer rows.Close() - - var applicationNames []string - for rows.Next() { - var applicationName string - if err := rows.Scan(&applicationName); err != nil { - return fmt.Errorf("failed to scan application_name: %s", err) - } - applicationNames = append(applicationNames, applicationName) - } - if err := rows.Err(); err != nil { - return fmt.Errorf("failed to iterate over rows: %s", err) - } - - if slices.Contains(applicationNames, n.PrivateIP) { - log.Printf("pg_stat_replication on the primary has our ipv6 address as application_name, converting to machine ID...") - - if err := n.RepMgr.regenReplicationConf(ctx); err != nil { - return fmt.Errorf("failed to clone standby: %s", err) - } - - if err := n.PGConfig.reload(ctx); err != nil { - return fmt.Errorf("failed to reload postgresql: %s", err) - } + if err := n.migrateNodeNameIfNeeded(ctx, repConn); err != nil { + return fmt.Errorf("failed to migrate node name: %s", err) } - // end of 6pn -> machine ID migration stuff // Register existing standby to apply any configuration changes. if err := n.RepMgr.registerStandby(daemonRestartRequired); err != nil { @@ -577,3 +538,50 @@ func (n *Node) handleRemoteRestore(ctx context.Context, store *state.Store) erro return nil } + +// migrate node name from 6pn to machine ID if needed +func (n *Node) migrateNodeNameIfNeeded(ctx context.Context, repConn *pgx.Conn) error { + primary, err := n.RepMgr.PrimaryMember(ctx, repConn) + if err != nil { + return fmt.Errorf("failed to resolve primary member when updating standby: %s", err) + } + + primaryConn, err := n.RepMgr.NewRemoteConnection(ctx, primary.NodeName) + if err != nil { + return fmt.Errorf("failed to establish connection to primary: %s", err) + } + defer func() { _ = primaryConn.Close(ctx) }() + + rows, err := primaryConn.Query(ctx, "select application_name from pg_stat_replication") + if err != nil { + return fmt.Errorf("failed to query pg_stat_replication: %s", err) + } + defer rows.Close() + + var applicationNames []string + for rows.Next() { + var applicationName string + if err := rows.Scan(&applicationName); err != nil { + return fmt.Errorf("failed to scan application_name: %s", err) + } + applicationNames = append(applicationNames, applicationName) + } + if err := rows.Err(); err != nil { + return fmt.Errorf("failed to iterate over rows: %s", err) + } + + // if we find our 6pn as application_name, we need to regenerate postgresql.auto.conf and reload postgresql + if slices.Contains(applicationNames, n.PrivateIP) { + log.Printf("pg_stat_replication on the primary has our ipv6 address as application_name, converting to machine ID...") + + if err := n.RepMgr.regenReplicationConf(ctx); err != nil { + return fmt.Errorf("failed to clone standby: %s", err) + } + + if err := n.PGConfig.reload(ctx); err != nil { + return fmt.Errorf("failed to reload postgresql: %s", err) + } + } + + return nil +} From a995af1a502cced0104c83a7a00552bbd1494b38 Mon Sep 17 00:00:00 2001 From: Ben Iofel Date: Fri, 2 Aug 2024 14:41:13 -0400 Subject: [PATCH 08/20] reuse code --- internal/flypg/node.go | 2 +- internal/flypg/pg.go | 8 -------- 2 files changed, 1 insertion(+), 9 deletions(-) diff --git a/internal/flypg/node.go b/internal/flypg/node.go index 4726865b..86aff6d3 100644 --- a/internal/flypg/node.go +++ b/internal/flypg/node.go @@ -578,7 +578,7 @@ func (n *Node) migrateNodeNameIfNeeded(ctx context.Context, repConn *pgx.Conn) e return fmt.Errorf("failed to clone standby: %s", err) } - if err := n.PGConfig.reload(ctx); err != nil { + if err := admin.ReloadPostgresConfig(ctx, repConn); err != nil { return fmt.Errorf("failed to reload postgresql: %s", err) } } diff --git a/internal/flypg/pg.go b/internal/flypg/pg.go index af7d648d..94a294ab 100644 --- a/internal/flypg/pg.go +++ b/internal/flypg/pg.go @@ -606,11 +606,3 @@ func diskSizeInBytes(dir string) (uint64, error) { } return stat.Blocks * uint64(stat.Bsize), nil } - -func (*PGConfig) reload(ctx context.Context) error { - _, err := utils.RunCmd(ctx, "postgres", "pg_ctl", "-D", "/data/postgresql/", "reload") - if err != nil { - return fmt.Errorf("failed to reload postgres: %s", err) - } - return nil -} From 8778bd574317769198d0335b2e269682354b2a2e Mon Sep 17 00:00:00 2001 From: Ben Iofel Date: Tue, 6 Aug 2024 03:14:17 -0400 Subject: [PATCH 09/20] upgrade primary --- go.mod | 4 ++-- internal/flypg/readonly.go | 4 ++-- internal/flypg/repmgr.go | 24 ++++++++++++++++-------- internal/flypg/zombie.go | 2 +- 4 files changed, 21 insertions(+), 13 deletions(-) diff --git a/go.mod b/go.mod index b1b1e9fb..9cf3f9ad 100644 --- a/go.mod +++ b/go.mod @@ -8,8 +8,10 @@ require ( github.com/hashicorp/consul/api v1.18.0 github.com/jackc/pgconn v1.14.3 github.com/jackc/pgx/v5 v5.5.4 + github.com/olekukonko/tablewriter v0.0.5 github.com/pkg/errors v0.9.1 github.com/pkg/term v1.1.0 + github.com/spf13/cobra v1.8.1 github.com/superfly/fly-checks v0.0.0-20230510154016-d189351293f2 golang.org/x/exp v0.0.0-20230105202349-8879d0199aa3 golang.org/x/sync v0.1.0 @@ -36,8 +38,6 @@ require ( github.com/mattn/go-runewidth v0.0.9 // indirect github.com/mitchellh/go-homedir v1.1.0 // indirect github.com/mitchellh/mapstructure v1.4.1 // indirect - github.com/olekukonko/tablewriter v0.0.5 // indirect - github.com/spf13/cobra v1.8.1 // indirect github.com/spf13/pflag v1.0.5 // indirect github.com/stretchr/objx v0.5.0 // indirect golang.org/x/crypto v0.20.0 // indirect diff --git a/internal/flypg/readonly.go b/internal/flypg/readonly.go index 4419a48e..b19bcf21 100644 --- a/internal/flypg/readonly.go +++ b/internal/flypg/readonly.go @@ -70,7 +70,7 @@ func BroadcastReadonlyChange(ctx context.Context, n *Node, enabled bool) error { for _, member := range members { if member.Role == PrimaryRoleName { - endpoint := fmt.Sprintf("http://%s:5500/%s", fmt.Sprintf("%s.vm.%s.internal", member.NodeName, n.AppName), target) + endpoint := fmt.Sprintf("http://%s:5500/%s", member.NodeName, target) resp, err := http.Get(endpoint) if err != nil { log.Printf("[WARN] Failed to broadcast readonly state change to member %s: %s", member.NodeName, err) @@ -85,7 +85,7 @@ func BroadcastReadonlyChange(ctx context.Context, n *Node, enabled bool) error { } for _, member := range members { - endpoint := fmt.Sprintf("http://%s:5500/%s", fmt.Sprintf("%s.vm.%s.internal", member.NodeName, n.AppName), RestartHaproxyEndpoint) + endpoint := fmt.Sprintf("http://%s:5500/%s", member.NodeName, RestartHaproxyEndpoint) resp, err := http.Get(endpoint) if err != nil { log.Printf("[WARN] Failed to restart haproxy on member %s: %s", member.NodeName, err) diff --git a/internal/flypg/repmgr.go b/internal/flypg/repmgr.go index 4e01cbd3..8b55c606 100644 --- a/internal/flypg/repmgr.go +++ b/internal/flypg/repmgr.go @@ -174,11 +174,11 @@ func (r *RepMgr) setDefaults() error { return err } - hostname := fmt.Sprintf("%s.vm.%s.internal", r.MachineID, r.AppName) + hostname := r.machineIdToDNS(r.MachineID) conf := ConfigMap{ "node_id": nodeID, - "node_name": fmt.Sprintf("'%s'", r.MachineID), + "node_name": fmt.Sprintf("'%s'", hostname), "conninfo": fmt.Sprintf("'host=%s port=%d user=%s dbname=%s connect_timeout=5'", hostname, r.Port, r.Credentials.Username, r.DatabaseName), "data_directory": fmt.Sprintf("'%s'", r.DataDir), "failover": "'automatic'", @@ -301,10 +301,10 @@ func (r *RepMgr) unregisterWitness(id int) error { return err } -func (r *RepMgr) rejoinCluster(machineID string) error { +func (r *RepMgr) rejoinCluster(nodeName string) error { cmdStr := fmt.Sprintf("repmgr -f %s node rejoin -h %s -p %d -U %s -d %s --force-rewind --no-wait", r.ConfigPath, - fmt.Sprintf("%s.vm.%s.internal", machineID, r.AppName), + nodeName, r.Port, r.Credentials.Username, r.DatabaseName, @@ -316,14 +316,14 @@ func (r *RepMgr) rejoinCluster(machineID string) error { return err } -func (r *RepMgr) clonePrimary(machineId string) error { +func (r *RepMgr) clonePrimary(hostname string) error { cmdStr := fmt.Sprintf("mkdir -p %s", r.DataDir) if _, err := utils.RunCommand(cmdStr, "postgres"); err != nil { return fmt.Errorf("failed to create pg directory: %s", err) } cmdStr = fmt.Sprintf("repmgr -h %s -p %d -d %s -U %s -f %s standby clone -c -F", - fmt.Sprintf("%s.vm.%s.internal", machineId, r.AppName), + hostname, r.Port, r.DatabaseName, r.Credentials.Username, @@ -474,13 +474,13 @@ func (r *RepMgr) ResolveMemberOverDNS(ctx context.Context) (*Member, error) { continue } - conn, err := r.NewRemoteConnection(ctx, machineId) + conn, err := r.NewRemoteConnection(ctx, r.machineIdToDNS(machineId)) if err != nil { continue } defer func() { _ = conn.Close(ctx) }() - member, err := r.MemberByHostname(ctx, conn, machineId) + member, err := r.MemberByHostname(ctx, conn, r.machineIdToDNS(machineId)) if err != nil { continue } @@ -559,3 +559,11 @@ func (r *RepMgr) UnregisterMember(member Member) error { func (r *RepMgr) eligiblePrimary() bool { return r.Region == r.PrimaryRegion } + +func (r *RepMgr) machineIdToDNS(nodeName string) string { + if len(nodeName) != 14 { + panic("invalid machine id") + } + + return fmt.Sprintf("%s.vm.%s.internal", nodeName, r.AppName) +} diff --git a/internal/flypg/zombie.go b/internal/flypg/zombie.go index ff45c157..1753558a 100644 --- a/internal/flypg/zombie.go +++ b/internal/flypg/zombie.go @@ -117,7 +117,7 @@ func TakeDNASample(ctx context.Context, node *Node, standbys []Member) (*DNASamp sample.totalActive++ // Record conflict when primary doesn't match. - if primary.NodeName != node.MachineID { + if primary.NodeName != node.MachineID && primary.NodeName != node.PrivateIP { sample.totalConflicts++ sample.conflictMap[primary.NodeName]++ } From 21f810af06f50f356002b3357fd0e0bbe2916025 Mon Sep 17 00:00:00 2001 From: Ben Iofel Date: Tue, 6 Aug 2024 03:32:18 -0400 Subject: [PATCH 10/20] clean up the diff --- cmd/monitor/monitor_dead_members.go | 6 ++--- internal/flypg/node.go | 14 +++++------- internal/flypg/readonly.go | 12 +++++----- internal/flypg/repmgr.go | 34 ++++++++++------------------- internal/flypg/zombie.go | 20 ++++++----------- 5 files changed, 33 insertions(+), 53 deletions(-) diff --git a/cmd/monitor/monitor_dead_members.go b/cmd/monitor/monitor_dead_members.go index befb4f18..5602cc06 100644 --- a/cmd/monitor/monitor_dead_members.go +++ b/cmd/monitor/monitor_dead_members.go @@ -71,13 +71,13 @@ func deadMemberMonitorTick(ctx context.Context, node *flypg.Node, seenAt map[int } for _, voter := range votingMembers { - sConn, err := node.RepMgr.NewRemoteConnection(ctx, voter.NodeName) + sConn, err := node.RepMgr.NewRemoteConnection(ctx, voter.Hostname) if err != nil { // TODO - Verify the exception that's getting thrown. if time.Since(seenAt[voter.ID]) >= deadMemberRemovalThreshold { - log.Printf("Removing dead member: %s\n", voter.NodeName) + log.Printf("Removing dead member: %s\n", voter.Hostname) if err := node.RepMgr.UnregisterMember(voter); err != nil { - log.Printf("failed to unregister member %s: %v", voter.NodeName, err) + log.Printf("failed to unregister member %s: %v", voter.Hostname, err) continue } delete(seenAt, voter.ID) diff --git a/internal/flypg/node.go b/internal/flypg/node.go index 86aff6d3..f2ae0bec 100644 --- a/internal/flypg/node.go +++ b/internal/flypg/node.go @@ -192,7 +192,7 @@ func (n *Node) Init(ctx context.Context) error { return fmt.Errorf("failed to resolve member over dns: %s", err) } - if err := n.RepMgr.clonePrimary(cloneTarget.NodeName); err != nil { + if err := n.RepMgr.clonePrimary(cloneTarget.Hostname); err != nil { // Clean-up the directory so it can be retried. if rErr := os.Remove(n.DataDir); rErr != nil { log.Printf("[ERROR] failed to cleanup postgresql dir after clone error: %s\n", rErr) @@ -270,11 +270,9 @@ func (n *Node) PostInit(ctx context.Context) error { return fmt.Errorf("failed to resolve member role: %s", err) } - // Restart repmgrd in the event the IP changes for an already registered node. + // Restart repmgrd in the event the machine ID changes for an already registered node. // This can happen if the underlying volume is moved to a different node. - // TODO - this isn't an IP anymore - //daemonRestartRequired := n.RepMgr.daemonRestartRequired(member) - daemonRestartRequired := false + daemonRestartRequired := n.RepMgr.daemonRestartRequired(member) switch member.Role { case PrimaryRoleName: @@ -333,7 +331,7 @@ func (n *Node) PostInit(ctx context.Context) error { } // Register existing witness to apply any configuration changes. - if err := n.RepMgr.registerWitness(primary.NodeName); err != nil { + if err := n.RepMgr.registerWitness(primary.Hostname); err != nil { return fmt.Errorf("failed to register existing witness: %s", err) } default: @@ -415,7 +413,7 @@ func (n *Node) PostInit(ctx context.Context) error { return fmt.Errorf("failed to resolve primary member: %s", err) } - if err := n.RepMgr.registerWitness(primary.NodeName); err != nil { + if err := n.RepMgr.registerWitness(primary.Hostname); err != nil { return fmt.Errorf("failed to register witness: %s", err) } } else { @@ -546,7 +544,7 @@ func (n *Node) migrateNodeNameIfNeeded(ctx context.Context, repConn *pgx.Conn) e return fmt.Errorf("failed to resolve primary member when updating standby: %s", err) } - primaryConn, err := n.RepMgr.NewRemoteConnection(ctx, primary.NodeName) + primaryConn, err := n.RepMgr.NewRemoteConnection(ctx, primary.Hostname) if err != nil { return fmt.Errorf("failed to establish connection to primary: %s", err) } diff --git a/internal/flypg/readonly.go b/internal/flypg/readonly.go index b19bcf21..1c075538 100644 --- a/internal/flypg/readonly.go +++ b/internal/flypg/readonly.go @@ -70,31 +70,31 @@ func BroadcastReadonlyChange(ctx context.Context, n *Node, enabled bool) error { for _, member := range members { if member.Role == PrimaryRoleName { - endpoint := fmt.Sprintf("http://%s:5500/%s", member.NodeName, target) + endpoint := fmt.Sprintf("http://%s:5500/%s", member.Hostname, target) resp, err := http.Get(endpoint) if err != nil { - log.Printf("[WARN] Failed to broadcast readonly state change to member %s: %s", member.NodeName, err) + log.Printf("[WARN] Failed to broadcast readonly state change to member %s: %s", member.Hostname, err) continue } defer func() { _ = resp.Body.Close() }() if resp.StatusCode > 299 { - log.Printf("[WARN] Failed to broadcast readonly state change to member %s: %d\n", member.NodeName, resp.StatusCode) + log.Printf("[WARN] Failed to broadcast readonly state change to member %s: %d\n", member.Hostname, resp.StatusCode) } } } for _, member := range members { - endpoint := fmt.Sprintf("http://%s:5500/%s", member.NodeName, RestartHaproxyEndpoint) + endpoint := fmt.Sprintf("http://%s:5500/%s", member.Hostname, RestartHaproxyEndpoint) resp, err := http.Get(endpoint) if err != nil { - log.Printf("[WARN] Failed to restart haproxy on member %s: %s", member.NodeName, err) + log.Printf("[WARN] Failed to restart haproxy on member %s: %s", member.Hostname, err) continue } defer func() { _ = resp.Body.Close() }() if resp.StatusCode > 299 { - log.Printf("[WARN] Failed to restart haproxy on member %s: %d\n", member.NodeName, resp.StatusCode) + log.Printf("[WARN] Failed to restart haproxy on member %s: %d\n", member.Hostname, resp.StatusCode) } } diff --git a/internal/flypg/repmgr.go b/internal/flypg/repmgr.go index 8b55c606..a79b41de 100644 --- a/internal/flypg/repmgr.go +++ b/internal/flypg/repmgr.go @@ -100,19 +100,7 @@ func (r *RepMgr) NewLocalConnection(ctx context.Context) (*pgx.Conn, error) { return openConnection(ctx, host, r.DatabaseName, r.Credentials) } -// target - can be an IP address, machine ID in the current app, or other hostname -func (r *RepMgr) NewRemoteConnection(ctx context.Context, target string) (*pgx.Conn, error) { - var hostname string - - ip := net.ParseIP(target) - if ip != nil { - hostname = target - } else if len(target) == 14 { - hostname = fmt.Sprintf("%s.vm.%s.internal", target, r.AppName) - } else { - hostname = target - } - +func (r *RepMgr) NewRemoteConnection(ctx context.Context, hostname string) (*pgx.Conn, error) { host := net.JoinHostPort(hostname, strconv.Itoa(r.Port)) return openConnection(ctx, host, r.DatabaseName, r.Credentials) } @@ -265,7 +253,7 @@ func (r *RepMgr) registerStandby(restartDaemon bool) error { } func (r *RepMgr) registerWitness(primaryHostname string) error { - cmdStr := fmt.Sprintf("repmgr witness register -f %s -h %s -F", r.ConfigPath, primaryHostname) // TODO + cmdStr := fmt.Sprintf("repmgr witness register -f %s -h %s -F", r.ConfigPath, primaryHostname) _, err := utils.RunCommand(cmdStr, "postgres") return err @@ -291,7 +279,7 @@ func (*RepMgr) restartDaemon() error { } func (r *RepMgr) daemonRestartRequired(m *Member) bool { - return m.NodeName != r.PrivateIP // TODO + return m.Hostname != r.MachineID } func (r *RepMgr) unregisterWitness(id int) error { @@ -301,10 +289,10 @@ func (r *RepMgr) unregisterWitness(id int) error { return err } -func (r *RepMgr) rejoinCluster(nodeName string) error { +func (r *RepMgr) rejoinCluster(hostname string) error { cmdStr := fmt.Sprintf("repmgr -f %s node rejoin -h %s -p %d -U %s -d %s --force-rewind --no-wait", r.ConfigPath, - nodeName, + hostname, r.Port, r.Credentials.Username, r.DatabaseName, @@ -341,7 +329,7 @@ func (r *RepMgr) regenReplicationConf(ctx context.Context) error { // TODO: do we need -c? if _, err := utils.RunCmd(ctx, "postgres", "repmgr", "--replication-conf-only", - "-h", r.PrivateIP, // TODO: should this be the hostname, or even just localhost + "-h", r.PrivateIP, "-p", fmt.Sprint(r.Port), "-d", r.DatabaseName, "-U", r.Credentials.Username, @@ -354,7 +342,7 @@ func (r *RepMgr) regenReplicationConf(ctx context.Context) error { type Member struct { ID int - NodeName string + Hostname string Active bool Region string Role string @@ -371,7 +359,7 @@ func (*RepMgr) Members(ctx context.Context, pg *pgx.Conn) ([]Member, error) { var members []Member for rows.Next() { var member Member - if err := rows.Scan(&member.ID, &member.NodeName, &member.Region, &member.Active, &member.Role); err != nil { + if err := rows.Scan(&member.ID, &member.Hostname, &member.Region, &member.Active, &member.Role); err != nil { return nil, err } @@ -404,7 +392,7 @@ func (r *RepMgr) Member(ctx context.Context, conn *pgx.Conn) (*Member, error) { func (*RepMgr) PrimaryMember(ctx context.Context, pg *pgx.Conn) (*Member, error) { var member Member sql := "select node_id, node_name, location, active, type from repmgr.nodes where type = 'primary' and active = true;" - err := pg.QueryRow(ctx, sql).Scan(&member.ID, &member.NodeName, &member.Region, &member.Active, &member.Role) + err := pg.QueryRow(ctx, sql).Scan(&member.ID, &member.Hostname, &member.Region, &member.Active, &member.Role) if err != nil { return nil, err } @@ -441,7 +429,7 @@ func (*RepMgr) MemberByID(ctx context.Context, pg *pgx.Conn, id int) (*Member, e var member Member sql := fmt.Sprintf("select node_id, node_name, location, active, type from repmgr.nodes where node_id = %d;", id) - err := pg.QueryRow(ctx, sql).Scan(&member.ID, &member.NodeName, &member.Region, &member.Active, &member.Role) + err := pg.QueryRow(ctx, sql).Scan(&member.ID, &member.Hostname, &member.Region, &member.Active, &member.Role) if err != nil { return nil, err } @@ -453,7 +441,7 @@ func (*RepMgr) MemberByHostname(ctx context.Context, pg *pgx.Conn, hostname stri var member Member sql := fmt.Sprintf("select node_id, node_name, location, active, type from repmgr.nodes where node_name = '%s';", hostname) - err := pg.QueryRow(ctx, sql).Scan(&member.ID, &member.NodeName, &member.Region, &member.Active, &member.Role) + err := pg.QueryRow(ctx, sql).Scan(&member.ID, &member.Hostname, &member.Region, &member.Active, &member.Role) if err != nil { return nil, err } diff --git a/internal/flypg/zombie.go b/internal/flypg/zombie.go index 1753558a..ca3a85d4 100644 --- a/internal/flypg/zombie.go +++ b/internal/flypg/zombie.go @@ -94,9 +94,9 @@ func TakeDNASample(ctx context.Context, node *Node, standbys []Member) (*DNASamp for _, standby := range standbys { // Check for connectivity - mConn, err := node.RepMgr.NewRemoteConnection(ctx, standby.NodeName) + mConn, err := node.RepMgr.NewRemoteConnection(ctx, standby.Hostname) if err != nil { - log.Printf("[WARN] Failed to connect to %s\n", standby.NodeName) + log.Printf("[WARN] Failed to connect to %s\n", standby.Hostname) sample.totalInactive++ continue } @@ -105,7 +105,7 @@ func TakeDNASample(ctx context.Context, node *Node, standbys []Member) (*DNASamp // Verify the primary primary, err := node.RepMgr.PrimaryMember(ctx, mConn) if err != nil { - log.Printf("[WARN] Failed to resolve primary from standby %s\n", standby.NodeName) + log.Printf("[WARN] Failed to resolve primary from standby %s\n", standby.Hostname) sample.totalInactive++ continue } @@ -117,9 +117,9 @@ func TakeDNASample(ctx context.Context, node *Node, standbys []Member) (*DNASamp sample.totalActive++ // Record conflict when primary doesn't match. - if primary.NodeName != node.MachineID && primary.NodeName != node.PrivateIP { + if primary.Hostname != node.MachineID && primary.Hostname != node.PrivateIP { sample.totalConflicts++ - sample.conflictMap[primary.NodeName]++ + sample.conflictMap[primary.Hostname]++ } } @@ -198,11 +198,6 @@ func handleZombieLock(ctx context.Context, n *Node) error { // If the zombie lock contains a hostname, it means we were able to // resolve the real primary and will attempt to rejoin it. if primaryStr != "" { - //ip := net.ParseIP(primaryStr) - //if ip == nil { - // return fmt.Errorf("zombie.lock file contains an invalid ipv6 address") - //} - conn, err := n.RepMgr.NewRemoteConnection(ctx, primaryStr) if err != nil { return fmt.Errorf("failed to establish a connection to our rejoin target %s: %s", primaryStr, err) @@ -215,8 +210,7 @@ func handleZombieLock(ctx context.Context, n *Node) error { } // Confirm that our rejoin target still identifies itself as the primary. - // TODO - machine IDs don't change - if primary.NodeName != primaryStr { + if primary.Hostname != primaryStr { // Clear the zombie.lock file so we can attempt to re-resolve the correct primary. if err := RemoveZombieLock(); err != nil { return fmt.Errorf("failed to remove zombie lock: %s", err) @@ -231,7 +225,7 @@ func handleZombieLock(ctx context.Context, n *Node) error { return ErrZombieLockRegionMismatch } - if err := n.RepMgr.rejoinCluster(primary.NodeName); err != nil { + if err := n.RepMgr.rejoinCluster(primary.Hostname); err != nil { return fmt.Errorf("failed to rejoin cluster: %s", err) } From 63e4bfc324484d73a1709253742d25c35ae7dd2b Mon Sep 17 00:00:00 2001 From: Ben Iofel Date: Tue, 6 Aug 2024 03:38:40 -0400 Subject: [PATCH 11/20] fix tests --- internal/flypg/repmgr_test.go | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/internal/flypg/repmgr_test.go b/internal/flypg/repmgr_test.go index 8251c7d4..d5233805 100644 --- a/internal/flypg/repmgr_test.go +++ b/internal/flypg/repmgr_test.go @@ -33,6 +33,7 @@ func TestRepmgrInitialization(t *testing.T) { UserConfigPath: repgmrUserConfigFilePath, PasswordConfigPath: repgmrPasswordConfigFilePath, DataDir: repmgrTestDirectory, + MachineID: "abcdefg1234567", PrivateIP: "127.0.0.1", Credentials: admin.Credential{ Username: "user", @@ -91,8 +92,8 @@ func TestRepmgrInitialization(t *testing.T) { t.Fatal(err) } - if config["node_name"] != "'127.0.0.1'" { - t.Fatalf("expected node_name to be '127.0.0.1', got %v", config["node_name"]) + if config["node_name"] != "'abcdefg1234567.vm.test-app.internal'" { + t.Fatalf("expected node_name to be 'abcdefg1234567.vm.test-app.internal', got %v", config["node_name"]) } if config["location"] != "'dev'" { @@ -122,6 +123,7 @@ func TestRepmgrNodeIDGeneration(t *testing.T) { DataDir: repmgrTestDirectory, PrivateIP: "127.0.0.1", + MachineID: "abcdefg1234567", Port: 5433, DatabaseName: "repmgr", Credentials: admin.Credential{ From f2a0e13818f85a42a96c8024155bc7ea29e7af41 Mon Sep 17 00:00:00 2001 From: Ben Iofel Date: Tue, 6 Aug 2024 15:51:43 -0400 Subject: [PATCH 12/20] silence warning --- internal/privnet/sixpn.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/internal/privnet/sixpn.go b/internal/privnet/sixpn.go index 7c4120f5..4c5ce42f 100644 --- a/internal/privnet/sixpn.go +++ b/internal/privnet/sixpn.go @@ -75,7 +75,7 @@ func getResolver() *net.Resolver { nameserver = net.JoinHostPort(nameserver, "53") return &net.Resolver{ PreferGo: true, - Dial: func(ctx context.Context, network, address string) (net.Conn, error) { + Dial: func(ctx context.Context, _network, _address string) (net.Conn, error) { d := net.Dialer{ Timeout: 1 * time.Second, } From 8db49fb86523a3c062aabe7f7d14aa50903e654b Mon Sep 17 00:00:00 2001 From: Ben Iofel Date: Tue, 6 Aug 2024 19:11:02 -0400 Subject: [PATCH 13/20] missed a few --- internal/flypg/node.go | 4 ++-- internal/flypg/repmgr.go | 2 +- internal/flypg/zombie.go | 5 +++-- 3 files changed, 6 insertions(+), 5 deletions(-) diff --git a/internal/flypg/node.go b/internal/flypg/node.go index f2ae0bec..579b1d67 100644 --- a/internal/flypg/node.go +++ b/internal/flypg/node.go @@ -297,10 +297,10 @@ func (n *Node) PostInit(ctx context.Context) error { } // This should never happen - if primary != n.PrivateIP { + if primary != n.RepMgr.machineIdToDNS(n.MachineID) { return fmt.Errorf("resolved primary '%s' does not match ourself '%s'. this should not happen", primary, - n.PrivateIP, + n.RepMgr.machineIdToDNS(n.MachineID), ) } diff --git a/internal/flypg/repmgr.go b/internal/flypg/repmgr.go index a79b41de..cc10d6d2 100644 --- a/internal/flypg/repmgr.go +++ b/internal/flypg/repmgr.go @@ -329,7 +329,7 @@ func (r *RepMgr) regenReplicationConf(ctx context.Context) error { // TODO: do we need -c? if _, err := utils.RunCmd(ctx, "postgres", "repmgr", "--replication-conf-only", - "-h", r.PrivateIP, + "-h", "", "-p", fmt.Sprint(r.Port), "-d", r.DatabaseName, "-U", r.Credentials.Username, diff --git a/internal/flypg/zombie.go b/internal/flypg/zombie.go index ca3a85d4..6380338e 100644 --- a/internal/flypg/zombie.go +++ b/internal/flypg/zombie.go @@ -84,7 +84,7 @@ type DNASample struct { func TakeDNASample(ctx context.Context, node *Node, standbys []Member) (*DNASample, error) { sample := &DNASample{ - hostname: node.PrivateIP, + hostname: node.RepMgr.machineIdToDNS(node.MachineID), totalMembers: len(standbys) + 1, totalActive: 1, totalInactive: 0, @@ -117,7 +117,8 @@ func TakeDNASample(ctx context.Context, node *Node, standbys []Member) (*DNASamp sample.totalActive++ // Record conflict when primary doesn't match. - if primary.Hostname != node.MachineID && primary.Hostname != node.PrivateIP { + // We're checking PrivateIP here for backwards compatibility + if primary.Hostname != node.RepMgr.machineIdToDNS(node.MachineID) && primary.Hostname != node.PrivateIP { sample.totalConflicts++ sample.conflictMap[primary.Hostname]++ } From b9dc8f13a27116ef74eb561b167d536af8f91510 Mon Sep 17 00:00:00 2001 From: Ben Iofel Date: Tue, 6 Aug 2024 19:13:59 -0400 Subject: [PATCH 14/20] fix deepsource callout --- internal/privnet/sixpn.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/internal/privnet/sixpn.go b/internal/privnet/sixpn.go index 4c5ce42f..d67aea2a 100644 --- a/internal/privnet/sixpn.go +++ b/internal/privnet/sixpn.go @@ -75,7 +75,7 @@ func getResolver() *net.Resolver { nameserver = net.JoinHostPort(nameserver, "53") return &net.Resolver{ PreferGo: true, - Dial: func(ctx context.Context, _network, _address string) (net.Conn, error) { + Dial: func(ctx context.Context, _, _ string) (net.Conn, error) { d := net.Dialer{ Timeout: 1 * time.Second, } From aeb078fbcf9b88dda1322241662028cc561668f4 Mon Sep 17 00:00:00 2001 From: Ben Iofel Date: Wed, 7 Aug 2024 00:39:24 -0400 Subject: [PATCH 15/20] make restart-repmgrd more resilient --- bin/restart-repmgrd | 9 ++++++++- 1 file changed, 8 insertions(+), 1 deletion(-) diff --git a/bin/restart-repmgrd b/bin/restart-repmgrd index 10ef6bef..9b3d4db0 100755 --- a/bin/restart-repmgrd +++ b/bin/restart-repmgrd @@ -1,3 +1,10 @@ #!/bin/bash -kill `cat /tmp/repmgrd.pid` +if [ -f /tmp/repmgrd.pid ]; then + PID=$(cat /tmp/repmgrd.pid) + + # Check if the process is running + if ps -p $PID > /dev/null 2>&1; then + kill $PID + fi +fi \ No newline at end of file From 6b392d6064f17a883f1c47aeec376b7198507f79 Mon Sep 17 00:00:00 2001 From: Ben Iofel Date: Wed, 7 Aug 2024 00:39:43 -0400 Subject: [PATCH 16/20] make pg_unregister work with new names --- cmd/pg_unregister/main.go | 10 +++++++++- internal/flypg/repmgr.go | 31 +++++++++++++++++++++++++++++++ internal/privnet/sixpn.go | 6 +++--- 3 files changed, 43 insertions(+), 4 deletions(-) diff --git a/cmd/pg_unregister/main.go b/cmd/pg_unregister/main.go index 1f4c1ac1..ad381007 100644 --- a/cmd/pg_unregister/main.go +++ b/cmd/pg_unregister/main.go @@ -3,11 +3,13 @@ package main import ( "context" "encoding/base64" + "errors" "fmt" "os" "github.com/fly-apps/postgres-flex/internal/flypg" "github.com/fly-apps/postgres-flex/internal/utils" + "github.com/jackc/pgx/v5" ) func main() { @@ -40,7 +42,13 @@ func processUnregistration(ctx context.Context) error { defer func() { _ = conn.Close(ctx) }() member, err := node.RepMgr.MemberByHostname(ctx, conn, string(hostnameBytes)) - if err != nil { + if errors.Is(err, pgx.ErrNoRows) { + // for historical reasons, flyctl passes in the 6pn as the hostname + member, err = node.RepMgr.MemberBy6PN(ctx, conn, string(hostnameBytes)) + if err != nil { + return fmt.Errorf("failed to resolve member by hostname and 6pn: %s", err) + } + } else if err != nil { return fmt.Errorf("failed to resolve member: %s", err) } diff --git a/internal/flypg/repmgr.go b/internal/flypg/repmgr.go index cc10d6d2..61f91b22 100644 --- a/internal/flypg/repmgr.go +++ b/internal/flypg/repmgr.go @@ -449,6 +449,37 @@ func (*RepMgr) MemberByHostname(ctx context.Context, pg *pgx.Conn, hostname stri return &member, nil } +// MemberBy6PN returns a member by its 6PN address. +// This assumes the hostnames are hostnames +func (r *RepMgr) MemberBy6PN(ctx context.Context, pg *pgx.Conn, ip string) (*Member, error) { + members, err := r.Members(ctx, pg) + if err != nil { + return nil, err + } + + resolver := privnet.GetResolver() + var lastErr error + for _, member := range members { + ips, err := resolver.LookupIPAddr(ctx, member.Hostname) + if err != nil { + lastErr = err + continue + } + + for _, addr := range ips { + if addr.IP.String() == ip { + return &member, nil + } + } + } + + if lastErr != nil { + return nil, fmt.Errorf("no matches found for %s, and error encountered: %s", ip, lastErr) + } + + return nil, nil +} + func (r *RepMgr) ResolveMemberOverDNS(ctx context.Context) (*Member, error) { machineIds, err := r.InRegionPeerMachines(ctx) if err != nil { diff --git a/internal/privnet/sixpn.go b/internal/privnet/sixpn.go index d67aea2a..5232b9b8 100644 --- a/internal/privnet/sixpn.go +++ b/internal/privnet/sixpn.go @@ -14,7 +14,7 @@ func AllPeers(ctx context.Context, appName string) ([]net.IPAddr, error) { } func Get6PN(ctx context.Context, hostname string) ([]net.IPAddr, error) { - r := getResolver() + r := GetResolver() ips, err := r.LookupIPAddr(ctx, hostname) if err != nil { @@ -47,7 +47,7 @@ type Machine struct { } func AllMachines(ctx context.Context, appName string) ([]Machine, error) { - r := getResolver() + r := GetResolver() txts, err := r.LookupTXT(ctx, fmt.Sprintf("vms.%s.internal", appName)) if err != nil { return nil, err @@ -67,7 +67,7 @@ func AllMachines(ctx context.Context, appName string) ([]Machine, error) { return machines, nil } -func getResolver() *net.Resolver { +func GetResolver() *net.Resolver { nameserver := os.Getenv("FLY_NAMESERVER") if nameserver == "" { nameserver = "fdaa::3" From 2473d7a801d9ef3dd403cb91678d660b7c4dd6a0 Mon Sep 17 00:00:00 2001 From: Ben Iofel Date: Thu, 8 Aug 2024 15:15:34 -0400 Subject: [PATCH 17/20] Accept migration failures --- internal/flypg/node.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/internal/flypg/node.go b/internal/flypg/node.go index 579b1d67..70fc96cb 100644 --- a/internal/flypg/node.go +++ b/internal/flypg/node.go @@ -317,7 +317,8 @@ func (n *Node) PostInit(ctx context.Context) error { } case StandbyRoleName: if err := n.migrateNodeNameIfNeeded(ctx, repConn); err != nil { - return fmt.Errorf("failed to migrate node name: %s", err) + log.Printf("[ERROR] failed to migrate node name: %s", err) + // We try to bring the standby up anyway } // Register existing standby to apply any configuration changes. From c21634d1d34a6db4299a0a267c9ca45019c2a2a6 Mon Sep 17 00:00:00 2001 From: Ben Iofel Date: Tue, 13 Aug 2024 10:34:59 -0400 Subject: [PATCH 18/20] add missing panic --- internal/flypg/node.go | 2 ++ 1 file changed, 2 insertions(+) diff --git a/internal/flypg/node.go b/internal/flypg/node.go index 70fc96cb..28d832c0 100644 --- a/internal/flypg/node.go +++ b/internal/flypg/node.go @@ -284,6 +284,8 @@ func (n *Node) PostInit(ctx context.Context) error { if err := Quarantine(ctx, n, primary); err != nil { return fmt.Errorf("failed to quarantine failed primary: %s", err) } + + panic(err) } else if errors.Is(err, ErrZombieDiscovered) { log.Printf("[ERROR] The majority of registered members agree that '%s' is the real primary.\n", primary) // Turn member read-only From ee606a1c81b4813609c710e9093bf274b2560386 Mon Sep 17 00:00:00 2001 From: Ben Iofel Date: Tue, 13 Aug 2024 10:35:10 -0400 Subject: [PATCH 19/20] update pg_unregister comment --- cmd/pg_unregister/main.go | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/cmd/pg_unregister/main.go b/cmd/pg_unregister/main.go index 735e40d6..049ab09a 100644 --- a/cmd/pg_unregister/main.go +++ b/cmd/pg_unregister/main.go @@ -46,7 +46,9 @@ func processUnregistration(ctx context.Context) error { member, err := node.RepMgr.MemberByHostname(ctx, conn, string(hostnameBytes)) if errors.Is(err, pgx.ErrNoRows) { - // for historical reasons, flyctl passes in the 6pn as the hostname + // for historical reasons, old versions of flyctl passes in the 6pn as the hostname + // most likely this won't work because the hostname does not resolve if the machine is stopped, + // but we try anyway member, err = node.RepMgr.MemberBy6PN(ctx, conn, string(hostnameBytes)) if err != nil { return fmt.Errorf("failed to resolve member by hostname and 6pn: %s", err) From bc979c660fbac22b5656d0547e00d5beca5a2f6a Mon Sep 17 00:00:00 2001 From: Ben Iofel Date: Tue, 13 Aug 2024 10:56:42 -0400 Subject: [PATCH 20/20] remove old comment --- internal/flypg/repmgr.go | 1 - 1 file changed, 1 deletion(-) diff --git a/internal/flypg/repmgr.go b/internal/flypg/repmgr.go index 61f91b22..f6687169 100644 --- a/internal/flypg/repmgr.go +++ b/internal/flypg/repmgr.go @@ -450,7 +450,6 @@ func (*RepMgr) MemberByHostname(ctx context.Context, pg *pgx.Conn, hostname stri } // MemberBy6PN returns a member by its 6PN address. -// This assumes the hostnames are hostnames func (r *RepMgr) MemberBy6PN(ctx context.Context, pg *pgx.Conn, ip string) (*Member, error) { members, err := r.Members(ctx, pg) if err != nil {