Skip to content

Commit 6efb6e0

Browse files
committed
changefeedccl: make range_distribution_strategy a changefeed option
Previously, range_distribution_strategy was a cluster setting. Moving this from a cluster setting to a changefeed option gives users control over which changefeeds it applies to (eg initial scans vs long-running feeds). Fixes: #147628 Release note (sql change): Replace cluster setting changefeed.default_range_distribution_strategy="balanced_simple" with changefeed setting range_distribution_strategy with values 'default' or 'balanced_simple'. 'balanced_simple' naively redistributes spans evenly across aggregators with no regard for replica placement.
1 parent 4b9c7ab commit 6efb6e0

File tree

3 files changed

+85
-63
lines changed

3 files changed

+85
-63
lines changed

pkg/ccl/changefeedccl/changefeed_dist.go

Lines changed: 8 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,6 @@ import (
4242
"github.com/cockroachdb/cockroach/pkg/util/envutil"
4343
"github.com/cockroachdb/cockroach/pkg/util/hlc"
4444
"github.com/cockroachdb/cockroach/pkg/util/log"
45-
"github.com/cockroachdb/cockroach/pkg/util/metamorphic"
4645
"github.com/cockroachdb/errors"
4746
)
4847

@@ -345,37 +344,6 @@ func startDistChangefeed(
345344
// The bin packing choice gives preference to leaseholder replicas if possible.
346345
var replicaOracleChoice = replicaoracle.BinPackingChoice
347346

348-
type rangeDistributionType int
349-
350-
const (
351-
// defaultDistribution employs no load balancing on the changefeed
352-
// side. We defer to distsql to select nodes and distribute work.
353-
defaultDistribution rangeDistributionType = 0
354-
// balancedSimpleDistribution defers to distsql for selecting the
355-
// set of nodes to distribute work to. However, changefeeds will try to
356-
// distribute work evenly across this set of nodes.
357-
balancedSimpleDistribution rangeDistributionType = 1
358-
// TODO(jayant): add balancedFullDistribution which takes
359-
// full control of node selection and distribution.
360-
)
361-
362-
// RangeDistributionStrategy is used to determine how the changefeed balances
363-
// ranges between nodes.
364-
// TODO: deprecate this setting in favor of a changefeed option.
365-
var RangeDistributionStrategy = settings.RegisterEnumSetting(
366-
settings.ApplicationLevel,
367-
"changefeed.default_range_distribution_strategy",
368-
"configures how work is distributed among nodes for a given changefeed. "+
369-
"for the most balanced distribution, use `balanced_simple`. changing this setting "+
370-
"will not override locality restrictions",
371-
metamorphic.ConstantWithTestChoice("default_range_distribution_strategy",
372-
"default", "balanced_simple"),
373-
map[rangeDistributionType]string{
374-
defaultDistribution: "default",
375-
balancedSimpleDistribution: "balanced_simple",
376-
},
377-
settings.WithPublic)
378-
379347
var useBulkOracle = settings.RegisterBoolSetting(
380348
settings.ApplicationLevel,
381349
"changefeed.random_replica_selection.enabled",
@@ -410,7 +378,10 @@ func makePlan(
410378
}
411379
}
412380

413-
rangeDistribution := RangeDistributionStrategy.Get(sv)
381+
rangeDistributionStrat, err := changefeedbase.MakeStatementOptions(details.Opts).GetRangeDistributionStrategy()
382+
if err != nil {
383+
return nil, nil, err
384+
}
414385
evalCtx := execCtx.ExtendedEvalContext()
415386
oracle := replicaoracle.NewOracle(replicaOracleChoice, dsp.ReplicaOracleConfig(locFilter))
416387
if useBulkOracle.Get(&evalCtx.Settings.SV) {
@@ -427,8 +398,8 @@ func makePlan(
427398
log.Changefeed.Infof(ctx, "spans returned by DistSQL: %v", spanPartitions)
428399
}
429400
switch {
430-
case distMode == sql.LocalDistribution || rangeDistribution == defaultDistribution:
431-
case rangeDistribution == balancedSimpleDistribution:
401+
case distMode == sql.LocalDistribution || rangeDistributionStrat == changefeedbase.RangeDistributionStrategyDefault:
402+
case rangeDistributionStrat == changefeedbase.RangeDistributionStrategyBalancedSimple:
432403
log.Changefeed.Infof(ctx, "rebalancing ranges using balanced simple distribution")
433404
sender := execCtx.ExecCfg().DB.NonTransactionalSender()
434405
distSender := sender.(*kv.CrossRangeTxnWrapperSender).Wrapped().(*kvcoord.DistSender)
@@ -442,8 +413,8 @@ func makePlan(
442413
log.Changefeed.Infof(ctx, "spans after balanced simple distribution rebalancing: %v", spanPartitions)
443414
}
444415
default:
445-
return nil, nil, errors.AssertionFailedf("unsupported dist strategy %d and dist mode %d",
446-
rangeDistribution, distMode)
416+
return nil, nil, errors.AssertionFailedf("unsupported dist strategy %s and dist mode %d",
417+
rangeDistributionStrat, distMode)
447418
}
448419

449420
if haveKnobs && maybeCfKnobs.FilterDrainingNodes != nil && len(drainingNodes) > 0 {

pkg/ccl/changefeedccl/changefeed_dist_test.go

Lines changed: 49 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -500,7 +500,6 @@ func TestChangefeedWithNoDistributionStrategy(t *testing.T) {
500500
tester := newRangeDistributionTester(t, noLocality)
501501
defer tester.cleanup()
502502

503-
serverutils.SetClusterSetting(t, tester.tc, "changefeed.default_range_distribution_strategy", "default")
504503
serverutils.SetClusterSetting(t, tester.tc, "changefeed.random_replica_selection.enabled", false)
505504
tester.sqlDB.Exec(t, "CREATE CHANGEFEED FOR x INTO 'null://' WITH initial_scan='no'")
506505
partitions := tester.getPartitions()
@@ -527,10 +526,9 @@ func TestChangefeedWithSimpleDistributionStrategy(t *testing.T) {
527526
// Check that we roughly assign (64 ranges / 6 nodes) ranges to each node.
528527
tester := newRangeDistributionTester(t, noLocality)
529528
defer tester.cleanup()
530-
tester.sqlDB.Exec(t, "SET CLUSTER SETTING changefeed.default_range_distribution_strategy = 'balanced_simple'")
531529
// We need to disable the bulk oracle in order to ensure the leaseholder is selected.
532530
tester.sqlDB.Exec(t, "SET CLUSTER SETTING changefeed.random_replica_selection.enabled = false")
533-
tester.sqlDB.Exec(t, "CREATE CHANGEFEED FOR x INTO 'null://' WITH initial_scan='no'")
531+
tester.sqlDB.Exec(t, "CREATE CHANGEFEED FOR x INTO 'null://' WITH initial_scan='no', range_distribution_strategy='balanced_simple'")
534532
partitions := tester.getPartitions()
535533
counts := tester.countRangesPerNode(partitions)
536534
upper := int(math.Ceil((1 + rebalanceThreshold.Get(&tester.lastNode.ClusterSettings().SV)) * 64 / 6))
@@ -548,30 +546,56 @@ func TestChangefeedWithNoDistributionStrategyAndConstrainedLocality(t *testing.T
548546
skip.UnderShort(t)
549547
skip.UnderDuress(t)
550548

551-
// The replica oracle selects the leaseholder replica for each range. Then, distsql assigns the replica
552-
// to the same node which stores it. However, node of these nodes don't pass the filter. The replicas assigned
553-
// to these nodes are distributed arbitrarily to any nodes which pass the filter.
554-
tester := newRangeDistributionTester(t, func(i int) []roachpb.Tier {
555-
if i%2 == 1 {
556-
return []roachpb.Tier{{Key: "y", Value: "1"}}
549+
t.Run("default specified", func(t *testing.T) {
550+
// The replica oracle selects the leaseholder replica for each range. Then, distsql assigns the replica
551+
// to the same node which stores it. However, node of these nodes don't pass the filter. The replicas assigned
552+
// to these nodes are distributed arbitrarily to any nodes which pass the filter.
553+
tester := newRangeDistributionTester(t, func(i int) []roachpb.Tier {
554+
if i%2 == 1 {
555+
return []roachpb.Tier{{Key: "y", Value: "1"}}
556+
}
557+
return []roachpb.Tier{}
558+
})
559+
defer tester.cleanup()
560+
tester.sqlDB.Exec(t, "CREATE CHANGEFEED FOR x INTO 'null://' WITH initial_scan='no', execution_locality='y=1', range_distribution_strategy='default'")
561+
partitions := tester.getPartitions()
562+
counts := tester.countRangesPerNode(partitions)
563+
564+
totalRanges := 0
565+
for i, count := range counts {
566+
if i%2 == 1 {
567+
totalRanges += count
568+
} else {
569+
require.Equal(t, count, 0)
570+
}
557571
}
558-
return []roachpb.Tier{}
572+
require.Equal(t, totalRanges, 64)
559573
})
560-
defer tester.cleanup()
561-
tester.sqlDB.Exec(t, "SET CLUSTER SETTING changefeed.default_range_distribution_strategy = 'default'")
562-
tester.sqlDB.Exec(t, "CREATE CHANGEFEED FOR x INTO 'null://' WITH initial_scan='no', execution_locality='y=1'")
563-
partitions := tester.getPartitions()
564-
counts := tester.countRangesPerNode(partitions)
565-
566-
totalRanges := 0
567-
for i, count := range counts {
568-
if i%2 == 1 {
569-
totalRanges += count
570-
} else {
571-
require.Equal(t, count, 0)
574+
t.Run("no distribution strategy specified", func(t *testing.T) {
575+
// The replica oracle selects the leaseholder replica for each range. Then, distsql assigns the replica
576+
// to the same node which stores it. However, node of these nodes don't pass the filter. The replicas assigned
577+
// to these nodes are distributed arbitrarily to any nodes which pass the filter.
578+
tester := newRangeDistributionTester(t, func(i int) []roachpb.Tier {
579+
if i%2 == 1 {
580+
return []roachpb.Tier{{Key: "y", Value: "1"}}
581+
}
582+
return []roachpb.Tier{}
583+
})
584+
defer tester.cleanup()
585+
tester.sqlDB.Exec(t, "CREATE CHANGEFEED FOR x INTO 'null://' WITH initial_scan='no', execution_locality='y=1'")
586+
partitions := tester.getPartitions()
587+
counts := tester.countRangesPerNode(partitions)
588+
589+
totalRanges := 0
590+
for i, count := range counts {
591+
if i%2 == 1 {
592+
totalRanges += count
593+
} else {
594+
require.Equal(t, count, 0)
595+
}
572596
}
573-
}
574-
require.Equal(t, totalRanges, 64)
597+
require.Equal(t, totalRanges, 64)
598+
})
575599
}
576600

577601
func TestChangefeedWithSimpleDistributionStrategyAndConstrainedLocality(t *testing.T) {
@@ -593,8 +617,7 @@ func TestChangefeedWithSimpleDistributionStrategyAndConstrainedLocality(t *testi
593617
return []roachpb.Tier{}
594618
})
595619
defer tester.cleanup()
596-
tester.sqlDB.Exec(t, "SET CLUSTER SETTING changefeed.default_range_distribution_strategy = 'balanced_simple'")
597-
tester.sqlDB.Exec(t, "CREATE CHANGEFEED FOR x INTO 'null://' WITH initial_scan='no', execution_locality='y=1'")
620+
tester.sqlDB.Exec(t, "CREATE CHANGEFEED FOR x INTO 'null://' WITH initial_scan='no', execution_locality='y=1', range_distribution_strategy='balanced_simple'")
598621
partitions := tester.getPartitions()
599622
counts := tester.countRangesPerNode(partitions)
600623

pkg/ccl/changefeedccl/changefeedbase/options.go

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -75,6 +75,19 @@ const (
7575
EnrichedPropertySchema EnrichedProperty = `schema`
7676
)
7777

78+
// RangeDistributionStrategy configures how the changefeed balances ranges between nodes.
79+
type RangeDistributionStrategy string
80+
81+
const (
82+
// RangeDistributionStrategyDefault employs no load balancing on the changefeed
83+
// side. We defer to distsql to select nodes and distribute work.
84+
RangeDistributionStrategyDefault RangeDistributionStrategy = `default`
85+
// RangeDistributionStrategyBalancedSimple defers to distsql for selecting the
86+
// set of nodes to distribute work to. However, changefeeds will try to
87+
// distribute work evenly across this set of nodes.
88+
RangeDistributionStrategyBalancedSimple RangeDistributionStrategy = `balanced_simple`
89+
)
90+
7891
// Constants for the initial scan types
7992
const (
8093
InitialScan InitialScanType = iota
@@ -162,6 +175,8 @@ const (
162175

163176
OptEnrichedProperties = `enriched_properties`
164177

178+
OptRangeDistributionStrategy = `range_distribution_strategy`
179+
165180
OptEnvelopeKeyOnly EnvelopeType = `key_only`
166181
OptEnvelopeRow EnvelopeType = `row`
167182
OptEnvelopeDeprecatedRow EnvelopeType = `deprecated_row`
@@ -412,6 +427,7 @@ var ChangefeedOptionExpectValues = map[string]OptionPermittedValues{
412427
OptIgnoreDisableChangefeedReplication: flagOption,
413428
OptEncodeJSONValueNullAsObject: flagOption,
414429
OptEnrichedProperties: csv(string(EnrichedPropertySource), string(EnrichedPropertySchema)),
430+
OptRangeDistributionStrategy: enum("default", "balanced_simple"),
415431
OptHeadersJSONColumnName: stringOption,
416432
OptExtraHeaders: jsonOption,
417433
}
@@ -428,6 +444,7 @@ var CommonOptions = makeStringSet(OptCursor, OptEndTime, OptEnvelope,
428444
OptMinCheckpointFrequency, OptMetricsScope, OptVirtualColumns, Topics, OptExpirePTSAfter,
429445
OptExecutionLocality, OptLaggingRangesThreshold, OptLaggingRangesPollingInterval,
430446
OptIgnoreDisableChangefeedReplication, OptEncodeJSONValueNullAsObject, OptEnrichedProperties,
447+
OptRangeDistributionStrategy,
431448
)
432449

433450
// SQLValidOptions is options exclusive to SQL sink
@@ -805,6 +822,17 @@ func (s StatementOptions) IsInitialScanSpecified() bool {
805822
return true
806823
}
807824

825+
func (s StatementOptions) GetRangeDistributionStrategy() (RangeDistributionStrategy, error) {
826+
v, err := s.getEnumValue(OptRangeDistributionStrategy)
827+
if err != nil {
828+
return "", err
829+
}
830+
if v == `` {
831+
return RangeDistributionStrategyDefault, nil
832+
}
833+
return RangeDistributionStrategy(v), nil
834+
}
835+
808836
// ShouldUseFullStatementTimeName returns true if references to the table should be in db.schema.table
809837
// format (e.g. in Kafka topics).
810838
func (s StatementOptions) ShouldUseFullStatementTimeName() bool {

0 commit comments

Comments
 (0)