Commit 74c7286

craig[bot], aerfrei, and jeffswenson committed
142851: changefeedccl: add retry to TestChangefeedFailOnTableOffline r=asg0451,KeithCh a=aerfrei

Previously, TestChangefeedFailOnTableOffline could fail if the import attempt failed with "result is ambiguous". That error should be retried, which reduces this source of flakiness in the test.

Fixes: #142033

Release note: None

144010: logical: create table handler r=jeffswenson a=jeffswenson

The table handler is a `BatchHandler` that operates on rows from a single table. Each batch is attempted twice: once assuming the supplied values are accurate, and a second time with values read from the local database.

Release note: none

Epic: [CRDB-48647](https://cockroachlabs.atlassian.net/browse/CRDB-48647)

Co-authored-by: Aerin Freilich <[email protected]>
Co-authored-by: Jeff Swenson <[email protected]>
3 parents: 0fcd031 + cf80108 + 2a4ea34

8 files changed: +713 additions, -1 deletion

pkg/ccl/changefeedccl/changefeed_test.go

Lines changed: 1 addition & 1 deletion

@@ -5195,7 +5195,7 @@ func TestChangefeedFailOnTableOffline(t *testing.T) {
 		// Start an import job which will immediately pause after ingestion
 		sqlDB.Exec(t, "SET CLUSTER SETTING jobs.debug.pausepoints = 'import.after_ingest';")
 		go func() {
-			sqlDB.ExpectErrWithTimeout(t, `pause point`, `IMPORT INTO for_import CSV DATA ($1);`, dataSrv.URL)
+			sqlDB.ExpectErrWithRetry(t, `pause point`, `IMPORT INTO for_import CSV DATA ($1);`, `result is ambiguous`, dataSrv.URL)
 		}()
 		sqlDB.CheckQueryResultsRetry(
 			t,
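
The switch from ExpectErrWithTimeout to ExpectErrWithRetry means the IMPORT statement is re-issued when it fails with the retryable "result is ambiguous" error instead of failing the test. A minimal sketch of that retry idea, assuming a plain database/sql connection; the helper name and the retry bound are illustrative, not the sqlutils implementation:

package changefeedccl_test // placement is illustrative only

import (
	gosql "database/sql"
	"strings"
	"testing"
)

// retryAmbiguousExec re-runs stmt while it fails with the retryable
// "result is ambiguous" error and returns the final error, which the caller
// can then match against the expected "pause point" failure.
func retryAmbiguousExec(t *testing.T, db *gosql.DB, stmt string, args ...interface{}) error {
	t.Helper()
	var err error
	for attempt := 0; attempt < 5; attempt++ {
		if _, err = db.Exec(stmt, args...); err == nil {
			return nil
		}
		if !strings.Contains(err.Error(), "result is ambiguous") {
			return err // not retryable; surface it to the caller
		}
	}
	return err
}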

pkg/crosscluster/logical/BUILD.bazel

Lines changed: 3 additions & 0 deletions

@@ -6,6 +6,7 @@ go_library(
     srcs = [
         "create_logical_replication_stmt.go",
         "dead_letter_queue.go",
+        "event_decoder.go",
         "logical_replication_dist.go",
         "logical_replication_job.go",
         "logical_replication_writer_processor.go",
@@ -18,6 +19,7 @@ go_library(
         "replication_statements.go",
         "sql_row_reader.go",
         "sql_row_writer.go",
+        "table_batch_handler.go",
         "tombstone_updater.go",
         "udf_row_processor.go",
     ],
@@ -125,6 +127,7 @@ go_test(
         "replication_statements_test.go",
         "sql_row_reader_test.go",
         "sql_row_writer_test.go",
+        "table_batch_handler_test.go",
         "udf_row_processor_test.go",
     ],
     data = glob(["testdata/**"]) + [

pkg/crosscluster/logical/event_decoder.go (new file)

Lines changed: 39 additions & 0 deletions

@@ -0,0 +1,39 @@
// Copyright 2025 The Cockroach Authors.
//
// Use of this software is governed by the CockroachDB Software License
// included in the /LICENSE file.

package logical

import (
	"github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb"
	"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
	"github.com/cockroachdb/cockroach/pkg/util/hlc"
)

// decodedEvent is constructed from a replication stream event. The replication
// stream event is encoded using the source table descriptor. The decoded event
// must contain datums that are compatible with the destination table
// descriptor.
type decodedEvent struct {
	// dstDescID is the descriptor ID for the table in the destination cluster.
	dstDescID descpb.ID

	// isDelete is true if the event is a delete. This implies values in the row
	// may be NULL that are not allowed to be NULL in the source table's schema.
	// Only the primary key columns are expected to have values.
	isDelete bool

	// originTimestamp is the mvcc timestamp of the row in the source cluster.
	originTimestamp hlc.Timestamp

	// row is the decoded row from the replication stream. The datums in row are
	// in col id order for the destination table.
	row tree.Datums

	// prevRow is either the previous row from the replication stream or it is
	// the local version of the row if there was a read refresh.
	//
	// The datums in prevRow are in col id order for the destination table. A nil
	// prevRow may still lose LWW if there is a recent tombstone.
	prevRow tree.Datums
}
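
To make the field contract concrete, here is a small hedged example of how an insert event and a delete event might be populated. The destination table schema (id INT PRIMARY KEY, name STRING), the descriptor ID, the timestamps, and the exampleEvents helper are illustrative assumptions, not values from this patch:

package logical

import (
	"github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb"
	"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
	"github.com/cockroachdb/cockroach/pkg/util/hlc"
)

// exampleEvents returns hypothetical events for a destination table
// (id INT PRIMARY KEY, name STRING) with descriptor ID 104.
func exampleEvents() []decodedEvent {
	return []decodedEvent{
		{
			// A replicated insert: every column carries a value. prevRow is nil
			// because the stream did not include a prior version of the row.
			dstDescID:       descpb.ID(104),
			isDelete:        false,
			originTimestamp: hlc.Timestamp{WallTime: 1700000000000000000},
			row:             tree.Datums{tree.NewDInt(1), tree.NewDString("alice")},
			prevRow:         nil,
		},
		{
			// A replicated delete: only the primary key column is expected to
			// carry a value; non-key columns may be NULL even if the destination
			// schema marks them NOT NULL.
			dstDescID:       descpb.ID(104),
			isDelete:        true,
			originTimestamp: hlc.Timestamp{WallTime: 1700000000000000001},
			row:             tree.Datums{tree.NewDInt(1), tree.DNull},
			prevRow:         tree.Datums{tree.NewDInt(1), tree.NewDString("alice")},
		},
	}
}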

pkg/crosscluster/logical/table_batch_handler.go (new file)

Lines changed: 240 additions & 0 deletions

@@ -0,0 +1,240 @@
// Copyright 2025 The Cockroach Authors.
//
// Use of this software is governed by the CockroachDB Software License
// included in the /LICENSE file.

package logical

import (
	"context"

	"github.com/cockroachdb/cockroach/pkg/keys"
	"github.com/cockroachdb/cockroach/pkg/settings/cluster"
	"github.com/cockroachdb/cockroach/pkg/sql/catalog"
	"github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb"
	"github.com/cockroachdb/cockroach/pkg/sql/catalog/descs"
	"github.com/cockroachdb/cockroach/pkg/sql/catalog/lease"
	"github.com/cockroachdb/cockroach/pkg/sql/isql"
	"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
	"github.com/cockroachdb/cockroach/pkg/sql/sessiondata"
	"github.com/cockroachdb/errors"
)

// tableHandler applies batches of replication events that are destined for a
// single table.
type tableHandler struct {
	sqlReader        *sqlRowReader
	sqlWriter        *sqlRowWriter
	db               descs.DB
	tombstoneUpdater *tombstoneUpdater
}

type tableBatchStats struct {
	// refreshedRows is the number of rows that were re-read from the local
	// database.
	refreshedRows int64
	// inserts is the number of rows that were inserted.
	inserts int64
	// updates is the number of rows that were updated.
	updates int64
	// deletes is the number of rows that were deleted.
	deletes int64
	// tombstoneUpdates is the number of tombstones that were updated. This case
	// only occurs if the event is a replicated delete and the row did not exist
	// locally.
	tombstoneUpdates int64
	// refreshLwwLosers is the number of rows that were dropped as lww losers
	// after reading them locally.
	refreshLwwLosers int64
	// kvLwwLosers is the number of rows that were dropped as lww losers after
	// attempting to write to the KV layer. This case only occurs if there is a
	// tombstone that is more recent than the replicated action.
	kvLwwLosers int64
}

func (t *tableBatchStats) Add(o tableBatchStats) {
	t.refreshedRows += o.refreshedRows
	t.inserts += o.inserts
	t.updates += o.updates
	t.deletes += o.deletes
	t.tombstoneUpdates += o.tombstoneUpdates
	t.refreshLwwLosers += o.refreshLwwLosers
	t.kvLwwLosers += o.kvLwwLosers
}

// newTableHandler creates a new tableHandler for the given table descriptor ID.
// It internally constructs the sqlReader and sqlWriter components.
func newTableHandler(
	tableID descpb.ID,
	db descs.DB,
	codec keys.SQLCodec,
	sd *sessiondata.SessionData,
	leaseMgr *lease.Manager,
	settings *cluster.Settings,
) (*tableHandler, error) {
	var table catalog.TableDescriptor

	// NOTE: we don't hold a lease on the table descriptor, but validation
	// prevents users from changing the set of columns or the primary key of an
	// LDR replicated table.
	err := db.DescsTxn(context.Background(), func(ctx context.Context, txn descs.Txn) error {
		var err error
		table, err = txn.Descriptors().GetLeasedImmutableTableByID(ctx, txn.KV(), tableID)
		return err
	})
	if err != nil {
		return nil, err
	}

	reader, err := newSQLRowReader(table)
	if err != nil {
		return nil, err
	}

	writer, err := newSQLRowWriter(table)
	if err != nil {
		return nil, err
	}

	tombstoneUpdater := newTombstoneUpdater(codec, db.KV(), leaseMgr, tableID, sd, settings)

	return &tableHandler{
		sqlReader:        reader,
		sqlWriter:        writer,
		db:               db,
		tombstoneUpdater: tombstoneUpdater,
	}, nil
}

func (t *tableHandler) handleDecodedBatch(
	ctx context.Context, batch []decodedEvent,
) (tableBatchStats, error) {
	stats, err := t.attemptBatch(ctx, batch)
	if err == nil {
		return stats, nil
	}

	refreshedBatch, refreshStats, err := t.refreshPrevRows(ctx, batch)
	if err != nil {
		return stats, err
	}

	stats, err = t.attemptBatch(ctx, refreshedBatch)
	if err != nil {
		return tableBatchStats{}, err
	}

	stats.Add(refreshStats)

	return stats, nil
}

func (t *tableHandler) attemptBatch(
	ctx context.Context, batch []decodedEvent,
) (tableBatchStats, error) {
	var stats tableBatchStats
	err := t.db.Txn(ctx, func(ctx context.Context, txn isql.Txn) error {
		for _, event := range batch {
			switch {
			case event.isDelete && len(event.prevRow) != 0:
				stats.deletes++
				err := t.sqlWriter.DeleteRow(ctx, txn, event.originTimestamp, event.prevRow)
				if err != nil {
					return err
				}
			case event.isDelete && len(event.prevRow) == 0:
				stats.tombstoneUpdates++
				tombstoneUpdateStats, err := t.tombstoneUpdater.updateTombstone(ctx, txn, event.originTimestamp, event.row)
				if err != nil {
					return err
				}
				stats.kvLwwLosers += tombstoneUpdateStats.kvWriteTooOld
			case event.prevRow == nil:
				stats.inserts++
				err := t.sqlWriter.InsertRow(ctx, txn, event.originTimestamp, event.row)
				if isLwwLoser(err) {
					// Insert may observe a LWW failure if it attempts to write over a tombstone.
					stats.kvLwwLosers++
					continue
				}
				if err != nil {
					return err
				}
			case event.prevRow != nil:
				stats.updates++
				err := t.sqlWriter.UpdateRow(ctx, txn, event.originTimestamp, event.prevRow, event.row)
				if err != nil {
					return err
				}
			default:
				return errors.AssertionFailedf("unhandled event type: %v", event)
			}
		}
		return nil
	})
	if err != nil {
		return tableBatchStats{}, err
	}
	return stats, nil
}

// refreshPrevRows refreshes the prevRow field for each event in the batch. If
// any event is known to be a lww loser based on the read, it is dropped from
// the batch.
func (t *tableHandler) refreshPrevRows(
	ctx context.Context, batch []decodedEvent,
) ([]decodedEvent, tableBatchStats, error) {
	var stats tableBatchStats
	stats.refreshedRows = int64(len(batch))

	rows := make([]tree.Datums, 0, len(batch))
	for _, event := range batch {
		rows = append(rows, event.row)
	}

	var refreshedRows map[int]priorRow
	err := t.db.Txn(ctx, func(ctx context.Context, txn isql.Txn) error {
		var err error
		// TODO(jeffswenson): should we apply the batch in the same transaction
		// that we perform the read refresh? We could maybe even use locking reads.
		refreshedRows, err = t.sqlReader.ReadRows(ctx, txn, rows)
		return err
	})
	if err != nil {
		return nil, tableBatchStats{}, err
	}

	refreshedBatch := make([]decodedEvent, 0, len(batch))
	for i, event := range batch {
		var prevRow tree.Datums
		if refreshed, found := refreshedRows[i]; found {
			if !refreshed.logicalTimestamp.Less(event.originTimestamp) {
				// TODO(jeffswenson): update this logic when it is time to handle
				// ties.
				// Skip the row because it is a lww loser. Note: we can only identify LWW
				// losers during the read refresh if the row exists. If it is a tombstone,
				// the local value may win LWW, but we have to attempt the
				// insert/tombstone update to find out. We even have to do this if the
				// replicated event is a delete because the local tombstone may have an
				// older logical timestamp.
				stats.refreshLwwLosers++
				continue
			}
			prevRow = refreshed.row
		}
		refreshedEvent := decodedEvent{
			dstDescID:       event.dstDescID,
			isDelete:        event.isDelete,
			row:             event.row,
			originTimestamp: event.originTimestamp,
			prevRow:         prevRow,
		}
		refreshedBatch = append(refreshedBatch, refreshedEvent)
	}

	return refreshedBatch, stats, nil
}

func (t *tableHandler) ReleaseLeases(ctx context.Context) {
	t.tombstoneUpdater.ReleaseLeases(ctx)
}
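
As a rough usage sketch of the contract described in the PR message (attempt the batch once with the prevRow values carried on the stream, then once more with locally refreshed values if the first attempt fails), a caller might drive the handler as below. The applyBatch wrapper and its logging are assumptions for illustration, not the actual writer-processor wiring:

package logical

import (
	"context"

	"github.com/cockroachdb/cockroach/pkg/util/log"
)

// applyBatch assumes handler was built with newTableHandler and that events
// were produced upstream by the replication stream decoder.
func applyBatch(ctx context.Context, handler *tableHandler, events []decodedEvent) error {
	// handleDecodedBatch internally attempts the batch twice: first trusting
	// the supplied prevRow values, then, on failure, after refreshing prevRow
	// from the local table and dropping known LWW losers.
	stats, err := handler.handleDecodedBatch(ctx, events)
	if err != nil {
		return err
	}
	log.Infof(ctx, "applied batch: inserts=%d updates=%d deletes=%d lww-losers=%d",
		stats.inserts, stats.updates, stats.deletes,
		stats.refreshLwwLosers+stats.kvLwwLosers)

	// Release any leases held by the tombstone updater once the handler is
	// done being used.
	handler.ReleaseLeases(ctx)
	return nil
}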
