Commit 2f7651b
changefeedccl: fix split_column_families changefeeds
This fixes the following when creating changefeeds for tables that have multiple column families, with the option split_column_families, into sinks that have topics: when using a Kafka V1 sink, we would always attempt to create a nonsense topic even if the Kafka broker setting has auto.create.topics.enable=false. This would cause the changefeed to fail.

Release note (bug fix): Remove unnecessary Kafka topic creation that could cause changefeed startup to fail when using changefeed.new_kafka_sink_enabled=false.

Fixes: #128973
Epic: CRDB-41784
1 parent 40c8a12
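For context, the failing scenario the message describes can be sketched as a test in the style of this package's changefeed tests. This is a hypothetical sketch, not part of the commit: the test name and table schema are illustrative, and it leans on the package's existing harness (cdcTest, feedTestForceSink, sqlutils, closeFeed).

func TestSplitColumnFamiliesKafkaV1Repro(t *testing.T) {
	defer leaktest.AfterTest(t)()
	defer log.Scope(t).Close(t)

	testFn := func(t *testing.T, s TestServer, f cdctest.TestFeedFactory) {
		// Force the v1 sink, mirroring changefeed.new_kafka_sink_enabled=false.
		KafkaV2Enabled.Override(context.Background(), &s.Server.ClusterSettings().SV, false)
		sqlDB := sqlutils.MakeSQLRunner(s.DB)
		// Two column families means one topic per family under split_column_families.
		sqlDB.Exec(t, `CREATE TABLE t (id INT PRIMARY KEY, a STRING, b STRING, FAMILY f1 (id, a), FAMILY f2 (b))`)
		// Before this fix, startup against a broker with
		// auto.create.topics.enable=false failed here; with the topic-creating
		// metadata request gone, the feed starts cleanly.
		feed, err := f.Feed(`CREATE CHANGEFEED FOR t WITH split_column_families`)
		require.NoError(t, err)
		defer closeFeed(t, feed)
	}
	cdcTest(t, testFn, feedTestForceSink("kafka"))
}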

File tree

5 files changed: +61 −14 lines

pkg/ccl/changefeedccl/changefeed_test.go
pkg/ccl/changefeedccl/helpers_test.go
pkg/ccl/changefeedccl/sink_kafka.go
pkg/ccl/changefeedccl/sink_test.go
pkg/ccl/changefeedccl/testfeed_test.go

pkg/ccl/changefeedccl/changefeed_test.go
Lines changed: 20 additions & 5 deletions

@@ -6708,15 +6708,15 @@ func TestChangefeedErrors(t *testing.T) {
 		`CREATE CHANGEFEED FOR foo INTO $1`, `experimental-sql://d/?confluent_schema_registry=foo&weird=bar`,
 	)
 
-	badHostErrRE := "client has run out of available brokers"
+	badHostErrRE := "(no such host|connection refused|network is unreachable)"
 	if KafkaV2Enabled.Get(&s.ClusterSettings().SV) {
 		badHostErrRE = "(unable to dial|unable to open connection to broker|lookup .* on .*: server misbehaving|connection refused)"
 	}
 
 	// Check unavailable kafka - bad dns.
 	longTimeoutSQLDB.ExpectErrWithTimeout(
 		t, badHostErrRE,
-		`CREATE CHANGEFEED FOR foo INTO 'kafka://nope'`,
+		`CREATE CHANGEFEED FOR foo INTO 'kafka://nope:9999'`,
 	)
 
 	// Check unavailable kafka - not running.
@@ -6728,7 +6728,7 @@ func TestChangefeedErrors(t *testing.T) {
 	// Test that a well-formed URI gets as far as unavailable kafka error.
 	longTimeoutSQLDB.ExpectErrWithTimeout(
 		t, badHostErrRE,
-		`CREATE CHANGEFEED FOR foo INTO 'kafka://nope/?tls_enabled=true&insecure_tls_skip_verify=true&topic_name=foo'`,
+		`CREATE CHANGEFEED FOR foo INTO 'kafka://nope:9999/?tls_enabled=true&insecure_tls_skip_verify=true&topic_name=foo'`,
 	)
 
 	// kafka_topic_prefix was referenced by an old version of the RFC, it's
@@ -6843,12 +6843,12 @@ func TestChangefeedErrors(t *testing.T) {
 	)
 	sqlDB.ExpectErrWithTimeout(
 		t, badHostErrRE,
-		`CREATE CHANGEFEED FOR foo INTO 'kafka://nope/' WITH kafka_sink_config='{"Flush": {"Messages": 100, "Frequency": "1s"}}'`,
+		`CREATE CHANGEFEED FOR foo INTO 'kafka://nope:9999/' WITH kafka_sink_config='{"Flush": {"Messages": 100, "Frequency": "1s"}}'`,
 	)
 	sqlDB.ExpectErrWithTimeout(
 		t, `this sink is incompatible with option webhook_client_timeout`,
 		`CREATE CHANGEFEED FOR foo INTO $1 WITH webhook_client_timeout='1s'`,
-		`kafka://nope/`,
+		`kafka://nope:9999/`,
 	)
 	// The avro format doesn't support key_in_value or topic_in_value yet.
 	sqlDB.ExpectErrWithTimeout(
@@ -7276,6 +7276,21 @@ func TestChangefeedDescription(t *testing.T) {
 	}
 }
 
+func TestChangefeedKafkaV1ConnectionError(t *testing.T) {
+	defer leaktest.AfterTest(t)()
+	defer log.Scope(t).Close(t)
+
+	testFn := func(t *testing.T, s TestServer, f cdctest.TestFeedFactory) {
+		KafkaV2Enabled.Override(context.Background(), &s.Server.ClusterSettings().SV, false)
+		sqlDB := sqlutils.MakeSQLRunner(s.DB)
+		sqlDB.Exec(t, `CREATE TABLE foo(id int primary key, s string)`)
+		sqlDB.Exec(t, `INSERT INTO foo(id, s) VALUES (0, 'hello'), (1, null)`)
+		_, err := f.Feed(`CREATE CHANGEFEED FOR foo`)
+		require.ErrorContains(t, err, "client has run out of available brokers")
+	}
+	cdcTest(t, testFn, feedTestForceSink("kafka"), feedTestForceKafkaV1ConnectionCheck)
+}
+
 func TestChangefeedPanicRecovery(t *testing.T) {
 	defer leaktest.AfterTest(t)()
 	defer log.Scope(t).Close(t)

pkg/ccl/changefeedccl/helpers_test.go
Lines changed: 8 additions & 1 deletion

@@ -776,6 +776,7 @@ type feedTestOptions struct {
 	debugUseAfterFinish         bool
 	clusterName                 string
 	locality                    roachpb.Locality
+	forceKafkaV1ConnectionCheck bool
 }
 
 type feedTestOption func(opts *feedTestOptions)
@@ -795,6 +796,12 @@ var feedTestNoExternalConnection = func(opts *feedTestOptions) { opts.forceNoExt
 // has privileges to create changefeeds on tables in the default database `d` only.
 var feedTestUseRootUserConnection = func(opts *feedTestOptions) { opts.forceRootUserConnection = true }
 
+// feedTestForceKafkaV1ConnectionCheck is a feedTestOption that will force the connection check
+// inside Dial() when using a Kafka v1 sink.
+var feedTestForceKafkaV1ConnectionCheck = func(opts *feedTestOptions) {
+	opts.forceKafkaV1ConnectionCheck = true
+}
+
 var feedTestForceSink = func(sinkType string) feedTestOption {
 	return feedTestRestrictSinks(sinkType)
 }
@@ -1339,7 +1346,7 @@ func makeFeedFactoryWithOptions(
 	}
 	switch sinkType {
 	case "kafka":
-		f := makeKafkaFeedFactory(t, srvOrCluster, db)
+		f := makeKafkaFeedFactoryWithConnectionCheck(t, srvOrCluster, db, options.forceKafkaV1ConnectionCheck)
 		userDB, cleanup := getInitialDBForEnterpriseFactory(t, s, db, options)
 		f.(*kafkaFeedFactory).configureUserDB(userDB)
 		return f, func() { cleanup() }

pkg/ccl/changefeedccl/sink_kafka.go
Lines changed: 13 additions & 4 deletions

@@ -74,6 +74,7 @@ type kafkaSinkKnobs struct {
 	OverrideClientInit              func(config *sarama.Config) (kafkaClient, error)
 	OverrideAsyncProducerFromClient func(kafkaClient) (sarama.AsyncProducer, error)
 	OverrideSyncProducerFromClient  func(kafkaClient) (sarama.SyncProducer, error)
+	BypassConnectionCheck           bool
 }
 
 var _ sarama.StdLogger = (*kafkaLogAdapter)(nil)
@@ -114,6 +115,8 @@ type kafkaClient interface {
 	Config() *sarama.Config
 	// Close closes kafka connection.
 	Close() error
+	// LeastLoadedBroker retrieves the broker with the fewest pending responses.
+	LeastLoadedBroker() *sarama.Broker
 }
 
 // kafkaSink emits to Kafka asynchronously. It is not concurrency-safe; all
@@ -264,10 +267,16 @@ func (s *kafkaSink) Dial() error {
 		return err
 	}
 
-	if err = client.RefreshMetadata(s.Topics()...); err != nil {
-		// Now that we do not fetch metadata for all topics by default, we try
-		// RefreshMetadata manually to check for any connection error.
-		return errors.CombineErrors(err, client.Close())
+	// Make sure the broker can be reached.
+	if broker := client.LeastLoadedBroker(); broker != nil {
+		if err := broker.Open(s.kafkaCfg); err != nil && !errors.Is(err, sarama.ErrAlreadyConnected) {
+			return errors.CombineErrors(err, client.Close())
+		}
+		if ok, err := broker.Connected(); !ok || err != nil {
+			return errors.CombineErrors(err, client.Close())
+		}
+	} else if !s.knobs.BypassConnectionCheck {
+		return errors.CombineErrors(errors.New("client has run out of available brokers"), client.Close())
 	}
 
 	producer, err := s.newAsyncProducer(client)
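Taken in isolation, the new reachability probe in Dial() reduces to the following minimal sketch. The helper name and package are hypothetical, and the import paths (github.com/IBM/sarama, github.com/cockroachdb/errors) are assumptions about the vendored libraries; the probe itself mirrors the diff above.

package kafkaprobe

import (
	"github.com/IBM/sarama"         // assumed import path for the sarama client
	"github.com/cockroachdb/errors" // assumed; provides CombineErrors
)

// checkBrokerReachable verifies connectivity the way the new Dial() does:
// instead of calling RefreshMetadata on the changefeed's topics (a request
// the broker may satisfy by auto-creating those topics), it opens a
// connection to the least-loaded broker and confirms the dial succeeded.
func checkBrokerReachable(client sarama.Client, cfg *sarama.Config) error {
	broker := client.LeastLoadedBroker()
	if broker == nil {
		return errors.New("client has run out of available brokers")
	}
	// Open dials in the background; ErrAlreadyConnected is benign.
	if err := broker.Open(cfg); err != nil && !errors.Is(err, sarama.ErrAlreadyConnected) {
		return err
	}
	// Connected waits for the dial started by Open and reports its outcome.
	if ok, err := broker.Connected(); !ok || err != nil {
		return errors.CombineErrors(errors.New("broker is not connected"), err)
	}
	return nil
}

Note that the probe touches a single broker and no topics, which is why it fails fast on unreachable clusters without tripping server-side topic auto-creation.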

pkg/ccl/changefeedccl/sink_test.go
Lines changed: 1 addition & 0 deletions

@@ -278,6 +278,7 @@ func makeTestKafkaSink(
 				client := &fakeKafkaClient{config}
 				return client, nil
 			},
+			BypassConnectionCheck: true,
 		},
 	}
 	err = s.Dial()

pkg/ccl/changefeedccl/testfeed_test.go
Lines changed: 19 additions & 4 deletions

@@ -1745,6 +1745,10 @@ func (c *fakeKafkaClient) Close() error {
 	return nil
 }
 
+func (c *fakeKafkaClient) LeastLoadedBroker() *sarama.Broker {
+	return nil
+}
+
 func (c *fakeKafkaClient) Config() *sarama.Config {
 	return c.config
 }
@@ -1775,6 +1779,8 @@ func (p *asyncIgnoreCloseProducer) Close() error {
 type sinkKnobs struct {
 	// kafkaInterceptor is only valid for the v1 kafka sink.
 	kafkaInterceptor func(m *sarama.ProducerMessage, client kafkaClient) error
+	// bypassKafkaV1ConnectionCheck is used for the v1 kafka sink.
+	bypassKafkaV1ConnectionCheck bool
 }
 
 // fakeKafkaSink is a sink that arranges for fake kafka client and producer
@@ -1795,6 +1801,7 @@ func (s *fakeKafkaSink) Dial() error {
 		client := &fakeKafkaClient{config}
 		return client, nil
 	}
+	kafka.knobs.BypassConnectionCheck = s.knobs.bypassKafkaV1ConnectionCheck
 
 	kafka.knobs.OverrideAsyncProducerFromClient = func(client kafkaClient) (sarama.AsyncProducer, error) {
 		// The producer we give to kafka sink ignores close call.
@@ -1946,13 +1953,14 @@ func mustBeKafkaFeedFactory(f cdctest.TestFeedFactory) *kafkaFeedFactory {
 	}
 }
 
-// makeKafkaFeedFactory returns a TestFeedFactory implementation using the `kafka` uri.
-func makeKafkaFeedFactory(
-	t *testing.T, srvOrCluster interface{}, rootDB *gosql.DB,
+func makeKafkaFeedFactoryWithConnectionCheck(
+	t *testing.T, srvOrCluster interface{}, rootDB *gosql.DB, forceKafkaV1ConnectionCheck bool,
 ) cdctest.TestFeedFactory {
 	s, injectables := getInjectables(srvOrCluster)
 	return &kafkaFeedFactory{
-		knobs: &sinkKnobs{},
+		knobs: &sinkKnobs{
+			bypassKafkaV1ConnectionCheck: !forceKafkaV1ConnectionCheck,
+		},
 		enterpriseFeedFactory: enterpriseFeedFactory{
 			s:  s,
 			db: rootDB,
@@ -1963,6 +1971,13 @@ func makeKafkaFeedFactory(
 	}
 }
 
+// makeKafkaFeedFactory returns a TestFeedFactory implementation using the `kafka` uri.
+func makeKafkaFeedFactory(
+	t *testing.T, srvOrCluster interface{}, rootDB *gosql.DB,
+) cdctest.TestFeedFactory {
+	return makeKafkaFeedFactoryWithConnectionCheck(t, srvOrCluster, rootDB, false)
+}
+
 func exprAsString(expr tree.Expr) (string, error) {
 	evalCtx := eval.NewTestingEvalContext(cluster.MakeTestingClusterSettings())
 	semaCtx := tree.MakeSemaContext(nil /* resolver */)
