
Commit a4b0a34

changefeedccl: support for constant headers for webhook and kafka sinks
This adds support for constant headers for webhook and kafka sinks, to be specified in the CREATE CHANGEFEED statement as a JSON object.

Part of: #142273
Epic: CRDB-48880

Release note (general change): The CREATE CHANGEFEED statement now supports the `extra_headers` option, which can be used to specify extra headers for webhook and kafka sinks. This can be used to add headers to all messages sent to the sink.
1 parent 2f72cf4 commit a4b0a34
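
For orientation, a minimal usage sketch based on the tests in this commit (the webhook endpoint is illustrative; on kafka the option requires the v2 sink, see sink_kafka.go below):

-- Attach the same headers to every message this changefeed emits.
CREATE CHANGEFEED FOR foo
  INTO 'webhook-https://example.com/cdc'
  WITH extra_headers = '{"X-Someheader": "somevalue"}';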

File tree

12 files changed, +249 -37 lines changed


pkg/ccl/changefeedccl/cdctest/mock_webhook_sink.go

Lines changed: 21 additions & 4 deletions
@@ -17,6 +17,11 @@ import (
 	"github.com/klauspost/compress/zstd"
 )
 
+type RowWithHeaders struct {
+	Row     string
+	Headers http.Header
+}
+
 // MockWebhookSink is the Webhook sink used in tests.
 type MockWebhookSink struct {
 	basicAuth bool
@@ -28,7 +33,7 @@ type MockWebhookSink struct {
 		responseBodies   map[int][]byte
 		statusCodes      []int
 		statusCodesIndex int
-		rows             []string
+		rows             []RowWithHeaders
 		lastHeaders      http.Header
 		notify           chan struct{}
 	}
@@ -160,7 +165,7 @@ func (s *MockWebhookSink) Latest() string {
 		return ""
 	}
 	latest := s.mu.rows[len(s.mu.rows)-1]
-	return latest
+	return latest.Row
 }
 
 // Pop deletes and returns the oldest message from MockWebhookSink.
@@ -170,11 +175,23 @@ func (s *MockWebhookSink) Pop() string {
 	if len(s.mu.rows) > 0 {
 		oldest := s.mu.rows[0]
 		s.mu.rows = s.mu.rows[1:]
-		return oldest
+		return oldest.Row
 	}
 	return ""
 }
 
+// PopWithHeaders deletes and returns the oldest message from MockWebhookSink along with its headers.
+func (s *MockWebhookSink) PopWithHeaders() RowWithHeaders {
+	s.mu.Lock()
+	defer s.mu.Unlock()
+	if len(s.mu.rows) > 0 {
+		oldest := s.mu.rows[0]
+		s.mu.rows = s.mu.rows[1:]
+		return oldest
+	}
+	return RowWithHeaders{}
+}
+
 // NotifyMessage arranges for channel to be closed when message arrives.
 func (s *MockWebhookSink) NotifyMessage() chan struct{} {
 	c := make(chan struct{})
@@ -298,7 +315,7 @@ func (s *MockWebhookSink) publish(hw http.ResponseWriter, hr *http.Request) erro
 	}
 
 	s.mu.Lock()
-	s.mu.rows = append(s.mu.rows, string(row))
+	s.mu.rows = append(s.mu.rows, RowWithHeaders{Row: string(row), Headers: hr.Header})
 	if s.mu.notify != nil {
 		close(s.mu.notify)
 		s.mu.notify = nil

pkg/ccl/changefeedccl/cdctest/testfeed.go

Lines changed: 6 additions & 1 deletion
@@ -7,6 +7,8 @@ package cdctest
 
 import (
 	"fmt"
+	"slices"
+	"strings"
 	"time"
 
 	"github.com/cockroachdb/cockroach/pkg/jobs"
@@ -39,7 +41,10 @@ func (h Headers) String() string {
 		return ""
 	}
 	s := "("
-	for _, v := range h {
+	sorted := slices.SortedFunc(slices.Values(h), func(a, b Header) int {
+		return strings.Compare(a.K, b.K)
+	})
+	for _, v := range sorted {
 		s += fmt.Sprintf("%s: %s, ", v.K, v.V)
 	}
 	return s[:len(s)-2] + ")"

pkg/ccl/changefeedccl/changefeed_test.go

Lines changed: 85 additions & 0 deletions
@@ -11821,6 +11821,91 @@ func TestCloudstorageParallelCompression(t *testing.T) {
 	})
 }
 
+func TestChangefeedExtraHeaders(t *testing.T) {
+	defer leaktest.AfterTest(t)()
+	defer log.Scope(t).Close(t)
+
+	testFn := func(t *testing.T, s TestServer, f cdctest.TestFeedFactory) {
+		sqlDB := sqlutils.MakeSQLRunner(s.DB)
+		// Headers are not supported in the v1 kafka sink.
+		sqlDB.Exec(t, `SET CLUSTER SETTING changefeed.new_kafka_sink.enabled = true`)
+
+		sqlDB.Exec(t, `CREATE TABLE foo (key INT PRIMARY KEY);`)
+		sqlDB.Exec(t, `INSERT INTO foo VALUES (1);`)
+
+		cases := []struct {
+			name        string
+			headersArg  string
+			wantHeaders cdctest.Headers
+			expectErr   bool
+		}{
+			{
+				name:        "single header",
+				headersArg:  `{"X-Someheader": "somevalue"}`,
+				wantHeaders: cdctest.Headers{{K: "X-Someheader", V: []byte("somevalue")}},
+			},
+			{
+				name:       "multiple headers",
+				headersArg: `{"X-Someheader": "somevalue", "X-Someotherheader": "someothervalue"}`,
+				wantHeaders: cdctest.Headers{
+					{K: "X-Someheader", V: []byte("somevalue")},
+					{K: "X-Someotherheader", V: []byte("someothervalue")},
+				},
+			},
+			{
+				name:       "inappropriate json",
+				headersArg: `4`,
+				expectErr:  true,
+			},
+			{
+				name:       "also inappropriate json",
+				headersArg: `["X-Someheader", "somevalue"]`,
+				expectErr:  true,
+			},
+			{
+				name:       "invalid json",
+				headersArg: `xxxx`,
+				expectErr:  true,
+			},
+		}
+
+		for _, c := range cases {
+			feed, err := f.Feed(fmt.Sprintf(`CREATE CHANGEFEED FOR foo WITH extra_headers='%s'`, c.headersArg))
+			if c.expectErr {
+				require.Error(t, err)
+				continue
+			} else {
+				require.NoError(t, err)
+			}
+
+			assertPayloads(t, feed, []string{
+				fmt.Sprintf(`foo: [1]%s->{"after": {"key": 1}}`, c.wantHeaders.String()),
+			})
+			closeFeed(t, feed)
+		}
+	}
+
+	cdcTest(t, testFn, feedTestRestrictSinks("kafka", "webhook"))
+}
+func TestChangefeedAdditionalHeadersDoesntWorkWithV1KafkaSink(t *testing.T) {
+	defer leaktest.AfterTest(t)()
+	defer log.Scope(t).Close(t)
+
+	testFn := func(t *testing.T, s TestServer, f cdctest.TestFeedFactory) {
+		sqlDB := sqlutils.MakeSQLRunner(s.DB)
+		sqlDB.Exec(t, `SET CLUSTER SETTING changefeed.new_kafka_sink.enabled = false`)
+
+		sqlDB.Exec(t, `CREATE TABLE foo (key INT PRIMARY KEY);`)
+		sqlDB.Exec(t, `INSERT INTO foo VALUES (1);`)
+
+		_, err := f.Feed(`CREATE CHANGEFEED FOR foo WITH extra_headers='{"X-Someheader": "somevalue"}'`)
+		require.Error(t, err)
+		require.Contains(t, err.Error(), "headers are not supported for the v1 kafka sink")
+	}
+
+	cdcTest(t, testFn, feedTestForceSink("kafka"))
+}
+
 func TestDatabaseLevelChangefeed(t *testing.T) {
 	defer leaktest.AfterTest(t)()
 	defer log.Scope(t).Close(t)

pkg/ccl/changefeedccl/changefeedbase/BUILD.bazel

Lines changed: 1 addition & 0 deletions
@@ -25,6 +25,7 @@ go_library(
         "//pkg/sql/types",
         "//pkg/util",
         "//pkg/util/iterutil",
+        "//pkg/util/json",
         "//pkg/util/metamorphic",
         "@com_github_cockroachdb_errors//:errors",
     ],

pkg/ccl/changefeedccl/changefeedbase/options.go

Lines changed: 64 additions & 6 deletions
@@ -16,6 +16,7 @@ import (
 	"github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgcode"
 	"github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgerror"
 	"github.com/cockroachdb/cockroach/pkg/sql/types"
+	"github.com/cockroachdb/cockroach/pkg/util/json"
 	"github.com/cockroachdb/errors"
 )
 
@@ -117,6 +118,7 @@
 	// TODO(#142273): look into whether we want to add headers to pub/sub, and other
 	// sinks as well (eg cloudstorage, webhook, ..). Currently it's kafka-only.
 	OptHeadersJSONColumnName = `headers_json_column_name`
+	OptExtraHeaders          = `extra_headers`
 
 	OptVirtualColumnsOmitted VirtualColumnVisibility = `omitted`
 	OptVirtualColumnsNull    VirtualColumnVisibility = `null`
@@ -408,6 +410,7 @@ var ChangefeedOptionExpectValues = map[string]OptionPermittedValues{
 	OptEncodeJSONValueNullAsObject: flagOption,
 	OptEnrichedProperties:          csv(string(EnrichedPropertySource), string(EnrichedPropertySchema)),
 	OptHeadersJSONColumnName:       stringOption,
+	OptExtraHeaders:                jsonOption,
 }
 
 // CommonOptions is options common to all sinks
@@ -428,13 +431,13 @@ var CommonOptions = makeStringSet(OptCursor, OptEndTime, OptEnvelope,
 var SQLValidOptions map[string]struct{} = nil
 
 // KafkaValidOptions is options exclusive to Kafka sink
-var KafkaValidOptions = makeStringSet(OptAvroSchemaPrefix, OptConfluentSchemaRegistry, OptKafkaSinkConfig, OptHeadersJSONColumnName)
+var KafkaValidOptions = makeStringSet(OptAvroSchemaPrefix, OptConfluentSchemaRegistry, OptKafkaSinkConfig, OptHeadersJSONColumnName, OptExtraHeaders)
 
 // CloudStorageValidOptions is options exclusive to cloud storage sink
 var CloudStorageValidOptions = makeStringSet(OptCompression)
 
 // WebhookValidOptions is options exclusive to webhook sink
-var WebhookValidOptions = makeStringSet(OptWebhookAuthHeader, OptWebhookClientTimeout, OptWebhookSinkConfig, OptCompression)
+var WebhookValidOptions = makeStringSet(OptWebhookAuthHeader, OptWebhookClientTimeout, OptWebhookSinkConfig, OptCompression, OptExtraHeaders)
 
 // PubsubValidOptions is options exclusive to pubsub sink
 var PubsubValidOptions = makeStringSet(OptPubsubSinkConfig)
@@ -477,6 +480,7 @@ func RedactUserFromURI(uri string) (string, error) {
 // RedactedOptions are options whose values should be replaced with "redacted" in job descriptions and errors.
 var RedactedOptions = map[string]redactionFunc{
 	OptWebhookAuthHeader:       redactSimple,
+	OptExtraHeaders:            redactSimple,
 	SinkParamClientKey:         redactSimple,
 	OptConfluentSchemaRegistry: RedactUserFromURI,
 }
@@ -1032,6 +1036,7 @@ func (s StatementOptions) GetFilters() Filters {
 type WebhookSinkOptions struct {
 	JSONConfig    SinkSpecificJSONConfig
 	AuthHeader    string
+	ExtraHeaders  map[string]string
 	ClientTimeout *time.Duration
 	Compression   string
 }
@@ -1049,13 +1054,66 @@ func (s StatementOptions) GetWebhookSinkOptions() (WebhookSinkOptions, error) {
 		return o, err
 	}
 	o.ClientTimeout = timeout
+
+	headersMap, err := parseHeaders[string](s.m[OptExtraHeaders])
+	if err != nil {
+		return o, err
+	}
+	o.ExtraHeaders = headersMap
 	return o, nil
 }
 
-// GetKafkaConfigJSON returns arbitrary json to be interpreted
-// by the kafka sink.
-func (s StatementOptions) GetKafkaConfigJSON() SinkSpecificJSONConfig {
-	return s.getJSONValue(OptKafkaSinkConfig)
+func parseHeaders[S interface{ string | []byte }](headers string) (map[string]S, error) {
+	if headers == "" {
+		return nil, nil
+	}
+	headersJ, err := json.ParseJSON(headers)
+	if err != nil {
+		return nil, errors.Wrap(err, "parsing headers")
+	}
+	it, err := headersJ.ObjectIter()
+	if err != nil {
+		return nil, errors.Wrap(err, "parsing headers as object")
+	}
+	if it == nil {
+		return nil, errors.Newf("headers is not a JSON object: %s", headers)
+	}
+	headersMap := make(map[string]S, headersJ.Len())
+	for it.Next() {
+		k := it.Key()
+		v := it.Value()
+		s, err := v.AsText()
+		if err != nil {
+			return nil, errors.Wrap(err, "parsing header value as text")
+		}
+		if s == nil {
+			continue
+		}
+		headersMap[k] = S(*s)
+	}
+	return headersMap, nil
+}
+
+type KafkaSinkOptions struct {
+	// JSONConfig is arbitrary json to be interpreted
+	// by the kafka sink.
+	JSONConfig SinkSpecificJSONConfig
+
+	// Headers is a map of header names to values.
+	Headers map[string][]byte
+}
+
+func (s StatementOptions) GetKafkaSinkOptions() (KafkaSinkOptions, error) {
+	headersMap, err := parseHeaders[[]byte](s.m[OptExtraHeaders])
+	if err != nil {
+		return KafkaSinkOptions{}, err
+	}
+
+	o := KafkaSinkOptions{
+		JSONConfig: s.getJSONValue(OptKafkaSinkConfig),
+		Headers:    headersMap,
+	}
+	return o, nil
 }
 
 // GetPubsubConfigJSON returns arbitrary json to be interpreted
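
A sketch of what parseHeaders above accepts and rejects, drawn from the test cases in this commit (the table name foo is illustrative); note that JSON null values are skipped rather than rejected:

-- Accepted: a flat JSON object mapping header names to string values.
CREATE CHANGEFEED FOR foo
  WITH extra_headers = '{"X-Someheader": "somevalue", "X-Someotherheader": "someothervalue"}';

-- Rejected at parse time: non-object JSON such as '4' or
-- '["X-Someheader", "somevalue"]', and invalid JSON such as 'xxxx'.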

pkg/ccl/changefeedccl/sink.go

Lines changed: 11 additions & 3 deletions
@@ -249,20 +249,28 @@ func getSink(
 		return makeNullSink(&changefeedbase.SinkURL{URL: u}, metricsBuilder(nullIsAccounted))
 	case isKafkaSink(u):
 		return validateOptionsAndMakeSink(changefeedbase.KafkaValidOptions, func() (Sink, error) {
+			sinkOpts, err := opts.GetKafkaSinkOptions()
+			if err != nil {
+				return nil, err
+			}
 			if KafkaV2Enabled.Get(&serverCfg.Settings.SV) {
-				return makeKafkaSinkV2(ctx, &changefeedbase.SinkURL{URL: u}, AllTargets(feedCfg), opts.GetKafkaConfigJSON(),
+				return makeKafkaSinkV2(ctx, &changefeedbase.SinkURL{URL: u}, AllTargets(feedCfg), sinkOpts,
 					numSinkIOWorkers(serverCfg), newCPUPacerFactory(ctx, serverCfg), timeutil.DefaultTimeSource{},
 					serverCfg.Settings, metricsBuilder, kafkaSinkV2Knobs{})
 			} else {
-				return makeKafkaSink(ctx, &changefeedbase.SinkURL{URL: u}, AllTargets(feedCfg), opts.GetKafkaConfigJSON(), serverCfg.Settings, metricsBuilder)
+				return makeKafkaSink(ctx, &changefeedbase.SinkURL{URL: u}, AllTargets(feedCfg), sinkOpts, serverCfg.Settings, metricsBuilder)
 			}
 		})
 	case isPulsarSink(u):
 		var testingKnobs *TestingKnobs
 		if knobs, ok := serverCfg.TestingKnobs.Changefeed.(*TestingKnobs); ok {
 			testingKnobs = knobs
 		}
-		return makePulsarSink(ctx, &changefeedbase.SinkURL{URL: u}, encodingOpts, AllTargets(feedCfg), opts.GetKafkaConfigJSON(),
+		sinkOpts, err := opts.GetKafkaSinkOptions()
+		if err != nil {
+			return nil, err
+		}
+		return makePulsarSink(ctx, &changefeedbase.SinkURL{URL: u}, encodingOpts, AllTargets(feedCfg), sinkOpts.JSONConfig,
 			serverCfg.Settings, metricsBuilder, testingKnobs)
 	case isWebhookSink(u):
 		webhookOpts, err := opts.GetWebhookSinkOptions()

pkg/ccl/changefeedccl/sink_kafka.go

Lines changed: 7 additions & 1 deletion
@@ -1200,7 +1200,7 @@ func makeKafkaSink(
 	ctx context.Context,
 	u *changefeedbase.SinkURL,
 	targets changefeedbase.Targets,
-	jsonStr changefeedbase.SinkSpecificJSONConfig,
+	sinkOpts changefeedbase.KafkaSinkOptions,
 	settings *cluster.Settings,
 	mb metricsRecorderBuilder,
 ) (Sink, error) {
@@ -1210,6 +1210,12 @@ func makeKafkaSink(
 		return nil, errors.Errorf(`%s is not yet supported`, changefeedbase.SinkParamSchemaTopic)
 	}
 
+	jsonStr := sinkOpts.JSONConfig
+	if len(sinkOpts.Headers) > 0 {
+		return nil, errors.Newf("headers are not supported for the v1 kafka sink;"+
+			" use the v2 sink instead via the `%s` cluster setting", KafkaV2Enabled.Name())
+	}
+
 	m := mb(requiresResourceAccounting)
 	config, err := buildKafkaConfig(ctx, u, jsonStr, m.getKafkaThrottlingMetrics(settings), m.netMetrics())
 	if err != nil {
pkg/ccl/changefeedccl/sink_kafka_connection_test.go

Lines changed: 1 addition & 1 deletion
@@ -403,7 +403,7 @@ func TestAzureKafkaDefaults(t *testing.T) {
 
 	assertExpectedKgoOpts := func(exp expectation, opts []kgo.Opt) {
 		sinkClient, err := newKafkaSinkClientV2(ctx, opts, sinkBatchConfig{},
-			"", cluster.MakeTestingClusterSettings(), kafkaSinkV2Knobs{}, nilMetricsRecorderBuilder, nil)
+			"", cluster.MakeTestingClusterSettings(), kafkaSinkV2Knobs{}, nilMetricsRecorderBuilder, nil, nil)
 		require.NoError(t, err)
 		defer func() { require.NoError(t, sinkClient.Close()) }()
 		client := sinkClient.client.(*kgo.Client)
