Skip to content

Commit d240996

Browse files
authored
fix: Use pointer types for boolean fields in PulsarFunction to prevent omitempty issues (#343)
* fix: Use pointer types for boolean fields in PulsarFunction to prevent omitempty issues * fix ci
1 parent 311a223 commit d240996

File tree

5 files changed

+119
-60
lines changed

5 files changed

+119
-60
lines changed

api/v1alpha1/pulsarfunction_types.go

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -38,27 +38,27 @@ type PulsarFunctionSpec struct {
3838

3939
// CleanupSubscription is the flag to indicate whether the subscription should be cleaned up when the function is deleted
4040
// +optional
41-
CleanupSubscription bool `json:"cleanupSubscription"`
41+
CleanupSubscription *bool `json:"cleanupSubscription,omitempty"`
4242

4343
// RetainOrdering is the flag to indicate whether the function should retain ordering
4444
// +optional
45-
RetainOrdering bool `json:"retainOrdering"`
45+
RetainOrdering *bool `json:"retainOrdering,omitempty"`
4646

4747
// RetainKeyOrdering is the flag to indicate whether the function should retain key ordering
4848
// +optional
49-
RetainKeyOrdering bool `json:"retainKeyOrdering"`
49+
RetainKeyOrdering *bool `json:"retainKeyOrdering,omitempty"`
5050

5151
// BatchBuilder is the batch builder that the function uses
5252
// +optional
5353
BatchBuilder *string `json:"batchBuilder,omitempty"`
5454

5555
// ForwardSourceMessageProperty is the flag to indicate whether the function should forward source message properties
5656
// +optional
57-
ForwardSourceMessageProperty bool `json:"forwardSourceMessageProperty"`
57+
ForwardSourceMessageProperty *bool `json:"forwardSourceMessageProperty,omitempty"`
5858

5959
// AutoAck is the flag to indicate whether the function should auto ack
6060
// +optional
61-
AutoAck bool `json:"autoAck"`
61+
AutoAck *bool `json:"autoAck,omitempty"`
6262

6363
// Parallelism is the parallelism of the function
6464
// +optional
@@ -186,11 +186,11 @@ type PulsarFunctionSpec struct {
186186

187187
// ExposePulsarAdminClientEnabled is the flag to indicate whether the function should expose pulsar admin client
188188
// +optional
189-
ExposePulsarAdminClientEnabled bool `json:"exposePulsarAdminClientEnabled"`
189+
ExposePulsarAdminClientEnabled *bool `json:"exposePulsarAdminClientEnabled,omitempty"`
190190

191191
// SkipToLatest is the flag to indicate whether the function should skip to latest
192192
// +optional
193-
SkipToLatest bool `json:"skipToLatest"`
193+
SkipToLatest *bool `json:"skipToLatest,omitempty"`
194194

195195
// SubscriptionPosition is the subscription position of the function
196196
// +optional

api/v1alpha1/zz_generated.deepcopy.go

Lines changed: 35 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

docs/pulsar_sink.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,7 @@ This table lists specifications available for the `PulsarSink` resource.
4141
| `topicsPattern` | The topic pattern. | Optional |
4242
| `resources` | The resources of the sink. | Optional |
4343
| `timeoutMs` | The timeout in milliseconds. | Optional |
44-
| `cleanupSubscription` | The cleanup subscription (defaults to false when omitted). | Optional |
44+
| `cleanupSubscription` | The cleanup subscription. | Optional |
4545
| `retainOrdering` | The retain ordering. | Optional |
4646
| `retainKeyOrdering` | The retain key ordering. | Optional |
4747
| `autoAck` | The auto ack. | Optional |

pkg/admin/impl.go

Lines changed: 69 additions & 44 deletions
Original file line numberDiff line numberDiff line change
@@ -46,14 +46,6 @@ const (
4646
TopicDomainNonPersistent = "non-persistent"
4747
)
4848

49-
// ptrBoolToBool converts a pointer to bool to bool with default value false
50-
func ptrBoolToBool(b *bool) bool {
51-
if b == nil {
52-
return false
53-
}
54-
return *b
55-
}
56-
5749
// Type conversion functions for external library types
5850

5951
// convertOffloadPolicies converts our local OffloadPolicies to the external library type
@@ -1281,37 +1273,58 @@ func (p *PulsarAdminClient) DeletePulsarFunction(tenant, namespace, name string)
12811273
// ApplyPulsarFunction creates or updates a pulsar function
12821274
func (p *PulsarAdminClient) ApplyPulsarFunction(tenant, namespace, name, packageURL string, param *v1alpha1.PulsarFunctionSpec, changed bool) error {
12831275
functionConfig := utils.FunctionConfig{
1284-
Tenant: tenant,
1285-
Namespace: namespace,
1286-
Name: name,
1287-
ClassName: param.ClassName,
1288-
Inputs: param.Inputs,
1289-
Parallelism: param.Parallelism,
1290-
TimeoutMs: param.TimeoutMs,
1291-
TopicsPattern: param.TopicsPattern,
1292-
CleanupSubscription: param.CleanupSubscription,
1293-
RetainOrdering: param.RetainOrdering,
1294-
RetainKeyOrdering: param.RetainKeyOrdering,
1295-
ForwardSourceMessageProperty: param.ForwardSourceMessageProperty,
1296-
AutoAck: param.AutoAck,
1297-
MaxMessageRetries: param.MaxMessageRetries,
1298-
CustomSerdeInputs: param.CustomSerdeInputs,
1299-
CustomSchemaInputs: param.CustomSchemaInputs,
1300-
InputTypeClassName: param.InputTypeClassName,
1301-
Output: param.Output,
1302-
OutputSerdeClassName: param.OutputSerdeClassName,
1303-
OutputSchemaType: param.OutputSchemaType,
1304-
OutputTypeClassName: param.OutputTypeClassName,
1305-
CustomSchemaOutputs: param.CustomSchemaOutputs,
1306-
LogTopic: param.LogTopic,
1307-
ProcessingGuarantees: param.ProcessingGuarantees,
1308-
DeadLetterTopic: param.DeadLetterTopic,
1309-
SubName: param.SubName,
1310-
RuntimeFlags: param.RuntimeFlags,
1311-
MaxPendingAsyncRequests: param.MaxPendingAsyncRequests,
1312-
ExposePulsarAdminClientEnabled: param.ExposePulsarAdminClientEnabled,
1313-
SkipToLatest: param.SkipToLatest,
1314-
SubscriptionPosition: param.SubscriptionPosition,
1276+
Tenant: tenant,
1277+
Namespace: namespace,
1278+
Name: name,
1279+
ClassName: param.ClassName,
1280+
Inputs: param.Inputs,
1281+
Parallelism: param.Parallelism,
1282+
TimeoutMs: param.TimeoutMs,
1283+
TopicsPattern: param.TopicsPattern,
1284+
MaxMessageRetries: param.MaxMessageRetries,
1285+
CustomSerdeInputs: param.CustomSerdeInputs,
1286+
CustomSchemaInputs: param.CustomSchemaInputs,
1287+
InputTypeClassName: param.InputTypeClassName,
1288+
Output: param.Output,
1289+
OutputSerdeClassName: param.OutputSerdeClassName,
1290+
OutputSchemaType: param.OutputSchemaType,
1291+
OutputTypeClassName: param.OutputTypeClassName,
1292+
CustomSchemaOutputs: param.CustomSchemaOutputs,
1293+
LogTopic: param.LogTopic,
1294+
ProcessingGuarantees: param.ProcessingGuarantees,
1295+
DeadLetterTopic: param.DeadLetterTopic,
1296+
SubName: param.SubName,
1297+
RuntimeFlags: param.RuntimeFlags,
1298+
MaxPendingAsyncRequests: param.MaxPendingAsyncRequests,
1299+
SubscriptionPosition: param.SubscriptionPosition,
1300+
}
1301+
1302+
if param.CleanupSubscription != nil {
1303+
functionConfig.CleanupSubscription = *param.CleanupSubscription
1304+
}
1305+
1306+
if param.RetainOrdering != nil {
1307+
functionConfig.RetainOrdering = *param.RetainOrdering
1308+
}
1309+
1310+
if param.RetainKeyOrdering != nil {
1311+
functionConfig.RetainKeyOrdering = *param.RetainKeyOrdering
1312+
}
1313+
1314+
if param.ForwardSourceMessageProperty != nil {
1315+
functionConfig.ForwardSourceMessageProperty = *param.ForwardSourceMessageProperty
1316+
}
1317+
1318+
if param.AutoAck != nil {
1319+
functionConfig.AutoAck = *param.AutoAck
1320+
}
1321+
1322+
if param.ExposePulsarAdminClientEnabled != nil {
1323+
functionConfig.ExposePulsarAdminClientEnabled = *param.ExposePulsarAdminClientEnabled
1324+
}
1325+
1326+
if param.SkipToLatest != nil {
1327+
functionConfig.SkipToLatest = *param.SkipToLatest
13151328
}
13161329

13171330
if param.BatchBuilder != nil {
@@ -1460,11 +1473,7 @@ func (p *PulsarAdminClient) ApplyPulsarSink(tenant, namespace, name, packageURL
14601473
TopicsPattern: param.TopicsPattern,
14611474
TimeoutMs: param.TimeoutMs,
14621475

1463-
CleanupSubscription: ptrBoolToBool(param.CleanupSubscription),
1464-
RetainOrdering: ptrBoolToBool(param.RetainOrdering),
1465-
RetainKeyOrdering: ptrBoolToBool(param.RetainKeyOrdering),
1466-
AutoAck: ptrBoolToBool(param.AutoAck),
1467-
Parallelism: param.Parallelism,
1476+
Parallelism: param.Parallelism,
14681477

14691478
SinkType: param.SinkType,
14701479
Archive: packageURL,
@@ -1487,6 +1496,22 @@ func (p *PulsarAdminClient) ApplyPulsarSink(tenant, namespace, name, packageURL
14871496
TransformFunctionConfig: param.TransformFunctionConfig,
14881497
}
14891498

1499+
if param.CleanupSubscription != nil {
1500+
sinkConfig.CleanupSubscription = *param.CleanupSubscription
1501+
}
1502+
1503+
if param.RetainOrdering != nil {
1504+
sinkConfig.RetainOrdering = *param.RetainOrdering
1505+
}
1506+
1507+
if param.RetainKeyOrdering != nil {
1508+
sinkConfig.RetainKeyOrdering = *param.RetainKeyOrdering
1509+
}
1510+
1511+
if param.AutoAck != nil {
1512+
sinkConfig.AutoAck = *param.AutoAck
1513+
}
1514+
14901515
if param.Resources != nil {
14911516
s, err := strconv.ParseFloat(param.Resources.CPU, 64)
14921517
if err != nil {

tests/utils/spec.go

Lines changed: 7 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,6 @@ import (
2222
v1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1"
2323
"k8s.io/apimachinery/pkg/api/resource"
2424
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
25-
"k8s.io/utils/pointer"
2625
"k8s.io/utils/ptr"
2726

2827
"github.com/streamnative/pulsar-resources-operator/api/v1alpha1"
@@ -513,15 +512,15 @@ func MakePulsarFunction(namespace, name, functionPackageUrl, connectionName stri
513512
ClassName: "org.apache.pulsar.functions.api.examples.ExclamationFunction",
514513
SubName: "test-sub",
515514
SubscriptionPosition: "Latest",
516-
CleanupSubscription: true,
517-
SkipToLatest: true,
518-
ForwardSourceMessageProperty: true,
519-
RetainKeyOrdering: true,
520-
AutoAck: true,
521-
MaxMessageRetries: pointer.Int(101),
515+
CleanupSubscription: ptr.To(true),
516+
SkipToLatest: ptr.To(true),
517+
ForwardSourceMessageProperty: ptr.To(true),
518+
RetainKeyOrdering: ptr.To(true),
519+
AutoAck: ptr.To(true),
520+
MaxMessageRetries: ptr.To(101),
522521
DeadLetterTopic: "dl-topic",
523522
LogTopic: "func-log",
524-
TimeoutMs: pointer.Int64(6666),
523+
TimeoutMs: ptr.To[int64](6666),
525524
Secrets: map[string]v1alpha1.FunctionSecretKeyRef{
526525
"SECRET1": {
527526
"sectest", "hello",

0 commit comments

Comments
 (0)