Skip to content

Commit ed66505

Browse files
committed
ccl/cdc: change when drop column is detected in schema feed
Previously, before automatic schema_locked toggle was added in the declarative schema changer, we had a guarantee that schema_locked would be unset before any column mutations are added. Unfortunately, the automatic toggle logic will disable schema_locked at the same time as the column is removed. This doesn't play nice with the existing schema feed logic. To address this, this patch detects the mutation once the new primary index is backfilled, so that is separate from the schema_locked being disabled. This is still valid behavior because the schema change is still in flight at this point, and doesn't have a usable new primary index. Informs: #150003 Release note (bug fix): Addressed a bug on schema_locked tables when a column is dropped, and schema_locked is toggled for the user.
1 parent 99d44a0 commit ed66505

File tree

3 files changed

+86
-6
lines changed

3 files changed

+86
-6
lines changed

pkg/ccl/changefeedccl/changefeed_test.go

Lines changed: 50 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2180,6 +2180,54 @@ func TestChangefeedColumnDropsOnTheSameTableWithMultipleFamilies(t *testing.T) {
21802180
})
21812181
}
21822182

2183+
func TestChangefeedColumnDropsOnTheSameTableWithMultipleFamiliesWithManualSchemaLocked(
2184+
t *testing.T,
2185+
) {
2186+
defer leaktest.AfterTest(t)()
2187+
defer log.Scope(t).Close(t)
2188+
2189+
require.NoError(t, log.SetVModule("kv_feed=2,changefeed_processors=2"))
2190+
2191+
testFn := func(t *testing.T, s TestServer, f cdctest.TestFeedFactory) {
2192+
sqlDB := sqlutils.MakeSQLRunner(s.DB)
2193+
2194+
sqlDB.Exec(t, `CREATE TABLE hasfams (id int primary key, a string, b string, c string, FAMILY id_a (id, a), FAMILY b_and_c (b, c))`)
2195+
sqlDB.Exec(t, `INSERT INTO hasfams values (0, 'a', 'b', 'c')`)
2196+
2197+
var args []any
2198+
if _, ok := f.(*webhookFeedFactory); ok {
2199+
args = append(args, optOutOfMetamorphicEnrichedEnvelope{reason: "metamorphic enriched envelope does not support column families for webhook sinks"})
2200+
}
2201+
// Open up the changefeed.
2202+
cf := feed(t, f, `CREATE CHANGEFEED FOR TABLE hasfams FAMILY id_a, TABLE hasfams FAMILY b_and_c`, args...)
2203+
defer closeFeed(t, cf)
2204+
assertPayloads(t, cf, []string{
2205+
`hasfams.b_and_c: [0]->{"after": {"b": "b", "c": "c"}}`,
2206+
`hasfams.id_a: [0]->{"after": {"a": "a", "id": 0}}`,
2207+
})
2208+
2209+
// Check that dropping a watched column will backfill the changefeed.
2210+
sqlDB.Exec(t, `ALTER TABLE hasfams SET (schema_locked=false)`)
2211+
sqlDB.Exec(t, `ALTER TABLE hasfams DROP COLUMN a`)
2212+
sqlDB.Exec(t, `ALTER TABLE hasfams SET (schema_locked=true)`)
2213+
assertPayloads(t, cf, []string{
2214+
`hasfams.id_a: [0]->{"after": {"id": 0}}`,
2215+
})
2216+
2217+
// Check that dropping a watched column will backfill the changefeed.
2218+
sqlDB.Exec(t, `ALTER TABLE hasfams SET (schema_locked=false)`)
2219+
sqlDB.Exec(t, `ALTER TABLE hasfams DROP COLUMN b`)
2220+
sqlDB.Exec(t, `ALTER TABLE hasfams SET (schema_locked=true)`)
2221+
assertPayloads(t, cf, []string{
2222+
`hasfams.b_and_c: [0]->{"after": {"c": "c"}}`,
2223+
})
2224+
}
2225+
2226+
runWithAndWithoutRegression141453(t, testFn, func(t *testing.T, testFn cdcTestFn) {
2227+
cdcTest(t, testFn)
2228+
})
2229+
}
2230+
21832231
func TestNoStopAfterNonTargetAddColumnWithBackfill(t *testing.T) {
21842232
defer leaktest.AfterTest(t)()
21852233
defer log.Scope(t).Close(t)
@@ -3203,7 +3251,7 @@ func TestChangefeedSchemaChangeAllowBackfill(t *testing.T) {
32033251
`drop_column: [2]->{"after": {"a": 2, "b": "2"}}`,
32043252
})
32053253
sqlDB.Exec(t, `ALTER TABLE drop_column DROP COLUMN b`)
3206-
ts := schematestutils.FetchDescVersionModificationTime(t, s.Server, `d`, `public`, `drop_column`, 2)
3254+
ts := schematestutils.FetchDescVersionModificationTime(t, s.Server, `d`, `public`, `drop_column`, 6)
32073255

