Skip to content

Commit 2e870ad

Browse files
craig[bot]iskettaneh
andcommitted
Merge #159847
159847: kvserver: add spanSetWriteBatch to spanset package r=iskettaneh a=iskettaneh This PR adds spanSetWriteBatch to the spanset package and asserts on accesses. This will be used later by an engine wrapper to assert on the engine accesses. References: #158281 Release notes: None Co-authored-by: iskettaneh <[email protected]>
2 parents 78aef05 + 5d878a5 commit 2e870ad

File tree

1 file changed

+140
-61
lines changed

1 file changed

+140
-61
lines changed

pkg/kv/kvserver/spanset/batch.go

Lines changed: 140 additions & 61 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ import (
1616
"github.com/cockroachdb/cockroach/pkg/storage/fs"
1717
"github.com/cockroachdb/cockroach/pkg/util/hlc"
1818
"github.com/cockroachdb/cockroach/pkg/util/protoutil"
19+
"github.com/cockroachdb/errors"
1920
"github.com/cockroachdb/pebble"
2021
"github.com/cockroachdb/pebble/rangekey"
2122
)
@@ -764,83 +765,155 @@ func NewReader(r storage.Reader, spans *SpanSet, ts hlc.Timestamp) storage.Reade
764765
return spanSetReader{r: r, spans: spans, ts: ts}
765766
}
766767

768+
// NewReadWriter returns a storage.ReadWriter that asserts access of the
769+
// underlying ReadWriter against the given SpanSet.
770+
//
771+
// NewReadWriter clones and does not retain the provided span set.
772+
func NewReadWriter(rw storage.ReadWriter, spans *SpanSet) storage.ReadWriter {
773+
return makeSpanSetReadWriter(rw, spans)
774+
}
775+
767776
// NewReadWriterAt returns a storage.ReadWriter that asserts access of the
768777
// underlying ReadWriter against the given SpanSet at a given timestamp.
769-
// If zero timestamp is provided, accesses are considered non-MVCC.
770778
//
771779
// NewReadWriterAt clones and does not retain the provided span set.
772780
func NewReadWriterAt(rw storage.ReadWriter, spans *SpanSet, ts hlc.Timestamp) storage.ReadWriter {
773781
return makeSpanSetReadWriterAt(rw, spans, ts)
774782
}
775783

