
Commit 65d8edd

craig[bot] and jeffswenson committed
Merge #144703
144703: logical: improve crud writer sql queries r=jeffswenson a=jeffswenson

This change includes two improvements to the SQL queries used by the crud writer:

1. WHERE clauses now use '=' when comparing to the primary key. This works around an issue with generic query plans in the optimizer that caused `INSERT` and `UPDATE` to plan full table scans.
2. The read refresh statement includes computed columns in the primary key lookup. This was needed to support computed virtual and stored columns in the crud writer. Virtual columns in the primary key are still prohibited by schema validation because the classic SQL writer does not support them.

Epic: [CRDB-48647](https://cockroachlabs.atlassian.net/browse/CRDB-48647)

Release note: none

Co-authored-by: Jeff Swenson <[email protected]>
2 parents 3acc730 + a010b2c commit 65d8edd
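As a rough illustration of the first improvement, here is the kind of row-matching predicate the crud writer builds, sketched for a hypothetical two-column table (the table, the statement text, and the omitted placeholder type casts are assumptions for illustration, not output captured from this change):

  -- Hypothetical table, used only for illustration.
  CREATE TABLE kv (k INT PRIMARY KEY, v INT);

  -- Old shape: every column compared with IS NOT DISTINCT FROM, which the
  -- generic query plan handled with a full table scan:
  --   ... WHERE k IS NOT DISTINCT FROM $1 AND v IS NOT DISTINCT FROM $2
  -- New shape: '=' on the non-NULL primary key column allows a point lookup,
  -- while nullable non-key columns keep IS NOT DISTINCT FROM:
  DELETE FROM kv WHERE k = $1 AND v IS NOT DISTINCT FROM $2;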

File tree

8 files changed: +217 -80 lines


pkg/crosscluster/logical/BUILD.bazel

Lines changed: 2 additions & 0 deletions
@@ -173,6 +173,7 @@ go_test(
         "//pkg/sql/execinfra",
         "//pkg/sql/execinfrapb",
         "//pkg/sql/isql",
+        "//pkg/sql/parser/statements",
         "//pkg/sql/pgwire/pgcode",
         "//pkg/sql/pgwire/pgerror",
         "//pkg/sql/randgen",
@@ -184,6 +185,7 @@ go_test(
         "//pkg/sql/sqlclustersettings",
         "//pkg/sql/sqltestutils",
         "//pkg/sql/stats",
+        "//pkg/sql/types",
         "//pkg/testutils",
         "//pkg/testutils/datapathutils",
         "//pkg/testutils/jobutils",

pkg/crosscluster/logical/event_decoder.go

Lines changed: 1 addition & 1 deletion
@@ -76,7 +76,7 @@ func newEventDecoder(
         return err
     }
 
-    columns := getPhysicalColumnsSchema(descriptor)
+    columns := getColumnSchema(descriptor)
     columnNames := make([]string, 0, len(columns))
     for _, column := range columns {
         columnNames = append(columnNames, column.column.GetName())

pkg/crosscluster/logical/logical_replication_job_test.go

Lines changed: 66 additions & 8 deletions
@@ -196,7 +196,7 @@ func TestOptimsitcInsertCorruption(t *testing.T) {
 
     dbSource.Exec(t, "SET CLUSTER SETTING logical_replication.consumer.try_optimistic_insert.enabled = true")
 
-    createTableStmt := `CREATE TABLE computed_cols (
+    createTableStmt := `CREATE TABLE indexed (
         a INT,
         b INT,
         c INT,
@@ -205,35 +205,35 @@ func TestOptimsitcInsertCorruption(t *testing.T) {
     dbSource.Exec(t, createTableStmt)
     dbDest.Exec(t, createTableStmt)
 
-    createIdxStmt := `CREATE INDEX c ON computed_cols (c)`
+    createIdxStmt := `CREATE INDEX c ON indexed (c)`
     dbSource.Exec(t, createIdxStmt)
     dbDest.Exec(t, createIdxStmt)
 
     // Insert initial data into destination that should be overwritten. This
     // gives more opportunities for corruption.
-    dbDest.Exec(t, "INSERT INTO computed_cols (a, b, c) VALUES (1, 2, 3), (3, 4, 5)")
-    dbSource.Exec(t, "INSERT INTO computed_cols (a, b, c) VALUES (1, 2, 6), (3, 4, 7)")
+    dbDest.Exec(t, "INSERT INTO indexed (a, b, c) VALUES (1, 2, 3), (3, 4, 5)")
+    dbSource.Exec(t, "INSERT INTO indexed (a, b, c) VALUES (1, 2, 6), (3, 4, 7)")
 
     // Create logical replication stream from source to destination
     sourceURL := replicationtestutils.GetExternalConnectionURI(t, s, s, serverutils.DBName("a"))
     var jobID jobspb.JobID
     dbDest.QueryRow(t,
-        "CREATE LOGICAL REPLICATION STREAM FROM TABLE computed_cols ON $1 INTO TABLE computed_cols",
+        "CREATE LOGICAL REPLICATION STREAM FROM TABLE indexed ON $1 INTO TABLE indexed",
         sourceURL.String(),
     ).Scan(&jobID)
 
     WaitUntilReplicatedTime(t, s.Clock().Now(), dbDest, jobID)
 
-    dbSource.CheckQueryResults(t, "SELECT * FROM computed_cols", [][]string{
+    dbSource.CheckQueryResults(t, "SELECT * FROM indexed", [][]string{
         {"1", "2", "6"},
         {"3", "4", "7"},
     })
-    dbDest.CheckQueryResults(t, "SELECT * FROM computed_cols", [][]string{
+    dbDest.CheckQueryResults(t, "SELECT * FROM indexed", [][]string{
         {"1", "2", "6"},
         {"3", "4", "7"},
     })
 
-    compareReplicatedTables(t, s, "a", "b", "computed_cols", dbSource, dbDest)
+    compareReplicatedTables(t, s, "a", "b", "indexed", dbSource, dbDest)
 }
 
 type fatalDLQ struct{ *testing.T }
@@ -1473,6 +1473,64 @@ func TestTombstoneUpdate(t *testing.T) {
     dst.CheckQueryResults(t, "SELECT * FROM tab WHERE pk = 1", [][]string{})
 }
 
+func TestReplicateComputedColumns(t *testing.T) {
+    defer leaktest.AfterTest(t)()
+    skip.UnderDeadlock(t)
+    defer log.Scope(t).Close(t)
+
+    ctx := context.Background()
+
+    server, s, dbSource, dbDest := setupLogicalTestServer(t, ctx, testClusterBaseClusterArgs, 1)
+    defer server.Stopper().Stop(ctx)
+
+    // Note: the primary key is basically (a, b), but with a computed column that
+    // is d=a+b.
+    createTableStmt := `CREATE TABLE computed_cols (
+        a INT,
+        b INT,
+        c INT,
+        d INT AS (a + b) STORED,
+        cIsOdd BOOLEAN AS (c % 2 = 1) VIRTUAL,
+        PRIMARY KEY (a, d)
+    )`
+    dbSource.Exec(t, createTableStmt)
+    dbDest.Exec(t, createTableStmt)
+
+    var writer string
+    dbSource.QueryRow(t, "SHOW CLUSTER SETTING logical_replication.consumer.immediate_mode_writer").Scan(&writer)
+    if writer != "legacy-kv" {
+        createIdxStmt := `CREATE INDEX c ON computed_cols (c) WHERE cIsOdd`
+        dbSource.Exec(t, createIdxStmt)
+        dbDest.Exec(t, createIdxStmt)
+    }
+
+    // Insert initial data into destination that should be overwritten. This
+    // tests the conflict case, which is more error-prone.
+    dbDest.Exec(t, "INSERT INTO computed_cols (a, b, c) VALUES (1, 2, 3), (3, 4, 5)")
+    dbSource.Exec(t, "INSERT INTO computed_cols (a, b, c) VALUES (1, 2, 6), (3, 4, 7)")
+
+    // Create logical replication stream from source to destination
+    sourceURL := replicationtestutils.GetExternalConnectionURI(t, s, s, serverutils.DBName("a"))
+    var jobID jobspb.JobID
+    dbDest.QueryRow(t,
+        "CREATE LOGICAL REPLICATION STREAM FROM TABLE computed_cols ON $1 INTO TABLE computed_cols",
+        sourceURL.String(),
+    ).Scan(&jobID)
+
+    WaitUntilReplicatedTime(t, s.Clock().Now(), dbDest, jobID)
+
+    dbSource.CheckQueryResults(t, "SELECT * FROM computed_cols", [][]string{
+        {"1", "2", "6", "3", "false"},
+        {"3", "4", "7", "7", "true"},
+    })
+    dbDest.CheckQueryResults(t, "SELECT * FROM computed_cols", [][]string{
+        {"1", "2", "6", "3", "false"},
+        {"3", "4", "7", "7", "true"},
+    })
+
+    compareReplicatedTables(t, s, "a", "b", "computed_cols", dbSource, dbDest)
+}
+
 func TestForeignKeyConstraints(t *testing.T) {
     defer leaktest.AfterTest(t)()
     skip.UnderDeadlock(t)
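For reference, the expected rows in TestReplicateComputedColumns follow directly from the computed column definitions: the crud writer's insert and update statements do not write the computed columns, so the destination recomputes them from the replicated base values. The query below is just an illustrative way to view the result, not part of the test:

  -- d INT AS (a + b) STORED and cIsOdd BOOLEAN AS (c % 2 = 1) VIRTUAL give:
  --   (a=1, b=2, c=6) => d = 1 + 2 = 3,  cIsOdd = (6 % 2 = 1) = false
  --   (a=3, b=4, c=7) => d = 3 + 4 = 7,  cIsOdd = (7 % 2 = 1) = true
  SELECT a, b, c, d, cIsOdd FROM computed_cols ORDER BY a;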

pkg/crosscluster/logical/replication_statements.go

Lines changed: 76 additions & 39 deletions
@@ -20,40 +20,50 @@ import (
 type columnSchema struct {
     column       catalog.Column
     isPrimaryKey bool
+    isComputed   bool
 }
 
-// getPhysicalColumns returns the list of columns that are part of the table's
-// primary key and value.
-func getPhysicalColumns(table catalog.TableDescriptor) []catalog.Column {
-    columns := table.AllColumns()
-    result := make([]catalog.Column, 0, len(columns))
-    for _, col := range columns {
-        if !col.IsComputed() && !col.IsVirtual() && !col.IsSystemColumn() {
-            result = append(result, col)
-        }
-    }
-    return result
-}
-
-func getPhysicalColumnsSchema(table catalog.TableDescriptor) []columnSchema {
-    columns := getPhysicalColumns(table)
-    primaryIdx := table.GetPrimaryIndex()
-
+// getColumnSchema returns the list of all columns that are decoded by the CRUD
+// writer. It returns columns in column key order. The crud writer passes
+// around a tree.Datums for each row where column[i] is the column definition
+// for datums[i].
+//
+// A column is decoded by the crud LDR writer if either of the following is true:
+// 1. The column is part of the primary key. Every primary key column is needed
+// to perform read refreshes.
+// 2. The column is not a computed column. If a column is not computed, it must
+// be included in update and insert statements.
+//
+// Notably, this excludes computed columns that are not part of the primary key
+// and system columns like crdb_internal_mvcc_timestamp.
+func getColumnSchema(table catalog.TableDescriptor) []columnSchema {
     // Create a map of column ID to column for fast lookup
+    primaryIdx := table.GetPrimaryIndex()
     isPrimaryKey := make(map[catid.ColumnID]bool)
     for _, col := range primaryIdx.IndexDesc().KeyColumnIDs {
         isPrimaryKey[col] = true
     }
 
-    cols := make([]columnSchema, 0, len(columns))
+    columns := table.AllColumns()
+    result := make([]columnSchema, 0, len(columns))
     for _, col := range columns {
-        cols = append(cols, columnSchema{
+        if col.IsSystemColumn() {
+            continue
+        }
+
+        isComputed := col.IsComputed()
+        if isComputed && !isPrimaryKey[col.GetID()] {
+            continue
+        }
+
+        result = append(result, columnSchema{
            column:       col,
            isPrimaryKey: isPrimaryKey[col.GetID()],
+           isComputed:   isComputed,
         })
     }
 
-    return cols
+    return result
 }
 
 // newTypedPlaceholder creates a placeholder with the appropriate type for a column.
@@ -77,20 +87,26 @@ func newTypedPlaceholder(idx int, col catalog.Column) (*tree.CastExpr, error) {
 func newInsertStatement(
     table catalog.TableDescriptor,
 ) (statements.Statement[tree.Statement], error) {
-    columns := getPhysicalColumns(table)
+    columns := getColumnSchema(table)
 
-    columnNames := make(tree.NameList, len(columns))
+    columnNames := make(tree.NameList, 0, len(columns))
+    parameters := make(tree.Exprs, 0, len(columns))
     for i, col := range columns {
-        columnNames[i] = tree.Name(col.GetName())
-    }
+        // NOTE: this consumes a placeholder ID because it's part of the tree.Datums,
+        // but it doesn't show up in the query because computed columns are not
+        // needed for insert statements.
+        if col.isComputed {
+            continue
+        }
 
-    parameters := make(tree.Exprs, len(columnNames))
-    for i, col := range columns {
         var err error
-        parameters[i], err = newTypedPlaceholder(i+1, col)
+        parameter, err := newTypedPlaceholder(i+1, col.column)
         if err != nil {
             return statements.Statement[tree.Statement]{}, err
         }
+
+        columnNames = append(columnNames, tree.Name(col.column.GetName()))
+        parameters = append(parameters, parameter)
     }
 
     parameterValues := &tree.ValuesClause{
@@ -119,16 +135,31 @@ func newInsertStatement(
 // newMatchesLastRow creates a WHERE clause for matching all columns of a row.
 // It returns a tree.Expr that compares each column to a placeholder parameter.
 // Parameters are ordered by column ID, starting from startParamIdx.
-func newMatchesLastRow(columns []catalog.Column, startParamIdx int) (tree.Expr, error) {
+func newMatchesLastRow(columns []columnSchema, startParamIdx int) (tree.Expr, error) {
     var whereClause tree.Expr
     for i, col := range columns {
-        placeholder, err := newTypedPlaceholder(startParamIdx+i, col)
+        if col.isComputed {
+            // Skip computed columns since they are not needed to fully specify the
+            // contents of the row.
+            continue
+        }
+
+        placeholder, err := newTypedPlaceholder(startParamIdx+i, col.column)
         if err != nil {
             return nil, err
         }
+
+        eq := treecmp.MakeComparisonOperator(treecmp.IsNotDistinctFrom)
+        if col.isPrimaryKey {
+            // The generic query planner plans full table scans if we use `IS NOT
+            // DISTINCT FROM`. Use the `=` operator for primary key columns since they
+            // are guaranteed to be non-NULL.
+            eq = treecmp.MakeComparisonOperator(treecmp.EQ)
+        }
+
         colExpr := &tree.ComparisonExpr{
-            Operator: treecmp.MakeComparisonOperator(treecmp.IsNotDistinctFrom),
-            Left:     &tree.ColumnItem{ColumnName: tree.Name(col.GetName())},
+            Operator: eq,
+            Left:     &tree.ColumnItem{ColumnName: tree.Name(col.column.GetName())},
             Right:    placeholder,
         }
 
@@ -153,31 +184,37 @@ func newMatchesLastRow(columns []catalog.Column, startParamIdx int) (tree.Expr,
 func newUpdateStatement(
     table catalog.TableDescriptor,
 ) (statements.Statement[tree.Statement], error) {
-    columns := getPhysicalColumns(table)
+    columns := getColumnSchema(table)
 
     // Create WHERE clause for matching the previous row values
     whereClause, err := newMatchesLastRow(columns, 1)
     if err != nil {
         return statements.Statement[tree.Statement]{}, err
     }
 
-    exprs := make(tree.UpdateExprs, len(columns))
+    exprs := make(tree.UpdateExprs, 0, len(columns))
     for i, col := range columns {
-        nameNode := tree.Name(col.GetName())
+        if col.isComputed {
+            // Skip computed columns since they are not needed to fully specify the
+            // contents of the row.
+            continue
+        }
+
+        nameNode := tree.Name(col.column.GetName())
         names := tree.NameList{nameNode}
 
         // Create a placeholder for the new value (len(columns)+i+1) since we
         // use 1-indexed placeholders and the first len(columns) placeholders
        // are for the where clause.
-        placeholder, err := newTypedPlaceholder(len(columns)+i+1, col)
+        placeholder, err := newTypedPlaceholder(len(columns)+i+1, col.column)
         if err != nil {
             return statements.Statement[tree.Statement]{}, err
         }
 
-        exprs[i] = &tree.UpdateExpr{
+        exprs = append(exprs, &tree.UpdateExpr{
             Names: names,
             Expr:  placeholder,
-        }
+        })
     }
 
     // Create the final update statement
@@ -203,7 +240,7 @@ func newUpdateStatement(
 func newDeleteStatement(
     table catalog.TableDescriptor,
 ) (statements.Statement[tree.Statement], error) {
-    columns := getPhysicalColumns(table)
+    columns := getColumnSchema(table)
 
     // Create WHERE clause for matching the row to delete
     whereClause, err := newMatchesLastRow(columns, 1)
@@ -250,7 +287,7 @@ func newDeleteStatement(
 func newBulkSelectStatement(
     table catalog.TableDescriptor,
 ) (statements.Statement[tree.Statement], error) {
-    cols := getPhysicalColumnsSchema(table)
+    cols := getColumnSchema(table)
     primaryKeyColumns := make([]catalog.Column, 0, len(cols))
     for _, col := range cols {
         if col.isPrimaryKey {
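Putting the statement builders above together for the computed_cols table from the new test, the generated statements have roughly the following shapes (a hedged reconstruction with placeholder type casts omitted, not output captured from the code):

  -- getColumnSchema keeps a (PK), b, c, and d (PK, stored computed);
  -- cIsOdd is computed and not part of the primary key, so it is excluded.

  -- newInsertStatement: the computed column d keeps its datum slot but is left
  -- out of the column list, so the destination computes it on write.
  INSERT INTO computed_cols (a, b, c) VALUES ($1, $2, $3);

  -- newUpdateStatement: primary key columns compare with '=', nullable non-key
  -- columns with IS NOT DISTINCT FROM, and computed columns are skipped in both
  -- the SET list and the WHERE clause ($4 and $8 stay reserved for d's datum).
  UPDATE computed_cols SET a = $5, b = $6, c = $7
    WHERE a = $1 AND b IS NOT DISTINCT FROM $2 AND c IS NOT DISTINCT FROM $3;

  -- Per the commit message, newBulkSelectStatement (the read refresh) now looks
  -- rows up by the full primary key (a, d), including the stored computed column.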

0 commit comments
