Skip to content

Commit e06eaf6

Browse files
craig[bot]KeithCh
andcommitted
Merge #143984
143984: changefeedccl: enable cloudstorage sink with enriched envelope r=KeithCh a=KeithCh Previously we disallowed the use of cloudstorage sink when creating a changefeed with the enriched envelope option. Release note: None Resolves: #143883 Jira issue: CRDB-49134 Co-authored-by: Keith Chow <[email protected]>
2 parents 615456f + dfa2cc5 commit e06eaf6

File tree

4 files changed

+13
-4
lines changed

4 files changed

+13
-4
lines changed

pkg/ccl/changefeedccl/changefeed_stmt.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1047,6 +1047,7 @@ func validateSink(
10471047
sinkTypeKafka: {},
10481048
sinkTypeWebhook: {},
10491049
sinkTypeSinklessBuffer: {},
1050+
sinkTypeCloudstorage: {},
10501051
}
10511052
if _, ok := allowedSinkTypes[sinkTy]; !ok {
10521053
return errors.Newf("envelope=%s is incompatible with %s sink", changefeedbase.OptEnvelopeEnriched, sinkTy)

pkg/ccl/changefeedccl/changefeed_test.go

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -4348,7 +4348,7 @@ func TestChangefeedEnrichedWithDiff(t *testing.T) {
43484348
assertion func(topic string) []string
43494349
}{
43504350
{
4351-
name: "json with diff", options: []string{"diff", "format=json"}, sinks: []string{"kafka", "pubsub", "sinkless", "webhook"},
4351+
name: "json with diff", options: []string{"diff", "format=json"}, sinks: []string{"kafka", "pubsub", "sinkless", "webhook", "cloudstorage"},
43524352
assertion: func(topic string) []string {
43534353
return []string{
43544354
fmt.Sprintf(`%s: {"a": 0}->{"after": {"a": 0, "b": "dog"}, "before": null, "op": "c"}`, topic),
@@ -4536,6 +4536,7 @@ func TestChangefeedEnrichedSourceWithDataAvro(t *testing.T) {
45364536
})
45374537
})
45384538
}
4539+
45394540
func TestChangefeedEnrichedSourceWithDataJSON(t *testing.T) {
45404541
defer leaktest.AfterTest(t)()
45414542
defer log.Scope(t).Close(t)
@@ -4631,7 +4632,7 @@ func TestChangefeedEnrichedSourceWithDataJSON(t *testing.T) {
46314632
assertPayloadsEnriched(t, testFeed, []string{`foo: {"i": 0}->{"after": {"i": 0}, "op": "c"}`}, sourceAssertion)
46324633
}
46334634
}
4634-
for _, sink := range []string{"kafka", "pubsub", "sinkless"} {
4635+
for _, sink := range []string{"kafka", "pubsub", "sinkless", "cloudstorage"} {
46354636
testLocality := roachpb.Locality{
46364637
Tiers: []roachpb.Tier{{
46374638
Key: "region",
@@ -7114,7 +7115,7 @@ func TestChangefeedErrors(t *testing.T) {
71147115
)
71157116
sqlDB.ExpectErrWithTimeout(
71167117
t, `this sink is incompatible with envelope=enriched`,
7117-
`CREATE CHANGEFEED FOR foo INTO 'nodelocal://.' WITH envelope=enriched`,
7118+
`CREATE CHANGEFEED FOR foo INTO 'pulsar://.' WITH envelope=enriched`,
71187119
)
71197120
sqlDB.ExpectErrWithTimeout(
71207121
t, `enriched_properties is only usable with envelope=enriched`,

pkg/ccl/changefeedccl/sink_cloudstorage.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -464,7 +464,7 @@ func makeCloudStorageSink(
464464
}
465465

466466
switch encodingOpts.Envelope {
467-
case changefeedbase.OptEnvelopeWrapped, changefeedbase.OptEnvelopeBare:
467+
case changefeedbase.OptEnvelopeWrapped, changefeedbase.OptEnvelopeBare, changefeedbase.OptEnvelopeEnriched:
468468
default:
469469
return nil, errors.Errorf(`this sink is incompatible with %s=%s`,
470470
changefeedbase.OptEnvelope, encodingOpts.Envelope)

pkg/ccl/changefeedccl/sink_pulsar.go

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -321,6 +321,13 @@ func makePulsarSink(
321321
return nil, err
322322
}
323323

324+
switch encodingOpts.Envelope {
325+
case changefeedbase.OptEnvelopeEnriched:
326+
return nil, errors.Errorf(`this sink is incompatible with %s=%s`,
327+
changefeedbase.OptEnvelope, encodingOpts.Envelope)
328+
default:
329+
}
330+
324331
sink := &pulsarSink{
325332
format: encodingOpts.Format,
326333
// TODO (jayant): make parallelism configurable

0 commit comments

Comments
 (0)