776-
type spanSetBatch struct {
777-
ReadWriter
778-
b storage.Batch
779-
// TODO(ibrahim): The fields spans, spansOnly, and ts don't seem to be used.
780-
// Consider removing or marking them as intended.
781-
spans *SpanSet
784+
// spanSetWriteBatch wraps a storage.WriteBatch and adds span checking.
785+
type spanSetWriteBatch struct {
786+
spanSetWriter
787+
wb storage.WriteBatch
788+
}
782789

783-
spansOnly bool
784-
ts hlc.Timestamp
790+
var _ storage.WriteBatch = (*spanSetWriteBatch)(nil)
791+
792+
// ClearRawEncodedRange implements storage.InternalWriter.
793+
func (s spanSetWriteBatch) ClearRawEncodedRange(start, end []byte) error {
794+
// Decode the engine keys to check spans.
795+
startKey, ok := storage.DecodeEngineKey(start)
796+
if !ok {
797+
return errors.Errorf("cannot decode start engine key")
798+
}
799+
endKey, ok := storage.DecodeEngineKey(end)
800+
if !ok {
801+
return errors.Errorf("cannot decode end engine key")
802+
}
803+
if err := s.spanSetWriter.checkAllowedRange(startKey.Key, endKey.Key); err != nil {
804+
return err
805+
}
806+
return s.wb.ClearRawEncodedRange(start, end)
785807
}
786808

787-
var _ storage.Batch = spanSetBatch{}
809+
// PutInternalRangeKey implements storage.InternalWriter.
810+
func (s spanSetWriteBatch) PutInternalRangeKey(start, end []byte, key rangekey.Key) error {
811+
// Decode the engine keys to check spans.
812+
startKey, ok := storage.DecodeEngineKey(start)
813+
if !ok {
814+
return errors.Errorf("cannot decode start engine key")
815+
}
816+
endKey, ok := storage.DecodeEngineKey(end)
817+
if !ok {
818+
return errors.Errorf("cannot decode end engine key")
819+
}
820+
if err := s.spanSetWriter.checkAllowedRange(startKey.Key, endKey.Key); err != nil {
821+
return err
822+
}
823+
return s.wb.PutInternalRangeKey(start, end, key)
824+
}
788825

789-
func (s spanSetBatch) NewBatchOnlyMVCCIterator(
790-
ctx context.Context, opts storage.IterOptions,
791-
) (storage.MVCCIterator, error) {
792-
mvccIter, err := s.b.NewBatchOnlyMVCCIterator(ctx, opts)
793-
if err != nil {
794-
return nil, err
826+
// PutInternalPointKey implements storage.InternalWriter.
827+
func (s spanSetWriteBatch) PutInternalPointKey(key *pebble.InternalKey, value []byte) error {
828+
// Decode the internal key to check spans.
829+
engKey, ok := storage.DecodeEngineKey(key.UserKey)
830+
if !ok {
831+
return errors.Errorf("cannot decode engine key")
795832
}
796-
if s.spansOnly {
797-
return NewIterator(mvccIter, s.spans), nil
833+
834+
if err := s.spanSetWriter.checkAllowed(engKey.Key); err != nil {
835+
return err
798836
}
799-
return NewIteratorAt(mvccIter, s.spans, s.ts), nil
837+
838+
return s.wb.PutInternalPointKey(key, value)
800839
}
801840

802-
func (s spanSetBatch) Commit(sync bool) error {
803-
return s.b.Commit(sync)
841+
// Close implements storage.WriteBatch.
842+
func (s spanSetWriteBatch) Close() {
843+
s.wb.Close()
804844
}
805845

806-
func (s spanSetBatch) CommitNoSyncWait() error {
807-
return s.b.CommitNoSyncWait()
846+
// Commit implements storage.WriteBatch.
847+
func (s spanSetWriteBatch) Commit(sync bool) error {
848+
return s.wb.Commit(sync)
808849
}
809850

810-
func (s spanSetBatch) SyncWait() error {
811-
return s.b.SyncWait()
851+
// CommitNoSyncWait implements storage.WriteBatch.
852+
func (s spanSetWriteBatch) CommitNoSyncWait() error {
853+
return s.wb.CommitNoSyncWait()
812854
}
813855

814-
func (s spanSetBatch) Empty() bool {
815-
return s.b.Empty()
856+
// SyncWait implements storage.WriteBatch.
857+
func (s spanSetWriteBatch) SyncWait() error {
858+
return s.wb.SyncWait()
816859
}
817860

818-
func (s spanSetBatch) Count() uint32 {
819-
return s.b.Count()
861+
// Empty implements storage.WriteBatch.
862+
func (s spanSetWriteBatch) Empty() bool {
863+
return s.wb.Empty()
820864
}
821865

822-
func (s spanSetBatch) Len() int {
823-
return s.b.Len()
866+
// Count implements storage.WriteBatch.
867+
func (s spanSetWriteBatch) Count() uint32 {
868+
return s.wb.Count()
824869
}
825870

826-
func (s spanSetBatch) Repr() []byte {
827-
return s.b.Repr()
871+
// Len implements storage.WriteBatch.
872+
func (s spanSetWriteBatch) Len() int {
873+
return s.wb.Len()
828874
}
829875

830-
func (s spanSetBatch) CommitStats() storage.BatchCommitStats {
831-
return s.b.CommitStats()
876+
// Repr implements storage.WriteBatch.
877+
func (s spanSetWriteBatch) Repr() []byte {
878+
return s.wb.Repr()
832879
}
833880

834-
func (s spanSetBatch) PutInternalRangeKey(start, end []byte, key rangekey.Key) error {
835-
return s.b.PutInternalRangeKey(start, end, key)
881+
// CommitStats implements storage.WriteBatch.
882+
func (s spanSetWriteBatch) CommitStats() storage.BatchCommitStats {
883+
return s.wb.CommitStats()
836884
}
837885

838-
func (s spanSetBatch) PutInternalPointKey(key *pebble.InternalKey, value []byte) error {
839-
return s.b.PutInternalPointKey(key, value)
886+
type spanSetBatch struct {
887+
spanSetReader
888+
spanSetWriteBatch
889+
b storage.Batch
890+
// TODO(ibrahim): The fields spans, spansOnly, and ts don't seem to be used.
891+
// Consider removing or marking them as intended.
892+
spans *SpanSet
893+
894+
spansOnly bool
895+
ts hlc.Timestamp
840896
}
841897

842-
func (s spanSetBatch) ClearRawEncodedRange(start, end []byte) error {
843-
return s.b.ClearRawEncodedRange(start, end)
898+
var _ storage.Batch = spanSetBatch{}
899+
900+
// Close implements storage.Batch (resolves ambiguity between Reader.Close
901+
// and WriteBatch.Close).
902+
func (s spanSetBatch) Close() {
903+
s.wb.Close()
904+
}
905+
906+
func (s spanSetBatch) NewBatchOnlyMVCCIterator(
907+
ctx context.Context, opts storage.IterOptions,
908+
) (storage.MVCCIterator, error) {
909+
mvccIter, err := s.b.NewBatchOnlyMVCCIterator(ctx, opts)
910+
if err != nil {
911+
return nil, err
912+
}
913+
if s.spansOnly {
914+
return NewIterator(mvccIter, s.spans), nil
915+
}
916+
return NewIteratorAt(mvccIter, s.spans, s.ts), nil
844917
}
845918

846919
// shallowCopy returns a shallow copy of the spanSetBatch. The returned batch
@@ -849,33 +922,43 @@ func (s spanSetBatch) ClearRawEncodedRange(start, end []byte) error {
849922
// spans.
850923
func (s spanSetBatch) shallowCopy() *spanSetBatch {
851924
b := s
852-
b.spanSetReader.spans = b.spanSetReader.spans.ShallowCopy()
853-
b.spanSetWriter.spans = b.spanSetWriter.spans.ShallowCopy()
854925
b.spans = b.spans.ShallowCopy()
926+
b.spanSetReader.spans = b.spans
927+
b.spanSetWriter.spans = b.spans
855928
return &b
856929
}
857930

858931
// NewBatch returns a storage.Batch that asserts access of the underlying
859932
// Batch against the given SpanSet. We only consider span boundaries, associated
860933
// timestamps are not considered.
861934
func NewBatch(b storage.Batch, spans *SpanSet) storage.Batch {
935+
spans = addLockTableSpans(spans)
862936
return &spanSetBatch{
863-
ReadWriter: makeSpanSetReadWriter(b, spans),
864-
b: b,
865-
spans: spans,
866-
spansOnly: true,
937+
spanSetReader: spanSetReader{r: b, spans: spans, spansOnly: true},
938+
spanSetWriteBatch: spanSetWriteBatch{
939+
spanSetWriter: spanSetWriter{w: b, spans: spans, spansOnly: true},
940+
wb: b,
941+
},
942+
b: b,
943+
spans: spans,
944+
spansOnly: true,
867945
}
868946
}
869947

870948
// NewBatchAt returns an storage.Batch that asserts access of the underlying
871949
// Batch against the given SpanSet at the given timestamp.
872950
// If the zero timestamp is used, all accesses are considered non-MVCC.
873951
func NewBatchAt(b storage.Batch, spans *SpanSet, ts hlc.Timestamp) storage.Batch {
952+
spans = addLockTableSpans(spans)
874953
return &spanSetBatch{
875-
ReadWriter: makeSpanSetReadWriterAt(b, spans, ts),
876-
b: b,
877-
spans: spans,
878-
ts: ts,
954+
spanSetReader: spanSetReader{r: b, spans: spans, ts: ts},
955+
spanSetWriteBatch: spanSetWriteBatch{
956+
spanSetWriter: spanSetWriter{w: b, spans: spans, ts: ts},
957+
wb: b,
958+
},
959+
b: b,
960+
spans: spans,
961+
ts: ts,
879962
}
880963
}
881964

@@ -884,9 +967,9 @@ func NewBatchAt(b storage.Batch, spans *SpanSet, ts hlc.Timestamp) storage.Batch
884967
func DisableReaderAssertions(reader storage.Reader) storage.Reader {
885968
switch v := reader.(type) {
886969
case ReadWriter:
887-
return DisableReaderAssertions(v.r)
970+
return DisableReaderAssertions(v.spanSetReader.r)
888971
case *spanSetBatch:
889-
return DisableReaderAssertions(v.r)
972+
return DisableReaderAssertions(v.spanSetReader.r)
890973
default:
891974
return reader
892975
}
@@ -897,9 +980,9 @@ func DisableReaderAssertions(reader storage.Reader) storage.Reader {
897980
func DisableReadWriterAssertions(rw storage.ReadWriter) storage.ReadWriter {
898981
switch v := rw.(type) {
899982
case ReadWriter:
900-
return DisableReadWriterAssertions(v.w.(storage.ReadWriter))
983+
return DisableReadWriterAssertions(v.spanSetWriter.w.(storage.ReadWriter))
901984
case *spanSetBatch:
902-
return DisableReadWriterAssertions(v.w.(storage.ReadWriter))
985+
return DisableReadWriterAssertions(v.spanSetWriteBatch.spanSetWriter.w.(storage.ReadWriter))
903986
default:
904987
return rw
905988
}
@@ -913,10 +996,8 @@ func DisableUndeclaredSpanAssertions(rw storage.ReadWriter) storage.ReadWriter {
913996
switch v := rw.(type) {
914997
case *spanSetBatch:
915998
newSnapSetBatch := v.shallowCopy()
916-
newSnapSetBatch.spanSetReader.spans.DisableUndeclaredAccessAssertions()
917-
newSnapSetBatch.spanSetWriter.spans.DisableUndeclaredAccessAssertions()
999+
newSnapSetBatch.spans.DisableUndeclaredAccessAssertions()
9181000
return newSnapSetBatch
919-
9201001
default:
9211002
return rw
9221003
}
@@ -932,10 +1013,8 @@ func DisableForbiddenSpanAssertions(rw storage.ReadWriter) storage.ReadWriter {
9321013
switch v := rw.(type) {
9331014
case *spanSetBatch:
9341015
newSnapSetBatch := v.shallowCopy()
935-
newSnapSetBatch.spanSetReader.spans.DisableForbiddenSpansAssertions()
936-
newSnapSetBatch.spanSetWriter.spans.DisableForbiddenSpansAssertions()
1016+
newSnapSetBatch.spans.DisableForbiddenSpansAssertions()
9371017
return newSnapSetBatch
938-
9391018
default:
9401019
return rw
9411020
}

0 commit comments

Comments
 (0)