Skip to content

Conversation

@crossoverJie
Copy link
Member

Motivation

We discussed here that after a zero-queue consumer unloads a topic and then starts consuming again, the internal queue may contain multiple messages.

Modifications

So I submitted a test case to ensure that consuming messages still works correctly after unloading the topic.

Verifying this change

  • Make sure that the change passes the CI checks.

(Please pick either of the following options)

This change is a trivial rework / code cleanup without any test coverage.

(or)

This change is already covered by existing tests, such as (please describe tests).

(or)

This change added tests and can be verified as follows:

(example:)

  • Added integration tests for end-to-end deployment with large payloads (10MB)
  • Extended integration test for recovery after broker failure

Does this pull request potentially affect one of the following parts:

If yes was chosen, please highlight the changes

  • Dependencies (does it add or upgrade a dependency): (yes / no)
  • The public API: (yes / no)
  • The schema: (yes / no / don't know)
  • The default values of configurations: (yes / no)
  • The wire protocol: (yes / no)

Documentation

  • Does this pull request introduce a new feature? (yes / no)
  • If yes, how is the feature documented? (not applicable / docs / GoDocs / not documented)
  • If a feature is not applicable for documentation, explain why?
  • If a feature is not documented yet in this PR, please create a followup issue for adding the documentation

Copy link
Contributor

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull Request Overview

This PR adds a new test case TestUnloadTopicBeforeConsume to verify that zero queue consumers can properly handle topic unloading scenarios before message consumption. The test ensures that the consumer can successfully reconnect and receive all messages after a topic unload event.

  • Adds imports for structured logging (log/slog, os) and Pulsar logging utilities
  • Adds new test TestUnloadTopicBeforeConsume to validate consumer behavior when topic is unloaded before consuming messages
  • Updates comment in TestReconnectConsumer to clarify the purpose of topic unloading

💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.

@crossoverJie crossoverJie added this to the v0.18.0 milestone Oct 30, 2025
@crossoverJie crossoverJie merged commit 7b4b3a6 into apache:master Oct 30, 2025
7 checks passed
@crossoverJie crossoverJie deleted the zero-consumer-unload-topic branch October 30, 2025 14:42
@geniusjoe
Copy link
Contributor

geniusjoe commented Nov 17, 2025

@crossoverJie
I'm sorry for late reply. I think this case passed because Unload() is an async operation and we call Receive() function before it finished.

I modified a little test case to make sure we call Receive() after Unload() finish, which will fail because subscription still has one unack message:

	consumer, err := client.Subscribe(ConsumerOptions{
		Topic:                   topic,
		SubscriptionName:        "my-sub",
		EnableZeroQueueConsumer: true,
		Type:                    Shared,  // using Shared subscription type to support unack subscription stats
	})
        ... 
	// send 10 messages
	for i := 0; i < 10; i++ {
		msg, err := producer.Send(ctx, &ProducerMessage{
			Payload: []byte(fmt.Sprintf("hello-%d", i)),
			Key:     "pulsar",
			Properties: map[string]string{
				"key-1": "pulsar-1",
			},
		})
		assert.Nil(t, err)
		log.Printf("send message: %s", msg.String())
	}

	log.Println("unloading topic")
	topicName, err := utils.GetTopicName(topic)
	assert.Nil(t, err)
	err = admin.Topics().Unload(*topicName)
	assert.Nil(t, err)
	log.Println("unloaded topic")
	time.Sleep(1 * time.Minute)    // wait for topic unload finish

	// receive 10 messages
	for i := 0; i < 10; i++ {
		msg, err := consumer.Receive(context.Background())
		if err != nil {
			log.Fatal(err)
		}

		expectMsg := fmt.Sprintf("hello-%d", i)
		expectProperties := map[string]string{
			"key-1": "pulsar-1",
		}
		assert.Equal(t, []byte(expectMsg), msg.Payload())
		assert.Equal(t, "pulsar", msg.Key())
		assert.Equal(t, expectProperties, msg.Properties())
		// ack message
		err = consumer.Ack(msg)
		assert.Nil(t, err)
		log.Printf("receive message: %s", msg.ID().String())
	}
	//	send one more message and we do not manually receive it
	_, err = producer.Send(ctx, &ProducerMessage{
		Payload: []byte(fmt.Sprintf("hello-%d", 11)),
		Key:     "pulsar",
		Properties: map[string]string{
			"key-1": "pulsar-1",
		},
	})
	//	wait for broker send messages to consumer and topic stats update finish 
	time.Sleep(1 * time.Minute)
	topicStats, err := admin.Topics().GetStats(*topicName)
	for _, subscriptionStats := range topicStats.Subscriptions {
		assert.Equal(t, subscriptionStats.Consumers[0].UnAckedMessages, 0)
	}
WXWorkCapture_17633877849880

Log below, and we can find that consumer call a requesting more permits=1 before receive(), and store received message in messageCh

2025/11/17 22:10:17 unloading topic
time=2025-11-17T22:10:17.205+08:00 level=DEBUG msg="Got command! type:CLOSE_PRODUCER close_producer:{producer_id:1 request_id:18446744073709551615} with payload size: 0 maxMsgSize: 5242880" remote_addr=pulsar://<my-pod>:6650 local_addr=<my-pod>:34634
time=2025-11-17T22:10:17.205+08:00 level=DEBUG msg="Received command: type:CLOSE_PRODUCER close_producer:{producer_id:1 request_id:18446744073709551615} -- payload: <nil>" remote_addr=pulsar://<my-pod>:6650 local_addr=<my-pod>:34634
time=2025-11-17T22:10:17.205+08:00 level=INFO msg="Broker notification of Closed producer: 1" remote_addr=pulsar://<my-pod>:6650 local_addr=<my-pod>:34634
time=2025-11-17T22:10:17.205+08:00 level=WARN msg="Connection was closed" topic=persistent://public/default/my-topic-127315028 producer_name=standalone-1000-30 producerID=1 cnx="<my-pod>:34634 -> <my-pod>:6650"
time=2025-11-17T22:10:17.205+08:00 level=DEBUG msg="Got command! type:CLOSE_CONSUMER close_consumer:{consumer_id:1 request_id:18446744073709551615} with payload size: 0 maxMsgSize: 5242880" remote_addr=pulsar://<my-pod>:6650 local_addr=<my-pod>:34634
time=2025-11-17T22:10:17.205+08:00 level=DEBUG msg="Received command: type:CLOSE_CONSUMER close_consumer:{consumer_id:1 request_id:18446744073709551615} -- payload: <nil>" remote_addr=pulsar://<my-pod>:6650 local_addr=<my-pod>:34634
time=2025-11-17T22:10:17.205+08:00 level=INFO msg="Broker notification of Closed consumer: 1" remote_addr=pulsar://<my-pod>:6650 local_addr=<my-pod>:34634
time=2025-11-17T22:10:17.205+08:00 level=DEBUG msg="connection closed and send to connectClosedCh" consumerID=1 name=dqnmg topic=persistent://public/default/my-topic-127315028 subscription=my-sub
time=2025-11-17T22:10:17.205+08:00 level=DEBUG msg="runEventsLoop will reconnect" consumerID=1 name=dqnmg topic=persistent://public/default/my-topic-127315028 subscription=my-sub
time=2025-11-17T22:10:17.205+08:00 level=DEBUG msg="Getting pooled connection" logicalAddr=pulsar://localhost:6650 physicalAddr=pulsar://localhost:6650 keySuffix=0
time=2025-11-17T22:10:17.205+08:00 level=DEBUG msg="Found connection in pool key=localhost:6650-localhost:6650-0 logical_addr=pulsar://localhost:6650 physical_addr=pulsar://localhost:6650"
time=2025-11-17T22:10:17.205+08:00 level=DEBUG msg="Write data: 67" remote_addr=pulsar://localhost:6650 local_addr=127.0.0.1:54964
time=2025-11-17T22:10:17.205+08:00 level=INFO msg="runEventsLoop will reconnect in producer" topic=persistent://public/default/my-topic-127315028 producer_name=standalone-1000-30 producerID=1
time=2025-11-17T22:10:17.205+08:00 level=DEBUG msg="Getting pooled connection" logicalAddr=pulsar://localhost:6650 physicalAddr=pulsar://localhost:6650 keySuffix=0
time=2025-11-17T22:10:17.205+08:00 level=DEBUG msg="Found connection in pool key=localhost:6650-localhost:6650-0 logical_addr=pulsar://localhost:6650 physical_addr=pulsar://localhost:6650"
time=2025-11-17T22:10:17.205+08:00 level=DEBUG msg="Write data: 67" remote_addr=pulsar://localhost:6650 local_addr=127.0.0.1:54964
time=2025-11-17T22:10:17.207+08:00 level=DEBUG msg="Got command! type:LOOKUP_RESPONSE lookupTopicResponse:{brokerServiceUrl:\"pulsar://<my-pod>:6650\" response:Connect request_id:7 authoritative:true proxy_through_service_url:false} with payload size: 0 maxMsgSize: 5242880" remote_addr=pulsar://localhost:6650 local_addr=127.0.0.1:54964
time=2025-11-17T22:10:17.207+08:00 level=DEBUG msg="Received command: type:LOOKUP_RESPONSE lookupTopicResponse:{brokerServiceUrl:\"pulsar://<my-pod>:6650\" response:Connect request_id:7 authoritative:true proxy_through_service_url:false} -- payload: <nil>" remote_addr=pulsar://localhost:6650 local_addr=127.0.0.1:54964
time=2025-11-17T22:10:17.207+08:00 level=DEBUG msg="Got command! type:LOOKUP_RESPONSE lookupTopicResponse:{brokerServiceUrl:\"pulsar://<my-pod>:6650\" response:Connect request_id:8 authoritative:true proxy_through_service_url:false} with payload size: 0 maxMsgSize: 5242880" remote_addr=pulsar://localhost:6650 local_addr=127.0.0.1:54964
time=2025-11-17T22:10:17.207+08:00 level=DEBUG msg="Received command: type:LOOKUP_RESPONSE lookupTopicResponse:{brokerServiceUrl:\"pulsar://<my-pod>:6650\" response:Connect request_id:8 authoritative:true proxy_through_service_url:false} -- payload: <nil>" remote_addr=pulsar://localhost:6650 local_addr=127.0.0.1:54964
time=2025-11-17T22:10:17.207+08:00 level=DEBUG msg="Got topic{persistent://public/default/my-topic-127315028} lookup response: &{Response:type:LOOKUP_RESPONSE lookupTopicResponse:{brokerServiceUrl:\"pulsar://<my-pod>:6650\" response:Connect request_id:8 authoritative:true proxy_through_service_url:false} Cnx:0xc0004d4160}" serviceURL=pulsar://localhost:6650 serviceURL=pulsar://localhost:6650
time=2025-11-17T22:10:17.207+08:00 level=DEBUG msg="Successfully looked up topic{persistent://public/default/my-topic-127315028} on broker. pulsar://<my-pod>:6650 /  - Use proxy: false" serviceURL=pulsar://localhost:6650 serviceURL=pulsar://localhost:6650
time=2025-11-17T22:10:17.207+08:00 level=DEBUG msg="Lookup result: &{pulsar://<my-pod>:6650 pulsar://<my-pod>:6650}" topic=persistent://public/default/my-topic-127315028 producer_name=standalone-1000-30 producerID=1
time=2025-11-17T22:10:17.207+08:00 level=DEBUG msg="The partition producer schema is nil" topic=persistent://public/default/my-topic-127315028 producer_name=standalone-1000-30 producerID=1
time=2025-11-17T22:10:17.207+08:00 level=DEBUG msg="Getting pooled connection" logicalAddr=pulsar://<my-pod>:6650 physicalAddr=pulsar://<my-pod>:6650 keySuffix=0
time=2025-11-17T22:10:17.207+08:00 level=DEBUG msg="Found connection in pool key=<my-pod>:6650-<my-pod>:6650-0 logical_addr=pulsar://<my-pod>:6650 physical_addr=pulsar://<my-pod>:6650"
time=2025-11-17T22:10:17.207+08:00 level=DEBUG msg="Write data: 94" remote_addr=pulsar://<my-pod>:6650 local_addr=<my-pod>:34634
time=2025-11-17T22:10:17.207+08:00 level=DEBUG msg="Got topic{persistent://public/default/my-topic-127315028} lookup response: &{Response:type:LOOKUP_RESPONSE lookupTopicResponse:{brokerServiceUrl:\"pulsar://<my-pod>:6650\" response:Connect request_id:7 authoritative:true proxy_through_service_url:false} Cnx:0xc0004d4160}" serviceURL=pulsar://localhost:6650 serviceURL=pulsar://localhost:6650
time=2025-11-17T22:10:17.207+08:00 level=DEBUG msg="Successfully looked up topic{persistent://public/default/my-topic-127315028} on broker. pulsar://<my-pod>:6650 /  - Use proxy: false" serviceURL=pulsar://localhost:6650 serviceURL=pulsar://localhost:6650
time=2025-11-17T22:10:17.207+08:00 level=DEBUG msg="Lookup result: &{pulsar://<my-pod>:6650 pulsar://<my-pod>:6650}" consumerID=1 name=dqnmg topic=persistent://public/default/my-topic-127315028 subscription=my-sub
time=2025-11-17T22:10:17.207+08:00 level=DEBUG msg="The partition consumer schema is nil" consumerID=1 name=dqnmg topic=persistent://public/default/my-topic-127315028 subscription=my-sub
time=2025-11-17T22:10:17.207+08:00 level=DEBUG msg="Getting pooled connection" logicalAddr=pulsar://<my-pod>:6650 physicalAddr=pulsar://<my-pod>:6650 keySuffix=0
time=2025-11-17T22:10:17.207+08:00 level=DEBUG msg="Found connection in pool key=<my-pod>:6650-<my-pod>:6650-0 logical_addr=pulsar://<my-pod>:6650 physical_addr=pulsar://<my-pod>:6650"
time=2025-11-17T22:10:17.207+08:00 level=DEBUG msg="Write data: 89" remote_addr=pulsar://<my-pod>:6650 local_addr=<my-pod>:34634
time=2025-11-17T22:10:17.210+08:00 level=DEBUG msg="Got command! type:ERROR error:{request_id:9 error:ServiceNotReady message:\"org.apache.pulsar.broker.service.BrokerServiceException$TopicFencedException: Topic is temporarily unavailable\"} with payload size: 0 maxMsgSize: 5242880" remote_addr=pulsar://<my-pod>:6650 local_addr=<my-pod>:34634
time=2025-11-17T22:10:17.210+08:00 level=DEBUG msg="Received command: type:ERROR error:{request_id:9 error:ServiceNotReady message:\"org.apache.pulsar.broker.service.BrokerServiceException$TopicFencedException: Topic is temporarily unavailable\"} -- payload: <nil>" remote_addr=pulsar://<my-pod>:6650 local_addr=<my-pod>:34634
time=2025-11-17T22:10:17.210+08:00 level=ERROR msg="Failed to create producer at send PRODUCER request" topic=persistent://public/default/my-topic-127315028 producer_name=standalone-1000-30 producerID=1 error="server error: ServiceNotReady: org.apache.pulsar.broker.service.BrokerServiceException$TopicFencedException: Topic is temporarily unavailable"
time=2025-11-17T22:10:17.210+08:00 level=ERROR msg="Failed to create producer at reconnect" topic=persistent://public/default/my-topic-127315028 producer_name=standalone-1000-30 producerID=1 error="server error: ServiceNotReady: org.apache.pulsar.broker.service.BrokerServiceException$TopicFencedException: Topic is temporarily unavailable"
time=2025-11-17T22:10:17.210+08:00 level=INFO msg="Reconnecting to broker" topic=persistent://public/default/my-topic-127315028 producer_name=standalone-1000-30 producerID=1 assignedBrokerURL="" delayReconnectTime=110.870734ms
time=2025-11-17T22:10:17.210+08:00 level=DEBUG msg="Got command! type:ERROR error:{request_id:10 error:ServiceNotReady message:\"Topic is temporarily unavailable\"} with payload size: 0 maxMsgSize: 5242880" remote_addr=pulsar://<my-pod>:6650 local_addr=<my-pod>:34634
time=2025-11-17T22:10:17.210+08:00 level=DEBUG msg="Received command: type:ERROR error:{request_id:10 error:ServiceNotReady message:\"Topic is temporarily unavailable\"} -- payload: <nil>" remote_addr=pulsar://<my-pod>:6650 local_addr=<my-pod>:34634
time=2025-11-17T22:10:17.210+08:00 level=ERROR msg="Failed to create consumer" consumerID=1 name=dqnmg topic=persistent://public/default/my-topic-127315028 subscription=my-sub error="server error: ServiceNotReady: Topic is temporarily unavailable"
time=2025-11-17T22:10:17.210+08:00 level=ERROR msg="Failed to create consumer at reconnect" consumerID=1 name=dqnmg topic=persistent://public/default/my-topic-127315028 subscription=my-sub error="server error: ServiceNotReady: Topic is temporarily unavailable"
time=2025-11-17T22:10:17.210+08:00 level=INFO msg="Reconnecting to broker" consumerID=1 name=dqnmg topic=persistent://public/default/my-topic-127315028 subscription=my-sub assignedBrokerURL="" delayReconnectTime=112.475971ms
2025/11/17 22:10:17 unloaded topic
time=2025-11-17T22:10:17.321+08:00 level=DEBUG msg="Getting pooled connection" logicalAddr=pulsar://localhost:6650 physicalAddr=pulsar://localhost:6650 keySuffix=0
time=2025-11-17T22:10:17.321+08:00 level=DEBUG msg="Found connection in pool key=localhost:6650-localhost:6650-0 logical_addr=pulsar://localhost:6650 physical_addr=pulsar://localhost:6650"
time=2025-11-17T22:10:17.321+08:00 level=DEBUG msg="Write data: 67" remote_addr=pulsar://localhost:6650 local_addr=127.0.0.1:54964
time=2025-11-17T22:10:17.322+08:00 level=DEBUG msg="Got command! type:LOOKUP_RESPONSE lookupTopicResponse:{brokerServiceUrl:\"pulsar://<my-pod>:6650\" response:Connect request_id:11 authoritative:true proxy_through_service_url:false} with payload size: 0 maxMsgSize: 5242880" remote_addr=pulsar://localhost:6650 local_addr=127.0.0.1:54964
time=2025-11-17T22:10:17.322+08:00 level=DEBUG msg="Received command: type:LOOKUP_RESPONSE lookupTopicResponse:{brokerServiceUrl:\"pulsar://<my-pod>:6650\" response:Connect request_id:11 authoritative:true proxy_through_service_url:false} -- payload: <nil>" remote_addr=pulsar://localhost:6650 local_addr=127.0.0.1:54964
time=2025-11-17T22:10:17.322+08:00 level=DEBUG msg="Got topic{persistent://public/default/my-topic-127315028} lookup response: &{Response:type:LOOKUP_RESPONSE lookupTopicResponse:{brokerServiceUrl:\"pulsar://<my-pod>:6650\" response:Connect request_id:11 authoritative:true proxy_through_service_url:false} Cnx:0xc0004d4160}" serviceURL=pulsar://localhost:6650 serviceURL=pulsar://localhost:6650
time=2025-11-17T22:10:17.322+08:00 level=DEBUG msg="Successfully looked up topic{persistent://public/default/my-topic-127315028} on broker. pulsar://<my-pod>:6650 /  - Use proxy: false" serviceURL=pulsar://localhost:6650 serviceURL=pulsar://localhost:6650
time=2025-11-17T22:10:17.322+08:00 level=DEBUG msg="Lookup result: &{pulsar://<my-pod>:6650 pulsar://<my-pod>:6650}" topic=persistent://public/default/my-topic-127315028 producer_name=standalone-1000-30 producerID=1
time=2025-11-17T22:10:17.322+08:00 level=DEBUG msg="The partition producer schema is nil" topic=persistent://public/default/my-topic-127315028 producer_name=standalone-1000-30 producerID=1
time=2025-11-17T22:10:17.322+08:00 level=DEBUG msg="Getting pooled connection" logicalAddr=pulsar://<my-pod>:6650 physicalAddr=pulsar://<my-pod>:6650 keySuffix=0
time=2025-11-17T22:10:17.322+08:00 level=DEBUG msg="Found connection in pool key=<my-pod>:6650-<my-pod>:6650-0 logical_addr=pulsar://<my-pod>:6650 physical_addr=pulsar://<my-pod>:6650"
time=2025-11-17T22:10:17.322+08:00 level=DEBUG msg="Write data: 94" remote_addr=pulsar://<my-pod>:6650 local_addr=<my-pod>:34634
time=2025-11-17T22:10:17.323+08:00 level=DEBUG msg="Getting pooled connection" logicalAddr=pulsar://localhost:6650 physicalAddr=pulsar://localhost:6650 keySuffix=0
time=2025-11-17T22:10:17.323+08:00 level=DEBUG msg="Found connection in pool key=localhost:6650-localhost:6650-0 logical_addr=pulsar://localhost:6650 physical_addr=pulsar://localhost:6650"
time=2025-11-17T22:10:17.323+08:00 level=DEBUG msg="Write data: 67" remote_addr=pulsar://localhost:6650 local_addr=127.0.0.1:54964
time=2025-11-17T22:10:17.324+08:00 level=DEBUG msg="Got command! type:LOOKUP_RESPONSE lookupTopicResponse:{brokerServiceUrl:\"pulsar://<my-pod>:6650\" response:Connect request_id:13 authoritative:true proxy_through_service_url:false} with payload size: 0 maxMsgSize: 5242880" remote_addr=pulsar://localhost:6650 local_addr=127.0.0.1:54964
time=2025-11-17T22:10:17.324+08:00 level=DEBUG msg="Received command: type:LOOKUP_RESPONSE lookupTopicResponse:{brokerServiceUrl:\"pulsar://<my-pod>:6650\" response:Connect request_id:13 authoritative:true proxy_through_service_url:false} -- payload: <nil>" remote_addr=pulsar://localhost:6650 local_addr=127.0.0.1:54964
time=2025-11-17T22:10:17.324+08:00 level=DEBUG msg="Got topic{persistent://public/default/my-topic-127315028} lookup response: &{Response:type:LOOKUP_RESPONSE lookupTopicResponse:{brokerServiceUrl:\"pulsar://<my-pod>:6650\" response:Connect request_id:13 authoritative:true proxy_through_service_url:false} Cnx:0xc0004d4160}" serviceURL=pulsar://localhost:6650 serviceURL=pulsar://localhost:6650
time=2025-11-17T22:10:17.324+08:00 level=DEBUG msg="Successfully looked up topic{persistent://public/default/my-topic-127315028} on broker. pulsar://<my-pod>:6650 /  - Use proxy: false" serviceURL=pulsar://localhost:6650 serviceURL=pulsar://localhost:6650
time=2025-11-17T22:10:17.324+08:00 level=DEBUG msg="Lookup result: &{pulsar://<my-pod>:6650 pulsar://<my-pod>:6650}" consumerID=1 name=dqnmg topic=persistent://public/default/my-topic-127315028 subscription=my-sub
time=2025-11-17T22:10:17.324+08:00 level=DEBUG msg="The partition consumer schema is nil" consumerID=1 name=dqnmg topic=persistent://public/default/my-topic-127315028 subscription=my-sub
time=2025-11-17T22:10:17.324+08:00 level=DEBUG msg="Getting pooled connection" logicalAddr=pulsar://<my-pod>:6650 physicalAddr=pulsar://<my-pod>:6650 keySuffix=0
time=2025-11-17T22:10:17.324+08:00 level=DEBUG msg="Found connection in pool key=<my-pod>:6650-<my-pod>:6650-0 logical_addr=pulsar://<my-pod>:6650 physical_addr=pulsar://<my-pod>:6650"
time=2025-11-17T22:10:17.324+08:00 level=DEBUG msg="Write data: 89" remote_addr=pulsar://<my-pod>:6650 local_addr=<my-pod>:34634
time=2025-11-17T22:10:17.342+08:00 level=DEBUG msg="Got command! type:SUCCESS success:{request_id:14} with payload size: 0 maxMsgSize: 5242880" remote_addr=pulsar://<my-pod>:6650 local_addr=<my-pod>:34634
time=2025-11-17T22:10:17.342+08:00 level=DEBUG msg="Received command: type:SUCCESS success:{request_id:14} -- payload: <nil>" remote_addr=pulsar://<my-pod>:6650 local_addr=<my-pod>:34634
time=2025-11-17T22:10:17.342+08:00 level=DEBUG msg="Got command! type:PRODUCER_SUCCESS producer_success:{request_id:12 producer_name:\"standalone-1000-30\" last_sequence_id:-1 schema_version:\"\" producer_ready:true} with payload size: 0 maxMsgSize: 5242880" remote_addr=pulsar://<my-pod>:6650 local_addr=<my-pod>:34634
time=2025-11-17T22:10:17.342+08:00 level=DEBUG msg="Received command: type:PRODUCER_SUCCESS producer_success:{request_id:12 producer_name:\"standalone-1000-30\" last_sequence_id:-1 schema_version:\"\" producer_ready:true} -- payload: <nil>" remote_addr=pulsar://<my-pod>:6650 local_addr=<my-pod>:34634
time=2025-11-17T22:10:17.342+08:00 level=INFO msg="Connected producer" topic=persistent://public/default/my-topic-127315028 producer_name=standalone-1000-30 producerID=1 cnx="<my-pod>:34634 -> <my-pod>:6650" epoch=2
time=2025-11-17T22:10:17.342+08:00 level=INFO msg="Reconnected producer to broker" topic=persistent://public/default/my-topic-127315028 producer_name=standalone-1000-30 producerID=1 cnx="<my-pod>:34634 -> <my-pod>:6650"
time=2025-11-17T22:10:17.342+08:00 level=INFO msg="Connected consumer" consumerID=1 name=dqnmg topic=persistent://public/default/my-topic-127315028 subscription=my-sub
time=2025-11-17T22:10:17.342+08:00 level=INFO msg="Reconnected consumer to broker" consumerID=1 name=dqnmg topic=persistent://public/default/my-topic-127315028 subscription=my-sub
time=2025-11-17T22:10:17.342+08:00 level=INFO msg="zeroQueueConsumer reconnect, reset availablePermits" consumerID=1 name=dqnmg topic=persistent://public/default/my-topic-127315028 subscription=my-sub


time=2025-11-17T22:10:17.342+08:00 level=DEBUG msg="requesting more permits=1 available=1" consumerID=1 name=dqnmg topic=persistent://public/default/my-topic-127315028 subscription=my-sub


time=2025-11-17T22:10:17.342+08:00 level=DEBUG msg="Write data: 16" remote_addr=pulsar://<my-pod>:6650 local_addr=<my-pod>:34634
time=2025-11-17T22:10:17.342+08:00 level=DEBUG msg="dispatcher received connection event" consumerID=1 name=dqnmg topic=persistent://public/default/my-topic-127315028 subscription=my-sub
time=2025-11-17T22:10:17.342+08:00 level=DEBUG msg="dispatcher requesting initial permits=0" consumerID=1 name=dqnmg topic=persistent://public/default/my-topic-127315028 subscription=my-sub
time=2025-11-17T22:10:17.358+08:00 level=DEBUG msg="Got command! type:MESSAGE message:{consumer_id:1 message_id:{ledgerId:1273 entryId:0 partition:-1}} with payload size: 112 maxMsgSize: 5242880" remote_addr=pulsar://<my-pod>:6650 local_addr=<my-pod>:34634
time=2025-11-17T22:10:17.358+08:00 level=DEBUG msg="Received command: type:MESSAGE message:{consumer_id:1 message_id:{ledgerId:1273 entryId:0 partition:-1}} -- payload: &{[14 1 235 51 7 69 0 0 0 60 10 18 115 116 97 110 100 97 108 111 110 101 45 49 48 48 48 45 51 48 16 0 24 213 227 155 145 169 51 34 17 10 5 107 101 121 45 49 18 8 112 117 108 115 97 114 45 49 50 6 112 117 108 115 97 114 72 42 88 1 0 0 0 31 10 17 10 5 107 101 121 45 49 18 8 112 117 108 115 97 114 45 49 18 6 112 117 108 115 97 114 24 7 64 0 104 101 108 108 111 45 48] 0 112 {{} {} 0} <nil> <nil>}" remote_addr=pulsar://<my-pod>:6650 local_addr=<my-pod>:34634
time=2025-11-17T22:10:17.358+08:00 level=DEBUG msg="Got Message: consumer_id:1 message_id:{ledgerId:1273 entryId:0 partition:-1}" remote_addr=pulsar://<my-pod>:6650 local_addr=<my-pod>:34634
time=2025-11-17T22:10:47.129+08:00 level=DEBUG msg="Got command! type:PING ping:{} with payload size: 0 maxMsgSize: 5242880" remote_addr=pulsar://localhost:6650 local_addr=127.0.0.1:54964
time=2025-11-17T22:10:47.129+08:00 level=DEBUG msg="Received command: type:PING ping:{} -- payload: <nil>" remote_addr=pulsar://localhost:6650 local_addr=127.0.0.1:54964
time=2025-11-17T22:10:47.129+08:00 level=DEBUG msg="Responding to PING request" remote_addr=pulsar://localhost:6650 local_addr=127.0.0.1:54964
time=2025-11-17T22:10:47.129+08:00 level=DEBUG msg="Write data: 13" remote_addr=pulsar://localhost:6650 local_addr=127.0.0.1:54964
time=2025-11-17T22:10:47.133+08:00 level=DEBUG msg="Sending PING" remote_addr=pulsar://localhost:6650 local_addr=127.0.0.1:54964
time=2025-11-17T22:10:47.133+08:00 level=DEBUG msg="Write data: 13" remote_addr=pulsar://localhost:6650 local_addr=127.0.0.1:54964
time=2025-11-17T22:10:47.133+08:00 level=DEBUG msg="Got command! type:PONG pong:{} with payload size: 0 maxMsgSize: 5242880" remote_addr=pulsar://localhost:6650 local_addr=127.0.0.1:54964
time=2025-11-17T22:10:47.133+08:00 level=DEBUG msg="Received command: type:PONG pong:{} -- payload: <nil>" remote_addr=pulsar://localhost:6650 local_addr=127.0.0.1:54964
time=2025-11-17T22:10:47.133+08:00 level=DEBUG msg="Received PONG response" remote_addr=pulsar://localhost:6650 local_addr=127.0.0.1:54964
time=2025-11-17T22:10:47.137+08:00 level=DEBUG msg="Sending PING" remote_addr=pulsar://<my-pod>:6650 local_addr=<my-pod>:34634
time=2025-11-17T22:10:47.137+08:00 level=DEBUG msg="Write data: 13" remote_addr=pulsar://<my-pod>:6650 local_addr=<my-pod>:34634
time=2025-11-17T22:10:47.138+08:00 level=DEBUG msg="Got command! type:PONG pong:{} with payload size: 0 maxMsgSize: 5242880" remote_addr=pulsar://<my-pod>:6650 local_addr=<my-pod>:34634
time=2025-11-17T22:10:47.138+08:00 level=DEBUG msg="Received command: type:PONG pong:{} -- payload: <nil>" remote_addr=pulsar://<my-pod>:6650 local_addr=<my-pod>:34634
time=2025-11-17T22:10:47.138+08:00 level=DEBUG msg="Received PONG response" remote_addr=pulsar://<my-pod>:6650 local_addr=<my-pod>:34634
time=2025-11-17T22:10:47.138+08:00 level=DEBUG msg="Got command! type:PING ping:{} with payload size: 0 maxMsgSize: 5242880" remote_addr=pulsar://<my-pod>:6650 local_addr=<my-pod>:34634
time=2025-11-17T22:10:47.138+08:00 level=DEBUG msg="Received command: type:PING ping:{} -- payload: <nil>" remote_addr=pulsar://<my-pod>:6650 local_addr=<my-pod>:34634
time=2025-11-17T22:10:47.138+08:00 level=DEBUG msg="Responding to PING request" remote_addr=pulsar://<my-pod>:6650 local_addr=<my-pod>:34634
time=2025-11-17T22:10:47.138+08:00 level=DEBUG msg="Write data: 13" remote_addr=pulsar://<my-pod>:6650 local_addr=<my-pod>:34634
time=2025-11-17T22:11:17.129+08:00 level=DEBUG msg="Got command! type:PING ping:{} with payload size: 0 maxMsgSize: 5242880" remote_addr=pulsar://localhost:6650 local_addr=127.0.0.1:54964
time=2025-11-17T22:11:17.129+08:00 level=DEBUG msg="Received command: type:PING ping:{} -- payload: <nil>" remote_addr=pulsar://localhost:6650 local_addr=127.0.0.1:54964
time=2025-11-17T22:11:17.129+08:00 level=DEBUG msg="Responding to PING request" remote_addr=pulsar://localhost:6650 local_addr=127.0.0.1:54964
time=2025-11-17T22:11:17.129+08:00 level=DEBUG msg="Write data: 13" remote_addr=pulsar://localhost:6650 local_addr=127.0.0.1:54964
time=2025-11-17T22:11:17.132+08:00 level=DEBUG msg="Sending PING" remote_addr=pulsar://localhost:6650 local_addr=127.0.0.1:54964
time=2025-11-17T22:11:17.133+08:00 level=DEBUG msg="Write data: 13" remote_addr=pulsar://localhost:6650 local_addr=127.0.0.1:54964
time=2025-11-17T22:11:17.133+08:00 level=DEBUG msg="Got command! type:PONG pong:{} with payload size: 0 maxMsgSize: 5242880" remote_addr=pulsar://localhost:6650 local_addr=127.0.0.1:54964
time=2025-11-17T22:11:17.133+08:00 level=DEBUG msg="Received command: type:PONG pong:{} -- payload: <nil>" remote_addr=pulsar://localhost:6650 local_addr=127.0.0.1:54964
time=2025-11-17T22:11:17.133+08:00 level=DEBUG msg="Received PONG response" remote_addr=pulsar://localhost:6650 local_addr=127.0.0.1:54964
time=2025-11-17T22:11:17.138+08:00 level=DEBUG msg="Sending PING" remote_addr=pulsar://<my-pod>:6650 local_addr=<my-pod>:34634
time=2025-11-17T22:11:17.138+08:00 level=DEBUG msg="Write data: 13" remote_addr=pulsar://<my-pod>:6650 local_addr=<my-pod>:34634
time=2025-11-17T22:11:17.138+08:00 level=DEBUG msg="Got command! type:PONG pong:{} with payload size: 0 maxMsgSize: 5242880" remote_addr=pulsar://<my-pod>:6650 local_addr=<my-pod>:34634
time=2025-11-17T22:11:17.138+08:00 level=DEBUG msg="Received command: type:PONG pong:{} -- payload: <nil>" remote_addr=pulsar://<my-pod>:6650 local_addr=<my-pod>:34634
time=2025-11-17T22:11:17.138+08:00 level=DEBUG msg="Received PONG response" remote_addr=pulsar://<my-pod>:6650 local_addr=<my-pod>:34634
time=2025-11-17T22:11:17.138+08:00 level=DEBUG msg="Got command! type:PING ping:{} with payload size: 0 maxMsgSize: 5242880" remote_addr=pulsar://<my-pod>:6650 local_addr=<my-pod>:34634
time=2025-11-17T22:11:17.139+08:00 level=DEBUG msg="Received command: type:PING ping:{} -- payload: <nil>" remote_addr=pulsar://<my-pod>:6650 local_addr=<my-pod>:34634
time=2025-11-17T22:11:17.139+08:00 level=DEBUG msg="Responding to PING request" remote_addr=pulsar://<my-pod>:6650 local_addr=<my-pod>:34634
time=2025-11-17T22:11:17.139+08:00 level=DEBUG msg="Write data: 13" remote_addr=pulsar://<my-pod>:6650 local_addr=<my-pod>:34634
time=2025-11-17T22:11:17.173+08:00 level=DEBUG msg="Auto discovering new partitions" topic=my-topic-127315028
time=2025-11-17T22:11:17.174+08:00 level=DEBUG msg="Getting pooled connection" logicalAddr=pulsar://localhost:6650 physicalAddr=pulsar://localhost:6650 keySuffix=0
time=2025-11-17T22:11:17.174+08:00 level=DEBUG msg="Found connection in pool key=localhost:6650-localhost:6650-0 logical_addr=pulsar://localhost:6650 physical_addr=pulsar://localhost:6650"
time=2025-11-17T22:11:17.174+08:00 level=DEBUG msg="Write data: 63" remote_addr=pulsar://localhost:6650 local_addr=127.0.0.1:54964
time=2025-11-17T22:11:17.175+08:00 level=DEBUG msg="Got command! type:PARTITIONED_METADATA_RESPONSE partitionMetadataResponse:{partitions:0 request_id:15 response:Success} with payload size: 0 maxMsgSize: 5242880" remote_addr=pulsar://localhost:6650 local_addr=127.0.0.1:54964
time=2025-11-17T22:11:17.175+08:00 level=DEBUG msg="Received command: type:PARTITIONED_METADATA_RESPONSE partitionMetadataResponse:{partitions:0 request_id:15 response:Success} -- payload: <nil>" remote_addr=pulsar://localhost:6650 local_addr=127.0.0.1:54964
time=2025-11-17T22:11:17.175+08:00 level=DEBUG msg="Got topic{my-topic-127315028} partitioned metadata response: &{Response:type:PARTITIONED_METADATA_RESPONSE partitionMetadataResponse:{partitions:0 request_id:15 response:Success} Cnx:0xc0004d4160}" serviceURL=pulsar://localhost:6650 serviceURL=pulsar://localhost:6650
time=2025-11-17T22:11:17.175+08:00 level=DEBUG msg="Number of partitions in topic has not changed" topic=my-topic-127315028
time=2025-11-17T22:11:17.218+08:00 level=DEBUG msg="requesting more permits=1 available=1" consumerID=1 name=dqnmg topic=persistent://public/default/my-topic-127315028 subscription=my-sub
2025/11/17 22:11:17 receive message: 1273:0:-1
time=2025-11-17T22:11:17.218+08:00 level=DEBUG msg="requesting more permits=1 available=1" consumerID=1 name=dqnmg topic=persistent://public/default/my-topic-127315028 subscription=my-sub
time=2025-11-17T22:11:17.218+08:00 level=DEBUG msg="Write data: 16" remote_addr=pulsar://<my-pod>:6650 local_addr=<my-pod>:34634
time=2025-11-17T22:11:17.218+08:00 level=DEBUG msg="Write data: 16" remote_addr=pulsar://<my-pod>:6650 local_addr=<my-pod>:34634
time=2025-11-17T22:11:17.223+08:00 level=DEBUG msg="Got command! type:MESSAGE message:{consumer_id:1 message_id:{ledgerId:1273 entryId:1 partition:-1}} with payload size: 112 maxMsgSize: 5242880" remote_addr=pulsar://<my-pod>:6650 local_addr=<my-pod>:34634
time=2025-11-17T22:11:17.223+08:00 level=DEBUG msg="Received command: type:MESSAGE message:{consumer_id:1 message_id:{ledgerId:1273 entryId:1 partition:-1}} -- payload: &{[14 1 23 77 77 45 0 0 0 60 10 18 115 116 97 110 100 97 108 111 110 101 45 49 48 48 48 45 51 48 16 1 24 216 227 155 145 169 51 34 17 10 5 107 101 121 45 49 18 8 112 117 108 115 97 114 45 49 50 6 112 117 108 115 97 114 72 42 88 1 0 0 0 31 10 17 10 5 107 101 121 45 49 18 8 112 117 108 115 97 114 45 49 18 6 112 117 108 115 97 114 24 7 64 1 104 101 108 108 111 45 49] 0 112 {{} {} 0} <nil> <nil>}" remote_addr=pulsar://<my-pod>:6650 local_addr=<my-pod>:34634
time=2025-11-17T22:11:17.223+08:00 level=DEBUG msg="Got Message: consumer_id:1 message_id:{ledgerId:1273 entryId:1 partition:-1}" remote_addr=pulsar://<my-pod>:6650 local_addr=<my-pod>:34634
time=2025-11-17T22:11:17.223+08:00 level=DEBUG msg="Got command! type:MESSAGE message:{consumer_id:1 message_id:{ledgerId:1273 entryId:2 partition:-1}} with payload size: 112 maxMsgSize: 5242880" remote_addr=pulsar://<my-pod>:6650 local_addr=<my-pod>:34634
time=2025-11-17T22:11:17.223+08:00 level=DEBUG msg="Received command: type:MESSAGE message:{consumer_id:1 message_id:{ledgerId:1273 entryId:2 partition:-1}} -- payload: &{[14 1 124 13 199 141 0 0 0 60 10 18 115 116 97 110 100 97 108 111 110 101 45 49 48 48 48 45 51 48 16 2 24 220 227 155 145 169 51 34 17 10 5 107 101 121 45 49 18 8 112 117 108 115 97 114 45 49 50 6 112 117 108 115 97 114 72 42 88 1 0 0 0 31 10 17 10 5 107 101 121 45 49 18 8 112 117 108 115 97 114 45 49 18 6 112 117 108 115 97 114 24 7 64 2 104 101 108 108 111 45 50] 0 112 {{} {} 0} <nil> <nil>}" remote_addr=pulsar://<my-pod>:6650 local_addr=<my-pod>:34634
time=2025-11-17T22:11:17.223+08:00 level=DEBUG msg="Got Message: consumer_id:1 message_id:{ledgerId:1273 entryId:2 partition:-1}" remote_addr=pulsar://<my-pod>:6650 local_addr=<my-pod>:34634
2025/11/17 22:11:17 receive message: 1273:1:-1
image

Below is previous test case, we can found that we called Receive() before Unload finish, so that the previous flow took no effect, at this time there is only one permit and test passed:

2025/11/17 23:07:49 unloading topic
time=2025-11-17T23:07:49.183+08:00 level=DEBUG msg="Got command! type:CLOSE_PRODUCER  close_producer:{producer_id:1  request_id:18446744073709551615} with payload size: 0 maxMsgSize: 5242880" remote_addr=pulsar://<my-pod>:6650 local_addr=<my-pod>:45038
time=2025-11-17T23:07:49.184+08:00 level=DEBUG msg="Received command: type:CLOSE_PRODUCER  close_producer:{producer_id:1  request_id:18446744073709551615} -- payload: <nil>" remote_addr=pulsar://<my-pod>:6650 local_addr=<my-pod>:45038
time=2025-11-17T23:07:49.184+08:00 level=INFO msg="Broker notification of Closed producer: 1" remote_addr=pulsar://<my-pod>:6650 local_addr=<my-pod>:45038
time=2025-11-17T23:07:49.184+08:00 level=WARN msg="Connection was closed" topic=persistent://public/default/my-topic-90865775 producer_name=standalone-1000-32 producerID=1 cnx="<my-pod>:45038 -> <my-pod>:6650"
time=2025-11-17T23:07:49.184+08:00 level=DEBUG msg="Got command! type:CLOSE_CONSUMER  close_consumer:{consumer_id:1  request_id:18446744073709551615} with payload size: 0 maxMsgSize: 5242880" remote_addr=pulsar://<my-pod>:6650 local_addr=<my-pod>:45038
time=2025-11-17T23:07:49.184+08:00 level=INFO msg="runEventsLoop will reconnect in producer" topic=persistent://public/default/my-topic-90865775 producer_name=standalone-1000-32 producerID=1
time=2025-11-17T23:07:49.184+08:00 level=DEBUG msg="Received command: type:CLOSE_CONSUMER  close_consumer:{consumer_id:1  request_id:18446744073709551615} -- payload: <nil>" remote_addr=pulsar://<my-pod>:6650 local_addr=<my-pod>:45038
time=2025-11-17T23:07:49.184+08:00 level=INFO msg="Broker notification of Closed consumer: 1" remote_addr=pulsar://<my-pod>:6650 local_addr=<my-pod>:45038
time=2025-11-17T23:07:49.184+08:00 level=DEBUG msg="connection closed and send to connectClosedCh" topic=persistent://public/default/my-topic-90865775 subscription=my-sub consumerID=1 name=qzrrz
time=2025-11-17T23:07:49.184+08:00 level=DEBUG msg="Getting pooled connection" logicalAddr=pulsar://localhost:6650 physicalAddr=pulsar://localhost:6650 keySuffix=0
time=2025-11-17T23:07:49.184+08:00 level=DEBUG msg="Found connection in pool key=localhost:6650-localhost:6650-0 logical_addr=pulsar://localhost:6650 physical_addr=pulsar://localhost:6650"
time=2025-11-17T23:07:49.184+08:00 level=DEBUG msg="Write data: 66" remote_addr=pulsar://localhost:6650 local_addr=127.0.0.1:47754
time=2025-11-17T23:07:49.184+08:00 level=DEBUG msg="runEventsLoop will reconnect" topic=persistent://public/default/my-topic-90865775 subscription=my-sub consumerID=1 name=qzrrz
time=2025-11-17T23:07:49.184+08:00 level=DEBUG msg="Getting pooled connection" logicalAddr=pulsar://localhost:6650 physicalAddr=pulsar://localhost:6650 keySuffix=0
time=2025-11-17T23:07:49.184+08:00 level=DEBUG msg="Found connection in pool key=localhost:6650-localhost:6650-0 logical_addr=pulsar://localhost:6650 physical_addr=pulsar://localhost:6650"
time=2025-11-17T23:07:49.184+08:00 level=DEBUG msg="Write data: 66" remote_addr=pulsar://localhost:6650 local_addr=127.0.0.1:47754
time=2025-11-17T23:07:49.184+08:00 level=DEBUG msg="Got command! type:LOOKUP_RESPONSE  lookupTopicResponse:{brokerServiceUrl:\"pulsar://<my-pod>:6650\"  response:Connect  request_id:7  authoritative:true  proxy_through_service_url:false} with payload size: 0 maxMsgSize: 5242880" remote_addr=pulsar://localhost:6650 local_addr=127.0.0.1:47754
time=2025-11-17T23:07:49.184+08:00 level=DEBUG msg="Received command: type:LOOKUP_RESPONSE  lookupTopicResponse:{brokerServiceUrl:\"pulsar://<my-pod>:6650\"  response:Connect  request_id:7  authoritative:true  proxy_through_service_url:false} -- payload: <nil>" remote_addr=pulsar://localhost:6650 local_addr=127.0.0.1:47754
time=2025-11-17T23:07:49.185+08:00 level=DEBUG msg="Got command! type:LOOKUP_RESPONSE  lookupTopicResponse:{brokerServiceUrl:\"pulsar://<my-pod>:6650\"  response:Connect  request_id:8  authoritative:true  proxy_through_service_url:false} with payload size: 0 maxMsgSize: 5242880" remote_addr=pulsar://localhost:6650 local_addr=127.0.0.1:47754
time=2025-11-17T23:07:49.185+08:00 level=DEBUG msg="Received command: type:LOOKUP_RESPONSE  lookupTopicResponse:{brokerServiceUrl:\"pulsar://<my-pod>:6650\"  response:Connect  request_id:8  authoritative:true  proxy_through_service_url:false} -- payload: <nil>" remote_addr=pulsar://localhost:6650 local_addr=127.0.0.1:47754
time=2025-11-17T23:07:49.185+08:00 level=DEBUG msg="Got topic{persistent://public/default/my-topic-90865775} lookup response: &{Response:type:LOOKUP_RESPONSE  lookupTopicResponse:{brokerServiceUrl:\"pulsar://<my-pod>:6650\"  response:Connect  request_id:8  authoritative:true  proxy_through_service_url:false} Cnx:0xc0000d4000}" serviceURL=pulsar://localhost:6650 serviceURL=pulsar://localhost:6650
time=2025-11-17T23:07:49.185+08:00 level=DEBUG msg="Successfully looked up topic{persistent://public/default/my-topic-90865775} on broker. pulsar://<my-pod>:6650 /  - Use proxy: false" serviceURL=pulsar://localhost:6650 serviceURL=pulsar://localhost:6650
time=2025-11-17T23:07:49.185+08:00 level=DEBUG msg="Lookup result: &{pulsar://<my-pod>:6650 pulsar://<my-pod>:6650}" topic=persistent://public/default/my-topic-90865775 subscription=my-sub consumerID=1 name=qzrrz
time=2025-11-17T23:07:49.185+08:00 level=DEBUG msg="The partition consumer schema is nil" topic=persistent://public/default/my-topic-90865775 subscription=my-sub consumerID=1 name=qzrrz
time=2025-11-17T23:07:49.185+08:00 level=DEBUG msg="Got topic{persistent://public/default/my-topic-90865775} lookup response: &{Response:type:LOOKUP_RESPONSE  lookupTopicResponse:{brokerServiceUrl:\"pulsar://<my-pod>:6650\"  response:Connect  request_id:7  authoritative:true  proxy_through_service_url:false} Cnx:0xc0000d4000}" serviceURL=pulsar://localhost:6650 serviceURL=pulsar://localhost:6650
time=2025-11-17T23:07:49.185+08:00 level=DEBUG msg="Successfully looked up topic{persistent://public/default/my-topic-90865775} on broker. pulsar://<my-pod>:6650 /  - Use proxy: false" serviceURL=pulsar://localhost:6650 serviceURL=pulsar://localhost:6650
time=2025-11-17T23:07:49.185+08:00 level=DEBUG msg="Lookup result: &{pulsar://<my-pod>:6650 pulsar://<my-pod>:6650}" topic=persistent://public/default/my-topic-90865775 producer_name=standalone-1000-32 producerID=1
time=2025-11-17T23:07:49.185+08:00 level=DEBUG msg="The partition producer schema is nil" topic=persistent://public/default/my-topic-90865775 producer_name=standalone-1000-32 producerID=1
time=2025-11-17T23:07:49.185+08:00 level=DEBUG msg="Getting pooled connection" logicalAddr=pulsar://<my-pod>:6650 physicalAddr=pulsar://<my-pod>:6650 keySuffix=0
time=2025-11-17T23:07:49.185+08:00 level=DEBUG msg="Getting pooled connection" logicalAddr=pulsar://<my-pod>:6650 physicalAddr=pulsar://<my-pod>:6650 keySuffix=0
time=2025-11-17T23:07:49.185+08:00 level=DEBUG msg="Found connection in pool key=<my-pod>:6650-<my-pod>:6650-0 logical_addr=pulsar://<my-pod>:6650 physical_addr=pulsar://<my-pod>:6650"
time=2025-11-17T23:07:49.185+08:00 level=DEBUG msg="Write data: 93" remote_addr=pulsar://<my-pod>:6650 local_addr=<my-pod>:45038
time=2025-11-17T23:07:49.185+08:00 level=DEBUG msg="Found connection in pool key=<my-pod>:6650-<my-pod>:6650-0 logical_addr=pulsar://<my-pod>:6650 physical_addr=pulsar://<my-pod>:6650"
time=2025-11-17T23:07:49.186+08:00 level=DEBUG msg="Got command! type:ERROR  error:{request_id:10  error:ServiceNotReady  message:\"org.apache.pulsar.broker.service.BrokerServiceException$TopicFencedException: Topic is temporarily unavailable\"} with payload size: 0 maxMsgSize: 5242880" remote_addr=pulsar://<my-pod>:6650 local_addr=<my-pod>:45038
time=2025-11-17T23:07:49.186+08:00 level=DEBUG msg="Received command: type:ERROR  error:{request_id:10  error:ServiceNotReady  message:\"org.apache.pulsar.broker.service.BrokerServiceException$TopicFencedException: Topic is temporarily unavailable\"} -- payload: <nil>" remote_addr=pulsar://<my-pod>:6650 local_addr=<my-pod>:45038
time=2025-11-17T23:07:49.186+08:00 level=ERROR msg="Failed to create producer at send PRODUCER request" topic=persistent://public/default/my-topic-90865775 producer_name=standalone-1000-32 producerID=1 error="server error: ServiceNotReady: org.apache.pulsar.broker.service.BrokerServiceException$TopicFencedException: Topic is temporarily unavailable"
time=2025-11-17T23:07:49.186+08:00 level=ERROR msg="Failed to create producer at reconnect" topic=persistent://public/default/my-topic-90865775 producer_name=standalone-1000-32 producerID=1 error="server error: ServiceNotReady: org.apache.pulsar.broker.service.BrokerServiceException$TopicFencedException: Topic is temporarily unavailable"
time=2025-11-17T23:07:49.186+08:00 level=INFO msg="Reconnecting to broker" topic=persistent://public/default/my-topic-90865775 producer_name=standalone-1000-32 producerID=1 assignedBrokerURL="" delayReconnectTime=112.746658ms
time=2025-11-17T23:07:49.186+08:00 level=DEBUG msg="Write data: 88" remote_addr=pulsar://<my-pod>:6650 local_addr=<my-pod>:45038
time=2025-11-17T23:07:49.187+08:00 level=DEBUG msg="Got command! type:ERROR  error:{request_id:9  error:ServiceNotReady  message:\"Topic is temporarily unavailable\"} with payload size: 0 maxMsgSize: 5242880" remote_addr=pulsar://<my-pod>:6650 local_addr=<my-pod>:45038
time=2025-11-17T23:07:49.187+08:00 level=DEBUG msg="Received command: type:ERROR  error:{request_id:9  error:ServiceNotReady  message:\"Topic is temporarily unavailable\"} -- payload: <nil>" remote_addr=pulsar://<my-pod>:6650 local_addr=<my-pod>:45038
time=2025-11-17T23:07:49.187+08:00 level=ERROR msg="Failed to create consumer" topic=persistent://public/default/my-topic-90865775 subscription=my-sub consumerID=1 name=qzrrz error="server error: ServiceNotReady: Topic is temporarily unavailable"
time=2025-11-17T23:07:49.187+08:00 level=ERROR msg="Failed to create consumer at reconnect" topic=persistent://public/default/my-topic-90865775 subscription=my-sub consumerID=1 name=qzrrz error="server error: ServiceNotReady: Topic is temporarily unavailable"
time=2025-11-17T23:07:49.187+08:00 level=INFO msg="Reconnecting to broker" topic=persistent://public/default/my-topic-90865775 subscription=my-sub consumerID=1 name=qzrrz delayReconnectTime=111.086774ms assignedBrokerURL=""
2025/11/17 23:07:49 unloaded topic


time=2025-11-17T23:07:49.197+08:00 level=DEBUG msg="requesting more permits=1 available=1" topic=persistent://public/default/my-topic-90865775 subscription=my-sub consumerID=1 name=qzrrz


time=2025-11-17T23:07:49.197+08:00 level=DEBUG msg="Write data: 16" remote_addr=pulsar://<my-pod>:6650 local_addr=<my-pod>:45038
time=2025-11-17T23:07:49.299+08:00 level=DEBUG msg="Getting pooled connection" logicalAddr=pulsar://localhost:6650 physicalAddr=pulsar://localhost:6650 keySuffix=0
time=2025-11-17T23:07:49.299+08:00 level=DEBUG msg="Found connection in pool key=localhost:6650-localhost:6650-0 logical_addr=pulsar://localhost:6650 physical_addr=pulsar://localhost:6650"
time=2025-11-17T23:07:49.299+08:00 level=DEBUG msg="Write data: 66" remote_addr=pulsar://localhost:6650 local_addr=127.0.0.1:47754
time=2025-11-17T23:07:49.299+08:00 level=DEBUG msg="Getting pooled connection" logicalAddr=pulsar://localhost:6650 physicalAddr=pulsar://localhost:6650 keySuffix=0
time=2025-11-17T23:07:49.299+08:00 level=DEBUG msg="Found connection in pool key=localhost:6650-localhost:6650-0 logical_addr=pulsar://localhost:6650 physical_addr=pulsar://localhost:6650"
time=2025-11-17T23:07:49.299+08:00 level=DEBUG msg="Write data: 66" remote_addr=pulsar://localhost:6650 local_addr=127.0.0.1:47754
time=2025-11-17T23:07:49.306+08:00 level=DEBUG msg="Got command! type:LOOKUP_RESPONSE  lookupTopicResponse:{brokerServiceUrl:\"pulsar://<my-pod>:6650\"  response:Connect  request_id:11  authoritative:true  proxy_through_service_url:false} with payload size: 0 maxMsgSize: 5242880" remote_addr=pulsar://localhost:6650 local_addr=127.0.0.1:47754
time=2025-11-17T23:07:49.306+08:00 level=DEBUG msg="Received command: type:LOOKUP_RESPONSE  lookupTopicResponse:{brokerServiceUrl:\"pulsar://<my-pod>:6650\"  response:Connect  request_id:11  authoritative:true  proxy_through_service_url:false} -- payload: <nil>" remote_addr=pulsar://localhost:6650 local_addr=127.0.0.1:47754
time=2025-11-17T23:07:49.306+08:00 level=DEBUG msg="Got command! type:LOOKUP_RESPONSE  lookupTopicResponse:{brokerServiceUrl:\"pulsar://<my-pod>:6650\"  response:Connect  request_id:12  authoritative:true  proxy_through_service_url:false} with payload size: 0 maxMsgSize: 5242880" remote_addr=pulsar://localhost:6650 local_addr=127.0.0.1:47754
time=2025-11-17T23:07:49.306+08:00 level=DEBUG msg="Received command: type:LOOKUP_RESPONSE  lookupTopicResponse:{brokerServiceUrl:\"pulsar://<my-pod>:6650\"  response:Connect  request_id:12  authoritative:true  proxy_through_service_url:false} -- payload: <nil>" remote_addr=pulsar://localhost:6650 local_addr=127.0.0.1:47754
time=2025-11-17T23:07:49.306+08:00 level=DEBUG msg="Got topic{persistent://public/default/my-topic-90865775} lookup response: &{Response:type:LOOKUP_RESPONSE  lookupTopicResponse:{brokerServiceUrl:\"pulsar://<my-pod>:6650\"  response:Connect  request_id:12  authoritative:true  proxy_through_service_url:false} Cnx:0xc0000d4000}" serviceURL=pulsar://localhost:6650 serviceURL=pulsar://localhost:6650
time=2025-11-17T23:07:49.306+08:00 level=DEBUG msg="Successfully looked up topic{persistent://public/default/my-topic-90865775} on broker. pulsar://<my-pod>:6650 /  - Use proxy: false" serviceURL=pulsar://localhost:6650 serviceURL=pulsar://localhost:6650
time=2025-11-17T23:07:49.307+08:00 level=DEBUG msg="Lookup result: &{pulsar://<my-pod>:6650 pulsar://<my-pod>:6650}" topic=persistent://public/default/my-topic-90865775 producer_name=standalone-1000-32 producerID=1
time=2025-11-17T23:07:49.307+08:00 level=DEBUG msg="The partition producer schema is nil" topic=persistent://public/default/my-topic-90865775 producer_name=standalone-1000-32 producerID=1
time=2025-11-17T23:07:49.307+08:00 level=DEBUG msg="Getting pooled connection" logicalAddr=pulsar://<my-pod>:6650 physicalAddr=pulsar://<my-pod>:6650 keySuffix=0
time=2025-11-17T23:07:49.307+08:00 level=DEBUG msg="Found connection in pool key=<my-pod>:6650-<my-pod>:6650-0 logical_addr=pulsar://<my-pod>:6650 physical_addr=pulsar://<my-pod>:6650"
time=2025-11-17T23:07:49.307+08:00 level=DEBUG msg="Write data: 93" remote_addr=pulsar://<my-pod>:6650 local_addr=<my-pod>:45038
time=2025-11-17T23:07:49.307+08:00 level=DEBUG msg="Got topic{persistent://public/default/my-topic-90865775} lookup response: &{Response:type:LOOKUP_RESPONSE  lookupTopicResponse:{brokerServiceUrl:\"pulsar://<my-pod>:6650\"  response:Connect  request_id:11  authoritative:true  proxy_through_service_url:false} Cnx:0xc0000d4000}" serviceURL=pulsar://localhost:6650 serviceURL=pulsar://localhost:6650
time=2025-11-17T23:07:49.307+08:00 level=DEBUG msg="Successfully looked up topic{persistent://public/default/my-topic-90865775} on broker. pulsar://<my-pod>:6650 /  - Use proxy: false" serviceURL=pulsar://localhost:6650 serviceURL=pulsar://localhost:6650
time=2025-11-17T23:07:49.307+08:00 level=DEBUG msg="Lookup result: &{pulsar://<my-pod>:6650 pulsar://<my-pod>:6650}" topic=persistent://public/default/my-topic-90865775 subscription=my-sub consumerID=1 name=qzrrz
time=2025-11-17T23:07:49.307+08:00 level=DEBUG msg="The partition consumer schema is nil" topic=persistent://public/default/my-topic-90865775 subscription=my-sub consumerID=1 name=qzrrz
time=2025-11-17T23:07:49.307+08:00 level=DEBUG msg="Getting pooled connection" logicalAddr=pulsar://<my-pod>:6650 physicalAddr=pulsar://<my-pod>:6650 keySuffix=0
time=2025-11-17T23:07:49.307+08:00 level=DEBUG msg="Found connection in pool key=<my-pod>:6650-<my-pod>:6650-0 logical_addr=pulsar://<my-pod>:6650 physical_addr=pulsar://<my-pod>:6650"
time=2025-11-17T23:07:49.307+08:00 level=DEBUG msg="Write data: 88" remote_addr=pulsar://<my-pod>:6650 local_addr=<my-pod>:45038
time=2025-11-17T23:07:49.325+08:00 level=DEBUG msg="Got command! type:SUCCESS  success:{request_id:14} with payload size: 0 maxMsgSize: 5242880" remote_addr=pulsar://<my-pod>:6650 local_addr=<my-pod>:45038
time=2025-11-17T23:07:49.325+08:00 level=DEBUG msg="Received command: type:SUCCESS  success:{request_id:14} -- payload: <nil>" remote_addr=pulsar://<my-pod>:6650 local_addr=<my-pod>:45038
time=2025-11-17T23:07:49.325+08:00 level=INFO msg="Connected consumer" topic=persistent://public/default/my-topic-90865775 subscription=my-sub consumerID=1 name=qzrrz
time=2025-11-17T23:07:49.325+08:00 level=INFO msg="Reconnected consumer to broker" topic=persistent://public/default/my-topic-90865775 subscription=my-sub consumerID=1 name=qzrrz
time=2025-11-17T23:07:49.325+08:00 level=INFO msg="zeroQueueConsumer reconnect, reset availablePermits" topic=persistent://public/default/my-topic-90865775 subscription=my-sub consumerID=1 name=qzrrz
time=2025-11-17T23:07:49.325+08:00 level=DEBUG msg="requesting more permits=1 available=1" topic=persistent://public/default/my-topic-90865775 subscription=my-sub consumerID=1 name=qzrrz
time=2025-11-17T23:07:49.325+08:00 level=DEBUG msg="Write data: 16" remote_addr=pulsar://<my-pod>:6650 local_addr=<my-pod>:45038
time=2025-11-17T23:07:49.325+08:00 level=DEBUG msg="dispatcher received connection event" topic=persistent://public/default/my-topic-90865775 subscription=my-sub consumerID=1 name=qzrrz
time=2025-11-17T23:07:49.326+08:00 level=DEBUG msg="dispatcher requesting initial permits=0" topic=persistent://public/default/my-topic-90865775 subscription=my-sub consumerID=1 name=qzrrz
time=2025-11-17T23:07:49.326+08:00 level=DEBUG msg="Got command! type:PRODUCER_SUCCESS  producer_success:{request_id:13  producer_name:\"standalone-1000-32\"  last_sequence_id:-1  schema_version:\"\"  producer_ready:true} with payload size: 0 maxMsgSize: 5242880" remote_addr=pulsar://<my-pod>:6650 local_addr=<my-pod>:45038
time=2025-11-17T23:07:49.326+08:00 level=DEBUG msg="Received command: type:PRODUCER_SUCCESS  producer_success:{request_id:13  producer_name:\"standalone-1000-32\"  last_sequence_id:-1  schema_version:\"\"  producer_ready:true} -- payload: <nil>" remote_addr=pulsar://<my-pod>:6650 local_addr=<my-pod>:45038
time=2025-11-17T23:07:49.326+08:00 level=INFO msg="Connected producer" topic=persistent://public/default/my-topic-90865775 producer_name=standalone-1000-32 producerID=1 cnx="<my-pod>:45038 -> <my-pod>:6650" epoch=2
time=2025-11-17T23:07:49.326+08:00 level=INFO msg="Reconnected producer to broker" topic=persistent://public/default/my-topic-90865775 producer_name=standalone-1000-32 producerID=1 cnx="<my-pod>:45038 -> <my-pod>:6650"
time=2025-11-17T23:07:49.332+08:00 level=DEBUG msg="Got command! type:MESSAGE  message:{consumer_id:1  message_id:{ledgerId:1278  entryId:0  partition:-1}} with payload size: 112 maxMsgSize: 5242880" remote_addr=pulsar://<my-pod>:6650 local_addr=<my-pod>:45038
time=2025-11-17T23:07:49.332+08:00 level=DEBUG msg="Received command: type:MESSAGE  message:{consumer_id:1  message_id:{ledgerId:1278  entryId:0  partition:-1}} -- payload: &{[14 1 172 18 214 12 0 0 0 60 10 18 115 116 97 110 100 97 108 111 110 101 45 49 48 48 48 45 51 50 16 0 24 154 188 238 146 169 51 34 17 10 5 107 101 121 45 49 18 8 112 117 108 115 97 114 45 49 50 6 112 117 108 115 97 114 72 42 88 1 0 0 0 31 10 17 10 5 107 101 121 45 49 18 8 112 117 108 115 97 114 45 49 18 6 112 117 108 115 97 114 24 7 64 0 104 101 108 108 111 45 48] 0 112 {{} {} 0} <nil> <nil>}" remote_addr=pulsar://<my-pod>:6650 local_addr=<my-pod>:45038
time=2025-11-17T23:07:49.332+08:00 level=DEBUG msg="Got Message: consumer_id:1  message_id:{ledgerId:1278  entryId:0  partition:-1}" remote_addr=pulsar://<my-pod>:6650 local_addr=<my-pod>:45038
2025/11/17 23:07:49 receive message: 1278:0:-1

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants