@@ -58,7 +58,6 @@ import (
58
58
"github.com/cockroachdb/cockroach/pkg/sql/sem/eval"
59
59
"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
60
60
"github.com/cockroachdb/cockroach/pkg/sql/stats"
61
- "github.com/cockroachdb/cockroach/pkg/sql/tests"
62
61
"github.com/cockroachdb/cockroach/pkg/testutils"
63
62
"github.com/cockroachdb/cockroach/pkg/testutils/datapathutils"
64
63
"github.com/cockroachdb/cockroach/pkg/testutils/jobutils"
@@ -2006,35 +2005,36 @@ func TestImportRowLimit(t *testing.T) {
2006
2005
})
2007
2006
}
2008
2007
2009
- func TestFailedImportGC (t * testing.T ) {
2008
+ // TestFailedImport verifies that a failed import will clean up after itself
2009
+ // (meaning that the table doesn't contain garbage data that was partially
2010
+ // imported and that the table is brought online).
2011
+ func TestFailedImport (t * testing.T ) {
2010
2012
defer leaktest .AfterTest (t )()
2011
2013
defer log .Scope (t ).Close (t )
2012
2014
2013
2015
skip .UnderRace (t )
2014
2016
2017
+ rng , _ := randutil .NewTestRand ()
2015
2018
const nodes = 3
2016
-
2017
- var forceFailure bool
2018
- blockGC := make (chan struct {})
2019
+ numFiles := nodes
2020
+ rowsPerFile := 1000
2021
+ rowsPerRaceFile := 16
2022
+ testFiles := makeCSVData (t , numFiles , rowsPerFile , nodes , rowsPerRaceFile )
2019
2023
2020
2024
ctx := context .Background ()
2021
- baseDir := datapathutils .TestDataPath (t , "pgdump " )
2025
+ baseDir := datapathutils .TestDataPath (t , "csv " )
2022
2026
tc := serverutils .StartCluster (t , nodes , base.TestClusterArgs {ServerArgs : base.TestServerArgs {
2023
2027
// Test fails within a test tenant. This may be because we're trying
2024
2028
// to access files in nodelocal://1, which is off node. More
2025
2029
// investigation is required. Tracked with #76378.
2026
2030
DefaultTestTenant : base .TODOTestTenantDisabled ,
2027
2031
SQLMemoryPoolSize : 256 << 20 ,
2028
2032
ExternalIODir : baseDir ,
2029
- Knobs : base.TestingKnobs {
2030
- GCJob : & sql.GCJobTestingKnobs {
2031
- RunBeforeResume : func (_ jobspb.JobID ) error { <- blockGC ; return nil },
2032
- },
2033
- },
2034
2033
}})
2035
2034
defer tc .Stopper ().Stop (ctx )
2036
2035
conn := tc .ServerConn (0 )
2037
2036
2037
+ var forceFailure bool
2038
2038
for i := 0 ; i < tc .NumServers (); i ++ {
2039
2039
tc .Server (i ).JobRegistry ().(* jobs.Registry ).TestingWrapResumerConstructor (
2040
2040
jobspb .TypeImport ,
@@ -2051,56 +2051,25 @@ func TestFailedImportGC(t *testing.T) {
2051
2051
}
2052
2052
2053
2053
sqlDB := sqlutils .MakeSQLRunner (conn )
2054
- kvDB := tc .Server (0 ).DB ()
2055
-
2056
2054
sqlDB .Exec (t , `SET CLUSTER SETTING kv.bulk_ingest.batch_size = '10KB'` )
2055
+ sqlDB .Exec (t , "CREATE DATABASE failedimport; USE failedimport;" )
2056
+ sqlDB .Exec (t , `CREATE TABLE t (a INT PRIMARY KEY, b STRING)` )
2057
2057
2058
- forceFailure = true
2059
- defer func () { forceFailure = false }()
2060
- defer gcjob .SetSmallMaxGCIntervalForTest ()()
2061
- beforeImport , err := tree .MakeDTimestampTZ (tc .Server (0 ).Clock ().Now ().GoTime (), time .Millisecond )
2062
- if err != nil {
2063
- t .Fatal (err )
2058
+ expectedRows := "0"
2059
+ if rng .Float64 () < 0.5 {
2060
+ sqlDB .Exec (t , `INSERT INTO t VALUES (-1, 'a'), (-2, 'b')` )
2061
+ expectedRows = "2"
2064
2062
}
2065
2063
2066
- sqlDB .Exec (t , "CREATE DATABASE failedimport; USE failedimport;" )
2064
+ forceFailure = true
2065
+ defer func () { forceFailure = false }()
2067
2066
// Hit a failure during import.
2068
2067
sqlDB .ExpectErr (
2069
2068
t , `testing injected failure` ,
2070
- fmt .Sprintf (` IMPORT TABLE simple FROM PGDUMP ('%s') WITH ignore_unsupported_statements` , "nodelocal://1/simple.sql" ),
2069
+ fmt .Sprintf (" IMPORT INTO t (a, b) CSV DATA (%s)" , strings . Join ( testFiles . files , "," ) ),
2071
2070
)
2072
- // Nudge the registry to quickly adopt the job.
2073
- tc .Server (0 ).JobRegistry ().(* jobs.Registry ).TestingNudgeAdoptionQueue ()
2074
-
2075
- // In the case of the test, the ID of the table that will be cleaned up due
2076
- // to the failed import will be two higher than the ID of the empty database
2077
- // it was created in.
2078
- // We increment the id once for the public schema and a second time for the
2079
- // "MakeSimpleTableDescriptor".
2080
- dbID := sqlutils .QueryDatabaseID (t , sqlDB .DB , "failedimport" )
2081
- tableID := descpb .ID (dbID + 2 )
2082
- var td catalog.TableDescriptor
2083
- execCfg := tc .Server (0 ).ExecutorConfig ().(sql.ExecutorConfig )
2084
- if err := sql .DescsTxn (ctx , & execCfg , func (ctx context.Context , txn isql.Txn , col * descs.Collection ) (err error ) {
2085
- td , err = col .ByIDWithoutLeased (txn .KV ()).Get ().Table (ctx , tableID )
2086
- return err
2087
- }); err != nil {
2088
- t .Fatal (err )
2089
- }
2090
- // Ensure that we have garbage written to the descriptor that we want to
2091
- // clean up.
2092
- tests .CheckKeyCount (t , kvDB , td .TableSpan (keys .SystemSQLCodec ), 87 )
2093
-
2094
- // Allow GC to progress.
2095
- close (blockGC )
2096
- // Ensure that a GC job was created, and wait for it to finish.
2097
- doneGCQuery := fmt .Sprintf (
2098
- "SELECT count(*) FROM crdb_internal.jobs WHERE job_type = '%s' AND running_status = '%s' AND created > %s" ,
2099
- "SCHEMA CHANGE GC" , sql .StatusWaitingForMVCCGC , beforeImport .String (),
2100
- )
2101
- sqlDB .CheckQueryResultsRetry (t , doneGCQuery , [][]string {{"1" }})
2102
- // Expect there are no more KVs for this span.
2103
- tests .CheckKeyCount (t , kvDB , td .TableSpan (keys .SystemSQLCodec ), 0 )
2071
+ // Ensure that the table is online and is reverted properly.
2072
+ sqlDB .CheckQueryResultsRetry (t , "SELECT count(*) FROM t" , [][]string {{expectedRows }})
2104
2073
}
2105
2074
2106
2075
// TestImportIntoCSVCancel cancels a distributed import. This test
@@ -2163,10 +2132,6 @@ func TestImportIntoCSVCancel(t *testing.T) {
2163
2132
sqlDB .CheckQueryResults (t , "SELECT count(*) FROM t" , [][]string {{"0" }})
2164
2133
}
2165
2134
2166
- // Verify that a failed import will clean up after itself. This means:
2167
- // - Delete the garbage data that it partially imported.
2168
- // - Delete the table descriptor for the table that was created during the
2169
- // import.
2170
2135
func TestImportCSVStmt (t * testing.T ) {
2171
2136
defer leaktest .AfterTest (t )()
2172
2137
defer log .Scope (t ).Close (t )
@@ -6538,8 +6503,12 @@ func TestCreateStatsAfterImport(t *testing.T) {
6538
6503
stats .DefaultAsOfTime = time .Microsecond
6539
6504
6540
6505
const nodes = 1
6506
+ numFiles := nodes
6507
+ rowsPerFile := 1000
6508
+ rowsPerRaceFile := 16
6509
+ testFiles := makeCSVData (t , numFiles , rowsPerFile , nodes , rowsPerRaceFile )
6541
6510
ctx := context .Background ()
6542
- baseDir := datapathutils .TestDataPath (t )
6511
+ baseDir := datapathutils .TestDataPath (t , "csv" )
6543
6512
6544
6513
// Disable stats collection on system tables before the cluster is started,
6545
6514
// otherwise there is a race condition where stats may be collected before we
@@ -6560,22 +6529,39 @@ func TestCreateStatsAfterImport(t *testing.T) {
6560
6529
6561
6530
sqlDB .Exec (t , `SET CLUSTER SETTING sql.stats.automatic_collection.enabled=true` )
6562
6531
6563
- sqlDB .Exec (t , "IMPORT PGDUMP ($1) WITH ignore_unsupported_statements" , "nodelocal://1/cockroachdump/dump.sql" )
6532
+ sqlDB .Exec (t , `CREATE TABLE t (a INT PRIMARY KEY, b STRING)` )
6564
6533
6565
- // Verify that statistics have been created.
6534
+ emptyTableStats := [][]string {
6535
+ {"__auto__" , "{a}" , "0" , "0" , "0" },
6536
+ {"__auto__" , "{b}" , "0" , "0" , "0" },
6537
+ }
6538
+ // Wait until stats are collected on the empty table (to make the test more
6539
+ // deterministic).
6566
6540
sqlDB .CheckQueryResultsRetry (t ,
6567
6541
`SELECT statistics_name, column_names, row_count, distinct_count, null_count
6568
6542
FROM [SHOW STATISTICS FOR TABLE t]` ,
6569
- [][]string {
6570
- {"__auto__" , "{i}" , "2" , "2" , "0" },
6571
- {"__auto__" , "{t}" , "2" , "2" , "0" },
6572
- })
6543
+ emptyTableStats )
6544
+
6545
+ sqlDB .Exec (
6546
+ t , fmt .Sprintf ("IMPORT INTO t (a, b) CSV DATA (%s)" , strings .Join (testFiles .files , "," )),
6547
+ )
6548
+
6549
+ expectedStats := [][]string {
6550
+ {"__auto__" , "{a}" , "1000" , "1000" , "0" },
6551
+ {"__auto__" , "{b}" , "1000" , "26" , "0" },
6552
+ }
6553
+ if util .RaceEnabled {
6554
+ expectedStats = [][]string {
6555
+ {"__auto__" , "{a}" , "16" , "16" , "0" },
6556
+ {"__auto__" , "{b}" , "16" , "16" , "0" },
6557
+ }
6558
+ }
6559
+
6560
+ // Verify that statistics have been created.
6573
6561
sqlDB .CheckQueryResultsRetry (t ,
6574
6562
`SELECT statistics_name, column_names, row_count, distinct_count, null_count
6575
- FROM [SHOW STATISTICS FOR TABLE a]` ,
6576
- [][]string {
6577
- {"__auto__" , "{i}" , "1" , "1" , "0" },
6578
- })
6563
+ FROM [SHOW STATISTICS FOR TABLE t]` ,
6564
+ append (emptyTableStats , expectedStats ... ))
6579
6565
}
6580
6566
6581
6567
func TestImportAvro (t * testing.T ) {
0 commit comments