Skip to content

Commit 7420200

Browse files
committed
sql/inspect: make AS OF SYSTEM TIME optional in INSPECT jobs
Previously, INSPECT jobs required specifying an `AS OF SYSTEM TIME` timestamp. This commit removes that requirement. If the timestamp is left empty, the INSPECT processor will now default to using the current time when querying for index inconsistencies. A fresh timestamp will be acquired for each span. No protected timestamp record will be used in this mode. Fixes #148675 Epic: CRDB-30356 Release note: none
1 parent a2df483 commit 7420200

File tree

6 files changed

+116
-18
lines changed

6 files changed

+116
-18
lines changed

pkg/jobs/jobspb/jobs.proto

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1463,6 +1463,7 @@ message InspectDetails {
14631463
repeated Check checks = 1;
14641464

14651465
// AsOf specifies the timestamp at which the inspect checks should be performed.
1466+
// If empty, the current timestamp will be used for each span processed.
14661467
util.hlc.Timestamp as_of = 2 [(gogoproto.nullable) = false];
14671468

14681469
// ProtectedTimestampRecord is the id of the protected timestamp record that

pkg/sql/inspect/BUILD.bazel

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -95,6 +95,7 @@ go_test(
9595
"//pkg/testutils/skip",
9696
"//pkg/testutils/sqlutils",
9797
"//pkg/testutils/testcluster",
98+
"//pkg/util/hlc",
9899
"//pkg/util/leaktest",
99100
"//pkg/util/log",
100101
"//pkg/util/syncutil",

pkg/sql/inspect/index_consistency_check_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -571,7 +571,7 @@ func TestIndexConsistencyWithReservedWordColumns(t *testing.T) {
571571
// TODO(148365): Run INSPECT instead of SCRUB.
572572
_, err := db.Exec(`SET enable_scrub_job=true`)
573573
require.NoError(t, err)
574-
_, err = db.Query(`EXPERIMENTAL SCRUB TABLE test.reserved_table AS OF SYSTEM TIME '-1us' WITH OPTIONS INDEX ALL`)
574+
_, err = db.Query(`EXPERIMENTAL SCRUB TABLE test.reserved_table WITH OPTIONS INDEX ALL`)
575575
require.NoError(t, err, "should succeed on table with reserved word column names")
576576
require.Equal(t, 0, issueLogger.numIssuesFound(), "No issues should be found in happy path test")
577577

pkg/sql/inspect/inspect_processor.go

Lines changed: 19 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ import (
2020
"github.com/cockroachdb/cockroach/pkg/sql/rowexec"
2121
"github.com/cockroachdb/cockroach/pkg/sql/types"
2222
"github.com/cockroachdb/cockroach/pkg/util/ctxgroup"
23+
"github.com/cockroachdb/cockroach/pkg/util/hlc"
2324
"github.com/cockroachdb/cockroach/pkg/util/log"
2425
"github.com/cockroachdb/cockroach/pkg/util/tracing"
2526
"github.com/cockroachdb/errors"
@@ -36,7 +37,7 @@ var (
3637
)
3738
)
3839

39-
type inspectCheckFactory func() inspectCheck
40+
type inspectCheckFactory func(asOf hlc.Timestamp) inspectCheck
4041

4142
type inspectProcessor struct {
4243
processorID int32
@@ -47,6 +48,7 @@ type inspectProcessor struct {
4748
spanSrc spanSource
4849
logger inspectLogger
4950
concurrency int
51+
clock *hlc.Clock
5052
}
5153

5254
var _ execinfra.Processor = (*inspectProcessor)(nil)
@@ -190,10 +192,12 @@ func getInspectLogger(flowCtx *execinfra.FlowCtx, jobID jobspb.JobID) inspectLog
190192
func (p *inspectProcessor) processSpan(
191193
ctx context.Context, span roachpb.Span, workerIndex int,
192194
) (err error) {
195+
asOfToUse := p.getTimestampForSpan()
196+
193197
// Only create checks that apply to this span
194198
var checks []inspectCheck
195199
for _, factory := range p.checkFactories {
196-
check := factory()
200+
check := factory(asOfToUse)
197201
applies, err := check.AppliesTo(p.cfg.Codec, span)
198202
if err != nil {
199203
return err
@@ -231,6 +235,16 @@ func (p *inspectProcessor) processSpan(
231235
return nil
232236
}
233237

238+
// getTimestampForSpan returns the timestamp to use for processing a span.
239+
// If AsOf is empty, it returns the current timestamp from the processor's clock.
240+
// Otherwise, it returns the specified AsOf timestamp.
241+
func (p *inspectProcessor) getTimestampForSpan() hlc.Timestamp {
242+
if p.spec.InspectDetails.AsOf.IsEmpty() {
243+
return p.clock.Now()
244+
}
245+
return p.spec.InspectDetails.AsOf
246+
}
247+
234248
// newInspectProcessor constructs a new inspectProcessor from the given InspectSpec.
235249
// It parses the spec to generate a set of inspectCheck factories, sets up the span source,
236250
// and wires in logging and concurrency controls.
@@ -240,9 +254,6 @@ func (p *inspectProcessor) processSpan(
240254
func newInspectProcessor(
241255
ctx context.Context, flowCtx *execinfra.FlowCtx, processorID int32, spec execinfrapb.InspectSpec,
242256
) (execinfra.Processor, error) {
243-
if spec.InspectDetails.AsOf.IsEmpty() {
244-
return nil, errors.AssertionFailedf("ASOF time must be set for INSPECT")
245-
}
246257
checkFactories, err := buildInspectCheckFactories(flowCtx, spec)
247258
if err != nil {
248259
return nil, err
@@ -257,6 +268,7 @@ func newInspectProcessor(
257268
spanSrc: newSliceSpanSource(spec.Spans),
258269
logger: getInspectLogger(flowCtx, spec.JobID),
259270
concurrency: getProcessorConcurrency(flowCtx),
271+
clock: flowCtx.Cfg.DB.KV().Clock(),
260272
}, nil
261273
}
262274

@@ -275,14 +287,14 @@ func buildInspectCheckFactories(
275287
indexID := specCheck.IndexID
276288
switch specCheck.Type {
277289
case jobspb.InspectCheckIndexConsistency:
278-
checkFactories = append(checkFactories, func() inspectCheck {
290+
checkFactories = append(checkFactories, func(asOf hlc.Timestamp) inspectCheck {
279291
return &indexConsistencyCheck{
280292
indexConsistencyCheckApplicability: indexConsistencyCheckApplicability{
281293
tableID: tableID,
282294
},
283295
flowCtx: flowCtx,
284296
indexID: indexID,
285-
asOf: spec.InspectDetails.AsOf,
297+
asOf: asOf,
286298
}
287299
})
288300

pkg/sql/inspect/inspect_processor_test.go

Lines changed: 87 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -12,11 +12,13 @@ import (
1212
"testing"
1313
"time"
1414

15+
"github.com/cockroachdb/cockroach/pkg/jobs/jobspb"
1516
"github.com/cockroachdb/cockroach/pkg/keys"
1617
"github.com/cockroachdb/cockroach/pkg/roachpb"
1718
"github.com/cockroachdb/cockroach/pkg/settings/cluster"
1819
"github.com/cockroachdb/cockroach/pkg/sql/execinfra"
1920
"github.com/cockroachdb/cockroach/pkg/sql/execinfrapb"
21+
"github.com/cockroachdb/cockroach/pkg/util/hlc"
2022
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
2123
"github.com/cockroachdb/cockroach/pkg/util/log"
2224
"github.com/stretchr/testify/require"
@@ -228,19 +230,29 @@ func runProcessorAndWait(t *testing.T, proc *inspectProcessor, expectErr bool) {
228230

229231
// makeProcessor will create an inspect processor for test.
230232
func makeProcessor(
231-
t *testing.T, checkFactory inspectCheckFactory, src spanSource, concurrency int,
233+
t *testing.T,
234+
checkFactory inspectCheckFactory,
235+
src spanSource,
236+
concurrency int,
237+
asOf hlc.Timestamp,
232238
) (*inspectProcessor, *testIssueCollector) {
233239
t.Helper()
240+
clock := hlc.NewClockForTesting(nil)
234241
logger := &testIssueCollector{}
235242
proc := &inspectProcessor{
236-
spec: execinfrapb.InspectSpec{},
243+
spec: execinfrapb.InspectSpec{
244+
InspectDetails: jobspb.InspectDetails{
245+
AsOf: asOf,
246+
},
247+
},
237248
checkFactories: []inspectCheckFactory{checkFactory},
238249
cfg: &execinfra.ServerConfig{
239250
Settings: cluster.MakeTestingClusterSettings(),
240251
},
241252
spanSrc: src,
242253
logger: logger,
243254
concurrency: concurrency,
255+
clock: clock,
244256
}
245257
return proc, logger
246258
}
@@ -318,12 +330,12 @@ func TestInspectProcessor_ControlFlow(t *testing.T) {
318330

319331
for _, tc := range tests {
320332
t.Run(tc.desc, func(t *testing.T) {
321-
factory := func() inspectCheck {
333+
factory := func(asOf hlc.Timestamp) inspectCheck {
322334
return &testingInspectCheck{
323335
configs: tc.configs,
324336
}
325337
}
326-
proc, _ := makeProcessor(t, factory, tc.spanSrc, len(tc.configs))
338+
proc, _ := makeProcessor(t, factory, tc.spanSrc, len(tc.configs), hlc.Timestamp{})
327339
runProcessorAndWait(t, proc, tc.expectErr)
328340
})
329341
}
@@ -337,7 +349,7 @@ func TestInspectProcessor_EmitIssues(t *testing.T) {
337349
mode: spanModeNormal,
338350
maxSpans: 1,
339351
}
340-
factory := func() inspectCheck {
352+
factory := func(asOf hlc.Timestamp) inspectCheck {
341353
return &testingInspectCheck{
342354
configs: []testingCheckConfig{
343355
{
@@ -350,9 +362,78 @@ func TestInspectProcessor_EmitIssues(t *testing.T) {
350362
},
351363
}
352364
}
353-
proc, logger := makeProcessor(t, factory, spanSrc, 1)
365+
proc, logger := makeProcessor(t, factory, spanSrc, 1, hlc.Timestamp{})
354366

355367
runProcessorAndWait(t, proc, true /* expectErr */)
356368

357369
require.Equal(t, 2, logger.numIssuesFound())
358370
}
371+
372+
func TestInspectProcessor_AsOfTime(t *testing.T) {
373+
defer leaktest.AfterTest(t)()
374+
defer log.Scope(t).Close(t)
375+
376+
tests := []struct {
377+
name string
378+
asOf hlc.Timestamp
379+
verifyTimestamp func(t *testing.T, actualTime time.Time, capturedTimestamp hlc.Timestamp, testStartTime time.Time)
380+
}{
381+
{
382+
name: "empty timestamp uses clock time",
383+
asOf: hlc.Timestamp{}, // Empty timestamp
384+
verifyTimestamp: func(t *testing.T, actualTime time.Time, capturedTimestamp hlc.Timestamp, testStartTime time.Time) {
385+
// Verify that the AOST time in the issue is >= the test start time
386+
require.True(t, actualTime.After(testStartTime) || actualTime.Equal(testStartTime),
387+
"AOST time (%v) should be >= test start time (%v)", actualTime, testStartTime)
388+
// Also verify the timestamp was not empty
389+
require.False(t, capturedTimestamp.IsEmpty(),
390+
"Captured timestamp should not be empty when AsOf is not specified")
391+
},
392+
},
393+
{
394+
name: "specific timestamp is preserved",
395+
asOf: hlc.Timestamp{WallTime: 12345},
396+
verifyTimestamp: func(t *testing.T, actualTime time.Time, capturedTimestamp hlc.Timestamp, testStartTime time.Time) {
397+
// Verify that the exact timestamp is preserved
398+
require.Equal(t, hlc.Timestamp{WallTime: 12345}.GoTime(), actualTime)
399+
},
400+
},
401+
}
402+
403+
for _, tc := range tests {
404+
t.Run(tc.name, func(t *testing.T) {
405+
spanSrc := &testingSpanSource{
406+
mode: spanModeNormal,
407+
maxSpans: 1,
408+
}
409+
410+
// Record start time before creating the processor
411+
testStartTime := time.Now()
412+
413+
var capturedTimestamp hlc.Timestamp
414+
factory := func(asOf hlc.Timestamp) inspectCheck {
415+
capturedTimestamp = asOf
416+
return &testingInspectCheck{
417+
configs: []testingCheckConfig{
418+
{
419+
mode: checkModeNone,
420+
issues: []*inspectIssue{
421+
{ErrorType: "test_error", PrimaryKey: "pk1", AOST: asOf.GoTime()},
422+
},
423+
},
424+
},
425+
}
426+
}
427+
428+
proc, logger := makeProcessor(t, factory, spanSrc, 1, tc.asOf)
429+
430+
runProcessorAndWait(t, proc, true /* expectErr */)
431+
432+
require.Equal(t, 1, logger.numIssuesFound())
433+
434+
// Run the test-specific timestamp verification
435+
actualTime := logger.issue(0).AOST
436+
tc.verifyTimestamp(t, actualTime, capturedTimestamp, testStartTime)
437+
})
438+
}
439+
}

pkg/sql/scrub.go

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -227,11 +227,14 @@ func (n *scrubNode) startScrubTable(
227227
return pgerror.Newf(pgcode.InvalidTransactionState,
228228
"cannot run within a multi-statement transaction")
229229
}
230-
if !hasTS {
231-
return pgerror.Newf(pgcode.Syntax,
232-
"SCRUB with inspect jobs requires AS OF SYSTEM TIME")
230+
231+
// If AS OF SYSTEM TIME is not provided, we pass in an empty timestamp to
232+
// force the job to use the current time.
233+
var asOfForJob hlc.Timestamp
234+
if hasTS {
235+
asOfForJob = ts
233236
}
234-
return n.runScrubTableJob(ctx, p, tableDesc, ts)
237+
return n.runScrubTableJob(ctx, p, tableDesc, asOfForJob)
235238
}
236239
// Process SCRUB options. These are only present during a SCRUB TABLE
237240
// statement.

0 commit comments

Comments
 (0)