Skip to content

Commit f3c24cf

Browse files
craig[bot], spilchen, nicktrav
committed
154010: sql/inspect: make AS OF SYSTEM TIME optional in INSPECT jobs r=spilchen a=spilchen Previously, INSPECT jobs required specifying an `AS OF SYSTEM TIME` timestamp. This commit removes that requirement. If the timestamp is left empty, the INSPECT processor will now default to using the current time when querying for index inconsistencies. A fresh timestamp will be acquired for each span. No protected timestamp record will be used in this mode. Fixes #148675 Epic: CRDB-30356 Release note: none 154071: startup: unskip TestStartupFailureRandomRange; disable UA improve logs r=rickystewart a=nicktrav TestStartupFailureRandomRange is marked as a nightly-only test. However, given the intricacies of how various environment variables are passed into the test infrastructure, this test is effectively never run. Update the skip logic to build on top of #153135, allowing the test to be run in nightly-only stress runs. Update the test to disable multi-tenant mode. This currently causes the test to fail. Fixes #123908. Release note: None. Epic: None. Co-authored-by: Matt Spilchen <[email protected]> Co-authored-by: Nick Travers <[email protected]>
3 parents 755c186 + b3a82ac + 1e8d027 commit f3c24cf

File tree

7 files changed

+136
-29
lines changed

7 files changed

+136
-29
lines changed

