Skip to content

Commit 73a66d2

Browse files
committed
changefeedccl: fix ALTER CHANGEFEED span selection
This patch updates the ALTER CHANGEFEED code to use primary index spans instead of entire table spans to be consistent with the rest of the changefeed code and to help maintain the assumption that we will never merge spans from different tables in our frontier data structures. Release note: None
1 parent 1e2f0e2 commit 73a66d2

File tree

1 file changed

+50
-17
lines changed

1 file changed

+50
-17
lines changed

pkg/ccl/changefeedccl/alter_changefeed_stmt.go

Lines changed: 50 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -541,12 +541,19 @@ func generateAndValidateNewTargets(
541541
(initialScanType == `only` && originalInitialScanOnlyOption)
542542

543543
// TODO(#142376): Audit whether this list is generated correctly.
544-
var existingTargetIDs []descpb.ID
544+
var existingTargetSpanIDs []spanID
545545
for _, targetDesc := range newTableDescs {
546-
existingTargetIDs = append(existingTargetIDs, targetDesc.GetID())
546+
tableDesc, ok := targetDesc.(catalog.TableDescriptor)
547+
if !ok {
548+
return nil, nil, hlc.Timestamp{}, nil, errors.AssertionFailedf("expected table descriptor")
549+
}
550+
existingTargetSpanIDs = append(existingTargetSpanIDs, spanID{
551+
tableID: tableDesc.GetID(),
552+
indexID: tableDesc.GetPrimaryIndexID(),
553+
})
547554
}
548-
existingTargetSpans := fetchSpansForDescs(p, existingTargetIDs)
549-
var newTargetIDs []descpb.ID
555+
existingTargetSpans := fetchSpansForDescs(p, existingTargetSpanIDs)
556+
var addedTargetSpanIDs []spanID
550557
for _, target := range v.Targets {
551558
desc, found, err := getTargetDesc(ctx, p, descResolver, target.TableName)
552559
if err != nil {
@@ -563,10 +570,17 @@ func generateAndValidateNewTargets(
563570
k := targetKey{TableID: desc.GetID(), FamilyName: target.FamilyName}
564571
newTargets[k] = target
565572
newTableDescs[desc.GetID()] = desc
566-
newTargetIDs = append(newTargetIDs, k.TableID)
573+
tableDesc, ok := desc.(catalog.TableDescriptor)
574+
if !ok {
575+
return nil, nil, hlc.Timestamp{}, nil, errors.AssertionFailedf("expected table descriptor")
576+
}
577+
addedTargetSpanIDs = append(addedTargetSpanIDs, spanID{
578+
tableID: tableDesc.GetID(),
579+
indexID: tableDesc.GetPrimaryIndexID(),
580+
})
567581
}
568582

569-
addedTargetSpans := fetchSpansForDescs(p, newTargetIDs)
583+
addedTargetSpans := fetchSpansForDescs(p, addedTargetSpanIDs)
570584

571585
// By default, we will not perform an initial scan on newly added
572586
// targets. Hence, the user must explicitly state that they want an
@@ -632,13 +646,18 @@ func generateAndValidateNewTargets(
632646
for k := range newTargets {
633647
addedTargets[k.TableID] = struct{}{}
634648
}
635-
droppedIDs := make([]descpb.ID, 0, len(droppedTargets))
649+
droppedSpanIDs := make([]spanID, 0, len(droppedTargets))
636650
for k := range droppedTargets {
637651
if _, wasAdded := addedTargets[k.TableID]; !wasAdded {
638-
droppedIDs = append(droppedIDs, k.TableID)
652+
// For dropped tables, we might not have the desc anymore so
653+
// we can't get the index ID. In any case, it's safe to wipe
654+
// out the entire table span.
655+
droppedSpanIDs = append(droppedSpanIDs, spanID{
656+
tableID: k.TableID,
657+
})
639658
}
640659
}
641-
droppedTargetSpans := fetchSpansForDescs(p, droppedIDs)
660+
droppedTargetSpans := fetchSpansForDescs(p, droppedSpanIDs)
642661
if err := removeSpansFromProgress(newJobProgress, droppedTargetSpans); err != nil {
643662
return nil, nil, hlc.Timestamp{}, nil, err
644663
}
@@ -874,19 +893,33 @@ func getSpanLevelCheckpointFromProgress(
874893
return changefeedProgress.SpanLevelCheckpoint, nil
875894
}
876895

877-
func fetchSpansForDescs(p sql.PlanHookState, descIDs []descpb.ID) (primarySpans []roachpb.Span) {
878-
seen := make(map[descpb.ID]struct{})
896+
type spanID struct {
897+
tableID descpb.ID
898+
indexID descpb.IndexID
899+
}
900+
901+
func fetchSpansForDescs(p sql.PlanHookState, spanIDs []spanID) (primarySpans []roachpb.Span) {
902+
seen := make(map[spanID]struct{})
879903
codec := p.ExtendedEvalContext().Codec
880-
for _, id := range descIDs {
904+
for _, id := range spanIDs {
881905
if _, isDup := seen[id]; isDup {
882906
continue
883907
}
884908
seen[id] = struct{}{}
885-
tablePrefix := codec.TablePrefix(uint32(id))
886-
primarySpan := roachpb.Span{
887-
Key: tablePrefix,
888-
EndKey: tablePrefix.PrefixEnd(),
889-
}
909+
primarySpan := func() roachpb.Span {
910+
if id.indexID == 0 {
911+
tablePrefix := codec.TablePrefix(uint32(id.tableID))
912+
return roachpb.Span{
913+
Key: tablePrefix,
914+
EndKey: tablePrefix.PrefixEnd(),
915+
}
916+
}
917+
indexPrefix := codec.IndexPrefix(uint32(id.tableID), uint32(id.indexID))
918+
return roachpb.Span{
919+
Key: indexPrefix,
920+
EndKey: indexPrefix.PrefixEnd(),
921+
}
922+
}()
890923
primarySpans = append(primarySpans, primarySpan)
891924
}
892925
return primarySpans

0 commit comments

Comments
 (0)