diff --git a/pulsar/reader_impl.go b/pulsar/reader_impl.go
index c8543259a3..b4bb6aec8a 100644
--- a/pulsar/reader_impl.go
+++ b/pulsar/reader_impl.go
@@ -33,12 +33,17 @@ const (
 
 type reader struct {
     sync.Mutex
+    topic               string
     client              *client
-    pc                  *partitionConsumer
+    options             ReaderOptions
+    consumers           []*partitionConsumer
     messageCh           chan ConsumerMessage
     lastMessageInBroker trackingMessageID
+    dlq                 *dlqRouter
     log                 log.Logger
     metrics             *internal.LeveledMetrics
+    stopDiscovery       func()
+    closeOnce           sync.Once
 }
 
 func newReader(client *client, options ReaderOptions) (Reader, error) {
@@ -50,74 +55,38 @@ func newReader(client *client, options ReaderOptions) (Reader, error) {
         return nil, newError(InvalidConfiguration, "StartMessageID is required")
     }
 
-    startMessageID, ok := toTrackingMessageID(options.StartMessageID)
-    if !ok {
-        // a custom type satisfying MessageID may not be a messageID or trackingMessageID
-        // so re-create messageID using its data
-        deserMsgID, err := deserializeMessageID(options.StartMessageID.Serialize())
-        if err != nil {
-            return nil, err
-        }
-        // de-serialized MessageID is a messageID
-        startMessageID = trackingMessageID{
-            messageID:    deserMsgID.(messageID),
-            receivedTime: time.Now(),
-        }
-    }
-
-    subscriptionName := options.SubscriptionRolePrefix
-    if subscriptionName == "" {
-        subscriptionName = "reader"
+    if options.ReceiverQueueSize <= 0 {
+        options.ReceiverQueueSize = defaultReceiverQueueSize
     }
-    subscriptionName += "-" + generateRandomName()
 
-    receiverQueueSize := options.ReceiverQueueSize
-    if receiverQueueSize <= 0 {
-        receiverQueueSize = defaultReceiverQueueSize
-    }
-
-    consumerOptions := &partitionConsumerOpts{
-        topic:                      options.Topic,
-        consumerName:               options.Name,
-        subscription:               subscriptionName,
-        subscriptionType:           Exclusive,
-        receiverQueueSize:          receiverQueueSize,
-        startMessageID:             startMessageID,
-        startMessageIDInclusive:    options.StartMessageIDInclusive,
-        subscriptionMode:           nonDurable,
-        readCompacted:              options.ReadCompacted,
-        metadata:                   options.Properties,
-        nackRedeliveryDelay:        defaultNackRedeliveryDelay,
-        replicateSubscriptionState: false,
-        decryption:                 options.Decryption,
+    // Provide a dummy dlq router with no dlq policy
+    dlq, err := newDlqRouter(client, nil, client.log)
+    if err != nil {
+        return nil, err
     }
 
     reader := &reader{
+        topic:     options.Topic,
         client:    client,
+        options:   options,
         messageCh: make(chan ConsumerMessage),
-        log:       client.log.SubLogger(log.Fields{"topic": options.Topic}),
+        dlq:       dlq,
         metrics:   client.metrics.GetLeveledMetrics(options.Topic),
+        log:       client.log.SubLogger(log.Fields{"topic": options.Topic}),
     }
 
-    // Provide dummy dlq router with not dlq policy
-    dlq, err := newDlqRouter(client, nil, client.log)
-    if err != nil {
+    if err := reader.internalTopicReadToPartitions(); err != nil {
         return nil, err
     }
 
-    pc, err := newPartitionConsumer(nil, client, consumerOptions, reader.messageCh, dlq, reader.metrics)
-    if err != nil {
-        close(reader.messageCh)
-        return nil, err
-    }
+    reader.stopDiscovery = reader.runBackgroundPartitionDiscovery(time.Second * 60)
 
-    reader.pc = pc
     reader.metrics.ReadersOpened.Inc()
     return reader, nil
 }
 
 func (r *reader) Topic() string {
-    return r.pc.topic
+    return r.topic
 }
 
 func (r *reader) Next(ctx context.Context) (Message, error) {
@@ -132,8 +101,8 @@ func (r *reader) Next(ctx context.Context) (Message, error) {
             // it will specify the subscription position anyway
             msgID := cm.Message.ID()
             if mid, ok := toTrackingMessageID(msgID); ok {
-                r.pc.lastDequeuedMsg = mid
-                r.pc.AckID(mid)
+                r.consumers[mid.partitionIdx].lastDequeuedMsg = mid
+                r.consumers[mid.partitionIdx].AckID(mid)
                 return cm.Message, nil
             }
             return nil, newError(InvalidMessage, fmt.Sprintf("invalid message id type %T", msgID))
@@ -148,37 +117,64 @@ func (r *reader) HasNext() bool {
         return true
     }
 
+retryLoop:
     for {
-        lastMsgID, err := r.pc.getLastMessageID()
-        if err != nil {
-            r.log.WithError(err).Error("Failed to get last message id from broker")
-            continue
-        } else {
+    consumerLoop:
+        for _, consumer := range r.consumers {
+            lastMsgID, err := consumer.getLastMessageID()
+            if err != nil {
+                r.log.WithError(err).Error("Failed to get last message id from broker")
+                continue retryLoop
+            }
+            if r.lastMessageInBroker.greater(lastMsgID.messageID) {
+                continue consumerLoop
+            }
             r.lastMessageInBroker = lastMsgID
-            break
         }
+        break retryLoop
     }
 
     return r.hasMoreMessages()
 }
 
 func (r *reader) hasMoreMessages() bool {
-    if !r.pc.lastDequeuedMsg.Undefined() {
-        return r.lastMessageInBroker.isEntryIDValid() && r.lastMessageInBroker.greater(r.pc.lastDequeuedMsg.messageID)
+    for _, c := range r.consumers {
+        if r.consumerHasMoreMessages(c) {
+            return true
+        }
     }
+    return false
+}
 
-    if r.pc.options.startMessageIDInclusive {
-        return r.lastMessageInBroker.isEntryIDValid() && r.lastMessageInBroker.greaterEqual(r.pc.startMessageID.messageID)
+func (r *reader) consumerHasMoreMessages(pc *partitionConsumer) bool {
+    if !pc.lastDequeuedMsg.Undefined() {
+        return r.lastMessageInBroker.isEntryIDValid() && r.lastMessageInBroker.greater(pc.lastDequeuedMsg.messageID)
+    }
+
+    if pc.options.startMessageIDInclusive {
+        return r.lastMessageInBroker.isEntryIDValid() && r.lastMessageInBroker.greaterEqual(pc.startMessageID.messageID)
     }
 
     // Non-inclusive
-    return r.lastMessageInBroker.isEntryIDValid() && r.lastMessageInBroker.greater(r.pc.startMessageID.messageID)
+    return r.lastMessageInBroker.isEntryIDValid() && r.lastMessageInBroker.greater(pc.startMessageID.messageID)
 }
 
 func (r *reader) Close() {
-    r.pc.Close()
-    r.client.handlers.Del(r)
-    r.metrics.ReadersClosed.Inc()
+    r.closeOnce.Do(func() {
+        r.stopDiscovery()
+
+        r.Lock()
+        defer r.Unlock()
+
+        for _, consumer := range r.consumers {
+            if consumer != nil {
+                consumer.Close()
+            }
+        }
+        r.dlq.close()
+        r.client.handlers.Del(r)
+        r.metrics.ReadersClosed.Inc()
+    })
 }
 
 func (r *reader) messageID(msgID MessageID) (trackingMessageID, bool) {
@@ -207,12 +203,41 @@ func (r *reader) Seek(msgID MessageID) error {
         return nil
     }
 
-    return r.pc.Seek(mid)
+    return r.consumers[mid.partitionIdx].Seek(mid)
 }
 
 func (r *reader) SeekByTime(time time.Time) error {
     r.Lock()
     defer r.Unlock()
 
-    return r.pc.SeekByTime(time)
+    if len(r.consumers) > 1 {
+        return newError(SeekFailed, "for partition topic, seek command should perform on the individual partitions")
+    }
+    return r.consumers[0].SeekByTime(time)
+}
+
+func (r *reader) runBackgroundPartitionDiscovery(period time.Duration) (cancel func()) {
+    var wg sync.WaitGroup
+    stopDiscoveryCh := make(chan struct{})
+    ticker := time.NewTicker(period)
+
+    wg.Add(1)
+    go func() {
+        defer wg.Done()
+        for {
+            select {
+            case <-stopDiscoveryCh:
+                return
+            case <-ticker.C:
+                r.log.Debug("Auto discovering new partitions")
+                r.internalTopicReadToPartitions()
+            }
+        }
+    }()
+
+    return func() {
+        ticker.Stop()
+        close(stopDiscoveryCh)
+        wg.Wait()
+    }
 }
diff --git a/pulsar/reader_partition.go b/pulsar/reader_partition.go
new file mode 100644
index 0000000000..106bdd9f14
--- /dev/null
+++ b/pulsar/reader_partition.go
@@ -0,0 +1,156 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+package pulsar
+
+import (
+    "sync"
+    "time"
+)
+
+const (
+    ReaderSubNamePrefix = "reader"
+)
+
+func (r *reader) internalTopicReadToPartitions() error {
+    partitions, err := r.client.TopicPartitions(r.topic)
+    if err != nil {
+        return err
+    }
+
+    oldNumPartitions, newNumPartitions := 0, len(partitions)
+
+    r.Lock()
+    defer r.Unlock()
+
+    oldReaders, oldNumPartitions := r.consumers, len(r.consumers)
+    if oldReaders != nil {
+        if oldNumPartitions == newNumPartitions {
+            r.log.Debug("Number of partitions in topic has not changed")
+            return nil
+        }
+
+        r.log.WithField("old_partitions", oldNumPartitions).
+            WithField("new_partitions", newNumPartitions).
+            Info("Changed number of partitions in topic")
+    }
+
+    r.consumers = make([]*partitionConsumer, newNumPartitions)
+
+    // When for some reason (e.g. forced deletion of a sub-partition) oldNumPartitions > newNumPartitions,
+    // we need to rebuild the cache of new consumers, otherwise the array would be indexed out of bounds.
+    if oldReaders != nil && oldNumPartitions < newNumPartitions {
+        // Copy over the existing consumer instances
+        for i := 0; i < oldNumPartitions; i++ {
+            r.consumers[i] = oldReaders[i]
+        }
+    }
+
+    type ConsumerError struct {
+        err       error
+        partition int
+        consumer  *partitionConsumer
+    }
+
+    startMessageID, ok := toTrackingMessageID(r.options.StartMessageID)
+    if !ok {
+        // a custom type satisfying MessageID may not be a messageID or trackingMessageID
+        // so re-create messageID using its data
+        deserMsgID, err := deserializeMessageID(r.options.StartMessageID.Serialize())
+        if err != nil {
+            return err
+        }
+        // de-serialized MessageID is a messageID
+        startMessageID = trackingMessageID{
+            messageID:    deserMsgID.(messageID),
+            receivedTime: time.Now(),
+        }
+    }
+
+    startPartition := oldNumPartitions
+    partitionsToAdd := newNumPartitions - oldNumPartitions
+
+    if partitionsToAdd < 0 {
+        partitionsToAdd = newNumPartitions
+        startPartition = 0
+    }
+
+    var wg sync.WaitGroup
+    ch := make(chan ConsumerError, partitionsToAdd)
+    wg.Add(partitionsToAdd)
+
+    for partitionIdx := startPartition; partitionIdx < newNumPartitions; partitionIdx++ {
+        partitionTopic := partitions[partitionIdx]
+
+        go func(idx int, pt string) {
+            defer wg.Done()
+
+            opts := &partitionConsumerOpts{
+                topic:                      pt,
+                consumerName:               r.options.Name,
+                subscription:               ReaderSubNamePrefix + "-" + generateRandomName(),
+                subscriptionType:           Exclusive,
+                partitionIdx:               idx,
+                receiverQueueSize:          r.options.ReceiverQueueSize,
+                nackRedeliveryDelay:        defaultNackRedeliveryDelay,
+                metadata:                   r.options.Properties,
+                replicateSubscriptionState: false,
+                startMessageID:             startMessageID,
+                subscriptionMode:           nonDurable,
+                readCompacted:              r.options.ReadCompacted,
+                decryption:                 r.options.Decryption,
+            }
+
+            cons, err := newPartitionConsumer(nil, r.client, opts, r.messageCh, r.dlq, r.metrics)
+            ch <- ConsumerError{
+                err:       err,
+                partition: idx,
+                consumer:  cons,
+            }
+        }(partitionIdx, partitionTopic)
+    }
+
+    go func() {
+        wg.Wait()
+        close(ch)
+    }()
+
+    for ce := range ch {
+        if ce.err != nil {
+            err = ce.err
+        } else {
+            r.consumers[ce.partition] = ce.consumer
+        }
+    }
+
+    if err != nil {
+        // Since there were some failures,
+        // clean up all the partitions that succeeded in creating the consumer
+        for _, c := range r.consumers {
+            if c != nil {
+                c.Close()
+            }
+        }
+        return err
+    }
+
+    if newNumPartitions < oldNumPartitions {
+        r.metrics.ConsumersPartitions.Set(float64(newNumPartitions))
+    } else {
+        r.metrics.ConsumersPartitions.Add(float64(partitionsToAdd))
+    }
+    return nil
+}
diff --git a/pulsar/reader_test.go b/pulsar/reader_test.go
index bdafea086f..4d27504ae3 100644
--- a/pulsar/reader_test.go
+++ b/pulsar/reader_test.go
@@ -20,6 +20,7 @@ package pulsar
 
 import (
     "context"
     "fmt"
+    "net/http"
     "testing"
     "time"
@@ -92,6 +93,53 @@ func TestReader(t *testing.T) {
     }
 }
 
+func TestMultipleTopicReader(t *testing.T) {
+    ctx := context.Background()
+
+    client, err := NewClient(ClientOptions{
+        URL: lookupURL,
+    })
+
+    assert.Nil(t, err)
+    defer client.Close()
+
+    topic := newTopicName()
+    testURL := adminURL + "/" + "admin/v2/persistent/public/default/" + topic + "/partitions"
+    makeHTTPCall(t, http.MethodPut, testURL, "3")
+
+    // create reader
+    reader, err := client.CreateReader(ReaderOptions{
+        Topic:          topic,
+        StartMessageID: EarliestMessageID(),
+    })
+    assert.Nil(t, err)
+    defer reader.Close()
+
+    // create producer
+    producer, err := client.CreateProducer(ProducerOptions{
+        Topic: topic,
+    })
+    assert.Nil(t, err)
+    defer producer.Close()
+
+    // send 10 messages
+    for i := 0; i < 10; i++ {
+        _, err := producer.Send(ctx, &ProducerMessage{
+            Payload: []byte(fmt.Sprintf("hello-%d", i)),
+        })
+        assert.NoError(t, err)
+    }
+
+    // receive 10 messages
+    for i := 0; i < 10; i++ {
+        msg, err := reader.Next(context.Background())
+        assert.NoError(t, err)
+
+        expectMsg := fmt.Sprintf("hello-%d", i)
+        assert.Equal(t, []byte(expectMsg), msg.Payload())
+    }
+}
+
 func TestReaderConnectError(t *testing.T) {
     client, err := NewClient(ClientOptions{
         URL: "pulsar://invalid-hostname:6650",
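
Note: with this change, SeekByTime on a reader over a partitioned topic returns a SeekFailed error, so a time-based seek has to be performed against the individual partitions. A minimal sketch of that workaround, not part of this patch: the helper name, the partition-count argument, and the usual "<topic>-partition-<n>" naming are assumptions.

package example

import (
    "fmt"
    "time"

    "github.com/apache/pulsar-client-go/pulsar"
)

// seekPartitionsByTime is a hypothetical helper: since the partitioned reader
// rejects SeekByTime, it opens one reader per partition topic and seeks each
// of them to the given timestamp.
func seekPartitionsByTime(client pulsar.Client, topic string, numPartitions int, ts time.Time) error {
    for i := 0; i < numPartitions; i++ {
        // Assumes the usual "<topic>-partition-<n>" naming for partition topics.
        r, err := client.CreateReader(pulsar.ReaderOptions{
            Topic:          fmt.Sprintf("%s-partition-%d", topic, i),
            StartMessageID: pulsar.EarliestMessageID(),
        })
        if err != nil {
            return err
        }
        if err := r.SeekByTime(ts); err != nil {
            r.Close()
            return err
        }
        r.Close()
    }
    return nil
}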