Skip to content

Commit 69b26fd

Browse files
committed
roachtest/import: add cancellation test to import
Previously, import_cancellation was a separate test that imported the entire TPC-H dataset and then ran the TPC-H workload to verify that there were no errors. Unfortunately, the TPC-H workload does not verify the correctness of results (only the number of results), which means there could be import errors that go undetected. This patch brings the core import & cancel logic from the import-cancellation test into the import test as a custom test runner, which lets us use the validation logic already in the import test. For now we only import a single, random file but a future patch will select a random set of files to import to maintain the same coverage in the face of concurrent imports. Fixes: cockroachdb#156816 Release note: None
1 parent 2a70fbe commit 69b26fd

File tree

4 files changed

+130
-316
lines changed

4 files changed

+130
-316
lines changed

pkg/cmd/roachtest/tests/BUILD.bazel

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -88,7 +88,6 @@ go_library(
8888
"hibernate_blocklist.go",
8989
"hotspotsplits.go",
9090
"import.go",
91-
"import_cancellation.go",
9291
"inconsistency.go",
9392
"indexes.go",
9493
"inspect_throughput.go",

pkg/cmd/roachtest/tests/import.go

Lines changed: 130 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -276,6 +276,13 @@ var tests = []importTestSpec{
276276
})
277277
},
278278
},
279+
// Test cancellation of import jobs.
280+
{
281+
subtestName: "cancellation",
282+
nodes: []int{4},
283+
datasetNames: FromFunc(anyDataset),
284+
importRunner: importCancellationRunner,
285+
},
279286
}
280287

