Skip to content

Commit e35ddcd

Browse files
committed
changefeedccl: support resolved option with protobuf format
This change introduces support for the resolved option with protobuf format for changefeeds Fixes #148934 Epic: CRDB-44007 Release note (general change): the changefeeds with protobuf format now supports the resolved option for emitting resolved timestamps.
1 parent b1bb226 commit e35ddcd

File tree

2 files changed

+79
-3
lines changed

2 files changed

+79
-3
lines changed

pkg/ccl/changefeedccl/encoder_protobuf.go

Lines changed: 22 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,6 @@ import (
1515
"github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/changefeedpb"
1616
"github.com/cockroachdb/cockroach/pkg/geo"
1717
"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
18-
"github.com/cockroachdb/cockroach/pkg/util/errorutil/unimplemented"
1918
"github.com/cockroachdb/cockroach/pkg/util/hlc"
2019
"github.com/cockroachdb/cockroach/pkg/util/protoutil"
2120
"github.com/cockroachdb/errors"
@@ -77,9 +76,29 @@ func (e *protobufEncoder) EncodeValue(
7776

7877
// EncodeResolvedTimestamp encodes a resolved timestamp message for the specified topic.
7978
func (e *protobufEncoder) EncodeResolvedTimestamp(
80-
context.Context, string, hlc.Timestamp,
79+
ctx context.Context, topic string, ts hlc.Timestamp,
8180
) ([]byte, error) {
82-
return nil, unimplemented.NewWithIssuef(148934, "protobuf encoder does not support resolved timestamps yet")
81+
var msg *changefeedpb.Message
82+
if e.envelopeType == changefeedbase.OptEnvelopeBare {
83+
msg = &changefeedpb.Message{
84+
Data: &changefeedpb.Message_BareResolved{
85+
BareResolved: &changefeedpb.BareResolved{
86+
XCrdb__: &changefeedpb.Resolved{
87+
Resolved: ts.AsOfSystemTime(),
88+
},
89+
},
90+
},
91+
}
92+
} else {
93+
msg = &changefeedpb.Message{
94+
Data: &changefeedpb.Message_Resolved{
95+
Resolved: &changefeedpb.Resolved{
96+
Resolved: ts.AsOfSystemTime(),
97+
},
98+
},
99+
}
100+
}
101+
return protoutil.Marshal(msg)
83102
}
84103

85104
// buildBare constructs a BareEnvelope with optional metadata and serializes it.

pkg/ccl/changefeedccl/encoder_protobuf_test.go

Lines changed: 57 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -70,3 +70,60 @@ func TestProtoEncoder_BareEnvelope_WithMetadata(t *testing.T) {
7070
require.Equal(t, int64(1), bare.Values["id"].GetInt64Value())
7171
require.Equal(t, "Alice", bare.Values["name"].GetStringValue())
7272
}
73+
74+
func TestProtoEncoder_ResolvedEnvelope(t *testing.T) {
75+
defer leaktest.AfterTest(t)()
76+
defer log.Scope(t).Close(t)
77+
78+
tableDesc, err := parseTableDesc(`CREATE TABLE foo (a INT PRIMARY KEY, b STRING)`)
79+
require.NoError(t, err)
80+
targets := mkTargets(tableDesc)
81+
82+
ts := hlc.Timestamp{WallTime: 123, Logical: 456}
83+
84+
tests := []struct {
85+
name string
86+
envelopeType changefeedbase.EnvelopeType
87+
expectWrapped bool
88+
}{
89+
{
90+
name: "wrapped envelope",
91+
envelopeType: changefeedbase.OptEnvelopeWrapped,
92+
},
93+
{
94+
name: "bare envelope",
95+
envelopeType: changefeedbase.OptEnvelopeBare,
96+
},
97+
}
98+
99+
for _, tc := range tests {
100+
t.Run(tc.name, func(t *testing.T) {
101+
opts := changefeedbase.EncodingOptions{
102+
Envelope: tc.envelopeType,
103+
Format: changefeedbase.OptFormatProtobuf,
104+
}
105+
106+
enc, err := getEncoder(context.Background(), opts, targets, false, nil, nil, nil)
107+
require.NoError(t, err)
108+
109+
b, err := enc.EncodeResolvedTimestamp(context.Background(), "test-topic", ts)
110+
require.NoError(t, err)
111+
112+
var msg changefeedpb.Message
113+
require.NoError(t, protoutil.Unmarshal(b, &msg))
114+
115+
switch tc.envelopeType {
116+
case changefeedbase.OptEnvelopeWrapped:
117+
res := msg.GetResolved()
118+
require.NotNil(t, res, "wrapped envelope should populate Resolved field")
119+
require.Equal(t, ts.AsOfSystemTime(), res.Resolved)
120+
case changefeedbase.OptEnvelopeBare:
121+
res := msg.GetBareResolved()
122+
require.NotNil(t, res, "bare envelope should populate BareResolved field")
123+
require.Equal(t, ts.AsOfSystemTime(), res.XCrdb__.Resolved)
124+
default:
125+
t.Fatalf("unexpected envelope type: %v", tc.envelopeType)
126+
}
127+
})
128+
}
129+
}

0 commit comments

Comments
 (0)