Skip to content

Commit 932cfa0

Browse files
committed
Refactor txn entry merging logic in mvcc store
1 parent 5a87cc9 commit 932cfa0

File tree

2 files changed

+90
-63
lines changed

2 files changed

+90
-63
lines changed

kv/fsm.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -107,9 +107,9 @@ func (f *kvFSM) handleTxnRequest(ctx context.Context, r *pb.Request, commitTS ui
107107
}
108108

109109
func (f *kvFSM) validateConflicts(ctx context.Context, muts []*pb.Mutation, startTS uint64) error {
110-
// OCC conflict checks are currently relaxed to allow last-writer-wins semantics
111-
// across shards without centralizing timestamps. This keeps commits lock-free
112-
// while avoiding spurious aborts under concurrent writers.
110+
// Debug guard only: real OCC runs at the leader/storage layer, so conflicts
111+
// should already be resolved before log application. Keep this stub to make
112+
// any unexpected violations visible during development.
113113
return nil
114114
}
115115

store/mvcc_store.go

Lines changed: 87 additions & 60 deletions
Original file line numberDiff line numberDiff line change
@@ -123,6 +123,49 @@ func cloneKVPair(key, val []byte) *KVPair {
123123
}
124124
}
125125

126+
type iterEntry struct {
127+
key []byte
128+
ok bool
129+
versions []VersionedValue
130+
stageVal mvccTxnValue
131+
}
132+
133+
func nextBaseEntry(it *treemap.Iterator, start, end []byte) iterEntry {
134+
for it.Next() {
135+
k, ok := it.Key().([]byte)
136+
if !ok {
137+
continue
138+
}
139+
if !withinBoundsKey(k, start, end) {
140+
if end != nil && bytes.Compare(k, end) > 0 {
141+
return iterEntry{}
142+
}
143+
continue
144+
}
145+
versions, _ := it.Value().([]VersionedValue)
146+
return iterEntry{key: k, ok: true, versions: versions}
147+
}
148+
return iterEntry{}
149+
}
150+
151+
func nextStageEntry(it *treemap.Iterator, start, end []byte) iterEntry {
152+
for it.Next() {
153+
k, ok := it.Key().([]byte)
154+
if !ok {
155+
continue
156+
}
157+
if !withinBoundsKey(k, start, end) {
158+
if end != nil && bytes.Compare(k, end) > 0 {
159+
return iterEntry{}
160+
}
161+
continue
162+
}
163+
val, _ := it.Value().(mvccTxnValue)
164+
return iterEntry{key: k, ok: true, stageVal: val}
165+
}
166+
return iterEntry{}
167+
}
168+
126169
func (s *mvccStore) nextCommitTSLocked() uint64 {
127170
return s.alignCommitTS(s.clock.Now())
128171
}
@@ -602,61 +645,6 @@ func (t *mvccTxn) PutWithTTL(_ context.Context, key []byte, value []byte, ttl in
602645
return nil
603646
}
604647

605-
func (t *mvccTxn) appendBaseRange(result []*KVPair, included map[string]struct{}, start []byte, end []byte, limit int, now uint64) []*KVPair {
606-
t.s.tree.Each(func(key interface{}, value interface{}) {
607-
if len(result) >= limit {
608-
return
609-
}
610-
k, ok := key.([]byte)
611-
if !ok || !withinBoundsKey(k, start, end) {
612-
return
613-
}
614-
615-
if staged, ok := t.stage.Get(k); ok {
616-
sv, _ := staged.(mvccTxnValue)
617-
if val, visible := visibleTxnValue(sv, now); visible {
618-
result = append(result, cloneKVPair(k, val))
619-
included[string(k)] = struct{}{}
620-
}
621-
return
622-
}
623-
624-
versions, _ := value.([]VersionedValue)
625-
val, ok := visibleValue(versions, now)
626-
if !ok {
627-
return
628-
}
629-
result = append(result, cloneKVPair(k, val))
630-
included[string(k)] = struct{}{}
631-
})
632-
633-
return result
634-
}
635-
636-
func (t *mvccTxn) appendStageOnly(result []*KVPair, included map[string]struct{}, start []byte, end []byte, limit int, now uint64) []*KVPair {
637-
t.stage.Each(func(key interface{}, value interface{}) {
638-
if len(result) >= limit {
639-
return
640-
}
641-
k, ok := key.([]byte)
642-
if !ok || !withinBoundsKey(k, start, end) {
643-
return
644-
}
645-
if _, ok := included[string(k)]; ok {
646-
return
647-
}
648-
sv, _ := value.(mvccTxnValue)
649-
val, visible := visibleTxnValue(sv, now)
650-
if !visible {
651-
return
652-
}
653-
result = append(result, cloneKVPair(k, val))
654-
included[string(k)] = struct{}{}
655-
})
656-
657-
return result
658-
}
659-
660648
func (t *mvccTxn) Scan(_ context.Context, start []byte, end []byte, limit int) ([]*KVPair, error) {
661649
if limit <= 0 {
662650
return []*KVPair{}, nil
@@ -672,17 +660,56 @@ func (t *mvccTxn) Scan(_ context.Context, start []byte, end []byte, limit int) (
672660
}
673661

674662
result := make([]*KVPair, 0, capHint)
675-
included := make(map[string]struct{})
676663
now := t.s.clock.Now()
677664

678-
result = t.appendBaseRange(result, included, start, end, limit, now)
679-
if len(result) < limit {
680-
result = t.appendStageOnly(result, included, start, end, limit, now)
681-
}
665+
baseIt := t.s.tree.Iterator()
666+
baseIt.Begin()
667+
stageIt := t.stage.Iterator()
668+
stageIt.Begin()
669+
670+
result = mergeTxnEntries(result, limit, start, end, now, &baseIt, &stageIt)
682671

683672
return result, nil
684673
}
685674

675+
func mergeTxnEntries(result []*KVPair, limit int, start []byte, end []byte, now uint64, baseIt, stageIt *treemap.Iterator) []*KVPair {
676+
baseNext := nextBaseEntry(baseIt, start, end)
677+
stageNext := nextStageEntry(stageIt, start, end)
678+
679+
for len(result) < limit && (baseNext.ok || stageNext.ok) {
680+
useStage := chooseStage(baseNext, stageNext)
681+
682+
if useStage {
683+
k := stageNext.key
684+
if val, visible := visibleTxnValue(stageNext.stageVal, now); visible {
685+
result = append(result, cloneKVPair(k, val))
686+
}
687+
if baseNext.ok && bytes.Equal(baseNext.key, k) {
688+
baseNext = nextBaseEntry(baseIt, start, end)
689+
}
690+
stageNext = nextStageEntry(stageIt, start, end)
691+
continue
692+
}
693+
694+
if val, ok := visibleValue(baseNext.versions, now); ok {
695+
result = append(result, cloneKVPair(baseNext.key, val))
696+
}
697+
baseNext = nextBaseEntry(baseIt, start, end)
698+
}
699+
700+
return result
701+
}
702+
703+
func chooseStage(baseNext, stageNext iterEntry) bool {
704+
if !baseNext.ok {
705+
return stageNext.ok
706+
}
707+
if !stageNext.ok {
708+
return false
709+
}
710+
return bytes.Compare(stageNext.key, baseNext.key) <= 0
711+
}
712+
686713
// mvccSnapshotEntry is used solely for gob snapshot serialization.
687714
type mvccSnapshotEntry struct {
688715
Key []byte

0 commit comments

Comments
 (0)