Skip to content

Commit 72aed95

Browse files
[fix] fix channel deadlock in regexp consumer (#1141)
### Motivation When using a regexp consumer, there's a race condition between producing and consuming new discovered topics. If the discover topic takes too long or the auto discovery period is too short, then multiple ticker.C messages may be processed in a row which will block on the subcsribe/unsubscribe channels as they only have a buffer size of 1. This will block new topics from being discovered forever. ### Modifications Moved the consumers into their own goroutines and use an unbuffered channel. There's multiple ways to go about it but it's good practice to keep consumers and producers separate. Consumers are run until the channels they are consumed from are closed, which happens when the producer (monitor) returns.
1 parent 067dca1 commit 72aed95

File tree

2 files changed

+74
-20
lines changed

2 files changed

+74
-20
lines changed

pulsar/consumer_regex.go

Lines changed: 11 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -50,8 +50,6 @@ type regexConsumer struct {
5050

5151
consumersLock sync.Mutex
5252
consumers map[string]Consumer
53-
subscribeCh chan []string
54-
unsubscribeCh chan []string
5553

5654
closeOnce sync.Once
5755
closeCh chan struct{}
@@ -75,9 +73,7 @@ func newRegexConsumer(c *client, opts ConsumerOptions, tn *internal.TopicName, p
7573
namespace: tn.Namespace,
7674
pattern: pattern,
7775

78-
consumers: make(map[string]Consumer),
79-
subscribeCh: make(chan []string, 1),
80-
unsubscribeCh: make(chan []string, 1),
76+
consumers: make(map[string]Consumer),
8177

8278
closeCh: make(chan struct{}),
8379

@@ -163,12 +159,11 @@ func (c *regexConsumer) Ack(msg Message) error {
163159
return c.AckID(msg.ID())
164160
}
165161

166-
func (c *regexConsumer) ReconsumeLater(msg Message, delay time.Duration) {
162+
func (c *regexConsumer) ReconsumeLater(_ Message, _ time.Duration) {
167163
c.log.Warnf("regexp consumer not support ReconsumeLater yet.")
168164
}
169165

170-
func (c *regexConsumer) ReconsumeLaterWithCustomProperties(msg Message, customProperties map[string]string,
171-
delay time.Duration) {
166+
func (c *regexConsumer) ReconsumeLaterWithCustomProperties(_ Message, _ map[string]string, _ time.Duration) {
172167
c.log.Warnf("regexp consumer not support ReconsumeLaterWithCustomProperties yet.")
173168
}
174169

@@ -297,11 +292,11 @@ func (c *regexConsumer) Close() {
297292
})
298293
}
299294

300-
func (c *regexConsumer) Seek(msgID MessageID) error {
295+
func (c *regexConsumer) Seek(_ MessageID) error {
301296
return newError(SeekFailed, "seek command not allowed for regex consumer")
302297
}
303298

304-
func (c *regexConsumer) SeekByTime(time time.Time) error {
299+
func (c *regexConsumer) SeekByTime(_ time.Time) error {
305300
return newError(SeekFailed, "seek command not allowed for regex consumer")
306301
}
307302

@@ -329,14 +324,6 @@ func (c *regexConsumer) monitor() {
329324
if !c.closed() {
330325
c.discover()
331326
}
332-
case topics := <-c.subscribeCh:
333-
if len(topics) > 0 && !c.closed() {
334-
c.subscribe(topics, c.dlq, c.rlq)
335-
}
336-
case topics := <-c.unsubscribeCh:
337-
if len(topics) > 0 && !c.closed() {
338-
c.unsubscribe(topics)
339-
}
340327
}
341328
}
342329
}
@@ -358,8 +345,12 @@ func (c *regexConsumer) discover() {
358345
}).
359346
Debug("discover topics")
360347

361-
c.unsubscribeCh <- staleTopics
362-
c.subscribeCh <- newTopics
348+
if len(staleTopics) > 0 {
349+
c.unsubscribe(staleTopics)
350+
}
351+
if len(newTopics) > 0 {
352+
c.subscribe(newTopics, c.dlq, c.rlq)
353+
}
363354
}
364355

365356
func (c *regexConsumer) knownTopics() []string {

pulsar/consumer_regex_test.go

Lines changed: 63 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -241,6 +241,7 @@ func runRegexConsumerDiscoverPatternFoo(t *testing.T, c Client, namespace string
241241
func TestRegexConsumer(t *testing.T) {
242242
t.Run("MatchOneTopic", runWithClientNamespace(runRegexConsumerMatchOneTopic))
243243
t.Run("AddTopic", runWithClientNamespace(runRegexConsumerAddMatchingTopic))
244+
t.Run("AutoDiscoverTopics", runWithClientNamespace(runRegexConsumerAutoDiscoverTopics))
244245
}
245246

246247
func runRegexConsumerMatchOneTopic(t *testing.T, c Client, namespace string) {
@@ -346,6 +347,68 @@ func runRegexConsumerAddMatchingTopic(t *testing.T, c Client, namespace string)
346347
}
347348
}
348349

350+
func runRegexConsumerAutoDiscoverTopics(t *testing.T, c Client, namespace string) {
351+
topicsPattern := fmt.Sprintf("persistent://%s/foo.*", namespace)
352+
opts := ConsumerOptions{
353+
TopicsPattern: topicsPattern,
354+
SubscriptionName: "regex-sub",
355+
// this is purposefully short to test parallelism between discover and subscribe calls
356+
AutoDiscoveryPeriod: 1 * time.Nanosecond,
357+
}
358+
consumer, err := c.Subscribe(opts)
359+
if err != nil {
360+
t.Fatal(err)
361+
}
362+
defer consumer.Close()
363+
364+
topicInRegex1 := namespace + "/foo-topic-1"
365+
p1, err := c.CreateProducer(ProducerOptions{
366+
Topic: topicInRegex1,
367+
DisableBatching: true,
368+
})
369+
if err != nil {
370+
t.Fatal(err)
371+
}
372+
defer p1.Close()
373+
374+
topicInRegex2 := namespace + "/foo-topic-2"
375+
p2, err := c.CreateProducer(ProducerOptions{
376+
Topic: topicInRegex2,
377+
DisableBatching: true,
378+
})
379+
if err != nil {
380+
t.Fatal(err)
381+
}
382+
defer p2.Close()
383+
384+
time.Sleep(100 * time.Millisecond)
385+
386+
err = genMessages(p1, 5, func(idx int) string {
387+
return fmt.Sprintf("foo-message-%d", idx)
388+
})
389+
if err != nil {
390+
t.Fatal(err)
391+
}
392+
393+
err = genMessages(p2, 5, func(idx int) string {
394+
return fmt.Sprintf("foo-message-%d", idx)
395+
})
396+
if err != nil {
397+
t.Fatal(err)
398+
}
399+
400+
ctx := context.Background()
401+
for i := 0; i < 10; i++ {
402+
m, err := consumer.Receive(ctx)
403+
if err != nil {
404+
t.Errorf("failed to receive message error: %+v", err)
405+
} else {
406+
assert.Truef(t, strings.HasPrefix(string(m.Payload()), "foo-"),
407+
"message does not start with foo: %s", string(m.Payload()))
408+
}
409+
}
410+
}
411+
349412
func genMessages(p Producer, num int, msgFn func(idx int) string) error {
350413
ctx := context.Background()
351414
for i := 0; i < num; i++ {

0 commit comments

Comments
 (0)