Skip to content

Commit b71b2d1

Browse files
committed
changefeedccl: fix incorrect previous values for dropped columns
Previously, after we changed the point at which the backfill is emitted for a dropped column under the declarative schema changer (with schema_locked active), the previous values were emitted incorrectly. As a result, in the Avro format the old and new values would be the same while a column was being dropped. This was observed by the cdc/schemareg test. To address this, this patch correctly resolves the previous values in the drop column case from the last descriptor version. Fixes: #150540 Release note: None
1 parent b4e83ac commit b71b2d1

File tree

6 files changed

+171
-34
lines changed

6 files changed

+171
-34
lines changed

pkg/ccl/changefeedccl/cdcevent/BUILD.bazel

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@ go_library(
3030
"//pkg/sql/pgwire/pgerror",
3131
"//pkg/sql/row",
3232
"//pkg/sql/rowenc",
33+
"//pkg/sql/sem/catid",
3334
"//pkg/sql/sem/eval",
3435
"//pkg/sql/sem/tree",
3536
"//pkg/sql/sessiondatapb",

pkg/ccl/changefeedccl/cdcevent/event.go

Lines changed: 76 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ package cdcevent
88
import (
99
"context"
1010
"fmt"
11+
"slices"
1112
"strings"
1213
"time"
1314

@@ -21,6 +22,7 @@ import (
2122
"github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb"
2223
"github.com/cockroachdb/cockroach/pkg/sql/row"
2324
"github.com/cockroachdb/cockroach/pkg/sql/rowenc"
25+
"github.com/cockroachdb/cockroach/pkg/sql/sem/catid"
2426
"github.com/cockroachdb/cockroach/pkg/sql/sem/eval"
2527
"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
2628
"github.com/cockroachdb/cockroach/pkg/sql/sessiondatapb"
@@ -302,7 +304,7 @@ func NewEventDescriptor(
302304
}
303305

304306
// addColumn is a helper to add a column to this descriptor.
305-
addColumn := func(col catalog.Column, ord int) int {
307+
addColumn := func(col catalog.Column, nameOverride string, ord int) int {
306308
resultColumn := ResultColumn{
307309
ResultColumn: colinfo.ResultColumn{
308310
Name: col.GetName(),
@@ -315,10 +317,12 @@ func NewEventDescriptor(
315317
ord: ord,
316318
sqlString: col.ColumnDesc().SQLStringNotHumanReadable(),
317319
}
318-
320+
if nameOverride != "" {
321+
resultColumn.Name = nameOverride
322+
}
319323
colIdx := len(sd.cols)
320324
sd.cols = append(sd.cols, resultColumn)
321-
sd.colsByName[col.GetName()] = colIdx
325+
sd.colsByName[resultColumn.Name] = colIdx
322326

323327
if col.GetType().UserDefined() {
324328
sd.udtCols = append(sd.udtCols, colIdx)
@@ -329,7 +333,46 @@ func NewEventDescriptor(
329333
// Primary key columns must be added in the same order they
330334
// appear in the primary key index.
331335
primaryIdx := desc.GetPrimaryIndex()
332-
colOrd := catalog.ColumnIDToOrdinalMap(desc.PublicColumns())
336+
// The declarative schema changer requires special handling for dropped columns
337+
// to ensure their previous values are available to the changefeed.
338+
// We select a prior descriptor where the column is in the WRITE_ONLY state
339+
// (instead of PUBLIC) and treat it as visible.
340+
// This is necessary because the schema change stages for dropped columns were
341+
// intentionally shifted to support automatic `schema_locked` management. The
342+
// declarative schema changer cannot make a column WRITE_ONLY and flip
343+
// `schema_locked` in two separate stages, so we instead treat the WRITE_ONLY
344+
// stage as a non-backfilling change for CDC.
345+
columnsToTrack := desc.PublicColumns()
346+
colNameOverrides := make(map[catid.ColumnID]string)
347+
if desc.GetDeclarativeSchemaChangerState() != nil {
348+
hasMergedIndex := catalog.HasDeclarativeMergedPrimaryIndex(desc)
349+
allColumns := desc.AllColumns()
350+
columnsToTrack = make([]catalog.Column, 0, len(allColumns))
351+
for _, column := range allColumns {
352+
// Skip all columns that are being added; dropped columns are handled below.
353+
if column.Adding() {
354+
continue
355+
}
356+
// Pick up any write-only columns.
357+
if column.Dropped() {
358+
if !column.WriteAndDeleteOnly() || hasMergedIndex || column.IsHidden() {
359+
continue
360+
}
361+
if found, previousName := catalog.FindPreviousColumnNameForDeclarativeSchemaChange(desc, column.GetID()); found {
362+
colNameOverrides[column.GetID()] = previousName
363+
}
364+
}
365+
columnsToTrack = append(columnsToTrack, column)
366+
}
367+
// Recover the order of the original columns.
368+
slices.SortStableFunc(columnsToTrack, func(a, b catalog.Column) int {
369+
return int(a.GetPGAttributeNum()) - int(b.GetPGAttributeNum())
370+
})
371+
}
372+
var colOrd catalog.TableColMap
373+
for ord, col := range columnsToTrack {
374+
colOrd.Set(col.GetID(), ord)
375+
}
333376
writeOnlyAndPublic := catalog.ColumnIDToOrdinalMap(desc.WritableColumns())
334377
var primaryKeyOrdinal catalog.TableColMap
335378

@@ -345,20 +388,20 @@ func NewEventDescriptor(
345388
}
346389
return nil, errors.AssertionFailedf("expected to find column %d", ord)
347390
}
348-
primaryKeyOrdinal.Set(desc.PublicColumns()[ord].GetID(), ordIdx)
391+
primaryKeyOrdinal.Set(columnsToTrack[ord].GetID(), ordIdx)
349392
ordIdx += 1
350393
}
351394
sd.keyCols = make([]int, ordIdx)
352395

353396
switch {
354397
case keyOnly:
355398
ord := 0
356-
for _, col := range desc.PublicColumns() {
399+
for _, col := range columnsToTrack {
357400
pKeyOrd, isPKey := primaryKeyOrdinal.Get(col.GetID())
358401
if !isPKey {
359402
continue
360403
}
361-
colIdx := addColumn(col, ord)
404+
colIdx := addColumn(col, colNameOverrides[col.GetID()], ord)
362405
ord++
363406
sd.keyCols[pKeyOrd] = colIdx
364407
sd.valueCols = append(sd.valueCols, colIdx)
@@ -369,9 +412,9 @@ func NewEventDescriptor(
369412
// a sentinel ordinal position of virtualColOrd.
370413
inFamily := catalog.MakeTableColSet(family.ColumnIDs...)
371414
ord := 0
372-
for _, col := range desc.PublicColumns() {
415+
for _, col := range columnsToTrack {
373416
if isVirtual := col.IsVirtual(); isVirtual && includeVirtualColumns {
374-
colIdx := addColumn(col, virtualColOrd)
417+
colIdx := addColumn(col, colNameOverrides[col.GetID()], virtualColOrd)
375418
sd.valueCols = append(sd.valueCols, colIdx)
376419
continue
377420
}
@@ -380,7 +423,7 @@ func NewEventDescriptor(
380423
if !isPKey && !isInFamily {
381424
continue
382425
}
383-
colIdx := addColumn(col, ord)
426+
colIdx := addColumn(col, colNameOverrides[col.GetID()], ord)
384427
ord++
385428
if isPKey {
386429
sd.keyCols[pKeyOrd] = colIdx
@@ -881,7 +924,29 @@ func getRelevantColumnsForFamily(
881924
// matches the order of columns in the SQL table.
882925
idx := 0
883926
result := make([]descpb.ColumnID, cols.Len())
884-
for _, colID := range tableDesc.PublicColumnIDs() {
927+
visibleColumns := tableDesc.PublicColumns()
928+
if tableDesc.GetDeclarativeSchemaChangerState() != nil {
929+
hasMergedIndex := catalog.HasDeclarativeMergedPrimaryIndex(tableDesc)
930+
visibleColumns = make([]catalog.Column, 0, cols.Len())
931+
for _, col := range tableDesc.AllColumns() {
932+
if col.Adding() {
933+
continue
934+
}
935+
if tableDesc.GetDeclarativeSchemaChangerState() == nil && !col.Public() {
936+
continue
937+
}
938+
if col.Dropped() && (!col.WriteAndDeleteOnly() || hasMergedIndex) {
939+
continue
940+
}
941+
visibleColumns = append(visibleColumns, col)
942+
}
943+
// Recover the order of the original columns.
944+
slices.SortStableFunc(visibleColumns, func(a, b catalog.Column) int {
945+
return int(a.GetPGAttributeNum()) - int(b.GetPGAttributeNum())
946+
})
947+
}
948+
for _, col := range visibleColumns {
949+
colID := col.GetID()
885950
if cols.Contains(colID) {
886951
result[idx] = colID
887952
idx++

pkg/ccl/changefeedccl/cdcevent/event_test.go

Lines changed: 47 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ package cdcevent
88
import (
99
"context"
1010
"fmt"
11+
"slices"
1112
"sync/atomic"
1213
"testing"
1314

@@ -173,7 +174,6 @@ CREATE TABLE foo (
173174
FAMILY main (a, b, e),
174175
FAMILY only_c (c)
175176
)`)
176-
177177
for _, tc := range []struct {
178178
schemaChange string
179179
// Each new primary index generated during the test will pause at each stage
@@ -210,16 +210,21 @@ CREATE TABLE foo (
210210
expectedUDTCols: [][]string{{"e"}, {"e"}, {"e"}},
211211
},
212212
{
213-
// We are going to execute a mix of add, drop and alter primary key operations,
213+
// We are going to execute a mix of add, drop and alter primary key operations;
214214
// this will result in 3 primary indexes being swapped.
215-
// 1) The first primary index key will be the same as previous test
215+
// 1) The first primary index key will be the same as the previous test
216216
// 2) The second primary key will use the column "a", without a hash
217217
// sharding column since that needs to be created next.
218-
// 3) The final primary key will "a" and have hash sharding on it.
218+
// 3) The final primary key will use "a" and have hash sharding on it (repeated
219+
// for the column removal).
220+
// The values stored will have the following transitions:
221+
// 1) Existing columns in the table
222+
// 2) New column j added (repeated for the final PK switch)
223+
// 3) Old column b removed (final state)
219224
schemaChange: "ALTER TABLE foo ADD COLUMN j INT DEFAULT 32, DROP COLUMN d, DROP COLUMN crdb_internal_a_b_shard_16, DROP COLUMN b, ALTER PRIMARY KEY USING COLUMNS(a) USING HASH",
220-
expectedKeyCols: [][]string{{"crdb_internal_c_e_shard_16", "c", "e"}, {"a"}, {"crdb_internal_a_shard_16", "a"}},
221-
expectedColumns: [][]string{{"a", "b", "e"}, {"a", "b", "e"}, {"a", "e", "j"}},
222-
expectedUDTCols: [][]string{{"e"}, {"e"}, {"e"}},
225+
expectedKeyCols: [][]string{{"crdb_internal_c_e_shard_16", "c", "e"}, {"a"}, {"crdb_internal_a_shard_16", "a"}, {"crdb_internal_a_shard_16", "a"}},
226+
expectedColumns: [][]string{{"a", "b", "e"}, {"a", "b", "e"}, {"a", "b", "e", "j"}, {"a", "e", "j"}},
227+
expectedUDTCols: [][]string{{"e"}, {"e"}, {"e"}, {"e"}},
223228
},
224229
} {
225230
t.Run(tc.schemaChange, func(t *testing.T) {
@@ -917,8 +922,25 @@ func expectResultColumnsWithFamily(
917922
colNamesSet[colName] = -1
918923
}
919924
ord := 0
920-
for _, col := range desc.PublicColumns() {
925+
nameOverrides := make(map[string]string)
926+
// Sort the columns by their PG attribute number (stable, so ties keep their original order).
927+
sortedAllColumns := desc.AllColumns()
928+
slices.SortStableFunc(sortedAllColumns, func(a, b catalog.Column) int {
929+
return int(a.GetPGAttributeNum()) - int(b.GetPGAttributeNum())
930+
})
931+
for _, col := range sortedAllColumns {
932+
// Skip add mutations.
933+
if col.Adding() {
934+
continue
935+
}
936+
// Of the dropped columns, only include those that are WriteAndDeleteOnly.
937+
if col.Dropped() && !col.WriteAndDeleteOnly() {
938+
continue
939+
}
921940
colName := string(col.ColName())
941+
if found, oldName := catalog.FindPreviousColumnNameForDeclarativeSchemaChange(desc, col.GetID()); found {
942+
nameOverrides[oldName] = colName
943+
}
922944
if _, ok := colNamesSet[colName]; ok {
923945
switch {
924946
case col.IsVirtual():
@@ -936,18 +958,31 @@ func expectResultColumnsWithFamily(
936958
}
937959

938960
for _, colName := range colNames {
939-
col, err := catalog.MustFindColumnByName(desc, colName)
940-
require.NoError(t, err)
961+
searchName := colName
962+
if nameOverrides[colName] != "" {
963+
searchName = nameOverrides[colName]
964+
}
965+
// If a column is missing add a dummy column, which
966+
// will force a mismatch / skip this set.
967+
col := catalog.FindColumnByName(desc, searchName)
968+
if col == nil {
969+
res = append(res, ResultColumn{
970+
ResultColumn: colinfo.ResultColumn{
971+
Name: "Missing column",
972+
},
973+
})
974+
continue
975+
}
941976
res = append(res, ResultColumn{
942977
ResultColumn: colinfo.ResultColumn{
943-
Name: col.GetName(),
978+
Name: colName,
944979
Typ: col.GetType(),
945980
TableID: desc.GetID(),
946981
PGAttributeNum: uint32(col.GetPGAttributeNum()),
947982
},
948983
Computed: col.IsComputed(),
949984
Nullable: col.IsNullable(),
950-
ord: colNamesSet[colName],
985+
ord: colNamesSet[searchName],
951986
sqlString: col.ColumnDesc().SQLStringNotHumanReadable(),
952987
})
953988
}

pkg/ccl/changefeedccl/schemafeed/table_event_filter.go

Lines changed: 2 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -191,21 +191,12 @@ func shouldFilterAddColumnEvent(e TableEvent, targets changefeedbase.Targets) (b
191191
// notDeclarativeOrHasMergedIndex returns true if the descriptor has no declarative
192192
// schema changer state, or if it has one with a merged primary index.
193193
func notDeclarativeOrHasMergedIndex(desc catalog.TableDescriptor) bool {
194-
// If there are not declarative schema changes then this will always be
194+
// If there are no declarative schema changes then this will always be
195195
// true.
196196
if desc.GetDeclarativeSchemaChangerState() == nil {
197197
return true
198198
}
199-
// For declarative schema changes detect when a new primary index becomes
200-
// WRITE_ONLY (i.e. backfill has been completed).
201-
for idx, target := range desc.GetDeclarativeSchemaChangerState().Targets {
202-
if target.GetPrimaryIndex() != nil &&
203-
target.TargetStatus == scpb.Status_PUBLIC &&
204-
desc.GetDeclarativeSchemaChangerState().CurrentStatuses[idx] == scpb.Status_WRITE_ONLY {
205-
return true
206-
}
207-
}
208-
return false
199+
return catalog.HasDeclarativeMergedPrimaryIndex(desc)
209200
}
210201

211202
// Returns true if the changefeed targets a column which has a drop mutation inside the table event.

pkg/sql/catalog/BUILD.bazel

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@ go_library(
4040
"//pkg/sql/privilege",
4141
"//pkg/sql/schemachanger/scpb",
4242
"//pkg/sql/sem/catconstants",
43+
"//pkg/sql/sem/catid",
4344
"//pkg/sql/sem/idxtype",
4445
"//pkg/sql/sem/semenumpb",
4546
"//pkg/sql/sem/tree",

pkg/sql/catalog/descriptor.go

Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ import (
2121
"github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgerror"
2222
"github.com/cockroachdb/cockroach/pkg/sql/privilege"
2323
"github.com/cockroachdb/cockroach/pkg/sql/schemachanger/scpb"
24+
"github.com/cockroachdb/cockroach/pkg/sql/sem/catid"
2425
"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
2526
"github.com/cockroachdb/cockroach/pkg/sql/types"
2627
"github.com/cockroachdb/cockroach/pkg/util/hlc"
@@ -1207,3 +1208,46 @@ func IsSystemDescriptor(desc Descriptor) bool {
12071208
func HasConcurrentDeclarativeSchemaChange(desc Descriptor) bool {
12081209
return desc.GetDeclarativeSchemaChangerState() != nil
12091210
}
1211+
1212+
// HasDeclarativeMergedPrimaryIndex returns true if a primary key swap is occurring
1213+
// via the declarative schema changer, and the new index is in a write-only state.
1214+
func HasDeclarativeMergedPrimaryIndex(desc TableDescriptor) bool {
1215+
// For declarative schema changes detect when a new primary index becomes
1216+
// WRITE_ONLY (i.e. backfill has been completed).
1217+
state := desc.GetDeclarativeSchemaChangerState()
1218+
if state == nil {
1219+
return false
1220+
}
1221+
for idx, target := range state.Targets {
1222+
if target.GetPrimaryIndex() != nil &&
1223+
target.GetPrimaryIndex().TableID == desc.GetID() &&
1224+
(target.TargetStatus == scpb.Status_PUBLIC || target.TargetStatus == scpb.Status_TRANSIENT_ABSENT) &&
1225+
state.CurrentStatuses[idx] == scpb.Status_WRITE_ONLY {
1226+
return true
1227+
}
1228+
}
1229+
return false
1230+
}
1231+
1232+
// FindPreviousColumnNameForDeclarativeSchemaChange attempts to resolve the previous
1233+
// column name for a declarative schema change. This is handy for cases where the
1234+
// original name needs to be known.
1235+
func FindPreviousColumnNameForDeclarativeSchemaChange(
1236+
desc TableDescriptor, columnID catid.ColumnID,
1237+
) (bool, string) {
1238+
state := desc.GetDeclarativeSchemaChangerState()
1239+
if state == nil {
1240+
// No declarative schema change is active.
1241+
return false, ""
1242+
}
1243+
// Find the dropped ColumnName element, which carries the old name.
1244+
for _, elem := range state.Targets {
1245+
if nameElem, ok := elem.Element().(*scpb.ColumnName); ok &&
1246+
nameElem.TableID == desc.GetID() &&
1247+
nameElem.ColumnID == columnID &&
1248+
elem.TargetStatus == scpb.Status_ABSENT {
1249+
return true, nameElem.Name
1250+
}
1251+
}
1252+
return false, ""
1253+
}

0 commit comments

Comments
 (0)