281288
func registerImport(r registry.Registry) {
@@ -486,9 +493,120 @@ func runImportTest(
486493
m.Wait()
487494
}
488495

496+
// importCancellationRunner() is the test runner for the import cancellation
497+
// test. This test makes a number of attempts at importing a dataset, cancelling
498+
// all but the last. Each attempt imports a random subset of files from the
499+
// dataset in an effort to perturb tombstone errors.
500+
func importCancellationRunner(
501+
ctx context.Context, t test.Test, c cluster.Cluster, l *logger.Logger, rng *rand.Rand, ds dataset,
502+
) error {
503+
conn := c.Conn(ctx, l, 1)
504+
defer conn.Close()
505+
506+
// Set a random GC TTL that is relatively short. We want to exercise GC
507+
// of MVCC range tombstones, and would like a mix of live MVCC Range
508+
// Tombstones and the Pebble RangeKeyUnset tombstones that clear them.
509+
ttl_stmt := fmt.Sprintf(`ALTER TABLE import_test.%s CONFIGURE ZONE USING gc.ttlseconds = $1`, ds.getTableName())
510+
_, err := conn.ExecContext(ctx, ttl_stmt, randutil.RandIntInRange(rng, 10*60 /* 10m */, 20*60 /* 20m */))
511+
if err != nil {
512+
return errors.Wrapf(err, "%s", ttl_stmt)
513+
}
514+
515+
numAttempts := randutil.RandIntInRange(rng, 2, 5)
516+
finalAttempt := numAttempts - 1
517+
urlsToImport := ds.getDataURLs()
518+
for attempt := range numAttempts {
519+
if len(urlsToImport) == 0 {
520+
break
521+
}
522+
urls := urlsToImport
523+
524+
// If not the last attempt, import a subset of the files available.
525+
// This will create MVCC range tombstones across separate regions of
526+
// the table's keyspace.
527+
if attempt != finalAttempt {
528+
rng.Shuffle(len(urls), func(i, j int) {
529+
urls[i], urls[j] = urls[j], urls[i]
530+
})
531+
urls = urls[:randutil.RandIntInRange(rng, 1, len(urls)+1)]
532+
}
533+
534+
t.WorkerStatus(fmt.Sprintf("beginning attempt %d for %s using files: %s", attempt+1, ds.getTableName(),
535+
strings.Join(urls, ", ")))
536+
537+
var jobID jobspb.JobID
538+
jobID, err = runRawAsyncImportJob(ctx, conn, ds.getTableName(), urls)
539+
if err != nil && !errors.Is(err, context.DeadlineExceeded) {
540+
return err
541+
}
542+
543+
if attempt != finalAttempt {
544+
var timeout time.Duration
545+
// Local tests tend to finish before we cancel if we use the full range.
546+
if c.IsLocal() {
547+
timeout = time.Duration(randutil.RandIntInRange(rng, 5, 20)) * time.Second
548+
} else {
549+
timeout = time.Duration(randutil.RandIntInRange(rng, 10, 60*3)) * time.Second
550+
}
551+
select {
552+
case <-time.After(timeout):
553+
t.WorkerStatus(fmt.Sprintf("Cancelling import job for attempt %d/%d for table %s after %v.",
554+
attempt+1, numAttempts, ds.getTableName(), timeout))
555+
_, err = conn.ExecContext(ctx,
556+
`CANCEL JOBS (WITH x AS (SHOW JOBS)
557+
SELECT job_id
558+
FROM x
559+
WHERE job_id = $1
560+
AND status IN ('pending', 'running', 'retrying'))`, jobID)
561+
if err != nil {
562+
return err
563+
}
564+
case <-ctx.Done():
565+
return nil
566+
}
567+
}
568+
569+
// Block until the job is complete. Afterwards, it either completed
570+
// succesfully before we cancelled it, or the cancellation has finished
571+
// reverting the keys it wrote prior to cancellation.
572+
var status string
573+
err = conn.QueryRowContext(
574+
ctx,
575+
`WITH x AS (SHOW JOBS WHEN COMPLETE (SELECT $1)) SELECT status FROM x`,
576+
jobID,
577+
).Scan(&status)
578+
if err != nil {
579+
return err
580+
}
581+
t.WorkerStatus(fmt.Sprintf("Import job for attempt %d/%d for table %s (%d files) completed with status %s.",
582+
attempt+1, numAttempts, ds.getTableName(), len(urls), status))
583+
584+
// If the IMPORT was successful (eg, our cancellation came in too late),
585+
// remove the files that succeeded so we don't try to import them again.
586+
// If this was the last attempt, this should remove all the remaining
587+
// files and `filesToImport` should be empty.
588+
if status == "succeeded" {
589+
t.L().PrintfCtx(ctx, "Removing files [%s] from consideration; completed", strings.Join(urls, ", "))
590+
urlsToImport = slices.DeleteFunc(urlsToImport, func(url string) bool {
591+
return slices.Contains(urls, url)
592+
})
593+
} else if status == "failed" {
594+
return errors.Newf("Job %s failed.\n", jobID)
595+
}
596+
}
597+
if len(urlsToImport) != 0 {
598+
return errors.Newf("Expected zero remaining %q files after final attempt, but %d remaining.",
599+
ds.getTableName(), len(urlsToImport))
600+
}
601+
602+
// Restore GC TTLs so we don't interfere with post-import table validation.
603+
_, err = conn.ExecContext(ctx, ttl_stmt, 60*60*4 /* 4 hours */)
604+
return err
605+
}
606+
489607
// runSyncImportJob() runs an import job and waits for it to complete.
490608
func runSyncImportJob(ctx context.Context, conn *gosql.DB, ds dataset) error {
491-
importStmt := formatImportStmt(ds, false)
609+
importStmt := formatImportStmt(ds.getTableName(), ds.getDataURLs(), false)
492610
_, err := conn.ExecContext(ctx, importStmt)
493611
if err != nil {
494612
err = errors.Wrapf(err, "%s", importStmt)
@@ -498,7 +616,14 @@ func runSyncImportJob(ctx context.Context, conn *gosql.DB, ds dataset) error {
498616

499617
// runAsyncImportJob() runs an import job and returns the job id immediately.
500618
func runAsyncImportJob(ctx context.Context, conn *gosql.DB, ds dataset) (jobspb.JobID, error) {
501-
importStmt := formatImportStmt(ds, true)
619+
return runRawAsyncImportJob(ctx, conn, ds.getTableName(), ds.getDataURLs())
620+
}
621+
622+
// runRawAsyncImportJob() runs an import job using the table name and files provided.
623+
func runRawAsyncImportJob(
624+
ctx context.Context, conn *gosql.DB, tableName string, urls []string,
625+
) (jobspb.JobID, error) {
626+
importStmt := formatImportStmt(tableName, urls, true)
502627

503628
var jobID jobspb.JobID
504629
err := conn.QueryRowContext(ctx, importStmt).Scan(&jobID)
@@ -511,14 +636,10 @@ func runAsyncImportJob(ctx context.Context, conn *gosql.DB, ds dataset) (jobspb.
511636

512637
// formatImportStmt() formats a SQL import statement for the given table name
513638
// and data file URLs.
514-
func formatImportStmt(d dataset, detached bool) string {
639+
func formatImportStmt(tableName string, urls []string, detached bool) string {
515640
var stmt strings.Builder
516-
fmt.Fprintf(
517-
&stmt,
518-
`IMPORT INTO import_test.%s CSV DATA ('%s') WITH delimiter='|'`,
519-
d.getTableName(),
520-
strings.Join(d.getDataURLs(), "', '"),
521-
)
641+
fmt.Fprintf(&stmt, `IMPORT INTO import_test.%s CSV DATA ('%s') WITH delimiter='|'`,
642+
tableName, strings.Join(urls, "', '"))
522643

523644
if detached {
524645
stmt.WriteString(", detached")

0 commit comments

Comments
 (0)