From 68752bebe861a5c7aef867fc3768dc3f53e95f38 Mon Sep 17 00:00:00 2001 From: crossoverJie Date: Wed, 29 Oct 2025 15:07:13 +0800 Subject: [PATCH 1/3] test: add unit test for unloading topic before consuming messages --- pulsar/consumer_zero_queue_test.go | 91 +++++++++++++++++++++++++++++- 1 file changed, 90 insertions(+), 1 deletion(-) diff --git a/pulsar/consumer_zero_queue_test.go b/pulsar/consumer_zero_queue_test.go index 72048d7969..99c45ee079 100644 --- a/pulsar/consumer_zero_queue_test.go +++ b/pulsar/consumer_zero_queue_test.go @@ -21,9 +21,12 @@ import ( "context" "fmt" "log" + "log/slog" + "os" "testing" "time" + plog "github.com/apache/pulsar-client-go/pulsar/log" "github.com/docker/docker/api/types/container" "github.com/docker/go-connections/nat" "github.com/stretchr/testify/require" @@ -202,9 +205,9 @@ func TestReconnectConsumer(t *testing.T) { go func() { time.Sleep(3 * time.Second) log.Println("unloading topic") - // Simulate a broker restart by stopping the pulsar container topicName, err := utils.GetTopicName(topic) assert.Nil(t, err) + // unload topic to trigger consumer reconnect err = admin.Topics().Unload(*topicName) assert.Nil(t, err) log.Println("unloaded topic") @@ -240,6 +243,92 @@ func TestReconnectConsumer(t *testing.T) { defer c.Terminate(ctx) } +func TestUnloadTopicBeforeConsume(t *testing.T) { + + sLogger := slog.New(slog.NewTextHandler(os.Stdout, &slog.HandlerOptions{Level: slog.LevelDebug})) + client, err := NewClient(ClientOptions{ + URL: lookupURL, + Logger: plog.NewLoggerWithSlog(sLogger), + }) + assert.Nil(t, err) + admin, err := pulsaradmin.NewClient(&config.Config{}) + assert.Nil(t, err) + + assert.Nil(t, err) + defer client.Close() + + topic := newTopicName() + ctx := context.Background() + consumer, err := client.Subscribe(ConsumerOptions{ + Topic: topic, + SubscriptionName: "my-sub", + EnableZeroQueueConsumer: true, + }) + + assert.Nil(t, err) + _, ok := consumer.(*zeroQueueConsumer) + assert.True(t, ok) + defer consumer.Close() + + // create producer + producer, err := client.CreateProducer(ProducerOptions{ + Topic: topic, + DisableBatching: false, + }) + assert.Nil(t, err) + + // send 10 messages + for i := 0; i < 10; i++ { + msg, err := producer.Send(ctx, &ProducerMessage{ + Payload: []byte(fmt.Sprintf("hello-%d", i)), + Key: "pulsar", + Properties: map[string]string{ + "key-1": "pulsar-1", + }, + }) + assert.Nil(t, err) + log.Printf("send message: %s", msg.String()) + } + + log.Println("unloading topic") + topicName, err := utils.GetTopicName(topic) + assert.Nil(t, err) + // unload topic to trigger consumer reconnect and send a permits + err = admin.Topics().Unload(*topicName) + assert.Nil(t, err) + log.Println("unloaded topic") + + // receive 10 messages + for i := 0; i < 10; i++ { + msg, err := consumer.Receive(context.Background()) + if err != nil { + log.Fatal(err) + } + + expectMsg := fmt.Sprintf("hello-%d", i) + expectProperties := map[string]string{ + "key-1": "pulsar-1", + } + assert.Equal(t, []byte(expectMsg), msg.Payload()) + assert.Equal(t, "pulsar", msg.Key()) + assert.Equal(t, expectProperties, msg.Properties()) + // ack message + err = consumer.Ack(msg) + assert.Nil(t, err) + log.Printf("receive message: %s", msg.ID().String()) + } + // Make sure there are no more messages + timeout, cancel := context.WithTimeout(context.Background(), 2*time.Second) + defer cancel() + _, err = consumer.Receive(timeout) + assert.Equal(t, context.DeadlineExceeded, err) + + err = consumer.Unsubscribe() + assert.Nil(t, err) + consumer.Close() + producer.Close() +} + func TestMultipleConsumer(t *testing.T) { client, err := NewClient(ClientOptions{ URL: lookupURL, From a42fa58b11441798e6367071dcd38b32fe97c01e Mon Sep 17 00:00:00 2001 From: crossoverJie Date: Thu, 30 Oct 2025 10:34:27 +0800 Subject: [PATCH 2/3] Update pulsar/consumer_zero_queue_test.go Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> --- pulsar/consumer_zero_queue_test.go | 1 - 1 file changed, 1 deletion(-) diff --git a/pulsar/consumer_zero_queue_test.go b/pulsar/consumer_zero_queue_test.go index 99c45ee079..0fa80f07f6 100644 --- a/pulsar/consumer_zero_queue_test.go +++ b/pulsar/consumer_zero_queue_test.go @@ -254,7 +254,6 @@ func TestUnloadTopicBeforeConsume(t *testing.T) { admin, err := pulsaradmin.NewClient(&config.Config{}) assert.Nil(t, err) - assert.Nil(t, err) defer client.Close() topic := newTopicName() From 3ff25e3b1412a29f5bf1e677ebd2c5b35a82d674 Mon Sep 17 00:00:00 2001 From: crossoverJie Date: Thu, 30 Oct 2025 10:34:37 +0800 Subject: [PATCH 3/3] Update pulsar/consumer_zero_queue_test.go Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> --- pulsar/consumer_zero_queue_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pulsar/consumer_zero_queue_test.go b/pulsar/consumer_zero_queue_test.go index 0fa80f07f6..444751a14d 100644 --- a/pulsar/consumer_zero_queue_test.go +++ b/pulsar/consumer_zero_queue_test.go @@ -292,7 +292,7 @@ func TestUnloadTopicBeforeConsume(t *testing.T) { log.Println("unloading topic") topicName, err := utils.GetTopicName(topic) assert.Nil(t, err) - // unload topic to trigger consumer reconnect and send a permits + // unload topic to trigger consumer reconnect and send permits err = admin.Topics().Unload(*topicName) assert.Nil(t, err) log.Println("unloaded topic")