Skip to content

Commit 5f658ad

Browse files
craig[bot]littlestar642msbutler
committed
144049: sql/crdb_internal: rename alter_statements to fk_statements r=rafiss a=littlestar642 Fixes: #143762 Update name of `alter_statements` column to `fk_statements` when executing below sql Previous ``` SELECT alter_statements FROM crdb_internal.create_statements; ``` Current ``` SELECT fk_statements FROM crdb_internal.create_statements; ``` 144508: sql: support partial index schema changes in LDR tables using the SQL writer r=msbutler a=msbutler If logical_replication.consumer.immediate_mode_writer is set to legacy-kv, we disallow schema changes related to partial indexes on table with LDR jobs running on them. Note that if a user runs LDR in the undocumented validated mode, which always uses the sql writer, but has this setting equal 'legacy-kv', partial index schema changes will still fail. This is a papercuit i'm willing to deal with. Epic: none Release note (ops change): partial index schema changes are supported in replicating tables when logical_replication.consumer.immediate_mode_writer is not set to legacy-kv Co-authored-by: avinash jaiswal <[email protected]> Co-authored-by: Michael Butler <[email protected]>
3 parents 0aae430 + 7c43cb2 + c7cc4d9 commit 5f658ad

31 files changed

+102
-78
lines changed

pkg/ccl/logictestccl/testdata/logic_test/crdb_internal_tenant

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -190,7 +190,7 @@ function signature category details schema oid
190190
query ITTITTTTTTTTTBBBB colnames
191191
SELECT * FROM crdb_internal.create_statements WHERE database_name = ''
192192
----
193-
database_id database_name schema_name descriptor_id descriptor_type descriptor_name create_statement state create_nofks rls_statements alter_statements validate_statements create_redactable has_partitions is_multi_region is_virtual is_temporary
193+
database_id database_name schema_name descriptor_id descriptor_type descriptor_name create_statement state create_nofks rls_statements fk_statements validate_statements create_redactable has_partitions is_multi_region is_virtual is_temporary
194194

195195
query ITITTBTB colnames
196196
SELECT * FROM crdb_internal.table_columns WHERE descriptor_name = ''

pkg/cli/zip_table_registry.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -307,7 +307,7 @@ var zipInternalTablesPerCluster = DebugZipTableRegistry{
307307
"is_virtual",
308308
"is_temporary",
309309
"crdb_internal.hide_sql_constants(create_statement) as create_statement",
310-
"crdb_internal.hide_sql_constants(alter_statements) as alter_statements",
310+
"crdb_internal.hide_sql_constants(fk_statements) as fk_statements",
311311
"crdb_internal.hide_sql_constants(create_nofks) as create_nofks",
312312
"crdb_internal.redact(create_redactable) as create_redactable",
313313
},

