Skip to content

Commit 3af03be

Browse files
elizaMkraule and asg0451 committed
changefeedccl: add protobuf encoder support for Kafka with envelope=bare
This adds initial support for the format=protobuf option in changefeeds, targeting the Kafka sink with the envelope=bare config as well as the key_in_value, mvcc_timestamp, updated, and topic_in_value flags. At this stage, wrapped envelope and resolved support are partially implemented but are not the focus of this PR.

The encoder serializes key-value pairs into Protobuf messages using a protobufEncoder that conforms to the existing Encoder interface. The schema is defined under changefeedpb.

Part of: #148377 – Support protobuf in Kafka with envelope=bare

Update pkg/ccl/changefeedccl/encoder_protobuf_test.go

Co-authored-by: Miles Frankel <[email protected]>
1 parent 8f667c4 commit 3af03be

File tree

11 files changed

+649
-7
lines changed

11 files changed

+649
-7
lines changed

pkg/ccl/changefeedccl/BUILD.bazel

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ go_library(
1616
"encoder_avro.go",
1717
"encoder_csv.go",
1818
"encoder_json.go",
19+
"encoder_protobuf.go",
1920
"enriched_source_provider.go",
2021
"event_processing.go",
2122
"fetch_table_bytes.go",
@@ -70,6 +71,7 @@ go_library(
7071
"//pkg/clusterversion",
7172
"//pkg/docs",
7273
"//pkg/featureflag",
74+
"//pkg/geo",
7375
"//pkg/jobs",
7476
"//pkg/jobs/jobsauth",
7577
"//pkg/jobs/jobspb",
@@ -200,6 +202,7 @@ go_test(
200202
"changefeed_test.go",
201203
"csv_test.go",
202204
"encoder_json_test.go",
205+
"encoder_protobuf_test.go",
203206
"encoder_test.go",
204207
"event_processing_test.go",
205208
"fetch_table_bytes_test.go",

pkg/ccl/changefeedccl/cdcevent/event.go

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -111,6 +111,16 @@ func (r Row) ForEachUDTColumn() Iterator {
111111
return iter{r: r, cols: r.udtCols}
112112
}
113113

114+
// NumKeyColumns returns the number of primary key columns in the row.
115+
func (r Row) NumKeyColumns() int {
116+
return len(r.keyCols)
117+
}
118+
119+
// NumValueColumns returns the number of value columns in the row.
120+
func (r Row) NumValueColumns() int {
121+
return len(r.valueCols)
122+
}
123+
114124
// DatumNamed returns the datum with the specified column name, in the form of an Iterator.
115125
func (r Row) DatumNamed(n string) (Iterator, error) {
116126
idx, ok := r.EventDescriptor.colsByName[n]

pkg/ccl/changefeedccl/changefeed_test.go

Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12051,3 +12051,38 @@ func TestDatabaseLevelChangefeed(t *testing.T) {
1205112051
}
1205212052
cdcTest(t, testFn)
1205312053
}
12054+
12055+
func TestChangefeedBareFullProtobuf(t *testing.T) {
12056+
defer leaktest.AfterTest(t)()
12057+
defer log.Scope(t).Close(t)
12058+
12059+
testFn := func(t *testing.T, s TestServer, f cdctest.TestFeedFactory) {
12060+
sqlDB := sqlutils.MakeSQLRunner(s.DB)
12061+
12062+
sqlDB.Exec(t, `
12063+
CREATE TABLE pricing (
12064+
id INT PRIMARY KEY,
12065+
name STRING,
12066+
discount FLOAT,
12067+
tax DECIMAL,
12068+
options STRING[]
12069+
)`)
12070+
sqlDB.Exec(t, `
12071+
INSERT INTO pricing VALUES
12072+
(1, 'Chair', 15.75, 2.500, ARRAY['Brown', 'Black']),
12073+
(2, 'Table', 20.00, 1.23456789, ARRAY['Brown', 'Black'])
12074+
`)
12075+
pricingFeed := feed(t, f,
12076+
`CREATE CHANGEFEED FOR pricing WITH envelope='bare', format='protobuf', key_in_value, topic_in_value`)
12077+
defer closeFeed(t, pricingFeed)
12078+
12079+
expected := []string{
12080+
`pricing: {"id":1}->{"values":{"discount":15.75,"id":1,"name":"Chair","options":["Brown","Black"],"tax":"2.500"},"__crdb__":{"key":{"id":1},"topic":"pricing"}}`,
12081+
`pricing: {"id":2}->{"values":{"discount":20,"id":2,"name":"Table","options":["Brown","Black"],"tax":"1.23456789"},"__crdb__":{"key":{"id":2},"topic":"pricing"}}`,
12082+
}
12083+
12084+
assertPayloads(t, pricingFeed, expected)
12085+
}
12086+
12087+
cdcTest(t, testFn, feedTestForceSink("kafka"))
12088+
}

pkg/ccl/changefeedccl/changefeedbase/options.go

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -169,10 +169,11 @@ const (
169169
OptEnvelopeBare EnvelopeType = `bare`
170170
OptEnvelopeEnriched EnvelopeType = `enriched`
171171

172-
OptFormatJSON FormatType = `json`
173-
OptFormatAvro FormatType = `avro`
174-
OptFormatCSV FormatType = `csv`
175-
OptFormatParquet FormatType = `parquet`
172+
OptFormatJSON FormatType = `json`
173+
OptFormatAvro FormatType = `avro`
174+
OptFormatCSV FormatType = `csv`
175+
OptFormatParquet FormatType = `parquet`
176+
OptFormatProtobuf FormatType = `protobuf`
176177

177178
OptOnErrorFail OnErrorType = `fail`
178179
OptOnErrorPause OnErrorType = `pause`
@@ -376,7 +377,7 @@ var ChangefeedOptionExpectValues = map[string]OptionPermittedValues{
376377
OptCustomKeyColumn: stringOption,
377378
OptEndTime: timestampOption,
378379
OptEnvelope: enum("row", "key_only", "wrapped", "deprecated_row", "bare", "enriched"),
379-
OptFormat: enum("json", "avro", "csv", "experimental_avro", "parquet"),
380+
OptFormat: enum("json", "avro", "csv", "experimental_avro", "parquet", "protobuf"),
380381
OptFullTableName: flagOption,
381382
OptKeyInValue: flagOption,
382383
OptTopicInValue: flagOption,
@@ -948,7 +949,7 @@ func (e EncodingOptions) Validate() error {
948949
}
949950

950951
// TODO(#140110): refactor this logic.
951-
if (e.Envelope != OptEnvelopeWrapped && e.Envelope != OptEnvelopeEnriched) && e.Format != OptFormatJSON && e.Format != OptFormatParquet {
952+
if (e.Envelope != OptEnvelopeWrapped && e.Envelope != OptEnvelopeEnriched) && e.Format != OptFormatProtobuf && e.Format != OptFormatJSON && e.Format != OptFormatParquet {
952953
requiresWrap := []struct {
953954
k string
954955
b bool

pkg/ccl/changefeedccl/changefeedpb/BUILD.bazel

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,9 +4,13 @@ load("@io_bazel_rules_go//proto:def.bzl", "go_proto_library")
44

55
proto_library(
66
name = "changefeedpb_proto",
7-
srcs = ["scheduled_changefeed.proto"],
7+
srcs = [
8+
"changefeed.proto",
9+
"scheduled_changefeed.proto",
10+
],
811
strip_import_prefix = "/pkg",
912
visibility = ["//visibility:public"],
13+
deps = ["@com_google_protobuf//:timestamp_proto"],
1014
)
1115

1216
go_proto_library(

pkg/ccl/changefeedccl/changefeedpb/changefeed.proto

Lines changed: 153 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,153 @@
1+
// Copyright 2025 The Cockroach Authors.
2+
//
3+
// Use of this software is governed by the CockroachDB Software License
4+
// included in the /LICENSE file.
5+
6+
// TODO(#149703): Move this .proto file to the new changefeedpb repo,
7+
// import it into CockroachDB, and update usages accordingly.
8+
9+
syntax = "proto3";
10+
package cockroach.ccl.changefeedccl;
11+
option go_package = "github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/changefeedpb";
12+
13+
import "google/protobuf/timestamp.proto";
14+
15+
// MessageBatch is a batch of messages for use in webhook sinks.
16+
message MessageBatch {
17+
repeated Message payload = 1;
18+
}
19+
20+
// Message is a oneof wrapper over the different envelope types. This is what will be emitted to the client.
21+
message Message {
22+
oneof data {
23+
// wrapped is a Message in WrappedEnvelope format.
24+
WrappedEnvelope wrapped = 1;
25+
26+
// bare is a Message in BareEnvelope format.
27+
BareEnvelope bare = 2;
28+
29+
// enriched is a Message in EnrichedEnvelope format.
30+
EnrichedEnvelope enriched = 3;
31+
32+
// resolved is a Message in Resolved format for Resolved Timestamps.
33+
Resolved resolved = 4;
34+
35+
// bareResolved wraps a resolved envelope inside a BareResolved format for Resolved Timestamps in Bare envelopes.
36+
BareResolved bareResolved = 5;
37+
}
38+
}
39+
40+
// WrappedEnvelope includes both the changed data and contextual metadata about the change.
41+
message WrappedEnvelope {
42+
Record after = 1;
43+
Record before = 2;
44+
string updated = 4;
45+
string mvcc_timestamp = 5;
46+
Key key = 6;
47+
string topic = 7;
48+
}
49+
50+
// BareEnvelope contains change data as a flat map along with
51+
// CockroachDB-specific metadata under the '__crdb__' field.
52+
message BareEnvelope {
53+
map<string, Value> values = 1;
54+
Metadata __crdb__ = 2;
55+
}
56+
57+
// BareResolved is a minimal envelope that wraps a resolved timestamp in a '__crdb__' field.
58+
message BareResolved {
59+
Resolved __crdb__ = 1;
60+
}
61+
62+
// EnrichedEnvelope includes detailed context about the change event and source.
63+
message EnrichedEnvelope {
64+
Record after = 1;
65+
Record before = 2;
66+
Op op = 3;
67+
int64 ts_ns = 4;
68+
EnrichedSource source = 5;
69+
}
70+
71+
// Resolved carries resolved timestamp information for a changefeed span.
72+
message Resolved {
73+
string resolved = 1;
74+
}
75+
76+
// EnrichedSource records information about the origin and context
77+
// of a change event, for operational traceability.
78+
message EnrichedSource {
79+
string job_id = 1;
80+
string changefeed_sink = 2;
81+
string db_version = 3;
82+
string cluster_name = 4;
83+
string cluster_id = 5;
84+
string source_node_locality = 6;
85+
string node_name = 7;
86+
string node_id = 8;
87+
string mvcc_timestamp = 9;
88+
int64 ts_ns = 10;
89+
string ts_hlc = 11;
90+
string origin = 12;
91+
string database_name = 13;
92+
string schema_name = 14;
93+
string table_name = 15;
94+
repeated string primary_keys = 16;
95+
}
96+
97+
// Op enumerates the types of operations represented in a change event.
98+
enum Op {
99+
OP_UNSPECIFIED = 0;
100+
OP_CREATE = 1;
101+
OP_UPDATE = 2;
102+
OP_DELETE = 3;
103+
}
104+
105+
// Metadata contains CockroachDB-specific metadata about a change event.
106+
// This message is also referred to as '__crdb__'.
107+
message Metadata {
108+
string updated = 1;
109+
string mvcc_timestamp = 2;
110+
Key key = 3;
111+
string topic = 4;
112+
}
113+
114+
// Value represents a value of arbitrary type carried in a change event.
115+
message Value {
116+
oneof value {
117+
string string_value = 1;
118+
bytes bytes_value = 2;
119+
int32 int32_value = 3;
120+
int64 int64_value = 4;
121+
float float_value = 5;
122+
double double_value = 6;
123+
bool bool_value = 7;
124+
google.protobuf.Timestamp timestamp_value = 8;
125+
Array array_value = 9;
126+
Record tuple_value = 10;
127+
Decimal decimal_value = 11;
128+
string date_value = 12;
129+
string interval_value = 13;
130+
string time_value = 14;
131+
string uuid_value = 15;
132+
}
133+
}
134+
135+
// Key contains the primary key values for a row.
136+
message Key {
137+
map<string, Value> key = 1;
138+
}
139+
140+
// Array represents an ordered list of values.
141+
message Array {
142+
repeated Value values = 1;
143+
}
144+
145+
// Decimal contains a fixed-point decimal value represented as a string.
146+
message Decimal {
147+
string value = 1;
148+
}
149+
150+
// Record represents a flat mapping of column names to values for a row.
151+
message Record {
152+
map<string, Value> values = 1;
153+
}

pkg/ccl/changefeedccl/changefeedpb/marshal.go

Lines changed: 55 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -40,3 +40,58 @@ func (m ScheduledChangefeedExecutionArgs) MarshalJSONPB(x *jsonpb.Marshaler) ([]
4040
m.ChangefeedStatement = tree.AsStringWithFlags(export, tree.FmtShowFullURIs)
4141
return json.Marshal(m)
4242
}
43+
44+
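// MarshalJSON provides clean, unwrapped JSON output for changefeedpb.Value.
// A nil receiver or an unset oneof is rendered as JSON null.
// NOTE(review): changefeed.proto also declares int32_value and float_value,
// but the switch below has no case for them, so they appear to fall through
// to the assertion error in the default branch — confirm whether that is intended.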
45+
func (v *Value) MarshalJSON() ([]byte, error) {
46+
if v == nil || v.Value == nil {
47+
return []byte("null"), nil
48+
}
49+
50+
var out any
51+
switch val := v.Value.(type) {
52+
case *Value_BoolValue:
53+
out = val.BoolValue
54+
case *Value_Int64Value:
55+
out = val.Int64Value
56+
case *Value_DoubleValue:
57+
out = val.DoubleValue
58+
case *Value_StringValue:
59+
out = val.StringValue
60+
case *Value_DecimalValue:
61+
out = val.DecimalValue.Value
62+
case *Value_TimestampValue:
63+
out = val.TimestampValue
64+
case *Value_DateValue:
65+
out = val.DateValue
66+
case *Value_IntervalValue:
67+
out = val.IntervalValue
68+
case *Value_UuidValue:
69+
out = val.UuidValue
70+
case *Value_BytesValue:
71+
out = string(val.BytesValue)
72+
case *Value_TimeValue:
73+
out = val.TimeValue
74+
case *Value_ArrayValue:
75+
arr := make([]any, len(val.ArrayValue.Values))
76+
for i, elem := range val.ArrayValue.Values {
77+
arr[i] = elem
78+
}
79+
out = arr
80+
case *Value_TupleValue:
81+
m := make(map[string]any, len(val.TupleValue.Values))
82+
for k, v := range val.TupleValue.Values {
83+
m[k] = v
84+
}
85+
out = m
86+
default:
87+
return nil, errors.AssertionFailedf("unexpected protobuf value type: %T", v.Value)
88+
}
89+
return json.Marshal(out)
90+
}
91+
92+
func (k *Key) MarshalJSON() ([]byte, error) {
93+
if k == nil || k.Key == nil {
94+
return []byte("null"), nil
95+
}
96+
return json.Marshal(k.Key)
97+
}

pkg/ccl/changefeedccl/encoder.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -62,6 +62,8 @@ func getEncoder(
6262
//of both encoder and sink. See parquet_sink_cloudstorage.go file for more
6363
//information on why this was needed.
6464
return nil, nil
65+
case changefeedbase.OptFormatProtobuf:
66+
return newProtobufEncoder(ctx, protobufEncoderOptions{EncodingOptions: opts}, targets), nil
6567
default:
6668
return nil, errors.AssertionFailedf(`unknown format: %s`, opts.Format)
6769
}

0 commit comments

Comments
 (0)