Skip to content

Commit dd8a813

Browse files
committed
kvserver: introduce SpansetEngine
This commit adds SpansetEngine, which is a wrapper around Engine that will be used to assert that each engine only accesses its keys.
1 parent 2e870ad commit dd8a813

File tree

3 files changed

+373
-0
lines changed

3 files changed

+373
-0
lines changed

pkg/kv/kvserver/spanset/BUILD.bazel

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ go_library(
44
name = "spanset",
55
srcs = [
66
"batch.go",
7+
"engine.go",
78
"merge.go",
89
"spanset.go",
910
],
@@ -13,13 +14,15 @@ go_library(
1314
"//pkg/keys",
1415
"//pkg/roachpb",
1516
"//pkg/storage",
17+
"//pkg/storage/enginepb",
1618
"//pkg/storage/fs",
1719
"//pkg/util/debugutil",
1820
"//pkg/util/hlc",
1921
"//pkg/util/log",
2022
"//pkg/util/protoutil",
2123
"@com_github_cockroachdb_errors//:errors",
2224
"@com_github_cockroachdb_pebble//:pebble",
25+
"@com_github_cockroachdb_pebble//metrics",
2326
"@com_github_cockroachdb_pebble//rangekey",
2427
],
2528
)
@@ -29,6 +32,7 @@ go_test(
2932
size = "small",
3033
srcs = [
3134
"batch_test.go",
35+
"engine_test.go",
3236
"merge_test.go",
3337
"spanset_test.go",
3438
],
@@ -42,6 +46,7 @@ go_test(
4246
"//pkg/testutils",
4347
"//pkg/util/hlc",
4448
"//pkg/util/leaktest",
49+
"@com_github_cockroachdb_errors//:errors",
4550
"@com_github_stretchr_testify//require",
4651
],
4752
)

pkg/kv/kvserver/spanset/engine.go

