Skip to content

Commit a87b6de

Browse files
committed
sql: add support for span checks and cluster checks
Previously the checks were independent and could only process individual spans. The row count check will be dependent on other checks that perform the scan of each span and will need to aggregate the row counts after all the spans have been processed. This change sets up interfaces for span-level (dependent on other checks running on the spans) and cluster-level checks (run after all spans have been processed). Part of: #155472 Epic: CRDB-55075 Release note: None
1 parent 6447ea5 commit a87b6de

File tree

7 files changed

+413
-78
lines changed

7 files changed

+413
-78
lines changed

pkg/jobs/jobspb/jobs.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -176,3 +176,9 @@ func (r *RowLevelTTLProcessorProgress) SafeFormat(p redact.SafePrinter, _ rune)
176176
func (c CreateStatsDetails) IsAuto() bool {
177177
return c.Name == AutoStatsName || c.Name == AutoPartialStatsName
178178
}
179+
180+
// UpdateWithSpanProgress updates the InspectProgress with data from the
181+
// check-specific fields in the span progress update.
182+
func (p *InspectProgress) UpdateWithSpanProgress(prog *InspectProcessorProgress) {
183+
return
184+
}

pkg/sql/inspect/index_consistency_check.go

Lines changed: 0 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -58,21 +58,6 @@ func (c *indexConsistencyCheckApplicability) AppliesTo(
5858
return spanContainsTable(c.tableID, codec, span)
5959
}
6060

61-
// checkState represents the state of an index consistency check.
62-
type checkState int
63-
64-
const (
65-
// checkNotStarted indicates Start() has not been called yet.
66-
checkNotStarted checkState = iota
67-
// checkHashMatched indicates the hash precheck passed - no corruption detected,
68-
// so the full check can be skipped.
69-
checkHashMatched
70-
// checkRunning indicates the full check is actively running and may produce more results.
71-
checkRunning
72-
// checkDone indicates the check has finished (iterator exhausted or error occurred).
73-
checkDone
74-
)
75-
7661
// indexConsistencyCheck verifies consistency between a table's primary index
7762
// and a specified secondary index by streaming rows from both sides of a
7863
// query. It reports an issue if a key exists in the primary but not the

pkg/sql/inspect/inspect_processor.go

