@@ -1143,6 +1143,9 @@ func TestChangefeedUserDefinedTypes(t *testing.T) {
11431143 defer leaktest .AfterTest (t )()
11441144 testFn := func (t * testing.T , s TestServer , f cdctest.TestFeedFactory ) {
11451145 sqlDB := sqlutils .MakeSQLRunner (s .DB )
1146+
1147+ _ = maybeDisableDeclarativeSchemaChangesForTest (t , sqlDB )
1148+
11461149 // Set up a type and table.
11471150 sqlDB .Exec (t , `CREATE TYPE t AS ENUM ('hello', 'howdy', 'hi')` )
11481151 sqlDB .Exec (t , `CREATE TABLE tt (x INT PRIMARY KEY, y t)` )
@@ -1548,6 +1551,8 @@ func TestChangefeedSchemaChangeNoBackfill(t *testing.T) {
15481551 testFn := func (t * testing.T , s TestServer , f cdctest.TestFeedFactory ) {
15491552 sqlDB := sqlutils .MakeSQLRunner (s .DB )
15501553
1554+ _ = maybeDisableDeclarativeSchemaChangesForTest (t , sqlDB )
1555+
15511556 // Schema changes that predate the changefeed.
15521557 t .Run (`historical` , func (t * testing.T ) {
15531558 sqlDB .Exec (t , `CREATE TABLE historical (a INT PRIMARY KEY, b STRING DEFAULT 'before')` )
@@ -1874,7 +1879,7 @@ func TestChangefeedSchemaChangeBackfillCheckpoint(t *testing.T) {
18741879
18751880 testFn := func (t * testing.T , s TestServerWithSystem , f cdctest.TestFeedFactory ) {
18761881 sqlDB := sqlutils .MakeSQLRunner (s .DB )
1877- usingLegacySchemaChanger := maybeDisableDeclarativeSchemaChangesForTest (t , sqlDB , rnd )
1882+ usingLegacySchemaChanger := maybeDisableDeclarativeSchemaChangesForTest (t , sqlDB )
18781883 // NB: For the `ALTER TABLE foo ADD COLUMN ... DEFAULT` schema change,
18791884 // the expected boundary is different depending on if we are using the
18801885 // legacy schema changer or not.
@@ -2117,6 +2122,202 @@ func TestChangefeedSchemaChangeBackfillCheckpoint(t *testing.T) {
21172122 }
21182123}
21192124
2125+ // Test schema changes that require a backfill when the backfill option is
2126+ // allowed when using the legacy schema changer.
2127+ //
2128+ // TODO: remove this test when the legacy schema changer is deprecated.
2129+ func TestChangefeedSchemaChangeAllowBackfill_Legacy (t * testing.T ) {
2130+ defer leaktest .AfterTest (t )()
2131+ defer log .Scope (t ).Close (t )
2132+
2133+ testFn := func (t * testing.T , s TestServerWithSystem , f cdctest.TestFeedFactory ) {
2134+ sqlDB := sqlutils .MakeSQLRunner (s .DB )
2135+
2136+ t .Log ("using legacy schema changer" )
2137+ sqlDB .Exec (t , "SET use_declarative_schema_changer='off'" )
2138+ sqlDB .Exec (t , "SET CLUSTER SETTING sql.defaults.use_declarative_schema_changer='off'" )
2139+
2140+ // Expected semantics:
2141+ //
2142+ // 1) DROP COLUMN
2143+ // If the table descriptor is at version 1 when the `ALTER TABLE` stmt is issued,
2144+ // we expect the changefeed level backfill to be triggered at the `ModificationTime` of
2145+ // version 2 of the said descriptor. This is because this is the descriptor
2146+ // version at which the dropped column stops being visible to SELECTs. Note that
2147+ // this means we will see row updates resulting from the schema-change level
2148+ // backfill _after_ the changefeed level backfill.
2149+ //
2150+ // 2) ADD COLUMN WITH DEFAULT & ADD COLUMN AS ... STORED
2151+ // If the table descriptor is at version 1 when the `ALTER TABLE` stmt is issued,
2152+ // we expect the changefeed level backfill to be triggered at the
2153+ // `ModificationTime` of version 4 of said descriptor. This is because this is the
2154+ // descriptor version which makes the schema-change level backfill for the
2155+ // newly-added column public. This means we wil see row updates resulting from the
2156+ // schema-change level backfill _before_ the changefeed level backfill.
2157+
2158+ t .Run (`add column with default` , func (t * testing.T ) {
2159+ sqlDB .Exec (t , `CREATE TABLE add_column_def (a INT PRIMARY KEY)` )
2160+ sqlDB .Exec (t , `INSERT INTO add_column_def VALUES (1)` )
2161+ sqlDB .Exec (t , `INSERT INTO add_column_def VALUES (2)` )
2162+ addColumnDef := feed (t , f , `CREATE CHANGEFEED FOR add_column_def WITH updated` )
2163+ defer closeFeed (t , addColumnDef )
2164+ assertPayloadsStripTs (t , addColumnDef , []string {
2165+ `add_column_def: [1]->{"after": {"a": 1}}` ,
2166+ `add_column_def: [2]->{"after": {"a": 2}}` ,
2167+ })
2168+ sqlDB .Exec (t , `ALTER TABLE add_column_def ADD COLUMN b STRING DEFAULT 'd'` )
2169+ ts := schematestutils .FetchDescVersionModificationTime (t , s .TestServer .Server ,
2170+ `d` , `public` , `add_column_def` , 4 )
2171+
2172+ // Schema change backfill
2173+ assertPayloadsStripTs (t , addColumnDef , []string {
2174+ `add_column_def: [1]->{"after": {"a": 1}}` ,
2175+ `add_column_def: [2]->{"after": {"a": 2}}` ,
2176+ })
2177+ // Changefeed level backfill
2178+ assertPayloads (t , addColumnDef , []string {
2179+ fmt .Sprintf (`add_column_def: [1]->{"after": {"a": 1, "b": "d"}, "updated": "%s"}` ,
2180+ ts .AsOfSystemTime ()),
2181+ fmt .Sprintf (`add_column_def: [2]->{"after": {"a": 2, "b": "d"}, "updated": "%s"}` ,
2182+ ts .AsOfSystemTime ()),
2183+ })
2184+ })
2185+
2186+ t .Run (`add column computed` , func (t * testing.T ) {
2187+ sqlDB .Exec (t , `CREATE TABLE add_col_comp (a INT PRIMARY KEY, b INT AS (a + 5) STORED)` )
2188+ sqlDB .Exec (t , `INSERT INTO add_col_comp VALUES (1)` )
2189+ sqlDB .Exec (t , `INSERT INTO add_col_comp (a) VALUES (2)` )
2190+ addColComp := feed (t , f , `CREATE CHANGEFEED FOR add_col_comp WITH updated` )
2191+ defer closeFeed (t , addColComp )
2192+ assertPayloadsStripTs (t , addColComp , []string {
2193+ `add_col_comp: [1]->{"after": {"a": 1, "b": 6}}` ,
2194+ `add_col_comp: [2]->{"after": {"a": 2, "b": 7}}` ,
2195+ })
2196+ sqlDB .Exec (t , `ALTER TABLE add_col_comp ADD COLUMN c INT AS (a + 10) STORED` )
2197+ assertPayloadsStripTs (t , addColComp , []string {
2198+ `add_col_comp: [1]->{"after": {"a": 1, "b": 6}}` ,
2199+ `add_col_comp: [2]->{"after": {"a": 2, "b": 7}}` ,
2200+ })
2201+ ts := schematestutils .FetchDescVersionModificationTime (t , s .TestServer .Server ,
2202+ `d` , `public` , `add_col_comp` , 4 )
2203+
2204+ assertPayloads (t , addColComp , []string {
2205+ fmt .Sprintf (`add_col_comp: [1]->{"after": {"a": 1, "b": 6, "c": 11}, "updated": "%s"}` ,
2206+ ts .AsOfSystemTime ()),
2207+ fmt .Sprintf (`add_col_comp: [2]->{"after": {"a": 2, "b": 7, "c": 12}, "updated": "%s"}` ,
2208+ ts .AsOfSystemTime ()),
2209+ })
2210+ })
2211+
2212+ t .Run (`drop column` , func (t * testing.T ) {
2213+ sqlDB .Exec (t , `CREATE TABLE drop_column (a INT PRIMARY KEY, b STRING)` )
2214+ sqlDB .Exec (t , `INSERT INTO drop_column VALUES (1, '1')` )
2215+ sqlDB .Exec (t , `INSERT INTO drop_column VALUES (2, '2')` )
2216+ dropColumn := feed (t , f , `CREATE CHANGEFEED FOR drop_column WITH updated` )
2217+ defer closeFeed (t , dropColumn )
2218+ assertPayloadsStripTs (t , dropColumn , []string {
2219+ `drop_column: [1]->{"after": {"a": 1, "b": "1"}}` ,
2220+ `drop_column: [2]->{"after": {"a": 2, "b": "2"}}` ,
2221+ })
2222+ sqlDB .Exec (t , `ALTER TABLE drop_column DROP COLUMN b` )
2223+ sqlDB .Exec (t , `INSERT INTO drop_column VALUES (3)` )
2224+
2225+ // since the changefeed level backfill (which flushes the sink before
2226+ // the backfill) occurs before the schema-change backfill for a drop
2227+ // column, the order in which the sink receives both backfills is
2228+ // uncertain. the only guarantee here is per-key ordering guarantees,
2229+ // so we must check both backfills in the same assertion.
2230+ assertPayloadsPerKeyOrderedStripTs (t , dropColumn , []string {
2231+ // Changefeed level backfill for DROP COLUMN b.
2232+ `drop_column: [1]->{"after": {"a": 1}}` ,
2233+ `drop_column: [2]->{"after": {"a": 2}}` ,
2234+ // Schema-change backfill for DROP COLUMN b.
2235+ `drop_column: [1]->{"after": {"a": 1}}` ,
2236+ `drop_column: [2]->{"after": {"a": 2}}` ,
2237+ // Insert 3 into drop_column
2238+ `drop_column: [3]->{"after": {"a": 3}}` ,
2239+ })
2240+ })
2241+
2242+ t .Run (`multiple alters` , func (t * testing.T ) {
2243+ sqlDB .Exec (t , `CREATE TABLE multiple_alters (a INT PRIMARY KEY, b STRING)` )
2244+ sqlDB .Exec (t , `INSERT INTO multiple_alters VALUES (1, '1')` )
2245+ sqlDB .Exec (t , `INSERT INTO multiple_alters VALUES (2, '2')` )
2246+
2247+ // Set up a hook to pause the changfeed on the next emit.
2248+ var wg sync.WaitGroup
2249+ waitSinkHook := func (_ context.Context ) error {
2250+ wg .Wait ()
2251+ return nil
2252+ }
2253+ knobs := s .TestingKnobs .
2254+ DistSQL .(* execinfra.TestingKnobs ).
2255+ Changefeed .(* TestingKnobs )
2256+ knobs .BeforeEmitRow = waitSinkHook
2257+
2258+ multipleAlters := feed (t , f , `CREATE CHANGEFEED FOR multiple_alters WITH updated` )
2259+ defer closeFeed (t , multipleAlters )
2260+ assertPayloadsStripTs (t , multipleAlters , []string {
2261+ `multiple_alters: [1]->{"after": {"a": 1, "b": "1"}}` ,
2262+ `multiple_alters: [2]->{"after": {"a": 2, "b": "2"}}` ,
2263+ })
2264+
2265+ // Wait on the next emit, queue up three ALTERs. The next poll process
2266+ // will see all of them at once.
2267+ wg .Add (1 )
2268+ waitForSchemaChange (t , sqlDB , `ALTER TABLE multiple_alters DROP COLUMN b` )
2269+ waitForSchemaChange (t , sqlDB , `ALTER TABLE multiple_alters ADD COLUMN c STRING DEFAULT 'cee'` )
2270+ waitForSchemaChange (t , sqlDB , `ALTER TABLE multiple_alters ADD COLUMN d STRING DEFAULT 'dee'` )
2271+ wg .Done ()
2272+
2273+ // assertions are grouped this way because the sink is flushed prior
2274+ // to a changefeed level backfill, ensuring all messages are received
2275+ // at the start of the assertion
2276+ assertPayloadsPerKeyOrderedStripTs (t , multipleAlters , []string {
2277+ // Changefeed level backfill for DROP COLUMN b.
2278+ `multiple_alters: [1]->{"after": {"a": 1}}` ,
2279+ `multiple_alters: [2]->{"after": {"a": 2}}` ,
2280+ // Schema-change backfill for DROP COLUMN b.
2281+ `multiple_alters: [1]->{"after": {"a": 1}}` ,
2282+ `multiple_alters: [2]->{"after": {"a": 2}}` ,
2283+ // Schema-change backfill for ADD COLUMN c.
2284+ `multiple_alters: [1]->{"after": {"a": 1}}` ,
2285+ `multiple_alters: [2]->{"after": {"a": 2}}` ,
2286+ })
2287+ assertPayloadsPerKeyOrderedStripTs (t , multipleAlters , []string {
2288+ // Changefeed level backfill for ADD COLUMN c.
2289+ `multiple_alters: [1]->{"after": {"a": 1, "c": "cee"}}` ,
2290+ `multiple_alters: [2]->{"after": {"a": 2, "c": "cee"}}` ,
2291+ // Schema change level backfill for ADD COLUMN d.
2292+ `multiple_alters: [1]->{"after": {"a": 1, "c": "cee"}}` ,
2293+ `multiple_alters: [2]->{"after": {"a": 2, "c": "cee"}}` ,
2294+ })
2295+ ts := schematestutils .FetchDescVersionModificationTime (t , s .TestServer .Server ,
2296+ `d` , `public` , `multiple_alters` , 10 )
2297+ // Changefeed level backfill for ADD COLUMN d.
2298+ assertPayloads (t , multipleAlters , []string {
2299+ // Backfill no-ops for column D (C schema change is complete)
2300+ // TODO(dan): Track duplicates more precisely in sinklessFeed/tableFeed.
2301+ // Scan output for column C
2302+ fmt .Sprintf (`multiple_alters: [1]->{"after": {"a": 1, "c": "cee", "d": "dee"}, "updated": "%s"}` , ts .AsOfSystemTime ()),
2303+ fmt .Sprintf (`multiple_alters: [2]->{"after": {"a": 2, "c": "cee", "d": "dee"}, "updated": "%s"}` , ts .AsOfSystemTime ()),
2304+ })
2305+ })
2306+ }
2307+
2308+ cdcTestWithSystem (t , testFn )
2309+
2310+ log .Flush ()
2311+ entries , err := log .FetchEntriesFromFiles (0 , math .MaxInt64 , 1 ,
2312+ regexp .MustCompile ("cdc ux violation" ), log .WithFlattenedSensitiveData )
2313+ if err != nil {
2314+ t .Fatal (err )
2315+ }
2316+ if len (entries ) > 0 {
2317+ t .Fatalf ("Found violation of CDC's guarantees: %v" , entries )
2318+ }
2319+ }
2320+
21202321// TestChangefeedSchemaChangeAllowBackfill tests schema changes that require a
21212322// backfill when the backfill option is allowed.
21222323func TestChangefeedSchemaChangeAllowBackfill (t * testing.T ) {
@@ -2282,6 +2483,7 @@ func TestChangefeedSchemaChangeBackfillScope(t *testing.T) {
22822483
22832484 testFn := func (t * testing.T , s TestServerWithSystem , f cdctest.TestFeedFactory ) {
22842485 sqlDB := sqlutils .MakeSQLRunner (s .DB )
2486+ usingLegacySchemaChanger := maybeDisableDeclarativeSchemaChangesForTest (t , sqlDB )
22852487
22862488 t .Run (`add column with default` , func (t * testing.T ) {
22872489 sqlDB .Exec (t , `CREATE TABLE add_column_def (a INT PRIMARY KEY)` )
@@ -2298,8 +2500,22 @@ func TestChangefeedSchemaChangeBackfillScope(t *testing.T) {
22982500 })
22992501 sqlDB .Exec (t , `ALTER TABLE add_column_def ADD COLUMN b STRING DEFAULT 'd'` )
23002502
2301- // The primary index swap occurs at version 7.
2302- ts := schematestutils .FetchDescVersionModificationTime (t , s .TestServer .Server , `d` , `public` , `add_column_def` , 7 )
2503+ var ts hlc.Timestamp
2504+ if usingLegacySchemaChanger {
2505+ // Schema change becomes public at version 4.
2506+ ts = schematestutils .FetchDescVersionModificationTime (t , s .TestServer .Server ,
2507+ `d` , `public` , `add_column_def` , 4 )
2508+ // The legacy schema changer rewrites KVs in place, so we see
2509+ // an additional backfill before the changefeed-level backfill.
2510+ assertPayloadsStripTs (t , combinedFeed , []string {
2511+ `add_column_def: [1]->{"after": {"a": 1}}` ,
2512+ `add_column_def: [2]->{"after": {"a": 2}}` ,
2513+ })
2514+ } else {
2515+ // The primary index swap occurs at version 7.
2516+ ts = schematestutils .FetchDescVersionModificationTime (t , s .TestServer .Server ,
2517+ `d` , `public` , `add_column_def` , 7 )
2518+ }
23032519 assertPayloads (t , combinedFeed , []string {
23042520 fmt .Sprintf (`add_column_def: [1]->{"after": {"a": 1, "b": "d"}, "updated": "%s"}` ,
23052521 ts .AsOfSystemTime ()),
@@ -2482,6 +2698,8 @@ func TestChangefeedSingleColumnFamilySchemaChanges(t *testing.T) {
24822698 testFn := func (t * testing.T , s TestServer , f cdctest.TestFeedFactory ) {
24832699 sqlDB := sqlutils .MakeSQLRunner (s .DB )
24842700
2701+ _ = maybeDisableDeclarativeSchemaChangesForTest (t , sqlDB )
2702+
24852703 // Table with 2 column families.
24862704 sqlDB .Exec (t , `CREATE TABLE foo (a INT PRIMARY KEY, b STRING, c STRING, FAMILY most (a,b), FAMILY rest (c))` )
24872705 sqlDB .Exec (t , `INSERT INTO foo values (0, 'dog', 'cat')` )
@@ -2520,6 +2738,8 @@ func TestChangefeedEachColumnFamilySchemaChanges(t *testing.T) {
25202738 testFn := func (t * testing.T , s TestServer , f cdctest.TestFeedFactory ) {
25212739 sqlDB := sqlutils .MakeSQLRunner (s .DB )
25222740
2741+ _ = maybeDisableDeclarativeSchemaChangesForTest (t , sqlDB )
2742+
25232743 // Table with 2 column families.
25242744 sqlDB .Exec (t , `CREATE TABLE foo (a INT PRIMARY KEY, b STRING, c STRING, FAMILY f1 (a,b), FAMILY f2 (c))` )
25252745 sqlDB .Exec (t , `INSERT INTO foo values (0, 'dog', 'cat')` )
@@ -3515,6 +3735,9 @@ func TestChangefeedNoBackfill(t *testing.T) {
35153735 skip .UnderShort (t )
35163736 testFn := func (t * testing.T , s TestServer , f cdctest.TestFeedFactory ) {
35173737 sqlDB := sqlutils .MakeSQLRunner (s .DB )
3738+
3739+ usingLegacySchemaChanger := maybeDisableDeclarativeSchemaChangesForTest (t , sqlDB )
3740+
35183741 // Shorten the intervals so this test doesn't take so long. We need to wait
35193742 // for timestamps to get resolved.
35203743 sqlDB .Exec (t , "SET CLUSTER SETTING changefeed.experimental_poll_interval = '200ms'" )
@@ -3589,9 +3812,25 @@ func TestChangefeedNoBackfill(t *testing.T) {
35893812 })
35903813 sqlDB .Exec (t , `ALTER TABLE drop_column DROP COLUMN b` )
35913814 sqlDB .Exec (t , `INSERT INTO drop_column VALUES (2)` )
3592- assertPayloads (t , dropColumn , []string {
3593- `drop_column: [2]->{"after": {"a": 2}}` ,
3594- })
3815+
3816+ var payloads []string
3817+ if usingLegacySchemaChanger {
3818+ // NB: Legacy schema changes modify the physical KVs in place while
3819+ // the changefeed is running, so you see a "backfill" even though
3820+ // the changefeed does not perform one. If we did not specify
3821+ // `schema_change_policy='nobackfill'`, then we would have seen
3822+ // 0 and 1 an additional time before seeing row 2.
3823+ payloads = []string {
3824+ `drop_column: [0]->{"after": {"a": 0}}` ,
3825+ `drop_column: [1]->{"after": {"a": 1}}` ,
3826+ `drop_column: [2]->{"after": {"a": 2}}` ,
3827+ }
3828+ } else {
3829+ payloads = []string {
3830+ `drop_column: [2]->{"after": {"a": 2}}` ,
3831+ }
3832+ }
3833+ assertPayloads (t , dropColumn , payloads )
35953834 })
35963835 t .Run ("add index" , func (t * testing.T ) {
35973836 // This case does not exit
0 commit comments