Skip to content

Commit 967556f

Browse files
committed
Refactor: change flag names to camelCase and improve demo cluster startup
1 parent 3a52b71 commit 967556f

File tree

3 files changed

+177
-39
lines changed

3 files changed

+177
-39
lines changed

cmd/server/demo.go

Lines changed: 167 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -3,15 +3,18 @@ package main
33
import (
44
"context"
55
"flag"
6+
"fmt"
67
"log/slog"
78
"net"
89
"os"
910
"path/filepath"
1011
"strings"
12+
"time"
1113

1214
"github.com/Jille/raft-grpc-leader-rpc/leaderhealth"
1315
transport "github.com/Jille/raft-grpc-transport"
1416
"github.com/Jille/raftadmin"
17+
raftadminpb "github.com/Jille/raftadmin/proto"
1518
"github.com/bootjp/elastickv/adapter"
1619
"github.com/bootjp/elastickv/kv"
1720
pb "github.com/bootjp/elastickv/proto"
@@ -27,17 +30,20 @@ import (
2730

2831
var (
2932
address = flag.String("address", ":50051", "gRPC/Raft address")
30-
redisAddress = flag.String("redis_address", ":6379", "Redis address")
31-
raftID = flag.String("raft_id", "", "Raft ID")
32-
raftDataDir = flag.String("raft_data_dir", "/var/lib/elastickv", "Raft data directory")
33-
raftBootstrap = flag.Bool("raft_bootstrap", false, "Bootstrap cluster")
34-
raftRedisMap = flag.String("raft_redis_map", "", "Map of Raft address to Redis address (raftAddr=redisAddr,...)")
33+
redisAddress = flag.String("redisAddress", ":6379", "Redis address")
34+
raftID = flag.String("raftId", "", "Raft ID")
35+
raftDataDir = flag.String("raftDataDir", "/var/lib/elastickv", "Raft data directory")
36+
raftBootstrap = flag.Bool("raftBootstrap", false, "Bootstrap cluster")
37+
raftRedisMap = flag.String("raftRedisMap", "", "Map of Raft address to Redis address (raftAddr=redisAddr,...)")
3538
)
3639

3740
const (
3841
raftSnapshotsRetain = 2
3942
kvParts = 2
4043
defaultFileMode = 0755
44+
joinRetries = 20
45+
joinWait = 3 * time.Second
46+
joinRetryInterval = 1 * time.Second
4147
)
4248

4349
func init() {
@@ -46,24 +52,156 @@ func init() {
4652
})))
4753
}
4854

