Skip to content

Commit fbed4d3

Browse files
committed
Improve the AckIDList performance when there are many topics subscribed
1 parent 92c6e28 commit fbed4d3

File tree

3 files changed

+122
-17
lines changed

3 files changed

+122
-17
lines changed

pulsar/consumer_multitopic.go

Lines changed: 18 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -189,25 +189,30 @@ func ackIDListFromMultiTopics(log log.Logger, msgIDs []MessageID, findConsumer f
189189
}
190190
}
191191

192-
ackError := AckError{}
193-
for consumer, ids := range consumerToMsgIDs {
194-
if err := consumer.AckIDList(ids); err != nil {
195-
if topicAckError := err.(AckError); topicAckError != nil {
192+
subErrCh := make(chan error, len(consumerToMsgIDs))
193+
errCh := make(chan error)
194+
go func() {
195+
ackError := AckError{}
196+
for i := 0; i < len(consumerToMsgIDs); i++ {
197+
err := <-subErrCh
198+
if topicAckError, ok := err.(AckError); ok {
196199
for id, err := range topicAckError {
197200
ackError[id] = err
198201
}
199-
} else {
200-
// It should not reach here
201-
for _, id := range ids {
202-
ackError[id] = err
203-
}
204202
}
205203
}
204+
if len(ackError) == 0 {
205+
errCh <- nil
206+
} else {
207+
errCh <- ackError
208+
}
209+
}()
210+
for consumer, ids := range consumerToMsgIDs {
211+
go func() {
212+
subErrCh <- consumer.AckIDList(ids)
213+
}()
206214
}
207-
if len(ackError) == 0 {
208-
return nil
209-
}
210-
return ackError
215+
return <-errCh
211216
}
212217

213218
// AckWithTxn the consumption of a single message with a transaction

pulsar/consumer_multitopic_test.go

Lines changed: 101 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,10 +23,11 @@ import (
2323
"testing"
2424
"time"
2525

26+
"github.com/apache/pulsar-client-go/pulsar/internal"
27+
pb "github.com/apache/pulsar-client-go/pulsar/internal/pulsar_proto"
2628
"github.com/apache/pulsar-client-go/pulsaradmin"
2729
"github.com/apache/pulsar-client-go/pulsaradmin/pkg/admin/config"
2830
"github.com/apache/pulsar-client-go/pulsaradmin/pkg/utils"
29-
3031
"github.com/stretchr/testify/assert"
3132
)
3233

@@ -317,3 +318,102 @@ func runMultiTopicAckIDList(t *testing.T, regex bool) {
317318
assert.Fail(t, "AckIDList should return AckError")
318319
}
319320
}
321+
322+
type dummyConnection struct {
323+
}
324+
325+
func (dummyConnection) SendRequest(_ uint64, _ *pb.BaseCommand, _ func(*pb.BaseCommand, error)) {
326+
}
327+
328+
func (dummyConnection) SendRequestNoWait(_ *pb.BaseCommand) error {
329+
return nil
330+
}
331+
332+
func (dummyConnection) WriteData(_ internal.Buffer) {
333+
}
334+
335+
func (dummyConnection) RegisterListener(_ uint64, _ internal.ConnectionListener) error {
336+
return nil
337+
}
338+
339+
func (dummyConnection) UnregisterListener(_ uint64) {
340+
}
341+
342+
func (dummyConnection) AddConsumeHandler(_ uint64, _ internal.ConsumerHandler) error {
343+
return nil
344+
}
345+
346+
func (dummyConnection) DeleteConsumeHandler(_ uint64) {
347+
}
348+
349+
func (dummyConnection) ID() string {
350+
return "cnx"
351+
}
352+
353+
func (dummyConnection) GetMaxMessageSize() int32 {
354+
return 0
355+
}
356+
357+
func (dummyConnection) Close() {
358+
}
359+
360+
func (dummyConnection) WaitForClose() <-chan struct{} {
361+
return nil
362+
}
363+
364+
func (dummyConnection) IsProxied() bool {
365+
return false
366+
}
367+
368+
func TestMultiTopicAckIDListTimeout(t *testing.T) {
369+
topic := fmt.Sprintf("multiTopicAckIDListTimeout%v", time.Now().UnixNano())
370+
createPartitionedTopic(topic, 5)
371+
372+
cli, err := NewClient(ClientOptions{
373+
URL: "pulsar://localhost:6650",
374+
OperationTimeout: 3 * time.Second,
375+
})
376+
assert.Nil(t, err)
377+
defer cli.Close()
378+
379+
consumer, err := cli.Subscribe(ConsumerOptions{
380+
Topic: topic,
381+
SubscriptionName: "sub",
382+
AckWithResponse: true,
383+
})
384+
assert.Nil(t, err)
385+
defer consumer.Close()
386+
387+
const numMessages = 5
388+
sendMessages(t, cli, topic, 0, numMessages, false)
389+
msgs := receiveMessages(t, consumer, numMessages)
390+
msgIDs := make([]MessageID, len(msgs))
391+
392+
var conn internal.Connection
393+
for i := 0; i < len(msgs); i++ {
394+
msgIDs[i] = msgs[i].ID()
395+
pc, ok := msgIDs[i].(*trackingMessageID).consumer.(*partitionConsumer)
396+
assert.True(t, ok)
397+
conn = pc._getConn()
398+
pc._setConn(dummyConnection{})
399+
}
400+
401+
start := time.Now()
402+
err = consumer.AckIDList(msgIDs)
403+
elapsed := time.Since(start)
404+
t.Logf("AckIDList takes %v msv", elapsed)
405+
assert.True(t, elapsed < 4*time.Second && elapsed >= 3*time.Second)
406+
if ackError, ok := err.(AckError); ok {
407+
for _, err := range ackError {
408+
assert.Equal(t, "request timed out", err.Error())
409+
}
410+
} else {
411+
assert.Fail(t, "AckIDList should return AckError")
412+
}
413+
414+
for i := 0; i < len(msgs); i++ {
415+
pc, ok := msgIDs[i].(*trackingMessageID).consumer.(*partitionConsumer)
416+
assert.True(t, ok)
417+
pc._setConn(conn)
418+
}
419+
}

pulsar/consumer_partition.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -141,7 +141,7 @@ type partitionConsumer struct {
141141
state uAtomic.Int32
142142
options *partitionConsumerOpts
143143

144-
conn uAtomic.Value
144+
conn atomic.Pointer[internal.Connection]
145145

146146
topic string
147147
name string
@@ -2205,7 +2205,7 @@ func (pc *partitionConsumer) hasMoreMessages() bool {
22052205
// _setConn sets the internal connection field of this partition consumer atomically.
22062206
// Note: should only be called by this partition consumer when a new connection is available.
22072207
func (pc *partitionConsumer) _setConn(conn internal.Connection) {
2208-
pc.conn.Store(conn)
2208+
pc.conn.Store(&conn)
22092209
}
22102210

22112211
// _getConn returns internal connection field of this partition consumer atomically.
@@ -2214,7 +2214,7 @@ func (pc *partitionConsumer) _getConn() internal.Connection {
22142214
// Invariant: The conn must be non-nill for the lifetime of the partitionConsumer.
22152215
// For this reason we leave this cast unchecked and panic() if the
22162216
// invariant is broken
2217-
return pc.conn.Load().(internal.Connection)
2217+
return *pc.conn.Load()
22182218
}
22192219

22202220
func convertToMessageIDData(msgID *trackingMessageID) *pb.MessageIdData {

0 commit comments

Comments
 (0)