Skip to content

Commit e9db41f

Browse files
craig[bot]jeffswensonnormanchenncockroach-teamcity
committed
143439: logical: add support for using new crud statements r=jeffswenson a=jeffswenson This change adds a functions that call the new LDR INSERT/UPDATE/DELETE statements based on a decoded row from a replication stream. Release note: none Epic: CRDB-48647 143675: jsonpath: add support for `starts with` r=normanchenn a=normanchenn #### jsonpath: reuse `evalComparison` logic for all predicate expressions This commit refactors `evalComparison` to be used for `like_regex` logic in addition to comparison operators. This allows for `like_regex` to have existence semantics when being evaluated on arrays of strings, rather than throwing an error. Epic: None Release note: None #### jsonpath: add support for `starts with` This commit adds support for using `starts with` within jsonpath queries. This returns a boolean - whether or not the string or variable following the `starts with` directive is the prefix of the current json string. Epic: None Release note (sql change): Add support for `starts with ""` in JSONPath queries. For example, `SELECT jsonb_path_query('"abcdef"', '$ starts with "abc"');`. 143996: release: released CockroachDB version 25.2.0-alpha.3. Next version: 25.2.0-alpha.4 r=celiala a=cockroach-teamcity Release note: None Epic: None Release justification: non-production (release infra) change. Co-authored-by: Jeff Swenson <[email protected]> Co-authored-by: Norman Chen <[email protected]> Co-authored-by: Justin Beaver <[email protected]>
4 parents 2733799 + 2983e7b + afdd093 + 31a56ee commit e9db41f

File tree

11 files changed

+447
-44
lines changed

11 files changed

+447
-44
lines changed

pkg/build/version.txt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1 +1 @@
1-
v25.2.0-alpha.3
1+
v25.2.0-alpha.4

