@@ -16,13 +16,15 @@ import (
1616 "time"
1717
1818 "github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/cdcevent"
19+ "github.com/cockroachdb/cockroach/pkg/clusterversion"
1920 "github.com/cockroachdb/cockroach/pkg/crosscluster"
2021 "github.com/cockroachdb/cockroach/pkg/crosscluster/streamclient"
2122 "github.com/cockroachdb/cockroach/pkg/jobs/jobspb"
2223 "github.com/cockroachdb/cockroach/pkg/keys"
2324 "github.com/cockroachdb/cockroach/pkg/repstream/streampb"
2425 "github.com/cockroachdb/cockroach/pkg/roachpb"
2526 "github.com/cockroachdb/cockroach/pkg/settings"
27+ "github.com/cockroachdb/cockroach/pkg/settings/cluster"
2628 "github.com/cockroachdb/cockroach/pkg/sql"
2729 "github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb"
2830 "github.com/cockroachdb/cockroach/pkg/sql/catalog/lease"
@@ -93,6 +95,30 @@ var maxChunkSize = settings.RegisterIntSetting(
9395 settings .NonNegativeInt ,
9496)
9597
98+ type writerType string
99+
100+ const (
101+ // writerTypeSQL uses the SQL layer to write replicated rows.
102+ writerTypeSQL writerType = "sql"
103+ // writerTypeLegacyKV uses the legacy KV layer to write rows. The KV writer
104+ // is deprecated because it does not support the full set of features of the
105+ // SQL writer.
106+ writerTypeLegacyKV writerType = "legacy-kv"
107+ )
108+
109+ var immediateModeWriter = settings .RegisterStringSetting (
110+ settings .ApplicationLevel ,
111+ "logical_replication.consumer.immediate_mode_writer" ,
112+ "the writer to use when in immediate mode" ,
113+ metamorphic .ConstantWithTestChoice ("logical_replication.consumer.immediate_mode_writer" , string (writerTypeSQL ), string (writerTypeLegacyKV )),
114+ settings .WithValidateString (func (sv * settings.Values , val string ) error {
115+ if val != string (writerTypeSQL ) && val != string (writerTypeLegacyKV ) {
116+ return errors .Newf ("immediate mode writer must be either 'sql' or 'legacy-kv', got '%s'" , val )
117+ }
118+ return nil
119+ }),
120+ )
121+
96122// logicalReplicationWriterProcessor consumes a cross-cluster replication stream
97123// by decoding kvs in it to logical changes and applying them by executing DMLs.
98124type logicalReplicationWriterProcessor struct {
@@ -106,8 +132,6 @@ type logicalReplicationWriterProcessor struct {
106132
107133 configByTable map [descpb.ID ]sqlProcessorTableConfig
108134
109- getBatchSize func () int
110-
111135 streamPartitionClient streamclient.Client
112136
113137 // frontier keeps track of the progress for the spans tracked by this processor
@@ -211,40 +235,9 @@ func newLogicalReplicationWriterProcessor(
211235 }
212236
213237 lrw := & logicalReplicationWriterProcessor {
214- configByTable : procConfigByDestTableID ,
215- spec : spec ,
216- processorID : processorID ,
217- getBatchSize : func () int {
218- // TODO(ssd): We set this to 1 since putting more than 1
219- // row in a KV batch using the new ConditionalPut-based
220- // conflict resolution would require more complex error
221- // handling and tracking that we haven't implemented
222- // yet.
223- if spec .Mode == jobspb .LogicalReplicationDetails_Immediate {
224- return 1
225- }
226- // We want to decide whether to use implicit txns or not based on
227- // the schema of the dest table. Benchmarking has shown that
228- // implicit txns are beneficial on tables with no secondary indexes
229- // whereas explicit txns are beneficial when at least one secondary
230- // index is present.
231- //
232- // Unfortunately, if we have multiple replication pairs, we don't
233- // know which tables will be affected by this batch before deciding
234- // on the batch size, so we'll use a heuristic such that we'll use
235- // the implicit txns if at least half of the dest tables are
236- // without the secondary indexes. If we only have a single
237- // replication pair, then this heuristic gives us the precise
238- // recommendation.
239- //
240- // (Here we have access to the descriptor of the source table, but
241- // for now we assume that the source and the dest descriptors are
242- // similar.)
243- if 2 * numTablesWithSecondaryIndexes < len (procConfigByDestTableID ) && useImplicitTxns .Get (& flowCtx .Cfg .Settings .SV ) {
244- return 1
245- }
246- return int (flushBatchSize .Get (& flowCtx .Cfg .Settings .SV ))
247- },
238+ configByTable : procConfigByDestTableID ,
239+ spec : spec ,
240+ processorID : processorID ,
248241 frontier : frontier ,
249242 stopCh : make (chan struct {}),
250243 checkpointCh : make (chan []jobspb.ResolvedSpan ),
@@ -725,14 +718,12 @@ func (lrw *logicalReplicationWriterProcessor) setupBatchHandlers(ctx context.Con
725718 var err error
726719 sd := sql .NewInternalSessionData (ctx , flowCtx .Cfg .Settings , "" /* opName */ )
727720
728- if lrw .spec .Mode == jobspb .LogicalReplicationDetails_Immediate {
729- evalCtx := flowCtx .NewEvalCtx ()
730- evalCtx .SessionDataStack .Push (sd )
731- rp , err = newKVRowProcessor (ctx , flowCtx .Cfg , evalCtx , lrw .spec , lrw .configByTable )
732- if err != nil {
733- return err
734- }
735- } else {
721+ writer , err := getWriterType (ctx , lrw .spec .Mode , flowCtx .Cfg .Settings )
722+ if err != nil {
723+ return err
724+ }
725+ switch writer {
726+ case writerTypeSQL :
736727 rp , err = makeSQLProcessor (
737728 ctx , flowCtx .Cfg .Settings , lrw .configByTable ,
738729 jobspb .JobID (lrw .spec .JobID ),
@@ -747,6 +738,13 @@ func (lrw *logicalReplicationWriterProcessor) setupBatchHandlers(ctx context.Con
747738 if err != nil {
748739 return err
749740 }
741+ case writerTypeLegacyKV :
742+ rp , err = newKVRowProcessor (ctx , flowCtx .Cfg , flowCtx .EvalCtx , lrw .spec , lrw .configByTable )
743+ if err != nil {
744+ return err
745+ }
746+ default :
747+ return errors .AssertionFailedf ("unknown logical replication writer type: %s" , writer )
750748 }
751749
752750 if streamingKnobs , ok := flowCtx .TestingKnobs ().StreamingTestingKnobs .(* sql.StreamingTestingKnobs ); ok {
@@ -760,6 +758,24 @@ func (lrw *logicalReplicationWriterProcessor) setupBatchHandlers(ctx context.Con
760758 return nil
761759}
762760
761+ func getWriterType (
762+ ctx context.Context , mode jobspb.LogicalReplicationDetails_ApplyMode , settings * cluster.Settings ,
763+ ) (writerType , error ) {
764+ switch mode {
765+ case jobspb .LogicalReplicationDetails_Immediate :
766+ // Require v25.2 to use the sql writer by default to ensure that the
767+ // KV origin timestamp validation is available on all nodes.
768+ if settings .Version .IsActive (ctx , clusterversion .V25_2 ) {
769+ return writerType (immediateModeWriter .Get (& settings .SV )), nil
770+ }
771+ return writerTypeSQL , nil
772+ case jobspb .LogicalReplicationDetails_Validated :
773+ return writerTypeSQL , nil
774+ default :
775+ return "" , errors .Newf ("unknown logical replication writer type: %s" , mode )
776+ }
777+ }
778+
763779// flushBuffer processes some or all of the events in the passed buffer, and
764780// zeros out each event in the passed buffer for which it successfully completed
765781// processing either by applying it or by sending it to a DLQ. If mustProcess is
@@ -987,7 +1003,7 @@ func (t replicationMutationType) String() string {
9871003func (lrw * logicalReplicationWriterProcessor ) flushChunk (
9881004 ctx context.Context , bh BatchHandler , chunk []streampb.StreamEvent_KV , canRetry retryEligibility ,
9891005) (flushStats , error ) {
990- batchSize := lrw . getBatchSize ()
1006+ batchSize := bh . BatchSize ()
9911007
9921008 lrw .debug .RecordChunkStart ()
9931009 defer lrw .debug .RecordChunkComplete ()
@@ -1213,20 +1229,14 @@ type BatchHandler interface {
12131229 // or are not applied as a group. If the batch is a single KV it may use an
12141230 // implicit txn.
12151231 HandleBatch (context.Context , []streampb.StreamEvent_KV ) (batchStats , error )
1232+ BatchSize () int
12161233 GetLastRow () cdcevent.Row
12171234 SetSyntheticFailurePercent (uint32 )
12181235 ReportMutations (* stats.Refresher )
12191236 ReleaseLeases (context.Context )
12201237 Close (context.Context )
12211238}
12221239
1223- var useImplicitTxns = settings .RegisterBoolSetting (
1224- settings .ApplicationLevel ,
1225- "logical_replication.consumer.use_implicit_txns.enabled" ,
1226- "determines whether the consumer processes each row in a separate implicit txn" ,
1227- metamorphic .ConstantWithTestBool ("logical_replication.consumer.use_implicit_txns.enabled" , true ),
1228- )
1229-
12301240func init () {
12311241 rowexec .NewLogicalReplicationWriterProcessor = newLogicalReplicationWriterProcessor
12321242}
0 commit comments