Zero queue support subscribing topic's partition #1421

@geniusjoe

Description


Is your feature request related to a problem? Please describe.
Main PR: #1225
When we create a consumer with the EnableZeroQueueConsumer option enabled and subscribe to a single partition of a topic as follows, the client returns an error: ZeroQueueConsumer is not supported for partitioned topics

	consumer, err := client.Subscribe(pulsar.ConsumerOptions{
		Topic:                   "public/default/test1-partition-0",
		SubscriptionName:        "sub-test",
		Type:                    pulsar.Shared,
		EnableZeroQueueConsumer: true,
	})

But in the Java SDK, we can subscribe to either a non-partitioned topic or a specific partition of a topic, because it only checks the partition count, regardless of topic type:

src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java#doSingleTopicSubscribeAsync
            if (metadata.partitions > 0) {
                consumer = MultiTopicsConsumerImpl.createPartitionedConsumer(PulsarClientImpl.this, conf,
                        externalExecutorProvider, consumerSubscribedFuture, metadata.partitions, schema, interceptors);
            } else {
                int partitionIndex = TopicName.getPartitionIndex(topic);
                consumer = ConsumerImpl.newConsumerImpl(PulsarClientImpl.this, topic, conf, externalExecutorProvider,
                        partitionIndex, false, consumerSubscribedFuture, null, schema, interceptors,
                        true /* createTopicIfDoesNotExist */);
            }

src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java#newConsumerImpl
        if (conf.getReceiverQueueSize() == 0) {
            return new ZeroQueueConsumerImpl<>(client, topic, conf, executorProvider, partitionIndex, hasParentConsumer,
                    subscribeFuture,
                    startMessageId, schema, interceptors,
                    createTopicIfDoesNotExist);
        } else {
            return new ConsumerImpl<>(client, topic, conf, executorProvider, partitionIndex, hasParentConsumer,
                    parentConsumerHasListener,
                    subscribeFuture, startMessageId,
                    startMessageRollbackDurationInSec /* rollback time in sec to start msgId */,
                    schema, interceptors, createTopicIfDoesNotExist);
        }

The main reason we cannot subscribe to more than one partition with a zero-queue consumer was discussed in this issue:

Since we don't know if a topic will there be any messages written, to avoid a consumer block to one topic(no more data are written)

This means that when we subscribe to multiple partitions at the same time, we cannot tell which partition currently has messages, so the consumer does not know which partition to send a permit to. With a non-partitioned topic or a single partition of a topic there is only one possible target, so we can send the permit directly. So maybe we can stay consistent with the Java SDK and support a topic's partition as well.

Describe the solution you'd like
The key concept of the zero queue is that the SDK increases the permit and calls flowIfNeed() to fetch a message from the broker only when we call consumer.Receive(). As a result, there are no messages in the pending queue, and we don't need to call pc.internalFlow(initialPermits) when the connection is first established in consumer_partition.go#<-pc.connectedCh.
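
The permit-per-Receive idea can be illustrated with a toy model. This is only a sketch: fakeBroker and zeroQueueConsumer are hypothetical types made up for illustration, not the SDK's partitionConsumer, and unlike the real Receive() this one returns immediately instead of blocking when no message is available.

```go
package main

import "fmt"

// fakeBroker is a hypothetical stand-in for a broker: it dispatches a
// message only while the consumer has outstanding permits.
type fakeBroker struct {
	backlog []string
	permits int
	pushed  int // number of messages dispatched so far
}

// flow grants n permits and returns the messages the broker can now dispatch.
func (b *fakeBroker) flow(n int) []string {
	b.permits += n
	var out []string
	for b.permits > 0 && len(b.backlog) > 0 {
		out = append(out, b.backlog[0])
		b.backlog = b.backlog[1:]
		b.permits--
		b.pushed++
	}
	return out
}

// zeroQueueConsumer (hypothetical) never prefetches: it sends exactly one
// permit per Receive call, so no message waits in a local queue.
type zeroQueueConsumer struct {
	broker *fakeBroker
}

// Receive sends one permit and returns the message it produced, if any.
// If the backlog is empty the permit stays outstanding on the broker.
func (c *zeroQueueConsumer) Receive() (string, bool) {
	msgs := c.broker.flow(1)
	if len(msgs) == 0 {
		return "", false
	}
	return msgs[0], true
}

func main() {
	b := &fakeBroker{backlog: []string{"m1", "m2", "m3"}}
	c := &zeroQueueConsumer{broker: b}

	// No initial flow on connect: nothing has been dispatched yet.
	fmt.Println("dispatched before Receive:", b.pushed)

	msg, ok := c.Receive()
	fmt.Println("received:", msg, ok, "dispatched:", b.pushed)
}
```

In this model the broker has dispatched nothing until the first Receive call, which is the property that makes an initial pc.internalFlow(initialPermits) unnecessary.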

I think we may need to modify the zero-queue consumer as follows:

  1. Remove the single-partition check in pulsar/consumer_impl.go#newInternalConsumer
	//if len(partitions) == 1 && options.EnableZeroQueueConsumer &&
	//	strings.Contains(partitions[0], utils.PARTITIONEDTOPICSUFFIX) {
	//	return nil, pkgerrors.New("ZeroQueueConsumer is not supported for partitioned topics")
	//}
  2. Create the partition consumer with the exact topic partition in pulsar/consumer_zero_queue.go#newZeroConsumer
	tn, err := internal.ParseTopicName(topic)
	if err != nil {
		return nil, err
	}
	opts := newPartitionConsumerOpts(zc.topic, zc.consumerName, tn.Partition, zc.options) 
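
To make the second step concrete, here is a minimal standalone sketch of extracting the partition index from a topic name. The partitionIndex helper and the -1 "no partition" sentinel are assumptions for illustration only; the actual change would rely on internal.ParseTopicName as shown above.

```go
package main

import (
	"fmt"
	"strconv"
	"strings"
)

// partitionSuffix is the marker that precedes the partition number
// in names such as "public/default/test1-partition-0".
const partitionSuffix = "-partition-"

// partitionIndex (hypothetical helper) returns the partition number encoded
// in the topic name, or -1 when the name does not refer to a partition.
func partitionIndex(topic string) int {
	i := strings.LastIndex(topic, partitionSuffix)
	if i < 0 {
		return -1
	}
	n, err := strconv.Atoi(topic[i+len(partitionSuffix):])
	if err != nil || n < 0 {
		return -1
	}
	return n
}

func main() {
	for _, t := range []string{
		"public/default/test1-partition-0",
		"public/default/test1",
	} {
		fmt.Println(t, "->", partitionIndex(t))
	}
}
```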

Additional context
I think there is one corner case we need to discuss.
In PR #1404, after a successful reconnect the consumer calls pc.availablePermits.inc() in pulsar/consumer_partition.go#reconnectToBroker and fetches a message from the broker:

		if err == nil {
			// Successfully reconnected
			pc.log.Info("Reconnected consumer to broker")
			bo.Reset()
			if pc.options.enableZeroQueueConsumer {
				pc.log.Info("zeroQueueConsumer reconnect, reset availablePermits")
				pc.availablePermits.inc()
			}
			return struct{}{}, nil
		}

It's okay when the consumer calls Receive() before the topic is unloaded and then gets a message, because although the consumer has by then sent two permits, only the one sent in reconnectToBroker() takes effect. But I'm not sure what happens if the consumer calls Receive() after the topic is unloaded: will both permits take effect and leave two messages in the pending queue?
Would it be better to follow the Java SDK, that is, save the consumer's state and only increase the permit if the consumer was in the waitingOnReceive state before the reconnect?

                // synchronized need to prevent race between connectionOpened and the check "msgCnx == cnx()"
                synchronized (this) {
                    // if message received due to an old flow - discard it and wait for the message from the
                    // latest flow command
                    if (msgCnx == cnx()) {
                        waitingOnReceiveForZeroQueueSize = false;
                        break;
                    }
                }
            } while (true);

    @Override
    protected void consumerIsReconnectedToBroker(ClientCnx cnx, int currentQueueSize) {
        super.consumerIsReconnectedToBroker(cnx, currentQueueSize);

        // For zerosize queue : If the connection is reset and someone is waiting for the messages
        // or queue was not empty: send a flow command
        if (waitingOnReceiveForZeroQueueSize
                || currentQueueSize > 0
                || (listener != null && !waitingOnListenerForZeroQueueSize)) {
            increaseAvailablePermits(cnx);
        }
    }
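
A rough Go sketch of the same guard, as a starting point for discussion. All names here (zeroQueueState, beginReceive, messageDelivered, onReconnect) are hypothetical and do not exist in the Go SDK; the sketch only models the bookkeeping, not the actual flow command.

```go
package main

import (
	"fmt"
	"sync"
)

// zeroQueueState (hypothetical) tracks whether a Receive call is currently
// waiting for a message, mirroring waitingOnReceiveForZeroQueueSize in Java.
type zeroQueueState struct {
	mu               sync.Mutex
	waitingOnReceive bool
	permitsSent      int
}

// beginReceive is called when Receive starts: it sends one permit.
func (s *zeroQueueState) beginReceive() {
	s.mu.Lock()
	defer s.mu.Unlock()
	s.waitingOnReceive = true
	s.permitsSent++
}

// messageDelivered is called once Receive has handed a message to the caller.
func (s *zeroQueueState) messageDelivered() {
	s.mu.Lock()
	defer s.mu.Unlock()
	s.waitingOnReceive = false
}

// onReconnect re-sends a permit only if a Receive call was waiting when the
// connection dropped. It reports whether a permit was sent.
func (s *zeroQueueState) onReconnect() bool {
	s.mu.Lock()
	defer s.mu.Unlock()
	if !s.waitingOnReceive {
		return false
	}
	s.permitsSent++
	return true
}

func main() {
	idle := &zeroQueueState{}
	fmt.Println("idle consumer re-sends permit:", idle.onReconnect())

	waiting := &zeroQueueState{}
	waiting.beginReceive()
	fmt.Println("waiting consumer re-sends permit:", waiting.onReconnect())
}
```

With this guard, a consumer that reconnects while idle sends no permit, so a later Receive() sends only its own permit and at most one message is fetched.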