pkg/crosscluster/logical/BUILD.bazel

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ go_library(
1616
"purgatory.go",
1717
"range_stats.go",
1818
"replication_statements.go",
19+
"sql_row_writer.go",
1920
"tombstone_updater.go",
2021
"udf_row_processor.go",
2122
],
@@ -121,6 +122,7 @@ go_test(
121122
"purgatory_test.go",
122123
"range_stats_test.go",
123124
"replication_statements_test.go",
125+
"sql_row_writer_test.go",
124126
"udf_row_processor_test.go",
125127
],
126128
data = glob(["testdata/**"]) + [

pkg/crosscluster/logical/replication_statements.go

Lines changed: 24 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,8 @@ import (
99
"fmt"
1010

1111
"github.com/cockroachdb/cockroach/pkg/sql/catalog"
12+
"github.com/cockroachdb/cockroach/pkg/sql/parser"
13+
"github.com/cockroachdb/cockroach/pkg/sql/parser/statements"
1214
"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
1315
"github.com/cockroachdb/cockroach/pkg/sql/sem/tree/treecmp"
1416
)
@@ -44,7 +46,9 @@ func newTypedPlaceholder(idx int, col catalog.Column) (*tree.CastExpr, error) {
4446
//
4547
// The statement will have `n` parameters, where `n` is the number of columns
4648
// in the table. Parameters are ordered by column ID.
47-
func newInsertStatement(table catalog.TableDescriptor) (tree.Statement, error) {
49+
func newInsertStatement(
50+
table catalog.TableDescriptor,
51+
) (statements.Statement[tree.Statement], error) {
4852
columns := getPhysicalColumns(table)
4953

5054
columnNames := make(tree.NameList, len(columns))
@@ -57,7 +61,7 @@ func newInsertStatement(table catalog.TableDescriptor) (tree.Statement, error) {
5761
var err error
5862
parameters[i], err = newTypedPlaceholder(i+1, col)
5963
if err != nil {
60-
return nil, err
64+
return statements.Statement[tree.Statement]{}, err
6165
}
6266
}
6367

@@ -81,7 +85,7 @@ func newInsertStatement(table catalog.TableDescriptor) (tree.Statement, error) {
8185
Returning: tree.AbsentReturningClause,
8286
}
8387

84-
return insert, nil
88+
return toParsedStatement(insert)
8589
}
8690

8791
// newMatchesLastRow creates a WHERE clause for matching all columns of a row.
@@ -118,13 +122,15 @@ func newMatchesLastRow(columns []catalog.Column, startParamIdx int) (tree.Expr,
118122
// and the last `n` parameters are the new values of the row.
119123
//
120124
// Parameters are ordered by column ID.
121-
func newUpdateStatement(table catalog.TableDescriptor) (tree.Statement, error) {
125+
func newUpdateStatement(
126+
table catalog.TableDescriptor,
127+
) (statements.Statement[tree.Statement], error) {
122128
columns := getPhysicalColumns(table)
123129

124130
// Create WHERE clause for matching the previous row values
125131
whereClause, err := newMatchesLastRow(columns, 1)
126132
if err != nil {
127-
return nil, err
133+
return statements.Statement[tree.Statement]{}, err
128134
}
129135

130136
exprs := make(tree.UpdateExprs, len(columns))
@@ -137,7 +143,7 @@ func newUpdateStatement(table catalog.TableDescriptor) (tree.Statement, error) {
137143
// are for the where clause.
138144
placeholder, err := newTypedPlaceholder(len(columns)+i+1, col)
139145
if err != nil {
140-
return nil, err
146+
return statements.Statement[tree.Statement]{}, err
141147
}
142148

143149
exprs[i] = &tree.UpdateExpr{
@@ -157,7 +163,7 @@ func newUpdateStatement(table catalog.TableDescriptor) (tree.Statement, error) {
157163
Returning: tree.AbsentReturningClause,
158164
}
159165

160-
return update, nil
166+
return toParsedStatement(update)
161167
}
162168

163169
// newDeleteStatement returns a statement that can be used to delete a row from
@@ -166,13 +172,15 @@ func newUpdateStatement(table catalog.TableDescriptor) (tree.Statement, error) {
166172
// identify the row to delete.
167173
//
168174
// Parameters are ordered by column ID.
169-
func newDeleteStatement(table catalog.TableDescriptor) (tree.Statement, error) {
175+
func newDeleteStatement(
176+
table catalog.TableDescriptor,
177+
) (statements.Statement[tree.Statement], error) {
170178
columns := getPhysicalColumns(table)
171179

172180
// Create WHERE clause for matching the row to delete
173181
whereClause, err := newMatchesLastRow(columns, 1)
174182
if err != nil {
175-
return nil, err
183+
return statements.Statement[tree.Statement]{}, err
176184
}
177185

178186
// Create the final delete statement
@@ -185,5 +193,11 @@ func newDeleteStatement(table catalog.TableDescriptor) (tree.Statement, error) {
185193
Returning: tree.AbsentReturningClause,
186194
}
187195

188-
return delete, nil
196+
return toParsedStatement(delete)
197+
}
198+
199+
func toParsedStatement(stmt tree.Statement) (statements.Statement[tree.Statement], error) {
200+
// TODO(jeffswenson): do I have to round trip through the string or can I
201+
// safely construct the statement directly?
202+
return parser.ParseOne(stmt.String())
189203
}

pkg/crosscluster/logical/replication_statements_test.go

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -66,10 +66,10 @@ func TestReplicationStatements(t *testing.T) {
6666
require.NoError(t, err)
6767

6868
// Test preparing the statement to ensure it is valid SQL.
69-
_, err = sqlDB.Exec(fmt.Sprintf("PREPARE stmt_%d AS %s", rand.Int(), insertStmt.String()))
69+
_, err = sqlDB.Exec(fmt.Sprintf("PREPARE stmt_%d AS %s", rand.Int(), insertStmt.SQL))
7070
require.NoError(t, err)
7171

72-
return insertStmt.String()
72+
return insertStmt.SQL
7373
case "show-update":
7474
var tableName string
7575
d.ScanArgs(t, "table", &tableName)
@@ -80,10 +80,10 @@ func TestReplicationStatements(t *testing.T) {
8080
require.NoError(t, err)
8181

8282
// Test preparing the statement to ensure it is valid SQL.
83-
_, err = sqlDB.Exec(fmt.Sprintf("PREPARE stmt_%d AS %s", rand.Int(), updateStmt.String()))
83+
_, err = sqlDB.Exec(fmt.Sprintf("PREPARE stmt_%d AS %s", rand.Int(), updateStmt.SQL))
8484
require.NoError(t, err)
8585

86-
return updateStmt.String()
86+
return updateStmt.SQL
8787
case "show-delete":
8888
var tableName string
8989
d.ScanArgs(t, "table", &tableName)
@@ -94,10 +94,10 @@ func TestReplicationStatements(t *testing.T) {
9494
require.NoError(t, err)
9595

9696
// Test preparing the statement to ensure it is valid SQL.
97-
_, err = sqlDB.Exec(fmt.Sprintf("PREPARE stmt_%d AS %s", rand.Int(), deleteStmt.String()))
97+
_, err = sqlDB.Exec(fmt.Sprintf("PREPARE stmt_%d AS %s", rand.Int(), deleteStmt.SQL))
9898
require.NoError(t, err)
9999

100-
return deleteStmt.String()
100+
return deleteStmt.SQL
101101
default:
102102
return "unknown command: " + d.Cmd
103103
}
Lines changed: 159 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,159 @@
1+
// Copyright 2025 The Cockroach Authors.
2+
//
3+
// Use of this software is governed by the CockroachDB Software License
4+
// included in the /LICENSE file.
5+
6+
package logical
7+
8+
import (
9+
"context"
10+
11+
"github.com/cockroachdb/cockroach/pkg/sql/catalog"
12+
"github.com/cockroachdb/cockroach/pkg/sql/isql"
13+
"github.com/cockroachdb/cockroach/pkg/sql/parser/statements"
14+
"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
15+
"github.com/cockroachdb/cockroach/pkg/sql/sessiondata"
16+
"github.com/cockroachdb/cockroach/pkg/util/hlc"
17+
"github.com/cockroachdb/errors"
18+
)
19+
20+
// errStalePreviousValue is returned if the row supplied to UpdateRow,
21+
// UpdateTombstone, or DeleteRow does not match the value in the local
22+
// database.
23+
var errStalePreviousValue = errors.New("stale previous value")
24+
25+
// sqlRowWriter is configured to write rows to a specific table and descriptor
26+
// version.
27+
type sqlRowWriter struct {
28+
insert statements.Statement[tree.Statement]
29+
update statements.Statement[tree.Statement]
30+
delete statements.Statement[tree.Statement]
31+
32+
scratchDatums []any
33+
columns []string
34+
}
35+
36+
func (s *sqlRowWriter) getExecutorOverride(
37+
originTimestamp hlc.Timestamp,
38+
) sessiondata.InternalExecutorOverride {
39+
session := ieOverrideBase
40+
session.OriginTimestampForLogicalDataReplication = originTimestamp
41+
return session
42+
}
43+
44+
// DeleteRow deletes a row from the table. It returns errStalePreviousValue
45+
// if the oldRow argument does not match the value in the local database.
46+
func (s *sqlRowWriter) DeleteRow(
47+
ctx context.Context, txn isql.Txn, originTimestamp hlc.Timestamp, oldRow tree.Datums,
48+
) error {
49+
s.scratchDatums = s.scratchDatums[:0]
50+
51+
for _, d := range oldRow {
52+
s.scratchDatums = append(s.scratchDatums, d)
53+
}
54+
55+
rowsAffected, err := txn.ExecParsed(ctx, "replicated-delete", txn.KV(),
56+
s.getExecutorOverride(originTimestamp),
57+
s.delete,
58+
s.scratchDatums...,
59+
)
60+
if err != nil {
61+
return err
62+
}
63+
if rowsAffected != 1 {
64+
return errStalePreviousValue
65+
}
66+
return nil
67+
}
68+
69+
// InsertRow inserts a row into the table. It will return an error if the row
70+
// already exists.
71+
func (s *sqlRowWriter) InsertRow(
72+
ctx context.Context, txn isql.Txn, originTimestamp hlc.Timestamp, row tree.Datums,
73+
) error {
74+
s.scratchDatums = s.scratchDatums[:0]
75+
for _, d := range row {
76+
s.scratchDatums = append(s.scratchDatums, d)
77+
}
78+
rowsImpacted, err := txn.ExecParsed(ctx, "replicated-insert", txn.KV(),
79+
s.getExecutorOverride(originTimestamp),
80+
s.insert,
81+
s.scratchDatums...,
82+
)
83+
if err != nil {
84+
return err
85+
}
86+
if rowsImpacted != 1 {
87+
return errors.AssertionFailedf("expected 1 row impacted, got %d", rowsImpacted)
88+
}
89+
return nil
90+
}
91+
92+
// UpdateRow updates a row in the table. It returns errStalePreviousValue
93+
// if the oldRow argument does not match the value in the local database.
94+
func (s *sqlRowWriter) UpdateRow(
95+
ctx context.Context,
96+
txn isql.Txn,
97+
originTimestamp hlc.Timestamp,
98+
oldRow tree.Datums,
99+
newRow tree.Datums,
100+
) error {
101+
s.scratchDatums = s.scratchDatums[:0]
102+
103+
for _, d := range oldRow {
104+
s.scratchDatums = append(s.scratchDatums, d)
105+
}
106+
for _, d := range newRow {
107+
s.scratchDatums = append(s.scratchDatums, d)
108+
}
109+
110+
rowsAffected, err := txn.ExecParsed(ctx, "replicated-update", txn.KV(),
111+
s.getExecutorOverride(originTimestamp),
112+
s.update,
113+
s.scratchDatums...,
114+
)
115+
if err != nil {
116+
return err
117+
}
118+
if rowsAffected != 1 {
119+
return errStalePreviousValue
120+
}
121+
return err
122+
}
123+
124+
func newSQLRowWriter(table catalog.TableDescriptor) (*sqlRowWriter, error) {
125+
physicalColumns := getPhysicalColumns(table)
126+
columns := make([]string, len(physicalColumns))
127+
for i, col := range physicalColumns {
128+
columns[i] = col.GetName()
129+
}
130+
131+
// TODO(jeffswenson): figure out how to manage prepared statements and
132+
// transactions in an internal executor. The original plan was to prepare
133+
// statements on initialization then reuse them, but the internal executor
134+
// is scoped to a single transaction and I couldn't figure out how to
135+
// maintain prepared statements across different instances of the internal
136+
// executor.
137+
138+
insert, err := newInsertStatement(table)
139+
if err != nil {
140+
return nil, err
141+
}
142+
143+
update, err := newUpdateStatement(table)
144+
if err != nil {
145+
return nil, err
146+
}
147+
148+
delete, err := newDeleteStatement(table)
149+
if err != nil {
150+
return nil, err
151+
}
152+
153+
return &sqlRowWriter{
154+
insert: insert,
155+
update: update,
156+
delete: delete,
157+
columns: columns,
158+
}, nil
159+
}

0 commit comments

Comments
 (0)