Skip to content

Conversation

@freeznet
Copy link
Member

@freeznet freeznet commented Jul 31, 2025

(If this PR fixes a github issue, please add Fixes #<xyz>.)

Fixes #

(or if this PR is one task of a github issue, please add Master Issue: #<xyz> to link to the master issue.)

Master Issue: #

feat: align topic level latest apis for policies

Master Issue: #

Motivation

The Pulsar Resources Operator's PulsarTopic CRD was missing several important topic-level policy configurations that are available in the latest Pulsar admin APIs. This gap prevented Kubernetes operators from fully managing Pulsar topics declaratively with all available policy options.

Users managing Pulsar clusters through Kubernetes need the ability to configure comprehensive topic policies including:

  • Rate limiting for subscriptions
  • Message size constraints
  • Schema validation and compatibility strategies
  • Offload policies for tiered storage
  • Auto-subscription creation rules
  • Deduplication settings
  • Consumer and producer limits

Without these policies in the CRD, operators had to resort to manual API calls or scripts to configure topics fully, breaking the declarative GitOps workflow. This PR brings feature parity with Pulsar's admin APIs, enabling full topic lifecycle management through Kubernetes resources.

Modifications

This PR adds comprehensive topic-level policy support to the PulsarTopic CRD by:

1. Extended PulsarTopicSpec with 11 new policy fields:

  • SubscribeRate - Control subscription throttling rates
  • MaxMessageSize - Set maximum message size in bytes
  • MaxProducers - Limit number of producers per topic
  • MaxConsumers - Limit number of consumers per topic
  • Deduplication - Enable/disable message deduplication
  • PublishRate - Control publish rate limiting
  • SchemaValidationEnforced - Enforce schema validation
  • OffloadPolicies - Configure tiered storage offload behavior
  • AutoSubscriptionCreation - Control automatic subscription creation
  • SchemaCompatibilityStrategy - Set schema evolution rules
  • ReplicatorDispatchRate - Control replication dispatch rates

2. Type System Improvements:

  • Moved from external package types to local type definitions for SchemaCompatibilityStrategy, OffloadPolicies, and AutoSubscriptionCreationOverride
  • This ensures proper Kubernetes deep copy generation and CRD schema validation
  • Added comprehensive deepcopy implementations for all new types

3. Admin API Integration:

  • Updated TopicParams interface with all new policy parameters
  • Implemented admin client methods to apply each policy during topic creation/update
  • Added proper type conversions between CRD types and admin API types
  • Enhanced error handling for policy application failures

4. Resource Reconciliation:

  • Modified reconcile_topic.go to map all new spec fields to admin parameters
  • Ensured policies are applied both on topic creation and updates
  • Added logic to handle nil/empty policy values appropriately

5. CRD Schema Updates:

  • Updated the PulsarTopic CRD with detailed field definitions
  • Added validation rules and descriptions for each policy field
  • Specified proper types and formats for complex nested structures

Verifying this change

  • Make sure that the change passes the CI checks.

(Please pick either of the following options)

This change is a trivial rework / code cleanup without any test coverage.

(or)

This change is already covered by existing tests, such as (please describe tests).

(or)

This change added tests and can be verified as follows:

(example:)

  • Added integration tests for end-to-end deployment with large payloads (10MB)
  • Extended integration test for recovery after broker failure

Documentation

Check the box below.

Need to update docs?

  • doc-required

    (If you need help on updating docs, create a doc issue)

  • no-need-doc

    (Please explain why)

  • doc

    (If this PR contains doc changes)

@freeznet freeznet self-assigned this Jul 31, 2025
@freeznet freeznet requested review from a team as code owners July 31, 2025 15:55
@github-actions
Copy link
Contributor

@freeznet:Thanks for your contribution. For this PR, do we need to update docs?
(The PR template contains info about doc, which helps others know more about the changes. Can you provide doc-related info in this and future PR descriptions? Thanks)

@github-actions github-actions bot added the doc-info-missing This pr needs to mark a document option in description label Jul 31, 2025
… including subscribeRate, schemaCompatibilityStrategy, and autoSubscriptionCreation
…mespace and PulsarTopic

- Updated PulsarNamespaceSpec and PulsarTopicSpec to use local type definitions for SchemaCompatibilityStrategy, OffloadPolicies, and AutoSubscriptionCreationOverride.
- Enhanced deepcopy functions for new local types to ensure proper Kubernetes deep copy generation.
- Removed unnecessary imports of adminutils from relevant files.
- Adjusted tests to reflect changes in type definitions for schema compatibility strategy and offload policies.
@freeznet freeznet requested a review from Copilot July 31, 2025 16:32

This comment was marked as outdated.

@jiangpengcheng jiangpengcheng requested a review from Copilot August 1, 2025 01:05
Copy link
Contributor

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull Request Overview

This PR enhances the PulsarTopic CRD to align with Pulsar's latest admin APIs by adding comprehensive topic-level policy configurations. The changes enable full declarative management of Pulsar topics through Kubernetes resources, bringing feature parity with Pulsar's admin APIs.

Key changes include:

  • Extended PulsarTopicSpec with 11 new policy fields for rate limiting, message constraints, schema management, and offload policies
  • Migrated from external package types to local type definitions for better Kubernetes integration
  • Updated admin client implementation to apply all new policies during topic creation/updates

Reviewed Changes

Copilot reviewed 12 out of 13 changed files in this pull request and generated 1 comment.

Show a summary per file
File Description
tests/utils/spec.go Added utility functions for creating test topics with new policy configurations
tests/operator/resources_test.go Added comprehensive integration tests for new topic policies
pkg/connection/reconcile_topic.go Updated topic parameter mapping to include all new policy fields
pkg/admin/interface.go Extended TopicParams interface with new policy parameters and updated type references
pkg/admin/impl.go Implemented admin client methods for applying new policies and added type conversion functions
go.work.sum, go.mod Updated Pulsar client dependency version
docs/pulsar_topic.md Added documentation for all new policy fields with examples
config/crd/bases/resource.streamnative.io_pulsartopics.yaml Updated CRD schema with new policy field definitions
api/v1alpha1/zz_generated.deepcopy.go Updated generated deepcopy methods for new types
api/v1alpha1/pulsartopic_types.go Added new policy fields and local type definitions
api/v1alpha1/pulsarnamespace_types.go Updated to use local SchemaCompatibilityStrategy type
Comments suppressed due to low confidence (4)

tests/operator/resources_test.go:928

  • [nitpick] The import alias 'pointer' is inconsistent with the rest of the test file which uses 'ptr'. Consider using 'ptr.Toint32' to maintain consistency.
				topic.Spec.SubscribeRate.SubscribeThrottlingRatePerConsumer = pointer.Int32(20)

tests/operator/resources_test.go:929

  • [nitpick] The import alias 'pointer' is inconsistent with the rest of the test file which uses 'ptr'. Consider using 'ptr.Toint32' to maintain consistency.
				topic.Spec.SubscribeRate.RatePeriodInSecond = pointer.Int32(60)

api/v1alpha1/zz_generated.deepcopy.go:22

  • The import alias 'pkgutils' for the external library and 'utils' for the local package creates confusion. Consider using more descriptive aliases like 'pulsarutils' for the external library.
	pkgutils "github.com/apache/pulsar-client-go/pulsaradmin/pkg/utils"

api/v1alpha1/zz_generated.deepcopy.go:23

  • The import alias 'utils' for the local package creates potential confusion with the previous 'pkgutils'. Consider using 'localutils' or a more specific alias.
	"github.com/streamnative/pulsar-resources-operator/pkg/utils"

Comment on lines +450 to +451
SubscribeThrottlingRatePerConsumer: -1, // default to unlimited
RatePeriodInSecond: 30, // default period
Copy link

Copilot AI Aug 1, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The default values are hardcoded (-1 and 30). Consider defining these as named constants at the package level to improve maintainability and make the defaults explicit.

Suggested change
SubscribeThrottlingRatePerConsumer: -1, // default to unlimited
RatePeriodInSecond: 30, // default period
SubscribeThrottlingRatePerConsumer: DefaultSubscribeThrottlingRatePerConsumer,
RatePeriodInSecond: DefaultRatePeriodInSeconds,

Copilot uses AI. Check for mistakes.
labuladong
labuladong previously approved these changes Aug 1, 2025
maxsxu
maxsxu previously approved these changes Aug 1, 2025
…rces test

- Updated the test for ManagedLedgerOffloadMaxThreads to directly compare with an integer instead of casting to int32, improving readability and consistency in the test code.
@freeznet freeznet dismissed stale reviews from maxsxu and labuladong via 58d986a August 1, 2025 10:04
@freeznet freeznet merged commit 13712f4 into main Aug 2, 2025
5 checks passed
@freeznet freeznet deleted the freeznet/align-topic-level-latest-apis branch August 2, 2025 12:26
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

doc-info-missing This pr needs to mark a document option in description

Projects

None yet

Development

Successfully merging this pull request may close these issues.

4 participants