Skip to content

Commit 8528f98

Browse files
committed
changefeedccl: make range_distribution_strategy a changefeed option
Previously, range_distribution_strategy was a cluster setting. Moving this from a cluster setting to a changefeed option gives users control over which changefeeds it applies to (e.g., initial scans vs. long-running feeds). Fixes: #147628 Release note (sql change): Replace the cluster setting changefeed.default_range_distribution_strategy="balanced_simple" with the changefeed option range_distribution_strategy, which accepts the values 'default' or 'balanced_simple'. 'balanced_simple' naively redistributes spans evenly across aggregators with no regard for replica placement.
1 parent 4b9c7ab commit 8528f98

File tree

6 files changed

+87
-83
lines changed

6 files changed

+87
-83
lines changed

docs/generated/settings/settings-for-tenants.txt

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,6 @@ changefeed.aggregator.flush_jitter float 0.1 jitter aggregator flushes as a frac
1212
changefeed.backfill.concurrent_scan_requests integer 0 number of concurrent scan requests per node issued during a backfill application
1313
changefeed.backfill.scan_request_size integer 524288 the maximum number of bytes returned by each scan request application
1414
changefeed.batch_reduction_retry.enabled (alias: changefeed.batch_reduction_retry_enabled) boolean false if true, kafka changefeeds upon erroring on an oversized batch will attempt to resend the messages with progressively lower batch sizes application
15-
changefeed.default_range_distribution_strategy enumeration default configures how work is distributed among nodes for a given changefeed. for the most balanced distribution, use `balanced_simple`. changing this setting will not override locality restrictions [default = 0, balanced_simple = 1] application
1615
changefeed.event_consumer_worker_queue_size integer 16 if changefeed.event_consumer_workers is enabled, this setting sets the maxmimum number of events which a worker can buffer application
1716
changefeed.event_consumer_workers integer 0 the number of workers to use when processing events: <0 disables, 0 assigns a reasonable default, >0 assigns the setting value. for experimental/core changefeeds and changefeeds using parquet format, this is disabled application
1817
changefeed.fast_gzip.enabled boolean true use fast gzip implementation application

docs/generated/settings/settings.html

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,6 @@
1717
<tr><td><div id="setting-changefeed-backfill-concurrent-scan-requests" class="anchored"><code>changefeed.backfill.concurrent_scan_requests</code></div></td><td>integer</td><td><code>0</code></td><td>number of concurrent scan requests per node issued during a backfill</td><td>Basic/Standard/Advanced/Self-Hosted</td></tr>
1818
<tr><td><div id="setting-changefeed-backfill-scan-request-size" class="anchored"><code>changefeed.backfill.scan_request_size</code></div></td><td>integer</td><td><code>524288</code></td><td>the maximum number of bytes returned by each scan request</td><td>Basic/Standard/Advanced/Self-Hosted</td></tr>
1919
<tr><td><div id="setting-changefeed-batch-reduction-retry-enabled" class="anchored"><code>changefeed.batch_reduction_retry.enabled<br />(alias: changefeed.batch_reduction_retry_enabled)</code></div></td><td>boolean</td><td><code>false</code></td><td>if true, kafka changefeeds upon erroring on an oversized batch will attempt to resend the messages with progressively lower batch sizes</td><td>Basic/Standard/Advanced/Self-Hosted</td></tr>
20-
<tr><td><div id="setting-changefeed-default-range-distribution-strategy" class="anchored"><code>changefeed.default_range_distribution_strategy</code></div></td><td>enumeration</td><td><code>default</code></td><td>configures how work is distributed among nodes for a given changefeed. for the most balanced distribution, use `balanced_simple`. changing this setting will not override locality restrictions [default = 0, balanced_simple = 1]</td><td>Basic/Standard/Advanced/Self-Hosted</td></tr>
2120
<tr><td><div id="setting-changefeed-event-consumer-worker-queue-size" class="anchored"><code>changefeed.event_consumer_worker_queue_size</code></div></td><td>integer</td><td><code>16</code></td><td>if changefeed.event_consumer_workers is enabled, this setting sets the maxmimum number of events which a worker can buffer</td><td>Basic/Standard/Advanced/Self-Hosted</td></tr>
2221
<tr><td><div id="setting-changefeed-event-consumer-workers" class="anchored"><code>changefeed.event_consumer_workers</code></div></td><td>integer</td><td><code>0</code></td><td>the number of workers to use when processing events: &lt;0 disables, 0 assigns a reasonable default, &gt;0 assigns the setting value. for experimental/core changefeeds and changefeeds using parquet format, this is disabled</td><td>Basic/Standard/Advanced/Self-Hosted</td></tr>
2322
<tr><td><div id="setting-changefeed-fast-gzip-enabled" class="anchored"><code>changefeed.fast_gzip.enabled</code></div></td><td>boolean</td><td><code>true</code></td><td>use fast gzip implementation</td><td>Basic/Standard/Advanced/Self-Hosted</td></tr>

