Skip to content

Commit 58e75c8

Browse files
txnlock: implement lock synthesis and sorting within txn (cockroachdb#164390)
txnlock: implement lock synthesis and sorting within txn
2 parents 4b2944b + 8e55cfb commit 58e75c8

File tree

12 files changed

+1626
-0
lines changed

12 files changed

+1626
-0
lines changed

pkg/BUILD.bazel

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -205,6 +205,7 @@ ALL_TESTS = [
205205
"//pkg/crosscluster/ldrrandgen:ldrrandgen_test",
206206
"//pkg/crosscluster/logical/ldrdecoder:ldrdecoder_test",
207207
"//pkg/crosscluster/logical/sqlwriter:sqlwriter_test",
208+
"//pkg/crosscluster/logical/txnlock:txnlock_test",
208209
"//pkg/crosscluster/logical/txnwriter:txnwriter_test",
209210
"//pkg/crosscluster/logical:logical_disallowed_imports_test",
210211
"//pkg/crosscluster/logical:logical_test",
@@ -1527,6 +1528,8 @@ GO_TARGETS = [
15271528
"//pkg/crosscluster/logical/ldrdecoder:ldrdecoder_test",
15281529
"//pkg/crosscluster/logical/sqlwriter:sqlwriter",
15291530
"//pkg/crosscluster/logical/sqlwriter:sqlwriter_test",
1531+
"//pkg/crosscluster/logical/txnlock:txnlock",
1532+
"//pkg/crosscluster/logical/txnlock:txnlock_test",
15301533
"//pkg/crosscluster/logical/txnwriter:txnwriter",
15311534
"//pkg/crosscluster/logical/txnwriter:txnwriter_test",
15321535
"//pkg/crosscluster/logical:logical",
Lines changed: 62 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,62 @@
1+
load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test")
2+
3+
go_library(
4+
name = "txnlock",
5+
srcs = [
6+
"column_set.go",
7+
"doc.go",
8+
"lock_synthesis.go",
9+
"schema.go",
10+
"sort.go",
11+
],
12+
importpath = "github.com/cockroachdb/cockroach/pkg/crosscluster/logical/txnlock",
13+
visibility = ["//visibility:public"],
14+
deps = [
15+
"//pkg/crosscluster/logical/ldrdecoder",
16+
"//pkg/crosscluster/logical/sqlwriter",
17+
"//pkg/sql/catalog",
18+
"//pkg/sql/catalog/descpb",
19+
"//pkg/sql/catalog/lease",
20+
"//pkg/sql/rowenc",
21+
"//pkg/sql/sem/eval",
22+
"//pkg/sql/sem/tree",
23+
"//pkg/util/hlc",
24+
"@com_github_cockroachdb_errors//:errors",
25+
],
26+
)
27+
28+
go_test(
29+
name = "txnlock_test",
30+
srcs = [
31+
"column_set_test.go",
32+
"lock_synthesis_bench_test.go",
33+
"lock_synthesis_test.go",
34+
"main_test.go",
35+
"schema_test.go",
36+
],
37+
embed = [":txnlock"],
38+
deps = [
39+
"//pkg/base",
40+
"//pkg/ccl",
41+
"//pkg/ccl/changefeedccl/cdctest",
42+
"//pkg/crosscluster/logical/ldrdecoder",
43+
"//pkg/repstream/streampb",
44+
"//pkg/security/securityassets",
45+
"//pkg/security/securitytest",
46+
"//pkg/server",
47+
"//pkg/sql/catalog/descpb",
48+
"//pkg/sql/catalog/descs",
49+
"//pkg/sql/catalog/desctestutils",
50+
"//pkg/sql/catalog/lease",
51+
"//pkg/sql/randgen",
52+
"//pkg/sql/sem/eval",
53+
"//pkg/sql/sem/tree",
54+
"//pkg/sql/types",
55+
"//pkg/testutils/serverutils",
56+
"//pkg/testutils/sqlutils",
57+
"//pkg/util/leaktest",
58+
"//pkg/util/log",
59+
"//pkg/util/randutil",
60+
"@com_github_stretchr_testify//require",
61+
],
62+
)
Lines changed: 100 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,100 @@
1+
// Copyright 2026 The Cockroach Authors.
2+
//
3+
// Use of this software is governed by the CockroachDB Software License
4+
// included in the /LICENSE file.
5+
6+
package txnlock
7+
8+
import (
9+
"context"
10+
"encoding/binary"
11+
"hash/fnv"
12+
13+
"github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb"
14+
"github.com/cockroachdb/cockroach/pkg/sql/rowenc"
15+
"github.com/cockroachdb/cockroach/pkg/sql/sem/eval"
16+
"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
17+
"github.com/cockroachdb/errors"
18+
)
19+
20+
func uniqueIndexMixin(tableID descpb.ID, ucID descpb.ConstraintID) (uint64, error) {
21+
h := fnv.New64a()
22+
var buf [8]byte
23+
binary.BigEndian.PutUint32(buf[:4], uint32(tableID))
24+
binary.BigEndian.PutUint32(buf[4:], uint32(ucID))
25+
if _, err := h.Write(buf[:]); err != nil {
26+
return 0, errors.Wrap(err, "hashing unique index mixin")
27+
}
28+
return h.Sum64(), nil
29+
}
30+
31+
// A columnSet names the columns of a row that matter for one particular
// constraint.
type columnSet struct {
	// columns holds the positions, within a row's datums, of the columns that
	// belong to the constraint. The methods of columnSet index rows with
	// these values.
	columns []int32
	// mixin is written into the hash ahead of the column values. It exists so
	// that different tables or unique constraints produce different hashes.
	mixin uint64
}
39+
40+
// hash computes the hash that is used as the lock. hash(rowA) != hash(rowB)
41+
// implies !equal(rowA, rowB).
42+
func (c *columnSet) hash(ctx context.Context, row tree.Datums) (LockHash, error) {
43+
h := fnv.New64a()
44+
var prefixBytes [8]byte
45+
binary.BigEndian.PutUint64(prefixBytes[:], c.mixin)
46+
if _, err := h.Write(prefixBytes[:]); err != nil {
47+
return 0, errors.Wrap(err, "hashing mixin for lock derivation")
48+
}
49+
for _, idx := range c.columns {
50+
datum := row[idx]
51+
ed := rowenc.EncDatum{Datum: datum}
52+
encoded, err := ed.Fingerprint(
53+
ctx,
54+
datum.ResolvedType(),
55+
&tree.DatumAlloc{},
56+
nil, /* appendTo */
57+
nil, /* acc */
58+
)
59+
if err != nil {
60+
return 0, errors.Wrap(err, "hashing datum for lock derivation")
61+
}
62+
if _, err := h.Write(encoded); err != nil {
63+
return 0, errors.Wrap(err, "hashing encoded datum for lock derivation")
64+
}
65+
}
66+
return LockHash(h.Sum64()), nil
67+
}
68+
69+
// null returns true if any of the columns are null.
70+
func (c *columnSet) null(row tree.Datums) bool {
71+
if len(row) == 0 {
72+
return true
73+
}
74+
for _, idx := range c.columns {
75+
if row[idx] == tree.DNull {
76+
return true
77+
}
78+
}
79+
return false
80+
}
81+
82+
// equal returns true if the columns are equal. equal(rowA, rowB) implies
83+
// hash(rowA) == hash(rowB).
84+
func (c *columnSet) equal(
85+
ctx context.Context, evalCtx *eval.Context, rowA, rowB tree.Datums,
86+
) (bool, error) {
87+
if c.null(rowA) || c.null(rowB) {
88+
return false, nil
89+
}
90+
for _, idx := range c.columns {
91+
cmp, err := rowA[idx].Compare(ctx, evalCtx, rowB[idx])
92+
if err != nil {
93+
return false, errors.Wrap(err, "comparing datums for lock derivation")
94+
}
95+
if cmp != 0 {
96+
return false, nil
97+
}
98+
}
99+
return true, nil
100+
}
Lines changed: 97 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,97 @@
1+
// Copyright 2026 The Cockroach Authors.
2+
//
3+
// Use of this software is governed by the CockroachDB Software License
4+
// included in the /LICENSE file.
5+
6+
package txnlock
7+
8+
import (
9+
"context"
10+
"math/rand"
11+
"testing"
12+
13+
"github.com/cockroachdb/cockroach/pkg/sql/randgen"
14+
"github.com/cockroachdb/cockroach/pkg/sql/sem/eval"
15+
"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
16+
"github.com/cockroachdb/cockroach/pkg/sql/types"
17+
"github.com/stretchr/testify/require"
18+
)
19+
20+
func TestColumnSet(t *testing.T) {
21+
rng := rand.New(rand.NewSource(0))
22+
23+
randomTypes := func(numCols int) []*types.T {
24+
types := make([]*types.T, numCols)
25+
for i := range types {
26+
types[i] = randgen.RandColumnType(rng)
27+
}
28+
return types
29+
}
30+
31+
randomRow := func(types []*types.T) tree.Datums {
32+
row := make(tree.Datums, len(types))
33+
for i, typ := range types {
34+
row[i] = randgen.RandDatum(rng, typ, false /* nullOk */)
35+
}
36+
return row
37+
}
38+
39+
test := func(t *testing.T, types []*types.T) {
40+
ctx := context.Background()
41+
evalCtx := eval.Context{}
42+
colSet := columnSet{
43+
columns: make([]int32, len(types)),
44+
}
45+
for i := range len(types) {
46+
colSet.columns[i] = int32(i)
47+
}
48+
for range 1000 {
49+
a := randomRow(types)
50+
b := randomRow(types)
51+
52+
hashA1, err := colSet.hash(ctx, a)
53+
require.NoError(t, err)
54+
hashA2, err := colSet.hash(ctx, a)
55+
require.NoError(t, err)
56+
if hashA1 != hashA2 {
57+
t.Errorf("hash(a) != hash(a)")
58+
}
59+
60+
if !colSet.null(a) {
61+
eq, err := colSet.equal(ctx, &evalCtx, a, a)
62+
require.NoError(t, err)
63+
if !eq {
64+
t.Errorf("!equal(a, a) when !null(a)")
65+
}
66+
}
67+
68+
eqAB, err := colSet.equal(ctx, &evalCtx, a, b)
69+
require.NoError(t, err)
70+
if eqAB {
71+
hashA, err := colSet.hash(ctx, a)
72+
require.NoError(t, err)
73+
hashB, err := colSet.hash(ctx, b)
74+
require.NoError(t, err)
75+
if hashA != hashB {
76+
t.Errorf("equal(a, b) but hash(a) != hash(b)")
77+
}
78+
}
79+
80+
eqBA, err := colSet.equal(ctx, &evalCtx, b, a)
81+
require.NoError(t, err)
82+
if eqAB != eqBA {
83+
t.Errorf("equal(a, b) != equal(b, a)")
84+
}
85+
86+
if (colSet.null(a) || colSet.null(b)) && eqAB {
87+
t.Errorf("null(a) or null(b) but equal(a, b) is true")
88+
}
89+
}
90+
}
91+
92+
for range 100 {
93+
types := randomTypes(rand.Intn(5) + 1)
94+
t.Logf("types: %v", types)
95+
test(t, types)
96+
}
97+
}
Lines changed: 73 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,73 @@
1+
// Copyright 2026 The Cockroach Authors.
2+
//
3+
// Use of this software is governed by the CockroachDB Software License
4+
// included in the /LICENSE file.
5+
6+
// Package txnlock synthesizes replication locks for transactional logical data
7+
// replication.
8+
//
9+
// # Replication Locks
10+
//
11+
// A replication lock is a (mode, kind, lock_id) triple:
12+
//
13+
// - mode is one of {R, W}.
14+
// - kind is one of {primary_key, foreign_key_constraint,
15+
// unique_constraint}.
16+
// - lock_id is a datum tuple whose values depend on the lock's kind.
17+
//
18+
// Two rows or transactions conflict when they hold locks with the same
19+
// lock_id and at least one of the locks is a write.
20+
//
21+
// # Primary Key Locks
22+
//
23+
// For a primary key lock, the lock_id is the primary key values of the
24+
// row inserted, updated, or deleted. The primary key values are derived
25+
// from the (prev_value, new_value) decoded from a rangefeed event.
26+
// Because both values always share the same primary key, each row
27+
// produces exactly one primary key lock.
28+
//
29+
// # Foreign Key Constraint Locks
30+
//
31+
// Foreign key constraint locks are derived from the (prev_value,
32+
// new_value) decoded from a rangefeed event.
33+
//
34+
// If table_parent has tables that reference it via foreign key
35+
// constraints, it generates a write lock: (W, foreign_key_constraint,
36+
// referenced_columns). The write lock can be omitted when (prev_value,
37+
// new_value) shows the referenced_columns did not change. If the
38+
// replication event does not modify the referenced columns, the
39+
// primary_key lock already orders it relative to any transaction that
40+
// does modify the referenced columns, which is sufficient to ensure
41+
// referential integrity.
42+
//
43+
// If table_child has a foreign key reference on table_parent, it
44+
// generates a read lock: (R, foreign_key_constraint, reference_key),
45+
// where reference_key is the referenced datums for the table_parent
46+
// row referenced by table_child. A replicated row may generate two
47+
// foreign key read locks for the same constraint if prev_value and
48+
// new_value have different non-null reference_keys. The read lock can
49+
// be omitted when prev_value and new_value have the same
50+
// reference_key.
51+
//
52+
// If any column in the reference_key is NULL, no lock is generated for
53+
// that constraint.
54+
//
55+
// # Unique Constraint Locks
56+
//
57+
// Unique constraint locks are derived from the (prev_value, new_value)
58+
// decoded from a rangefeed event. The lock_id is the tuple of column
59+
// values that form the unique constraint.
60+
//
61+
// A unique constraint lock is only generated when the constraint
62+
// column values differ between prev_value and new_value. If both the
63+
// old and new constraint values are non-null and different, the
64+
// replication event generates two unique constraint locks: one for the
65+
// old constraint values being released and one for the new constraint
66+
// values being acquired.
67+
//
68+
// # Replication Locks vs Transaction Locks
69+
//
70+
// An alternative to lock inference is to record the locks acquired by
71+
// the original transaction. This does not work because there are cases
72+
// where the SQL engine validates a constraint using a non-locking read.
73+
package txnlock

0 commit comments

Comments
 (0)