Skip to content

Commit 512e905

Browse files
authored
Merge pull request #151527 from fqazi/backport25.3-150435-150952
release-25.3: address change feed failure with drop column on a schema_locked table
2 parents 01284ad + d41232d commit 512e905

File tree

8 files changed

+246
-29
lines changed

8 files changed

+246
-29
lines changed

pkg/ccl/changefeedccl/cdcevent/BUILD.bazel

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@ go_library(
3030
"//pkg/sql/pgwire/pgerror",
3131
"//pkg/sql/row",
3232
"//pkg/sql/rowenc",
33+
"//pkg/sql/sem/catid",
3334
"//pkg/sql/sem/eval",
3435
"//pkg/sql/sem/tree",
3536
"//pkg/sql/sessiondatapb",

pkg/ccl/changefeedccl/cdcevent/event.go

Lines changed: 76 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ package cdcevent
88
import (
99
"context"
1010
"fmt"
11+
"slices"
1112
"strings"
1213
"time"
1314

@@ -21,6 +22,7 @@ import (
2122
"github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb"
2223
"github.com/cockroachdb/cockroach/pkg/sql/row"
2324
"github.com/cockroachdb/cockroach/pkg/sql/rowenc"
25+
"github.com/cockroachdb/cockroach/pkg/sql/sem/catid"
2426
"github.com/cockroachdb/cockroach/pkg/sql/sem/eval"
2527
"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
2628
"github.com/cockroachdb/cockroach/pkg/sql/sessiondatapb"
@@ -292,7 +294,7 @@ func NewEventDescriptor(
292294
}
293295

294296
// addColumn is a helper to add a column to this descriptor.
295-
addColumn := func(col catalog.Column, ord int) int {
297+
addColumn := func(col catalog.Column, nameOverride string, ord int) int {
296298
resultColumn := ResultColumn{
297299
ResultColumn: colinfo.ResultColumn{
298300
Name: col.GetName(),
@@ -305,10 +307,12 @@ func NewEventDescriptor(
305307
ord: ord,
306308
sqlString: col.ColumnDesc().SQLStringNotHumanReadable(),
307309
}
308-
310+
if nameOverride != "" {
311+
resultColumn.Name = nameOverride
312+
}
309313
colIdx := len(sd.cols)
310314
sd.cols = append(sd.cols, resultColumn)
311-
sd.colsByName[col.GetName()] = colIdx
315+
sd.colsByName[resultColumn.Name] = colIdx
312316

313317
if col.GetType().UserDefined() {
314318
sd.udtCols = append(sd.udtCols, colIdx)
@@ -319,7 +323,46 @@ func NewEventDescriptor(
319323
// Primary key columns must be added in the same order they
320324
// appear in the primary key index.
321325
primaryIdx := desc.GetPrimaryIndex()
322-
colOrd := catalog.ColumnIDToOrdinalMap(desc.PublicColumns())
326+
// The declarative schema changer requires special handling for dropped columns
327+
// to ensure their previous values are available to the changefeed.
328+
// We select a prior descriptor where the column is in the WRITE_ONLY state
329+
// (instead of PUBLIC) and treat it as visible.
330+
// This is necessary because the schema change stages for dropped columns were
331+
// intentionally shifted to support automatic `schema_locked` management. The
332+
// declarative schema changer cannot make a column WRITE_ONLY and flip
333+
// `schema_locked` in two separate stages, so we instead treat the WRITE_ONLY
334+
// stage as a non-backfilling change for CDC.
335+
columnsToTrack := desc.PublicColumns()
336+
colNameOverrides := make(map[catid.ColumnID]string)
337+
if desc.GetDeclarativeSchemaChangerState() != nil {
338+
hasMergedIndex := catalog.HasDeclarativeMergedPrimaryIndex(desc)
339+
allColumns := desc.AllColumns()
340+
columnsToTrack = make([]catalog.Column, 0, len(allColumns))
341+
for _, column := range allColumns {
342+
// Skip all adding columns and non-public column.
343+
if column.Adding() {
344+
continue
345+
}
346+
// Pick up any write-only columns.
347+
if column.Dropped() {
348+
if !column.WriteAndDeleteOnly() || hasMergedIndex || column.IsHidden() {
349+
continue
350+
}
351+
if found, previousName := catalog.FindPreviousColumnNameForDeclarativeSchemaChange(desc, column.GetID()); found {
352+
colNameOverrides[column.GetID()] = previousName
353+
}
354+
}
355+
columnsToTrack = append(columnsToTrack, column)
356+
}
357+
// Recover the order of the original columns.
358+
slices.SortStableFunc(columnsToTrack, func(a, b catalog.Column) int {
359+
return int(a.GetPGAttributeNum()) - int(b.GetPGAttributeNum())
360+
})
361+
}
362+
var colOrd catalog.TableColMap
363+
for ord, col := range columnsToTrack {
364+
colOrd.Set(col.GetID(), ord)
365+
}
323366
writeOnlyAndPublic := catalog.ColumnIDToOrdinalMap(desc.WritableColumns())
324367
var primaryKeyOrdinal catalog.TableColMap
325368

@@ -335,20 +378,20 @@ func NewEventDescriptor(
335378
}
336379
return nil, errors.AssertionFailedf("expected to find column %d", ord)
337380
}
338-
primaryKeyOrdinal.Set(desc.PublicColumns()[ord].GetID(), ordIdx)
381+
primaryKeyOrdinal.Set(columnsToTrack[ord].GetID(), ordIdx)
339382
ordIdx += 1
340383
}
341384
sd.keyCols = make([]int, ordIdx)
342385

343386
switch {
344387
case keyOnly:
345388
ord := 0
346-
for _, col := range desc.PublicColumns() {
389+
for _, col := range columnsToTrack {
347390
pKeyOrd, isPKey := primaryKeyOrdinal.Get(col.GetID())
348391
if !isPKey {
349392
continue
350393
}
351-
colIdx := addColumn(col, ord)
394+
colIdx := addColumn(col, colNameOverrides[col.GetID()], ord)
352395
ord++
353396
sd.keyCols[pKeyOrd] = colIdx
354397
sd.valueCols = append(sd.valueCols, colIdx)
@@ -359,9 +402,9 @@ func NewEventDescriptor(
359402
// a sentinel ordinal position of virtualColOrd.
360403
inFamily := catalog.MakeTableColSet(family.ColumnIDs...)
361404
ord := 0
362-
for _, col := range desc.PublicColumns() {
405+
for _, col := range columnsToTrack {
363406
if isVirtual := col.IsVirtual(); isVirtual && includeVirtualColumns {
364-
colIdx := addColumn(col, virtualColOrd)
407+
colIdx := addColumn(col, colNameOverrides[col.GetID()], virtualColOrd)
365408
sd.valueCols = append(sd.valueCols, colIdx)
366409
continue
367410
}
@@ -370,7 +413,7 @@ func NewEventDescriptor(
370413
if !isPKey && !isInFamily {
371414
continue
372415
}
373-
colIdx := addColumn(col, ord)
416+
colIdx := addColumn(col, colNameOverrides[col.GetID()], ord)
374417
ord++
375418
if isPKey {
376419
sd.keyCols[pKeyOrd] = colIdx
@@ -871,7 +914,29 @@ func getRelevantColumnsForFamily(
871914
// matches the order of columns in the SQL table.
872915
idx := 0
873916
result := make([]descpb.ColumnID, cols.Len())
874-
for _, colID := range tableDesc.PublicColumnIDs() {
917+
visibleColumns := tableDesc.PublicColumns()
918+
if tableDesc.GetDeclarativeSchemaChangerState() != nil {
919+
hasMergedIndex := catalog.HasDeclarativeMergedPrimaryIndex(tableDesc)
920+
visibleColumns = make([]catalog.Column, 0, cols.Len())
921+
for _, col := range tableDesc.AllColumns() {
922+
if col.Adding() {
923+
continue
924+
}
925+
if tableDesc.GetDeclarativeSchemaChangerState() == nil && !col.Public() {
926+
continue
927+
}
928+
if col.Dropped() && (!col.WriteAndDeleteOnly() || hasMergedIndex) {
929+
continue
930+
}
931+
visibleColumns = append(visibleColumns, col)
932+
}
933+
// Recover the order of the original columns.
934+
slices.SortStableFunc(visibleColumns, func(a, b catalog.Column) int {
935+
return int(a.GetPGAttributeNum()) - int(b.GetPGAttributeNum())
936+
})
937+
}
938+
for _, col := range visibleColumns {
939+
colID := col.GetID()
875940
if cols.Contains(colID) {
876941
result[idx] = colID
877942
idx++

pkg/ccl/changefeedccl/cdcevent/event_test.go

Lines changed: 47 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ package cdcevent
88
import (
99
"context"
1010
"fmt"
11+
"slices"
1112
"sync/atomic"
1213
"testing"
1314

@@ -173,7 +174,6 @@ CREATE TABLE foo (
173174
FAMILY main (a, b, e),
174175
FAMILY only_c (c)
175176
)`)
176-
177177
for _, tc := range []struct {
178178
schemaChange string
179179
// Each new primary index generated during the test will pause at each stage
@@ -210,16 +210,21 @@ CREATE TABLE foo (
210210
expectedUDTCols: [][]string{{"e"}, {"e"}, {"e"}},
211211
},
212212
{
213-
// We are going to execute a mix of add, drop and alter primary key operations,
213+
// We are going to execute a mix of add, drop and alter primary key operations;
214214
// this will result in 3 primary indexes being swapped.
215-
// 1) The first primary index key will be the same as previous test
215+
// 1) The first primary index key will be the same as the previous test
216216
// 2) The second primary key will use the column "a", without a hash
217217
// sharding column since that needs to be created next.
218-
// 3) The final primary key will "a" and have hash sharding on it.
218+
// 3) The final primary key will "a" and have hash sharding on it (repeated
219+
// for the column removal).
220+
// The values stored will have the following transitions:
221+
// 1) Existing columns in the table
222+
// 2) New column j added (repeated for the final PK switch)
223+
// 3) Old column b removed (final state)
219224
schemaChange: "ALTER TABLE foo ADD COLUMN j INT DEFAULT 32, DROP COLUMN d, DROP COLUMN crdb_internal_a_b_shard_16, DROP COLUMN b, ALTER PRIMARY KEY USING COLUMNS(a) USING HASH",
220-
expectedKeyCols: [][]string{{"crdb_internal_c_e_shard_16", "c", "e"}, {"a"}, {"crdb_internal_a_shard_16", "a"}},
221-
expectedColumns: [][]string{{"a", "b", "e"}, {"a", "b", "e"}, {"a", "e", "j"}},
222-
expectedUDTCols: [][]string{{"e"}, {"e"}, {"e"}},
225+
expectedKeyCols: [][]string{{"crdb_internal_c_e_shard_16", "c", "e"}, {"a"}, {"crdb_internal_a_shard_16", "a"}, {"crdb_internal_a_shard_16", "a"}},
226+
expectedColumns: [][]string{{"a", "b", "e"}, {"a", "b", "e"}, {"a", "b", "e", "j"}, {"a", "e", "j"}},
227+
expectedUDTCols: [][]string{{"e"}, {"e"}, {"e"}, {"e"}},
223228
},
224229
} {
225230
t.Run(tc.schemaChange, func(t *testing.T) {
@@ -917,8 +922,25 @@ func expectResultColumnsWithFamily(
917922
colNamesSet[colName] = -1
918923
}
919924
ord := 0
920-
for _, col := range desc.PublicColumns() {
925+
nameOverrides := make(map[string]string)
926+
// Sort the columns based on the attribute number and if they are PK columns.
927+
sortedAllColumns := desc.AllColumns()
928+
slices.SortStableFunc(sortedAllColumns, func(a, b catalog.Column) int {
929+
return int(a.GetPGAttributeNum()) - int(b.GetPGAttributeNum())
930+
})
931+
for _, col := range sortedAllColumns {
932+
// Skip add mutations.
933+
if col.Adding() {
934+
continue
935+
}
936+
// Only include WriteAndDeleteOnly columns.
937+
if col.Dropped() && !col.WriteAndDeleteOnly() {
938+
continue
939+
}
921940
colName := string(col.ColName())
941+
if found, oldName := catalog.FindPreviousColumnNameForDeclarativeSchemaChange(desc, col.GetID()); found {
942+
nameOverrides[oldName] = colName
943+
}
922944
if _, ok := colNamesSet[colName]; ok {
923945
switch {
924946
case col.IsVirtual():
@@ -936,18 +958,31 @@ func expectResultColumnsWithFamily(
936958
}
937959

938960
for _, colName := range colNames {
939-
col, err := catalog.MustFindColumnByName(desc, colName)
940-
require.NoError(t, err)
961+
searchName := colName
962+
if nameOverrides[colName] != "" {
963+
searchName = nameOverrides[colName]
964+
}
965+
// If a column is missing add a dummy column, which
966+
// will force a mismatch / skip this set.
967+
col := catalog.FindColumnByName(desc, searchName)
968+
if col == nil {
969+
res = append(res, ResultColumn{
970+
ResultColumn: colinfo.ResultColumn{
971+
Name: "Missing column",
972+
},
973+
})
974+
continue
975+
}
941976
res = append(res, ResultColumn{
942977
ResultColumn: colinfo.ResultColumn{
943-
Name: col.GetName(),
978+
Name: colName,
944979
Typ: col.GetType(),
945980
TableID: desc.GetID(),
946981
PGAttributeNum: uint32(col.GetPGAttributeNum()),
947982
},
948983
Computed: col.IsComputed(),
949984
Nullable: col.IsNullable(),
950-
ord: colNamesSet[colName],
985+
ord: colNamesSet[searchName],
951986
sqlString: col.ColumnDesc().SQLStringNotHumanReadable(),
952987
})
953988
}

pkg/ccl/changefeedccl/changefeed_test.go

Lines changed: 50 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2106,6 +2106,54 @@ func TestChangefeedColumnDropsOnTheSameTableWithMultipleFamilies(t *testing.T) {
21062106
})
21072107
}
21082108

2109+
func TestChangefeedColumnDropsOnTheSameTableWithMultipleFamiliesWithManualSchemaLocked(
2110+
t *testing.T,
2111+
) {
2112+
defer leaktest.AfterTest(t)()
2113+
defer log.Scope(t).Close(t)
2114+
2115+
require.NoError(t, log.SetVModule("kv_feed=2,changefeed_processors=2"))
2116+
2117+
testFn := func(t *testing.T, s TestServer, f cdctest.TestFeedFactory) {
2118+
sqlDB := sqlutils.MakeSQLRunner(s.DB)
2119+
2120+
sqlDB.Exec(t, `CREATE TABLE hasfams (id int primary key, a string, b string, c string, FAMILY id_a (id, a), FAMILY b_and_c (b, c))`)
2121+
sqlDB.Exec(t, `INSERT INTO hasfams values (0, 'a', 'b', 'c')`)
2122+
2123+
var args []any
2124+
if _, ok := f.(*webhookFeedFactory); ok {
2125+
args = append(args, optOutOfMetamorphicEnrichedEnvelope{reason: "metamorphic enriched envelope does not support column families for webhook sinks"})
2126+
}
2127+
// Open up the changefeed.
2128+
cf := feed(t, f, `CREATE CHANGEFEED FOR TABLE hasfams FAMILY id_a, TABLE hasfams FAMILY b_and_c`, args...)
2129+
defer closeFeed(t, cf)
2130+
assertPayloads(t, cf, []string{
2131+
`hasfams.b_and_c: [0]->{"after": {"b": "b", "c": "c"}}`,
2132+
`hasfams.id_a: [0]->{"after": {"a": "a", "id": 0}}`,
2133+
})
2134+
2135+
// Check that dropping a watched column will backfill the changefeed.
2136+
sqlDB.Exec(t, `ALTER TABLE hasfams SET (schema_locked=false)`)
2137+
sqlDB.Exec(t, `ALTER TABLE hasfams DROP COLUMN a`)
2138+
sqlDB.Exec(t, `ALTER TABLE hasfams SET (schema_locked=true)`)
2139+
assertPayloads(t, cf, []string{
2140+
`hasfams.id_a: [0]->{"after": {"id": 0}}`,
2141+
})
2142+
2143+
// Check that dropping a watched column will backfill the changefeed.
2144+
sqlDB.Exec(t, `ALTER TABLE hasfams SET (schema_locked=false)`)
2145+
sqlDB.Exec(t, `ALTER TABLE hasfams DROP COLUMN b`)
2146+
sqlDB.Exec(t, `ALTER TABLE hasfams SET (schema_locked=true)`)
2147+
assertPayloads(t, cf, []string{
2148+
`hasfams.b_and_c: [0]->{"after": {"c": "c"}}`,
2149+
})
2150+
}
2151+
2152+
runWithAndWithoutRegression141453(t, testFn, func(t *testing.T, testFn cdcTestFn) {
2153+
cdcTest(t, testFn)
2154+
})
2155+
}
2156+
21092157
func TestNoStopAfterNonTargetAddColumnWithBackfill(t *testing.T) {
21102158
defer leaktest.AfterTest(t)()
21112159
defer log.Scope(t).Close(t)
@@ -3129,7 +3177,7 @@ func TestChangefeedSchemaChangeAllowBackfill(t *testing.T) {
31293177
`drop_column: [2]->{"after": {"a": 2, "b": "2"}}`,
31303178
})
31313179
sqlDB.Exec(t, `ALTER TABLE drop_column DROP COLUMN b`)
3132-
ts := schematestutils.FetchDescVersionModificationTime(t, s.Server, `d`, `public`, `drop_column`, 2)
3180+
ts := schematestutils.FetchDescVersionModificationTime(t, s.Server, `d`, `public`, `drop_column`, 6)
31333181

31343182
// Backfill for DROP COLUMN b.
31353183
assertPayloads(t, dropColumn, []string{
@@ -3182,7 +3230,7 @@ func TestChangefeedSchemaChangeAllowBackfill(t *testing.T) {
31823230
// the 7th step (version 15). Finally, when adding column d, it goes from 17->25 ith the schema change
31833231
// being visible at the 7th step (version 23).
31843232
// TODO(#142936): Investigate if this descriptor version hardcoding is sound.
3185-
dropTS := schematestutils.FetchDescVersionModificationTime(t, s.Server, `d`, `public`, `multiple_alters`, 2)
3233+
dropTS := schematestutils.FetchDescVersionModificationTime(t, s.Server, `d`, `public`, `multiple_alters`, 6)
31863234
addTS := schematestutils.FetchDescVersionModificationTime(t, s.Server, `d`, `public`, `multiple_alters`, 15)
31873235
addTS2 := schematestutils.FetchDescVersionModificationTime(t, s.Server, `d`, `public`, `multiple_alters`, 23)
31883236

pkg/ccl/changefeedccl/schemafeed/table_event_filter.go

Lines changed: 25 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -188,6 +188,17 @@ func shouldFilterAddColumnEvent(e TableEvent, targets changefeedbase.Targets) (b
188188
return !watched, nil
189189
}
190190

191+
// notDeclarativeOrHasMergedIndex returns true if the descriptor has a declarative
192+
// schema changer with a merged index.
193+
func notDeclarativeOrHasMergedIndex(desc catalog.TableDescriptor) bool {
194+
// If there are no declarative schema changes then this will always be
195+
// true.
196+
if desc.GetDeclarativeSchemaChangerState() == nil {
197+
return true
198+
}
199+
return catalog.HasDeclarativeMergedPrimaryIndex(desc)
200+
}
201+
191202
// Returns true if the changefeed targets a column which has a drop mutation inside the table event.
192203
func droppedColumnIsWatched(e TableEvent, targets changefeedbase.Targets) (bool, error) {
193204
// If no column families are specified, then all columns are targeted.
@@ -212,7 +223,13 @@ func droppedColumnIsWatched(e TableEvent, targets changefeedbase.Targets) (bool,
212223
if m.AsColumn() == nil || m.AsColumn().IsHidden() {
213224
continue
214225
}
215-
if m.Dropped() && m.WriteAndDeleteOnly() && watchedColumnIDs.Contains(int(m.AsColumn().GetID())) {
226+
// For dropped columns wait for WriteAndDeleteOnly to be hit. When using
227+
// the declarative schema changer we will wait a bit later in the plan to
228+
// publish the dropped column, since schema_locked and the column being
229+
// write and delete only happen at the same stage. Since the schema change
230+
// is still in progress, there is a gray area in terms of when the change
231+
// should be visible.
232+
if m.Dropped() && m.WriteAndDeleteOnly() && notDeclarativeOrHasMergedIndex(e.After) && watchedColumnIDs.Contains(int(m.AsColumn().GetID())) {
216233
return true, nil
217234
}
218235
}
@@ -273,7 +290,13 @@ func dropVisibleColumnMutationExists(desc catalog.TableDescriptor) bool {
273290
if m.AsColumn() == nil || m.AsColumn().IsHidden() {
274291
continue
275292
}
276-
if m.Dropped() && m.WriteAndDeleteOnly() {
293+
// For dropped columns wait for WriteAndDeleteOnly to be hit. When using
294+
// the declarative schema changer we will wait a bit later in the plan to
295+
// publish the dropped column, since schema_locked and the column being
296+
// write and delete only happen at the same stage. Since the schema change
297+
// is still in progress, there is a gray area in terms of when the change
298+
// should be visible.
299+
if m.Dropped() && m.WriteAndDeleteOnly() && notDeclarativeOrHasMergedIndex(desc) {
277300
return true
278301
}
279302
}

0 commit comments

Comments
 (0)