Skip to content

Commit dc790f0

Browse files
craig[bot] and jeffswenson committed
Merge #150257
150257: logical: coalesce updates in crud writer r=jeffswenson a=jeffswenson Previously, the crud writer decoding logic only inspected keys to determine which table they were part of. Now, the crud writer inspects keys to coalesce updates to a row. The immediate motivation for this change is read refreshes. If the crud writer batch included duplicate rows and the batch failed to apply, the read refresh would assign the current value in the DB as the previous value for each row. This caused subsequent rows in the batch to fail to apply, which caused the entire batch to restart. An additional benefit to write coalescing is it allows replication to quickly apply updates to a hot row, since only the most recent write needs to be applied. Coalescing updates is implicitly allowed by the semantics of LDR because LDR does not guarantee in order application of updates for a single row, so any stale update can be dropped as a LWW loser. The main implication of this change is CDC. A CDC on an LDR peer will not emit every update applied to the source cluster. If the user needs a complete history in CDC they can construct the history by running CDC on the source. Updates are only coalesced within a batch. If the most recent update fails to apply, the batch will be broken up and attempted sequentially, so the last valid row will still be written. Release note: none Part of: CRDB-51531 Co-authored-by: Jeff Swenson <[email protected]>
2 parents 9c4f2b8 + 817dd7d commit dc790f0

File tree

5 files changed

+295
-35
lines changed

5 files changed

+295
-35
lines changed

