Skip to content

Commit 638a2f5

Browse files
committed
Address comments
1 parent 33f7146 commit 638a2f5

File tree

2 files changed

+17
-23
lines changed

2 files changed

+17
-23
lines changed

pulsar/consumer_multitopic.go

Lines changed: 12 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -190,29 +190,23 @@ func ackIDListFromMultiTopics(log log.Logger, msgIDs []MessageID, findConsumer f
190190
}
191191

192192
subErrCh := make(chan error, len(consumerToMsgIDs))
193-
errCh := make(chan error)
194-
go func() {
195-
ackError := AckError{}
196-
for i := 0; i < len(consumerToMsgIDs); i++ {
197-
err := <-subErrCh
198-
if topicAckError, ok := err.(AckError); ok {
199-
for id, err := range topicAckError {
200-
ackError[id] = err
201-
}
202-
}
203-
}
204-
if len(ackError) == 0 {
205-
errCh <- nil
206-
} else {
207-
errCh <- ackError
208-
}
209-
}()
210193
for consumer, ids := range consumerToMsgIDs {
211194
go func() {
212195
subErrCh <- consumer.AckIDList(ids)
213196
}()
214197
}
215-
return <-errCh
198+
ackError := AckError{}
199+
for i := 0; i < len(consumerToMsgIDs); i++ {
200+
if topicAckError, ok := (<-subErrCh).(AckError); ok {
201+
for id, err := range topicAckError {
202+
ackError[id] = err
203+
}
204+
}
205+
}
206+
if len(ackError) == 0 {
207+
return nil
208+
}
209+
return ackError
216210
}
217211

218212
// AckWithTxn the consumption of a single message with a transaction

pulsar/consumer_multitopic_test.go

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
package pulsar
1919

2020
import (
21+
"errors"
2122
"fmt"
2223
"strings"
2324
"testing"
@@ -367,7 +368,7 @@ func (dummyConnection) IsProxied() bool {
367368

368369
func TestMultiTopicAckIDListTimeout(t *testing.T) {
369370
topic := fmt.Sprintf("multiTopicAckIDListTimeout%v", time.Now().UnixNano())
370-
createPartitionedTopic(topic, 5)
371+
assert.NoError(t, createPartitionedTopic(topic, 5))
371372

372373
cli, err := NewClient(ClientOptions{
373374
URL: "pulsar://localhost:6650",
@@ -402,13 +403,12 @@ func TestMultiTopicAckIDListTimeout(t *testing.T) {
402403
err = consumer.AckIDList(msgIDs)
403404
elapsed := time.Since(start)
404405
t.Logf("AckIDList takes %v ms", elapsed)
405-
assert.True(t, elapsed < 4*time.Second && elapsed >= 3*time.Second)
406-
if ackError, ok := err.(AckError); ok {
406+
assert.True(t, elapsed < 5*time.Second && elapsed >= 3*time.Second)
407+
var ackError AckError
408+
if errors.As(err, &ackError) {
407409
for _, err := range ackError {
408410
assert.Equal(t, "request timed out", err.Error())
409411
}
410-
} else {
411-
assert.Fail(t, "AckIDList should return AckError")
412412
}
413413

414414
for i := 0; i < len(msgs); i++ {

0 commit comments

Comments
 (0)