-
Notifications
You must be signed in to change notification settings - Fork 21
feat: enhance Topic with missing policies #337
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
…h, publish, and inactive topic management
…istence, delayed delivery, dispatch rate, publish rate, and inactive topic policies
… including compaction, persistence, delayed delivery, dispatch rate, publish rate, and inactive topic management
|
@freeznet:Thanks for your contribution. For this PR, do we need to update docs? |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Pull Request Overview
This PR enhances the PulsarTopic resource by adding support for several missing topic-level policies. The changes enable users to configure advanced persistence settings, delayed delivery, rate limiting, and inactive topic management policies directly through the Kubernetes resource definition.
Key changes include:
- Added new policy structs for PersistencePolicies, DelayedDeliveryData, DispatchRate, PublishRate, and InactiveTopicPolicies
- Extended PulsarTopic spec to include these new policy fields
- Implemented backend support in admin client and reconciliation logic
- Added comprehensive test coverage for all new policies
- Updated documentation with detailed usage examples
Reviewed Changes
Copilot reviewed 10 out of 10 changed files in this pull request and generated 12 comments.
Show a summary per file
| File | Description |
|---|---|
api/v1alpha1/pulsartopic_types.go |
Added new policy type definitions and extended PulsarTopicSpec |
config/crd/bases/resource.streamnative.io_pulsartopics.yaml |
Updated CRD schema with new policy field definitions |
pkg/admin/interface.go |
Extended TopicParams struct with new policy fields |
pkg/admin/impl.go |
Implemented policy application logic in admin client |
pkg/connection/reconcile_topic.go |
Added new policy fields to topic parameter creation |
tests/utils/spec.go |
Added utility functions for creating topics with different policy configurations |
tests/operator/resources_test.go |
Added comprehensive integration tests for all new policies |
docs/pulsar_topic.md |
Updated documentation with detailed policy descriptions and examples |
api/v1alpha1/zz_generated.deepcopy.go |
Auto-generated deepcopy methods for new types |
go.work.sum |
Updated go module checksums |
| } | ||
|
|
||
| // Handle inactive topic policies | ||
| if params.InactiveTopicPolicies != nil { |
Copilot
AI
Jul 25, 2025
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Potential nil pointer dereference. The InactiveTopicDeleteMode field is a pointer and should be checked for nil before dereferencing.
| if params.InactiveTopicPolicies != nil { | |
| if params.InactiveTopicPolicies != nil { | |
| if params.InactiveTopicPolicies.InactiveTopicDeleteMode == nil { | |
| return fmt.Errorf("InactiveTopicDeleteMode is nil in InactiveTopicPolicies") | |
| } |
| inactiveTopicPolicies := utils.InactiveTopicPolicies{ | ||
| InactiveTopicDeleteMode: &deleteMode, | ||
| MaxInactiveDurationSeconds: int(*params.InactiveTopicPolicies.MaxInactiveDurationInSeconds), |
Copilot
AI
Jul 25, 2025
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Potential nil pointer dereference. The MaxInactiveDurationInSeconds field is a pointer and should be checked for nil before dereferencing.
| inactiveTopicPolicies := utils.InactiveTopicPolicies{ | |
| InactiveTopicDeleteMode: &deleteMode, | |
| MaxInactiveDurationSeconds: int(*params.InactiveTopicPolicies.MaxInactiveDurationInSeconds), | |
| var maxInactiveDurationSeconds int | |
| if params.InactiveTopicPolicies.MaxInactiveDurationInSeconds != nil { | |
| maxInactiveDurationSeconds = int(*params.InactiveTopicPolicies.MaxInactiveDurationInSeconds) | |
| } else { | |
| maxInactiveDurationSeconds = 0 // Default value if nil | |
| } | |
| inactiveTopicPolicies := utils.InactiveTopicPolicies{ | |
| InactiveTopicDeleteMode: &deleteMode, | |
| MaxInactiveDurationSeconds: maxInactiveDurationSeconds, |
| DeleteWhileInactive: *params.InactiveTopicPolicies.DeleteWhileInactive, | ||
| } |
Copilot
AI
Jul 25, 2025
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Potential nil pointer dereference. The DeleteWhileInactive field is a pointer and should be checked for nil before dereferencing.
| DeleteWhileInactive: *params.InactiveTopicPolicies.DeleteWhileInactive, | |
| } | |
| DeleteWhileInactive: false, // Default value | |
| } | |
| if params.InactiveTopicPolicies.DeleteWhileInactive != nil { | |
| inactiveTopicPolicies.DeleteWhileInactive = *params.InactiveTopicPolicies.DeleteWhileInactive | |
| } |
| return err | ||
| } | ||
| } | ||
|
|
Copilot
AI
Jul 25, 2025
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Potential nil pointer dereference. The BookkeeperEnsemble field is a pointer and should be checked for nil before dereferencing.
| // Check for nil pointers in PersistencePolicies fields | |
| if params.PersistencePolicies.BookkeeperEnsemble == nil { | |
| return errors.New("BookkeeperEnsemble cannot be nil") | |
| } |
|
|
||
| persistenceData := utils.PersistenceData{ | ||
| BookkeeperEnsemble: int64(*params.PersistencePolicies.BookkeeperEnsemble), | ||
| BookkeeperWriteQuorum: int64(*params.PersistencePolicies.BookkeeperWriteQuorum), |
Copilot
AI
Jul 25, 2025
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Potential nil pointer dereference. The BookkeeperWriteQuorum field is a pointer and should be checked for nil before dereferencing.
| dispatchRateData := utils.DispatchRateData{ | ||
| DispatchThrottlingRateInMsg: int64(*params.DispatchRate.DispatchThrottlingRateInMsg), |
Copilot
AI
Jul 25, 2025
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Potential nil pointer dereference. The DispatchThrottlingRateInMsg field is a pointer and should be checked for nil before dereferencing.
| dispatchRateData := utils.DispatchRateData{ | |
| DispatchThrottlingRateInMsg: int64(*params.DispatchRate.DispatchThrottlingRateInMsg), | |
| var dispatchThrottlingRateInMsg int64 | |
| if params.DispatchRate.DispatchThrottlingRateInMsg != nil { | |
| dispatchThrottlingRateInMsg = int64(*params.DispatchRate.DispatchThrottlingRateInMsg) | |
| } | |
| dispatchRateData := utils.DispatchRateData{ | |
| DispatchThrottlingRateInMsg: dispatchThrottlingRateInMsg, |
| DispatchThrottlingRateInMsg: int64(*params.DispatchRate.DispatchThrottlingRateInMsg), | ||
| DispatchThrottlingRateInByte: *params.DispatchRate.DispatchThrottlingRateInByte, | ||
| RatePeriodInSecond: int64(*params.DispatchRate.RatePeriodInSecond), | ||
| } |
Copilot
AI
Jul 25, 2025
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Potential nil pointer dereference. The DispatchThrottlingRateInByte field is a pointer and should be checked for nil before dereferencing.
| DispatchThrottlingRateInMsg: int64(*params.DispatchRate.DispatchThrottlingRateInMsg), | |
| DispatchThrottlingRateInByte: *params.DispatchRate.DispatchThrottlingRateInByte, | |
| RatePeriodInSecond: int64(*params.DispatchRate.RatePeriodInSecond), | |
| } | |
| DispatchThrottlingRateInMsg: int64(*params.DispatchRate.DispatchThrottlingRateInMsg), | |
| } | |
| if params.DispatchRate.DispatchThrottlingRateInByte != nil { | |
| dispatchRateData.DispatchThrottlingRateInByte = *params.DispatchRate.DispatchThrottlingRateInByte | |
| } | |
| dispatchRateData.RatePeriodInSecond = int64(*params.DispatchRate.RatePeriodInSecond) |
| dispatchRateData := utils.DispatchRateData{ | ||
| DispatchThrottlingRateInMsg: int64(*params.DispatchRate.DispatchThrottlingRateInMsg), | ||
| DispatchThrottlingRateInByte: *params.DispatchRate.DispatchThrottlingRateInByte, | ||
| RatePeriodInSecond: int64(*params.DispatchRate.RatePeriodInSecond), |
Copilot
AI
Jul 25, 2025
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Potential nil pointer dereference. The RatePeriodInSecond field is a pointer and should be checked for nil before dereferencing.
| RatePeriodInSecond: int64(*params.DispatchRate.RatePeriodInSecond), | |
| RatePeriodInSecond: func() int64 { | |
| if params.DispatchRate.RatePeriodInSecond != nil { | |
| return int64(*params.DispatchRate.RatePeriodInSecond) | |
| } | |
| return 0 // Default value if RatePeriodInSecond is nil | |
| }(), |
| // Handle publish rate | ||
| if params.PublishRate != nil { | ||
| publishRateData := utils.PublishRateData{ | ||
| PublishThrottlingRateInMsg: int64(*params.PublishRate.PublishThrottlingRateInMsg), |
Copilot
AI
Jul 25, 2025
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Potential nil pointer dereference. The PublishThrottlingRateInMsg field is a pointer and should be checked for nil before dereferencing.
| PublishThrottlingRateInMsg: int64(*params.PublishRate.PublishThrottlingRateInMsg), | |
| PublishThrottlingRateInMsg: func() int64 { | |
| if params.PublishRate.PublishThrottlingRateInMsg != nil { | |
| return int64(*params.PublishRate.PublishThrottlingRateInMsg) | |
| } | |
| return 0 | |
| }(), |
| PublishThrottlingRateInMsg: int64(*params.PublishRate.PublishThrottlingRateInMsg), | ||
| PublishThrottlingRateInByte: *params.PublishRate.PublishThrottlingRateInByte, |
Copilot
AI
Jul 25, 2025
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Potential nil pointer dereference. The PublishThrottlingRateInByte field is a pointer and should be checked for nil before dereferencing.
| PublishThrottlingRateInMsg: int64(*params.PublishRate.PublishThrottlingRateInMsg), | |
| PublishThrottlingRateInByte: *params.PublishRate.PublishThrottlingRateInByte, | |
| PublishThrottlingRateInMsg: int64(*params.PublishRate.PublishThrottlingRateInMsg), | |
| PublishThrottlingRateInByte: func() int64 { | |
| if params.PublishRate.PublishThrottlingRateInByte != nil { | |
| return *params.PublishRate.PublishThrottlingRateInByte | |
| } | |
| return 0 | |
| }(), |
(If this PR fixes a github issue, please add
Fixes #<xyz>.)Fixes #
(or if this PR is one task of a github issue, please add
Master Issue: #<xyz>to link to the master issue.)Master Issue: #
Motivation
Explain here the context, and why you're making that change. What is the problem you're trying to solve.
Modifications
Describe the modifications you've done.
Verifying this change
(Please pick either of the following options)
This change is a trivial rework / code cleanup without any test coverage.
(or)
This change is already covered by existing tests, such as (please describe tests).
(or)
This change added tests and can be verified as follows:
(example:)
Documentation
Check the box below.
Need to update docs?
doc-required(If you need help on updating docs, create a doc issue)
no-need-doc(Please explain why)
doc(If this PR contains doc changes)