Commit 1957a64

craig[bot] committed with asg0451, wenyihu6, and rafiss
146813: changefeedccl: support for constant headers for webhook and kafka sinks r=andyyang890,KeithCh a=asg0451

This adds support for constant headers for webhook and kafka sinks, specified in the CREATE CHANGEFEED statement as a JSON object.

Part of: #142273
Epic: CRDB-48880

Release note (general change): The CREATE CHANGEFEED statement now supports the `headers` option, which can be used to specify constant headers for webhook and kafka sinks. This can be used to add headers to all messages sent to the sink.

149904: asim: final port over from mma prototype for asim r=tbg a=wenyihu6

**asim: pass false to ReplicaChangeDelayFn for AllocationTransferLeaseOp**

Previously, we incorrectly passed true to ReplicaChangeDelayFn for AllocationTransferLeaseOp, causing it to treat the operation like a replica addition and extend the delay based on range size and snapshot rate. Since lease transfers are expected to be much faster, this was unintended. This commit fixes the issue by applying only the base delay (ReplicaChangeBaseDelay, which defaults to 100ms) to AllocationTransferLeaseOp.

Epic: none
Release note: none

---

**asim: correctly update queue.next in pushReplicateChange**

Previously, the next field of the queues was not updated correctly because the base queue was passed by value into pushReplicateChange, so updates to the next field affected only the copy, not the original queue. This commit fixes the issue by returning the correct next value and updating the queue directly.

Epic: none
Release note: none

---

**asim: pass simulator.Replica instead to pushReplicateChange**

Previously, we passed state.Range to pushReplicateChange. This commit changes it to pass the simulator.Replica directly, as future commits will require access to additional fields such as Desc and RangeUsageInfo.

Epic: none
Release note: none

---

**asim: pass state changer directly to pushReplicateChange**

Previously, we passed the base queue to pushReplicateChange, but only its state changer was used. This commit updates pushReplicateChange to take only the state changer.

Epic: none
Release note: none

---

**asim: integrate allocatorsync with lease and replicate queue properly**

This commit integrates lease and replicate queue changes with the mma allocator properly by calling allocatorSync.PreApply* and allocatorSync.PostApply, completing the allocator integration. Some data-driven test outputs changed because allocatorSync.PostApply now updates the store pool after operations (such as UpdateLocalStoresAfterLeaseTransfer), which was previously missing in asim. It is unclear whether this was an oversight or was previously deemed unnecessary; more investigation is needed.

Epic: none
Release note: none

---

**asim: pass queue names for logging**

This commit passes the name of the queue making changes to pushReplicateChange to improve logging.

Epic: none
Release note: none

---

**asim: remove setting from store**

This commit removes the setting field from the store struct, as it is not used anywhere.

Epic: none
Release note: none

---

**asim: add rebalance_objective and delay for rebalance_mode**

This commit adds rebalance_objective as a setting and allows changes to rebalance_mode to be scheduled as a delayed simulation event. This enables testing transitions between mma enabled and disabled modes.

Epic: none
Release note: none

---

**asim: check for LBRebalancingMultiMetric in ShouldRebalanceStore**

This commit updates the old store rebalancer's ShouldRebalanceStore to check the LBRebalancingMultiMetric cluster setting. asim still invokes this method when checking changes from the old store rebalancer.

Epic: none
Release note: none

150191: roachtest: fix check for npm being installed r=rafiss a=rafiss

We have seen that this check can return a 0 exit code even when the npm command doesn't exist. Using && should fix this by short-circuiting as soon as a non-zero exit code is received.

fixes #149536
fixes #149532

Release note: None

Co-authored-by: Miles Frankel <[email protected]>
Co-authored-by: wenyihu6 <[email protected]>
Co-authored-by: Rafi Shamim <[email protected]>
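For readers skimming the diff below, here is a condensed, hedged sketch of how the new changefeed option is exercised. It is trimmed from the TestChangefeedExtraHeaders test added in this commit; the option name `extra_headers` comes from the OptExtraHeaders constant in options.go, and the harness (TestServer, cdctest.TestFeedFactory, cdcTest, assertPayloads) is the existing changefeed test scaffolding, not new code.

```go
// Condensed from TestChangefeedExtraHeaders (changefeed_test.go below);
// only the single-header happy path is kept.
testFn := func(t *testing.T, s TestServer, f cdctest.TestFeedFactory) {
	sqlDB := sqlutils.MakeSQLRunner(s.DB)
	// Constant headers require the v2 kafka sink.
	sqlDB.Exec(t, `SET CLUSTER SETTING changefeed.new_kafka_sink.enabled = true`)
	sqlDB.Exec(t, `CREATE TABLE foo (key INT PRIMARY KEY)`)
	sqlDB.Exec(t, `INSERT INTO foo VALUES (1)`)

	// Every message emitted to the webhook or kafka sink carries this header.
	feed, err := f.Feed(`CREATE CHANGEFEED FOR foo WITH extra_headers='{"X-Someheader": "somevalue"}'`)
	require.NoError(t, err)
	defer closeFeed(t, feed)

	assertPayloads(t, feed, []string{
		`foo: [1](X-Someheader: somevalue)->{"after": {"key": 1}}`,
	})
}
cdcTest(t, testFn, feedTestRestrictSinks("kafka", "webhook"))
```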
4 parents: a5a996e + a4b0a34 + c64e2c3 + 70b98c1 · commit 1957a64

34 files changed (+602 lines, -297 lines)

pkg/ccl/changefeedccl/cdctest/mock_webhook_sink.go

Lines changed: 21 additions & 4 deletions
```diff
@@ -17,6 +17,11 @@ import (
 	"github.com/klauspost/compress/zstd"
 )
 
+type RowWithHeaders struct {
+	Row     string
+	Headers http.Header
+}
+
 // MockWebhookSink is the Webhook sink used in tests.
 type MockWebhookSink struct {
 	basicAuth bool
@@ -28,7 +33,7 @@ type MockWebhookSink struct {
 		responseBodies   map[int][]byte
 		statusCodes      []int
 		statusCodesIndex int
-		rows             []string
+		rows             []RowWithHeaders
 		lastHeaders      http.Header
 		notify           chan struct{}
 	}
@@ -160,7 +165,7 @@ func (s *MockWebhookSink) Latest() string {
 		return ""
 	}
 	latest := s.mu.rows[len(s.mu.rows)-1]
-	return latest
+	return latest.Row
 }
 
 // Pop deletes and returns the oldest message from MockWebhookSink.
@@ -170,11 +175,23 @@ func (s *MockWebhookSink) Pop() string {
 	if len(s.mu.rows) > 0 {
 		oldest := s.mu.rows[0]
 		s.mu.rows = s.mu.rows[1:]
-		return oldest
+		return oldest.Row
 	}
 	return ""
 }
 
+// PopWithHeaders deletes and returns the oldest message from MockWebhookSink along with its headers.
+func (s *MockWebhookSink) PopWithHeaders() RowWithHeaders {
+	s.mu.Lock()
+	defer s.mu.Unlock()
+	if len(s.mu.rows) > 0 {
+		oldest := s.mu.rows[0]
+		s.mu.rows = s.mu.rows[1:]
+		return oldest
+	}
+	return RowWithHeaders{}
+}
+
 // NotifyMessage arranges for channel to be closed when message arrives.
 func (s *MockWebhookSink) NotifyMessage() chan struct{} {
 	c := make(chan struct{})
@@ -298,7 +315,7 @@ func (s *MockWebhookSink) publish(hw http.ResponseWriter, hr *http.Request) erro
 	}
 
 	s.mu.Lock()
-	s.mu.rows = append(s.mu.rows, string(row))
+	s.mu.rows = append(s.mu.rows, RowWithHeaders{Row: string(row), Headers: hr.Header})
 	if s.mu.notify != nil {
 		close(s.mu.notify)
 		s.mu.notify = nil
```
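With rows now stored as RowWithHeaders, tests can assert on the headers a message arrived with, not just its body. A small illustrative helper (not part of this commit) showing the intended use of PopWithHeaders; the header key and value are made up, and the sink is assumed to have been started by the usual cdctest setup:

```go
package cdctest_test

import (
	"testing"

	"github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/cdctest"
	"github.com/stretchr/testify/require"
)

// assertHeaderSeen pops the oldest message from a MockWebhookSink and checks
// that the expected constant header was attached to it. Hypothetical helper
// for illustration only.
func assertHeaderSeen(t *testing.T, sink *cdctest.MockWebhookSink, key, want string) {
	msg := sink.PopWithHeaders()
	require.NotEmpty(t, msg.Row)
	require.Equal(t, want, msg.Headers.Get(key))
}
```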

pkg/ccl/changefeedccl/cdctest/testfeed.go

Lines changed: 6 additions & 1 deletion
```diff
@@ -7,6 +7,8 @@ package cdctest
 
 import (
 	"fmt"
+	"slices"
+	"strings"
 	"time"
 
 	"github.com/cockroachdb/cockroach/pkg/jobs"
@@ -39,7 +41,10 @@ func (h Headers) String() string {
 		return ""
 	}
 	s := "("
-	for _, v := range h {
+	sorted := slices.SortedFunc(slices.Values(h), func(a, b Header) int {
+		return strings.Compare(a.K, b.K)
+	})
+	for _, v := range sorted {
 		s += fmt.Sprintf("%s: %s, ", v.K, v.V)
 	}
 	return s[:len(s)-2] + ")"
```
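The sort makes Headers.String() deterministic regardless of the order in which headers were declared, which matters because assertPayloads compares rendered strings. A minimal sketch of the resulting behavior; the header keys and values are made up for illustration:

```go
package main

import (
	"fmt"

	"github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/cdctest"
)

func main() {
	h := cdctest.Headers{
		{K: "X-B", V: []byte("2")},
		{K: "X-A", V: []byte("1")},
	}
	// Sorted by key, so the rendering no longer depends on insertion order.
	fmt.Println(h.String()) // prints: (X-A: 1, X-B: 2)
}
```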

pkg/ccl/changefeedccl/changefeed_test.go

Lines changed: 85 additions & 0 deletions
```diff
@@ -11881,6 +11881,91 @@ func TestCloudstorageParallelCompression(t *testing.T) {
 	})
 }
 
+func TestChangefeedExtraHeaders(t *testing.T) {
+	defer leaktest.AfterTest(t)()
+	defer log.Scope(t).Close(t)
+
+	testFn := func(t *testing.T, s TestServer, f cdctest.TestFeedFactory) {
+		sqlDB := sqlutils.MakeSQLRunner(s.DB)
+		// Headers are not supported in the v1 kafka sink.
+		sqlDB.Exec(t, `SET CLUSTER SETTING changefeed.new_kafka_sink.enabled = true`)
+
+		sqlDB.Exec(t, `CREATE TABLE foo (key INT PRIMARY KEY);`)
+		sqlDB.Exec(t, `INSERT INTO foo VALUES (1);`)
+
+		cases := []struct {
+			name        string
+			headersArg  string
+			wantHeaders cdctest.Headers
+			expectErr   bool
+		}{
+			{
+				name:        "single header",
+				headersArg:  `{"X-Someheader": "somevalue"}`,
+				wantHeaders: cdctest.Headers{{K: "X-Someheader", V: []byte("somevalue")}},
+			},
+			{
+				name:       "multiple headers",
+				headersArg: `{"X-Someheader": "somevalue", "X-Someotherheader": "someothervalue"}`,
+				wantHeaders: cdctest.Headers{
+					{K: "X-Someheader", V: []byte("somevalue")},
+					{K: "X-Someotherheader", V: []byte("someothervalue")},
+				},
+			},
+			{
+				name:       "inappropriate json",
+				headersArg: `4`,
+				expectErr:  true,
+			},
+			{
+				name:       "also inappropriate json",
+				headersArg: `["X-Someheader", "somevalue"]`,
+				expectErr:  true,
+			},
+			{
+				name:       "invalid json",
+				headersArg: `xxxx`,
+				expectErr:  true,
+			},
+		}
+
+		for _, c := range cases {
+			feed, err := f.Feed(fmt.Sprintf(`CREATE CHANGEFEED FOR foo WITH extra_headers='%s'`, c.headersArg))
+			if c.expectErr {
+				require.Error(t, err)
+				continue
+			} else {
+				require.NoError(t, err)
+			}
+
+			assertPayloads(t, feed, []string{
+				fmt.Sprintf(`foo: [1]%s->{"after": {"key": 1}}`, c.wantHeaders.String()),
+			})
+			closeFeed(t, feed)
+		}
+	}
+
+	cdcTest(t, testFn, feedTestRestrictSinks("kafka", "webhook"))
+}
+func TestChangefeedAdditionalHeadersDoesntWorkWithV1KafkaSink(t *testing.T) {
+	defer leaktest.AfterTest(t)()
+	defer log.Scope(t).Close(t)
+
+	testFn := func(t *testing.T, s TestServer, f cdctest.TestFeedFactory) {
+		sqlDB := sqlutils.MakeSQLRunner(s.DB)
+		sqlDB.Exec(t, `SET CLUSTER SETTING changefeed.new_kafka_sink.enabled = false`)
+
+		sqlDB.Exec(t, `CREATE TABLE foo (key INT PRIMARY KEY);`)
+		sqlDB.Exec(t, `INSERT INTO foo VALUES (1);`)
+
+		_, err := f.Feed(`CREATE CHANGEFEED FOR foo WITH extra_headers='{"X-Someheader": "somevalue"}'`)
+		require.Error(t, err)
+		require.Contains(t, err.Error(), "headers are not supported for the v1 kafka sink")
+	}
+
+	cdcTest(t, testFn, feedTestForceSink("kafka"))
+}
+
 func TestDatabaseLevelChangefeed(t *testing.T) {
 	defer leaktest.AfterTest(t)()
 	defer log.Scope(t).Close(t)
```

pkg/ccl/changefeedccl/changefeedbase/BUILD.bazel

Lines changed: 1 addition & 0 deletions
```diff
@@ -25,6 +25,7 @@ go_library(
         "//pkg/sql/types",
         "//pkg/util",
         "//pkg/util/iterutil",
+        "//pkg/util/json",
        "//pkg/util/metamorphic",
        "@com_github_cockroachdb_errors//:errors",
    ],
```

pkg/ccl/changefeedccl/changefeedbase/options.go

Lines changed: 64 additions & 6 deletions
```diff
@@ -16,6 +16,7 @@ import (
 	"github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgcode"
 	"github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgerror"
 	"github.com/cockroachdb/cockroach/pkg/sql/types"
+	"github.com/cockroachdb/cockroach/pkg/util/json"
 	"github.com/cockroachdb/errors"
 )
 
@@ -117,6 +118,7 @@
 	// TODO(#142273): look into whether we want to add headers to pub/sub, and other
 	// sinks as well (eg cloudstorage, webhook, ..). Currently it's kafka-only.
 	OptHeadersJSONColumnName = `headers_json_column_name`
+	OptExtraHeaders          = `extra_headers`
 
 	OptVirtualColumnsOmitted VirtualColumnVisibility = `omitted`
 	OptVirtualColumnsNull    VirtualColumnVisibility = `null`
@@ -408,6 +410,7 @@ var ChangefeedOptionExpectValues = map[string]OptionPermittedValues{
 	OptEncodeJSONValueNullAsObject: flagOption,
 	OptEnrichedProperties:          csv(string(EnrichedPropertySource), string(EnrichedPropertySchema)),
 	OptHeadersJSONColumnName:       stringOption,
+	OptExtraHeaders:                jsonOption,
 }
 
 // CommonOptions is options common to all sinks
@@ -428,13 +431,13 @@ var CommonOptions = makeStringSet(OptCursor, OptEndTime, OptEnvelope,
 var SQLValidOptions map[string]struct{} = nil
 
 // KafkaValidOptions is options exclusive to Kafka sink
-var KafkaValidOptions = makeStringSet(OptAvroSchemaPrefix, OptConfluentSchemaRegistry, OptKafkaSinkConfig, OptHeadersJSONColumnName)
+var KafkaValidOptions = makeStringSet(OptAvroSchemaPrefix, OptConfluentSchemaRegistry, OptKafkaSinkConfig, OptHeadersJSONColumnName, OptExtraHeaders)
 
 // CloudStorageValidOptions is options exclusive to cloud storage sink
 var CloudStorageValidOptions = makeStringSet(OptCompression)
 
 // WebhookValidOptions is options exclusive to webhook sink
-var WebhookValidOptions = makeStringSet(OptWebhookAuthHeader, OptWebhookClientTimeout, OptWebhookSinkConfig, OptCompression)
+var WebhookValidOptions = makeStringSet(OptWebhookAuthHeader, OptWebhookClientTimeout, OptWebhookSinkConfig, OptCompression, OptExtraHeaders)
 
 // PubsubValidOptions is options exclusive to pubsub sink
 var PubsubValidOptions = makeStringSet(OptPubsubSinkConfig)
@@ -477,6 +480,7 @@ func RedactUserFromURI(uri string) (string, error) {
 // RedactedOptions are options whose values should be replaced with "redacted" in job descriptions and errors.
 var RedactedOptions = map[string]redactionFunc{
 	OptWebhookAuthHeader:       redactSimple,
+	OptExtraHeaders:            redactSimple,
 	SinkParamClientKey:         redactSimple,
 	OptConfluentSchemaRegistry: RedactUserFromURI,
 }
@@ -1032,6 +1036,7 @@ func (s StatementOptions) GetFilters() Filters {
 type WebhookSinkOptions struct {
 	JSONConfig    SinkSpecificJSONConfig
 	AuthHeader    string
+	ExtraHeaders  map[string]string
 	ClientTimeout *time.Duration
 	Compression   string
 }
@@ -1049,13 +1054,66 @@ func (s StatementOptions) GetWebhookSinkOptions() (WebhookSinkOptions, error) {
 		return o, err
 	}
 	o.ClientTimeout = timeout
+
+	headersMap, err := parseHeaders[string](s.m[OptExtraHeaders])
+	if err != nil {
+		return o, err
+	}
+	o.ExtraHeaders = headersMap
 	return o, nil
 }
 
-// GetKafkaConfigJSON returns arbitrary json to be interpreted
-// by the kafka sink.
-func (s StatementOptions) GetKafkaConfigJSON() SinkSpecificJSONConfig {
-	return s.getJSONValue(OptKafkaSinkConfig)
+func parseHeaders[S interface{ string | []byte }](headers string) (map[string]S, error) {
+	if headers == "" {
+		return nil, nil
+	}
+	headersJ, err := json.ParseJSON(headers)
+	if err != nil {
+		return nil, errors.Wrap(err, "parsing headers")
+	}
+	it, err := headersJ.ObjectIter()
+	if err != nil {
+		return nil, errors.Wrap(err, "parsing headers as object")
+	}
+	if it == nil {
+		return nil, errors.Newf("headers is not a JSON object: %s", headers)
+	}
+	headersMap := make(map[string]S, headersJ.Len())
+	for it.Next() {
+		k := it.Key()
+		v := it.Value()
+		s, err := v.AsText()
+		if err != nil {
+			return nil, errors.Wrap(err, "parsing header value as text")
+		}
+		if s == nil {
+			continue
+		}
+		headersMap[k] = S(*s)
+	}
+	return headersMap, nil
+}
+
+type KafkaSinkOptions struct {
+	// JSONConfig is arbitrary json to be interpreted
+	// by the kafka sink.
+	JSONConfig SinkSpecificJSONConfig
+
+	// Headers is a map of header names to values.
+	Headers map[string][]byte
+}
+
+func (s StatementOptions) GetKafkaSinkOptions() (KafkaSinkOptions, error) {
+	headersMap, err := parseHeaders[[]byte](s.m[OptExtraHeaders])
+	if err != nil {
+		return KafkaSinkOptions{}, err
+	}
+
+	o := KafkaSinkOptions{
+		JSONConfig: s.getJSONValue(OptKafkaSinkConfig),
+		Headers:    headersMap,
+	}
+	return o, nil
 }
 
 // GetPubsubConfigJSON returns arbitrary json to be interpreted
```
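To make the parsing rules concrete, here is a hypothetical in-package test (not part of this commit) exercising the new generic parseHeaders helper. It reflects the behavior visible in the diff above: empty input is a no-op, a JSON object becomes a map of the requested value type, and non-object JSON is rejected.

```go
package changefeedbase

import (
	"testing"

	"github.com/stretchr/testify/require"
)

// TestParseHeadersSketch is an illustrative sketch of parseHeaders behavior.
func TestParseHeadersSketch(t *testing.T) {
	// A JSON object becomes a header map; values are taken as text.
	m, err := parseHeaders[[]byte](`{"X-App": "crdb", "X-Env": "test"}`)
	require.NoError(t, err)
	require.Equal(t, map[string][]byte{
		"X-App": []byte("crdb"),
		"X-Env": []byte("test"),
	}, m)

	// A JSON array is not an object, so it is rejected.
	_, err = parseHeaders[string](`["X-App", "crdb"]`)
	require.Error(t, err)

	// Empty input yields a nil map and no error.
	empty, err := parseHeaders[string]("")
	require.NoError(t, err)
	require.Nil(t, empty)
}
```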

pkg/ccl/changefeedccl/sink.go

Lines changed: 11 additions & 3 deletions
```diff
@@ -249,20 +249,28 @@ func getSink(
 		return makeNullSink(&changefeedbase.SinkURL{URL: u}, metricsBuilder(nullIsAccounted))
 	case isKafkaSink(u):
 		return validateOptionsAndMakeSink(changefeedbase.KafkaValidOptions, func() (Sink, error) {
+			sinkOpts, err := opts.GetKafkaSinkOptions()
+			if err != nil {
+				return nil, err
+			}
 			if KafkaV2Enabled.Get(&serverCfg.Settings.SV) {
-				return makeKafkaSinkV2(ctx, &changefeedbase.SinkURL{URL: u}, AllTargets(feedCfg), opts.GetKafkaConfigJSON(),
+				return makeKafkaSinkV2(ctx, &changefeedbase.SinkURL{URL: u}, AllTargets(feedCfg), sinkOpts,
 					numSinkIOWorkers(serverCfg), newCPUPacerFactory(ctx, serverCfg), timeutil.DefaultTimeSource{},
 					serverCfg.Settings, metricsBuilder, kafkaSinkV2Knobs{})
 			} else {
-				return makeKafkaSink(ctx, &changefeedbase.SinkURL{URL: u}, AllTargets(feedCfg), opts.GetKafkaConfigJSON(), serverCfg.Settings, metricsBuilder)
+				return makeKafkaSink(ctx, &changefeedbase.SinkURL{URL: u}, AllTargets(feedCfg), sinkOpts, serverCfg.Settings, metricsBuilder)
 			}
 		})
 	case isPulsarSink(u):
 		var testingKnobs *TestingKnobs
 		if knobs, ok := serverCfg.TestingKnobs.Changefeed.(*TestingKnobs); ok {
 			testingKnobs = knobs
 		}
-		return makePulsarSink(ctx, &changefeedbase.SinkURL{URL: u}, encodingOpts, AllTargets(feedCfg), opts.GetKafkaConfigJSON(),
+		sinkOpts, err := opts.GetKafkaSinkOptions()
+		if err != nil {
+			return nil, err
+		}
+		return makePulsarSink(ctx, &changefeedbase.SinkURL{URL: u}, encodingOpts, AllTargets(feedCfg), sinkOpts.JSONConfig,
 			serverCfg.Settings, metricsBuilder, testingKnobs)
 	case isWebhookSink(u):
 		webhookOpts, err := opts.GetWebhookSinkOptions()
```

pkg/ccl/changefeedccl/sink_kafka.go

Lines changed: 7 additions & 1 deletion
```diff
@@ -1200,7 +1200,7 @@ func makeKafkaSink(
 	ctx context.Context,
 	u *changefeedbase.SinkURL,
 	targets changefeedbase.Targets,
-	jsonStr changefeedbase.SinkSpecificJSONConfig,
+	sinkOpts changefeedbase.KafkaSinkOptions,
 	settings *cluster.Settings,
 	mb metricsRecorderBuilder,
 ) (Sink, error) {
@@ -1210,6 +1210,12 @@
 		return nil, errors.Errorf(`%s is not yet supported`, changefeedbase.SinkParamSchemaTopic)
 	}
 
+	jsonStr := sinkOpts.JSONConfig
+	if len(sinkOpts.Headers) > 0 {
+		return nil, errors.Newf("headers are not supported for the v1 kafka sink;"+
+			" use the v2 sink instead via the `%s` cluster setting", KafkaV2Enabled.Name())
+	}
+
 	m := mb(requiresResourceAccounting)
 	config, err := buildKafkaConfig(ctx, u, jsonStr, m.getKafkaThrottlingMetrics(settings), m.netMetrics())
 	if err != nil {
```

pkg/ccl/changefeedccl/sink_kafka_connection_test.go

Lines changed: 1 addition & 1 deletion
```diff
@@ -403,7 +403,7 @@ func TestAzureKafkaDefaults(t *testing.T) {
 
 	assertExpectedKgoOpts := func(exp expectation, opts []kgo.Opt) {
 		sinkClient, err := newKafkaSinkClientV2(ctx, opts, sinkBatchConfig{},
-			"", cluster.MakeTestingClusterSettings(), kafkaSinkV2Knobs{}, nilMetricsRecorderBuilder, nil)
+			"", cluster.MakeTestingClusterSettings(), kafkaSinkV2Knobs{}, nilMetricsRecorderBuilder, nil, nil)
 		require.NoError(t, err)
 		defer func() { require.NoError(t, sinkClient.Close()) }()
 		client := sinkClient.client.(*kgo.Client)
```
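newKafkaSinkClientV2 now takes an extra trailing argument (nil in this test), presumably the parsed constant-headers map. As a rough, hedged sketch of what a franz-go based sink can do with such a map, not code from this commit, each outgoing record can be stamped before producing; the helper name is made up for illustration.

```go
package changefeedccl

import "github.com/twmb/franz-go/pkg/kgo"

// withConstantHeaders is an illustrative helper (not part of this commit)
// showing how constant headers could be copied onto every record produced by
// a franz-go based kafka sink.
func withConstantHeaders(rec *kgo.Record, headers map[string][]byte) *kgo.Record {
	for k, v := range headers {
		rec.Headers = append(rec.Headers, kgo.RecordHeader{Key: k, Value: v})
	}
	return rec
}
```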