32083256
// Backfill for DROP COLUMN b.
32093257
assertPayloads(t, dropColumn, []string{
@@ -3256,7 +3304,7 @@ func TestChangefeedSchemaChangeAllowBackfill(t *testing.T) {
32563304
// the 7th step (version 15). Finally, when adding column d, it goes from 17->25 ith the schema change
32573305
// being visible at the 7th step (version 23).
32583306
// TODO(#142936): Investigate if this descriptor version hardcoding is sound.
3259-
dropTS := schematestutils.FetchDescVersionModificationTime(t, s.Server, `d`, `public`, `multiple_alters`, 2)
3307+
dropTS := schematestutils.FetchDescVersionModificationTime(t, s.Server, `d`, `public`, `multiple_alters`, 6)
32603308
addTS := schematestutils.FetchDescVersionModificationTime(t, s.Server, `d`, `public`, `multiple_alters`, 15)
32613309
addTS2 := schematestutils.FetchDescVersionModificationTime(t, s.Server, `d`, `public`, `multiple_alters`, 23)
32623310

pkg/ccl/changefeedccl/schemafeed/table_event_filter.go

Lines changed: 34 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -188,6 +188,26 @@ func shouldFilterAddColumnEvent(e TableEvent, targets changefeedbase.Targets) (b
188188
return !watched, nil
189189
}
190190

191+
// notDeclarativeOrHasMergedIndex returns true if the descriptor has a declarative
192+
// schema changer with a merged index.
193+
func notDeclarativeOrHasMergedIndex(desc catalog.TableDescriptor) bool {
194+
// If there are not declarative schema changes then this will always be
195+
// true.
196+
if desc.GetDeclarativeSchemaChangerState() == nil {
197+
return true
198+
}
199+
// For declarative schema changes detect when a new primary index becomes
200+
// WRITE_ONLY (i.e. backfill has been completed).
201+
for idx, target := range desc.GetDeclarativeSchemaChangerState().Targets {
202+
if target.GetPrimaryIndex() != nil &&
203+
target.TargetStatus == scpb.Status_PUBLIC &&
204+
desc.GetDeclarativeSchemaChangerState().CurrentStatuses[idx] == scpb.Status_WRITE_ONLY {
205+
return true
206+
}
207+
}
208+
return false
209+
}
210+
191211
// Returns true if the changefeed targets a column which has a drop mutation inside the table event.
192212
func droppedColumnIsWatched(e TableEvent, targets changefeedbase.Targets) (bool, error) {
193213
// If no column families are specified, then all columns are targeted.
@@ -212,7 +232,13 @@ func droppedColumnIsWatched(e TableEvent, targets changefeedbase.Targets) (bool,
212232
if m.AsColumn() == nil || m.AsColumn().IsHidden() {
213233
continue
214234
}
215-
if m.Dropped() && m.WriteAndDeleteOnly() && watchedColumnIDs.Contains(int(m.AsColumn().GetID())) {
235+
// For dropped columns wait for WriteAndDeleteOnly to be hit. When using
236+
// the declarative schema changer we will wait a bit later in the plan to
237+
// publish the dropped column, since schema_locked and the column being
238+
// write and delete only happen at the same stage. Since the schema change
239+
// is still in progress, there is a gray area in terms of when the change
240+
// should be visible.
241+
if m.Dropped() && m.WriteAndDeleteOnly() && notDeclarativeOrHasMergedIndex(e.After) && watchedColumnIDs.Contains(int(m.AsColumn().GetID())) {
216242
return true, nil
217243
}
218244
}
@@ -273,7 +299,13 @@ func dropVisibleColumnMutationExists(desc catalog.TableDescriptor) bool {
273299
if m.AsColumn() == nil || m.AsColumn().IsHidden() {
274300
continue
275301
}
276-
if m.Dropped() && m.WriteAndDeleteOnly() {
302+
// For dropped columns wait for WriteAndDeleteOnly to be hit. When using
303+
// the declarative schema changer we will wait a bit later in the plan to
304+
// publish the dropped column, since schema_locked and the column being
305+
// write and delete only happen at the same stage. Since the schema change
306+
// is still in progress, there is a gray area in terms of when the change
307+
// should be visible.
308+
if m.Dropped() && m.WriteAndDeleteOnly() && notDeclarativeOrHasMergedIndex(desc) {
277309
return true
278310
}
279311
}

pkg/ccl/changefeedccl/schemafeed/testdata/drop_column

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -12,11 +12,11 @@ ALTER TABLE t DROP COLUMN j;
1212

1313
pop f=1
1414
----
15-
t 1->2: DropColumn
15+
t 1->2: Unknown
1616
t 2->3: Unknown
1717
t 3->4: Unknown
1818
t 4->5: Unknown
19-
t 5->6: Unknown
19+
t 5->6: DropColumn
2020
t 6->7: PrimaryKeyChange (no column changes)
2121
t 7->8: Unknown
2222
t 8->9: AddHiddenColumn

0 commit comments

Comments
 (0)