Skip to content

Commit 50ddf9f

Browse files
committed
logical: read refresh statement
This change includes the SQL query that is used by the crud SQL writer to read the local value of rows that had a value mismatch between the previous value in the source cluster and the current value in the destination cluster. Release note: none Epic: CRDB-48647
1 parent 6146578 commit 50ddf9f

File tree

6 files changed

+523
-2
lines changed

6 files changed

+523
-2
lines changed

pkg/crosscluster/logical/BUILD.bazel

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ go_library(
1616
"purgatory.go",
1717
"range_stats.go",
1818
"replication_statements.go",
19+
"sql_row_reader.go",
1920
"sql_row_writer.go",
2021
"tombstone_updater.go",
2122
"udf_row_processor.go",
@@ -122,6 +123,7 @@ go_test(
122123
"purgatory_test.go",
123124
"range_stats_test.go",
124125
"replication_statements_test.go",
126+
"sql_row_reader_test.go",
125127
"sql_row_writer_test.go",
126128
"udf_row_processor_test.go",
127129
],
@@ -194,6 +196,7 @@ go_test(
194196
"//pkg/util/syncutil",
195197
"//pkg/util/timeutil",
196198
"//pkg/util/uuid",
199+
"@com_github_cockroachdb_apd_v3//:apd",
197200
"@com_github_cockroachdb_cockroach_go_v2//crdb",
198201
"@com_github_cockroachdb_datadriven//:datadriven",
199202
"@com_github_cockroachdb_errors//:errors",

pkg/crosscluster/logical/replication_statements.go

Lines changed: 197 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,10 +11,17 @@ import (
1111
"github.com/cockroachdb/cockroach/pkg/sql/catalog"
1212
"github.com/cockroachdb/cockroach/pkg/sql/parser"
1313
"github.com/cockroachdb/cockroach/pkg/sql/parser/statements"
14+
"github.com/cockroachdb/cockroach/pkg/sql/sem/catid"
1415
"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
1516
"github.com/cockroachdb/cockroach/pkg/sql/sem/tree/treecmp"
17+
"github.com/cockroachdb/cockroach/pkg/sql/types"
1618
)
1719

20+
// columnSchema pairs a table column with a flag recording whether that column
// is one of the key columns of the table's primary index.
type columnSchema struct {
21+
// column is the catalog descriptor of the column.
column catalog.Column
22+
// isPrimaryKey is true when the column's ID appears in the primary index's
// key column IDs.
isPrimaryKey bool
23+
}
24+
1825
// getPhysicalColumns returns the list of columns that are part of the table's
1926
// primary key and value.
2027
func getPhysicalColumns(table catalog.TableDescriptor) []catalog.Column {
@@ -28,6 +35,27 @@ func getPhysicalColumns(table catalog.TableDescriptor) []catalog.Column {
2835
return result
2936
}
3037

38+
// getPhysicalColumnsSchema returns one columnSchema per column produced by
// getPhysicalColumns, in the same order, with isPrimaryKey set for every column
// whose ID is listed in the primary index's key column IDs.
func getPhysicalColumnsSchema(table catalog.TableDescriptor) []columnSchema {
39+
columns := getPhysicalColumns(table)
40+
primaryIdx := table.GetPrimaryIndex()
41+
42+
// Build a set of the primary key column IDs for fast membership lookup.
43+
isPrimaryKey := make(map[catid.ColumnID]bool)
44+
for _, col := range primaryIdx.IndexDesc().KeyColumnIDs {
45+
isPrimaryKey[col] = true
46+
}
47+
48+
cols := make([]columnSchema, 0, len(columns))
49+
for _, col := range columns {
50+
cols = append(cols, columnSchema{
51+
column: col,
52+
// A missing map entry yields false, so non-key columns need no explicit
// handling.
isPrimaryKey: isPrimaryKey[col.GetID()],
53+
})
54+
}
55+
56+
return cols
57+
}
58+
3159
// newTypedPlaceholder creates a placeholder with the appropriate type for a column.
3260
func newTypedPlaceholder(idx int, col catalog.Column) (*tree.CastExpr, error) {
3361
placeholder, err := tree.NewPlaceholder(fmt.Sprintf("%d", idx))
@@ -196,6 +224,175 @@ func newDeleteStatement(
196224
return toParsedStatement(delete)
197225
}
198226

227+
// newBulkSelectStatement returns a statement that can be used to query
228+
// multiple rows by primary key in a single operation. It uses a ROWS FROM clause
229+
// with UNNEST to handle a variable number of primary keys provided as array
230+
// parameters.
231+
//
232+
// The statement will have one parameter for each primary key column, where
233+
// each parameter is an array of values for that column. The columns are
234+
// expected in column ID order, not primary key order.
235+
//
236+
// For example, given a table with primary key columns (id, secondary_id) and
237+
// additional columns (value1, value2), the generated statement would be
238+
// equivalent to:
239+
//
240+
// SELECT
241+
// key_list.index,
242+
// replication_target.crdb_internal_origin_timestamp,
243+
// replication_target.crdb_internal_mvcc_timestamp,
244+
// replication_target.id, replication_target.secondary_id,
245+
// replication_target.value1, replication_target.value2
246+
// FROM ROWS FROM (unnest($1::INT[], $2::INT[])) WITH ORDINALITY AS key_list(key1, key2, index)
247+
// INNER LOOKUP JOIN [table_id AS replication_target]
248+
// ON replication_target.id = key_list.key1
249+
// AND replication_target.secondary_id = key_list.key2
250+
func newBulkSelectStatement(
251+
table catalog.TableDescriptor,
252+
) (statements.Statement[tree.Statement], error) {
253+
cols := getPhysicalColumnsSchema(table)
254+
// Collect the primary key columns in the order they appear in cols; this
// order determines the placeholder numbering and the keyN column names below.
primaryKeyColumns := make([]catalog.Column, 0, len(cols))
255+
for _, col := range cols {
256+
if col.isPrimaryKey {
257+
primaryKeyColumns = append(primaryKeyColumns, col.column)
258+
}
259+
}
260+
261+
// keyListName names the ROWS FROM table expression (aliased key_list) that
262+
// holds the primary keys supplied via array parameters.
263+
keyListName, err := tree.NewUnresolvedObjectName(1, [3]string{"key_list"}, tree.NoAnnotation)
264+
if err != nil {
265+
return statements.Statement[tree.Statement]{}, err
266+
}
267+
268+
// targetName is used to name the user's table.
269+
targetName, err := tree.NewUnresolvedObjectName(1, [3]string{"replication_target"}, tree.NoAnnotation)
270+
if err != nil {
271+
return statements.Statement[tree.Statement]{}, err
272+
}
273+
274+
// Create the `ROWS FROM (unnest($1::INT[], $2::INT[])) WITH ORDINALITY AS key_list(key1, key2, index)` table expression.
275+
primaryKeyExprs := make(tree.Exprs, 0, len(primaryKeyColumns))
276+
primaryKeyNames := make(tree.ColumnDefList, 0, len(primaryKeyColumns)+1)
277+
for i, pkCol := range primaryKeyColumns {
278+
primaryKeyNames = append(primaryKeyNames, tree.ColumnDef{
279+
Name: tree.Name(fmt.Sprintf("key%d", i+1)),
280+
})
281+
primaryKeyExprs = append(primaryKeyExprs, &tree.CastExpr{
282+
// PlaceholderIdx is zero-based, so index i renders as $(i+1).
Expr: &tree.Placeholder{Idx: tree.PlaceholderIdx(i)},
283+
Type: types.MakeArray(pkCol.GetType()),
284+
SyntaxMode: tree.CastShort,
285+
})
286+
}
287+
// Ordinality is enabled on the table expression below, which adds one more
// output column after the unnested keys; it is named index here.
primaryKeyNames = append(primaryKeyNames, tree.ColumnDef{
288+
Name: tree.Name("index"),
289+
})
290+
keyList := &tree.AliasedTableExpr{
291+
Expr: &tree.RowsFromExpr{
292+
Items: tree.Exprs{
293+
&tree.FuncExpr{
294+
Func: tree.ResolvableFunctionReference{FunctionReference: &tree.UnresolvedName{
295+
NumParts: 1,
296+
Parts: [4]string{"unnest"},
297+
}},
298+
Exprs: primaryKeyExprs,
299+
},
300+
},
301+
},
302+
As: tree.AliasClause{
303+
Alias: "key_list",
304+
Cols: primaryKeyNames,
305+
},
306+
Ordinality: true,
307+
}
308+
309+
// Build the select statement for the final query.
310+
// NOTE(review): the capacity hint below is smaller than the 3+len(cols)
// expressions that are appended; append grows the slice, so this only costs
// extra allocations.
selectColumns := make(tree.SelectExprs, 0, 1+len(primaryKeyColumns))
311+
selectColumns = append(selectColumns, tree.SelectExpr{
312+
Expr: &tree.ColumnItem{
313+
ColumnName: "index",
314+
TableName: keyListName,
315+
},
316+
})
317+
selectColumns = append(selectColumns, tree.SelectExpr{
318+
Expr: &tree.ColumnItem{
319+
ColumnName: "crdb_internal_origin_timestamp",
320+
TableName: targetName,
321+
},
322+
})
323+
selectColumns = append(selectColumns, tree.SelectExpr{
324+
Expr: &tree.ColumnItem{
325+
ColumnName: "crdb_internal_mvcc_timestamp",
326+
TableName: targetName,
327+
},
328+
})
329+
330+
// Select every physical column of the target table, key columns included.
for _, col := range cols {
331+
selectColumns = append(selectColumns, tree.SelectExpr{
332+
Expr: &tree.ColumnItem{
333+
ColumnName: tree.Name(col.column.GetName()),
334+
TableName: targetName,
335+
},
336+
})
337+
}
338+
339+
// Construct the JOIN clause for the final query: one equality per primary
// key column, combined with AND.
340+
var joinCond tree.Expr
341+
for i, pkCol := range primaryKeyColumns {
342+
colName := tree.Name(pkCol.GetName())
343+
keyColName := fmt.Sprintf("key%d", i+1)
344+
345+
eqExpr := &tree.ComparisonExpr{
346+
// Use EQ operator to compare primary key columns because primary key
347+
// columns are guaranteed to be non-NULL. For some reason using IS NOT
348+
// DISTINCT FROM causes the query to be unable to use a lookup join.
349+
Operator: treecmp.MakeComparisonOperator(treecmp.EQ),
350+
Left: &tree.ColumnItem{
351+
TableName: targetName,
352+
ColumnName: colName,
353+
},
354+
Right: &tree.ColumnItem{
355+
TableName: keyListName,
356+
ColumnName: tree.Name(keyColName),
357+
},
358+
}
359+
360+
if i == 0 {
361+
joinCond = eqExpr
362+
} else {
363+
joinCond = &tree.AndExpr{
364+
Left: joinCond,
365+
Right: eqExpr,
366+
}
367+
}
368+
}
369+
370+
// Construct the SELECT statement that is the root of the AST.
371+
selectStmt := &tree.Select{
372+
Select: &tree.SelectClause{
373+
Exprs: selectColumns,
374+
From: tree.From{
375+
Tables: tree.TableExprs{
376+
&tree.JoinTableExpr{
377+
JoinType: tree.AstInner,
378+
Left: keyList,
379+
// The target table is referenced by numeric descriptor ID rather than
// by name.
Right: &tree.TableRef{
380+
TableID: int64(table.GetID()),
381+
As: tree.AliasClause{Alias: "replication_target"},
382+
},
383+
Cond: &tree.OnJoinCond{
384+
Expr: joinCond,
385+
},
386+
Hint: tree.AstLookup,
387+
},
388+
},
389+
},
390+
},
391+
}
392+
393+
return toParsedStatement(selectStmt)
394+
}
395+
199396
func toParsedStatement(stmt tree.Statement) (statements.Statement[tree.Statement], error) {
200397
// TODO(jeffswenson): do I have to round trip through the string or can I
201398
// safely construct the statement directly?

pkg/crosscluster/logical/replication_statements_test.go

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -98,6 +98,20 @@ func TestReplicationStatements(t *testing.T) {
9898
require.NoError(t, err)
9999

100100
return deleteStmt.SQL
101+
case "show-select":
102+
var tableName string
103+
d.ScanArgs(t, "table", &tableName)
104+
105+
desc := getTableDesc(tableName)
106+
107+
stmt, err := newBulkSelectStatement(desc)
108+
require.NoError(t, err)
109+
110+
// Test preparing the statement to ensure it is valid SQL.
111+
_, err = sqlDB.Exec(fmt.Sprintf("PREPARE stmt_%d AS %s", rand.Int(), stmt.SQL))
112+
require.NoError(t, err)
113+
114+
return stmt.SQL
101115
default:
102116
return "unknown command: " + d.Cmd
103117
}

0 commit comments

Comments
 (0)