Skip to content

Commit fabea60

Browse files
committed
Removed unnecessary waitApply and fixed a bug that caused stale reads on gRPC routes.
1 parent fbef078 commit fabea60

File tree

5 files changed

+80
-59
lines changed

5 files changed

+80
-59
lines changed

adapter/grpc.go

Lines changed: 52 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,8 @@ import (
1111
"github.com/bootjp/elastickv/store"
1212
"github.com/cockroachdb/errors"
1313
"github.com/spaolacci/murmur3"
14+
"google.golang.org/grpc"
15+
"google.golang.org/grpc/credentials/insecure"
1416
)
1517

1618
var _ pb.RawKVServer = (*GRPCServer)(nil)
@@ -38,16 +40,33 @@ func NewGRPCServer(store store.ScanStore, coordinate *kv.Coordinate) *GRPCServer
3840
}
3941

4042
func (r GRPCServer) RawGet(ctx context.Context, req *pb.RawGetRequest) (*pb.RawGetResponse, error) {
41-
v, err := r.store.Get(ctx, req.Key)
42-
if err != nil {
43-
switch {
44-
case errors.Is(err, store.ErrKeyNotFound):
45-
return &pb.RawGetResponse{
46-
Value: nil,
47-
}, nil
48-
default:
49-
return nil, errors.WithStack(err)
43+
44+
if r.coordinator.IsLeader() {
45+
v, err := r.store.Get(ctx, req.Key)
46+
if err != nil {
47+
switch {
48+
case errors.Is(err, store.ErrKeyNotFound):
49+
return &pb.RawGetResponse{
50+
Value: nil,
51+
}, nil
52+
default:
53+
return nil, errors.WithStack(err)
54+
}
5055
}
56+
r.log.InfoContext(ctx, "Get",
57+
slog.String("key", string(req.Key)),
58+
slog.String("value", string(v)))
59+
60+
return &pb.RawGetResponse{
61+
Value: v,
62+
}, nil
63+
}
64+
65+
v, err := r.tryLeaderGet(req.Key)
66+
if err != nil {
67+
return &pb.RawGetResponse{
68+
Value: nil,
69+
}, err
5170
}
5271

5372
r.log.InfoContext(ctx, "Get",
@@ -59,6 +78,30 @@ func (r GRPCServer) RawGet(ctx context.Context, req *pb.RawGetRequest) (*pb.RawG
5978
}, nil
6079
}
6180

81+
func (r GRPCServer) tryLeaderGet(key []byte) ([]byte, error) {
82+
addr := r.coordinator.RaftLeader()
83+
if addr == "" {
84+
return nil, ErrLeaderNotFound
85+
}
86+
87+
conn, err := grpc.NewClient(string(addr),
88+
grpc.WithTransportCredentials(insecure.NewCredentials()),
89+
grpc.WithDefaultCallOptions(grpc.WaitForReady(true)),
90+
)
91+
if err != nil {
92+
return nil, errors.WithStack(err)
93+
}
94+
defer conn.Close()
95+
96+
cli := pb.NewRawKVClient(conn)
97+
resp, err := cli.RawGet(context.Background(), &pb.RawGetRequest{Key: key})
98+
if err != nil {
99+
return nil, errors.WithStack(err)
100+
}
101+
102+
return resp.Value, nil
103+
}
104+
62105
func (r GRPCServer) RawPut(_ context.Context, req *pb.RawPutRequest) (*pb.RawPutResponse, error) {
63106
m, err := r.grpcTranscoder.RawPutToRequest(req)
64107
if err != nil {

adapter/internal.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@ type Internal struct {
2626
var _ pb.InternalServer = (*Internal)(nil)
2727

2828
var ErrNotLeader = errors.New("not leader")
29+
var ErrLeaderNotFound = errors.New("leader not found")
2930

3031
func (i *Internal) Forward(_ context.Context, req *pb.ForwardRequest) (*pb.ForwardResponse, error) {
3132
if i.raft.State() != raft.Leader {

adapter/redis.go

Lines changed: 25 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -116,18 +116,28 @@ func (r *RedisServer) set(conn redcon.Conn, cmd redcon.Command) {
116116
}
117117

118118
func (r *RedisServer) get(conn redcon.Conn, cmd redcon.Command) {
119-
if v, ok := r.tryLeaderGet(cmd.Args[1]); ok {
120-
if v == nil {
121-
conn.WriteNull()
122-
} else {
123-
conn.WriteBulk(v)
119+
if r.coordinator.IsLeader() {
120+
v, err := r.store.Get(context.Background(), cmd.Args[1])
121+
if err != nil {
122+
switch {
123+
case errors.Is(err, store.ErrKeyNotFound):
124+
conn.WriteNull()
125+
default:
126+
conn.WriteError(err.Error())
127+
}
128+
return
124129
}
125-
return
130+
conn.WriteBulk(v)
126131
}
127132

128-
v, err := r.store.Get(context.Background(), cmd.Args[1])
133+
v, err := r.tryLeaderGet(cmd.Args[1])
129134
if err != nil {
130-
conn.WriteNull()
135+
switch {
136+
case errors.Is(err, store.ErrKeyNotFound):
137+
conn.WriteNull()
138+
default:
139+
conn.WriteError(err.Error())
140+
}
131141
return
132142
}
133143

@@ -205,29 +215,26 @@ func (r *RedisServer) keys(conn redcon.Conn, cmd redcon.Command) {
205215

206216
// tryLeaderGet proxies a GET to the current Raft leader, returning the value and
207217
// whether the proxy succeeded.
208-
func (r *RedisServer) tryLeaderGet(key []byte) ([]byte, bool) {
209-
c, ok := r.coordinator.(*kv.Coordinate)
210-
if !ok || r.coordinator.IsLeader() {
211-
return nil, false
212-
}
213-
addr := c.RaftLeader()
218+
func (r *RedisServer) tryLeaderGet(key []byte) ([]byte, error) {
219+
addr := r.coordinator.RaftLeader()
214220
if addr == "" {
215-
return nil, false
221+
return nil, ErrLeaderNotFound
216222
}
217223

218224
conn, err := grpc.NewClient(string(addr),
219225
grpc.WithTransportCredentials(insecure.NewCredentials()),
220226
grpc.WithDefaultCallOptions(grpc.WaitForReady(true)),
221227
)
222228
if err != nil {
223-
return nil, false
229+
return nil, errors.WithStack(err)
224230
}
225231
defer conn.Close()
226232

227233
cli := pb.NewRawKVClient(conn)
228234
resp, err := cli.RawGet(context.Background(), &pb.RawGetRequest{Key: key})
229235
if err != nil {
230-
return nil, false
236+
return nil, errors.WithStack(err)
231237
}
232-
return resp.Value, true
238+
239+
return resp.Value, nil
233240
}

adapter/redis_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -68,7 +68,7 @@ func TestRedis_follower_redirect_node_set_get_deleted(t *testing.T) {
6868
assert.Equal(t, int64(1), res3.Val())
6969

7070
res4 := rdb.Get(ctx, string(key))
71-
assert.Equal(t, redis.Nil, res4.Err())
71+
assert.NoError(t, res4.Err())
7272
assert.Equal(t, "", res4.Val())
7373
}
7474

kv/coordinator.go

Lines changed: 1 addition & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,6 @@ package kv
22

33
import (
44
"context"
5-
"time"
65

76
pb "github.com/bootjp/elastickv/proto"
87
"github.com/cockroachdb/errors"
@@ -32,6 +31,7 @@ var _ Coordinator = (*Coordinate)(nil)
3231
type Coordinator interface {
3332
Dispatch(reqs *OperationGroup[OP]) (*CoordinateResponse, error)
3433
IsLeader() bool
34+
RaftLeader() raft.ServerAddress
3535
}
3636

3737
func (c *Coordinate) Dispatch(reqs *OperationGroup[OP]) (*CoordinateResponse, error) {
@@ -209,41 +209,11 @@ func (c *Coordinate) redirect(reqs *OperationGroup[OP]) (*CoordinateResponse, er
209209
return nil, ErrInvalidRequest
210210
}
211211

212-
if err := c.waitApplied(r.CommitIndex); err != nil {
213-
return nil, errors.WithStack(err)
214-
}
215-
216212
return &CoordinateResponse{
217213
CommitIndex: r.CommitIndex,
218214
}, nil
219215
}
220216

221-
// waitApplied blocks until the local raft FSM has applied at least the given
222-
// log index. This is used on followers after redirecting a write to the leader
223-
// so that immediately subsequent reads served locally observe the new value.
224-
func (c *Coordinate) waitApplied(idx uint64) error {
225-
// Fast path: already applied.
226-
if c.raft.AppliedIndex() >= idx {
227-
return nil
228-
}
229-
230-
const (
231-
pollInterval = 5 * time.Millisecond
232-
// Keep generous but finite to avoid hanging client calls.
233-
timeout = 2 * time.Second
234-
)
235-
236-
deadline := time.Now().Add(timeout)
237-
for time.Now().Before(deadline) {
238-
if c.raft.AppliedIndex() >= idx {
239-
return nil
240-
}
241-
time.Sleep(pollInterval)
242-
}
243-
244-
return errors.Wrapf(ErrInvalidRequest, "timeout waiting for apply: want=%d applied=%d", idx, c.raft.AppliedIndex())
245-
}
246-
247217
func (c *Coordinate) toForwardRequest(reqs []*pb.Request) *pb.ForwardRequest {
248218
if len(reqs) == 0 {
249219
return nil

0 commit comments

Comments
 (0)