Skip to content

Commit bf31bec

Browse files
committed
Add OCC validation and conflict detection tests
1 parent e7b394c commit bf31bec

File tree

2 files changed

+91
-3
lines changed

2 files changed

+91
-3
lines changed

kv/fsm.go

Lines changed: 16 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -107,9 +107,22 @@ func (f *kvFSM) handleTxnRequest(ctx context.Context, r *pb.Request, commitTS ui
107107
}
108108

109109
func (f *kvFSM) validateConflicts(ctx context.Context, muts []*pb.Mutation, startTS uint64) error {
110-
// Debug guard only: real OCC runs at the leader/storage layer, so conflicts
111-
// should already be resolved before log application. Keep this stub to make
112-
// any unexpected violations visible during development.
110+
seen := make(map[string]struct{}, len(muts))
111+
for _, mut := range muts {
112+
keyStr := string(mut.Key)
113+
if _, ok := seen[keyStr]; ok {
114+
continue
115+
}
116+
seen[keyStr] = struct{}{}
117+
118+
latest, exists, err := f.store.LatestCommitTS(ctx, mut.Key)
119+
if err != nil {
120+
return errors.WithStack(err)
121+
}
122+
if exists && latest > startTS {
123+
return errors.Wrapf(store.ErrWriteConflict, "key: %s", string(mut.Key))
124+
}
125+
}
113126
return nil
114127
}
115128

kv/fsm_occ_test.go

Lines changed: 75 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,75 @@
1+
package kv
2+
3+
import (
4+
"context"
5+
"testing"
6+
7+
pb "github.com/bootjp/elastickv/proto"
8+
"github.com/bootjp/elastickv/store"
9+
"github.com/hashicorp/raft"
10+
"github.com/stretchr/testify/require"
11+
"google.golang.org/protobuf/proto"
12+
)
13+
14+
func TestValidateConflictsDetectsStaleWrite(t *testing.T) {
15+
ctx := context.Background()
16+
st := store.NewMVCCStore()
17+
require.NoError(t, st.PutAt(ctx, []byte("k"), []byte("v1"), 50, 0))
18+
19+
fsm, ok := NewKvFSM(st).(*kvFSM)
20+
require.True(t, ok)
21+
22+
muts := []*pb.Mutation{{Op: pb.Op_PUT, Key: []byte("k"), Value: []byte("v2")}}
23+
err := fsm.validateConflicts(ctx, muts, 40)
24+
require.ErrorIs(t, err, store.ErrWriteConflict)
25+
}
26+
27+
func TestApplyReturnsErrorOnConflict(t *testing.T) {
28+
ctx := context.Background()
29+
st := store.NewMVCCStore()
30+
fsm, ok := NewKvFSM(st).(*kvFSM)
31+
require.True(t, ok)
32+
33+
// First write commits at ts=100.
34+
put := &pb.Request{
35+
IsTxn: false,
36+
Phase: pb.Phase_NONE,
37+
Ts: 100,
38+
Mutations: []*pb.Mutation{{
39+
Op: pb.Op_PUT,
40+
Key: []byte("k"),
41+
Value: []byte("v1"),
42+
}},
43+
}
44+
data, err := proto.Marshal(put)
45+
require.NoError(t, err)
46+
47+
resp := fsm.Apply(&raft.Log{Type: raft.LogCommand, Data: data})
48+
require.Nil(t, resp)
49+
50+
// Stale transaction attempts to commit with startTS=90.
51+
conflict := &pb.Request{
52+
IsTxn: true,
53+
Phase: pb.Phase_COMMIT,
54+
Ts: 90,
55+
Mutations: []*pb.Mutation{{
56+
Op: pb.Op_PUT,
57+
Key: []byte("k"),
58+
Value: []byte("v2"),
59+
}},
60+
}
61+
data, err = proto.Marshal(conflict)
62+
require.NoError(t, err)
63+
64+
resp = fsm.Apply(&raft.Log{Type: raft.LogCommand, Data: data})
65+
66+
err, ok = resp.(error)
67+
require.True(t, ok)
68+
require.ErrorIs(t, err, store.ErrWriteConflict)
69+
70+
// Ensure committed value remains.
71+
v, err := st.GetAt(ctx, []byte("k"), ^uint64(0))
72+
require.NoError(t, err)
73+
require.Equal(t, []byte("v1"), v)
74+
}
75+

0 commit comments

Comments
 (0)