Skip to content

Commit 7e34cc7

Browse files
pnagaraj80cicoyleyaron2dapr-bot
authored
feat: Add support for replicateSubscriptionState in Pulsar pubsub component (#3853)
Signed-off-by: Prashanth Nagaraj <[email protected]> Co-authored-by: Cassie Coyle <[email protected]> Co-authored-by: Yaron Schneider <[email protected]> Co-authored-by: Dapr Bot <[email protected]>
1 parent ff10bea commit 7e34cc7

File tree

4 files changed

+56
-1
lines changed

4 files changed

+56
-1
lines changed

pubsub/pulsar/metadata.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@ type pulsarMetadata struct {
3939
ReceiverQueueSize int `mapstructure:"receiverQueueSize"`
4040
SubscriptionType string `mapstructure:"subscribeType"`
4141
SubscriptionInitialPosition string `mapstructure:"subscribeInitialPosition"`
42+
ReplicateSubscriptionState bool `mapstructure:"replicateSubscriptionState"`
4243
SubscriptionMode string `mapstructure:"subscribeMode"`
4344
Token string `mapstructure:"token"`
4445
oauth2.ClientCredentialsMetadata `mapstructure:",squash"`

pubsub/pulsar/metadata.yaml

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -202,6 +202,17 @@ metadata:
202202
url:
203203
title: "Pulsar SubscriptionInitialPosition"
204204
url: "https://pkg.go.dev/github.com/apache/pulsar-client-go/pulsar#SubscriptionInitialPosition"
205+
- name: replicateSubscriptionState
206+
type: bool
207+
description: |
208+
Enable replication of subscription state across geo-replicated Pulsar clusters.
209+
When enabled, subscription state (such as cursor positions and acknowledgments) will be replicated to other clusters in a geo-replicated setup.
210+
This is useful for maintaining subscription consistency during cluster failovers.
211+
default: 'false'
212+
example: '"true", "false"'
213+
url:
214+
title: "Pulsar Geo-Replication"
215+
url: "https://pulsar.apache.org/docs/administration-geo/"
205216
- name: subscribeMode
206217
type: string
207218
description: |
@@ -210,4 +221,4 @@ metadata:
210221
example: '"durable"'
211222
url:
212223
title: "Pulsar SubscriptionMode"
213-
url: "https://pkg.go.dev/github.com/apache/pulsar-client-go/pulsar#SubscriptionMode"
224+
url: "https://pkg.go.dev/github.com/apache/pulsar-client-go/pulsar#SubscriptionMode"

pubsub/pulsar/pulsar.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -509,6 +509,7 @@ func (p *Pulsar) Subscribe(ctx context.Context, req pubsub.SubscribeRequest, han
509509
MessageChannel: channel,
510510
NackRedeliveryDelay: p.metadata.RedeliveryDelay,
511511
ReceiverQueueSize: p.metadata.ReceiverQueueSize,
512+
ReplicateSubscriptionState: p.metadata.ReplicateSubscriptionState,
512513
}
513514

514515
// Handle KeySharedPolicy for key_shared subscription type

pubsub/pulsar/pulsar_test.go

Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -605,6 +605,48 @@ func TestEncryptionKeys(t *testing.T) {
605605
})
606606
}
607607

608+
func TestParsePulsarMetadataReplicateSubscriptionState(t *testing.T) {
609+
tt := []struct {
610+
name string
611+
replicateSubscriptionState string
612+
expected bool
613+
}{
614+
{
615+
name: "test replicateSubscriptionState true",
616+
replicateSubscriptionState: "true",
617+
expected: true,
618+
},
619+
{
620+
name: "test replicateSubscriptionState false",
621+
replicateSubscriptionState: "false",
622+
expected: false,
623+
},
624+
{
625+
name: "test replicateSubscriptionState empty (defaults to false)",
626+
replicateSubscriptionState: "",
627+
expected: false,
628+
},
629+
}
630+
631+
for _, tc := range tt {
632+
t.Run(tc.name, func(t *testing.T) {
633+
m := pubsub.Metadata{}
634+
m.Properties = map[string]string{
635+
"host": "a",
636+
}
637+
638+
if tc.replicateSubscriptionState != "" {
639+
m.Properties["replicateSubscriptionState"] = tc.replicateSubscriptionState
640+
}
641+
642+
meta, err := parsePulsarMetadata(m)
643+
644+
require.NoError(t, err)
645+
assert.Equal(t, tc.expected, meta.ReplicateSubscriptionState)
646+
})
647+
}
648+
}
649+
608650
func TestSanitiseURL(t *testing.T) {
609651
tests := []struct {
610652
name string

0 commit comments

Comments
 (0)