Skip to content

Commit 36a055e

Browse files
bruthItalyPaleAle
andauthored
[pubsub/jetstream] Add missing concurrencyMode support for queue-based handler (dapr#3232)
Signed-off-by: Byron Ruth <[email protected]> Co-authored-by: Alessandro (Ale) Segala <[email protected]>
1 parent 15fa6d1 commit 36a055e

File tree

1 file changed

+20
-21
lines changed

1 file changed

+20
-21
lines changed

pubsub/jetstream/jetstream.go

Lines changed: 20 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -237,6 +237,21 @@ func (js *jetstreamPubSub) Subscribe(ctx context.Context, req pubsub.SubscribeRe
237237
}
238238
}
239239

240+
// Choose the correct handler based on the concurrency model.
241+
var concHandler nats.MsgHandler
242+
switch js.meta.Concurrency {
243+
case pubsub.Single:
244+
concHandler = natsHandler
245+
case pubsub.Parallel:
246+
concHandler = func(msg *nats.Msg) {
247+
js.wg.Add(1)
248+
go func() {
249+
natsHandler(msg)
250+
js.wg.Done()
251+
}()
252+
}
253+
}
254+
240255
var err error
241256
streamName := js.meta.StreamName
242257
if streamName == "" {
@@ -245,35 +260,19 @@ func (js *jetstreamPubSub) Subscribe(ctx context.Context, req pubsub.SubscribeRe
245260
return err
246261
}
247262
}
248-
var subscription *nats.Subscription
263+
var sub *nats.Subscription
249264

250265
consumerInfo, err := js.jsc.AddConsumer(streamName, &consumerConfig)
251266
if err != nil {
252267
return err
253268
}
254269

255270
if queue := js.meta.QueueGroupName; queue != "" {
256-
js.l.Debugf("nats: subscribed to subject %s with queue group %s",
257-
req.Topic, js.meta.QueueGroupName)
258-
subscription, err = js.jsc.QueueSubscribe(req.Topic, queue, natsHandler, nats.Bind(streamName, consumerInfo.Name))
271+
js.l.Debugf("nats: subscribed to subject %s with queue group %s", req.Topic, js.meta.QueueGroupName)
272+
sub, err = js.jsc.QueueSubscribe(req.Topic, queue, concHandler, nats.Bind(streamName, consumerInfo.Name))
259273
} else {
260274
js.l.Debugf("nats: subscribed to subject %s", req.Topic)
261-
subscription, err = js.jsc.Subscribe(
262-
req.Topic,
263-
func(msg *nats.Msg) {
264-
switch js.meta.Concurrency {
265-
case pubsub.Single:
266-
natsHandler(msg)
267-
case pubsub.Parallel:
268-
js.wg.Add(1)
269-
go func() {
270-
natsHandler(msg)
271-
js.wg.Done()
272-
}()
273-
}
274-
},
275-
nats.Bind(streamName, consumerInfo.Name),
276-
)
275+
sub, err = js.jsc.Subscribe(req.Topic, concHandler, nats.Bind(streamName, consumerInfo.Name))
277276
}
278277
if err != nil {
279278
return err
@@ -286,7 +285,7 @@ func (js *jetstreamPubSub) Subscribe(ctx context.Context, req pubsub.SubscribeRe
286285
case <-ctx.Done():
287286
case <-js.closeCh:
288287
}
289-
err := subscription.Unsubscribe()
288+
err := sub.Unsubscribe()
290289
if err != nil {
291290
js.l.Warnf("nats: error while unsubscribing from topic %s: %v", req.Topic, err)
292291
}

0 commit comments

Comments
 (0)