diff --git a/kv/fsm.go b/kv/fsm.go index aeb0ab5..8aa1057 100644 --- a/kv/fsm.go +++ b/kv/fsm.go @@ -107,9 +107,22 @@ func (f *kvFSM) handleTxnRequest(ctx context.Context, r *pb.Request, commitTS ui } func (f *kvFSM) validateConflicts(ctx context.Context, muts []*pb.Mutation, startTS uint64) error { - // Debug guard only: real OCC runs at the leader/storage layer, so conflicts - // should already be resolved before log application. Keep this stub to make - // any unexpected violations visible during development. + seen := make(map[string]struct{}, len(muts)) + for _, mut := range muts { + keyStr := string(mut.Key) + if _, ok := seen[keyStr]; ok { + continue + } + seen[keyStr] = struct{}{} + + latest, exists, err := f.store.LatestCommitTS(ctx, mut.Key) + if err != nil { + return errors.WithStack(err) + } + if exists && latest > startTS { + return errors.Wrapf(store.ErrWriteConflict, "key: %s", string(mut.Key)) + } + } return nil } diff --git a/kv/fsm_occ_test.go b/kv/fsm_occ_test.go new file mode 100644 index 0000000..e89cbb1 --- /dev/null +++ b/kv/fsm_occ_test.go @@ -0,0 +1,74 @@ +package kv + +import ( + "context" + "testing" + + pb "github.com/bootjp/elastickv/proto" + "github.com/bootjp/elastickv/store" + "github.com/hashicorp/raft" + "github.com/stretchr/testify/require" + "google.golang.org/protobuf/proto" +) + +func TestValidateConflictsDetectsStaleWrite(t *testing.T) { + ctx := context.Background() + st := store.NewMVCCStore() + require.NoError(t, st.PutAt(ctx, []byte("k"), []byte("v1"), 50, 0)) + + fsm, ok := NewKvFSM(st).(*kvFSM) + require.True(t, ok) + + muts := []*pb.Mutation{{Op: pb.Op_PUT, Key: []byte("k"), Value: []byte("v2")}} + err := fsm.validateConflicts(ctx, muts, 40) + require.ErrorIs(t, err, store.ErrWriteConflict) +} + +func TestApplyReturnsErrorOnConflict(t *testing.T) { + ctx := context.Background() + st := store.NewMVCCStore() + fsm, ok := NewKvFSM(st).(*kvFSM) + require.True(t, ok) + + // First write commits at ts=100. + put := &pb.Request{ + IsTxn: false, + Phase: pb.Phase_NONE, + Ts: 100, + Mutations: []*pb.Mutation{{ + Op: pb.Op_PUT, + Key: []byte("k"), + Value: []byte("v1"), + }}, + } + data, err := proto.Marshal(put) + require.NoError(t, err) + + resp := fsm.Apply(&raft.Log{Type: raft.LogCommand, Data: data}) + require.Nil(t, resp) + + // Stale transaction attempts to commit with startTS=90. + conflict := &pb.Request{ + IsTxn: true, + Phase: pb.Phase_COMMIT, + Ts: 90, + Mutations: []*pb.Mutation{{ + Op: pb.Op_PUT, + Key: []byte("k"), + Value: []byte("v2"), + }}, + } + data, err = proto.Marshal(conflict) + require.NoError(t, err) + + resp = fsm.Apply(&raft.Log{Type: raft.LogCommand, Data: data}) + + err, ok = resp.(error) + require.True(t, ok) + require.ErrorIs(t, err, store.ErrWriteConflict) + + // Ensure committed value remains. + v, err := st.GetAt(ctx, []byte("k"), ^uint64(0)) + require.NoError(t, err) + require.Equal(t, []byte("v1"), v) +}