From 47039cc181ae91ad881e9c4c57d04a0cd2bd66da Mon Sep 17 00:00:00 2001 From: snbot Date: Mon, 1 Sep 2025 02:10:46 +0000 Subject: [PATCH 1/4] fix: Use pointer types for boolean fields in PulsarSink to prevent omitempty issues MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit When boolean fields have omitempty tag and are set to false, they get omitted from JSON serialization causing the fields to disappear. This creates sync loops with GitOps tools like ArgoCD. Changed CleanupSubscription, RetainOrdering, RetainKeyOrdering, and AutoAck fields to use *bool instead of bool to properly handle false values. Fixes #3133 🤖 Generated with [Claude Code](https://claude.ai/code) Co-Authored-By: Claude --- api/v1alpha1/pulsarsink_types.go | 8 ++++---- api/v1alpha1/zz_generated.deepcopy.go | 20 +++++++++++++++++++ .../resource.streamnative.io_pulsarsinks.yaml | 14 ------------- .../resource.streamnative.io_pulsarsinks.yaml | 14 ------------- .../samples/resource_v1alpha1_pulsarsink.yaml | 4 +++- docs/pulsar_sink.md | 2 +- pkg/admin/impl.go | 16 +++++++++++---- 7 files changed, 40 insertions(+), 38 deletions(-) diff --git a/api/v1alpha1/pulsarsink_types.go b/api/v1alpha1/pulsarsink_types.go index 17db5fc4..47e48afe 100644 --- a/api/v1alpha1/pulsarsink_types.go +++ b/api/v1alpha1/pulsarsink_types.go @@ -42,19 +42,19 @@ type PulsarSinkSpec struct { // CleanupSubscription is the flag to enable or disable the cleanup of subscription // +optional - CleanupSubscription bool `json:"cleanupSubscription,omitempty"` + CleanupSubscription *bool `json:"cleanupSubscription,omitempty"` // RetainOrdering is the flag to enable or disable the retain ordering // +optional - RetainOrdering bool `json:"retainOrdering,omitempty"` + RetainOrdering *bool `json:"retainOrdering,omitempty"` // RetainKeyOrdering is the flag to enable or disable the retain key ordering // +optional - RetainKeyOrdering bool `json:"retainKeyOrdering,omitempty"` + RetainKeyOrdering *bool `json:"retainKeyOrdering,omitempty"` // AutoAck is the flag to enable or disable the auto ack // +optional - AutoAck bool `json:"autoAck,omitempty"` + AutoAck *bool `json:"autoAck,omitempty"` // Parallelism is the parallelism of the PulsarSink // +optional diff --git a/api/v1alpha1/zz_generated.deepcopy.go b/api/v1alpha1/zz_generated.deepcopy.go index dc5d7b2f..1dda32cc 100644 --- a/api/v1alpha1/zz_generated.deepcopy.go +++ b/api/v1alpha1/zz_generated.deepcopy.go @@ -2488,6 +2488,26 @@ func (in *PulsarSinkSpec) DeepCopyInto(out *PulsarSinkSpec) { *out = new(int64) **out = **in } + if in.CleanupSubscription != nil { + in, out := &in.CleanupSubscription, &out.CleanupSubscription + *out = new(bool) + **out = **in + } + if in.RetainOrdering != nil { + in, out := &in.RetainOrdering, &out.RetainOrdering + *out = new(bool) + **out = **in + } + if in.RetainKeyOrdering != nil { + in, out := &in.RetainKeyOrdering, &out.RetainKeyOrdering + *out = new(bool) + **out = **in + } + if in.AutoAck != nil { + in, out := &in.AutoAck, &out.AutoAck + *out = new(bool) + **out = **in + } if in.Archive != nil { in, out := &in.Archive, &out.Archive *out = new(PackageContentRef) diff --git a/charts/pulsar-resources-operator/crds/resource.streamnative.io_pulsarsinks.yaml b/charts/pulsar-resources-operator/crds/resource.streamnative.io_pulsarsinks.yaml index 1aff58a4..0be4caec 100644 --- a/charts/pulsar-resources-operator/crds/resource.streamnative.io_pulsarsinks.yaml +++ b/charts/pulsar-resources-operator/crds/resource.streamnative.io_pulsarsinks.yaml @@ -1,17 +1,3 @@ -# Copyright 2025 StreamNative -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - --- apiVersion: apiextensions.k8s.io/v1 kind: CustomResourceDefinition diff --git a/config/crd/bases/resource.streamnative.io_pulsarsinks.yaml b/config/crd/bases/resource.streamnative.io_pulsarsinks.yaml index 1aff58a4..0be4caec 100644 --- a/config/crd/bases/resource.streamnative.io_pulsarsinks.yaml +++ b/config/crd/bases/resource.streamnative.io_pulsarsinks.yaml @@ -1,17 +1,3 @@ -# Copyright 2025 StreamNative -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - --- apiVersion: apiextensions.k8s.io/v1 kind: CustomResourceDefinition diff --git a/config/samples/resource_v1alpha1_pulsarsink.yaml b/config/samples/resource_v1alpha1_pulsarsink.yaml index 88b0d4f3..950e94ee 100644 --- a/config/samples/resource_v1alpha1_pulsarsink.yaml +++ b/config/samples/resource_v1alpha1_pulsarsink.yaml @@ -20,7 +20,9 @@ metadata: spec: autoAck: true className: org.apache.pulsar.io.datagenerator.DataGeneratorPrintSink - cleanupSubscription: false + # Note: cleanupSubscription defaults to false when omitted + # Only set explicitly when you need a non-default value + # cleanupSubscription: false connectionRef: name: "test-pulsar-connection" customRuntimeOptions: {} diff --git a/docs/pulsar_sink.md b/docs/pulsar_sink.md index 8a2bda2b..a5d816aa 100644 --- a/docs/pulsar_sink.md +++ b/docs/pulsar_sink.md @@ -41,7 +41,7 @@ This table lists specifications available for the `PulsarSink` resource. | `topicsPattern` | The topic pattern. | Optional | | `resources` | The resources of the sink. | Optional | | `timeoutMs` | The timeout in milliseconds. | Optional | -| `cleanupSubscription` | The cleanup subscription. | Optional | +| `cleanupSubscription` | The cleanup subscription (defaults to false when omitted). | Optional | | `retainOrdering` | The retain ordering. | Optional | | `retainKeyOrdering` | The retain key ordering. | Optional | | `autoAck` | The auto ack. | Optional | diff --git a/pkg/admin/impl.go b/pkg/admin/impl.go index 700805c6..331efdc5 100644 --- a/pkg/admin/impl.go +++ b/pkg/admin/impl.go @@ -46,6 +46,14 @@ const ( TopicDomainNonPersistent = "non-persistent" ) +// ptrBoolToBool converts a pointer to bool to bool with default value false +func ptrBoolToBool(b *bool) bool { + if b == nil { + return false + } + return *b +} + // Type conversion functions for external library types // convertOffloadPolicies converts our local OffloadPolicies to the external library type @@ -1452,10 +1460,10 @@ func (p *PulsarAdminClient) ApplyPulsarSink(tenant, namespace, name, packageURL TopicsPattern: param.TopicsPattern, TimeoutMs: param.TimeoutMs, - CleanupSubscription: param.CleanupSubscription, - RetainOrdering: param.RetainOrdering, - RetainKeyOrdering: param.RetainKeyOrdering, - AutoAck: param.AutoAck, + CleanupSubscription: ptrBoolToBool(param.CleanupSubscription), + RetainOrdering: ptrBoolToBool(param.RetainOrdering), + RetainKeyOrdering: ptrBoolToBool(param.RetainKeyOrdering), + AutoAck: ptrBoolToBool(param.AutoAck), Parallelism: param.Parallelism, SinkType: param.SinkType, From 2d5278e6e7436e633d5298ac800574f57e6f5f2c Mon Sep 17 00:00:00 2001 From: Rui Fu Date: Thu, 4 Sep 2025 11:52:05 +0800 Subject: [PATCH 2/4] Update charts/pulsar-resources-operator/crds/resource.streamnative.io_pulsarsinks.yaml Co-authored-by: github-actions[bot] <41898282+github-actions[bot]@users.noreply.github.com> --- .../crds/resource.streamnative.io_pulsarsinks.yaml | 14 ++++++++++++++ 1 file changed, 14 insertions(+) diff --git a/charts/pulsar-resources-operator/crds/resource.streamnative.io_pulsarsinks.yaml b/charts/pulsar-resources-operator/crds/resource.streamnative.io_pulsarsinks.yaml index 0be4caec..1aff58a4 100644 --- a/charts/pulsar-resources-operator/crds/resource.streamnative.io_pulsarsinks.yaml +++ b/charts/pulsar-resources-operator/crds/resource.streamnative.io_pulsarsinks.yaml @@ -1,3 +1,17 @@ +# Copyright 2025 StreamNative +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + --- apiVersion: apiextensions.k8s.io/v1 kind: CustomResourceDefinition From ce032b560ece49daf6a18317f56cce43f452db42 Mon Sep 17 00:00:00 2001 From: Rui Fu Date: Thu, 4 Sep 2025 11:52:11 +0800 Subject: [PATCH 3/4] Update config/crd/bases/resource.streamnative.io_pulsarsinks.yaml Co-authored-by: github-actions[bot] <41898282+github-actions[bot]@users.noreply.github.com> --- .../resource.streamnative.io_pulsarsinks.yaml | 14 ++++++++++++++ 1 file changed, 14 insertions(+) diff --git a/config/crd/bases/resource.streamnative.io_pulsarsinks.yaml b/config/crd/bases/resource.streamnative.io_pulsarsinks.yaml index 0be4caec..1aff58a4 100644 --- a/config/crd/bases/resource.streamnative.io_pulsarsinks.yaml +++ b/config/crd/bases/resource.streamnative.io_pulsarsinks.yaml @@ -1,3 +1,17 @@ +# Copyright 2025 StreamNative +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + --- apiVersion: apiextensions.k8s.io/v1 kind: CustomResourceDefinition From be63509052b4f949a488fe9b6617acbd7be915ac Mon Sep 17 00:00:00 2001 From: Rui Fu Date: Thu, 4 Sep 2025 12:49:07 +0800 Subject: [PATCH 4/4] fix ci --- tests/utils/spec.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/tests/utils/spec.go b/tests/utils/spec.go index 9396b620..ac6a3b84 100644 --- a/tests/utils/spec.go +++ b/tests/utils/spec.go @@ -555,9 +555,9 @@ func MakePulsarSink(namespace, name, sinkPackageUrl, connectionName string, poli Inputs: []string{"sink-input"}, Parallelism: 1, ProcessingGuarantees: "EFFECTIVELY_ONCE", - CleanupSubscription: false, + CleanupSubscription: ptr.To(false), SourceSubscriptionPosition: "Latest", - AutoAck: true, + AutoAck: ptr.To(true), ClassName: "org.apache.pulsar.io.datagenerator.DataGeneratorPrintSink", Resources: &v1alpha1.Resources{ CPU: "1",