Skip to content

Commit 18350d4

Browse files
craig[bot] and ZhouXing19
committed
Merge #153294
153294: IMPORT: trigger async INSPECT job for validation r=ZhouXing19 a=ZhouXing19 Fixes #146327 We found a bug where async flush can cause silent data corruption for IMPORT, and this commit adds a validation step to alert us if similar data corruption happens again. To avoid increasing the latency of IMPORT, we trigger an INSPECT job that runs the validation asynchronously right after the table is set to public at the end of an import. We use the commit timestamp of the internal txn that updates the table state to public, so that we can ensure the INSPECT focuses on the snapshot at the end of the IMPORT job. This behavior is gated by the cluster setting `bulkio.import.row_count_validation.unsafe.enabled`, which defaults to false. See more internal discussion [here](https://cockroachlabs.slack.com/archives/C02DSDS9TM1/p1757439225277989?thread_ts=1756991224.633879&cid=C02DSDS9TM1). Release note: Added `bulkio.import.row_count_validation.unsafe.enabled` (default false) that will trigger an asynchronous INSPECT job at the end of an IMPORT execution. Co-authored-by: ZhouXing19 <[email protected]>
2 parents 3aa8436 + 8ffae11 commit 18350d4

File tree

2 files changed

+78
-21
lines changed

2 files changed

+78
-21
lines changed

pkg/sql/importer/import_job.go

Lines changed: 50 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -107,6 +107,18 @@ var performConstraintValidation = settings.RegisterBoolSetting(
107107
settings.WithUnsafe,
108108
)
109109

