Skip to content

Commit c55c03d

Browse files
craig[bot]fqazi
andcommitted
Merge #145972
145972: catalog/replication: fix validation errors when updating catalog r=fqazi a=fqazi Previously, updating PCR reader catalog descriptors involved processing and flushing one at a time. This worked unless a subsequent descriptor depended on the current one. However, it could fail when, for example, removing a column dependency (i.e., ALTER COLUMN .. SET DEFAULT NULL). To address this, this patch first generates the mutated descriptors and then flushes them into the collections. Fixes: #145971 Release note (bug fix): addressed a bug where the PCR reader catalog job could hit validation errors, when schema object dependencies between them (for example a sequence default expression was being removed). Co-authored-by: Faizan Qazi <[email protected]>
2 parents 7f3df80 + f8d43a6 commit c55c03d

File tree

3 files changed

+35
-5
lines changed

3 files changed

+35
-5
lines changed

pkg/sql/catalog/descs/collection.go

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -334,8 +334,9 @@ func (tc *Collection) WriteDescToBatch(
334334
}
335335
desc.MaybeIncrementVersion()
336336
// Replicated PCR descriptors cannot be modified unless the collection
337-
// is setup for updating them.
338-
if !tc.readerCatalogSetup && desc.GetReplicatedPCRVersion() != 0 {
337+
// is setup for updating them. If write validation is disabled then, allow
338+
// PCR reader catalog descriptors to be modified, otherwise repair is impossible.
339+
if !tc.readerCatalogSetup && desc.GetReplicatedPCRVersion() != 0 && !tc.skipValidationOnWrite {
339340
return pgerror.Newf(pgcode.ReadOnlySQLTransaction,
340341
"replicated %s %s (%d) cannot be mutated",
341342
desc.GetObjectTypeString(),

pkg/sql/catalog/replication/reader_catalog.go

Lines changed: 17 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -60,6 +60,9 @@ func SetupOrAdvanceStandbyReaderCatalog(
6060
// Resolve any existing descriptors within the tenant, which
6161
// will be use to compute old values for writing.
6262
b := txn.KV().NewBatch()
63+
// Batch the write updates, since we mutate descriptors one at a time
64+
// below.
65+
descriptorsToWrite := make([]catalog.MutableDescriptor, 0, len(allExistingDescs.OrderedDescriptorIDs()))
6366
if err := extracted.ForEachDescriptor(func(fromDesc catalog.Descriptor) error {
6467
if !shouldSetupForReader(fromDesc.GetID(), fromDesc.GetParentID()) {
6568
return nil
@@ -71,7 +74,7 @@ func SetupOrAdvanceStandbyReaderCatalog(
7174
existingDesc, err := txn.Descriptors().MutableByID(txn.KV()).Desc(ctx, fromDesc.GetID())
7275
if err != nil &&
7376
!errors.Is(err, catalog.ErrDescriptorNotFound) {
74-
return err
77+
return errors.Wrapf(err, "failed to get mutable descriptor")
7578
} else {
7679
err = nil
7780
}
@@ -118,11 +121,22 @@ func SetupOrAdvanceStandbyReaderCatalog(
118121
descriptorsRenamed.Add(existingDesc.GetID())
119122
}
120123
}
121-
return errors.Wrapf(txn.Descriptors().WriteDescToBatch(ctx, true, mut, b),
122-
"unable to create replicated descriptor: %d %T", mut.GetID(), mut)
124+
descriptorsToWrite = append(descriptorsToWrite, mut)
125+
return nil
123126
}); err != nil {
124127
return err
125128
}
129+
// Write all the descriptors into a single batch, we previously would write
130+
// them in the loop above. But that interferes with mutable descriptor
131+
// validation, which will include uncommitted descriptors. For example,
132+
// if a table and sequence depend on each other, then updating one and
133+
// fetching the other in a mutable way to remove a dependency will hit
134+
// a validation error.
135+
for _, mut := range descriptorsToWrite {
136+
if err := txn.Descriptors().WriteDescToBatch(ctx, true, mut, b); err != nil {
137+
return errors.Wrapf(err, "unable to create replicated descriptor: %d %T", mut.GetID(), mut)
138+
}
139+
}
126140
if err := extracted.ForEachNamespaceEntry(func(e nstree.NamespaceEntry) error {
127141
if !shouldSetupForReader(e.GetID(), e.GetParentID()) {
128142
return nil

pkg/sql/catalog/replication/reader_catalog_test.go

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -276,6 +276,8 @@ INSERT INTO t3(n) VALUES (3);
276276
// Manipulate the schema first.
277277
ddlToExec = []string{
278278
"ALTER TABLE t1 ADD COLUMN j int default 32",
279+
"CREATE SEQUENCE sq2;",
280+
"ALTER TABLE t1 ALTER COLUMN n SET default nextval('sq2')",
279281
"INSERT INTO t1(val, j) VALUES('open', 1);",
280282
"INSERT INTO t1(val, j) VALUES('closed', 2);",
281283
"INSERT INTO t1(val, j) VALUES('inactive', 3);",
@@ -301,6 +303,19 @@ INSERT INTO t3(n) VALUES (3);
301303
r.destRunner.ExpectErr(t, "schema changes are not allowed on a reader catalog", "ALTER SEQUENCE sq1 RENAME TO sq4")
302304
r.destRunner.ExpectErr(t, "schema changes are not allowed on a reader catalog", "ALTER TYPE status ADD VALUE 'newval' ")
303305

306+
// As a final test intentionally drop dependencies between descriptors. If we
307+
// don't batch descriptor updates this will cause a validation error, since
308+
// the sequence and table depend on each other.
309+
ddlToExec = []string{
310+
"ALTER TABLE t1 ALTER COLUMN n SET default NULL",
311+
}
312+
for _, ddl := range ddlToExec {
313+
r.srcRunner.Exec(t, ddl)
314+
}
315+
// Confirm that everything matches at the old timestamp.
316+
require.NoError(t, r.advanceTS(ctx, r.ts.Clock().Now(), true))
317+
r.compareEqual(t, "SELECT * FROM v1 ORDER BY 1")
318+
r.compareEqual(t, "SELECT * FROM t2 ORDER BY j")
304319
}
305320

306321
// TestReaderCatalogTSAdvance tests repeated advances of timestamp

0 commit comments

Comments
 (0)