Skip to content
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 0 additions & 8 deletions pulsar/consumer_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,12 +23,9 @@ import (
"fmt"
"math/rand"
"strconv"
"strings"
"sync"
"time"

"github.com/apache/pulsar-client-go/pulsaradmin/pkg/utils"

"github.com/apache/pulsar-client-go/pulsar/crypto"
"github.com/apache/pulsar-client-go/pulsar/internal"
pb "github.com/apache/pulsar-client-go/pulsar/internal/pulsar_proto"
Expand Down Expand Up @@ -269,11 +266,6 @@ func newInternalConsumer(client *client, options ConsumerOptions, topic string,
return nil, pkgerrors.New("ZeroQueueConsumer is not supported for partitioned topics")
}

if len(partitions) == 1 && options.EnableZeroQueueConsumer &&
strings.Contains(partitions[0], utils.PARTITIONEDTOPICSUFFIX) {
return nil, pkgerrors.New("ZeroQueueConsumer is not supported for partitioned topics")
}

if len(partitions) == 1 && options.EnableZeroQueueConsumer {
return newZeroConsumer(client, options, topic, messageCh, dlq, rlq, disableForceTopicCreation)
}
Expand Down
16 changes: 10 additions & 6 deletions pulsar/consumer_zero_queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,11 @@ func newZeroConsumer(client *client, options ConsumerOptions, topic string,
consumerName: options.Name,
metrics: client.metrics.GetLeveledMetrics(topic),
}
opts := newPartitionConsumerOpts(zc.topic, zc.consumerName, 0, zc.options)
tn, err := internal.ParseTopicName(topic)
if err != nil {
return nil, err
}
opts := newPartitionConsumerOpts(zc.topic, zc.consumerName, tn.Partition, zc.options)
conn, err := newPartitionConsumer(zc, zc.client, opts, zc.messageCh, zc.dlq, zc.metrics)
if err != nil {
return nil, err
Expand Down Expand Up @@ -142,11 +146,11 @@ func (z *zeroQueueConsumer) Ack(m Message) error {

func (z *zeroQueueConsumer) checkMsgIDPartition(msgID MessageID) error {
partition := msgID.PartitionIdx()
if partition != 0 {
z.log.Errorf("invalid partition index %d expected a partition equal to 0",
partition)
return fmt.Errorf("invalid partition index %d expected a partition equal to 0",
partition)
if partition != z.pc.partitionIdx {
z.log.Errorf("invalid partition index %d expected a partition equal to %d",
partition, z.pc.partitionIdx)
return fmt.Errorf("invalid partition index %d expected a partition equal to %d",
partition, z.pc.partitionIdx)
}
return nil
}
Expand Down
79 changes: 67 additions & 12 deletions pulsar/consumer_zero_queue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,8 @@ func TestNormalZeroQueueConsumer(t *testing.T) {
assert.Equal(t, "pulsar", msg.Key())
assert.Equal(t, expectProperties, msg.Properties())
// ack message
consumer.Ack(msg)
err = consumer.Ack(msg)
assert.Nil(t, err)
log.Printf("receive message: %s", msg.ID().String())
}
err = consumer.Unsubscribe()
Expand Down Expand Up @@ -228,7 +229,8 @@ func TestReconnectConsumer(t *testing.T) {
assert.Equal(t, "pulsar", msg.Key())
assert.Equal(t, expectProperties, msg.Properties())
// ack message
consumer.Ack(msg)
err = consumer.Ack(msg)
assert.Nil(t, err)
log.Printf("receive message: %s", msg.ID().String())
}
err = consumer.Unsubscribe()
Expand Down Expand Up @@ -341,7 +343,7 @@ func TestPartitionZeroQueueConsumer(t *testing.T) {
assert.Nil(t, consumer)
assert.Error(t, err, "ZeroQueueConsumer is not supported for partitioned topics")
}
func TestOnePartitionZeroQueueConsumer(t *testing.T) {
func TestSpecifiedPartitionZeroQueueConsumer(t *testing.T) {
client, err := NewClient(ClientOptions{
URL: lookupURL,
})
Expand All @@ -350,17 +352,65 @@ func TestOnePartitionZeroQueueConsumer(t *testing.T) {
defer client.Close()

topic := newTopicName()
err = createPartitionedTopic(topic, 1)
ctx := context.Background()
err = createPartitionedTopic(topic, 2)
assert.Nil(t, err)
topics, err := client.TopicPartitions(topic)
assert.Nil(t, err)

// create consumer
consumer, err := client.Subscribe(ConsumerOptions{
Topic: topic,
Topic: topics[1],
SubscriptionName: "my-sub",
EnableZeroQueueConsumer: true,
})
assert.Nil(t, consumer)
assert.Error(t, err, "ZeroQueueConsumer is not supported for partitioned topics")
assert.Nil(t, err)
_, ok := consumer.(*zeroQueueConsumer)
assert.True(t, ok)
defer consumer.Close()

// create producer
producer, err := client.CreateProducer(ProducerOptions{
Topic: topics[1],
DisableBatching: false,
})
assert.Nil(t, err)
defer producer.Close()

// send 10 messages
for i := 0; i < 10; i++ {
msg, err := producer.Send(ctx, &ProducerMessage{
Payload: []byte(fmt.Sprintf("hello-%d", i)),
Key: "pulsar",
Properties: map[string]string{
"key-1": "pulsar-1",
},
})
assert.Nil(t, err)
log.Printf("send message: %s", msg.String())
}

// receive 10 messages
for i := 0; i < 10; i++ {
msg, err := consumer.Receive(context.Background())
if err != nil {
log.Fatal(err)
}

expectMsg := fmt.Sprintf("hello-%d", i)
expectProperties := map[string]string{
"key-1": "pulsar-1",
}
assert.Equal(t, []byte(expectMsg), msg.Payload())
assert.Equal(t, "pulsar", msg.Key())
assert.Equal(t, expectProperties, msg.Properties())
// ack message
err = consumer.Ack(msg)
assert.Nil(t, err)
log.Printf("receive message: %s", msg.ID().String())
}
err = consumer.Unsubscribe()
assert.Nil(t, err)
}

func TestZeroQueueConsumerGetLastMessageIDs(t *testing.T) {
Expand Down Expand Up @@ -576,7 +626,8 @@ func TestZeroQueueConsumer_Nack(t *testing.T) {

if i%2 == 0 {
// Only acks even messages
consumer.Ack(msg)
err = consumer.Ack(msg)
assert.Nil(t, err)
} else {
// Fails to process odd messages
consumer.Nack(msg)
Expand All @@ -591,7 +642,8 @@ func TestZeroQueueConsumer_Nack(t *testing.T) {
assert.Nil(t, err)
assert.Equal(t, fmt.Sprintf("msg-content-%d", i), string(msg.Payload()))

consumer.Ack(msg)
err = consumer.Ack(msg)
assert.Nil(t, err)
}
}

Expand Down Expand Up @@ -641,7 +693,8 @@ func TestZeroQueueConsumer_Seek(t *testing.T) {
msg, err := consumer.Receive(ctx)
assert.Nil(t, err)
assert.Equal(t, fmt.Sprintf("hello-%d", i), string(msg.Payload()))
consumer.Ack(msg)
err = consumer.Ack(msg)
assert.Nil(t, err)
}

err = consumer.Seek(seekID)
Expand Down Expand Up @@ -698,7 +751,8 @@ func TestZeroQueueConsumer_SeekByTime(t *testing.T) {
msg, err := consumer.Receive(ctx)
assert.Nil(t, err)
assert.Equal(t, fmt.Sprintf("hello-%d", i), string(msg.Payload()))
consumer.Ack(msg)
err = consumer.Ack(msg)
assert.Nil(t, err)
}

currentTimestamp := time.Now()
Expand All @@ -711,6 +765,7 @@ func TestZeroQueueConsumer_SeekByTime(t *testing.T) {
msg, err := consumer.Receive(ctx)
assert.Nil(t, err)
assert.Equal(t, fmt.Sprintf("hello-%d", i), string(msg.Payload()))
consumer.Ack(msg)
err = consumer.Ack(msg)
assert.Nil(t, err)
}
}
7 changes: 6 additions & 1 deletion pulsar/producer_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -242,7 +242,12 @@ func (p *producer) internalCreatePartitionsProducers() error {
partition := partitions[partitionIdx]

go func(partitionIdx int, partition string) {
prod, e := newPartitionProducer(p.client, partition, p.options, partitionIdx, p.metrics)
idx := partitionIdx
if newNumPartitions == 1 {
// for single partition topic, we need to use -1 as partitionIdx, keep the same with Java client.
idx = -1
}
prod, e := newPartitionProducer(p.client, partition, p.options, idx, p.metrics)
c <- ProducerError{
partition: partitionIdx,
prod: prod,
Expand Down
1 change: 1 addition & 0 deletions pulsar/producer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,7 @@ func TestSimpleProducer(t *testing.T) {

assert.NoError(t, err)
assert.NotNil(t, ID)
assert.Equal(t, int32(-1), ID.PartitionIdx())
}

_, err = producer.Send(context.Background(), nil)
Expand Down
Loading