Skip to content

Commit f4a0482

Browse files
committed
changefeedccl: protobuf encoder randomized test for all types
We recently added support for the Protobuf encoding format in changefeeds. This test ensures that all types supported in CockroachDB are correctly encoded and decoded using Protobuf. Release note: None Fixes #149797
1 parent 658cecb commit f4a0482

File tree

3 files changed

+462
-30
lines changed

3 files changed

+462
-30
lines changed

pkg/ccl/changefeedccl/BUILD.bazel

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -71,7 +71,6 @@ go_library(
7171
"//pkg/clusterversion",
7272
"//pkg/docs",
7373
"//pkg/featureflag",
74-
"//pkg/geo",
7574
"//pkg/jobs",
7675
"//pkg/jobs/jobsauth",
7776
"//pkg/jobs/jobspb",
@@ -254,6 +253,8 @@ go_test(
254253
"//pkg/cloud",
255254
"//pkg/cloud/cloudpb",
256255
"//pkg/cloud/impl:cloudimpl",
256+
"//pkg/geo",
257+
"//pkg/geo/geopb",
257258
"//pkg/internal/sqlsmith",
258259
"//pkg/jobs",
259260
"//pkg/jobs/jobspb",
@@ -322,7 +323,9 @@ go_test(
322323
"//pkg/testutils/testcluster",
323324
"//pkg/util",
324325
"//pkg/util/cidr",
326+
"//pkg/util/collatedstring",
325327
"//pkg/util/ctxgroup",
328+
"//pkg/util/duration",
326329
"//pkg/util/encoding",
327330
"//pkg/util/hlc",
328331
"//pkg/util/intsets",
@@ -341,6 +344,7 @@ go_test(
341344
"//pkg/util/span",
342345
"//pkg/util/syncutil",
343346
"//pkg/util/timeutil",
347+
"//pkg/util/timeutil/pgdate",
344348
"//pkg/util/tracing",
345349
"//pkg/util/tracing/tracingpb",
346350
"//pkg/util/uuid",
@@ -359,6 +363,7 @@ go_test(
359363
"@com_github_jackc_pgx_v5//:pgx",
360364
"@com_github_klauspost_compress//gzip",
361365
"@com_github_lib_pq//:pq",
366+
"@com_github_lib_pq//oid",
362367
"@com_github_stretchr_testify//assert",
363368
"@com_github_stretchr_testify//require",
364369
"@com_github_twmb_franz_go//pkg/kerr",

pkg/ccl/changefeedccl/encoder_protobuf.go

Lines changed: 22 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -8,13 +8,13 @@ package changefeedccl
88
import (
99
"context"
1010
"fmt"
11-
"strings"
11+
"time"
1212

1313
"github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/cdcevent"
1414
"github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/changefeedbase"
1515
"github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/changefeedpb"
16-
"github.com/cockroachdb/cockroach/pkg/geo"
1716
"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
17+
"github.com/cockroachdb/cockroach/pkg/sql/sessiondatapb"
1818
"github.com/cockroachdb/cockroach/pkg/util/hlc"
1919
"github.com/cockroachdb/cockroach/pkg/util/protoutil"
2020
"github.com/cockroachdb/errors"
@@ -227,7 +227,7 @@ func encodeRowToRecord(row cdcevent.Row) (*changefeedpb.Record, error) {
227227
}
228228
record := &changefeedpb.Record{Values: make(map[string]*changefeedpb.Value, row.NumValueColumns())}
229229
if err := row.ForEachColumn().Datum(func(d tree.Datum, col cdcevent.ResultColumn) error {
230-
val, err := datumToProtoValue(d)
230+
val, err := datumToProtoValue(d, sessiondatapb.DataConversionConfig{}, time.UTC)
231231
if err != nil {
232232
return err
233233
}
@@ -244,7 +244,7 @@ func buildKeyMessage(row cdcevent.Row) (*changefeedpb.Key, error) {
244244
keyMap := make(map[string]*changefeedpb.Value, row.NumKeyColumns())
245245

246246
if err := row.ForEachKeyColumn().Datum(func(d tree.Datum, col cdcevent.ResultColumn) error {
247-
val, err := datumToProtoValue(d)
247+
val, err := datumToProtoValue(d, sessiondatapb.DataConversionConfig{}, time.UTC)
248248
if err != nil {
249249
return err
250250
}
@@ -259,12 +259,16 @@ func buildKeyMessage(row cdcevent.Row) (*changefeedpb.Key, error) {
259259
// datumToProtoValue converts a tree.Datum into a changefeedpb.Value.
260260
// It handles all common CockroachDB datum types and maps them to their
261261
// corresponding protobuf representation.
262-
func datumToProtoValue(d tree.Datum) (*changefeedpb.Value, error) {
262+
func datumToProtoValue(
263+
d tree.Datum, dcc sessiondatapb.DataConversionConfig, loc *time.Location,
264+
) (*changefeedpb.Value, error) {
263265
d = tree.UnwrapDOidWrapper(d)
266+
264267
if d == tree.DNull {
265268
return nil, nil
266269
}
267270
switch v := d.(type) {
271+
268272
case *tree.DBool:
269273
return &changefeedpb.Value{Value: &changefeedpb.Value_BoolValue{BoolValue: bool(*v)}}, nil
270274
case *tree.DInt:
@@ -285,7 +289,7 @@ func datumToProtoValue(d tree.Datum) (*changefeedpb.Value, error) {
285289
case *tree.DArray:
286290
elems := make([]*changefeedpb.Value, 0, v.Len())
287291
for _, elt := range v.Array {
288-
pv, err := datumToProtoValue(elt)
292+
pv, err := datumToProtoValue(elt, sessiondatapb.DataConversionConfig{}, time.UTC)
289293
if err != nil {
290294
return nil, err
291295
}
@@ -297,7 +301,7 @@ func datumToProtoValue(d tree.Datum) (*changefeedpb.Value, error) {
297301
labels := v.ResolvedType().TupleLabels()
298302
records := make(map[string]*changefeedpb.Value, len(v.D))
299303
for i, elem := range v.D {
300-
pv, err := datumToProtoValue(elem)
304+
pv, err := datumToProtoValue(elem, sessiondatapb.DataConversionConfig{}, time.UTC)
301305
if err != nil {
302306
return nil, err
303307
}
@@ -330,36 +334,25 @@ func datumToProtoValue(d tree.Datum) (*changefeedpb.Value, error) {
330334
return &changefeedpb.Value{Value: &changefeedpb.Value_TimestampValue{TimestampValue: ts}}, nil
331335
case *tree.DBytes:
332336
return &changefeedpb.Value{Value: &changefeedpb.Value_BytesValue{BytesValue: []byte(*v)}}, nil
333-
334337
case *tree.DGeography:
335-
geostr, err := geo.SpatialObjectToWKT(v.Geography.SpatialObject(), -1)
336-
if err != nil {
337-
return nil, err
338-
}
339-
return &changefeedpb.Value{Value: &changefeedpb.Value_StringValue{StringValue: string(geostr)}}, nil
338+
ewkb := v.EWKB()
339+
return &changefeedpb.Value{Value: &changefeedpb.Value_BytesValue{BytesValue: ewkb}}, nil
340340
case *tree.DGeometry:
341-
geostr, err := geo.SpatialObjectToWKT(v.Geometry.SpatialObject(), -1)
342-
if err != nil {
343-
return nil, err
344-
}
345-
return &changefeedpb.Value{Value: &changefeedpb.Value_StringValue{StringValue: string(geostr)}}, nil
341+
ewkb := v.EWKB()
342+
return &changefeedpb.Value{Value: &changefeedpb.Value_BytesValue{BytesValue: ewkb}}, nil
346343
case *tree.DVoid:
347344
return nil, nil
345+
case *tree.DOid, *tree.DIPAddr, *tree.DBitArray, *tree.DBox2D,
346+
*tree.DTSVector, *tree.DTSQuery, *tree.DPGLSN, *tree.DPGVector:
347+
return &changefeedpb.Value{Value: &changefeedpb.Value_StringValue{StringValue: tree.AsStringWithFlags(v, tree.FmtBareStrings, tree.FmtDataConversionConfig(dcc), tree.FmtLocation(loc))}}, nil
348348
case *tree.DDate:
349-
date, err := v.ToTime()
350-
if err != nil {
351-
return nil, err
352-
}
353-
return &changefeedpb.Value{Value: &changefeedpb.Value_DateValue{DateValue: date.Format("2006-01-02")}}, nil
349+
return &changefeedpb.Value{Value: &changefeedpb.Value_DateValue{DateValue: tree.AsStringWithFlags(v, tree.FmtBareStrings, tree.FmtDataConversionConfig(dcc), tree.FmtLocation(loc))}}, nil
354350
case *tree.DInterval:
355-
return &changefeedpb.Value{Value: &changefeedpb.Value_IntervalValue{IntervalValue: v.Duration.String()}}, nil
351+
return &changefeedpb.Value{Value: &changefeedpb.Value_IntervalValue{IntervalValue: tree.AsStringWithFlags(v, tree.FmtBareStrings, tree.FmtDataConversionConfig(dcc), tree.FmtLocation(loc))}}, nil
356352
case *tree.DUuid:
357-
return &changefeedpb.Value{Value: &changefeedpb.Value_UuidValue{UuidValue: strings.Trim(v.UUID.String(), "'")}}, nil
353+
return &changefeedpb.Value{Value: &changefeedpb.Value_UuidValue{UuidValue: tree.AsStringWithFlags(v, tree.FmtBareStrings, tree.FmtDataConversionConfig(dcc), tree.FmtLocation(loc))}}, nil
358354
case *tree.DTime, *tree.DTimeTZ:
359-
return &changefeedpb.Value{Value: &changefeedpb.Value_TimeValue{TimeValue: tree.AsStringWithFlags(v, tree.FmtBareStrings)}}, nil
360-
case *tree.DOid, *tree.DIPAddr, *tree.DBitArray, *tree.DBox2D,
361-
*tree.DTSVector, *tree.DTSQuery, *tree.DPGLSN, *tree.DPGVector:
362-
return &changefeedpb.Value{Value: &changefeedpb.Value_StringValue{StringValue: d.String()}}, nil
355+
return &changefeedpb.Value{Value: &changefeedpb.Value_TimeValue{TimeValue: tree.AsStringWithFlags(v, tree.FmtBareStrings, tree.FmtDataConversionConfig(dcc), tree.FmtLocation(loc))}}, nil
363356
default:
364357
return nil, errors.AssertionFailedf("unexpected type %T for datumToProtoValue", d)
365358
}

0 commit comments

Comments
 (0)