
Commit 57e8364

logical: implement crud sql writer
This adds the new 'crud' sql writer that is intended to replace the current
sql writer and the kv writer. The crud writer will be made the default after
performance testing demonstrates it is at least as fast as the current sql
writer.

Release note: none

Epic: CRDB-48647
1 parent 175bbf3 commit 57e8364

14 files changed (+426, -41 lines)
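Based on the settings change in this commit, the crud writer looks opt-in for
now: a cluster can be pointed at it through the immediate-mode writer setting.
A minimal sketch in the style of the package's tests; the SQLRunner 'dst' is
assumed, and 'sql' appears to remain the default until the switch described
above:

	dst.Exec(t, "SET CLUSTER SETTING logical_replication.consumer.immediate_mode_writer = 'crud'")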

pkg/crosscluster/logical/BUILD.bazel

Lines changed: 1 addition & 0 deletions

@@ -17,6 +17,7 @@ go_library(
         "purgatory.go",
         "range_stats.go",
         "replication_statements.go",
+        "sql_crud_writer.go",
         "sql_row_reader.go",
         "sql_row_writer.go",
         "table_batch_handler.go",

pkg/crosscluster/logical/event_decoder.go

Lines changed: 145 additions & 0 deletions

@@ -6,11 +6,35 @@
 package logical

 import (
+	"context"
+
+	"github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/cdcevent"
+	"github.com/cockroachdb/cockroach/pkg/repstream/streampb"
+	"github.com/cockroachdb/cockroach/pkg/roachpb"
+	"github.com/cockroachdb/cockroach/pkg/settings/cluster"
 	"github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb"
+	"github.com/cockroachdb/cockroach/pkg/sql/catalog/descs"
 	"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
+	"github.com/cockroachdb/cockroach/pkg/sql/types"
 	"github.com/cockroachdb/cockroach/pkg/util/hlc"
+	"github.com/cockroachdb/errors"
 )

+// eventDecoder takes a KV from the source cluster and decodes it into datums
+// that are appropriate for the destination table.
+type eventDecoder struct {
+	decoder   cdcevent.Decoder
+	srcToDest map[descpb.ID]destinationTable
+
+	// TODO(jeffswenson): clean this interface up. There's a problem with
+	// layering that requires the event decoder to know about the most recent
+	// row. If a batch fails to process, it's broken up into batches of size 1
+	// and those are retried until they succeed or are DLQ'd. The batch handler
+	// is responsible for decoding rows, but the distsql processor is
+	// responsible for calling the batch handler and adding rows to the DLQ.
+	lastRow cdcevent.Row
+}
+
 // decodedEvent is constructed from a replication stream event. The replication
 // stream event is encoded using the source table descriptor. The decoded event
 // must contain datums that are compatible with the destination table descriptor.
@@ -37,3 +61,124 @@ type decodedEvent struct {
 	// prevRow may still lose LWW if there is a recent tombstone.
 	prevRow tree.Datums
 }
+
+func newEventDecoder(
+	ctx context.Context,
+	descriptors descs.DB,
+	settings *cluster.Settings,
+	procConfigByDestID map[descpb.ID]sqlProcessorTableConfig,
+) (*eventDecoder, error) {
+	srcToDest := make(map[descpb.ID]destinationTable, len(procConfigByDestID))
+	err := descriptors.DescsTxn(ctx, func(ctx context.Context, txn descs.Txn) error {
+		for dstID, s := range procConfigByDestID {
+			descriptor, err := txn.Descriptors().GetLeasedImmutableTableByID(ctx, txn.KV(), dstID)
+			if err != nil {
+				return err
+			}
+
+			columns := getPhysicalColumnsSchema(descriptor)
+			columnNames := make([]string, 0, len(columns))
+			for _, column := range columns {
+				columnNames = append(columnNames, column.column.GetName())
+			}
+
+			srcToDest[s.srcDesc.GetID()] = destinationTable{
+				id:      dstID,
+				columns: columnNames,
+			}
+		}
+		return nil
+	})
+	if err != nil {
+		return nil, err
+	}
+
+	decoder, err := newCdcEventDecoder(ctx, procConfigByDestID, settings)
+	if err != nil {
+		return nil, err
+	}
+
+	return &eventDecoder{
+		decoder:   decoder,
+		srcToDest: srcToDest,
+	}, nil
+}
+
+func (d *eventDecoder) decodeEvent(
+	ctx context.Context, event streampb.StreamEvent_KV,
+) (decodedEvent, error) {
+	decodedRow, err := d.decoder.DecodeKV(ctx, event.KeyValue, cdcevent.CurrentRow, event.KeyValue.Value.Timestamp, false)
+	if err != nil {
+		return decodedEvent{}, err
+	}
+
+	dstTable, ok := d.srcToDest[decodedRow.TableID]
+	if !ok {
+		return decodedEvent{}, errors.AssertionFailedf("table %d not found", decodedRow.TableID)
+	}
+
+	row := make(tree.Datums, 0, len(dstTable.columns))
+	row, err = appendDatums(row, decodedRow, dstTable.columns)
+	if err != nil {
+		return decodedEvent{}, err
+	}
+	d.lastRow = decodedRow
+
+	var prevKV roachpb.KeyValue
+	prevKV.Key = event.KeyValue.Key
+	prevKV.Value = event.PrevValue
+
+	decodedPrevRow, err := d.decoder.DecodeKV(ctx, prevKV, cdcevent.PrevRow, event.PrevValue.Timestamp, false)
+	if err != nil {
+		return decodedEvent{}, err
+	}
+
+	prevRow := make(tree.Datums, 0, len(dstTable.columns))
+	prevRow, err = appendDatums(prevRow, decodedPrevRow, dstTable.columns)
+	if err != nil {
+		return decodedEvent{}, err
+	}
+
+	return decodedEvent{
+		dstDescID:       dstTable.id,
+		isDelete:        decodedRow.IsDeleted(),
+		originTimestamp: event.KeyValue.Value.Timestamp,
+		row:             row,
+		prevRow:         prevRow,
+	}, nil
+}
+
+// appendDatums appends datums for the specified column names from the
+// cdcevent.Row to the datums slice and returns the updated slice.
+func appendDatums(datums tree.Datums, row cdcevent.Row, columnNames []string) (tree.Datums, error) {
+	it, err := row.DatumsNamed(columnNames)
+	if err != nil {
+		return nil, err
+	}
+
+	if err := it.Datum(func(d tree.Datum, col cdcevent.ResultColumn) error {
+		if dEnum, ok := d.(*tree.DEnum); ok {
+			// Override the type to Unknown to avoid a mismatched type OID error
+			// during execution. Note that Unknown is the type used by default
+			// when a SQL statement is executed without type hints.
+			//
+			// TODO(jeffswenson): this feels like the wrong place to do this,
+			// but it's inspired by the implementation in queryBuilder.AddRow.
+			//
+			// Really we should be mapping from the source datum type to the
+			// destination datum type.
+			dEnum.EnumTyp = types.Unknown
+		}
+		datums = append(datums, d)
+		return nil
+	}); err != nil {
+		return nil, err
+	}

+	return datums, nil
+}
+
+type destinationTable struct {
+	id      descpb.ID
+	columns []string
+}
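Taken together, the intended call pattern for the new decoder is roughly the
sketch below. This is an illustration only: the surrounding plumbing
(descriptors, settings, procConfigByDestID, and the batch of stream events) is
assumed from the rest of the package, not defined in this commit.

	decoder, err := newEventDecoder(ctx, descriptors, settings, procConfigByDestID)
	if err != nil {
		return err
	}
	for _, kv := range batch {
		// decodeEvent maps the source-encoded KV to destination-shaped datums
		// (event.row / event.prevRow) and records lastRow for DLQ routing.
		event, err := decoder.decodeEvent(ctx, kv)
		if err != nil {
			return err
		}
		_ = event // handed to the writer as a decodedEvent
	}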

pkg/crosscluster/logical/logical_replication_job_test.go

Lines changed: 2 additions & 2 deletions

@@ -1403,13 +1403,13 @@ func TestTombstoneUpdate(t *testing.T) {

 	// 5. Replicate the delete from 'src-a' -> 'dst'
 	var jobIDSrcA jobspb.JobID
-	dst.QueryRow(t, "CREATE LOGICAL REPLICATION STREAM FROM TABLE tab ON $1 INTO TABLE tab WITH MODE = VALIDATED, CURSOR = $2",
+	dst.QueryRow(t, "CREATE LOGICAL REPLICATION STREAM FROM TABLE tab ON $1 INTO TABLE tab WITH CURSOR = $2",
 		urlSrcA, start.AsOfSystemTime()).Scan(&jobIDSrcA)
 	WaitUntilReplicatedTime(t, s.Clock().Now(), dst, jobIDSrcA)

 	// 6. Replicate the update from 'src-b' -> 'dst'
 	var jobIDSrcB jobspb.JobID
-	dst.QueryRow(t, "CREATE LOGICAL REPLICATION STREAM FROM TABLE tab ON $1 INTO TABLE tab WITH MODE = VALIDATED, CURSOR = $2",
+	dst.QueryRow(t, "CREATE LOGICAL REPLICATION STREAM FROM TABLE tab ON $1 INTO TABLE tab WITH CURSOR = $2",
 		urlSrcB, start.AsOfSystemTime()).Scan(&jobIDSrcB)
 	WaitUntilReplicatedTime(t, s.Clock().Now(), dst, jobIDSrcB)

pkg/crosscluster/logical/logical_replication_writer_processor.go

Lines changed: 17 additions & 5 deletions

@@ -104,16 +104,20 @@ const (
 	// is deprecated because it does not support the full set of features of the
 	// SQL writer.
 	writerTypeLegacyKV writerType = "legacy-kv"
+
+	// writerTypeCRUD is the shiny new sql writer that uses explicit reads,
+	// inserts, updates, and deletes instead of upserts.
+	writerTypeCRUD writerType = "crud"
 )

 var immediateModeWriter = settings.RegisterStringSetting(
 	settings.ApplicationLevel,
 	"logical_replication.consumer.immediate_mode_writer",
 	"the writer to use when in immediate mode",
-	metamorphic.ConstantWithTestChoice("logical_replication.consumer.immediate_mode_writer", string(writerTypeSQL), string(writerTypeLegacyKV)),
+	metamorphic.ConstantWithTestChoice("logical_replication.consumer.immediate_mode_writer", string(writerTypeSQL), string(writerTypeLegacyKV), string(writerTypeCRUD)),
 	settings.WithValidateString(func(sv *settings.Values, val string) error {
-		if val != string(writerTypeSQL) && val != string(writerTypeLegacyKV) {
-			return errors.Newf("immediate mode writer must be either 'sql' or 'legacy-kv', got '%s'", val)
+		if val != string(writerTypeSQL) && val != string(writerTypeLegacyKV) && val != string(writerTypeCRUD) {
+			return errors.Newf("immediate mode writer must be either 'sql', 'legacy-kv', or 'crud', got '%s'", val)
 		}
 		return nil
 	}),
@@ -475,7 +479,11 @@ func (lrw *logicalReplicationWriterProcessor) close() {
 	}

 	for _, b := range lrw.bh {
-		b.Close(lrw.Ctx())
+		// The batch handler may be nil if the context was cancelled during
+		// initialization.
+		if b != nil {
+			b.Close(lrw.Ctx())
+		}
 	}

 	// Update the global retry queue gauges to reflect that this queue is going
@@ -750,6 +758,11 @@ func (lrw *logicalReplicationWriterProcessor) setupBatchHandlers(ctx context.Con
 		if err != nil {
 			return err
 		}
+	case writerTypeCRUD:
+		rp, err = newCrudSqlWriter(ctx, flowCtx.Cfg, flowCtx.EvalCtx, sd, lrw.spec.Discard, lrw.configByTable, jobspb.JobID(lrw.spec.JobID))
+		if err != nil {
+			return err
+		}
 	default:
 		return errors.AssertionFailedf("unknown logical replication writer type: %s", lrw.spec.WriterType)
 	}
@@ -1120,7 +1133,6 @@ func (lrw *logicalReplicationWriterProcessor) shouldRetryLater(
 		return tooOld
 	}

-	// TODO(dt): maybe this should only be constraint violation errors?
 	return retryAllowed
 }
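One consequence of the validator change worth noting: an unknown writer name is
now rejected at SET time with the updated error text. A hedged test-style
sketch (ExpectErr is the standard sqlutils helper; the runner 'dst' is
assumed):

	dst.ExpectErr(t,
		"immediate mode writer must be either 'sql', 'legacy-kv', or 'crud'",
		"SET CLUSTER SETTING logical_replication.consumer.immediate_mode_writer = 'bogus'",
	)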

pkg/crosscluster/logical/lww_kv_processor.go

Lines changed: 30 additions & 12 deletions

@@ -16,6 +16,7 @@ import (
 	"github.com/cockroachdb/cockroach/pkg/kv/kvpb"
 	"github.com/cockroachdb/cockroach/pkg/repstream/streampb"
 	"github.com/cockroachdb/cockroach/pkg/roachpb"
+	"github.com/cockroachdb/cockroach/pkg/settings/cluster"
 	"github.com/cockroachdb/cockroach/pkg/sql/catalog"
 	"github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb"
 	"github.com/cockroachdb/cockroach/pkg/sql/catalog/lease"
@@ -60,12 +61,37 @@ func newKVRowProcessor(
 	spec execinfrapb.LogicalReplicationWriterSpec,
 	procConfigByDestID map[descpb.ID]sqlProcessorTableConfig,
 ) (*kvRowProcessor, error) {
-	cdcEventTargets := changefeedbase.Targets{}
-	srcTablesBySrcID := make(map[descpb.ID]catalog.TableDescriptor, len(procConfigByDestID))
 	dstBySrc := make(map[descpb.ID]descpb.ID, len(procConfigByDestID))

 	for dstID, s := range procConfigByDestID {
 		dstBySrc[s.srcDesc.GetID()] = dstID
+	}
+
+	decoder, err := newCdcEventDecoder(ctx, procConfigByDestID, evalCtx.Settings)
+	if err != nil {
+		return nil, err
+	}
+
+	p := &kvRowProcessor{
+		cfg:      cfg,
+		spec:     spec,
+		evalCtx:  evalCtx,
+		dstBySrc: dstBySrc,
+		writers:  make(map[descpb.ID]*kvTableWriter, len(procConfigByDestID)),
+		decoder:  decoder,
+	}
+	return p, nil
+}
+
+func newCdcEventDecoder(
+	ctx context.Context,
+	procConfigByDestID map[descpb.ID]sqlProcessorTableConfig,
+	settings *cluster.Settings,
+) (cdcevent.Decoder, error) {
+	cdcEventTargets := changefeedbase.Targets{}
+	srcTablesBySrcID := make(map[descpb.ID]catalog.TableDescriptor, len(procConfigByDestID))
+
+	for _, s := range procConfigByDestID {
 		srcTablesBySrcID[s.srcDesc.GetID()] = s.srcDesc
 		cdcEventTargets.Add(changefeedbase.Target{
 			Type: jobspb.ChangefeedTargetSpecification_EACH_FAMILY,
@@ -76,21 +102,13 @@ func newKVRowProcessor(

 	prefixlessCodec := keys.SystemSQLCodec
 	rfCache, err := cdcevent.NewFixedRowFetcherCache(
-		ctx, prefixlessCodec, evalCtx.Settings, cdcEventTargets, srcTablesBySrcID,
+		ctx, prefixlessCodec, settings, cdcEventTargets, srcTablesBySrcID,
 	)
 	if err != nil {
 		return nil, err
 	}

-	p := &kvRowProcessor{
-		cfg:      cfg,
-		spec:     spec,
-		evalCtx:  evalCtx,
-		dstBySrc: dstBySrc,
-		writers:  make(map[descpb.ID]*kvTableWriter, len(procConfigByDestID)),
-		decoder:  cdcevent.NewEventDecoderWithCache(ctx, rfCache, false, false),
-	}
-	return p, nil
+	return cdcevent.NewEventDecoderWithCache(ctx, rfCache, false, false), nil
 }

 var originID1Options = &kvpb.WriteOptions{OriginID: 1}
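The point of extracting newCdcEventDecoder is that decoder construction is now
shared between the old and new writers. For contrast, the two call sites from
this commit:

	// In newEventDecoder (event_decoder.go):
	decoder, err := newCdcEventDecoder(ctx, procConfigByDestID, settings)

	// In newKVRowProcessor (lww_kv_processor.go):
	decoder, err := newCdcEventDecoder(ctx, procConfigByDestID, evalCtx.Settings)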

pkg/crosscluster/logical/replication_statements.go

Lines changed: 3 additions & 3 deletions

@@ -394,7 +394,7 @@ func newBulkSelectStatement(
 }

 func toParsedStatement(stmt tree.Statement) (statements.Statement[tree.Statement], error) {
-	// TODO(jeffswenson): do I have to round trip through the string or can I
-	// safely construct the statement directly?
-	return parser.ParseOne(stmt.String())
+	// Use Serialize instead of String to ensure the type casts use fully
+	// qualified names.
+	return parser.ParseOne(tree.Serialize(stmt))
 }
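The practical difference between the two formatters, sketched under the
assumption of a cast to a user-defined type (the enum name here is
hypothetical): stmt.String() can render the cast as $1::my_enum, which may
resolve to the wrong type, or fail to resolve, when re-parsed outside the
original search path, whereas tree.Serialize(stmt) renders the name fully
qualified, e.g. $1::db.public.my_enum, so the round trip through
parser.ParseOne is unambiguous.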
