@@ -276,6 +276,13 @@ var tests = []importTestSpec{
276276 })
277277 },
278278 },
279+ // Test cancellation of import jobs.
280+ {
281+ subtestName : "cancellation" ,
282+ nodes : []int {4 },
283+ datasetNames : FromFunc (anyDataset ),
284+ importRunner : importCancellationRunner ,
285+ },
279286}
280287
281288func registerImport (r registry.Registry ) {
@@ -486,9 +493,120 @@ func runImportTest(
486493 m .Wait ()
487494}
488495
496+ // importCancellationRunner() is the test runner for the import cancellation
497+ // test. This test makes a number of attempts at importing a dataset, cancelling
498+ // all but the last. Each attempt imports a random subset of files from the
499+ // dataset in an effort to perturb tombstone errors.
500+ func importCancellationRunner (
501+ ctx context.Context , t test.Test , c cluster.Cluster , l * logger.Logger , rng * rand.Rand , ds dataset ,
502+ ) error {
503+ conn := c .Conn (ctx , l , 1 )
504+ defer conn .Close ()
505+
506+ // Set a random GC TTL that is relatively short. We want to exercise GC
507+ // of MVCC range tombstones, and would like a mix of live MVCC Range
508+ // Tombstones and the Pebble RangeKeyUnset tombstones that clear them.
509+ ttl_stmt := fmt .Sprintf (`ALTER TABLE import_test.%s CONFIGURE ZONE USING gc.ttlseconds = $1` , ds .getTableName ())
510+ _ , err := conn .ExecContext (ctx , ttl_stmt , randutil .RandIntInRange (rng , 10 * 60 /* 10m */ , 20 * 60 /* 20m */ ))
511+ if err != nil {
512+ return errors .Wrapf (err , "%s" , ttl_stmt )
513+ }
514+
515+ numAttempts := randutil .RandIntInRange (rng , 2 , 5 )
516+ finalAttempt := numAttempts - 1
517+ urlsToImport := ds .getDataURLs ()
518+ for attempt := range numAttempts {
519+ if len (urlsToImport ) == 0 {
520+ break
521+ }
522+ urls := urlsToImport
523+
524+ // If not the last attempt, import a subset of the files available.
525+ // This will create MVCC range tombstones across separate regions of
526+ // the table's keyspace.
527+ if attempt != finalAttempt {
528+ rng .Shuffle (len (urls ), func (i , j int ) {
529+ urls [i ], urls [j ] = urls [j ], urls [i ]
530+ })
531+ urls = urls [:randutil .RandIntInRange (rng , 1 , len (urls )+ 1 )]
532+ }
533+
534+ t .WorkerStatus (fmt .Sprintf ("beginning attempt %d for %s using files: %s" , attempt + 1 , ds .getTableName (),
535+ strings .Join (urls , ", " )))
536+
537+ var jobID jobspb.JobID
538+ jobID , err = runRawAsyncImportJob (ctx , conn , ds .getTableName (), urls )
539+ if err != nil && ! errors .Is (err , context .DeadlineExceeded ) {
540+ return err
541+ }
542+
543+ if attempt != finalAttempt {
544+ var timeout time.Duration
545+ // Local tests tend to finish before we cancel if we use the full range.
546+ if c .IsLocal () {
547+ timeout = time .Duration (randutil .RandIntInRange (rng , 5 , 20 )) * time .Second
548+ } else {
549+ timeout = time .Duration (randutil .RandIntInRange (rng , 10 , 60 * 3 )) * time .Second
550+ }
551+ select {
552+ case <- time .After (timeout ):
553+ t .WorkerStatus (fmt .Sprintf ("Cancelling import job for attempt %d/%d for table %s after %v." ,
554+ attempt + 1 , numAttempts , ds .getTableName (), timeout ))
555+ _ , err = conn .ExecContext (ctx ,
556+ `CANCEL JOBS (WITH x AS (SHOW JOBS)
557+ SELECT job_id
558+ FROM x
559+ WHERE job_id = $1
560+ AND status IN ('pending', 'running', 'retrying'))` , jobID )
561+ if err != nil {
562+ return err
563+ }
564+ case <- ctx .Done ():
565+ return nil
566+ }
567+ }
568+
569+ // Block until the job is complete. Afterwards, it either completed
570+ // succesfully before we cancelled it, or the cancellation has finished
571+ // reverting the keys it wrote prior to cancellation.
572+ var status string
573+ err = conn .QueryRowContext (
574+ ctx ,
575+ `WITH x AS (SHOW JOBS WHEN COMPLETE (SELECT $1)) SELECT status FROM x` ,
576+ jobID ,
577+ ).Scan (& status )
578+ if err != nil {
579+ return err
580+ }
581+ t .WorkerStatus (fmt .Sprintf ("Import job for attempt %d/%d for table %s (%d files) completed with status %s." ,
582+ attempt + 1 , numAttempts , ds .getTableName (), len (urls ), status ))
583+
584+ // If the IMPORT was successful (eg, our cancellation came in too late),
585+ // remove the files that succeeded so we don't try to import them again.
586+ // If this was the last attempt, this should remove all the remaining
587+ // files and `filesToImport` should be empty.
588+ if status == "succeeded" {
589+ t .L ().PrintfCtx (ctx , "Removing files [%s] from consideration; completed" , strings .Join (urls , ", " ))
590+ urlsToImport = slices .DeleteFunc (urlsToImport , func (url string ) bool {
591+ return slices .Contains (urls , url )
592+ })
593+ } else if status == "failed" {
594+ return errors .Newf ("Job %s failed.\n " , jobID )
595+ }
596+ }
597+ if len (urlsToImport ) != 0 {
598+ return errors .Newf ("Expected zero remaining %q files after final attempt, but %d remaining." ,
599+ ds .getTableName (), len (urlsToImport ))
600+ }
601+
602+ // Restore GC TTLs so we don't interfere with post-import table validation.
603+ _ , err = conn .ExecContext (ctx , ttl_stmt , 60 * 60 * 4 /* 4 hours */ )
604+ return err
605+ }
606+
489607// runSyncImportJob() runs an import job and waits for it to complete.
490608func runSyncImportJob (ctx context.Context , conn * gosql.DB , ds dataset ) error {
491- importStmt := formatImportStmt (ds , false )
609+ importStmt := formatImportStmt (ds . getTableName (), ds . getDataURLs () , false )
492610 _ , err := conn .ExecContext (ctx , importStmt )
493611 if err != nil {
494612 err = errors .Wrapf (err , "%s" , importStmt )
@@ -498,7 +616,14 @@ func runSyncImportJob(ctx context.Context, conn *gosql.DB, ds dataset) error {
498616
499617// runAsyncImportJob() runs an import job and returns the job id immediately.
500618func runAsyncImportJob (ctx context.Context , conn * gosql.DB , ds dataset ) (jobspb.JobID , error ) {
501- importStmt := formatImportStmt (ds , true )
619+ return runRawAsyncImportJob (ctx , conn , ds .getTableName (), ds .getDataURLs ())
620+ }
621+
622+ // runRawAsyncImportJob() runs an import job using the table name and files provided.
623+ func runRawAsyncImportJob (
624+ ctx context.Context , conn * gosql.DB , tableName string , urls []string ,
625+ ) (jobspb.JobID , error ) {
626+ importStmt := formatImportStmt (tableName , urls , true )
502627
503628 var jobID jobspb.JobID
504629 err := conn .QueryRowContext (ctx , importStmt ).Scan (& jobID )
@@ -511,14 +636,10 @@ func runAsyncImportJob(ctx context.Context, conn *gosql.DB, ds dataset) (jobspb.
511636
512637// formatImportStmt() takes a dataset and formats a SQL import statment for that
513638// dataset.
514- func formatImportStmt (d dataset , detached bool ) string {
639+ func formatImportStmt (tableName string , urls [] string , detached bool ) string {
515640 var stmt strings.Builder
516- fmt .Fprintf (
517- & stmt ,
518- `IMPORT INTO import_test.%s CSV DATA ('%s') WITH delimiter='|'` ,
519- d .getTableName (),
520- strings .Join (d .getDataURLs (), "', '" ),
521- )
641+ fmt .Fprintf (& stmt , `IMPORT INTO import_test.%s CSV DATA ('%s') WITH delimiter='|'` ,
642+ tableName , strings .Join (urls , "', '" ))
522643
523644 if detached {
524645 stmt .WriteString (", detached" )
0 commit comments