diff --git a/api/v1alpha1/pulsarsink_types.go b/api/v1alpha1/pulsarsink_types.go index 17db5fc4..47e48afe 100644 --- a/api/v1alpha1/pulsarsink_types.go +++ b/api/v1alpha1/pulsarsink_types.go @@ -42,19 +42,19 @@ type PulsarSinkSpec struct { // CleanupSubscription is the flag to enable or disable the cleanup of subscription // +optional - CleanupSubscription bool `json:"cleanupSubscription,omitempty"` + CleanupSubscription *bool `json:"cleanupSubscription,omitempty"` // RetainOrdering is the flag to enable or disable the retain ordering // +optional - RetainOrdering bool `json:"retainOrdering,omitempty"` + RetainOrdering *bool `json:"retainOrdering,omitempty"` // RetainKeyOrdering is the flag to enable or disable the retain key ordering // +optional - RetainKeyOrdering bool `json:"retainKeyOrdering,omitempty"` + RetainKeyOrdering *bool `json:"retainKeyOrdering,omitempty"` // AutoAck is the flag to enable or disable the auto ack // +optional - AutoAck bool `json:"autoAck,omitempty"` + AutoAck *bool `json:"autoAck,omitempty"` // Parallelism is the parallelism of the PulsarSink // +optional diff --git a/api/v1alpha1/zz_generated.deepcopy.go b/api/v1alpha1/zz_generated.deepcopy.go index dc5d7b2f..1dda32cc 100644 --- a/api/v1alpha1/zz_generated.deepcopy.go +++ b/api/v1alpha1/zz_generated.deepcopy.go @@ -2488,6 +2488,26 @@ func (in *PulsarSinkSpec) DeepCopyInto(out *PulsarSinkSpec) { *out = new(int64) **out = **in } + if in.CleanupSubscription != nil { + in, out := &in.CleanupSubscription, &out.CleanupSubscription + *out = new(bool) + **out = **in + } + if in.RetainOrdering != nil { + in, out := &in.RetainOrdering, &out.RetainOrdering + *out = new(bool) + **out = **in + } + if in.RetainKeyOrdering != nil { + in, out := &in.RetainKeyOrdering, &out.RetainKeyOrdering + *out = new(bool) + **out = **in + } + if in.AutoAck != nil { + in, out := &in.AutoAck, &out.AutoAck + *out = new(bool) + **out = **in + } if in.Archive != nil { in, out := &in.Archive, &out.Archive *out = new(PackageContentRef) diff --git a/config/samples/resource_v1alpha1_pulsarsink.yaml b/config/samples/resource_v1alpha1_pulsarsink.yaml index 88b0d4f3..950e94ee 100644 --- a/config/samples/resource_v1alpha1_pulsarsink.yaml +++ b/config/samples/resource_v1alpha1_pulsarsink.yaml @@ -20,7 +20,9 @@ metadata: spec: autoAck: true className: org.apache.pulsar.io.datagenerator.DataGeneratorPrintSink - cleanupSubscription: false + # Note: cleanupSubscription defaults to false when omitted + # Only set explicitly when you need a non-default value + # cleanupSubscription: false connectionRef: name: "test-pulsar-connection" customRuntimeOptions: {} diff --git a/docs/pulsar_sink.md b/docs/pulsar_sink.md index 8a2bda2b..a5d816aa 100644 --- a/docs/pulsar_sink.md +++ b/docs/pulsar_sink.md @@ -41,7 +41,7 @@ This table lists specifications available for the `PulsarSink` resource. | `topicsPattern` | The topic pattern. | Optional | | `resources` | The resources of the sink. | Optional | | `timeoutMs` | The timeout in milliseconds. | Optional | -| `cleanupSubscription` | The cleanup subscription. | Optional | +| `cleanupSubscription` | The cleanup subscription (defaults to false when omitted). | Optional | | `retainOrdering` | The retain ordering. | Optional | | `retainKeyOrdering` | The retain key ordering. | Optional | | `autoAck` | The auto ack. | Optional | diff --git a/pkg/admin/impl.go b/pkg/admin/impl.go index 700805c6..331efdc5 100644 --- a/pkg/admin/impl.go +++ b/pkg/admin/impl.go @@ -46,6 +46,14 @@ const ( TopicDomainNonPersistent = "non-persistent" ) +// ptrBoolToBool converts a pointer to bool to bool with default value false +func ptrBoolToBool(b *bool) bool { + if b == nil { + return false + } + return *b +} + // Type conversion functions for external library types // convertOffloadPolicies converts our local OffloadPolicies to the external library type @@ -1452,10 +1460,10 @@ func (p *PulsarAdminClient) ApplyPulsarSink(tenant, namespace, name, packageURL TopicsPattern: param.TopicsPattern, TimeoutMs: param.TimeoutMs, - CleanupSubscription: param.CleanupSubscription, - RetainOrdering: param.RetainOrdering, - RetainKeyOrdering: param.RetainKeyOrdering, - AutoAck: param.AutoAck, + CleanupSubscription: ptrBoolToBool(param.CleanupSubscription), + RetainOrdering: ptrBoolToBool(param.RetainOrdering), + RetainKeyOrdering: ptrBoolToBool(param.RetainKeyOrdering), + AutoAck: ptrBoolToBool(param.AutoAck), Parallelism: param.Parallelism, SinkType: param.SinkType, diff --git a/tests/utils/spec.go b/tests/utils/spec.go index 9396b620..ac6a3b84 100644 --- a/tests/utils/spec.go +++ b/tests/utils/spec.go @@ -555,9 +555,9 @@ func MakePulsarSink(namespace, name, sinkPackageUrl, connectionName string, poli Inputs: []string{"sink-input"}, Parallelism: 1, ProcessingGuarantees: "EFFECTIVELY_ONCE", - CleanupSubscription: false, + CleanupSubscription: ptr.To(false), SourceSubscriptionPosition: "Latest", - AutoAck: true, + AutoAck: ptr.To(true), ClassName: "org.apache.pulsar.io.datagenerator.DataGeneratorPrintSink", Resources: &v1alpha1.Resources{ CPU: "1",