Skip to content

Commit 277bdcf

Browse files
committed
logical: support array primary keys in the crud writer
This adds a second point select row reader to the LDR crud writer. This is a workaround for #32552. This change also includes a small fix to the batch row reader: it now initializes the array parameter with the canonical type. This is required because the arguments come from the cdc event decoder and the event decoder always returns canonical datums. Release note: none Fixes: #148303
1 parent 436c452 commit 277bdcf

File tree

6 files changed

+361
-19
lines changed

6 files changed

+361
-19
lines changed

pkg/crosscluster/logical/replication_statements.go

Lines changed: 123 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -475,6 +475,129 @@ func newBulkSelectStatement(
475475
return stmt, paramTypes, nil
476476
}
477477

478+
// newPointSelectStatement returns a statement that can be used to query
479+
// a single row by primary key. Unlike newBulkSelectStatement which handles
480+
// multiple rows with arrays, this generates a simple SELECT statement with
481+
// individual parameters for each primary key column.
482+
//
483+
// The statement will have one parameter for each primary key column, where
484+
// each parameter is the value for that column. The columns are expected in
485+
// column ID order.
486+
//
487+
// For example, given a table with primary key columns (id, secondary_id) and
488+
// additional columns (value1, value2), the generated statement would be
489+
// equivalent to:
490+
//
491+
// SELECT
492+
// replication_target.crdb_internal_origin_timestamp,
493+
// replication_target.crdb_internal_mvcc_timestamp,
494+
// replication_target.id, replication_target.secondary_id,
495+
// replication_target.value1, replication_target.value2
496+
// FROM [table_id AS replication_target]
497+
// WHERE replication_target.id = $1 AND replication_target.secondary_id = $2
498+
func newPointSelectStatement(
499+
table catalog.TableDescriptor,
500+
) (statements.Statement[tree.Statement], []*types.T, error) {
501+
cols := getColumnSchema(table)
502+
primaryKeyColumns := make([]columnSchema, 0, len(cols))
503+
for _, col := range cols {
504+
if col.isPrimaryKey {
505+
primaryKeyColumns = append(primaryKeyColumns, col)
506+
}
507+
}
508+
509+
// Create parameter types for primary key values
510+
paramTypes := make([]*types.T, 0, len(primaryKeyColumns))
511+
for _, pkCol := range primaryKeyColumns {
512+
paramTypes = append(paramTypes, pkCol.columnType)
513+
}
514+
515+
// Create the table reference for `replication_target`
516+
targetName, err := tree.NewUnresolvedObjectName(1, [3]string{"replication_target"}, tree.NoAnnotation)
517+
if err != nil {
518+
return statements.Statement[tree.Statement]{}, nil, err
519+
}
520+
521+
// Build the SELECT clause columns: timestamps first, then all table columns
522+
selectColumns := make(tree.SelectExprs, 0, 2+len(cols))
523+
524+
// Add `replication_target.crdb_internal_origin_timestamp`
525+
selectColumns = append(selectColumns, tree.SelectExpr{
526+
Expr: &tree.ColumnItem{
527+
ColumnName: "crdb_internal_origin_timestamp",
528+
TableName: targetName,
529+
},
530+
})
531+
// Add `replication_target.crdb_internal_mvcc_timestamp`
532+
selectColumns = append(selectColumns, tree.SelectExpr{
533+
Expr: &tree.ColumnItem{
534+
ColumnName: "crdb_internal_mvcc_timestamp",
535+
TableName: targetName,
536+
},
537+
})
538+
539+
// Add all table columns: `replication_target.column_name`
540+
for _, col := range cols {
541+
selectColumns = append(selectColumns, tree.SelectExpr{
542+
Expr: &tree.ColumnItem{
543+
ColumnName: tree.Name(col.column.GetName()),
544+
TableName: targetName,
545+
},
546+
})
547+
}
548+
549+
// Build the WHERE clause: `replication_target.pk_col1 = $1 AND replication_target.pk_col2 = $2`
550+
var whereClause tree.Expr
551+
for i, pkCol := range primaryKeyColumns {
552+
placeholder, err := newTypedPlaceholder(i+1, pkCol.column)
553+
if err != nil {
554+
return statements.Statement[tree.Statement]{}, nil, err
555+
}
556+
557+
eqExpr := &tree.ComparisonExpr{
558+
// Use EQ operator to compare primary key columns because primary key
559+
// columns are guaranteed to be non-NULL.
560+
Operator: treecmp.MakeComparisonOperator(treecmp.EQ),
561+
Left: &tree.ColumnItem{
562+
TableName: targetName,
563+
ColumnName: tree.Name(pkCol.column.GetName()),
564+
},
565+
Right: placeholder,
566+
}
567+
568+
if i == 0 {
569+
whereClause = eqExpr
570+
} else {
571+
whereClause = &tree.AndExpr{
572+
Left: whereClause,
573+
Right: eqExpr,
574+
}
575+
}
576+
}
577+
578+
// Construct the complete SELECT statement
579+
selectStmt := &tree.Select{
580+
Select: &tree.SelectClause{
581+
Exprs: selectColumns,
582+
From: tree.From{
583+
Tables: tree.TableExprs{
584+
&tree.TableRef{
585+
TableID: int64(table.GetID()),
586+
As: tree.AliasClause{Alias: "replication_target"},
587+
},
588+
},
589+
},
590+
Where: &tree.Where{Type: tree.AstWhere, Expr: whereClause},
591+
},
592+
}
593+
594+
stmt, err := toParsedStatement(selectStmt)
595+
if err != nil {
596+
return statements.Statement[tree.Statement]{}, nil, err
597+
}
598+
return stmt, paramTypes, nil
599+
}
600+
478601
func toParsedStatement(stmt tree.Statement) (statements.Statement[tree.Statement], error) {
479602
// Use Serialize instead of String to ensure the type casts use fully
480603
// qualified names.

pkg/crosscluster/logical/replication_statements_test.go

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -130,6 +130,18 @@ func TestReplicationStatements(t *testing.T) {
130130

131131
prepareStatement(t, sqlDB, types, stmt)
132132

133+
return stmt.SQL
134+
case "show-point-select":
135+
var tableName string
136+
d.ScanArgs(t, "table", &tableName)
137+
138+
desc := getTableDesc(tableName)
139+
140+
stmt, types, err := newPointSelectStatement(desc)
141+
require.NoError(t, err)
142+
143+
prepareStatement(t, sqlDB, types, stmt)
144+
133145
return stmt.SQL
134146
default:
135147
return "unknown command: " + d.Cmd

pkg/crosscluster/logical/sql_row_reader.go

Lines changed: 146 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -11,18 +11,18 @@ import (
1111
"github.com/cockroachdb/cockroach/pkg/sql/catalog"
1212
"github.com/cockroachdb/cockroach/pkg/sql/isql"
1313
"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
14+
"github.com/cockroachdb/cockroach/pkg/sql/types"
1415
"github.com/cockroachdb/cockroach/pkg/util/hlc"
1516
"github.com/cockroachdb/errors"
1617
)
1718

18-
type sqlRowReader struct {
19-
session isql.Session
20-
21-
selectStatement isql.PreparedStatement
22-
23-
// keyColumnIndices is the index of the datums that are part of the primary key.
24-
keyColumnIndices []int
25-
columns []columnSchema
19+
type sqlRowReader interface {
20+
// ReadRows reads the rows from the table using the provided transaction. A row
21+
// will only be present in the result set if it exists. The index of the row in
22+
// the input is the key to the output map.
23+
//
24+
// E.g. result[i] and rows[i] are the same row.
25+
ReadRows(ctx context.Context, rows []tree.Datums) (map[int]priorRow, error)
2626
}
2727

2828
// priorRow is a row returned by the SQL reader. It contains the rows local
@@ -42,7 +42,37 @@ type priorRow struct {
4242

4343
func newSQLRowReader(
4444
ctx context.Context, table catalog.TableDescriptor, session isql.Session,
45-
) (*sqlRowReader, error) {
45+
) (sqlRowReader, error) {
46+
hasArrayPrimaryKey := false
47+
for _, col := range getColumnSchema(table) {
48+
if col.isPrimaryKey && col.columnType.Family() == types.ArrayFamily {
49+
hasArrayPrimaryKey = true
50+
break
51+
}
52+
}
53+
if hasArrayPrimaryKey {
54+
// TODO(#32552): delete point row reader once CockroachDB supports nested
55+
// array types. We can't use the bulk reader because it passes all of the
56+
// primary key values in an array, which results in an array of arrays when
57+
// a primary key column is an array.
58+
return newPointRowReader(ctx, table, session)
59+
}
60+
return newBulkRowReader(ctx, table, session)
61+
}
62+
63+
type bulkRowReader struct {
64+
session isql.Session
65+
66+
selectStatement isql.PreparedStatement
67+
68+
// keyColumnIndices is the index of the datums that are part of the primary key.
69+
keyColumnIndices []int
70+
columns []columnSchema
71+
}
72+
73+
func newBulkRowReader(
74+
ctx context.Context, table catalog.TableDescriptor, session isql.Session,
75+
) (*bulkRowReader, error) {
4676
cols := getColumnSchema(table)
4777
keyColumns := make([]int, 0, len(cols))
4878
for i, col := range cols {
@@ -60,20 +90,17 @@ func newSQLRowReader(
6090
return nil, err
6191
}
6292

63-
return &sqlRowReader{
93+
return &bulkRowReader{
6494
session: session,
6595
selectStatement: selectStatement,
6696
keyColumnIndices: keyColumns,
6797
columns: cols,
6898
}, nil
6999
}
70100

71-
// ReadRows reads the rows from the table using the provided transaction. A row
72-
// will only be present in the result set if it exists. The index of the row in
73-
// the input is the key to the output map.
74-
//
75-
// E.g. result[i] and rows[i] are the same row.
76-
func (r *sqlRowReader) ReadRows(ctx context.Context, rows []tree.Datums) (map[int]priorRow, error) {
101+
func (r *bulkRowReader) ReadRows(
102+
ctx context.Context, rows []tree.Datums,
103+
) (map[int]priorRow, error) {
77104
// TODO(jeffswenson): optimize allocations. It may require a change to the
78105
// API. For now, this probably isn't a performance bottleneck because:
79106
// 1. Many of the allocations are one per batch instead of one per row.
@@ -86,7 +113,7 @@ func (r *sqlRowReader) ReadRows(ctx context.Context, rows []tree.Datums) (map[in
86113

87114
params := make([]tree.Datum, 0, len(r.keyColumnIndices))
88115
for _, index := range r.keyColumnIndices {
89-
array := tree.NewDArray(r.columns[index].column.GetType())
116+
array := tree.NewDArray(r.columns[index].columnType)
90117
for _, row := range rows {
91118
if err := array.Append(row[index]); err != nil {
92119
return nil, err
@@ -145,3 +172,105 @@ func (r *sqlRowReader) ReadRows(ctx context.Context, rows []tree.Datums) (map[in
145172

146173
return result, nil
147174
}
175+
176+
type pointReadRowReader struct {
177+
session isql.Session
178+
179+
selectStatement isql.PreparedStatement
180+
181+
// keyColumnIndices is the index of the datums that are part of the primary key.
182+
keyColumnIndices []int
183+
columns []columnSchema
184+
}
185+
186+
func newPointRowReader(
187+
ctx context.Context, table catalog.TableDescriptor, session isql.Session,
188+
) (*pointReadRowReader, error) {
189+
cols := getColumnSchema(table)
190+
keyColumns := make([]int, 0, len(cols))
191+
for i, col := range cols {
192+
if col.isPrimaryKey {
193+
keyColumns = append(keyColumns, i)
194+
}
195+
}
196+
197+
selectStatementRaw, types, err := newPointSelectStatement(table)
198+
if err != nil {
199+
return nil, err
200+
}
201+
selectStatement, err := session.Prepare(ctx, "replication-read-point", selectStatementRaw, types)
202+
if err != nil {
203+
return nil, err
204+
}
205+
206+
return &pointReadRowReader{
207+
session: session,
208+
selectStatement: selectStatement,
209+
keyColumnIndices: keyColumns,
210+
columns: cols,
211+
}, nil
212+
}
213+
214+
func (p *pointReadRowReader) ReadRows(
215+
ctx context.Context, rows []tree.Datums,
216+
) (map[int]priorRow, error) {
217+
if len(rows) == 0 {
218+
return nil, nil
219+
}
220+
221+
result := make(map[int]priorRow, len(rows))
222+
223+
for i, row := range rows {
224+
params := make([]tree.Datum, 0, len(p.keyColumnIndices))
225+
for _, keyIndex := range p.keyColumnIndices {
226+
params = append(params, row[keyIndex])
227+
}
228+
229+
queryRows, err := p.session.QueryPrepared(ctx, p.selectStatement, params)
230+
if err != nil {
231+
return nil, err
232+
}
233+
234+
if len(queryRows) > 1 {
235+
return nil, errors.AssertionFailedf("expected at most 1 row, got %d", len(queryRows))
236+
}
237+
if len(queryRows) == 0 {
238+
continue
239+
}
240+
241+
resultRow := queryRows[0]
242+
// The columns are:
243+
// 0. The origin timestamp.
244+
// 1. The mvcc timestamp.
245+
// 2+. The table columns.
246+
const prefixColumns = 2
247+
if len(resultRow) != len(p.columns)+prefixColumns {
248+
return nil, errors.AssertionFailedf("expected %d columns, got %d", len(p.columns)+prefixColumns, len(resultRow))
249+
}
250+
251+
isLocal := false
252+
timestamp := resultRow[0]
253+
if timestamp == tree.DNull {
254+
timestamp = resultRow[1]
255+
isLocal = true
256+
}
257+
258+
decimal, ok := timestamp.(*tree.DDecimal)
259+
if !ok {
260+
return nil, errors.AssertionFailedf("expected column 0 or 1 to be origin timestamp")
261+
}
262+
263+
logicalTimestamp, err := hlc.DecimalToHLC(&decimal.Decimal)
264+
if err != nil {
265+
return nil, err
266+
}
267+
268+
result[i] = priorRow{
269+
row: resultRow[prefixColumns:],
270+
logicalTimestamp: logicalTimestamp,
271+
isLocal: isLocal,
272+
}
273+
}
274+
275+
return result, nil
276+
}

0 commit comments

Comments
 (0)