Lines changed: 39 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -267,7 +267,7 @@ func (p *inspectProcessor) processSpan(
267267
asOfToUse := p.getTimestampForSpan()
268268

269269
// Only create checks that apply to this span.
270-
var checks []inspectCheck
270+
var checks inspectChecks
271271
for _, factory := range p.checkFactories {
272272
check := factory(asOfToUse)
273273
applies, err := check.AppliesTo(p.cfg.Codec, span)
@@ -278,6 +278,16 @@ func (p *inspectProcessor) processSpan(
278278
checks = append(checks, check)
279279
}
280280
}
281+
checks.sortStable()
282+
283+
// Provide the span-level checks with a reference to all the other checks.
284+
for _, check := range checks {
285+
if check, ok := check.(inspectSpanCheck); ok {
286+
if err := check.RegisterChecksForSpan(checks); err != nil {
287+
return err
288+
}
289+
}
290+
}
281291

282292
runner := inspectRunner{
283293
checks: checks,
@@ -315,8 +325,22 @@ func (p *inspectProcessor) processSpan(
315325
}
316326
}
317327

328+
progressMsg := &jobspb.InspectProcessorProgress{
329+
ChecksCompleted: 0, // No additional checks completed, just marking span done
330+
Finished: false,
331+
}
332+
333+
for _, check := range checks {
334+
if check, ok := check.(inspectSpanCheck); ok {
335+
err := check.CheckSpan(ctx, p.loggerBundle, progressMsg)
336+
if err != nil {
337+
return err
338+
}
339+
}
340+
}
341+
318342
// Report span completion for checkpointing.
319-
if err := p.sendSpanCompletionProgress(ctx, output, span, false /* finished */); err != nil {
343+
if err := p.sendSpanCompletionProgress(ctx, output, span, false /* finished */, progressMsg); err != nil {
320344
return err
321345
}
322346

@@ -364,15 +388,18 @@ func (p *inspectProcessor) getTimestampForSpan() hlc.Timestamp {
364388
// sendSpanCompletionProgress sends progress indicating a span has been completed.
365389
// This is used for checkpointing to track which spans are done.
366390
func (p *inspectProcessor) sendSpanCompletionProgress(
367-
ctx context.Context, output execinfra.RowReceiver, completedSpan roachpb.Span, finished bool,
391+
ctx context.Context,
392+
output execinfra.RowReceiver,
393+
completedSpan roachpb.Span,
394+
finished bool,
395+
progressMsg *jobspb.InspectProcessorProgress,
368396
) error {
369397
if p.spansProcessedCtr != nil {
370398
p.spansProcessedCtr.Inc(1)
371399
}
372-
progressMsg := &jobspb.InspectProcessorProgress{
373-
ChecksCompleted: 0, // No additional checks completed, just marking span done
374-
Finished: finished,
375-
}
400+
401+
progressMsg.ChecksCompleted = 0 // No additional checks completed, just marking span done
402+
progressMsg.Finished = finished
376403

377404
progressAny, err := pbtypes.MarshalAny(progressMsg)
378405
if err != nil {
@@ -416,7 +443,7 @@ func newInspectProcessor(
416443
) (execinfra.Processor, error) {
417444
execCfg := flowCtx.Cfg.ExecutorConfig.(*sql.ExecutorConfig)
418445

419-
checkFactories, err := buildInspectCheckFactories(execCfg, spec)
446+
checkFactories, err := buildInspectCheckFactories(execCfg, spec.InspectDetails)
420447
if err != nil {
421448
return nil, err
422449
}
@@ -430,7 +457,7 @@ func newInspectProcessor(
430457
checkFactories: checkFactories,
431458
cfg: flowCtx.Cfg,
432459
spanSrc: newSliceSpanSource(spec.Spans),
433-
loggerBundle: getInspectLogger(execCfg, spec.JobID),
460+
loggerBundle: getInspectLogger(flowCtx.Cfg.ExecutorConfig.(*sql.ExecutorConfig), spec.JobID),
434461
concurrency: getProcessorConcurrency(flowCtx),
435462
clock: flowCtx.Cfg.DB.KV().Clock(),
436463
spansProcessedCtr: spansProcessedCtr,
@@ -444,10 +471,10 @@ func newInspectProcessor(
444471
// This indirection ensures that each check instance is freshly created per span,
445472
// avoiding shared state across concurrent workers.
446473
func buildInspectCheckFactories(
447-
execCfg *sql.ExecutorConfig, spec execinfrapb.InspectSpec,
474+
execCfg *sql.ExecutorConfig, details jobspb.InspectDetails,
448475
) ([]inspectCheckFactory, error) {
449-
checkFactories := make([]inspectCheckFactory, 0, len(spec.InspectDetails.Checks))
450-
for _, specCheck := range spec.InspectDetails.Checks {
476+
checkFactories := make([]inspectCheckFactory, 0, len(details.Checks))
477+
for _, specCheck := range details.Checks {
451478
tableID := specCheck.TableID
452479
indexID := specCheck.IndexID
453480
tableVersion := specCheck.TableVersion
@@ -464,7 +491,6 @@ func buildInspectCheckFactories(
464491
asOf: asOf,
465492
}
466493
})
467-
468494
default:
469495
return nil, errors.AssertionFailedf("unsupported inspect check type: %v", specCheck.Type)
470496
}

pkg/sql/inspect/inspect_processor_test.go

Lines changed: 121 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,6 @@ package inspect
77

88
import (
99
"context"
10-
"errors"
1110
"fmt"
1211
"testing"
1312
"time"
@@ -24,6 +23,7 @@ import (
2423
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
2524
"github.com/cockroachdb/cockroach/pkg/util/log"
2625
"github.com/cockroachdb/cockroach/pkg/util/uuid"
26+
"github.com/cockroachdb/errors"
2727
"github.com/stretchr/testify/require"
2828
)
2929

@@ -205,6 +205,69 @@ func (t *testingInspectCheck) Close(context.Context) error {
205205
return nil
206206
}
207207

208+
// testingInspectCheckSpan is a test implementation of inspectCheckSpan.
209+
type testingInspectCheckSpan struct {
210+
started bool
211+
212+
spanIssues int64
213+
}
214+
215+
var _ inspectCheckApplicability = (*testingInspectCheckSpan)(nil)
216+
217+
var _ inspectSpanCheck = (*testingInspectCheckSpan)(nil)
218+
219+
func (t *testingInspectCheckSpan) AppliesTo(codec keys.SQLCodec, span roachpb.Span) (bool, error) {
220+
// For testing, assume all checks apply to all spans
221+
return true, nil
222+
}
223+
224+
func (t *testingInspectCheckSpan) AppliesToCluster() (bool, error) {
225+
// For testing it runs.
226+
return true, nil
227+
}
228+
229+
func (t *testingInspectCheckSpan) Started() bool {
230+
return t.started
231+
}
232+
233+
func (t *testingInspectCheckSpan) Start(
234+
ctx context.Context, cfg *execinfra.ServerConfig, span roachpb.Span, workerIndex int,
235+
) error {
236+
t.started = true
237+
return nil
238+
}
239+
240+
func (*testingInspectCheckSpan) Next(
241+
ctx context.Context, cfg *execinfra.ServerConfig,
242+
) (*inspectIssue, error) {
243+
return nil, nil
244+
}
245+
246+
func (*testingInspectCheckSpan) Done(ctx context.Context) bool {
247+
return true
248+
}
249+
250+
func (*testingInspectCheckSpan) Close(ctx context.Context) error {
251+
return nil
252+
}
253+
254+
func (t *testingInspectCheckSpan) RegisterChecksForSpan(checks inspectChecks) error {
255+
return nil
256+
}
257+
258+
func (t *testingInspectCheckSpan) CheckSpan(
259+
ctx context.Context, logger *inspectLoggerBundle, msg *jobspb.InspectProcessorProgress,
260+
) error {
261+
for i := t.spanIssues; i > 0; i-- {
262+
err := logger.logIssue(ctx, &inspectIssue{})
263+
if err != nil {
264+
return errors.Wrapf(err, "error logging inspect issue")
265+
}
266+
}
267+
268+
return nil
269+
}
270+
208271
// discardRowReceiver is a minimal RowReceiver that discards all data.
209272
type discardRowReceiver struct{}
210273

@@ -249,7 +312,7 @@ func runProcessorAndWait(t *testing.T, proc *inspectProcessor, expectErr bool) {
249312
// makeProcessor will create an inspect processor for test.
250313
func makeProcessor(
251314
t *testing.T,
252-
checkFactory inspectCheckFactory,
315+
checkFactories []inspectCheckFactory,
253316
src spanSource,
254317
concurrency int,
255318
asOf hlc.Timestamp,
@@ -274,7 +337,7 @@ func makeProcessor(
274337
AsOf: asOf,
275338
},
276339
},
277-
checkFactories: []inspectCheckFactory{checkFactory},
340+
checkFactories: checkFactories,
278341
cfg: flowCtx.Cfg,
279342
flowCtx: flowCtx,
280343
spanSrc: src,
@@ -358,12 +421,14 @@ func TestInspectProcessor_ControlFlow(t *testing.T) {
358421

359422
for _, tc := range tests {
360423
t.Run(tc.desc, func(t *testing.T) {
361-
factory := func(asOf hlc.Timestamp) inspectCheck {
362-
return &testingInspectCheck{
363-
configs: tc.configs,
364-
}
424+
factories := []inspectCheckFactory{
425+
func(asOf hlc.Timestamp) inspectCheck {
426+
return &testingInspectCheck{
427+
configs: tc.configs,
428+
}
429+
},
365430
}
366-
proc, _ := makeProcessor(t, factory, tc.spanSrc, len(tc.configs), hlc.Timestamp{})
431+
proc, _ := makeProcessor(t, factories, tc.spanSrc, len(tc.configs), hlc.Timestamp{})
367432
runProcessorAndWait(t, proc, tc.expectErr)
368433
})
369434
}
@@ -377,20 +442,22 @@ func TestInspectProcessor_EmitIssues(t *testing.T) {
377442
mode: spanModeNormal,
378443
maxSpans: 1,
379444
}
380-
factory := func(asOf hlc.Timestamp) inspectCheck {
381-
return &testingInspectCheck{
382-
configs: []testingCheckConfig{
383-
{
384-
mode: checkModeNone,
385-
issues: []*inspectIssue{
386-
{ErrorType: "test_error", PrimaryKey: "pk1"},
387-
{ErrorType: "test_error", PrimaryKey: "pk2"},
445+
factories := []inspectCheckFactory{
446+
func(asOf hlc.Timestamp) inspectCheck {
447+
return &testingInspectCheck{
448+
configs: []testingCheckConfig{
449+
{
450+
mode: checkModeNone,
451+
issues: []*inspectIssue{
452+
{ErrorType: "test_error", PrimaryKey: "pk1"},
453+
{ErrorType: "test_error", PrimaryKey: "pk2"},
454+
},
388455
},
389456
},
390-
},
391-
}
457+
}
458+
},
392459
}
393-
proc, logger := makeProcessor(t, factory, spanSrc, 1, hlc.Timestamp{})
460+
proc, logger := makeProcessor(t, factories, spanSrc, 1, hlc.Timestamp{})
394461

395462
runProcessorAndWait(t, proc, true /* expectErr */)
396463

@@ -439,21 +506,23 @@ func TestInspectProcessor_AsOfTime(t *testing.T) {
439506
testStartTime := time.Now()
440507

441508
var capturedTimestamp hlc.Timestamp
442-
factory := func(asOf hlc.Timestamp) inspectCheck {
443-
capturedTimestamp = asOf
444-
return &testingInspectCheck{
445-
configs: []testingCheckConfig{
446-
{
447-
mode: checkModeNone,
448-
issues: []*inspectIssue{
449-
{ErrorType: "test_error", PrimaryKey: "pk1", AOST: asOf.GoTime()},
509+
factories := []inspectCheckFactory{
510+
func(asOf hlc.Timestamp) inspectCheck {
511+
capturedTimestamp = asOf
512+
return &testingInspectCheck{
513+
configs: []testingCheckConfig{
514+
{
515+
mode: checkModeNone,
516+
issues: []*inspectIssue{
517+
{ErrorType: "test_error", PrimaryKey: "pk1", AOST: asOf.GoTime()},
518+
},
450519
},
451520
},
452-
},
453-
}
521+
}
522+
},
454523
}
455524

456-
proc, logger := makeProcessor(t, factory, spanSrc, 1, tc.asOf)
525+
proc, logger := makeProcessor(t, factories, spanSrc, 1, tc.asOf)
457526

458527
runProcessorAndWait(t, proc, true /* expectErr */)
459528

@@ -465,3 +534,25 @@ func TestInspectProcessor_AsOfTime(t *testing.T) {
465534
})
466535
}
467536
}
537+
538+
func TestInspectProcessor_SpanCheck(t *testing.T) {
539+
defer leaktest.AfterTest(t)()
540+
defer log.Scope(t).Close(t)
541+
542+
spanSrc := &testingSpanSource{
543+
mode: spanModeNormal,
544+
maxSpans: 1,
545+
}
546+
factories := []inspectCheckFactory{
547+
func(asOf hlc.Timestamp) inspectCheck {
548+
return &testingInspectCheckSpan{
549+
spanIssues: 2,
550+
}
551+
},
552+
}
553+
proc, logger := makeProcessor(t, factories, spanSrc, 1, hlc.Timestamp{})
554+
555+
runProcessorAndWait(t, proc, true /* expectErr */)
556+
557+
require.Equal(t, 2, logger.numIssuesFound())
558+
}

0 commit comments

Comments
 (0)