Skip to content

Commit 21f810a

Browse files
author
Ben Iofel
committed
clean up the diff
1 parent 8778bd5 commit 21f810a

File tree

5 files changed

+33
-53
lines changed

5 files changed

+33
-53
lines changed

cmd/monitor/monitor_dead_members.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -71,13 +71,13 @@ func deadMemberMonitorTick(ctx context.Context, node *flypg.Node, seenAt map[int
7171
}
7272

7373
for _, voter := range votingMembers {
74-
sConn, err := node.RepMgr.NewRemoteConnection(ctx, voter.NodeName)
74+
sConn, err := node.RepMgr.NewRemoteConnection(ctx, voter.Hostname)
7575
if err != nil {
7676
// TODO - Verify the exception that's getting thrown.
7777
if time.Since(seenAt[voter.ID]) >= deadMemberRemovalThreshold {
78-
log.Printf("Removing dead member: %s\n", voter.NodeName)
78+
log.Printf("Removing dead member: %s\n", voter.Hostname)
7979
if err := node.RepMgr.UnregisterMember(voter); err != nil {
80-
log.Printf("failed to unregister member %s: %v", voter.NodeName, err)
80+
log.Printf("failed to unregister member %s: %v", voter.Hostname, err)
8181
continue
8282
}
8383
delete(seenAt, voter.ID)

internal/flypg/node.go

Lines changed: 6 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -192,7 +192,7 @@ func (n *Node) Init(ctx context.Context) error {
192192
return fmt.Errorf("failed to resolve member over dns: %s", err)
193193
}
194194

195-
if err := n.RepMgr.clonePrimary(cloneTarget.NodeName); err != nil {
195+
if err := n.RepMgr.clonePrimary(cloneTarget.Hostname); err != nil {
196196
// Clean-up the directory so it can be retried.
197197
if rErr := os.Remove(n.DataDir); rErr != nil {
198198
log.Printf("[ERROR] failed to cleanup postgresql dir after clone error: %s\n", rErr)
@@ -270,11 +270,9 @@ func (n *Node) PostInit(ctx context.Context) error {
270270
return fmt.Errorf("failed to resolve member role: %s", err)
271271
}
272272

273-
// Restart repmgrd in the event the IP changes for an already registered node.
273+
// Restart repmgrd in the event the machine ID changes for an already registered node.
274274
// This can happen if the underlying volume is moved to a different node.
275-
// TODO - this isn't an IP anymore
276-
//daemonRestartRequired := n.RepMgr.daemonRestartRequired(member)
277-
daemonRestartRequired := false
275+
daemonRestartRequired := n.RepMgr.daemonRestartRequired(member)
278276

279277
switch member.Role {
280278
case PrimaryRoleName:
@@ -333,7 +331,7 @@ func (n *Node) PostInit(ctx context.Context) error {
333331
}
334332

335333
// Register existing witness to apply any configuration changes.
336-
if err := n.RepMgr.registerWitness(primary.NodeName); err != nil {
334+
if err := n.RepMgr.registerWitness(primary.Hostname); err != nil {
337335
return fmt.Errorf("failed to register existing witness: %s", err)
338336
}
339337
default:
@@ -415,7 +413,7 @@ func (n *Node) PostInit(ctx context.Context) error {
415413
return fmt.Errorf("failed to resolve primary member: %s", err)
416414
}
417415

418-
if err := n.RepMgr.registerWitness(primary.NodeName); err != nil {
416+
if err := n.RepMgr.registerWitness(primary.Hostname); err != nil {
419417
return fmt.Errorf("failed to register witness: %s", err)
420418
}
421419
} else {
@@ -546,7 +544,7 @@ func (n *Node) migrateNodeNameIfNeeded(ctx context.Context, repConn *pgx.Conn) e
546544
return fmt.Errorf("failed to resolve primary member when updating standby: %s", err)
547545
}
548546

549-
primaryConn, err := n.RepMgr.NewRemoteConnection(ctx, primary.NodeName)
547+
primaryConn, err := n.RepMgr.NewRemoteConnection(ctx, primary.Hostname)
550548
if err != nil {
551549
return fmt.Errorf("failed to establish connection to primary: %s", err)
552550
}

internal/flypg/readonly.go

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -70,31 +70,31 @@ func BroadcastReadonlyChange(ctx context.Context, n *Node, enabled bool) error {
7070

7171
for _, member := range members {
7272
if member.Role == PrimaryRoleName {
73-
endpoint := fmt.Sprintf("http://%s:5500/%s", member.NodeName, target)
73+
endpoint := fmt.Sprintf("http://%s:5500/%s", member.Hostname, target)
7474
resp, err := http.Get(endpoint)
7575
if err != nil {
76-
log.Printf("[WARN] Failed to broadcast readonly state change to member %s: %s", member.NodeName, err)
76+
log.Printf("[WARN] Failed to broadcast readonly state change to member %s: %s", member.Hostname, err)
7777
continue
7878
}
7979
defer func() { _ = resp.Body.Close() }()
8080

8181
if resp.StatusCode > 299 {
82-
log.Printf("[WARN] Failed to broadcast readonly state change to member %s: %d\n", member.NodeName, resp.StatusCode)
82+
log.Printf("[WARN] Failed to broadcast readonly state change to member %s: %d\n", member.Hostname, resp.StatusCode)
8383
}
8484
}
8585
}
8686

8787
for _, member := range members {
88-
endpoint := fmt.Sprintf("http://%s:5500/%s", member.NodeName, RestartHaproxyEndpoint)
88+
endpoint := fmt.Sprintf("http://%s:5500/%s", member.Hostname, RestartHaproxyEndpoint)
8989
resp, err := http.Get(endpoint)
9090
if err != nil {
91-
log.Printf("[WARN] Failed to restart haproxy on member %s: %s", member.NodeName, err)
91+
log.Printf("[WARN] Failed to restart haproxy on member %s: %s", member.Hostname, err)
9292
continue
9393
}
9494
defer func() { _ = resp.Body.Close() }()
9595

9696
if resp.StatusCode > 299 {
97-
log.Printf("[WARN] Failed to restart haproxy on member %s: %d\n", member.NodeName, resp.StatusCode)
97+
log.Printf("[WARN] Failed to restart haproxy on member %s: %d\n", member.Hostname, resp.StatusCode)
9898
}
9999
}
100100

internal/flypg/repmgr.go

Lines changed: 11 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -100,19 +100,7 @@ func (r *RepMgr) NewLocalConnection(ctx context.Context) (*pgx.Conn, error) {
100100
return openConnection(ctx, host, r.DatabaseName, r.Credentials)
101101
}
102102

103-
// target - can be an IP address, machine ID in the current app, or other hostname
104-
func (r *RepMgr) NewRemoteConnection(ctx context.Context, target string) (*pgx.Conn, error) {
105-
var hostname string
106-
107-
ip := net.ParseIP(target)
108-
if ip != nil {
109-
hostname = target
110-
} else if len(target) == 14 {
111-
hostname = fmt.Sprintf("%s.vm.%s.internal", target, r.AppName)
112-
} else {
113-
hostname = target
114-
}
115-
103+
func (r *RepMgr) NewRemoteConnection(ctx context.Context, hostname string) (*pgx.Conn, error) {
116104
host := net.JoinHostPort(hostname, strconv.Itoa(r.Port))
117105
return openConnection(ctx, host, r.DatabaseName, r.Credentials)
118106
}
@@ -265,7 +253,7 @@ func (r *RepMgr) registerStandby(restartDaemon bool) error {
265253
}
266254

267255
func (r *RepMgr) registerWitness(primaryHostname string) error {
268-
cmdStr := fmt.Sprintf("repmgr witness register -f %s -h %s -F", r.ConfigPath, primaryHostname) // TODO
256+
cmdStr := fmt.Sprintf("repmgr witness register -f %s -h %s -F", r.ConfigPath, primaryHostname)
269257
_, err := utils.RunCommand(cmdStr, "postgres")
270258

271259
return err
@@ -291,7 +279,7 @@ func (*RepMgr) restartDaemon() error {
291279
}
292280

293281
func (r *RepMgr) daemonRestartRequired(m *Member) bool {
294-
return m.NodeName != r.PrivateIP // TODO
282+
return m.Hostname != r.MachineID
295283
}
296284

297285
func (r *RepMgr) unregisterWitness(id int) error {
@@ -301,10 +289,10 @@ func (r *RepMgr) unregisterWitness(id int) error {
301289
return err
302290
}
303291

304-
func (r *RepMgr) rejoinCluster(nodeName string) error {
292+
func (r *RepMgr) rejoinCluster(hostname string) error {
305293
cmdStr := fmt.Sprintf("repmgr -f %s node rejoin -h %s -p %d -U %s -d %s --force-rewind --no-wait",
306294
r.ConfigPath,
307-
nodeName,
295+
hostname,
308296
r.Port,
309297
r.Credentials.Username,
310298
r.DatabaseName,
@@ -341,7 +329,7 @@ func (r *RepMgr) regenReplicationConf(ctx context.Context) error {
341329
// TODO: do we need -c?
342330
if _, err := utils.RunCmd(ctx, "postgres",
343331
"repmgr", "--replication-conf-only",
344-
"-h", r.PrivateIP, // TODO: should this be the hostname, or even just localhost
332+
"-h", r.PrivateIP,
345333
"-p", fmt.Sprint(r.Port),
346334
"-d", r.DatabaseName,
347335
"-U", r.Credentials.Username,
@@ -354,7 +342,7 @@ func (r *RepMgr) regenReplicationConf(ctx context.Context) error {
354342

355343
type Member struct {
356344
ID int
357-
NodeName string
345+
Hostname string
358346
Active bool
359347
Region string
360348
Role string
@@ -371,7 +359,7 @@ func (*RepMgr) Members(ctx context.Context, pg *pgx.Conn) ([]Member, error) {
371359
var members []Member
372360
for rows.Next() {
373361
var member Member
374-
if err := rows.Scan(&member.ID, &member.NodeName, &member.Region, &member.Active, &member.Role); err != nil {
362+
if err := rows.Scan(&member.ID, &member.Hostname, &member.Region, &member.Active, &member.Role); err != nil {
375363
return nil, err
376364
}
377365

@@ -404,7 +392,7 @@ func (r *RepMgr) Member(ctx context.Context, conn *pgx.Conn) (*Member, error) {
404392
func (*RepMgr) PrimaryMember(ctx context.Context, pg *pgx.Conn) (*Member, error) {
405393
var member Member
406394
sql := "select node_id, node_name, location, active, type from repmgr.nodes where type = 'primary' and active = true;"
407-
err := pg.QueryRow(ctx, sql).Scan(&member.ID, &member.NodeName, &member.Region, &member.Active, &member.Role)
395+
err := pg.QueryRow(ctx, sql).Scan(&member.ID, &member.Hostname, &member.Region, &member.Active, &member.Role)
408396
if err != nil {
409397
return nil, err
410398
}
@@ -441,7 +429,7 @@ func (*RepMgr) MemberByID(ctx context.Context, pg *pgx.Conn, id int) (*Member, e
441429
var member Member
442430
sql := fmt.Sprintf("select node_id, node_name, location, active, type from repmgr.nodes where node_id = %d;", id)
443431

444-
err := pg.QueryRow(ctx, sql).Scan(&member.ID, &member.NodeName, &member.Region, &member.Active, &member.Role)
432+
err := pg.QueryRow(ctx, sql).Scan(&member.ID, &member.Hostname, &member.Region, &member.Active, &member.Role)
445433
if err != nil {
446434
return nil, err
447435
}
@@ -453,7 +441,7 @@ func (*RepMgr) MemberByHostname(ctx context.Context, pg *pgx.Conn, hostname stri
453441
var member Member
454442
sql := fmt.Sprintf("select node_id, node_name, location, active, type from repmgr.nodes where node_name = '%s';", hostname)
455443

456-
err := pg.QueryRow(ctx, sql).Scan(&member.ID, &member.NodeName, &member.Region, &member.Active, &member.Role)
444+
err := pg.QueryRow(ctx, sql).Scan(&member.ID, &member.Hostname, &member.Region, &member.Active, &member.Role)
457445
if err != nil {
458446
return nil, err
459447
}

internal/flypg/zombie.go

Lines changed: 7 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -94,9 +94,9 @@ func TakeDNASample(ctx context.Context, node *Node, standbys []Member) (*DNASamp
9494

9595
for _, standby := range standbys {
9696
// Check for connectivity
97-
mConn, err := node.RepMgr.NewRemoteConnection(ctx, standby.NodeName)
97+
mConn, err := node.RepMgr.NewRemoteConnection(ctx, standby.Hostname)
9898
if err != nil {
99-
log.Printf("[WARN] Failed to connect to %s\n", standby.NodeName)
99+
log.Printf("[WARN] Failed to connect to %s\n", standby.Hostname)
100100
sample.totalInactive++
101101
continue
102102
}
@@ -105,7 +105,7 @@ func TakeDNASample(ctx context.Context, node *Node, standbys []Member) (*DNASamp
105105
// Verify the primary
106106
primary, err := node.RepMgr.PrimaryMember(ctx, mConn)
107107
if err != nil {
108-
log.Printf("[WARN] Failed to resolve primary from standby %s\n", standby.NodeName)
108+
log.Printf("[WARN] Failed to resolve primary from standby %s\n", standby.Hostname)
109109
sample.totalInactive++
110110
continue
111111
}
@@ -117,9 +117,9 @@ func TakeDNASample(ctx context.Context, node *Node, standbys []Member) (*DNASamp
117117
sample.totalActive++
118118

119119
// Record conflict when primary doesn't match.
120-
if primary.NodeName != node.MachineID && primary.NodeName != node.PrivateIP {
120+
if primary.Hostname != node.MachineID && primary.Hostname != node.PrivateIP {
121121
sample.totalConflicts++
122-
sample.conflictMap[primary.NodeName]++
122+
sample.conflictMap[primary.Hostname]++
123123
}
124124
}
125125

@@ -198,11 +198,6 @@ func handleZombieLock(ctx context.Context, n *Node) error {
198198
// If the zombie lock contains a hostname, it means we were able to
199199
// resolve the real primary and will attempt to rejoin it.
200200
if primaryStr != "" {
201-
//ip := net.ParseIP(primaryStr)
202-
//if ip == nil {
203-
// return fmt.Errorf("zombie.lock file contains an invalid ipv6 address")
204-
//}
205-
206201
conn, err := n.RepMgr.NewRemoteConnection(ctx, primaryStr)
207202
if err != nil {
208203
return fmt.Errorf("failed to establish a connection to our rejoin target %s: %s", primaryStr, err)
@@ -215,8 +210,7 @@ func handleZombieLock(ctx context.Context, n *Node) error {
215210
}
216211

217212
// Confirm that our rejoin target still identifies itself as the primary.
218-
// TODO - machine IDs don't change
219-
if primary.NodeName != primaryStr {
213+
if primary.Hostname != primaryStr {
220214
// Clear the zombie.lock file so we can attempt to re-resolve the correct primary.
221215
if err := RemoveZombieLock(); err != nil {
222216
return fmt.Errorf("failed to remove zombie lock: %s", err)
@@ -231,7 +225,7 @@ func handleZombieLock(ctx context.Context, n *Node) error {
231225
return ErrZombieLockRegionMismatch
232226
}
233227

234-
if err := n.RepMgr.rejoinCluster(primary.NodeName); err != nil {
228+
if err := n.RepMgr.rejoinCluster(primary.Hostname); err != nil {
235229
return fmt.Errorf("failed to rejoin cluster: %s", err)
236230
}
237231

0 commit comments

Comments
 (0)