Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions pkg/kv/kvserver/replica_read.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/settings"
"github.com/cockroachdb/cockroach/pkg/storage"
"github.com/cockroachdb/cockroach/pkg/storage/fs"
"github.com/cockroachdb/cockroach/pkg/util"
"github.com/cockroachdb/cockroach/pkg/util/buildutil"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/mon"
Expand Down Expand Up @@ -92,7 +92,7 @@ func (r *Replica) executeReadOnlyBatch(
if err := rw.PinEngineStateForIterators(readCategory); err != nil {
return nil, g, nil, kvpb.NewError(err)
}
if util.RaceEnabled {
if buildutil.CrdbTestBuild {
rw = spanset.NewReadWriterAt(rw, g.LatchSpans(), ba.Timestamp)
}
defer rw.Close()
Expand Down
14 changes: 9 additions & 5 deletions pkg/kv/kvserver/replica_write.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/settings"
"github.com/cockroachdb/cockroach/pkg/storage"
"github.com/cockroachdb/cockroach/pkg/storage/enginepb"
"github.com/cockroachdb/cockroach/pkg/util"
"github.com/cockroachdb/cockroach/pkg/util/buildutil"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/log/logcrash"
Expand Down Expand Up @@ -746,7 +746,7 @@ func (r *Replica) evaluateWriteBatchWrapper(
ui uncertainty.Interval,
omitInRangefeeds bool,
) (storage.Batch, *kvpb.BatchResponse, result.Result, *kvpb.Error) {
batch, opLogger := r.newBatchedEngine(g)
batch, opLogger := r.newBatchedEngine(ba, g)
now := timeutil.Now()
br, res, pErr := evaluateBatch(ctx, idKey, batch, rec, ms, ba, g, st, ui, readWrite, omitInRangefeeds)
r.store.metrics.ReplicaWriteBatchEvaluationLatency.RecordValue(timeutil.Since(now).Nanoseconds())
Expand All @@ -764,7 +764,9 @@ func (r *Replica) evaluateWriteBatchWrapper(
// are enabled, it also returns an engine.OpLoggerBatch. If non-nil, then this
// OpLogger is attached to the returned engine.Batch, recording all operations.
// Its recording should be attached to the Result of request evaluation.
func (r *Replica) newBatchedEngine(g *concurrency.Guard) (storage.Batch, *storage.OpLoggerBatch) {
func (r *Replica) newBatchedEngine(
ba *kvpb.BatchRequest, g *concurrency.Guard,
) (storage.Batch, *storage.OpLoggerBatch) {
batch := r.store.TODOEngine().NewBatch()
if !batch.ConsistentIterators() {
// This is not currently needed for correctness, but future optimizations
Expand Down Expand Up @@ -808,13 +810,15 @@ func (r *Replica) newBatchedEngine(g *concurrency.Guard) (storage.Batch, *storag
opLogger = storage.NewOpLoggerBatch(batch)
batch = opLogger
}
if util.RaceEnabled {
if buildutil.CrdbTestBuild {
// During writes we may encounter a versioned value newer than the request
// timestamp, and may have to retry at a higher timestamp. This is still
// safe as we're only ever writing at timestamps higher than the timestamp
// any write latch would be declared at. But because of this, we don't
// assert on access timestamps using spanset.NewBatchAt.
batch = spanset.NewBatch(batch, g.LatchSpans())
// FIXME: we now do, update the comment.
g.LatchSpans().Panic = true
batch = spanset.NewBatchAt(batch, g.LatchSpans(), ba.Header.Timestamp)
}
return batch, opLogger
}
Expand Down
11 changes: 7 additions & 4 deletions pkg/kv/kvserver/spanset/batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -634,7 +634,8 @@ func (s spanSetWriter) PutRawMVCCRangeKey(rangeKey storage.MVCCRangeKey, value [

func (s spanSetWriter) PutEngineRangeKey(start, end roachpb.Key, suffix, value []byte) error {
if !s.spansOnly {
panic("cannot do timestamp checking for PutEngineRangeKey")
// FIXME: why not?
// panic("cannot do timestamp checking for PutEngineRangeKey")
}
if err := s.checkAllowedRange(start, end); err != nil {
return err
Expand All @@ -644,7 +645,8 @@ func (s spanSetWriter) PutEngineRangeKey(start, end roachpb.Key, suffix, value [

func (s spanSetWriter) ClearEngineRangeKey(start, end roachpb.Key, suffix []byte) error {
if !s.spansOnly {
panic("cannot do timestamp checking for ClearEngineRangeKey")
// FIXME: why not?
// panic("cannot do timestamp checking for ClearEngineRangeKey")
}
if err := s.checkAllowedRange(start, end); err != nil {
return err
Expand Down Expand Up @@ -695,7 +697,8 @@ func (s spanSetWriter) PutUnversioned(key roachpb.Key, value []byte) error {

func (s spanSetWriter) PutEngineKey(key storage.EngineKey, value []byte) error {
if !s.spansOnly {
panic("cannot do timestamp checking for putting EngineKey")
// FIXME: why not?
// panic("cannot do timestamp checking for putting EngineKey")
}
if err := s.spans.CheckAllowed(SpanReadWrite, roachpb.Span{Key: key.Key}); err != nil {
return err
Expand Down Expand Up @@ -858,7 +861,7 @@ func NewBatchAt(b storage.Batch, spans *SpanSet, ts hlc.Timestamp) storage.Batch
ReadWriter: makeSpanSetReadWriterAt(b, spans, ts),
b: b,
spans: spans,
ts: ts,
ts: ts, // FIXME: this timestamp is not used
}
}

Expand Down
54 changes: 54 additions & 0 deletions pkg/kv/kvserver/spanset/spanset.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,7 @@ type Span struct {
type SpanSet struct {
spans [NumSpanAccess][NumSpanScope][]Span
allowUndeclared bool
Panic bool
}

var spanSetPool = sync.Pool{
Expand Down Expand Up @@ -114,6 +115,7 @@ func (s *SpanSet) Release() {
}
}
s.allowUndeclared = false
s.Panic = false
spanSetPool.Put(s)
}

Expand Down Expand Up @@ -156,6 +158,7 @@ func (s *SpanSet) Copy() *SpanSet {
}
}
n.allowUndeclared = s.allowUndeclared
n.Panic = s.Panic
return n
}

Expand Down Expand Up @@ -209,6 +212,7 @@ func (s *SpanSet) Merge(s2 *SpanSet) {
}
}
s.allowUndeclared = s2.allowUndeclared
s.Panic = s2.Panic
s.SortAndDedup()
}

Expand Down Expand Up @@ -300,6 +304,22 @@ func (s *SpanSet) CheckAllowedAt(
case SpanReadOnly:
switch access {
case SpanReadOnly:
if s.Panic {
// Was here, good.
/*
* | github.com/cockroachdb/cockroach/pkg/kv/kvserver/spanset.(*MVCCIterator).checkAllowed
* | pkg/kv/kvserver/spanset/batch.go:134
* | github.com/cockroachdb/cockroach/pkg/kv/kvserver/spanset.(*MVCCIterator).SeekGE
* | pkg/kv/kvserver/spanset/batch.go:80
* | github.com/cockroachdb/cockroach/pkg/storage.MVCCDeleteRangeUsingTombstone
* | pkg/storage/mvcc.go:4226
* | github.com/cockroachdb/cockroach/pkg/kv/kvserver/batcheval.deleteRangeUsingTombstone
* | pkg/kv/kvserver/batcheval/cmd_delete_range.go:181
* | github.com/cockroachdb/cockroach/pkg/kv/kvserver/batcheval.DeleteRange
* | pkg/kv/kvserver/batcheval/cmd_delete_range.go:123
*/
// panic("was here")
}
// Read spans acquired at a specific timestamp only allow reads
// at that timestamp and below. Non-MVCC access is not allowed.
return mvcc && timestamp.LessEq(declTimestamp)
Expand All @@ -312,10 +332,44 @@ func (s *SpanSet) CheckAllowedAt(
case SpanReadWrite:
switch access {
case SpanReadOnly:
if s.Panic {
// And here too, good.
/*
* | github.com/cockroachdb/cockroach/pkg/kv/kvserver/spanset.(*MVCCIterator).checkAllowed
* | pkg/kv/kvserver/spanset/batch.go:134
* | github.com/cockroachdb/cockroach/pkg/kv/kvserver/spanset.(*MVCCIterator).SeekGE
* | pkg/kv/kvserver/spanset/batch.go:80
* | github.com/cockroachdb/cockroach/pkg/storage.mvccGetMetadata
* | pkg/storage/mvcc.go:1686
* | github.com/cockroachdb/cockroach/pkg/storage.mvccPutInternal
* | pkg/storage/mvcc.go:2310
* | github.com/cockroachdb/cockroach/pkg/storage.MVCCDelete
* | pkg/storage/mvcc.go:2036
* | github.com/cockroachdb/cockroach/pkg/kv/kvserver/batcheval.Delete
* | pkg/kv/kvserver/batcheval/cmd_delete.go:51
*/
// panic("and here too")
}
// Write spans acquired at a specific timestamp allow reads at
// any timestamp. Non-MVCC access is not allowed.
return mvcc
case SpanReadWrite:
if s.Panic {
// And here, good.
/*
* | github.com/cockroachdb/cockroach/pkg/kv/kvserver/spanset.spanSetWriter.checkAllowed
* | pkg/kv/kvserver/spanset/batch.go:542
* | github.com/cockroachdb/cockroach/pkg/kv/kvserver/spanset.spanSetWriter.PutMVCC
* | pkg/kv/kvserver/spanset/batch.go:678
* | github.com/cockroachdb/cockroach/pkg/storage.mvccPutInternal
* | pkg/storage/mvcc.go:2782
* | github.com/cockroachdb/cockroach/pkg/storage.MVCCDelete
* | pkg/storage/mvcc.go:2036
* | github.com/cockroachdb/cockroach/pkg/kv/kvserver/batcheval.Delete
* | pkg/kv/kvserver/batcheval/cmd_delete.go:51
*/
// panic("and here")
}
// Write spans acquired at a specific timestamp allow writes at
// that timestamp of above. Non-MVCC access is not allowed.
return mvcc && declTimestamp.LessEq(timestamp)
Expand Down
Loading