Skip to content

Commit 3bceff8

Browse files
committed
changefeedccl: protobuf support envelope=wrapped with kafka
This change adds support for wrapped envelopes for changefeeds in protobuf format. Fixes #148936 Epic CRDB-44007 Release note (general change): Changefeeds using the protobuf format now support wrapped envelopes in kafka sinks
1 parent ac400e7 commit 3bceff8

File tree

3 files changed

+153
-30
lines changed

3 files changed

+153
-30
lines changed

pkg/ccl/changefeedccl/changefeed_test.go

Lines changed: 68 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -12057,37 +12057,79 @@ func TestDatabaseLevelChangefeed(t *testing.T) {
1205712057
cdcTest(t, testFn)
1205812058
}
1205912059

12060-
func TestChangefeedBareFullProtobuf(t *testing.T) {
12060+
func TestChangefeedProtobuf(t *testing.T) {
1206112061
defer leaktest.AfterTest(t)()
1206212062
defer log.Scope(t).Close(t)
1206312063

12064-
testFn := func(t *testing.T, s TestServer, f cdctest.TestFeedFactory) {
12065-
sqlDB := sqlutils.MakeSQLRunner(s.DB)
12064+
type testCase struct {
12065+
envelope string
12066+
withDiff bool
12067+
expectedRows []string
12068+
}
1206612069

12067-
sqlDB.Exec(t, `
12068-
CREATE TABLE pricing (
12069-
id INT PRIMARY KEY,
12070-
name STRING,
12071-
discount FLOAT,
12072-
tax DECIMAL,
12073-
options STRING[]
12074-
)`)
12075-
sqlDB.Exec(t, `
12076-
INSERT INTO pricing VALUES
12077-
(1, 'Chair', 15.75, 2.500, ARRAY['Brown', 'Black']),
12078-
(2, 'Table', 20.00, 1.23456789, ARRAY['Brown', 'Black'])
12079-
`)
12080-
pricingFeed := feed(t, f,
12081-
`CREATE CHANGEFEED FOR pricing WITH envelope='bare', format='protobuf', key_in_value, topic_in_value`)
12082-
defer closeFeed(t, pricingFeed)
12070+
tests := []testCase{
12071+
{
12072+
envelope: "bare",
12073+
withDiff: false,
12074+
expectedRows: []string{
12075+
`pricing: {"id":1}->{"values":{"discount":15.75,"id":1,"name":"Chair","options":["Brown","Black"],"tax":"2.500"},"__crdb__":{"key":{"id":1},"topic":"pricing"}}`,
12076+
`pricing: {"id":2}->{"values":{"discount":20,"id":2,"name":"Table","options":["Brown","Black"],"tax":"1.23456789"},"__crdb__":{"key":{"id":2},"topic":"pricing"}}`,
12077+
`pricing: {"id":2}->{"values":{"discount":25.5,"id":2,"name":"Table","options":["Brown","Black"],"tax":"1.23456789"},"__crdb__":{"key":{"id":2},"topic":"pricing"}}`,
12078+
`pricing: {"id":1}->{"values":{"discount":10,"id":1,"name":"Armchair","options":["Red"],"tax":"1.000"},"__crdb__":{"key":{"id":1},"topic":"pricing"}}`,
12079+
`pricing: {"id":3}->{"values":{"discount":50,"id":3,"name":"Sofa","options":["Gray"],"tax":"4.250"},"__crdb__":{"key":{"id":3},"topic":"pricing"}}`,
12080+
`pricing: {"id":2}->{"values":{"discount":null,"id":2,"name":null,"options":null,"tax":null},"__crdb__":{"key":{"id":2},"topic":"pricing"}}`,
12081+
},
12082+
},
12083+
{
12084+
envelope: "wrapped",
12085+
withDiff: true,
12086+
expectedRows: []string{
12087+
`pricing: {"id":1}->{"after":{"values":{"discount":15.75,"id":1,"name":"Chair","options":["Brown","Black"],"tax":"2.500"}},"before":{},"key":{"id":1},"topic":"pricing"}`,
12088+
`pricing: {"id":2}->{"after":{"values":{"discount":20,"id":2,"name":"Table","options":["Brown","Black"],"tax":"1.23456789"}},"before":{},"key":{"id":2},"topic":"pricing"}`,
12089+
`pricing: {"id":2}->{"after":{"values":{"discount":25.5,"id":2,"name":"Table","options":["Brown","Black"],"tax":"1.23456789"}},"before":{"values":{"discount":20,"id":2,"name":"Table","options":["Brown","Black"],"tax":"1.23456789"}},"key":{"id":2},"topic":"pricing"}`,
12090+
`pricing: {"id":1}->{"after":{"values":{"discount":10,"id":1,"name":"Armchair","options":["Red"],"tax":"1.000"}},"before":{"values":{"discount":15.75,"id":1,"name":"Chair","options":["Brown","Black"],"tax":"2.500"}},"key":{"id":1},"topic":"pricing"}`,
12091+
`pricing: {"id":3}->{"after":{"values":{"discount":50,"id":3,"name":"Sofa","options":["Gray"],"tax":"4.250"}},"before":{},"key":{"id":3},"topic":"pricing"}`,
12092+
`pricing: {"id":2}->{"after":{},"before":{"values":{"discount":25.5,"id":2,"name":"Table","options":["Brown","Black"],"tax":"1.23456789"}},"key":{"id":2},"topic":"pricing"}`,
12093+
},
12094+
},
12095+
}
1208312096

12084-
expected := []string{
12085-
`pricing: {"id":1}->{"values":{"discount":15.75,"id":1,"name":"Chair","options":["Brown","Black"],"tax":"2.500"},"__crdb__":{"key":{"id":1},"topic":"pricing"}}`,
12086-
`pricing: {"id":2}->{"values":{"discount":20,"id":2,"name":"Table","options":["Brown","Black"],"tax":"1.23456789"},"__crdb__":{"key":{"id":2},"topic":"pricing"}}`,
12087-
}
12097+
for _, tc := range tests {
12098+
t.Run(fmt.Sprintf("envelope=%s", tc.envelope), func(t *testing.T) {
12099+
testFn := func(t *testing.T, s TestServer, f cdctest.TestFeedFactory) {
12100+
sqlDB := sqlutils.MakeSQLRunner(s.DB)
1208812101

12089-
assertPayloads(t, pricingFeed, expected)
12090-
}
12102+
sqlDB.Exec(t, `
12103+
CREATE TABLE pricing (
12104+
id INT PRIMARY KEY,
12105+
name STRING,
12106+
discount FLOAT,
12107+
tax DECIMAL,
12108+
options STRING[]
12109+
)`)
12110+
sqlDB.Exec(t, `
12111+
INSERT INTO pricing VALUES
12112+
(1, 'Chair', 15.75, 2.500, ARRAY['Brown', 'Black']),
12113+
(2, 'Table', 20.00, 1.23456789, ARRAY['Brown', 'Black'])`)
12114+
12115+
var opts []string
12116+
opts = append(opts, fmt.Sprintf("envelope='%s'", tc.envelope))
12117+
opts = append(opts, "format='protobuf'", "key_in_value", "topic_in_value")
12118+
if tc.withDiff {
12119+
opts = append(opts, "diff")
12120+
}
1209112121

12092-
cdcTest(t, testFn, feedTestForceSink("kafka"))
12122+
feed := feed(t, f, fmt.Sprintf("CREATE CHANGEFEED FOR pricing WITH %s", strings.Join(opts, ", ")))
12123+
defer closeFeed(t, feed)
12124+
12125+
sqlDB.Exec(t, `UPDATE pricing SET discount = 25.50 WHERE id = 2`)
12126+
sqlDB.Exec(t, `UPSERT INTO pricing (id, name, discount, tax, options) VALUES (1, 'Armchair', 10.00, 1.000, ARRAY['Red'])`)
12127+
sqlDB.Exec(t, `INSERT INTO pricing VALUES (3, 'Sofa', 50.00, 4.250, ARRAY['Gray'])`)
12128+
sqlDB.Exec(t, `DELETE FROM pricing WHERE id = 2`)
12129+
12130+
assertPayloads(t, feed, tc.expectedRows)
12131+
}
12132+
cdcTest(t, testFn, feedTestForceSink("kafka"))
12133+
})
12134+
}
1209312135
}

pkg/ccl/changefeedccl/encoder_protobuf.go

Lines changed: 70 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -69,6 +69,8 @@ func (e *protobufEncoder) EncodeValue(
6969
switch e.envelopeType {
7070
case changefeedbase.OptEnvelopeBare:
7171
return e.buildBare(evCtx, updatedRow, prevRow)
72+
case changefeedbase.OptEnvelopeWrapped:
73+
return e.buildWrapped(ctx, evCtx, updatedRow, prevRow)
7274
default:
7375
return nil, errors.AssertionFailedf("envelope format not supported: %s", e.envelopeType)
7476
}
@@ -126,6 +128,71 @@ func (e *protobufEncoder) buildBare(
126128
return protoutil.Marshal(env)
127129
}
128130

131+
// buildWrapped constructs a WrappedEnvelope serializes it.
132+
func (e *protobufEncoder) buildWrapped(
133+
ctx context.Context, evCtx eventContext, updatedRow, prevRow cdcevent.Row,
134+
) ([]byte, error) {
135+
136+
var after *changefeedpb.Record
137+
var err error
138+
if !updatedRow.IsDeleted() {
139+
after, err = encodeRowToRecord(updatedRow)
140+
if err != nil {
141+
return nil, err
142+
}
143+
} else {
144+
after = &changefeedpb.Record{}
145+
}
146+
147+
var before *changefeedpb.Record
148+
if e.beforeField {
149+
if prevRow.IsInitialized() && !prevRow.IsDeleted() {
150+
before, err = encodeRowToRecord(prevRow)
151+
if err != nil {
152+
return nil, err
153+
}
154+
} else {
155+
before = &changefeedpb.Record{}
156+
}
157+
}
158+
var keyMsg *changefeedpb.Key
159+
if e.keyInValue {
160+
keyMsg, err = buildKeyMessage(updatedRow)
161+
if err != nil {
162+
return nil, err
163+
}
164+
}
165+
166+
var topicStr string
167+
if e.topicInValue {
168+
topicStr = evCtx.topic
169+
}
170+
171+
var updatedStr string
172+
if e.updatedField {
173+
updatedStr = evCtx.updated.AsOfSystemTime()
174+
}
175+
176+
var mvccStr string
177+
if e.mvccTimestampField {
178+
mvccStr = evCtx.mvcc.AsOfSystemTime()
179+
}
180+
181+
wrapped := &changefeedpb.WrappedEnvelope{
182+
After: after,
183+
Before: before,
184+
Key: keyMsg,
185+
Topic: topicStr,
186+
Updated: updatedStr,
187+
MvccTimestamp: mvccStr,
188+
}
189+
190+
env := &changefeedpb.Message{
191+
Data: &changefeedpb.Message_Wrapped{Wrapped: wrapped},
192+
}
193+
return protoutil.Marshal(env)
194+
}
195+
129196
// buildMetadata returns metadata to include in the BareEnvelope.
130197
func (e *protobufEncoder) buildMetadata(
131198
evCtx eventContext, row cdcevent.Row,
@@ -155,6 +222,9 @@ func (e *protobufEncoder) buildMetadata(
155222

156223
// encodeRowToRecord converts a Row into a Record proto.
157224
func encodeRowToRecord(row cdcevent.Row) (*changefeedpb.Record, error) {
225+
if !row.HasValues() {
226+
return nil, nil
227+
}
158228
record := &changefeedpb.Record{Values: make(map[string]*changefeedpb.Value, row.NumValueColumns())}
159229
if err := row.ForEachColumn().Datum(func(d tree.Datum, col cdcevent.ResultColumn) error {
160230
val, err := datumToProtoValue(d)

pkg/ccl/changefeedccl/helpers_test.go

Lines changed: 15 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -391,11 +391,22 @@ func assertPayloadsBaseErr(
391391
if err := protoutil.Unmarshal(m.Value, &msg); err != nil {
392392
return err
393393
}
394-
m.Value, err = gojson.Marshal(msg.GetBare())
395-
if err != nil {
396-
return err
397-
}
398394

395+
switch env := msg.GetData().(type) {
396+
case *changefeedpb.Message_Bare:
397+
m.Value, err = gojson.Marshal(env.Bare)
398+
if err != nil {
399+
return err
400+
}
401+
case *changefeedpb.Message_Wrapped:
402+
m.Value, err = gojson.Marshal(env.Wrapped)
403+
if err != nil {
404+
return err
405+
}
406+
407+
default:
408+
return errors.Newf("unexpected message type: %T", env)
409+
}
399410
var key changefeedpb.Key
400411
if err := protoutil.Unmarshal(m.Key, &key); err != nil {
401412
return err

0 commit comments

Comments
 (0)