-
Notifications
You must be signed in to change notification settings - Fork 6
feature: partition strategy #520
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
Signed-off-by: Valery Piashchynski <[email protected]>
|
Note Other AI code review bot(s) detectedCodeRabbit has detected other AI code review bot(s) in this pull request and will avoid duplicating their findings in the review comments. This may lead to a less comprehensive review. WalkthroughAdds a PartitioningStrategy type and constants, exposes Changes
Sequence Diagram(s)sequenceDiagram
participant UserConfig as User config (schema.json)
participant Jobs as Jobs Plugin
participant KafkaInit as kafkajobs.InitDefault
participant kgo as franz-go (kgo)
UserConfig->>Jobs: Provide producer_options.partitioning_strategy
Jobs->>KafkaInit: Build ProducerOpts
KafkaInit->>KafkaInit: Check strategy string (exact match)
alt strategy == "Manual"
KafkaInit->>kgo: Apply ManualPartitioner()
else strategy == "RoundRobin"
KafkaInit->>kgo: Apply RoundRobinPartitioner()
else strategy == "LeastBackup"
KafkaInit->>kgo: Apply LeastBackupPartitioner()
else strategy == "Sticky"
KafkaInit->>kgo: Apply StickyPartitioner()
else strategy == "Uniform" or empty
KafkaInit->>kgo: Use default partitioner (uniform)
else unknown
KafkaInit-->>Jobs: return error "unknown partitioning strategy: <value>"
end
Estimated code review effort🎯 3 (Moderate) | ⏱️ ~20 minutes Assessment against linked issues
Assessment against linked issues: Out-of-scope changes
Possibly related issues
Poem
✨ Finishing Touches🧪 Generate unit tests
Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out. 🪧 TipsChatThere are 3 ways to chat with CodeRabbit:
SupportNeed help? Create a ticket on our support page for assistance with any issues or questions. CodeRabbit Commands (Invoked using PR/Issue comments)Type Other keywords and placeholders
CodeRabbit Configuration File (
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Pull Request Overview
This PR adds a new partitioning strategy feature to the Kafka producer configuration, allowing users to choose between manual and uniform partitioning strategies. It addresses issue #2219 by introducing two new producer options.
Key changes:
- Added
partitioning_strategyconfiguration option with "Manual" and "Uniform" values - Updated JSON schema to include the new partitioning strategy field with appropriate validation
- Implemented partitioning strategy logic in the Kafka configuration initialization
Reviewed Changes
Copilot reviewed 4 out of 4 changed files in this pull request and generated 1 comment.
| File | Description |
|---|---|
| schema.json | Added partitioning_strategy field to producer options with enum validation and formatting cleanup |
| kafkajobs/opts.go | Defined PartitioningStrategy type and constants for manual/uniform partitioning |
| kafkajobs/config.go | Implemented partitioning strategy configuration logic with case-insensitive handling |
| go.work.sum | Updated dependency checksums |
Tip: Customize your code reviews with copilot-instructions.md. Create the file or learn how to get started.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 2
🧹 Nitpick comments (2)
kafkajobs/opts.go (1)
117-123: Public enum added — consider documenting and aligning casing with schema.Add a short doc comment on PartitioningStrategy and constants. Also, schema uses "Manual"/"Uniform" while code constants are lower-case; case-insensitive handling exists downstream, but consider canonicalizing to a single representation across docs/schema for consistency.
kafkajobs/config.go (1)
218-229: Manual partitioner selection correctly fixes the bug; add small safety/UX tweaks.
- Optional: explicitly set the uniform partitioner when chosen to guard against upstream default changes.
- Recommend logging the selected partitioning strategy at INFO/DEBUG to ease support.
- Heads-up: with Manual, franz-go requires Record.Partition to be set; producing without it will error. Ensure Jobs sets partition when strategy=manual, or document this clearly.
Apply if you prefer explicit uniform:
switch PartitioningStrategy(strategy) { case PartitionManual: opts = append(opts, kgo.RecordPartitioner(kgo.ManualPartitioner())) case PartitionUniform: - // already used by default + // be explicit to avoid surprises if franz-go defaults ever change + opts = append(opts, kgo.RecordPartitioner(kgo.UniformBytesPartitioner())) default: return nil, errors.Errorf("unknown partitioning strategy: %s", c.ProducerOpts.PartitioningStrategy) }
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
💡 Knowledge Base configuration:
- MCP integration is disabled by default for public repositories
- Jira integration is disabled by default for public repositories
- Linear integration is disabled by default for public repositories
You can enable these sources in your CodeRabbit configuration.
⛔ Files ignored due to path filters (1)
go.work.sumis excluded by!**/*.sum
📒 Files selected for processing (3)
kafkajobs/config.go(2 hunks)kafkajobs/opts.go(1 hunks)schema.json(8 hunks)
🧰 Additional context used
🧬 Code graph analysis (1)
kafkajobs/config.go (1)
kafkajobs/opts.go (4)
ProducerOpts(106-115)PartitioningStrategy(117-117)PartitionManual(120-120)PartitionUniform(121-121)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (2)
- GitHub Check: Kafka durability plugin (Go stable, PHP 8.4, OS ubuntu-latest)
- GitHub Check: Kafka plugin (Go stable, PHP 8.4, OS ubuntu-latest)
🔇 Additional comments (3)
kafkajobs/opts.go (1)
106-115: LGTM: adds partitioning_strategy to ProducerOpts.Field wiring and tags look correct.
kafkajobs/config.go (1)
9-9: Import for strings — OK.schema.json (1)
239-239: SASL enum alignment — OK.Values match the implementation.
Signed-off-by: Valery Piashchynski <[email protected]>
Signed-off-by: Valery Piashchynski <[email protected]>
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 0
♻️ Duplicate comments (2)
schema.json (2)
39-50: Enum casing mismatch vs. case-insensitive runtime; accept both casings.Runtime normalizes input to lowercase, but the schema only allows TitleCase. Expand enum to include lowercase variants to prevent valid configs from being rejected.
"partitioning_strategy": { "description": "The partitioning strategy to use for produced records.", "type": "string", - "enum": [ - "Manual", - "Uniform", - "RoundRobin", - "LeastBackup", - "Sticky" - ], + "enum": [ + "Manual", "Uniform", "RoundRobin", "LeastBackup", "Sticky", + "manual", "uniform", "roundrobin", "leastbackup", "sticky" + ], "default": "Uniform" },
201-202: TLS schema blocks Root CA–only setups; align with code’s enableTLS().Code enables TLS when (key && cert) OR root_ca is provided. The schema currently requires key+cert and will reject valid root_ca-only configs. Switch to anyOf.
- "required": ["key", "cert"] + "anyOf": [ + { "required": ["key", "cert"] }, + { "required": ["root_ca"] } + ]
🧹 Nitpick comments (1)
kafkajobs/opts.go (1)
117-126: Make parsing case-insensitive at the type level to centralize validation.Implement encoding.TextUnmarshaler on PartitioningStrategy so mapstructure/json decode normalizes values. This removes the need for ad-hoc lowercasing in config.go.
Example (additional code outside this hunk):
// Optional: centralize normalization func (p *PartitioningStrategy) UnmarshalText(b []byte) error { switch strings.ToLower(string(b)) { case "manual": *p = PartitionManual case "uniform", "": *p = PartitionUniform case "roundrobin": *p = PartitionRoundRobin case "leastbackup": *p = PartitionLeastBackup case "sticky": *p = PartitionSticky default: return fmt.Errorf("unknown partitioning strategy: %s", b) } return nil }
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
💡 Knowledge Base configuration:
- MCP integration is disabled by default for public repositories
- Jira integration is disabled by default for public repositories
- Linear integration is disabled by default for public repositories
You can enable these sources in your CodeRabbit configuration.
📒 Files selected for processing (3)
kafkajobs/config.go(2 hunks)kafkajobs/opts.go(1 hunks)schema.json(8 hunks)
🧰 Additional context used
🧬 Code graph analysis (1)
kafkajobs/config.go (1)
kafkajobs/opts.go (7)
ProducerOpts(106-115)PartitioningStrategy(117-117)PartitionManual(120-120)PartitionUniform(121-121)PartitionRoundRobin(122-122)PartitionLeastBackup(123-123)PartitionSticky(124-124)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (2)
- GitHub Check: Kafka durability plugin (Go stable, PHP 8.4, OS ubuntu-latest)
- GitHub Check: Kafka plugin (Go stable, PHP 8.4, OS ubuntu-latest)
🔇 Additional comments (2)
kafkajobs/opts.go (1)
106-115: Public API extension looks good.Field and tags for PartitioningStrategy are consistent with the schema location and naming.
kafkajobs/config.go (1)
218-235: Bug: lowercasing then switching on TitleCase constants makes all cases fall through to error.You convert to lowercase, then compare against constants with TitleCase values; none will ever match, so any non-empty strategy returns “unknown partitioning strategy”. Fix by switching on the lowercased string literals (or remove lowercasing and compare against constants).
- if c.ProducerOpts.PartitioningStrategy != "" { - // lowercase to prevent user errors, like Manual vs manual - strategy := strings.ToLower(string(c.ProducerOpts.PartitioningStrategy)) - switch PartitioningStrategy(strategy) { - case PartitionManual: - opts = append(opts, kgo.RecordPartitioner(kgo.ManualPartitioner())) - case PartitionUniform: - // already used by default - case PartitionRoundRobin: - opts = append(opts, kgo.RecordPartitioner(kgo.RoundRobinPartitioner())) - case PartitionLeastBackup: - opts = append(opts, kgo.RecordPartitioner(kgo.LeastBackupPartitioner())) - case PartitionSticky: - opts = append(opts, kgo.RecordPartitioner(kgo.StickyPartitioner())) - default: - return nil, errors.Errorf("unknown partitioning strategy: %s", c.ProducerOpts.PartitioningStrategy) - } - } + if s := strings.ToLower(string(c.ProducerOpts.PartitioningStrategy)); s != "" { + switch s { + case "manual": + opts = append(opts, kgo.RecordPartitioner(kgo.ManualPartitioner())) + case "uniform": + // default behavior; no option needed + case "roundrobin": + opts = append(opts, kgo.RecordPartitioner(kgo.RoundRobinPartitioner())) + case "leastbackup": + opts = append(opts, kgo.RecordPartitioner(kgo.LeastBackupPartitioner())) + case "sticky": + opts = append(opts, kgo.RecordPartitioner(kgo.StickyPartitioner())) + default: + return nil, errors.Errorf("unknown partitioning strategy: %s", c.ProducerOpts.PartitioningStrategy) + } + }Likely an incorrect or invalid review comment.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 0
♻️ Duplicate comments (1)
schema.json (1)
201-204: TLS schema now matches runtime (key+cert OR root_ca).
This addresses the earlier review note about blocking root-CA–only configs.
🧹 Nitpick comments (2)
schema.json (2)
39-50: Newpartitioning_strategyoption looks good; clarify casing in the description.
If runtime parsing is case-sensitive, make that explicit to avoid user confusion. If it’s case-insensitive, align the schema accordingly.Apply one of the following:
Option A — document case sensitivity (safe regardless of runtime behavior):
- "description": "The partitioning strategy to use for produced records.", + "description": "The partitioning strategy to use for produced records. Values are case-sensitive.",Option B — if runtime accepts case-insensitive values, broaden the enum:
- "enum": [ - "Manual", - "Uniform", - "RoundRobin", - "LeastBackup", - "Sticky" - ], + "enum": [ + "Manual","Uniform","RoundRobin","LeastBackup","Sticky", + "manual","uniform","roundrobin","leastbackup","sticky" + ],
248-248: Extend SASL enum with OAUTHBEARER
franz-go v1.19.5 supports SASL OAUTHBEARER (pkg/sasl/oauth); if you intend to enable it, import and map the OAuth mechanism in your Go code and add"oauthbearer"(matching your casing) to the schema.json enum.
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
💡 Knowledge Base configuration:
- MCP integration is disabled by default for public repositories
- Jira integration is disabled by default for public repositories
- Linear integration is disabled by default for public repositories
You can enable these sources in your CodeRabbit configuration.
📒 Files selected for processing (2)
kafkajobs/config.go(1 hunks)schema.json(8 hunks)
🚧 Files skipped from review as they are similar to previous changes (1)
- kafkajobs/config.go
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (2)
- GitHub Check: Kafka durability plugin (Go stable, PHP 8.4, OS ubuntu-latest)
- GitHub Check: Kafka plugin (Go stable, PHP 8.4, OS ubuntu-latest)
🔇 Additional comments (7)
schema.json (7)
9-9: LGTM: keepdriverrequired at pipeline level.
Matches existing config expectations.
14-14: LGTM: constraindriverenum to "kafka".
Prevents misconfiguration.
56-56: LGTM:required_acksenum normalization.
Consistent with existing option names and default.
70-70: LGTM:compression_codecenum.
Clear, explicit set; leaving “no compression” as absence is fine.
85-85: LGTM:group_optionsrequiresgroup_id.
Matches the doc string that other group options depend on it.
174-174: LGTM: brokers examples updated.
Helpful, keeps examples realistic.
217-217: LGTM:Offsetrequirestype.
Prevents ambiguous offset objects.
Signed-off-by: Valery Piashchynski <[email protected]>
Signed-off-by: Valery Piashchynski <[email protected]>
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 0
🧹 Nitpick comments (2)
go.mod (1)
20-20: Verified gRPC/protobuf alignment
The genproto pseudo-version v0.0.0-20250826171959-ef028d996bc1 depends on google.golang.org/protobuf v1.36.8, matching your pinned protobuf version and the resolved gRPC modules. Consider pinning to a stable genproto release for better reproducibility.tests/go.mod (1)
63-63: Pre-release dependency (golang/mock v1.7.0-rc.1).RC versions can be volatile. Prefer a stable tag unless you need RC fixes.
- github.com/golang/mock v1.7.0-rc.1 // indirect + // Prefer a stable release unless RC is required + github.com/golang/mock v1.7.0 // indirectIf v1.7.0 is not viable, consider the latest stable 1.6.x that satisfies your needs.
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
💡 Knowledge Base configuration:
- MCP integration is disabled by default for public repositories
- Jira integration is disabled by default for public repositories
- Linear integration is disabled by default for public repositories
You can enable these sources in your CodeRabbit configuration.
⛔ Files ignored due to path filters (3)
go.sumis excluded by!**/*.sumgo.work.sumis excluded by!**/*.sumtests/go.sumis excluded by!**/*.sum
📒 Files selected for processing (2)
go.mod(1 hunks)tests/go.mod(4 hunks)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (2)
- GitHub Check: Kafka plugin (Go stable, PHP 8.4, OS ubuntu-latest)
- GitHub Check: Kafka durability plugin (Go stable, PHP 8.4, OS ubuntu-latest)
🔇 Additional comments (7)
go.mod (3)
15-18: OTel 1.38.x upgrades are consistent.Core, sdk, trace, and metric versions align at 1.38.0. No cross-version drift detected in root module.
Also applies to: 45-45
8-8: AWS SDK v2 patch bumps look safe.All AWS leaf modules are bumped in lockstep; no mixed minor versions observed.
Also applies to: 24-35, 30-35
3-5: CI already installs Go ≥1.21 (actions/setup-go@v5 with go-version-file/go-version/matrix), so the Go 1.25 toolchain directive is supported—no CI changes needed. Ensure local dev environments use Go 1.25+.tests/go.mod (4)
20-20: OTel/test stack aligns with root (1.38.0 + otelhttp 0.63.0).Good consistency between root and tests; reduces surprise in integration runs.
Also applies to: 111-121
32-37: AWS SDK v2 patches mirrored in tests module.Mirrors root versions; avoids test-only drift. LGTM.
Also applies to: 39-45
24-24: Testify patch bump.Non-breaking; fine to proceed.
133-135: genproto pseudo-versions in tests mirror root — double-check lockstep.Matching the root is good; still advisable to validate resolved graph is single-version to avoid subtle ABI mismatches.
You can reuse the genproto/protobuf verification script from the root comment (it checks both modules).
Codecov Report✅ All modified and coverable lines are covered by tests. Additional details and impacted files@@ Coverage Diff @@
## master #520 +/- ##
=============================
=============================
☔ View full report in Codecov by Sentry. 🚀 New features to boost your workflow:
|
Reason for This PR
closes: roadrunner-server/roadrunner#2219
Description of Changes
partitioning_strategy: Manual|Uniform|RoundRobin|LeastBackup|StickyLicense Acceptance
By submitting this pull request, I confirm that my contribution is made under
the terms of the MIT license.
PR Checklist
[Author TODO: Meet these criteria.][Reviewer TODO: Verify that these criteria are met. Request changes if not]git commit -s).CHANGELOG.md.Summary by CodeRabbit