@@ -138,6 +138,12 @@ func parsePulsarMetadata(meta pubsub.Metadata) (*pulsarMetadata, error) {
138138 return nil , errors .New ("pulsar error: missing pulsar host" )
139139 }
140140
141+ var err error
142+ m .SubscriptionType , err = parseSubscriptionType (meta .Properties [subscribeTypeKey ])
143+ if err != nil {
144+ return nil , errors .New ("invalid subscription type. Accepted values are `exclusive`, `shared`, `failover` and `key_shared`" )
145+ }
146+
141147 for k , v := range meta .Properties {
142148 switch {
143149 case strings .HasSuffix (k , topicJSONSchemaIdentifier ):
@@ -170,10 +176,8 @@ func (p *Pulsar) Init(ctx context.Context, metadata pubsub.Metadata) error {
170176 return err
171177 }
172178 pulsarURL := m .Host
173- if ! strings .HasPrefix (m .Host , "http://" ) &&
174- ! strings .HasPrefix (m .Host , "https://" ) {
175- pulsarURL = fmt .Sprintf ("%s%s" , pulsarPrefix , m .Host )
176- }
179+
180+ pulsarURL = sanitiseURL (pulsarURL )
177181 options := pulsar.ClientOptions {
178182 URL : pulsarURL ,
179183 OperationTimeout : 30 * time .Second ,
@@ -226,6 +230,23 @@ func (p *Pulsar) Init(ctx context.Context, metadata pubsub.Metadata) error {
226230 return nil
227231}
228232
233+ func sanitiseURL (pulsarURL string ) string {
234+ prefixes := []string {"pulsar+ssl://" , "pulsar://" , "http://" , "https://" }
235+
236+ hasPrefix := false
237+ for _ , prefix := range prefixes {
238+ if strings .HasPrefix (pulsarURL , prefix ) {
239+ hasPrefix = true
240+ break
241+ }
242+ }
243+
244+ if ! hasPrefix {
245+ pulsarURL = fmt .Sprintf ("%s%s" , pulsarPrefix , pulsarURL )
246+ }
247+ return pulsarURL
248+ }
249+
229250func (p * Pulsar ) useProducerEncryption () bool {
230251 return p .metadata .PublicKey != "" && p .metadata .Keys != ""
231252}
@@ -370,11 +391,22 @@ func parsePublishMetadata(req *pubsub.PublishRequest, schema schemaMetadata) (
370391 return msg , nil
371392}
372393
373- // default: shared
374- func getSubscribeType (metadata map [string ]string ) pulsar.SubscriptionType {
394+ func parseSubscriptionType (in string ) (string , error ) {
395+ subsType := strings .ToLower (in )
396+ switch subsType {
397+ case subscribeTypeExclusive , subscribeTypeFailover , subscribeTypeShared , subscribeTypeKeyShared :
398+ return subsType , nil
399+ case "" :
400+ return subscribeTypeShared , nil
401+ default :
402+ return "" , fmt .Errorf ("invalid subscription type: %s" , subsType )
403+ }
404+ }
405+
406+ // getSubscribeType doesn't do extra validations, because they were done in parseSubscriptionType.
407+ func getSubscribeType (subsTypeStr string ) pulsar.SubscriptionType {
375408 var subsType pulsar.SubscriptionType
376409
377- subsTypeStr := strings .ToLower (metadata [subscribeTypeKey ])
378410 switch subsTypeStr {
379411 case subscribeTypeExclusive :
380412 subsType = pulsar .Exclusive
@@ -384,8 +416,6 @@ func getSubscribeType(metadata map[string]string) pulsar.SubscriptionType {
384416 subsType = pulsar .Shared
385417 case subscribeTypeKeyShared :
386418 subsType = pulsar .KeyShared
387- default :
388- subsType = pulsar .Shared
389419 }
390420
391421 return subsType
@@ -400,15 +430,27 @@ func (p *Pulsar) Subscribe(ctx context.Context, req pubsub.SubscribeRequest, han
400430
401431 topic := p .formatTopic (req .Topic )
402432
433+ subscribeType := p .metadata .SubscriptionType
434+ if s , exists := req .Metadata [subscribeTypeKey ]; exists {
435+ subscribeType = s
436+ }
437+
403438 options := pulsar.ConsumerOptions {
404439 Topic : topic ,
405440 SubscriptionName : p .metadata .ConsumerID ,
406- Type : getSubscribeType (req . Metadata ),
441+ Type : getSubscribeType (subscribeType ),
407442 MessageChannel : channel ,
408443 NackRedeliveryDelay : p .metadata .RedeliveryDelay ,
409444 ReceiverQueueSize : p .metadata .ReceiverQueueSize ,
410445 }
411446
447+ // Handle KeySharedPolicy for key_shared subscription type
448+ if options .Type == pulsar .KeyShared {
449+ options .KeySharedPolicy = & pulsar.KeySharedPolicy {
450+ Mode : pulsar .KeySharedPolicyModeAutoSplit ,
451+ }
452+ }
453+
412454 if p .useConsumerEncryption () {
413455 var reader crypto.KeyReader
414456 if isValidPEM (p .metadata .PublicKey ) {
@@ -430,6 +472,7 @@ func (p *Pulsar) Subscribe(ctx context.Context, req pubsub.SubscribeRequest, han
430472 p .logger .Debugf ("Could not subscribe to %s, full topic name in pulsar is %s" , req .Topic , topic )
431473 return err
432474 }
475+ p .logger .Debugf ("Subscribed to '%s'(%s) with type '%s'" , req .Topic , topic , subscribeType )
433476
434477 p .wg .Add (2 )
435478 listenCtx , cancel := context .WithCancel (ctx )
0 commit comments