Skip to content

Commit 7966099

Browse files
committed
sql: add support for span checks and cluster checks
Previously, the checks were independent and could only process individual spans. The row count check will depend on other checks that scan each span, and it will need to aggregate the row counts after all spans have been processed. This change sets up interfaces for span-level checks (which depend on other checks running on the spans) and cluster-level checks (which run after all spans have been processed). Part of: #155472 Epic: CRDB-55075 Release note: None
1 parent 65de6b4 commit 7966099

File tree

7 files changed

+390
-80
lines changed

7 files changed

+390
-80
lines changed

pkg/sql/inspect/index_consistency_check.go

Lines changed: 3 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -58,20 +58,9 @@ func (c *indexConsistencyCheckApplicability) AppliesTo(
5858
return spanContainsTable(c.tableID, codec, span)
5959
}
6060

61-
// checkState represents the state of an index consistency check.
62-
type checkState int
63-
64-
const (
65-
// checkNotStarted indicates Start() has not been called yet.
66-
checkNotStarted checkState = iota
67-
// checkHashMatched indicates the hash precheck passed - no corruption detected,
68-
// so the full check can be skipped.
69-
checkHashMatched
70-
// checkRunning indicates the full check is actively running and may produce more results.
71-
checkRunning
72-
// checkDone indicates the check has finished (iterator exhausted or error occurred).
73-
checkDone
74-
)
61+
func (c *indexConsistencyCheckApplicability) IsSpanLevel() bool {
62+
return false
63+
}
7564

7665
// indexConsistencyCheck verifies consistency between a table's primary index
7766
// and a specified secondary index by streaming rows from both sides of a

pkg/sql/inspect/inspect_processor.go

