@@ -27,6 +27,7 @@ import (
2727 "github.com/cockroachdb/cockroach/pkg/jobs/jobspb"
2828 "github.com/cockroachdb/cockroach/pkg/roachprod/install"
2929 "github.com/cockroachdb/cockroach/pkg/roachprod/logger"
30+ "github.com/cockroachdb/cockroach/pkg/sql/randgen"
3031 "github.com/cockroachdb/cockroach/pkg/util/randutil"
3132 "github.com/cockroachdb/cockroach/pkg/util/retry"
3233 "github.com/cockroachdb/errors"
@@ -209,7 +210,7 @@ type importTestSpec struct {
209210 datasetNames stringSource
210211
211212 // preTestHook is run after tables are created, but before the import starts.
212- preTestHook func (context.Context , test.Test , cluster.Cluster )
213+ preTestHook func (context.Context , test.Test , cluster.Cluster , * rand. Rand )
213214 // importRunner is an alternate import runner.
214215 importRunner func (context.Context , test.Test , cluster.Cluster , * logger.Logger , * rand.Rand , dataset ) error
215216}
@@ -230,6 +231,13 @@ var tests = []importTestSpec{
230231 calibrate : true ,
231232 datasetNames : FromFunc (allDatasets ),
232233 },
234+ // Small dataset for quickly iterating while developing this test.
235+ {
236+ subtestName : "smoke" ,
237+ nodes : []int {4 },
238+ manualOnly : true ,
239+ datasetNames : One ("tpch/supplier" ),
240+ },
233241 // Basic test w/o injected failures.
234242 {
235243 subtestName : "basic" ,
@@ -254,7 +262,7 @@ var tests = []importTestSpec{
254262 subtestName : "decommissioned" ,
255263 nodes : []int {4 },
256264 datasetNames : FromFunc (anyDataset ),
257- preTestHook : func (ctx context.Context , t test.Test , c cluster.Cluster ) {
265+ preTestHook : func (ctx context.Context , t test.Test , c cluster.Cluster , _ * rand. Rand ) {
258266 nodeToDecommission := 2
259267 t .Status (fmt .Sprintf ("decommissioning node %d" , nodeToDecommission ))
260268 c .Run (ctx , option .WithNodes (c .Node (nodeToDecommission )),
@@ -300,6 +308,13 @@ var tests = []importTestSpec{
300308 datasetNames : FromFunc (anyThreeDatasets ),
301309 importRunner : importCancellationRunner ,
302310 },
311+ // Test column families.
312+ {
313+ subtestName : "colfam" ,
314+ nodes : []int {4 },
315+ datasetNames : FromFunc (anyDataset ),
316+ preTestHook : makeColumnFamilies ,
317+ },
303318}
304319
305320func registerImport (r registry.Registry ) {
@@ -357,9 +372,9 @@ func runImportTest(
357372 // Create and use a test database
358373 conn := c .Conn (ctx , t .L (), 1 )
359374 defer conn .Close ()
360- _ , err := conn .Exec ( `CREATE DATABASE import_test` )
375+ _ , err := conn .ExecContext ( ctx , `CREATE DATABASE import_test` )
361376 require .NoError (t , err )
362- _ , err = conn .Exec ( `USE import_test` )
377+ _ , err = conn .ExecContext ( ctx , `USE import_test` )
363378 require .NoError (t , err )
364379
365380 // Initialize datasets and create tables.
@@ -372,14 +387,14 @@ func runImportTest(
372387 require .NotNilf (t , ds .getFingerprint (),
373388 "dataset '%s' has no fingerprint. Run calibrate manually." , name )
374389 }
375- _ , err = conn .Exec ( ds .getCreateTableStmt ())
390+ _ , err = conn .ExecContext ( ctx , ds .getCreateTableStmt ())
376391 require .NoError (t , err )
377392 }
378393
379394 // If there's a pre-test hook, run it now.
380395 if testSpec .preTestHook != nil {
381396 t .Status ("Running pre-test hook" )
382- testSpec .preTestHook (ctx , t , c )
397+ testSpec .preTestHook (ctx , t , c , rng )
383398 }
384399
385400 // For calibration runs, filter out datasets that have fingerprint files.
@@ -475,6 +490,11 @@ func runImportTest(
475490 t .WorkerStatus ("validating " , datasetName )
476491 defer t .WorkerStatus ()
477492
493+ err = validateNoRowFragmentation (ctx , l , conn , ds .getTableName ())
494+ if err != nil {
495+ return err
496+ }
497+
478498 var rows * gosql.Rows
479499 rows , err = conn .Query (fmt .Sprintf (`SHOW FINGERPRINTS FROM TABLE import_test.%s` , ds .getTableName ()))
480500 if err != nil {
@@ -510,6 +530,94 @@ func runImportTest(
510530 m .Wait ()
511531}
512532
533+ // validateNoRowFragmentation verifies that IMPORT did not split rows with multiple
534+ // column families across range boundaries. It queries the table schema and range
535+ // boundaries to ensure that no range starts with a key that includes a column family suffix.
536+ func validateNoRowFragmentation (
537+ ctx context.Context , l * logger.Logger , conn * gosql.DB , tableName string ,
538+ ) (err error ) {
539+ defer func () {
540+ if err != nil {
541+ err = errors .Wrapf (err , "%s" , tableName )
542+ }
543+ }()
544+
545+ var rows * gosql.Rows
546+ rows , err = conn .QueryContext (ctx ,
547+ fmt .Sprintf (`SELECT index_name,
548+ count(*)
549+ FROM [SHOW INDEXES FROM import_test.%s]
550+ WHERE NOT storing
551+ GROUP BY index_name` , tableName ))
552+ if err != nil {
553+ return err
554+ }
555+
556+ keyLens := make (map [string ]int )
557+ for rows .Next () {
558+ var keyName string
559+ var keyLen int
560+ err = rows .Scan (& keyName , & keyLen )
561+ if err != nil {
562+ return err
563+ }
564+ keyLens [keyName ] = keyLen
565+ }
566+ if err = rows .Err (); err != nil {
567+ return err
568+ }
569+
570+ for keyName , keyLen := range keyLens {
571+ l .Printf ("Checking key %s with %d key columns for split rows" , keyName , keyLen )
572+
573+ rows , err = conn .QueryContext (ctx , fmt .Sprintf (
574+ `SELECT start_key, end_key, range_id FROM [SHOW RANGES FROM INDEX import_test.%s@%s]` ,
575+ tableName , keyName ))
576+ if err != nil {
577+ return err
578+ }
579+
580+ for rows .Next () {
581+ var startKey , endKey string
582+ var rangeID int64
583+ err = rows .Scan (& startKey , & endKey , & rangeID )
584+ if err != nil {
585+ return errors .Wrapf (err , "%s" , keyName )
586+ }
587+
588+ if err = checkKeyForFamilySuffix (startKey , keyLen ); err != nil {
589+ return errors .Wrapf (err , "%s start key" , keyName )
590+ }
591+ if err = checkKeyForFamilySuffix (endKey , keyLen ); err != nil {
592+ return errors .Wrapf (err , "%s end key" , keyName )
593+ }
594+ }
595+ if err = rows .Err (); err != nil {
596+ return errors .Wrapf (err , "%s" , keyName )
597+ }
598+ }
599+
600+ return nil
601+ }
602+
603+ // checkKeyForFamilySuffix checks if a pretty-printed key from SHOW RANGES contains
604+ // a column family suffix, which would indicate a mid-row split. maxAllowed is the
605+ // maximum number of key segments expected for this index (before family suffix).
606+ func checkKeyForFamilySuffix (prettyKey string , maxAllowed int ) error {
607+ // Skip special boundary markers
608+ if strings .HasPrefix (prettyKey , "<before:" ) || strings .HasPrefix (prettyKey , "<after:" ) ||
609+ strings .Contains (prettyKey , "Min>" ) || strings .Contains (prettyKey , "Max>" ) {
610+ return nil
611+ }
612+
613+ numKeyParts := len (strings .Split (strings .TrimPrefix (prettyKey , "…/" ), "/" ))
614+
615+ if numKeyParts > maxAllowed {
616+ return errors .Newf ("%s shows a mid-row split" , prettyKey )
617+ }
618+ return nil
619+ }
620+
513621// importCancellationRunner() is the test runner for the import cancellation
514622// test. This test makes a number of attempts at importing a dataset, cancelling
515623// all but the last. Each attempt imports a random subset of files from the
@@ -621,6 +729,51 @@ func importCancellationRunner(
621729 return err
622730}
623731
732+ // makeColumnFamilies() is a pre-test hook that changes the tables
733+ // in import_test to use column families. To do this, we iterate the
734+ // tables in the database, reading the schema for each table, modifying
735+ // the schema and then re-creating the table. Because CRDB does not
736+ // allow altering a column's family, we simply drop and re-create the
737+ // entire table.
738+ func makeColumnFamilies (ctx context.Context , t test.Test , c cluster.Cluster , rng * rand.Rand ) {
739+ conn := c .Conn (ctx , t .L (), 1 )
740+ defer conn .Close ()
741+
742+ // Read table names
743+ rows , err := conn .QueryContext (ctx , `SELECT table_name FROM [SHOW TABLES FROM import_test]` )
744+ require .NoError (t , err )
745+ var tableNames []string
746+ for rows .Next () {
747+ var name string
748+ require .NoError (t , rows .Scan (& name ))
749+ tableNames = append (tableNames , name )
750+ }
751+ require .NoError (t , rows .Err ())
752+
753+ for _ , tableName := range tableNames {
754+ var createStmt string
755+ err = conn .QueryRowContext (ctx ,
756+ fmt .Sprintf (`SELECT create_statement FROM [SHOW CREATE TABLE import_test.%s]` ,
757+ tableName )).Scan (& createStmt )
758+ require .NoError (t , err )
759+
760+ createStmt , changed := randgen .ApplyString (rng , createStmt , randgen .ColumnFamilyMutator )
761+ if ! changed {
762+ continue
763+ }
764+
765+ oldFullyQualifiedTableName := fmt .Sprintf ("public.%s" , tableName )
766+ newFullyQualifiedTableName := fmt .Sprintf ("import_test.%s" , tableName )
767+ createStmt = strings .Replace (createStmt , oldFullyQualifiedTableName , newFullyQualifiedTableName , 1 )
768+
769+ t .L ().Printf ("Using: %s" , createStmt )
770+ _ , err = conn .ExecContext (ctx , fmt .Sprintf (`DROP TABLE import_test.%s` , tableName ))
771+ require .NoError (t , err )
772+ _ , err = conn .ExecContext (ctx , createStmt )
773+ require .NoError (t , err )
774+ }
775+ }
776+
624777// runSyncImportJob() runs an import job and waits for it to complete.
625778func runSyncImportJob (ctx context.Context , conn * gosql.DB , ds dataset ) error {
626779 importStmt := formatImportStmt (ds .getTableName (), ds .getDataURLs (), false )
0 commit comments