Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
44 commits
Select commit Hold shift + click to select a range
cd183c2
feat: add internalSingleSend to send single message
Gleiphir2769 Jul 3, 2022
342e8e5
feat: support chunking for producer
Gleiphir2769 Jul 10, 2022
ff60f94
Merge branch 'master' into chunking_issue_456
Gleiphir2769 Jul 10, 2022
3a90ce5
fix: fix the bug of Send() block when SingleSend() failed; fix the bu…
Gleiphir2769 Jul 11, 2022
ed7915d
feat: Implemented chunking on the consumer side
Gleiphir2769 Jul 28, 2022
0456630
test: add message_chunking_test.go for chunk consume/produce test
Gleiphir2769 Jul 31, 2022
a4769e8
refector: do some clean, removed unAckedChunkedMessageIdSequenceMap b…
Gleiphir2769 Aug 6, 2022
64b9a20
fix: fix the bug of uuid generation
Gleiphir2769 Aug 10, 2022
c8e083a
fix: fix the Seek() for chunk message id
Gleiphir2769 Aug 11, 2022
7cc168f
test: move chunk tests to message_chunking_test
Gleiphir2769 Aug 11, 2022
db7537f
refactor: add licence header
Gleiphir2769 Aug 15, 2022
2537cf4
fix: Revert the change about trackingMessageID. Fix the conflict of b…
Gleiphir2769 Aug 15, 2022
4dc9bb7
fix: fix the license header of timewheel.go.
Gleiphir2769 Aug 16, 2022
0db443e
fix: fix the bug of data race for availablePermits
Gleiphir2769 Aug 16, 2022
92fbff2
fix: 1. fix the comments of EnableChunking and rename the MaxChunkSiz…
Gleiphir2769 Sep 6, 2022
d23d1cd
fix: 1. fix the bug of seq ID. 2. make code pass golangci
Gleiphir2769 Sep 7, 2022
5e098d2
fix: fix the test function TestChunkSize()
Gleiphir2769 Sep 8, 2022
203a1fc
fix: revert negativeAcksTracker.Add()
Gleiphir2769 Sep 9, 2022
2a1342d
fix: modify the comment of ExpireTimeOfIncompleteChunk
Gleiphir2769 Sep 9, 2022
fda6997
fix: fix the ack/nack for chunking
Gleiphir2769 Sep 13, 2022
7fd7ffa
fix: fix the chunking message ack/nack for multiTopicConsumer and Reg…
Gleiphir2769 Sep 13, 2022
8904e3c
refactor: remove the timewheel and do some refactor.
Gleiphir2769 Sep 14, 2022
94a7f6f
fix: fix the bug that may cause index out of range.
Gleiphir2769 Sep 14, 2022
05754c6
fix: fix the lint problem
Gleiphir2769 Sep 14, 2022
8f89741
fix: fix the publishSemaphore leak and modify some comments.
Gleiphir2769 Sep 17, 2022
95ad70b
fix: fix the chunking messages may block when the BlockIfQueueFull en…
Gleiphir2769 Sep 25, 2022
4719d0b
fix: fix bug of totalchunks
Gleiphir2769 Sep 26, 2022
72fe920
fix: fix the bug of single sending stuck by permits acquire
Gleiphir2769 Sep 27, 2022
a962eee
fix: fix the bug of single sending stuck by permits acquire
Gleiphir2769 Sep 27, 2022
09e5bd2
Merge branch 'master' into chunking_issue_456
Gleiphir2769 Sep 27, 2022
fef09d9
refactor: do some refactor
Gleiphir2769 Sep 27, 2022
53a0ebc
Merge branch 'master' into chunking_issue_456 and resolve conflict
Gleiphir2769 Sep 29, 2022
9794560
fix: fix for resolve conflict
Gleiphir2769 Sep 29, 2022
3aeb764
fix: remove the ExpireTimeOfIncompleteChunk limit and do some refactor
Gleiphir2769 Sep 30, 2022
4b53e33
fix: fix the nil pointer error of discardOldestChunkMessage and fix t…
Gleiphir2769 Oct 3, 2022
50f4959
update chunking_issue_456 and resolve conflict
Gleiphir2769 Oct 16, 2022
e80fe90
test: restart workflow
Gleiphir2769 Oct 16, 2022
84c02fe
test: restart workflow
Gleiphir2769 Oct 16, 2022
23570cf
test: restart workflow
Gleiphir2769 Oct 16, 2022
56ef453
test: restart workflow
Gleiphir2769 Oct 16, 2022
a42de10
fix: add logs when chunkMsgCtxMap has closed
Gleiphir2769 Oct 22, 2022
b3b304f
fix: modify by review comment
Gleiphir2769 Oct 24, 2022
6a801cb
fix: inline compare and fix a data race error
Gleiphir2769 Oct 24, 2022
00e9e83
fix: fix nil pointer error
Gleiphir2769 Oct 24, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions pulsar/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -198,6 +198,16 @@ type ConsumerOptions struct {
// error information of the Ack method only contains errors that may occur in the Go SDK's own processing.
// Default: false
AckWithResponse bool

// MaxPendingChunkedMessage sets the maximum pending chunked messages. (default: 100)
MaxPendingChunkedMessage int

// ExpireTimeOfIncompleteChunk sets the expiry time of discarding incomplete chunked message. (default: 60 seconds)
ExpireTimeOfIncompleteChunk time.Duration

// AutoAckIncompleteChunk sets whether consumer auto acknowledges incomplete chunked message when it should
// be removed (e.g.the chunked message pending queue is full). (default: false)
AutoAckIncompleteChunk bool
}

