Commit dcd2b4a

Do some refactoring

1 parent 5f0e32d commit dcd2b4a
File tree

5 files changed: +72 −155 lines changed

pulsar/consumer_partition.go

Lines changed: 57 additions & 17 deletions
@@ -155,8 +155,17 @@ type partitionConsumer struct {
 	availablePermits *availablePermits

 	// the size of the queue channel for buffering messages
-	maxQueueSize int32
-	queueCh      *unboundedChannel[*message]
+	maxQueueSize int32
+
+	// pendingMessages queues all messages received from the broker but not delivered to the user via Chan() or
+	// Receive() methods.
+	// There is a background goroutine that sends messages from the connection to `pendingMessages` via `queueInCh` and
+	// reads messages from `pendingMessages` via `queueOutCh` so that the `dispatcher` goroutine can read messages from
+	// the `queueOutCh`.
+	pendingMessages *list.List
+	queueInCh       chan *message
+	queueOutCh      chan *message
+
 	startMessageID  atomicMessageID
 	lastDequeuedMsg *trackingMessageID
@@ -354,7 +363,6 @@ func newPartitionConsumer(parent Consumer, client *client, options *partitionCon
 		partitionIdx:   int32(options.partitionIdx),
 		eventsCh:       make(chan interface{}, 10),
 		maxQueueSize:   int32(options.receiverQueueSize),
-		queueCh:        newUnboundedChannel[*message](),
 		startMessageID: atomicMessageID{msgID: options.startMessageID},
 		connectedCh:    make(chan struct{}),
 		messageCh:      messageCh,
@@ -419,6 +427,7 @@ func newPartitionConsumer(parent Consumer, client *client, options *partitionCon
 	}
 	pc.log.Info("Created consumer")
 	pc.setConsumerState(consumerReady)
+	pc.startQueueMessagesFromBroker()

 	startingMessageID := pc.startMessageID.get()
 	if pc.options.startMessageIDInclusive && startingMessageID != nil && startingMessageID.equal(latestMessageID) {
@@ -949,11 +958,6 @@ func (pc *partitionConsumer) Close() {

 	// wait for request to finish
 	<-req.doneCh
-
-	// It will close `queueCh.in`. If `MessageReceived` was called after that, it will panic because new messages
-	// will be sent to a closed channel. However, generally it's impossible because the broker will not be able to
-	// dispatch messages to this consumer after receiving the close request.
-	pc.queueCh.stop()
 }

 func (pc *partitionConsumer) Seek(msgID MessageID) error {
@@ -1176,7 +1180,7 @@ func (pc *partitionConsumer) MessageReceived(response *pb.CommandMessage, header
 		pc.markScaleIfNeed()
 	}

-	pc.queueCh.inCh <- &message{
+	pc.queueInCh <- &message{
 		publishTime: timeFromUnixTimestampMillis(msgMeta.GetPublishTime()),
 		eventTime:   timeFromUnixTimestampMillis(msgMeta.GetEventTime()),
 		key:         msgMeta.GetPartitionKey(),
@@ -1378,7 +1382,7 @@ func (pc *partitionConsumer) MessageReceived(response *pb.CommandMessage, header
 			pc.markScaleIfNeed()
 		}

-		pc.queueCh.inCh <- msg
+		pc.queueInCh <- msg
 	}

 	if skippedMessages > 0 {
@@ -1542,12 +1546,14 @@ func (pc *partitionConsumer) dispatcher() {
 	}()
 	var queueMsg *message
 	for {
-		var queueCh <-chan *message
+		queueMsgCh := pc.queueOutCh
 		var messageCh chan ConsumerMessage
 		var nextMessage ConsumerMessage
 		var nextMessageSize int

 		if queueMsg != nil {
+			// Do not read from the queued message channel since there is already a message polled in the last loop
+			queueMsgCh = nil
 			nextMessage = ConsumerMessage{
 				Consumer: pc.parentConsumer,
 				Message:  queueMsg,
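The `queueMsgCh = nil` assignment leans on a Go guarantee: a receive from a nil channel blocks forever, so a nil case in a `select` is simply never chosen. That is what lets the dispatcher hold on to an already-polled message without pulling another one. A minimal standalone illustration of the idiom (all names here are hypothetical, not from this codebase):

package main

import "fmt"

func main() {
	queueOutCh := make(chan string, 1)
	queueOutCh <- "first"

	var pending *string
	for i := 0; i < 2; i++ {
		// Mirror the dispatcher: when a message is already pending,
		// select on a nil channel so the receive case can never fire.
		recvCh := queueOutCh
		if pending != nil {
			recvCh = nil
		}
		select {
		case msg := <-recvCh:
			pending = &msg
			fmt.Println("polled:", msg)
		default:
			fmt.Println("receive disabled; still holding:", *pending)
		}
	}
}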
@@ -1568,8 +1574,6 @@
 			} else {
 				pc.log.Debug("skip dispatching messages when seeking")
 			}
-		} else {
-			queueCh = pc.queueCh.outCh
 		}

 		select {
@@ -1607,7 +1611,7 @@
 			pc.log.Debug("received dispatcherSeekingControlCh, set isSeek to true")
 			pc.isSeeking.Store(true)

-		case msg, ok := <-queueCh:
+		case msg, ok := <-queueMsgCh:
 			if !ok {
 				return
 			}
@@ -1630,9 +1634,9 @@
 			// drain the message queue on any new connection by sending a
 			// special nil message to the channel so we know when to stop dropping messages
 			var nextMessageInQueue *trackingMessageID
-			pc.queueCh.inCh <- nil
+			pc.queueInCh <- nil

-			for m := range pc.queueCh.outCh {
+			for m := range pc.queueOutCh {
 				// the queue has been drained
 				if m == nil {
 					break
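The nil sentinel works because the queue is strictly FIFO: every message enqueued before the sentinel is read back, and therefore dropped, before the sentinel itself arrives. A hedged sketch of the same drain step as a standalone helper (`drainPrefetched` is a hypothetical name, and the fragment assumes the package's `message` type and a FIFO pump goroutine forwarding from `in` to `out`):

// drainPrefetched discards everything buffered between in and out at the
// moment of the call. It assumes nil is never a real message.
func drainPrefetched(in chan<- *message, out <-chan *message) {
	in <- nil // FIFO sentinel: marks the current end of the backlog
	for m := range out {
		if m == nil {
			return // everything queued before the sentinel has been dropped
		}
	}
}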
@@ -2080,7 +2084,7 @@ func (pc *partitionConsumer) expectMoreIncomingMessages() {
 }

 func (pc *partitionConsumer) markScaleIfNeed() {
-	// availablePermits + incomingMessages (messages in queueCh) is the number of prefetched messages
+	// availablePermits + incomingMessages (messages in pendingMessages) is the number of prefetched messages
 	// The result of auto-scale we expected is currentQueueSize is slightly bigger than prefetched messages
 	prev := pc.scaleReceiverQueueHint.Swap(pc.availablePermits.get()+pc.incomingMessages.Load() >=
 		pc.currentQueueSize.Load())
@@ -2220,6 +2224,42 @@ func (pc *partitionConsumer) _getConn() internal.Connection {
 	return *pc.conn.Load()
 }

+func (pc *partitionConsumer) startQueueMessagesFromBroker() {
+	pc.queueInCh = make(chan *message)
+	pc.queueOutCh = make(chan *message)
+	pc.pendingMessages = list.New()
+
+	go func() {
+		defer func() {
+			close(pc.queueInCh)
+			close(pc.queueOutCh)
+			pc.log.Debug("exiting queueMessagesFromBroker")
+		}()
+
+		for {
+			front := pc.pendingMessages.Front()
+			if front == nil {
+				select {
+				case msg := <-pc.queueInCh:
+					pc.pendingMessages.PushBack(msg)
+				case <-pc.closeCh:
+					return
+				}
+			} else {
+				msg := front.Value.(*message)
+				select {
+				case pc.queueOutCh <- msg:
+					pc.pendingMessages.Remove(front)
+				case msg := <-pc.queueInCh:
+					pc.pendingMessages.PushBack(msg)
+				case <-pc.closeCh:
+					return
+				}
+			}
+		}
+	}()
+}
+
 func convertToMessageIDData(msgID *trackingMessageID) *pb.MessageIdData {
 	if msgID == nil {
 		return nil
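Taken together with the `queueInCh`/`queueOutCh` wiring above, this goroutine is a standard Go construction: an unbounded channel built from two unbuffered channels and a `container/list` backlog. The goroutine is always ready to receive, so producers never wait on a slow consumer, while the consumer still sees messages in FIFO order. A self-contained, runnable sketch of the same pattern with plain strings (all names here are illustrative, not from this codebase):

package main

import (
	"container/list"
	"fmt"
)

// unboundedPump forwards values from in to out, parking any backlog in a
// list so that sends to in are accepted immediately even if nobody is
// reading from out. It exits when closeCh is closed.
func unboundedPump(in <-chan string, out chan<- string, closeCh <-chan struct{}) {
	backlog := list.New()
	for {
		front := backlog.Front()
		if front == nil {
			// Nothing buffered: wait for input or shutdown.
			select {
			case v := <-in:
				backlog.PushBack(v)
			case <-closeCh:
				return
			}
		} else {
			// Something buffered: offer the oldest item while still
			// accepting new input, so the sender is never blocked.
			select {
			case out <- front.Value.(string):
				backlog.Remove(front)
			case v := <-in:
				backlog.PushBack(v)
			case <-closeCh:
				return
			}
		}
	}
}

func main() {
	in := make(chan string)
	out := make(chan string)
	closeCh := make(chan struct{})
	go unboundedPump(in, out, closeCh)

	for i := 0; i < 3; i++ {
		in <- fmt.Sprintf("msg-%d", i) // accepted before anyone reads out
	}
	for i := 0; i < 3; i++ {
		fmt.Println(<-out) // msg-0, msg-1, msg-2 in FIFO order
	}
	close(closeCh)
}

One consequence of this shape, visible in the diff: because the goroutine exits via `closeCh` rather than via a closed input channel, `Close()` no longer needs the `queueCh.stop()` call whose send-after-close panic caveat was removed above.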

pulsar/consumer_partition_test.go

Lines changed: 9 additions & 6 deletions
@@ -31,7 +31,7 @@ import (
 func TestSingleMessageIDNoAckTracker(t *testing.T) {
 	eventsCh := make(chan interface{}, 1)
 	pc := partitionConsumer{
-		queueCh:              newUnboundedChannel[*message](),
+		closeCh:              make(chan struct{}),
 		eventsCh:             eventsCh,
 		compressionProviders: sync.Map{},
 		options:              &partitionConsumerOpts{},
@@ -41,14 +41,15 @@
 	pc.availablePermits = &availablePermits{pc: &pc}
 	pc.ackGroupingTracker = newAckGroupingTracker(&AckGroupingOptions{MaxSize: 0},
 		func(id MessageID) { pc.sendIndividualAck(id) }, nil, nil)
+	pc.startQueueMessagesFromBroker()

 	headersAndPayload := internal.NewBufferWrapper(rawCompatSingleMessage)
 	if err := pc.MessageReceived(nil, headersAndPayload); err != nil {
 		t.Fatal(err)
 	}

 	// ensure the tracker was set on the message id
-	message := <-pc.queueCh.outCh
+	message := <-pc.queueOutCh
 	id := message.ID().(*trackingMessageID)
 	assert.Nil(t, id.tracker)

@@ -69,7 +70,7 @@ func newTestMetrics() *internal.LeveledMetrics {
 func TestBatchMessageIDNoAckTracker(t *testing.T) {
 	eventsCh := make(chan interface{}, 1)
 	pc := partitionConsumer{
-		queueCh:              newUnboundedChannel[*message](),
+		closeCh:              make(chan struct{}),
 		eventsCh:             eventsCh,
 		compressionProviders: sync.Map{},
 		options:              &partitionConsumerOpts{},
@@ -79,14 +80,15 @@
 	pc.availablePermits = &availablePermits{pc: &pc}
 	pc.ackGroupingTracker = newAckGroupingTracker(&AckGroupingOptions{MaxSize: 0},
 		func(id MessageID) { pc.sendIndividualAck(id) }, nil, nil)
+	pc.startQueueMessagesFromBroker()

 	headersAndPayload := internal.NewBufferWrapper(rawBatchMessage1)
 	if err := pc.MessageReceived(nil, headersAndPayload); err != nil {
 		t.Fatal(err)
 	}

 	// ensure the tracker was set on the message id
-	message := <-pc.queueCh.outCh
+	message := <-pc.queueOutCh
 	id := message.ID().(*trackingMessageID)
 	assert.Nil(t, id.tracker)

@@ -104,7 +106,7 @@
 func TestBatchMessageIDWithAckTracker(t *testing.T) {
 	eventsCh := make(chan interface{}, 1)
 	pc := partitionConsumer{
-		queueCh:              newUnboundedChannel[*message](),
+		closeCh:              make(chan struct{}),
 		eventsCh:             eventsCh,
 		compressionProviders: sync.Map{},
 		options:              &partitionConsumerOpts{},
@@ -114,6 +116,7 @@
 	pc.availablePermits = &availablePermits{pc: &pc}
 	pc.ackGroupingTracker = newAckGroupingTracker(&AckGroupingOptions{MaxSize: 0},
 		func(id MessageID) { pc.sendIndividualAck(id) }, nil, nil)
+	pc.startQueueMessagesFromBroker()

 	headersAndPayload := internal.NewBufferWrapper(rawBatchMessage10)
 	if err := pc.MessageReceived(nil, headersAndPayload); err != nil {
@@ -125,7 +128,7 @@
 	running := true
 	for running {
 		select {
-		case m := <-pc.queueCh.outCh:
+		case m := <-pc.queueOutCh:
 			id := m.ID().(*trackingMessageID)
 			assert.NotNil(t, id.tracker)
 			messageIDs = append(messageIDs, id)

pulsar/consumer_test.go

Lines changed: 6 additions & 4 deletions
@@ -4887,14 +4887,15 @@ func TestAckResponseNotBlocked(t *testing.T) {
 	defer client.Close()

 	topic := fmt.Sprintf("test-ack-response-not-blocked-%v", time.Now().Nanosecond())
+	assert.Nil(t, createPartitionedTopic(topic, 10))

 	producer, err := client.CreateProducer(ProducerOptions{
 		Topic: topic,
 	})
 	assert.Nil(t, err)

 	ctx := context.Background()
-	numMessages := 100
+	numMessages := 1000
 	for i := 0; i < numMessages; i++ {
 		producer.SendAsync(ctx, &ProducerMessage{
 			Payload: []byte(fmt.Sprintf("value-%d", i)),
@@ -4903,7 +4904,9 @@
 				t.Fatal(err)
 			}
 		})
-		time.Sleep(1 * time.Millisecond)
+		if i%100 == 99 {
+			assert.Nil(t, producer.Flush())
+		}
 	}
 	producer.Flush()
 	producer.Close()
@@ -4917,15 +4920,14 @@
 		Type:                           KeyShared,
 		EnableBatchIndexAcknowledgment: true,
 		AckWithResponse:                true,
-		ReceiverQueueSize:              10,
+		ReceiverQueueSize:              5,
 	})
 	assert.Nil(t, err)
 	msgIDs := make([]MessageID, 0)
 	for i := 0; i < numMessages; i++ {
 		if msg, err := consumer.Receive(context.Background()); err != nil {
 			t.Fatal(err)
 		} else {
-			t.Log("Received message: ", msg.ID())
 			msgIDs = append(msgIDs, msg.ID())
 			if len(msgIDs) >= 10 {
 				if err := consumer.AckIDList(msgIDs); err != nil {

pulsar/unbounded_channel.go

Lines changed: 0 additions & 68 deletions
This file was deleted.
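The deleted type's shape can be inferred from its call sites in this diff: `newUnboundedChannel[*message]()`, sends to `queueCh.inCh`, receives from `queueCh.outCh`, and `queueCh.stop()`, plus the removed comment in `Close()` noting that `stop()` closed the input channel. The following is a speculative reconstruction for context only (it assumes `import "container/list"` and is not the actual deleted code):

// unboundedChannel is a guess at the removed abstraction: an inCh/outCh pair
// bridged by a goroutine with an unbounded list.List backlog.
type unboundedChannel[T any] struct {
	inCh  chan T
	outCh chan T
}

func newUnboundedChannel[T any]() *unboundedChannel[T] {
	c := &unboundedChannel[T]{inCh: make(chan T), outCh: make(chan T)}
	go c.run()
	return c
}

// stop closes inCh; per the comment removed from Close() above, any send to
// inCh after stop() panics — the hazard the commit's closeCh-based shutdown avoids.
func (c *unboundedChannel[T]) stop() { close(c.inCh) }

func (c *unboundedChannel[T]) run() {
	defer close(c.outCh)
	backlog := list.New()
	for {
		if front := backlog.Front(); front == nil {
			v, ok := <-c.inCh
			if !ok {
				return // inCh closed and backlog empty
			}
			backlog.PushBack(v)
		} else {
			select {
			case c.outCh <- front.Value.(T):
				backlog.Remove(front)
			case v, ok := <-c.inCh:
				if !ok {
					return // simplified: drop any remaining backlog on stop
				}
				backlog.PushBack(v)
			}
		}
	}
}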
