Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 0 additions & 8 deletions pulsar/consumer_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,12 +23,9 @@ import (
"fmt"
"math/rand"
"strconv"
"strings"
"sync"
"time"

"github.com/apache/pulsar-client-go/pulsaradmin/pkg/utils"

"github.com/apache/pulsar-client-go/pulsar/crypto"
"github.com/apache/pulsar-client-go/pulsar/internal"
pb "github.com/apache/pulsar-client-go/pulsar/internal/pulsar_proto"
Expand Down Expand Up @@ -269,11 +266,6 @@ func newInternalConsumer(client *client, options ConsumerOptions, topic string,
return nil, pkgerrors.New("ZeroQueueConsumer is not supported for partitioned topics")
}

if len(partitions) == 1 && options.EnableZeroQueueConsumer &&
strings.Contains(partitions[0], utils.PARTITIONEDTOPICSUFFIX) {
return nil, pkgerrors.New("ZeroQueueConsumer is not supported for partitioned topics")
}

if len(partitions) == 1 && options.EnableZeroQueueConsumer {
return newZeroConsumer(client, options, topic, messageCh, dlq, rlq, disableForceTopicCreation)
}
Expand Down
19 changes: 13 additions & 6 deletions pulsar/consumer_zero_queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,11 @@ func newZeroConsumer(client *client, options ConsumerOptions, topic string,
consumerName: options.Name,
metrics: client.metrics.GetLeveledMetrics(topic),
}
opts := newPartitionConsumerOpts(zc.topic, zc.consumerName, 0, zc.options)
tn, err := internal.ParseTopicName(topic)
if err != nil {
return nil, err
}
opts := newPartitionConsumerOpts(zc.topic, zc.consumerName, tn.Partition, zc.options)
conn, err := newPartitionConsumer(zc, zc.client, opts, zc.messageCh, zc.dlq, zc.metrics)
if err != nil {
return nil, err
Expand Down Expand Up @@ -142,11 +146,14 @@ func (z *zeroQueueConsumer) Ack(m Message) error {

func (z *zeroQueueConsumer) checkMsgIDPartition(msgID MessageID) error {
partition := msgID.PartitionIdx()
if partition != 0 {
z.log.Errorf("invalid partition index %d expected a partition equal to 0",
partition)
return fmt.Errorf("invalid partition index %d expected a partition equal to 0",
partition)
if partition == 0 || partition == -1 {
return nil
}
if partition != z.pc.partitionIdx {
z.log.Errorf("invalid partition index %d expected a partition equal to %d",
partition, z.pc.partitionIdx)
return fmt.Errorf("invalid partition index %d expected a partition equal to %d",
partition, z.pc.partitionIdx)
}
return nil
}
Expand Down
79 changes: 67 additions & 12 deletions pulsar/consumer_zero_queue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,8 @@ func TestNormalZeroQueueConsumer(t *testing.T) {
assert.Equal(t, "pulsar", msg.Key())
assert.Equal(t, expectProperties, msg.Properties())
// ack message
consumer.Ack(msg)
err = consumer.Ack(msg)
assert.Nil(t, err)
log.Printf("receive message: %s", msg.ID().String())
}
err = consumer.Unsubscribe()
Expand Down Expand Up @@ -228,7 +229,8 @@ func TestReconnectConsumer(t *testing.T) {
assert.Equal(t, "pulsar", msg.Key())
assert.Equal(t, expectProperties, msg.Properties())
// ack message
consumer.Ack(msg)
err = consumer.Ack(msg)
assert.Nil(t, err)
log.Printf("receive message: %s", msg.ID().String())
}
err = consumer.Unsubscribe()
Expand Down Expand Up @@ -341,7 +343,7 @@ func TestPartitionZeroQueueConsumer(t *testing.T) {
assert.Nil(t, consumer)
assert.Error(t, err, "ZeroQueueConsumer is not supported for partitioned topics")
}
func TestOnePartitionZeroQueueConsumer(t *testing.T) {
func TestSpecifiedPartitionZeroQueueConsumer(t *testing.T) {
client, err := NewClient(ClientOptions{
URL: lookupURL,
})
Expand All @@ -350,17 +352,65 @@ func TestOnePartitionZeroQueueConsumer(t *testing.T) {
defer client.Close()

topic := newTopicName()
err = createPartitionedTopic(topic, 1)
ctx := context.Background()
err = createPartitionedTopic(topic, 2)
assert.Nil(t, err)
topics, err := client.TopicPartitions(topic)
assert.Nil(t, err)

// create consumer
consumer, err := client.Subscribe(ConsumerOptions{
Topic: topic,
Topic: topics[1],
SubscriptionName: "my-sub",
EnableZeroQueueConsumer: true,
})
assert.Nil(t, consumer)
assert.Error(t, err, "ZeroQueueConsumer is not supported for partitioned topics")
assert.Nil(t, err)
_, ok := consumer.(*zeroQueueConsumer)
assert.True(t, ok)
defer consumer.Close()

// create producer
producer, err := client.CreateProducer(ProducerOptions{
Topic: topics[1],
DisableBatching: false,
})
assert.Nil(t, err)
defer producer.Close()

// send 10 messages
for i := 0; i < 10; i++ {
msg, err := producer.Send(ctx, &ProducerMessage{
Payload: []byte(fmt.Sprintf("hello-%d", i)),
Key: "pulsar",
Properties: map[string]string{
"key-1": "pulsar-1",
},
})
assert.Nil(t, err)
log.Printf("send message: %s", msg.String())
}

// receive 10 messages
for i := 0; i < 10; i++ {
msg, err := consumer.Receive(context.Background())
if err != nil {
log.Fatal(err)
}

expectMsg := fmt.Sprintf("hello-%d", i)
expectProperties := map[string]string{
"key-1": "pulsar-1",
}
assert.Equal(t, []byte(expectMsg), msg.Payload())
assert.Equal(t, "pulsar", msg.Key())
assert.Equal(t, expectProperties, msg.Properties())
// ack message
err = consumer.Ack(msg)
assert.Nil(t, err)
log.Printf("receive message: %s", msg.ID().String())
}
err = consumer.Unsubscribe()
assert.Nil(t, err)
}

func TestZeroQueueConsumerGetLastMessageIDs(t *testing.T) {
Expand Down Expand Up @@ -576,7 +626,8 @@ func TestZeroQueueConsumer_Nack(t *testing.T) {

if i%2 == 0 {
// Only acks even messages
consumer.Ack(msg)
err = consumer.Ack(msg)
assert.Nil(t, err)
} else {
// Fails to process odd messages
consumer.Nack(msg)
Expand All @@ -591,7 +642,8 @@ func TestZeroQueueConsumer_Nack(t *testing.T) {
assert.Nil(t, err)
assert.Equal(t, fmt.Sprintf("msg-content-%d", i), string(msg.Payload()))

consumer.Ack(msg)
err = consumer.Ack(msg)
assert.Nil(t, err)
}
}

Expand Down Expand Up @@ -641,7 +693,8 @@ func TestZeroQueueConsumer_Seek(t *testing.T) {
msg, err := consumer.Receive(ctx)
assert.Nil(t, err)
assert.Equal(t, fmt.Sprintf("hello-%d", i), string(msg.Payload()))
consumer.Ack(msg)
err = consumer.Ack(msg)
assert.Nil(t, err)
}

err = consumer.Seek(seekID)
Expand Down Expand Up @@ -698,7 +751,8 @@ func TestZeroQueueConsumer_SeekByTime(t *testing.T) {
msg, err := consumer.Receive(ctx)
assert.Nil(t, err)
assert.Equal(t, fmt.Sprintf("hello-%d", i), string(msg.Payload()))
consumer.Ack(msg)
err = consumer.Ack(msg)
assert.Nil(t, err)
}

currentTimestamp := time.Now()
Expand All @@ -711,6 +765,7 @@ func TestZeroQueueConsumer_SeekByTime(t *testing.T) {
msg, err := consumer.Receive(ctx)
assert.Nil(t, err)
assert.Equal(t, fmt.Sprintf("hello-%d", i), string(msg.Payload()))
consumer.Ack(msg)
err = consumer.Ack(msg)
assert.Nil(t, err)
}
}