Skip to content

Commit 8638aef

Browse files
committed
Refactor snapshotTS to include last commit TS
1 parent 904799f commit 8638aef

File tree

6 files changed

+37
-18
lines changed

6 files changed

+37
-18
lines changed

adapter/dynamodb.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -114,7 +114,7 @@ func (d *DynamoDBServer) getItem(w http.ResponseWriter, r *http.Request) {
114114
http.Error(w, "missing key", http.StatusBadRequest)
115115
return
116116
}
117-
readTS := snapshotTS(d.coordinator.Clock())
117+
readTS := snapshotTS(d.coordinator.Clock(), d.store)
118118
v, err := d.store.GetAt(r.Context(), []byte(keyAttr.S), readTS)
119119
if err != nil {
120120
if errors.Is(err, store.ErrKeyNotFound) {
@@ -234,7 +234,7 @@ func (d *DynamoDBServer) validateCondition(ctx context.Context, expr string, nam
234234
if expr == "" {
235235
return nil
236236
}
237-
readTS := snapshotTS(d.coordinator.Clock())
237+
readTS := snapshotTS(d.coordinator.Clock(), d.store)
238238
exists, err := d.store.ExistsAt(ctx, key, readTS)
239239
if err != nil {
240240
return errors.WithStack(err)

adapter/grpc.go

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,7 @@ func NewGRPCServer(store store.MVCCStore, coordinate *kv.Coordinate) *GRPCServer
4242
func (r GRPCServer) RawGet(ctx context.Context, req *pb.RawGetRequest) (*pb.RawGetResponse, error) {
4343
readTS := req.GetTs()
4444
if readTS == 0 {
45-
readTS = snapshotTS(r.coordinator.Clock())
45+
readTS = snapshotTS(r.coordinator.Clock(), r.store)
4646
}
4747

4848
if r.coordinator.IsLeader() {
@@ -98,7 +98,7 @@ func (r GRPCServer) tryLeaderGet(key []byte) ([]byte, error) {
9898
defer conn.Close()
9999

100100
cli := pb.NewRawKVClient(conn)
101-
ts := snapshotTS(r.coordinator.Clock())
101+
ts := snapshotTS(r.coordinator.Clock(), r.store)
102102
resp, err := cli.RawGet(context.Background(), &pb.RawGetRequest{Key: key, Ts: ts})
103103
if err != nil {
104104
return nil, errors.WithStack(err)
@@ -186,7 +186,7 @@ func (r GRPCServer) Get(ctx context.Context, req *pb.GetRequest) (*pb.GetRespons
186186
return nil, errors.WithStack(err)
187187
}
188188

189-
readTS := snapshotTS(r.coordinator.Clock())
189+
readTS := snapshotTS(r.coordinator.Clock(), r.store)
190190
v, err := r.store.GetAt(ctx, req.Key, readTS)
191191
if err != nil {
192192
switch {
@@ -234,7 +234,7 @@ func (r GRPCServer) Scan(ctx context.Context, req *pb.ScanRequest) (*pb.ScanResp
234234
Kv: nil,
235235
}, errors.WithStack(err)
236236
}
237-
readTS := snapshotTS(r.coordinator.Clock())
237+
readTS := snapshotTS(r.coordinator.Clock(), r.store)
238238
res, err := r.store.ScanAt(ctx, req.StartKey, req.EndKey, limit, readTS)
239239
if err != nil {
240240
return &pb.ScanResponse{

adapter/redis.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -109,7 +109,7 @@ func getConnState(conn redcon.Conn) *connState {
109109
}
110110

111111
func (r *RedisServer) readTS() uint64 {
112-
return snapshotTS(r.coordinator.Clock())
112+
return snapshotTS(r.coordinator.Clock(), r.store)
113113
}
114114

115115
func (r *RedisServer) Run() error {

adapter/ts.go

Lines changed: 20 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -1,17 +1,26 @@
11
package adapter
22

3-
import "github.com/bootjp/elastickv/kv"
3+
import (
4+
"github.com/bootjp/elastickv/kv"
5+
"github.com/bootjp/elastickv/store"
6+
)
47

5-
// snapshotTS returns a timestamp suitable for snapshot reads without
6-
// unnecessarily advancing the logical clock. It relies solely on the shared
7-
// HLC; if none has been issued yet, fall back to MaxUint64 to see latest
8-
// committed versions irrespective of local clock lag.
9-
func snapshotTS(clock *kv.HLC) uint64 {
10-
if clock == nil {
11-
return ^uint64(0)
8+
// snapshotTS picks a safe snapshot timestamp:
9+
// - uses the store's last commit watermark if available,
10+
// - otherwise the coordinator's HLC current value,
11+
// - and falls back to MaxUint64 if neither is set.
12+
func snapshotTS(clock *kv.HLC, st store.MVCCStore) uint64 {
13+
ts := uint64(0)
14+
if st != nil {
15+
ts = st.LastCommitTS()
1216
}
13-
if cur := clock.Current(); cur != 0 {
14-
return cur
17+
if clock != nil {
18+
if cur := clock.Current(); cur > ts {
19+
ts = cur
20+
}
1521
}
16-
return ^uint64(0)
22+
if ts == 0 {
23+
ts = ^uint64(0)
24+
}
25+
return ts
1726
}

store/mvcc_store.go

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -61,6 +61,14 @@ type mvccStore struct {
6161
lastCommitTS uint64
6262
}
6363

64+
// LastCommitTS exposes the latest commit timestamp for read snapshot selection.
65+
// It is intentionally not part of the public MVCCStore interface.
66+
func (s *mvccStore) LastCommitTS() uint64 {
67+
s.mtx.RLock()
68+
defer s.mtx.RUnlock()
69+
return s.lastCommitTS
70+
}
71+
6472
// NewMVCCStore creates a new MVCC-enabled in-memory store.
6573
func NewMVCCStore() MVCCStore {
6674
return &mvccStore{

store/store.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,8 @@ type MVCCStore interface {
5959
// It must return ErrWriteConflict if any key has a newer commit timestamp
6060
// than startTS.
6161
ApplyMutations(ctx context.Context, mutations []*KVPairMutation, startTS, commitTS uint64) error
62+
// LastCommitTS returns the highest commit timestamp applied on this node.
63+
LastCommitTS() uint64
6264
Snapshot() (io.ReadWriter, error)
6365
Restore(buf io.Reader) error
6466
Close() error

0 commit comments

Comments
 (0)