Commit 63545b4

craig[bot] committed with KeithCh, spilchen, rafiss, and pav-kv
146476: changefeedccl: fix split_column_families changefeeds r=asg0451 a=KeithCh

This fixes the following issue when creating changefeeds with the split_column_families option on tables that have multiple column families, into sinks that have topics: when using a Kafka V1 sink, we would always attempt to create a nonsense topic, even if the Kafka broker has auto.create.topics.enable=false. This would cause the changefeed to fail.

Release note (bug fix): Remove unnecessary Kafka topic creation that could cause changefeed startup to fail when using changefeed.new_kafka_sink_enabled=false.

Fixes: #128973
Epic: CRDB-41784

148683: sql: add infrastructure for new inspect job r=spilchen a=spilchen

This adds infrastructure for a new job that will handle data consistency checking (tentatively called inspect). It will initially be invoked via the `EXPERIMENTAL SCRUB` command and a new session variable (`enable_scrub_job`). It will eventually be invoked under its own SQL syntax, but that will be handled separately.

I split this into 4 commits to make reviewing easier. I will squash them prior to merging to master.

## jobs: add stub implementation of inspect job

This commit introduces a stub for the new INSPECT job type. The Resumer doesn't do anything useful yet; it simply returns as a no-op.

Informs: #148289
Epic: CRDB-30356
Release note: None

## sql: support starting inspect job from SCRUB

This change adds support for initiating an INSPECT job when running SCRUB TABLE, controlled by the session variable enable_scrub_job. When enable_scrub_job is true, SCRUB TABLE will schedule or start a consistency check job instead of running inline checks.

Informs: #148289
Epic: CRDB-30356
Release note: None

## sql: add logic tests for inspect job via SCRUB

This change introduces logic tests for the new inspect job path when enable_scrub_job is enabled.

Informs: #148289
Epic: CRDB-30356
Release note: None

## sql: add unit test for SCRUB job execution semantics

This change adds a unit test (TestInspectJobImplicitTxnSemantics) to verify the behavior of inspect jobs started via EXPERIMENTAL SCRUB TABLE in implicit transactions. It includes test hooks via InspectTestingKnobs to simulate and control job execution behavior.

Informs: #148289
Epic: CRDB-30356
Release note: None

149439: roachtest: capture metrics for index backfill in schemachange/bulkingest r=rafiss a=rafiss

This will allow us to make a roachperf chart to track changes over time.

Informs: #146571
Release note: None

149483: opentelemetry: add raftlog size metrics r=arulajmani a=pav-kv

Part of #141126

149488: tracing: fix tests for simple flight recorder r=angles-n-daemons a=dhartunian

We weren't returning errors correctly in SucceedsSoon. The first failure would fail the test instead of retrying.

Resolves: #149450
Release note: None

Co-authored-by: Keith Chow <[email protected]>
Co-authored-by: Matt Spilchen <[email protected]>
Co-authored-by: Rafi Shamim <[email protected]>
Co-authored-by: Pavel Kalinnikov <[email protected]>
Co-authored-by: David Hartunian <[email protected]>
6 parents ce20a39 + 2f7651b + 6193ddf + 56bd548 + bad46c1 + 8124276 commit 63545b4
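
To make the first fix above concrete, here is a minimal sketch (not part of this commit) of the scenario it describes: a table with multiple column families, a split_column_families changefeed into a Kafka sink, and the v1 sink forced via the changefeed.new_kafka_sink_enabled setting named in the release note. The connection string, table schema, and broker address are illustrative only.

package main

import (
	"database/sql"
	"log"

	_ "github.com/lib/pq"
)

func main() {
	// Illustrative connection string to a local CockroachDB node.
	db, err := sql.Open("postgres", "postgresql://root@localhost:26257/defaultdb?sslmode=disable")
	if err != nil {
		log.Fatal(err)
	}
	defer db.Close()

	// A table with more than one column family: with split_column_families,
	// each family gets its own topic, which is where the v1 sink previously
	// tripped when topic auto-creation was disabled on the broker.
	mustExec(db, `CREATE TABLE t (k INT PRIMARY KEY, a STRING, b STRING, FAMILY f1 (k, a), FAMILY f2 (b))`)
	// Force the v1 Kafka sink, as described in the release note.
	mustExec(db, `SET CLUSTER SETTING changefeed.new_kafka_sink_enabled = false`)
	// Illustrative broker address.
	mustExec(db, `CREATE CHANGEFEED FOR TABLE t INTO 'kafka://localhost:9092' WITH split_column_families`)
}

func mustExec(db *sql.DB, stmt string) {
	if _, err := db.Exec(stmt); err != nil {
		log.Fatal(err)
	}
}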

38 files changed: +733 / -21 lines

docs/generated/metrics/metrics.yaml

Lines changed: 108 additions & 0 deletions
@@ -5086,6 +5086,114 @@ layers:
     unit: COUNT
     aggregation: AVG
     derivative: NON_NEGATIVE_DERIVATIVE
+  - name: jobs.inspect.currently_idle
+    exported_name: jobs_inspect_currently_idle
+    labeled_name: 'jobs{type: inspect, status: currently_idle}'
+    description: Number of inspect jobs currently considered Idle and can be freely shut down
+    y_axis_label: jobs
+    type: GAUGE
+    unit: COUNT
+    aggregation: AVG
+    derivative: NONE
+  - name: jobs.inspect.currently_paused
+    exported_name: jobs_inspect_currently_paused
+    labeled_name: 'jobs{name: inspect, status: currently_paused}'
+    description: Number of inspect jobs currently considered Paused
+    y_axis_label: jobs
+    type: GAUGE
+    unit: COUNT
+    aggregation: AVG
+    derivative: NONE
+  - name: jobs.inspect.currently_running
+    exported_name: jobs_inspect_currently_running
+    labeled_name: 'jobs{type: inspect, status: currently_running}'
+    description: Number of inspect jobs currently running in Resume or OnFailOrCancel state
+    y_axis_label: jobs
+    type: GAUGE
+    unit: COUNT
+    aggregation: AVG
+    derivative: NONE
+  - name: jobs.inspect.expired_pts_records
+    exported_name: jobs_inspect_expired_pts_records
+    labeled_name: 'jobs.expired_pts_records{type: inspect}'
+    description: Number of expired protected timestamp records owned by inspect jobs
+    y_axis_label: records
+    type: COUNTER
+    unit: COUNT
+    aggregation: AVG
+    derivative: NON_NEGATIVE_DERIVATIVE
+  - name: jobs.inspect.fail_or_cancel_completed
+    exported_name: jobs_inspect_fail_or_cancel_completed
+    labeled_name: 'jobs.fail_or_cancel{name: inspect, status: completed}'
+    description: Number of inspect jobs which successfully completed their failure or cancelation process
+    y_axis_label: jobs
+    type: COUNTER
+    unit: COUNT
+    aggregation: AVG
+    derivative: NON_NEGATIVE_DERIVATIVE
+  - name: jobs.inspect.fail_or_cancel_failed
+    exported_name: jobs_inspect_fail_or_cancel_failed
+    labeled_name: 'jobs.fail_or_cancel{name: inspect, status: failed}'
+    description: Number of inspect jobs which failed with a non-retriable error on their failure or cancelation process
+    y_axis_label: jobs
+    type: COUNTER
+    unit: COUNT
+    aggregation: AVG
+    derivative: NON_NEGATIVE_DERIVATIVE
+  - name: jobs.inspect.fail_or_cancel_retry_error
+    exported_name: jobs_inspect_fail_or_cancel_retry_error
+    labeled_name: 'jobs.fail_or_cancel{name: inspect, status: retry_error}'
+    description: Number of inspect jobs which failed with a retriable error on their failure or cancelation process
+    y_axis_label: jobs
+    type: COUNTER
+    unit: COUNT
+    aggregation: AVG
+    derivative: NON_NEGATIVE_DERIVATIVE
+  - name: jobs.inspect.protected_age_sec
+    exported_name: jobs_inspect_protected_age_sec
+    labeled_name: 'jobs.protected_age_sec{type: inspect}'
+    description: The age of the oldest PTS record protected by inspect jobs
+    y_axis_label: seconds
+    type: GAUGE
+    unit: SECONDS
+    aggregation: AVG
+    derivative: NONE
+  - name: jobs.inspect.protected_record_count
+    exported_name: jobs_inspect_protected_record_count
+    labeled_name: 'jobs.protected_record_count{type: inspect}'
+    description: Number of protected timestamp records held by inspect jobs
+    y_axis_label: records
+    type: GAUGE
+    unit: COUNT
+    aggregation: AVG
+    derivative: NONE
+  - name: jobs.inspect.resume_completed
+    exported_name: jobs_inspect_resume_completed
+    labeled_name: 'jobs.resume{name: inspect, status: completed}'
+    description: Number of inspect jobs which successfully resumed to completion
+    y_axis_label: jobs
+    type: COUNTER
+    unit: COUNT
+    aggregation: AVG
+    derivative: NON_NEGATIVE_DERIVATIVE
+  - name: jobs.inspect.resume_failed
+    exported_name: jobs_inspect_resume_failed
+    labeled_name: 'jobs.resume{name: inspect, status: failed}'
+    description: Number of inspect jobs which failed with a non-retriable error
+    y_axis_label: jobs
+    type: COUNTER
+    unit: COUNT
+    aggregation: AVG
+    derivative: NON_NEGATIVE_DERIVATIVE
+  - name: jobs.inspect.resume_retry_error
+    exported_name: jobs_inspect_resume_retry_error
+    labeled_name: 'jobs.resume{name: inspect, status: retry_error}'
+    description: Number of inspect jobs which failed with a retriable error
+    y_axis_label: jobs
+    type: COUNTER
+    unit: COUNT
+    aggregation: AVG
+    derivative: NON_NEGATIVE_DERIVATIVE
   - name: jobs.key_visualizer.currently_idle
     exported_name: jobs_key_visualizer_currently_idle
     labeled_name: 'jobs{type: key_visualizer, status: currently_idle}'
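
These are the standard per-job-type metric entries generated for the new inspect job. As a usage sketch (not part of this commit), the job would be started through SCRUB once the session variable from PR 148683 is set; the connection string and table name below are illustrative only.

package main

import (
	"database/sql"
	"log"

	_ "github.com/lib/pq"
)

func main() {
	// Illustrative connection string to a local node.
	db, err := sql.Open("postgres", "postgresql://root@localhost:26257/defaultdb?sslmode=disable")
	if err != nil {
		log.Fatal(err)
	}
	defer db.Close()

	// Opt in to the job-based SCRUB path added by PR 148683.
	if _, err := db.Exec(`SET enable_scrub_job = true`); err != nil {
		log.Fatal(err)
	}
	// With the session variable set, SCRUB schedules an INSPECT job instead of
	// running the checks inline; its activity is reported under the
	// jobs.inspect.* metrics listed above. The table name is illustrative.
	if _, err := db.Exec(`EXPERIMENTAL SCRUB TABLE kv`); err != nil {
		log.Fatal(err)
	}
}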

pkg/BUILD.bazel

Lines changed: 3 additions & 0 deletions
@@ -496,6 +496,7 @@ ALL_TESTS = [
     "//pkg/sql/idxrecommendations:idxrecommendations_test",
     "//pkg/sql/idxusage:idxusage_test",
     "//pkg/sql/importer:importer_test",
+    "//pkg/sql/inspect:inspect_test",
     "//pkg/sql/inverted:inverted_disallowed_imports_test",
     "//pkg/sql/inverted:inverted_test",
     "//pkg/sql/lex:lex_disallowed_imports_test",
@@ -2020,6 +2021,8 @@ GO_TARGETS = [
     "//pkg/sql/idxusage:idxusage_test",
     "//pkg/sql/importer:importer",
     "//pkg/sql/importer:importer_test",
+    "//pkg/sql/inspect:inspect",
+    "//pkg/sql/inspect:inspect_test",
     "//pkg/sql/inverted:inverted",
     "//pkg/sql/inverted:inverted_test",
     "//pkg/sql/isql:isql",

pkg/base/testing_knobs.go

Lines changed: 1 addition & 0 deletions
@@ -32,6 +32,7 @@ type TestingKnobs struct {
 	JobsTestingKnobs ModuleTestingKnobs
 	BackupRestore ModuleTestingKnobs
 	TTL ModuleTestingKnobs
+	Inspect ModuleTestingKnobs
 	SchemaTelemetry ModuleTestingKnobs
 	Streaming ModuleTestingKnobs
 	UpgradeManager ModuleTestingKnobs
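
The new Inspect slot, like the other fields in this struct, holds a value implementing the base.ModuleTestingKnobs marker interface. Below is a minimal sketch of what the InspectTestingKnobs type mentioned in the commit message could look like; the hook field is hypothetical and only illustrates the pattern.

package inspect

import "github.com/cockroachdb/cockroach/pkg/base"

// InspectTestingKnobs is named in the commit message; the field below is a
// hypothetical example of the kind of hook it could carry.
type InspectTestingKnobs struct {
	// OnInspectJobStart, if set, is invoked before the job's Resume runs,
	// letting tests simulate or control job execution (hypothetical hook).
	OnInspectJobStart func() error
}

// ModuleTestingKnobs implements base.ModuleTestingKnobs so the struct can be
// plugged into the Inspect slot added above.
func (*InspectTestingKnobs) ModuleTestingKnobs() {}

var _ base.ModuleTestingKnobs = (*InspectTestingKnobs)(nil)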

pkg/ccl/changefeedccl/changefeed_test.go

Lines changed: 20 additions & 5 deletions
@@ -6711,15 +6711,15 @@ func TestChangefeedErrors(t *testing.T) {
 		`CREATE CHANGEFEED FOR foo INTO $1`, `experimental-sql://d/?confluent_schema_registry=foo&weird=bar`,
 	)
 
-	badHostErrRE := "client has run out of available brokers"
+	badHostErrRE := "(no such host|connection refused|network is unreachable)"
 	if KafkaV2Enabled.Get(&s.ClusterSettings().SV) {
 		badHostErrRE = "(unable to dial|unable to open connection to broker|lookup .* on .*: server misbehaving|connection refused)"
 	}
 
 	// Check unavailable kafka - bad dns.
 	longTimeoutSQLDB.ExpectErrWithTimeout(
 		t, badHostErrRE,
-		`CREATE CHANGEFEED FOR foo INTO 'kafka://nope'`,
+		`CREATE CHANGEFEED FOR foo INTO 'kafka://nope:9999'`,
 	)
 
 	// Check unavailable kafka - not running.
@@ -6731,7 +6731,7 @@ func TestChangefeedErrors(t *testing.T) {
 	// Test that a well-formed URI gets as far as unavailable kafka error.
 	longTimeoutSQLDB.ExpectErrWithTimeout(
 		t, badHostErrRE,
-		`CREATE CHANGEFEED FOR foo INTO 'kafka://nope/?tls_enabled=true&insecure_tls_skip_verify=true&topic_name=foo'`,
+		`CREATE CHANGEFEED FOR foo INTO 'kafka://nope:9999/?tls_enabled=true&insecure_tls_skip_verify=true&topic_name=foo'`,
 	)
 
 	// kafka_topic_prefix was referenced by an old version of the RFC, it's
@@ -6846,12 +6846,12 @@ func TestChangefeedErrors(t *testing.T) {
 	)
 	sqlDB.ExpectErrWithTimeout(
 		t, badHostErrRE,
-		`CREATE CHANGEFEED FOR foo INTO 'kafka://nope/' WITH kafka_sink_config='{"Flush": {"Messages": 100, "Frequency": "1s"}}'`,
+		`CREATE CHANGEFEED FOR foo INTO 'kafka://nope:9999/' WITH kafka_sink_config='{"Flush": {"Messages": 100, "Frequency": "1s"}}'`,
 	)
 	sqlDB.ExpectErrWithTimeout(
 		t, `this sink is incompatible with option webhook_client_timeout`,
 		`CREATE CHANGEFEED FOR foo INTO $1 WITH webhook_client_timeout='1s'`,
-		`kafka://nope/`,
+		`kafka://nope:9999/`,
 	)
 	// The avro format doesn't support key_in_value or topic_in_value yet.
 	sqlDB.ExpectErrWithTimeout(
@@ -7279,6 +7279,21 @@ func TestChangefeedDescription(t *testing.T) {
 	}
 }
 
+func TestChangefeedKafkaV1ConnectionError(t *testing.T) {
+	defer leaktest.AfterTest(t)()
+	defer log.Scope(t).Close(t)
+
+	testFn := func(t *testing.T, s TestServer, f cdctest.TestFeedFactory) {
+		KafkaV2Enabled.Override(context.Background(), &s.Server.ClusterSettings().SV, false)
+		sqlDB := sqlutils.MakeSQLRunner(s.DB)
+		sqlDB.Exec(t, `CREATE TABLE foo(id int primary key, s string)`)
+		sqlDB.Exec(t, `INSERT INTO foo(id, s) VALUES (0, 'hello'), (1, null)`)
+		_, err := f.Feed(`CREATE CHANGEFEED FOR foo`)
+		require.ErrorContains(t, err, "client has run out of available brokers")
+	}
+	cdcTest(t, testFn, feedTestForceSink("kafka"), feedTestForceKafkaV1ConnectionCheck)
+}
+
 func TestChangefeedPanicRecovery(t *testing.T) {
 	defer leaktest.AfterTest(t)()
 	defer log.Scope(t).Close(t)

pkg/ccl/changefeedccl/helpers_test.go

Lines changed: 8 additions & 1 deletion
@@ -776,6 +776,7 @@ type feedTestOptions struct {
 	debugUseAfterFinish bool
 	clusterName string
 	locality roachpb.Locality
+	forceKafkaV1ConnectionCheck bool
 }
 
 type feedTestOption func(opts *feedTestOptions)
@@ -795,6 +796,12 @@ var feedTestNoExternalConnection = func(opts *feedTestOptions) { opts.forceNoExt
 // has privileges to create changefeeds on tables in the default database `d` only.
 var feedTestUseRootUserConnection = func(opts *feedTestOptions) { opts.forceRootUserConnection = true }
 
+// feedTestForceKafkaV1ConnectionCheck is a feedTestOption that will force the connection check
+// inside Dial() when using a Kafka v1 sink.
+var feedTestForceKafkaV1ConnectionCheck = func(opts *feedTestOptions) {
+	opts.forceKafkaV1ConnectionCheck = true
+}
+
 var feedTestForceSink = func(sinkType string) feedTestOption {
 	return feedTestRestrictSinks(sinkType)
 }
@@ -1339,7 +1346,7 @@ func makeFeedFactoryWithOptions(
 	}
 	switch sinkType {
 	case "kafka":
-		f := makeKafkaFeedFactory(t, srvOrCluster, db)
+		f := makeKafkaFeedFactoryWithConnectionCheck(t, srvOrCluster, db, options.forceKafkaV1ConnectionCheck)
 		userDB, cleanup := getInitialDBForEnterpriseFactory(t, s, db, options)
 		f.(*kafkaFeedFactory).configureUserDB(userDB)
 		return f, func() { cleanup() }

pkg/ccl/changefeedccl/sink_kafka.go

Lines changed: 13 additions & 4 deletions
@@ -74,6 +74,7 @@ type kafkaSinkKnobs struct {
 	OverrideClientInit func(config *sarama.Config) (kafkaClient, error)
 	OverrideAsyncProducerFromClient func(kafkaClient) (sarama.AsyncProducer, error)
 	OverrideSyncProducerFromClient func(kafkaClient) (sarama.SyncProducer, error)
+	BypassConnectionCheck bool
 }
 
 var _ sarama.StdLogger = (*kafkaLogAdapter)(nil)
@@ -114,6 +115,8 @@ type kafkaClient interface {
 	Config() *sarama.Config
 	// Close closes kafka connection.
 	Close() error
+	// LeastLoadedBroker retrieves broker that has the least responses pending.
+	LeastLoadedBroker() *sarama.Broker
 }
 
 // kafkaSink emits to Kafka asynchronously. It is not concurrency-safe; all
@@ -264,10 +267,16 @@ func (s *kafkaSink) Dial() error {
 		return err
 	}
 
-	if err = client.RefreshMetadata(s.Topics()...); err != nil {
-		// Now that we do not fetch metadata for all topics by default, we try
-		// RefreshMetadata manually to check for any connection error.
-		return errors.CombineErrors(err, client.Close())
+	// Make sure the broker can be reached.
+	if broker := client.LeastLoadedBroker(); broker != nil {
+		if err := broker.Open(s.kafkaCfg); err != nil && !errors.Is(err, sarama.ErrAlreadyConnected) {
+			return errors.CombineErrors(err, client.Close())
+		}
+		if ok, err := broker.Connected(); !ok || err != nil {
+			return errors.CombineErrors(err, client.Close())
+		}
+	} else if !s.knobs.BypassConnectionCheck {
+		return errors.CombineErrors(errors.New("client has run out of available brokers"), client.Close())
 	}
 
 	producer, err := s.newAsyncProducer(client)
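
Outside the sink wrapper, the same reachability probe can be written directly against the sarama client; a minimal sketch, assuming the IBM/sarama package and an illustrative broker address.

package main

import (
	"errors"
	"log"

	"github.com/IBM/sarama"
)

func main() {
	cfg := sarama.NewConfig()
	// Illustrative broker address; replace with the sink's brokers.
	client, err := sarama.NewClient([]string{"localhost:9092"}, cfg)
	if err != nil {
		log.Fatal(err)
	}
	defer client.Close()

	// Mirror of the check Dial() now performs: pick the least loaded broker and
	// verify a connection can actually be opened, instead of refreshing topic
	// metadata (which, per the commit message, could trigger unwanted topic
	// creation on the broker).
	broker := client.LeastLoadedBroker()
	if broker == nil {
		log.Fatal("client has run out of available brokers")
	}
	if err := broker.Open(cfg); err != nil && !errors.Is(err, sarama.ErrAlreadyConnected) {
		log.Fatal(err)
	}
	if ok, err := broker.Connected(); !ok || err != nil {
		log.Fatalf("broker not connected: %v", err)
	}
}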

pkg/ccl/changefeedccl/sink_test.go

Lines changed: 1 addition & 0 deletions
@@ -278,6 +278,7 @@ func makeTestKafkaSink(
 			client := &fakeKafkaClient{config}
 			return client, nil
 		},
+		BypassConnectionCheck: true,
 	},
 }
 err = s.Dial()

pkg/ccl/changefeedccl/testfeed_test.go

Lines changed: 19 additions & 4 deletions
@@ -1745,6 +1745,10 @@ func (c *fakeKafkaClient) Close() error {
 	return nil
 }
 
+func (c *fakeKafkaClient) LeastLoadedBroker() *sarama.Broker {
+	return nil
+}
+
 func (c *fakeKafkaClient) Config() *sarama.Config {
 	return c.config
 }
@@ -1775,6 +1779,8 @@ func (p *asyncIgnoreCloseProducer) Close() error {
 type sinkKnobs struct {
 	// kafkaInterceptor is only valid for the v1 kafka sink.
 	kafkaInterceptor func(m *sarama.ProducerMessage, client kafkaClient) error
+	// BypassConnectionCheck is used for v1 kafka sink.
+	bypassKafkaV1ConnectionCheck bool
 }
 
 // fakeKafkaSink is a sink that arranges for fake kafka client and producer
@@ -1795,6 +1801,7 @@ func (s *fakeKafkaSink) Dial() error {
 		client := &fakeKafkaClient{config}
 		return client, nil
 	}
+	kafka.knobs.BypassConnectionCheck = s.knobs.bypassKafkaV1ConnectionCheck
 
 	kafka.knobs.OverrideAsyncProducerFromClient = func(client kafkaClient) (sarama.AsyncProducer, error) {
 		// The producer we give to kafka sink ignores close call.
@@ -1946,13 +1953,14 @@ func mustBeKafkaFeedFactory(f cdctest.TestFeedFactory) *kafkaFeedFactory {
 	}
 }
 
-// makeKafkaFeedFactory returns a TestFeedFactory implementation using the `kafka` uri.
-func makeKafkaFeedFactory(
-	t *testing.T, srvOrCluster interface{}, rootDB *gosql.DB,
+func makeKafkaFeedFactoryWithConnectionCheck(
+	t *testing.T, srvOrCluster interface{}, rootDB *gosql.DB, forceKafkaV1ConnectionCheck bool,
 ) cdctest.TestFeedFactory {
 	s, injectables := getInjectables(srvOrCluster)
 	return &kafkaFeedFactory{
-		knobs: &sinkKnobs{},
+		knobs: &sinkKnobs{
+			bypassKafkaV1ConnectionCheck: !forceKafkaV1ConnectionCheck,
+		},
 		enterpriseFeedFactory: enterpriseFeedFactory{
 			s: s,
 			db: rootDB,
@@ -1963,6 +1971,13 @@ func makeKafkaFeedFactory(
 	}
 }
 
+// makeKafkaFeedFactory returns a TestFeedFactory implementation using the `kafka` uri.
+func makeKafkaFeedFactory(
+	t *testing.T, srvOrCluster interface{}, rootDB *gosql.DB,
+) cdctest.TestFeedFactory {
+	return makeKafkaFeedFactoryWithConnectionCheck(t, srvOrCluster, rootDB, false)
+}
+
 func exprAsString(expr tree.Expr) (string, error) {
 	evalCtx := eval.NewTestingEvalContext(cluster.MakeTestingClusterSettings())
 	semaCtx := tree.MakeSemaContext(nil /* resolver */)
pkg/ccl/logictestccl/tests/3node-tenant/generated_test.go

Lines changed: 7 additions & 0 deletions
Some generated files are not rendered by default.

pkg/ccl/logictestccl/tests/local-read-committed/generated_test.go

Lines changed: 7 additions & 0 deletions
Some generated files are not rendered by default.
