Skip to content

Commit e889c68

Browse files
craig[bot]log-headajstorm
committed
154744: changefeedccl: make range_distribution_strategy a changefeed option r=andyyang890,aerfrei a=log-head Previously, range_distribution_strategy was a cluster setting. Moving this from a cluster setting to a changefeed option gives users control over which changefeeds it applies to (eg initial scans vs long-running feeds). Fixes: #147628 Release note (sql change): Replace cluster setting changefeed.default_range_distribution_strategy="balanced_simple" with changefeed setting range_distribution_strategy with values 'default' or 'balanced_simple'. 'balanced_simple' naively redistributes spans evenly across aggregators with no regard for replica placement. 156392: dev-inf: Improve AI review workflow based on feedback r=rickystewart a=ajstorm ## Summary Addresses feedback on the three-stage Claude Code PR review workflow to improve reliability and user experience. ## Changes - Added 60-minute timeout to prevent jobs from running indefinitely - Skip workflow entirely if PR is already merged - Removed 'labeled' from trigger types to prevent unnecessary runs - Added guidance in PR comment directing users to scroll to bottom for Final Analysis Summary 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-authored-by: Matthew Lougheed <[email protected]> Co-authored-by: Adam Storm <[email protected]>
3 parents ee20eae + 9760067 + b3eee64 commit e889c68

File tree

7 files changed

+143
-22
lines changed

7 files changed

+143
-22
lines changed

.github/workflows/pr-analyzer-threestage.yml

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -2,12 +2,13 @@ name: Claude Code PR Review
22

33
on:
44
pull_request_target:
5-
types: [synchronize, ready_for_review, reopened, labeled]
5+
types: [synchronize, ready_for_review, reopened]
66

77
jobs:
88
claude-code-pr-review:
99
runs-on: ubuntu-latest
10-
if: "!startsWith(github.base_ref, 'release-') && !contains(github.event.pull_request.labels.*.name, 'O-No-AI-Review')"
10+
timeout-minutes: 60
11+
if: "!startsWith(github.base_ref, 'release-') && !contains(github.event.pull_request.labels.*.name, 'O-No-AI-Review') && github.event.pull_request.merged == false"
1112
permissions:
1213
contents: read
1314
pull-requests: write
@@ -237,8 +238,10 @@ jobs:
237238
**Next Steps:**
238239
Please review the detailed findings in the [workflow run](${{ github.server_url }}/${{ github.repository }}/actions/runs/${{ github.run_id }}).
239240
241+
**Note:** When viewing the workflow output, scroll to the bottom to find the Final Analysis Summary.
242+
240243
After you review the findings, please tag the issue as follows:
241244
- If the detected issue is real or was helpful in any way, please tag the issue with \`O-AI-Review-Real-Issue-Found\`
242245
- If the detected issue was not helpful in any way, please tag the issue with \`O-AI-Review-Not-Helpful\`"
243246
244-
gh pr edit ${{ github.event.pull_request.number }} --add-label "o-AI-Review-Potential-Issue-Detected"
247+
gh pr edit ${{ github.event.pull_request.number }} --add-label "o-AI-Review-Potential-Issue-Detected"

docs/generated/settings/settings-for-tenants.txt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@ changefeed.aggregator.flush_jitter float 0.1 jitter aggregator flushes as a frac
1212
changefeed.backfill.concurrent_scan_requests integer 0 number of concurrent scan requests per node issued during a backfill application
1313
changefeed.backfill.scan_request_size integer 524288 the maximum number of bytes returned by each scan request application
1414
changefeed.batch_reduction_retry.enabled (alias: changefeed.batch_reduction_retry_enabled) boolean false if true, kafka changefeeds upon erroring on an oversized batch will attempt to resend the messages with progressively lower batch sizes application
15-
changefeed.default_range_distribution_strategy enumeration default configures how work is distributed among nodes for a given changefeed. for the most balanced distribution, use `balanced_simple`. changing this setting will not override locality restrictions [default = 0, balanced_simple = 1] application
15+
changefeed.default_range_distribution_strategy enumeration default configures how work is distributed among nodes for a given changefeed. for the most balanced distribution, use `balanced_simple`. changing this setting will not override locality restrictions. this setting can be overridden by the changefeed option `range_distribution_strategy` [default = 0, balanced_simple = 1] application
1616
changefeed.event_consumer_worker_queue_size integer 16 if changefeed.event_consumer_workers is enabled, this setting sets the maxmimum number of events which a worker can buffer application
1717
changefeed.event_consumer_workers integer 0 the number of workers to use when processing events: <0 disables, 0 assigns a reasonable default, >0 assigns the setting value. for experimental/core changefeeds and changefeeds using parquet format, this is disabled application
1818
changefeed.fast_gzip.enabled boolean true use fast gzip implementation application

docs/generated/settings/settings.html

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@
1717
<tr><td><div id="setting-changefeed-backfill-concurrent-scan-requests" class="anchored"><code>changefeed.backfill.concurrent_scan_requests</code></div></td><td>integer</td><td><code>0</code></td><td>number of concurrent scan requests per node issued during a backfill</td><td>Basic/Standard/Advanced/Self-Hosted</td></tr>
1818
<tr><td><div id="setting-changefeed-backfill-scan-request-size" class="anchored"><code>changefeed.backfill.scan_request_size</code></div></td><td>integer</td><td><code>524288</code></td><td>the maximum number of bytes returned by each scan request</td><td>Basic/Standard/Advanced/Self-Hosted</td></tr>
1919
<tr><td><div id="setting-changefeed-batch-reduction-retry-enabled" class="anchored"><code>changefeed.batch_reduction_retry.enabled<br />(alias: changefeed.batch_reduction_retry_enabled)</code></div></td><td>boolean</td><td><code>false</code></td><td>if true, kafka changefeeds upon erroring on an oversized batch will attempt to resend the messages with progressively lower batch sizes</td><td>Basic/Standard/Advanced/Self-Hosted</td></tr>
20-
<tr><td><div id="setting-changefeed-default-range-distribution-strategy" class="anchored"><code>changefeed.default_range_distribution_strategy</code></div></td><td>enumeration</td><td><code>default</code></td><td>configures how work is distributed among nodes for a given changefeed. for the most balanced distribution, use `balanced_simple`. changing this setting will not override locality restrictions [default = 0, balanced_simple = 1]</td><td>Basic/Standard/Advanced/Self-Hosted</td></tr>
20+
<tr><td><div id="setting-changefeed-default-range-distribution-strategy" class="anchored"><code>changefeed.default_range_distribution_strategy</code></div></td><td>enumeration</td><td><code>default</code></td><td>configures how work is distributed among nodes for a given changefeed. for the most balanced distribution, use `balanced_simple`. changing this setting will not override locality restrictions. this setting can be overridden by the changefeed option `range_distribution_strategy` [default = 0, balanced_simple = 1]</td><td>Basic/Standard/Advanced/Self-Hosted</td></tr>
2121
<tr><td><div id="setting-changefeed-event-consumer-worker-queue-size" class="anchored"><code>changefeed.event_consumer_worker_queue_size</code></div></td><td>integer</td><td><code>16</code></td><td>if changefeed.event_consumer_workers is enabled, this setting sets the maxmimum number of events which a worker can buffer</td><td>Basic/Standard/Advanced/Self-Hosted</td></tr>
2222
<tr><td><div id="setting-changefeed-event-consumer-workers" class="anchored"><code>changefeed.event_consumer_workers</code></div></td><td>integer</td><td><code>0</code></td><td>the number of workers to use when processing events: &lt;0 disables, 0 assigns a reasonable default, &gt;0 assigns the setting value. for experimental/core changefeeds and changefeeds using parquet format, this is disabled</td><td>Basic/Standard/Advanced/Self-Hosted</td></tr>
2323
<tr><td><div id="setting-changefeed-fast-gzip-enabled" class="anchored"><code>changefeed.fast_gzip.enabled</code></div></td><td>boolean</td><td><code>true</code></td><td>use fast gzip implementation</td><td>Basic/Standard/Advanced/Self-Hosted</td></tr>

pkg/ccl/changefeedccl/changefeed_dist.go

Lines changed: 38 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -345,35 +345,36 @@ func startDistChangefeed(
345345
// The bin packing choice gives preference to leaseholder replicas if possible.
346346
var replicaOracleChoice = replicaoracle.BinPackingChoice
347347

348-
type rangeDistributionType int
348+
type clusterSettingRangeDistributionType int
349349

350350
const (
351351
// defaultDistribution employs no load balancing on the changefeed
352352
// side. We defer to distsql to select nodes and distribute work.
353-
defaultDistribution rangeDistributionType = 0
353+
defaultDistribution clusterSettingRangeDistributionType = 0
354354
// balancedSimpleDistribution defers to distsql for selecting the
355355
// set of nodes to distribute work to. However, changefeeds will try to
356356
// distribute work evenly across this set of nodes.
357-
balancedSimpleDistribution rangeDistributionType = 1
357+
balancedSimpleDistribution clusterSettingRangeDistributionType = 1
358358
// TODO(jayant): add balancedFullDistribution which takes
359359
// full control of node selection and distribution.
360360
)
361361

362-
// RangeDistributionStrategy is used to determine how the changefeed balances
363-
// ranges between nodes.
364-
// TODO: deprecate this setting in favor of a changefeed option.
362+
var rangeDistributionStrategyStrings = map[clusterSettingRangeDistributionType]string{
363+
defaultDistribution: string(changefeedbase.ChangefeedRangeDistributionStrategyDefault),
364+
balancedSimpleDistribution: string(changefeedbase.ChangefeedRangeDistributionStrategyBalancedSimple),
365+
}
366+
365367
var RangeDistributionStrategy = settings.RegisterEnumSetting(
366368
settings.ApplicationLevel,
367369
"changefeed.default_range_distribution_strategy",
368370
"configures how work is distributed among nodes for a given changefeed. "+
369371
"for the most balanced distribution, use `balanced_simple`. changing this setting "+
370-
"will not override locality restrictions",
372+
"will not override locality restrictions. this setting can be overridden by the "+
373+
"changefeed option `range_distribution_strategy`",
371374
metamorphic.ConstantWithTestChoice("default_range_distribution_strategy",
372-
"default", "balanced_simple"),
373-
map[rangeDistributionType]string{
374-
defaultDistribution: "default",
375-
balancedSimpleDistribution: "balanced_simple",
376-
},
375+
string(changefeedbase.ChangefeedRangeDistributionStrategyDefault),
376+
string(changefeedbase.ChangefeedRangeDistributionStrategyBalancedSimple)),
377+
rangeDistributionStrategyStrings,
377378
settings.WithPublic)
378379

379380
var useBulkOracle = settings.RegisterBoolSetting(
@@ -410,7 +411,6 @@ func makePlan(
410411
}
411412
}
412413

413-
rangeDistribution := RangeDistributionStrategy.Get(sv)
414414
evalCtx := execCtx.ExtendedEvalContext()
415415
oracle := replicaoracle.NewOracle(replicaOracleChoice, dsp.ReplicaOracleConfig(locFilter))
416416
if useBulkOracle.Get(&evalCtx.Settings.SV) {
@@ -426,9 +426,27 @@ func makePlan(
426426
if log.ExpensiveLogEnabled(ctx, 2) {
427427
log.Changefeed.Infof(ctx, "spans returned by DistSQL: %v", spanPartitions)
428428
}
429+
// Preference for the range distribution strategy is given to the
430+
// changefeed option. If none is specified, the cluster setting,
431+
// defaulting to 'default', is used. The default behavior is to defer
432+
// to distsql for range distribution and not rebalance.
433+
changefeedRangeDistribution, err := changefeedbase.MakeStatementOptions(details.Opts).GetChangefeedRangeDistributionStrategy()
434+
if err != nil {
435+
return nil, nil, err
436+
}
437+
clusterRangeDistribution := changefeedbase.ChangefeedRangeDistributionStrategy(
438+
rangeDistributionStrategyStrings[RangeDistributionStrategy.Get(sv)])
439+
440+
rangeDistributionStrategy := changefeedRangeDistribution
441+
if rangeDistributionStrategy == changefeedbase.ChangefeedRangeDistributionStrategyNotSpecified {
442+
rangeDistributionStrategy = clusterRangeDistribution
443+
}
444+
if haveKnobs && maybeCfKnobs.RangeDistributionStrategyCallback != nil {
445+
maybeCfKnobs.RangeDistributionStrategyCallback(rangeDistributionStrategy)
446+
}
429447
switch {
430-
case distMode == sql.LocalDistribution || rangeDistribution == defaultDistribution:
431-
case rangeDistribution == balancedSimpleDistribution:
448+
case distMode == sql.LocalDistribution || rangeDistributionStrategy == changefeedbase.ChangefeedRangeDistributionStrategyDefault:
449+
case rangeDistributionStrategy == changefeedbase.ChangefeedRangeDistributionStrategyBalancedSimple:
432450
log.Changefeed.Infof(ctx, "rebalancing ranges using balanced simple distribution")
433451
sender := execCtx.ExecCfg().DB.NonTransactionalSender()
434452
distSender := sender.(*kv.CrossRangeTxnWrapperSender).Wrapped().(*kvcoord.DistSender)
@@ -442,8 +460,11 @@ func makePlan(
442460
log.Changefeed.Infof(ctx, "spans after balanced simple distribution rebalancing: %v", spanPartitions)
443461
}
444462
default:
445-
return nil, nil, errors.AssertionFailedf("unsupported dist strategy %d and dist mode %d",
446-
rangeDistribution, distMode)
463+
return nil, nil, errors.AssertionFailedf(
464+
"unsupported dist strategy %s and dist mode %d (cluster setting: %q, changefeed option: %q)",
465+
rangeDistributionStrategy, distMode,
466+
clusterRangeDistribution, changefeedRangeDistribution,
467+
)
447468
}
448469

449470
if haveKnobs && maybeCfKnobs.FilterDrainingNodes != nil && len(drainingNodes) > 0 {

pkg/ccl/changefeedccl/changefeed_dist_test.go

Lines changed: 60 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,8 @@ import (
1616
"time"
1717

1818
"github.com/cockroachdb/cockroach/pkg/base"
19+
"github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/cdctest"
20+
"github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/changefeedbase"
1921
"github.com/cockroachdb/cockroach/pkg/jobs"
2022
"github.com/cockroachdb/cockroach/pkg/keys"
2123
"github.com/cockroachdb/cockroach/pkg/kv/kvclient/kvcoord"
@@ -480,6 +482,64 @@ func (rdt *rangeDistributionTester) balancedDistributionUpperBound(numNodes int)
480482
return int(math.Ceil((1 + rebalanceThreshold.Get(&rdt.lastNode.ClusterSettings().SV) + 0.1) * 64 / float64(numNodes)))
481483
}
482484

485+
func TestChangefeedWithDistributionStrategyOptions(t *testing.T) {
486+
defer leaktest.AfterTest(t)()
487+
defer log.Scope(t).Close(t)
488+
489+
clusterOptions := []changefeedbase.ChangefeedRangeDistributionStrategy{
490+
changefeedbase.ChangefeedRangeDistributionStrategyDefault,
491+
changefeedbase.ChangefeedRangeDistributionStrategyBalancedSimple,
492+
}
493+
changefeedOptions := []changefeedbase.ChangefeedRangeDistributionStrategy{
494+
changefeedbase.ChangefeedRangeDistributionStrategyDefault,
495+
changefeedbase.ChangefeedRangeDistributionStrategyBalancedSimple,
496+
changefeedbase.ChangefeedRangeDistributionStrategyNotSpecified,
497+
}
498+
testutils.RunValues(t, "cluster option", clusterOptions, func(t *testing.T, clusterOption changefeedbase.ChangefeedRangeDistributionStrategy) {
499+
testutils.RunValues(t, "changefeed option", changefeedOptions, func(t *testing.T, changefeedOption changefeedbase.ChangefeedRangeDistributionStrategy) {
500+
expectedStrat := changefeedOption
501+
if changefeedOption == changefeedbase.ChangefeedRangeDistributionStrategyNotSpecified {
502+
expectedStrat = clusterOption
503+
}
504+
505+
testFn := func(t *testing.T, s TestServer, f cdctest.TestFeedFactory) {
506+
sqlDB := sqlutils.MakeSQLRunner(s.DB)
507+
sqlDB.Exec(t, fmt.Sprintf("SET CLUSTER SETTING changefeed.default_range_distribution_strategy = '%s'", clusterOption))
508+
sqlDB.Exec(t, `CREATE TABLE foo (a INT PRIMARY KEY, b STRING)`)
509+
sqlDB.Exec(t, `INSERT INTO foo VALUES (0, 'initial')`)
510+
511+
stratCh := make(chan changefeedbase.ChangefeedRangeDistributionStrategy, 1)
512+
513+
knobs := s.TestingKnobs.DistSQL.(*execinfra.TestingKnobs).Changefeed.(*TestingKnobs)
514+
knobs.RangeDistributionStrategyCallback = func(strat changefeedbase.ChangefeedRangeDistributionStrategy) {
515+
select {
516+
case stratCh <- strat:
517+
default:
518+
fmt.Printf("skipping strat: %s\n", strat)
519+
}
520+
}
521+
522+
createStmt := "CREATE CHANGEFEED FOR foo"
523+
if changefeedOption != changefeedbase.ChangefeedRangeDistributionStrategyNotSpecified {
524+
createStmt += fmt.Sprintf(" WITH range_distribution_strategy='%s'", changefeedOption)
525+
}
526+
feed := feed(t, f, createStmt)
527+
defer closeFeed(t, feed)
528+
testutils.SucceedsSoon(t, func() error {
529+
select {
530+
case strat := <-stratCh:
531+
require.Equal(t, expectedStrat, strat)
532+
return nil
533+
default:
534+
return errors.New("no range distribution strategy callback found")
535+
}
536+
})
537+
}
538+
cdcTest(t, testFn)
539+
})
540+
})
541+
}
542+
483543
func TestChangefeedWithNoDistributionStrategy(t *testing.T) {
484544
defer leaktest.AfterTest(t)()
485545
defer log.Scope(t).Close(t)

pkg/ccl/changefeedccl/changefeedbase/options.go

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -75,6 +75,23 @@ const (
7575
EnrichedPropertySchema EnrichedProperty = `schema`
7676
)
7777

78+
// ChangefeedRangeDistributionStrategy configures how the changefeed balances
79+
// ranges between nodes.
80+
type ChangefeedRangeDistributionStrategy string
81+
82+
const (
83+
// ChangefeedRangeDistributionStrategyDefault employs no load balancing on
84+
// the changefeed side. We defer to distsql to select nodes and distribute work.
85+
ChangefeedRangeDistributionStrategyDefault ChangefeedRangeDistributionStrategy = `default`
86+
// ChangefeedRangeDistributionStrategyBalancedSimple defers to distsql for
87+
// selecting the set of nodes to distribute work to. However, changefeeds
88+
// will try to distribute work evenly across this set of nodes.
89+
ChangefeedRangeDistributionStrategyBalancedSimple ChangefeedRangeDistributionStrategy = `balanced_simple`
90+
// ChangefeedRangeDistributionStrategyNotSpecified is used to indicate that
91+
// the changefeed range distribution strategy is not specified.
92+
ChangefeedRangeDistributionStrategyNotSpecified ChangefeedRangeDistributionStrategy = ``
93+
)
94+
7895
// Constants for the initial scan types
7996
const (
8097
InitialScan InitialScanType = iota
@@ -162,6 +179,8 @@ const (
162179

163180
OptEnrichedProperties = `enriched_properties`
164181

182+
OptRangeDistributionStrategy = `range_distribution_strategy`
183+
165184
OptEnvelopeKeyOnly EnvelopeType = `key_only`
166185
OptEnvelopeRow EnvelopeType = `row`
167186
OptEnvelopeDeprecatedRow EnvelopeType = `deprecated_row`
@@ -412,6 +431,7 @@ var ChangefeedOptionExpectValues = map[string]OptionPermittedValues{
412431
OptIgnoreDisableChangefeedReplication: flagOption,
413432
OptEncodeJSONValueNullAsObject: flagOption,
414433
OptEnrichedProperties: csv(string(EnrichedPropertySource), string(EnrichedPropertySchema)),
434+
OptRangeDistributionStrategy: enum(string(ChangefeedRangeDistributionStrategyDefault), string(ChangefeedRangeDistributionStrategyBalancedSimple)),
415435
OptHeadersJSONColumnName: stringOption,
416436
OptExtraHeaders: jsonOption,
417437
}
@@ -428,6 +448,7 @@ var CommonOptions = makeStringSet(OptCursor, OptEndTime, OptEnvelope,
428448
OptMinCheckpointFrequency, OptMetricsScope, OptVirtualColumns, Topics, OptExpirePTSAfter,
429449
OptExecutionLocality, OptLaggingRangesThreshold, OptLaggingRangesPollingInterval,
430450
OptIgnoreDisableChangefeedReplication, OptEncodeJSONValueNullAsObject, OptEnrichedProperties,
451+
OptRangeDistributionStrategy,
431452
)
432453

433454
// SQLValidOptions is options exclusive to SQL sink
@@ -805,6 +826,17 @@ func (s StatementOptions) IsInitialScanSpecified() bool {
805826
return true
806827
}
807828

829+
func (s StatementOptions) GetChangefeedRangeDistributionStrategy() (
830+
ChangefeedRangeDistributionStrategy,
831+
error,
832+
) {
833+
v, err := s.getEnumValue(OptRangeDistributionStrategy)
834+
if err != nil {
835+
return "", err
836+
}
837+
return ChangefeedRangeDistributionStrategy(v), nil
838+
}
839+
808840
// ShouldUseFullStatementTimeName returns true if references to the table should be in db.schema.table
809841
// format (e.g. in Kafka topics).
810842
func (s StatementOptions) ShouldUseFullStatementTimeName() bool {

pkg/ccl/changefeedccl/testing_knobs.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ package changefeedccl
88
import (
99
"context"
1010

11+
"github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/changefeedbase"
1112
"github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/kvevent"
1213
"github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/kvfeed"
1314
"github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/resolvedspan"
@@ -86,6 +87,10 @@ type TestingKnobs struct {
8687
// when the changefeed is planned.
8788
SpanPartitionsCallback func([]sql.SpanPartition)
8889

90+
// RangeDistributionStrategyCallback is called with the resolved
91+
// range distribution strategy when the changefeed is planned.
92+
RangeDistributionStrategyCallback func(changefeedbase.ChangefeedRangeDistributionStrategy)
93+
8994
// PreserveDeprecatedPts is used to prevent a changefeed from upgrading
9095
// its PTS record from the deprecated style to the new style.
9196
PreserveDeprecatedPts func() bool

0 commit comments

Comments
 (0)