Skip to content

Commit a107716

Browse files
craig[bot]elizaMkraule
andcommitted
Merge #149696
149696: changefeedccl: protobuf support envelope=wrapped with kafka r=asg0451 a=elizaMkraule This change adds support for wrapped envelopes for changefeeds in protobuf format. Release note (general change): Changefeeds using the protobuf format now support wrapped envelopes in kafka sinks. Fixes #148936 Co-authored-by: Eliza Kraule <[email protected]>
2 parents 05a306c + 3bceff8 commit a107716

File tree

3 files changed

+153
-30
lines changed

3 files changed

+153
-30
lines changed

pkg/ccl/changefeedccl/changefeed_test.go

Lines changed: 68 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -12105,37 +12105,79 @@ func TestDatabaseLevelChangefeed(t *testing.T) {
1210512105
cdcTest(t, testFn)
1210612106
}
1210712107

12108-
func TestChangefeedBareFullProtobuf(t *testing.T) {
12108+
func TestChangefeedProtobuf(t *testing.T) {
1210912109
defer leaktest.AfterTest(t)()
1211012110
defer log.Scope(t).Close(t)
1211112111

12112-
testFn := func(t *testing.T, s TestServer, f cdctest.TestFeedFactory) {
12113-
sqlDB := sqlutils.MakeSQLRunner(s.DB)
12112+
type testCase struct {
12113+
envelope string
12114+
withDiff bool
12115+
expectedRows []string
12116+
}
1211412117

12115-
sqlDB.Exec(t, `
12116-
CREATE TABLE pricing (
12117-
id INT PRIMARY KEY,
12118-
name STRING,
12119-
discount FLOAT,
12120-
tax DECIMAL,
12121-
options STRING[]
12122-
)`)
12123-
sqlDB.Exec(t, `
12124-
INSERT INTO pricing VALUES
12125-
(1, 'Chair', 15.75, 2.500, ARRAY['Brown', 'Black']),
12126-
(2, 'Table', 20.00, 1.23456789, ARRAY['Brown', 'Black'])
12127-
`)
12128-
pricingFeed := feed(t, f,
12129-
`CREATE CHANGEFEED FOR pricing WITH envelope='bare', format='protobuf', key_in_value, topic_in_value`)
12130-
defer closeFeed(t, pricingFeed)
12118+
tests := []testCase{
12119+
{
12120+
envelope: "bare",
12121+
withDiff: false,
12122+
expectedRows: []string{
12123+
`pricing: {"id":1}->{"values":{"discount":15.75,"id":1,"name":"Chair","options":["Brown","Black"],"tax":"2.500"},"__crdb__":{"key":{"id":1},"topic":"pricing"}}`,
12124+
`pricing: {"id":2}->{"values":{"discount":20,"id":2,"name":"Table","options":["Brown","Black"],"tax":"1.23456789"},"__crdb__":{"key":{"id":2},"topic":"pricing"}}`,
12125+
`pricing: {"id":2}->{"values":{"discount":25.5,"id":2,"name":"Table","options":["Brown","Black"],"tax":"1.23456789"},"__crdb__":{"key":{"id":2},"topic":"pricing"}}`,
12126+
`pricing: {"id":1}->{"values":{"discount":10,"id":1,"name":"Armchair","options":["Red"],"tax":"1.000"},"__crdb__":{"key":{"id":1},"topic":"pricing"}}`,
12127+
`pricing: {"id":3}->{"values":{"discount":50,"id":3,"name":"Sofa","options":["Gray"],"tax":"4.250"},"__crdb__":{"key":{"id":3},"topic":"pricing"}}`,
12128+
`pricing: {"id":2}->{"values":{"discount":null,"id":2,"name":null,"options":null,"tax":null},"__crdb__":{"key":{"id":2},"topic":"pricing"}}`,
12129+
},
12130+
},
12131+
{
12132+
envelope: "wrapped",
12133+
withDiff: true,
12134+
expectedRows: []string{
12135+
`pricing: {"id":1}->{"after":{"values":{"discount":15.75,"id":1,"name":"Chair","options":["Brown","Black"],"tax":"2.500"}},"before":{},"key":{"id":1},"topic":"pricing"}`,
12136+
`pricing: {"id":2}->{"after":{"values":{"discount":20,"id":2,"name":"Table","options":["Brown","Black"],"tax":"1.23456789"}},"before":{},"key":{"id":2},"topic":"pricing"}`,
12137+
`pricing: {"id":2}->{"after":{"values":{"discount":25.5,"id":2,"name":"Table","options":["Brown","Black"],"tax":"1.23456789"}},"before":{"values":{"discount":20,"id":2,"name":"Table","options":["Brown","Black"],"tax":"1.23456789"}},"key":{"id":2},"topic":"pricing"}`,
12138+
`pricing: {"id":1}->{"after":{"values":{"discount":10,"id":1,"name":"Armchair","options":["Red"],"tax":"1.000"}},"before":{"values":{"discount":15.75,"id":1,"name":"Chair","options":["Brown","Black"],"tax":"2.500"}},"key":{"id":1},"topic":"pricing"}`,
12139+
`pricing: {"id":3}->{"after":{"values":{"discount":50,"id":3,"name":"Sofa","options":["Gray"],"tax":"4.250"}},"before":{},"key":{"id":3},"topic":"pricing"}`,
12140+
`pricing: {"id":2}->{"after":{},"before":{"values":{"discount":25.5,"id":2,"name":"Table","options":["Brown","Black"],"tax":"1.23456789"}},"key":{"id":2},"topic":"pricing"}`,
12141+
},
12142+
},
12143+
}
1213112144

12132-
expected := []string{
12133-
`pricing: {"id":1}->{"values":{"discount":15.75,"id":1,"name":"Chair","options":["Brown","Black"],"tax":"2.500"},"__crdb__":{"key":{"id":1},"topic":"pricing"}}`,
12134-
`pricing: {"id":2}->{"values":{"discount":20,"id":2,"name":"Table","options":["Brown","Black"],"tax":"1.23456789"},"__crdb__":{"key":{"id":2},"topic":"pricing"}}`,
12135-
}
12145+
for _, tc := range tests {
12146+
t.Run(fmt.Sprintf("envelope=%s", tc.envelope), func(t *testing.T) {
12147+
testFn := func(t *testing.T, s TestServer, f cdctest.TestFeedFactory) {
12148+
sqlDB := sqlutils.MakeSQLRunner(s.DB)
1213612149

12137-
assertPayloads(t, pricingFeed, expected)
12138-
}
12150+
sqlDB.Exec(t, `
12151+
CREATE TABLE pricing (
12152+
id INT PRIMARY KEY,
12153+
name STRING,
12154+
discount FLOAT,
12155+
tax DECIMAL,
12156+
options STRING[]
12157+
)`)
12158+
sqlDB.Exec(t, `
12159+
INSERT INTO pricing VALUES
12160+
(1, 'Chair', 15.75, 2.500, ARRAY['Brown', 'Black']),
12161+
(2, 'Table', 20.00, 1.23456789, ARRAY['Brown', 'Black'])`)
12162+
12163+
var opts []string
12164+
opts = append(opts, fmt.Sprintf("envelope='%s'", tc.envelope))
12165+
opts = append(opts, "format='protobuf'", "key_in_value", "topic_in_value")
12166+
if tc.withDiff {
12167+
opts = append(opts, "diff")
12168+
}
1213912169

12140-
cdcTest(t, testFn, feedTestForceSink("kafka"))
12170+
feed := feed(t, f, fmt.Sprintf("CREATE CHANGEFEED FOR pricing WITH %s", strings.Join(opts, ", ")))
12171+
defer closeFeed(t, feed)
12172+
12173+
sqlDB.Exec(t, `UPDATE pricing SET discount = 25.50 WHERE id = 2`)
12174+
sqlDB.Exec(t, `UPSERT INTO pricing (id, name, discount, tax, options) VALUES (1, 'Armchair', 10.00, 1.000, ARRAY['Red'])`)
12175+
sqlDB.Exec(t, `INSERT INTO pricing VALUES (3, 'Sofa', 50.00, 4.250, ARRAY['Gray'])`)
12176+
sqlDB.Exec(t, `DELETE FROM pricing WHERE id = 2`)
12177+
12178+
assertPayloads(t, feed, tc.expectedRows)
12179+
}
12180+
cdcTest(t, testFn, feedTestForceSink("kafka"))
12181+
})
12182+
}
1214112183
}

pkg/ccl/changefeedccl/encoder_protobuf.go

Lines changed: 70 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -69,6 +69,8 @@ func (e *protobufEncoder) EncodeValue(
6969
switch e.envelopeType {
7070
case changefeedbase.OptEnvelopeBare:
7171
return e.buildBare(evCtx, updatedRow, prevRow)
72+
case changefeedbase.OptEnvelopeWrapped:
73+
return e.buildWrapped(ctx, evCtx, updatedRow, prevRow)
7274
default:
7375
return nil, errors.AssertionFailedf("envelope format not supported: %s", e.envelopeType)
7476
}
@@ -126,6 +128,71 @@ func (e *protobufEncoder) buildBare(
126128
return protoutil.Marshal(env)
127129
}
128130

131+
// buildWrapped constructs a WrappedEnvelope serializes it.
132+
func (e *protobufEncoder) buildWrapped(
133+
ctx context.Context, evCtx eventContext, updatedRow, prevRow cdcevent.Row,
134+
) ([]byte, error) {
135+
136+
var after *changefeedpb.Record
137+
var err error
138+
if !updatedRow.IsDeleted() {
139+
after, err = encodeRowToRecord(updatedRow)
140+
if err != nil {
141+
return nil, err
142+
}
143+
} else {
144+
after = &changefeedpb.Record{}
145+
}
146+
147+
var before *changefeedpb.Record
148+
if e.beforeField {
149+
if prevRow.IsInitialized() && !prevRow.IsDeleted() {
150+
before, err = encodeRowToRecord(prevRow)
151+
if err != nil {
152+
return nil, err
153+
}
154+
} else {
155+
before = &changefeedpb.Record{}
156+
}
157+
}
158+
var keyMsg *changefeedpb.Key
159+
if e.keyInValue {
160+
keyMsg, err = buildKeyMessage(updatedRow)
161+
if err != nil {
162+
return nil, err
163+
}
164+
}
165+
166+
var topicStr string
167+
if e.topicInValue {
168+
topicStr = evCtx.topic
169+
}
170+
171+
var updatedStr string
172+
if e.updatedField {
173+
updatedStr = evCtx.updated.AsOfSystemTime()
174+
}
175+
176+
var mvccStr string
177+
if e.mvccTimestampField {
178+
mvccStr = evCtx.mvcc.AsOfSystemTime()
179+
}
180+
181+
wrapped := &changefeedpb.WrappedEnvelope{
182+
After: after,
183+
Before: before,
184+
Key: keyMsg,
185+
Topic: topicStr,
186+
Updated: updatedStr,
187+
MvccTimestamp: mvccStr,
188+
}
189+
190+
env := &changefeedpb.Message{
191+
Data: &changefeedpb.Message_Wrapped{Wrapped: wrapped},
192+
}
193+
return protoutil.Marshal(env)
194+
}
195+
129196
// buildMetadata returns metadata to include in the BareEnvelope.
130197
func (e *protobufEncoder) buildMetadata(
131198
evCtx eventContext, row cdcevent.Row,
@@ -155,6 +222,9 @@ func (e *protobufEncoder) buildMetadata(
155222

156223
// encodeRowToRecord converts a Row into a Record proto.
157224
func encodeRowToRecord(row cdcevent.Row) (*changefeedpb.Record, error) {
225+
if !row.HasValues() {
226+
return nil, nil
227+
}
158228
record := &changefeedpb.Record{Values: make(map[string]*changefeedpb.Value, row.NumValueColumns())}
159229
if err := row.ForEachColumn().Datum(func(d tree.Datum, col cdcevent.ResultColumn) error {
160230
val, err := datumToProtoValue(d)

pkg/ccl/changefeedccl/helpers_test.go

Lines changed: 15 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -391,11 +391,22 @@ func assertPayloadsBaseErr(
391391
if err := protoutil.Unmarshal(m.Value, &msg); err != nil {
392392
return err
393393
}
394-
m.Value, err = gojson.Marshal(msg.GetBare())
395-
if err != nil {
396-
return err
397-
}
398394

395+
switch env := msg.GetData().(type) {
396+
case *changefeedpb.Message_Bare:
397+
m.Value, err = gojson.Marshal(env.Bare)
398+
if err != nil {
399+
return err
400+
}
401+
case *changefeedpb.Message_Wrapped:
402+
m.Value, err = gojson.Marshal(env.Wrapped)
403+
if err != nil {
404+
return err
405+
}
406+
407+
default:
408+
return errors.Newf("unexpected message type: %T", env)
409+
}
399410
var key changefeedpb.Key
400411
if err := protoutil.Unmarshal(m.Key, &key); err != nil {
401412
return err

0 commit comments

Comments
 (0)