Skip to content

Commit 2a4ea34

Browse files
committed
logical: create table handler
The table handler is a `BatchHandler` that operates on rows from a single table. Each batch is attempted up to twice: once assuming the values supplied are accurate and, if that attempt fails, a second time with values read from the local database. Release note: none Epic: CRDB-48647
1 parent 8898577 commit 2a4ea34

File tree

4 files changed

+669
-0
lines changed

4 files changed

+669
-0
lines changed

pkg/crosscluster/logical/BUILD.bazel

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ go_library(
66
srcs = [
77
"create_logical_replication_stmt.go",
88
"dead_letter_queue.go",
9+
"event_decoder.go",
910
"logical_replication_dist.go",
1011
"logical_replication_job.go",
1112
"logical_replication_writer_processor.go",
@@ -18,6 +19,7 @@ go_library(
1819
"replication_statements.go",
1920
"sql_row_reader.go",
2021
"sql_row_writer.go",
22+
"table_batch_handler.go",
2123
"tombstone_updater.go",
2224
"udf_row_processor.go",
2325
],
@@ -125,6 +127,7 @@ go_test(
125127
"replication_statements_test.go",
126128
"sql_row_reader_test.go",
127129
"sql_row_writer_test.go",
130+
"table_batch_handler_test.go",
128131
"udf_row_processor_test.go",
129132
],
130133
data = glob(["testdata/**"]) + [
Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,39 @@
1+
// Copyright 2025 The Cockroach Authors.
2+
//
3+
// Use of this software is governed by the CockroachDB Software License
4+
// included in the /LICENSE file.
5+
6+
package logical
7+
8+
import (
9+
"github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb"
10+
"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
11+
"github.com/cockroachdb/cockroach/pkg/util/hlc"
12+
)
13+
14+
// decodedEvent is constructed from a replication stream event. The replication
15+
// stream event is encoded using the source table descriptor. The decoded must
16+
// contain datums that are compatible with the destination table descriptor.
17+
type decodedEvent struct {
18+
// dstDescID is the descriptor ID for the table in the destination cluster.
19+
dstDescID descpb.ID
20+
21+
// isDelete is true if the event is a delete. This implies values in the row
22+
// may be NULL that are not allowed to be NULL in the source table's schema.
23+
// Only the primary key columns are expected to have values.
24+
isDelete bool
25+
26+
// originTimestamp is the mvcc timestamp of the row in the source cluster.
27+
originTimestamp hlc.Timestamp
28+
29+
// row is the decoded row from the replication stream. The datums in row are
30+
// in col id order for the destination table.
31+
row tree.Datums
32+
33+
// prevRow is either the previous row from the replication stream or it is
34+
// the local version of the row if there was a read refresh.
35+
//
36+
// The datums in prevRow are in col id order for the destination table. nil
37+
// prevRow may still lose LWW if there is a recent tombstone.
38+
prevRow tree.Datums
39+
}
Lines changed: 240 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,240 @@
1+
// Copyright 2025 The Cockroach Authors.
2+
//
3+
// Use of this software is governed by the CockroachDB Software License
4+
// included in the /LICENSE file.
5+
6+
package logical
7+
8+
import (
9+
"context"
10+
11+
"github.com/cockroachdb/cockroach/pkg/keys"
12+
"github.com/cockroachdb/cockroach/pkg/settings/cluster"
13+
"github.com/cockroachdb/cockroach/pkg/sql/catalog"
14+
"github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb"
15+
"github.com/cockroachdb/cockroach/pkg/sql/catalog/descs"
16+
"github.com/cockroachdb/cockroach/pkg/sql/catalog/lease"
17+
"github.com/cockroachdb/cockroach/pkg/sql/isql"
18+
"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
19+
"github.com/cockroachdb/cockroach/pkg/sql/sessiondata"
20+
"github.com/cockroachdb/errors"
21+
)
22+
23+
// tableHandler applies batches of replication events that are destined for a
24+
// sinlgle table.
25+
type tableHandler struct {
26+
sqlReader *sqlRowReader
27+
sqlWriter *sqlRowWriter
28+
db descs.DB
29+
tombstoneUpdater *tombstoneUpdater
30+
}
31+
32+
// tableBatchStats counts what happened to the events of a batch while it was
// being applied to a table.
type tableBatchStats struct {
	// refreshedRows counts the rows that were re-read from the local
	// database.
	refreshedRows int64
	// inserts counts the rows that were inserted.
	inserts int64
	// updates counts the rows that were updated.
	updates int64
	// deletes counts the rows that were deleted.
	deletes int64
	// tombstoneUpdates counts the tombstones that were updated. This only
	// happens when the event is a replicated delete and the row did not exist
	// locally.
	tombstoneUpdates int64
	// refreshLwwLosers counts the rows that were dropped as LWW losers after
	// reading them locally.
	refreshLwwLosers int64
	// kvLwwLosers counts the rows that were dropped as LWW losers after
	// attempting to write to the KV layer. This only happens when there is a
	// tombstone that is more recent than the replicated action.
	kvLwwLosers int64
}
54+
55+
func (t *tableBatchStats) Add(o tableBatchStats) {
56+
t.refreshedRows += o.refreshedRows
57+
t.inserts += o.inserts
58+
t.updates += o.updates
59+
t.deletes += o.deletes
60+
t.tombstoneUpdates += o.tombstoneUpdates
61+
t.refreshLwwLosers += o.refreshLwwLosers
62+
t.kvLwwLosers += o.kvLwwLosers
63+
}
64+
65+
// newTableHandler creates a new tableHandler for the given table descriptor ID.
66+
// It internally constructs the sqlReader and sqlWriter components.
67+
func newTableHandler(
68+
tableID descpb.ID,
69+
db descs.DB,
70+
codec keys.SQLCodec,
71+
sd *sessiondata.SessionData,
72+
leaseMgr *lease.Manager,
73+
settings *cluster.Settings,
74+
) (*tableHandler, error) {
75+
var table catalog.TableDescriptor
76+
77+
// NOTE: we don't hold a lease on the table descriptor, but validation
78+
// prevents users from changing the set of columns or the primary key of an
79+
// LDR replicated table.
80+
err := db.DescsTxn(context.Background(), func(ctx context.Context, txn descs.Txn) error {
81+
var err error
82+
table, err = txn.Descriptors().GetLeasedImmutableTableByID(ctx, txn.KV(), tableID)
83+
return err
84+
})
85+
if err != nil {
86+
return nil, err
87+
}
88+
89+
reader, err := newSQLRowReader(table)
90+
if err != nil {
91+
return nil, err
92+
}
93+
94+
writer, err := newSQLRowWriter(table)
95+
if err != nil {
96+
return nil, err
97+
}
98+
99+
tombstoneUpdater := newTombstoneUpdater(codec, db.KV(), leaseMgr, tableID, sd, settings)
100+
101+
return &tableHandler{
102+
sqlReader: reader,
103+
sqlWriter: writer,
104+
db: db,
105+
tombstoneUpdater: tombstoneUpdater,
106+
}, nil
107+
}
108+
109+
func (t *tableHandler) handleDecodedBatch(
110+
ctx context.Context, batch []decodedEvent,
111+
) (tableBatchStats, error) {
112+
stats, err := t.attemptBatch(ctx, batch)
113+
if err == nil {
114+
return stats, nil
115+
}
116+
117+
refreshedBatch, refreshStats, err := t.refreshPrevRows(ctx, batch)
118+
if err != nil {
119+
return stats, err
120+
}
121+
122+
stats, err = t.attemptBatch(ctx, refreshedBatch)
123+
if err != nil {
124+
return tableBatchStats{}, err
125+
}
126+
127+
stats.Add(refreshStats)
128+
129+
return stats, nil
130+
}
131+
132+
func (t *tableHandler) attemptBatch(
133+
ctx context.Context, batch []decodedEvent,
134+
) (tableBatchStats, error) {
135+
var stats tableBatchStats
136+
err := t.db.Txn(ctx, func(ctx context.Context, txn isql.Txn) error {
137+
for _, event := range batch {
138+
switch {
139+
case event.isDelete && len(event.prevRow) != 0:
140+
stats.deletes++
141+
err := t.sqlWriter.DeleteRow(ctx, txn, event.originTimestamp, event.prevRow)
142+
if err != nil {
143+
return err
144+
}
145+
case event.isDelete && len(event.prevRow) == 0:
146+
stats.tombstoneUpdates++
147+
tombstoneUpdateStats, err := t.tombstoneUpdater.updateTombstone(ctx, txn, event.originTimestamp, event.row)
148+
if err != nil {
149+
return err
150+
}
151+
stats.kvLwwLosers += tombstoneUpdateStats.kvWriteTooOld
152+
case event.prevRow == nil:
153+
stats.inserts++
154+
err := t.sqlWriter.InsertRow(ctx, txn, event.originTimestamp, event.row)
155+
if isLwwLoser(err) {
156+
// Insert may observe a LWW failure if it attempts to write over a tombstone.
157+
stats.kvLwwLosers++
158+
continue
159+
}
160+
if err != nil {
161+
return err
162+
}
163+
case event.prevRow != nil:
164+
stats.updates++
165+
err := t.sqlWriter.UpdateRow(ctx, txn, event.originTimestamp, event.prevRow, event.row)
166+
if err != nil {
167+
return err
168+
}
169+
default:
170+
return errors.AssertionFailedf("unhandled event type: %v", event)
171+
}
172+
}
173+
return nil
174+
})
175+
if err != nil {
176+
return tableBatchStats{}, err
177+
}
178+
return stats, nil
179+
}
180+
181+
// refreshPrevRows refreshes the prevRow field for each event in the batch. If
182+
// any event is known to be a lww loser based on the read, its dropped from the
183+
// batch.
184+
func (t *tableHandler) refreshPrevRows(
185+
ctx context.Context, batch []decodedEvent,
186+
) ([]decodedEvent, tableBatchStats, error) {
187+
var stats tableBatchStats
188+
stats.refreshedRows = int64(len(batch))
189+
190+
rows := make([]tree.Datums, 0, len(batch))
191+
for _, event := range batch {
192+
rows = append(rows, event.row)
193+
}
194+
195+
var refreshedRows map[int]priorRow
196+
err := t.db.Txn(ctx, func(ctx context.Context, txn isql.Txn) error {
197+
var err error
198+
// TODO(jeffswenson): should we apply the batch in the same transaction
199+
// that we perform the read refresh? We could maybe even use locking reads.
200+
refreshedRows, err = t.sqlReader.ReadRows(ctx, txn, rows)
201+
return err
202+
})
203+
if err != nil {
204+
return nil, tableBatchStats{}, err
205+
}
206+
207+
refreshedBatch := make([]decodedEvent, 0, len(batch))
208+
for i, event := range batch {
209+
var prevRow tree.Datums
210+
if refreshed, found := refreshedRows[i]; found {
211+
if !refreshed.logicalTimestamp.Less(event.originTimestamp) {
212+
// TODO(jeffswenson): update this logic when its time to handle
213+
// ties.
214+
// Skip the row because it is a lww loser. Note: we can only identify LWW
215+
// losers during the read refresh if the row exists. If its a tombstone,
216+
// the local value may win LWW, but we have to attempt the
217+
// insert/tombstone update to find out. We even have to do this if the
218+
// replicated event is a delete because the local tombstone may have an
219+
// older logical timestamp.
220+
stats.refreshLwwLosers++
221+
continue
222+
}
223+
prevRow = refreshed.row
224+
}
225+
refreshedEvent := decodedEvent{
226+
dstDescID: event.dstDescID,
227+
isDelete: event.isDelete,
228+
row: event.row,
229+
originTimestamp: event.originTimestamp,
230+
prevRow: prevRow,
231+
}
232+
refreshedBatch = append(refreshedBatch, refreshedEvent)
233+
}
234+
235+
return refreshedBatch, stats, nil
236+
}
237+
238+
func (t *tableHandler) ReleaseLeases(ctx context.Context) {
239+
t.tombstoneUpdater.ReleaseLeases(ctx)
240+
}

0 commit comments

Comments
 (0)