Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 17 additions & 3 deletions pulsar/client_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -168,12 +168,26 @@ func (c *client) Subscribe(options ConsumerOptions) (Consumer, error) {
}

func (c *client) CreateReader(options ReaderOptions) (Reader, error) {
reader, err := newReader(c, options)
topics, err := c.TopicPartitions(options.Topic)
if err != nil {
return nil, err
}
c.handlers.Add(reader)
return reader, nil
if len(topics) <= 1 {
reader, err := newReader(c, options)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What's the different between a reader and a multi reader? It seems like they should be the same but just consume a different number of partitions?

if err != nil {
return nil, err
}
c.handlers.Add(reader)
return reader, nil
}

// create multi topic reader
multiTopicReader, err := newMultiTopicReader(c, options)
if err != nil {
return nil, err
}
c.handlers.Add(multiTopicReader)
return multiTopicReader, nil
}

func (c *client) TopicPartitions(topic string) ([]string, error) {
Expand Down
18 changes: 18 additions & 0 deletions pulsar/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -182,6 +182,15 @@ type ConsumerOptions struct {
// > Notice: the NackBackoffPolicy will not work with `consumer.NackID(MessageID)`
// > because we are not able to get the redeliveryCount from the message ID.
NackBackoffPolicy NackBackoffPolicy

// startMessageID internally used by multitopic-reader
startMessageID MessageID
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The ConsumerOptions is a public interface so I don't think we should add package private fields here. Let's remove these.


// startMessageIDInclusive internally used by multitopic-reader
startMessageIDInclusive bool

// subscriptionMode internally used by multitopic-reader
subscriptionMode subscriptionMode
}

// Consumer is an interface that abstracts behavior of Pulsar's consumer
Expand Down Expand Up @@ -248,4 +257,13 @@ type Consumer interface {

// Name returns the name of consumer.
Name() string

// lastDequeuedMsg used for setting last dequeued msg id by internal partition consumers
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think we should be adding private methods to the public interface. I think these should be removed. Also, it seems these are only relevant to the reader implementation so I think these helper functions should live within the reader files.

lastDequeuedMsg(MessageID) error

// hasMessages get if messages are available
hasMessages() (bool, error)

// messagesInQueue get the number of messages available in queue
messagesInQueue() int
}
79 changes: 76 additions & 3 deletions pulsar/consumer_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -327,6 +327,22 @@ func (c *consumer) internalTopicSubscribeToPartitions() error {
ch := make(chan ConsumerError, partitionsToAdd)
wg.Add(partitionsToAdd)

// default start messageId
startMessageID := trackingMessageID{}

// if start message id is provided
if _, ok := c.options.startMessageID.(trackingMessageID); ok {
startMessageID = c.options.startMessageID.(trackingMessageID)
}

// default subscription mode is durable
subscriptionMode := durable

// for reader subscription mode is nonDurabale
if c.options.subscriptionMode == nonDurable {
subscriptionMode = nonDurable
}

for partitionIdx := startPartition; partitionIdx < newNumPartitions; partitionIdx++ {
partitionTopic := partitions[partitionIdx]

Expand All @@ -352,14 +368,15 @@ func (c *consumer) internalTopicSubscribeToPartitions() error {
metadata: metadata,
subProperties: subProperties,
replicateSubscriptionState: c.options.ReplicateSubscriptionState,
startMessageID: trackingMessageID{},
subscriptionMode: durable,
startMessageID: startMessageID,
subscriptionMode: subscriptionMode,
readCompacted: c.options.ReadCompacted,
interceptors: c.options.Interceptors,
maxReconnectToBroker: c.options.MaxReconnectToBroker,
keySharedPolicy: c.options.KeySharedPolicy,
schema: c.options.Schema,
decryption: c.options.Decryption,
startMessageIDInclusive: c.options.startMessageIDInclusive,
}
cons, err := newPartitionConsumer(c, c.client, opts, c.messageCh, c.dlq, c.metrics)
ch <- ConsumerError{
Expand Down Expand Up @@ -670,7 +687,6 @@ func (c *consumer) messageID(msgID MessageID) (trackingMessageID, bool) {

return mid, true
}

func addMessageCryptoIfMissing(client *client, options *ConsumerOptions, topics interface{}) error {
// decryption is enabled, use default messagecrypto if not provided
if options.Decryption != nil && options.Decryption.MessageCrypto == nil {
Expand All @@ -684,3 +700,60 @@ func addMessageCryptoIfMissing(client *client, options *ConsumerOptions, topics
}
return nil
}

func (c *consumer) lastDequeuedMsg(msgID MessageID) error {
mid, ok := toTrackingMessageID(msgID)
if !ok {
return newError(InvalidMessage, fmt.Sprintf("invalid message id type %T", msgID))
}

if msgID.PartitionIdx() >= 0 {
c.consumers[mid.PartitionIdx()].lastDequeuedMsg = mid
return nil
}

return newError(InvalidMessage, fmt.Sprintf("invalid message id type %T", msgID))
}

func (c *consumer) hasMessages() (bool, error) {
for _, pc := range c.consumers {
if ok, err := c.hashMoreMessages(pc); err != nil {
return false, err // error in reading last message id from broker
} else if ok { // return true only if messages are available
return ok, nil
}
}
return false, nil // reach here only if done checking for all partition consumers
}

func (c *consumer) hashMoreMessages(pc *partitionConsumer) (bool, error) {
lastMsgID, err := pc.getLastMessageID()
if err != nil {
return false, err
}

// same logic as in reader_impl
if !pc.lastDequeuedMsg.Undefined() {
return lastMsgID.isEntryIDValid() && lastMsgID.greater(pc.lastDequeuedMsg.messageID), nil
}

if pc.options.startMessageIDInclusive {
return lastMsgID.isEntryIDValid() && lastMsgID.greaterEqual(pc.startMessageID.messageID), nil
}

// Non-inclusive
return lastMsgID.isEntryIDValid() && lastMsgID.greater(pc.startMessageID.messageID), nil
}

func (c *consumer) messagesInQueue() int {
// messages in partition consumer queue
msgCount := 0
for _, pc := range c.consumers {
msgCount += pc.messagesInQueue()
}

// messages in consumer msg channel
msgCount += len(c.messageCh)

return msgCount
}
13 changes: 13 additions & 0 deletions pulsar/consumer_multitopic.go
Original file line number Diff line number Diff line change
Expand Up @@ -228,3 +228,16 @@ func (c *multiTopicConsumer) SeekByTime(time time.Time) error {
func (c *multiTopicConsumer) Name() string {
return c.consumerName
}

func (c *multiTopicConsumer) lastDequeuedMsg(msgID MessageID) error {
return newError(OperationNotSupported, "lastDequeuedMsg is not supported for multi Topic consumer")
}

func (c *multiTopicConsumer) hasMessages() (bool, error) {
return false, newError(OperationNotSupported, "hasMessages is not supported for multi Topic consumer")
}

func (c *multiTopicConsumer) messagesInQueue() int {
c.log.Warn("messagesInQueue is not supported for multi topic consumer")
return 0
}
4 changes: 4 additions & 0 deletions pulsar/consumer_partition.go
Original file line number Diff line number Diff line change
Expand Up @@ -1265,6 +1265,10 @@ func (pc *partitionConsumer) _getConn() internal.Connection {
return pc.conn.Load().(internal.Connection)
}

func (pc *partitionConsumer) messagesInQueue() int {
return len(pc.queueCh)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If there are multi go routines consuming and producing to this channel this value might not be useful after being read.

}

func convertToMessageIDData(msgID trackingMessageID) *pb.MessageIdData {
if msgID.Undefined() {
return nil
Expand Down
13 changes: 13 additions & 0 deletions pulsar/consumer_regex.go
Original file line number Diff line number Diff line change
Expand Up @@ -369,6 +369,19 @@ func (c *regexConsumer) topics() ([]string, error) {
return filtered, nil
}

func (c *regexConsumer) lastDequeuedMsg(msgID MessageID) error {
return newError(OperationNotSupported, "setting lastDequeuedMsg is not supported for Regex Topic consumer")
}

func (c *regexConsumer) hasMessages() (bool, error) {
return false, newError(OperationNotSupported, "hasMessages is not supported for Regex Topic consumer")
}

func (c *regexConsumer) messagesInQueue() int {
c.log.Warn("messagesInQueue is not supported for Regex topic consumer")
return 0
}

type consumerError struct {
err error
topic string
Expand Down
47 changes: 1 addition & 46 deletions pulsar/internal/pulsartracing/consumer_interceptor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,7 @@
package pulsartracing

import (
"context"
"testing"
"time"

"github.com/apache/pulsar-client-go/pulsar"
"github.com/opentracing/opentracing-go"
Expand All @@ -34,7 +32,7 @@ func TestConsumerBuildAndInjectChildSpan(t *testing.T) {
opentracing.SetGlobalTracer(tracer)

message := pulsar.ConsumerMessage{
Consumer: &mockConsumer{},
Consumer: pulsar.NewMockConsumer(),
Message: &mockConsumerMessage{
properties: map[string]string{},
},
Expand All @@ -44,46 +42,3 @@ func TestConsumerBuildAndInjectChildSpan(t *testing.T) {
assert.NotNil(t, span)
assert.True(t, len(message.Properties()) > 0)
}

type mockConsumer struct {
}

func (c *mockConsumer) Subscription() string {
return ""
}

func (c *mockConsumer) Unsubscribe() error {
return nil
}

func (c *mockConsumer) Receive(ctx context.Context) (message pulsar.Message, err error) {
return nil, nil
}

func (c *mockConsumer) Chan() <-chan pulsar.ConsumerMessage {
return nil
}

func (c *mockConsumer) Ack(msg pulsar.Message) {}

func (c *mockConsumer) AckID(msgID pulsar.MessageID) {}

func (c *mockConsumer) ReconsumeLater(msg pulsar.Message, delay time.Duration) {}

func (c *mockConsumer) Nack(msg pulsar.Message) {}

func (c *mockConsumer) NackID(msgID pulsar.MessageID) {}

func (c *mockConsumer) Close() {}

func (c *mockConsumer) Seek(msgID pulsar.MessageID) error {
return nil
}

func (c *mockConsumer) SeekByTime(time time.Time) error {
return nil
}

func (c *mockConsumer) Name() string {
return ""
}
82 changes: 82 additions & 0 deletions pulsar/mock_consumer.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package pulsar

import (
"context"
"time"
)

func NewMockConsumer() Consumer {
return &mockConsumer{}
}

type mockConsumer struct {
}

func (c *mockConsumer) Subscription() string {
return ""
}

func (c *mockConsumer) Unsubscribe() error {
return nil
}

func (c *mockConsumer) Receive(ctx context.Context) (message Message, err error) {
return nil, nil
}

func (c *mockConsumer) Chan() <-chan ConsumerMessage {
return nil
}

func (c *mockConsumer) Ack(msg Message) {}

func (c *mockConsumer) AckID(msgID MessageID) {}

func (c *mockConsumer) ReconsumeLater(msg Message, delay time.Duration) {}

func (c *mockConsumer) Nack(msg Message) {}

func (c *mockConsumer) NackID(msgID MessageID) {}

func (c *mockConsumer) Close() {}

func (c *mockConsumer) Seek(msgID MessageID) error {
return nil
}

func (c *mockConsumer) SeekByTime(time time.Time) error {
return nil
}

func (c *mockConsumer) Name() string {
return ""
}

func (c *mockConsumer) lastDequeuedMsg(msgID MessageID) error {
return nil
}

func (c *mockConsumer) messagesInQueue() int {
return 0
}

func (c *mockConsumer) hasMessages() (bool, error) {
return false, nil
}
Loading