Skip to content

Commit 2e5f8a0

Browse files
craig[bot]mw5h
andcommitted
Merge #157929
157929: roachtest: add validation for row fragmentation in import tests r=mw5h a=mw5h Previously, import tests did not verify whether tables with multiple column families were correctly handled during import, specifically whether rows were fragmented across range boundaries. This could lead to data corruption if import splits a row mid-family. This commit adds validation logic that parses the output of SHOW RANGE FROM INDEX to ensure that all families for a given primary key value are located within a single range. This check is pretty cheap, so we do it for all datasets. We also add a test case that takes a random dataset and adds column families to the schema before beginning import as well as a smoke test that is locked to a small dataset for developer use. Fixes: #157241 Release note: None Co-authored-by: Matt White <[email protected]>
2 parents 9d6132d + 1e30470 commit 2e5f8a0

File tree

2 files changed

+160
-6
lines changed

2 files changed

+160
-6
lines changed

pkg/cmd/roachtest/tests/BUILD.bazel

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -283,6 +283,7 @@ go_library(
283283
"//pkg/sql/execinfrapb",
284284
"//pkg/sql/pgwire/pgcode",
285285
"//pkg/sql/pgwire/pgerror",
286+
"//pkg/sql/randgen",
286287
"//pkg/sql/sem/tree",
287288
"//pkg/sql/ttl/ttlbase",
288289
"//pkg/sql/vecindex/cspann",

pkg/cmd/roachtest/tests/import.go

Lines changed: 159 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@ import (
2727
"github.com/cockroachdb/cockroach/pkg/jobs/jobspb"
2828
"github.com/cockroachdb/cockroach/pkg/roachprod/install"
2929
"github.com/cockroachdb/cockroach/pkg/roachprod/logger"
30+
"github.com/cockroachdb/cockroach/pkg/sql/randgen"
3031
"github.com/cockroachdb/cockroach/pkg/util/randutil"
3132
"github.com/cockroachdb/cockroach/pkg/util/retry"
3233
"github.com/cockroachdb/errors"
@@ -209,7 +210,7 @@ type importTestSpec struct {
209210
datasetNames stringSource
210211

211212
// preTestHook is run after tables are created, but before the import starts.
212-
preTestHook func(context.Context, test.Test, cluster.Cluster)
213+
preTestHook func(context.Context, test.Test, cluster.Cluster, *rand.Rand)
213214
// importRunner is an alternate import runner.
214215
importRunner func(context.Context, test.Test, cluster.Cluster, *logger.Logger, *rand.Rand, dataset) error
215216
}
@@ -230,6 +231,13 @@ var tests = []importTestSpec{
230231
calibrate: true,
231232
datasetNames: FromFunc(allDatasets),
232233
},
234+
// Small dataset for quickly iterating while developing this test.
235+
{
236+
subtestName: "smoke",
237+
nodes: []int{4},
238+
manualOnly: true,
239+
datasetNames: One("tpch/supplier"),
240+
},
233241
// Basic test w/o injected failures.
234242
{
235243
subtestName: "basic",
@@ -254,7 +262,7 @@ var tests = []importTestSpec{
254262
subtestName: "decommissioned",
255263
nodes: []int{4},
256264
datasetNames: FromFunc(anyDataset),
257-
preTestHook: func(ctx context.Context, t test.Test, c cluster.Cluster) {
265+
preTestHook: func(ctx context.Context, t test.Test, c cluster.Cluster, _ *rand.Rand) {
258266
nodeToDecommission := 2
259267
t.Status(fmt.Sprintf("decommissioning node %d", nodeToDecommission))
260268
c.Run(ctx, option.WithNodes(c.Node(nodeToDecommission)),
@@ -300,6 +308,13 @@ var tests = []importTestSpec{
300308
datasetNames: FromFunc(anyThreeDatasets),
301309
importRunner: importCancellationRunner,
302310
},
311+
// Test column families.
312+
{
313+
subtestName: "colfam",
314+
nodes: []int{4},
315+
datasetNames: FromFunc(anyDataset),
316+
preTestHook: makeColumnFamilies,
317+
},
303318
}
304319

305320
func registerImport(r registry.Registry) {
@@ -357,9 +372,9 @@ func runImportTest(
357372
// Create and use a test database
358373
conn := c.Conn(ctx, t.L(), 1)
359374
defer conn.Close()
360-
_, err := conn.Exec(`CREATE DATABASE import_test`)
375+
_, err := conn.ExecContext(ctx, `CREATE DATABASE import_test`)
361376
require.NoError(t, err)
362-
_, err = conn.Exec(`USE import_test`)
377+
_, err = conn.ExecContext(ctx, `USE import_test`)
363378
require.NoError(t, err)
364379

365380
// Initialize datasets and create tables.
@@ -372,14 +387,14 @@ func runImportTest(
372387
require.NotNilf(t, ds.getFingerprint(),
373388
"dataset '%s' has no fingerprint. Run calibrate manually.", name)
374389
}
375-
_, err = conn.Exec(ds.getCreateTableStmt())
390+
_, err = conn.ExecContext(ctx, ds.getCreateTableStmt())
376391
require.NoError(t, err)
377392
}
378393

379394
// If there's a pre-test hook, run it now.
380395
if testSpec.preTestHook != nil {
381396
t.Status("Running pre-test hook")
382-
testSpec.preTestHook(ctx, t, c)
397+
testSpec.preTestHook(ctx, t, c, rng)
383398
}
384399

385400
// For calibration runs, filter out datasets that have fingerprint files.
@@ -475,6 +490,11 @@ func runImportTest(
475490
t.WorkerStatus("validating ", datasetName)
476491
defer t.WorkerStatus()
477492

493+
err = validateNoRowFragmentation(ctx, l, conn, ds.getTableName())
494+
if err != nil {
495+
return err
496+
}
497+
478498
var rows *gosql.Rows
479499
rows, err = conn.Query(fmt.Sprintf(`SHOW FINGERPRINTS FROM TABLE import_test.%s`, ds.getTableName()))
480500
if err != nil {
@@ -510,6 +530,94 @@ func runImportTest(
510530
m.Wait()
511531
}
512532

533+
// validateNoRowFragmentation verifies that IMPORT did not split rows with multiple
534+
// column families across range boundaries. It queries the table schema and range
535+
// boundaries to ensure that no range starts with a key that includes a column family suffix.
536+
func validateNoRowFragmentation(
537+
ctx context.Context, l *logger.Logger, conn *gosql.DB, tableName string,
538+
) (err error) {
539+
defer func() {
540+
if err != nil {
541+
err = errors.Wrapf(err, "%s", tableName)
542+
}
543+
}()
544+
545+
var rows *gosql.Rows
546+
rows, err = conn.QueryContext(ctx,
547+
fmt.Sprintf(`SELECT index_name,
548+
count(*)
549+
FROM [SHOW INDEXES FROM import_test.%s]
550+
WHERE NOT storing
551+
GROUP BY index_name`, tableName))
552+
if err != nil {
553+
return err
554+
}
555+
556+
keyLens := make(map[string]int)
557+
for rows.Next() {
558+
var keyName string
559+
var keyLen int
560+
err = rows.Scan(&keyName, &keyLen)
561+
if err != nil {
562+
return err
563+
}
564+
keyLens[keyName] = keyLen
565+
}
566+
if err = rows.Err(); err != nil {
567+
return err
568+
}
569+
570+
for keyName, keyLen := range keyLens {
571+
l.Printf("Checking key %s with %d key columns for split rows", keyName, keyLen)
572+
573+
rows, err = conn.QueryContext(ctx, fmt.Sprintf(
574+
`SELECT start_key, end_key, range_id FROM [SHOW RANGES FROM INDEX import_test.%s@%s]`,
575+
tableName, keyName))
576+
if err != nil {
577+
return err
578+
}
579+
580+
for rows.Next() {
581+
var startKey, endKey string
582+
var rangeID int64
583+
err = rows.Scan(&startKey, &endKey, &rangeID)
584+
if err != nil {
585+
return errors.Wrapf(err, "%s", keyName)
586+
}
587+
588+
if err = checkKeyForFamilySuffix(startKey, keyLen); err != nil {
589+
return errors.Wrapf(err, "%s start key", keyName)
590+
}
591+
if err = checkKeyForFamilySuffix(endKey, keyLen); err != nil {
592+
return errors.Wrapf(err, "%s end key", keyName)
593+
}
594+
}
595+
if err = rows.Err(); err != nil {
596+
return errors.Wrapf(err, "%s", keyName)
597+
}
598+
}
599+
600+
return nil
601+
}
602+
603+
// checkKeyForFamilySuffix checks if a pretty-printed key from SHOW RANGES contains
604+
// a column family suffix, which would indicate a mid-row split. maxAllowed is the
605+
// maximum number of key segments expected for this index (before family suffix).
606+
func checkKeyForFamilySuffix(prettyKey string, maxAllowed int) error {
607+
// Skip special boundary markers
608+
if strings.HasPrefix(prettyKey, "<before:") || strings.HasPrefix(prettyKey, "<after:") ||
609+
strings.Contains(prettyKey, "Min>") || strings.Contains(prettyKey, "Max>") {
610+
return nil
611+
}
612+
613+
numKeyParts := len(strings.Split(strings.TrimPrefix(prettyKey, "…/"), "/"))
614+
615+
if numKeyParts > maxAllowed {
616+
return errors.Newf("%s shows a mid-row split", prettyKey)
617+
}
618+
return nil
619+
}
620+
513621
// importCancellationRunner() is the test runner for the import cancellation
514622
// test. This test makes a number of attempts at importing a dataset, cancelling
515623
// all but the last. Each attempt imports a random subset of files from the
@@ -621,6 +729,51 @@ func importCancellationRunner(
621729
return err
622730
}
623731

732+
// makeColumnFamilies() is a pre-test hook that changes the tables
733+
// in import_test to use column families. To do this, we iterate the
734+
// tables in the database, reading the schema for each table, modifying
735+
// the schema and then re-creating the table. Because CRDB does not
736+
// allow altering a column's family, we simply drop and re-create the
737+
// entire table.
738+
func makeColumnFamilies(ctx context.Context, t test.Test, c cluster.Cluster, rng *rand.Rand) {
739+
conn := c.Conn(ctx, t.L(), 1)
740+
defer conn.Close()
741+
742+
// Read table names
743+
rows, err := conn.QueryContext(ctx, `SELECT table_name FROM [SHOW TABLES FROM import_test]`)
744+
require.NoError(t, err)
745+
var tableNames []string
746+
for rows.Next() {
747+
var name string
748+
require.NoError(t, rows.Scan(&name))
749+
tableNames = append(tableNames, name)
750+
}
751+
require.NoError(t, rows.Err())
752+
753+
for _, tableName := range tableNames {
754+
var createStmt string
755+
err = conn.QueryRowContext(ctx,
756+
fmt.Sprintf(`SELECT create_statement FROM [SHOW CREATE TABLE import_test.%s]`,
757+
tableName)).Scan(&createStmt)
758+
require.NoError(t, err)
759+
760+
createStmt, changed := randgen.ApplyString(rng, createStmt, randgen.ColumnFamilyMutator)
761+
if !changed {
762+
continue
763+
}
764+
765+
oldFullyQualifiedTableName := fmt.Sprintf("public.%s", tableName)
766+
newFullyQualifiedTableName := fmt.Sprintf("import_test.%s", tableName)
767+
createStmt = strings.Replace(createStmt, oldFullyQualifiedTableName, newFullyQualifiedTableName, 1)
768+
769+
t.L().Printf("Using: %s", createStmt)
770+
_, err = conn.ExecContext(ctx, fmt.Sprintf(`DROP TABLE import_test.%s`, tableName))
771+
require.NoError(t, err)
772+
_, err = conn.ExecContext(ctx, createStmt)
773+
require.NoError(t, err)
774+
}
775+
}
776+
624777
// runSyncImportJob() runs an import job and waits for it to complete.
625778
func runSyncImportJob(ctx context.Context, conn *gosql.DB, ds dataset) error {
626779
importStmt := formatImportStmt(ds.getTableName(), ds.getDataURLs(), false)

0 commit comments

Comments
 (0)