Skip to content

Commit 1b300d8

Browse files
committed
catalog/replication: delete existing descriptors before updates
Fixes #153788 Release note: none
1 parent 43f2f96 commit 1b300d8

File tree

1 file changed

+25
-18
lines changed

1 file changed

+25
-18
lines changed

pkg/sql/catalog/replication/reader_catalog.go

Lines changed: 25 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -132,25 +132,13 @@ func SetupOrAdvanceStandbyReaderCatalog(
132132
// if a table and sequence depend on each other, then updating one and
133133
// fetching the other in a mutable way to remove a dependency will hit
134134
// a validation error.
135-
for _, mut := range descriptorsToWrite {
136-
if err := txn.Descriptors().WriteDescToBatch(ctx, true, mut, b); err != nil {
137-
return errors.Wrapf(err, "unable to create replicated descriptor: %d %T", mut.GetID(), mut)
138-
}
139-
}
140-
if err := extracted.ForEachNamespaceEntry(func(e nstree.NamespaceEntry) error {
141-
if !shouldSetupForReader(e.GetID(), e.GetParentID()) {
142-
return nil
143-
}
144-
// Do not upsert entries if one already exists.
145-
entry := allExistingDescs.LookupNamespaceEntry(catalog.MakeNameInfo(e))
146-
if entry != nil && e.GetID() == entry.GetID() {
147-
return nil
148-
}
149-
return errors.Wrapf(txn.Descriptors().UpsertNamespaceEntryToBatch(ctx, true, e, b), "namespace entry %v", e)
150-
}); err != nil {
151-
return err
152-
}
135+
153136
// Figure out which descriptors should be deleted.
137+
//
138+
// NB: we issue deletes of existing descriptors/namespace entries before
139+
// we upsert new ones in the batch to ensure that if we need to delete and
140+
// upsert the same namespace entry but for a different table id, after the
141+
// txn, the reader will see the upsert.
154142
if err := allExistingDescs.ForEachDescriptor(func(desc catalog.Descriptor) error {
155143
// Skip descriptors that were updated above
156144
if !shouldSetupForReader(desc.GetID(), desc.GetParentID()) ||
@@ -177,6 +165,25 @@ func SetupOrAdvanceStandbyReaderCatalog(
177165
}); err != nil {
178166
return err
179167
}
168+
169+
for _, mut := range descriptorsToWrite {
170+
if err := txn.Descriptors().WriteDescToBatch(ctx, true, mut, b); err != nil {
171+
return errors.Wrapf(err, "unable to create replicated descriptor: %d %T", mut.GetID(), mut)
172+
}
173+
}
174+
if err := extracted.ForEachNamespaceEntry(func(e nstree.NamespaceEntry) error {
175+
if !shouldSetupForReader(e.GetID(), e.GetParentID()) {
176+
return nil
177+
}
178+
// Do not upsert entries if one already exists with the same ID.
179+
entry := allExistingDescs.LookupNamespaceEntry(catalog.MakeNameInfo(e))
180+
if entry != nil && e.GetID() == entry.GetID() {
181+
return nil
182+
}
183+
return errors.Wrapf(txn.Descriptors().UpsertNamespaceEntryToBatch(ctx, true, e, b), "namespace entry %v", e)
184+
}); err != nil {
185+
return err
186+
}
180187
if err := maybeBlockSchemaChangesOnSystemDatabase(ctx, txn, b); err != nil {
181188
return errors.Wrapf(err, "blocking schema changes")
182189
}

0 commit comments

Comments
 (0)