@@ -49,6 +49,7 @@ import (
4949 "github.com/cockroachdb/cockroach/pkg/sql/gcjob"
5050 "github.com/cockroachdb/cockroach/pkg/sql/isql"
5151 "github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgcode"
52+ "github.com/cockroachdb/cockroach/pkg/sql/schemachanger/scexec"
5253 "github.com/cockroachdb/cockroach/pkg/sql/sqltestutils"
5354 "github.com/cockroachdb/cockroach/pkg/sql/stats"
5455 "github.com/cockroachdb/cockroach/pkg/testutils"
@@ -1587,125 +1588,109 @@ CREATE TABLE t.test (k INT PRIMARY KEY, v INT);
15871588func TestSchemaChangePurgeFailure (t * testing.T ) {
15881589 defer leaktest .AfterTest (t )()
15891590 defer log .Scope (t ).Close (t )
1590- skip .WithIssue (t , 51796 )
1591- // TODO (lucy): This test needs more complicated schema changer knobs than
1592- // currently implemented. Previously this test disabled the async schema
1593- // changer so that we don't retry the cleanup of the failed schema change
1594- // until a certain point in the test.
1595- params , _ := createTestServerParams ()
1596- const chunkSize = 200
1597- var enableAsyncSchemaChanges uint32
1598- var attempts int32
1599- // attempt 1: write the first chunk of the index.
1600- // attempt 2: write the second chunk and hit a unique constraint
1601- // violation; purge the schema change.
1602- // attempt 3: return an error while purging the schema change.
1603- var expectedAttempts int32 = 3
1604- params .Knobs = base.TestingKnobs {
1605- SQLSchemaChanger : & sql.SchemaChangerTestingKnobs {
1606- BackfillChunkSize : chunkSize ,
1607- },
1608- DistSQL : & execinfra.TestingKnobs {
1609- RunBeforeBackfillChunk : func (sp roachpb.Span ) error {
1610- // Return a deadline exceeded error during the third attempt
1611- // which attempts to clean up the schema change.
1612- if atomic .AddInt32 (& attempts , 1 ) == expectedAttempts {
1613- // Disable the async schema changer for assertions.
1614- atomic .StoreUint32 (& enableAsyncSchemaChanges , 0 )
1615- return context .DeadlineExceeded
1616- }
1617- return nil
1618- },
1619- BulkAdderFlushesEveryBatch : true ,
1620- },
1621- }
1622- server , sqlDB , kvDB := serverutils .StartServer (t , params )
1623- defer server .Stopper ().Stop (context .Background ())
1624- codec := server .ApplicationLayer ().Codec ()
16251591
1626- // Disable strict GC TTL enforcement because we're going to shove a zero-value
1627- // TTL into the system with AddImmediateGCZoneConfig.
1628- defer sqltestutils .DisableGCTTLStrictEnforcement (t , sqlDB )()
1629-
1630- if _ , err := sqlDB .Exec (`
1631- CREATE DATABASE t;
1632- CREATE TABLE t.test (k INT PRIMARY KEY, v INT);
1633- ` ); err != nil {
1634- t .Fatal (err )
1635- }
1636-
1637- // Bulk insert.
1638- const maxValue = chunkSize + 1
1639- if err := sqltestutils .BulkInsertIntoTable (sqlDB , maxValue ); err != nil {
1640- t .Fatal (err )
1641- }
1592+ for _ , schemaChangerSetup := range []string {
1593+ "SET use_declarative_schema_changer='off'" ,
1594+ "SET use_declarative_schema_changer='on'" ,
1595+ } {
1596+ params , _ := createTestServerParams ()
1597+ const chunkSize = 200
16421598
1643- // Add a row with a duplicate value=0 which is the same
1644- // value as for the key maxValue.
1645- if _ , err := sqlDB .Exec (
1646- `INSERT INTO t.test VALUES ($1, $2)` , maxValue + 1 , 0 ,
1647- ); err != nil {
1648- t .Fatal (err )
1649- }
1599+ var getKeyCount func () (int , error )
1600+ countBeforeRollback := 0
1601+ params .Knobs = base.TestingKnobs {
1602+ SQLSchemaChanger : & sql.SchemaChangerTestingKnobs {
1603+ RunBeforeOnFailOrCancel : func (jobID jobspb.JobID ) error {
1604+ cnt , err := getKeyCount ()
1605+ if err != nil {
1606+ return err
1607+ }
1608+ countBeforeRollback = cnt
1609+ return nil
1610+ },
1611+ },
1612+ DistSQL : & execinfra.TestingKnobs {
1613+ BulkAdderFlushesEveryBatch : true ,
1614+ },
1615+ SQLDeclarativeSchemaChanger : & scexec.TestingKnobs {
1616+ RunBeforeMakingPostCommitPlan : func (inRollback bool ) error {
1617+ if inRollback {
1618+ cnt , err := getKeyCount ()
1619+ if err != nil {
1620+ return err
1621+ }
1622+ countBeforeRollback = cnt
1623+ }
1624+ return nil
1625+ },
1626+ },
1627+ }
1628+ server , sqlDB , kvDB := serverutils .StartServer (t , params )
1629+ defer server .Stopper ().Stop (context .Background ())
1630+ codec := server .ApplicationLayer ().Codec ()
16501631
1651- // A schema change that violates integrity constraints.
1652- if _ , err := sqlDB .Exec (
1653- "CREATE UNIQUE INDEX foo ON t.test (v)" ,
1654- ); ! testutils .IsError (err , `violates unique constraint "foo"` ) {
1655- t .Fatal (err )
1656- }
1632+ _ , err := sqlDB .Exec (fmt .Sprintf ("SET CLUSTER SETTING bulkio.index_backfill.batch_size = %d" , chunkSize ))
1633+ require .NoError (t , err )
16571634
1658- // The index doesn't exist
1659- if _ , err := sqlDB . Query (
1660- `SELECT v from t.test@foo` ,
1661- ); ! testutils . IsError ( err , "index .* not found" ) {
1662- t . Fatal ( err )
1663- }
1635+ getKeyCount = func () ( int , error ) {
1636+ return sqltestutils . GetTableKeyCount ( ctx , kvDB , codec )
1637+ }
1638+ // Disable strict GC TTL enforcement because we're going to shove a zero-value
1639+ // TTL into the system with AddImmediateGCZoneConfig.
1640+ defer sqltestutils . DisableGCTTLStrictEnforcement ( t , sqlDB )()
16641641
1665- // Allow async schema change purge to attempt backfill and error.
1666- atomic .StoreUint32 (& enableAsyncSchemaChanges , 1 )
1667- tableDesc := desctestutils .TestingGetPublicTableDescriptor (kvDB , keys .SystemSQLCodec , "t" , "test" )
1668- // deal with schema change knob
1669- if _ , err := sqltestutils .AddImmediateGCZoneConfig (sqlDB , tableDesc .GetID ()); err != nil {
1670- t .Fatal (err )
1671- }
1642+ _ , err = sqlDB .Exec (schemaChangerSetup )
1643+ require .NoError (t , err )
16721644
1673- // The deadline exceeded error in the schema change purge results in no
1674- // retry attempts of the purge.
1675- testutils . SucceedsSoon ( t , func () error {
1676- if read := atomic . LoadInt32 ( & attempts ); read != expectedAttempts {
1677- return errors . Errorf ( "%d retries, despite allowing only (schema change + reverse) = %d" , read , expectedAttempts )
1645+ if _ , err := sqlDB . Exec ( `
1646+ CREATE DATABASE t;
1647+ CREATE TABLE t.test (k INT PRIMARY KEY, v INT);
1648+ ` ); err != nil {
1649+ t . Fatal ( err )
16781650 }
1679- return nil
1680- })
1681-
1682- // There is still some garbage index data that needs to be purged. All the
1683- // rows from k = 0 to k = chunkSize - 1 have index values.
1684- numGarbageValues := chunkSize
16851651
1686- ctx := context .Background ()
1652+ // Bulk insert.
1653+ const maxValue = chunkSize + 1
1654+ if err := sqltestutils .BulkInsertIntoTable (sqlDB , maxValue ); err != nil {
1655+ t .Fatal (err )
1656+ }
16871657
1688- if err := sqltestutils .CheckTableKeyCount (ctx , kvDB , codec , 1 , maxValue + 1 + numGarbageValues ); err != nil {
1689- t .Fatal (err )
1690- }
1658+ // Add a row with a duplicate value=0 which is the same
1659+ // value as for the key maxValue.
1660+ if _ , err := sqlDB .Exec (
1661+ `INSERT INTO t.test VALUES ($1, $2)` , maxValue + 1 , 0 ,
1662+ ); err != nil {
1663+ t .Fatal (err )
1664+ }
16911665
1692- if err := sqlutils .RunScrub (sqlDB , "t" , "test" ); err != nil {
1693- t .Fatal (err )
1694- }
1666+ // A schema change that violates integrity constraints.
1667+ if _ , err := sqlDB .Exec (
1668+ "CREATE UNIQUE INDEX foo ON t.test (v)" ,
1669+ ); ! testutils .IsError (err , `violates unique constraint "foo"` ) {
1670+ t .Fatal (err )
1671+ }
16951672
1696- // Enable async schema change processing to ensure that it cleans up the
1697- // above garbage left behind.
1698- atomic .StoreUint32 (& enableAsyncSchemaChanges , 1 )
1673+ // The index doesn't exist
1674+ if _ , err := sqlDB .Query (
1675+ `SELECT v from t.test@foo` ,
1676+ ); ! testutils .IsError (err , "index .* not found" ) {
1677+ t .Fatal (err )
1678+ }
16991679
1700- // No garbage left behind.
1701- testutils .SucceedsSoon (t , func () error {
1702- numGarbageValues = 0
1703- return sqltestutils .CheckTableKeyCount (ctx , kvDB , codec , 1 , maxValue + 1 + numGarbageValues )
1704- })
1680+ // countBeforeRollback is assigned in the rollback testing knob which is
1681+ // called before rollback starts so that the first chunk (200 keys) written
1682+ // is still visible. The first chunk is visible because there is no
1683+ // duplicate keys within it. The duplicate keys only exist in the second
1684+ // chunk. Also note that we wrote maxValue+1 rows, and there is 1 extra key
1685+ // from kv.
1686+ require .Equal (t , countBeforeRollback , maxValue + 2 + chunkSize )
17051687
1706- // A new attempt cleans up a chunk of data.
1707- if attempts != expectedAttempts + 1 {
1708- t .Fatalf ("%d chunk ops, despite allowing only (schema change + reverse) = %d" , attempts , expectedAttempts )
1688+ // No garbage left behind after rollback. This check should succeed pretty
1689+ // fast since we use `DelRange` in GC and `CheckTableKeyCount` cannot see
1690+ // tombstones.
1691+ testutils .SucceedsSoon (t , func () error {
1692+ return sqltestutils .CheckTableKeyCount (ctx , kvDB , codec , 1 , maxValue + 1 )
1693+ })
17091694 }
17101695}
17111696
0 commit comments