Skip to content

Commit 733c90d

Browse files
committed
catalog/replication: delete existing descriptors before updates
Fixes #153788 Release note: none
1 parent adc3de4 commit 733c90d

File tree

1 file changed

+25
-18
lines changed

1 file changed

+25
-18
lines changed

pkg/sql/catalog/replication/reader_catalog.go

Lines changed: 25 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -133,25 +133,13 @@ func SetupOrAdvanceStandbyReaderCatalog(
133133
// if a table and sequence depend on each other, then updating one and
134134
// fetching the other in a mutable way to remove a dependency will hit
135135
// a validation error.
136-
for _, mut := range descriptorsToWrite {
137-
if err := txn.Descriptors().WriteDescToBatch(ctx, true, mut, b); err != nil {
138-
return errors.Wrapf(err, "unable to create replicated descriptor: %d %T", mut.GetID(), mut)
139-
}
140-
}
141-
if err := extracted.ForEachNamespaceEntry(func(e nstree.NamespaceEntry) error {
142-
if !shouldSetupForReader(e.GetID(), e.GetName(), e.GetParentID()) {
143-
return nil
144-
}
145-
// Do not upsert entries if one already exists.
146-
entry := allExistingDescs.LookupNamespaceEntry(catalog.MakeNameInfo(e))
147-
if entry != nil && e.GetID() == entry.GetID() {
148-
return nil
149-
}
150-
return errors.Wrapf(txn.Descriptors().UpsertNamespaceEntryToBatch(ctx, true, e, b), "namespace entry %v", e)
151-
}); err != nil {
152-
return err
153-
}
136+
154137
// Figure out which descriptors should be deleted.
138+
//
139+
// NB: we issue deletes of existing descriptors/namespace entries before
140+
// we upsert new ones in the batch to ensure that if we need to delete and
141+
// upsert the same namespace entry but for a different table id, after the
142+
// txn, the reader will see the upsert.
155143
if err := allExistingDescs.ForEachDescriptor(func(desc catalog.Descriptor) error {
156144
// Skip descriptors that were updated above
157145
if !shouldSetupForReader(desc.GetID(), desc.GetName(), desc.GetParentID()) ||
@@ -178,6 +166,25 @@ func SetupOrAdvanceStandbyReaderCatalog(
178166
}); err != nil {
179167
return err
180168
}
169+
170+
for _, mut := range descriptorsToWrite {
171+
if err := txn.Descriptors().WriteDescToBatch(ctx, true, mut, b); err != nil {
172+
return errors.Wrapf(err, "unable to create replicated descriptor: %d %T", mut.GetID(), mut)
173+
}
174+
}
175+
if err := extracted.ForEachNamespaceEntry(func(e nstree.NamespaceEntry) error {
176+
if !shouldSetupForReader(e.GetID(), e.GetName(), e.GetParentID()) {
177+
return nil
178+
}
179+
// Do not upsert entries if one already exists with the same ID.
180+
entry := allExistingDescs.LookupNamespaceEntry(catalog.MakeNameInfo(e))
181+
if entry != nil && e.GetID() == entry.GetID() {
182+
return nil
183+
}
184+
return errors.Wrapf(txn.Descriptors().UpsertNamespaceEntryToBatch(ctx, true, e, b), "namespace entry %v", e)
185+
}); err != nil {
186+
return err
187+
}
181188
if err := maybeBlockSchemaChangesOnSystemDatabase(ctx, txn, b); err != nil {
182189
return errors.Wrapf(err, "blocking schema changes")
183190
}

0 commit comments

Comments
 (0)