Skip to content

Commit 4fe2a80

Browse files
craig[bot]yuzefovichchengxiong-ruan
committed
107825: sql: fix rare flake in TestDistSQLFlowsVirtualTables r=yuzefovich a=yuzefovich This commit fixes a rare flake in `TestDistSQLFlowsVirtualTables`. The test previously asserted that only the target query was present in `*_distsql_flows` virtual tables, but we just saw a case where an internal query showed up in there (related to table stats cache). This assertion was too strict and unnecessary - the test really cares whether the target query is present or not, so this commit removes the assertion. Fixes: cockroachdb#107769. Release note: None 108053: sql: unskip TestSchemaChangePurgeFailure r=chengxiong-ruan a=chengxiong-ruan Informs cockroachdb#51796 There has been a lot of changes since the test was first written and looks some of integer variables are not used to control the schema changer flow at all. Specifically the concept of "Async Schema Changer" was misleading. This commit changes the test to test the purging behavior by comparing the key counts before and after rollback. We don't need to test how many attempts. Release note: None Release justification: test only change Co-authored-by: Yahor Yuzefovich <[email protected]> Co-authored-by: Chengxiong Ruan <[email protected]>
3 parents 83a56ea + 5113599 + 02d937c commit 4fe2a80

File tree

4 files changed

+105
-115
lines changed

4 files changed

+105
-115
lines changed

pkg/sql/crdb_internal_test.go

