diff --git a/pulsar/consumer_impl.go b/pulsar/consumer_impl.go index 20227beec7..05db0726ed 100644 --- a/pulsar/consumer_impl.go +++ b/pulsar/consumer_impl.go @@ -23,12 +23,9 @@ import ( "fmt" "math/rand" "strconv" - "strings" "sync" "time" - "github.com/apache/pulsar-client-go/pulsaradmin/pkg/utils" - "github.com/apache/pulsar-client-go/pulsar/crypto" "github.com/apache/pulsar-client-go/pulsar/internal" pb "github.com/apache/pulsar-client-go/pulsar/internal/pulsar_proto" @@ -269,11 +266,6 @@ func newInternalConsumer(client *client, options ConsumerOptions, topic string, return nil, pkgerrors.New("ZeroQueueConsumer is not supported for partitioned topics") } - if len(partitions) == 1 && options.EnableZeroQueueConsumer && - strings.Contains(partitions[0], utils.PARTITIONEDTOPICSUFFIX) { - return nil, pkgerrors.New("ZeroQueueConsumer is not supported for partitioned topics") - } - if len(partitions) == 1 && options.EnableZeroQueueConsumer { return newZeroConsumer(client, options, topic, messageCh, dlq, rlq, disableForceTopicCreation) } diff --git a/pulsar/consumer_zero_queue.go b/pulsar/consumer_zero_queue.go index 3f2862da2d..5b85df8ec9 100644 --- a/pulsar/consumer_zero_queue.go +++ b/pulsar/consumer_zero_queue.go @@ -66,7 +66,11 @@ func newZeroConsumer(client *client, options ConsumerOptions, topic string, consumerName: options.Name, metrics: client.metrics.GetLeveledMetrics(topic), } - opts := newPartitionConsumerOpts(zc.topic, zc.consumerName, 0, zc.options) + tn, err := internal.ParseTopicName(topic) + if err != nil { + return nil, err + } + opts := newPartitionConsumerOpts(zc.topic, zc.consumerName, tn.Partition, zc.options) conn, err := newPartitionConsumer(zc, zc.client, opts, zc.messageCh, zc.dlq, zc.metrics) if err != nil { return nil, err @@ -142,11 +146,14 @@ func (z *zeroQueueConsumer) Ack(m Message) error { func (z *zeroQueueConsumer) checkMsgIDPartition(msgID MessageID) error { partition := msgID.PartitionIdx() - if partition != 0 { - z.log.Errorf("invalid partition index %d expected a partition equal to 0", - partition) - return fmt.Errorf("invalid partition index %d expected a partition equal to 0", - partition) + if partition == 0 || partition == -1 { + return nil + } + if partition != z.pc.partitionIdx { + z.log.Errorf("invalid partition index %d expected a partition equal to %d", + partition, z.pc.partitionIdx) + return fmt.Errorf("invalid partition index %d expected a partition equal to %d", + partition, z.pc.partitionIdx) } return nil } diff --git a/pulsar/consumer_zero_queue_test.go b/pulsar/consumer_zero_queue_test.go index 06db433bd9..72048d7969 100644 --- a/pulsar/consumer_zero_queue_test.go +++ b/pulsar/consumer_zero_queue_test.go @@ -115,7 +115,8 @@ func TestNormalZeroQueueConsumer(t *testing.T) { assert.Equal(t, "pulsar", msg.Key()) assert.Equal(t, expectProperties, msg.Properties()) // ack message - consumer.Ack(msg) + err = consumer.Ack(msg) + assert.Nil(t, err) log.Printf("receive message: %s", msg.ID().String()) } err = consumer.Unsubscribe() @@ -228,7 +229,8 @@ func TestReconnectConsumer(t *testing.T) { assert.Equal(t, "pulsar", msg.Key()) assert.Equal(t, expectProperties, msg.Properties()) // ack message - consumer.Ack(msg) + err = consumer.Ack(msg) + assert.Nil(t, err) log.Printf("receive message: %s", msg.ID().String()) } err = consumer.Unsubscribe() @@ -341,7 +343,7 @@ func TestPartitionZeroQueueConsumer(t *testing.T) { assert.Nil(t, consumer) assert.Error(t, err, "ZeroQueueConsumer is not supported for partitioned topics") } -func TestOnePartitionZeroQueueConsumer(t *testing.T) { +func TestSpecifiedPartitionZeroQueueConsumer(t *testing.T) { client, err := NewClient(ClientOptions{ URL: lookupURL, }) @@ -350,17 +352,65 @@ func TestOnePartitionZeroQueueConsumer(t *testing.T) { defer client.Close() topic := newTopicName() - err = createPartitionedTopic(topic, 1) + ctx := context.Background() + err = createPartitionedTopic(topic, 2) + assert.Nil(t, err) + topics, err := client.TopicPartitions(topic) assert.Nil(t, err) // create consumer consumer, err := client.Subscribe(ConsumerOptions{ - Topic: topic, + Topic: topics[1], SubscriptionName: "my-sub", EnableZeroQueueConsumer: true, }) - assert.Nil(t, consumer) - assert.Error(t, err, "ZeroQueueConsumer is not supported for partitioned topics") + assert.Nil(t, err) + _, ok := consumer.(*zeroQueueConsumer) + assert.True(t, ok) + defer consumer.Close() + + // create producer + producer, err := client.CreateProducer(ProducerOptions{ + Topic: topics[1], + DisableBatching: false, + }) + assert.Nil(t, err) + defer producer.Close() + + // send 10 messages + for i := 0; i < 10; i++ { + msg, err := producer.Send(ctx, &ProducerMessage{ + Payload: []byte(fmt.Sprintf("hello-%d", i)), + Key: "pulsar", + Properties: map[string]string{ + "key-1": "pulsar-1", + }, + }) + assert.Nil(t, err) + log.Printf("send message: %s", msg.String()) + } + + // receive 10 messages + for i := 0; i < 10; i++ { + msg, err := consumer.Receive(context.Background()) + if err != nil { + log.Fatal(err) + } + + expectMsg := fmt.Sprintf("hello-%d", i) + expectProperties := map[string]string{ + "key-1": "pulsar-1", + } + assert.Equal(t, []byte(expectMsg), msg.Payload()) + assert.Equal(t, "pulsar", msg.Key()) + assert.Equal(t, expectProperties, msg.Properties()) + // ack message + err = consumer.Ack(msg) + assert.Nil(t, err) + log.Printf("receive message: %s", msg.ID().String()) + } + err = consumer.Unsubscribe() + assert.Nil(t, err) } func TestZeroQueueConsumerGetLastMessageIDs(t *testing.T) { @@ -576,7 +626,8 @@ func TestZeroQueueConsumer_Nack(t *testing.T) { if i%2 == 0 { // Only acks even messages - consumer.Ack(msg) + err = consumer.Ack(msg) + assert.Nil(t, err) } else { // Fails to process odd messages consumer.Nack(msg) @@ -591,7 +642,8 @@ func TestZeroQueueConsumer_Nack(t *testing.T) { assert.Nil(t, err) assert.Equal(t, fmt.Sprintf("msg-content-%d", i), string(msg.Payload())) - consumer.Ack(msg) + err = consumer.Ack(msg) + assert.Nil(t, err) } } @@ -641,7 +693,8 @@ func TestZeroQueueConsumer_Seek(t *testing.T) { msg, err := consumer.Receive(ctx) assert.Nil(t, err) assert.Equal(t, fmt.Sprintf("hello-%d", i), string(msg.Payload())) - consumer.Ack(msg) + err = consumer.Ack(msg) + assert.Nil(t, err) } err = consumer.Seek(seekID) @@ -698,7 +751,8 @@ func TestZeroQueueConsumer_SeekByTime(t *testing.T) { msg, err := consumer.Receive(ctx) assert.Nil(t, err) assert.Equal(t, fmt.Sprintf("hello-%d", i), string(msg.Payload())) - consumer.Ack(msg) + err = consumer.Ack(msg) + assert.Nil(t, err) } currentTimestamp := time.Now() @@ -711,6 +765,7 @@ func TestZeroQueueConsumer_SeekByTime(t *testing.T) { msg, err := consumer.Receive(ctx) assert.Nil(t, err) assert.Equal(t, fmt.Sprintf("hello-%d", i), string(msg.Payload())) - consumer.Ack(msg) + err = consumer.Ack(msg) + assert.Nil(t, err) } }