Lines changed: 300 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,300 @@
1+
// Copyright 2025 The Cockroach Authors.
2+
//
3+
// Use of this software is governed by the CockroachDB Software License
4+
// included in the /LICENSE file.
5+
6+
package spanset
7+
8+
import (
9+
"context"
10+
11+
"github.com/cockroachdb/cockroach/pkg/roachpb"
12+
"github.com/cockroachdb/cockroach/pkg/storage"
13+
"github.com/cockroachdb/cockroach/pkg/storage/enginepb"
14+
"github.com/cockroachdb/cockroach/pkg/storage/fs"
15+
"github.com/cockroachdb/cockroach/pkg/util/hlc"
16+
"github.com/cockroachdb/pebble"
17+
"github.com/cockroachdb/pebble/metrics"
18+
)
19+
20+
// spanSetEngine wraps an Engine and asserts that it does not access spans that
21+
// do not belong to it.
22+
type spanSetEngine struct {
23+
ReadWriter
24+
e storage.Engine
25+
spans *SpanSet
26+
}
27+
28+
var _ storage.Engine = &spanSetEngine{}
29+
30+
// NewEngine creates a new spanSetEngine wrapper.
31+
func NewEngine(engine storage.Engine, forbiddenMatchers ...func(TrickySpan) error) storage.Engine {
32+
spans := &SpanSet{}
33+
// For engines, we disable undeclared access assertions as we only care about
34+
// preventing access to spans that don't belong to this engine.
35+
spans.DisableUndeclaredAccessAssertions()
36+
// Add the forbidden matchers that assert against access to disallowed keys.
37+
for _, matcher := range forbiddenMatchers {
38+
spans.AddForbiddenMatcher(matcher)
39+
}
40+
return &spanSetEngine{
41+
ReadWriter: makeSpanSetReadWriter(engine, spans),
42+
e: engine,
43+
spans: spans,
44+
}
45+
}
46+
47+
// AddForbiddenMatcher adds a forbidden matcher to the underlying SpanSet.
48+
func (s *spanSetEngine) AddForbiddenMatcher(matcher func(TrickySpan) error) {
49+
s.spans.AddForbiddenMatcher(matcher)
50+
}
51+
52+
// NewBatch implements the storage.EngineWithoutRW interface.
53+
func (s *spanSetEngine) NewBatch() storage.Batch {
54+
return NewBatch(s.e.NewBatch(), s.spans)
55+
}
56+
57+
// NewBatch implements the storage.EngineWithoutRW interface.
58+
func (s *spanSetEngine) NewReader(durability storage.DurabilityRequirement) storage.Reader {
59+
return NewReader(s.e.NewReader(durability), s.spans, hlc.Timestamp{})
60+
}
61+
62+
// NewReadOnly implements the storage.EngineWithoutRW interface.
63+
func (s *spanSetEngine) NewReadOnly(durability storage.DurabilityRequirement) storage.ReadWriter {
64+
return NewReadWriterAt(s.e.NewReadOnly(durability), s.spans, hlc.Timestamp{})
65+
}
66+
67+
// NewUnindexedBatch implements the storage.EngineWithoutRW interface.
68+
func (s *spanSetEngine) NewUnindexedBatch() storage.Batch {
69+
return NewBatch(s.e.NewUnindexedBatch(), s.spans)
70+
}
71+
72+
// Excise implements the storage.EngineWithoutRW interface.
73+
func (s *spanSetEngine) Excise(ctx context.Context, span roachpb.Span) error {
74+
if err := s.spans.CheckAllowed(SpanReadWrite, TrickySpan{Key: span.Key, EndKey: span.EndKey}); err != nil {
75+
return err
76+
}
77+
return s.e.Excise(ctx, span)
78+
}
79+
80+
// Download implements the storage.EngineWithoutRW interface.
81+
func (s *spanSetEngine) Download(ctx context.Context, span roachpb.Span, copy bool) error {
82+
if err := s.spans.CheckAllowed(SpanReadWrite, TrickySpan{Key: span.Key, EndKey: span.EndKey}); err != nil {
83+
return err
84+
}
85+
return s.e.Download(ctx, span, copy)
86+
}
87+
88+
// CreateCheckpoint implements the storage.EngineWithoutRW interface.
89+
func (s *spanSetEngine) CreateCheckpoint(dir string, spans []roachpb.Span) error {
90+
for _, span := range spans {
91+
if err := s.spans.CheckAllowed(SpanReadOnly, TrickySpan{Key: span.Key, EndKey: span.EndKey}); err != nil {
92+
return err
93+
}
94+
}
95+
return s.e.CreateCheckpoint(dir, spans)
96+
}
97+
98+
// GetTableMetrics implements the storage.EngineWithoutRW interface.
99+
func (s *spanSetEngine) GetTableMetrics(
100+
start, end roachpb.Key,
101+
) ([]enginepb.SSTableMetricsInfo, error) {
102+
if err := s.spans.CheckAllowed(SpanReadOnly, TrickySpan{Key: start, EndKey: end}); err != nil {
103+
return nil, err
104+
}
105+
return s.e.GetTableMetrics(start, end)
106+
}
107+
108+
// CompactRange implements the storage.EngineWithoutRW interface.
109+
func (s *spanSetEngine) CompactRange(ctx context.Context, start, end roachpb.Key) error {
110+
if err := s.spans.CheckAllowed(SpanReadWrite, TrickySpan{Key: start, EndKey: end}); err != nil {
111+
return err
112+
}
113+
return s.e.CompactRange(ctx, start, end)
114+
}
115+
116+
// ApproximateDiskBytes implements the storage.EngineWithoutRW interface.
117+
func (s *spanSetEngine) ApproximateDiskBytes(
118+
from, to roachpb.Key,
119+
) (total, remote, external uint64, _ error) {
120+
if err := s.spans.CheckAllowed(SpanReadOnly, TrickySpan{Key: from, EndKey: to}); err != nil {
121+
return 0, 0, 0, err
122+
}
123+
return s.e.ApproximateDiskBytes(from, to)
124+
}
125+
126+
// NewSnapshot implements the storage.EngineWithoutRW interface.
127+
func (s *spanSetEngine) NewSnapshot(keyRanges ...roachpb.Span) storage.Reader {
128+
for _, span := range keyRanges {
129+
if err := s.spans.CheckAllowed(SpanReadOnly, TrickySpan{Key: span.Key, EndKey: span.EndKey}); err != nil {
130+
panic(err)
131+
}
132+
}
133+
return NewReader(s.e.NewSnapshot(keyRanges...), s.spans, hlc.Timestamp{})
134+
}
135+
136+
// NewWriteBatch implements the storage.EngineWithoutRW interface.
137+
func (s *spanSetEngine) NewWriteBatch() storage.WriteBatch {
138+
wb := s.e.NewWriteBatch()
139+
return &spanSetWriteBatch{
140+
spanSetWriter: spanSetWriter{w: wb, spans: s.spans, spansOnly: true},
141+
wb: wb,
142+
}
143+
}
144+
145+
// Attrs implements the storage.EngineWithoutRW interface.
146+
func (s *spanSetEngine) Attrs() roachpb.Attributes {
147+
return s.e.Attrs()
148+
}
149+
150+
// Capacity implements the storage.EngineWithoutRW interface.
151+
func (s *spanSetEngine) Capacity() (roachpb.StoreCapacity, error) {
152+
return s.e.Capacity()
153+
}
154+
155+
// Properties implements the storage.EngineWithoutRW interface.
156+
func (s *spanSetEngine) Properties() roachpb.StoreProperties {
157+
return s.e.Properties()
158+
}
159+
160+
// ProfileSeparatedValueRetrievals implements the storage.EngineWithoutRW interface.
161+
func (s *spanSetEngine) ProfileSeparatedValueRetrievals(
162+
ctx context.Context,
163+
) (*metrics.ValueRetrievalProfile, error) {
164+
return s.e.ProfileSeparatedValueRetrievals(ctx)
165+
}
166+
167+
// Compact implements the storage.EngineWithoutRW interface.
168+
func (s *spanSetEngine) Compact(ctx context.Context) error {
169+
return s.e.Compact(ctx)
170+
}
171+
172+
// Env implements the storage.EngineWithoutRW interface.
173+
func (s *spanSetEngine) Env() *fs.Env {
174+
return s.e.Env()
175+
}
176+
177+
// Flush implements the storage.EngineWithoutRW interface.
178+
func (s *spanSetEngine) Flush() error {
179+
return s.e.Flush()
180+
}
181+
182+
// GetMetrics implements the storage.EngineWithoutRW interface.
183+
func (s *spanSetEngine) GetMetrics() storage.Metrics {
184+
return s.e.GetMetrics()
185+
}
186+
187+
// GetEncryptionRegistries implements the storage.EngineWithoutRW interface.
188+
func (s *spanSetEngine) GetEncryptionRegistries() (*fs.EncryptionRegistries, error) {
189+
return s.e.GetEncryptionRegistries()
190+
}
191+
192+
// GetEnvStats implements the storage.EngineWithoutRW interface.
193+
func (s *spanSetEngine) GetEnvStats() (*fs.EnvStats, error) {
194+
return s.e.GetEnvStats()
195+
}
196+
197+
// GetAuxiliaryDir implements the storage.EngineWithoutRW interface.
198+
func (s *spanSetEngine) GetAuxiliaryDir() string {
199+
return s.e.GetAuxiliaryDir()
200+
}
201+
202+
// IngestLocalFiles implements the storage.EngineWithoutRW interface.
203+
func (s *spanSetEngine) IngestLocalFiles(ctx context.Context, paths []string) error {
204+
return s.e.IngestLocalFiles(ctx, paths)
205+
}
206+
207+
// IngestLocalFilesWithStats implements the storage.EngineWithoutRW interface.
208+
func (s *spanSetEngine) IngestLocalFilesWithStats(
209+
ctx context.Context, paths []string,
210+
) (pebble.IngestOperationStats, error) {
211+
return s.e.IngestLocalFilesWithStats(ctx, paths)
212+
}
213+
214+
// IngestAndExciseFiles implements the storage.EngineWithoutRW interface.
215+
func (s *spanSetEngine) IngestAndExciseFiles(
216+
ctx context.Context,
217+
paths []string,
218+
shared []pebble.SharedSSTMeta,
219+
external []pebble.ExternalFile,
220+
exciseSpan roachpb.Span,
221+
) (pebble.IngestOperationStats, error) {
222+
if err := s.spans.CheckAllowed(SpanReadWrite, TrickySpan{Key: exciseSpan.Key, EndKey: exciseSpan.EndKey}); err != nil {
223+
return pebble.IngestOperationStats{}, err
224+
}
225+
return s.e.IngestAndExciseFiles(ctx, paths, shared, external, exciseSpan)
226+
}
227+
228+
// IngestExternalFiles implements the storage.EngineWithoutRW interface.
229+
func (s *spanSetEngine) IngestExternalFiles(
230+
ctx context.Context, external []pebble.ExternalFile,
231+
) (pebble.IngestOperationStats, error) {
232+
return s.e.IngestExternalFiles(ctx, external)
233+
}
234+
235+
// IngestLocalFilesToWriter implements the storage.EngineWithoutRW interface.
236+
func (s *spanSetEngine) IngestLocalFilesToWriter(
237+
ctx context.Context, paths []string, clearedSpans []roachpb.Span, writer storage.Writer,
238+
) error {
239+
return s.e.IngestLocalFilesToWriter(ctx, paths, clearedSpans, writer)
240+
}
241+
242+
// ScanStorageInternalKeys implements the storage.EngineWithoutRW interface.
243+
func (s *spanSetEngine) ScanStorageInternalKeys(
244+
start, end roachpb.Key, megabytesPerSecond int64,
245+
) ([]enginepb.StorageInternalKeysMetrics, error) {
246+
if err := s.spans.CheckAllowed(SpanReadWrite, TrickySpan{Key: start, EndKey: end}); err != nil {
247+
return nil, err
248+
}
249+
return s.e.ScanStorageInternalKeys(start, end, megabytesPerSecond)
250+
}
251+
252+
// RegisterFlushCompletedCallback implements the storage.EngineWithoutRW interface.
253+
func (s *spanSetEngine) RegisterFlushCompletedCallback(cb func()) {
254+
s.e.RegisterFlushCompletedCallback(cb)
255+
}
256+
257+
// MinVersion implements the storage.EngineWithoutRW interface.
258+
func (s *spanSetEngine) MinVersion() roachpb.Version {
259+
return s.e.MinVersion()
260+
}
261+
262+
// SetMinVersion implements the storage.EngineWithoutRW interface.
263+
func (s *spanSetEngine) SetMinVersion(version roachpb.Version) error {
264+
return s.e.SetMinVersion(version)
265+
}
266+
267+
// SetCompactionConcurrency implements the storage.EngineWithoutRW interface.
268+
func (s *spanSetEngine) SetCompactionConcurrency(n uint64) {
269+
s.e.SetCompactionConcurrency(n)
270+
}
271+
272+
// SetStoreID implements the storage.EngineWithoutRW interface.
273+
func (s *spanSetEngine) SetStoreID(ctx context.Context, storeID int32) error {
274+
return s.e.SetStoreID(ctx, storeID)
275+
}
276+
277+
// GetStoreID implements the storage.EngineWithoutRW interface.
278+
func (s *spanSetEngine) GetStoreID() (int32, error) {
279+
return s.e.GetStoreID()
280+
}
281+
282+
// RegisterDiskSlowCallback implements the storage.EngineWithoutRW interface.
283+
func (s *spanSetEngine) RegisterDiskSlowCallback(cb func(info pebble.DiskSlowInfo)) {
284+
s.e.RegisterDiskSlowCallback(cb)
285+
}
286+
287+
// RegisterLowDiskSpaceCallback implements the storage.EngineWithoutRW interface.
288+
func (s *spanSetEngine) RegisterLowDiskSpaceCallback(cb func(info pebble.LowDiskSpaceInfo)) {
289+
s.e.RegisterLowDiskSpaceCallback(cb)
290+
}
291+
292+
// GetPebbleOptions implements the storage.EngineWithoutRW interface.
293+
func (s *spanSetEngine) GetPebbleOptions() *pebble.Options {
294+
return s.e.GetPebbleOptions()
295+
}
296+
297+
// GetDiskUnhealthy implements the storage.EngineWithoutRW interface.
298+
func (s *spanSetEngine) GetDiskUnhealthy() bool {
299+
return s.e.GetDiskUnhealthy()
300+
}

0 commit comments

Comments
 (0)