Skip to content

Commit d41232d

Browse files
committed
changefeedccl: fix incorrect previous values for dropped columns
Previously, after we changed the point at which the backfill is emitted for a dropped column under the declarative schema changer with schema_locked active, we wound up emitting the previous values incorrectly. Concretely, in the Avro format the old and new values would be identical while a column was being dropped. This was observed by the cdc/schemareg test. To address this, this patch correctly resolves the previous values in the drop column case from the last descriptor version. Fixes: #150540 Release note: None
1 parent 614e118 commit d41232d

File tree

6 files changed

+171
-34
lines changed

6 files changed

+171
-34
lines changed

pkg/ccl/changefeedccl/cdcevent/BUILD.bazel

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@ go_library(
3030
"//pkg/sql/pgwire/pgerror",
3131
"//pkg/sql/row",
3232
"//pkg/sql/rowenc",
33+
"//pkg/sql/sem/catid",
3334
"//pkg/sql/sem/eval",
3435
"//pkg/sql/sem/tree",
3536
"//pkg/sql/sessiondatapb",

pkg/ccl/changefeedccl/cdcevent/event.go

Lines changed: 76 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ package cdcevent
88
import (
99
"context"
1010
"fmt"
11+
"slices"
1112
"strings"
1213
"time"
1314

@@ -21,6 +22,7 @@ import (
2122
"github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb"
2223
"github.com/cockroachdb/cockroach/pkg/sql/row"
2324
"github.com/cockroachdb/cockroach/pkg/sql/rowenc"
25+
"github.com/cockroachdb/cockroach/pkg/sql/sem/catid"
2426
"github.com/cockroachdb/cockroach/pkg/sql/sem/eval"
2527
"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
2628
"github.com/cockroachdb/cockroach/pkg/sql/sessiondatapb"
@@ -292,7 +294,7 @@ func NewEventDescriptor(
292294
}
293295

294296
// addColumn is a helper to add a column to this descriptor.
295-
addColumn := func(col catalog.Column, ord int) int {
297+
addColumn := func(col catalog.Column, nameOverride string, ord int) int {
296298
resultColumn := ResultColumn{
297299
ResultColumn: colinfo.ResultColumn{
298300
Name: col.GetName(),
@@ -305,10 +307,12 @@ func NewEventDescriptor(
305307
ord: ord,
306308
sqlString: col.ColumnDesc().SQLStringNotHumanReadable(),
307309
}
308-
310+
if nameOverride != "" {
311+
resultColumn.Name = nameOverride
312+
}
309313
colIdx := len(sd.cols)
310314
sd.cols = append(sd.cols, resultColumn)
311-
sd.colsByName[col.GetName()] = colIdx
315+
sd.colsByName[resultColumn.Name] = colIdx
312316

313317
if col.GetType().UserDefined() {
314318
sd.udtCols = append(sd.udtCols, colIdx)
@@ -319,7 +323,46 @@ func NewEventDescriptor(
319323
// Primary key columns must be added in the same order they
320324
// appear in the primary key index.
321325
primaryIdx := desc.GetPrimaryIndex()
322-
colOrd := catalog.ColumnIDToOrdinalMap(desc.PublicColumns())
326+
// The declarative schema changer requires special handling for dropped columns
327+
// to ensure their previous values are available to the changefeed.
328+
// We select a prior descriptor where the column is in the WRITE_ONLY state
329+
// (instead of PUBLIC) and treat it as visible.
330+
// This is necessary because the schema change stages for dropped columns were
331+
// intentionally shifted to support automatic `schema_locked` management. The
332+
// declarative schema changer cannot make a column WRITE_ONLY and flip
333+
// `schema_locked` in two separate stages, so we instead treat the WRITE_ONLY
334+
// stage as a non-backfilling change for CDC.
335+
columnsToTrack := desc.PublicColumns()
336+
colNameOverrides := make(map[catid.ColumnID]string)
337+
if desc.GetDeclarativeSchemaChangerState() != nil {
338+
hasMergedIndex := catalog.HasDeclarativeMergedPrimaryIndex(desc)
339+
allColumns := desc.AllColumns()
340+
columnsToTrack = make([]catalog.Column, 0, len(allColumns))
341+
for _, column := range allColumns {
342+
// Skip columns that are still being added.
343+
if column.Adding() {
344+
continue
345+
}
346+
// Pick up any write-only columns.
347+
if column.Dropped() {
348+
if !column.WriteAndDeleteOnly() || hasMergedIndex || column.IsHidden() {
349+
continue
350+
}
351+
if found, previousName := catalog.FindPreviousColumnNameForDeclarativeSchemaChange(desc, column.GetID()); found {
352+
colNameOverrides[column.GetID()] = previousName
353+
}
354+
}
355+
columnsToTrack = append(columnsToTrack, column)
356+
}
357+
// Recover the order of the original columns.
358+
slices.SortStableFunc(columnsToTrack, func(a, b catalog.Column) int {
359+
return int(a.GetPGAttributeNum()) - int(b.GetPGAttributeNum())
360+
})
361+
}
362+
var colOrd catalog.TableColMap
363+
for ord, col := range columnsToTrack {
364+
colOrd.Set(col.GetID(), ord)
365+
}
323366
writeOnlyAndPublic := catalog.ColumnIDToOrdinalMap(desc.WritableColumns())
324367
var primaryKeyOrdinal catalog.TableColMap
325368

@@ -335,20 +378,20 @@ func NewEventDescriptor(
335378
}
336379
return nil, errors.AssertionFailedf("expected to find column %d", ord)
337380
}
338-
primaryKeyOrdinal.Set(desc.PublicColumns()[ord].GetID(), ordIdx)
381+
primaryKeyOrdinal.Set(columnsToTrack[ord].GetID(), ordIdx)
339382
ordIdx += 1
340383
}
341384
sd.keyCols = make([]int, ordIdx)
342385

343386
switch {
344387
case keyOnly:
345388
ord := 0
346-
for _, col := range desc.PublicColumns() {
389+
for _, col := range columnsToTrack {
347390
pKeyOrd, isPKey := primaryKeyOrdinal.Get(col.GetID())
348391
if !isPKey {
349392
continue
350393
}
351-
colIdx := addColumn(col, ord)
394+
colIdx := addColumn(col, colNameOverrides[col.GetID()], ord)
352395
ord++
353396
sd.keyCols[pKeyOrd] = colIdx
354397
sd.valueCols = append(sd.valueCols, colIdx)
@@ -359,9 +402,9 @@ func NewEventDescriptor(
359402
// a sentinel ordinal position of virtualColOrd.
360403
inFamily := catalog.MakeTableColSet(family.ColumnIDs...)
361404
ord := 0
362-
for _, col := range desc.PublicColumns() {
405+
for _, col := range columnsToTrack {
363406
if isVirtual := col.IsVirtual(); isVirtual && includeVirtualColumns {
364-
colIdx := addColumn(col, virtualColOrd)
407+
colIdx := addColumn(col, colNameOverrides[col.GetID()], virtualColOrd)
365408
sd.valueCols = append(sd.valueCols, colIdx)
366409
continue
367410
}
@@ -370,7 +413,7 @@ func NewEventDescriptor(
370413
if !isPKey && !isInFamily {
371414
continue
372415
}
373-
colIdx := addColumn(col, ord)
416+
colIdx := addColumn(col, colNameOverrides[col.GetID()], ord)
374417
ord++
375418
if isPKey {
376419
sd.keyCols[pKeyOrd] = colIdx
@@ -871,7 +914,29 @@ func getRelevantColumnsForFamily(
871914
// matches the order of columns in the SQL table.
872915
idx := 0
873916
result := make([]descpb.ColumnID, cols.Len())
874-
for _, colID := range tableDesc.PublicColumnIDs() {
917+
visibleColumns := tableDesc.PublicColumns()
918+
if tableDesc.GetDeclarativeSchemaChangerState() != nil {
919+
hasMergedIndex := catalog.HasDeclarativeMergedPrimaryIndex(tableDesc)
920+
visibleColumns = make([]catalog.Column, 0, cols.Len())
921+
for _, col := range tableDesc.AllColumns() {
922+
if col.Adding() {
923+
continue
924+
}
925+
if tableDesc.GetDeclarativeSchemaChangerState() == nil && !col.Public() {
926+
continue
927+
}
928+
if col.Dropped() && (!col.WriteAndDeleteOnly() || hasMergedIndex) {
929+
continue
930+
}
931+
visibleColumns = append(visibleColumns, col)
932+
}
933+
// Recover the order of the original columns.
934+
slices.SortStableFunc(visibleColumns, func(a, b catalog.Column) int {
935+
return int(a.GetPGAttributeNum()) - int(b.GetPGAttributeNum())
936+
})
937+
}
938+
for _, col := range visibleColumns {
939+
colID := col.GetID()
875940
if cols.Contains(colID) {
876941
result[idx] = colID
877942
idx++

pkg/ccl/changefeedccl/cdcevent/event_test.go

Lines changed: 47 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ package cdcevent
88
import (
99
"context"
1010
"fmt"
11+
"slices"
1112
"sync/atomic"
1213
"testing"
1314

@@ -173,7 +174,6 @@ CREATE TABLE foo (
173174
FAMILY main (a, b, e),
174175
FAMILY only_c (c)
175176
)`)
176-
177177
for _, tc := range []struct {
178178
schemaChange string
179179
// Each new primary index generated during the test will pause at each stage
@@ -210,16 +210,21 @@ CREATE TABLE foo (
210210
expectedUDTCols: [][]string{{"e"}, {"e"}, {"e"}},
211211
},
212212
{
213-
// We are going to execute a mix of add, drop and alter primary key operations,
213+
// We are going to execute a mix of add, drop and alter primary key operations;
214214
// this will result in 3 primary indexes being swapped.
215-
// 1) The first primary index key will be the same as previous test
215+
// 1) The first primary index key will be the same as the previous test
216216
// 2) The second primary key will use the column "a", without a hash
217217
// sharding column since that needs to be created next.
218-
// 3) The final primary key will "a" and have hash sharding on it.
218+
// 3) The final primary key will use "a" and have hash sharding on it (repeated
219+
// for the column removal).
220+
// The values stored will have the following transitions:
221+
// 1) Existing columns in the table
222+
// 2) New column j added (repeated for the final PK switch)
223+
// 3) Old column b removed (final state)
219224
schemaChange: "ALTER TABLE foo ADD COLUMN j INT DEFAULT 32, DROP COLUMN d, DROP COLUMN crdb_internal_a_b_shard_16, DROP COLUMN b, ALTER PRIMARY KEY USING COLUMNS(a) USING HASH",
220-
expectedKeyCols: [][]string{{"crdb_internal_c_e_shard_16", "c", "e"}, {"a"}, {"crdb_internal_a_shard_16", "a"}},
221-
expectedColumns: [][]string{{"a", "b", "e"}, {"a", "b", "e"}, {"a", "e", "j"}},
222-
expectedUDTCols: [][]string{{"e"}, {"e"}, {"e"}},
225+
expectedKeyCols: [][]string{{"crdb_internal_c_e_shard_16", "c", "e"}, {"a"}, {"crdb_internal_a_shard_16", "a"}, {"crdb_internal_a_shard_16", "a"}},
226+
expectedColumns: [][]string{{"a", "b", "e"}, {"a", "b", "e"}, {"a", "b", "e", "j"}, {"a", "e", "j"}},
227+
expectedUDTCols: [][]string{{"e"}, {"e"}, {"e"}, {"e"}},
223228
},
224229
} {
225230
t.Run(tc.schemaChange, func(t *testing.T) {
@@ -917,8 +922,25 @@ func expectResultColumnsWithFamily(
917922
colNamesSet[colName] = -1
918923
}
919924
ord := 0
920-
for _, col := range desc.PublicColumns() {
925+
nameOverrides := make(map[string]string)
926+
// Sort the columns by their PG attribute number.
927+
sortedAllColumns := desc.AllColumns()
928+
slices.SortStableFunc(sortedAllColumns, func(a, b catalog.Column) int {
929+
return int(a.GetPGAttributeNum()) - int(b.GetPGAttributeNum())
930+
})
931+
for _, col := range sortedAllColumns {
932+
// Skip add mutations.
933+
if col.Adding() {
934+
continue
935+
}
936+
// Only include WriteAndDeleteOnly columns.
937+
if col.Dropped() && !col.WriteAndDeleteOnly() {
938+
continue
939+
}
921940
colName := string(col.ColName())
941+
if found, oldName := catalog.FindPreviousColumnNameForDeclarativeSchemaChange(desc, col.GetID()); found {
942+
nameOverrides[oldName] = colName
943+
}
922944
if _, ok := colNamesSet[colName]; ok {
923945
switch {
924946
case col.IsVirtual():
@@ -936,18 +958,31 @@ func expectResultColumnsWithFamily(
936958
}
937959

938960
for _, colName := range colNames {
939-
col, err := catalog.MustFindColumnByName(desc, colName)
940-
require.NoError(t, err)
961+
searchName := colName
962+
if nameOverrides[colName] != "" {
963+
searchName = nameOverrides[colName]
964+
}
965+
// If a column is missing add a dummy column, which
966+
// will force a mismatch / skip this set.
967+
col := catalog.FindColumnByName(desc, searchName)
968+
if col == nil {
969+
res = append(res, ResultColumn{
970+
ResultColumn: colinfo.ResultColumn{
971+
Name: "Missing column",
972+
},
973+
})
974+
continue
975+
}
941976
res = append(res, ResultColumn{
942977
ResultColumn: colinfo.ResultColumn{
943-
Name: col.GetName(),
978+
Name: colName,
944979
Typ: col.GetType(),
945980
TableID: desc.GetID(),
946981
PGAttributeNum: uint32(col.GetPGAttributeNum()),
947982
},
948983
Computed: col.IsComputed(),
949984
Nullable: col.IsNullable(),
950-
ord: colNamesSet[colName],
985+
ord: colNamesSet[searchName],
951986
sqlString: col.ColumnDesc().SQLStringNotHumanReadable(),
952987
})
953988
}

pkg/ccl/changefeedccl/schemafeed/table_event_filter.go

Lines changed: 2 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -191,21 +191,12 @@ func shouldFilterAddColumnEvent(e TableEvent, targets changefeedbase.Targets) (b
191191
// notDeclarativeOrHasMergedIndex returns true if the descriptor has no declarative
192192
// schema changer state, or if it has one with a merged primary index.
193193
func notDeclarativeOrHasMergedIndex(desc catalog.TableDescriptor) bool {
194-
// If there are not declarative schema changes then this will always be
194+
// If there are no declarative schema changes then this will always be
195195
// true.
196196
if desc.GetDeclarativeSchemaChangerState() == nil {
197197
return true
198198
}
199-
// For declarative schema changes detect when a new primary index becomes
200-
// WRITE_ONLY (i.e. backfill has been completed).
201-
for idx, target := range desc.GetDeclarativeSchemaChangerState().Targets {
202-
if target.GetPrimaryIndex() != nil &&
203-
target.TargetStatus == scpb.Status_PUBLIC &&
204-
desc.GetDeclarativeSchemaChangerState().CurrentStatuses[idx] == scpb.Status_WRITE_ONLY {
205-
return true
206-
}
207-
}
208-
return false
199+
return catalog.HasDeclarativeMergedPrimaryIndex(desc)
209200
}
210201

211202
// Returns true if the changefeed targets a column which has a drop mutation inside the table event.

pkg/sql/catalog/BUILD.bazel

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@ go_library(
4040
"//pkg/sql/privilege",
4141
"//pkg/sql/schemachanger/scpb",
4242
"//pkg/sql/sem/catconstants",
43+
"//pkg/sql/sem/catid",
4344
"//pkg/sql/sem/idxtype",
4445
"//pkg/sql/sem/semenumpb",
4546
"//pkg/sql/sem/tree",

pkg/sql/catalog/descriptor.go

Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ import (
2121
"github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgerror"
2222
"github.com/cockroachdb/cockroach/pkg/sql/privilege"
2323
"github.com/cockroachdb/cockroach/pkg/sql/schemachanger/scpb"
24+
"github.com/cockroachdb/cockroach/pkg/sql/sem/catid"
2425
"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
2526
"github.com/cockroachdb/cockroach/pkg/sql/types"
2627
"github.com/cockroachdb/cockroach/pkg/util/hlc"
@@ -1207,3 +1208,46 @@ func IsSystemDescriptor(desc Descriptor) bool {
12071208
func HasConcurrentDeclarativeSchemaChange(desc Descriptor) bool {
12081209
return desc.GetDeclarativeSchemaChangerState() != nil
12091210
}
1211+
1212+
// HasDeclarativeMergedPrimaryIndex reports whether the descriptor's declarative
1213+
// schema changer state contains a primary index target for this table that is headed to PUBLIC or TRANSIENT_ABSENT and is currently WRITE_ONLY. It returns false when no declarative schema change is active.
1214+
func HasDeclarativeMergedPrimaryIndex(desc TableDescriptor) bool {
1215+
// For declarative schema changes, detect when a new primary index becomes
1216+
// WRITE_ONLY (i.e. backfill has been completed).
1217+
state := desc.GetDeclarativeSchemaChangerState()
1218+
if state == nil {
1219+
return false
1220+
}
1221+
for idx, target := range state.Targets {
1222+
if target.GetPrimaryIndex() != nil &&
1223+
target.GetPrimaryIndex().TableID == desc.GetID() &&
1224+
(target.TargetStatus == scpb.Status_PUBLIC || target.TargetStatus == scpb.Status_TRANSIENT_ABSENT) &&
1225+
state.CurrentStatuses[idx] == scpb.Status_WRITE_ONLY {
1226+
return true
1227+
}
1228+
}
1229+
return false
1230+
}
1231+
1232+
// FindPreviousColumnNameForDeclarativeSchemaChange attempts to resolve the previous
1233+
// name of the given column by searching the declarative schema changer targets for a ColumnName element on this table and column whose target status is ABSENT.
1234+
// It returns (true, name) when such an element is found, and (false, "") otherwise, including when no declarative schema change is active.
1235+
func FindPreviousColumnNameForDeclarativeSchemaChange(
1236+
desc TableDescriptor, columnID catid.ColumnID,
1237+
) (bool, string) {
1238+
state := desc.GetDeclarativeSchemaChangerState()
1239+
if state == nil {
1240+
// No declarative schema change is active.
1241+
return false, ""
1242+
}
1243+
// Find the ColumnName element being dropped for this column; it carries the old name.
1244+
for _, elem := range state.Targets {
1245+
if nameElem, ok := elem.Element().(*scpb.ColumnName); ok &&
1246+
nameElem.TableID == desc.GetID() &&
1247+
nameElem.ColumnID == columnID &&
1248+
elem.TargetStatus == scpb.Status_ABSENT {
1249+
return true, nameElem.Name
1250+
}
1251+
}
1252+
return false, ""
1253+
}

0 commit comments

Comments
 (0)