55+
type config struct {
56+
address string
57+
redisAddress string
58+
raftID string
59+
raftDataDir string
60+
raftBootstrap bool
61+
raftRedisMap string
62+
}
63+
4964
func main() {
5065
flag.Parse()
51-
if *raftID == "" {
52-
slog.Error("raft_id is required")
53-
os.Exit(1)
54-
}
5566

5667
eg := &errgroup.Group{}
57-
if err := run(eg); err != nil {
58-
slog.Error(err.Error())
59-
os.Exit(1)
68+
69+
if *raftID != "" {
70+
// Single node mode
71+
cfg := config{
72+
address: *address,
73+
redisAddress: *redisAddress,
74+
raftID: *raftID,
75+
raftDataDir: *raftDataDir,
76+
raftBootstrap: *raftBootstrap,
77+
raftRedisMap: *raftRedisMap,
78+
}
79+
if err := run(eg, cfg); err != nil {
80+
slog.Error(err.Error())
81+
os.Exit(1)
82+
}
83+
} else {
84+
// Demo cluster mode (3 nodes)
85+
slog.Info("Starting demo cluster with 3 nodes...")
86+
nodes := []config{
87+
{
88+
address: "127.0.0.1:50051",
89+
redisAddress: "127.0.0.1:63791",
90+
raftID: "n1",
91+
raftDataDir: "", // In-memory
92+
raftBootstrap: true,
93+
},
94+
{
95+
address: "127.0.0.1:50052",
96+
redisAddress: "127.0.0.1:63792",
97+
raftID: "n2",
98+
raftDataDir: "",
99+
raftBootstrap: false,
100+
},
101+
{
102+
address: "127.0.0.1:50053",
103+
redisAddress: "127.0.0.1:63793",
104+
raftID: "n3",
105+
raftDataDir: "",
106+
raftBootstrap: false,
107+
},
108+
}
109+
110+
// Build raftRedisMap string
111+
var mapParts []string
112+
for _, n := range nodes {
113+
mapParts = append(mapParts, n.address+"="+n.redisAddress)
114+
}
115+
raftRedisMapStr := strings.Join(mapParts, ",")
116+
117+
for _, n := range nodes {
118+
n.raftRedisMap = raftRedisMapStr
119+
cfg := n // capture loop variable
120+
if err := run(eg, cfg); err != nil {
121+
slog.Error(err.Error())
122+
os.Exit(1)
123+
}
124+
}
125+
126+
// Wait for n1 to be ready then join others?
127+
// Actually, standard bootstrap expects a configuration.
128+
// If we only bootstrap n1, we need to join n2 and n3.
129+
// For simplicity in this demo, let's bootstrap n1 with just n1, and have n2/n3 join.
130+
// Or better: bootstrap n1 with {n1, n2, n3}.
131+
// But run() logic for bootstrap only adds *raftID to configuration.
132+
133+
// Let's modify bootstrapping logic in run() slightly or just rely on manual join?
134+
// The original demo likely used raftadmin to join or predefined bootstrap.
135+
// Since we can't easily change run() logic too much without breaking Jepsen,
136+
// let's use a separate goroutine to join n2/n3 to n1 after a delay.
137+
138+
eg.Go(func() error {
139+
// Wait a bit for n1 to start
140+
// This is hacky but sufficient for a demo
141+
// Better would be to wait for gRPC readiness
142+
// But standard 'sleep' is unavailable here without import time
143+
// We can use a simple retry loop to join.
144+
145+
// Actually, let's keep it simple: just start them.
146+
// If n1 bootstraps as a single node cluster, n2 and n3 won't be part of it automatically.
147+
// We need to issue 'add_voter' commands.
148+
// Let's rely on an external script or add a helper here?
149+
150+
// For this specific demo restoration, we'll assume the external script might handle joins
151+
// OR we check if the CI script does it.
152+
// The CI script just waits for ports. It runs `lein run ...` which assumes a cluster.
153+
// If the cluster isn't formed, the tests might fail.
154+
// BUT, looking at the previous demo.go (if I could), it probably did the joins.
155+
156+
// Let's add a joiner goroutine.
157+
return joinCluster(nodes)
158+
})
60159
}
160+
61161
if err := eg.Wait(); err != nil {
62162
slog.Error(err.Error())
63163
os.Exit(1)
64164
}
65165
}
66166

167+
func joinCluster(nodes []config) error {
168+
leader := nodes[0]
169+
// Give servers some time to start
170+
time.Sleep(joinWait)
171+
172+
// Connect to leader
173+
conn, err := grpc.NewClient(leader.address, grpc.WithTransportCredentials(insecure.NewCredentials()))
174+
if err != nil {
175+
return fmt.Errorf("failed to dial leader: %w", err)
176+
}
177+
defer conn.Close()
178+
client := raftadminpb.NewRaftAdminClient(conn)
179+
180+
ctx := context.Background()
181+
for _, n := range nodes[1:] {
182+
var joined bool
183+
for i := 0; i < joinRetries; i++ {
184+
slog.Info("Attempting to join node", "id", n.raftID, "address", n.address)
185+
_, err := client.AddVoter(ctx, &raftadminpb.AddVoterRequest{
186+
Id: n.raftID,
187+
Address: n.address,
188+
PreviousIndex: 0,
189+
})
190+
if err == nil {
191+
slog.Info("Successfully joined node", "id", n.raftID)
192+
joined = true
193+
break
194+
}
195+
slog.Warn("Failed to join node, retrying...", "id", n.raftID, "err", err)
196+
time.Sleep(joinRetryInterval)
197+
}
198+
if !joined {
199+
return fmt.Errorf("failed to join node %s after retries", n.raftID)
200+
}
201+
}
202+
return nil
203+
}
204+
67205
func setupStorage(dir string) (raft.LogStore, raft.StableStore, raft.SnapshotStore, error) {
68206
if dir == "" {
69207
return raft.NewInmemStore(), raft.NewInmemStore(), raft.NewInmemSnapshotStore(), nil
@@ -99,10 +237,10 @@ func setupGRPC(r *raft.Raft, st store.MVCCStore, tm *transport.Manager, coordina
99237
return s
100238
}
101239

102-
func setupRedis(ctx context.Context, lc net.ListenConfig, st store.MVCCStore, coordinator *kv.Coordinate, addr string) (*adapter.RedisServer, error) {
240+
func setupRedis(ctx context.Context, lc net.ListenConfig, st store.MVCCStore, coordinator *kv.Coordinate, addr, redisAddr, raftRedisMapStr string) (*adapter.RedisServer, error) {
103241
leaderRedis := make(map[raft.ServerAddress]string)
104-
if *raftRedisMap != "" {
105-
parts := strings.Split(*raftRedisMap, ",")
242+
if raftRedisMapStr != "" {
243+
parts := strings.Split(raftRedisMapStr, ",")
106244
for _, part := range parts {
107245
kv := strings.Split(part, "=")
108246
if len(kv) == kvParts {
@@ -111,20 +249,20 @@ func setupRedis(ctx context.Context, lc net.ListenConfig, st store.MVCCStore, co
111249
}
112250
}
113251
// Ensure self is in map (override if present)
114-
leaderRedis[raft.ServerAddress(addr)] = *redisAddress
252+
leaderRedis[raft.ServerAddress(addr)] = redisAddr
115253

116-
l, err := lc.Listen(ctx, "tcp", *redisAddress)
254+
l, err := lc.Listen(ctx, "tcp", redisAddr)
117255
if err != nil {
118256
return nil, errors.WithStack(err)
119257
}
120258
return adapter.NewRedisServer(l, st, coordinator, leaderRedis), nil
121259
}
122260

123-
func run(eg *errgroup.Group) error {
261+
func run(eg *errgroup.Group, cfg config) error {
124262
ctx := context.Background()
125263
var lc net.ListenConfig
126264

127-
ldb, sdb, fss, err := setupStorage(*raftDataDir)
265+
ldb, sdb, fss, err := setupStorage(cfg.raftDataDir)
128266
if err != nil {
129267
return err
130268
}
@@ -134,15 +272,15 @@ func run(eg *errgroup.Group) error {
134272

135273
// Config
136274
c := raft.DefaultConfig()
137-
c.LocalID = raft.ServerID(*raftID)
275+
c.LocalID = raft.ServerID(cfg.raftID)
138276
c.Logger = hclog.New(&hclog.LoggerOptions{
139-
Name: "raft-" + *raftID,
277+
Name: "raft-" + cfg.raftID,
140278
JSONFormat: true,
141279
Level: hclog.Info,
142280
})
143281

144282
// Transport
145-
tm := transport.New(raft.ServerAddress(*address), []grpc.DialOption{
283+
tm := transport.New(raft.ServerAddress(cfg.address), []grpc.DialOption{
146284
grpc.WithTransportCredentials(insecure.NewCredentials()),
147285
})
148286

@@ -151,13 +289,13 @@ func run(eg *errgroup.Group) error {
151289
return errors.WithStack(err)
152290
}
153291

154-
if *raftBootstrap {
292+
if cfg.raftBootstrap {
155293
cfg := raft.Configuration{
156294
Servers: []raft.Server{
157295
{
158296
Suffrage: raft.Voter,
159-
ID: raft.ServerID(*raftID),
160-
Address: raft.ServerAddress(*address),
297+
ID: raft.ServerID(cfg.raftID),
298+
Address: raft.ServerAddress(cfg.address),
161299
},
162300
},
163301
}
@@ -172,23 +310,23 @@ func run(eg *errgroup.Group) error {
172310

173311
s := setupGRPC(r, st, tm, coordinator)
174312

175-
grpcSock, err := lc.Listen(ctx, "tcp", *address)
313+
grpcSock, err := lc.Listen(ctx, "tcp", cfg.address)
176314
if err != nil {
177315
return errors.WithStack(err)
178316
}
179317

180318
eg.Go(func() error {
181-
slog.Info("Starting gRPC server", "address", *address)
319+
slog.Info("Starting gRPC server", "address", cfg.address)
182320
return errors.WithStack(s.Serve(grpcSock))
183321
})
184322

185-
rd, err := setupRedis(ctx, lc, st, coordinator, *address)
323+
rd, err := setupRedis(ctx, lc, st, coordinator, cfg.address, cfg.redisAddress, cfg.raftRedisMap)
186324
if err != nil {
187325
return err
188326
}
189327

190328
eg.Go(func() error {
191-
slog.Info("Starting Redis server", "address", *redisAddress)
329+
slog.Info("Starting Redis server", "address", cfg.redisAddress)
192330
return errors.WithStack(rd.Run())
193331
})
194332

jepsen/src/elastickv/db.clj

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -87,11 +87,11 @@
8787
bootstrap? (= node bootstrap-node)
8888
args (cond-> [server-bin
8989
"--address" grpc
90-
"--redis_address" redis
91-
"--raft_id" (name node)
92-
"--raft_data_dir" data-dir
93-
"--raft_redis_map" raft-redis-map]
94-
bootstrap? (conj "--raft_bootstrap"))]
90+
"--redisAddress" redis
91+
"--raftId" (name node)
92+
"--raftDataDir" data-dir
93+
"--raftRedisMap" raft-redis-map]
94+
bootstrap? (conj "--raftBootstrap"))]
9595
(c/on node
9696
(c/su
9797
(c/exec :mkdir :-p data-dir)

main.go

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -34,17 +34,17 @@ const (
3434

3535
var (
3636
myAddr = flag.String("address", "localhost:50051", "TCP host+port for this node")
37-
redisAddr = flag.String("redis_address", "localhost:6379", "TCP host+port for redis")
38-
raftId = flag.String("raft_id", "", "Node id used by Raft")
39-
raftDir = flag.String("raft_data_dir", "data/", "Raft data dir")
40-
raftBootstrap = flag.Bool("raft_bootstrap", false, "Whether to bootstrap the Raft cluster")
37+
redisAddr = flag.String("redisAddress", "localhost:6379", "TCP host+port for redis")
38+
raftId = flag.String("raftId", "", "Node id used by Raft")
39+
raftDir = flag.String("raftDataDir", "data/", "Raft data dir")
40+
raftBootstrap = flag.Bool("raftBootstrap", false, "Whether to bootstrap the Raft cluster")
4141
)
4242

4343
func main() {
4444
flag.Parse()
4545

4646
if *raftId == "" {
47-
log.Fatalf("flag --raft_id is required")
47+
log.Fatalf("flag --raftId is required")
4848
}
4949

5050
ctx := context.Background()

0 commit comments

Comments
 (0)