110+
// TODO(janexing): change the default to true once INSPECT has shipped in a stable release.
111+
var importRowCountValidation = settings.RegisterBoolSetting(
112+
settings.ApplicationLevel,
113+
"bulkio.import.row_count_validation.unsafe.enabled",
114+
"enables asynchronous validation of imported data via INSPECT "+
115+
"jobs. When enabled, an INSPECT job runs after import completion to "+
116+
"detect potential data corruption. Disabling this setting may result "+
117+
"in undetected data corruption if the import process fails.",
118+
false,
119+
settings.WithUnsafe,
120+
)
121+
110122
// Resume is part of the jobs.Resumer interface.
111123
func (r *importResumer) Resume(ctx context.Context, execCtx interface{}) error {
112124
p := execCtx.(sql.JobExecContext)
@@ -279,10 +291,28 @@ func (r *importResumer) Resume(ctx context.Context, execCtx interface{}) error {
279291
return err
280292
}
281293

282-
if err := r.publishTables(ctx, p.ExecCfg(), res); err != nil {
294+
setPublicTimestamp, err := r.publishTables(ctx, p.ExecCfg(), res)
295+
if err != nil {
283296
return err
284297
}
285298

299+
if importRowCountValidation.Get(&p.ExecCfg().Settings.SV) {
300+
tblDesc := tabledesc.NewBuilder(details.Tables[0].Desc).BuildImmutableTable()
301+
if len(tblDesc.PublicNonPrimaryIndexes()) > 0 {
302+
_, err := sql.TriggerInspectJob(
303+
ctx,
304+
fmt.Sprintf("import-validation-%s", tblDesc.GetName()),
305+
p.ExecCfg(),
306+
tblDesc,
307+
setPublicTimestamp,
308+
)
309+
if err != nil {
310+
return errors.Wrapf(err, "failed to trigger inspect for import validation for table %s", tblDesc.GetName())
311+
}
312+
log.Eventf(ctx, "triggered inspect job for import validation for table %s with AOST %s", tblDesc.GetName(), setPublicTimestamp)
313+
}
314+
}
315+
286316
emitImportJobEvent(ctx, p, jobs.StateSucceeded, r.job)
287317

288318
addToFileFormatTelemetry(details.Format.Format.String(), "succeeded")
@@ -304,7 +334,6 @@ func (r *importResumer) Resume(ctx context.Context, execCtx interface{}) error {
304334
}
305335

306336
logutil.LogJobCompletion(ctx, importJobRecoveryEventType, r.job.ID(), true, nil, r.res.Rows)
307-
308337
return nil
309338
}
310339

@@ -422,21 +451,24 @@ func bindTableDescImportProperties(
422451
// publishTables updates the status of imported tables from OFFLINE to PUBLIC.
423452
func (r *importResumer) publishTables(
424453
ctx context.Context, execCfg *sql.ExecutorConfig, res kvpb.BulkOpSummary,
425-
) error {
454+
) (hlc.Timestamp, error) {
455+
var setPublicTimestamp hlc.Timestamp
426456
details := r.job.Details().(jobspb.ImportDetails)
427457
// Tables should only be published once.
428458
if details.TablesPublished {
429-
return nil
459+
return setPublicTimestamp, nil
430460
}
431461

432462
log.Event(ctx, "making tables live")
433463

464+
var kvTxn *kv.Txn
434465
err := sql.DescsTxn(ctx, execCfg, func(
435466
ctx context.Context, txn isql.Txn, descsCol *descs.Collection,
436467
) error {
437-
b := txn.KV().NewBatch()
468+
kvTxn = txn.KV()
469+
b := kvTxn.NewBatch()
438470
for _, tbl := range details.Tables {
439-
newTableDesc, err := descsCol.MutableByID(txn.KV()).Table(ctx, tbl.Desc.ID)
471+
newTableDesc, err := descsCol.MutableByID(kvTxn).Table(ctx, tbl.Desc.ID)
440472
if err != nil {
441473
return err
442474
}
@@ -473,7 +505,7 @@ func (r *importResumer) publishTables(
473505
return errors.Wrapf(err, "publishing table %d", newTableDesc.ID)
474506
}
475507
}
476-
if err := txn.KV().Run(ctx, b); err != nil {
508+
if err := kvTxn.Run(ctx, b); err != nil {
477509
return errors.Wrap(err, "publishing tables")
478510
}
479511

@@ -486,7 +518,16 @@ func (r *importResumer) publishTables(
486518
return nil
487519
})
488520
if err != nil {
489-
return err
521+
return setPublicTimestamp, err
522+
}
523+
524+
// Try to get the commit timestamp for the transaction that set
525+
// the table to public. This timestamp will be used to run the
526+
// INSPECT job for this table, to ensure the INSPECT looks at the
527+
// snapshot that IMPORT just finished.
528+
setPublicTimestamp, err = kvTxn.CommitTimestamp()
529+
if err != nil {
530+
return setPublicTimestamp, err
490531
}
491532

492533
// Initiate a run of CREATE STATISTICS. We don't know the actual number of
@@ -497,7 +538,7 @@ func (r *importResumer) publishTables(
497538
execCfg.StatsRefresher.NotifyMutation(desc, math.MaxInt32 /* rowsAffected */)
498539
}
499540

500-
return nil
541+
return setPublicTimestamp, nil
501542
}
502543

503544
// checkVirtualConstraints checks constraints that are enforced via runtime

pkg/sql/scrub.go

Lines changed: 28 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -477,19 +477,38 @@ func (n *scrubNode) runScrubTableJob(
477477
ctx context.Context, p *planner, tableDesc catalog.TableDescriptor, asOf hlc.Timestamp,
478478
) error {
479479
// Consistency check is done async via a job.
480-
jobID := p.ExecCfg().JobRegistry.MakeJobID()
480+
jobID, err := TriggerInspectJob(ctx, tree.Serialize(n.n), p.ExecCfg(), tableDesc, asOf)
481+
if err != nil {
482+
return err
483+
}
484+
// Let the eval context track this job ID for status and error reporting.
485+
p.extendedEvalCtx.jobs.addCreatedJobID(jobID)
486+
return nil
487+
}
488+
489+
// TriggerInspectJob creates and starts an INSPECT job for the snapshot of the given table and returns the ID of the new job.
490+
func TriggerInspectJob(
491+
ctx context.Context,
492+
jobRecordDescription string,
493+
execCfg *ExecutorConfig,
494+
tableDesc catalog.TableDescriptor,
495+
asOf hlc.Timestamp,
496+
) (jobspb.JobID, error) {
497+
// Consistency check is done async via a job.
498+
jobID := execCfg.JobRegistry.MakeJobID()
481499

482500
// TODO(148300): just grab the first secondary index and use that for the
483501
// consistency check.
484502
// TODO(148365): When INSPECT is added, we want to skip unsupported indexes
485503
// and return a NOTICE.
486504
secIndexes := tableDesc.PublicNonPrimaryIndexes()
487505
if len(secIndexes) == 0 {
488-
return errors.AssertionFailedf("must have at least one secondary index")
506+
return jobID, errors.AssertionFailedf("must have at least one secondary index")
489507
}
490508

509+
// TODO(sql-queries): add row count check when that is implemented.
491510
jr := jobs.Record{
492-
Description: tree.Serialize(n.n),
511+
Description: jobRecordDescription,
493512
Details: jobspb.InspectDetails{
494513
Checks: []*jobspb.InspectDetails_Check{
495514
{
@@ -507,22 +526,19 @@ func (n *scrubNode) runScrubTableJob(
507526
}
508527

509528
var sj *jobs.StartableJob
510-
if err := p.ExecCfg().InternalDB.Txn(ctx, func(ctx context.Context, txn isql.Txn) (err error) {
511-
return p.ExecCfg().JobRegistry.CreateStartableJobWithTxn(ctx, &sj, jobID, txn, jr)
529+
if err := execCfg.InternalDB.Txn(ctx, func(ctx context.Context, txn isql.Txn) (err error) {
530+
return execCfg.JobRegistry.CreateStartableJobWithTxn(ctx, &sj, jobID, txn, jr)
512531
}); err != nil {
513532
if sj != nil {
514533
if cleanupErr := sj.CleanupOnRollback(ctx); cleanupErr != nil {
515534
log.Dev.Warningf(ctx, "failed to cleanup StartableJob: %v", cleanupErr)
516535
}
517536
}
518-
return err
537+
return jobID, err
519538
}
520-
521-
log.Dev.Infof(ctx, "created and started inspect job %d (no-op)", jobID)
539+
log.Dev.Infof(ctx, "created and started inspect job %d", jobID)
522540
if err := sj.Start(ctx); err != nil {
523-
return err
541+
return jobID, err
524542
}
525-
// Let the eval context track this job ID for status and error reporting.
526-
p.extendedEvalCtx.jobs.addCreatedJobID(jobID)
527-
return nil
543+
return jobID, nil
528544
}

0 commit comments

Comments
 (0)