// Consumer is an interface that abstracts behavior of Pulsar's consumer
Expand Down
105 changes: 57 additions & 48 deletions pulsar/consumer_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ package pulsar

import (
"context"
"errors"
"fmt"
"math/rand"
"strconv"
Expand All @@ -37,9 +36,9 @@ const defaultNackRedeliveryDelay = 1 * time.Minute

type acker interface {
// AckID does not handle errors returned by the Broker side, so no need to wait for doneCh to finish.
AckID(id trackingMessageID) error
AckIDWithResponse(id trackingMessageID) error
NackID(id trackingMessageID)
AckID(id MessageID) error
AckIDWithResponse(id MessageID) error
NackID(id MessageID)
NackMsg(msg Message)
}

Expand Down Expand Up @@ -93,6 +92,14 @@ func newConsumer(client *client, options ConsumerOptions) (Consumer, error) {
}
}

if options.MaxPendingChunkedMessage == 0 {
options.MaxPendingChunkedMessage = 100
}

if options.ExpireTimeOfIncompleteChunk == 0 {
options.ExpireTimeOfIncompleteChunk = time.Minute
}

if options.NackBackoffPolicy == nil && options.EnableDefaultNackBackoffPolicy {
options.NackBackoffPolicy = new(defaultNackBackoffPolicy)
}
Expand Down Expand Up @@ -344,28 +351,31 @@ func (c *consumer) internalTopicSubscribeToPartitions() error {
nackRedeliveryDelay = c.options.NackRedeliveryDelay
}
opts := &partitionConsumerOpts{
topic: pt,
consumerName: c.consumerName,
subscription: c.options.SubscriptionName,
subscriptionType: c.options.Type,
subscriptionInitPos: c.options.SubscriptionInitialPosition,
partitionIdx: idx,
receiverQueueSize: receiverQueueSize,
nackRedeliveryDelay: nackRedeliveryDelay,
nackBackoffPolicy: c.options.NackBackoffPolicy,
metadata: metadata,
subProperties: subProperties,
replicateSubscriptionState: c.options.ReplicateSubscriptionState,
startMessageID: trackingMessageID{},
subscriptionMode: durable,
readCompacted: c.options.ReadCompacted,
interceptors: c.options.Interceptors,
maxReconnectToBroker: c.options.MaxReconnectToBroker,
backoffPolicy: c.options.BackoffPolicy,
keySharedPolicy: c.options.KeySharedPolicy,
schema: c.options.Schema,
decryption: c.options.Decryption,
ackWithResponse: c.options.AckWithResponse,
topic: pt,
consumerName: c.consumerName,
subscription: c.options.SubscriptionName,
subscriptionType: c.options.Type,
subscriptionInitPos: c.options.SubscriptionInitialPosition,
partitionIdx: idx,
receiverQueueSize: receiverQueueSize,
nackRedeliveryDelay: nackRedeliveryDelay,
nackBackoffPolicy: c.options.NackBackoffPolicy,
metadata: metadata,
subProperties: subProperties,
replicateSubscriptionState: c.options.ReplicateSubscriptionState,
startMessageID: trackingMessageID{},
subscriptionMode: durable,
readCompacted: c.options.ReadCompacted,
interceptors: c.options.Interceptors,
maxReconnectToBroker: c.options.MaxReconnectToBroker,
backoffPolicy: c.options.BackoffPolicy,
keySharedPolicy: c.options.KeySharedPolicy,
schema: c.options.Schema,
decryption: c.options.Decryption,
ackWithResponse: c.options.AckWithResponse,
maxPendingChunkedMessage: c.options.MaxPendingChunkedMessage,
expireTimeOfIncompleteChunk: c.options.ExpireTimeOfIncompleteChunk,
autoAckIncompleteChunk: c.options.AutoAckIncompleteChunk,
}
cons, err := newPartitionConsumer(c, c.client, opts, c.messageCh, c.dlq, c.metrics)
ch <- ConsumerError{
Expand Down Expand Up @@ -456,20 +466,15 @@ func (c *consumer) Ack(msg Message) error {

// AckID the consumption of a single message, identified by its MessageID
func (c *consumer) AckID(msgID MessageID) error {
mid, ok := c.messageID(msgID)
if !ok {
return errors.New("failed to convert trackingMessageID")
}

if mid.consumer != nil {
return mid.Ack()
if err := c.checkMsgIDPartition(msgID); err != nil {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Missed covert the msgID from the MessageID to the trackingMessageID type, I'm not sure if we need this.

Why not use messageID(), what did I miss?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

trackingMessageID does not records chunking infomation.

For example, Ack() a big message need to ack all the chunks of it. Using trackingMessageID can not figure out which chunk (messageId) need to be ack.

trackingMessageID is designed to tracking batch messages so it shoud not be the messageId type accepted by the method exposed by partitionConsumer. I think the better way would be to accept MessageID as the messageId type in partitionConsumer methods. However, only the necessary interfaces have been modified (Ack, NAck and Seed) for the least changes

return err
}

if c.options.AckWithResponse {
return c.consumers[mid.partitionIdx].AckIDWithResponse(mid)
return c.consumers[msgID.PartitionIdx()].AckIDWithResponse(msgID)
}

return c.consumers[mid.partitionIdx].AckID(mid)
return c.consumers[msgID.PartitionIdx()].AckID(msgID)
}

// ReconsumeLater mark a message for redelivery after custom delay
Expand Down Expand Up @@ -529,7 +534,7 @@ func (c *consumer) Nack(msg Message) {
}

if mid.consumer != nil {
mid.Nack()
mid.consumer.NackID(msg.ID())
return
}
c.consumers[mid.partitionIdx].NackMsg(msg)
Expand All @@ -540,17 +545,11 @@ func (c *consumer) Nack(msg Message) {
}

func (c *consumer) NackID(msgID MessageID) {
mid, ok := c.messageID(msgID)
if !ok {
return
}

if mid.consumer != nil {
mid.Nack()
if err := c.checkMsgIDPartition(msgID); err != nil {
return
}

c.consumers[mid.partitionIdx].NackID(mid)
c.consumers[msgID.PartitionIdx()].NackID(msgID)
}

func (c *consumer) Close() {
Expand Down Expand Up @@ -586,12 +585,11 @@ func (c *consumer) Seek(msgID MessageID) error {
return newError(SeekFailed, "for partition topic, seek command should perform on the individual partitions")
}

mid, ok := c.messageID(msgID)
if !ok {
return nil
if err := c.checkMsgIDPartition(msgID); err != nil {
return err
}

return c.consumers[mid.partitionIdx].Seek(mid)
return c.consumers[msgID.PartitionIdx()].Seek(msgID)
}

func (c *consumer) SeekByTime(time time.Time) error {
Expand All @@ -608,6 +606,17 @@ func (c *consumer) SeekByTime(time time.Time) error {
return errs
}

func (c *consumer) checkMsgIDPartition(msgID MessageID) error {
partition := msgID.PartitionIdx()
if partition < 0 || int(partition) >= len(c.consumers) {
c.log.Errorf("invalid partition index %d expected a partition between [0-%d]",
partition, len(c.consumers))
return fmt.Errorf("invalid partition index %d expected a partition between [0-%d]",
partition, len(c.consumers))
}
return nil
}

var r = &random{
R: rand.New(rand.NewSource(time.Now().UnixNano())),
}
Expand Down
6 changes: 3 additions & 3 deletions pulsar/consumer_multitopic.go
Original file line number Diff line number Diff line change
Expand Up @@ -137,10 +137,10 @@ func (c *multiTopicConsumer) AckID(msgID MessageID) error {
}

if c.options.AckWithResponse {
return mid.AckWithResponse()
return mid.consumer.AckIDWithResponse(msgID)
}

return mid.Ack()
return mid.consumer.AckID(msgID)
}

func (c *multiTopicConsumer) ReconsumeLater(msg Message, delay time.Duration) {
Expand Down Expand Up @@ -200,7 +200,7 @@ func (c *multiTopicConsumer) NackID(msgID MessageID) {
return
}

mid.Nack()
mid.consumer.NackID(msgID)
}

func (c *multiTopicConsumer) Close() {
Expand Down
Loading