Skip to content

Commit 23acfaa

Browse files
committed
sqlclustersettings: move LDR writer mode to sqlclustersettings package
Currently, a subset of schema changes are allowed on all LDR jobs, but some of these allowed schema changes need to be disallowed when logical_replication.consumer.immediate_mode_writer is set to `legacy-kv'. To that end, we need to move this cluster setting the sqlclustersettings package so that the schema change planner can cleanly check this setting. Epic: none Release note: none
1 parent c157bb6 commit 23acfaa

File tree

8 files changed

+53
-46
lines changed

8 files changed

+53
-46
lines changed

pkg/crosscluster/logical/BUILD.bazel

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -86,6 +86,7 @@ go_library(
8686
"//pkg/sql/sem/tree/treecmp",
8787
"//pkg/sql/sessiondata",
8888
"//pkg/sql/sessiondatapb",
89+
"//pkg/sql/sqlclustersettings",
8990
"//pkg/sql/stats",
9091
"//pkg/sql/syntheticprivilege",
9192
"//pkg/sql/types",
@@ -178,6 +179,7 @@ go_test(
178179
"//pkg/sql/sem/idxtype",
179180
"//pkg/sql/sem/tree",
180181
"//pkg/sql/sessiondata",
182+
"//pkg/sql/sqlclustersettings",
181183
"//pkg/sql/sqltestutils",
182184
"//pkg/sql/stats",
183185
"//pkg/testutils",

pkg/crosscluster/logical/create_logical_replication_stmt.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@ import (
4040
"github.com/cockroachdb/cockroach/pkg/sql/sem/catid"
4141
"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
4242
"github.com/cockroachdb/cockroach/pkg/sql/sessiondata"
43+
"github.com/cockroachdb/cockroach/pkg/sql/sqlclustersettings"
4344
"github.com/cockroachdb/cockroach/pkg/sql/syntheticprivilege"
4445
"github.com/cockroachdb/cockroach/pkg/sql/types"
4546
"github.com/cockroachdb/cockroach/pkg/util/buildutil"
@@ -489,7 +490,7 @@ func doLDRPlan(
489490
}
490491
}
491492

492-
err := tabledesc.CheckLogicalReplicationCompatibility(&srcExternalCatalog.Tables[i], destTableDesc.TableDesc(), details.SkipSchemaCheck || details.CreateTable, writer == writerTypeLegacyKV)
493+
err := tabledesc.CheckLogicalReplicationCompatibility(&srcExternalCatalog.Tables[i], destTableDesc.TableDesc(), details.SkipSchemaCheck || details.CreateTable, writer == sqlclustersettings.LDRWriterTypeLegacyKV)
493494
if err != nil {
494495
return err
495496
}

pkg/crosscluster/logical/logical_replication_dist.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ import (
1717
"github.com/cockroachdb/cockroach/pkg/sql"
1818
"github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb"
1919
"github.com/cockroachdb/cockroach/pkg/sql/execinfrapb"
20+
"github.com/cockroachdb/cockroach/pkg/sql/sqlclustersettings"
2021
"github.com/cockroachdb/cockroach/pkg/util/hlc"
2122
"github.com/cockroachdb/cockroach/pkg/util/log"
2223
)
@@ -36,7 +37,7 @@ func constructLogicalReplicationWriterSpecs(
3637
discard jobspb.LogicalReplicationDetails_Discard,
3738
mode jobspb.LogicalReplicationDetails_ApplyMode,
3839
metricsLabel string,
39-
writer writerType,
40+
writer sqlclustersettings.LDRWriterType,
4041
) (map[base.SQLInstanceID][]execinfrapb.LogicalReplicationWriterSpec, error) {
4142
spanGroup := roachpb.SpanGroup{}
4243
baseSpec := execinfrapb.LogicalReplicationWriterSpec{

pkg/crosscluster/logical/logical_replication_job.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@ import (
3434
"github.com/cockroachdb/cockroach/pkg/sql/physicalplan"
3535
"github.com/cockroachdb/cockroach/pkg/sql/sem/catid"
3636
"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
37+
"github.com/cockroachdb/cockroach/pkg/sql/sqlclustersettings"
3738
"github.com/cockroachdb/cockroach/pkg/util/bulk"
3839
"github.com/cockroachdb/cockroach/pkg/util/ctxgroup"
3940
"github.com/cockroachdb/cockroach/pkg/util/hlc"
@@ -539,7 +540,7 @@ func (p *logicalReplicationPlanner) generatePlanImpl(
539540
return errors.Wrapf(err, "failed to look up schema descriptor for table %d", pair.DstDescriptorID)
540541
}
541542

542-
if err := tabledesc.CheckLogicalReplicationCompatibility(&srcTableDesc, dstTableDesc.TableDesc(), payload.SkipSchemaCheck || payload.CreateTable, writer == writerTypeLegacyKV); err != nil {
543+
if err := tabledesc.CheckLogicalReplicationCompatibility(&srcTableDesc, dstTableDesc.TableDesc(), payload.SkipSchemaCheck || payload.CreateTable, writer == sqlclustersettings.LDRWriterTypeLegacyKV); err != nil {
543544
return err
544545
}
545546

pkg/crosscluster/logical/logical_replication_job_test.go

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,7 @@ import (
4444
"github.com/cockroachdb/cockroach/pkg/sql/rowenc"
4545
"github.com/cockroachdb/cockroach/pkg/sql/sem/idxtype"
4646
"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
47+
"github.com/cockroachdb/cockroach/pkg/sql/sqlclustersettings"
4748
"github.com/cockroachdb/cockroach/pkg/sql/stats"
4849
"github.com/cockroachdb/cockroach/pkg/testutils"
4950
"github.com/cockroachdb/cockroach/pkg/testutils/jobutils"
@@ -2666,7 +2667,7 @@ func TestGetWriterType(t *testing.T) {
26662667
st := cluster.MakeTestingClusterSettings()
26672668
wt, err := getWriterType(ctx, jobspb.LogicalReplicationDetails_Validated, st)
26682669
require.NoError(t, err)
2669-
require.Equal(t, writerTypeSQL, wt)
2670+
require.Equal(t, sqlclustersettings.LDRWriterTypeSQL, wt)
26702671
})
26712672

26722673
t.Run("immediate-mode-pre-25.2", func(t *testing.T) {
@@ -2677,7 +2678,7 @@ func TestGetWriterType(t *testing.T) {
26772678
)
26782679
wt, err := getWriterType(ctx, jobspb.LogicalReplicationDetails_Immediate, st)
26792680
require.NoError(t, err)
2680-
require.Equal(t, writerTypeSQL, wt)
2681+
require.Equal(t, sqlclustersettings.LDRWriterTypeSQL, wt)
26812682
})
26822683

26832684
t.Run("immediate-mode-post-25.2", func(t *testing.T) {
@@ -2686,14 +2687,14 @@ func TestGetWriterType(t *testing.T) {
26862687
clusterversion.PreviousRelease.Version(),
26872688
true, /* initializeVersion */
26882689
)
2689-
immediateModeWriter.Override(ctx, &st.SV, string(writerTypeSQL))
2690+
sqlclustersettings.LDRImmediateModeWriter.Override(ctx, &st.SV, string(sqlclustersettings.LDRWriterTypeSQL))
26902691
wt, err := getWriterType(ctx, jobspb.LogicalReplicationDetails_Immediate, st)
26912692
require.NoError(t, err)
2692-
require.Equal(t, writerTypeSQL, wt)
2693+
require.Equal(t, sqlclustersettings.LDRWriterTypeSQL, wt)
26932694

2694-
immediateModeWriter.Override(ctx, &st.SV, string(writerTypeLegacyKV))
2695+
sqlclustersettings.LDRImmediateModeWriter.Override(ctx, &st.SV, string(sqlclustersettings.LDRWriterTypeSQL))
26952696
wt, err = getWriterType(ctx, jobspb.LogicalReplicationDetails_Immediate, st)
26962697
require.NoError(t, err)
2697-
require.Equal(t, writerTypeLegacyKV, wt)
2698+
require.Equal(t, sqlclustersettings.LDRWriterTypeSQL, wt)
26982699
})
26992700
}

pkg/crosscluster/logical/logical_replication_writer_processor.go

Lines changed: 9 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -37,13 +37,13 @@ import (
3737
"github.com/cockroachdb/cockroach/pkg/sql/rowenc"
3838
"github.com/cockroachdb/cockroach/pkg/sql/rowexec"
3939
"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
40+
"github.com/cockroachdb/cockroach/pkg/sql/sqlclustersettings"
4041
"github.com/cockroachdb/cockroach/pkg/sql/stats"
4142
"github.com/cockroachdb/cockroach/pkg/sql/types"
4243
"github.com/cockroachdb/cockroach/pkg/util/bulk"
4344
"github.com/cockroachdb/cockroach/pkg/util/ctxgroup"
4445
"github.com/cockroachdb/cockroach/pkg/util/log"
4546
"github.com/cockroachdb/cockroach/pkg/util/log/logcrash"
46-
"github.com/cockroachdb/cockroach/pkg/util/metamorphic"
4747
"github.com/cockroachdb/cockroach/pkg/util/protoutil"
4848
"github.com/cockroachdb/cockroach/pkg/util/span"
4949
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
@@ -95,34 +95,6 @@ var maxChunkSize = settings.RegisterIntSetting(
9595
settings.NonNegativeInt,
9696
)
9797

98-
type writerType string
99-
100-
const (
101-
// writerTypeSQL uses the SQL layer to write replicated rows.
102-
writerTypeSQL writerType = "sql"
103-
// writerTypeLegacyKV uses the legacy KV layer to write rows. The KV writer
104-
// is deprecated because it does not support the full set of features of the
105-
// SQL writer.
106-
writerTypeLegacyKV writerType = "legacy-kv"
107-
108-
// writerTypeCRUD is the shiny new sql writer that uses explicit reads,
109-
// inserts, updates, and deletes instead of upserts.
110-
writerTypeCRUD writerType = "crud"
111-
)
112-
113-
var immediateModeWriter = settings.RegisterStringSetting(
114-
settings.ApplicationLevel,
115-
"logical_replication.consumer.immediate_mode_writer",
116-
"the writer to use when in immediate mode",
117-
metamorphic.ConstantWithTestChoice("logical_replication.consumer.immediate_mode_writer", string(writerTypeSQL), string(writerTypeLegacyKV), string(writerTypeCRUD)),
118-
settings.WithValidateString(func(sv *settings.Values, val string) error {
119-
if val != string(writerTypeSQL) && val != string(writerTypeLegacyKV) && val != string(writerTypeCRUD) {
120-
return errors.Newf("immediate mode writer must be either 'sql', 'legacy-kv', or 'crud', got '%s'", val)
121-
}
122-
return nil
123-
}),
124-
)
125-
12698
// logicalReplicationWriterProcessor consumes a cross-cluster replication stream
12799
// by decoding kvs in it to logical changes and applying them by executing DMLs.
128100
type logicalReplicationWriterProcessor struct {
@@ -719,7 +691,7 @@ func (lrw *logicalReplicationWriterProcessor) setupBatchHandlers(ctx context.Con
719691
b.Close(lrw.Ctx())
720692
}
721693

722-
writer := writerType(lrw.spec.WriterType)
694+
writer := sqlclustersettings.LDRWriterType(lrw.spec.WriterType)
723695
if writer == "" && !lrw.FlowCtx.Cfg.Settings.Version.IsActive(ctx, clusterversion.V25_2) {
724696
var err error
725697
writer, err = getWriterType(
@@ -738,7 +710,7 @@ func (lrw *logicalReplicationWriterProcessor) setupBatchHandlers(ctx context.Con
738710
sd := sql.NewInternalSessionData(ctx, flowCtx.Cfg.Settings, "" /* opName */)
739711

740712
switch writer {
741-
case writerTypeSQL:
713+
case sqlclustersettings.LDRWriterTypeSQL:
742714
rp, err = makeSQLProcessor(
743715
ctx, flowCtx.Cfg.Settings, lrw.configByTable,
744716
jobspb.JobID(lrw.spec.JobID),
@@ -753,12 +725,12 @@ func (lrw *logicalReplicationWriterProcessor) setupBatchHandlers(ctx context.Con
753725
if err != nil {
754726
return err
755727
}
756-
case writerTypeLegacyKV:
728+
case sqlclustersettings.LDRWriterTypeLegacyKV:
757729
rp, err = newKVRowProcessor(ctx, flowCtx.Cfg, flowCtx.EvalCtx, lrw.spec, lrw.configByTable)
758730
if err != nil {
759731
return err
760732
}
761-
case writerTypeCRUD:
733+
case sqlclustersettings.LDRWriterTypeCRUD:
762734
rp, err = newCrudSqlWriter(ctx, flowCtx.Cfg, flowCtx.EvalCtx, sd, lrw.spec.Discard, lrw.configByTable, jobspb.JobID(lrw.spec.JobID))
763735
if err != nil {
764736
return err
@@ -780,17 +752,17 @@ func (lrw *logicalReplicationWriterProcessor) setupBatchHandlers(ctx context.Con
780752

781753
func getWriterType(
782754
ctx context.Context, mode jobspb.LogicalReplicationDetails_ApplyMode, settings *cluster.Settings,
783-
) (writerType, error) {
755+
) (sqlclustersettings.LDRWriterType, error) {
784756
switch mode {
785757
case jobspb.LogicalReplicationDetails_Immediate:
786758
// Require v25.2 to use the sql writer by default to ensure that the
787759
// KV origin timestamp validation is available on all nodes.
788760
if settings.Version.IsActive(ctx, clusterversion.V25_2) {
789-
return writerType(immediateModeWriter.Get(&settings.SV)), nil
761+
return sqlclustersettings.LDRWriterType(sqlclustersettings.LDRImmediateModeWriter.Get(&settings.SV)), nil
790762
}
791-
return writerTypeSQL, nil
763+
return sqlclustersettings.LDRWriterTypeSQL, nil
792764
case jobspb.LogicalReplicationDetails_Validated:
793-
return writerTypeSQL, nil
765+
return sqlclustersettings.LDRWriterTypeSQL, nil
794766
default:
795767
return "", errors.Newf("unknown logical replication writer type: %s", mode)
796768
}

pkg/sql/sqlclustersettings/BUILD.bazel

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ go_library(
99
"//pkg/keys",
1010
"//pkg/settings",
1111
"//pkg/settings/cluster",
12+
"//pkg/util/metamorphic",
1213
"@com_github_cockroachdb_errors//:errors",
1314
],
1415
)

pkg/sql/sqlclustersettings/clustersettings.go

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ import (
99
"github.com/cockroachdb/cockroach/pkg/keys"
1010
"github.com/cockroachdb/cockroach/pkg/settings"
1111
"github.com/cockroachdb/cockroach/pkg/settings/cluster"
12+
"github.com/cockroachdb/cockroach/pkg/util/metamorphic"
1213
"github.com/cockroachdb/errors"
1314
)
1415

@@ -109,3 +110,30 @@ var CachedSequencesCacheSizeSetting = settings.RegisterIntSetting(
109110
256,
110111
settings.PositiveInt,
111112
)
113+
114+
type LDRWriterType string
115+
116+
const (
117+
// LDRWriterTypeSQL uses the SQL layer to write replicated rows.
118+
LDRWriterTypeSQL LDRWriterType = "sql"
119+
// LDRWriterTypeLegacyKV uses the legacy KV layer to write rows. The KV writer
120+
// is deprecated because it does not support the full set of features of the
121+
// SQL writer.
122+
LDRWriterTypeLegacyKV LDRWriterType = "legacy-kv"
123+
// writerTypeCRUD is the shiny new sql writer that uses explicit reads,
124+
// inserts, updates, and deletes instead of upserts.
125+
LDRWriterTypeCRUD LDRWriterType = "crud"
126+
)
127+
128+
var LDRImmediateModeWriter = settings.RegisterStringSetting(
129+
settings.ApplicationLevel,
130+
"logical_replication.consumer.immediate_mode_writer",
131+
"the writer to use when in immediate mode",
132+
metamorphic.ConstantWithTestChoice("logical_replication.consumer.immediate_mode_writer", string(LDRWriterTypeSQL), string(LDRWriterTypeLegacyKV), string(LDRWriterTypeCRUD)),
133+
settings.WithValidateString(func(sv *settings.Values, val string) error {
134+
if val != string(LDRWriterTypeSQL) && val != string(LDRWriterTypeLegacyKV) && val != string(LDRWriterTypeCRUD) {
135+
return errors.Newf("immediate mode writer must be either 'sql', 'legacy-kv', or 'crud', got '%s'", val)
136+
}
137+
return nil
138+
}),
139+
)

0 commit comments

Comments
 (0)