Skip to content

Commit 904799f

Browse files
committed
Refactor to use MVCCStore with explicit timestamps
1 parent 01aac5b commit 904799f

28 files changed

+860
-3457
lines changed

adapter/dynamodb.go

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -26,13 +26,13 @@ const updateSplitCount = 2
2626

2727
type DynamoDBServer struct {
2828
listen net.Listener
29-
store store.ScanStore
29+
store store.MVCCStore
3030
coordinator kv.Coordinator
3131
dynamoTranscoder *dynamodbTranscoder
3232
httpServer *http.Server
3333
}
3434

35-
func NewDynamoDBServer(listen net.Listener, st store.ScanStore, coordinate *kv.Coordinate) *DynamoDBServer {
35+
func NewDynamoDBServer(listen net.Listener, st store.MVCCStore, coordinate *kv.Coordinate) *DynamoDBServer {
3636
d := &DynamoDBServer{
3737
listen: listen,
3838
store: st,
@@ -114,7 +114,8 @@ func (d *DynamoDBServer) getItem(w http.ResponseWriter, r *http.Request) {
114114
http.Error(w, "missing key", http.StatusBadRequest)
115115
return
116116
}
117-
v, err := d.store.Get(r.Context(), []byte(keyAttr.S))
117+
readTS := snapshotTS(d.coordinator.Clock())
118+
v, err := d.store.GetAt(r.Context(), []byte(keyAttr.S), readTS)
118119
if err != nil {
119120
if errors.Is(err, store.ErrKeyNotFound) {
120121
w.Header().Set("Content-Type", "application/x-amz-json-1.0")
@@ -233,7 +234,8 @@ func (d *DynamoDBServer) validateCondition(ctx context.Context, expr string, nam
233234
if expr == "" {
234235
return nil
235236
}
236-
exists, err := d.store.Exists(ctx, key)
237+
readTS := snapshotTS(d.coordinator.Clock())
238+
exists, err := d.store.ExistsAt(ctx, key, readTS)
237239
if err != nil {
238240
return errors.WithStack(err)
239241
}

adapter/dynamodb_test.go

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ import (
1212
"github.com/aws/aws-sdk-go-v2/service/dynamodb"
1313
"github.com/aws/aws-sdk-go-v2/service/dynamodb/types"
1414
"github.com/stretchr/testify/assert"
15+
"github.com/stretchr/testify/require"
1516
)
1617

1718
func TestDynamoDB_PutItem_GetItem(t *testing.T) {
@@ -248,7 +249,7 @@ func TestDynamoDB_TransactWriteItems_Concurrent(t *testing.T) {
248249
})
249250
assert.NoError(t, err, "Get failed for key1 in goroutine %d", i)
250251
value1Attr, ok := out1.Item["value"].(*types.AttributeValueMemberS)
251-
assert.True(t, ok, "Type assertion failed for key1 in goroutine %d", i)
252+
require.True(t, ok, "Type assertion failed for key1 in goroutine %d", i)
252253
assert.Equal(t, value1, value1Attr.Value, "Value mismatch for key1 in goroutine %d", i)
253254

254255
out2, err := client.GetItem(context.Background(), &dynamodb.GetItemInput{
@@ -259,7 +260,7 @@ func TestDynamoDB_TransactWriteItems_Concurrent(t *testing.T) {
259260
})
260261
assert.NoError(t, err, "Get failed for key2 in goroutine %d", i)
261262
value2Attr, ok := out2.Item["value"].(*types.AttributeValueMemberS)
262-
assert.True(t, ok, "Type assertion failed for key2 in goroutine %d", i)
263+
require.True(t, ok, "Type assertion failed for key2 in goroutine %d", i)
263264
assert.Equal(t, value2, value2Attr.Value, "Value mismatch for key2 in goroutine %d", i)
264265
}(i)
265266
}

adapter/grpc.go

Lines changed: 14 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -22,13 +22,13 @@ type GRPCServer struct {
2222
log *slog.Logger
2323
grpcTranscoder *grpcTranscoder
2424
coordinator kv.Coordinator
25-
store store.ScanStore
25+
store store.MVCCStore
2626

2727
pb.UnimplementedRawKVServer
2828
pb.UnimplementedTransactionalKVServer
2929
}
3030

31-
func NewGRPCServer(store store.ScanStore, coordinate *kv.Coordinate) *GRPCServer {
31+
func NewGRPCServer(store store.MVCCStore, coordinate *kv.Coordinate) *GRPCServer {
3232
return &GRPCServer{
3333
log: slog.New(slog.NewJSONHandler(os.Stdout, &slog.HandlerOptions{
3434
Level: slog.LevelWarn,
@@ -40,8 +40,13 @@ func NewGRPCServer(store store.ScanStore, coordinate *kv.Coordinate) *GRPCServer
4040
}
4141

4242
func (r GRPCServer) RawGet(ctx context.Context, req *pb.RawGetRequest) (*pb.RawGetResponse, error) {
43+
readTS := req.GetTs()
44+
if readTS == 0 {
45+
readTS = snapshotTS(r.coordinator.Clock())
46+
}
47+
4348
if r.coordinator.IsLeader() {
44-
v, err := r.store.Get(ctx, req.Key)
49+
v, err := r.store.GetAt(ctx, req.Key, readTS)
4550
if err != nil {
4651
switch {
4752
case errors.Is(err, store.ErrKeyNotFound):
@@ -93,7 +98,8 @@ func (r GRPCServer) tryLeaderGet(key []byte) ([]byte, error) {
9398
defer conn.Close()
9499

95100
cli := pb.NewRawKVClient(conn)
96-
resp, err := cli.RawGet(context.Background(), &pb.RawGetRequest{Key: key})
101+
ts := snapshotTS(r.coordinator.Clock())
102+
resp, err := cli.RawGet(context.Background(), &pb.RawGetRequest{Key: key, Ts: ts})
97103
if err != nil {
98104
return nil, errors.WithStack(err)
99105
}
@@ -180,7 +186,8 @@ func (r GRPCServer) Get(ctx context.Context, req *pb.GetRequest) (*pb.GetRespons
180186
return nil, errors.WithStack(err)
181187
}
182188

183-
v, err := r.store.Get(ctx, req.Key)
189+
readTS := snapshotTS(r.coordinator.Clock())
190+
v, err := r.store.GetAt(ctx, req.Key, readTS)
184191
if err != nil {
185192
switch {
186193
case errors.Is(err, store.ErrKeyNotFound):
@@ -227,7 +234,8 @@ func (r GRPCServer) Scan(ctx context.Context, req *pb.ScanRequest) (*pb.ScanResp
227234
Kv: nil,
228235
}, errors.WithStack(err)
229236
}
230-
res, err := r.store.Scan(ctx, req.StartKey, req.EndKey, limit)
237+
readTS := snapshotTS(r.coordinator.Clock())
238+
res, err := r.store.ScanAt(ctx, req.StartKey, req.EndKey, limit, readTS)
231239
if err != nil {
232240
return &pb.ScanResponse{
233241
Kv: nil,

adapter/internal.go

Lines changed: 25 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -35,18 +35,7 @@ func (i *Internal) Forward(_ context.Context, req *pb.ForwardRequest) (*pb.Forwa
3535
return nil, errors.WithStack(ErrNotLeader)
3636
}
3737

38-
// Ensure leader issues start_ts when followers forward txn groups without it.
39-
if req.IsTxn {
40-
var startTs uint64
41-
for _, r := range req.Requests {
42-
if r.Ts == 0 {
43-
if startTs == 0 {
44-
startTs = i.clock.Next()
45-
}
46-
r.Ts = startTs
47-
}
48-
}
49-
}
38+
i.stampTimestamps(req)
5039

5140
r, err := i.transactionManager.Commit(req.Requests)
5241
if err != nil {
@@ -61,3 +50,27 @@ func (i *Internal) Forward(_ context.Context, req *pb.ForwardRequest) (*pb.Forwa
6150
CommitIndex: r.CommitIndex,
6251
}, nil
6352
}
53+
54+
func (i *Internal) stampTimestamps(req *pb.ForwardRequest) {
55+
if req == nil {
56+
return
57+
}
58+
if req.IsTxn {
59+
var startTs uint64
60+
for _, r := range req.Requests {
61+
if r.Ts == 0 {
62+
if startTs == 0 {
63+
startTs = i.clock.Next()
64+
}
65+
r.Ts = startTs
66+
}
67+
}
68+
return
69+
}
70+
71+
for _, r := range req.Requests {
72+
if r.Ts == 0 {
73+
r.Ts = i.clock.Next()
74+
}
75+
}
76+
}

adapter/redis.go

Lines changed: 37 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -37,10 +37,9 @@ var argsLen = map[string]int{
3737

3838
type RedisServer struct {
3939
listen net.Listener
40-
store store.ScanStore
40+
store store.MVCCStore
4141
coordinator kv.Coordinator
4242
redisTranscoder *redisTranscoder
43-
listStore *store.ListStore
4443
// TODO manage membership from raft log
4544
leaderRedis map[raft.ServerAddress]string
4645

@@ -72,15 +71,12 @@ type redisResult struct {
7271
err error
7372
}
7473

75-
func store2list(st store.ScanStore) *store.ListStore { return store.NewListStore(st) }
76-
77-
func NewRedisServer(listen net.Listener, store store.ScanStore, coordinate *kv.Coordinate, leaderRedis map[raft.ServerAddress]string) *RedisServer {
74+
func NewRedisServer(listen net.Listener, store store.MVCCStore, coordinate *kv.Coordinate, leaderRedis map[raft.ServerAddress]string) *RedisServer {
7875
r := &RedisServer{
7976
listen: listen,
8077
store: store,
8178
coordinator: coordinate,
8279
redisTranscoder: newRedisTranscoder(),
83-
listStore: store2list(store),
8480
leaderRedis: leaderRedis,
8581
}
8682

@@ -112,6 +108,10 @@ func getConnState(conn redcon.Conn) *connState {
112108
return st
113109
}
114110

111+
func (r *RedisServer) readTS() uint64 {
112+
return snapshotTS(r.coordinator.Clock())
113+
}
114+
115115
func (r *RedisServer) Run() error {
116116
err := redcon.Serve(r.listen,
117117
func(conn redcon.Conn, cmd redcon.Command) {
@@ -211,7 +211,8 @@ func (r *RedisServer) get(conn redcon.Conn, cmd redcon.Command) {
211211
}
212212

213213
if r.coordinator.IsLeader() {
214-
v, err := r.store.Get(context.Background(), cmd.Args[1])
214+
readTS := r.readTS()
215+
v, err := r.store.GetAt(context.Background(), cmd.Args[1], readTS)
215216
if err != nil {
216217
switch {
217218
case errors.Is(err, store.ErrKeyNotFound):
@@ -276,7 +277,8 @@ func (r *RedisServer) exists(conn redcon.Conn, cmd redcon.Command) {
276277
return
277278
}
278279

279-
ok, err := r.store.Exists(context.Background(), cmd.Args[1])
280+
readTS := r.readTS()
281+
ok, err := r.store.ExistsAt(context.Background(), cmd.Args[1], readTS)
280282
if err != nil {
281283
conn.WriteError(err.Error())
282284
return
@@ -325,7 +327,8 @@ func (r *RedisServer) localKeys(pattern []byte) ([][]byte, error) {
325327
}
326328

327329
func (r *RedisServer) localKeysExact(pattern []byte) ([][]byte, error) {
328-
res, err := r.store.Exists(context.Background(), pattern)
330+
readTS := r.readTS()
331+
res, err := r.store.ExistsAt(context.Background(), pattern, readTS)
329332
if err != nil {
330333
return nil, errors.WithStack(err)
331334
}
@@ -338,7 +341,8 @@ func (r *RedisServer) localKeysExact(pattern []byte) ([][]byte, error) {
338341
func (r *RedisServer) localKeysPattern(pattern []byte) ([][]byte, error) {
339342
start := r.patternStart(pattern)
340343

341-
keys, err := r.store.Scan(context.Background(), start, nil, math.MaxInt)
344+
readTS := r.readTS()
345+
keys, err := r.store.ScanAt(context.Background(), start, nil, math.MaxInt, readTS)
342346
if err != nil {
343347
return nil, errors.WithStack(err)
344348
}
@@ -828,13 +832,24 @@ func clampRange(start, end, length int) (int, int) {
828832
}
829833

830834
func (r *RedisServer) loadListMeta(ctx context.Context, key []byte) (store.ListMeta, bool, error) {
831-
meta, exists, err := r.listStore.LoadMeta(ctx, key)
832-
return meta, exists, errors.WithStack(err)
835+
readTS := r.readTS()
836+
val, err := r.store.GetAt(ctx, store.ListMetaKey(key), readTS)
837+
if err != nil {
838+
if errors.Is(err, store.ErrKeyNotFound) {
839+
return store.ListMeta{}, false, nil
840+
}
841+
return store.ListMeta{}, false, errors.WithStack(err)
842+
}
843+
meta, err := store.UnmarshalListMeta(val)
844+
if err != nil {
845+
return store.ListMeta{}, false, errors.WithStack(err)
846+
}
847+
return meta, true, nil
833848
}
834849

835850
func (r *RedisServer) isListKey(ctx context.Context, key []byte) (bool, error) {
836-
isList, err := r.listStore.IsList(ctx, key)
837-
return isList, errors.WithStack(err)
851+
_, exists, err := r.loadListMeta(ctx, key)
852+
return exists, err
838853
}
839854

840855
func (r *RedisServer) buildRPushOps(meta store.ListMeta, key []byte, values [][]byte) ([]*kv.Elem[kv.OP], store.ListMeta, error) {
@@ -895,7 +910,8 @@ func (r *RedisServer) deleteList(ctx context.Context, key []byte) error {
895910
start := listItemKey(key, math.MinInt64)
896911
end := listItemKey(key, math.MaxInt64)
897912

898-
kvs, err := r.store.Scan(ctx, start, end, math.MaxInt)
913+
readTS := r.readTS()
914+
kvs, err := r.store.ScanAt(ctx, start, end, math.MaxInt, readTS)
899915
if err != nil {
900916
return errors.WithStack(err)
901917
}
@@ -926,7 +942,8 @@ func (r *RedisServer) fetchListRange(ctx context.Context, key []byte, meta store
926942
startKey := listItemKey(key, startSeq)
927943
endKey := listItemKey(key, endSeq+1) // exclusive
928944

929-
kvs, err := r.store.Scan(ctx, startKey, endKey, int(endIdx-startIdx+1))
945+
readTS := r.readTS()
946+
kvs, err := r.store.ScanAt(ctx, startKey, endKey, int(endIdx-startIdx+1), readTS)
930947
if err != nil {
931948
return nil, errors.WithStack(err)
932949
}
@@ -1039,7 +1056,8 @@ func (r *RedisServer) tryLeaderGet(key []byte) ([]byte, error) {
10391056
defer conn.Close()
10401057

10411058
cli := pb.NewRawKVClient(conn)
1042-
resp, err := cli.RawGet(context.Background(), &pb.RawGetRequest{Key: key})
1059+
ts := r.readTS()
1060+
resp, err := cli.RawGet(context.Background(), &pb.RawGetRequest{Key: key, Ts: ts})
10431061
if err != nil {
10441062
return nil, errors.WithStack(err)
10451063
}
@@ -1048,8 +1066,9 @@ func (r *RedisServer) tryLeaderGet(key []byte) ([]byte, error) {
10481066
}
10491067

10501068
func (r *RedisServer) getValue(key []byte) ([]byte, error) {
1069+
readTS := r.readTS()
10511070
if r.coordinator.IsLeader() {
1052-
v, err := r.store.Get(context.Background(), key)
1071+
v, err := r.store.GetAt(context.Background(), key, readTS)
10531072
return v, errors.WithStack(err)
10541073
}
10551074
return r.tryLeaderGet(key)

adapter/ts.go

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,17 @@
1+
package adapter
2+
3+
import "github.com/bootjp/elastickv/kv"
4+
5+
// snapshotTS returns a timestamp suitable for snapshot reads without
6+
// unnecessarily advancing the logical clock. It relies solely on the shared
7+
// HLC; if none has been issued yet, fall back to MaxUint64 to see latest
8+
// committed versions irrespective of local clock lag.
9+
func snapshotTS(clock *kv.HLC) uint64 {
10+
if clock == nil {
11+
return ^uint64(0)
12+
}
13+
if cur := clock.Current(); cur != 0 {
14+
return cur
15+
}
16+
return ^uint64(0)
17+
}

kv/coordinator.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@ type Coordinator interface {
3434
Dispatch(reqs *OperationGroup[OP]) (*CoordinateResponse, error)
3535
IsLeader() bool
3636
RaftLeader() raft.ServerAddress
37+
Clock() *HLC
3738
}
3839

3940
func (c *Coordinate) Dispatch(reqs *OperationGroup[OP]) (*CoordinateResponse, error) {

0 commit comments

Comments
 (0)