diff --git a/pulsar/client_impl.go b/pulsar/client_impl.go index 56829270b6..08c31e6409 100644 --- a/pulsar/client_impl.go +++ b/pulsar/client_impl.go @@ -168,12 +168,26 @@ func (c *client) Subscribe(options ConsumerOptions) (Consumer, error) { } func (c *client) CreateReader(options ReaderOptions) (Reader, error) { - reader, err := newReader(c, options) + topics, err := c.TopicPartitions(options.Topic) if err != nil { return nil, err } - c.handlers.Add(reader) - return reader, nil + if len(topics) <= 1 { + reader, err := newReader(c, options) + if err != nil { + return nil, err + } + c.handlers.Add(reader) + return reader, nil + } + + // create multi topic reader + multiTopicReader, err := newMultiTopicReader(c, options) + if err != nil { + return nil, err + } + c.handlers.Add(multiTopicReader) + return multiTopicReader, nil } func (c *client) TopicPartitions(topic string) ([]string, error) { diff --git a/pulsar/consumer.go b/pulsar/consumer.go index a71a2d450a..a69c54e651 100644 --- a/pulsar/consumer.go +++ b/pulsar/consumer.go @@ -182,6 +182,15 @@ type ConsumerOptions struct { // > Notice: the NackBackoffPolicy will not work with `consumer.NackID(MessageID)` // > because we are not able to get the redeliveryCount from the message ID. NackBackoffPolicy NackBackoffPolicy + + // startMessageID internally used by multitopic-reader + startMessageID MessageID + + // startMessageIDInclusive internally used by multitopic-reader + startMessageIDInclusive bool + + // subscriptionMode internally used by multitopic-reader + subscriptionMode subscriptionMode } // Consumer is an interface that abstracts behavior of Pulsar's consumer @@ -248,4 +257,13 @@ type Consumer interface { // Name returns the name of consumer. Name() string + + // lastDequeuedMsg used for setting last dequeued msg id by internal partition consumers + lastDequeuedMsg(MessageID) error + + // hasMessages get if messages are available + hasMessages() (bool, error) + + // messagesInQueue get the number of messages available in queue + messagesInQueue() int } diff --git a/pulsar/consumer_impl.go b/pulsar/consumer_impl.go index 2bd3ed50e4..5bd6420082 100644 --- a/pulsar/consumer_impl.go +++ b/pulsar/consumer_impl.go @@ -327,6 +327,22 @@ func (c *consumer) internalTopicSubscribeToPartitions() error { ch := make(chan ConsumerError, partitionsToAdd) wg.Add(partitionsToAdd) + // default start messageId + startMessageID := trackingMessageID{} + + // if start message id is provided + if _, ok := c.options.startMessageID.(trackingMessageID); ok { + startMessageID = c.options.startMessageID.(trackingMessageID) + } + + // default subscription mode is durable + subscriptionMode := durable + + // for reader subscription mode is nonDurabale + if c.options.subscriptionMode == nonDurable { + subscriptionMode = nonDurable + } + for partitionIdx := startPartition; partitionIdx < newNumPartitions; partitionIdx++ { partitionTopic := partitions[partitionIdx] @@ -352,14 +368,15 @@ func (c *consumer) internalTopicSubscribeToPartitions() error { metadata: metadata, subProperties: subProperties, replicateSubscriptionState: c.options.ReplicateSubscriptionState, - startMessageID: trackingMessageID{}, - subscriptionMode: durable, + startMessageID: startMessageID, + subscriptionMode: subscriptionMode, readCompacted: c.options.ReadCompacted, interceptors: c.options.Interceptors, maxReconnectToBroker: c.options.MaxReconnectToBroker, keySharedPolicy: c.options.KeySharedPolicy, schema: c.options.Schema, decryption: c.options.Decryption, + startMessageIDInclusive: c.options.startMessageIDInclusive, } cons, err := newPartitionConsumer(c, c.client, opts, c.messageCh, c.dlq, c.metrics) ch <- ConsumerError{ @@ -670,7 +687,6 @@ func (c *consumer) messageID(msgID MessageID) (trackingMessageID, bool) { return mid, true } - func addMessageCryptoIfMissing(client *client, options *ConsumerOptions, topics interface{}) error { // decryption is enabled, use default messagecrypto if not provided if options.Decryption != nil && options.Decryption.MessageCrypto == nil { @@ -684,3 +700,60 @@ func addMessageCryptoIfMissing(client *client, options *ConsumerOptions, topics } return nil } + +func (c *consumer) lastDequeuedMsg(msgID MessageID) error { + mid, ok := toTrackingMessageID(msgID) + if !ok { + return newError(InvalidMessage, fmt.Sprintf("invalid message id type %T", msgID)) + } + + if msgID.PartitionIdx() >= 0 { + c.consumers[mid.PartitionIdx()].lastDequeuedMsg = mid + return nil + } + + return newError(InvalidMessage, fmt.Sprintf("invalid message id type %T", msgID)) +} + +func (c *consumer) hasMessages() (bool, error) { + for _, pc := range c.consumers { + if ok, err := c.hashMoreMessages(pc); err != nil { + return false, err // error in reading last message id from broker + } else if ok { // return true only if messages are available + return ok, nil + } + } + return false, nil // reach here only if done checking for all partition consumers +} + +func (c *consumer) hashMoreMessages(pc *partitionConsumer) (bool, error) { + lastMsgID, err := pc.getLastMessageID() + if err != nil { + return false, err + } + + // same logic as in reader_impl + if !pc.lastDequeuedMsg.Undefined() { + return lastMsgID.isEntryIDValid() && lastMsgID.greater(pc.lastDequeuedMsg.messageID), nil + } + + if pc.options.startMessageIDInclusive { + return lastMsgID.isEntryIDValid() && lastMsgID.greaterEqual(pc.startMessageID.messageID), nil + } + + // Non-inclusive + return lastMsgID.isEntryIDValid() && lastMsgID.greater(pc.startMessageID.messageID), nil +} + +func (c *consumer) messagesInQueue() int { + // messages in partition consumer queue + msgCount := 0 + for _, pc := range c.consumers { + msgCount += pc.messagesInQueue() + } + + // messages in consumer msg channel + msgCount += len(c.messageCh) + + return msgCount +} diff --git a/pulsar/consumer_multitopic.go b/pulsar/consumer_multitopic.go index c1cb3d8482..7c195e2070 100644 --- a/pulsar/consumer_multitopic.go +++ b/pulsar/consumer_multitopic.go @@ -228,3 +228,16 @@ func (c *multiTopicConsumer) SeekByTime(time time.Time) error { func (c *multiTopicConsumer) Name() string { return c.consumerName } + +func (c *multiTopicConsumer) lastDequeuedMsg(msgID MessageID) error { + return newError(OperationNotSupported, "lastDequeuedMsg is not supported for multi Topic consumer") +} + +func (c *multiTopicConsumer) hasMessages() (bool, error) { + return false, newError(OperationNotSupported, "hasMessages is not supported for multi Topic consumer") +} + +func (c *multiTopicConsumer) messagesInQueue() int { + c.log.Warn("messagesInQueue is not supported for multi topic consumer") + return 0 +} diff --git a/pulsar/consumer_partition.go b/pulsar/consumer_partition.go index 7679a8cad4..0b0aa558d0 100644 --- a/pulsar/consumer_partition.go +++ b/pulsar/consumer_partition.go @@ -1265,6 +1265,10 @@ func (pc *partitionConsumer) _getConn() internal.Connection { return pc.conn.Load().(internal.Connection) } +func (pc *partitionConsumer) messagesInQueue() int { + return len(pc.queueCh) +} + func convertToMessageIDData(msgID trackingMessageID) *pb.MessageIdData { if msgID.Undefined() { return nil diff --git a/pulsar/consumer_regex.go b/pulsar/consumer_regex.go index ed2ae1ae82..372ba9806f 100644 --- a/pulsar/consumer_regex.go +++ b/pulsar/consumer_regex.go @@ -369,6 +369,19 @@ func (c *regexConsumer) topics() ([]string, error) { return filtered, nil } +func (c *regexConsumer) lastDequeuedMsg(msgID MessageID) error { + return newError(OperationNotSupported, "setting lastDequeuedMsg is not supported for Regex Topic consumer") +} + +func (c *regexConsumer) hasMessages() (bool, error) { + return false, newError(OperationNotSupported, "hasMessages is not supported for Regex Topic consumer") +} + +func (c *regexConsumer) messagesInQueue() int { + c.log.Warn("messagesInQueue is not supported for Regex topic consumer") + return 0 +} + type consumerError struct { err error topic string diff --git a/pulsar/internal/pulsartracing/consumer_interceptor_test.go b/pulsar/internal/pulsartracing/consumer_interceptor_test.go index b15a926bee..0c5e395e99 100644 --- a/pulsar/internal/pulsartracing/consumer_interceptor_test.go +++ b/pulsar/internal/pulsartracing/consumer_interceptor_test.go @@ -18,9 +18,7 @@ package pulsartracing import ( - "context" "testing" - "time" "github.com/apache/pulsar-client-go/pulsar" "github.com/opentracing/opentracing-go" @@ -34,7 +32,7 @@ func TestConsumerBuildAndInjectChildSpan(t *testing.T) { opentracing.SetGlobalTracer(tracer) message := pulsar.ConsumerMessage{ - Consumer: &mockConsumer{}, + Consumer: pulsar.NewMockConsumer(), Message: &mockConsumerMessage{ properties: map[string]string{}, }, @@ -44,46 +42,3 @@ func TestConsumerBuildAndInjectChildSpan(t *testing.T) { assert.NotNil(t, span) assert.True(t, len(message.Properties()) > 0) } - -type mockConsumer struct { -} - -func (c *mockConsumer) Subscription() string { - return "" -} - -func (c *mockConsumer) Unsubscribe() error { - return nil -} - -func (c *mockConsumer) Receive(ctx context.Context) (message pulsar.Message, err error) { - return nil, nil -} - -func (c *mockConsumer) Chan() <-chan pulsar.ConsumerMessage { - return nil -} - -func (c *mockConsumer) Ack(msg pulsar.Message) {} - -func (c *mockConsumer) AckID(msgID pulsar.MessageID) {} - -func (c *mockConsumer) ReconsumeLater(msg pulsar.Message, delay time.Duration) {} - -func (c *mockConsumer) Nack(msg pulsar.Message) {} - -func (c *mockConsumer) NackID(msgID pulsar.MessageID) {} - -func (c *mockConsumer) Close() {} - -func (c *mockConsumer) Seek(msgID pulsar.MessageID) error { - return nil -} - -func (c *mockConsumer) SeekByTime(time time.Time) error { - return nil -} - -func (c *mockConsumer) Name() string { - return "" -} diff --git a/pulsar/mock_consumer.go b/pulsar/mock_consumer.go new file mode 100644 index 0000000000..1db1f97749 --- /dev/null +++ b/pulsar/mock_consumer.go @@ -0,0 +1,82 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package pulsar + +import ( + "context" + "time" +) + +func NewMockConsumer() Consumer { + return &mockConsumer{} +} + +type mockConsumer struct { +} + +func (c *mockConsumer) Subscription() string { + return "" +} + +func (c *mockConsumer) Unsubscribe() error { + return nil +} + +func (c *mockConsumer) Receive(ctx context.Context) (message Message, err error) { + return nil, nil +} + +func (c *mockConsumer) Chan() <-chan ConsumerMessage { + return nil +} + +func (c *mockConsumer) Ack(msg Message) {} + +func (c *mockConsumer) AckID(msgID MessageID) {} + +func (c *mockConsumer) ReconsumeLater(msg Message, delay time.Duration) {} + +func (c *mockConsumer) Nack(msg Message) {} + +func (c *mockConsumer) NackID(msgID MessageID) {} + +func (c *mockConsumer) Close() {} + +func (c *mockConsumer) Seek(msgID MessageID) error { + return nil +} + +func (c *mockConsumer) SeekByTime(time time.Time) error { + return nil +} + +func (c *mockConsumer) Name() string { + return "" +} + +func (c *mockConsumer) lastDequeuedMsg(msgID MessageID) error { + return nil +} + +func (c *mockConsumer) messagesInQueue() int { + return 0 +} + +func (c *mockConsumer) hasMessages() (bool, error) { + return false, nil +} diff --git a/pulsar/multitopic_reader_impl.go b/pulsar/multitopic_reader_impl.go new file mode 100644 index 0000000000..d2383b367b --- /dev/null +++ b/pulsar/multitopic_reader_impl.go @@ -0,0 +1,157 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package pulsar + +import ( + "context" + + "fmt" + + "time" + + "github.com/apache/pulsar-client-go/pulsar/log" +) + +type multiTopicReader struct { + consumer Consumer + client *client + log log.Logger + topic string +} + +func newMultiTopicReader(client *client, options ReaderOptions) (Reader, error) { + + if options.Topic == "" { + return nil, newError(InvalidConfiguration, "Topic is required") + } + + if options.StartMessageID == nil { + return nil, newError(InvalidConfiguration, "StartMessageID is required") + } + startMessageID, ok := toTrackingMessageID(options.StartMessageID) + if !ok { + // a custom type satisfying MessageID may not be a messageID or trackingMessageID + // so re-create messageID using its data + deserMsgID, err := deserializeMessageID(options.StartMessageID.Serialize()) + if err != nil { + return nil, err + } + // de-serialized MessageID is a messageID + startMessageID = trackingMessageID{ + messageID: deserMsgID.(messageID), + receivedTime: time.Now(), + } + } + // only earliest/latest message id supported + if !startMessageID.equal(latestMessageID) && !startMessageID.equal(earliestMessageID) { + return nil, newError(OperationNotSupported, "The partitioned topic startMessageId is illegal") + } + + subscriptionName := options.SubscriptionRolePrefix + if subscriptionName == "" { + subscriptionName = "multitopic-reader" + } + + subscriptionName += "-" + generateRandomName() + receiverQueueSize := options.ReceiverQueueSize + if receiverQueueSize <= 0 { + receiverQueueSize = defaultReceiverQueueSize + } + + consumerOptions := ConsumerOptions{ + Topic: options.Topic, + Name: options.Name, + SubscriptionName: subscriptionName, + ReceiverQueueSize: receiverQueueSize, + startMessageID: startMessageID, + Type: Exclusive, + subscriptionMode: nonDurable, + startMessageIDInclusive: options.StartMessageIDInclusive, + ReadCompacted: options.ReadCompacted, + Properties: options.Properties, + NackRedeliveryDelay: defaultNackRedeliveryDelay, + ReplicateSubscriptionState: false, + Decryption: options.Decryption, + } + + multiTopicReader := &multiTopicReader{ + client: client, + log: client.log.SubLogger(log.Fields{"topic": options.Topic}), + topic: options.Topic, + } + + consumer, err := newConsumer(client, consumerOptions) + if err != nil { + return nil, err + } + multiTopicReader.consumer = consumer + return multiTopicReader, nil +} + +func (r *multiTopicReader) Topic() string { + return r.topic +} + +func (r *multiTopicReader) Next(ctx context.Context) (Message, error) { + msg, err := r.consumer.Receive(ctx) + if err != nil { + return nil, err + } + + if mid, ok := toTrackingMessageID(msg.ID()); ok { + err = r.consumer.lastDequeuedMsg(mid) + if err != nil { + return nil, err + } + // Acknowledge message immediately because the reader is based on non-durable subscription. When it reconnects, + // it will specify the subscription position anyway + r.consumer.AckID(mid) + return msg, nil + } + return nil, newError(InvalidMessage, fmt.Sprintf("invalid message id type %T", msg.ID())) +} + +func (r *multiTopicReader) HasNext() bool { + + for { + // messages are available in queue + if r.consumer.messagesInQueue() > 0 { + return true + } + + // check if messages are available on broker + hasMessages, err := r.consumer.hasMessages() + if err != nil { + r.log.WithError(err).Error("Failed to get last message id from broker") + continue + } + return hasMessages + } +} + +func (r *multiTopicReader) Close() { + r.consumer.Close() +} + +func (r *multiTopicReader) Seek(msgID MessageID) error { + return r.consumer.Seek(msgID) +} + +func (r *multiTopicReader) SeekByTime(time time.Time) error { + return r.consumer.SeekByTime(time) +} diff --git a/pulsar/multitopic_reader_test.go b/pulsar/multitopic_reader_test.go new file mode 100644 index 0000000000..f65024f347 --- /dev/null +++ b/pulsar/multitopic_reader_test.go @@ -0,0 +1,475 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package pulsar + +import ( + "context" + "fmt" + "net/http" + "testing" + "time" + + "github.com/apache/pulsar-client-go/pulsar/crypto" + "github.com/stretchr/testify/assert" +) + +func newPartitionedTopic(t *testing.T) string { + topicName := fmt.Sprintf("testReaderPartitions-%v", time.Now().Nanosecond()) + topic := fmt.Sprintf("persistent://public/default/%v", topicName) + testURL := adminURL + "/" + fmt.Sprintf("admin/v2/persistent/public/default/%v/partitions", topicName) + + makeHTTPCall(t, http.MethodPut, testURL, "5") + return topic +} +func TestMultiTopicReaderConfigErrors(t *testing.T) { + client, err := NewClient(ClientOptions{ + URL: lookupURL, + }) + assert.Nil(t, err) + defer client.Close() + + topic := newPartitionedTopic(t) + + reader, err := client.CreateReader(ReaderOptions{ + Topic: topic, + }) + assert.Nil(t, reader) + assert.NotNil(t, err) + + // only earliest/latest message id allowed for partitioned topic + reader, err = client.CreateReader(ReaderOptions{ + StartMessageID: newMessageID(1, 1, 1, 1), + Topic: topic, + }) + + assert.NotNil(t, err) + assert.Nil(t, reader) +} + +func TestMultiTopicReader(t *testing.T) { + client, err := NewClient(ClientOptions{ + URL: lookupURL, + }) + assert.Nil(t, err) + defer client.Close() + + ctx := context.Background() + topic := newPartitionedTopic(t) + + // create reader + reader, err := client.CreateReader(ReaderOptions{ + Topic: topic, + StartMessageID: EarliestMessageID(), + }) + assert.Nil(t, err) + defer reader.Close() + + // expected reader should be of type multiTopicReader + _, ok := reader.(*multiTopicReader) + assert.True(t, ok) + + producer, err := client.CreateProducer(ProducerOptions{ + Topic: topic, + }) + assert.Nil(t, err) + defer producer.Close() + + // send 10 messages + for i := 0; i < 10; i++ { + _, err := producer.Send(ctx, &ProducerMessage{ + Payload: []byte(fmt.Sprintf("hello-%d", i)), + }) + assert.NoError(t, err) + } + + // receive 10 messages + count := 0 + for reader.HasNext() { + _, err := reader.Next(context.Background()) + assert.NoError(t, err) + count++ + } + + assert.Equal(t, 10, count) +} + +func TestMultiTopicReaderOnLatestWithBatching(t *testing.T) { + client, err := NewClient(ClientOptions{ + URL: lookupURL, + }) + assert.Nil(t, err) + defer client.Close() + + ctx := context.Background() + topic := newPartitionedTopic(t) + + // create producer + producer, err := client.CreateProducer(ProducerOptions{ + Topic: topic, + DisableBatching: false, + BatchingMaxMessages: 4, + BatchingMaxPublishDelay: 1 * time.Second, + }) + assert.Nil(t, err) + defer producer.Close() + + // send 10 messages + msgIDs := [10]MessageID{} + for i := 0; i < 10; i++ { + idx := i + producer.SendAsync(ctx, &ProducerMessage{ + Payload: []byte(fmt.Sprintf("hello-%d", i)), + }, func(id MessageID, producerMessage *ProducerMessage, err error) { + assert.NoError(t, err) + assert.NotNil(t, id) + msgIDs[idx] = id + }) + } + + err = producer.Flush() + assert.NoError(t, err) + + reader, err := client.CreateReader(ReaderOptions{ + Topic: topic, + StartMessageID: LatestMessageID(), + StartMessageIDInclusive: false, + }) + + assert.Nil(t, err) + defer reader.Close() + + // expected reader should be of type multiTopicReader + _, ok := reader.(*multiTopicReader) + assert.True(t, ok) + + // Reader should yield no message since it's at the end of the topic + ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second) + msg, err := reader.Next(ctx) + assert.Error(t, err) + assert.Nil(t, msg) + cancel() +} + +func TestShouldFailMultiTopicReaderOnSpecificMessage(t *testing.T) { + client, err := NewClient(ClientOptions{ + URL: lookupURL, + }) + assert.Nil(t, err) + defer client.Close() + + ctx := context.Background() + topic := newPartitionedTopic(t) + // create producer + producer, err := client.CreateProducer(ProducerOptions{ + Topic: topic, + DisableBatching: true, + }) + assert.Nil(t, err) + defer producer.Close() + + // send 10 messages + msgIDs := [10]MessageID{} + for i := 0; i < 10; i++ { + msgID, err := producer.Send(ctx, &ProducerMessage{ + Payload: []byte(fmt.Sprintf("hello-%d", i)), + }) + assert.NoError(t, err) + assert.NotNil(t, msgID) + msgIDs[i] = msgID + } + + // create reader on 5th message (not included) + reader, err := client.CreateReader(ReaderOptions{ + Topic: topic, + StartMessageID: msgIDs[4], + }) + + // reader creation should fail as only latest/latest message ids are supported + assert.NotNil(t, err) + assert.Nil(t, reader) +} + +func TestMultiTopicReaderHasNextAgainstEmptyTopic(t *testing.T) { + client, err := NewClient(ClientOptions{ + URL: lookupURL, + }) + + assert.Nil(t, err) + defer client.Close() + + topic := newPartitionedTopic(t) + + // create reader on 5th message (not included) + reader, err := client.CreateReader(ReaderOptions{ + Topic: topic, + StartMessageID: EarliestMessageID(), + }) + + assert.Nil(t, err) + defer reader.Close() + + // expected reader should be of type multiTopicReader + _, ok := reader.(*multiTopicReader) + assert.True(t, ok) + + // should be false since no messages are produced + assert.Equal(t, reader.HasNext(), false) +} + +func TestMultiTopicReaderHasNext(t *testing.T) { + client, err := NewClient(ClientOptions{ + URL: lookupURL, + }) + + assert.Nil(t, err) + defer client.Close() + + topic := newPartitionedTopic(t) + ctx := context.Background() + + reader, err := client.CreateReader(ReaderOptions{ + Topic: topic, + StartMessageID: EarliestMessageID(), + }) + + assert.Nil(t, err) + defer reader.Close() + + // expected reader should be of type multiTopicReader + _, ok := reader.(*multiTopicReader) + assert.True(t, ok) + + // create producer + producer, err := client.CreateProducer(ProducerOptions{ + Topic: topic, + DisableBatching: true, + }) + assert.Nil(t, err) + defer producer.Close() + + // send 10 messages + for i := 0; i < 10; i++ { + msgID, err := producer.Send(ctx, &ProducerMessage{ + Payload: []byte(fmt.Sprintf("hello-%d", i)), + }) + assert.NoError(t, err) + assert.NotNil(t, msgID) + } + + i := 0 + // hashNext should be false if no more messages are available + for reader.HasNext() { + _, err := reader.Next(ctx) + assert.NoError(t, err) + + i++ + } + assert.Equal(t, 10, i) +} + +func TestMultuTopicReaderHasNextWithReceiverQueueSize(t *testing.T) { + client, err := NewClient(ClientOptions{ + URL: lookupURL, + }) + + assert.Nil(t, err) + defer client.Close() + + topic := newPartitionedTopic(t) + ctx := context.Background() + + reader, err := client.CreateReader(ReaderOptions{ + Topic: topic, + StartMessageID: EarliestMessageID(), + ReceiverQueueSize: 1, + }) + + assert.Nil(t, err) + defer reader.Close() + + // expected reader should be of type multiTopicReader + _, ok := reader.(*multiTopicReader) + assert.True(t, ok) + + // create producer + producer, err := client.CreateProducer(ProducerOptions{ + Topic: topic, + DisableBatching: true, + }) + assert.Nil(t, err) + defer producer.Close() + + // send 10 messages + for i := 0; i < 10; i++ { + msgID, err := producer.Send(ctx, &ProducerMessage{ + Payload: []byte(fmt.Sprintf("hello-%d", i)), + }) + assert.NoError(t, err) + assert.NotNil(t, msgID) + } + + count := 0 + + // HashNext should be false if no messages are available + for reader.HasNext() { + msg, err := reader.Next(ctx) + assert.Nil(t, err) + assert.NotNil(t, msg) + + count++ + } + + assert.Equal(t, 10, count) +} + +func TestMultiTopicReaderWithLatestMsg(t *testing.T) { + client, err := NewClient(ClientOptions{ + URL: lookupURL, + }) + + assert.Nil(t, err) + defer client.Close() + + topic := newPartitionedTopic(t) + ctx := context.Background() + + // create reader on the last message (inclusive) + reader0, err := client.CreateReader(ReaderOptions{ + Topic: topic, + StartMessageID: LatestMessageID(), + StartMessageIDInclusive: true, + }) + + assert.Nil(t, err) + defer reader0.Close() + + // expected reader should be of type multiTopicReader + _, ok := reader0.(*multiTopicReader) + assert.True(t, ok) + + assert.False(t, reader0.HasNext()) + + // create producer + producer, err := client.CreateProducer(ProducerOptions{ + Topic: topic, + DisableBatching: true, + }) + assert.Nil(t, err) + defer producer.Close() + + msgIDMap := make(map[int32]struct{}) + // send 10 messages + for i := 0; i < 10; i++ { + msgID, err := producer.Send(ctx, &ProducerMessage{ + Payload: []byte(fmt.Sprintf("hello-%d", i)), + }) + assert.NoError(t, err) + assert.NotNil(t, msgID) + + // to track the partitions used for message + msgIDMap[msgID.PartitionIdx()] = struct{}{} + } + + reader, err := client.CreateReader(ReaderOptions{ + Topic: topic, + StartMessageID: LatestMessageID(), + StartMessageIDInclusive: true, + }) + + assert.Nil(t, err) + defer reader.Close() + + // expected reader should be of type multiTopicReader + _, ok = reader.(*multiTopicReader) + assert.True(t, ok) + + count := 0 + // should only read latest message from each partition + for reader.HasNext() { + msg, err := reader.Next(ctx) + assert.Nil(t, err) + assert.NotNil(t, msg) + + count++ + } + + // should only read latest message from each partition + assert.Equal(t, len(msgIDMap), count) +} + +func TestMultiTopicProducerReaderRSAEncryption(t *testing.T) { + client, err := NewClient(ClientOptions{ + URL: lookupURL, + }) + + assert.Nil(t, err) + defer client.Close() + + topic := newPartitionedTopic(t) + ctx := context.Background() + + // create reader + reader, err := client.CreateReader(ReaderOptions{ + Topic: topic, + StartMessageID: EarliestMessageID(), + Decryption: &MessageDecryptionInfo{ + KeyReader: crypto.NewFileKeyReader("crypto/testdata/pub_key_rsa.pem", + "crypto/testdata/pri_key_rsa.pem"), + ConsumerCryptoFailureAction: crypto.ConsumerCryptoFailureActionFail, + }, + }) + assert.Nil(t, err) + defer reader.Close() + + // expected reader should be of type multiTopicReader + _, ok := reader.(*multiTopicReader) + assert.True(t, ok) + + // create producer + producer, err := client.CreateProducer(ProducerOptions{ + Topic: topic, + Encryption: &ProducerEncryptionInfo{ + KeyReader: crypto.NewFileKeyReader("crypto/testdata/pub_key_rsa.pem", + "crypto/testdata/pri_key_rsa.pem"), + ProducerCryptoFailureAction: crypto.ProducerCryptoFailureActionFail, + Keys: []string{"client-rsa.pem"}, + }, + }) + + assert.Nil(t, err) + defer producer.Close() + + // send 10 messages + for i := 0; i < 10; i++ { + _, err := producer.Send(ctx, &ProducerMessage{ + Payload: []byte(fmt.Sprintf("hello-%d", i)), + }) + assert.NoError(t, err) + } + + // receive 10 messages + count := 0 + for reader.HasNext() { + msg, err := reader.Next(context.Background()) + assert.NoError(t, err) + assert.NotNil(t, msg) + count++ + } + + assert.Equal(t, 10, count) +}