Skip to content

Commit 234cd08

Browse files
authored
Merge pull request #32 from fly-apps/cluster-state
Playing around with state consolidation
2 parents 010395c + fb6cd4e commit 234cd08

File tree

8 files changed

+374
-221
lines changed

8 files changed

+374
-221
lines changed

cmd/event_handler/main.go

Lines changed: 19 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -29,17 +29,18 @@ func main() {
2929
switch *event {
3030
case "repmgrd_failover_promote", "standby_promote":
3131
// TODO - Need to figure out what to do when success == 0.
32-
consul, err := state.NewConsulClient()
32+
33+
cs, err := state.NewClusterState()
3334
if err != nil {
34-
fmt.Printf("failed to initialize consul client: %s", err)
35+
fmt.Printf("failed initialize cluster state store. %v", err)
3536
}
3637

37-
node, err := consul.Node(int32(*nodeID))
38+
member, err := cs.FindMember(int32(*nodeID))
3839
if err != nil {
39-
fmt.Printf("failed to find node: %s", err)
40+
fmt.Printf("failed to find member %v: %s", *nodeID, err)
4041
}
4142

42-
if err := consul.RegisterPrimary(string(node.Value)); err != nil {
43+
if err := cs.AssignPrimary(member.ID); err != nil {
4344
fmt.Printf("failed to register primary with consul: %s", err)
4445
}
4546

@@ -49,28 +50,32 @@ func main() {
4950
}
5051

5152
fmt.Println("Reconfiguring pgbouncer primary")
52-
if err := flypgNode.PGBouncer.ConfigurePrimary(context.TODO(), string(node.Value), true); err != nil {
53+
if err := flypgNode.PGBouncer.ConfigurePrimary(context.TODO(), member.Hostname, true); err != nil {
5354
fmt.Printf("failed to reconfigure pgbouncer primary %s\n", err)
5455
}
5556
case "standby_follow":
56-
consul, err := state.NewConsulClient()
57+
cs, err := state.NewClusterState()
5758
if err != nil {
58-
fmt.Printf("failed to initialize consul client: %s", err)
59+
fmt.Printf("failed initialize cluster state store. %v", err)
5960
}
60-
newNodeID, err := strconv.Atoi(*newPrimary)
61+
62+
newMemberID, err := strconv.Atoi(*newPrimary)
6163
if err != nil {
62-
fmt.Printf("failed to parse new node id: %s", err)
64+
fmt.Printf("failed to parse new member id: %s", err)
6365
}
64-
node, err := consul.Node(int32(newNodeID))
66+
67+
member, err := cs.FindMember(int32(newMemberID))
6568
if err != nil {
66-
fmt.Printf("failed to find node in consul: %s", err)
69+
fmt.Printf("failed to find member in consul: %s", err)
6770
}
71+
6872
flypgNode, err := flypg.NewNode()
6973
if err != nil {
70-
fmt.Printf("failed to reference node: %s\n", err)
74+
fmt.Printf("failed to reference member: %s\n", err)
7175
}
76+
7277
fmt.Println("Reconfiguring pgbouncer primary")
73-
if err := flypgNode.PGBouncer.ConfigurePrimary(context.TODO(), string(node.Value), true); err != nil {
78+
if err := flypgNode.PGBouncer.ConfigurePrimary(context.TODO(), member.Hostname, true); err != nil {
7479
fmt.Printf("failed to reconfigure pgbouncer primary %s\n", err)
7580
}
7681
default:

cmd/standby_cleaner/main.go

Lines changed: 14 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,9 +3,11 @@ package main
33
import (
44
"context"
55
"fmt"
6-
"github.com/fly-apps/postgres-flex/pkg/flypg"
76
"os"
87
"time"
8+
9+
"github.com/fly-apps/postgres-flex/pkg/flypg"
10+
"github.com/fly-apps/postgres-flex/pkg/flypg/state"
911
)
1012

1113
var Minute int64 = 60
@@ -47,12 +49,22 @@ func main() {
4749
newConn, err := flypgNode.RepMgr.NewRemoteConnection(ctx, standby.Ip)
4850
if err != nil {
4951
if time.Now().Unix()-seenAt[standby.Id] >= 10*Minute {
50-
err := flypgNode.RepMgr.UnregisterStandby(standby.Id)
52+
cs, err := state.NewClusterState()
53+
if err != nil {
54+
fmt.Printf("failed initialize cluster state store. %v", err)
55+
}
56+
57+
err = flypgNode.RepMgr.UnregisterStandby(standby.Id)
5158
if err != nil {
5259
fmt.Printf("Failed to unregister %d: %s", standby.Id, err)
5360
continue
5461
}
5562
delete(seenAt, standby.Id)
63+
64+
// Remove from Consul
65+
if err = cs.UnregisterMember(int32(standby.Id)); err != nil {
66+
fmt.Printf("Failed to unregister %d from consul: %s", standby.Id, err)
67+
}
5668
}
5769
} else {
5870
seenAt[standby.Id] = time.Now().Unix()

pkg/api/handle_admin.go

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -3,12 +3,13 @@ package api
33
import (
44
"encoding/json"
55
"fmt"
6+
"net/http"
7+
"strings"
8+
69
"github.com/fly-apps/postgres-flex/pkg/flypg"
710
"github.com/fly-apps/postgres-flex/pkg/flypg/admin"
811
"github.com/fly-apps/postgres-flex/pkg/flypg/state"
912
"golang.org/x/exp/slices"
10-
"net/http"
11-
"strings"
1213
)
1314

1415
func handleRole(w http.ResponseWriter, r *http.Request) {
@@ -118,7 +119,7 @@ func (s *Server) handleApplyConfig(w http.ResponseWriter, r *http.Request) {
118119
}
119120
defer close()
120121

121-
consul, err := state.NewConsulClient()
122+
consul, err := state.NewStore()
122123
if err != nil {
123124
renderErr(w, err)
124125
return

pkg/flypg/config.go

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -4,9 +4,10 @@ import (
44
"bufio"
55
"encoding/json"
66
"fmt"
7-
"github.com/fly-apps/postgres-flex/pkg/flypg/state"
87
"os"
98
"strings"
9+
10+
"github.com/fly-apps/postgres-flex/pkg/flypg/state"
1011
)
1112

1213
type ConfigMap map[string]interface{}
@@ -20,7 +21,7 @@ type Config interface {
2021
ConsulKey() string
2122
}
2223

23-
func WriteUserConfig(c Config, consul *state.ConsulClient) error {
24+
func WriteUserConfig(c Config, consul *state.Store) error {
2425
if c.UserConfig() != nil {
2526
if err := pushToConsul(c, consul); err != nil {
2627
return fmt.Errorf("failed to write to consul: %s", err)
@@ -34,7 +35,7 @@ func WriteUserConfig(c Config, consul *state.ConsulClient) error {
3435
return nil
3536
}
3637

37-
func SyncUserConfig(c Config, consul *state.ConsulClient) error {
38+
func SyncUserConfig(c Config, consul *state.Store) error {
3839
cfg, err := pullFromConsul(c, consul)
3940
if err != nil {
4041
return fmt.Errorf("failed to pull config from consul: %s", err)
@@ -51,7 +52,7 @@ func SyncUserConfig(c Config, consul *state.ConsulClient) error {
5152
return nil
5253
}
5354

54-
func pushToConsul(c Config, consul *state.ConsulClient) error {
55+
func pushToConsul(c Config, consul *state.Store) error {
5556
if c.UserConfig() == nil {
5657
return nil
5758
}
@@ -68,7 +69,7 @@ func pushToConsul(c Config, consul *state.ConsulClient) error {
6869
return nil
6970
}
7071

71-
func pullFromConsul(c Config, consul *state.ConsulClient) (ConfigMap, error) {
72+
func pullFromConsul(c Config, consul *state.Store) (ConfigMap, error) {
7273
configBytes, err := consul.PullUserConfig(c.ConsulKey())
7374
if err != nil {
7475
return nil, err

pkg/flypg/node.go

Lines changed: 33 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -114,12 +114,12 @@ func (n *Node) Init(ctx context.Context) error {
114114
return err
115115
}
116116

117-
consul, err := state.NewConsulClient()
117+
cs, err := state.NewClusterState()
118118
if err != nil {
119-
return fmt.Errorf("failed to establish connection with consul: %s", err)
119+
return fmt.Errorf("failed initialize cluster state store. %v", err)
120120
}
121121

122-
primaryIP, err := consul.CurrentPrimary()
122+
primary, err := cs.PrimaryMember()
123123
if err != nil {
124124
return fmt.Errorf("failed to query current primary: %s", err)
125125
}
@@ -133,7 +133,7 @@ func (n *Node) Init(ctx context.Context) error {
133133
fmt.Printf("Failed to initialize repmgr: %s\n", err.Error())
134134
}
135135

136-
err = SyncUserConfig(&repmgr, consul)
136+
err = SyncUserConfig(&repmgr, cs.Store)
137137
if err != nil {
138138
fmt.Printf("Failed to sync user config from consul for repmgr: %s\n", err.Error())
139139
}
@@ -148,7 +148,7 @@ func (n *Node) Init(ctx context.Context) error {
148148
return err
149149
}
150150

151-
err = SyncUserConfig(&pgbouncer, consul)
151+
err = SyncUserConfig(&pgbouncer, cs.Store)
152152
if err != nil {
153153
fmt.Printf("Failed to sync user config from consul for pgbouncer: %s\n", err.Error())
154154
}
@@ -158,10 +158,8 @@ func (n *Node) Init(ctx context.Context) error {
158158
fmt.Printf("Failed to write config files for pgbouncer: %s\n", err.Error())
159159
}
160160

161-
switch primaryIP {
162-
case n.PrivateIP:
163-
// noop
164-
case "":
161+
switch {
162+
case primary == nil:
165163
// Initialize ourselves as the primary.
166164
fmt.Println("Initializing postgres")
167165
if err := n.initialize(); err != nil {
@@ -172,20 +170,22 @@ func (n *Node) Init(ctx context.Context) error {
172170
if err := n.setDefaultHBA(); err != nil {
173171
return fmt.Errorf("failed updating pg_hba.conf: %s", err)
174172
}
173+
case primary.Hostname == n.PrivateIP:
174+
// noop
175175
default:
176176
// If we are here we are either a standby, new node or primary coming back from the dead.
177177
clonePrimary := true
178178
if n.isInitialized() {
179179
// Attempt to resolve our role by querying the primary.
180-
remoteConn, err := repmgr.NewRemoteConnection(ctx, primaryIP)
180+
remoteConn, err := repmgr.NewRemoteConnection(ctx, primary.Hostname)
181181
if err != nil {
182182
return fmt.Errorf("failed to resolve my role according to the primary: %s", err)
183183
}
184184
defer remoteConn.Close(ctx)
185185

186186
role, err := repmgr.memberRoleByHostname(ctx, remoteConn, n.PrivateIP)
187187
if err != nil {
188-
return fmt.Errorf("failed to resolve role for %s: %s", primaryIP, err)
188+
return fmt.Errorf("failed to resolve role for %s: %s", primary.Hostname, err)
189189
}
190190

191191
fmt.Printf("My role is: %s\n", role)
@@ -196,7 +196,7 @@ func (n *Node) Init(ctx context.Context) error {
196196

197197
if clonePrimary {
198198
fmt.Println("Cloning from primary")
199-
if err := repmgr.clonePrimary(primaryIP); err != nil {
199+
if err := repmgr.clonePrimary(primary.Hostname); err != nil {
200200
return fmt.Errorf("failed to clone primary: %s", err)
201201
}
202202
}
@@ -205,7 +205,7 @@ func (n *Node) Init(ctx context.Context) error {
205205
fmt.Println("Resolving PG configuration settings.")
206206
PGConfig.Setup()
207207

208-
err = SyncUserConfig(PGConfig, consul)
208+
err = SyncUserConfig(PGConfig, cs.Store)
209209
if err != nil {
210210
fmt.Printf("Failed to sync user config from consul for pgbouncer: %s\n", err.Error())
211211
}
@@ -226,27 +226,21 @@ func (n *Node) PostInit(ctx context.Context) error {
226226
}
227227
defer conn.Close(ctx)
228228

229-
consul, err := state.NewConsulClient()
229+
cs, err := state.NewClusterState()
230230
if err != nil {
231-
return fmt.Errorf("failed to establish connection with consul: %s", err)
231+
return fmt.Errorf("failed initialize cluster state store. %v", err)
232232
}
233233

234-
primaryIP, err := consul.CurrentPrimary()
234+
primary, err := cs.PrimaryMember()
235235
if err != nil {
236236
return fmt.Errorf("failed to query current primary: %s", err)
237237
}
238238

239239
repmgr := n.RepMgr
240240
pgbouncer := n.PGBouncer
241241

242-
switch primaryIP {
243-
case n.PrivateIP:
244-
// Re-register the primary in order to pick up any changes made to the configuration file.
245-
fmt.Println("Updating primary record")
246-
if err := repmgr.registerPrimary(); err != nil {
247-
fmt.Printf("failed to register primary with repmgr: %s", err)
248-
}
249-
case "":
242+
switch {
243+
case primary == nil:
250244
// Check if we can be a primary
251245
if !repmgr.eligiblePrimary() {
252246
return fmt.Errorf("no primary to follow and can't configure self as primary because primary region is '%s' and we are in '%s'", os.Getenv("PRIMARY_REGION"), repmgr.Region)
@@ -258,17 +252,22 @@ func (n *Node) PostInit(ctx context.Context) error {
258252
}
259253

260254
// Setup repmgr database, extension, and register ourselves as the primary
261-
fmt.Println("Perform Repmgr setup")
255+
fmt.Println("Performing Repmgr setup")
262256
if err := repmgr.setup(ctx, conn); err != nil {
263257
fmt.Printf("failed to setup repmgr: %s\n", err)
264258
}
265259

266-
if err := consul.RegisterPrimary(n.PrivateIP); err != nil {
267-
return fmt.Errorf("failed to register primary with consul: %s", err)
260+
// Register primary member with consul
261+
fmt.Println("Registering member")
262+
if err := cs.RegisterMember(repmgr.ID, n.PrivateIP, repmgr.Region, true); err != nil {
263+
return fmt.Errorf("failed to register member with consul: %s", err)
268264
}
269265

270-
if err := consul.RegisterNode(repmgr.ID, n.PrivateIP); err != nil {
271-
return fmt.Errorf("failed to register member with consul: %s", err)
266+
case primary.Hostname == n.PrivateIP:
267+
// Re-register the primary in order to pick up any changes made to the configuration file.
268+
fmt.Println("Updating primary record")
269+
if err := repmgr.registerPrimary(); err != nil {
270+
fmt.Printf("failed to register primary with repmgr: %s", err)
272271
}
273272
default:
274273
// If we are here we are a new node, standby or a demoted primary who needs to be reconfigured as a standby.
@@ -301,19 +300,19 @@ func (n *Node) PostInit(ctx context.Context) error {
301300
fmt.Printf("failed to register standby: %s\n", err)
302301
}
303302

304-
fmt.Println("Registering Node with Consul")
305-
if err := consul.RegisterNode(repmgr.ID, n.PrivateIP); err != nil {
303+
// Register member with consul if it hasn't been already
304+
if err := cs.RegisterMember(repmgr.ID, n.PrivateIP, repmgr.Region, false); err != nil {
306305
return fmt.Errorf("failed to register member with consul: %s", err)
307306
}
308307
}
309308

310-
// Requery the primaryIP in case a new primary was assigned above.
311-
primaryIP, err = consul.CurrentPrimary()
309+
// Requery the primaryIP from consul in case the primary was assigned above.
310+
primary, err = cs.PrimaryMember()
312311
if err != nil {
313312
return fmt.Errorf("failed to query current primary: %s", err)
314313
}
315314

316-
if err := pgbouncer.ConfigurePrimary(ctx, primaryIP, true); err != nil {
315+
if err := pgbouncer.ConfigurePrimary(ctx, primary.Hostname, true); err != nil {
317316
return fmt.Errorf("failed to configure pgbouncer's primary: %s", err)
318317
}
319318

0 commit comments

Comments
 (0)