Lines changed: 24 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -274,7 +274,7 @@ func (p *inspectProcessor) processSpan(
274274
asOfToUse := p.getTimestampForSpan()
275275

276276
// Only create checks that apply to this span.
277-
var checks []inspectCheck
277+
var checks inspectChecks
278278
for _, factory := range p.checkFactories {
279279
check := factory(asOfToUse)
280280
applies, err := check.AppliesTo(p.cfg.Codec, span)
@@ -322,8 +322,20 @@ func (p *inspectProcessor) processSpan(
322322
}
323323
}
324324

325+
// Process the span-level checks.
326+
progressMsg := &jobspb.InspectProcessorProgress{}
327+
for _, check := range checks {
328+
if check, ok := check.(inspectSpanCheck); ok {
329+
progressMsg.ChecksCompleted++
330+
err := check.CheckSpan(ctx, checks, p.loggerBundle, progressMsg)
331+
if err != nil {
332+
return err
333+
}
334+
}
335+
}
336+
325337
// Report span completion for checkpointing.
326-
if err := p.sendSpanCompletionProgress(ctx, output, span, false /* finished */); err != nil {
338+
if err := p.sendSpanCompletionProgress(ctx, output, span, progressMsg); err != nil {
327339
return err
328340
}
329341

@@ -371,15 +383,14 @@ func (p *inspectProcessor) getTimestampForSpan() hlc.Timestamp {
371383
// sendSpanCompletionProgress sends progress indicating a span has been completed.
372384
// This is used for checkpointing to track which spans are done.
373385
func (p *inspectProcessor) sendSpanCompletionProgress(
374-
ctx context.Context, output execinfra.RowReceiver, completedSpan roachpb.Span, finished bool,
386+
ctx context.Context,
387+
output execinfra.RowReceiver,
388+
completedSpan roachpb.Span,
389+
progressMsg *jobspb.InspectProcessorProgress,
375390
) error {
376391
if p.spansProcessedCtr != nil {
377392
p.spansProcessedCtr.Inc(1)
378393
}
379-
progressMsg := &jobspb.InspectProcessorProgress{
380-
ChecksCompleted: 0, // No additional checks completed, just marking span done
381-
Finished: finished,
382-
}
383394

384395
progressAny, err := pbtypes.MarshalAny(progressMsg)
385396
if err != nil {
@@ -393,7 +404,7 @@ func (p *inspectProcessor) sendSpanCompletionProgress(
393404
NodeID: p.flowCtx.NodeID.SQLInstanceID(),
394405
FlowID: p.flowCtx.ID,
395406
ProcessorID: p.processorID,
396-
Drained: finished,
407+
Drained: progressMsg.Finished,
397408
},
398409
}
399410

@@ -423,7 +434,7 @@ func newInspectProcessor(
423434
) (execinfra.Processor, error) {
424435
execCfg := flowCtx.Cfg.ExecutorConfig.(*sql.ExecutorConfig)
425436

426-
checkFactories, err := buildInspectCheckFactories(execCfg, spec)
437+
checkFactories, err := buildInspectCheckFactories(execCfg, spec.InspectDetails)
427438
if err != nil {
428439
return nil, err
429440
}
@@ -438,7 +449,7 @@ func newInspectProcessor(
438449
checkFactories: checkFactories,
439450
cfg: flowCtx.Cfg,
440451
spanSrc: newSliceSpanSource(spec.Spans),
441-
loggerBundle: getInspectLogger(execCfg, spec.JobID),
452+
loggerBundle: getInspectLogger(flowCtx.Cfg.ExecutorConfig.(*sql.ExecutorConfig), spec.JobID),
442453
concurrency: getProcessorConcurrency(flowCtx),
443454
clock: flowCtx.Cfg.DB.KV().Clock(),
444455
spansProcessedCtr: spansProcessedCtr,
@@ -453,10 +464,10 @@ func newInspectProcessor(
453464
// This indirection ensures that each check instance is freshly created per span,
454465
// avoiding shared state across concurrent workers.
455466
func buildInspectCheckFactories(
456-
execCfg *sql.ExecutorConfig, spec execinfrapb.InspectSpec,
467+
execCfg *sql.ExecutorConfig, details jobspb.InspectDetails,
457468
) ([]inspectCheckFactory, error) {
458-
checkFactories := make([]inspectCheckFactory, 0, len(spec.InspectDetails.Checks))
459-
for _, specCheck := range spec.InspectDetails.Checks {
469+
checkFactories := make([]inspectCheckFactory, 0, len(details.Checks))
470+
for _, specCheck := range details.Checks {
460471
tableID := specCheck.TableID
461472
indexID := specCheck.IndexID
462473
tableVersion := specCheck.TableVersion
@@ -473,7 +484,6 @@ func buildInspectCheckFactories(
473484
asOf: asOf,
474485
}
475486
})
476-
477487
default:
478488
return nil, errors.AssertionFailedf("unsupported inspect check type: %v", specCheck.Type)
479489
}

pkg/sql/inspect/inspect_processor_test.go

Lines changed: 129 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,6 @@ package inspect
77

88
import (
99
"context"
10-
"errors"
1110
"fmt"
1211
"testing"
1312
"time"
@@ -24,6 +23,7 @@ import (
2423
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
2524
"github.com/cockroachdb/cockroach/pkg/util/log"
2625
"github.com/cockroachdb/cockroach/pkg/util/uuid"
26+
"github.com/cockroachdb/errors"
2727
"github.com/stretchr/testify/require"
2828
)
2929

@@ -126,6 +126,10 @@ func (t *testingInspectCheck) AppliesTo(codec keys.SQLCodec, span roachpb.Span)
126126
return true, nil
127127
}
128128

129+
func (c *testingInspectCheck) IsSpanLevel() bool {
130+
return false
131+
}
132+
129133
// Started implements the inspectCheck interface.
130134
func (t *testingInspectCheck) Started() bool {
131135
return t.started
@@ -205,6 +209,73 @@ func (t *testingInspectCheck) Close(context.Context) error {
205209
return nil
206210
}
207211

212+
// testingInspectCheckSpan is a test implementation of inspectSpanCheck.
213+
type testingInspectCheckSpan struct {
214+
started bool
215+
216+
spanIssues int64
217+
}
218+
219+
var _ inspectCheckApplicability = (*testingInspectCheckSpan)(nil)
220+
221+
var _ inspectSpanCheck = (*testingInspectCheckSpan)(nil)
222+
223+
func (t *testingInspectCheckSpan) AppliesTo(codec keys.SQLCodec, span roachpb.Span) (bool, error) {
224+
// For testing, assume this check applies to all spans.
225+
return true, nil
226+
}
227+
228+
func (c *testingInspectCheckSpan) IsSpanLevel() bool {
229+
return true
230+
}
231+
232+
func (t *testingInspectCheckSpan) AppliesToCluster() (bool, error) {
233+
// For testing, always report that the check applies at the cluster level.
234+
return true, nil
235+
}
236+
237+
func (t *testingInspectCheckSpan) Started() bool {
238+
return t.started
239+
}
240+
241+
func (t *testingInspectCheckSpan) Start(
242+
ctx context.Context, cfg *execinfra.ServerConfig, span roachpb.Span, workerIndex int,
243+
) error {
244+
t.started = true
245+
return nil
246+
}
247+
248+
func (*testingInspectCheckSpan) Next(
249+
ctx context.Context, cfg *execinfra.ServerConfig,
250+
) (*inspectIssue, error) {
251+
return nil, nil
252+
}
253+
254+
func (*testingInspectCheckSpan) Done(ctx context.Context) bool {
255+
return true
256+
}
257+
258+
func (*testingInspectCheckSpan) Close(ctx context.Context) error {
259+
return nil
260+
}
261+
262+
func (t *testingInspectCheckSpan) RegisterChecksForSpan(checks inspectChecks) error {
263+
return nil
264+
}
265+
266+
func (t *testingInspectCheckSpan) CheckSpan(
267+
ctx context.Context, logger *inspectLoggerBundle, msg *jobspb.InspectProcessorProgress,
268+
) error {
269+
for i := t.spanIssues; i > 0; i-- {
270+
err := logger.logIssue(ctx, &inspectIssue{})
271+
if err != nil {
272+
return errors.Wrapf(err, "error logging inspect issue")
273+
}
274+
}
275+
276+
return nil
277+
}
278+
208279
// discardRowReceiver is a minimal RowReceiver that discards all data.
209280
type discardRowReceiver struct{}
210281

@@ -249,7 +320,7 @@ func runProcessorAndWait(t *testing.T, proc *inspectProcessor, expectErr bool) {
249320
// makeProcessor will create an inspect processor for test.
250321
func makeProcessor(
251322
t *testing.T,
252-
checkFactory inspectCheckFactory,
323+
checkFactories []inspectCheckFactory,
253324
src spanSource,
254325
concurrency int,
255326
asOf hlc.Timestamp,
@@ -274,7 +345,7 @@ func makeProcessor(
274345
AsOf: asOf,
275346
},
276347
},
277-
checkFactories: []inspectCheckFactory{checkFactory},
348+
checkFactories: checkFactories,
278349
cfg: flowCtx.Cfg,
279350
flowCtx: flowCtx,
280351
spanSrc: src,
@@ -358,12 +429,14 @@ func TestInspectProcessor_ControlFlow(t *testing.T) {
358429

359430
for _, tc := range tests {
360431
t.Run(tc.desc, func(t *testing.T) {
361-
factory := func(asOf hlc.Timestamp) inspectCheck {
362-
return &testingInspectCheck{
363-
configs: tc.configs,
364-
}
432+
factories := []inspectCheckFactory{
433+
func(asOf hlc.Timestamp) inspectCheck {
434+
return &testingInspectCheck{
435+
configs: tc.configs,
436+
}
437+
},
365438
}
366-
proc, _ := makeProcessor(t, factory, tc.spanSrc, len(tc.configs), hlc.Timestamp{})
439+
proc, _ := makeProcessor(t, factories, tc.spanSrc, len(tc.configs), hlc.Timestamp{})
367440
runProcessorAndWait(t, proc, tc.expectErr)
368441
})
369442
}
@@ -377,20 +450,22 @@ func TestInspectProcessor_EmitIssues(t *testing.T) {
377450
mode: spanModeNormal,
378451
maxSpans: 1,
379452
}
380-
factory := func(asOf hlc.Timestamp) inspectCheck {
381-
return &testingInspectCheck{
382-
configs: []testingCheckConfig{
383-
{
384-
mode: checkModeNone,
385-
issues: []*inspectIssue{
386-
{ErrorType: "test_error", PrimaryKey: "pk1"},
387-
{ErrorType: "test_error", PrimaryKey: "pk2"},
453+
factories := []inspectCheckFactory{
454+
func(asOf hlc.Timestamp) inspectCheck {
455+
return &testingInspectCheck{
456+
configs: []testingCheckConfig{
457+
{
458+
mode: checkModeNone,
459+
issues: []*inspectIssue{
460+
{ErrorType: "test_error", PrimaryKey: "pk1"},
461+
{ErrorType: "test_error", PrimaryKey: "pk2"},
462+
},
388463
},
389464
},
390-
},
391-
}
465+
}
466+
},
392467
}
393-
proc, logger := makeProcessor(t, factory, spanSrc, 1, hlc.Timestamp{})
468+
proc, logger := makeProcessor(t, factories, spanSrc, 1, hlc.Timestamp{})
394469

395470
runProcessorAndWait(t, proc, true /* expectErr */)
396471

@@ -439,21 +514,23 @@ func TestInspectProcessor_AsOfTime(t *testing.T) {
439514
testStartTime := time.Now()
440515

441516
var capturedTimestamp hlc.Timestamp
442-
factory := func(asOf hlc.Timestamp) inspectCheck {
443-
capturedTimestamp = asOf
444-
return &testingInspectCheck{
445-
configs: []testingCheckConfig{
446-
{
447-
mode: checkModeNone,
448-
issues: []*inspectIssue{
449-
{ErrorType: "test_error", PrimaryKey: "pk1", AOST: asOf.GoTime()},
517+
factories := []inspectCheckFactory{
518+
func(asOf hlc.Timestamp) inspectCheck {
519+
capturedTimestamp = asOf
520+
return &testingInspectCheck{
521+
configs: []testingCheckConfig{
522+
{
523+
mode: checkModeNone,
524+
issues: []*inspectIssue{
525+
{ErrorType: "test_error", PrimaryKey: "pk1", AOST: asOf.GoTime()},
526+
},
450527
},
451528
},
452-
},
453-
}
529+
}
530+
},
454531
}
455532

456-
proc, logger := makeProcessor(t, factory, spanSrc, 1, tc.asOf)
533+
proc, logger := makeProcessor(t, factories, spanSrc, 1, tc.asOf)
457534

458535
runProcessorAndWait(t, proc, true /* expectErr */)
459536

@@ -465,3 +542,25 @@ func TestInspectProcessor_AsOfTime(t *testing.T) {
465542
})
466543
}
467544
}
545+
546+
func TestInspectProcessor_SpanCheck(t *testing.T) {
547+
defer leaktest.AfterTest(t)()
548+
defer log.Scope(t).Close(t)
549+
550+
spanSrc := &testingSpanSource{
551+
mode: spanModeNormal,
552+
maxSpans: 1,
553+
}
554+
factories := []inspectCheckFactory{
555+
func(asOf hlc.Timestamp) inspectCheck {
556+
return &testingInspectCheckSpan{
557+
spanIssues: 2,
558+
}
559+
},
560+
}
561+
proc, logger := makeProcessor(t, factories, spanSrc, 1, hlc.Timestamp{})
562+
563+
runProcessorAndWait(t, proc, true /* expectErr */)
564+
565+
require.Equal(t, 2, logger.numIssuesFound())
566+
}

0 commit comments

Comments
 (0)