Skip to content

Commit c7cc4d9

Browse files
committed
sql: support partial index schema changes in LDR tables using the SQL writer
If logical_replication.consumer.immediate_mode_writer is set to legacy-kv, we disallow schema changes related to partial indexes on table with LDR jobs running on them. Note that if a user runs LDR in the undocumented validated mode, which always uses the sql writer, and but has this setting equal 'legacy-kv', partial index schema changes will still fail. This is a papercuit i'm willing to deal with. Epic: none Release note (ops change): partial index schema changes are supported in replicating tables when logical_replication.consumer.immediate_mode_writer is not set to legacy-kv
1 parent 23acfaa commit c7cc4d9

File tree

16 files changed

+37
-20
lines changed

16 files changed

+37
-20
lines changed

pkg/crosscluster/logical/logical_replication_job_test.go

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2180,7 +2180,6 @@ func TestLogicalReplicationSchemaChanges(t *testing.T) {
21802180
// adding unsuppored indexes disallowed
21812181
{"add virtual column index", "CREATE INDEX virtual_col_idx ON tab(virtual_col)", false},
21822182
{"add hash index", "CREATE INDEX hash_idx ON tab(pk) USING HASH WITH (bucket_count = 4)", false},
2183-
{"add partial index", "CREATE INDEX partial_idx ON tab(composite_col) WHERE pk > 0", false},
21842183
{"add unique index", "CREATE UNIQUE INDEX unique_idx ON tab(composite_col)", false},
21852184

21862185
// Drop table is blocked
@@ -2421,9 +2420,17 @@ func TestPartialIndexes(t *testing.T) {
24212420
dbA.CheckQueryResultsRetry(t, "SELECT * FROM foo WHERE pi = 2", [][]string{{"1", "2", "hello"}})
24222421
dbA.CheckQueryResultsRetry(t, "SELECT * FROM foo WHERE pi = -2", [][]string{{"-1", "-2", "world"}})
24232422

2423+
// Check that dbA and dbB can add an additional partial index, enabled when using the sql writer.
2424+
dbA.Exec(t, "CREATE INDEX idx2 ON a.foo (pi) WHERE pk = 0")
2425+
dbB.Exec(t, "CREATE INDEX idx2 ON b.foo (pi) WHERE pk = 0")
2426+
24242427
// Check for partial indexes when using legacy kv writer.
24252428
dbA.Exec(t, "SET CLUSTER SETTING logical_replication.consumer.immediate_mode_writer = 'legacy-kv'")
24262429
dbA.ExpectErr(t, "cannot create logical replication stream: table foo has a partial index idx", "CREATE LOGICAL REPLICATION STREAM FROM TABLE foo ON $1 INTO TABLE foo", dbBURL.String())
2430+
2431+
// With the kv writer set, we can no longer create partial indexes on the replicating tables.
2432+
dbA.ExpectErr(t, " this schema change is disallowed on table foo because it is referenced by one or more logical replication jobs", "CREATE INDEX idx3 ON a.foo (pi) WHERE pk = 0")
2433+
dbB.ExpectErr(t, " this schema change is disallowed on table foo because it is referenced by one or more logical replication jobs", "CREATE INDEX idx3 ON b.foo (pi) WHERE pk = 0")
24272434
}
24282435

24292436
// TestLogicalReplicationCreationChecks verifies that we check that the table

pkg/sql/alter_index.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,7 @@ func (p *planner) AlterIndex(ctx context.Context, n *tree.AlterIndex) (planNode,
4646
}
4747

4848
// Disallow schema changes if this table's schema is locked.
49-
if err := checkSchemaChangeIsAllowed(tableDesc, n); err != nil {
49+
if err := checkSchemaChangeIsAllowed(tableDesc, n, p.ExecCfg().Settings); err != nil {
5050
return nil, err
5151
}
5252

pkg/sql/alter_index_visible.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -66,7 +66,7 @@ func (p *planner) AlterIndexVisible(
6666
}
6767

6868
// Disallow schema changes if this table's schema is locked.
69-
if err := checkSchemaChangeIsAllowed(tableDesc, n); err != nil {
69+
if err := checkSchemaChangeIsAllowed(tableDesc, n, p.ExecCfg().Settings); err != nil {
7070
return nil, err
7171
}
7272

pkg/sql/alter_table.go

Lines changed: 9 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ import (
1717
"github.com/cockroachdb/cockroach/pkg/jobs"
1818
"github.com/cockroachdb/cockroach/pkg/jobs/jobspb"
1919
"github.com/cockroachdb/cockroach/pkg/server/telemetry"
20+
"github.com/cockroachdb/cockroach/pkg/settings/cluster"
2021
"github.com/cockroachdb/cockroach/pkg/sql/auditlogging/auditevents"
2122
"github.com/cockroachdb/cockroach/pkg/sql/catalog"
2223
"github.com/cockroachdb/cockroach/pkg/sql/catalog/catalogkeys"
@@ -37,6 +38,7 @@ import (
3738
"github.com/cockroachdb/cockroach/pkg/sql/sem/semenumpb"
3839
"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
3940
"github.com/cockroachdb/cockroach/pkg/sql/sem/volatility"
41+
"github.com/cockroachdb/cockroach/pkg/sql/sqlclustersettings"
4042
"github.com/cockroachdb/cockroach/pkg/sql/sqlerrors"
4143
"github.com/cockroachdb/cockroach/pkg/sql/sqltelemetry"
4244
"github.com/cockroachdb/cockroach/pkg/sql/stats"
@@ -101,7 +103,7 @@ func (p *planner) AlterTable(ctx context.Context, n *tree.AlterTable) (planNode,
101103

102104
// Disallow schema changes if this table's schema is locked, unless it is to
103105
// set/reset the "schema_locked" storage parameter.
104-
if err = checkSchemaChangeIsAllowed(tableDesc, n); err != nil {
106+
if err = checkSchemaChangeIsAllowed(tableDesc, n, p.ExecCfg().Settings); err != nil {
105107
return nil, err
106108
}
107109

@@ -460,7 +462,7 @@ func (n *alterTableNode) startExec(params runParams) error {
460462
for _, updated := range affected {
461463
// Disallow schema change if the FK references a table whose schema is
462464
// locked.
463-
if err := checkSchemaChangeIsAllowed(updated, n.n); err != nil {
465+
if err := checkSchemaChangeIsAllowed(updated, n.n, params.ExecCfg().Settings); err != nil {
464466
return err
465467
}
466468
if err := params.p.writeSchemaChange(
@@ -2344,7 +2346,9 @@ func (p *planner) tryRemoveFKBackReferences(
23442346
// not modifying the value of schema_locked.
23452347
// - The table is referenced by logical data replication jobs, and the statement
23462348
// is not in the allow list of LDR schema changes.
2347-
func checkSchemaChangeIsAllowed(desc catalog.TableDescriptor, n tree.Statement) (ret error) {
2349+
func checkSchemaChangeIsAllowed(
2350+
desc catalog.TableDescriptor, n tree.Statement, settings *cluster.Settings,
2351+
) (ret error) {
23482352
if desc == nil {
23492353
return nil
23502354
}
@@ -2358,9 +2362,9 @@ func checkSchemaChangeIsAllowed(desc catalog.TableDescriptor, n tree.Statement)
23582362
virtualColNames = append(virtualColNames, col.GetName())
23592363
}
23602364
}
2361-
if !tree.IsAllowedLDRSchemaChange(n, virtualColNames) {
2365+
kvWriterEnabled := sqlclustersettings.LDRWriterType(sqlclustersettings.LDRImmediateModeWriter.Get(&settings.SV))
2366+
if !tree.IsAllowedLDRSchemaChange(n, virtualColNames, kvWriterEnabled == sqlclustersettings.LDRWriterTypeLegacyKV) {
23622367
return sqlerrors.NewDisallowedSchemaChangeOnLDRTableErr(desc.GetName(), desc.TableDesc().LDRJobIDs)
2363-
23642368
}
23652369
}
23662370
return nil

pkg/sql/alter_table_locality.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -81,7 +81,7 @@ func (p *planner) AlterTableLocality(
8181
}
8282

8383
// Disallow schema changes if this table's schema is locked.
84-
if err := checkSchemaChangeIsAllowed(tableDesc, n); err != nil {
84+
if err := checkSchemaChangeIsAllowed(tableDesc, n, p.ExecCfg().Settings); err != nil {
8585
return nil, err
8686
}
8787

pkg/sql/alter_table_set_schema.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -85,7 +85,7 @@ func (p *planner) AlterTableSetSchema(
8585
}
8686

8787
// Disallow schema changes if this table's schema is locked.
88-
if err := checkSchemaChangeIsAllowed(tableDesc, n); err != nil {
88+
if err := checkSchemaChangeIsAllowed(tableDesc, n, p.ExecCfg().Settings); err != nil {
8989
return nil, err
9090
}
9191

pkg/sql/create_index.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -97,7 +97,7 @@ func (p *planner) CreateIndex(ctx context.Context, n *tree.CreateIndex) (planNod
9797
}
9898

9999
// Disallow schema changes if this table's schema is locked.
100-
if err := checkSchemaChangeIsAllowed(tableDesc, n); err != nil {
100+
if err := checkSchemaChangeIsAllowed(tableDesc, n, p.ExecCfg().Settings); err != nil {
101101
return nil, err
102102
}
103103

pkg/sql/drop_index.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -66,7 +66,7 @@ func (p *planner) DropIndex(ctx context.Context, n *tree.DropIndex) (planNode, e
6666
}
6767

6868
// Disallow schema changes if this table's schema is locked.
69-
if err = checkSchemaChangeIsAllowed(tableDesc, n); err != nil {
69+
if err = checkSchemaChangeIsAllowed(tableDesc, n, p.ExecCfg().Settings); err != nil {
7070
return nil, err
7171
}
7272

pkg/sql/drop_table.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -73,7 +73,7 @@ func (p *planner) DropTable(ctx context.Context, n *tree.DropTable) (planNode, e
7373
for _, toDel := range td {
7474
droppedDesc := toDel.desc
7575
// Disallow the DROP if this table's schema is locked.
76-
if err := checkSchemaChangeIsAllowed(droppedDesc, n); err != nil {
76+
if err := checkSchemaChangeIsAllowed(droppedDesc, n, p.ExecCfg().Settings); err != nil {
7777
return nil, err
7878
}
7979
for _, fk := range droppedDesc.InboundForeignKeys() {

pkg/sql/rename_column.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -53,7 +53,7 @@ func (p *planner) RenameColumn(ctx context.Context, n *tree.RenameColumn) (planN
5353
}
5454

5555
// Disallow schema changes if this table's schema is locked.
56-
if err := checkSchemaChangeIsAllowed(tableDesc, n); err != nil {
56+
if err := checkSchemaChangeIsAllowed(tableDesc, n, p.ExecCfg().Settings); err != nil {
5757
return nil, err
5858
}
5959

0 commit comments

Comments
 (0)