Skip to content

Commit 7b4b3a6

Browse files
test: add unit test for unloading topic before consuming zero queue messages (#1434)
Co-authored-by: Copilot <[email protected]>
1 parent 50efe42 commit 7b4b3a6

File tree

1 file changed

+89
-1
lines changed

1 file changed

+89
-1
lines changed

pulsar/consumer_zero_queue_test.go

Lines changed: 89 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,9 +21,12 @@ import (
2121
"context"
2222
"fmt"
2323
"log"
24+
"log/slog"
25+
"os"
2426
"testing"
2527
"time"
2628

29+
plog "github.com/apache/pulsar-client-go/pulsar/log"
2730
"github.com/docker/docker/api/types/container"
2831
"github.com/docker/go-connections/nat"
2932
"github.com/stretchr/testify/require"
@@ -202,9 +205,9 @@ func TestReconnectConsumer(t *testing.T) {
202205
go func() {
203206
time.Sleep(3 * time.Second)
204207
log.Println("unloading topic")
205-
// Simulate a broker restart by stopping the pulsar container
206208
topicName, err := utils.GetTopicName(topic)
207209
assert.Nil(t, err)
210+
// unload topic to trigger consumer reconnect
208211
err = admin.Topics().Unload(*topicName)
209212
assert.Nil(t, err)
210213
log.Println("unloaded topic")
@@ -240,6 +243,91 @@ func TestReconnectConsumer(t *testing.T) {
240243
defer c.Terminate(ctx)
241244
}
242245

246+
func TestUnloadTopicBeforeConsume(t *testing.T) {
247+
248+
sLogger := slog.New(slog.NewTextHandler(os.Stdout, &slog.HandlerOptions{Level: slog.LevelDebug}))
249+
client, err := NewClient(ClientOptions{
250+
URL: lookupURL,
251+
Logger: plog.NewLoggerWithSlog(sLogger),
252+
})
253+
assert.Nil(t, err)
254+
admin, err := pulsaradmin.NewClient(&config.Config{})
255+
assert.Nil(t, err)
256+
257+
defer client.Close()
258+
259+
topic := newTopicName()
260+
ctx := context.Background()
261+
consumer, err := client.Subscribe(ConsumerOptions{
262+
Topic: topic,
263+
SubscriptionName: "my-sub",
264+
EnableZeroQueueConsumer: true,
265+
})
266+
267+
assert.Nil(t, err)
268+
_, ok := consumer.(*zeroQueueConsumer)
269+
assert.True(t, ok)
270+
defer consumer.Close()
271+
272+
// create producer
273+
producer, err := client.CreateProducer(ProducerOptions{
274+
Topic: topic,
275+
DisableBatching: false,
276+
})
277+
assert.Nil(t, err)
278+
279+
// send 10 messages
280+
for i := 0; i < 10; i++ {
281+
msg, err := producer.Send(ctx, &ProducerMessage{
282+
Payload: []byte(fmt.Sprintf("hello-%d", i)),
283+
Key: "pulsar",
284+
Properties: map[string]string{
285+
"key-1": "pulsar-1",
286+
},
287+
})
288+
assert.Nil(t, err)
289+
log.Printf("send message: %s", msg.String())
290+
}
291+
292+
log.Println("unloading topic")
293+
topicName, err := utils.GetTopicName(topic)
294+
assert.Nil(t, err)
295+
// unload topic to trigger consumer reconnect and send permits
296+
err = admin.Topics().Unload(*topicName)
297+
assert.Nil(t, err)
298+
log.Println("unloaded topic")
299+
300+
// receive 10 messages
301+
for i := 0; i < 10; i++ {
302+
msg, err := consumer.Receive(context.Background())
303+
if err != nil {
304+
log.Fatal(err)
305+
}
306+
307+
expectMsg := fmt.Sprintf("hello-%d", i)
308+
expectProperties := map[string]string{
309+
"key-1": "pulsar-1",
310+
}
311+
assert.Equal(t, []byte(expectMsg), msg.Payload())
312+
assert.Equal(t, "pulsar", msg.Key())
313+
assert.Equal(t, expectProperties, msg.Properties())
314+
// ack message
315+
err = consumer.Ack(msg)
316+
assert.Nil(t, err)
317+
log.Printf("receive message: %s", msg.ID().String())
318+
}
319+
// Make sure there are no more messages
320+
timeout, cancel := context.WithTimeout(context.Background(), 2*time.Second)
321+
defer cancel()
322+
_, err = consumer.Receive(timeout)
323+
assert.Equal(t, context.DeadlineExceeded, err)
324+
325+
err = consumer.Unsubscribe()
326+
assert.Nil(t, err)
327+
consumer.Close()
328+
producer.Close()
329+
}
330+
243331
func TestMultipleConsumer(t *testing.T) {
244332
client, err := NewClient(ClientOptions{
245333
URL: lookupURL,

0 commit comments

Comments
 (0)