Skip to content

Commit 6d9a13c

Browse files
committed
refactor: replace adminutils types with local definitions in PulsarNamespace and PulsarTopic
- Updated PulsarNamespaceSpec and PulsarTopicSpec to use local type definitions for SchemaCompatibilityStrategy, OffloadPolicies, and AutoSubscriptionCreationOverride. - Enhanced deepcopy functions for new local types to ensure proper Kubernetes deep copy generation. - Removed unnecessary imports of adminutils from relevant files. - Adjusted tests to reflect changes in type definitions for schema compatibility strategy and offload policies.
1 parent a3a69d5 commit 6d9a13c

File tree

8 files changed

+149
-39
lines changed

8 files changed

+149
-39
lines changed

api/v1alpha1/pulsarnamespace_types.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -161,7 +161,7 @@ type PulsarNamespaceSpec struct {
161161
// This setting controls how schema evolution is handled for topics within this namespace.
162162
// +optional
163163
// +kubebuilder:validation:Enum=UNDEFINED;ALWAYS_INCOMPATIBLE;ALWAYS_COMPATIBLE;BACKWARD;FORWARD;FULL;BACKWARD_TRANSITIVE;FORWARD_TRANSITIVE;FULL_TRANSITIVE
164-
SchemaCompatibilityStrategy *adminutils.SchemaCompatibilityStrategy `json:"schemaCompatibilityStrategy,omitempty"`
164+
SchemaCompatibilityStrategy *SchemaCompatibilityStrategy `json:"schemaCompatibilityStrategy,omitempty"`
165165

166166
// SchemaValidationEnforced controls whether schema validation is enforced for this namespace.
167167
// When enabled, producers must provide a schema when publishing messages.

api/v1alpha1/pulsartopic_types.go

Lines changed: 35 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,6 @@ import (
1919
"k8s.io/apimachinery/pkg/api/resource"
2020
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
2121

22-
adminutils "github.com/apache/pulsar-client-go/pulsaradmin/pkg/utils"
2322
"github.com/streamnative/pulsar-resources-operator/pkg/utils"
2423
)
2524

@@ -204,18 +203,18 @@ type PulsarTopicSpec struct {
204203
// OffloadPolicies defines the offload policies for the topic.
205204
// This controls how data is offloaded to external storage systems.
206205
// +optional
207-
OffloadPolicies *adminutils.OffloadPolicies `json:"offloadPolicies,omitempty"`
206+
OffloadPolicies *OffloadPolicies `json:"offloadPolicies,omitempty"`
208207

209208
// AutoSubscriptionCreation defines the auto subscription creation override for the topic.
210209
// This controls whether subscriptions can be created automatically.
211210
// +optional
212-
AutoSubscriptionCreation *adminutils.AutoSubscriptionCreationOverride `json:"autoSubscriptionCreation,omitempty"`
211+
AutoSubscriptionCreation *AutoSubscriptionCreationOverride `json:"autoSubscriptionCreation,omitempty"`
213212

214213
// SchemaCompatibilityStrategy defines the schema compatibility strategy for the topic.
215214
// This controls how schema evolution is handled.
216215
// +optional
217216
// +kubebuilder:validation:Enum=UNDEFINED;ALWAYS_INCOMPATIBLE;ALWAYS_COMPATIBLE;BACKWARD;FORWARD;FULL;BACKWARD_TRANSITIVE;FORWARD_TRANSITIVE;FULL_TRANSITIVE
218-
SchemaCompatibilityStrategy *adminutils.SchemaCompatibilityStrategy `json:"schemaCompatibilityStrategy,omitempty"`
217+
SchemaCompatibilityStrategy *SchemaCompatibilityStrategy `json:"schemaCompatibilityStrategy,omitempty"`
219218

220219
// Properties is a map of user-defined properties associated with the topic.
221220
// These can be used to store additional metadata about the topic.
@@ -254,6 +253,38 @@ type SchemaInfo struct {
254253
Properties map[string]string `json:"properties,omitempty"`
255254
}
256255

256+
// OffloadPolicies defines the offload policies for a topic.
257+
// This is a local type definition that mirrors the external library's OffloadPolicies
258+
// to ensure proper Kubernetes deep copy generation.
259+
type OffloadPolicies struct {
260+
ManagedLedgerOffloadDriver string `json:"managedLedgerOffloadDriver,omitempty"`
261+
ManagedLedgerOffloadMaxThreads int `json:"managedLedgerOffloadMaxThreads,omitempty"`
262+
ManagedLedgerOffloadThresholdInBytes int64 `json:"managedLedgerOffloadThresholdInBytes,omitempty"`
263+
ManagedLedgerOffloadDeletionLagInMillis int64 `json:"managedLedgerOffloadDeletionLagInMillis,omitempty"`
264+
ManagedLedgerOffloadAutoTriggerSizeThresholdBytes int64 `json:"managedLedgerOffloadAutoTriggerSizeThresholdBytes,omitempty"`
265+
S3ManagedLedgerOffloadBucket string `json:"s3ManagedLedgerOffloadBucket,omitempty"`
266+
S3ManagedLedgerOffloadRegion string `json:"s3ManagedLedgerOffloadRegion,omitempty"`
267+
S3ManagedLedgerOffloadServiceEndpoint string `json:"s3ManagedLedgerOffloadServiceEndpoint,omitempty"`
268+
S3ManagedLedgerOffloadCredentialID string `json:"s3ManagedLedgerOffloadCredentialId,omitempty"`
269+
S3ManagedLedgerOffloadCredentialSecret string `json:"s3ManagedLedgerOffloadCredentialSecret,omitempty"`
270+
S3ManagedLedgerOffloadRole string `json:"s3ManagedLedgerOffloadRole,omitempty"`
271+
S3ManagedLedgerOffloadRoleSessionName string `json:"s3ManagedLedgerOffloadRoleSessionName,omitempty"`
272+
OffloadersDirectory string `json:"offloadersDirectory,omitempty"`
273+
ManagedLedgerOffloadDriverMetadata map[string]string `json:"managedLedgerOffloadDriverMetadata,omitempty"`
274+
}
275+
276+
// AutoSubscriptionCreationOverride defines the auto subscription creation override for a topic.
277+
// This is a local type definition that mirrors the external library's AutoSubscriptionCreationOverride
278+
// to ensure proper Kubernetes deep copy generation.
279+
type AutoSubscriptionCreationOverride struct {
280+
AllowAutoSubscriptionCreation bool `json:"allowAutoSubscriptionCreation,omitempty"`
281+
}
282+
283+
// SchemaCompatibilityStrategy defines the schema compatibility strategy for a topic.
284+
// This is a local type definition that mirrors the external library's SchemaCompatibilityStrategy
285+
// to ensure proper Kubernetes deep copy generation.
286+
type SchemaCompatibilityStrategy string
287+
257288
// PulsarTopicStatus defines the observed state of PulsarTopic
258289
type PulsarTopicStatus struct {
259290
// INSERT ADDITIONAL STATUS FIELD - define observed state of cluster

api/v1alpha1/zz_generated.deepcopy.go

Lines changed: 53 additions & 16 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

config/crd/bases/resource.streamnative.io_pulsartopics.yaml

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -86,8 +86,6 @@ spec:
8686
properties:
8787
allowAutoSubscriptionCreation:
8888
type: boolean
89-
required:
90-
- allowAutoSubscriptionCreation
9189
type: object
9290
backlogQuotaLimitSize:
9391
anyOf:

pkg/admin/impl.go

Lines changed: 52 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,50 @@ const (
4646
TopicDomainNonPersistent = "non-persistent"
4747
)
4848

49+
// Type conversion functions for external library types
50+
51+
// convertOffloadPolicies converts our local OffloadPolicies to the external library type
52+
func convertOffloadPolicies(local *v1alpha1.OffloadPolicies) *utils.OffloadPolicies {
53+
if local == nil {
54+
return nil
55+
}
56+
return &utils.OffloadPolicies{
57+
ManagedLedgerOffloadDriver: local.ManagedLedgerOffloadDriver,
58+
ManagedLedgerOffloadMaxThreads: local.ManagedLedgerOffloadMaxThreads,
59+
ManagedLedgerOffloadThresholdInBytes: local.ManagedLedgerOffloadThresholdInBytes,
60+
ManagedLedgerOffloadDeletionLagInMillis: local.ManagedLedgerOffloadDeletionLagInMillis,
61+
ManagedLedgerOffloadAutoTriggerSizeThresholdBytes: local.ManagedLedgerOffloadAutoTriggerSizeThresholdBytes,
62+
S3ManagedLedgerOffloadBucket: local.S3ManagedLedgerOffloadBucket,
63+
S3ManagedLedgerOffloadRegion: local.S3ManagedLedgerOffloadRegion,
64+
S3ManagedLedgerOffloadServiceEndpoint: local.S3ManagedLedgerOffloadServiceEndpoint,
65+
S3ManagedLedgerOffloadCredentialID: local.S3ManagedLedgerOffloadCredentialID,
66+
S3ManagedLedgerOffloadCredentialSecret: local.S3ManagedLedgerOffloadCredentialSecret,
67+
S3ManagedLedgerOffloadRole: local.S3ManagedLedgerOffloadRole,
68+
S3ManagedLedgerOffloadRoleSessionName: local.S3ManagedLedgerOffloadRoleSessionName,
69+
OffloadersDirectory: local.OffloadersDirectory,
70+
ManagedLedgerOffloadDriverMetadata: local.ManagedLedgerOffloadDriverMetadata,
71+
}
72+
}
73+
74+
// convertAutoSubscriptionCreation converts our local AutoSubscriptionCreationOverride to the external library type
75+
func convertAutoSubscriptionCreation(local *v1alpha1.AutoSubscriptionCreationOverride) *utils.AutoSubscriptionCreationOverride {
76+
if local == nil {
77+
return nil
78+
}
79+
return &utils.AutoSubscriptionCreationOverride{
80+
AllowAutoSubscriptionCreation: local.AllowAutoSubscriptionCreation,
81+
}
82+
}
83+
84+
// convertSchemaCompatibilityStrategy converts our local SchemaCompatibilityStrategy to the external library type
85+
func convertSchemaCompatibilityStrategy(local *v1alpha1.SchemaCompatibilityStrategy) *utils.SchemaCompatibilityStrategy {
86+
if local == nil {
87+
return nil
88+
}
89+
strategy := utils.SchemaCompatibilityStrategy(string(*local))
90+
return &strategy
91+
}
92+
4993
// ApplyTenant creates or updates a tenant, if AllowdClusters is not provided, it will list all clusters in pular
5094
// When updates a tenant, If AdminRoles is empty, the current set of roles won't be modified
5195
func (p *PulsarAdminClient) ApplyTenant(name string, params *TenantParams) error {
@@ -496,23 +540,26 @@ func (p *PulsarAdminClient) applyTopicPolicies(topicName *utils.TopicName, param
496540

497541
// Handle offload policies
498542
if params.OffloadPolicies != nil {
499-
err = p.adminClient.Topics().SetOffloadPolicies(*topicName, *params.OffloadPolicies)
543+
externalOffloadPolicies := convertOffloadPolicies(params.OffloadPolicies)
544+
err = p.adminClient.Topics().SetOffloadPolicies(*topicName, *externalOffloadPolicies)
500545
if err != nil {
501546
return err
502547
}
503548
}
504549

505550
// Handle auto subscription creation
506551
if params.AutoSubscriptionCreation != nil {
507-
err = p.adminClient.Topics().SetAutoSubscriptionCreation(*topicName, *params.AutoSubscriptionCreation)
552+
externalAutoSubscription := convertAutoSubscriptionCreation(params.AutoSubscriptionCreation)
553+
err = p.adminClient.Topics().SetAutoSubscriptionCreation(*topicName, *externalAutoSubscription)
508554
if err != nil {
509555
return err
510556
}
511557
}
512558

513559
// Handle schema compatibility strategy
514560
if params.SchemaCompatibilityStrategy != nil {
515-
err = p.adminClient.Topics().SetSchemaCompatibilityStrategy(*topicName, *params.SchemaCompatibilityStrategy)
561+
externalSchemaStrategy := convertSchemaCompatibilityStrategy(params.SchemaCompatibilityStrategy)
562+
err = p.adminClient.Topics().SetSchemaCompatibilityStrategy(*topicName, *externalSchemaStrategy)
516563
if err != nil {
517564
return err
518565
}
@@ -870,8 +917,8 @@ func (p *PulsarAdminClient) applyNamespacePolicies(completeNSName string, params
870917
}
871918

872919
if params.SchemaCompatibilityStrategy != nil {
873-
schemaStrategy := *params.SchemaCompatibilityStrategy
874-
err := p.adminClient.Namespaces().SetSchemaCompatibilityStrategy(*naName, schemaStrategy)
920+
externalSchemaStrategy := convertSchemaCompatibilityStrategy(params.SchemaCompatibilityStrategy)
921+
err := p.adminClient.Namespaces().SetSchemaCompatibilityStrategy(*naName, *externalSchemaStrategy)
875922
if err != nil {
876923
return err
877924
}

pkg/admin/interface.go

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,6 @@ import (
2323
"github.com/apache/pulsar-client-go/pulsaradmin/pkg/admin"
2424
"github.com/apache/pulsar-client-go/pulsaradmin/pkg/admin/auth"
2525
"github.com/apache/pulsar-client-go/pulsaradmin/pkg/admin/config"
26-
adminutils "github.com/apache/pulsar-client-go/pulsaradmin/pkg/utils"
2726
utils2 "github.com/apache/pulsar-client-go/pulsaradmin/pkg/utils"
2827
"k8s.io/apimachinery/pkg/api/resource"
2928

@@ -57,7 +56,7 @@ type NamespaceParams struct {
5756
Deduplication *bool
5857
BookieAffinityGroup *v1alpha1.BookieAffinityGroupData
5958
TopicAutoCreationConfig *v1alpha1.TopicAutoCreationConfig
60-
SchemaCompatibilityStrategy *utils2.SchemaCompatibilityStrategy
59+
SchemaCompatibilityStrategy *v1alpha1.SchemaCompatibilityStrategy
6160
SchemaValidationEnforced *bool
6261
DispatchRate *v1alpha1.DispatchRate
6362
SubscriptionDispatchRate *v1alpha1.DispatchRate
@@ -107,9 +106,9 @@ type TopicParams struct {
107106
SubscriptionDispatchRate *v1alpha1.DispatchRate
108107
ReplicatorDispatchRate *v1alpha1.DispatchRate
109108
DeduplicationSnapshotInterval *int32
110-
OffloadPolicies *adminutils.OffloadPolicies
111-
AutoSubscriptionCreation *adminutils.AutoSubscriptionCreationOverride
112-
SchemaCompatibilityStrategy *adminutils.SchemaCompatibilityStrategy
109+
OffloadPolicies *v1alpha1.OffloadPolicies
110+
AutoSubscriptionCreation *v1alpha1.AutoSubscriptionCreationOverride
111+
SchemaCompatibilityStrategy *v1alpha1.SchemaCompatibilityStrategy
113112
Properties map[string]string
114113
}
115114

0 commit comments

Comments
 (0)