Skip to content

Commit a72d15b

Browse files
craig[bot]jeffswenson
andcommitted
Merge #142368
142368: crosscluster: handle lww condition failures r=jeffswenson a=jeffswenson This change aims to correct the SQL writer's LWW semantics, enabling the retirement of the KV writer. If the SQL writer observes the condition failed error's introduced by #143100, it will drop the row as a LWW loss. This closes a LWW bug where an old replicated write can overwrite a recent local tombstone. The first two commits in this PR are from: * #143096 * #143100 Release note: none Epic: [CRDB-48647](https://cockroachlabs.atlassian.net/browse/CRDB-48647) Co-authored-by: Jeff Swenson <[email protected]>
2 parents cafa087 + 7193813 commit a72d15b

File tree

3 files changed

+23
-7
lines changed

3 files changed

+23
-7
lines changed

pkg/crosscluster/logical/lww_row_processor.go

Lines changed: 18 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ import (
1515
"github.com/cockroachdb/cockroach/pkg/jobs/jobspb"
1616
"github.com/cockroachdb/cockroach/pkg/keys"
1717
"github.com/cockroachdb/cockroach/pkg/kv"
18+
"github.com/cockroachdb/cockroach/pkg/kv/kvpb"
1819
"github.com/cockroachdb/cockroach/pkg/repstream/streampb"
1920
"github.com/cockroachdb/cockroach/pkg/roachpb"
2021
"github.com/cockroachdb/cockroach/pkg/settings"
@@ -80,6 +81,15 @@ type querier interface {
8081
RequiresParsedBeforeRow(catid.DescID) bool
8182
}
8283

84+
// isLwwLoser returns true if the error is a ConditionFailedError with an
85+
// OriginTimestampOlderThan set.
86+
func isLwwLoser(err error) bool {
87+
if condErr := (*kvpb.ConditionFailedError)(nil); errors.As(err, &condErr) {
88+
return condErr.OriginTimestampOlderThan.IsSet()
89+
}
90+
return false
91+
}
92+
8393
type queryBuilder struct {
8494
// stmts are parsed SQL statements. They should have the same number
8595
// of inputs.
@@ -592,6 +602,9 @@ func (lww *lwwQuerier) InsertRow(
592602
sess.QualityOfService = nil
593603
}
594604
if _, err = ie.ExecParsed(ctx, replicatedOptimisticInsertOpName, kvTxn, sess, stmt, datums...); err != nil {
605+
if isLwwLoser(err) {
606+
return batchStats{}, nil
607+
}
595608
// If the optimistic insert failed with unique violation, we have to
596609
// fall back to the pessimistic path. If we got a different error,
597610
// then we bail completely.
@@ -615,7 +628,11 @@ func (lww *lwwQuerier) InsertRow(
615628
sess.QualityOfService = nil
616629
}
617630
sess.OriginTimestampForLogicalDataReplication = row.MvccTimestamp
618-
if _, err = ie.ExecParsed(ctx, replicatedInsertOpName, kvTxn, sess, stmt, datums...); err != nil {
631+
_, err = ie.ExecParsed(ctx, replicatedInsertOpName, kvTxn, sess, stmt, datums...)
632+
if isLwwLoser(err) {
633+
return batchStats{}, nil
634+
}
635+
if err != nil {
619636
log.Warningf(ctx, "replicated insert failed (query: %s): %s", stmt.SQL, err.Error())
620637
return batchStats{}, err
621638
}

pkg/crosscluster/logical/lww_row_processor_test.go

Lines changed: 0 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -459,9 +459,6 @@ func TestLWWConflictResolution(t *testing.T) {
459459
runner.CheckQueryResults(t, fmt.Sprintf("SELECT * from %s", tableNameDst), expectedRows)
460460
})
461461
t.Run("cross-cluster-local-delete", func(t *testing.T) {
462-
if !useKVProc {
463-
skip.IgnoreLint(t, "local delete ordering is not handled correctly by the SQL processor")
464-
}
465462
tableNameDst, rp, encoder := setup(t, useKVProc)
466463

467464
runner.Exec(t, fmt.Sprintf("INSERT INTO %s VALUES ($1, $2)", tableNameDst), row1...)
@@ -507,9 +504,6 @@ func TestLWWConflictResolution(t *testing.T) {
507504
runner.CheckQueryResults(t, fmt.Sprintf("SELECT * from %s", tableNameDst), expectedRows)
508505
})
509506
t.Run("remote-delete-after-local-delete", func(t *testing.T) {
510-
if !useKVProc {
511-
skip.IgnoreLint(t, "local delete ordering is not handled correctly by the SQL processor")
512-
}
513507
tableNameDst, rp, encoder := setup(t, useKVProc)
514508

515509
runner.Exec(t, fmt.Sprintf("INSERT INTO %s VALUES ($1, $2)", tableNameDst), row1...)

pkg/sql/row/errors.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,11 @@ func ConvertBatchError(ctx context.Context, tableDesc catalog.TableDescriptor, b
4141
)
4242

4343
case *kvpb.ConditionFailedError:
44+
if !v.OriginTimestampOlderThan.IsEmpty() {
45+
// NOTE: we return the go error here because this error should never be
46+
// communicated to pgwire. It's exposed for the LDR writer.
47+
return origPErr.GoError()
48+
}
4449
if origPErr.Index == nil {
4550
break
4651
}

0 commit comments

Comments
 (0)