Skip to content

Commit ea5b5bc

Browse files
committed
crosscluster: add new insert/update/delete replication statements
These statements take advantage of the KV level LWW validation added by PR #143100 and do not explicitly validate the origin timestamp. Update and Delete specify the whole row in the where clause, which is intended to enable future SQL optimizations that generate a CPUT to replace the row instead Release note: none Epic: CRDB-48647
1 parent e06b49b commit ea5b5bc

File tree

4 files changed

+456
-1
lines changed

4 files changed

+456
-1
lines changed

pkg/crosscluster/logical/BUILD.bazel

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ go_library(
1515
"offline_initial_scan_processor.go",
1616
"purgatory.go",
1717
"range_stats.go",
18+
"replication_statements.go",
1819
"udf_row_processor.go",
1920
],
2021
importpath = "github.com/cockroachdb/cockroach/pkg/crosscluster/logical",
@@ -76,6 +77,7 @@ go_library(
7677
"//pkg/sql/sem/catid",
7778
"//pkg/sql/sem/eval",
7879
"//pkg/sql/sem/tree",
80+
"//pkg/sql/sem/tree/treecmp",
7981
"//pkg/sql/sessiondata",
8082
"//pkg/sql/sessiondatapb",
8183
"//pkg/sql/stats",
@@ -117,9 +119,12 @@ go_test(
117119
"main_test.go",
118120
"purgatory_test.go",
119121
"range_stats_test.go",
122+
"replication_statements_test.go",
120123
"udf_row_processor_test.go",
121124
],
122-
data = ["//c-deps:libgeos"],
125+
data = glob(["testdata/**"]) + [
126+
"//c-deps:libgeos",
127+
],
123128
embed = [":logical"],
124129
exec_properties = {"test.Pool": "large"},
125130
deps = [
@@ -163,6 +168,7 @@ go_test(
163168
"//pkg/sql/sqltestutils",
164169
"//pkg/sql/stats",
165170
"//pkg/testutils",
171+
"//pkg/testutils/datapathutils",
166172
"//pkg/testutils/jobutils",
167173
"//pkg/testutils/serverutils",
168174
"//pkg/testutils/skip",
@@ -182,6 +188,7 @@ go_test(
182188
"//pkg/util/timeutil",
183189
"//pkg/util/uuid",
184190
"@com_github_cockroachdb_cockroach_go_v2//crdb",
191+
"@com_github_cockroachdb_datadriven//:datadriven",
185192
"@com_github_cockroachdb_errors//:errors",
186193
"@com_github_cockroachdb_redact//:redact",
187194
"@com_github_lib_pq//:pq",
Lines changed: 189 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,189 @@
1+
// Copyright 2025 The Cockroach Authors.
2+
//
3+
// Use of this software is governed by the CockroachDB Software License
4+
// included in the /LICENSE file.
5+
6+
package logical
7+
8+
import (
9+
"fmt"
10+
11+
"github.com/cockroachdb/cockroach/pkg/sql/catalog"
12+
"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
13+
"github.com/cockroachdb/cockroach/pkg/sql/sem/tree/treecmp"
14+
)
15+
16+
// getPhysicalColumns returns the list of columns that are part of the table's
17+
// primary key and value.
18+
func getPhysicalColumns(table catalog.TableDescriptor) []catalog.Column {
19+
columns := table.AllColumns()
20+
result := make([]catalog.Column, 0, len(columns))
21+
for _, col := range columns {
22+
if !col.IsComputed() && !col.IsVirtual() && !col.IsSystemColumn() {
23+
result = append(result, col)
24+
}
25+
}
26+
return result
27+
}
28+
29+
// newTypedPlaceholder creates a placeholder with the appropriate type for a column.
30+
func newTypedPlaceholder(idx int, col catalog.Column) (*tree.CastExpr, error) {
31+
placeholder, err := tree.NewPlaceholder(fmt.Sprintf("%d", idx))
32+
if err != nil {
33+
return nil, err
34+
}
35+
return &tree.CastExpr{
36+
Expr: placeholder,
37+
Type: col.GetType(),
38+
SyntaxMode: tree.CastShort,
39+
}, nil
40+
}
41+
42+
// newInsertStatement returns a statement that can be used to insert a row into
43+
// the table.
44+
//
45+
// The statement will have `n` parameters, where `n` is the number of columns
46+
// in the table. Parameters are ordered by column ID.
47+
func newInsertStatement(table catalog.TableDescriptor) (tree.Statement, error) {
48+
columns := getPhysicalColumns(table)
49+
50+
columnNames := make(tree.NameList, len(columns))
51+
for i, col := range columns {
52+
columnNames[i] = tree.Name(col.GetName())
53+
}
54+
55+
parameters := make(tree.Exprs, len(columnNames))
56+
for i, col := range columns {
57+
var err error
58+
parameters[i], err = newTypedPlaceholder(i+1, col)
59+
if err != nil {
60+
return nil, err
61+
}
62+
}
63+
64+
parameterValues := &tree.ValuesClause{
65+
Rows: []tree.Exprs{
66+
parameters,
67+
},
68+
}
69+
70+
rows := &tree.Select{
71+
Select: parameterValues,
72+
}
73+
74+
insert := &tree.Insert{
75+
Table: &tree.TableRef{
76+
TableID: int64(table.GetID()),
77+
As: tree.AliasClause{Alias: "replication_target"},
78+
},
79+
Rows: rows,
80+
Columns: columnNames,
81+
Returning: tree.AbsentReturningClause,
82+
}
83+
84+
return insert, nil
85+
}
86+
87+
// newMatchesLastRow creates a WHERE clause for matching all columns of a row.
88+
// It returns a tree.Expr that compares each column to a placeholder parameter.
89+
// Parameters are ordered by column ID, starting from startParamIdx.
90+
func newMatchesLastRow(columns []catalog.Column, startParamIdx int) (tree.Expr, error) {
91+
var whereClause tree.Expr
92+
for i, col := range columns {
93+
placeholder, err := newTypedPlaceholder(startParamIdx+i, col)
94+
if err != nil {
95+
return nil, err
96+
}
97+
colExpr := &tree.ComparisonExpr{
98+
Operator: treecmp.MakeComparisonOperator(treecmp.IsNotDistinctFrom),
99+
Left: &tree.ColumnItem{ColumnName: tree.Name(col.GetName())},
100+
Right: placeholder,
101+
}
102+
103+
if whereClause == nil {
104+
whereClause = colExpr
105+
} else {
106+
whereClause = &tree.AndExpr{
107+
Left: whereClause,
108+
Right: colExpr,
109+
}
110+
}
111+
}
112+
return whereClause, nil
113+
}
114+
115+
// newUpdateStatement returns a statement that can be used to update a row in
116+
// the table. If a table has `n` columns, the statement will have `2n`
117+
// parameters, where the first `n` parameters are the previous values of the row
118+
// and the last `n` parameters are the new values of the row.
119+
//
120+
// Parameters are ordered by column ID.
121+
func newUpdateStatement(table catalog.TableDescriptor) (tree.Statement, error) {
122+
columns := getPhysicalColumns(table)
123+
124+
// Create WHERE clause for matching the previous row values
125+
whereClause, err := newMatchesLastRow(columns, 1)
126+
if err != nil {
127+
return nil, err
128+
}
129+
130+
exprs := make(tree.UpdateExprs, len(columns))
131+
for i, col := range columns {
132+
nameNode := tree.Name(col.GetName())
133+
names := tree.NameList{nameNode}
134+
135+
// Create a placeholder for the new value (len(columns)+i+1) since we
136+
// use 1-indexed placeholders and the first len(columns) placeholders
137+
// are for the where clause.
138+
placeholder, err := newTypedPlaceholder(len(columns)+i+1, col)
139+
if err != nil {
140+
return nil, err
141+
}
142+
143+
exprs[i] = &tree.UpdateExpr{
144+
Names: names,
145+
Expr: placeholder,
146+
}
147+
}
148+
149+
// Create the final update statement
150+
update := &tree.Update{
151+
Table: &tree.TableRef{
152+
TableID: int64(table.GetID()),
153+
As: tree.AliasClause{Alias: "replication_target"},
154+
},
155+
Exprs: exprs,
156+
Where: &tree.Where{Type: tree.AstWhere, Expr: whereClause},
157+
Returning: tree.AbsentReturningClause,
158+
}
159+
160+
return update, nil
161+
}
162+
163+
// newDeleteStatement returns a statement that can be used to delete a row from
164+
// the table. The statement will have `n` parameters, where `n` is the number of
165+
// columns in the table. Parameters are used in the WHERE clause to precisely
166+
// identify the row to delete.
167+
//
168+
// Parameters are ordered by column ID.
169+
func newDeleteStatement(table catalog.TableDescriptor) (tree.Statement, error) {
170+
columns := getPhysicalColumns(table)
171+
172+
// Create WHERE clause for matching the row to delete
173+
whereClause, err := newMatchesLastRow(columns, 1)
174+
if err != nil {
175+
return nil, err
176+
}
177+
178+
// Create the final delete statement
179+
delete := &tree.Delete{
180+
Table: &tree.TableRef{
181+
TableID: int64(table.GetID()),
182+
As: tree.AliasClause{Alias: "replication_target"},
183+
},
184+
Where: &tree.Where{Type: tree.AstWhere, Expr: whereClause},
185+
Returning: tree.AbsentReturningClause,
186+
}
187+
188+
return delete, nil
189+
}
Lines changed: 106 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,106 @@
1+
// Copyright 2025 The Cockroach Authors.
2+
//
3+
// Use of this software is governed by the CockroachDB Software License
4+
// included in the /LICENSE file.
5+
6+
package logical
7+
8+
import (
9+
"context"
10+
"fmt"
11+
"math/rand"
12+
"testing"
13+
14+
"github.com/cockroachdb/cockroach/pkg/base"
15+
"github.com/cockroachdb/cockroach/pkg/roachpb"
16+
"github.com/cockroachdb/cockroach/pkg/sql/catalog"
17+
"github.com/cockroachdb/cockroach/pkg/sql/catalog/desctestutils"
18+
"github.com/cockroachdb/cockroach/pkg/testutils/datapathutils"
19+
"github.com/cockroachdb/cockroach/pkg/testutils/serverutils"
20+
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
21+
"github.com/cockroachdb/cockroach/pkg/util/log"
22+
"github.com/cockroachdb/datadriven"
23+
"github.com/stretchr/testify/require"
24+
)
25+
26+
func TestReplicationStatements(t *testing.T) {
27+
defer leaktest.AfterTest(t)()
28+
defer log.Scope(t).Close(t)
29+
30+
ctx := context.Background()
31+
s, sqlDB, _ := serverutils.StartServer(t, base.TestServerArgs{
32+
Locality: roachpb.Locality{
33+
Tiers: []roachpb.Tier{
34+
{Key: "region", Value: "us-east1"},
35+
},
36+
},
37+
})
38+
defer s.Stopper().Stop(ctx)
39+
40+
getTableDesc := func(tableName string) catalog.TableDescriptor {
41+
return desctestutils.TestingGetTableDescriptor(
42+
s.DB(),
43+
s.Codec(),
44+
"defaultdb",
45+
"public",
46+
tableName,
47+
)
48+
}
49+
50+
datadriven.Walk(t, datapathutils.TestDataPath(t), func(t *testing.T, path string) {
51+
datadriven.RunTest(t, path, func(t *testing.T, d *datadriven.TestData) string {
52+
switch d.Cmd {
53+
case "exec":
54+
_, err := sqlDB.Exec(d.Input)
55+
if err != nil {
56+
return err.Error()
57+
}
58+
return "ok"
59+
case "show-insert":
60+
var tableName string
61+
d.ScanArgs(t, "table", &tableName)
62+
63+
desc := getTableDesc(tableName)
64+
65+
insertStmt, err := newInsertStatement(desc)
66+
require.NoError(t, err)
67+
68+
// Test preparing the statement to ensure it is valid SQL.
69+
_, err = sqlDB.Exec(fmt.Sprintf("PREPARE stmt_%d AS %s", rand.Int(), insertStmt.String()))
70+
require.NoError(t, err)
71+
72+
return insertStmt.String()
73+
case "show-update":
74+
var tableName string
75+
d.ScanArgs(t, "table", &tableName)
76+
77+
desc := getTableDesc(tableName)
78+
79+
updateStmt, err := newUpdateStatement(desc)
80+
require.NoError(t, err)
81+
82+
// Test preparing the statement to ensure it is valid SQL.
83+
_, err = sqlDB.Exec(fmt.Sprintf("PREPARE stmt_%d AS %s", rand.Int(), updateStmt.String()))
84+
require.NoError(t, err)
85+
86+
return updateStmt.String()
87+
case "show-delete":
88+
var tableName string
89+
d.ScanArgs(t, "table", &tableName)
90+
91+
desc := getTableDesc(tableName)
92+
93+
deleteStmt, err := newDeleteStatement(desc)
94+
require.NoError(t, err)
95+
96+
// Test preparing the statement to ensure it is valid SQL.
97+
_, err = sqlDB.Exec(fmt.Sprintf("PREPARE stmt_%d AS %s", rand.Int(), deleteStmt.String()))
98+
require.NoError(t, err)
99+
100+
return deleteStmt.String()
101+
default:
102+
return "unknown command: " + d.Cmd
103+
}
104+
})
105+
})
106+
}

0 commit comments

Comments
 (0)