@@ -107,6 +107,18 @@ var performConstraintValidation = settings.RegisterBoolSetting(
107
107
settings .WithUnsafe ,
108
108
)
109
109
110
+ // TODO(janexing): flip the default to true once INSPECT ships in a stable release.
111
+ var importRowCountValidation = settings .RegisterBoolSetting (
112
+ settings .ApplicationLevel ,
113
+ "bulkio.import.row_count_validation.unsafe.enabled" ,
114
+ "enables asynchronous validation of imported data via INSPECT " +
115
+ "jobs. When enabled, an INSPECT job runs after import completion to " +
116
+ "detect potential data corruption. Disabling this setting may result " +
117
+ "in undetected data corruption if the import process fails." ,
118
+ false , // Default: validation is off for now; see the TODO above.
119
+ settings .WithUnsafe ,
120
+ )
121
+
110
122
// Resume is part of the jobs.Resumer interface.
111
123
func (r * importResumer ) Resume (ctx context.Context , execCtx interface {}) error {
112
124
p := execCtx .(sql.JobExecContext )
@@ -279,10 +291,28 @@ func (r *importResumer) Resume(ctx context.Context, execCtx interface{}) error {
279
291
return err
280
292
}
281
293
282
- if err := r .publishTables (ctx , p .ExecCfg (), res ); err != nil {
294
+ setPublicTimestamp , err := r .publishTables (ctx , p .ExecCfg (), res )
295
+ if err != nil {
283
296
return err
284
297
}
285
298
299
+ if importRowCountValidation .Get (& p .ExecCfg ().Settings .SV ) {
300
+ tblDesc := tabledesc .NewBuilder (details .Tables [0 ].Desc ).BuildImmutableTable ()
301
+ if len (tblDesc .PublicNonPrimaryIndexes ()) > 0 {
302
+ _ , err := sql .TriggerInspectJob (
303
+ ctx ,
304
+ fmt .Sprintf ("import-validation-%s" , tblDesc .GetName ()),
305
+ p .ExecCfg (),
306
+ tblDesc ,
307
+ setPublicTimestamp ,
308
+ )
309
+ if err != nil {
310
+ return errors .Wrapf (err , "failed to trigger inspect for import validation for table %s" , tblDesc .GetName ())
311
+ }
312
+ log .Eventf (ctx , "triggered inspect job for import validation for table %s with AOST %s" , tblDesc .GetName (), setPublicTimestamp )
313
+ }
314
+ }
315
+
286
316
emitImportJobEvent (ctx , p , jobs .StateSucceeded , r .job )
287
317
288
318
addToFileFormatTelemetry (details .Format .Format .String (), "succeeded" )
@@ -304,7 +334,6 @@ func (r *importResumer) Resume(ctx context.Context, execCtx interface{}) error {
304
334
}
305
335
306
336
logutil .LogJobCompletion (ctx , importJobRecoveryEventType , r .job .ID (), true , nil , r .res .Rows )
307
-
308
337
return nil
309
338
}
310
339
@@ -422,21 +451,24 @@ func bindTableDescImportProperties(
422
451
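// publishTables updates the status of imported tables from OFFLINE to PUBLIC
// and returns the commit timestamp of the transaction that published them.
// If the tables were already published, it returns the zero timestamp.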
423
452
func (r * importResumer ) publishTables (
424
453
ctx context.Context , execCfg * sql.ExecutorConfig , res kvpb.BulkOpSummary ,
425
- ) error {
454
+ ) (hlc.Timestamp , error ) {
455
+ var setPublicTimestamp hlc.Timestamp
426
456
details := r .job .Details ().(jobspb.ImportDetails )
427
457
// Tables should only be published once.
428
458
if details .TablesPublished {
429
- return nil
459
+ return setPublicTimestamp , nil
430
460
}
431
461
432
462
log .Event (ctx , "making tables live" )
433
463
464
+ var kvTxn * kv.Txn
434
465
err := sql .DescsTxn (ctx , execCfg , func (
435
466
ctx context.Context , txn isql.Txn , descsCol * descs.Collection ,
436
467
) error {
437
- b := txn .KV ().NewBatch ()
468
+ kvTxn = txn .KV ()
469
+ b := kvTxn .NewBatch ()
438
470
for _ , tbl := range details .Tables {
439
- newTableDesc , err := descsCol .MutableByID (txn . KV () ).Table (ctx , tbl .Desc .ID )
471
+ newTableDesc , err := descsCol .MutableByID (kvTxn ).Table (ctx , tbl .Desc .ID )
440
472
if err != nil {
441
473
return err
442
474
}
@@ -473,7 +505,7 @@ func (r *importResumer) publishTables(
473
505
return errors .Wrapf (err , "publishing table %d" , newTableDesc .ID )
474
506
}
475
507
}
476
- if err := txn . KV () .Run (ctx , b ); err != nil {
508
+ if err := kvTxn .Run (ctx , b ); err != nil {
477
509
return errors .Wrap (err , "publishing tables" )
478
510
}
479
511
@@ -486,7 +518,16 @@ func (r *importResumer) publishTables(
486
518
return nil
487
519
})
488
520
if err != nil {
489
- return err
521
+ return setPublicTimestamp , err
522
+ }
523
+
524
+ // Try to get the commit timestamp for the transaction that set
525
+ // the table to public. This timestamp will be used to run the
526
+ // INSPECT job for this table, to ensure the INSPECT looks at the
527
+ // snapshot that IMPORT just finished.
528
+ setPublicTimestamp , err = kvTxn .CommitTimestamp ()
529
+ if err != nil {
530
+ return setPublicTimestamp , err
490
531
}
491
532
492
533
// Initiate a run of CREATE STATISTICS. We don't know the actual number of
@@ -497,7 +538,7 @@ func (r *importResumer) publishTables(
497
538
execCfg .StatsRefresher .NotifyMutation (desc , math .MaxInt32 /* rowsAffected */ )
498
539
}
499
540
500
- return nil
541
+ return setPublicTimestamp , nil
501
542
}
502
543
503
544
// checkVirtualConstraints checks constraints that are enforced via runtime
0 commit comments