Skip to content

Commit c02d1f3

Browse files
committed
schemachanger: Add an integration-style test for concurrent schema changer behavior
This commit adds an integration style test for concurrent schema changer behaviors where we run multiple DDLs for an extended period of time on a few descriptors and assert that they all eventually finish and the descriptors end up in the expected state. Release note: None
1 parent c56dcaa commit c02d1f3

File tree

2 files changed

+274
-0
lines changed

2 files changed

+274
-0
lines changed

pkg/sql/schemachanger/BUILD.bazel

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,7 @@ go_test(
4949
"//pkg/sql/catalog",
5050
"//pkg/sql/catalog/desctestutils",
5151
"//pkg/sql/execinfra",
52+
"//pkg/sql/pgwire/pgcode",
5253
"//pkg/sql/rowenc",
5354
"//pkg/sql/schemachanger/scexec",
5455
"//pkg/sql/schemachanger/scop",
@@ -57,6 +58,7 @@ go_test(
5758
"//pkg/sql/sessiondatapb",
5859
"//pkg/testutils",
5960
"//pkg/testutils/serverutils",
61+
"//pkg/testutils/skip",
6062
"//pkg/testutils/sqlutils",
6163
"//pkg/util/ctxgroup",
6264
"//pkg/util/leaktest",
@@ -65,6 +67,7 @@ go_test(
6567
"//pkg/util/randutil",
6668
"@com_github_cockroachdb_errors//:errors",
6769
"@com_github_cockroachdb_errors//errorspb",
70+
"@com_github_lib_pq//:pq",
6871
"@com_github_stretchr_testify//assert",
6972
"@com_github_stretchr_testify//require",
7073
],

pkg/sql/schemachanger/schemachanger_test.go

Lines changed: 271 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,11 +14,13 @@ import (
1414
"context"
1515
"encoding/hex"
1616
"fmt"
17+
"math/rand"
1718
"regexp"
1819
"strings"
1920
"sync"
2021
"sync/atomic"
2122
"testing"
23+
"time"
2224

2325
"github.com/cockroachdb/cockroach/pkg/base"
2426
"github.com/cockroachdb/cockroach/pkg/jobs"
@@ -30,20 +32,23 @@ import (
3032
"github.com/cockroachdb/cockroach/pkg/sql/catalog"
3133
"github.com/cockroachdb/cockroach/pkg/sql/catalog/desctestutils"
3234
"github.com/cockroachdb/cockroach/pkg/sql/execinfra"
35+
"github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgcode"
3336
"github.com/cockroachdb/cockroach/pkg/sql/rowenc"
3437
"github.com/cockroachdb/cockroach/pkg/sql/schemachanger/scexec"
3538
"github.com/cockroachdb/cockroach/pkg/sql/schemachanger/scop"
3639
"github.com/cockroachdb/cockroach/pkg/sql/schemachanger/scplan"
3740
"github.com/cockroachdb/cockroach/pkg/sql/sessiondatapb"
3841
"github.com/cockroachdb/cockroach/pkg/testutils"
3942
"github.com/cockroachdb/cockroach/pkg/testutils/serverutils"
43+
"github.com/cockroachdb/cockroach/pkg/testutils/skip"
4044
"github.com/cockroachdb/cockroach/pkg/testutils/sqlutils"
4145
"github.com/cockroachdb/cockroach/pkg/util/ctxgroup"
4246
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
4347
"github.com/cockroachdb/cockroach/pkg/util/log"
4448
"github.com/cockroachdb/cockroach/pkg/util/protoutil"
4549
"github.com/cockroachdb/errors"
4650
"github.com/cockroachdb/errors/errorspb"
51+
"github.com/lib/pq"
4752
"github.com/stretchr/testify/assert"
4853
"github.com/stretchr/testify/require"
4954
)
@@ -517,3 +522,269 @@ func requireTableKeyCount(
517522
require.NoError(t, err)
518523
require.Equal(t, keyCount, len(kvs))
519524
}
525+
526+
// TestConcurrentSchemaChanges is an integration style tests where we issue many
527+
// schema changes concurrently (renames, add/drop columns, and create/drop
528+
// indexes) for a period of time and assert that they all finish eventually and
529+
// we end up with expected names, columns, and indexes.
530+
func TestConcurrentSchemaChanges(t *testing.T) {
531+
defer leaktest.AfterTest(t)()
532+
defer log.Scope(t).Close(t)
533+
534+
skip.UnderShort(t, "this test is long running (>3 mins).")
535+
skip.UnderStress(t, "test is already integration style and long running")
536+
skip.UnderStressRace(t, "test is already integration style and long running")
537+
skip.UnderRace(t, "the test knowingly has data race and has logic to account for that")
538+
539+
const testDuration = 3 * time.Minute
540+
const renameDBInterval = 5 * time.Second
541+
const renameSCInterval = 4 * time.Second
542+
const renameTblInterval = 3 * time.Second
543+
const addColInterval = 1 * time.Second
544+
const dropColInterval = 1 * time.Second
545+
const createIdxInterval = 1 * time.Second
546+
const dropIdxInterval = 1 * time.Second
547+
548+
ctx, cancel := context.WithCancel(context.Background())
549+
var params base.TestServerArgs
550+
params.Knobs = base.TestingKnobs{
551+
// Decrease the adopt loop interval so that retries happen quickly.
552+
JobsTestingKnobs: jobs.NewTestingKnobsWithShortIntervals(),
553+
}
554+
s, sqlDB, kvDB := serverutils.StartServer(t, params)
555+
defer s.Stopper().Stop(ctx)
556+
557+
tdb := sqlutils.MakeSQLRunner(sqlDB)
558+
dbName, scName, tblName := "testdb", "testsc", "t"
559+
allColToIndexes := make(map[string]map[string]struct{}) // colName -> indexes that uses that column
560+
allColToIndexes["col"] = map[string]struct{}{"t_pkey": {}}
561+
allNonPublicIdxToKeyCols := make(map[string]map[string]struct{}) // indexName -> its key column(s)
562+
tdb.Exec(t, fmt.Sprintf("CREATE DATABASE %v;", dbName))
563+
tdb.Exec(t, fmt.Sprintf("CREATE SCHEMA %v.%v;", dbName, scName))
564+
tdb.Exec(t, fmt.Sprintf("CREATE TABLE %v.%v.%v (col INT PRIMARY KEY);", dbName, scName, tblName))
565+
tdb.Exec(t, fmt.Sprintf("INSERT INTO %v.%v.%v SELECT generate_series(1,100);", dbName, scName, tblName))
566+
567+
// repeatFnWithInterval repeats `fn` indefinitely every `interval` until
568+
// `ctx` is cancelled.
569+
workerErrChan := make(chan error)
570+
var wg sync.WaitGroup
571+
repeatWorkWithInterval := func(workerName string, workInterval time.Duration, work func() error) {
572+
wg.Add(1)
573+
defer wg.Done()
574+
for {
575+
jitteredInterval := workInterval * time.Duration(0.8+0.4*rand.Float32())
576+
select {
577+
case <-ctx.Done():
578+
t.Logf("%v is signaled to finish work", workerName)
579+
return
580+
case <-time.After(jitteredInterval):
581+
if err := work(); err != nil {
582+
t.Logf("%v encounters error %v; signal to main routine and finish working", workerName, err.Error())
583+
workerErrChan <- err
584+
return
585+
}
586+
}
587+
}
588+
}
589+
590+
// validate performs a few quick validations after all schema changes are finished:
591+
// 1. Database, schema, and table indeed end up with the tracked name.
592+
// 2. Table indeed has the tracked columns.
593+
// 3. Table indeed has the tracked indexes.
594+
codec := s.ApplicationLayer().Codec()
595+
validate := func() {
596+
dbDesc := desctestutils.TestingGetDatabaseDescriptor(kvDB, codec, dbName)
597+
desctestutils.TestingGetSchemaDescriptor(kvDB, codec, dbDesc.GetID(), scName)
598+
tblDesc := desctestutils.TestingGetTableDescriptor(kvDB, codec, dbName, scName, tblName)
599+
require.Equal(t, len(allColToIndexes), len(tblDesc.PublicColumns())) // allColToIndexes does not include `col`
600+
for _, col := range tblDesc.PublicColumns() {
601+
_, ok := allColToIndexes[col.GetName()]
602+
require.True(t, ok, "column %v does not exist in allColToIndexes=%v", col.GetName(), allColToIndexes)
603+
}
604+
require.Equal(t, len(allNonPublicIdxToKeyCols), len(tblDesc.PublicNonPrimaryIndexes()))
605+
for _, idx := range tblDesc.PublicNonPrimaryIndexes() {
606+
_, ok := allNonPublicIdxToKeyCols[idx.GetName()]
607+
require.True(t, ok, "index %v does not exist in allNonPublicIdxToKeyCols=%v", idx.GetName(), allNonPublicIdxToKeyCols)
608+
}
609+
}
610+
611+
// A goroutine that repeatedly renames database `testdb` randomly.
612+
go repeatWorkWithInterval("rename-db-worker", renameDBInterval, func() error {
613+
newDBName := fmt.Sprintf("testdb_%v", rand.Intn(1000))
614+
if newDBName == dbName {
615+
return nil
616+
}
617+
if _, err := sqlDB.Exec(fmt.Sprintf("ALTER DATABASE %v RENAME TO %v", dbName, newDBName)); err != nil {
618+
return err
619+
}
620+
dbName = newDBName
621+
t.Logf("RENAME DATABASE TO %v", newDBName)
622+
return nil
623+
})
624+
625+
// A goroutine that renames schema `testdb.testsc` randomly.
626+
go repeatWorkWithInterval("rename-schema-worker", renameSCInterval, func() error {
627+
newSCName := fmt.Sprintf("testsc_%v", rand.Intn(1000))
628+
if scName == newSCName {
629+
return nil
630+
}
631+
_, err := sqlDB.Exec(fmt.Sprintf("ALTER SCHEMA %v.%v RENAME TO %v", dbName, scName, newSCName))
632+
if err == nil {
633+
scName = newSCName
634+
t.Logf("RENAME SCHEMA TO %v", newSCName)
635+
} else if isPQErrWithCode(err, pgcode.UndefinedDatabase) {
636+
err = nil // mute those errors as they're expected
637+
t.Logf("Parent database is renamed; skipping this schema renaming.")
638+
}
639+
return err
640+
})
641+
642+
// A goroutine that renames table `testdb.testsc.t` randomly.
643+
go repeatWorkWithInterval("rename-tbl-worker", renameTblInterval, func() error {
644+
newTblName := fmt.Sprintf("t_%v", rand.Intn(1000))
645+
_, err := sqlDB.Exec(fmt.Sprintf(`ALTER TABLE %v.%v.%v RENAME TO %v`, dbName, scName, tblName, newTblName))
646+
if err == nil {
647+
tblName = newTblName
648+
t.Logf("RENAME TABLE TO %v", newTblName)
649+
} else if isPQErrWithCode(err, pgcode.UndefinedDatabase, pgcode.UndefinedSchema, pgcode.InvalidSchemaName) {
650+
err = nil
651+
t.Logf("Parent database or schema is renamed; skipping this table renaming.")
652+
}
653+
return err
654+
})
655+
656+
// A goroutine that adds columns to `testdb.testsc.t` randomly.
657+
go repeatWorkWithInterval("add-column-worker", addColInterval, func() error {
658+
newColName := fmt.Sprintf("col_%v", rand.Intn(1000))
659+
if _, ok := allColToIndexes[newColName]; ok {
660+
return nil
661+
}
662+
tblName := tblName
663+
_, err := sqlDB.Exec(fmt.Sprintf("ALTER TABLE %v.%v.%v ADD COLUMN %v INT DEFAULT %v", dbName, scName, tblName, newColName, rand.Intn(100)))
664+
if err == nil {
665+
allColToIndexes[newColName] = make(map[string]struct{})
666+
t.Logf("ADD COLUMN %v TO TABLE %v", newColName, tblName)
667+
} else if isPQErrWithCode(err, pgcode.UndefinedDatabase, pgcode.UndefinedSchema, pgcode.InvalidSchemaName, pgcode.UndefinedTable) {
668+
err = nil
669+
t.Logf("Parent database or schema or table is renamed; skipping this column addition.")
670+
}
671+
return err
672+
})
673+
674+
// A goroutine that drops columns from `testdb.testsc.t` randomly.
675+
go repeatWorkWithInterval("drop-column-worker", dropColInterval, func() error {
676+
// Randomly pick a non-PK column to drop.
677+
if len(allColToIndexes) == 1 {
678+
return nil
679+
}
680+
var colName string
681+
for col := range allColToIndexes {
682+
if col != "col" {
683+
colName = col
684+
break
685+
}
686+
}
687+
688+
tblName := tblName
689+
_, err := sqlDB.Exec(fmt.Sprintf("ALTER TABLE %v.%v.%v DROP COLUMN %v;", dbName, scName, tblName, colName))
690+
if err == nil {
691+
for indexName := range allColToIndexes[colName] {
692+
delete(allNonPublicIdxToKeyCols, indexName)
693+
}
694+
delete(allColToIndexes, colName)
695+
t.Logf("DROP COLUMN %v FROM TABLE %v", colName, tblName)
696+
} else if isPQErrWithCode(err, pgcode.UndefinedDatabase, pgcode.UndefinedSchema, pgcode.InvalidSchemaName, pgcode.UndefinedTable) {
697+
err = nil
698+
t.Logf("Parent database or schema or table is renamed; skipping this column removal.")
699+
}
700+
return err
701+
})
702+
703+
// A goroutine that creates secondary index on a randomly selected column.
704+
go repeatWorkWithInterval("create-index-worker", createIdxInterval, func() error {
705+
newIndexName := fmt.Sprintf("idx_%v", rand.Intn(1000))
706+
if _, ok := allNonPublicIdxToKeyCols[newIndexName]; ok {
707+
return nil
708+
}
709+
710+
// Randomly pick a non-PK column to create an index on.
711+
if len(allColToIndexes) == 1 {
712+
return nil
713+
}
714+
var colName string
715+
for col := range allColToIndexes {
716+
if col != "col" {
717+
colName = col
718+
break
719+
}
720+
}
721+
722+
tblName := tblName
723+
_, err := sqlDB.Exec(fmt.Sprintf("CREATE INDEX %v ON %v.%v.%v (%v);", newIndexName, dbName, scName, tblName, colName))
724+
if err == nil {
725+
allNonPublicIdxToKeyCols[newIndexName] = map[string]struct{}{colName: {}}
726+
allColToIndexes[colName][newIndexName] = struct{}{}
727+
t.Logf("CREATE INDEX %v ON TABLE %v(%v)", newIndexName, tblName, colName)
728+
} else if isPQErrWithCode(err, pgcode.UndefinedDatabase, pgcode.UndefinedSchema, pgcode.InvalidSchemaName, pgcode.UndefinedTable, pgcode.UndefinedColumn) {
729+
err = nil
730+
t.Logf("Parent database or schema or table is renamed or column is dropped; skipping this index creation.")
731+
}
732+
return err
733+
})
734+
735+
// A goroutine that drops a secondary index randomly.
736+
go repeatWorkWithInterval("drop-index-worker", dropIdxInterval, func() error {
737+
// Randomly pick a non-public index to drop.
738+
if len(allNonPublicIdxToKeyCols) == 0 {
739+
return nil
740+
}
741+
var indexName string
742+
var indexKeyCols map[string]struct{}
743+
for idx, idxCols := range allNonPublicIdxToKeyCols {
744+
indexName = idx
745+
indexKeyCols = idxCols
746+
break
747+
}
748+
749+
tblName := tblName
750+
_, err := sqlDB.Exec(fmt.Sprintf("DROP INDEX %v.%v.%v@%v;", dbName, scName, tblName, indexName))
751+
if err == nil {
752+
for indexKeyCol := range indexKeyCols {
753+
delete(allColToIndexes[indexKeyCol], indexName)
754+
}
755+
delete(allNonPublicIdxToKeyCols, indexName)
756+
t.Logf("DROP INDEX %v FROM TABLE %v", indexName, tblName)
757+
} else if isPQErrWithCode(err, pgcode.UndefinedDatabase, pgcode.UndefinedSchema, pgcode.InvalidSchemaName, pgcode.UndefinedTable, pgcode.UndefinedObject) {
758+
err = nil
759+
t.Logf("Parent database or schema or table is renamed; skipping this index removal.")
760+
}
761+
return err
762+
})
763+
764+
select {
765+
case workerErr := <-workerErrChan:
766+
t.Logf("main: a worker error %q is signaled; Inform all workers to stop.", workerErr.Error())
767+
cancel()
768+
wg.Wait()
769+
t.Logf("main: all workers have stopped their work; Test Failure!")
770+
t.Fatalf(workerErr.Error())
771+
case <-time.After(testDuration):
772+
t.Logf("main: time's up! Inform all workers to stop.")
773+
cancel()
774+
wg.Wait()
775+
t.Logf("main: all workers have stopped. Validating descriptors states...")
776+
validate()
777+
t.Logf("main: validation succeeded! Test success!")
778+
}
779+
}
780+
781+
func isPQErrWithCode(err error, codes ...pgcode.Code) bool {
782+
if pqErr := (*pq.Error)(nil); errors.As(err, &pqErr) {
783+
for _, code := range codes {
784+
if pgcode.MakeCode(string(pqErr.Code)) == code {
785+
return true
786+
}
787+
}
788+
}
789+
return false
790+
}

0 commit comments

Comments
 (0)