Lines changed: 4 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -636,15 +636,11 @@ func TestDistSQLFlowsVirtualTables(t *testing.T) {
636636
const clusterScope = "cluster"
637637
const nodeScope = "node"
638638
getNum := func(db *sqlutils.SQLRunner, scope string) int {
639-
querySuffix := fmt.Sprintf("FROM crdb_internal.%s_distsql_flows", scope)
640-
// Check that all remote flows (if any) correspond to the expected
641-
// statement.
642-
stmts := db.QueryStr(t, "SELECT stmt "+querySuffix)
643-
for _, stmt := range stmts {
644-
require.Equal(t, query, stmt[0])
645-
}
639+
// Count the number of flows matching our target query. Note that there
640+
// could be other flows in the table for the internal operations.
641+
countQuery := fmt.Sprintf("SELECT count(*) FROM crdb_internal.%s_distsql_flows WHERE stmt = '%s'", scope, query)
646642
var num int
647-
db.QueryRow(t, "SELECT count(*) "+querySuffix).Scan(&num)
643+
db.QueryRow(t, countQuery).Scan(&num)
648644
return num
649645
}
650646
for nodeID := 0; nodeID < numNodes; nodeID++ {

pkg/sql/schema_changer_test.go

Lines changed: 92 additions & 107 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,7 @@ import (
4949
"github.com/cockroachdb/cockroach/pkg/sql/gcjob"
5050
"github.com/cockroachdb/cockroach/pkg/sql/isql"
5151
"github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgcode"
52+
"github.com/cockroachdb/cockroach/pkg/sql/schemachanger/scexec"
5253
"github.com/cockroachdb/cockroach/pkg/sql/sqltestutils"
5354
"github.com/cockroachdb/cockroach/pkg/sql/stats"
5455
"github.com/cockroachdb/cockroach/pkg/testutils"
@@ -1587,125 +1588,109 @@ CREATE TABLE t.test (k INT PRIMARY KEY, v INT);
15871588
func TestSchemaChangePurgeFailure(t *testing.T) {
15881589
defer leaktest.AfterTest(t)()
15891590
defer log.Scope(t).Close(t)
1590-
skip.WithIssue(t, 51796)
1591-
// TODO (lucy): This test needs more complicated schema changer knobs than
1592-
// currently implemented. Previously this test disabled the async schema
1593-
// changer so that we don't retry the cleanup of the failed schema change
1594-
// until a certain point in the test.
1595-
params, _ := createTestServerParams()
1596-
const chunkSize = 200
1597-
var enableAsyncSchemaChanges uint32
1598-
var attempts int32
1599-
// attempt 1: write the first chunk of the index.
1600-
// attempt 2: write the second chunk and hit a unique constraint
1601-
// violation; purge the schema change.
1602-
// attempt 3: return an error while purging the schema change.
1603-
var expectedAttempts int32 = 3
1604-
params.Knobs = base.TestingKnobs{
1605-
SQLSchemaChanger: &sql.SchemaChangerTestingKnobs{
1606-
BackfillChunkSize: chunkSize,
1607-
},
1608-
DistSQL: &execinfra.TestingKnobs{
1609-
RunBeforeBackfillChunk: func(sp roachpb.Span) error {
1610-
// Return a deadline exceeded error during the third attempt
1611-
// which attempts to clean up the schema change.
1612-
if atomic.AddInt32(&attempts, 1) == expectedAttempts {
1613-
// Disable the async schema changer for assertions.
1614-
atomic.StoreUint32(&enableAsyncSchemaChanges, 0)
1615-
return context.DeadlineExceeded
1616-
}
1617-
return nil
1618-
},
1619-
BulkAdderFlushesEveryBatch: true,
1620-
},
1621-
}
1622-
server, sqlDB, kvDB := serverutils.StartServer(t, params)
1623-
defer server.Stopper().Stop(context.Background())
1624-
codec := server.ApplicationLayer().Codec()
16251591

1626-
// Disable strict GC TTL enforcement because we're going to shove a zero-value
1627-
// TTL into the system with AddImmediateGCZoneConfig.
1628-
defer sqltestutils.DisableGCTTLStrictEnforcement(t, sqlDB)()
1629-
1630-
if _, err := sqlDB.Exec(`
1631-
CREATE DATABASE t;
1632-
CREATE TABLE t.test (k INT PRIMARY KEY, v INT);
1633-
`); err != nil {
1634-
t.Fatal(err)
1635-
}
1636-
1637-
// Bulk insert.
1638-
const maxValue = chunkSize + 1
1639-
if err := sqltestutils.BulkInsertIntoTable(sqlDB, maxValue); err != nil {
1640-
t.Fatal(err)
1641-
}
1642-
1643-
// Add a row with a duplicate value=0 which is the same
1644-
// value as for the key maxValue.
1645-
if _, err := sqlDB.Exec(
1646-
`INSERT INTO t.test VALUES ($1, $2)`, maxValue+1, 0,
1647-
); err != nil {
1648-
t.Fatal(err)
1649-
}
1650-
1651-
// A schema change that violates integrity constraints.
1652-
if _, err := sqlDB.Exec(
1653-
"CREATE UNIQUE INDEX foo ON t.test (v)",
1654-
); !testutils.IsError(err, `violates unique constraint "foo"`) {
1655-
t.Fatal(err)
1656-
}
1592+
for _, schemaChangerSetup := range []string{
1593+
"SET use_declarative_schema_changer='off'",
1594+
"SET use_declarative_schema_changer='on'",
1595+
} {
1596+
params, _ := createTestServerParams()
1597+
const chunkSize = 200
16571598

1658-
// The index doesn't exist
1659-
if _, err := sqlDB.Query(
1660-
`SELECT v from t.test@foo`,
1661-
); !testutils.IsError(err, "index .* not found") {
1662-
t.Fatal(err)
1663-
}
1599+
var getKeyCount func() (int, error)
1600+
countBeforeRollback := 0
1601+
params.Knobs = base.TestingKnobs{
1602+
SQLSchemaChanger: &sql.SchemaChangerTestingKnobs{
1603+
RunBeforeOnFailOrCancel: func(jobID jobspb.JobID) error {
1604+
cnt, err := getKeyCount()
1605+
if err != nil {
1606+
return err
1607+
}
1608+
countBeforeRollback = cnt
1609+
return nil
1610+
},
1611+
},
1612+
DistSQL: &execinfra.TestingKnobs{
1613+
BulkAdderFlushesEveryBatch: true,
1614+
},
1615+
SQLDeclarativeSchemaChanger: &scexec.TestingKnobs{
1616+
RunBeforeMakingPostCommitPlan: func(inRollback bool) error {
1617+
if inRollback {
1618+
cnt, err := getKeyCount()
1619+
if err != nil {
1620+
return err
1621+
}
1622+
countBeforeRollback = cnt
1623+
}
1624+
return nil
1625+
},
1626+
},
1627+
}
1628+
server, sqlDB, kvDB := serverutils.StartServer(t, params)
1629+
defer server.Stopper().Stop(context.Background())
1630+
codec := server.ApplicationLayer().Codec()
16641631

1665-
// Allow async schema change purge to attempt backfill and error.
1666-
atomic.StoreUint32(&enableAsyncSchemaChanges, 1)
1667-
tableDesc := desctestutils.TestingGetPublicTableDescriptor(kvDB, keys.SystemSQLCodec, "t", "test")
1668-
// deal with schema change knob
1669-
if _, err := sqltestutils.AddImmediateGCZoneConfig(sqlDB, tableDesc.GetID()); err != nil {
1670-
t.Fatal(err)
1671-
}
1632+
_, err := sqlDB.Exec(fmt.Sprintf("SET CLUSTER SETTING bulkio.index_backfill.batch_size = %d", chunkSize))
1633+
require.NoError(t, err)
16721634

1673-
// The deadline exceeded error in the schema change purge results in no
1674-
// retry attempts of the purge.
1675-
testutils.SucceedsSoon(t, func() error {
1676-
if read := atomic.LoadInt32(&attempts); read != expectedAttempts {
1677-
return errors.Errorf("%d retries, despite allowing only (schema change + reverse) = %d", read, expectedAttempts)
1635+
getKeyCount = func() (int, error) {
1636+
return sqltestutils.GetTableKeyCount(ctx, kvDB, codec)
16781637
}
1679-
return nil
1680-
})
1638+
// Disable strict GC TTL enforcement because we're going to shove a zero-value
1639+
// TTL into the system with AddImmediateGCZoneConfig.
1640+
defer sqltestutils.DisableGCTTLStrictEnforcement(t, sqlDB)()
16811641

1682-
// There is still some garbage index data that needs to be purged. All the
1683-
// rows from k = 0 to k = chunkSize - 1 have index values.
1684-
numGarbageValues := chunkSize
1642+
_, err = sqlDB.Exec(schemaChangerSetup)
1643+
require.NoError(t, err)
16851644

1686-
ctx := context.Background()
1645+
if _, err := sqlDB.Exec(`
1646+
CREATE DATABASE t;
1647+
CREATE TABLE t.test (k INT PRIMARY KEY, v INT);
1648+
`); err != nil {
1649+
t.Fatal(err)
1650+
}
16871651

1688-
if err := sqltestutils.CheckTableKeyCount(ctx, kvDB, codec, 1, maxValue+1+numGarbageValues); err != nil {
1689-
t.Fatal(err)
1690-
}
1652+
// Bulk insert.
1653+
const maxValue = chunkSize + 1
1654+
if err := sqltestutils.BulkInsertIntoTable(sqlDB, maxValue); err != nil {
1655+
t.Fatal(err)
1656+
}
16911657

1692-
if err := sqlutils.RunScrub(sqlDB, "t", "test"); err != nil {
1693-
t.Fatal(err)
1694-
}
1658+
// Add a row with a duplicate value=0 which is the same
1659+
// value as for the key maxValue.
1660+
if _, err := sqlDB.Exec(
1661+
`INSERT INTO t.test VALUES ($1, $2)`, maxValue+1, 0,
1662+
); err != nil {
1663+
t.Fatal(err)
1664+
}
16951665

1696-
// Enable async schema change processing to ensure that it cleans up the
1697-
// above garbage left behind.
1698-
atomic.StoreUint32(&enableAsyncSchemaChanges, 1)
1666+
// A schema change that violates integrity constraints.
1667+
if _, err := sqlDB.Exec(
1668+
"CREATE UNIQUE INDEX foo ON t.test (v)",
1669+
); !testutils.IsError(err, `violates unique constraint "foo"`) {
1670+
t.Fatal(err)
1671+
}
16991672

1700-
// No garbage left behind.
1701-
testutils.SucceedsSoon(t, func() error {
1702-
numGarbageValues = 0
1703-
return sqltestutils.CheckTableKeyCount(ctx, kvDB, codec, 1, maxValue+1+numGarbageValues)
1704-
})
1673+
// The index doesn't exist
1674+
if _, err := sqlDB.Query(
1675+
`SELECT v from t.test@foo`,
1676+
); !testutils.IsError(err, "index .* not found") {
1677+
t.Fatal(err)
1678+
}
17051679

1706-
// A new attempt cleans up a chunk of data.
1707-
if attempts != expectedAttempts+1 {
1708-
t.Fatalf("%d chunk ops, despite allowing only (schema change + reverse) = %d", attempts, expectedAttempts)
1680+
// countBeforeRollback is assigned in the rollback testing knob which is
1681+
// called before rollback starts so that the first chunk (200 keys) written
1682+
// is still visible. The first chunk is visible because there is no
1683+
// duplicate keys within it. The duplicate keys only exist in the second
1684+
// chunk. Also note that we wrote maxValue+1 rows, and there is 1 extra key
1685+
// from kv.
1686+
require.Equal(t, countBeforeRollback, maxValue+2+chunkSize)
1687+
1688+
// No garbage left behind after rollback. This check should succeed pretty
1689+
// fast since we use `DelRange` in GC and `CheckTableKeyCount` cannot see
1690+
// tombstones.
1691+
testutils.SucceedsSoon(t, func() error {
1692+
return sqltestutils.CheckTableKeyCount(ctx, kvDB, codec, 1, maxValue+1)
1693+
})
17091694
}
17101695
}
17111696

pkg/sql/schemachanger/scexec/testing_knobs.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,10 @@ type TestingKnobs struct {
4141

4242
// RunBeforeBackfill is called just before starting the backfill.
4343
RunBeforeBackfill func() error
44+
45+
// RunBeforeMakingPostCommitPlan is called just before making the post commit
46+
// plan.
47+
RunBeforeMakingPostCommitPlan func(inRollback bool) error
4448
}
4549

4650
// ModuleTestingKnobs is part of the base.ModuleTestingKnobs interface.

pkg/sql/schemachanger/scrun/scrun.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -118,6 +118,11 @@ func RunSchemaChangesInJob(
118118
descriptorIDs []descpb.ID,
119119
rollbackCause error,
120120
) error {
121+
if knobs != nil && knobs.RunBeforeMakingPostCommitPlan != nil {
122+
if err := knobs.RunBeforeMakingPostCommitPlan(rollbackCause != nil); err != nil {
123+
return err
124+
}
125+
}
121126
p, err := makePostCommitPlan(ctx, deps, jobID, descriptorIDs, rollbackCause)
122127
if err != nil {
123128
if knobs != nil && knobs.OnPostCommitPlanError != nil {

0 commit comments

Comments
 (0)