Skip to content

Commit bc6fbb5

Browse files
committed
Refactor RedisServer for leader-based key management
1 parent f6e2f77 commit bc6fbb5

File tree

6 files changed

+107
-52
lines changed

6 files changed

+107
-52
lines changed

.github/workflows/jepsen-test.yml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@ jobs:
2121
with:
2222
go-version-file: 'go.mod'
2323
- name: Install netcat
24-
run: sudo apt-get update && sudo apt-get install -y netcat
24+
run: sudo apt-get update && sudo apt-get install -y netcat-openbsd
2525
- name: Install Leiningen
2626
run: |
2727
curl -L https://raw.githubusercontent.com/technomancy/leiningen/stable/bin/lein > ~/lein

adapter/redis.go

Lines changed: 81 additions & 46 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,8 @@ import (
1414
pb "github.com/bootjp/elastickv/proto"
1515
"github.com/bootjp/elastickv/store"
1616
"github.com/cockroachdb/errors"
17+
"github.com/hashicorp/raft"
18+
"github.com/redis/go-redis/v9"
1719
"github.com/tidwall/redcon"
1820
"google.golang.org/grpc"
1921
"google.golang.org/grpc/credentials/insecure"
@@ -39,6 +41,8 @@ type RedisServer struct {
3941
store store.ScanStore
4042
coordinator kv.Coordinator
4143
redisTranscoder *redisTranscoder
44+
// TODO manage membership from raft log
45+
leaderRedis map[raft.ServerAddress]string
4246

4347
route map[string]func(conn redcon.Conn, cmd redcon.Command)
4448
}
@@ -68,12 +72,13 @@ type redisResult struct {
6872
err error
6973
}
7074

71-
func NewRedisServer(listen net.Listener, store store.ScanStore, coordinate *kv.Coordinate) *RedisServer {
75+
func NewRedisServer(listen net.Listener, store store.ScanStore, coordinate *kv.Coordinate, leaderRedis map[raft.ServerAddress]string) *RedisServer {
7276
r := &RedisServer{
7377
listen: listen,
7478
store: store,
7579
coordinator: coordinate,
7680
redisTranscoder: newRedisTranscoder(),
81+
leaderRedis: leaderRedis,
7782
}
7883

7984
r.route = map[string]func(conn redcon.Conn, cmd redcon.Command){
@@ -243,42 +248,87 @@ func (r *RedisServer) exists(conn redcon.Conn, cmd redcon.Command) {
243248
}
244249

245250
func (r *RedisServer) keys(conn redcon.Conn, cmd redcon.Command) {
251+
pattern := cmd.Args[1]
246252

247-
// If an asterisk (*) is not included, the match will be exact,
248-
// so check if the key exists.
249-
if !bytes.Contains(cmd.Args[1], []byte("*")) {
250-
res, err := r.store.Exists(context.Background(), cmd.Args[1])
253+
if r.coordinator.IsLeader() {
254+
keys, err := r.localKeys(pattern)
251255
if err != nil {
252256
conn.WriteError(err.Error())
253257
return
254258
}
255-
if res {
256-
conn.WriteArray(1)
257-
conn.WriteBulk(cmd.Args[1])
258-
return
259+
conn.WriteArray(len(keys))
260+
for _, k := range keys {
261+
conn.WriteBulk(k)
259262
}
260-
conn.WriteArray(0)
261263
return
262264
}
263265

266+
keys, err := r.proxyKeys(pattern)
267+
if err != nil {
268+
conn.WriteError(err.Error())
269+
return
270+
}
271+
272+
conn.WriteArray(len(keys))
273+
for _, k := range keys {
274+
conn.WriteBulkString(k)
275+
}
276+
}
277+
278+
func (r *RedisServer) localKeys(pattern []byte) ([][]byte, error) {
279+
// If an asterisk (*) is not included, the match will be exact,
280+
// so check if the key exists.
281+
if !bytes.Contains(pattern, []byte("*")) {
282+
res, err := r.store.Exists(context.Background(), pattern)
283+
if err != nil {
284+
return nil, errors.WithStack(err)
285+
}
286+
if res {
287+
return [][]byte{bytes.Clone(pattern)}, nil
288+
}
289+
return [][]byte{}, nil
290+
}
291+
264292
var start []byte
265293
switch {
266-
case bytes.Equal(cmd.Args[1], []byte("*")):
294+
case bytes.Equal(pattern, []byte("*")):
267295
start = nil
268296
default:
269-
start = bytes.ReplaceAll(cmd.Args[1], []byte("*"), nil)
297+
start = bytes.ReplaceAll(pattern, []byte("*"), nil)
270298
}
271299

272300
keys, err := r.store.Scan(context.Background(), start, nil, math.MaxInt)
273301
if err != nil {
274-
conn.WriteError(err.Error())
275-
return
302+
return nil, errors.WithStack(err)
276303
}
277304

278-
conn.WriteArray(len(keys))
305+
out := make([][]byte, 0, len(keys))
279306
for _, kvPair := range keys {
280-
conn.WriteBulk(kvPair.Key)
307+
out = append(out, kvPair.Key)
308+
}
309+
return out, nil
310+
}
311+
312+
func (r *RedisServer) proxyKeys(pattern []byte) ([]string, error) {
313+
leader := r.coordinator.RaftLeader()
314+
if leader == "" {
315+
return nil, ErrLeaderNotFound
281316
}
317+
318+
leaderAddr, ok := r.leaderRedis[leader]
319+
if !ok || leaderAddr == "" {
320+
return nil, errors.WithStack(errors.Newf("leader redis address unknown for %s", leader))
321+
}
322+
323+
cli := redis.NewClient(&redis.Options{
324+
Addr: leaderAddr,
325+
})
326+
defer func() {
327+
_ = cli.Close()
328+
}()
329+
330+
keys, err := cli.Keys(context.Background(), string(pattern)).Result()
331+
return keys, errors.WithStack(err)
282332
}
283333

284334
// MULTI/EXEC/DISCARD handling
@@ -577,34 +627,6 @@ func encodeList(list []string) ([]byte, error) {
577627
return b, errors.WithStack(err)
578628
}
579629

580-
func (r *RedisServer) pushList(key []byte, values [][]byte) (int, error) {
581-
current, err := r.getValue(key)
582-
if err != nil && !errors.Is(err, store.ErrKeyNotFound) {
583-
return 0, errors.WithStack(err)
584-
}
585-
586-
list, err := decodeList(current)
587-
if err != nil {
588-
return 0, err
589-
}
590-
for _, v := range values {
591-
list = append(list, string(v))
592-
}
593-
enc, err := encodeList(list)
594-
if err != nil {
595-
return 0, err
596-
}
597-
598-
req, err := r.redisTranscoder.SetToRequest(key, enc)
599-
if err != nil {
600-
return 0, err
601-
}
602-
if _, err := r.coordinator.Dispatch(req); err != nil {
603-
return 0, errors.WithStack(err)
604-
}
605-
return len(list), nil
606-
}
607-
608630
func clampRange(start, end, length int) (int, int) {
609631
if start < 0 {
610632
start = length + start
@@ -683,12 +705,25 @@ func (r *RedisServer) getValue(key []byte) ([]byte, error) {
683705
}
684706

685707
func (r *RedisServer) rpush(conn redcon.Conn, cmd redcon.Command) {
686-
length, err := r.pushList(cmd.Args[1], cmd.Args[2:])
708+
results, err := r.runTransaction([]redcon.Command{cmd})
687709
if err != nil {
688710
conn.WriteError(err.Error())
689711
return
690712
}
691-
conn.WriteInt(length)
713+
if len(results) != 1 {
714+
conn.WriteError("ERR internal error: rpush should have one result")
715+
return
716+
}
717+
res := results[0]
718+
if res.err != nil {
719+
conn.WriteError(res.err.Error())
720+
return
721+
}
722+
if res.typ != resultInt {
723+
conn.WriteError("ERR internal error: rpush result should be an integer")
724+
return
725+
}
726+
conn.WriteInt64(res.integer)
692727
}
693728

694729
func (r *RedisServer) lrange(conn redcon.Conn, cmd redcon.Command) {

adapter/test_util.go

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -258,6 +258,11 @@ func setupNodes(t *testing.T, ctx context.Context, n int, ports []portsAdress, c
258258
var nodes []Node
259259
var lc net.ListenConfig
260260

261+
leaderRedis := make(map[raft.ServerAddress]string, n)
262+
for i := 0; i < n; i++ {
263+
leaderRedis[raft.ServerAddress(ports[i].raftAddress)] = ports[i].redisAddress
264+
}
265+
261266
for i := 0; i < n; i++ {
262267
st := store.NewRbMemoryStore()
263268
trxSt := store.NewMemoryStoreDefaultTTL()
@@ -297,7 +302,7 @@ func setupNodes(t *testing.T, ctx context.Context, n int, ports []portsAdress, c
297302

298303
l, err := lc.Listen(ctx, "tcp", port.redisAddress)
299304
require.NoError(t, err)
300-
rd := NewRedisServer(l, st, coordinator)
305+
rd := NewRedisServer(l, st, coordinator, leaderRedis)
301306
go func(server *RedisServer) {
302307
assert.NoError(t, server.Run())
303308
}(rd)

cmd/server/demo.go

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -75,6 +75,11 @@ func run(eg *errgroup.Group) error {
7575
cfg.Servers = append(cfg.Servers, server)
7676
}
7777

78+
leaderRedis := make(map[raft.ServerAddress]string)
79+
for i := 0; i < 3; i++ {
80+
leaderRedis[raft.ServerAddress(grpcAdders[i])] = redisAdders[i]
81+
}
82+
7883
for i := 0; i < 3; i++ {
7984
st := store.NewRbMemoryStore()
8085
trxSt := store.NewMemoryStoreDefaultTTL()
@@ -109,7 +114,7 @@ func run(eg *errgroup.Group) error {
109114
if err != nil {
110115
return errors.WithStack(err)
111116
}
112-
rd := adapter.NewRedisServer(l, st, coordinator)
117+
rd := adapter.NewRedisServer(l, st, coordinator, leaderRedis)
113118

114119
eg.Go(func() error {
115120
return errors.WithStack(rd.Run())

kv/coordinator.go

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -196,8 +196,14 @@ func (c *Coordinate) redirect(reqs *OperationGroup[OP]) (*CoordinateResponse, er
196196
cli := pb.NewInternalClient(conn)
197197

198198
var requests []*pb.Request
199-
for _, req := range reqs.Elems {
200-
requests = append(requests, c.toRawRequest(req))
199+
if reqs.IsTxn {
200+
for _, req := range reqs.Elems {
201+
requests = append(requests, c.toTxnRequests(req)...)
202+
}
203+
} else {
204+
for _, req := range reqs.Elems {
205+
requests = append(requests, c.toRawRequest(req))
206+
}
201207
}
202208

203209
r, err := cli.Forward(ctx, c.toForwardRequest(requests))

main.go

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -79,12 +79,16 @@ func main() {
7979
log.Fatalf("failed to listen: %v", err)
8080
}
8181

82+
leaderRedis := map[raft.ServerAddress]string{
83+
raft.ServerAddress(*myAddr): *redisAddr,
84+
}
85+
8286
eg := errgroup.Group{}
8387
eg.Go(func() error {
8488
return errors.WithStack(gs.Serve(grpcSock))
8589
})
8690
eg.Go(func() error {
87-
return errors.WithStack(adapter.NewRedisServer(redisL, s, coordinate).Run())
91+
return errors.WithStack(adapter.NewRedisServer(redisL, s, coordinate, leaderRedis).Run())
8892
})
8993

9094
err = eg.Wait()

0 commit comments

Comments
 (0)