Skip to content

Commit c931ab6

Browse files
craig[bot]msbutleraerfrei
committed
153790: catalog/replication: delete existing descriptors before updates r=fqazi a=msbutler Fixes #153788 Release note: none 153792: changefeedccl: disable roachtest on IBM due to missing Kafka support r=rickystewart,log-head a=aerfrei Marks the 10 minute tpcc-100 test as disabled on IBM platforms, since kafka is not supported on s390x. Fixes: #153759 Release note: None Co-authored-by: Michael Butler <[email protected]> Co-authored-by: Aerin Freilich <[email protected]>
3 parents 44921a5 + 733c90d + ad7f59b commit c931ab6

File tree

2 files changed

+32
-24
lines changed

2 files changed

+32
-24
lines changed

pkg/cmd/roachtest/tests/cdc.go

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -3031,12 +3031,13 @@ func registerCDC(r registry.Registry) {
30313031
Run: runCDCMultipleSchemaChanges,
30323032
})
30333033
r.Add(registry.TestSpec{
3034-
Name: "cdc/tpcc-100/10min/sink=kafka/envelope=enriched",
3035-
Owner: registry.OwnerCDC,
3036-
Benchmark: true,
3037-
Cluster: r.MakeClusterSpec(4, spec.WorkloadNode(), spec.CPU(16)),
3038-
Leases: registry.MetamorphicLeases,
3039-
CompatibleClouds: registry.AllClouds,
3034+
Name: "cdc/tpcc-100/10min/sink=kafka/envelope=enriched",
3035+
Owner: registry.OwnerCDC,
3036+
Benchmark: true,
3037+
Cluster: r.MakeClusterSpec(4, spec.WorkloadNode(), spec.CPU(16)),
3038+
Leases: registry.MetamorphicLeases,
3039+
// Disabled on IBM due to lack of Kafka support on s390x.
3040+
CompatibleClouds: registry.AllClouds.NoIBM(),
30403041
Suites: registry.Suites(registry.Nightly),
30413042
Run: func(ctx context.Context, t test.Test, c cluster.Cluster) {
30423043
ct := newCDCTester(ctx, t, c)

pkg/sql/catalog/replication/reader_catalog.go

Lines changed: 25 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -133,25 +133,13 @@ func SetupOrAdvanceStandbyReaderCatalog(
133133
// if a table and sequence depend on each other, then updating one and
134134
// fetching the other in a mutable way to remove a dependency will hit
135135
// a validation error.
136-
for _, mut := range descriptorsToWrite {
137-
if err := txn.Descriptors().WriteDescToBatch(ctx, true, mut, b); err != nil {
138-
return errors.Wrapf(err, "unable to create replicated descriptor: %d %T", mut.GetID(), mut)
139-
}
140-
}
141-
if err := extracted.ForEachNamespaceEntry(func(e nstree.NamespaceEntry) error {
142-
if !shouldSetupForReader(e.GetID(), e.GetName(), e.GetParentID()) {
143-
return nil
144-
}
145-
// Do not upsert entries if one already exists.
146-
entry := allExistingDescs.LookupNamespaceEntry(catalog.MakeNameInfo(e))
147-
if entry != nil && e.GetID() == entry.GetID() {
148-
return nil
149-
}
150-
return errors.Wrapf(txn.Descriptors().UpsertNamespaceEntryToBatch(ctx, true, e, b), "namespace entry %v", e)
151-
}); err != nil {
152-
return err
153-
}
136+
154137
// Figure out which descriptors should be deleted.
138+
//
139+
// NB: we issue deletes of existing descriptors/namespace entries before
140+
// we upsert new ones in the batch to ensure that if we need to delete and
141+
// upsert the same namespace entry but for a different table id, after the
142+
// txn, the reader will see the upsert.
155143
if err := allExistingDescs.ForEachDescriptor(func(desc catalog.Descriptor) error {
156144
// Skip descriptors that were updated above
157145
if !shouldSetupForReader(desc.GetID(), desc.GetName(), desc.GetParentID()) ||
@@ -178,6 +166,25 @@ func SetupOrAdvanceStandbyReaderCatalog(
178166
}); err != nil {
179167
return err
180168
}
169+
170+
for _, mut := range descriptorsToWrite {
171+
if err := txn.Descriptors().WriteDescToBatch(ctx, true, mut, b); err != nil {
172+
return errors.Wrapf(err, "unable to create replicated descriptor: %d %T", mut.GetID(), mut)
173+
}
174+
}
175+
if err := extracted.ForEachNamespaceEntry(func(e nstree.NamespaceEntry) error {
176+
if !shouldSetupForReader(e.GetID(), e.GetName(), e.GetParentID()) {
177+
return nil
178+
}
179+
// Do not upsert entries if one already exists with the same ID.
180+
entry := allExistingDescs.LookupNamespaceEntry(catalog.MakeNameInfo(e))
181+
if entry != nil && e.GetID() == entry.GetID() {
182+
return nil
183+
}
184+
return errors.Wrapf(txn.Descriptors().UpsertNamespaceEntryToBatch(ctx, true, e, b), "namespace entry %v", e)
185+
}); err != nil {
186+
return err
187+
}
181188
if err := maybeBlockSchemaChangesOnSystemDatabase(ctx, txn, b); err != nil {
182189
return errors.Wrapf(err, "blocking schema changes")
183190
}

0 commit comments

Comments
 (0)