Skip to content

Commit ca0d407

Browse files
craig[bot]elizaMkrauleasg0451
committed
Merge #150004
150004: changefeedccl: protobuf encoder randomized test for all types r=asg0451 a=elizaMkraule We recently added support for Protobuf encoding in changefeeds. This test ensures that all types supported in CockroachDB are correctly encoded and decoded using Protobuf. fixes [#149797](#149797) Co-authored-by: Eliza Kraule <[email protected]> Co-authored-by: Miles Frankel <[email protected]>
2 parents a4ad94a + df504c6 commit ca0d407

File tree

6 files changed

+539
-51
lines changed

6 files changed

+539
-51
lines changed

pkg/ccl/changefeedccl/BUILD.bazel

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -71,7 +71,6 @@ go_library(
7171
"//pkg/clusterversion",
7272
"//pkg/docs",
7373
"//pkg/featureflag",
74-
"//pkg/geo",
7574
"//pkg/jobs",
7675
"//pkg/jobs/jobsauth",
7776
"//pkg/jobs/jobspb",
@@ -254,6 +253,8 @@ go_test(
254253
"//pkg/cloud",
255254
"//pkg/cloud/cloudpb",
256255
"//pkg/cloud/impl:cloudimpl",
256+
"//pkg/geo",
257+
"//pkg/geo/geopb",
257258
"//pkg/internal/sqlsmith",
258259
"//pkg/jobs",
259260
"//pkg/jobs/jobspb",
@@ -322,12 +323,15 @@ go_test(
322323
"//pkg/testutils/testcluster",
323324
"//pkg/util",
324325
"//pkg/util/cidr",
326+
"//pkg/util/collatedstring",
325327
"//pkg/util/ctxgroup",
328+
"//pkg/util/duration",
326329
"//pkg/util/encoding",
327330
"//pkg/util/hlc",
328331
"//pkg/util/intsets",
329332
"//pkg/util/ioctx",
330333
"//pkg/util/json",
334+
"//pkg/util/keysutil",
331335
"//pkg/util/leaktest",
332336
"//pkg/util/log",
333337
"//pkg/util/log/eventpb",
@@ -341,6 +345,7 @@ go_test(
341345
"//pkg/util/span",
342346
"//pkg/util/syncutil",
343347
"//pkg/util/timeutil",
348+
"//pkg/util/timeutil/pgdate",
344349
"//pkg/util/tracing",
345350
"//pkg/util/tracing/tracingpb",
346351
"//pkg/util/uuid",
@@ -359,6 +364,7 @@ go_test(
359364
"@com_github_jackc_pgx_v5//:pgx",
360365
"@com_github_klauspost_compress//gzip",
361366
"@com_github_lib_pq//:pq",
367+
"@com_github_lib_pq//oid",
362368
"@com_github_stretchr_testify//assert",
363369
"@com_github_stretchr_testify//require",
364370
"@com_github_twmb_franz_go//pkg/kerr",

pkg/ccl/changefeedccl/changefeed_test.go

Lines changed: 63 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -91,6 +91,7 @@ import (
9191
"github.com/cockroachdb/cockroach/pkg/util/ctxgroup"
9292
"github.com/cockroachdb/cockroach/pkg/util/hlc"
9393
"github.com/cockroachdb/cockroach/pkg/util/json"
94+
"github.com/cockroachdb/cockroach/pkg/util/keysutil"
9495
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
9596
"github.com/cockroachdb/cockroach/pkg/util/log"
9697
"github.com/cockroachdb/cockroach/pkg/util/log/eventpb"
@@ -823,24 +824,24 @@ func TestChangefeedQuotedIdentifiersTopicName(t *testing.T) {
823824
sqlDB := sqlutils.MakeSQLRunner(s.DB)
824825

825826
sqlDB.Exec(t, `CREATE TABLE mytable (
826-
id INT PRIMARY KEY,
827+
id INT PRIMARY KEY,
827828
"SomeField" JSONB,
828829
"AnotherField" JSONB
829830
)`)
830831

831832
sqlDB.Exec(t, `INSERT INTO mytable VALUES (
832-
1,
833+
1,
833834
'{"PropA": "value1", "prop_b": "value2"}'::jsonb,
834835
'{"PropC": "value3", "prop_d": "value4"}'::jsonb
835836
)`)
836837

837838
sqlDB.Exec(t, `INSERT INTO mytable VALUES (
838-
2,
839+
2,
839840
'{"PropA": "value5", "prop_b": "value6"}'::jsonb,
840841
'{"PropC": "value7", "prop_d": "value8"}'::jsonb
841842
)`)
842843

843-
foo := feed(t, f, `CREATE CHANGEFEED WITH diff, full_table_name, on_error=pause, envelope=wrapped AS SELECT
844+
foo := feed(t, f, `CREATE CHANGEFEED WITH diff, full_table_name, on_error=pause, envelope=wrapped AS SELECT
844845
id,
845846
"SomeField"->>'PropA' AS "PropA",
846847
"SomeField"->>'prop_b' AS "PropB",
@@ -9392,22 +9393,66 @@ func TestDistSenderRangeFeedPopulatesVirtualTable(t *testing.T) {
93929393
defer leaktest.AfterTest(t)()
93939394
defer log.Scope(t).Close(t)
93949395

9395-
s, cleanup := makeServer(t)
9396-
defer cleanup()
9396+
scanner := keysutil.MakePrettyScanner(nil, nil)
93979397

9398-
sqlDB := sqlutils.MakeSQLRunner(s.DB)
9399-
sqlDB.Exec(t, `CREATE TABLE tbl (a INT, b STRING);`)
9400-
sqlDB.Exec(t, `INSERT INTO tbl VALUES (1, 'one'), (2, 'two'), (3, 'three');`)
9401-
sqlDB.Exec(t, `CREATE CHANGEFEED FOR tbl INTO 'null://';`)
9398+
observeTables := func(sqlDB *sqlutils.SQLRunner, codec keys.SQLCodec) []int {
9399+
rows := sqlDB.Query(t, "SELECT range_start FROM crdb_internal.active_range_feeds")
9400+
defer rows.Close()
9401+
var tableIDs []int
9402+
for rows.Next() {
9403+
var prettyKey string
9404+
require.NoError(t, rows.Scan(&prettyKey))
9405+
key, err := scanner.Scan(prettyKey)
9406+
require.NoError(t, err)
9407+
_, tableID, err := codec.DecodeTablePrefix(key)
9408+
require.NoError(t, err)
9409+
tableIDs = append(tableIDs, int(tableID))
9410+
}
9411+
return tableIDs
9412+
}
9413+
9414+
cases := []struct {
9415+
user string
9416+
shouldSeeTable bool
9417+
}{
9418+
{`feedCreator`, false},
9419+
{`regularUser`, false},
9420+
{`adminUser`, true},
9421+
{`viewClusterMetadataUser`, true},
9422+
}
9423+
9424+
testFn := func(t *testing.T, s TestServer, f cdctest.TestFeedFactory) {
9425+
sqlDB := sqlutils.MakeSQLRunner(s.DB)
9426+
9427+
// Creates several different tables, users, and roles for us to use.
9428+
ChangefeedJobPermissionsTestSetup(t, s)
94029429

9403-
var tableID int
9404-
sqlDB.QueryRow(t, "SELECT table_id FROM crdb_internal.tables WHERE name='tbl'").Scan(&tableID)
9405-
tableKey := s.Codec.TablePrefix(uint32(tableID))
9430+
var tableID int
9431+
sqlDB.QueryRow(t, "SELECT table_id FROM crdb_internal.tables WHERE name = 'table_a'").Scan(&tableID)
94069432

9407-
numRangesQuery := fmt.Sprintf(
9408-
"SELECT count(*) FROM crdb_internal.active_range_feeds WHERE range_start LIKE '%s/%%'",
9409-
tableKey)
9410-
sqlDB.CheckQueryResultsRetry(t, numRangesQuery, [][]string{{"1"}})
9433+
var cf cdctest.TestFeed
9434+
asUser(t, f, `feedCreator`, func(userDB *sqlutils.SQLRunner) {
9435+
cf = feed(t, f, `CREATE CHANGEFEED FOR table_a;`)
9436+
})
9437+
defer closeFeed(t, cf)
9438+
9439+
for _, c := range cases {
9440+
testutils.SucceedsSoon(t, func() error {
9441+
asUser(t, f, c.user, func(userDB *sqlutils.SQLRunner) {
9442+
tableIDs := observeTables(userDB, s.Codec)
9443+
if c.shouldSeeTable {
9444+
require.Containsf(t, tableIDs, tableID, "user %s should see table %d", c.user, tableID)
9445+
} else {
9446+
require.Emptyf(t, tableIDs, "user %s should not see any tables", c.user)
9447+
}
9448+
})
9449+
return nil
9450+
})
9451+
}
9452+
9453+
}
9454+
9455+
cdcTest(t, testFn, feedTestEnterpriseSinks)
94119456
}
94129457

94139458
func TestChangefeedCaseInsensitiveOpts(t *testing.T) {
@@ -12218,7 +12263,7 @@ func TestChangefeedProtobuf(t *testing.T) {
1221812263
)`)
1221912264
sqlDB.Exec(t, `
1222012265
INSERT INTO pricing VALUES
12221-
(1, 'Chair', 15.75, 2.500, ARRAY['Brown', 'Black']),
12266+
(1, 'Chair', 15.75, 2.500, ARRAY['Brown', 'Black']),
1222212267
(2, 'Table', 20.00, 1.23456789, ARRAY['Brown', 'Black'])`)
1222312268

1222412269
var opts []string

pkg/ccl/changefeedccl/encoder_protobuf.go

Lines changed: 22 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -8,13 +8,13 @@ package changefeedccl
88
import (
99
"context"
1010
"fmt"
11-
"strings"
11+
"time"
1212

1313
"github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/cdcevent"
1414
"github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/changefeedbase"
1515
"github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/changefeedpb"
16-
"github.com/cockroachdb/cockroach/pkg/geo"
1716
"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
17+
"github.com/cockroachdb/cockroach/pkg/sql/sessiondatapb"
1818
"github.com/cockroachdb/cockroach/pkg/util/hlc"
1919
"github.com/cockroachdb/cockroach/pkg/util/protoutil"
2020
"github.com/cockroachdb/errors"
@@ -227,7 +227,7 @@ func encodeRowToRecord(row cdcevent.Row) (*changefeedpb.Record, error) {
227227
}
228228
record := &changefeedpb.Record{Values: make(map[string]*changefeedpb.Value, row.NumValueColumns())}
229229
if err := row.ForEachColumn().Datum(func(d tree.Datum, col cdcevent.ResultColumn) error {
230-
val, err := datumToProtoValue(d)
230+
val, err := datumToProtoValue(d, sessiondatapb.DataConversionConfig{}, time.UTC)
231231
if err != nil {
232232
return err
233233
}
@@ -244,7 +244,7 @@ func buildKeyMessage(row cdcevent.Row) (*changefeedpb.Key, error) {
244244
keyMap := make(map[string]*changefeedpb.Value, row.NumKeyColumns())
245245

246246
if err := row.ForEachKeyColumn().Datum(func(d tree.Datum, col cdcevent.ResultColumn) error {
247-
val, err := datumToProtoValue(d)
247+
val, err := datumToProtoValue(d, sessiondatapb.DataConversionConfig{}, time.UTC)
248248
if err != nil {
249249
return err
250250
}
@@ -259,12 +259,16 @@ func buildKeyMessage(row cdcevent.Row) (*changefeedpb.Key, error) {
259259
// datumToProtoValue converts a tree.Datum into a changefeedpb.Value.
260260
// It handles all common CockroachDB datum types and maps them to their
261261
// corresponding protobuf representation.
262-
func datumToProtoValue(d tree.Datum) (*changefeedpb.Value, error) {
262+
func datumToProtoValue(
263+
d tree.Datum, dcc sessiondatapb.DataConversionConfig, loc *time.Location,
264+
) (*changefeedpb.Value, error) {
263265
d = tree.UnwrapDOidWrapper(d)
266+
264267
if d == tree.DNull {
265268
return nil, nil
266269
}
267270
switch v := d.(type) {
271+
268272
case *tree.DBool:
269273
return &changefeedpb.Value{Value: &changefeedpb.Value_BoolValue{BoolValue: bool(*v)}}, nil
270274
case *tree.DInt:
@@ -285,7 +289,7 @@ func datumToProtoValue(d tree.Datum) (*changefeedpb.Value, error) {
285289
case *tree.DArray:
286290
elems := make([]*changefeedpb.Value, 0, v.Len())
287291
for _, elt := range v.Array {
288-
pv, err := datumToProtoValue(elt)
292+
pv, err := datumToProtoValue(elt, sessiondatapb.DataConversionConfig{}, time.UTC)
289293
if err != nil {
290294
return nil, err
291295
}
@@ -297,7 +301,7 @@ func datumToProtoValue(d tree.Datum) (*changefeedpb.Value, error) {
297301
labels := v.ResolvedType().TupleLabels()
298302
records := make(map[string]*changefeedpb.Value, len(v.D))
299303
for i, elem := range v.D {
300-
pv, err := datumToProtoValue(elem)
304+
pv, err := datumToProtoValue(elem, sessiondatapb.DataConversionConfig{}, time.UTC)
301305
if err != nil {
302306
return nil, err
303307
}
@@ -330,36 +334,25 @@ func datumToProtoValue(d tree.Datum) (*changefeedpb.Value, error) {
330334
return &changefeedpb.Value{Value: &changefeedpb.Value_TimestampValue{TimestampValue: ts}}, nil
331335
case *tree.DBytes:
332336
return &changefeedpb.Value{Value: &changefeedpb.Value_BytesValue{BytesValue: []byte(*v)}}, nil
333-
334337
case *tree.DGeography:
335-
geostr, err := geo.SpatialObjectToWKT(v.Geography.SpatialObject(), -1)
336-
if err != nil {
337-
return nil, err
338-
}
339-
return &changefeedpb.Value{Value: &changefeedpb.Value_StringValue{StringValue: string(geostr)}}, nil
338+
ewkb := v.EWKB()
339+
return &changefeedpb.Value{Value: &changefeedpb.Value_BytesValue{BytesValue: ewkb}}, nil
340340
case *tree.DGeometry:
341-
geostr, err := geo.SpatialObjectToWKT(v.Geometry.SpatialObject(), -1)
342-
if err != nil {
343-
return nil, err
344-
}
345-
return &changefeedpb.Value{Value: &changefeedpb.Value_StringValue{StringValue: string(geostr)}}, nil
341+
ewkb := v.EWKB()
342+
return &changefeedpb.Value{Value: &changefeedpb.Value_BytesValue{BytesValue: ewkb}}, nil
346343
case *tree.DVoid:
347344
return nil, nil
345+
case *tree.DOid, *tree.DIPAddr, *tree.DBitArray, *tree.DBox2D,
346+
*tree.DTSVector, *tree.DTSQuery, *tree.DPGLSN, *tree.DPGVector:
347+
return &changefeedpb.Value{Value: &changefeedpb.Value_StringValue{StringValue: tree.AsStringWithFlags(v, tree.FmtBareStrings, tree.FmtDataConversionConfig(dcc), tree.FmtLocation(loc))}}, nil
348348
case *tree.DDate:
349-
date, err := v.ToTime()
350-
if err != nil {
351-
return nil, err
352-
}
353-
return &changefeedpb.Value{Value: &changefeedpb.Value_DateValue{DateValue: date.Format("2006-01-02")}}, nil
349+
return &changefeedpb.Value{Value: &changefeedpb.Value_DateValue{DateValue: tree.AsStringWithFlags(v, tree.FmtBareStrings, tree.FmtDataConversionConfig(dcc), tree.FmtLocation(loc))}}, nil
354350
case *tree.DInterval:
355-
return &changefeedpb.Value{Value: &changefeedpb.Value_IntervalValue{IntervalValue: v.Duration.String()}}, nil
351+
return &changefeedpb.Value{Value: &changefeedpb.Value_IntervalValue{IntervalValue: tree.AsStringWithFlags(v, tree.FmtBareStrings, tree.FmtDataConversionConfig(dcc), tree.FmtLocation(loc))}}, nil
356352
case *tree.DUuid:
357-
return &changefeedpb.Value{Value: &changefeedpb.Value_UuidValue{UuidValue: strings.Trim(v.UUID.String(), "'")}}, nil
353+
return &changefeedpb.Value{Value: &changefeedpb.Value_UuidValue{UuidValue: tree.AsStringWithFlags(v, tree.FmtBareStrings, tree.FmtDataConversionConfig(dcc), tree.FmtLocation(loc))}}, nil
358354
case *tree.DTime, *tree.DTimeTZ:
359-
return &changefeedpb.Value{Value: &changefeedpb.Value_TimeValue{TimeValue: tree.AsStringWithFlags(v, tree.FmtBareStrings)}}, nil
360-
case *tree.DOid, *tree.DIPAddr, *tree.DBitArray, *tree.DBox2D,
361-
*tree.DTSVector, *tree.DTSQuery, *tree.DPGLSN, *tree.DPGVector:
362-
return &changefeedpb.Value{Value: &changefeedpb.Value_StringValue{StringValue: d.String()}}, nil
355+
return &changefeedpb.Value{Value: &changefeedpb.Value_TimeValue{TimeValue: tree.AsStringWithFlags(v, tree.FmtBareStrings, tree.FmtDataConversionConfig(dcc), tree.FmtLocation(loc))}}, nil
363356
default:
364357
return nil, errors.AssertionFailedf("unexpected type %T for datumToProtoValue", d)
365358
}

0 commit comments

Comments
 (0)