pkg/crosscluster/logical/BUILD.bazel

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -122,6 +122,7 @@ go_test(
122122
"batch_handler_test.go",
123123
"create_logical_replication_stmt_test.go",
124124
"dead_letter_queue_test.go",
125+
"event_decoder_test.go",
125126
"logical_replication_job_test.go",
126127
"lww_kv_processor_test.go",
127128
"lww_row_processor_test.go",

pkg/crosscluster/logical/event_decoder.go

Lines changed: 74 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -7,8 +7,10 @@ package logical
77

88
import (
99
"context"
10+
"sort"
1011

1112
"github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/cdcevent"
13+
"github.com/cockroachdb/cockroach/pkg/jobs/jobspb"
1214
"github.com/cockroachdb/cockroach/pkg/repstream/streampb"
1315
"github.com/cockroachdb/cockroach/pkg/roachpb"
1416
"github.com/cockroachdb/cockroach/pkg/settings/cluster"
@@ -104,10 +106,76 @@ func newEventDecoder(
104106
}, nil
105107
}
106108

109+
// decodeAndCoalesceEvents returns the decoded events sorted by key and
110+
// deduplicated to a single event for each primary key.
111+
func (d *eventDecoder) decodeAndCoalesceEvents(
112+
ctx context.Context,
113+
batch []streampb.StreamEvent_KV,
114+
discard jobspb.LogicalReplicationDetails_Discard,
115+
) ([]decodedEvent, error) {
116+
// Basic idea:
117+
// 1. Sort the batch so the keys and mvcc timestamps are in ascending order. Sorting by key
118+
// ensures that all events for a given table and row are adjacent. Sorting by timestamp
119+
// ensures that if i < j then row[i] comes before row[j] in application time.
120+
// 2. For eacy row in the batch, decode the first row and use it as the previous value. We use
121+
// the earliest row as the previous value because as long as the batch is not a replay, the
122+
// previous value of the first instance of the row is expected to match the local value.
123+
// 3. For the last event for each row, decode it as the value to insert.
124+
125+
toDecode := make([]streampb.StreamEvent_KV, 0, len(batch))
126+
for _, event := range batch {
127+
// Discard deletes before sorting and coalescing updates. Its possible that
128+
// the correct previous value was attached to a deleted event. That's okay because:
129+
// 1. The previous value is only a guess at the previous value. It does not have to match
130+
// the actual local value.
131+
// 2. DELETE -> INSERT isn't expected to be a super common pattern. Trying to coalesce the previous value
132+
// and discard deletes is more complex than just discarding them.
133+
if discard == jobspb.LogicalReplicationDetails_DiscardAllDeletes && event.KeyValue.Value.RawBytes == nil {
134+
continue
135+
}
136+
toDecode = append(toDecode, event)
137+
}
138+
139+
if len(toDecode) == 0 {
140+
return nil, nil
141+
}
142+
143+
sort.Slice(toDecode, func(i, j int) bool {
144+
cmp := toDecode[i].KeyValue.Key.Compare(toDecode[j].KeyValue.Key)
145+
if cmp != 0 {
146+
return cmp < 0
147+
}
148+
return toDecode[i].KeyValue.Value.Timestamp.Less(toDecode[j].KeyValue.Value.Timestamp)
149+
})
150+
151+
var result []decodedEvent
152+
153+
first, last := toDecode[0], toDecode[0]
154+
for _, event := range toDecode[1:] {
155+
if event.KeyValue.Key.Compare(first.KeyValue.Key) != 0 {
156+
decoded, err := d.decodeEvent(ctx, first, last)
157+
if err != nil {
158+
return nil, err
159+
}
160+
result = append(result, decoded)
161+
first, last = event, event
162+
} else {
163+
last = event
164+
}
165+
}
166+
decoded, err := d.decodeEvent(ctx, first, last)
167+
if err != nil {
168+
return nil, err
169+
}
170+
result = append(result, decoded)
171+
172+
return result, nil
173+
}
174+
107175
func (d *eventDecoder) decodeEvent(
108-
ctx context.Context, event streampb.StreamEvent_KV,
176+
ctx context.Context, first streampb.StreamEvent_KV, last streampb.StreamEvent_KV,
109177
) (decodedEvent, error) {
110-
decodedRow, err := d.decoder.DecodeKV(ctx, event.KeyValue, cdcevent.CurrentRow, event.KeyValue.Value.Timestamp, false)
178+
decodedRow, err := d.decoder.DecodeKV(ctx, last.KeyValue, cdcevent.CurrentRow, last.KeyValue.Value.Timestamp, false)
111179
if err != nil {
112180
return decodedEvent{}, err
113181
}
@@ -125,10 +193,10 @@ func (d *eventDecoder) decodeEvent(
125193
d.lastRow = decodedRow
126194

127195
var prevKV roachpb.KeyValue
128-
prevKV.Key = event.KeyValue.Key
129-
prevKV.Value = event.PrevValue
196+
prevKV.Key = first.KeyValue.Key
197+
prevKV.Value = first.PrevValue
130198

131-
decodedPrevRow, err := d.decoder.DecodeKV(ctx, prevKV, cdcevent.PrevRow, event.PrevValue.Timestamp, false)
199+
decodedPrevRow, err := d.decoder.DecodeKV(ctx, prevKV, cdcevent.PrevRow, first.PrevValue.Timestamp, false)
132200
if err != nil {
133201
return decodedEvent{}, err
134202
}
@@ -142,7 +210,7 @@ func (d *eventDecoder) decodeEvent(
142210
return decodedEvent{
143211
dstDescID: dstTable.id,
144212
isDelete: decodedRow.IsDeleted(),
145-
originTimestamp: event.KeyValue.Value.Timestamp,
213+
originTimestamp: last.KeyValue.Value.Timestamp,
146214
row: row,
147215
prevRow: prevRow,
148216
}, nil
Lines changed: 199 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,199 @@
1+
// Copyright 2025 The Cockroach Authors.
2+
//
3+
// Use of this software is governed by the CockroachDB Software License
4+
// included in the /LICENSE file.
5+
6+
package logical
7+
8+
import (
9+
"context"
10+
"math/rand"
11+
"testing"
12+
13+
"github.com/cockroachdb/cockroach/pkg/base"
14+
"github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/cdctest"
15+
"github.com/cockroachdb/cockroach/pkg/jobs/jobspb"
16+
"github.com/cockroachdb/cockroach/pkg/repstream/streampb"
17+
"github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb"
18+
"github.com/cockroachdb/cockroach/pkg/sql/catalog/descs"
19+
"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
20+
"github.com/cockroachdb/cockroach/pkg/testutils/serverutils"
21+
"github.com/cockroachdb/cockroach/pkg/testutils/sqlutils"
22+
"github.com/cockroachdb/cockroach/pkg/util/hlc"
23+
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
24+
"github.com/cockroachdb/cockroach/pkg/util/log"
25+
"github.com/stretchr/testify/require"
26+
)
27+
28+
func newTestDecoder(
29+
t *testing.T, s serverutils.ApplicationLayerInterface, tableName string,
30+
) (*eventDecoder, *EventBuilder) {
31+
ctx := context.Background()
32+
desc := cdctest.GetHydratedTableDescriptor(t, s.ExecutorConfig(), tree.Name(tableName))
33+
34+
decoder, err := newEventDecoder(ctx, s.InternalDB().(descs.DB), s.ClusterSettings(), map[descpb.ID]sqlProcessorTableConfig{
35+
desc.GetID(): {
36+
srcDesc: desc,
37+
},
38+
})
39+
require.NoError(t, err)
40+
41+
eb := newKvEventBuilder(t, desc.TableDesc())
42+
return decoder, eb
43+
}
44+
45+
// TestEventDecoder_Deduplication verifies that decodeAndCoalesceEvents
// collapses a batch to one decoded event per primary key: the latest event
// supplies the value and origin timestamp, while the earliest event supplies
// the previous value. The batch is shuffled first, so the result must not
// depend on input order.
func TestEventDecoder_Deduplication(t *testing.T) {
	defer leaktest.AfterTest(t)()
	defer log.Scope(t).Close(t)

	ctx := context.Background()
	srv, sqlDB, _ := serverutils.StartServer(t, base.TestServerArgs{})
	defer srv.Stopper().Stop(ctx)
	s := srv.ApplicationLayer()
	runner := sqlutils.MakeSQLRunner(sqlDB)

	runner.Exec(t, `
	CREATE TABLE test_table (
		id INT PRIMARY KEY,
		value STRING
	)
	`)

	decoder, eb := newTestDecoder(t, s, "test_table")

	// Pre-allocate ascending HLC timestamps so events can be built with a
	// known application order (times[i] < times[j] for i < j).
	times := []hlc.Timestamp{}
	for i := 0; i < 8; i++ {
		times = append(times, s.Clock().Now())
	}

	batch := []streampb.StreamEvent_KV{
		// Row 1: three successive updates; should coalesce to one event with
		// the latest value ("v3_final") and the earliest prev ("original").
		eb.updateEvent(times[1],
			[]tree.Datum{tree.NewDInt(tree.DInt(1)), tree.NewDString("v1")},
			[]tree.Datum{tree.NewDInt(tree.DInt(1)), tree.NewDString("original")},
		),
		eb.updateEvent(times[2],
			[]tree.Datum{tree.NewDInt(tree.DInt(1)), tree.NewDString("v2")},
			[]tree.Datum{tree.NewDInt(tree.DInt(1)), tree.NewDString("v1")},
		),
		eb.updateEvent(times[3],
			[]tree.Datum{tree.NewDInt(tree.DInt(1)), tree.NewDString("v3_final")},
			[]tree.Datum{tree.NewDInt(tree.DInt(1)), tree.NewDString("v2")},
		),
		// Row 2: a single update; should pass through unchanged.
		eb.updateEvent(times[4],
			[]tree.Datum{tree.NewDInt(tree.DInt(2)), tree.NewDString("single_value")},
			[]tree.Datum{tree.NewDInt(tree.DInt(2)), tree.NewDString("single_prev")},
		),
		// Row 3: insert -> update -> delete; should coalesce to the delete.
		eb.insertEvent(times[5],
			[]tree.Datum{tree.NewDInt(tree.DInt(3)), tree.NewDString("inserted")},
		),
		eb.updateEvent(times[6],
			[]tree.Datum{tree.NewDInt(tree.DInt(3)), tree.NewDString("updated")},
			[]tree.Datum{tree.NewDInt(tree.DInt(3)), tree.NewDString("inserted")},
		),
		eb.deleteEvent(times[7],
			[]tree.Datum{tree.NewDInt(tree.DInt(3)), tree.NewDString("updated")},
		),
	}

	// Shuffle to prove coalescing does not depend on input order; the decoder
	// sorts by key and timestamp internally.
	rand.Shuffle(len(batch), func(i, j int) {
		batch[i], batch[j] = batch[j], batch[i]
	})

	events, err := decoder.decodeAndCoalesceEvents(ctx, batch, jobspb.LogicalReplicationDetails_DiscardNothing)
	require.NoError(t, err)

	// One coalesced event per primary key, ordered by key.
	require.Len(t, events, 3)

	// Row 1: Multiple updates coalesced
	require.False(t, events[0].isDelete)
	require.Equal(t, times[3], events[0].originTimestamp)
	require.Equal(t, tree.NewDString("v3_final"), events[0].row[1])
	require.Equal(t, tree.NewDString("original"), events[0].prevRow[1])

	// Row 2: Single update unchanged
	require.False(t, events[1].isDelete)
	require.Equal(t, times[4], events[1].originTimestamp)
	require.Equal(t, tree.NewDString("single_value"), events[1].row[1])
	require.Equal(t, tree.NewDString("single_prev"), events[1].prevRow[1])

	// Row 3: Insert -> Update -> Delete coalesced
	require.True(t, events[2].isDelete)
	require.Equal(t, times[7], events[2].originTimestamp)
	// The insert was the earliest event, so its (NULL) prev value is used.
	require.Equal(t, tree.DNull, events[2].prevRow[1])
}
124+
125+
func TestEventDecoder_DeduplicationWithDiscardDelete(t *testing.T) {
126+
defer leaktest.AfterTest(t)()
127+
defer log.Scope(t).Close(t)
128+
129+
ctx := context.Background()
130+
srv, sqlDB, _ := serverutils.StartServer(t, base.TestServerArgs{})
131+
defer srv.Stopper().Stop(ctx)
132+
s := srv.ApplicationLayer()
133+
runner := sqlutils.MakeSQLRunner(sqlDB)
134+
135+
runner.Exec(t, `
136+
CREATE TABLE test_table (
137+
id INT PRIMARY KEY,
138+
value STRING
139+
)
140+
`)
141+
142+
decoder, eb := newTestDecoder(t, s, "test_table")
143+
144+
times := []hlc.Timestamp{}
145+
for i := 0; i < 8; i++ {
146+
times = append(times, s.Clock().Now())
147+
}
148+
149+
batch := []streampb.StreamEvent_KV{
150+
// Single delete coalesces to nothing
151+
eb.deleteEvent(times[1],
152+
[]tree.Datum{tree.NewDInt(tree.DInt(1)), tree.NewDString("original")},
153+
),
154+
155+
// Update followed by delete coalesces to the update
156+
eb.updateEvent(times[2],
157+
[]tree.Datum{tree.NewDInt(tree.DInt(2)), tree.NewDString("v2")},
158+
[]tree.Datum{tree.NewDInt(tree.DInt(2)), tree.NewDString("v1")},
159+
),
160+
eb.deleteEvent(times[3],
161+
[]tree.Datum{tree.NewDInt(tree.DInt(2)), tree.NewDString("v2")},
162+
),
163+
164+
// Delete -> Insert -> Delete coalesces to the insert
165+
eb.deleteEvent(times[4],
166+
// NOTE: this could be used as the prev value, but since we discard rows before
167+
// coalescing, the insert's NULL prev value will be used.
168+
[]tree.Datum{tree.NewDInt(tree.DInt(3)), tree.NewDString("prev_value")},
169+
),
170+
eb.insertEvent(times[5],
171+
[]tree.Datum{tree.NewDInt(tree.DInt(3)), tree.NewDString("inserted")},
172+
),
173+
eb.deleteEvent(times[6],
174+
[]tree.Datum{tree.NewDInt(tree.DInt(3)), tree.NewDString("second_dropped_delete")},
175+
),
176+
}
177+
178+
rand.Shuffle(len(batch), func(i, j int) {
179+
batch[i], batch[j] = batch[j], batch[i]
180+
})
181+
182+
events, err := decoder.decodeAndCoalesceEvents(ctx, batch, jobspb.LogicalReplicationDetails_DiscardAllDeletes)
183+
require.NoError(t, err)
184+
185+
require.Len(t, events, 2)
186+
for _, e := range events {
187+
require.False(t, e.isDelete, "expected no deletes when discarding deletes")
188+
}
189+
190+
// Row 3: Insert -> Update -> Delete coalesced
191+
require.Equal(t, times[2], events[0].originTimestamp)
192+
require.Equal(t, tree.NewDString("v2"), events[0].row[1])
193+
require.Equal(t, tree.NewDString("v1"), events[0].prevRow[1])
194+
195+
// Row 2: Coalesces to insert
196+
require.Equal(t, times[5], events[1].originTimestamp)
197+
require.Equal(t, tree.NewDString("inserted"), events[1].row[1])
198+
require.Equal(t, tree.DNull, events[1].prevRow[1])
199+
}

pkg/crosscluster/logical/sql_crud_writer.go

Lines changed: 1 addition & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,6 @@ package logical
77

88
import (
99
"context"
10-
"sort"
1110

1211
"github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/cdcevent"
1312
"github.com/cockroachdb/cockroach/pkg/jobs/jobspb"
@@ -83,7 +82,7 @@ func (c *sqlCrudWriter) HandleBatch(
8382
ctx, sp := tracing.ChildSpan(ctx, "crudBatcher.HandleBatch")
8483
defer sp.Finish()
8584

86-
sortedEvents, err := c.decodeAndSortEvents(ctx, batch)
85+
sortedEvents, err := c.decoder.decodeAndCoalesceEvents(ctx, batch, c.discard)
8786
if err != nil {
8887
return batchStats{}, err
8988
}
@@ -101,29 +100,6 @@ func (c *sqlCrudWriter) HandleBatch(
101100
return combinedStats, nil
102101
}
103102

104-
// decodeAndSortEvents returns the decoded events sorted by destination
105-
// descriptor ID. This grouping allows `eventsByTable` to produce a single
106-
// group for each table.
107-
func (c *sqlCrudWriter) decodeAndSortEvents(
108-
ctx context.Context, batch []streampb.StreamEvent_KV,
109-
) ([]decodedEvent, error) {
110-
events := make([]decodedEvent, 0, len(batch))
111-
for _, event := range batch {
112-
if c.discard == jobspb.LogicalReplicationDetails_DiscardAllDeletes && len(event.KeyValue.Value.RawBytes) == 0 {
113-
continue
114-
}
115-
decoded, err := c.decoder.decodeEvent(ctx, event)
116-
if err != nil {
117-
return nil, err
118-
}
119-
events = append(events, decoded)
120-
}
121-
sort.SliceStable(events, func(i, j int) bool {
122-
return events[i].dstDescID < events[j].dstDescID
123-
})
124-
return events, nil
125-
}
126-
127103
// eventsByTable is an iterator that groups events by their destination
128104
// descriptor ID. For optimal batching input events should be sorted by
129105
// destination descriptor ID because the iterator groups runs of events with

0 commit comments

Comments
 (0)