Skip to content

Commit 985dbfe

Browse files
craig[bot]spilchen
andcommitted
Merge #153161
153161: sql/inspect: use ASOF time for INSPECT r=spilchen a=spilchen Previously, inspect issues used the current time for the AOST field instead of the timestamp at which the data was actually scanned. This change updates index consistency checks to use the timestamp that is passed into the job. When calling INSPECT from SCRUB, we use the time specified in the SCRUB command. Informs #148573 Release note: none Epic: CRDB-30356 Co-authored-by: Matt Spilchen <[email protected]>
2 parents 760a77e + ab1de7f commit 985dbfe

File tree

9 files changed

+78
-27
lines changed

9 files changed

+78
-27
lines changed

pkg/jobs/jobspb/jobs.proto

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1446,6 +1446,9 @@ message InspectDetails {
14461446

14471447
// Checks is the list of individual checks this job will perform.
14481448
repeated Check checks = 1;
1449+
1450+
// AsOf specifies the timestamp at which the inspect checks should be performed.
1451+
util.hlc.Timestamp as_of = 2 [(gogoproto.nullable) = false];
14491452
}
14501453

14511454
message UpdateTableMetadataCacheDetails {}

pkg/sql/inspect/BUILD.bazel

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@ go_library(
4040
"//pkg/sql/spanutils",
4141
"//pkg/sql/types",
4242
"//pkg/util/ctxgroup",
43+
"//pkg/util/hlc",
4344
"//pkg/util/log",
4445
"//pkg/util/timeutil",
4546
"//pkg/util/tracing",

pkg/sql/inspect/index_consistency_check.go

Lines changed: 13 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@ import (
2525
"github.com/cockroachdb/cockroach/pkg/sql/sessiondata"
2626
"github.com/cockroachdb/cockroach/pkg/sql/sessiondatapb"
2727
"github.com/cockroachdb/cockroach/pkg/sql/spanutils"
28-
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
28+
"github.com/cockroachdb/cockroach/pkg/util/hlc"
2929
"github.com/cockroachdb/errors"
3030
"github.com/cockroachdb/redact"
3131
)
@@ -38,6 +38,7 @@ type indexConsistencyCheck struct {
3838
flowCtx *execinfra.FlowCtx
3939
tableID descpb.ID
4040
indexID descpb.IndexID
41+
asOf hlc.Timestamp
4142

4243
tableDesc catalog.TableDescriptor
4344
secIndex catalog.Index
@@ -194,16 +195,21 @@ func (c *indexConsistencyCheck) Start(
194195
colNames(pkColumns), colNames(otherColumns), c.tableDesc.GetID(), c.secIndex, c.priIndex.GetID(), predicate,
195196
)
196197

198+
// Wrap the query with AS OF SYSTEM TIME to ensure it uses the specified timestamp
199+
// TODO(148573): use a protected timestamp record for this timestamp.
200+
queryWithAsOf := fmt.Sprintf("SELECT * FROM (%s) AS OF SYSTEM TIME %s", checkQuery, c.asOf.AsOfSystemTime())
201+
197202
// Store the query for error reporting
198-
c.lastQuery = checkQuery
203+
c.lastQuery = queryWithAsOf
199204

205+
// Execute the query with AS OF SYSTEM TIME embedded in the SQL
200206
it, err := c.flowCtx.Cfg.DB.Executor().QueryIteratorEx(
201207
ctx, "inspect-index-consistency-check", nil, /* txn */
202208
sessiondata.InternalExecutorOverride{
203209
User: username.NodeUserName(),
204210
QualityOfService: &sessiondatapb.BulkLowQoS,
205211
},
206-
checkQuery,
212+
queryWithAsOf,
207213
queryArgs...,
208214
)
209215
if err != nil {
@@ -243,9 +249,8 @@ func (c *indexConsistencyCheck) Next(
243249
details["query"] = c.lastQuery // Store the query that caused the error
244250

245251
return &inspectIssue{
246-
ErrorType: InternalError,
247-
// TODO(148573): Use the timestamp that we create a protected timestamp for.
248-
AOST: timeutil.Now(),
252+
ErrorType: InternalError,
253+
AOST: c.asOf.GoTime(),
249254
DatabaseID: c.tableDesc.GetParentID(),
250255
SchemaID: c.tableDesc.GetParentSchemaID(),
251256
ObjectID: c.tableDesc.GetID(),
@@ -308,9 +313,8 @@ func (c *indexConsistencyCheck) Next(
308313
details["index_name"] = c.secIndex.GetName()
309314

310315
return &inspectIssue{
311-
ErrorType: errorType,
312-
// TODO(148573): Use the timestamp that we create a protected timestamp for.
313-
AOST: timeutil.Now(),
316+
ErrorType: errorType,
317+
AOST: c.asOf.GoTime(),
314318
DatabaseID: c.tableDesc.GetParentID(),
315319
SchemaID: c.tableDesc.GetParentSchemaID(),
316320
ObjectID: c.tableDesc.GetID(),

pkg/sql/inspect/index_consistency_check_test.go

Lines changed: 38 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -149,6 +149,8 @@ func TestDetectIndexConsistencyErrors(t *testing.T) {
149149
// Each element corresponds to the issue at the same index in expectedIssues.
150150
// For non-internal-error issues, the corresponding element should be nil.
151151
expectedInternalErrorPatterns []map[string]string
152+
// useTimestampBeforeCorruption uses a timestamp from before corruption is introduced
153+
useTimestampBeforeCorruption bool
152154
}{
153155
{
154156
desc: "happy path sanity",
@@ -236,6 +238,15 @@ func TestDetectIndexConsistencyErrors(t *testing.T) {
236238
},
237239
postIndexSQL: "DELETE FROM test.t", /* delete all rows to test hasRows=false code path */
238240
},
241+
{
242+
desc: "timestamp before corruption - no issues found",
243+
splitRangeDDL: "ALTER TABLE test.t SPLIT AT VALUES (500)",
244+
indexDDL: []string{
245+
"CREATE INDEX idx_t_a ON test.t (a) STORING (c)",
246+
},
247+
danglingIndexEntryInsertQuery: "SELECT 15, 30, 300, 'corrupt', 'e_3', 300.5", // Add dangling entry after timestamp
248+
useTimestampBeforeCorruption: true, // Use timestamp from before corruption
249+
},
239250
} {
240251
t.Run(tc.desc, func(t *testing.T) {
241252
issueLogger.reset()
@@ -288,6 +299,19 @@ func TestDetectIndexConsistencyErrors(t *testing.T) {
288299
r.Exec(t, tc.postIndexSQL)
289300
}
290301

302+
// Get timestamp before corruption if needed
303+
var expectedASOFTime time.Time
304+
305+
if tc.useTimestampBeforeCorruption {
306+
// Get timestamp before corruption
307+
r.QueryRow(t, "SELECT now()::timestamp").Scan(&expectedASOFTime)
308+
expectedASOFTime = expectedASOFTime.UTC()
309+
310+
// Sleep for 1 millisecond to ensure corruption happens after timestamp
311+
// This should be long enough to guarantee a different timestamp
312+
time.Sleep(1 * time.Millisecond)
313+
}
314+
291315
tableDesc := desctestutils.TestingGetPublicTableDescriptor(kvDB, codec, "test", "t")
292316
secIndex := tableDesc.PublicNonPrimaryIndexes()[0]
293317

@@ -341,7 +365,18 @@ func TestDetectIndexConsistencyErrors(t *testing.T) {
341365
// TODO(148365): Run INSPECT instead of SCRUB.
342366
_, err = db.Exec(`SET enable_scrub_job=true`)
343367
require.NoError(t, err)
344-
_, err = db.Query(`EXPERIMENTAL SCRUB TABLE test.t WITH OPTIONS INDEX ALL`)
368+
369+
// If not using timestamp before corruption, get current timestamp
370+
if !tc.useTimestampBeforeCorruption {
371+
// Convert relative timestamp to absolute timestamp using CRDB
372+
r.QueryRow(t, "SELECT (now() + '-1us')::timestamp").Scan(&expectedASOFTime)
373+
expectedASOFTime = expectedASOFTime.UTC()
374+
}
375+
376+
// Use the absolute timestamp in nanoseconds for inspect command
377+
absoluteTimestamp := fmt.Sprintf("'%d'", expectedASOFTime.UnixNano())
378+
scrubQuery := fmt.Sprintf(`EXPERIMENTAL SCRUB TABLE test.t AS OF SYSTEM TIME %s WITH OPTIONS INDEX ALL`, absoluteTimestamp)
379+
_, err = db.Query(scrubQuery)
345380
if tc.expectedErrRegex == "" {
346381
require.NoError(t, err)
347382
require.Equal(t, 0, issueLogger.numIssuesFound())
@@ -380,7 +415,7 @@ func TestDetectIndexConsistencyErrors(t *testing.T) {
380415
require.NotEqual(t, 0, foundIssue.DatabaseID, "expected issue to have a database ID: %s", expectedIssue)
381416
require.NotEqual(t, 0, foundIssue.SchemaID, "expected issue to have a schema ID: %s", expectedIssue)
382417
require.NotEqual(t, 0, foundIssue.ObjectID, "expected issue to have an object ID: %s", expectedIssue)
383-
require.NotEqual(t, time.Time{}, foundIssue.AOST, "expected issue to have an AOST time: %s", expectedIssue)
418+
require.Equal(t, expectedASOFTime, foundIssue.AOST.UTC())
384419

385420
// Additional validation for internal errors
386421
if foundIssue.ErrorType == "internal_error" {
@@ -460,7 +495,7 @@ func TestIndexConsistencyWithReservedWordColumns(t *testing.T) {
460495
// TODO(148365): Run INSPECT instead of SCRUB.
461496
_, err := db.Exec(`SET enable_scrub_job=true`)
462497
require.NoError(t, err)
463-
_, err = db.Query(`EXPERIMENTAL SCRUB TABLE test.reserved_table WITH OPTIONS INDEX ALL`)
498+
_, err = db.Query(`EXPERIMENTAL SCRUB TABLE test.reserved_table AS OF SYSTEM TIME '-1us' WITH OPTIONS INDEX ALL`)
464499
require.NoError(t, err, "should succeed on table with reserved word column names")
465500
require.Equal(t, 0, issueLogger.numIssuesFound(), "No issues should be found in happy path test")
466501

pkg/sql/inspect/inspect_job_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -107,7 +107,7 @@ func TestInspectJobImplicitTxnSemantics(t *testing.T) {
107107
onInspectErrorToReturn.Store(&tc.onStartError)
108108
defer func() { onInspectErrorToReturn.Store(nil) }()
109109
}
110-
_, err := db.Exec("EXPERIMENTAL SCRUB TABLE db.t")
110+
_, err := db.Exec("EXPERIMENTAL SCRUB TABLE db.t AS OF SYSTEM TIME '-1us'")
111111
pauseJobStart.Store(false)
112112
if tc.expectedErrRegex != "" {
113113
require.Error(t, err)

pkg/sql/inspect/inspect_processor.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -218,6 +218,9 @@ func (p *inspectProcessor) processSpan(
218218
func newInspectProcessor(
219219
ctx context.Context, flowCtx *execinfra.FlowCtx, processorID int32, spec execinfrapb.InspectSpec,
220220
) (execinfra.Processor, error) {
221+
if spec.InspectDetails.AsOf.IsEmpty() {
222+
return nil, errors.AssertionFailedf("ASOF time must be set for INSPECT")
223+
}
221224
checkFactories, err := buildInspectCheckFactories(flowCtx, spec)
222225
if err != nil {
223226
return nil, err
@@ -252,6 +255,7 @@ func buildInspectCheckFactories(
252255
flowCtx: flowCtx,
253256
tableID: specCheck.TableID,
254257
indexID: specCheck.IndexID,
258+
asOf: spec.InspectDetails.AsOf,
255259
}
256260
})
257261

pkg/sql/logictest/testdata/logic_test/distsql_inspect

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -47,12 +47,12 @@ subtest end
4747
subtest scrub_job_implicit_txn
4848

4949
statement ok
50-
EXPERIMENTAL SCRUB TABLE data;
50+
EXPERIMENTAL SCRUB TABLE data AS OF SYSTEM TIME '-1us';
5151

5252
query TTB
5353
SELECT description, status, finished IS NOT NULL AS finished FROM [SHOW JOBS] WHERE job_type = 'INSPECT' ORDER BY created DESC LIMIT 1
5454
----
55-
EXPERIMENTAL SCRUB TABLE data succeeded true
55+
EXPERIMENTAL SCRUB TABLE data AS OF SYSTEM TIME '-1us' succeeded true
5656

5757
subtest end
5858

pkg/sql/logictest/testdata/logic_test/inspect

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -20,12 +20,12 @@ subtest end
2020
subtest scrub_job_implicit_txn
2121

2222
statement ok
23-
EXPERIMENTAL SCRUB TABLE t1;
23+
EXPERIMENTAL SCRUB TABLE t1 AS OF SYSTEM TIME '-1us';
2424

2525
query TTB
2626
SELECT description, status, finished IS NOT NULL AS finished FROM [SHOW JOBS] WHERE job_type = 'INSPECT' ORDER BY created DESC LIMIT 1
2727
----
28-
EXPERIMENTAL SCRUB TABLE t1 succeeded true
28+
EXPERIMENTAL SCRUB TABLE t1 AS OF SYSTEM TIME '-1us' succeeded true
2929

3030
subtest end
3131

pkg/sql/scrub.go

Lines changed: 14 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -217,14 +217,22 @@ func (n *scrubNode) startScrubDatabase(ctx context.Context, p *planner, name *tr
217217
func (n *scrubNode) startScrubTable(
218218
ctx context.Context, p *planner, tableDesc catalog.TableDescriptor, tableName *tree.TableName,
219219
) error {
220-
if p.extendedEvalCtx.SessionData().EnableScrubJob {
221-
return n.runScrubTableJob(ctx, p, tableDesc)
222-
}
223-
224220
ts, hasTS, err := p.getTimestamp(ctx, n.n.AsOf)
225221
if err != nil {
226222
return err
227223
}
224+
225+
if p.extendedEvalCtx.SessionData().EnableScrubJob {
226+
if !p.extendedEvalCtx.TxnIsSingleStmt {
227+
return pgerror.Newf(pgcode.InvalidTransactionState,
228+
"cannot run within a multi-statement transaction")
229+
}
230+
if !hasTS {
231+
return pgerror.Newf(pgcode.Syntax,
232+
"SCRUB with inspect jobs requires AS OF SYSTEM TIME")
233+
}
234+
return n.runScrubTableJob(ctx, p, tableDesc, ts)
235+
}
228236
// Process SCRUB options. These are only present during a SCRUB TABLE
229237
// statement.
230238
var indexesSet bool
@@ -466,7 +474,7 @@ func createConstraintCheckOperations(
466474
}
467475

468476
func (n *scrubNode) runScrubTableJob(
469-
ctx context.Context, p *planner, tableDesc catalog.TableDescriptor,
477+
ctx context.Context, p *planner, tableDesc catalog.TableDescriptor, asOf hlc.Timestamp,
470478
) error {
471479
// Consistency check is done async via a job.
472480
jobID := p.ExecCfg().JobRegistry.MakeJobID()
@@ -490,18 +498,14 @@ func (n *scrubNode) runScrubTableJob(
490498
IndexID: secIndexes[0].GetID(),
491499
},
492500
},
501+
AsOf: asOf,
493502
},
494503
Progress: jobspb.InspectProgress{},
495504
CreatedBy: nil,
496505
Username: username.NodeUserName(),
497506
DescriptorIDs: descpb.IDs{tableDesc.GetID()},
498507
}
499508

500-
if !p.extendedEvalCtx.TxnIsSingleStmt {
501-
return pgerror.Newf(pgcode.InvalidTransactionState,
502-
"cannot run within a multi-statement transaction")
503-
}
504-
505509
var sj *jobs.StartableJob
506510
if err := p.ExecCfg().InternalDB.Txn(ctx, func(ctx context.Context, txn isql.Txn) (err error) {
507511
return p.ExecCfg().JobRegistry.CreateStartableJobWithTxn(ctx, &sj, jobID, txn, jr)

0 commit comments

Comments
 (0)