Skip to content

Commit 01004f2

Browse files
authored
Merge pull request #271 from bootjp/feature/fix-create-node
fix stale read by followers
2 parents 7b45c7e + fbef078 commit 01004f2

File tree

7 files changed

+127
-22
lines changed

7 files changed

+127
-22
lines changed

adapter/grpc.go

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -140,7 +140,12 @@ func (r GRPCServer) Get(ctx context.Context, req *pb.GetRequest) (*pb.GetRespons
140140

141141
v, err := r.store.Get(ctx, req.Key)
142142
if err != nil {
143-
return nil, errors.WithStack(err)
143+
switch {
144+
case errors.Is(err, store.ErrKeyNotFound):
145+
return &pb.GetResponse{Value: nil}, nil
146+
default:
147+
return nil, errors.WithStack(err)
148+
}
144149
}
145150

146151
r.log.InfoContext(ctx, "Get",

adapter/redis.go

Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,9 +8,12 @@ import (
88
"strings"
99

1010
"github.com/bootjp/elastickv/kv"
11+
pb "github.com/bootjp/elastickv/proto"
1112
"github.com/bootjp/elastickv/store"
1213
"github.com/cockroachdb/errors"
1314
"github.com/tidwall/redcon"
15+
"google.golang.org/grpc"
16+
"google.golang.org/grpc/credentials/insecure"
1417
)
1518

1619
//nolint:mnd
@@ -113,6 +116,15 @@ func (r *RedisServer) set(conn redcon.Conn, cmd redcon.Command) {
113116
}
114117

115118
func (r *RedisServer) get(conn redcon.Conn, cmd redcon.Command) {
119+
if v, ok := r.tryLeaderGet(cmd.Args[1]); ok {
120+
if v == nil {
121+
conn.WriteNull()
122+
} else {
123+
conn.WriteBulk(v)
124+
}
125+
return
126+
}
127+
116128
v, err := r.store.Get(context.Background(), cmd.Args[1])
117129
if err != nil {
118130
conn.WriteNull()
@@ -190,3 +202,32 @@ func (r *RedisServer) keys(conn redcon.Conn, cmd redcon.Command) {
190202
conn.WriteBulk(kvPair.Key)
191203
}
192204
}
205+
206+
// tryLeaderGet proxies a GET to the current Raft leader, returning the value and
207+
// whether the proxy succeeded.
208+
func (r *RedisServer) tryLeaderGet(key []byte) ([]byte, bool) {
209+
c, ok := r.coordinator.(*kv.Coordinate)
210+
if !ok || r.coordinator.IsLeader() {
211+
return nil, false
212+
}
213+
addr := c.RaftLeader()
214+
if addr == "" {
215+
return nil, false
216+
}
217+
218+
conn, err := grpc.NewClient(string(addr),
219+
grpc.WithTransportCredentials(insecure.NewCredentials()),
220+
grpc.WithDefaultCallOptions(grpc.WaitForReady(true)),
221+
)
222+
if err != nil {
223+
return nil, false
224+
}
225+
defer conn.Close()
226+
227+
cli := pb.NewRawKVClient(conn)
228+
resp, err := cli.RawGet(context.Background(), &pb.RawGetRequest{Key: key})
229+
if err != nil {
230+
return nil, false
231+
}
232+
return resp.Value, true
233+
}

kv/coordinator.go

Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ package kv
22

33
import (
44
"context"
5+
"time"
56

67
pb "github.com/bootjp/elastickv/proto"
78
"github.com/cockroachdb/errors"
@@ -49,6 +50,12 @@ func (c *Coordinate) IsLeader() bool {
4950
return c.raft.State() == raft.Leader
5051
}
5152

53+
// RaftLeader returns the current leader's address as known by this node.
54+
func (c *Coordinate) RaftLeader() raft.ServerAddress {
55+
addr, _ := c.raft.LeaderWithID()
56+
return addr
57+
}
58+
5259
func (c *Coordinate) dispatchTxn(reqs []*Elem[OP]) (*CoordinateResponse, error) {
5360
var logs []*pb.Request
5461
for _, req := range reqs {
@@ -202,11 +209,41 @@ func (c *Coordinate) redirect(reqs *OperationGroup[OP]) (*CoordinateResponse, er
202209
return nil, ErrInvalidRequest
203210
}
204211

212+
if err := c.waitApplied(r.CommitIndex); err != nil {
213+
return nil, errors.WithStack(err)
214+
}
215+
205216
return &CoordinateResponse{
206217
CommitIndex: r.CommitIndex,
207218
}, nil
208219
}
209220

221+
// waitApplied blocks until the local raft FSM has applied at least the given
222+
// log index. This is used on followers after redirecting a write to the leader
223+
// so that immediately subsequent reads served locally observe the new value.
224+
func (c *Coordinate) waitApplied(idx uint64) error {
225+
// Fast path: already applied.
226+
if c.raft.AppliedIndex() >= idx {
227+
return nil
228+
}
229+
230+
const (
231+
pollInterval = 5 * time.Millisecond
232+
// Keep generous but finite to avoid hanging client calls.
233+
timeout = 2 * time.Second
234+
)
235+
236+
deadline := time.Now().Add(timeout)
237+
for time.Now().Before(deadline) {
238+
if c.raft.AppliedIndex() >= idx {
239+
return nil
240+
}
241+
time.Sleep(pollInterval)
242+
}
243+
244+
return errors.Wrapf(ErrInvalidRequest, "timeout waiting for apply: want=%d applied=%d", idx, c.raft.AppliedIndex())
245+
}
246+
210247
func (c *Coordinate) toForwardRequest(reqs []*pb.Request) *pb.ForwardRequest {
211248
if len(reqs) == 0 {
212249
return nil

kv/transaction.go

Lines changed: 32 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,30 @@ type TransactionResponse struct {
2828
CommitIndex uint64
2929
}
3030

31+
// applyAndBarrier submits a log entry, waits for it to be applied, and
32+
// surfaces both Raft transport errors and errors returned from FSM.Apply.
33+
// HashiCorp Raft delivers FSM errors via ApplyFuture.Response(), not Error(),
34+
// so we must inspect the response to avoid silently treating failed writes as
35+
// successes.
36+
func applyAndBarrier(r *raft.Raft, b []byte) (uint64, error) {
37+
af := r.Apply(b, time.Second)
38+
if err := af.Error(); err != nil {
39+
return 0, errors.WithStack(err)
40+
}
41+
42+
if resp := af.Response(); resp != nil {
43+
if err, ok := resp.(error); ok && err != nil {
44+
return 0, errors.WithStack(err)
45+
}
46+
}
47+
48+
if f := r.Barrier(time.Second); f.Error() != nil {
49+
return 0, errors.WithStack(f.Error())
50+
}
51+
52+
return af.Index(), nil
53+
}
54+
3155
func (t *TransactionManager) Commit(reqs []*pb.Request) (*TransactionResponse, error) {
3256
commitIndex, err := func() (uint64, error) {
3357
commitIndex := uint64(0)
@@ -37,15 +61,11 @@ func (t *TransactionManager) Commit(reqs []*pb.Request) (*TransactionResponse, e
3761
return 0, errors.WithStack(err)
3862
}
3963

40-
af := t.raft.Apply(b, time.Second)
41-
if af.Error() != nil {
42-
return 0, errors.WithStack(af.Error())
43-
}
44-
f := t.raft.Barrier(time.Second)
45-
if f.Error() != nil {
46-
return 0, errors.WithStack(f.Error())
64+
idx, err := applyAndBarrier(t.raft, b)
65+
if err != nil {
66+
return 0, err
4767
}
48-
commitIndex = af.Index()
68+
commitIndex = idx
4969
}
5070

5171
return commitIndex, nil
@@ -83,15 +103,11 @@ func (t *TransactionManager) Abort(reqs []*pb.Request) (*TransactionResponse, er
83103
return nil, errors.WithStack(err)
84104
}
85105

86-
af := t.raft.Apply(b, time.Second)
87-
if af.Error() != nil {
88-
return nil, errors.WithStack(af.Error())
89-
}
90-
f := t.raft.Barrier(time.Second)
91-
if f.Error() != nil {
92-
return nil, errors.WithStack(f.Error())
106+
idx, err := applyAndBarrier(t.raft, b)
107+
if err != nil {
108+
return nil, err
93109
}
94-
commitIndex = af.Index()
110+
commitIndex = idx
95111
}
96112

97113
return &TransactionResponse{

store/bolt_store.go

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -61,7 +61,14 @@ func (s *boltStore) Get(ctx context.Context, key []byte) ([]byte, error) {
6161
return nil
6262
})
6363

64-
return v, errors.WithStack(err)
64+
if err != nil {
65+
return nil, errors.WithStack(err)
66+
}
67+
if v == nil {
68+
return nil, ErrKeyNotFound
69+
}
70+
71+
return v, nil
6572
}
6673

6774
func (s *boltStore) Scan(ctx context.Context, start []byte, end []byte, limit int) ([]*KVPair, error) {

store/bolt_store_test.go

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -34,9 +34,8 @@ func TestBoltStore(t *testing.T) {
3434
assert.Equal(t, []byte("bar"), res)
3535
assert.NoError(t, st.Delete(ctx, key))
3636
// bolt store does not support NotFound
37-
res, err = st.Get(ctx, []byte("aaaaaa"))
38-
assert.NoError(t, err)
39-
assert.Nil(t, res)
37+
_, err = st.Get(ctx, []byte("aaaaaa"))
38+
assert.Equal(t, err, ErrKeyNotFound)
4039
}
4140
}
4241

store/rb_memory_store.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -85,7 +85,7 @@ func (s *rbMemoryStore) Get(ctx context.Context, key []byte) ([]byte, error) {
8585

8686
vv, ok := v.([]byte)
8787
if !ok {
88-
return nil, errors.WithStack(ErrKeyNotFound)
88+
return nil, ErrKeyNotFound
8989
}
9090

9191
return vv, nil

0 commit comments

Comments
 (0)