Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions api/v1alpha1/pulsarsink_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,19 +42,19 @@ type PulsarSinkSpec struct {

// CleanupSubscription is the flag to enable or disable the cleanup of subscription
// +optional
CleanupSubscription bool `json:"cleanupSubscription,omitempty"`
CleanupSubscription *bool `json:"cleanupSubscription,omitempty"`

// RetainOrdering is the flag to enable or disable the retain ordering
// +optional
RetainOrdering bool `json:"retainOrdering,omitempty"`
RetainOrdering *bool `json:"retainOrdering,omitempty"`

// RetainKeyOrdering is the flag to enable or disable the retain key ordering
// +optional
RetainKeyOrdering bool `json:"retainKeyOrdering,omitempty"`
RetainKeyOrdering *bool `json:"retainKeyOrdering,omitempty"`

// AutoAck is the flag to enable or disable the auto ack
// +optional
AutoAck bool `json:"autoAck,omitempty"`
AutoAck *bool `json:"autoAck,omitempty"`

// Parallelism is the parallelism of the PulsarSink
// +optional
Expand Down
20 changes: 20 additions & 0 deletions api/v1alpha1/zz_generated.deepcopy.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 3 additions & 1 deletion config/samples/resource_v1alpha1_pulsarsink.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,9 @@ metadata:
spec:
autoAck: true
className: org.apache.pulsar.io.datagenerator.DataGeneratorPrintSink
cleanupSubscription: false
# Note: cleanupSubscription defaults to false when omitted
# Only set explicitly when you need a non-default value
# cleanupSubscription: false
connectionRef:
name: "test-pulsar-connection"
customRuntimeOptions: {}
Expand Down
2 changes: 1 addition & 1 deletion docs/pulsar_sink.md
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ This table lists specifications available for the `PulsarSink` resource.
| `topicsPattern` | The topic pattern. | Optional |
| `resources` | The resources of the sink. | Optional |
| `timeoutMs` | The timeout in milliseconds. | Optional |
| `cleanupSubscription` | The cleanup subscription. | Optional |
| `cleanupSubscription` | The cleanup subscription (defaults to false when omitted). | Optional |
| `retainOrdering` | The retain ordering. | Optional |
| `retainKeyOrdering` | The retain key ordering. | Optional |
| `autoAck` | The auto ack. | Optional |
Expand Down
16 changes: 12 additions & 4 deletions pkg/admin/impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,14 @@ const (
TopicDomainNonPersistent = "non-persistent"
)

// ptrBoolToBool converts a pointer to bool to bool with default value false
func ptrBoolToBool(b *bool) bool {
if b == nil {
return false
}
return *b
}

// Type conversion functions for external library types

// convertOffloadPolicies converts our local OffloadPolicies to the external library type
Expand Down Expand Up @@ -1452,10 +1460,10 @@ func (p *PulsarAdminClient) ApplyPulsarSink(tenant, namespace, name, packageURL
TopicsPattern: param.TopicsPattern,
TimeoutMs: param.TimeoutMs,

CleanupSubscription: param.CleanupSubscription,
RetainOrdering: param.RetainOrdering,
RetainKeyOrdering: param.RetainKeyOrdering,
AutoAck: param.AutoAck,
CleanupSubscription: ptrBoolToBool(param.CleanupSubscription),
RetainOrdering: ptrBoolToBool(param.RetainOrdering),
RetainKeyOrdering: ptrBoolToBool(param.RetainKeyOrdering),
AutoAck: ptrBoolToBool(param.AutoAck),
Parallelism: param.Parallelism,

SinkType: param.SinkType,
Expand Down
4 changes: 2 additions & 2 deletions tests/utils/spec.go
Original file line number Diff line number Diff line change
Expand Up @@ -555,9 +555,9 @@ func MakePulsarSink(namespace, name, sinkPackageUrl, connectionName string, poli
Inputs: []string{"sink-input"},
Parallelism: 1,
ProcessingGuarantees: "EFFECTIVELY_ONCE",
CleanupSubscription: false,
CleanupSubscription: ptr.To(false),
SourceSubscriptionPosition: "Latest",
AutoAck: true,
AutoAck: ptr.To(true),
ClassName: "org.apache.pulsar.io.datagenerator.DataGeneratorPrintSink",
Resources: &v1alpha1.Resources{
CPU: "1",
Expand Down