Skip to content

Commit 2f40d2d

Browse files
authored
Random cleanup (#110)
* Random cleanup * Cleanup * Revert arg add * bug fix * Ensure connections and files are being closed * Close connection
1 parent 043469e commit 2f40d2d

File tree

11 files changed

+175
-166
lines changed

11 files changed

+175
-166
lines changed

cmd/event_handler/main.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -71,7 +71,7 @@ func main() {
7171
}
7272

7373
func evaluateClusterState(ctx context.Context, conn *pgx.Conn, node *flypg.Node) error {
74-
primary, err := node.EvaluateClusterState(ctx, conn)
74+
primary, err := flypg.PerformScreening(ctx, conn, node)
7575
if errors.Is(err, flypg.ErrZombieDiagnosisUndecided) || errors.Is(err, flypg.ErrZombieDiscovered) {
7676
if err := flypg.Quarantine(ctx, conn, node, primary); err != nil {
7777
return fmt.Errorf("failed to quarantine failed primary: %s", err)

cmd/monitor/monitor_cluster_state.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,7 @@ func clusterStateMonitorTick(ctx context.Context, node *flypg.Node) error {
3737
return nil
3838
}
3939

40-
primary, err := node.EvaluateClusterState(ctx, conn)
40+
primary, err := flypg.PerformScreening(ctx, conn, node)
4141
if errors.Is(err, flypg.ErrZombieDiagnosisUndecided) || errors.Is(err, flypg.ErrZombieDiscovered) {
4242
if err := flypg.Quarantine(ctx, conn, node, primary); err != nil {
4343
return fmt.Errorf("failed to quarantine failed primary: %s", err)

cmd/pg_unregister/main.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@ func main() {
3434
os.Exit(1)
3535
return
3636
}
37+
defer conn.Close(ctx)
3738

3839
member, err := node.RepMgr.MemberByHostname(ctx, conn, string(hostnameBytes))
3940
if err != nil {

internal/flypg/admin/admin.go

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -182,7 +182,6 @@ func FindDatabase(ctx context.Context, pg *pgx.Conn, name string) (*DbInfo, erro
182182
`
183183

184184
sql = fmt.Sprintf(sql, name)
185-
186185
row := pg.QueryRow(ctx, sql)
187186

188187
db := new(DbInfo)

internal/flypg/config.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -102,6 +102,7 @@ func WriteConfigFiles(c Config) error {
102102
return err
103103
}
104104
defer internalFile.Close()
105+
105106
userFile, err := os.OpenFile(c.UserConfigFile(), os.O_CREATE|os.O_RDWR|os.O_TRUNC, 0644)
106107
if err != nil {
107108
return err

internal/flypg/node.go

Lines changed: 29 additions & 153 deletions
Original file line numberDiff line numberDiff line change
@@ -5,12 +5,10 @@ import (
55
"encoding/binary"
66
"errors"
77
"fmt"
8-
"io/ioutil"
98
"math/rand"
109
"net"
1110
"os"
1211
"os/exec"
13-
"os/user"
1412
"strconv"
1513
"time"
1614

@@ -119,7 +117,7 @@ func (n *Node) Init(ctx context.Context) error {
119117
return err
120118
}
121119

122-
// Initiate a restore
120+
// Check to see if we were just restored
123121
if os.Getenv("FLY_RESTORED_FROM") != "" {
124122
// Check to see if there's an active restore.
125123
active, err := isRestoreActive()
@@ -134,64 +132,10 @@ func (n *Node) Init(ctx context.Context) error {
134132
}
135133
}
136134

135+
// Verify whether we are a booting zombie.
137136
if ZombieLockExists() {
138-
fmt.Println("Zombie lock detected!")
139-
primaryStr, err := ReadZombieLock()
140-
if err != nil {
141-
return fmt.Errorf("failed to read zombie lock: %s", primaryStr)
142-
}
143-
144-
// If the zombie lock contains a hostname, it means we were able to resolve the real primary and
145-
// will attempt to rejoin it.
146-
if primaryStr != "" {
147-
ip := net.ParseIP(primaryStr)
148-
if ip == nil {
149-
return fmt.Errorf("zombie.lock file contains an invalid ipv6 address")
150-
}
151-
152-
conn, err := n.RepMgr.NewRemoteConnection(ctx, ip.String())
153-
if err != nil {
154-
return fmt.Errorf("failed to establish a connection to our rejoin target %s: %s", ip.String(), err)
155-
}
156-
defer conn.Close(ctx)
157-
158-
primary, err := n.RepMgr.PrimaryMember(ctx, conn)
159-
if err != nil {
160-
return fmt.Errorf("failed to confirm primary on recover target %s: %s", ip.String(), err)
161-
}
162-
163-
// Confirm that our rejoin target still identifies itself as the primary.
164-
if primary.Hostname != ip.String() {
165-
// Clear the zombie.lock file so we can attempt to re-resolve the correct primary.
166-
if err := RemoveZombieLock(); err != nil {
167-
return fmt.Errorf("failed to remove zombie lock: %s", err)
168-
}
169-
170-
return ErrZombieLockPrimaryMismatch
171-
}
172-
173-
// If the primary does not reside within our primary region, we cannot rejoin until it is.
174-
if primary.Region != n.PrimaryRegion {
175-
fmt.Printf("Primary region mismatch detected. The primary lives in '%s', while PRIMARY_REGION is set to '%s'\n", primary.Region, n.PrimaryRegion)
176-
return ErrZombieLockRegionMismatch
177-
}
178-
179-
if err := n.RepMgr.rejoinCluster(primary.Hostname); err != nil {
180-
return fmt.Errorf("failed to rejoin cluster: %s", err)
181-
}
182-
183-
// TODO - Wait for target cluster to register self as a standby.
184-
185-
if err := RemoveZombieLock(); err != nil {
186-
return fmt.Errorf("failed to remove zombie lock: %s", err)
187-
}
188-
189-
// Ensure the single instance created with the --force-rewind process is cleaned up properly.
190-
utils.RunCommand("pg_ctl -D /data/postgresql/ stop", "postgres")
191-
} else {
192-
// TODO - Provide link to documention on how to address this
193-
fmt.Println("Zombie lock file does not contain a hostname.")
194-
fmt.Println("This likely means that we were unable to determine who the real primary is.")
137+
if err := handleZombieLock(ctx, n); err != nil {
138+
return err
195139
}
196140
}
197141

@@ -205,18 +149,24 @@ func (n *Node) Init(ctx context.Context) error {
205149
return fmt.Errorf("failed initialize cluster state store: %s", err)
206150
}
207151

208-
if err := n.configure(ctx, store); err != nil {
209-
return fmt.Errorf("failed to configure node: %s", err)
152+
if err := n.configureInternal(store); err != nil {
153+
return fmt.Errorf("failed to set internal config: %s", err)
154+
}
155+
156+
if err := n.configureRepmgr(store); err != nil {
157+
return fmt.Errorf("failed to configure repmgr config: %s", err)
210158
}
211159

212160
if !n.isPGInitialized() {
213-
// Check to see if repmgr cluster has been initialized.
161+
// Check to see if cluster has already been initialized.
214162
clusterInitialized, err := store.IsInitializationFlagSet()
215163
if err != nil {
216164
return fmt.Errorf("failed to verify cluster state %s", err)
217165
}
218166

219167
if !clusterInitialized {
168+
fmt.Println("Provisioning primary")
169+
220170
// Initialize ourselves as the primary.
221171
if err := n.initializePG(); err != nil {
222172
return fmt.Errorf("failed to initialize postgres %s", err)
@@ -227,6 +177,8 @@ func (n *Node) Init(ctx context.Context) error {
227177
}
228178

229179
} else {
180+
fmt.Println("Provisioning standby")
181+
// Initialize ourselves as a standby
230182
cloneTarget, err := n.RepMgr.ResolveMemberOverDNS(ctx)
231183
if err != nil {
232184
return err
@@ -300,7 +252,8 @@ func (n *Node) PostInit(ctx context.Context) error {
300252
return fmt.Errorf("failed to register repmgr primary: %s", err)
301253
}
302254

303-
// Set flag within consul to let future new members that the cluster exists
255+
// Set initialization flag within consul so future members know they are joining
256+
// an existing cluster.
304257
if err := store.SetInitializationFlag(); err != nil {
305258
return fmt.Errorf("failed to register cluster with consul")
306259
}
@@ -326,7 +279,7 @@ func (n *Node) PostInit(ctx context.Context) error {
326279

327280
switch role {
328281
case PrimaryRoleName:
329-
primary, err := n.EvaluateClusterState(ctx, conn)
282+
primary, err := PerformScreening(ctx, conn, n)
330283
if errors.Is(err, ErrZombieDiagnosisUndecided) {
331284
fmt.Println("Unable to confirm that we are the true primary!")
332285
if err := Quarantine(ctx, conn, n, primary); err != nil {
@@ -383,13 +336,15 @@ func (n *Node) initializePG() error {
383336
return nil
384337
}
385338

386-
if err := ioutil.WriteFile("/data/.default_password", []byte(n.OperatorCredentials.Password), 0644); err != nil {
339+
if err := os.WriteFile("/data/.default_password", []byte(n.OperatorCredentials.Password), 0644); err != nil {
387340
return err
388341
}
389342
cmd := exec.Command("gosu", "postgres", "initdb", "--pgdata", n.DataDir, "--pwfile=/data/.default_password")
390-
_, err := cmd.CombinedOutput()
343+
if _, err := cmd.CombinedOutput(); err != nil {
344+
return err
345+
}
391346

392-
return err
347+
return nil
393348
}
394349

395350
func (n *Node) isPGInitialized() bool {
@@ -400,56 +355,6 @@ func (n *Node) isPGInitialized() bool {
400355
return true
401356
}
402357

403-
func (n *Node) configure(ctx context.Context, store *state.Store) error {
404-
if err := n.configureInternal(store); err != nil {
405-
return fmt.Errorf("failed to set internal config: %s", err)
406-
}
407-
408-
if err := n.configureRepmgr(store); err != nil {
409-
return fmt.Errorf("failed to configure repmgr config: %s", err)
410-
}
411-
412-
return nil
413-
}
414-
415-
func writeSSHKey() error {
416-
err := os.Mkdir("/data/.ssh", 0700)
417-
if err != nil && !os.IsExist(err) {
418-
return err
419-
}
420-
421-
key := os.Getenv("SSH_KEY")
422-
423-
keyFile, err := os.Create("/data/.ssh/id_rsa")
424-
if err != nil {
425-
return err
426-
}
427-
defer keyFile.Close()
428-
_, err = keyFile.Write([]byte(key))
429-
if err != nil {
430-
return err
431-
}
432-
433-
cert := os.Getenv("SSH_CERT")
434-
435-
certFile, err := os.Create("/data/.ssh/id_rsa-cert.pub")
436-
if err != nil {
437-
return err
438-
}
439-
defer certFile.Close()
440-
_, err = certFile.Write([]byte(cert))
441-
if err != nil {
442-
return err
443-
}
444-
445-
err = setSSHOwnership()
446-
if err != nil {
447-
return err
448-
}
449-
450-
return nil
451-
}
452-
453358
func (n *Node) configureInternal(store *state.Store) error {
454359
if err := n.InternalConfig.initialize(); err != nil {
455360
return fmt.Errorf("failed to initialize internal config: %s", err)
@@ -605,6 +510,7 @@ func (n *Node) setDefaultHBA() error {
605510
if err != nil {
606511
return err
607512
}
513+
defer file.Close()
608514

609515
for _, entry := range entries {
610516
str := fmt.Sprintf("%s %s %s %s %s\n", entry.Type, entry.Database, entry.User, entry.Address, entry.Method)
@@ -634,47 +540,17 @@ func openConnection(parentCtx context.Context, host string, database string, cre
634540
return pgx.ConnectConfig(ctx, conf)
635541
}
636542

637-
func setSSHOwnership() error {
638-
cmdStr := fmt.Sprintf("chmod 600 %s %s", "/data/.ssh/id_rsa", "/data/.ssh/id_rsa-cert.pub")
639-
cmd := exec.Command("sh", "-c", cmdStr)
640-
_, err := cmd.Output()
641-
return err
642-
}
643-
644543
func setDirOwnership() error {
645-
pgUser, err := user.Lookup("postgres")
646-
if err != nil {
647-
return err
648-
}
649-
pgUID, err := strconv.Atoi(pgUser.Uid)
650-
if err != nil {
651-
return err
652-
}
653-
pgGID, err := strconv.Atoi(pgUser.Gid)
544+
pgUID, pgGID, err := utils.SystemUserIDs("postgres")
654545
if err != nil {
655-
return err
546+
return fmt.Errorf("failed to find postgres user ids: %s", err)
656547
}
657548

658549
cmdStr := fmt.Sprintf("chown -R %d:%d %s", pgUID, pgGID, "/data")
659550
cmd := exec.Command("sh", "-c", cmdStr)
660-
_, err = cmd.Output()
661-
return err
662-
}
663-
664-
func (n *Node) EvaluateClusterState(ctx context.Context, conn *pgx.Conn) (string, error) {
665-
standbys, err := n.RepMgr.StandbyMembers(ctx, conn)
666-
if err != nil {
667-
if !errors.Is(err, pgx.ErrNoRows) {
668-
return "", fmt.Errorf("failed to query standbys")
669-
}
670-
}
671-
672-
sample, err := TakeDNASample(ctx, n, standbys)
673-
if err != nil {
674-
return "", fmt.Errorf("failed to evaluate cluster data: %s", err)
551+
if _, err = cmd.Output(); err != nil {
552+
return err
675553
}
676554

677-
fmt.Println(DNASampleString(sample))
678-
679-
return ZombieDiagnosis(sample)
555+
return nil
680556
}

internal/flypg/readonly.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,7 @@ func BroadcastReadonlyChange(ctx context.Context, n *Node, enabled bool) error {
5858
if err != nil {
5959
return fmt.Errorf("failed to establish connection: %s", err)
6060
}
61+
defer conn.Close(ctx)
6162

6263
members, err := n.RepMgr.Members(ctx, conn)
6364
if err != nil {

internal/flypg/repmgr.go

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -97,16 +97,16 @@ func (r *RepMgr) NewRemoteConnection(ctx context.Context, hostname string) (*pgx
9797
func (r *RepMgr) initialize() error {
9898
r.setDefaults()
9999

100-
f, err := os.OpenFile(r.ConfigPath, os.O_APPEND|os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0600)
100+
file, err := os.OpenFile(r.ConfigPath, os.O_APPEND|os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0600)
101101
if err != nil {
102102
return nil
103103
}
104-
defer f.Close()
104+
defer file.Close()
105105

106106
entries := []string{"include 'repmgr.internal.conf'\n", "include 'repmgr.user.conf'\n"}
107107

108108
for _, entry := range entries {
109-
if _, err := f.WriteString(entry); err != nil {
109+
if _, err := file.WriteString(entry); err != nil {
110110
return fmt.Errorf("failed append configuration entry: %s", err)
111111
}
112112
}
@@ -238,6 +238,7 @@ func (r *RepMgr) writePasswdConf() error {
238238
if err != nil {
239239
return err
240240
}
241+
defer file.Close()
241242

242243
entries := []string{
243244
fmt.Sprintf("*:*:*:%s:%s", r.Credentials.Username, r.Credentials.Password),
@@ -268,6 +269,7 @@ func (r *RepMgr) Members(ctx context.Context, pg *pgx.Conn) ([]Member, error) {
268269
if err != nil {
269270
return nil, err
270271
}
272+
defer rows.Close()
271273

272274
var members []Member
273275

@@ -316,7 +318,6 @@ func (r *RepMgr) StandbyMembers(ctx context.Context, conn *pgx.Conn) ([]Member,
316318
}
317319

318320
var standbys []Member
319-
320321
for _, member := range members {
321322
if member.Role == StandbyRoleName {
322323
standbys = append(standbys, member)
@@ -391,7 +392,6 @@ func (r *RepMgr) ResolveMemberOverDNS(ctx context.Context) (*Member, error) {
391392

392393
func (r *RepMgr) InRegionPeerIPs(ctx context.Context) ([]net.IPAddr, error) {
393394
targets := fmt.Sprintf("%s.%s", r.PrimaryRegion, r.AppName)
394-
395395
return privnet.AllPeers(ctx, targets)
396396
}
397397

0 commit comments

Comments
 (0)