pkg/crosscluster/logical/BUILD.bazel

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -86,6 +86,7 @@ go_library(
8686
"//pkg/sql/sem/tree/treecmp",
8787
"//pkg/sql/sessiondata",
8888
"//pkg/sql/sessiondatapb",
89+
"//pkg/sql/sqlclustersettings",
8990
"//pkg/sql/stats",
9091
"//pkg/sql/syntheticprivilege",
9192
"//pkg/sql/types",
@@ -178,6 +179,7 @@ go_test(
178179
"//pkg/sql/sem/idxtype",
179180
"//pkg/sql/sem/tree",
180181
"//pkg/sql/sessiondata",
182+
"//pkg/sql/sqlclustersettings",
181183
"//pkg/sql/sqltestutils",
182184
"//pkg/sql/stats",
183185
"//pkg/testutils",

pkg/crosscluster/logical/create_logical_replication_stmt.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@ import (
4040
"github.com/cockroachdb/cockroach/pkg/sql/sem/catid"
4141
"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
4242
"github.com/cockroachdb/cockroach/pkg/sql/sessiondata"
43+
"github.com/cockroachdb/cockroach/pkg/sql/sqlclustersettings"
4344
"github.com/cockroachdb/cockroach/pkg/sql/syntheticprivilege"
4445
"github.com/cockroachdb/cockroach/pkg/sql/types"
4546
"github.com/cockroachdb/cockroach/pkg/util/buildutil"
@@ -489,7 +490,7 @@ func doLDRPlan(
489490
}
490491
}
491492

492-
err := tabledesc.CheckLogicalReplicationCompatibility(&srcExternalCatalog.Tables[i], destTableDesc.TableDesc(), details.SkipSchemaCheck || details.CreateTable, writer == writerTypeLegacyKV)
493+
err := tabledesc.CheckLogicalReplicationCompatibility(&srcExternalCatalog.Tables[i], destTableDesc.TableDesc(), details.SkipSchemaCheck || details.CreateTable, writer == sqlclustersettings.LDRWriterTypeLegacyKV)
493494
if err != nil {
494495
return err
495496
}

pkg/crosscluster/logical/logical_replication_dist.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ import (
1717
"github.com/cockroachdb/cockroach/pkg/sql"
1818
"github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb"
1919
"github.com/cockroachdb/cockroach/pkg/sql/execinfrapb"
20+
"github.com/cockroachdb/cockroach/pkg/sql/sqlclustersettings"
2021
"github.com/cockroachdb/cockroach/pkg/util/hlc"
2122
"github.com/cockroachdb/cockroach/pkg/util/log"
2223
)
@@ -36,7 +37,7 @@ func constructLogicalReplicationWriterSpecs(
3637
discard jobspb.LogicalReplicationDetails_Discard,
3738
mode jobspb.LogicalReplicationDetails_ApplyMode,
3839
metricsLabel string,
39-
writer writerType,
40+
writer sqlclustersettings.LDRWriterType,
4041
) (map[base.SQLInstanceID][]execinfrapb.LogicalReplicationWriterSpec, error) {
4142
spanGroup := roachpb.SpanGroup{}
4243
baseSpec := execinfrapb.LogicalReplicationWriterSpec{

pkg/crosscluster/logical/logical_replication_job.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@ import (
3434
"github.com/cockroachdb/cockroach/pkg/sql/physicalplan"
3535
"github.com/cockroachdb/cockroach/pkg/sql/sem/catid"
3636
"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
37+
"github.com/cockroachdb/cockroach/pkg/sql/sqlclustersettings"
3738
"github.com/cockroachdb/cockroach/pkg/util/bulk"
3839
"github.com/cockroachdb/cockroach/pkg/util/ctxgroup"
3940
"github.com/cockroachdb/cockroach/pkg/util/hlc"
@@ -539,7 +540,7 @@ func (p *logicalReplicationPlanner) generatePlanImpl(
539540
return errors.Wrapf(err, "failed to look up schema descriptor for table %d", pair.DstDescriptorID)
540541
}
541542

542-
if err := tabledesc.CheckLogicalReplicationCompatibility(&srcTableDesc, dstTableDesc.TableDesc(), payload.SkipSchemaCheck || payload.CreateTable, writer == writerTypeLegacyKV); err != nil {
543+
if err := tabledesc.CheckLogicalReplicationCompatibility(&srcTableDesc, dstTableDesc.TableDesc(), payload.SkipSchemaCheck || payload.CreateTable, writer == sqlclustersettings.LDRWriterTypeLegacyKV); err != nil {
543544
return err
544545
}
545546

pkg/crosscluster/logical/logical_replication_job_test.go

Lines changed: 15 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,7 @@ import (
4444
"github.com/cockroachdb/cockroach/pkg/sql/rowenc"
4545
"github.com/cockroachdb/cockroach/pkg/sql/sem/idxtype"
4646
"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
47+
"github.com/cockroachdb/cockroach/pkg/sql/sqlclustersettings"
4748
"github.com/cockroachdb/cockroach/pkg/sql/stats"
4849
"github.com/cockroachdb/cockroach/pkg/testutils"
4950
"github.com/cockroachdb/cockroach/pkg/testutils/jobutils"
@@ -2179,7 +2180,6 @@ func TestLogicalReplicationSchemaChanges(t *testing.T) {
21792180
// adding unsuppored indexes disallowed
21802181
{"add virtual column index", "CREATE INDEX virtual_col_idx ON tab(virtual_col)", false},
21812182
{"add hash index", "CREATE INDEX hash_idx ON tab(pk) USING HASH WITH (bucket_count = 4)", false},
2182-
{"add partial index", "CREATE INDEX partial_idx ON tab(composite_col) WHERE pk > 0", false},
21832183
{"add unique index", "CREATE UNIQUE INDEX unique_idx ON tab(composite_col)", false},
21842184

21852185
// Drop table is blocked
@@ -2420,9 +2420,17 @@ func TestPartialIndexes(t *testing.T) {
24202420
dbA.CheckQueryResultsRetry(t, "SELECT * FROM foo WHERE pi = 2", [][]string{{"1", "2", "hello"}})
24212421
dbA.CheckQueryResultsRetry(t, "SELECT * FROM foo WHERE pi = -2", [][]string{{"-1", "-2", "world"}})
24222422

2423+
// Check that dbA and dbB can add an additional partial index, enabled when using the sql writer.
2424+
dbA.Exec(t, "CREATE INDEX idx2 ON a.foo (pi) WHERE pk = 0")
2425+
dbB.Exec(t, "CREATE INDEX idx2 ON b.foo (pi) WHERE pk = 0")
2426+
24232427
// Check for partial indexes when using legacy kv writer.
24242428
dbA.Exec(t, "SET CLUSTER SETTING logical_replication.consumer.immediate_mode_writer = 'legacy-kv'")
24252429
dbA.ExpectErr(t, "cannot create logical replication stream: table foo has a partial index idx", "CREATE LOGICAL REPLICATION STREAM FROM TABLE foo ON $1 INTO TABLE foo", dbBURL.String())
2430+
2431+
// With the kv writer set, we can no longer create partial indexes on the replicating tables.
2432+
dbA.ExpectErr(t, " this schema change is disallowed on table foo because it is referenced by one or more logical replication jobs", "CREATE INDEX idx3 ON a.foo (pi) WHERE pk = 0")
2433+
dbB.ExpectErr(t, " this schema change is disallowed on table foo because it is referenced by one or more logical replication jobs", "CREATE INDEX idx3 ON b.foo (pi) WHERE pk = 0")
24262434
}
24272435

24282436
// TestLogicalReplicationCreationChecks verifies that we check that the table
@@ -2666,7 +2674,7 @@ func TestGetWriterType(t *testing.T) {
26662674
st := cluster.MakeTestingClusterSettings()
26672675
wt, err := getWriterType(ctx, jobspb.LogicalReplicationDetails_Validated, st)
26682676
require.NoError(t, err)
2669-
require.Equal(t, writerTypeSQL, wt)
2677+
require.Equal(t, sqlclustersettings.LDRWriterTypeSQL, wt)
26702678
})
26712679

26722680
t.Run("immediate-mode-pre-25.2", func(t *testing.T) {
@@ -2677,7 +2685,7 @@ func TestGetWriterType(t *testing.T) {
26772685
)
26782686
wt, err := getWriterType(ctx, jobspb.LogicalReplicationDetails_Immediate, st)
26792687
require.NoError(t, err)
2680-
require.Equal(t, writerTypeSQL, wt)
2688+
require.Equal(t, sqlclustersettings.LDRWriterTypeSQL, wt)
26812689
})
26822690

26832691
t.Run("immediate-mode-post-25.2", func(t *testing.T) {
@@ -2686,14 +2694,14 @@ func TestGetWriterType(t *testing.T) {
26862694
clusterversion.PreviousRelease.Version(),
26872695
true, /* initializeVersion */
26882696
)
2689-
immediateModeWriter.Override(ctx, &st.SV, string(writerTypeSQL))
2697+
sqlclustersettings.LDRImmediateModeWriter.Override(ctx, &st.SV, string(sqlclustersettings.LDRWriterTypeSQL))
26902698
wt, err := getWriterType(ctx, jobspb.LogicalReplicationDetails_Immediate, st)
26912699
require.NoError(t, err)
2692-
require.Equal(t, writerTypeSQL, wt)
2700+
require.Equal(t, sqlclustersettings.LDRWriterTypeSQL, wt)
26932701

2694-
immediateModeWriter.Override(ctx, &st.SV, string(writerTypeLegacyKV))
2702+
sqlclustersettings.LDRImmediateModeWriter.Override(ctx, &st.SV, string(sqlclustersettings.LDRWriterTypeSQL))
26952703
wt, err = getWriterType(ctx, jobspb.LogicalReplicationDetails_Immediate, st)
26962704
require.NoError(t, err)
2697-
require.Equal(t, writerTypeLegacyKV, wt)
2705+
require.Equal(t, sqlclustersettings.LDRWriterTypeSQL, wt)
26982706
})
26992707
}

pkg/crosscluster/logical/logical_replication_writer_processor.go

Lines changed: 9 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -37,13 +37,13 @@ import (
3737
"github.com/cockroachdb/cockroach/pkg/sql/rowenc"
3838
"github.com/cockroachdb/cockroach/pkg/sql/rowexec"
3939
"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
40+
"github.com/cockroachdb/cockroach/pkg/sql/sqlclustersettings"
4041
"github.com/cockroachdb/cockroach/pkg/sql/stats"
4142
"github.com/cockroachdb/cockroach/pkg/sql/types"
4243
"github.com/cockroachdb/cockroach/pkg/util/bulk"
4344
"github.com/cockroachdb/cockroach/pkg/util/ctxgroup"
4445
"github.com/cockroachdb/cockroach/pkg/util/log"
4546
"github.com/cockroachdb/cockroach/pkg/util/log/logcrash"
46-
"github.com/cockroachdb/cockroach/pkg/util/metamorphic"
4747
"github.com/cockroachdb/cockroach/pkg/util/protoutil"
4848
"github.com/cockroachdb/cockroach/pkg/util/span"
4949
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
@@ -95,34 +95,6 @@ var maxChunkSize = settings.RegisterIntSetting(
9595
settings.NonNegativeInt,
9696
)
9797

98-
type writerType string
99-
100-
const (
101-
// writerTypeSQL uses the SQL layer to write replicated rows.
102-
writerTypeSQL writerType = "sql"
103-
// writerTypeLegacyKV uses the legacy KV layer to write rows. The KV writer
104-
// is deprecated because it does not support the full set of features of the
105-
// SQL writer.
106-
writerTypeLegacyKV writerType = "legacy-kv"
107-
108-
// writerTypeCRUD is the shiny new sql writer that uses explicit reads,
109-
// inserts, updates, and deletes instead of upserts.
110-
writerTypeCRUD writerType = "crud"
111-
)
112-
113-
var immediateModeWriter = settings.RegisterStringSetting(
114-
settings.ApplicationLevel,
115-
"logical_replication.consumer.immediate_mode_writer",
116-
"the writer to use when in immediate mode",
117-
metamorphic.ConstantWithTestChoice("logical_replication.consumer.immediate_mode_writer", string(writerTypeSQL), string(writerTypeLegacyKV), string(writerTypeCRUD)),
118-
settings.WithValidateString(func(sv *settings.Values, val string) error {
119-
if val != string(writerTypeSQL) && val != string(writerTypeLegacyKV) && val != string(writerTypeCRUD) {
120-
return errors.Newf("immediate mode writer must be either 'sql', 'legacy-kv', or 'crud', got '%s'", val)
121-
}
122-
return nil
123-
}),
124-
)
125-
12698
// logicalReplicationWriterProcessor consumes a cross-cluster replication stream
12799
// by decoding kvs in it to logical changes and applying them by executing DMLs.
128100
type logicalReplicationWriterProcessor struct {
@@ -719,7 +691,7 @@ func (lrw *logicalReplicationWriterProcessor) setupBatchHandlers(ctx context.Con
719691
b.Close(lrw.Ctx())
720692
}
721693

722-
writer := writerType(lrw.spec.WriterType)
694+
writer := sqlclustersettings.LDRWriterType(lrw.spec.WriterType)
723695
if writer == "" && !lrw.FlowCtx.Cfg.Settings.Version.IsActive(ctx, clusterversion.V25_2) {
724696
var err error
725697
writer, err = getWriterType(
@@ -738,7 +710,7 @@ func (lrw *logicalReplicationWriterProcessor) setupBatchHandlers(ctx context.Con
738710
sd := sql.NewInternalSessionData(ctx, flowCtx.Cfg.Settings, "" /* opName */)
739711

740712
switch writer {
741-
case writerTypeSQL:
713+
case sqlclustersettings.LDRWriterTypeSQL:
742714
rp, err = makeSQLProcessor(
743715
ctx, flowCtx.Cfg.Settings, lrw.configByTable,
744716
jobspb.JobID(lrw.spec.JobID),
@@ -753,12 +725,12 @@ func (lrw *logicalReplicationWriterProcessor) setupBatchHandlers(ctx context.Con
753725
if err != nil {
754726
return err
755727
}
756-
case writerTypeLegacyKV:
728+
case sqlclustersettings.LDRWriterTypeLegacyKV:
757729
rp, err = newKVRowProcessor(ctx, flowCtx.Cfg, flowCtx.EvalCtx, lrw.spec, lrw.configByTable)
758730
if err != nil {
759731
return err
760732
}
761-
case writerTypeCRUD:
733+
case sqlclustersettings.LDRWriterTypeCRUD:
762734
rp, err = newCrudSqlWriter(ctx, flowCtx.Cfg, flowCtx.EvalCtx, sd, lrw.spec.Discard, lrw.configByTable, jobspb.JobID(lrw.spec.JobID))
763735
if err != nil {
764736
return err
@@ -780,17 +752,17 @@ func (lrw *logicalReplicationWriterProcessor) setupBatchHandlers(ctx context.Con
780752

781753
func getWriterType(
782754
ctx context.Context, mode jobspb.LogicalReplicationDetails_ApplyMode, settings *cluster.Settings,
783-
) (writerType, error) {
755+
) (sqlclustersettings.LDRWriterType, error) {
784756
switch mode {
785757
case jobspb.LogicalReplicationDetails_Immediate:
786758
// Require v25.2 to use the sql writer by default to ensure that the
787759
// KV origin timestamp validation is available on all nodes.
788760
if settings.Version.IsActive(ctx, clusterversion.V25_2) {
789-
return writerType(immediateModeWriter.Get(&settings.SV)), nil
761+
return sqlclustersettings.LDRWriterType(sqlclustersettings.LDRImmediateModeWriter.Get(&settings.SV)), nil
790762
}
791-
return writerTypeSQL, nil
763+
return sqlclustersettings.LDRWriterTypeSQL, nil
792764
case jobspb.LogicalReplicationDetails_Validated:
793-
return writerTypeSQL, nil
765+
return sqlclustersettings.LDRWriterTypeSQL, nil
794766
default:
795767
return "", errors.Newf("unknown logical replication writer type: %s", mode)
796768
}

pkg/sql/alter_index.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,7 @@ func (p *planner) AlterIndex(ctx context.Context, n *tree.AlterIndex) (planNode,
4646
}
4747

4848
// Disallow schema changes if this table's schema is locked.
49-
if err := checkSchemaChangeIsAllowed(tableDesc, n); err != nil {
49+
if err := checkSchemaChangeIsAllowed(tableDesc, n, p.ExecCfg().Settings); err != nil {
5050
return nil, err
5151
}
5252

pkg/sql/alter_index_visible.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -66,7 +66,7 @@ func (p *planner) AlterIndexVisible(
6666
}
6767

6868
// Disallow schema changes if this table's schema is locked.
69-
if err := checkSchemaChangeIsAllowed(tableDesc, n); err != nil {
69+
if err := checkSchemaChangeIsAllowed(tableDesc, n, p.ExecCfg().Settings); err != nil {
7070
return nil, err
7171
}
7272

0 commit comments

Comments
 (0)