pkg/ccl/changefeedccl/changefeed_dist.go

Lines changed: 8 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,6 @@ import (
4242
"github.com/cockroachdb/cockroach/pkg/util/envutil"
4343
"github.com/cockroachdb/cockroach/pkg/util/hlc"
4444
"github.com/cockroachdb/cockroach/pkg/util/log"
45-
"github.com/cockroachdb/cockroach/pkg/util/metamorphic"
4645
"github.com/cockroachdb/errors"
4746
)
4847

@@ -345,37 +344,6 @@ func startDistChangefeed(
345344
// The bin packing choice gives preference to leaseholder replicas if possible.
346345
var replicaOracleChoice = replicaoracle.BinPackingChoice
347346

348-
type rangeDistributionType int
349-
350-
const (
351-
// defaultDistribution employs no load balancing on the changefeed
352-
// side. We defer to distsql to select nodes and distribute work.
353-
defaultDistribution rangeDistributionType = 0
354-
// balancedSimpleDistribution defers to distsql for selecting the
355-
// set of nodes to distribute work to. However, changefeeds will try to
356-
// distribute work evenly across this set of nodes.
357-
balancedSimpleDistribution rangeDistributionType = 1
358-
// TODO(jayant): add balancedFullDistribution which takes
359-
// full control of node selection and distribution.
360-
)
361-
362-
// RangeDistributionStrategy is used to determine how the changefeed balances
363-
// ranges between nodes.
364-
// TODO: deprecate this setting in favor of a changefeed option.
365-
var RangeDistributionStrategy = settings.RegisterEnumSetting(
366-
settings.ApplicationLevel,
367-
"changefeed.default_range_distribution_strategy",
368-
"configures how work is distributed among nodes for a given changefeed. "+
369-
"for the most balanced distribution, use `balanced_simple`. changing this setting "+
370-
"will not override locality restrictions",
371-
metamorphic.ConstantWithTestChoice("default_range_distribution_strategy",
372-
"default", "balanced_simple"),
373-
map[rangeDistributionType]string{
374-
defaultDistribution: "default",
375-
balancedSimpleDistribution: "balanced_simple",
376-
},
377-
settings.WithPublic)
378-
379347
var useBulkOracle = settings.RegisterBoolSetting(
380348
settings.ApplicationLevel,
381349
"changefeed.random_replica_selection.enabled",
@@ -410,7 +378,10 @@ func makePlan(
410378
}
411379
}
412380

413-
rangeDistribution := RangeDistributionStrategy.Get(sv)
381+
rangeDistributionStrat, err := changefeedbase.MakeStatementOptions(details.Opts).GetRangeDistributionStrategy()
382+
if err != nil {
383+
return nil, nil, err
384+
}
414385
evalCtx := execCtx.ExtendedEvalContext()
415386
oracle := replicaoracle.NewOracle(replicaOracleChoice, dsp.ReplicaOracleConfig(locFilter))
416387
if useBulkOracle.Get(&evalCtx.Settings.SV) {
@@ -427,8 +398,8 @@ func makePlan(
427398
log.Changefeed.Infof(ctx, "spans returned by DistSQL: %v", spanPartitions)
428399
}
429400
switch {
430-
case distMode == sql.LocalDistribution || rangeDistribution == defaultDistribution:
431-
case rangeDistribution == balancedSimpleDistribution:
401+
case distMode == sql.LocalDistribution || rangeDistributionStrat == changefeedbase.RangeDistributionStrategyDefault:
402+
case rangeDistributionStrat == changefeedbase.RangeDistributionStrategyBalancedSimple:
432403
log.Changefeed.Infof(ctx, "rebalancing ranges using balanced simple distribution")
433404
sender := execCtx.ExecCfg().DB.NonTransactionalSender()
434405
distSender := sender.(*kv.CrossRangeTxnWrapperSender).Wrapped().(*kvcoord.DistSender)
@@ -442,8 +413,8 @@ func makePlan(
442413
log.Changefeed.Infof(ctx, "spans after balanced simple distribution rebalancing: %v", spanPartitions)
443414
}
444415
default:
445-
return nil, nil, errors.AssertionFailedf("unsupported dist strategy %d and dist mode %d",
446-
rangeDistribution, distMode)
416+
return nil, nil, errors.AssertionFailedf("unsupported dist strategy %s and dist mode %d",
417+
rangeDistributionStrat, distMode)
447418
}
448419

449420
if haveKnobs && maybeCfKnobs.FilterDrainingNodes != nil && len(drainingNodes) > 0 {

pkg/ccl/changefeedccl/changefeed_dist_test.go

Lines changed: 49 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -500,7 +500,6 @@ func TestChangefeedWithNoDistributionStrategy(t *testing.T) {
500500
tester := newRangeDistributionTester(t, noLocality)
501501
defer tester.cleanup()
502502

503-
serverutils.SetClusterSetting(t, tester.tc, "changefeed.default_range_distribution_strategy", "default")
504503
serverutils.SetClusterSetting(t, tester.tc, "changefeed.random_replica_selection.enabled", false)
505504
tester.sqlDB.Exec(t, "CREATE CHANGEFEED FOR x INTO 'null://' WITH initial_scan='no'")
506505
partitions := tester.getPartitions()
@@ -527,10 +526,9 @@ func TestChangefeedWithSimpleDistributionStrategy(t *testing.T) {
527526
// Check that we roughly assign (64 ranges / 6 nodes) ranges to each node.
528527
tester := newRangeDistributionTester(t, noLocality)
529528
defer tester.cleanup()
530-
tester.sqlDB.Exec(t, "SET CLUSTER SETTING changefeed.default_range_distribution_strategy = 'balanced_simple'")
531529
// We need to disable the bulk oracle in order to ensure the leaseholder is selected.
532530
tester.sqlDB.Exec(t, "SET CLUSTER SETTING changefeed.random_replica_selection.enabled = false")
533-
tester.sqlDB.Exec(t, "CREATE CHANGEFEED FOR x INTO 'null://' WITH initial_scan='no'")
531+
tester.sqlDB.Exec(t, "CREATE CHANGEFEED FOR x INTO 'null://' WITH initial_scan='no', range_distribution_strategy='balanced_simple'")
534532
partitions := tester.getPartitions()
535533
counts := tester.countRangesPerNode(partitions)
536534
upper := int(math.Ceil((1 + rebalanceThreshold.Get(&tester.lastNode.ClusterSettings().SV)) * 64 / 6))
@@ -548,30 +546,56 @@ func TestChangefeedWithNoDistributionStrategyAndConstrainedLocality(t *testing.T
548546
skip.UnderShort(t)
549547
skip.UnderDuress(t)
550548

551-
// The replica oracle selects the leaseholder replica for each range. Then, distsql assigns the replica
552-
// to the same node which stores it. However, node of these nodes don't pass the filter. The replicas assigned
553-
// to these nodes are distributed arbitrarily to any nodes which pass the filter.
554-
tester := newRangeDistributionTester(t, func(i int) []roachpb.Tier {
555-
if i%2 == 1 {
556-
return []roachpb.Tier{{Key: "y", Value: "1"}}
549+
t.Run("default specified", func(t *testing.T) {
550+
// The replica oracle selects the leaseholder replica for each range. Then, distsql assigns the replica
551+
// to the same node which stores it. However, some of these nodes don't pass the filter. The replicas assigned
552+
// to these nodes are distributed arbitrarily to any nodes which pass the filter.
553+
tester := newRangeDistributionTester(t, func(i int) []roachpb.Tier {
554+
if i%2 == 1 {
555+
return []roachpb.Tier{{Key: "y", Value: "1"}}
556+
}
557+
return []roachpb.Tier{}
558+
})
559+
defer tester.cleanup()
560+
tester.sqlDB.Exec(t, "CREATE CHANGEFEED FOR x INTO 'null://' WITH initial_scan='no', execution_locality='y=1', range_distribution_strategy='default'")
561+
partitions := tester.getPartitions()
562+
counts := tester.countRangesPerNode(partitions)
563+
564+
totalRanges := 0
565+
for i, count := range counts {
566+
if i%2 == 1 {
567+
totalRanges += count
568+
} else {
569+
require.Equal(t, count, 0)
570+
}
557571
}
558-
return []roachpb.Tier{}
572+
require.Equal(t, totalRanges, 64)
559573
})
560-
defer tester.cleanup()
561-
tester.sqlDB.Exec(t, "SET CLUSTER SETTING changefeed.default_range_distribution_strategy = 'default'")
562-
tester.sqlDB.Exec(t, "CREATE CHANGEFEED FOR x INTO 'null://' WITH initial_scan='no', execution_locality='y=1'")
563-
partitions := tester.getPartitions()
564-
counts := tester.countRangesPerNode(partitions)
565-
566-
totalRanges := 0
567-
for i, count := range counts {
568-
if i%2 == 1 {
569-
totalRanges += count
570-
} else {
571-
require.Equal(t, count, 0)
574+
t.Run("no distribution strategy specified", func(t *testing.T) {
575+
// The replica oracle selects the leaseholder replica for each range. Then, distsql assigns the replica
576+
// to the same node which stores it. However, some of these nodes don't pass the filter. The replicas assigned
577+
// to these nodes are distributed arbitrarily to any nodes which pass the filter.
578+
tester := newRangeDistributionTester(t, func(i int) []roachpb.Tier {
579+
if i%2 == 1 {
580+
return []roachpb.Tier{{Key: "y", Value: "1"}}
581+
}
582+
return []roachpb.Tier{}
583+
})
584+
defer tester.cleanup()
585+
tester.sqlDB.Exec(t, "CREATE CHANGEFEED FOR x INTO 'null://' WITH initial_scan='no', execution_locality='y=1'")
586+
partitions := tester.getPartitions()
587+
counts := tester.countRangesPerNode(partitions)
588+
589+
totalRanges := 0
590+
for i, count := range counts {
591+
if i%2 == 1 {
592+
totalRanges += count
593+
} else {
594+
require.Equal(t, count, 0)
595+
}
572596
}
573-
}
574-
require.Equal(t, totalRanges, 64)
597+
require.Equal(t, totalRanges, 64)
598+
})
575599
}
576600

577601
func TestChangefeedWithSimpleDistributionStrategyAndConstrainedLocality(t *testing.T) {
@@ -593,8 +617,7 @@ func TestChangefeedWithSimpleDistributionStrategyAndConstrainedLocality(t *testi
593617
return []roachpb.Tier{}
594618
})
595619
defer tester.cleanup()
596-
tester.sqlDB.Exec(t, "SET CLUSTER SETTING changefeed.default_range_distribution_strategy = 'balanced_simple'")
597-
tester.sqlDB.Exec(t, "CREATE CHANGEFEED FOR x INTO 'null://' WITH initial_scan='no', execution_locality='y=1'")
620+
tester.sqlDB.Exec(t, "CREATE CHANGEFEED FOR x INTO 'null://' WITH initial_scan='no', execution_locality='y=1', range_distribution_strategy='balanced_simple'")
598621
partitions := tester.getPartitions()
599622
counts := tester.countRangesPerNode(partitions)
600623

pkg/ccl/changefeedccl/changefeedbase/options.go

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -75,6 +75,19 @@ const (
7575
EnrichedPropertySchema EnrichedProperty = `schema`
7676
)
7777

78+
// RangeDistributionStrategy configures how the changefeed balances ranges between nodes.
79+
type RangeDistributionStrategy string
80+
81+
const (
82+
// RangeDistributionStrategyDefault employs no load balancing on the changefeed
83+
// side. We defer to distsql to select nodes and distribute work.
84+
RangeDistributionStrategyDefault RangeDistributionStrategy = `default`
85+
// RangeDistributionStrategyBalancedSimple defers to distsql for selecting the
86+
// set of nodes to distribute work to. However, changefeeds will try to
87+
// distribute work evenly across this set of nodes.
88+
RangeDistributionStrategyBalancedSimple RangeDistributionStrategy = `balanced_simple`
89+
)
90+
7891
// Constants for the initial scan types
7992
const (
8093
InitialScan InitialScanType = iota
@@ -162,6 +175,8 @@ const (
162175

163176
OptEnrichedProperties = `enriched_properties`
164177

178+
OptRangeDistributionStrategy = `range_distribution_strategy`
179+
165180
OptEnvelopeKeyOnly EnvelopeType = `key_only`
166181
OptEnvelopeRow EnvelopeType = `row`
167182
OptEnvelopeDeprecatedRow EnvelopeType = `deprecated_row`
@@ -412,6 +427,7 @@ var ChangefeedOptionExpectValues = map[string]OptionPermittedValues{
412427
OptIgnoreDisableChangefeedReplication: flagOption,
413428
OptEncodeJSONValueNullAsObject: flagOption,
414429
OptEnrichedProperties: csv(string(EnrichedPropertySource), string(EnrichedPropertySchema)),
430+
OptRangeDistributionStrategy: enum("default", "balanced_simple"),
415431
OptHeadersJSONColumnName: stringOption,
416432
OptExtraHeaders: jsonOption,
417433
}
@@ -428,6 +444,7 @@ var CommonOptions = makeStringSet(OptCursor, OptEndTime, OptEnvelope,
428444
OptMinCheckpointFrequency, OptMetricsScope, OptVirtualColumns, Topics, OptExpirePTSAfter,
429445
OptExecutionLocality, OptLaggingRangesThreshold, OptLaggingRangesPollingInterval,
430446
OptIgnoreDisableChangefeedReplication, OptEncodeJSONValueNullAsObject, OptEnrichedProperties,
447+
OptRangeDistributionStrategy,
431448
)
432449

433450
// SQLValidOptions is options exclusive to SQL sink
@@ -805,6 +822,17 @@ func (s StatementOptions) IsInitialScanSpecified() bool {
805822
return true
806823
}
807824

825+
func (s StatementOptions) GetRangeDistributionStrategy() (RangeDistributionStrategy, error) {
826+
v, err := s.getEnumValue(OptRangeDistributionStrategy)
827+
if err != nil {
828+
return "", err
829+
}
830+
if v == `` {
831+
return RangeDistributionStrategyDefault, nil
832+
}
833+
return RangeDistributionStrategy(v), nil
834+
}
835+
808836
// ShouldUseFullStatementTimeName returns true if references to the table should be in db.schema.table
809837
// format (e.g. in Kafka topics).
810838
func (s StatementOptions) ShouldUseFullStatementTimeName() bool {

pkg/cmd/roachtest/tests/cdc.go

Lines changed: 2 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -512,9 +512,8 @@ func (f *enumFeatureFlag) enabled(r entropy, choose func(entropy) string) (strin
512512
// cdcFeatureFlags describes various cdc feature flags.
513513
// zero value cdcFeatureFlags uses metamorphic settings for features.
514514
type cdcFeatureFlags struct {
515-
RangeFeedScheduler featureFlag
516-
SchemaLockTables featureFlag
517-
DistributionStrategy enumFeatureFlag
515+
RangeFeedScheduler featureFlag
516+
SchemaLockTables featureFlag
518517
}
519518

520519
func makeDefaultFeatureFlags() cdcFeatureFlags {
@@ -4445,11 +4444,6 @@ func (cfc *changefeedCreator) Args(args ...interface{}) *changefeedCreator {
44454444
return cfc
44464445
}
44474446

4448-
func chooseDistributionStrategy(r entropy) string {
4449-
vals := changefeedccl.RangeDistributionStrategy.GetAvailableValues()
4450-
return vals[r.Intn(len(vals))]
4451-
}
4452-
44534447
// applySettings applies various settings to the cluster -- once per the
44544448
// lifetime of changefeedCreator
44554449
func (cfc *changefeedCreator) applySettings() error {
@@ -4471,16 +4465,6 @@ func (cfc *changefeedCreator) applySettings() error {
44714465
}
44724466
}
44734467

4474-
rangeDistribution, rangeDistributionEnabled := cfc.flags.DistributionStrategy.enabled(cfc.rng,
4475-
chooseDistributionStrategy)
4476-
if rangeDistributionEnabled == featureEnabled {
4477-
cfc.logger.Printf("Setting changefeed.default_range_distribution_strategy to %s", rangeDistribution)
4478-
if _, err := cfc.db.Exec(fmt.Sprintf(
4479-
"SET CLUSTER SETTING changefeed.default_range_distribution_strategy = '%s'", rangeDistribution)); err != nil {
4480-
return err
4481-
}
4482-
}
4483-
44844468
return nil
44854469
}
44864470

0 commit comments

Comments
 (0)