From 3fe56702c2e8f1d9a58cc860c71fe5e63a7cbc6f Mon Sep 17 00:00:00 2001
From: Matthew Lougheed
Date: Wed, 1 Oct 2025 15:39:35 -0400
Subject: [PATCH] changefeedccl: make range_distribution_strategy a changefeed
 option

Previously, the range distribution strategy was controlled by the
changefeed.default_range_distribution_strategy cluster setting. Making it
a per-changefeed option instead gives users control over which changefeeds
it applies to (e.g. initial scans vs. long-running feeds).

Fixes: #147628

Release note (sql change): The cluster setting
changefeed.default_range_distribution_strategy has been replaced by the
changefeed option range_distribution_strategy, which accepts the values
'default' or 'balanced_simple'. 'balanced_simple' naively redistributes
spans evenly across aggregators with no regard for replica placement.
---
 .../settings/settings-for-tenants.txt         |  1 -
 docs/generated/settings/settings.html         |  1 -
 pkg/ccl/changefeedccl/changefeed_dist.go      | 45 ++---
 pkg/ccl/changefeedccl/changefeed_dist_test.go | 75 ++++++++++++-------
 .../changefeedccl/changefeedbase/options.go   | 28 +++++++
 pkg/cmd/roachtest/tests/cdc.go                | 40 +---
 pkg/cmd/roachtest/tests/mixed_version_cdc.go  | 14 ----
 7 files changed, 87 insertions(+), 117 deletions(-)

diff --git a/docs/generated/settings/settings-for-tenants.txt b/docs/generated/settings/settings-for-tenants.txt
index 6f5416fd2b85..29ad8db31ba4 100644
--- a/docs/generated/settings/settings-for-tenants.txt
+++ b/docs/generated/settings/settings-for-tenants.txt
@@ -12,7 +12,6 @@ changefeed.aggregator.flush_jitter float 0.1 jitter aggregator flushes as a frac
 changefeed.backfill.concurrent_scan_requests integer 0 number of concurrent scan requests per node issued during a backfill application
 changefeed.backfill.scan_request_size integer 524288 the maximum number of bytes returned by each scan request application
 changefeed.batch_reduction_retry.enabled (alias: changefeed.batch_reduction_retry_enabled) boolean false if true, kafka changefeeds upon erroring on an oversized batch will attempt to resend the messages with progressively lower batch sizes application
-changefeed.default_range_distribution_strategy enumeration default configures how work is distributed among nodes for a given changefeed. for the most balanced distribution, use `balanced_simple`. changing this setting will not override locality restrictions [default = 0, balanced_simple = 1] application
 changefeed.event_consumer_worker_queue_size integer 16 if changefeed.event_consumer_workers is enabled, this setting sets the maxmimum number of events which a worker can buffer application
 changefeed.event_consumer_workers integer 0 the number of workers to use when processing events: <0 disables, 0 assigns a reasonable default, >0 assigns the setting value. for experimental/core changefeeds and changefeeds using parquet format, this is disabled application
 changefeed.fast_gzip.enabled boolean true use fast gzip implementation application
diff --git a/docs/generated/settings/settings.html b/docs/generated/settings/settings.html
index 1d1025aef11b..b1c02dc9f305 100644
--- a/docs/generated/settings/settings.html
+++ b/docs/generated/settings/settings.html
@@ -17,7 +17,6 @@
 <tr><td><div id="setting-changefeed-backfill-concurrent-scan-requests" class="anchored"><code>changefeed.backfill.concurrent_scan_requests</code></div></td><td>integer</td><td><code>0</code></td><td>number of concurrent scan requests per node issued during a backfill</td><td>Basic/Standard/Advanced/Self-Hosted</td></tr>
 <tr><td><div id="setting-changefeed-backfill-scan-request-size" class="anchored"><code>changefeed.backfill.scan_request_size</code></div></td><td>integer</td><td><code>524288</code></td><td>the maximum number of bytes returned by each scan request</td><td>Basic/Standard/Advanced/Self-Hosted</td></tr>
 <tr><td><div id="setting-changefeed-batch-reduction-retry-enabled" class="anchored"><code>changefeed.batch_reduction_retry.enabled<br />(alias: changefeed.batch_reduction_retry_enabled)</code></div></td><td>boolean</td><td><code>false</code></td><td>if true, kafka changefeeds upon erroring on an oversized batch will attempt to resend the messages with progressively lower batch sizes</td><td>Basic/Standard/Advanced/Self-Hosted</td></tr>
-<tr><td><div id="setting-changefeed-default-range-distribution-strategy" class="anchored"><code>changefeed.default_range_distribution_strategy</code></div></td><td>enumeration</td><td><code>default</code></td><td>configures how work is distributed among nodes for a given changefeed. for the most balanced distribution, use `balanced_simple`. changing this setting will not override locality restrictions [default = 0, balanced_simple = 1]</td><td>Basic/Standard/Advanced/Self-Hosted</td></tr>
 <tr><td><div id="setting-changefeed-event-consumer-worker-queue-size" class="anchored"><code>changefeed.event_consumer_worker_queue_size</code></div></td><td>integer</td><td><code>16</code></td><td>if changefeed.event_consumer_workers is enabled, this setting sets the maxmimum number of events which a worker can buffer</td><td>Basic/Standard/Advanced/Self-Hosted</td></tr>
 <tr><td><div id="setting-changefeed-event-consumer-workers" class="anchored"><code>changefeed.event_consumer_workers</code></div></td><td>integer</td><td><code>0</code></td><td>the number of workers to use when processing events: &lt;0 disables, 0 assigns a reasonable default, &gt;0 assigns the setting value. for experimental/core changefeeds and changefeeds using parquet format, this is disabled</td><td>Basic/Standard/Advanced/Self-Hosted</td></tr>
 <tr><td><div id="setting-changefeed-fast-gzip-enabled" class="anchored"><code>changefeed.fast_gzip.enabled</code></div></td><td>boolean</td><td><code>true</code></td><td>use fast gzip implementation</td><td>Basic/Standard/Advanced/Self-Hosted</td></tr>
diff --git a/pkg/ccl/changefeedccl/changefeed_dist.go b/pkg/ccl/changefeedccl/changefeed_dist.go
index bf2634ba80da..f55510258790 100644
--- a/pkg/ccl/changefeedccl/changefeed_dist.go
+++ b/pkg/ccl/changefeedccl/changefeed_dist.go
@@ -42,7 +42,6 @@ import (
 	"github.com/cockroachdb/cockroach/pkg/util/envutil"
 	"github.com/cockroachdb/cockroach/pkg/util/hlc"
 	"github.com/cockroachdb/cockroach/pkg/util/log"
-	"github.com/cockroachdb/cockroach/pkg/util/metamorphic"
 	"github.com/cockroachdb/errors"
 )
 
@@ -345,37 +344,6 @@ func startDistChangefeed(
 // The bin packing choice gives preference to leaseholder replicas if possible.
 var replicaOracleChoice = replicaoracle.BinPackingChoice
 
-type rangeDistributionType int
-
-const (
-	// defaultDistribution employs no load balancing on the changefeed
-	// side. We defer to distsql to select nodes and distribute work.
-	defaultDistribution rangeDistributionType = 0
-	// balancedSimpleDistribution defers to distsql for selecting the
-	// set of nodes to distribute work to. However, changefeeds will try to
-	// distribute work evenly across this set of nodes.
-	balancedSimpleDistribution rangeDistributionType = 1
-	// TODO(jayant): add balancedFullDistribution which takes
-	// full control of node selection and distribution.
-)
-
-// RangeDistributionStrategy is used to determine how the changefeed balances
-// ranges between nodes.
-// TODO: deprecate this setting in favor of a changefeed option.
-var RangeDistributionStrategy = settings.RegisterEnumSetting(
-	settings.ApplicationLevel,
-	"changefeed.default_range_distribution_strategy",
-	"configures how work is distributed among nodes for a given changefeed. "+
-		"for the most balanced distribution, use `balanced_simple`. changing this setting "+
-		"will not override locality restrictions",
-	metamorphic.ConstantWithTestChoice("default_range_distribution_strategy",
-		"default", "balanced_simple"),
-	map[rangeDistributionType]string{
-		defaultDistribution:        "default",
-		balancedSimpleDistribution: "balanced_simple",
-	},
-	settings.WithPublic)
-
 var useBulkOracle = settings.RegisterBoolSetting(
 	settings.ApplicationLevel,
 	"changefeed.random_replica_selection.enabled",
@@ -410,7 +378,10 @@ func makePlan(
 		}
 	}
 
-	rangeDistribution := RangeDistributionStrategy.Get(sv)
+	rangeDistributionStrat, err := changefeedbase.MakeStatementOptions(details.Opts).GetRangeDistributionStrategy()
+	if err != nil {
+		return nil, nil, err
+	}
 	evalCtx := execCtx.ExtendedEvalContext()
 	oracle := replicaoracle.NewOracle(replicaOracleChoice, dsp.ReplicaOracleConfig(locFilter))
 	if useBulkOracle.Get(&evalCtx.Settings.SV) {
@@ -427,8 +398,8 @@ func makePlan(
 		log.Changefeed.Infof(ctx, "spans returned by DistSQL: %v", spanPartitions)
 	}
 	switch {
-	case distMode == sql.LocalDistribution || rangeDistribution == defaultDistribution:
-	case rangeDistribution == balancedSimpleDistribution:
+	case distMode == sql.LocalDistribution || rangeDistributionStrat == changefeedbase.RangeDistributionStrategyDefault:
+	case rangeDistributionStrat == changefeedbase.RangeDistributionStrategyBalancedSimple:
 		log.Changefeed.Infof(ctx, "rebalancing ranges using balanced simple distribution")
 		sender := execCtx.ExecCfg().DB.NonTransactionalSender()
 		distSender := sender.(*kv.CrossRangeTxnWrapperSender).Wrapped().(*kvcoord.DistSender)
@@ -442,8 +413,8 @@ func makePlan(
 			log.Changefeed.Infof(ctx, "spans after balanced simple distribution rebalancing: %v", spanPartitions)
 		}
 	default:
-		return nil, nil, errors.AssertionFailedf("unsupported dist strategy %d and dist mode %d",
-			rangeDistribution, distMode)
+		return nil, nil, errors.AssertionFailedf("unsupported dist strategy %s and dist mode %d",
+			rangeDistributionStrat, distMode)
 	}
 
 	if haveKnobs && maybeCfKnobs.FilterDrainingNodes != nil && len(drainingNodes) > 0 {
diff --git a/pkg/ccl/changefeedccl/changefeed_dist_test.go b/pkg/ccl/changefeedccl/changefeed_dist_test.go
index b32d1b34e606..007c7aa0e22b 100644
--- a/pkg/ccl/changefeedccl/changefeed_dist_test.go
+++ b/pkg/ccl/changefeedccl/changefeed_dist_test.go
@@ -500,7 +500,6 @@ func TestChangefeedWithNoDistributionStrategy(t *testing.T) {
 	tester := newRangeDistributionTester(t, noLocality)
 	defer tester.cleanup()
-	serverutils.SetClusterSetting(t, tester.tc, "changefeed.default_range_distribution_strategy", "default")
 	serverutils.SetClusterSetting(t, tester.tc, "changefeed.random_replica_selection.enabled", false)
 	tester.sqlDB.Exec(t, "CREATE CHANGEFEED FOR x INTO 'null://' WITH initial_scan='no'")
 	partitions := tester.getPartitions()
@@ -527,10 +526,9 @@ func TestChangefeedWithSimpleDistributionStrategy(t *testing.T) {
 	// Check that we roughly assign (64 ranges / 6 nodes) ranges to each node.
 	tester := newRangeDistributionTester(t, noLocality)
 	defer tester.cleanup()
-	tester.sqlDB.Exec(t, "SET CLUSTER SETTING changefeed.default_range_distribution_strategy = 'balanced_simple'")
 	// We need to disable the bulk oracle in order to ensure the leaseholder is selected.
 	tester.sqlDB.Exec(t, "SET CLUSTER SETTING changefeed.random_replica_selection.enabled = false")
-	tester.sqlDB.Exec(t, "CREATE CHANGEFEED FOR x INTO 'null://' WITH initial_scan='no'")
+	tester.sqlDB.Exec(t, "CREATE CHANGEFEED FOR x INTO 'null://' WITH initial_scan='no', range_distribution_strategy='balanced_simple'")
 	partitions := tester.getPartitions()
 	counts := tester.countRangesPerNode(partitions)
 	upper := int(math.Ceil((1 + rebalanceThreshold.Get(&tester.lastNode.ClusterSettings().SV)) * 64 / 6))
@@ -548,30 +546,56 @@ func TestChangefeedWithNoDistributionStrategyAndConstrainedLocality(t *testing.T
 	skip.UnderShort(t)
 	skip.UnderDuress(t)
 
-	// The replica oracle selects the leaseholder replica for each range. Then, distsql assigns the replica
-	// to the same node which stores it. However, node of these nodes don't pass the filter. The replicas assigned
-	// to these nodes are distributed arbitrarily to any nodes which pass the filter.
-	tester := newRangeDistributionTester(t, func(i int) []roachpb.Tier {
-		if i%2 == 1 {
-			return []roachpb.Tier{{Key: "y", Value: "1"}}
+	t.Run("default specified", func(t *testing.T) {
+		// The replica oracle selects the leaseholder replica for each range. Then, distsql assigns the replica
+		// to the same node which stores it. However, some of these nodes don't pass the filter. The replicas assigned
+		// to these nodes are distributed arbitrarily to any nodes which pass the filter.
+		tester := newRangeDistributionTester(t, func(i int) []roachpb.Tier {
+			if i%2 == 1 {
+				return []roachpb.Tier{{Key: "y", Value: "1"}}
+			}
+			return []roachpb.Tier{}
+		})
+		defer tester.cleanup()
+		tester.sqlDB.Exec(t, "CREATE CHANGEFEED FOR x INTO 'null://' WITH initial_scan='no', execution_locality='y=1', range_distribution_strategy='default'")
+		partitions := tester.getPartitions()
+		counts := tester.countRangesPerNode(partitions)
+
+		totalRanges := 0
+		for i, count := range counts {
+			if i%2 == 1 {
+				totalRanges += count
+			} else {
+				require.Equal(t, count, 0)
+			}
 		}
-		return []roachpb.Tier{}
+		require.Equal(t, totalRanges, 64)
 	})
-	defer tester.cleanup()
-	tester.sqlDB.Exec(t, "SET CLUSTER SETTING changefeed.default_range_distribution_strategy = 'default'")
-	tester.sqlDB.Exec(t, "CREATE CHANGEFEED FOR x INTO 'null://' WITH initial_scan='no', execution_locality='y=1'")
-	partitions := tester.getPartitions()
-	counts := tester.countRangesPerNode(partitions)
-
-	totalRanges := 0
-	for i, count := range counts {
-		if i%2 == 1 {
-			totalRanges += count
-		} else {
-			require.Equal(t, count, 0)
+	t.Run("no distribution strategy specified", func(t *testing.T) {
+		// The replica oracle selects the leaseholder replica for each range. Then, distsql assigns the replica
+		// to the same node which stores it. However, some of these nodes don't pass the filter. The replicas assigned
+		// to these nodes are distributed arbitrarily to any nodes which pass the filter.
+		tester := newRangeDistributionTester(t, func(i int) []roachpb.Tier {
+			if i%2 == 1 {
+				return []roachpb.Tier{{Key: "y", Value: "1"}}
+			}
+			return []roachpb.Tier{}
+		})
+		defer tester.cleanup()
+		tester.sqlDB.Exec(t, "CREATE CHANGEFEED FOR x INTO 'null://' WITH initial_scan='no', execution_locality='y=1'")
+		partitions := tester.getPartitions()
+		counts := tester.countRangesPerNode(partitions)
+
+		totalRanges := 0
+		for i, count := range counts {
+			if i%2 == 1 {
+				totalRanges += count
+			} else {
+				require.Equal(t, count, 0)
+			}
 		}
-	}
-	require.Equal(t, totalRanges, 64)
+		require.Equal(t, totalRanges, 64)
+	})
 }
 
 func TestChangefeedWithSimpleDistributionStrategyAndConstrainedLocality(t *testing.T) {
@@ -593,8 +617,7 @@ func TestChangefeedWithSimpleDistributionStrategyAndConstrainedLocality(t *testi
 		return []roachpb.Tier{}
 	})
 	defer tester.cleanup()
-	tester.sqlDB.Exec(t, "SET CLUSTER SETTING changefeed.default_range_distribution_strategy = 'balanced_simple'")
-	tester.sqlDB.Exec(t, "CREATE CHANGEFEED FOR x INTO 'null://' WITH initial_scan='no', execution_locality='y=1'")
+	tester.sqlDB.Exec(t, "CREATE CHANGEFEED FOR x INTO 'null://' WITH initial_scan='no', execution_locality='y=1', range_distribution_strategy='balanced_simple'")
 	partitions := tester.getPartitions()
 	counts := tester.countRangesPerNode(partitions)
 
diff --git a/pkg/ccl/changefeedccl/changefeedbase/options.go b/pkg/ccl/changefeedccl/changefeedbase/options.go
index 325b7b4db732..abac9c12d205 100644
--- a/pkg/ccl/changefeedccl/changefeedbase/options.go
+++ b/pkg/ccl/changefeedccl/changefeedbase/options.go
@@ -75,6 +75,19 @@ const (
 	EnrichedPropertySchema EnrichedProperty = `schema`
 )
 
+// RangeDistributionStrategy configures how the changefeed balances ranges between nodes.
+type RangeDistributionStrategy string
+
+const (
+	// RangeDistributionStrategyDefault employs no load balancing on the changefeed
+	// side. We defer to distsql to select nodes and distribute work.
+	RangeDistributionStrategyDefault RangeDistributionStrategy = `default`
+	// RangeDistributionStrategyBalancedSimple defers to distsql for selecting the
+	// set of nodes to distribute work to. However, changefeeds will try to
+	// distribute work evenly across this set of nodes.
+	RangeDistributionStrategyBalancedSimple RangeDistributionStrategy = `balanced_simple`
+)
+
 // Constants for the initial scan types
 const (
 	InitialScan InitialScanType = iota
@@ -162,6 +175,8 @@ const (
 
 	OptEnrichedProperties = `enriched_properties`
 
+	OptRangeDistributionStrategy = `range_distribution_strategy`
+
 	OptEnvelopeKeyOnly       EnvelopeType = `key_only`
 	OptEnvelopeRow           EnvelopeType = `row`
 	OptEnvelopeDeprecatedRow EnvelopeType = `deprecated_row`
@@ -412,6 +427,7 @@ var ChangefeedOptionExpectValues = map[string]OptionPermittedValues{
 	OptIgnoreDisableChangefeedReplication: flagOption,
 	OptEncodeJSONValueNullAsObject:        flagOption,
 	OptEnrichedProperties:                 csv(string(EnrichedPropertySource), string(EnrichedPropertySchema)),
+	OptRangeDistributionStrategy:          enum("default", "balanced_simple"),
 	OptHeadersJSONColumnName:              stringOption,
 	OptExtraHeaders:                       jsonOption,
 }
@@ -428,6 +444,7 @@ var CommonOptions = makeStringSet(OptCursor, OptEndTime, OptEnvelope,
 	OptMinCheckpointFrequency, OptMetricsScope, OptVirtualColumns, Topics, OptExpirePTSAfter,
 	OptExecutionLocality, OptLaggingRangesThreshold, OptLaggingRangesPollingInterval,
 	OptIgnoreDisableChangefeedReplication, OptEncodeJSONValueNullAsObject, OptEnrichedProperties,
+	OptRangeDistributionStrategy,
 )
 
 // SQLValidOptions is options exclusive to SQL sink
@@ -805,6 +822,19 @@ func (s StatementOptions) IsInitialScanSpecified() bool {
 	return true
 }
 
+// GetRangeDistributionStrategy returns the range_distribution_strategy
+// option, defaulting to RangeDistributionStrategyDefault when unset.
+func (s StatementOptions) GetRangeDistributionStrategy() (RangeDistributionStrategy, error) {
+	v, err := s.getEnumValue(OptRangeDistributionStrategy)
+	if err != nil {
+		return "", err
+	}
+	if v == `` {
+		return RangeDistributionStrategyDefault, nil
+	}
+	return RangeDistributionStrategy(v), nil
+}
+
 // ShouldUseFullStatementTimeName returns true if references to the table should be in db.schema.table
 // format (e.g. in Kafka topics).
 func (s StatementOptions) ShouldUseFullStatementTimeName() bool {
diff --git a/pkg/cmd/roachtest/tests/cdc.go b/pkg/cmd/roachtest/tests/cdc.go
index af34526cd700..5daa6a207344 100644
--- a/pkg/cmd/roachtest/tests/cdc.go
+++ b/pkg/cmd/roachtest/tests/cdc.go
@@ -490,32 +490,11 @@ func (f *featureFlag) enabled(r entropy) featureState {
 	return featureDisabled
 }
 
-type enumFeatureFlag struct {
-	state string
-	v     *featureState
-}
-
-// enabled returns a valid string if the returned featureState is featureEnabled.
-func (f *enumFeatureFlag) enabled(r entropy, choose func(entropy) string) (string, featureState) {
-	if f.v != nil {
-		return f.state, *f.v
-	}
-
-	if r.Bool() {
-		f.v = &featureEnabled
-		f.state = choose(r)
-		return f.state, featureEnabled
-	}
-	f.v = &featureDisabled
-	return f.state, featureDisabled
-}
-
 // cdcFeatureFlags describes various cdc feature flags.
 // zero value cdcFeatureFlags uses metamorphic settings for features.
type cdcFeatureFlags struct { - RangeFeedScheduler featureFlag - SchemaLockTables featureFlag - DistributionStrategy enumFeatureFlag + RangeFeedScheduler featureFlag + SchemaLockTables featureFlag } func makeDefaultFeatureFlags() cdcFeatureFlags { @@ -4425,11 +4404,6 @@ func (cfc *changefeedCreator) Args(args ...interface{}) *changefeedCreator { return cfc } -func chooseDistributionStrategy(r entropy) string { - vals := changefeedccl.RangeDistributionStrategy.GetAvailableValues() - return vals[r.Intn(len(vals))] -} - // applySettings aplies various settings to the cluster -- once per the // lifetime of changefeedCreator func (cfc *changefeedCreator) applySettings() error { @@ -4451,16 +4425,6 @@ func (cfc *changefeedCreator) applySettings() error { } } - rangeDistribution, rangeDistributionEnabled := cfc.flags.DistributionStrategy.enabled(cfc.rng, - chooseDistributionStrategy) - if rangeDistributionEnabled == featureEnabled { - cfc.logger.Printf("Setting changefeed.default_range_distribution_strategy to %s", rangeDistribution) - if _, err := cfc.db.Exec(fmt.Sprintf( - "SET CLUSTER SETTING changefeed.default_range_distribution_strategy = '%s'", rangeDistribution)); err != nil { - return err - } - } - return nil } diff --git a/pkg/cmd/roachtest/tests/mixed_version_cdc.go b/pkg/cmd/roachtest/tests/mixed_version_cdc.go index 6de0e0c9bff3..8c80c789774c 100644 --- a/pkg/cmd/roachtest/tests/mixed_version_cdc.go +++ b/pkg/cmd/roachtest/tests/mixed_version_cdc.go @@ -410,14 +410,6 @@ func (cmvt *cdcMixedVersionTester) createChangeFeed( ff.RangeFeedScheduler.v = &featureUnset } - distributionStrategySupported, err := cmvt.distributionStrategySupported(r, h) - if err != nil { - return err - } - if !distributionStrategySupported { - ff.DistributionStrategy.v = &featureUnset - } - jobID, err := newChangefeedCreator(db, systemDB, l, r, fmt.Sprintf("%s.%s", targetDB, targetTable), cmvt.kafka.manager.sinkURL(ctx), ff). With(options). @@ -485,12 +477,6 @@ func (cmvt *cdcMixedVersionTester) rangefeedSchedulerSupported( return h.ClusterVersionAtLeast(r, v232CV) } -func (cmvt *cdcMixedVersionTester) distributionStrategySupported( - r *rand.Rand, h *mixedversion.Helper, -) (bool, error) { - return h.ClusterVersionAtLeast(r, v241CV) -} - // canMixedVersionUseDeletedClusterSetting returns whether a // mixed-version cluster can use a deleted (system) cluster // setting. If it returns true, it will also return the subset of
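
Usage sketch (the table name and sink URI below are illustrative placeholders, not part of this patch): with this change, the strategy is chosen per changefeed at creation time rather than cluster-wide.

    -- Only this feed opts into balanced range distribution.
    CREATE CHANGEFEED FOR TABLE orders
      INTO 'kafka://broker:9092'
      WITH initial_scan = 'no', range_distribution_strategy = 'balanced_simple';

Omitting the option, or passing 'default', leaves node selection and work distribution entirely to DistSQL. Deployments that previously ran SET CLUSTER SETTING changefeed.default_range_distribution_strategy = 'balanced_simple' should now pass the option on each CREATE CHANGEFEED statement instead.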