pkg/jobs/jobspb/jobs.proto

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1483,6 +1483,7 @@ message InspectDetails {
14831483
repeated Check checks = 1;
14841484

14851485
// AsOf specifies the timestamp at which the inspect checks should be performed.
1486+
// If empty, the current timestamp will be used for each span processed.
14861487
util.hlc.Timestamp as_of = 2 [(gogoproto.nullable) = false];
14871488

14881489
// ProtectedTimestampRecord is the id of the protected timestamp record that

pkg/sql/inspect/BUILD.bazel

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -95,6 +95,7 @@ go_test(
9595
"//pkg/testutils/skip",
9696
"//pkg/testutils/sqlutils",
9797
"//pkg/testutils/testcluster",
98+
"//pkg/util/hlc",
9899
"//pkg/util/leaktest",
99100
"//pkg/util/log",
100101
"//pkg/util/syncutil",

pkg/sql/inspect/index_consistency_check_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -571,7 +571,7 @@ func TestIndexConsistencyWithReservedWordColumns(t *testing.T) {
571571
// TODO(148365): Run INSPECT instead of SCRUB.
572572
_, err := db.Exec(`SET enable_scrub_job=true`)
573573
require.NoError(t, err)
574-
_, err = db.Query(`EXPERIMENTAL SCRUB TABLE test.reserved_table AS OF SYSTEM TIME '-1us' WITH OPTIONS INDEX ALL`)
574+
_, err = db.Query(`EXPERIMENTAL SCRUB TABLE test.reserved_table WITH OPTIONS INDEX ALL`)
575575
require.NoError(t, err, "should succeed on table with reserved word column names")
576576
require.Equal(t, 0, issueLogger.numIssuesFound(), "No issues should be found in happy path test")
577577

pkg/sql/inspect/inspect_processor.go

Lines changed: 19 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ import (
2020
"github.com/cockroachdb/cockroach/pkg/sql/rowexec"
2121
"github.com/cockroachdb/cockroach/pkg/sql/types"
2222
"github.com/cockroachdb/cockroach/pkg/util/ctxgroup"
23+
"github.com/cockroachdb/cockroach/pkg/util/hlc"
2324
"github.com/cockroachdb/cockroach/pkg/util/log"
2425
"github.com/cockroachdb/cockroach/pkg/util/tracing"
2526
"github.com/cockroachdb/errors"
@@ -36,7 +37,7 @@ var (
3637
)
3738
)
3839

39-
type inspectCheckFactory func() inspectCheck
40+
type inspectCheckFactory func(asOf hlc.Timestamp) inspectCheck
4041

4142
type inspectProcessor struct {
4243
processorID int32
@@ -47,6 +48,7 @@ type inspectProcessor struct {
4748
spanSrc spanSource
4849
logger inspectLogger
4950
concurrency int
51+
clock *hlc.Clock
5052
}
5153

5254
var _ execinfra.Processor = (*inspectProcessor)(nil)
@@ -190,10 +192,12 @@ func getInspectLogger(flowCtx *execinfra.FlowCtx, jobID jobspb.JobID) inspectLog
190192
func (p *inspectProcessor) processSpan(
191193
ctx context.Context, span roachpb.Span, workerIndex int,
192194
) (err error) {
195+
asOfToUse := p.getTimestampForSpan()
196+
193197
// Only create checks that apply to this span
194198
var checks []inspectCheck
195199
for _, factory := range p.checkFactories {
196-
check := factory()
200+
check := factory(asOfToUse)
197201
applies, err := check.AppliesTo(p.cfg.Codec, span)
198202
if err != nil {
199203
return err
@@ -231,6 +235,16 @@ func (p *inspectProcessor) processSpan(
231235
return nil
232236
}
233237

238+
// getTimestampForSpan returns the timestamp to use for processing a span.
239+
// If AsOf is empty, it returns the current timestamp from the processor's clock.
240+
// Otherwise, it returns the specified AsOf timestamp.
241+
func (p *inspectProcessor) getTimestampForSpan() hlc.Timestamp {
242+
if p.spec.InspectDetails.AsOf.IsEmpty() {
243+
return p.clock.Now()
244+
}
245+
return p.spec.InspectDetails.AsOf
246+
}
247+
234248
// newInspectProcessor constructs a new inspectProcessor from the given InspectSpec.
235249
// It parses the spec to generate a set of inspectCheck factories, sets up the span source,
236250
// and wires in logging and concurrency controls.
@@ -240,9 +254,6 @@ func (p *inspectProcessor) processSpan(
240254
func newInspectProcessor(
241255
ctx context.Context, flowCtx *execinfra.FlowCtx, processorID int32, spec execinfrapb.InspectSpec,
242256
) (execinfra.Processor, error) {
243-
if spec.InspectDetails.AsOf.IsEmpty() {
244-
return nil, errors.AssertionFailedf("ASOF time must be set for INSPECT")
245-
}
246257
checkFactories, err := buildInspectCheckFactories(flowCtx, spec)
247258
if err != nil {
248259
return nil, err
@@ -257,6 +268,7 @@ func newInspectProcessor(
257268
spanSrc: newSliceSpanSource(spec.Spans),
258269
logger: getInspectLogger(flowCtx, spec.JobID),
259270
concurrency: getProcessorConcurrency(flowCtx),
271+
clock: flowCtx.Cfg.DB.KV().Clock(),
260272
}, nil
261273
}
262274

@@ -275,14 +287,14 @@ func buildInspectCheckFactories(
275287
indexID := specCheck.IndexID
276288
switch specCheck.Type {
277289
case jobspb.InspectCheckIndexConsistency:
278-
checkFactories = append(checkFactories, func() inspectCheck {
290+
checkFactories = append(checkFactories, func(asOf hlc.Timestamp) inspectCheck {
279291
return &indexConsistencyCheck{
280292
indexConsistencyCheckApplicability: indexConsistencyCheckApplicability{
281293
tableID: tableID,
282294
},
283295
flowCtx: flowCtx,
284296
indexID: indexID,
285-
asOf: spec.InspectDetails.AsOf,
297+
asOf: asOf,
286298
}
287299
})
288300

pkg/sql/inspect/inspect_processor_test.go

Lines changed: 87 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -12,11 +12,13 @@ import (
1212
"testing"
1313
"time"
1414

15+
"github.com/cockroachdb/cockroach/pkg/jobs/jobspb"
1516
"github.com/cockroachdb/cockroach/pkg/keys"
1617
"github.com/cockroachdb/cockroach/pkg/roachpb"
1718
"github.com/cockroachdb/cockroach/pkg/settings/cluster"
1819
"github.com/cockroachdb/cockroach/pkg/sql/execinfra"
1920
"github.com/cockroachdb/cockroach/pkg/sql/execinfrapb"
21+
"github.com/cockroachdb/cockroach/pkg/util/hlc"
2022
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
2123
"github.com/cockroachdb/cockroach/pkg/util/log"
2224
"github.com/stretchr/testify/require"
@@ -228,19 +230,29 @@ func runProcessorAndWait(t *testing.T, proc *inspectProcessor, expectErr bool) {
228230

229231
// makeProcessor will create an inspect processor for test.
230232
func makeProcessor(
231-
t *testing.T, checkFactory inspectCheckFactory, src spanSource, concurrency int,
233+
t *testing.T,
234+
checkFactory inspectCheckFactory,
235+
src spanSource,
236+
concurrency int,
237+
asOf hlc.Timestamp,
232238
) (*inspectProcessor, *testIssueCollector) {
233239
t.Helper()
240+
clock := hlc.NewClockForTesting(nil)
234241
logger := &testIssueCollector{}
235242
proc := &inspectProcessor{
236-
spec: execinfrapb.InspectSpec{},
243+
spec: execinfrapb.InspectSpec{
244+
InspectDetails: jobspb.InspectDetails{
245+
AsOf: asOf,
246+
},
247+
},
237248
checkFactories: []inspectCheckFactory{checkFactory},
238249
cfg: &execinfra.ServerConfig{
239250
Settings: cluster.MakeTestingClusterSettings(),
240251
},
241252
spanSrc: src,
242253
logger: logger,
243254
concurrency: concurrency,
255+
clock: clock,
244256
}
245257
return proc, logger
246258
}
@@ -318,12 +330,12 @@ func TestInspectProcessor_ControlFlow(t *testing.T) {
318330

319331
for _, tc := range tests {
320332
t.Run(tc.desc, func(t *testing.T) {
321-
factory := func() inspectCheck {
333+
factory := func(asOf hlc.Timestamp) inspectCheck {
322334
return &testingInspectCheck{
323335
configs: tc.configs,
324336
}
325337
}
326-
proc, _ := makeProcessor(t, factory, tc.spanSrc, len(tc.configs))
338+
proc, _ := makeProcessor(t, factory, tc.spanSrc, len(tc.configs), hlc.Timestamp{})
327339
runProcessorAndWait(t, proc, tc.expectErr)
328340
})
329341
}
@@ -337,7 +349,7 @@ func TestInspectProcessor_EmitIssues(t *testing.T) {
337349
mode: spanModeNormal,
338350
maxSpans: 1,
339351
}
340-
factory := func() inspectCheck {
352+
factory := func(asOf hlc.Timestamp) inspectCheck {
341353
return &testingInspectCheck{
342354
configs: []testingCheckConfig{
343355
{
@@ -350,9 +362,78 @@ func TestInspectProcessor_EmitIssues(t *testing.T) {
350362
},
351363
}
352364
}
353-
proc, logger := makeProcessor(t, factory, spanSrc, 1)
365+
proc, logger := makeProcessor(t, factory, spanSrc, 1, hlc.Timestamp{})
354366

355367
runProcessorAndWait(t, proc, true /* expectErr */)
356368

357369
require.Equal(t, 2, logger.numIssuesFound())
358370
}
371+
372+
func TestInspectProcessor_AsOfTime(t *testing.T) {
373+
defer leaktest.AfterTest(t)()
374+
defer log.Scope(t).Close(t)
375+
376+
tests := []struct {
377+
name string
378+
asOf hlc.Timestamp
379+
verifyTimestamp func(t *testing.T, actualTime time.Time, capturedTimestamp hlc.Timestamp, testStartTime time.Time)
380+
}{
381+
{
382+
name: "empty timestamp uses clock time",
383+
asOf: hlc.Timestamp{}, // Empty timestamp
384+
verifyTimestamp: func(t *testing.T, actualTime time.Time, capturedTimestamp hlc.Timestamp, testStartTime time.Time) {
385+
// Verify that the AOST time in the issue is >= the test start time
386+
require.True(t, actualTime.After(testStartTime) || actualTime.Equal(testStartTime),
387+
"AOST time (%v) should be >= test start time (%v)", actualTime, testStartTime)
388+
// Also verify the timestamp was not empty
389+
require.False(t, capturedTimestamp.IsEmpty(),
390+
"Captured timestamp should not be empty when AsOf is not specified")
391+
},
392+
},
393+
{
394+
name: "specific timestamp is preserved",
395+
asOf: hlc.Timestamp{WallTime: 12345},
396+
verifyTimestamp: func(t *testing.T, actualTime time.Time, capturedTimestamp hlc.Timestamp, testStartTime time.Time) {
397+
// Verify that the exact timestamp is preserved
398+
require.Equal(t, hlc.Timestamp{WallTime: 12345}.GoTime(), actualTime)
399+
},
400+
},
401+
}
402+
403+
for _, tc := range tests {
404+
t.Run(tc.name, func(t *testing.T) {
405+
spanSrc := &testingSpanSource{
406+
mode: spanModeNormal,
407+
maxSpans: 1,
408+
}
409+
410+
// Record start time before creating the processor
411+
testStartTime := time.Now()
412+
413+
var capturedTimestamp hlc.Timestamp
414+
factory := func(asOf hlc.Timestamp) inspectCheck {
415+
capturedTimestamp = asOf
416+
return &testingInspectCheck{
417+
configs: []testingCheckConfig{
418+
{
419+
mode: checkModeNone,
420+
issues: []*inspectIssue{
421+
{ErrorType: "test_error", PrimaryKey: "pk1", AOST: asOf.GoTime()},
422+
},
423+
},
424+
},
425+
}
426+
}
427+
428+
proc, logger := makeProcessor(t, factory, spanSrc, 1, tc.asOf)
429+
430+
runProcessorAndWait(t, proc, true /* expectErr */)
431+
432+
require.Equal(t, 1, logger.numIssuesFound())
433+
434+
// Run the test-specific timestamp verification
435+
actualTime := logger.issue(0).AOST
436+
tc.verifyTimestamp(t, actualTime, capturedTimestamp, testStartTime)
437+
})
438+
}
439+
}

pkg/sql/scrub.go

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -227,11 +227,14 @@ func (n *scrubNode) startScrubTable(
227227
return pgerror.Newf(pgcode.InvalidTransactionState,
228228
"cannot run within a multi-statement transaction")
229229
}
230-
if !hasTS {
231-
return pgerror.Newf(pgcode.Syntax,
232-
"SCRUB with inspect jobs requires AS OF SYSTEM TIME")
230+
231+
// If AS OF SYSTEM TIME is not provided, we pass in an empty timestamp to
232+
// force the job to use the current time.
233+
var asOfForJob hlc.Timestamp
234+
if hasTS {
235+
asOfForJob = ts
233236
}
234-
return n.runScrubTableJob(ctx, p, tableDesc, ts)
237+
return n.runScrubTableJob(ctx, p, tableDesc, asOfForJob)
235238
}
236239
// Process SCRUB options. These are only present during a SCRUB TABLE
237240
// statement.

pkg/util/startup/startup_test.go

Lines changed: 20 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -85,18 +85,13 @@ func TestStartupFailure(t *testing.T) {
8585
func TestStartupFailureRandomRange(t *testing.T) {
8686
defer leaktest.AfterTest(t)()
8787
defer log.Scope(t).Close(t)
88-
// This test takes 30s and so we don't want it to run in the "blocking path"
89-
// of CI at all, and we also don't want to stress it in nightlies as part of
90-
// a big package (where it will take a lot of time that could be spent running
91-
// "faster" tests). In this package, it is the only test and so it's fine to
92-
// run it under nightly (skipping race builds because with many nodes they are
93-
// very resource intensive and tend to collapse).
88+
// This test takes 30s+, so we don't want it to run in the "blocking path" of
89+
// CI. We also skip race builds as the test uses multiple nodes, which can
90+
// cause the test to grind to a halt and flake out.
9491
skip.UnderRace(t, "6 nodes with replication is too slow for race")
95-
skip.WithIssue(t, 9999999999, "nicktrav will have a fix shortly")
96-
// TODO(nicktrav): re-enable only under nightlies once the fix is out.
97-
//if !skip.NightlyStress() {
98-
// skip.IgnoreLint(t, "test takes 30s to run due to circuit breakers and timeouts")
99-
//}
92+
if !skip.Stress() {
93+
skip.IgnoreLint(t, "test takes 30s to run due to circuit breakers and timeouts")
94+
}
10095

10196
rng, seed := randutil.NewTestRand()
10297
t.Log("TestStartupFailureRandomRange using seed", seed)
@@ -148,6 +143,11 @@ func runCircuitBreakerTestForKey(
148143
args := base.TestClusterArgs{
149144
ServerArgsPerNode: make(map[int]base.TestServerArgs),
150145
ReusableListenerReg: lReg,
146+
// TODO(travers): This test has a lingering issue when run in UA mode
147+
// that needs to be addressed before the following can be removed.
148+
ServerArgs: base.TestServerArgs{
149+
DefaultTestTenant: base.TestIsSpecificToStorageLayerAndNeedsASystemTenant,
150+
},
151151
}
152152
var enableFaults atomic.Bool
153153
for i := 0; i < nodes; i++ {
@@ -229,6 +229,7 @@ func runCircuitBreakerTestForKey(
229229
return d.StartKey
230230
}
231231

232+
t.Log("segmenting ranges")
232233
var rangeSpans []roachpb.Span
233234
r, err := c.QueryContext(ctx, "select range_id, start_key, end_key from crdb_internal.ranges_no_leases order by start_key")
234235
require.NoError(t, err, "failed to query ranges")
@@ -243,9 +244,11 @@ func runCircuitBreakerTestForKey(
243244
})
244245
}
245246
good, bad := faultyRangeSelector(rangeSpans)
247+
t.Logf("prepping %d good ranges", len(good))
246248
for _, span := range good {
247249
prepRange(span.Key, false)
248250
}
251+
t.Logf("prepping %d faulty ranges", len(bad))
249252
var ranges []string
250253
for _, span := range bad {
251254
prepRange(span.Key, true)
@@ -254,27 +257,33 @@ func runCircuitBreakerTestForKey(
254257
rangesList := fmt.Sprintf("[%s]", strings.Join(ranges, ", "))
255258

256259
// Remove nodes permanently to only leave quorum on planned ranges.
260+
t.Log("stopping n3 and n4")
257261
tc.StopServer(3)
258262
tc.StopServer(4)
259263

260264
// Stop node with replicas that would leave ranges without quorum.
265+
t.Log("stopping n5")
261266
tc.StopServer(5)
262267

263268
// Probe compromised ranges to trigger circuit breakers on them. If we don't
264269
// do this, then restart queries will wait for quorum to be reestablished with
265270
// the restarting node without failing.
271+
t.Logf("waiting for %d compromised ranges to trigger CBs", len(bad))
266272
var wg sync.WaitGroup
267273
wg.Add(len(bad))
268274
for _, span := range bad {
269275
go func(key roachpb.Key) {
270276
defer wg.Done()
277+
t.Logf("waiting for compromised range: %s", key)
271278
_ = db.Put(context.Background(), keys.RangeProbeKey(roachpb.RKey(key)), "")
279+
t.Logf("done waiting for compromised range: %s", key)
272280
}(span.Key)
273281
}
274282
wg.Wait()
275283

276284
// Restart node and check that it succeeds in reestablishing range quorum
277285
// necessary for startup actions.
286+
t.Log("starting n5")
278287
require.NoError(t, lReg.MustGet(t, 5).Reopen())
279288
err = tc.RestartServer(5)
280289
require.NoError(t, err, "restarting server with range(s) %s tripping circuit breaker", rangesList)

0 commit comments

Comments
 (0)