Skip to content

Commit 5bef5c1

Browse files
authored
Merge pull request #274 from bootjp/feature/fix-create-node
Removed unnecessary waitApply and fixed a bug that caused stale reads…
2 parents 01004f2 + 82e6185 commit 5bef5c1

File tree

5 files changed

+79
-58
lines changed

5 files changed

+79
-58
lines changed

adapter/grpc.go

Lines changed: 51 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,8 @@ import (
1111
"github.com/bootjp/elastickv/store"
1212
"github.com/cockroachdb/errors"
1313
"github.com/spaolacci/murmur3"
14+
"google.golang.org/grpc"
15+
"google.golang.org/grpc/credentials/insecure"
1416
)
1517

1618
var _ pb.RawKVServer = (*GRPCServer)(nil)
@@ -38,16 +40,32 @@ func NewGRPCServer(store store.ScanStore, coordinate *kv.Coordinate) *GRPCServer
3840
}
3941

4042
func (r GRPCServer) RawGet(ctx context.Context, req *pb.RawGetRequest) (*pb.RawGetResponse, error) {
41-
v, err := r.store.Get(ctx, req.Key)
42-
if err != nil {
43-
switch {
44-
case errors.Is(err, store.ErrKeyNotFound):
45-
return &pb.RawGetResponse{
46-
Value: nil,
47-
}, nil
48-
default:
49-
return nil, errors.WithStack(err)
43+
if r.coordinator.IsLeader() {
44+
v, err := r.store.Get(ctx, req.Key)
45+
if err != nil {
46+
switch {
47+
case errors.Is(err, store.ErrKeyNotFound):
48+
return &pb.RawGetResponse{
49+
Value: nil,
50+
}, nil
51+
default:
52+
return nil, errors.WithStack(err)
53+
}
5054
}
55+
r.log.InfoContext(ctx, "Get",
56+
slog.String("key", string(req.Key)),
57+
slog.String("value", string(v)))
58+
59+
return &pb.RawGetResponse{
60+
Value: v,
61+
}, nil
62+
}
63+
64+
v, err := r.tryLeaderGet(req.Key)
65+
if err != nil {
66+
return &pb.RawGetResponse{
67+
Value: nil,
68+
}, err
5169
}
5270

5371
r.log.InfoContext(ctx, "Get",
@@ -59,6 +77,30 @@ func (r GRPCServer) RawGet(ctx context.Context, req *pb.RawGetRequest) (*pb.RawG
5977
}, nil
6078
}
6179

80+
func (r GRPCServer) tryLeaderGet(key []byte) ([]byte, error) {
81+
addr := r.coordinator.RaftLeader()
82+
if addr == "" {
83+
return nil, ErrLeaderNotFound
84+
}
85+
86+
conn, err := grpc.NewClient(string(addr),
87+
grpc.WithTransportCredentials(insecure.NewCredentials()),
88+
grpc.WithDefaultCallOptions(grpc.WaitForReady(true)),
89+
)
90+
if err != nil {
91+
return nil, errors.WithStack(err)
92+
}
93+
defer conn.Close()
94+
95+
cli := pb.NewRawKVClient(conn)
96+
resp, err := cli.RawGet(context.Background(), &pb.RawGetRequest{Key: key})
97+
if err != nil {
98+
return nil, errors.WithStack(err)
99+
}
100+
101+
return resp.Value, nil
102+
}
103+
62104
func (r GRPCServer) RawPut(_ context.Context, req *pb.RawPutRequest) (*pb.RawPutResponse, error) {
63105
m, err := r.grpcTranscoder.RawPutToRequest(req)
64106
if err != nil {

adapter/internal.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@ type Internal struct {
2626
var _ pb.InternalServer = (*Internal)(nil)
2727

2828
var ErrNotLeader = errors.New("not leader")
29+
var ErrLeaderNotFound = errors.New("leader not found")
2930

3031
func (i *Internal) Forward(_ context.Context, req *pb.ForwardRequest) (*pb.ForwardResponse, error) {
3132
if i.raft.State() != raft.Leader {

adapter/redis.go

Lines changed: 25 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -116,18 +116,29 @@ func (r *RedisServer) set(conn redcon.Conn, cmd redcon.Command) {
116116
}
117117

118118
func (r *RedisServer) get(conn redcon.Conn, cmd redcon.Command) {
119-
if v, ok := r.tryLeaderGet(cmd.Args[1]); ok {
120-
if v == nil {
121-
conn.WriteNull()
122-
} else {
123-
conn.WriteBulk(v)
119+
if r.coordinator.IsLeader() {
120+
v, err := r.store.Get(context.Background(), cmd.Args[1])
121+
if err != nil {
122+
switch {
123+
case errors.Is(err, store.ErrKeyNotFound):
124+
conn.WriteNull()
125+
default:
126+
conn.WriteError(err.Error())
127+
}
128+
return
124129
}
130+
conn.WriteBulk(v)
125131
return
126132
}
127133

128-
v, err := r.store.Get(context.Background(), cmd.Args[1])
134+
v, err := r.tryLeaderGet(cmd.Args[1])
129135
if err != nil {
130-
conn.WriteNull()
136+
switch {
137+
case errors.Is(err, store.ErrKeyNotFound):
138+
conn.WriteNull()
139+
default:
140+
conn.WriteError(err.Error())
141+
}
131142
return
132143
}
133144

@@ -205,29 +216,26 @@ func (r *RedisServer) keys(conn redcon.Conn, cmd redcon.Command) {
205216

206217
// tryLeaderGet proxies a GET to the current Raft leader, returning the value and
207218
// whether the proxy succeeded.
208-
func (r *RedisServer) tryLeaderGet(key []byte) ([]byte, bool) {
209-
c, ok := r.coordinator.(*kv.Coordinate)
210-
if !ok || r.coordinator.IsLeader() {
211-
return nil, false
212-
}
213-
addr := c.RaftLeader()
219+
func (r *RedisServer) tryLeaderGet(key []byte) ([]byte, error) {
220+
addr := r.coordinator.RaftLeader()
214221
if addr == "" {
215-
return nil, false
222+
return nil, ErrLeaderNotFound
216223
}
217224

218225
conn, err := grpc.NewClient(string(addr),
219226
grpc.WithTransportCredentials(insecure.NewCredentials()),
220227
grpc.WithDefaultCallOptions(grpc.WaitForReady(true)),
221228
)
222229
if err != nil {
223-
return nil, false
230+
return nil, errors.WithStack(err)
224231
}
225232
defer conn.Close()
226233

227234
cli := pb.NewRawKVClient(conn)
228235
resp, err := cli.RawGet(context.Background(), &pb.RawGetRequest{Key: key})
229236
if err != nil {
230-
return nil, false
237+
return nil, errors.WithStack(err)
231238
}
232-
return resp.Value, true
239+
240+
return resp.Value, nil
233241
}

adapter/redis_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -68,7 +68,7 @@ func TestRedis_follower_redirect_node_set_get_deleted(t *testing.T) {
6868
assert.Equal(t, int64(1), res3.Val())
6969

7070
res4 := rdb.Get(ctx, string(key))
71-
assert.Equal(t, redis.Nil, res4.Err())
71+
assert.NoError(t, res4.Err())
7272
assert.Equal(t, "", res4.Val())
7373
}
7474

kv/coordinator.go

Lines changed: 1 addition & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,6 @@ package kv
22

33
import (
44
"context"
5-
"time"
65

76
pb "github.com/bootjp/elastickv/proto"
87
"github.com/cockroachdb/errors"
@@ -32,6 +31,7 @@ var _ Coordinator = (*Coordinate)(nil)
3231
type Coordinator interface {
3332
Dispatch(reqs *OperationGroup[OP]) (*CoordinateResponse, error)
3433
IsLeader() bool
34+
RaftLeader() raft.ServerAddress
3535
}
3636

3737
func (c *Coordinate) Dispatch(reqs *OperationGroup[OP]) (*CoordinateResponse, error) {
@@ -209,41 +209,11 @@ func (c *Coordinate) redirect(reqs *OperationGroup[OP]) (*CoordinateResponse, er
209209
return nil, ErrInvalidRequest
210210
}
211211

212-
if err := c.waitApplied(r.CommitIndex); err != nil {
213-
return nil, errors.WithStack(err)
214-
}
215-
216212
return &CoordinateResponse{
217213
CommitIndex: r.CommitIndex,
218214
}, nil
219215
}
220216

221-
// waitApplied blocks until the local raft FSM has applied at least the given
222-
// log index. This is used on followers after redirecting a write to the leader
223-
// so that immediately subsequent reads served locally observe the new value.
224-
func (c *Coordinate) waitApplied(idx uint64) error {
225-
// Fast path: already applied.
226-
if c.raft.AppliedIndex() >= idx {
227-
return nil
228-
}
229-
230-
const (
231-
pollInterval = 5 * time.Millisecond
232-
// Keep generous but finite to avoid hanging client calls.
233-
timeout = 2 * time.Second
234-
)
235-
236-
deadline := time.Now().Add(timeout)
237-
for time.Now().Before(deadline) {
238-
if c.raft.AppliedIndex() >= idx {
239-
return nil
240-
}
241-
time.Sleep(pollInterval)
242-
}
243-
244-
return errors.Wrapf(ErrInvalidRequest, "timeout waiting for apply: want=%d applied=%d", idx, c.raft.AppliedIndex())
245-
}
246-
247217
func (c *Coordinate) toForwardRequest(reqs []*pb.Request) *pb.ForwardRequest {
248218
if len(reqs) == 0 {
249219
return nil

0 commit comments

Comments
 (0)